diff --git a/src/database.js b/src/database.js index b1cc2e7..50c6055 100644 --- a/src/database.js +++ b/src/database.js @@ -74,6 +74,7 @@ const Database = async ({ OpLog, ipfs, identity, address, name, accessController drop, addOperation, log, + sync: syncProtocol, peers: syncProtocol.peers, events } diff --git a/src/sync.js b/src/sync.js index c2efe90..bf1b5f9 100644 --- a/src/sync.js +++ b/src/sync.js @@ -1,17 +1,13 @@ import { pipe } from 'it-pipe' import PQueue from 'p-queue' +import Path from 'path' const Sync = async ({ ipfs, log, events, sync }) => { const address = log.id + const headsSyncAddress = Path.join('/orbitdb/heads/', address) const queue = new PQueue({ concurrency: 1 }) - let peers = new Set() - const receiveHeads = async (source) => { - for await (const value of source) { - const headBytes = value.subarray() - await sync(headBytes) - } - } + let peers = new Set() const sendHeads = async (source) => { return (async function * () { @@ -22,11 +18,20 @@ const Sync = async ({ ipfs, log, events, sync }) => { })() } + const receiveHeads = (peerId) => async (source) => { + for await (const value of source) { + const headBytes = value.subarray() + await sync(headBytes) + } + const heads = await log.heads() + events.emit('join', peerId, heads) + } + const handleReceiveHeads = async ({ connection, stream }) => { - peers.add(connection.remotePeer.toString()) try { - await pipe(stream, receiveHeads, sendHeads, stream) - events.emit('join', connection.remotePeer) + const peerId = connection.remotePeer + peers.add(peerId) + await pipe(stream, receiveHeads(peerId), sendHeads, stream) } catch (e) { console.error(e) events.emit('error', e) @@ -41,14 +46,13 @@ const Sync = async ({ ipfs, log, events, sync }) => { return } if (subscription.subscribe) { - if (peers.has(peerId.toString())) { + if (peers.has(peerId)) { return } try { - peers.add(peerId.toString()) - const stream = await ipfs.libp2p.dialProtocol(peerId, '/heads' + address) - await pipe(sendHeads, stream, receiveHeads) - events.emit('join', peerId) + peers.add(peerId) + const stream = await ipfs.libp2p.dialProtocol(peerId, headsSyncAddress) + await pipe(sendHeads, stream, receiveHeads(peerId)) } catch (e) { if (e.code === 'ERR_UNSUPPORTED_PROTOCOL') { // Skip peer, they don't have this database currently @@ -59,7 +63,7 @@ const Sync = async ({ ipfs, log, events, sync }) => { } } } else { - peers.delete(peerId.toString()) + peers.delete(peerId) events.emit('leave', peerId) } } @@ -86,27 +90,32 @@ const Sync = async ({ ipfs, log, events, sync }) => { } const publish = async (entry) => { - await ipfs.pubsub.publish(address.toString(), entry.bytes) + await ipfs.pubsub.publish(address, entry.bytes) } const stop = async () => { await queue.onIdle() ipfs.libp2p.pubsub.removeEventListener('subscription-change', handlePeerSubscribed) - await ipfs.libp2p.unhandle('/heads' + address) + await ipfs.libp2p.unhandle(headsSyncAddress) await ipfs.pubsub.unsubscribe(address, handleUpdateMessage) peers = new Set() } - // Exchange head entries with peers when connected - await ipfs.libp2p.handle('/heads' + address, handleReceiveHeads) - ipfs.libp2p.pubsub.addEventListener('subscription-change', handlePeerSubscribed) + const start = async () => { + // Exchange head entries with peers when connected + await ipfs.libp2p.handle(headsSyncAddress, handleReceiveHeads) + ipfs.libp2p.pubsub.addEventListener('subscription-change', handlePeerSubscribed) + // Subscribe to the pubsub channel for this database through which updates are sent + await ipfs.pubsub.subscribe(address, handleUpdateMessage) + } - // Subscribe to the pubsub channel for this database through which updates are sent - await ipfs.pubsub.subscribe(address, handleUpdateMessage) + // Start Sync automatically + await start() return { publish, - stop + stop, + start } } diff --git a/test/orbitdb-replication.test.js b/test/orbitdb-replication.test.js index 211c2ea..24b9412 100644 --- a/test/orbitdb-replication.test.js +++ b/test/orbitdb-replication.test.js @@ -1,4 +1,4 @@ -import { strictEqual, deepStrictEqual } from 'assert' +import { deepStrictEqual } from 'assert' import rmrf from 'rimraf' import * as IPFS from 'ipfs' import { OrbitDB } from '../src/index.js' @@ -13,69 +13,72 @@ describe('Replicating databases', function () { let orbitdb1, orbitdb2 before(async () => { + await rmrf('./ipfs1') + await rmrf('./ipfs2') + await rmrf('./orbitdb1') + await rmrf('./orbitdb2') + ipfs1 = await IPFS.create({ ...config.daemon1, repo: './ipfs1' }) ipfs2 = await IPFS.create({ ...config.daemon2, repo: './ipfs2' }) + await connectPeers(ipfs1, ipfs2) + + orbitdb1 = await OrbitDB({ ipfs: ipfs1, id: 'user1', directory: './orbitdb1' }) + orbitdb2 = await OrbitDB({ ipfs: ipfs2, id: 'user2', directory: './orbitdb2' }) }) after(async () => { - if (ipfs1) { - await ipfs1.stop() - } - if (ipfs2) { - await ipfs2.stop() - } - await rmrf('./orbitdb1') - await rmrf('./orbitdb2') + await ipfs1.stop() + await ipfs2.stop() + await orbitdb1.stop() + await orbitdb2.stop() await rmrf('./ipfs1') await rmrf('./ipfs2') + await rmrf('./orbitdb1') + await rmrf('./orbitdb2') }) - describe('replicating a database', () => { + describe('replicating a database of 1', () => { + const amount = 1 + + const expected = [] + for (let i = 0; i < amount; i++) { + expected.push('hello' + i) + } + let db1, db2 - const amount = 128 + 1 // Same amount as in oplog replication test - before(async () => { - orbitdb1 = await OrbitDB({ ipfs: ipfs1, id: 'user1', directory: './orbitdb1' }) - orbitdb2 = await OrbitDB({ ipfs: ipfs2, id: 'user2', directory: './orbitdb2' }) db1 = await orbitdb1.open('helloworld') - for (let i = 0; i < amount; i++) { - await db1.add('hello' + i) + + console.log('generate') + console.time('generate') + for (let i = 0; i < expected.length; i++) { + await db1.add(expected[i]) } + console.timeEnd('generate') }) after(async () => { - if (db1) { - await db1.close() - } - if (db2) { - await db2.close() - } - if (orbitdb1) { - await orbitdb1.stop() - } - if (orbitdb2) { - await orbitdb2.stop() - } - await rmrf('./orbitdb1') - await rmrf('./orbitdb2') + await db1.drop() + await db1.close() + await db2.drop() + await db2.close() }) it('returns all entries in the replicated database', async () => { - console.time('replicate2') - let replicated = false + console.log('replicate') + console.log('sync') + console.time('replicate') + console.time('sync') - const onConnected = async (peerId) => { - const head = (await db2.log.heads())[0] + let synced = false + + const onJoin = async (peerId, heads) => { + const head = heads[0] if (head && head.clock.time === amount) { - replicated = true - } - } - - const onUpdated = (entry) => { - if (entry.clock.time === amount) { - replicated = true + console.timeEnd('sync') + synced = true } } @@ -83,42 +86,115 @@ describe('Replicating databases', function () { console.error(err) } + db2 = await orbitdb2.open(db1.address) + + db2.events.on('join', onJoin) + db2.events.on('error', onError) db1.events.on('error', onError) - db2 = await orbitdb2.open(db1.address) - db2.events.on('join', onConnected) - db2.events.on('update', onUpdated) - db2.events.on('error', onError) + await waitFor(() => synced, () => true) - await waitFor(() => replicated, () => true) - - strictEqual(db1.address, db2.address) - strictEqual(db1.name, db2.name) - strictEqual(db1.type, db2.type) - - const all2 = [] - console.time('all2') + console.time('query 1') + const eventsFromDb2 = [] for await (const event of db2.iterator()) { - all2.unshift(event) + eventsFromDb2.unshift(event) } - console.timeEnd('all2') - console.timeEnd('replicate2') + console.timeEnd('query 1') - const expected = [] - for (let i = 0; i < amount; i++) { - expected.push('hello' + i) - } + console.timeEnd('replicate') - deepStrictEqual(all2, expected) + deepStrictEqual(eventsFromDb2, expected) - const all1 = [] - console.time('all1') + console.time('query 2') + const eventsFromDb1 = [] for await (const event of db1.iterator()) { - all1.unshift(event) + eventsFromDb1.unshift(event) } - console.timeEnd('all1') + console.timeEnd('query 2') - deepStrictEqual(all1, expected) + deepStrictEqual(eventsFromDb1, expected) + + console.log('events:', amount) + }) + }) + + describe('replicating a database of 129', () => { + const amount = 128 + 1 + + const expected = [] + for (let i = 0; i < amount; i++) { + expected.push('hello' + i) + } + + let db1, db2 + + before(async () => { + db1 = await orbitdb1.open('helloworld') + + console.log('generate') + console.time('generate') + for (let i = 0; i < expected.length; i++) { + await db1.add(expected[i]) + } + console.timeEnd('generate') + }) + + after(async () => { + await db1.drop() + await db1.close() + await db2.drop() + await db2.close() + }) + + it('returns all entries in the replicated database', async () => { + console.log('replicate') + console.log('sync') + console.time('replicate') + console.time('sync') + + let synced = false + + const onJoin = async (peerId, heads) => { + const head = heads[0] + if (head && head.clock.time === amount) { + console.timeEnd('sync') + synced = true + } + } + + const onError = (err) => { + console.error(err) + } + + db2 = await orbitdb2.open(db1.address) + + db2.events.on('join', onJoin) + db2.events.on('error', onError) + db1.events.on('error', onError) + + await waitFor(() => synced, () => true) + + console.time('query 1') + const eventsFromDb2 = [] + for await (const event of db2.iterator()) { + eventsFromDb2.unshift(event) + } + console.timeEnd('query 1') + + console.timeEnd('replicate') + + deepStrictEqual(eventsFromDb2, expected) + + console.time('query 2') + const eventsFromDb1 = [] + for await (const event of db1.iterator()) { + eventsFromDb1.unshift(event) + } + console.timeEnd('query 2') + + deepStrictEqual(eventsFromDb1, expected) + + console.log('events:', amount) }) }) })