Fix timeout and timing bug in Sync

This commit is contained in:
haad 2025-05-20 09:51:11 +03:00
parent 90f84bf12f
commit 85686ba478
4 changed files with 21 additions and 26 deletions

View File

@ -135,7 +135,7 @@ const Sync = async ({ ipfs, log, events, onSynced, start, timeout }) => {
*/
events = events || new EventEmitter()
timeout = timeout || DefaultTimeout
timeout ??= DefaultTimeout
let started = false
@ -280,12 +280,12 @@ const Sync = async ({ ipfs, log, events, onSynced, start, timeout }) => {
*/
const startSync = async () => {
if (!started) {
// Exchange head entries with peers when connected
await libp2p.handle(headsSyncAddress, handleReceiveHeads)
pubsub.addEventListener('subscription-change', handlePeerSubscribed)
pubsub.addEventListener('message', handleUpdateMessage)
// Subscribe to the pubsub channel for this database through which updates are sent
await pubsub.subscribe(address)
// Exchange head entries with peers when connected
await libp2p.handle(headsSyncAddress, handleReceiveHeads)
// Remove disconnected peers from `peers`, as otherwise they will not resync heads on reconnection
libp2p.addEventListener('peer:disconnect', handlePeerDisconnected)
started = true

View File

@ -196,27 +196,20 @@ describe('Database - Replication', function () {
db1 = await Database({ ipfs: ipfs1, identity: testIdentity1, address: databaseId, accessController, directory: './orbitdb1', entryStorage: storage1 })
db2 = await Database({ ipfs: ipfs2, identity: testIdentity2, address: databaseId, accessController, directory: './orbitdb2', entryStorage: storage2 })
let connected1 = false
let connected2 = false
let connected = false
const onConnected1 = (peerId, heads) => {
connected1 = true
const onConnected = (peerId, heads) => {
connected = true
}
const onConnected2 = (peerId, heads) => {
connected2 = true
}
db1.events.on('join', onConnected1)
db2.events.on('join', onConnected2)
db2.events.on('join', onConnected)
await db1.addOperation({ op: 'PUT', key: 1, value: 'record 1 on db 1' })
await db1.addOperation({ op: 'PUT', key: 2, value: 'record 2 on db 1' })
await db1.addOperation({ op: 'PUT', key: 3, value: 'record 3 on db 1' })
await db1.addOperation({ op: 'PUT', key: 4, value: 'record 4 on db 1' })
await waitFor(() => connected1, () => true)
await waitFor(() => connected2, () => true)
await waitFor(() => connected, () => true)
const all1 = []
for await (const item of db1.log.iterator()) {

View File

@ -319,12 +319,12 @@ describe('KeyValueIndexed Database Replication', function () {
])
})
it('indexes deletes correctly', async () => {
it.only('indexes deletes correctly', async () => {
let replicated = false
let err
const onError = (err) => {
console.error(err)
deepStrictEqual(err, undefined)
const onError = (error) => {
err = error
}
kv1 = await KeyValueIndexed()({ ipfs: ipfs1, identity: testIdentity1, address: databaseId, accessController, directory: './orbitdb11' })
@ -339,11 +339,11 @@ describe('KeyValueIndexed Database Replication', function () {
kv2 = await KeyValueIndexed()({ ipfs: ipfs2, identity: testIdentity2, address: databaseId, accessController, directory: './orbitdb22' })
const onUpdate = (entry) => {
const onConnected = (entry) => {
replicated = true
}
kv2.events.on('update', onUpdate)
kv2.events.on('join', onConnected)
kv2.events.on('error', onError)
await waitFor(() => replicated, () => true)
@ -358,6 +358,8 @@ describe('KeyValueIndexed Database Replication', function () {
all2.push(keyValue)
}
deepStrictEqual(err, undefined)
deepStrictEqual(all2.map(e => { return { key: e.key, value: e.value } }), [
{ key: 'init', value: true },
{ key: 'hello', value: 'friend' }

View File

@ -663,7 +663,7 @@ describe('Sync protocol', function () {
let sync1, sync2
let log1, log2
const timeoutTime = 1 // 1 millisecond
const timeoutTime = 0 // 0 milliseconds
before(async () => {
[ipfs1, ipfs2] = await Promise.all([createHelia(), createHelia()])
@ -699,13 +699,13 @@ describe('Sync protocol', function () {
let err = null
const onError = (error) => {
(!err) && (err = error)
err ??= error
}
sync1 = await Sync({ ipfs: ipfs1, log: log1, timeout: timeoutTime })
sync2 = await Sync({ ipfs: ipfs2, log: log2, start: false, timeout: timeoutTime })
sync1.events.on('error', onError)
sync2 = await Sync({ ipfs: ipfs2, log: log2, start: false, timeout: timeoutTime })
sync2.events.on('error', onError)
await log1.append('hello1')
@ -716,7 +716,7 @@ describe('Sync protocol', function () {
notStrictEqual(err, null)
strictEqual(err.type, 'aborted')
strictEqual(err.message, 'Read aborted')
strictEqual(err.message.includes('aborted'), true)
})
})
})