Test/sync (#43)

* test: Re-enable skipped tests.

* test: Re-enable all tests.

* test: Wait for final entry when evaluating onSynced.

* test: Check if latest append is head to determine if sync has completed.

* test: Do not rely on order of sync-ed entries.

* docs: Sync-ing notes.

* docs: Simplify dial and pubsub explanation. Formatting.

* refactor: Check for required params.

* docs: Explain initial sync and pubsub updates.

* test: Save updated heads during sync.

* test: Remove extraneous expected entries.

* test: For eventual consistency.

* chore: Remove console.logs. Log errors using error event.

* chore: Remove extraneous asyncs.
This commit is contained in:
Hayden Young
2023-03-17 04:33:39 +08:00
committed by GitHub
parent aabfd4e2bc
commit 1990cb9b87
2 changed files with 183 additions and 59 deletions

View File

@@ -3,7 +3,45 @@ import PQueue from 'p-queue'
import Path from 'path'
import { EventEmitter } from 'events'
/**
* @description
* Syncs an append-only, conflict-free replicated data type (CRDT) log between
* multiple peers.
*
* The sync protocol synchronizes heads between multiple peers, both during
* startup and also when new entries are appended to the log.
*
* When Sync is started, peers "dial" each other using libp2p's custom protocol
* handler and initiate the exchange of heads each peer currently has. Once
* initial sync has completed, peers notify one another of updates to heads
* using pubsub "subscribe" with the same log.id topic. A peer with new heads
* can broadcast changes to other peers using pubsub "publish". Peers
* subscribed to the same topic will then be notified and will update their
* heads accordingly.
*
* The sync protocol only guarantees that the message is published; it does not
* guarantee the order in which messages are received or even that the message
* is recieved at all. The sync protocol only guarantees that heads will
* eventually reach consistency between all peers with the same address.
*/
/**
* Creates a Sync instance for sychronizing logs between multiple peers.
* @param {Object} params One or more parameters for configuring Sync.
* @param {IPFS} params.ipfs An IPFS instance. Used for synchronizing peers.
* @param {Log} params.log The Log instance to sync.
* @param {Object} params.events An event emitter. Defaults to an instance of
* EventEmitter. Events emitted are 'join', 'error' and 'leave'.
* @param {Function} params.onSynced A function that is called after the peer
* has received heads from another peer.
* @param {Boolean} params.start True if sync should start automatically, false
* otherwise. Defaults to true.
* @return {Sync} The Sync protocol instance.
*/
const Sync = async ({ ipfs, log, events, onSynced, start }) => {
if (!ipfs) throw new Error('An instance of ipfs is required.')
if (!log) throw new Error('An instance of log is required.')
const address = log.id
const headsSyncAddress = Path.join('/orbitdb/heads/', address)
@@ -42,7 +80,6 @@ const Sync = async ({ ipfs, log, events, onSynced, start }) => {
peers.add(peerId)
await pipe(stream, receiveHeads(peerId), sendHeads, stream)
} catch (e) {
console.error(e)
peers.delete(peerId)
events.emit('error', e)
}
@@ -68,7 +105,6 @@ const Sync = async ({ ipfs, log, events, onSynced, start }) => {
if (e.code === 'ERR_UNSUPPORTED_PROTOCOL') {
// Skip peer, they don't have this database currently
} else {
console.error(e)
peers.delete(peerId)
events.emit('error', e)
}
@@ -91,7 +127,6 @@ const Sync = async ({ ipfs, log, events, onSynced, start }) => {
await onSynced(message.data)
}
} catch (e) {
console.error(e)
events.emit('error', e)
}
}

View File

@@ -8,6 +8,9 @@ import config from './config.js'
import connectPeers from './utils/connect-nodes.js'
import waitFor from './utils/wait-for.js'
import testKeysPath from './fixtures/test-keys-path.js'
import LRUStorage from '../src/storage/lru.js'
import IPFSBlockStorage from '../src/storage/ipfs-block.js'
import ComposedStorage from '../src/storage/composed.js'
const keysPath = './testkeys'
@@ -68,7 +71,7 @@ describe('Sync protocol', function () {
notStrictEqual(sync, undefined)
})
it('has an ad function', async () => {
it('has an add function', async () => {
notStrictEqual(sync.add, undefined)
strictEqual(typeof sync.add, 'function')
})
@@ -93,23 +96,57 @@ describe('Sync protocol', function () {
})
})
describe('Params', () => {
it('throws an error when IPFS is not defined', async () => {
let err
try {
await Sync({})
} catch (e) {
err = e.toString()
}
strictEqual(err, 'Error: An instance of ipfs is required.')
})
it('throws an error when log is not defined', async () => {
let err
try {
await Sync({ ipfs: ipfs1 })
} catch (e) {
err = e.toString()
}
strictEqual(err, 'Error: An instance of log is required.')
})
})
describe('Syncing automatically', () => {
let sync1, sync2
let log1, log2
let joinEventFired = false
let syncedEventFired = false
let syncedHead
let expectedEntry
before(async () => {
const log1 = await Log(testIdentity1, { logId: 'synclog1' })
const log2 = await Log(testIdentity2, { logId: 'synclog1' })
const entryStorage1 = await ComposedStorage(
await LRUStorage({ size: 1000 }),
await IPFSBlockStorage({ ipfs: ipfs1, pin: true })
)
const entryStorage2 = await ComposedStorage(
await LRUStorage({ size: 1000 }),
await IPFSBlockStorage({ ipfs: ipfs2, pin: true })
)
log1 = await Log(testIdentity1, { logId: 'synclog1', entryStorage: entryStorage1 })
log2 = await Log(testIdentity2, { logId: 'synclog1', entryStorage: entryStorage2 })
const onSynced = async (bytes) => {
syncedHead = await Entry.decode(bytes)
log2.joinEntry(syncedHead)
syncedEventFired = true
}
const onJoin = async (peerId, heads) => {
const onJoin = (peerId, heads) => {
joinEventFired = true
}
@@ -136,35 +173,58 @@ describe('Sync protocol', function () {
deepStrictEqual(syncedHead, expectedEntry)
})
it('updates the set of connected peers', async () => {
it('updates the set of connected peers', () => {
strictEqual(sync2.peers.has(String(peerId1)), true)
strictEqual(sync1.peers.has(String(peerId2)), true)
})
it('eventually reaches consistency', async () => {
await sync1.add(await log1.append('hello2'))
await sync1.add(await log1.append('hello3'))
await sync1.add(await log1.append('hello4'))
expectedEntry = await log1.append('hello5')
await sync1.add(expectedEntry)
await waitFor(() => syncedEventFired, () => true)
deepStrictEqual(syncedHead, expectedEntry)
deepStrictEqual(await log1.heads(), await log2.heads())
const all1 = []
for await (const item of log1.iterator()) {
all1.unshift(item)
}
const all2 = []
for await (const item of log2.iterator()) {
all2.unshift(item)
}
deepStrictEqual(all1, all2)
})
})
describe('Starting sync manually', () => {
let sync1, sync2
let log1, log2
let syncedEventFired = false
let syncedHead
let expectedEntry
before(async () => {
const log1 = await Log(testIdentity1, { logId: 'synclog1' })
const log2 = await Log(testIdentity2, { logId: 'synclog1' })
log1 = await Log(testIdentity1, { logId: 'synclog1' })
log2 = await Log(testIdentity2, { logId: 'synclog1' })
const onSynced = async (bytes) => {
syncedHead = await Entry.decode(bytes)
syncedEventFired = true
syncedEventFired = expectedEntry.hash === syncedHead.hash
}
sync1 = await Sync({ ipfs: ipfs1, log: log1 })
sync2 = await Sync({ ipfs: ipfs2, log: log2, onSynced, start: false })
await log1.append('hello1')
await log1.append('hello2')
await log1.append('hello3')
await log1.append('hello4')
expectedEntry = await log1.append('hello5')
expectedEntry = await log1.append('hello1')
})
after(async () => {
@@ -184,17 +244,17 @@ describe('Sync protocol', function () {
strictEqual(syncedEventFired, true)
})
it('syncs the correct head', async () => {
it('syncs the correct head', () => {
deepStrictEqual(syncedHead, expectedEntry)
})
it('updates the set of connected peers', async () => {
it('updates the set of connected peers', () => {
strictEqual(sync2.peers.has(String(peerId1)), true)
strictEqual(sync1.peers.has(String(peerId2)), true)
})
})
describe.skip('Stopping sync', () => {
describe('Stopping sync', () => {
let sync1, sync2
let log1, log2
let syncedEventFired = false
@@ -209,10 +269,12 @@ describe('Sync protocol', function () {
const onSynced = async (bytes) => {
syncedHead = await Entry.decode(bytes)
syncedEventFired = true
if (expectedEntry) {
syncedEventFired = expectedEntry.hash === syncedHead.hash
}
}
const onLeave = async (peerId) => {
const onLeave = (peerId) => {
leaveEventFired = true
leavingPeerId = peerId
}
@@ -220,13 +282,14 @@ describe('Sync protocol', function () {
sync1 = await Sync({ ipfs: ipfs1, log: log1 })
sync2 = await Sync({ ipfs: ipfs2, log: log2, onSynced })
sync2.events.on('leave', onLeave)
sync1.events.on('leave', onLeave)
await log1.append('hello1')
await log1.append('hello2')
await log1.append('hello3')
await log1.append('hello4')
await sync1.add(await log1.append('hello1'))
await sync1.add(await log1.append('hello2'))
await sync1.add(await log1.append('hello3'))
await sync1.add(await log1.append('hello4'))
expectedEntry = await log1.append('hello5')
await sync1.add(expectedEntry)
})
after(async () => {
@@ -249,36 +312,36 @@ describe('Sync protocol', function () {
})
it('stops syncing', async () => {
await sync1.stop()
await sync2.stop()
await log1.append('hello6')
await log1.append('hello7')
await log1.append('hello8')
await log1.append('hello9')
await log1.append('hello10')
await sync1.add(await log1.append('hello6'))
await sync1.add(await log1.append('hello7'))
await sync1.add(await log1.append('hello8'))
await sync1.add(await log1.append('hello9'))
await sync1.add(await log1.append('hello10'))
await waitFor(() => leaveEventFired, () => true)
deepStrictEqual(syncedHead, expectedEntry)
})
it('the peerId passed by the \'leave\' event is the expected peer ID', async () => {
strictEqual(String(leavingPeerId), String(peerId1))
it('the peerId passed by the \'leave\' event is the expected peer ID', () => {
strictEqual(String(leavingPeerId), String(peerId2))
})
it('updates the set of connected peers', async () => {
it('updates the set of connected peers', () => {
strictEqual(sync2.peers.has(String(leavingPeerId)), false)
strictEqual(sync1.peers.has(String(peerId2)), false)
})
})
describe.skip('Restarting sync after stopping it manually', () => {
describe('Restarting sync after stopping it manually', () => {
let sync1, sync2
let log1, log2
let syncedEventFired = false
let leaveEventFired = false
let syncedHead
let expectedEntry, expectedEntry2
let expectedEntry
before(async () => {
log1 = await Log(testIdentity1, { logId: 'synclog1' })
@@ -286,10 +349,12 @@ describe('Sync protocol', function () {
const onSynced = async (bytes) => {
syncedHead = await Entry.decode(bytes)
syncedEventFired = true
if (expectedEntry) {
syncedEventFired = expectedEntry.hash === syncedHead.hash
}
}
const onLeave = async (peerId) => {
const onLeave = (peerId) => {
leaveEventFired = true
}
@@ -298,11 +363,12 @@ describe('Sync protocol', function () {
sync2.events.on('leave', onLeave)
await log1.append('hello1')
await log1.append('hello2')
await log1.append('hello3')
await log1.append('hello4')
await sync1.add(await log1.append('hello1'))
await sync1.add(await log1.append('hello2'))
await sync1.add(await log1.append('hello3'))
await sync1.add(await log1.append('hello4'))
expectedEntry = await log1.append('hello5')
await sync1.add(expectedEntry)
await waitFor(() => syncedEventFired, () => true)
@@ -330,7 +396,7 @@ describe('Sync protocol', function () {
await log1.append('hello7')
await log1.append('hello8')
await log1.append('hello9')
expectedEntry2 = await log1.append('hello10')
expectedEntry = await log1.append('hello10')
syncedEventFired = false
@@ -339,16 +405,16 @@ describe('Sync protocol', function () {
await waitFor(() => syncedEventFired, () => true)
strictEqual(syncedEventFired, true)
deepStrictEqual(syncedHead, expectedEntry2)
deepStrictEqual(syncedHead, expectedEntry)
})
it('updates the set of connected peers', async () => {
it('updates the set of connected peers', () => {
strictEqual(sync1.peers.has(String(peerId2)), true)
strictEqual(sync2.peers.has(String(peerId1)), true)
})
})
describe.skip('Syncing after initial sync', () => {
describe('Syncing after initial sync', () => {
let sync1, sync2
let log1, log2
let syncedEventFired = false
@@ -361,17 +427,20 @@ describe('Sync protocol', function () {
const onSynced = async (bytes) => {
syncedHead = await Entry.decode(bytes)
syncedEventFired = true
if (expectedEntry) {
syncedEventFired = expectedEntry.hash === syncedHead.hash
}
}
sync1 = await Sync({ ipfs: ipfs1, log: log1 })
sync2 = await Sync({ ipfs: ipfs2, log: log2, onSynced })
await log1.append('hello1')
await log1.append('hello2')
await log1.append('hello3')
await log1.append('hello4')
await sync1.add(await log1.append('hello1'))
await sync1.add(await log1.append('hello2'))
await sync1.add(await log1.append('hello3'))
await sync1.add(await log1.append('hello4'))
expectedEntry = await log1.append('hello5')
await sync1.add(expectedEntry)
await waitFor(() => syncedEventFired, () => true)
@@ -386,6 +455,11 @@ describe('Sync protocol', function () {
if (sync2) {
await sync2.stop()
}
await ipfs1.stop()
await ipfs2.stop()
await ipfs1.start()
await ipfs2.start()
})
it('doesn\'t sync when an entry is added to a log', async () => {
@@ -400,6 +474,7 @@ describe('Sync protocol', function () {
await log1.append('hello9')
const expectedEntry2 = await log1.append('hello10')
await sync1.add(expectedEntry2)
expectedEntry = expectedEntry2
await waitFor(() => syncedEventFired, () => true)
deepStrictEqual(syncedHead, expectedEntry2)
})
@@ -409,6 +484,8 @@ describe('Sync protocol', function () {
let sync1, sync2
let joinEventFired = false
let leaveEventFired = false
let errorEventFired = false
let err
let receivedHeads = []
let joiningPeerId
let leavingPeerId
@@ -417,25 +494,36 @@ describe('Sync protocol', function () {
const log1 = await Log(testIdentity1, { logId: 'synclog2' })
const log2 = await Log(testIdentity2, { logId: 'synclog2' })
const onJoin = async (peerId, heads) => {
const onJoin = (peerId, heads) => {
joinEventFired = true
joiningPeerId = peerId
receivedHeads = heads
}
const onLeave = async (peerId) => {
const onLeave = (peerId) => {
leaveEventFired = true
leavingPeerId = peerId
}
const onError = (e) => {
errorEventFired = true
err = e.toString()
}
const onSynced = (bytes) => {
throw new Error('Sync Error')
}
await log1.append('hello!')
sync1 = await Sync({ ipfs: ipfs1, log: log1 })
sync2 = await Sync({ ipfs: ipfs2, log: log2 })
sync2 = await Sync({ ipfs: ipfs2, log: log2, onSynced })
sync1.events.on('join', onJoin)
sync1.events.on('leave', onLeave)
sync2.events.on('error', onError)
await waitFor(() => joinEventFired, () => true)
await waitFor(() => errorEventFired, () => true)
await sync2.stop()
@@ -451,11 +539,11 @@ describe('Sync protocol', function () {
}
})
it('emits \'join\' event when a peer starts syncing', async () => {
it('emits \'join\' event when a peer starts syncing', () => {
strictEqual(joinEventFired, true)
})
it('heads passed by the \'join\' event are the expected heads', async () => {
it('heads passed by the \'join\' event are the expected heads', () => {
strictEqual(receivedHeads.length, 1)
strictEqual(receivedHeads[0].payload, 'hello!')
})
@@ -470,8 +558,9 @@ describe('Sync protocol', function () {
strictEqual(String(leavingPeerId), String(id))
})
it.skip('emits an \'error\' event', async () => {
// TODO
it('emits an \'error\' event', () => {
strictEqual(errorEventFired, true)
strictEqual(err, 'Error: Sync Error')
})
})
})