Fix a timeout/race condition when closing databases

This commit is contained in:
haad 2025-05-09 12:37:40 +02:00
parent 0295262092
commit dc150b9cc2
4 changed files with 44 additions and 41 deletions

View File

@ -138,7 +138,6 @@ const Database = async ({ ipfs, identity, address, name, access, directory, meta
return entry.hash
}
const hash = await queue.add(task)
await queue.onIdle()
return hash
}

View File

@ -262,7 +262,7 @@ const Sync = async ({ ipfs, log, events, onSynced, start, timeout }) => {
const stopSync = async () => {
if (started) {
started = false
await queue.onIdle()
await queue.clear()
pubsub.removeEventListener('subscription-change', handlePeerSubscribed)
pubsub.removeEventListener('message', handleUpdateMessage)
await libp2p.unhandle(headsSyncAddress)

View File

@ -1,4 +1,4 @@
import { strictEqual, deepStrictEqual, notEqual } from 'assert'
import { strictEqual, deepStrictEqual } from 'assert'
import { rimraf } from 'rimraf'
import { existsSync } from 'fs'
import { copy } from 'fs-extra'
@ -74,6 +74,7 @@ describe('Database', function () {
await db.addOperation(op1)
const hash = await db.addOperation(op2)
const entry = await db.log.get(hash)
const headsPath = Path.join('./orbitdb/', `${databaseId}/`, '/log/_heads/')
@ -82,10 +83,13 @@ describe('Database', function () {
await db.close()
const headsStorage = await LevelStorage({ path: headsPath })
const e = await headsStorage.get('heads')
const heads = e ? JSON.parse(e) : []
const bytes = Uint8Array.from(await headsStorage.get(hash))
notEqual(bytes.length, 0)
strictEqual(heads.length, 1)
strictEqual(heads.at(0).hash, hash)
strictEqual(heads.at(0).next.length, 1)
strictEqual(heads.at(0).next.at(0), entry.next.at(0))
await headsStorage.close()
@ -99,6 +103,7 @@ describe('Database', function () {
await db.addOperation(op1)
const hash = await db.addOperation(op2)
const entry = await db.log.get(hash)
const headsPath = Path.join('./custom-directory/', `${databaseId}/`, '/log/_heads/')
@ -108,9 +113,13 @@ describe('Database', function () {
const headsStorage = await LevelStorage({ path: headsPath })
const bytes = Uint8Array.from(await headsStorage.get(hash))
const e = await headsStorage.get('heads')
const heads = e ? JSON.parse(e) : []
notEqual(bytes.length, 0)
strictEqual(heads.length, 1)
strictEqual(heads.at(0).hash, hash)
strictEqual(heads.at(0).next.length, 1)
strictEqual(heads.at(0).next.at(0), entry.next.at(0))
await headsStorage.close()
@ -126,10 +135,15 @@ describe('Database', function () {
await db.addOperation(op1)
const hash = await db.addOperation(op2)
const entry = await db.log.get(hash)
const bytes = Uint8Array.from(await headsStorage.get(hash))
const e = await headsStorage.get('heads')
const heads = e ? JSON.parse(e) : []
notEqual(bytes.length, 0)
strictEqual(heads.length, 1)
strictEqual(heads.at(0).hash, hash)
strictEqual(heads.at(0).next.length, 1)
strictEqual(heads.at(0).next.at(0), entry.next.at(0))
await db.close()
@ -146,10 +160,15 @@ describe('Database', function () {
await db.addOperation(op1)
const hash = await db.addOperation(op2)
const entry = await db.log.get(hash)
const e = await entryStorage.get(hash)
const bytes = Uint8Array.from(e)
notEqual(bytes.length, 0)
const e = await headsStorage.get('heads')
const heads = e ? JSON.parse(e) : []
strictEqual(heads.length, 1)
strictEqual(heads.at(0).hash, hash)
strictEqual(heads.at(0).next.length, 1)
strictEqual(heads.at(0).next.at(0), entry.next.at(0))
await db.close()

View File

@ -1,7 +1,7 @@
import { strictEqual, notStrictEqual, deepStrictEqual } from 'assert'
import { rimraf } from 'rimraf'
import { copy } from 'fs-extra'
import { Log, Entry, Identities, KeyStore } from '../../src/index.js'
import { Log, Identities, KeyStore } from '../../src/index.js'
import { Clock } from '../../src/oplog/log.js'
import { MemoryStorage } from '../../src/storage/index.js'
import testKeysPath from '../fixtures/test-keys-path.js'
@ -760,7 +760,7 @@ describe('Log - Join', async function () {
await log1.storage.merge(log0.storage)
await headsStorage1.put(e0.hash, e0.bytes)
await headsStorage1.put('heads', JSON.stringify([{ hash: e0.hash, next: e0.next }]))
await log1.append('hello1')
await log1.append('hello2')
@ -863,7 +863,7 @@ describe('Log - Join', async function () {
})
describe('throws an error if verification of an entry in given entry\'s history fails', async () => {
let e1, e3
let e1
let headsStorage1, headsStorage2
before(async () => {
@ -875,23 +875,19 @@ describe('Log - Join', async function () {
e1 = await log1.append('hello1')
await log1.append('hello2')
e3 = await log1.append('hello3')
await log1.append('hello3')
})
it('throws an error if an entry doesn\'t have a payload field', async () => {
const e = Object.assign({}, e1)
delete e.payload
delete e.bytes
delete e.hash
const ee = await Entry.encode(e)
await headsStorage1.put(e1.hash, ee.bytes)
await headsStorage1.put('heads', JSON.stringify([{ hash: e1.hash, next: e1.next }]))
await log2.storage.merge(headsStorage1)
let err
try {
await log2.joinEntry(e3)
await log2.joinEntry(e)
} catch (e) {
err = e
}
@ -906,16 +902,12 @@ describe('Log - Join', async function () {
const e = Object.assign({}, e1)
delete e.key
delete e.bytes
delete e.hash
const ee = await Entry.encode(e)
await headsStorage1.put(e1.hash, ee.bytes)
await headsStorage1.put('heads', JSON.stringify([{ hash: e1.hash, next: e1.next }]))
await log2.storage.merge(headsStorage1)
let err
try {
await log2.joinEntry(e3)
await log2.joinEntry(e)
} catch (e) {
err = e
}
@ -930,16 +922,12 @@ describe('Log - Join', async function () {
const e = Object.assign({}, e1)
delete e.sig
delete e.bytes
delete e.hash
const ee = await Entry.encode(e)
await headsStorage1.put(e1.hash, ee.bytes)
await headsStorage1.put('heads', JSON.stringify([{ hash: e1.hash, next: e1.next }]))
await log2.storage.merge(headsStorage1)
let err
try {
await log2.joinEntry(e3)
await log2.joinEntry(e)
} catch (e) {
err = e
}
@ -953,22 +941,19 @@ describe('Log - Join', async function () {
it('throws an error if an entry signature doesn\'t verify', async () => {
const e = Object.assign({}, e1)
e.sig = '1234567890'
delete e.bytes
delete e.hash
const ee = await Entry.encode(e)
await headsStorage1.put(e1.hash, ee.bytes)
await headsStorage1.put('heads', JSON.stringify([{ hash: e1.hash, next: e1.next }]))
await log2.storage.merge(headsStorage1)
let err
try {
await log2.joinEntry(e3)
await log2.joinEntry(e)
} catch (e) {
err = e
}
notStrictEqual(err, undefined)
strictEqual(err.message, 'Could not validate signature for entry "zdpuAvkAJ8C46cnGdtFpcBratA5MqK7CcjqCJjjmuKuFvZir3"')
strictEqual(err.message, 'Could not validate signature for entry "zdpuAxyE4ScWLf4X6VvkhMrpDQvwdvQno1DhzY5p1U3GPHrBT"')
deepStrictEqual(await log2.all(), [])
deepStrictEqual(await log2.heads(), [])
})