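// Tests for the Sync protocol: two in-process IPFS nodes are created and
// connected to each other, then used to exchange log heads between peers.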
import { deepStrictEqual, strictEqual, notStrictEqual } from 'assert'
import rmrf from 'rimraf'
import { copy } from 'fs-extra'
import * as IPFS from 'ipfs-core'
import Sync from '../src/sync.js'
import { Log, Entry, Identities, KeyStore } from '../src/index.js'
import config from './config.js'
import connectPeers from './utils/connect-nodes.js'
import waitFor from './utils/wait-for.js'
import testKeysPath from './fixtures/test-keys-path.js'
import LRUStorage from '../src/storage/lru.js'
import IPFSBlockStorage from '../src/storage/ipfs-block.js'
import ComposedStorage from '../src/storage/composed.js'

const keysPath = './testkeys'

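// Note: connectPeers and waitFor are local test helpers; waitFor is assumed to
// poll its first argument until it matches the second (see ./utils/wait-for.js).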
describe('Sync protocol', function () {
  this.timeout(10000)

  let ipfs1, ipfs2
  let keystore
  let identities
  let testIdentity1, testIdentity2
  let peerId1, peerId2

  before(async () => {
    await rmrf('./ipfs1')
    await rmrf('./ipfs2')

    ipfs1 = await IPFS.create({ ...config.daemon1, repo: './ipfs1' })
    ipfs2 = await IPFS.create({ ...config.daemon2, repo: './ipfs2' })

    peerId1 = (await ipfs1.id()).id
    peerId2 = (await ipfs2.id()).id

    await connectPeers(ipfs1, ipfs2)

    await copy(testKeysPath, keysPath)
    keystore = await KeyStore({ path: keysPath })
    identities = await Identities({ keystore })
    testIdentity1 = await identities.createIdentity({ id: 'userA' })
    testIdentity2 = await identities.createIdentity({ id: 'userB' })
  })

  after(async () => {
    await ipfs1.stop()
    await ipfs2.stop()
    await rmrf('./ipfs1')
    await rmrf('./ipfs2')
    if (keystore) {
      await keystore.close()
    }
    await rmrf(keysPath)
  })

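  // Basic construction: Sync() should return an instance exposing add/start/stop,
  // an events emitter, and a Set of connected peers.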
  describe('Creating an instance', () => {
    let sync

    before(async () => {
      const log = await Log(testIdentity1)
      sync = await Sync({ ipfs: ipfs1, log })
    })

    after(async () => {
      if (sync) {
        await sync.stop()
      }
    })

    it('creates an instance', async () => {
      notStrictEqual(sync, undefined)
    })

    it('has an add function', async () => {
      notStrictEqual(sync.add, undefined)
      strictEqual(typeof sync.add, 'function')
    })

    it('has a start function', async () => {
      notStrictEqual(sync.start, undefined)
      strictEqual(typeof sync.start, 'function')
    })

    it('has a stop function', async () => {
      notStrictEqual(sync.stop, undefined)
      strictEqual(typeof sync.stop, 'function')
    })

    it('has events', async () => {
      notStrictEqual(sync.events, undefined)
    })

    it('has a set of peers', async () => {
      notStrictEqual(sync.peers, undefined)
      strictEqual(sync.peers instanceof Set, true)
    })
  })

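  // Parameter validation: both an ipfs instance and a log are required.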
  describe('Params', () => {
    it('throws an error when IPFS is not defined', async () => {
      let err
      try {
        await Sync({})
      } catch (e) {
        err = e.toString()
      }
      strictEqual(err, 'Error: An instance of ipfs is required.')
    })

    it('throws an error when log is not defined', async () => {
      let err
      try {
        await Sync({ ipfs: ipfs1 })
      } catch (e) {
        err = e.toString()
      }
      strictEqual(err, 'Error: An instance of log is required.')
    })
  })

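  // Automatic sync: when both peers run Sync on the same log ID, the head
  // appended by peer 1 should reach peer 2 via the onSynced callback without
  // any manual start() call.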
  describe('Syncing automatically', () => {
    let sync1, sync2
    let log1, log2
    let joinEventFired = false
    let syncedEventFired = false
    let syncedHead
    let expectedEntry

    before(async () => {
      const entryStorage1 = await ComposedStorage(
        await LRUStorage({ size: 1000 }),
        await IPFSBlockStorage({ ipfs: ipfs1, pin: true })
      )

      const entryStorage2 = await ComposedStorage(
        await LRUStorage({ size: 1000 }),
        await IPFSBlockStorage({ ipfs: ipfs2, pin: true })
      )

      log1 = await Log(testIdentity1, { logId: 'synclog1', entryStorage: entryStorage1 })
      log2 = await Log(testIdentity2, { logId: 'synclog1', entryStorage: entryStorage2 })

      const onSynced = async (bytes) => {
        const entry = await Entry.decode(bytes)
        if (await log2.joinEntry(entry)) {
          syncedHead = entry
          syncedEventFired = true
        }
      }

      const onJoin = (peerId, heads) => {
        joinEventFired = true
      }

      expectedEntry = await log1.append('hello1')

      sync1 = await Sync({ ipfs: ipfs1, log: log1, onSynced: () => {} })
      sync2 = await Sync({ ipfs: ipfs2, log: log2, onSynced })

      sync1.events.on('join', onJoin)
      sync2.events.on('join', onJoin)

      await waitFor(() => joinEventFired, () => true)
      await waitFor(() => syncedEventFired, () => true)
    })

    after(async () => {
      if (sync1) {
        await sync1.stop()
      }
      if (sync2) {
        await sync2.stop()
      }
    })

    it('syncs the head', async () => {
      deepStrictEqual(syncedHead, expectedEntry)
    })

    it('updates the set of connected peers', () => {
      strictEqual(sync2.peers.has(String(peerId1)), true)
      strictEqual(sync1.peers.has(String(peerId2)), true)
    })
  })

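  // Eventual consistency: entries are deliberately added out of order; once the
  // expected head has arrived, both logs should contain the same heads and the
  // same entries in the same order.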
  describe('Eventual Consistency', () => {
    let sync1, sync2
    let log1, log2
    let joinEventFired = false
    let syncedHead

    before(async () => {
      const entryStorage1 = await ComposedStorage(
        await LRUStorage({ size: 1000 }),
        await IPFSBlockStorage({ ipfs: ipfs1, pin: true })
      )

      const entryStorage2 = await ComposedStorage(
        await LRUStorage({ size: 1000 }),
        await IPFSBlockStorage({ ipfs: ipfs2, pin: true })
      )

      log1 = await Log(testIdentity1, { logId: 'synclog7', entryStorage: entryStorage1 })
      log2 = await Log(testIdentity2, { logId: 'synclog7', entryStorage: entryStorage2 })

      const onSynced = async (bytes) => {
        const entry = await Entry.decode(bytes)
        if (await log2.joinEntry(entry)) {
          syncedHead = entry
        }
      }

      const onJoin = (peerId, heads) => {
        joinEventFired = true
      }

      sync1 = await Sync({ ipfs: ipfs1, log: log1, onSynced: () => {} })
      sync2 = await Sync({ ipfs: ipfs2, log: log2, onSynced })

      sync1.events.on('join', onJoin)
      sync2.events.on('join', onJoin)

      await waitFor(() => joinEventFired, () => true)
    })

    after(async () => {
      if (sync1) {
        await sync1.stop()
      }
      if (sync2) {
        await sync2.stop()
      }
    })

    it('is eventually consistent', async () => {
      const e2 = await log1.append('hello2')
      const e3 = await log1.append('hello3')
      const e4 = await log1.append('hello4')
      const expected = await log1.append('hello5')

      await sync1.add(e3)
      await sync1.add(e2)
      await sync1.add(e4)
      await sync1.add(expected)

      await waitFor(() => Entry.isEqual(expected, syncedHead), () => true)

      deepStrictEqual(syncedHead, expected)

      deepStrictEqual(await log1.heads(), await log2.heads())

      const all1 = []
      for await (const item of log1.iterator()) {
        all1.unshift(item)
      }

      const all2 = []
      for await (const item of log2.iterator()) {
        all2.unshift(item)
      }

      deepStrictEqual(all1.map(e => e.payload), [
        'hello2',
        'hello3',
        'hello4',
        'hello5'
      ])

      deepStrictEqual(all1, all2)
    })
  })

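  // Manual start: sync2 is created with start: false and should only receive
  // the head after start() is called explicitly.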
  describe('Starting sync manually', () => {
    let sync1, sync2
    let log1, log2
    let syncedEventFired = false
    let syncedHead
    let expectedEntry

    before(async () => {
      log1 = await Log(testIdentity1, { logId: 'synclog1' })
      log2 = await Log(testIdentity2, { logId: 'synclog1' })

      const onSynced = async (bytes) => {
        syncedHead = await Entry.decode(bytes)
        syncedEventFired = expectedEntry.hash === syncedHead.hash
      }

      sync1 = await Sync({ ipfs: ipfs1, log: log1 })
      sync2 = await Sync({ ipfs: ipfs2, log: log2, onSynced, start: false })

      expectedEntry = await log1.append('hello1')
    })

    after(async () => {
      if (sync1) {
        await sync1.stop()
      }
      if (sync2) {
        await sync2.stop()
      }
    })

    it('starts syncing', async () => {
      await sync2.start()

      await waitFor(() => syncedEventFired, () => true)

      strictEqual(syncedEventFired, true)
    })

    it('syncs the correct head', () => {
      deepStrictEqual(syncedHead, expectedEntry)
    })

    it('updates the set of connected peers', () => {
      strictEqual(sync2.peers.has(String(peerId1)), true)
      strictEqual(sync1.peers.has(String(peerId2)), true)
    })
  })

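  // Stopping sync: after sync2.stop(), peer 1 should emit a 'leave' event,
  // entries added afterwards must not reach peer 2, and the peer sets should
  // no longer contain the disconnected peer.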
  describe('Stopping sync', () => {
    let sync1, sync2
    let log1, log2
    let syncedEventFired = false
    let leaveEventFired = false
    let syncedHead
    let expectedEntry
    let leavingPeerId

    before(async () => {
      log1 = await Log(testIdentity1, { logId: 'synclog1' })
      log2 = await Log(testIdentity2, { logId: 'synclog1' })

      const onSynced = async (bytes) => {
        syncedHead = await Entry.decode(bytes)
        if (expectedEntry) {
          syncedEventFired = expectedEntry.hash === syncedHead.hash
        }
      }

      const onLeave = (peerId) => {
        leaveEventFired = true
        leavingPeerId = peerId
      }

      sync1 = await Sync({ ipfs: ipfs1, log: log1 })
      sync2 = await Sync({ ipfs: ipfs2, log: log2, onSynced })

      sync1.events.on('leave', onLeave)

      await sync1.add(await log1.append('hello1'))
      await sync1.add(await log1.append('hello2'))
      await sync1.add(await log1.append('hello3'))
      await sync1.add(await log1.append('hello4'))
      expectedEntry = await log1.append('hello5')
      await sync1.add(expectedEntry)
    })

    after(async () => {
      if (sync1) {
        await sync1.stop()
      }
      if (sync2) {
        await sync2.stop()
      }
    })

    it('starts syncing', async () => {
      await waitFor(() => syncedEventFired, () => true)

      strictEqual(syncedEventFired, true)
      deepStrictEqual(syncedHead, expectedEntry)

      strictEqual(sync1.peers.has(String(peerId2)), true)
      strictEqual(sync2.peers.has(String(peerId1)), true)
    })

    it('stops syncing', async () => {
      await sync2.stop()

      await sync1.add(await log1.append('hello6'))
      await sync1.add(await log1.append('hello7'))
      await sync1.add(await log1.append('hello8'))
      await sync1.add(await log1.append('hello9'))
      await sync1.add(await log1.append('hello10'))

      await waitFor(() => leaveEventFired, () => true)

      deepStrictEqual(syncedHead, expectedEntry)
    })

    it('the peerId passed by the \'leave\' event is the expected peer ID', () => {
      strictEqual(String(leavingPeerId), String(peerId2))
    })

    it('updates the set of connected peers', () => {
      strictEqual(sync2.peers.has(String(leavingPeerId)), false)
      strictEqual(sync1.peers.has(String(peerId2)), false)
    })
  })

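  // Restart after a manual stop: sync1 is stopped, new entries are appended,
  // and calling start() again should deliver the latest head to peer 2.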
  describe('Restarting sync after stopping it manually', () => {
    let sync1, sync2
    let log1, log2
    let syncedEventFired = false
    let leaveEventFired = false
    let syncedHead
    let expectedEntry

    before(async () => {
      log1 = await Log(testIdentity1, { logId: 'synclog1' })
      log2 = await Log(testIdentity2, { logId: 'synclog1' })

      const onSynced = async (bytes) => {
        if (expectedEntry && !syncedEventFired) {
          syncedHead = await Entry.decode(bytes)
          syncedEventFired = expectedEntry.hash === syncedHead.hash
        }
      }

      const onLeave = (peerId) => {
        leaveEventFired = true
      }

      sync1 = await Sync({ ipfs: ipfs1, log: log1 })
      sync2 = await Sync({ ipfs: ipfs2, log: log2, onSynced })

      sync2.events.on('leave', onLeave)

      await sync1.add(await log1.append('hello1'))
      await sync1.add(await log1.append('hello2'))
      await sync1.add(await log1.append('hello3'))
      await sync1.add(await log1.append('hello4'))
      expectedEntry = await log1.append('hello5')
      await sync1.add(expectedEntry)

      await waitFor(() => syncedEventFired, () => true)

      strictEqual(syncedEventFired, true)
      deepStrictEqual(syncedHead, expectedEntry)

      await sync1.stop()

      await waitFor(() => leaveEventFired, () => true)

      strictEqual(leaveEventFired, true)
    })

    after(async () => {
      if (sync1) {
        await sync1.stop()
      }
      if (sync2) {
        await sync2.stop()
      }
    })

    it('restarts syncing', async () => {
      await log1.append('hello6')
      await log1.append('hello7')
      await log1.append('hello8')
      await log1.append('hello9')
      expectedEntry = await log1.append('hello10')

      syncedEventFired = false

      await sync1.start()

      await waitFor(() => syncedEventFired, () => true)

      strictEqual(syncedEventFired, true)
      deepStrictEqual(syncedHead, expectedEntry)
    })

    it('updates the set of connected peers', () => {
      strictEqual(sync1.peers.has(String(peerId2)), true)
      strictEqual(sync2.peers.has(String(peerId1)), true)
    })
  })

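  // After the initial exchange: appending to the log alone does not propagate
  // anything; a new head is only synced once it is passed to sync1.add().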
  describe('Syncing after initial sync', () => {
    let sync1, sync2
    let log1, log2
    let syncedEventFired = false
    let syncedHead
    let expectedEntry

    before(async () => {
      log1 = await Log(testIdentity1, { logId: 'synclog2' })
      log2 = await Log(testIdentity2, { logId: 'synclog2' })

      const onSynced = async (bytes) => {
        syncedHead = await Entry.decode(bytes)
        if (expectedEntry) {
          syncedEventFired = expectedEntry.hash === syncedHead.hash
        }
      }

      sync1 = await Sync({ ipfs: ipfs1, log: log1 })
      sync2 = await Sync({ ipfs: ipfs2, log: log2, onSynced })

      await sync1.add(await log1.append('hello1'))
      await sync1.add(await log1.append('hello2'))
      await sync1.add(await log1.append('hello3'))
      await sync1.add(await log1.append('hello4'))
      expectedEntry = await log1.append('hello5')
      await sync1.add(expectedEntry)

      await waitFor(() => syncedEventFired, () => true)

      strictEqual(syncedEventFired, true)
      deepStrictEqual(syncedHead, expectedEntry)
    })

    after(async () => {
      if (sync1) {
        await sync1.stop()
      }
      if (sync2) {
        await sync2.stop()
      }

      await ipfs1.stop()
      await ipfs2.stop()
      await ipfs1.start()
      await ipfs2.start()
    })

    it('doesn\'t sync when an entry is added to a log', async () => {
      await log1.append('hello6')
      deepStrictEqual(syncedHead, expectedEntry)
    })

    it('syncs new entries', async () => {
      syncedEventFired = false
      await log1.append('hello7')
      await log1.append('hello8')
      await log1.append('hello9')
      const expectedEntry2 = await log1.append('hello10')
      await sync1.add(expectedEntry2)
      expectedEntry = expectedEntry2
      await waitFor(() => syncedEventFired, () => true)
      deepStrictEqual(syncedHead, expectedEntry2)
    })
  })

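  // Timeouts: with a 1 ms timeout the initial head exchange is expected to be
  // aborted, surfacing an 'error' event on one of the Sync instances.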
  describe('Timeouts', () => {
    let sync1, sync2
    let log1, log2

    const timeoutTime = 1 // 1 millisecond

    before(async () => {
      log1 = await Log(testIdentity1, { logId: 'synclog5' })
      log2 = await Log(testIdentity2, { logId: 'synclog5' })

      sync1 = await Sync({ ipfs: ipfs1, log: log1, timeout: timeoutTime })
      sync2 = await Sync({ ipfs: ipfs2, log: log2, start: false, timeout: timeoutTime })

      await log1.append('hello1')
    })

    after(async () => {
      if (sync1) {
        await sync1.stop()
      }
      if (sync2) {
        await sync2.stop()
      }
    })

    it('emits an error when connecting to peer was cancelled due to timeout', async () => {
      let err = null

      const onError = (error) => {
        err = error
      }

      sync1.events.on('error', onError)
      sync2.events.on('error', onError)

      await sync2.start()

      await waitFor(() => err !== null, () => true)

      notStrictEqual(err, null)
      strictEqual(err.type, 'aborted')
      strictEqual(err.message, 'The operation was aborted')
    })
  })

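  // Events: exercises the 'join', 'leave' and 'error' events, including the
  // heads and peer IDs they pass to their handlers.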
  describe('Events', () => {
    let sync1, sync2
    let joinEventFired = false
    let leaveEventFired = false
    let errorEventFired = false
    let err
    let receivedHeads = []
    let joiningPeerId
    let leavingPeerId

    before(async () => {
      const log1 = await Log(testIdentity1, { logId: 'synclog3' })
      const log2 = await Log(testIdentity2, { logId: 'synclog3' })

      const onJoin = (peerId, heads) => {
        joinEventFired = true
        joiningPeerId = peerId
        receivedHeads = heads
      }

      const onLeave = (peerId) => {
        leaveEventFired = true
        leavingPeerId = peerId
      }

      const onError = (e) => {
        errorEventFired = true
        err = e.toString()
      }

      const onSynced = (bytes) => {
        sync2.events.emit('error', new Error('Sync Error'))
      }

      await log1.append('hello!')

      sync1 = await Sync({ ipfs: ipfs1, log: log1, onSynced })
      sync2 = await Sync({ ipfs: ipfs2, log: log2, onSynced })
      sync1.events.on('join', onJoin)
      sync1.events.on('leave', onLeave)
      sync2.events.on('error', onError)

      await waitFor(() => joinEventFired, () => true)
      await waitFor(() => errorEventFired, () => true)

      await sync2.stop()

      await waitFor(() => leaveEventFired, () => true)
    })

    after(async () => {
      if (sync1) {
        await sync1.stop()
      }
      if (sync2) {
        await sync2.stop()
      }
    })

    it('emits \'join\' event when a peer starts syncing', () => {
      strictEqual(joinEventFired, true)
    })

    it('heads passed by the \'join\' event are the expected heads', () => {
      strictEqual(receivedHeads.length, 1)
      strictEqual(receivedHeads[0].payload, 'hello!')
    })

    it('the peerId passed by the \'join\' event is the expected peer ID', async () => {
      const { id } = await ipfs2.id()
      strictEqual(String(joiningPeerId), String(id))
    })

    it('the peerId passed by the \'leave\' event is the expected peer ID', async () => {
      const { id } = await ipfs2.id()
      strictEqual(String(leavingPeerId), String(id))
    })

    it('emits an \'error\' event', () => {
      strictEqual(errorEventFired, true)
      strictEqual(err, 'Error: Sync Error')
    })
  })
})