From f67564198206b1d9f16eeac9df31f4e9581c25b0 Mon Sep 17 00:00:00 2001 From: haad Date: Wed, 22 Nov 2023 12:43:53 +0200 Subject: [PATCH] Refactor joinEntry implementation --- benchmarks/orbitdb-replicate.js | 3 +- src/oplog/entry.js | 14 +- src/oplog/heads.js | 7 + src/oplog/log.js | 109 ++++--- test/oplog/join.test.js | 537 ++++++++++++++++++++++++++++++- test/orbitdb-replication.test.js | 14 +- 6 files changed, 626 insertions(+), 58 deletions(-) diff --git a/benchmarks/orbitdb-replicate.js b/benchmarks/orbitdb-replicate.js index e5fc3d2..3b641e9 100644 --- a/benchmarks/orbitdb-replicate.js +++ b/benchmarks/orbitdb-replicate.js @@ -65,6 +65,8 @@ const ipfsConfig = { const db2 = await orbitdb2.open(db1.address) + const startTime2 = new Date().getTime() + let connected = false const onJoin = async (peerId) => (connected = true) @@ -74,7 +76,6 @@ const ipfsConfig = { await waitFor(() => connected, () => true) console.log(`Iterate ${entryCount} events to replicate them`) - const startTime2 = new Date().getTime() const all = [] for await (const { value } of db2.iterator()) { diff --git a/src/oplog/entry.js b/src/oplog/entry.js index 8b5cf54..650c1d7 100644 --- a/src/oplog/entry.js +++ b/src/oplog/entry.js @@ -79,7 +79,7 @@ const create = async (identity, id, payload, clock = null, next = [], refs = []) entry.identity = identity.hash entry.sig = signature - return _encodeEntry(entry) + return encode(entry) } /** @@ -148,10 +148,17 @@ const isEqual = (a, b) => { */ const decode = async (bytes) => { const { value } = await Block.decode({ bytes, codec, hasher }) - return _encodeEntry(value) + return encode(value) } -const _encodeEntry = async (entry) => { +/** + * Encodes an Entry and adds bytes field to it + * @param {Entry} entry + * @return {module:Log~Entry} + * @memberof module:Log~Entry + * @private + */ +const encode = async (entry) => { const { cid, bytes } = await Block.encode({ value: entry, codec, hasher }) const hash = cid.toString(hashStringEncoding) const clock = Clock(entry.clock.id, entry.clock.time) @@ -167,6 +174,7 @@ export default { create, verify, decode, + encode, isEntry, isEqual } diff --git a/src/oplog/heads.js b/src/oplog/heads.js index e5e0c56..cf34b7c 100644 --- a/src/oplog/heads.js +++ b/src/oplog/heads.js @@ -35,6 +35,12 @@ const Heads = async ({ storage, heads }) => { return newHeads } + const remove = async (hash) => { + const currentHeads = await all() + const newHeads = currentHeads.filter(e => e.hash !== hash) + await set(newHeads) + } + const iterator = async function * () { const it = storage.iterator() for await (const [, bytes] of it) { @@ -66,6 +72,7 @@ const Heads = async ({ storage, heads }) => { put, set, add, + remove, iterator, all, clear, diff --git a/src/oplog/log.js b/src/oplog/log.js index 506e414..74c27a0 100644 --- a/src/oplog/log.js +++ b/src/oplog/log.js @@ -128,7 +128,6 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora const bytes = await _entries.get(hash) if (bytes) { const entry = await Entry.decode(bytes) - await _index.put(hash, true) return entry } } @@ -206,13 +205,13 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora if (!isLog(log)) { throw new Error('Given argument is not an instance of Log') } + if (_entries.merge) { + await _entries.merge(log.storage) + } const heads = await log.heads() for (const entry of heads) { await joinEntry(entry) } - if (_entries.merge) { - await _entries.merge(log.storage) - } } /** @@ -222,54 +221,86 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora * * @example * - * await log.join(entry) + * await log.joinEntry(entry) * * @memberof module:Log~Log * @instance */ const joinEntry = async (entry) => { - const { hash } = entry - // Check if the entry is already in the log and return early if it is - const isAlreadyInTheLog = await has(hash) + /* 1. Check if the entry is already in the log and return early if it is */ + const isAlreadyInTheLog = await has(entry.hash) if (isAlreadyInTheLog) { return false - } else { - // Check that the entry is not an entry that hasn't been indexed - const it = traverse(await heads(), (e) => e.next.includes(hash) || entry.next.includes(e.hash)) - for await (const e of it) { - if (e.next.includes(hash)) { - await _index.put(hash, true) - return false - } + } + + const verifyEntry = async (entry) => { + // Check that the Entry belongs to this Log + if (entry.id !== id) { + throw new Error(`Entry's id (${entry.id}) doesn't match the log's id (${id}).`) + } + // Verify if entry is allowed to be added to the log + const canAppend = await access.canAppend(entry) + if (!canAppend) { + throw new Error(`Could not append entry:\nKey "${entry.identity}" is not allowed to write to the log`) + } + // Verify signature for the entry + const isValid = await Entry.verify(identity, entry) + if (!isValid) { + throw new Error(`Could not validate signature for entry "${entry.hash}"`) } } - // Check that the Entry belongs to this Log - if (entry.id !== id) { - throw new Error(`Entry's id (${entry.id}) doesn't match the log's id (${id}).`) - } - // Verify if entry is allowed to be added to the log - const canAppend = await access.canAppend(entry) - if (!canAppend) { - throw new Error(`Could not append entry:\nKey "${entry.identity}" is not allowed to write to the log`) - } - // Verify signature for the entry - const isValid = await Entry.verify(identity, entry) - if (!isValid) { - throw new Error(`Could not validate signature for entry "${hash}"`) + + /* 2. Verify the entry */ + await verifyEntry(entry) + + /* 3. Find missing entries and connections (=path in the DAG) to the current heads */ + const headsHashes = (await heads()).map(e => e.hash) + const hashesToAdd = new Set([entry.hash]) + const hashesToGet = new Set([...entry.next, ...entry.refs]) + const connectedHeads = new Set() + + const traverseAndVerify = async () => { + const getEntries = Array.from(hashesToGet.values()).filter(has).map(get) + const entries = await Promise.all(getEntries) + + for (const e of entries) { + hashesToGet.delete(e.hash) + + await verifyEntry(e) + + hashesToAdd.add(e.hash) + + for (const hash of [...e.next, ...e.refs]) { + const isInTheLog = await has(hash) + + if (!isInTheLog && !hashesToAdd.has(hash)) { + hashesToGet.add(hash) + } else if (headsHashes.includes(hash)) { + connectedHeads.add(hash) + } + } + } + + if (hashesToGet.size > 0) { + await traverseAndVerify() + } } - // Add the new entry to heads (union with current heads) - const newHeads = await _heads.add(entry) + await traverseAndVerify() - if (!newHeads) { - return false + /* 4. Add missing entries to the index (=to the log) */ + for (const hash of hashesToAdd.values()) { + await _index.put(hash, true) } - // Add the new entry to the entry storage - await _entries.put(hash, entry.bytes) - // Add the new entry to the entry index - await _index.put(hash, true) - // We've added the entry to the log + /* 5. Remove heads which new entries are connect to */ + for (const hash of connectedHeads.values()) { + await _heads.remove(hash) + } + + /* 6. Add the new entry to heads (=union with current heads) */ + await _heads.add(entry) + return true } @@ -330,7 +361,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora // Add the next and refs fields from the fetched entries to the next round toFetch = nexts - .filter(e => e != null) + .filter(e => e !== null && e !== undefined) .reduce((res, acc) => Array.from(new Set([...res, ...acc.next, ...(useRefs ? acc.refs : [])])), []) .filter(notIndexed) // Add the fetched entries to the stack to be processed diff --git a/test/oplog/join.test.js b/test/oplog/join.test.js index 0c927e5..7c313c9 100644 --- a/test/oplog/join.test.js +++ b/test/oplog/join.test.js @@ -1,8 +1,9 @@ import { strictEqual, notStrictEqual, deepStrictEqual } from 'assert' import { rimraf } from 'rimraf' import { copy } from 'fs-extra' -import { Log, Identities, KeyStore } from '../../src/index.js' +import { Log, Entry, Identities, KeyStore } from '../../src/index.js' import { Clock } from '../../src/oplog/log.js' +import { MemoryStorage } from '../../src/storage/index.js' import testKeysPath from '../fixtures/test-keys-path.js' const keysPath = './testkeys' @@ -427,19 +428,549 @@ describe('Log - Join', async function () { deepStrictEqual(values.map((e) => e.payload), expectedData) }) - it('has correct heads after joining logs', async () => { + it('doesn\'t add the given entry to the log when the given entry is already in the log', async () => { const e1 = await log1.append('hello1') + const e2 = await log1.append('hello2') + const e3 = await log1.append('hello3') + + await log2.join(log1) + + const all1 = await log2.all() + + deepStrictEqual(all1.length, 3) + deepStrictEqual(all1[0], e1) + deepStrictEqual(all1[1], e2) + deepStrictEqual(all1[2], e3) + + await log2.joinEntry(e1) + await log2.joinEntry(e2) + await log2.joinEntry(e3) + + const all2 = await log2.all() + + deepStrictEqual(all2.length, 3) + deepStrictEqual(all2[0], e1) + deepStrictEqual(all2[1], e2) + deepStrictEqual(all2[2], e3) + }) + + it('doesn\'t add the given entry to the heads when the given entry is already in the log', async () => { + const e1 = await log1.append('hello1') + const e2 = await log1.append('hello2') + const e3 = await log1.append('hello3') + + await log2.join(log1) + + const heads1 = await log2.heads() + + deepStrictEqual(heads1, [e3]) + + await log2.joinEntry(e1) + await log2.joinEntry(e2) + await log2.joinEntry(e3) + + const heads2 = await log2.heads() + + strictEqual(heads2.length, 1) + deepStrictEqual(heads2[0].hash, e3.hash) + }) + + it('joinEntry returns false when the given entry is already in the log', async () => { + const e1 = await log1.append('hello1') + const e2 = await log1.append('hello2') + const e3 = await log1.append('hello3') + + await log2.join(log1) + + const heads1 = await log2.heads() + + deepStrictEqual(heads1, [e3]) + + const r1 = await log2.joinEntry(e1) + const r2 = await log2.joinEntry(e2) + const r3 = await log2.joinEntry(e3) + + deepStrictEqual([r1, r2, r3].every(e => e === false), true) + }) + + it('replaces the heads if the given entry is a new head and has a direct path to the old head', async () => { + await log1.append('hello1') await log1.append('hello2') const e3 = await log1.append('hello3') await log2.join(log1) const heads1 = await log2.heads() + deepStrictEqual(heads1, [e3]) - await log2.joinEntry(e1) + await log1.append('hello4') + await log1.append('hello5') + const e6 = await log1.append('hello6') + + await log2.storage.merge(log1.storage) + + await log2.joinEntry(e6) const heads2 = await log2.heads() + const all = await log2.all() + + strictEqual(heads2.length, 1) + deepStrictEqual(heads2[0].hash, e6.hash) + strictEqual(all.length, 6) + deepStrictEqual(all.map(e => e.payload), ['hello1', 'hello2', 'hello3', 'hello4', 'hello5', 'hello6']) + }) + + it('replaces a head when given entry is a new head and there are multiple current heads', async () => { + await log1.append('hello1') + await log1.append('hello2') + const e3 = await log1.append('hello3') + + await log2.append('helloA') + await log2.append('helloB') + const eC = await log2.append('helloC') + + await log3.join(log1) + await log3.join(log2) + + const heads1 = await log3.heads() + + deepStrictEqual(heads1, [eC, e3]) + + await log1.append('hello4') + await log1.append('hello5') + const e6 = await log1.append('hello6') + + await log1.storage.merge(log3.storage) + await log3.storage.merge(log1.storage) + await log2.storage.merge(log1.storage) + await log2.storage.merge(log3.storage) + + await log3.joinEntry(e6) + + const heads2 = await log3.heads() + + strictEqual(heads2.length, 2) + deepStrictEqual(heads2[0].hash, e6.hash) + deepStrictEqual(heads2[1].hash, eC.hash) + }) + + it('replaces both heads when given entries are new heads and there are two current heads', async () => { + await log1.append('hello1') + await log1.append('hello2') + const e3 = await log1.append('hello3') + + await log2.append('helloA') + await log2.append('helloB') + const eC = await log2.append('helloC') + + await log3.join(log1) + await log3.join(log2) + + const heads1 = await log3.heads() + + deepStrictEqual(heads1, [eC, e3]) + + await log1.append('hello4') + await log1.append('hello5') + const e6 = await log1.append('hello6') + + await log2.append('helloD') + await log2.append('helloE') + const eF = await log2.append('helloF') + + await log3.storage.merge(log1.storage) + await log3.storage.merge(log2.storage) + + await log3.joinEntry(e6) + await log3.joinEntry(eF) + + const heads2 = await log3.heads() + + strictEqual(heads2.length, 2) + deepStrictEqual(heads2[0].hash, eF.hash) + deepStrictEqual(heads2[1].hash, e6.hash) + }) + + it('adds the given entry to the heads when forked logs have multiple heads', async () => { + await log1.append('hello1') + await log1.append('hello2') + const e3 = await log1.append('hello3') + + await log2.join(log1) + + const heads1 = await log1.heads() + const heads2 = await log2.heads() + + deepStrictEqual(heads1, [e3]) deepStrictEqual(heads2, [e3]) + + await log2.append('helloX') + const eY = await log2.append('helloY') + + await log1.append('hello4') + await log1.append('hello5') + const e6 = await log1.append('hello6') + + await log2.storage.merge(log1.storage) + + await log2.joinEntry(e6) + + const heads3 = await log2.heads() + + strictEqual(heads3.length, 2) + deepStrictEqual(heads3[0].hash, e6.hash) + deepStrictEqual(heads3[1].hash, eY.hash) + }) + + it('replaces one head but not the other when forked logs have multiple heads', async () => { + await log1.append('hello1') + await log1.append('hello2') + const e3 = await log1.append('hello3') + + await log2.join(log1) + + const heads1 = await log1.heads() + const heads2 = await log2.heads() + + deepStrictEqual(heads1, [e3]) + deepStrictEqual(heads2, [e3]) + + await log2.append('helloX') + const eY = await log2.append('helloY') + + await log1.append('hello4') + await log1.append('hello5') + const e6 = await log1.append('hello6') + + await log2.storage.merge(log1.storage) + + await log2.joinEntry(e6) + + const heads3 = await log2.heads() + + strictEqual(heads3.length, 2) + deepStrictEqual(heads3[0].hash, e6.hash) + deepStrictEqual(heads3[1].hash, eY.hash) + + await log1.append('hello7') + const e8 = await log1.append('hello8') + + await log2.storage.merge(log1.storage) + + await log2.joinEntry(e8) + + const heads4 = await log2.heads() + + strictEqual(heads4.length, 2) + deepStrictEqual(heads4[0].hash, e8.hash) + deepStrictEqual(heads4[1].hash, eY.hash) + }) + + it('doesn\'t add the joined entry to the log when previously joined logs have forks and multiple heads', async () => { + const e1 = await log1.append('hello1') + const e2 = await log1.append('hello2') + const e3 = await log1.append('hello3') + + await log2.join(log1) + + const heads1 = await log1.heads() + const heads2 = await log2.heads() + + deepStrictEqual(heads1, [e3]) + deepStrictEqual(heads2, [e3]) + + await log2.append('helloX') + const eY = await log2.append('helloY') + + const e4 = await log1.append('hello4') + const e5 = await log1.append('hello5') + const e6 = await log1.append('hello6') + + await log2.storage.merge(log1.storage) + + await log2.joinEntry(e6) + + const res5 = await log2.joinEntry(e5) + const res4 = await log2.joinEntry(e4) + const res3 = await log2.joinEntry(e3) + const res2 = await log2.joinEntry(e2) + const res1 = await log2.joinEntry(e1) + + strictEqual(res1, false) + strictEqual(res2, false) + strictEqual(res3, false) + strictEqual(res4, false) + strictEqual(res5, false) + + const heads3 = await log2.heads() + + strictEqual(heads3.length, 2) + deepStrictEqual(heads3[0].hash, e6.hash) + deepStrictEqual(heads3[1].hash, eY.hash) + }) + + it('replaces both heads when forked logs have multiple heads', async () => { + await log1.append('hello1') + await log1.append('hello2') + const e3 = await log1.append('hello3') + + await log2.append('helloA') + await log2.append('helloB') + const eC = await log2.append('helloC') + + await log2.storage.merge(log1.storage) + + await log2.joinEntry(e3) + + const heads1 = await log2.heads() + + strictEqual(heads1.length, 2) + deepStrictEqual(heads1[0].hash, eC.hash) + deepStrictEqual(heads1[1].hash, e3.hash) + + await log1.append('hello4') + await log1.append('hello5') + const e6 = await log1.append('hello6') + + await log2.append('helloD') + await log2.append('helloE') + const eF = await log2.append('helloF') + + await log2.storage.merge(log1.storage) + + await log2.joinEntry(e6) + + const heads2 = await log2.heads() + + strictEqual(heads2.length, 2) + deepStrictEqual(heads2[0].hash, eF.hash) + deepStrictEqual(heads2[1].hash, e6.hash) + }) + + describe('trying to join an entry with invalid preceeding entries', () => { + it('throws an error if an entry belongs to another log', async () => { + const headsStorage1 = await MemoryStorage() + + const log0 = await Log(testIdentity2, { logId: 'Y' }) + log1 = await Log(testIdentity, { logId: 'X', headsStorage: headsStorage1 }) + log2 = await Log(testIdentity2, { logId: 'X' }) + + const e0 = await log0.append('helloA') + + await log1.storage.merge(log0.storage) + + await headsStorage1.put(e0.hash, e0.bytes) + + await log1.append('hello1') + await log1.append('hello2') + const e3 = await log1.append('hello3') + + await log2.storage.merge(log1.storage) + + let err + try { + await log2.joinEntry(e3) + } catch (e) { + err = e + } + + notStrictEqual(err, undefined) + strictEqual(err.message, 'Entry\'s id (Y) doesn\'t match the log\'s id (X).') + deepStrictEqual(await log2.all(), []) + deepStrictEqual(await log2.heads(), []) + }) + + it('throws an error if an entry doesn\'t pass access controller #1', async () => { + const canAppend = (entry) => { + if (entry.payload === 'hello1') { + return false + } + return true + } + + log1 = await Log(testIdentity, { logId: 'X' }) + log2 = await Log(testIdentity2, { logId: 'X', access: { canAppend } }) + + await log1.append('hello1') + await log1.append('hello2') + const e3 = await log1.append('hello3') + + await log2.storage.merge(log1.storage) + + let err + try { + await log2.joinEntry(e3) + } catch (e) { + err = e + } + + notStrictEqual(err, undefined) + strictEqual(err.message, 'Could not append entry:\nKey "zdpuAvqN22Rxwx5EEenq6EyeydVKPKn43MXHzauuicjLEp8jP" is not allowed to write to the log') + deepStrictEqual(await log2.all(), []) + deepStrictEqual(await log2.heads(), []) + }) + + it('throws an error if an entry doesn\'t pass access controller #2', async () => { + const canAppend = (entry) => { + if (entry.payload === 'hello2') { + return false + } + return true + } + + log1 = await Log(testIdentity, { logId: 'X' }) + log2 = await Log(testIdentity2, { logId: 'X' }) + log3 = await Log(testIdentity3, { logId: 'X', access: { canAppend } }) + + await log1.append('hello1') + await log1.append('hello2') + const e3 = await log1.append('hello3') + + await log2.append('helloA') + await log2.append('helloB') + const eC = await log2.append('helloC') + + await log3.storage.merge(log1.storage) + await log3.storage.merge(log2.storage) + + await log3.joinEntry(eC) + + await log2.storage.merge(log1.storage) + + await log2.joinEntry(e3) + + await log2.append('helloD') + await log2.append('helloE') + const eF = await log2.append('helloF') + + await log3.storage.merge(log1.storage) + await log3.storage.merge(log2.storage) + + let err + try { + await log3.joinEntry(eF) + } catch (e) { + err = e + } + + notStrictEqual(err, undefined) + strictEqual(err.message, 'Could not append entry:\nKey "zdpuAvqN22Rxwx5EEenq6EyeydVKPKn43MXHzauuicjLEp8jP" is not allowed to write to the log') + + deepStrictEqual((await log3.all()).map(e => e.payload), ['helloA', 'helloB', 'helloC']) + deepStrictEqual((await log3.heads()).map(e => e.payload), ['helloC']) + }) + }) + + describe('throws an error if verification of an entry in given entry\'s history fails', async () => { + let e1, e3 + let headsStorage1, headsStorage2 + + before(async () => { + headsStorage1 = await MemoryStorage() + headsStorage2 = await MemoryStorage() + + log1 = await Log(testIdentity, { logId: 'X', entryStorage: headsStorage1 }) + log2 = await Log(testIdentity2, { logId: 'X', entryStorage: headsStorage2 }) + + e1 = await log1.append('hello1') + await log1.append('hello2') + e3 = await log1.append('hello3') + }) + + it('throws an error if an entry doesn\'t have a payload field', async () => { + const e = Object.assign({}, e1) + delete e.payload + + delete e.bytes + delete e.hash + const ee = await Entry.encode(e) + + await headsStorage1.put(e1.hash, ee.bytes) + await log2.storage.merge(headsStorage1) + + let err + try { + await log2.joinEntry(e3) + } catch (e) { + err = e + } + + notStrictEqual(err, undefined) + strictEqual(err.message, 'Invalid Log entry') + deepStrictEqual(await log2.all(), []) + deepStrictEqual(await log2.heads(), []) + }) + + it('throws an error if an entry doesn\'t have a key field', async () => { + const e = Object.assign({}, e1) + delete e.key + + delete e.bytes + delete e.hash + const ee = await Entry.encode(e) + + await headsStorage1.put(e1.hash, ee.bytes) + await log2.storage.merge(headsStorage1) + + let err + try { + await log2.joinEntry(e3) + } catch (e) { + err = e + } + + notStrictEqual(err, undefined) + strictEqual(err.message, 'Entry doesn\'t have a key') + deepStrictEqual(await log2.all(), []) + deepStrictEqual(await log2.heads(), []) + }) + + it('throws an error if an entry doesn\'t have a signature field', async () => { + const e = Object.assign({}, e1) + delete e.sig + + delete e.bytes + delete e.hash + const ee = await Entry.encode(e) + + await headsStorage1.put(e1.hash, ee.bytes) + await log2.storage.merge(headsStorage1) + + let err + try { + await log2.joinEntry(e3) + } catch (e) { + err = e + } + + notStrictEqual(err, undefined) + strictEqual(err.message, 'Entry doesn\'t have a signature') + deepStrictEqual(await log2.all(), []) + deepStrictEqual(await log2.heads(), []) + }) + + it('throws an error if an entry signature doesn\'t verify', async () => { + const e = Object.assign({}, e1) + e.sig = '1234567890' + delete e.bytes + delete e.hash + const ee = await Entry.encode(e) + + await headsStorage1.put(e1.hash, ee.bytes) + await log2.storage.merge(headsStorage1) + + let err + try { + await log2.joinEntry(e3) + } catch (e) { + err = e + } + + notStrictEqual(err, undefined) + strictEqual(err.message, 'Could not validate signature for entry "zdpuAvkAJ8C46cnGdtFpcBratA5MqK7CcjqCJjjmuKuFvZir3"') + deepStrictEqual(await log2.all(), []) + deepStrictEqual(await log2.heads(), []) + }) }) }) diff --git a/test/orbitdb-replication.test.js b/test/orbitdb-replication.test.js index 53dfc30..8cfcadd 100644 --- a/test/orbitdb-replication.test.js +++ b/test/orbitdb-replication.test.js @@ -7,7 +7,7 @@ import connectPeers from './utils/connect-nodes.js' import waitFor from './utils/wait-for.js' describe('Replicating databases', function () { - this.timeout(30000) + this.timeout(10000) let ipfs1, ipfs2 let orbitdb1, orbitdb2 @@ -62,16 +62,7 @@ describe('Replicating databases', function () { let replicated = false const onJoin = async (peerId, heads) => { - const head = (await db2.log.heads())[0] - if (head && head.clock.time === amount) { - replicated = true - } - } - - const onUpdated = (entry) => { - if (entry.clock.time === amount) { - replicated = true - } + replicated = true } const onError = (err) => { @@ -81,7 +72,6 @@ describe('Replicating databases', function () { db2 = await orbitdb2.open(db1.address) db2.events.on('join', onJoin) - db2.events.on('update', onUpdated) db2.events.on('error', onError) db1.events.on('error', onError)