From 4c31514a1bf8bc58c9315352812a20f65ba11c72 Mon Sep 17 00:00:00 2001 From: haad Date: Fri, 3 Mar 2023 08:29:24 +0200 Subject: [PATCH] Refactor names and functions in Sync --- src/database.js | 11 ++++--- src/sync.js | 28 ++++++++++------- test/orbitdb-replication.test.js | 52 ++++++++++++++++++++++++++------ 3 files changed, 65 insertions(+), 26 deletions(-) diff --git a/src/database.js b/src/database.js index 50c6055..bcb7ad6 100644 --- a/src/database.js +++ b/src/database.js @@ -27,7 +27,7 @@ const Database = async ({ OpLog, ipfs, identity, address, name, accessController const addOperation = async (op) => { const task = async () => { const entry = await log.append(op, { pointerCount }) - await syncProtocol.publish(entry) + await sync.add(entry) events.emit('update', entry) return entry.hash } @@ -45,10 +45,11 @@ const Database = async ({ OpLog, ipfs, identity, address, name, accessController } } await queue.add(task) + await queue.onIdle() } const close = async () => { - await syncProtocol.stop() + await sync.stop() await queue.onIdle() await log.close() events.emit('close') @@ -64,7 +65,7 @@ const Database = async ({ OpLog, ipfs, identity, address, name, accessController // Start the Sync protocol // Sync protocol exchanges OpLog heads (latest known entries) between peers when they connect // Sync emits 'join', 'leave' and 'error' events through the given event emitter - const syncProtocol = await Sync({ ipfs, log, events, sync: applyOperation }) + const sync = await Sync({ ipfs, log, events, onSynced: applyOperation }) return { address, @@ -74,8 +75,8 @@ const Database = async ({ OpLog, ipfs, identity, address, name, accessController drop, addOperation, log, - sync: syncProtocol, - peers: syncProtocol.peers, + sync, + peers: sync.peers, events } } diff --git a/src/sync.js b/src/sync.js index bf1b5f9..d75d56b 100644 --- a/src/sync.js +++ b/src/sync.js @@ -2,12 +2,17 @@ import { pipe } from 'it-pipe' import PQueue from 'p-queue' import Path from 'path' -const Sync = async ({ ipfs, log, events, sync }) => { +const Sync = async ({ ipfs, log, events, onSynced }) => { const address = log.id const headsSyncAddress = Path.join('/orbitdb/heads/', address) - const queue = new PQueue({ concurrency: 1 }) - let peers = new Set() + const queue = new PQueue({ concurrency: 1 }) + const peers = new Set() + + const onPeerJoined = async (peerId) => { + const heads = await log.heads() + events.emit('join', peerId, heads) + } const sendHeads = async (source) => { return (async function * () { @@ -21,19 +26,19 @@ const Sync = async ({ ipfs, log, events, sync }) => { const receiveHeads = (peerId) => async (source) => { for await (const value of source) { const headBytes = value.subarray() - await sync(headBytes) + await onSynced(headBytes) } - const heads = await log.heads() - events.emit('join', peerId, heads) + await onPeerJoined(peerId) } const handleReceiveHeads = async ({ connection, stream }) => { + const peerId = connection.remotePeer try { - const peerId = connection.remotePeer peers.add(peerId) await pipe(stream, receiveHeads(peerId), sendHeads, stream) } catch (e) { console.error(e) + peers.delete(peerId) events.emit('error', e) } } @@ -59,6 +64,7 @@ const Sync = async ({ ipfs, log, events, sync }) => { console.log(e.message) } else { console.error(e) + peers.delete(peerId) events.emit('error', e) } } @@ -78,7 +84,7 @@ const Sync = async ({ ipfs, log, events, sync }) => { const messageHasData = (message) => message.data !== undefined try { if (messageIsNotFromMe(message) && messageHasData(message)) { - await sync(message.data) + await onSynced(message.data) } } catch (e) { console.error(e) @@ -89,7 +95,7 @@ const Sync = async ({ ipfs, log, events, sync }) => { await queue.add(task) } - const publish = async (entry) => { + const add = async (entry) => { await ipfs.pubsub.publish(address, entry.bytes) } @@ -98,7 +104,7 @@ const Sync = async ({ ipfs, log, events, sync }) => { ipfs.libp2p.pubsub.removeEventListener('subscription-change', handlePeerSubscribed) await ipfs.libp2p.unhandle(headsSyncAddress) await ipfs.pubsub.unsubscribe(address, handleUpdateMessage) - peers = new Set() + peers.clear() } const start = async () => { @@ -113,7 +119,7 @@ const Sync = async ({ ipfs, log, events, sync }) => { await start() return { - publish, + add, stop, start } diff --git a/test/orbitdb-replication.test.js b/test/orbitdb-replication.test.js index 24b9412..71d5a83 100644 --- a/test/orbitdb-replication.test.js +++ b/test/orbitdb-replication.test.js @@ -75,13 +75,19 @@ describe('Replicating databases', function () { let synced = false const onJoin = async (peerId, heads) => { - const head = heads[0] - if (head && head.clock.time === amount) { - console.timeEnd('sync') - synced = true - } + // const head = heads[0] + // if (head && head.clock.time === amount) { + console.timeEnd('sync') + synced = true + // } } + // const onUpdated = (entry) => { + // if (entry.clock.time === amount) { + // synced = true + // } + // } + const onError = (err) => { console.error(err) } @@ -89,6 +95,7 @@ describe('Replicating databases', function () { db2 = await orbitdb2.open(db1.address) db2.events.on('join', onJoin) + // db2.events.on('update', onUpdated) db2.events.on('error', onError) db1.events.on('error', onError) @@ -114,6 +121,15 @@ describe('Replicating databases', function () { deepStrictEqual(eventsFromDb1, expected) + console.time('query 3') + const eventsFromDb3 = [] + for await (const event of db2.iterator()) { + eventsFromDb3.unshift(event) + } + console.timeEnd('query 3') + + deepStrictEqual(eventsFromDb3, expected) + console.log('events:', amount) }) }) @@ -155,13 +171,19 @@ describe('Replicating databases', function () { let synced = false const onJoin = async (peerId, heads) => { - const head = heads[0] - if (head && head.clock.time === amount) { - console.timeEnd('sync') - synced = true - } + // const head = heads[0] + // if (head && head.clock.time === amount) { + console.timeEnd('sync') + synced = true + // } } + // const onUpdated = (entry) => { + // if (entry.clock.time === amount) { + // synced = true + // } + // } + const onError = (err) => { console.error(err) } @@ -169,6 +191,7 @@ describe('Replicating databases', function () { db2 = await orbitdb2.open(db1.address) db2.events.on('join', onJoin) + // db2.events.on('update', onUpdated) db2.events.on('error', onError) db1.events.on('error', onError) @@ -194,6 +217,15 @@ describe('Replicating databases', function () { deepStrictEqual(eventsFromDb1, expected) + console.time('query 3') + const eventsFromDb3 = [] + for await (const event of db2.iterator()) { + eventsFromDb3.unshift(event) + } + console.timeEnd('query 3') + + deepStrictEqual(eventsFromDb3, expected) + console.log('events:', amount) }) })