mirror of
https://github.com/orbitdb/orbitdb.git
synced 2025-10-07 22:57:07 +00:00
WIP 4
This commit is contained in:
@@ -2,11 +2,11 @@ import { EventEmitter } from 'events'
|
||||
|
||||
const defaultPointerCount = 16
|
||||
|
||||
const Database = async ({ OpLog, ipfs, identity, databaseId, accessController, storage }) => {
|
||||
const Database = async ({ OpLog, ipfs, identity, databaseId, accessController, storage, headsStorage, pointerCount }) => {
|
||||
const { Log, Entry, IPFSBlockStorage, LevelStorage } = OpLog
|
||||
|
||||
const entryStorage = storage || await IPFSBlockStorage({ ipfs, pin: true })
|
||||
const headsStorage = await LevelStorage({ path: `./${identity.id}/${databaseId}/log/_heads/` })
|
||||
headsStorage = headsStorage || await LevelStorage({ path: `./${identity.id}/${databaseId}/log/_heads/` })
|
||||
// const indexStorage = await LevelStorage({ path: `./${identity.id}/${databaseId}/log/_index/` })
|
||||
|
||||
// const log = await Log(identity, { logId: databaseId, access: accessController, entryStorage, headsStorage, indexStorage })
|
||||
@@ -14,8 +14,10 @@ const Database = async ({ OpLog, ipfs, identity, databaseId, accessController, s
|
||||
|
||||
const events = new EventEmitter()
|
||||
|
||||
pointerCount = pointerCount || defaultPointerCount
|
||||
|
||||
const addOperation = async (op) => {
|
||||
const entry = await log.append(op, { pointerCount: defaultPointerCount })
|
||||
const entry = await log.append(op, { pointerCount })
|
||||
await ipfs.pubsub.publish(databaseId, entry.bytes)
|
||||
events.emit('update', entry)
|
||||
return entry.hash
|
||||
@@ -26,12 +28,13 @@ const Database = async ({ OpLog, ipfs, identity, databaseId, accessController, s
|
||||
const messageIsNotFromMe = (message) => String(peerId) !== String(message.from)
|
||||
const messageHasData = (message) => message.data !== undefined
|
||||
try {
|
||||
if (messageIsNotFromMe(message) && messageHasData(message)) {
|
||||
// if (messageIsNotFromMe(message) && messageHasData(message)) {
|
||||
if (messageHasData(message)) {
|
||||
await sync(message.data)
|
||||
}
|
||||
} catch (e) {
|
||||
events.emit('error', e)
|
||||
console.error(e)
|
||||
events.emit('error', e)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ import Database from '../src/database.js'
|
||||
// Test utils
|
||||
import { config, testAPIs, getIpfsPeerId, waitForPeers, startIpfs, stopIpfs } from 'orbit-db-test-utils'
|
||||
import connectPeers from './utils/connect-nodes.js'
|
||||
import waitFor from './utils/wait-for.js'
|
||||
import { identityKeys, signingKeys } from './fixtures/orbit-db-identity-keys.js'
|
||||
|
||||
const { sync: rmrf } = rimraf
|
||||
@@ -25,11 +26,13 @@ Object.keys(testAPIs).forEach((IPFS) => {
|
||||
let peerId1, peerId2
|
||||
let testIdentity1, testIdentity2
|
||||
let db1, db2
|
||||
let accessController
|
||||
|
||||
const databaseId = 'documents-AAA'
|
||||
|
||||
before(async () => {
|
||||
rmrf('./keys_1')
|
||||
rmrf('./keys_2')
|
||||
|
||||
// Start two IPFS instances
|
||||
ipfsd1 = await startIpfs(IPFS, config.daemon1)
|
||||
ipfsd2 = await startIpfs(IPFS, config.daemon2)
|
||||
@@ -57,25 +60,6 @@ Object.keys(testAPIs).forEach((IPFS) => {
|
||||
// Create an identity for each peers
|
||||
testIdentity1 = await createIdentity({ id: 'userA', keystore, signingKeystore })
|
||||
testIdentity2 = await createIdentity({ id: 'userB', keystore, signingKeystore })
|
||||
|
||||
const accessController = {
|
||||
canAppend: (entry) => entry.identity.id === testIdentity1.id
|
||||
}
|
||||
})
|
||||
|
||||
beforeEach(async () => {
|
||||
db1 = await Documents({ OpLog: Log, Database, ipfs: ipfs1, identity: testIdentity1, databaseId, accessController })
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
if (db1) {
|
||||
await db1.drop()
|
||||
await db1.close()
|
||||
}
|
||||
if (db2) {
|
||||
await db2.drop()
|
||||
await db2.close()
|
||||
}
|
||||
})
|
||||
|
||||
after(async () => {
|
||||
@@ -102,7 +86,30 @@ Object.keys(testAPIs).forEach((IPFS) => {
|
||||
rmrf('./keys_2')
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
if (db1) {
|
||||
await db1.close()
|
||||
}
|
||||
if (db2) {
|
||||
await db2.close()
|
||||
}
|
||||
})
|
||||
|
||||
describe('using database', () => {
|
||||
beforeEach(async () => {
|
||||
const accessController = {
|
||||
canAppend: (entry) => entry.identity.id === testIdentity1.id
|
||||
}
|
||||
|
||||
db1 = await Documents({ OpLog: Log, Database, ipfs: ipfs1, identity: testIdentity1, databaseId, accessController })
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
if (db1) {
|
||||
await db1.drop()
|
||||
}
|
||||
})
|
||||
|
||||
it('gets a document', async () => {
|
||||
const key = 'hello world 1'
|
||||
|
||||
@@ -127,14 +134,14 @@ Object.keys(testAPIs).forEach((IPFS) => {
|
||||
it('throws an error when deleting a non-existent document', async () => {
|
||||
const key = 'i do not exist'
|
||||
let err
|
||||
|
||||
|
||||
try {
|
||||
await db1.del(key)
|
||||
} catch (e) {
|
||||
err = e
|
||||
}
|
||||
|
||||
strictEqual(err.message, `No document with key \'${key}\' in the database`)
|
||||
|
||||
strictEqual(err.message, `No document with key '${key}' in the database`)
|
||||
})
|
||||
|
||||
it('queries for a document', async () => {
|
||||
@@ -151,5 +158,139 @@ Object.keys(testAPIs).forEach((IPFS) => {
|
||||
deepStrictEqual(await db1.query(findFn), [expected])
|
||||
})
|
||||
})
|
||||
|
||||
describe('replicate database', () => {
|
||||
it('returns all entries in the database', async () => {
|
||||
let updateCount = 0
|
||||
|
||||
const accessController = {
|
||||
canAppend: (entry) => entry.identity.id === testIdentity1.id
|
||||
}
|
||||
|
||||
const onUpdate = (entry) => {
|
||||
++updateCount
|
||||
}
|
||||
|
||||
const onError = () => {
|
||||
}
|
||||
|
||||
db1 = await Documents({ OpLog: Log, Database, ipfs: ipfs1, identity: testIdentity1, databaseId, accessController })
|
||||
db2 = await Documents({ OpLog: Log, Database, ipfs: ipfs2, identity: testIdentity2, databaseId, accessController })
|
||||
|
||||
db2.events.on('update', onUpdate)
|
||||
db2.events.on('error', onError)
|
||||
|
||||
strictEqual(db1.type, 'documents')
|
||||
strictEqual(db2.type, 'documents')
|
||||
|
||||
await waitForPeers(ipfs1, [peerId2], databaseId)
|
||||
await waitForPeers(ipfs2, [peerId1], databaseId)
|
||||
|
||||
await db1.put({ _id: "init", value: true })
|
||||
await db1.put({ _id: "init", value: false })
|
||||
await db1.put({ _id: "hello", text: "friend" })
|
||||
await db1.del("hello")
|
||||
await db1.put({ _id: "hello", text: "friend2" })
|
||||
await db1.put({ _id: "empty" })
|
||||
await db1.del("empty")
|
||||
await db1.put({ _id: "hello", text: "friend3" })
|
||||
|
||||
await waitFor(() => updateCount, () => 8)
|
||||
|
||||
strictEqual(updateCount, 8)
|
||||
|
||||
const documents2 = []
|
||||
console.time('documents2')
|
||||
for await (const event of db2.iterator()) {
|
||||
documents2.unshift(event)
|
||||
}
|
||||
console.timeEnd('documents2')
|
||||
deepStrictEqual(documents2, [
|
||||
{ _id: "init", value: false },
|
||||
{ _id: "hello", text: "friend3" }
|
||||
])
|
||||
|
||||
const documents1 = []
|
||||
console.time('documents1')
|
||||
for await (const event of db1.iterator()) {
|
||||
documents1.unshift(event)
|
||||
}
|
||||
console.timeEnd('documents1')
|
||||
deepStrictEqual(documents1, [
|
||||
{ _id: "init", value: false },
|
||||
{ _id: "hello", text: "friend3" }
|
||||
])
|
||||
})
|
||||
})
|
||||
|
||||
describe('load database', () => {
|
||||
it('returns all entries in the database', async () => {
|
||||
let updateCount = 0
|
||||
|
||||
const accessController = {
|
||||
canAppend: (entry) => entry.identity.id === testIdentity1.id
|
||||
}
|
||||
|
||||
const onUpdate = (entry) => {
|
||||
++updateCount
|
||||
}
|
||||
|
||||
const onError = () => {
|
||||
}
|
||||
|
||||
db1 = await Documents({ OpLog: Log, Database, ipfs: ipfs1, identity: testIdentity1, databaseId, accessController })
|
||||
db2 = await Documents({ OpLog: Log, Database, ipfs: ipfs2, identity: testIdentity2, databaseId, accessController })
|
||||
|
||||
db2.events.on('update', onUpdate)
|
||||
db2.events.on('error', onError)
|
||||
|
||||
strictEqual(db1.type, 'documents')
|
||||
strictEqual(db2.type, 'documents')
|
||||
|
||||
await waitForPeers(ipfs1, [peerId2], databaseId)
|
||||
await waitForPeers(ipfs2, [peerId1], databaseId)
|
||||
|
||||
await db1.put({ _id: "init", value: true })
|
||||
await db1.put({ _id: "init", value: false })
|
||||
await db1.put({ _id: "hello", text: "friend" })
|
||||
await db1.del("hello")
|
||||
await db1.put({ _id: "hello", text: "friend2" })
|
||||
await db1.put({ _id: "empty" })
|
||||
await db1.del("empty")
|
||||
await db1.put({ _id: "hello", text: "friend3" })
|
||||
|
||||
await waitFor(() => updateCount, () => 8)
|
||||
|
||||
strictEqual(updateCount, 8)
|
||||
|
||||
await db1.close()
|
||||
await db2.close()
|
||||
|
||||
db1 = await Documents({ OpLog: Log, Database, ipfs: ipfs1, identity: testIdentity1, databaseId, accessController })
|
||||
db2 = await Documents({ OpLog: Log, Database, ipfs: ipfs2, identity: testIdentity2, databaseId, accessController })
|
||||
|
||||
const documents2 = []
|
||||
console.time('documents2')
|
||||
for await (const event of db2.iterator()) {
|
||||
documents2.unshift(event)
|
||||
}
|
||||
console.timeEnd('documents2')
|
||||
deepStrictEqual(documents2, [
|
||||
{ _id: "init", value: false },
|
||||
{ _id: "hello", text: "friend3" }
|
||||
])
|
||||
|
||||
const documents1 = []
|
||||
console.time('documents1')
|
||||
for await (const event of db1.iterator()) {
|
||||
documents1.unshift(event)
|
||||
}
|
||||
console.timeEnd('documents1')
|
||||
deepStrictEqual(documents1, [
|
||||
{ _id: "init", value: false },
|
||||
{ _id: "hello", text: "friend3" }
|
||||
])
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user