Merge pull request #24 from orbitdb/fix/sync

Improve sync protocol event handling
This commit is contained in:
Haad 2023-03-03 07:41:54 +02:00 committed by GitHub
commit 215da1bdea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 176 additions and 90 deletions

View File

@ -74,6 +74,7 @@ const Database = async ({ OpLog, ipfs, identity, address, name, accessController
drop,
addOperation,
log,
sync: syncProtocol,
peers: syncProtocol.peers,
events
}

View File

@ -1,17 +1,13 @@
import { pipe } from 'it-pipe'
import PQueue from 'p-queue'
import Path from 'path'
const Sync = async ({ ipfs, log, events, sync }) => {
const address = log.id
const headsSyncAddress = Path.join('/orbitdb/heads/', address)
const queue = new PQueue({ concurrency: 1 })
let peers = new Set()
const receiveHeads = async (source) => {
for await (const value of source) {
const headBytes = value.subarray()
await sync(headBytes)
}
}
let peers = new Set()
const sendHeads = async (source) => {
return (async function * () {
@ -22,11 +18,20 @@ const Sync = async ({ ipfs, log, events, sync }) => {
})()
}
const receiveHeads = (peerId) => async (source) => {
for await (const value of source) {
const headBytes = value.subarray()
await sync(headBytes)
}
const heads = await log.heads()
events.emit('join', peerId, heads)
}
const handleReceiveHeads = async ({ connection, stream }) => {
peers.add(connection.remotePeer.toString())
try {
await pipe(stream, receiveHeads, sendHeads, stream)
events.emit('join', connection.remotePeer)
const peerId = connection.remotePeer
peers.add(peerId)
await pipe(stream, receiveHeads(peerId), sendHeads, stream)
} catch (e) {
console.error(e)
events.emit('error', e)
@ -41,14 +46,13 @@ const Sync = async ({ ipfs, log, events, sync }) => {
return
}
if (subscription.subscribe) {
if (peers.has(peerId.toString())) {
if (peers.has(peerId)) {
return
}
try {
peers.add(peerId.toString())
const stream = await ipfs.libp2p.dialProtocol(peerId, '/heads' + address)
await pipe(sendHeads, stream, receiveHeads)
events.emit('join', peerId)
peers.add(peerId)
const stream = await ipfs.libp2p.dialProtocol(peerId, headsSyncAddress)
await pipe(sendHeads, stream, receiveHeads(peerId))
} catch (e) {
if (e.code === 'ERR_UNSUPPORTED_PROTOCOL') {
// Skip peer, they don't have this database currently
@ -59,7 +63,7 @@ const Sync = async ({ ipfs, log, events, sync }) => {
}
}
} else {
peers.delete(peerId.toString())
peers.delete(peerId)
events.emit('leave', peerId)
}
}
@ -86,27 +90,32 @@ const Sync = async ({ ipfs, log, events, sync }) => {
}
const publish = async (entry) => {
await ipfs.pubsub.publish(address.toString(), entry.bytes)
await ipfs.pubsub.publish(address, entry.bytes)
}
const stop = async () => {
await queue.onIdle()
ipfs.libp2p.pubsub.removeEventListener('subscription-change', handlePeerSubscribed)
await ipfs.libp2p.unhandle('/heads' + address)
await ipfs.libp2p.unhandle(headsSyncAddress)
await ipfs.pubsub.unsubscribe(address, handleUpdateMessage)
peers = new Set()
}
// Exchange head entries with peers when connected
await ipfs.libp2p.handle('/heads' + address, handleReceiveHeads)
ipfs.libp2p.pubsub.addEventListener('subscription-change', handlePeerSubscribed)
const start = async () => {
// Exchange head entries with peers when connected
await ipfs.libp2p.handle(headsSyncAddress, handleReceiveHeads)
ipfs.libp2p.pubsub.addEventListener('subscription-change', handlePeerSubscribed)
// Subscribe to the pubsub channel for this database through which updates are sent
await ipfs.pubsub.subscribe(address, handleUpdateMessage)
}
// Subscribe to the pubsub channel for this database through which updates are sent
await ipfs.pubsub.subscribe(address, handleUpdateMessage)
// Start Sync automatically
await start()
return {
publish,
stop
stop,
start
}
}

View File

@ -1,4 +1,4 @@
import { strictEqual, deepStrictEqual } from 'assert'
import { deepStrictEqual } from 'assert'
import rmrf from 'rimraf'
import * as IPFS from 'ipfs'
import { OrbitDB } from '../src/index.js'
@ -13,69 +13,72 @@ describe('Replicating databases', function () {
let orbitdb1, orbitdb2
before(async () => {
await rmrf('./ipfs1')
await rmrf('./ipfs2')
await rmrf('./orbitdb1')
await rmrf('./orbitdb2')
ipfs1 = await IPFS.create({ ...config.daemon1, repo: './ipfs1' })
ipfs2 = await IPFS.create({ ...config.daemon2, repo: './ipfs2' })
await connectPeers(ipfs1, ipfs2)
orbitdb1 = await OrbitDB({ ipfs: ipfs1, id: 'user1', directory: './orbitdb1' })
orbitdb2 = await OrbitDB({ ipfs: ipfs2, id: 'user2', directory: './orbitdb2' })
})
after(async () => {
if (ipfs1) {
await ipfs1.stop()
}
if (ipfs2) {
await ipfs2.stop()
}
await rmrf('./orbitdb1')
await rmrf('./orbitdb2')
await ipfs1.stop()
await ipfs2.stop()
await orbitdb1.stop()
await orbitdb2.stop()
await rmrf('./ipfs1')
await rmrf('./ipfs2')
await rmrf('./orbitdb1')
await rmrf('./orbitdb2')
})
describe('replicating a database', () => {
describe('replicating a database of 1', () => {
const amount = 1
const expected = []
for (let i = 0; i < amount; i++) {
expected.push('hello' + i)
}
let db1, db2
const amount = 128 + 1 // Same amount as in oplog replication test
before(async () => {
orbitdb1 = await OrbitDB({ ipfs: ipfs1, id: 'user1', directory: './orbitdb1' })
orbitdb2 = await OrbitDB({ ipfs: ipfs2, id: 'user2', directory: './orbitdb2' })
db1 = await orbitdb1.open('helloworld')
for (let i = 0; i < amount; i++) {
await db1.add('hello' + i)
console.log('generate')
console.time('generate')
for (let i = 0; i < expected.length; i++) {
await db1.add(expected[i])
}
console.timeEnd('generate')
})
after(async () => {
if (db1) {
await db1.close()
}
if (db2) {
await db2.close()
}
if (orbitdb1) {
await orbitdb1.stop()
}
if (orbitdb2) {
await orbitdb2.stop()
}
await rmrf('./orbitdb1')
await rmrf('./orbitdb2')
await db1.drop()
await db1.close()
await db2.drop()
await db2.close()
})
it('returns all entries in the replicated database', async () => {
console.time('replicate2')
let replicated = false
console.log('replicate')
console.log('sync')
console.time('replicate')
console.time('sync')
const onConnected = async (peerId) => {
const head = (await db2.log.heads())[0]
let synced = false
const onJoin = async (peerId, heads) => {
const head = heads[0]
if (head && head.clock.time === amount) {
replicated = true
}
}
const onUpdated = (entry) => {
if (entry.clock.time === amount) {
replicated = true
console.timeEnd('sync')
synced = true
}
}
@ -83,42 +86,115 @@ describe('Replicating databases', function () {
console.error(err)
}
db2 = await orbitdb2.open(db1.address)
db2.events.on('join', onJoin)
db2.events.on('error', onError)
db1.events.on('error', onError)
db2 = await orbitdb2.open(db1.address)
db2.events.on('join', onConnected)
db2.events.on('update', onUpdated)
db2.events.on('error', onError)
await waitFor(() => synced, () => true)
await waitFor(() => replicated, () => true)
strictEqual(db1.address, db2.address)
strictEqual(db1.name, db2.name)
strictEqual(db1.type, db2.type)
const all2 = []
console.time('all2')
console.time('query 1')
const eventsFromDb2 = []
for await (const event of db2.iterator()) {
all2.unshift(event)
eventsFromDb2.unshift(event)
}
console.timeEnd('all2')
console.timeEnd('replicate2')
console.timeEnd('query 1')
const expected = []
for (let i = 0; i < amount; i++) {
expected.push('hello' + i)
}
console.timeEnd('replicate')
deepStrictEqual(all2, expected)
deepStrictEqual(eventsFromDb2, expected)
const all1 = []
console.time('all1')
console.time('query 2')
const eventsFromDb1 = []
for await (const event of db1.iterator()) {
all1.unshift(event)
eventsFromDb1.unshift(event)
}
console.timeEnd('all1')
console.timeEnd('query 2')
deepStrictEqual(all1, expected)
deepStrictEqual(eventsFromDb1, expected)
console.log('events:', amount)
})
})
describe('replicating a database of 129', () => {
const amount = 128 + 1
const expected = []
for (let i = 0; i < amount; i++) {
expected.push('hello' + i)
}
let db1, db2
before(async () => {
db1 = await orbitdb1.open('helloworld')
console.log('generate')
console.time('generate')
for (let i = 0; i < expected.length; i++) {
await db1.add(expected[i])
}
console.timeEnd('generate')
})
after(async () => {
await db1.drop()
await db1.close()
await db2.drop()
await db2.close()
})
it('returns all entries in the replicated database', async () => {
console.log('replicate')
console.log('sync')
console.time('replicate')
console.time('sync')
let synced = false
const onJoin = async (peerId, heads) => {
const head = heads[0]
if (head && head.clock.time === amount) {
console.timeEnd('sync')
synced = true
}
}
const onError = (err) => {
console.error(err)
}
db2 = await orbitdb2.open(db1.address)
db2.events.on('join', onJoin)
db2.events.on('error', onError)
db1.events.on('error', onError)
await waitFor(() => synced, () => true)
console.time('query 1')
const eventsFromDb2 = []
for await (const event of db2.iterator()) {
eventsFromDb2.unshift(event)
}
console.timeEnd('query 1')
console.timeEnd('replicate')
deepStrictEqual(eventsFromDb2, expected)
console.time('query 2')
const eventsFromDb1 = []
for await (const event of db1.iterator()) {
eventsFromDb1.unshift(event)
}
console.timeEnd('query 2')
deepStrictEqual(eventsFromDb1, expected)
console.log('events:', amount)
})
})
})