diff --git a/src/sync.js b/src/sync.js index 7bb3555..088e8e4 100644 --- a/src/sync.js +++ b/src/sync.js @@ -3,7 +3,45 @@ import PQueue from 'p-queue' import Path from 'path' import { EventEmitter } from 'events' +/** + * @description + * Syncs an append-only, conflict-free replicated data type (CRDT) log between + * multiple peers. + * + * The sync protocol synchronizes heads between multiple peers, both during + * startup and also when new entries are appended to the log. + * + * When Sync is started, peers "dial" each other using libp2p's custom protocol + * handler and initiate the exchange of heads each peer currently has. Once + * initial sync has completed, peers notify one another of updates to heads + * using pubsub "subscribe" with the same log.id topic. A peer with new heads + * can broadcast changes to other peers using pubsub "publish". Peers + * subscribed to the same topic will then be notified and will update their + * heads accordingly. + * + * The sync protocol only guarantees that the message is published; it does not + * guarantee the order in which messages are received or even that the message + * is recieved at all. The sync protocol only guarantees that heads will + * eventually reach consistency between all peers with the same address. + */ + +/** + * Creates a Sync instance for sychronizing logs between multiple peers. + * @param {Object} params One or more parameters for configuring Sync. + * @param {IPFS} params.ipfs An IPFS instance. Used for synchronizing peers. + * @param {Log} params.log The Log instance to sync. + * @param {Object} params.events An event emitter. Defaults to an instance of + * EventEmitter. Events emitted are 'join', 'error' and 'leave'. + * @param {Function} params.onSynced A function that is called after the peer + * has received heads from another peer. + * @param {Boolean} params.start True if sync should start automatically, false + * otherwise. Defaults to true. + * @return {Sync} The Sync protocol instance. + */ const Sync = async ({ ipfs, log, events, onSynced, start }) => { + if (!ipfs) throw new Error('An instance of ipfs is required.') + if (!log) throw new Error('An instance of log is required.') + const address = log.id const headsSyncAddress = Path.join('/orbitdb/heads/', address) @@ -42,7 +80,6 @@ const Sync = async ({ ipfs, log, events, onSynced, start }) => { peers.add(peerId) await pipe(stream, receiveHeads(peerId), sendHeads, stream) } catch (e) { - console.error(e) peers.delete(peerId) events.emit('error', e) } @@ -68,7 +105,6 @@ const Sync = async ({ ipfs, log, events, onSynced, start }) => { if (e.code === 'ERR_UNSUPPORTED_PROTOCOL') { // Skip peer, they don't have this database currently } else { - console.error(e) peers.delete(peerId) events.emit('error', e) } @@ -91,7 +127,6 @@ const Sync = async ({ ipfs, log, events, onSynced, start }) => { await onSynced(message.data) } } catch (e) { - console.error(e) events.emit('error', e) } } diff --git a/test/sync.test.js b/test/sync.test.js index 6bcd316..83c7538 100644 --- a/test/sync.test.js +++ b/test/sync.test.js @@ -8,6 +8,9 @@ import config from './config.js' import connectPeers from './utils/connect-nodes.js' import waitFor from './utils/wait-for.js' import testKeysPath from './fixtures/test-keys-path.js' +import LRUStorage from '../src/storage/lru.js' +import IPFSBlockStorage from '../src/storage/ipfs-block.js' +import ComposedStorage from '../src/storage/composed.js' const keysPath = './testkeys' @@ -68,7 +71,7 @@ describe('Sync protocol', function () { notStrictEqual(sync, undefined) }) - it('has an ad function', async () => { + it('has an add function', async () => { notStrictEqual(sync.add, undefined) strictEqual(typeof sync.add, 'function') }) @@ -93,23 +96,57 @@ describe('Sync protocol', function () { }) }) + describe('Params', () => { + it('throws an error when IPFS is not defined', async () => { + let err + try { + await Sync({}) + } catch (e) { + err = e.toString() + } + strictEqual(err, 'Error: An instance of ipfs is required.') + }) + + it('throws an error when log is not defined', async () => { + let err + try { + await Sync({ ipfs: ipfs1 }) + } catch (e) { + err = e.toString() + } + strictEqual(err, 'Error: An instance of log is required.') + }) + }) + describe('Syncing automatically', () => { let sync1, sync2 + let log1, log2 let joinEventFired = false let syncedEventFired = false let syncedHead let expectedEntry before(async () => { - const log1 = await Log(testIdentity1, { logId: 'synclog1' }) - const log2 = await Log(testIdentity2, { logId: 'synclog1' }) + const entryStorage1 = await ComposedStorage( + await LRUStorage({ size: 1000 }), + await IPFSBlockStorage({ ipfs: ipfs1, pin: true }) + ) + + const entryStorage2 = await ComposedStorage( + await LRUStorage({ size: 1000 }), + await IPFSBlockStorage({ ipfs: ipfs2, pin: true }) + ) + + log1 = await Log(testIdentity1, { logId: 'synclog1', entryStorage: entryStorage1 }) + log2 = await Log(testIdentity2, { logId: 'synclog1', entryStorage: entryStorage2 }) const onSynced = async (bytes) => { syncedHead = await Entry.decode(bytes) + log2.joinEntry(syncedHead) syncedEventFired = true } - const onJoin = async (peerId, heads) => { + const onJoin = (peerId, heads) => { joinEventFired = true } @@ -136,35 +173,58 @@ describe('Sync protocol', function () { deepStrictEqual(syncedHead, expectedEntry) }) - it('updates the set of connected peers', async () => { + it('updates the set of connected peers', () => { strictEqual(sync2.peers.has(String(peerId1)), true) strictEqual(sync1.peers.has(String(peerId2)), true) }) + + it('eventually reaches consistency', async () => { + await sync1.add(await log1.append('hello2')) + await sync1.add(await log1.append('hello3')) + await sync1.add(await log1.append('hello4')) + expectedEntry = await log1.append('hello5') + await sync1.add(expectedEntry) + + await waitFor(() => syncedEventFired, () => true) + + deepStrictEqual(syncedHead, expectedEntry) + + deepStrictEqual(await log1.heads(), await log2.heads()) + + const all1 = [] + for await (const item of log1.iterator()) { + all1.unshift(item) + } + + const all2 = [] + for await (const item of log2.iterator()) { + all2.unshift(item) + } + + deepStrictEqual(all1, all2) + }) }) describe('Starting sync manually', () => { let sync1, sync2 + let log1, log2 let syncedEventFired = false let syncedHead let expectedEntry before(async () => { - const log1 = await Log(testIdentity1, { logId: 'synclog1' }) - const log2 = await Log(testIdentity2, { logId: 'synclog1' }) + log1 = await Log(testIdentity1, { logId: 'synclog1' }) + log2 = await Log(testIdentity2, { logId: 'synclog1' }) const onSynced = async (bytes) => { syncedHead = await Entry.decode(bytes) - syncedEventFired = true + syncedEventFired = expectedEntry.hash === syncedHead.hash } sync1 = await Sync({ ipfs: ipfs1, log: log1 }) sync2 = await Sync({ ipfs: ipfs2, log: log2, onSynced, start: false }) - await log1.append('hello1') - await log1.append('hello2') - await log1.append('hello3') - await log1.append('hello4') - expectedEntry = await log1.append('hello5') + expectedEntry = await log1.append('hello1') }) after(async () => { @@ -184,17 +244,17 @@ describe('Sync protocol', function () { strictEqual(syncedEventFired, true) }) - it('syncs the correct head', async () => { + it('syncs the correct head', () => { deepStrictEqual(syncedHead, expectedEntry) }) - it('updates the set of connected peers', async () => { + it('updates the set of connected peers', () => { strictEqual(sync2.peers.has(String(peerId1)), true) strictEqual(sync1.peers.has(String(peerId2)), true) }) }) - describe.skip('Stopping sync', () => { + describe('Stopping sync', () => { let sync1, sync2 let log1, log2 let syncedEventFired = false @@ -209,10 +269,12 @@ describe('Sync protocol', function () { const onSynced = async (bytes) => { syncedHead = await Entry.decode(bytes) - syncedEventFired = true + if (expectedEntry) { + syncedEventFired = expectedEntry.hash === syncedHead.hash + } } - const onLeave = async (peerId) => { + const onLeave = (peerId) => { leaveEventFired = true leavingPeerId = peerId } @@ -220,13 +282,14 @@ describe('Sync protocol', function () { sync1 = await Sync({ ipfs: ipfs1, log: log1 }) sync2 = await Sync({ ipfs: ipfs2, log: log2, onSynced }) - sync2.events.on('leave', onLeave) + sync1.events.on('leave', onLeave) - await log1.append('hello1') - await log1.append('hello2') - await log1.append('hello3') - await log1.append('hello4') + await sync1.add(await log1.append('hello1')) + await sync1.add(await log1.append('hello2')) + await sync1.add(await log1.append('hello3')) + await sync1.add(await log1.append('hello4')) expectedEntry = await log1.append('hello5') + await sync1.add(expectedEntry) }) after(async () => { @@ -249,36 +312,36 @@ describe('Sync protocol', function () { }) it('stops syncing', async () => { - await sync1.stop() + await sync2.stop() - await log1.append('hello6') - await log1.append('hello7') - await log1.append('hello8') - await log1.append('hello9') - await log1.append('hello10') + await sync1.add(await log1.append('hello6')) + await sync1.add(await log1.append('hello7')) + await sync1.add(await log1.append('hello8')) + await sync1.add(await log1.append('hello9')) + await sync1.add(await log1.append('hello10')) await waitFor(() => leaveEventFired, () => true) deepStrictEqual(syncedHead, expectedEntry) }) - it('the peerId passed by the \'leave\' event is the expected peer ID', async () => { - strictEqual(String(leavingPeerId), String(peerId1)) + it('the peerId passed by the \'leave\' event is the expected peer ID', () => { + strictEqual(String(leavingPeerId), String(peerId2)) }) - it('updates the set of connected peers', async () => { + it('updates the set of connected peers', () => { strictEqual(sync2.peers.has(String(leavingPeerId)), false) strictEqual(sync1.peers.has(String(peerId2)), false) }) }) - describe.skip('Restarting sync after stopping it manually', () => { + describe('Restarting sync after stopping it manually', () => { let sync1, sync2 let log1, log2 let syncedEventFired = false let leaveEventFired = false let syncedHead - let expectedEntry, expectedEntry2 + let expectedEntry before(async () => { log1 = await Log(testIdentity1, { logId: 'synclog1' }) @@ -286,10 +349,12 @@ describe('Sync protocol', function () { const onSynced = async (bytes) => { syncedHead = await Entry.decode(bytes) - syncedEventFired = true + if (expectedEntry) { + syncedEventFired = expectedEntry.hash === syncedHead.hash + } } - const onLeave = async (peerId) => { + const onLeave = (peerId) => { leaveEventFired = true } @@ -298,11 +363,12 @@ describe('Sync protocol', function () { sync2.events.on('leave', onLeave) - await log1.append('hello1') - await log1.append('hello2') - await log1.append('hello3') - await log1.append('hello4') + await sync1.add(await log1.append('hello1')) + await sync1.add(await log1.append('hello2')) + await sync1.add(await log1.append('hello3')) + await sync1.add(await log1.append('hello4')) expectedEntry = await log1.append('hello5') + await sync1.add(expectedEntry) await waitFor(() => syncedEventFired, () => true) @@ -330,7 +396,7 @@ describe('Sync protocol', function () { await log1.append('hello7') await log1.append('hello8') await log1.append('hello9') - expectedEntry2 = await log1.append('hello10') + expectedEntry = await log1.append('hello10') syncedEventFired = false @@ -339,16 +405,16 @@ describe('Sync protocol', function () { await waitFor(() => syncedEventFired, () => true) strictEqual(syncedEventFired, true) - deepStrictEqual(syncedHead, expectedEntry2) + deepStrictEqual(syncedHead, expectedEntry) }) - it('updates the set of connected peers', async () => { + it('updates the set of connected peers', () => { strictEqual(sync1.peers.has(String(peerId2)), true) strictEqual(sync2.peers.has(String(peerId1)), true) }) }) - describe.skip('Syncing after initial sync', () => { + describe('Syncing after initial sync', () => { let sync1, sync2 let log1, log2 let syncedEventFired = false @@ -361,17 +427,20 @@ describe('Sync protocol', function () { const onSynced = async (bytes) => { syncedHead = await Entry.decode(bytes) - syncedEventFired = true + if (expectedEntry) { + syncedEventFired = expectedEntry.hash === syncedHead.hash + } } sync1 = await Sync({ ipfs: ipfs1, log: log1 }) sync2 = await Sync({ ipfs: ipfs2, log: log2, onSynced }) - await log1.append('hello1') - await log1.append('hello2') - await log1.append('hello3') - await log1.append('hello4') + await sync1.add(await log1.append('hello1')) + await sync1.add(await log1.append('hello2')) + await sync1.add(await log1.append('hello3')) + await sync1.add(await log1.append('hello4')) expectedEntry = await log1.append('hello5') + await sync1.add(expectedEntry) await waitFor(() => syncedEventFired, () => true) @@ -386,6 +455,11 @@ describe('Sync protocol', function () { if (sync2) { await sync2.stop() } + + await ipfs1.stop() + await ipfs2.stop() + await ipfs1.start() + await ipfs2.start() }) it('doesn\'t sync when an entry is added to a log', async () => { @@ -400,6 +474,7 @@ describe('Sync protocol', function () { await log1.append('hello9') const expectedEntry2 = await log1.append('hello10') await sync1.add(expectedEntry2) + expectedEntry = expectedEntry2 await waitFor(() => syncedEventFired, () => true) deepStrictEqual(syncedHead, expectedEntry2) }) @@ -409,6 +484,8 @@ describe('Sync protocol', function () { let sync1, sync2 let joinEventFired = false let leaveEventFired = false + let errorEventFired = false + let err let receivedHeads = [] let joiningPeerId let leavingPeerId @@ -417,25 +494,36 @@ describe('Sync protocol', function () { const log1 = await Log(testIdentity1, { logId: 'synclog2' }) const log2 = await Log(testIdentity2, { logId: 'synclog2' }) - const onJoin = async (peerId, heads) => { + const onJoin = (peerId, heads) => { joinEventFired = true joiningPeerId = peerId receivedHeads = heads } - const onLeave = async (peerId) => { + const onLeave = (peerId) => { leaveEventFired = true leavingPeerId = peerId } + const onError = (e) => { + errorEventFired = true + err = e.toString() + } + + const onSynced = (bytes) => { + throw new Error('Sync Error') + } + await log1.append('hello!') sync1 = await Sync({ ipfs: ipfs1, log: log1 }) - sync2 = await Sync({ ipfs: ipfs2, log: log2 }) + sync2 = await Sync({ ipfs: ipfs2, log: log2, onSynced }) sync1.events.on('join', onJoin) sync1.events.on('leave', onLeave) + sync2.events.on('error', onError) await waitFor(() => joinEventFired, () => true) + await waitFor(() => errorEventFired, () => true) await sync2.stop() @@ -451,11 +539,11 @@ describe('Sync protocol', function () { } }) - it('emits \'join\' event when a peer starts syncing', async () => { + it('emits \'join\' event when a peer starts syncing', () => { strictEqual(joinEventFired, true) }) - it('heads passed by the \'join\' event are the expected heads', async () => { + it('heads passed by the \'join\' event are the expected heads', () => { strictEqual(receivedHeads.length, 1) strictEqual(receivedHeads[0].payload, 'hello!') }) @@ -470,8 +558,9 @@ describe('Sync protocol', function () { strictEqual(String(leavingPeerId), String(id)) }) - it.skip('emits an \'error\' event', async () => { - // TODO + it('emits an \'error\' event', () => { + strictEqual(errorEventFired, true) + strictEqual(err, 'Error: Sync Error') }) }) })