Using identity's keystore as default first

package-lock

pointing to branch

more cache management stuff

WIP

Passing tests

Removing static linking

fixing tests and linting

fixing package.json

removing last debugger

removing last debugger

Adding keystore and cache getters

PR comments

Removing extraneous cache management

Package files

Closing caches

using dbAddress as this.caches key

new tests for store management

Working but with slightly different semantics

Rebuild

package-lock

Dependency updates

removeHandler

restoring db.close in replication status test

package.json files

move handler to orbitdb.caches

Test updates

Cache management cleanup

use store.options.directory

requestCache in onLoad and onDrop

add status test

Adding db to this.stores in onLoad and onDrop

Working RC5 before rebase

Updating package-lock

restoring original replication status test

package files

removing keystore getter

more keystore cleanup

typo
This commit is contained in:
Mark Henderson 2019-09-03 18:11:21 -04:00
parent 7902972277
commit f9feb27c09
10 changed files with 799 additions and 1025 deletions

1661
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -30,7 +30,7 @@
"orbit-db-keystore": "next",
"orbit-db-kvstore": "next",
"orbit-db-pubsub": "~0.5.5",
"orbit-db-storage-adapter": "^0.5.0",
"orbit-db-storage-adapter": "^0.5.3",
"orbit-db-store": "next"
},
"devDependencies": {

View File

@ -1,6 +1,5 @@
'use strict'
const fs = require('./fs-shim')
const path = require('path')
const EventStore = require('orbit-db-eventstore')
const FeedStore = require('orbit-db-feedstore')
@ -45,16 +44,21 @@ class OrbitDB {
? new options.broker(this._ipfs) // eslint-disable-line
: new Pubsub(this._ipfs, this.id)
this.directory = options.directory || './orbitdb'
this.keystore = options.keystore
this.caches = { 'default': options.cache }
this.storage = options.storage
this.stores = {}
this._directConnections = {}
this.caches = {}
this.caches[this.directory] = { cache: options.cache, handlers: new Set() }
this.keystore = options.keystore
this.stores = {}
// AccessControllers module can be passed in to enable
// testing with orbit-db-access-controller
AccessControllers = options.AccessControllers || AccessControllers
}
get cache () { return this.caches[this.directory].cache }
static async createInstance (ipfs, options = {}) {
if (!isDefined(ipfs)) { throw new Error('IPFS is a required argument. See https://github.com/orbitdb/orbit-db/blob/master/API.md#createinstance') }
@ -65,16 +69,14 @@ class OrbitDB {
if (!options.storage) {
let storageOptions = {}
if (fs && fs.mkdirSync) {
storageOptions.preCreate = async (directory) => {
fs.mkdirSync(directory, { recursive: true })
}
}
// Create default `level` store
options.storage = Storage(null, storageOptions)
}
if (options.identity && options.identity.provider.keystore) {
options.keystore = options.identity.provider.keystore
}
if (!options.keystore) {
const keystorePath = path.join(options.directory, id, '/keystore')
let keyStorage = await options.storage.createStore(keystorePath)
@ -137,14 +139,9 @@ class OrbitDB {
}
async disconnect () {
// close Keystore
// close keystore
await this.keystore.close()
// close Cache
await Promise.all(Object.values(this.caches).map((cache) => {
return cache.close()
}))
// Close all open databases
const databases = Object.values(this.stores)
for (let db of databases) {
@ -152,6 +149,12 @@ class OrbitDB {
delete this.stores[db.address.toString()]
}
const caches = Object.keys(this.caches)
for (let directory of caches) {
await this.caches[directory].cache.close()
delete this.caches[directory]
}
// Close a direct connection and remove it from internal state
const removeDirectConnect = e => {
this._directConnections[e].close()
@ -175,6 +178,11 @@ class OrbitDB {
await this.disconnect()
}
// Create a new Cache backed by a store created via the configured
// storage adapter (this.storage) at the given path. The caller is
// responsible for closing the returned Cache when done.
async _createCache (path) {
let cacheStorage = await this.storage.createStore(path)
return new Cache(cacheStorage)
}
/* Private methods */
async _createStore (type, address, options) {
// Get the type -> class mapping
@ -189,14 +197,16 @@ class OrbitDB {
const opts = Object.assign({ replicate: true }, options, {
accessController: accessController,
keystore: this.keystore,
cache: options.cache,
onClose: this._onClose.bind(this)
onClose: this._onClose.bind(this),
onDrop: this._onDrop.bind(this),
onLoad: this._onLoad.bind(this)
})
const identity = options.identity || this.identity
const store = new Store(this._ipfs, identity, address, opts)
store.events.on('write', this._onWrite.bind(this))
// ID of the store is the address as a string
const addr = address.toString()
this.stores[addr] = store
@ -252,7 +262,8 @@ class OrbitDB {
}
// Callback when database was closed
async _onClose (address) {
async _onClose (db) {
const address = db.address.toString()
logger.debug(`Close ${address}`)
// Unsubscribe from pubsub
@ -260,9 +271,32 @@ class OrbitDB {
await this._pubsub.unsubscribe(address)
}
const store = this.stores[address]
const dir = store && store.options.directory ? store.options.directory : this.directory
const cache = this.caches[dir]
if (cache && cache.handlers.has(address)) {
cache.handlers.delete(address)
if (!cache.handlers.size) await cache.cache.close()
}
delete this.stores[address]
}
// Callback invoked by a store when it is dropped. Re-registers the
// store's cache handler (reusing the store's existing cache, db._cache)
// so cache refcounting stays correct, and keeps the store tracked in
// this.stores under its string address.
async _onDrop (db) {
const address = db.address.toString()
// Fall back to the OrbitDB instance directory when the store has no
// per-store directory configured.
const dir = db && db.options.directory ? db.options.directory : this.directory
await this._requestCache(address, dir, db._cache)
this.stores[address] = db
}
// Callback invoked by a store when it is loaded. Mirrors _onDrop:
// re-opens/re-registers the directory cache for this store (passing the
// store's existing db._cache so no new cache is created needlessly) and
// records the store in this.stores keyed by its string address.
async _onLoad (db) {
const address = db.address.toString()
// Fall back to the OrbitDB instance directory when the store has no
// per-store directory configured.
const dir = db && db.options.directory ? db.options.directory : this.directory
await this._requestCache(address, dir, db._cache)
this.stores[address] = db
}
async _determineAddress (name, type, options = {}) {
if (!OrbitDB.isValidType(type)) { throw new Error(`Invalid database type '${type}'`) }
@ -295,11 +329,7 @@ class OrbitDB {
// Create the database address
const dbAddress = await this._determineAddress(name, type, options)
options.cache = this.caches[options.directory || 'default']
if (!options.cache) {
const cacheStorage = await this.storage.createStore(options.directory)
this.caches[options.directory] = options.cache = new Cache(cacheStorage)
}
options.cache = await this._requestCache(dbAddress.toString(), options.directory)
// Check if we have the database locally
const haveDB = await this._haveLocalData(options.cache, dbAddress)
@ -322,6 +352,21 @@ class OrbitDB {
return this._determineAddress(name, type, opts)
}
// Acquire the shared cache for `directory` (defaulting to this.directory),
// creating one if none exists yet. Each cache entry tracks the set of
// database addresses ("handlers") using it, so the cache can be closed
// when the last database releases it (see _onClose).
//
// @param {string} address       database address registering interest
// @param {string} [directory]   cache directory; falsy falls back to this.directory
// @param {Cache}  [existingCache] pre-built cache to adopt instead of creating one
// @returns {Cache} the (opened) cache for the directory
async _requestCache (address, directory, existingCache) {
const dir = directory || this.directory
if (!this.caches[dir]) {
// Adopt the caller-supplied cache if given; otherwise create a fresh one.
const newCache = existingCache || await this._createCache(dir)
this.caches[dir] = { cache: newCache, handlers: new Set() }
}
this.caches[dir].handlers.add(address)
const cache = this.caches[dir].cache
// "Wake up" the caches if they need it
if (cache) await cache.open()
return cache
}
/*
options = {
localOnly: false // if set to true, throws an error if database can't be found locally
@ -353,7 +398,7 @@ class OrbitDB {
// Parse the database address
const dbAddress = OrbitDBAddress.parse(address)
if (!options.cache) options.cache = this.caches['default']
options.cache = await this._requestCache(dbAddress.toString(), options.directory)
// Check if we have the database
const haveDB = await this._haveLocalData(options.cache, dbAddress)

View File

@ -1,3 +1,4 @@
/* eslint-disable */
const fs = (typeof window === 'object' || typeof self === 'object') ? null
: eval('require("fs")')

View File

@ -8,7 +8,7 @@ const logger = Logger.create('orbit-db')
Logger.setLogLevel('ERROR')
async function migrate (OrbitDB, options, dbAddress) {
let oldCache = OrbitDB.caches[options.directory]
let oldCache = OrbitDB.caches[options.directory] ? OrbitDB.caches[options.directory].cache : null
let oldStore
if (!oldCache) {

View File

@ -151,7 +151,6 @@ Object.keys(testAPIs).forEach(API => {
await db.load()
assert.equal((await db.get('key')), undefined)
await db.close()
await db.drop()
await fs.copy(migrationFixturePath, migrationDataPath)
@ -176,14 +175,12 @@ Object.keys(testAPIs).forEach(API => {
describe('Access Controller', function() {
before(async () => {
if (db) {
await db.close()
await db.drop()
}
})
afterEach(async () => {
if (db) {
await db.close()
await db.drop()
}
})
@ -340,5 +337,57 @@ Object.keys(testAPIs).forEach(API => {
assert.equal(res[1].payload.value, 'hello2')
})
})
describe("Close", function() {
before(async () => {
if (orbitdb) await orbitdb.stop()
orbitdb = await OrbitDB.createInstance(ipfs, { directory: dbPath })
})
it('closes a custom store', async () => {
const directory = path.join(dbPath, "custom-store")
db = await orbitdb.open('xyz', { create: true, type: 'feed', directory })
await db.close()
assert.strictEqual(db._cache._store._db.status, 'closed')
})
it("close load close sets status to 'closed'", async () => {
const directory = path.join(dbPath, "custom-store")
db = await orbitdb.open('xyz', { create: true, type: 'feed', directory })
await db.close()
await db.load()
await db.close()
assert.strictEqual(db._cache._store._db.status, 'closed')
})
it('successfully manages multiple caches', async() => {
// Cleaning up cruft from other tests
const directory = path.join(dbPath, "custom-store")
const directory2 = path.join(dbPath, "custom-store2")
const db1 = await orbitdb.open('xyz1', { create: true, type: 'feed', })
const db2 = await orbitdb.open('xyz2', { create: true, type: 'feed', directory })
const db3 = await orbitdb.open('xyz3', { create: true, type: 'feed', directory })
const db4 = await orbitdb.open('xyz4', { create: true, type: 'feed', directory: directory2 })
const db5 = await orbitdb.open('xyz5', { create: true, type: 'feed', })
await db1.close()
await db2.close()
await db4.close()
assert.strictEqual(orbitdb.cache._store._db.status, 'open')
assert.strictEqual(db2._cache._store._db.status, 'open')
assert.strictEqual(db3._cache._store._db.status, 'open')
assert.strictEqual(db4._cache._store._db.status, 'closed')
await db3.close()
await db5.close()
assert.strictEqual(orbitdb.cache._store._db.status, 'closed')
assert.strictEqual(db2._cache._store._db.status, 'closed')
assert.strictEqual(db3._cache._store._db.status, 'closed')
assert.strictEqual(db4._cache._store._db.status, 'closed')
assert.strictEqual(db5._cache._store._db.status, 'closed')
})
})
})
})

View File

@ -52,11 +52,13 @@ Object.keys(testAPIs).forEach(API => {
it('removes local database cache', async () => {
await db.drop()
await db._cache.open()
assert.equal(await db._cache.get(db.localHeadsPath), undefined)
assert.equal(await db._cache.get(db.remoteHeadsPath), undefined)
assert.equal(await db._cache.get(db.snapshotPath), undefined)
assert.equal(await db._cache.get(db.queuePath), undefined)
assert.equal(await db._cache.get(db.manifestPath), undefined)
await db._cache.close()
})
})
})

View File

@ -211,9 +211,10 @@ Object.keys(testAPIs).forEach(API => {
assert.equal(minClock, 1)
const replicateProgressEvents = events.filter(e => e.event === 'replicate.progress')
const minProgressClock = Math.min(...replicateProgressEvents.filter(e => !!e.entry.clock).map(e => e.entry.clock.time))
assert.equal(replicateProgressEvents.length, expectedEventCount)
assert.equal(replicateProgressEvents[0].entry.payload.value.split(' ')[0], 'hello')
assert.equal(replicateProgressEvents[0].entry.clock.time, 1)
assert.equal(minProgressClock, 1)
assert.equal(replicateProgressEvents[0].replicationInfo.max >= 1, true)
assert.equal(replicateProgressEvents[0].replicationInfo.progress, 1)

View File

@ -89,6 +89,7 @@ Object.keys(testAPIs).forEach(API => {
})
it('has correct replication info after loading from snapshot', async () => {
await db._cache._store.open();
await db.saveSnapshot()
await db.close()
await db.loadFromSnapshot()

View File

@ -44,6 +44,8 @@ class CustomTestKeystore {
getPublic (key) {
return key.public.marshal()
}
close () {}
}
class CustomIdProvider extends IdentityProvider {