feat: Configurable heads and entry storage. (#42)

* feat: Configurable heads and entry storage.

* fix: Linting.

* test: Re-enable all tests.

* test: Custom entry storage.

* test: Check for heads paths.

* fix: Check for path using fs.
This commit is contained in:
Hayden Young 2023-03-14 05:42:05 +08:00 committed by GitHub
parent a40bc8bdcf
commit aabfd4e2bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 227 additions and 103 deletions

View File

@ -7,19 +7,19 @@ import { ComposedStorage, LRUStorage, IPFSBlockStorage, LevelStorage } from './s
const defaultPointerCount = 0 const defaultPointerCount = 0
const defaultCacheSize = 1000 const defaultCacheSize = 1000
const Database = async ({ OpLog, ipfs, identity, address, name, accessController, directory, storage, meta, headsStorage, pointerCount }) => { const Database = async ({ OpLog, ipfs, identity, address, name, accessController, directory, meta, headsStorage, entryStorage, pointerCount }) => {
const { Log, Entry } = OpLog const { Log, Entry } = OpLog
directory = Path.join(directory || './orbitdb', `./${address}/`) directory = Path.join(directory || './orbitdb', `./${address}/`)
meta = meta || {} meta = meta || {}
pointerCount = pointerCount || defaultPointerCount pointerCount = pointerCount || defaultPointerCount
const entryStorage = await ComposedStorage( entryStorage = entryStorage || await ComposedStorage(
await LRUStorage({ size: defaultCacheSize }), await LRUStorage({ size: defaultCacheSize }),
await IPFSBlockStorage({ ipfs, pin: true }) await IPFSBlockStorage({ ipfs, pin: true })
) )
headsStorage = await ComposedStorage( headsStorage = headsStorage || await ComposedStorage(
await LRUStorage({ size: defaultCacheSize }), await LRUStorage({ size: defaultCacheSize }),
await LevelStorage({ path: Path.join(directory, '/log/_heads/') }) await LevelStorage({ path: Path.join(directory, '/log/_heads/') })
) )

View File

@ -7,6 +7,9 @@ import config from './config.js'
import testKeysPath from './fixtures/test-keys-path.js' import testKeysPath from './fixtures/test-keys-path.js'
import connectPeers from './utils/connect-nodes.js' import connectPeers from './utils/connect-nodes.js'
import waitFor from './utils/wait-for.js' import waitFor from './utils/wait-for.js'
import ComposedStorage from '../src/storage/composed.js'
import IPFSBlockStorage from '../src/storage/ipfs-block.js'
import MemoryStorage from '../src/storage/memory.js'
const OpLog = { Log, Entry } const OpLog = { Log, Entry }
const keysPath = './testkeys' const keysPath = './testkeys'
@ -40,9 +43,6 @@ describe('Database - Replication', function () {
identities = await Identities({ keystore }) identities = await Identities({ keystore })
testIdentity1 = await identities.createIdentity({ id: 'userA' }) testIdentity1 = await identities.createIdentity({ id: 'userA' })
testIdentity2 = await identities.createIdentity({ id: 'userB' }) testIdentity2 = await identities.createIdentity({ id: 'userB' })
db1 = await Database({ OpLog, ipfs: ipfs1, identity: testIdentity1, address: databaseId, accessController, directory: './orbitdb1' })
db2 = await Database({ OpLog, ipfs: ipfs2, identity: testIdentity2, address: databaseId, accessController, directory: './orbitdb2' })
}) })
afterEach(async () => { afterEach(async () => {
@ -76,121 +76,176 @@ describe('Database - Replication', function () {
await rmrf('./ipfs2') await rmrf('./ipfs2')
}) })
it('replicates databases across two peers', async () => { describe('Replicate across peers', () => {
let connected1 = false beforeEach(async () => {
let connected2 = false db1 = await Database({ OpLog, ipfs: ipfs1, identity: testIdentity1, address: databaseId, accessController, directory: './orbitdb1' })
db2 = await Database({ OpLog, ipfs: ipfs2, identity: testIdentity2, address: databaseId, accessController, directory: './orbitdb2' })
const onConnected1 = (peerId, heads) => {
connected1 = true
}
const onConnected2 = (peerId, heads) => {
connected2 = true
}
db1.events.on('join', onConnected1)
db2.events.on('join', onConnected2)
await db1.addOperation({ op: 'PUT', key: 1, value: 'record 1 on db 1' })
await db1.addOperation({ op: 'PUT', key: 2, value: 'record 2 on db 1' })
await db1.addOperation({ op: 'PUT', key: 3, value: 'record 3 on db 1' })
await db1.addOperation({ op: 'PUT', key: 4, value: 'record 4 on db 1' })
await waitFor(() => connected1, () => true)
await waitFor(() => connected2, () => true)
const all1 = []
for await (const item of db1.log.iterator()) {
all1.unshift(item)
}
const all2 = []
for await (const item of db2.log.iterator()) {
all2.unshift(item)
}
deepStrictEqual(all1, all2)
})
it('replicates databases across two peers with delays', async () => {
let connected1 = false
let connected2 = false
const onConnected1 = (peerId, heads) => {
connected1 = true
}
const onConnected2 = (peerId, heads) => {
connected2 = true
}
db1.events.on('join', onConnected1)
db2.events.on('join', onConnected2)
await db1.addOperation({ op: 'PUT', key: 1, value: 'record 1 on db 1' })
await new Promise(resolve => {
setTimeout(() => resolve(), 1000)
}) })
await db1.addOperation({ op: 'PUT', key: 2, value: 'record 2 on db 1' }) it('replicates databases across two peers', async () => {
await db1.addOperation({ op: 'PUT', key: 3, value: 'record 3 on db 1' }) let connected1 = false
let connected2 = false
await new Promise(resolve => { const onConnected1 = (peerId, heads) => {
setTimeout(() => resolve(), 1000) connected1 = true
}
const onConnected2 = (peerId, heads) => {
connected2 = true
}
db1.events.on('join', onConnected1)
db2.events.on('join', onConnected2)
await db1.addOperation({ op: 'PUT', key: 1, value: 'record 1 on db 1' })
await db1.addOperation({ op: 'PUT', key: 2, value: 'record 2 on db 1' })
await db1.addOperation({ op: 'PUT', key: 3, value: 'record 3 on db 1' })
await db1.addOperation({ op: 'PUT', key: 4, value: 'record 4 on db 1' })
await waitFor(() => connected1, () => true)
await waitFor(() => connected2, () => true)
const all1 = []
for await (const item of db1.log.iterator()) {
all1.unshift(item)
}
const all2 = []
for await (const item of db2.log.iterator()) {
all2.unshift(item)
}
deepStrictEqual(all1, all2)
}) })
await db1.addOperation({ op: 'PUT', key: 4, value: 'record 4 on db 1' }) it('replicates databases across two peers with delays', async () => {
let connected1 = false
let connected2 = false
await waitFor(() => connected1, () => true) const onConnected1 = (peerId, heads) => {
await waitFor(() => connected2, () => true) connected1 = true
}
const all1 = [] const onConnected2 = (peerId, heads) => {
for await (const item of db1.log.iterator()) { connected2 = true
all1.unshift(item) }
}
const all2 = [] db1.events.on('join', onConnected1)
for await (const item of db2.log.iterator()) { db2.events.on('join', onConnected2)
all2.unshift(item)
}
deepStrictEqual(all1, all2) await db1.addOperation({ op: 'PUT', key: 1, value: 'record 1 on db 1' })
await new Promise(resolve => {
setTimeout(() => resolve(), 1000)
})
await db1.addOperation({ op: 'PUT', key: 2, value: 'record 2 on db 1' })
await db1.addOperation({ op: 'PUT', key: 3, value: 'record 3 on db 1' })
await new Promise(resolve => {
setTimeout(() => resolve(), 1000)
})
await db1.addOperation({ op: 'PUT', key: 4, value: 'record 4 on db 1' })
await waitFor(() => connected1, () => true)
await waitFor(() => connected2, () => true)
const all1 = []
for await (const item of db1.log.iterator()) {
all1.unshift(item)
}
const all2 = []
for await (const item of db2.log.iterator()) {
all2.unshift(item)
}
deepStrictEqual(all1, all2)
})
it('adds an operation before db2 is instantiated', async () => {
let connected = false
const onConnected = (peerId, heads) => {
connected = true
}
await db2.drop()
await db2.close()
await rmrf('./orbitdb2')
await db1.addOperation({ op: 'PUT', key: 1, value: 'record 1 on db 1' })
db2 = await Database({ OpLog, ipfs: ipfs2, identity: testIdentity2, address: databaseId, accessController, directory: './orbitdb2' })
db2.events.on('join', onConnected)
await waitFor(() => connected, () => true)
const all1 = []
for await (const item of db1.log.iterator()) {
all1.unshift(item)
}
const all2 = []
for await (const item of db2.log.iterator()) {
all2.unshift(item)
}
deepStrictEqual(all1, all2)
})
}) })
it('adds an operation before db2 is instantiated', async () => { describe('Options', () => {
let connected = false it('uses given ComposedStorage with MemoryStorage/IPFSBlockStorage for entryStorage', async () => {
const onConnected = (peerId, heads) => { const storage1 = await ComposedStorage(await MemoryStorage(), await IPFSBlockStorage({ ipfs: ipfs1, pin: true }))
connected = true const storage2 = await ComposedStorage(await MemoryStorage(), await IPFSBlockStorage({ ipfs: ipfs2, pin: true }))
} db1 = await Database({ OpLog, ipfs: ipfs1, identity: testIdentity1, address: databaseId, accessController, directory: './orbitdb1', entryStorage: storage1 })
db2 = await Database({ OpLog, ipfs: ipfs2, identity: testIdentity2, address: databaseId, accessController, directory: './orbitdb2', entryStorage: storage2 })
await db2.drop() let connected1 = false
await db2.close() let connected2 = false
await rmrf('./orbitdb2') const onConnected1 = (peerId, heads) => {
connected1 = true
}
await db1.addOperation({ op: 'PUT', key: 1, value: 'record 1 on db 1' }) const onConnected2 = (peerId, heads) => {
connected2 = true
}
db2 = await Database({ OpLog, ipfs: ipfs2, identity: testIdentity2, address: databaseId, accessController, directory: './orbitdb2' }) db1.events.on('join', onConnected1)
db2.events.on('join', onConnected2)
db2.events.on('join', onConnected) await db1.addOperation({ op: 'PUT', key: 1, value: 'record 1 on db 1' })
await db1.addOperation({ op: 'PUT', key: 2, value: 'record 2 on db 1' })
await db1.addOperation({ op: 'PUT', key: 3, value: 'record 3 on db 1' })
await db1.addOperation({ op: 'PUT', key: 4, value: 'record 4 on db 1' })
await waitFor(() => connected, () => true) await waitFor(() => connected1, () => true)
await waitFor(() => connected2, () => true)
const all1 = [] const all1 = []
for await (const item of db1.log.iterator()) { for await (const item of db1.log.iterator()) {
all1.unshift(item) all1.unshift(item)
} }
const all2 = [] const all2 = []
for await (const item of db2.log.iterator()) { for await (const item of db2.log.iterator()) {
all2.unshift(item) all2.unshift(item)
} }
deepStrictEqual(all1, all2) deepStrictEqual(all1, all2)
})
}) })
describe('Events', () => { describe('Events', () => {
beforeEach(async () => {
db1 = await Database({ OpLog, ipfs: ipfs1, identity: testIdentity1, address: databaseId, accessController, directory: './orbitdb1' })
db2 = await Database({ OpLog, ipfs: ipfs2, identity: testIdentity2, address: databaseId, accessController, directory: './orbitdb2' })
})
it('emits \'update\' once when one operation is added', async () => { it('emits \'update\' once when one operation is added', async () => {
const expected = 1 const expected = 1
let connected1 = false let connected1 = false

View File

@ -1,8 +1,12 @@
import { strictEqual, deepStrictEqual } from 'assert' import { strictEqual, deepStrictEqual } from 'assert'
import rmrf from 'rimraf' import rmrf from 'rimraf'
import { existsSync } from 'fs'
import { copy } from 'fs-extra' import { copy } from 'fs-extra'
import * as IPFS from 'ipfs' import * as IPFS from 'ipfs'
import Path from 'path'
import { Log, Entry, Database, KeyStore, Identities } from '../src/index.js' import { Log, Entry, Database, KeyStore, Identities } from '../src/index.js'
import LevelStorage from '../src/storage/level.js'
import MemoryStorage from '../src/storage/memory.js'
import config from './config.js' import config from './config.js'
import testKeysPath from './fixtures/test-keys-path.js' import testKeysPath from './fixtures/test-keys-path.js'
@ -49,15 +53,12 @@ describe('Database', function () {
await rmrf('./ipfs1') await rmrf('./ipfs1')
}) })
beforeEach(async () => {
db = await Database({ OpLog, ipfs, identity: testIdentity, address: databaseId, accessController, directory: './orbitdb1' })
})
afterEach(async () => { afterEach(async () => {
await rmrf('./orbitdb1') await rmrf('./orbitdb')
}) })
it('adds an operation', async () => { it('adds an operation', async () => {
db = await Database({ OpLog, ipfs, identity: testIdentity, address: databaseId, accessController, directory: './orbitdb' })
const expected = 'zdpuAqQ9TJpMhPShuT315m2D9LUBkBPy8YX9zatjEynd2suZv' const expected = 'zdpuAqQ9TJpMhPShuT315m2D9LUBkBPy8YX9zatjEynd2suZv'
const op = { op: 'PUT', key: 1, value: 'record 1 on db 1' } const op = { op: 'PUT', key: 1, value: 'record 1 on db 1' }
const actual = await db.addOperation(op) const actual = await db.addOperation(op)
@ -67,7 +68,75 @@ describe('Database', function () {
await db.close() await db.close()
}) })
describe('Options', () => {
it('uses default directory for headsStorage', async () => {
db = await Database({ OpLog, ipfs, identity: testIdentity, address: databaseId, accessController })
const op = { op: 'PUT', key: 1, value: 'record 1 on db 1' }
const hash = await db.addOperation(op)
const headsPath = Path.join('./orbitdb/', `${databaseId}/`, '/log/_heads/')
strictEqual(await existsSync(headsPath), true)
await db.close()
const headsStorage = await LevelStorage({ path: headsPath })
deepStrictEqual((await Entry.decode(await headsStorage.get(hash))).payload, op)
await headsStorage.close()
await rmrf(headsPath)
})
it('uses given directory for headsStorage', async () => {
db = await Database({ OpLog, ipfs, identity: testIdentity, address: databaseId, accessController, directory: './custom-directory' })
const op = { op: 'PUT', key: 1, value: 'record 1 on db 1' }
const hash = await db.addOperation(op)
const headsPath = Path.join('./custom-directory/', `${databaseId}/`, '/log/_heads/')
strictEqual(await existsSync(headsPath), true)
await db.close()
const headsStorage = await LevelStorage({ path: headsPath })
deepStrictEqual((await Entry.decode(await headsStorage.get(hash))).payload, op)
await headsStorage.close()
await rmrf(headsPath)
})
it('uses given MemoryStorage for headsStorage', async () => {
const headsStorage = await MemoryStorage()
db = await Database({ OpLog, ipfs, identity: testIdentity, address: databaseId, accessController, directory: './orbitdb', headsStorage })
const op = { op: 'PUT', key: 1, value: 'record 1 on db 1' }
const hash = await db.addOperation(op)
deepStrictEqual((await Entry.decode(await headsStorage.get(hash))).payload, op)
await db.close()
})
it('uses given MemoryStorage for entryStorage', async () => {
const entryStorage = await MemoryStorage()
db = await Database({ OpLog, ipfs, identity: testIdentity, address: databaseId, accessController, directory: './orbitdb', entryStorage })
const op = { op: 'PUT', key: 1, value: 'record 1 on db 1' }
const hash = await db.addOperation(op)
deepStrictEqual((await Entry.decode(await entryStorage.get(hash))).payload, op)
await db.close()
})
})
describe('Events', () => { describe('Events', () => {
beforeEach(async () => {
db = await Database({ OpLog, ipfs, identity: testIdentity, address: databaseId, accessController, directory: './orbitdb' })
})
it('emits \'close\' when the database is closed', async () => { it('emits \'close\' when the database is closed', async () => {
let closed = false let closed = false
const onClose = () => { const onClose = () => {