Merge pull request #25 from orbitdb/fix/sync

Refactor names and functions in Sync
Haad 2023-03-03 09:12:35 +02:00 committed by GitHub
commit 1adb63902b
3 changed files with 65 additions and 26 deletions


@@ -27,7 +27,7 @@ const Database = async ({ OpLog, ipfs, identity, address, name, accessController
const addOperation = async (op) => {
const task = async () => {
const entry = await log.append(op, { pointerCount })
await syncProtocol.publish(entry)
await sync.add(entry)
events.emit('update', entry)
return entry.hash
}
@@ -45,10 +45,11 @@ const Database = async ({ OpLog, ipfs, identity, address, name, accessController
}
}
await queue.add(task)
await queue.onIdle()
}
const close = async () => {
await syncProtocol.stop()
await sync.stop()
await queue.onIdle()
await log.close()
events.emit('close')
@@ -64,7 +65,7 @@ const Database = async ({ OpLog, ipfs, identity, address, name, accessController
// Start the Sync protocol
// Sync protocol exchanges OpLog heads (latest known entries) between peers when they connect
// Sync emits 'join', 'leave' and 'error' events through the given event emitter
const syncProtocol = await Sync({ ipfs, log, events, sync: applyOperation })
const sync = await Sync({ ipfs, log, events, onSynced: applyOperation })
return {
address,
@@ -74,8 +75,8 @@ const Database = async ({ OpLog, ipfs, identity, address, name, accessController
drop,
addOperation,
log,
sync: syncProtocol,
peers: syncProtocol.peers,
sync,
peers: sync.peers,
events
}
}
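For orientation, the sketch below shows how the renamed interface is consumed once this change lands: Database hands an onSynced callback to Sync, announces its own writes with sync.add (previously syncProtocol.publish), and exposes sync.peers. Only the names Sync, add, stop, peers and onSynced come from the diff above; everything else is an assumption for illustration, and createIpfsInstance and createLog are hypothetical helpers.

// Minimal usage sketch, not the actual Database wiring.
import { EventEmitter } from 'events'
import { Sync } from './sync.js' // named export and import path assumed for this sketch

const events = new EventEmitter()
const ipfs = await createIpfsInstance() // hypothetical: any IPFS instance with pubsub and libp2p
const log = await createLog(ipfs)       // hypothetical: an OpLog; its id becomes the sync address

// Entries from other peers arrive as raw bytes through onSynced,
// mirroring how Database passes applyOperation into Sync above.
const onSynced = async (bytes) => {
  // decode the entry and join it into the local log here
}

const sync = await Sync({ ipfs, log, events, onSynced })

// A locally appended entry is announced to connected peers with add().
const entry = await log.append({ op: 'PUT', key: 'a', value: 1 }) // op payload shape is illustrative
await sync.add(entry)

console.log(sync.peers) // peers that have exchanged heads with us
await sync.stop()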


@@ -2,12 +2,17 @@ import { pipe } from 'it-pipe'
import PQueue from 'p-queue'
import Path from 'path'
const Sync = async ({ ipfs, log, events, sync }) => {
const Sync = async ({ ipfs, log, events, onSynced }) => {
const address = log.id
const headsSyncAddress = Path.join('/orbitdb/heads/', address)
const queue = new PQueue({ concurrency: 1 })
let peers = new Set()
const queue = new PQueue({ concurrency: 1 })
const peers = new Set()
const onPeerJoined = async (peerId) => {
const heads = await log.heads()
events.emit('join', peerId, heads)
}
const sendHeads = async (source) => {
return (async function * () {
@@ -21,19 +26,19 @@ const Sync = async ({ ipfs, log, events, sync }) => {
const receiveHeads = (peerId) => async (source) => {
for await (const value of source) {
const headBytes = value.subarray()
await sync(headBytes)
await onSynced(headBytes)
}
const heads = await log.heads()
events.emit('join', peerId, heads)
await onPeerJoined(peerId)
}
const handleReceiveHeads = async ({ connection, stream }) => {
const peerId = connection.remotePeer
try {
const peerId = connection.remotePeer
peers.add(peerId)
await pipe(stream, receiveHeads(peerId), sendHeads, stream)
} catch (e) {
console.error(e)
peers.delete(peerId)
events.emit('error', e)
}
}
@@ -59,6 +64,7 @@ const Sync = async ({ ipfs, log, events, sync }) => {
console.log(e.message)
} else {
console.error(e)
peers.delete(peerId)
events.emit('error', e)
}
}
@@ -78,7 +84,7 @@ const Sync = async ({ ipfs, log, events, sync }) => {
const messageHasData = (message) => message.data !== undefined
try {
if (messageIsNotFromMe(message) && messageHasData(message)) {
await sync(message.data)
await onSynced(message.data)
}
} catch (e) {
console.error(e)
@@ -89,7 +95,7 @@ const Sync = async ({ ipfs, log, events, sync }) => {
await queue.add(task)
}
const publish = async (entry) => {
const add = async (entry) => {
await ipfs.pubsub.publish(address, entry.bytes)
}
@@ -98,7 +104,7 @@ const Sync = async ({ ipfs, log, events, sync }) => {
ipfs.libp2p.pubsub.removeEventListener('subscription-change', handlePeerSubscribed)
await ipfs.libp2p.unhandle(headsSyncAddress)
await ipfs.pubsub.unsubscribe(address, handleUpdateMessage)
peers = new Set()
peers.clear()
}
const start = async () => {
@@ -113,7 +119,7 @@ const Sync = async ({ ipfs, log, events, sync }) => {
await start()
return {
publish,
add,
stop,
start
}
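As a usage note for the Sync module above: the new onPeerJoined helper emits 'join' with the remote peer id and the local heads once head exchange completes, and 'error' is emitted when head exchange or peer handling fails (the 'leave' event mentioned in the Database comment is not touched by these hunks). A small consumer sketch, assuming the same events emitter that was passed into Sync:

// Listening on the emitter given to Sync({ ipfs, log, events, onSynced }).
events.on('join', (peerId, heads) => {
  console.log(`peer ${peerId} joined with ${heads.length} known head(s)`)
})
events.on('error', (err) => {
  console.error('sync failed:', err)
})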


@@ -75,13 +75,19 @@ describe('Replicating databases', function () {
let synced = false
const onJoin = async (peerId, heads) => {
const head = heads[0]
if (head && head.clock.time === amount) {
console.timeEnd('sync')
synced = true
}
// const head = heads[0]
// if (head && head.clock.time === amount) {
console.timeEnd('sync')
synced = true
// }
}
// const onUpdated = (entry) => {
// if (entry.clock.time === amount) {
// synced = true
// }
// }
const onError = (err) => {
console.error(err)
}
@@ -89,6 +95,7 @@ describe('Replicating databases', function () {
db2 = await orbitdb2.open(db1.address)
db2.events.on('join', onJoin)
// db2.events.on('update', onUpdated)
db2.events.on('error', onError)
db1.events.on('error', onError)
@@ -114,6 +121,15 @@ describe('Replicating databases', function () {
deepStrictEqual(eventsFromDb1, expected)
console.time('query 3')
const eventsFromDb3 = []
for await (const event of db2.iterator()) {
eventsFromDb3.unshift(event)
}
console.timeEnd('query 3')
deepStrictEqual(eventsFromDb3, expected)
console.log('events:', amount)
})
})
@@ -155,13 +171,19 @@ describe('Replicating databases', function () {
let synced = false
const onJoin = async (peerId, heads) => {
const head = heads[0]
if (head && head.clock.time === amount) {
console.timeEnd('sync')
synced = true
}
// const head = heads[0]
// if (head && head.clock.time === amount) {
console.timeEnd('sync')
synced = true
// }
}
// const onUpdated = (entry) => {
// if (entry.clock.time === amount) {
// synced = true
// }
// }
const onError = (err) => {
console.error(err)
}
@@ -169,6 +191,7 @@ describe('Replicating databases', function () {
db2 = await orbitdb2.open(db1.address)
db2.events.on('join', onJoin)
// db2.events.on('update', onUpdated)
db2.events.on('error', onError)
db1.events.on('error', onError)
@@ -194,6 +217,15 @@ describe('Replicating databases', function () {
deepStrictEqual(eventsFromDb1, expected)
console.time('query 3')
const eventsFromDb3 = []
for await (const event of db2.iterator()) {
eventsFromDb3.unshift(event)
}
console.timeEnd('query 3')
deepStrictEqual(eventsFromDb3, expected)
console.log('events:', amount)
})
})
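The new "query 3" blocks above read every event back from the replicated db2 and compare against the expected list. A condensed sketch of that read-back pattern, using db2 and expected as in the tests above and assuming the events iterator yields entries newest-first (which is why each event is unshifted to rebuild insertion order):

import { deepStrictEqual } from 'assert'

const eventsFromReplica = []
for await (const event of db2.iterator()) {
  // the iterator walks back from the latest entry, so unshift restores insertion order
  eventsFromReplica.unshift(event)
}
deepStrictEqual(eventsFromReplica, expected)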