Merge pull request #677 from orbitdb/fix/createInstance

Using identity's keystore as default first
This commit is contained in:
Mark Robert Henderson
2019-09-24 11:06:40 -04:00
committed by GitHub
10 changed files with 799 additions and 1025 deletions

1661
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -30,7 +30,7 @@
"orbit-db-keystore": "next",
"orbit-db-kvstore": "next",
"orbit-db-pubsub": "~0.5.5",
"orbit-db-storage-adapter": "^0.5.0",
"orbit-db-storage-adapter": "^0.5.3",
"orbit-db-store": "next"
},
"devDependencies": {

View File

@@ -1,6 +1,5 @@
'use strict'
const fs = require('./fs-shim')
const path = require('path')
const EventStore = require('orbit-db-eventstore')
const FeedStore = require('orbit-db-feedstore')
@@ -45,16 +44,21 @@ class OrbitDB {
? new options.broker(this._ipfs) // eslint-disable-line
: new Pubsub(this._ipfs, this.id)
this.directory = options.directory || './orbitdb'
this.keystore = options.keystore
this.caches = { 'default': options.cache }
this.storage = options.storage
this.stores = {}
this._directConnections = {}
this.caches = {}
this.caches[this.directory] = { cache: options.cache, handlers: new Set() }
this.keystore = options.keystore
this.stores = {}
// AccessControllers module can be passed in to enable
// testing with orbit-db-access-controller
AccessControllers = options.AccessControllers || AccessControllers
}
get cache () { return this.caches[this.directory].cache }
static async createInstance (ipfs, options = {}) {
if (!isDefined(ipfs)) { throw new Error('IPFS is a required argument. See https://github.com/orbitdb/orbit-db/blob/master/API.md#createinstance') }
@@ -65,16 +69,14 @@ class OrbitDB {
if (!options.storage) {
let storageOptions = {}
if (fs && fs.mkdirSync) {
storageOptions.preCreate = async (directory) => {
fs.mkdirSync(directory, { recursive: true })
}
}
// Create default `level` store
options.storage = Storage(null, storageOptions)
}
if (options.identity && options.identity.provider.keystore) {
options.keystore = options.identity.provider.keystore
}
if (!options.keystore) {
const keystorePath = path.join(options.directory, id, '/keystore')
let keyStorage = await options.storage.createStore(keystorePath)
@@ -137,14 +139,9 @@ class OrbitDB {
}
async disconnect () {
// close Keystore
// close keystore
await this.keystore.close()
// close Cache
await Promise.all(Object.values(this.caches).map((cache) => {
return cache.close()
}))
// Close all open databases
const databases = Object.values(this.stores)
for (let db of databases) {
@@ -152,6 +149,12 @@ class OrbitDB {
delete this.stores[db.address.toString()]
}
const caches = Object.keys(this.caches)
for (let directory of caches) {
await this.caches[directory].cache.close()
delete this.caches[directory]
}
// Close a direct connection and remove it from internal state
const removeDirectConnect = e => {
this._directConnections[e].close()
@@ -175,6 +178,11 @@ class OrbitDB {
await this.disconnect()
}
async _createCache (path) {
let cacheStorage = await this.storage.createStore(path)
return new Cache(cacheStorage)
}
/* Private methods */
async _createStore (type, address, options) {
// Get the type -> class mapping
@@ -189,14 +197,16 @@ class OrbitDB {
const opts = Object.assign({ replicate: true }, options, {
accessController: accessController,
keystore: this.keystore,
cache: options.cache,
onClose: this._onClose.bind(this)
onClose: this._onClose.bind(this),
onDrop: this._onDrop.bind(this),
onLoad: this._onLoad.bind(this)
})
const identity = options.identity || this.identity
const store = new Store(this._ipfs, identity, address, opts)
store.events.on('write', this._onWrite.bind(this))
// ID of the store is the address as a string
const addr = address.toString()
this.stores[addr] = store
@@ -252,7 +262,8 @@ class OrbitDB {
}
// Callback when database was closed
async _onClose (address) {
async _onClose (db) {
const address = db.address.toString()
logger.debug(`Close ${address}`)
// Unsubscribe from pubsub
@@ -260,9 +271,32 @@ class OrbitDB {
await this._pubsub.unsubscribe(address)
}
const store = this.stores[address]
const dir = store && store.options.directory ? store.options.directory : this.directory
const cache = this.caches[dir]
if (cache && cache.handlers.has(address)) {
cache.handlers.delete(address)
if (!cache.handlers.size) await cache.cache.close()
}
delete this.stores[address]
}
async _onDrop (db) {
const address = db.address.toString()
const dir = db && db.options.directory ? db.options.directory : this.directory
await this._requestCache(address, dir, db._cache)
this.stores[address] = db
}
async _onLoad (db) {
const address = db.address.toString()
const dir = db && db.options.directory ? db.options.directory : this.directory
await this._requestCache(address, dir, db._cache)
this.stores[address] = db
}
async _determineAddress (name, type, options = {}) {
if (!OrbitDB.isValidType(type)) { throw new Error(`Invalid database type '${type}'`) }
@@ -295,11 +329,7 @@ class OrbitDB {
// Create the database address
const dbAddress = await this._determineAddress(name, type, options)
options.cache = this.caches[options.directory || 'default']
if (!options.cache) {
const cacheStorage = await this.storage.createStore(options.directory)
this.caches[options.directory] = options.cache = new Cache(cacheStorage)
}
options.cache = await this._requestCache(dbAddress.toString(), options.directory)
// Check if we have the database locally
const haveDB = await this._haveLocalData(options.cache, dbAddress)
@@ -322,6 +352,21 @@ class OrbitDB {
return this._determineAddress(name, type, opts)
}
async _requestCache (address, directory, existingCache) {
const dir = directory || this.directory
if (!this.caches[dir]) {
const newCache = existingCache || await this._createCache(dir)
this.caches[dir] = { cache: newCache, handlers: new Set() }
}
this.caches[dir].handlers.add(address)
const cache = this.caches[dir].cache
// "Wake up" the caches if they need it
if (cache) await cache.open()
return cache
}
/*
options = {
localOnly: false // if set to true, throws an error if database can't be found locally
@@ -353,7 +398,7 @@ class OrbitDB {
// Parse the database address
const dbAddress = OrbitDBAddress.parse(address)
if (!options.cache) options.cache = this.caches['default']
options.cache = await this._requestCache(dbAddress.toString(), options.directory)
// Check if we have the database
const haveDB = await this._haveLocalData(options.cache, dbAddress)

View File

@@ -1,3 +1,4 @@
/* eslint-disable */
const fs = (typeof window === 'object' || typeof self === 'object') ? null
: eval('require("fs")')

View File

@@ -8,7 +8,7 @@ const logger = Logger.create('orbit-db')
Logger.setLogLevel('ERROR')
async function migrate (OrbitDB, options, dbAddress) {
let oldCache = OrbitDB.caches[options.directory]
let oldCache = OrbitDB.caches[options.directory] ? OrbitDB.caches[options.directory].cache : null
let oldStore
if (!oldCache) {

View File

@@ -151,7 +151,6 @@ Object.keys(testAPIs).forEach(API => {
await db.load()
assert.equal((await db.get('key')), undefined)
await db.close()
await db.drop()
await fs.copy(migrationFixturePath, migrationDataPath)
@@ -176,14 +175,12 @@ Object.keys(testAPIs).forEach(API => {
describe('Access Controller', function() {
before(async () => {
if (db) {
await db.close()
await db.drop()
}
})
afterEach(async () => {
if (db) {
await db.close()
await db.drop()
}
})
@@ -340,5 +337,57 @@ Object.keys(testAPIs).forEach(API => {
assert.equal(res[1].payload.value, 'hello2')
})
})
describe("Close", function() {
before(async () => {
if (orbitdb) await orbitdb.stop()
orbitdb = await OrbitDB.createInstance(ipfs, { directory: dbPath })
})
it('closes a custom store', async () => {
const directory = path.join(dbPath, "custom-store")
db = await orbitdb.open('xyz', { create: true, type: 'feed', directory })
await db.close()
assert.strictEqual(db._cache._store._db.status, 'closed')
})
it("close load close sets status to 'closed'", async () => {
const directory = path.join(dbPath, "custom-store")
db = await orbitdb.open('xyz', { create: true, type: 'feed', directory })
await db.close()
await db.load()
await db.close()
assert.strictEqual(db._cache._store._db.status, 'closed')
})
it('successfully manages multiple caches', async() => {
// Cleaning up cruft from other tests
const directory = path.join(dbPath, "custom-store")
const directory2 = path.join(dbPath, "custom-store2")
const db1 = await orbitdb.open('xyz1', { create: true, type: 'feed', })
const db2 = await orbitdb.open('xyz2', { create: true, type: 'feed', directory })
const db3 = await orbitdb.open('xyz3', { create: true, type: 'feed', directory })
const db4 = await orbitdb.open('xyz4', { create: true, type: 'feed', directory: directory2 })
const db5 = await orbitdb.open('xyz5', { create: true, type: 'feed', })
await db1.close()
await db2.close()
await db4.close()
assert.strictEqual(orbitdb.cache._store._db.status, 'open')
assert.strictEqual(db2._cache._store._db.status, 'open')
assert.strictEqual(db3._cache._store._db.status, 'open')
assert.strictEqual(db4._cache._store._db.status, 'closed')
await db3.close()
await db5.close()
assert.strictEqual(orbitdb.cache._store._db.status, 'closed')
assert.strictEqual(db2._cache._store._db.status, 'closed')
assert.strictEqual(db3._cache._store._db.status, 'closed')
assert.strictEqual(db4._cache._store._db.status, 'closed')
assert.strictEqual(db5._cache._store._db.status, 'closed')
})
})
})
})

View File

@@ -52,11 +52,13 @@ Object.keys(testAPIs).forEach(API => {
it('removes local database cache', async () => {
await db.drop()
await db._cache.open()
assert.equal(await db._cache.get(db.localHeadsPath), undefined)
assert.equal(await db._cache.get(db.remoteHeadsPath), undefined)
assert.equal(await db._cache.get(db.snapshotPath), undefined)
assert.equal(await db._cache.get(db.queuePath), undefined)
assert.equal(await db._cache.get(db.manifestPath), undefined)
await db._cache.close()
})
})
})

View File

@@ -211,9 +211,10 @@ Object.keys(testAPIs).forEach(API => {
assert.equal(minClock, 1)
const replicateProgressEvents = events.filter(e => e.event === 'replicate.progress')
const minProgressClock = Math.min(...replicateProgressEvents.filter(e => !!e.entry.clock).map(e => e.entry.clock.time))
assert.equal(replicateProgressEvents.length, expectedEventCount)
assert.equal(replicateProgressEvents[0].entry.payload.value.split(' ')[0], 'hello')
assert.equal(replicateProgressEvents[0].entry.clock.time, 1)
assert.equal(minProgressClock, 1)
assert.equal(replicateProgressEvents[0].replicationInfo.max >= 1, true)
assert.equal(replicateProgressEvents[0].replicationInfo.progress, 1)

View File

@@ -89,6 +89,7 @@ Object.keys(testAPIs).forEach(API => {
})
it('has correct replication info after loading from snapshot', async () => {
await db._cache._store.open();
await db.saveSnapshot()
await db.close()
await db.loadFromSnapshot()

View File

@@ -44,6 +44,8 @@ class CustomTestKeystore {
getPublic (key) {
return key.public.marshal()
}
close () {}
}
class CustomIdProvider extends IdentityProvider {