Change how heads are stored, add migrations

haad 2024-04-17 14:52:11 +03:00
parent 4fccfda975
commit 7599b99051
34 changed files with 200 additions and 93 deletions
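In short: before this commit, the heads storage kept the full encoded bytes of every head entry under the entry's hash; after it, the heads storage keeps a single 'heads' key whose value is a JSON array of head hashes, and the bytes are resolved from the log's entry storage. A minimal sketch of the two layouts, modeling the storage with a plain Map (hashes and values are illustrative, not taken from the commit):

// Illustrative model of the heads storage; a Map stands in for the backend.
const headsStorage = new Map()
const heads = [
  { hash: 'zdpuAexample1', bytes: new Uint8Array([1]) },
  { hash: 'zdpuAexample2', bytes: new Uint8Array([2]) }
]

// pre-2.5.0 layout: one record per head, entry bytes keyed by hash
for (const head of heads) {
  headsStorage.set(head.hash, head.bytes)
}

// 2.5.0 layout: one 'heads' record listing the hashes; the bytes live in
// entry storage and are looked up when the heads are iterated
headsStorage.clear()
headsStorage.set('heads', JSON.stringify(heads.map(e => e.hash)))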

View File

@@ -1,68 +1,86 @@
import { createOrbitDB } from '../src/index.js'
// import { createOrbitDB, MemoryStorage } from '../src/index.js'
import { rimraf as rmrf } from 'rimraf'
import createHelia from '../test/utils/create-helia.js'
// import { MemoryStorage, LevelStorage, LRUStorage } from '../src/storage/index.js'
import { EventEmitter } from 'events'

EventEmitter.defaultMaxListeners = 10000

let db
let interval

// Metrics
let totalQueries = 0
let seconds = 0
let queriesPerSecond = 0
let lastTenSeconds = 0

// Settings
const benchmarkDuration = 20 // seconds

const queryLoop = async () => {
  const doc = { _id: 'id-' + totalQueries, content: 'hello ' + totalQueries }
  // await db.put(totalQueries.toString(), { referencesCount: 0 })
  await db.put(doc)
  totalQueries++
  lastTenSeconds++
  queriesPerSecond++
  if (interval) {
    setImmediate(queryLoop)
  }
}

;(async () => {
  console.log('Starting benchmark...')
  const entryCount = 1000
  console.log('Benchmark duration is ' + benchmarkDuration + ' seconds')

  await rmrf('./ipfs')
  await rmrf('./orbitdb')

  // const identities = await Identities()
  // const testIdentity = await identities.createIdentity({ id: 'userA' })

  const ipfs = await createHelia()
  const orbitdb = await createOrbitDB({ ipfs })

  // MemoryStorage is the default storage for Log but defining them here
  // in case we want to benchmark different storage modules
  // const entryStorage = await MemoryStorage()
  // const headsStorage = await MemoryStorage()
  // const indexStorage = await MemoryStorage()
  // Test LRUStorage
  // const entryStorage = await LRUStorage()
  // const headsStorage = await LRUStorage()
  // const indexStorage = await LRUStorage()
  // Test LevelStorage
  // const entryStorage = await LevelStorage({ path: './logA/entries' })
  // const headsStorage = await LevelStorage({ path: './orbitdb/benchmark-documents-2/heads', valueEncoding: 'json' })
  // const indexStorage = await LevelStorage({ path: './logA/index' })
  // const db1 = await orbitdb.open('benchmark-documents', { type: 'documents', referencesCount: 16, entryStorage, headsStorage, indexStorage })
  // db = await orbitdb.open('benchmark-documents-2', { type: 'documents', entryStorage, headsStorage, indexStorage })

  db = await orbitdb.open('benchmark-documents-2', { type: 'documents' })
  const db1 = await orbitdb.open('benchmark-documents', { type: 'documents' })

  console.log(`Insert ${entryCount} documents`)
  const startTime1 = new Date().getTime()
  for (let i = 0; i < entryCount; i++) {
    const doc = { _id: i.toString(), message: 'hello ' + i }
    await db1.put(doc)
  }
  const endTime1 = new Date().getTime()
  const duration1 = endTime1 - startTime1
  const operationsPerSecond1 = Math.floor(entryCount / (duration1 / 1000))
  const millisecondsPerOp1 = duration1 / entryCount
  console.log(`Inserting ${entryCount} documents took ${duration1} ms, ${operationsPerSecond1} ops/s, ${millisecondsPerOp1} ms/op`)

  console.log(`Query ${entryCount} documents`)
  const startTime2 = new Date().getTime()
  const all = []
  for await (const { key, value } of db1.iterator()) {
    all.unshift({ key, value })
  }
  const endTime2 = new Date().getTime()
  const duration2 = endTime2 - startTime2
  const operationsPerSecond2 = Math.floor(entryCount / (duration2 / 1000))
  const millisecondsPerOp2 = duration2 / entryCount
  console.log(`Querying ${all.length} documents took ${duration2} ms, ${operationsPerSecond2} ops/s, ${millisecondsPerOp2} ms/op`)

  await db1.drop()
  await db1.close()

  // Output metrics at 1 second interval
  interval = setInterval(async () => {
    seconds++
    console.log(`${queriesPerSecond} queries per second, ${totalQueries} queries in ${seconds} seconds`)
    queriesPerSecond = 0
    if (seconds % 10 === 0) {
      console.log(`--> Average of ${lastTenSeconds / 10} q/s in the last 10 seconds`)
      if (lastTenSeconds === 0) throw new Error('Problems!')
      lastTenSeconds = 0
    }
    if (seconds >= benchmarkDuration) {
      clearInterval(interval)
      interval = null
      setTimeout(async () => {
        await db.close()
        await orbitdb.stop()
        await ipfs.stop()
        await rmrf('./ipfs')
        await rmrf('./orbitdb')
        process.exit(0)
      }, 1000)
    }
  }, 1000)

  setImmediate(queryLoop)
})()

View File

@@ -9,19 +9,13 @@ import MemoryStorage from '../storage/memory.js'
const DefaultStorage = MemoryStorage
const Heads = async ({ storage, heads, entryStorage }) => {
storage = storage || await DefaultStorage()
const put = async (heads) => {
heads = findHeads(heads)
const headHashes = heads.map(e => e.hash)
await storage.put('heads', JSON.stringify(headHashes))
}
const add = async (head) => {
@@ -30,7 +24,7 @@ const Heads = async ({ storage, heads }) => {
return
}
const newHeads = findHeads([...currentHeads, head])
await put(newHeads)
return newHeads
}
@@ -38,16 +32,20 @@ const Heads = async ({ storage, heads }) => {
const remove = async (hash) => {
const currentHeads = await all()
const newHeads = currentHeads.filter(e => e.hash !== hash)
await put(newHeads)
}
const iterator = async function * () {
const e = await storage.get('heads')
const headHashes = e ? JSON.parse(e) : []
for (const hash of headHashes) {
const entry = await entryStorage.get(hash)
if (entry) {
const head = await Entry.decode(entry)
yield head
}
}
}
const all = async () => {
const values = []
@@ -66,11 +64,32 @@ const Heads = async ({ storage, heads }) => {
}
// Initialize the heads if given as parameter
if (heads) {
await put(heads)
}
// Migrate from 2.4.3 -> 2.5.0
const migrate1 = async () => {
const it_ = storage.iterator()
const values = []
for await (const [hash] of it_) {
if (hash !== 'heads') {
values.push(hash)
}
}
if (values.length > 0) {
console.log('Migrate pre v2.5.0 heads database')
console.log('Heads:', values)
await storage.clear()
await storage.put('heads', JSON.stringify(values))
}
}
await migrate1()
return {
put,
set: put,
add,
remove,
iterator,

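Restated, the migration treats any key in the heads storage other than 'heads' as a pre-2.5.0 head record: if it finds one, it clears the storage and rewrites it as a single JSON list of those hashes. A runnable sketch of the same logic over a Map, assuming it behaves like the storage's iterator/clear/put API (hashes are made up):

// A pre-2.5.0 heads storage: head entries keyed by their hashes.
const storage = new Map([
  ['zdpuAexample1', 'entry bytes 1'],
  ['zdpuAexample2', 'entry bytes 2']
])

// The migration step: collect non-'heads' keys, then rewrite the storage.
const hashes = [...storage.keys()].filter(k => k !== 'heads')
if (hashes.length > 0) {
  storage.clear()
  storage.set('heads', JSON.stringify(hashes))
}

console.log(storage.get('heads')) // '["zdpuAexample1","zdpuAexample2"]'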
View File

@@ -79,7 +79,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
// Heads storage
headsStorage = headsStorage || await DefaultStorage()
// Add heads to the state storage, ie. init the log state
const _heads = await Heads({ storage: headsStorage, heads: logHeads, entryStorage: _entries })
// Conflict-resolution sorting function
sortFn = NoZeroes(sortFn || LastWriteWins)
// Internal queues for processing appends and joins in their call-order
@@ -184,10 +184,10 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
throw new Error(`Could not append entry:\nKey "${identity.hash}" is not allowed to write to the log`)
}
// Add entry to the entry storage
await _entries.put(entry.hash, entry.bytes)
// The appended entry is now the latest head
await _heads.set([entry])
// Add entry to the entry index
await _index.put(entry.hash, true)
// Return the appended entry
@@ -315,7 +315,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
/* 6. Add new entry to entries (for pinning) */
await _entries.put(entry.hash, entry.bytes)
/* 7. Add the new entry to heads (=union with current heads) */
await _heads.add(entry)
return true

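The reordering in append() matters because heads no longer carry entry bytes: the heads iterator resolves each stored hash through entry storage, so the bytes must be written there before the heads pointer moves, or a reader could briefly observe a head hash that resolves to nothing. The ordering, sketched with Maps standing in for the two storages (illustrative only):

const entries = new Map()    // entry storage: hash -> bytes
const headsStore = new Map() // heads storage: 'heads' -> JSON array of hashes

const entry = { hash: 'zdpuAexampleX', bytes: new Uint8Array([1, 2, 3]) }

entries.set(entry.hash, entry.bytes)                  // 1. make bytes resolvable
headsStore.set('heads', JSON.stringify([entry.hash])) // 2. publish the head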
View File

@@ -9,7 +9,7 @@ import { base58btc } from 'multiformats/bases/base58'
import { TimeoutController } from 'timeout-abort-controller'
import drain from 'it-drain'
const DefaultTimeout = 10000 // 10 seconds
/**
* Creates an instance of IPFSBlockStorage.
@@ -28,6 +28,8 @@ const DefaultTimeout = 30000 // 30 seconds
const IPFSBlockStorage = async ({ ipfs, pin, timeout } = {}) => {
if (!ipfs) throw new Error('An instance of ipfs is required.')
const controllers = new Set()
/**
* Puts data to an IPFS block.
* @function
@@ -59,7 +61,9 @@
const get = async (hash) => {
const cid = CID.parse(hash, base58btc)
const controller = new TimeoutController(timeout || DefaultTimeout)
controllers.add(controller)
const block = await ipfs.blockstore.get(cid, { signal: controller.signal })
controllers.delete(controller)
if (block) {
return block
}
@@ -71,7 +75,12 @@
const clear = async () => {}
const close = async () => {
for (const controller of controllers) {
controller.abort()
}
controllers.clear()
}
return {
put,

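The new close() is there so that reads still in flight don't sit out their full timeout after the storage is closed: each get() registers its controller, and close() aborts whatever is left. Here is the pattern reduced to the standard AbortController (TimeoutController from 'timeout-abort-controller' is the same idea with a built-in timer); this is a sketch, not the module's actual API:

const controllers = new Set()

// Wrap an abortable read so close() can cancel it.
const get = async (read) => {
  const controller = new AbortController()
  controllers.add(controller)
  try {
    return await read(controller.signal)
  } finally {
    controllers.delete(controller)
  }
}

const close = () => {
  for (const controller of controllers) {
    controller.abort() // rejects any pending read with an AbortError
  }
  controllers.clear()
}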
View File

@@ -228,7 +228,7 @@ const Sync = async ({ ipfs, log, events, onSynced, start, timeout }) => {
}
if (topic === address) {
await queue.add(task)
}
}

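Awaiting queue.add() means the pubsub handler only settles after the sync task has actually run, so task errors surface in the handler instead of being dropped silently. A sketch assuming a p-queue-style queue (the diff only shows queue.add, so the exact queue implementation is an assumption):

import PQueue from 'p-queue'

const queue = new PQueue({ concurrency: 1 })

const handleMessage = async (message) => {
  const task = async () => {
    // ... process the sync message ...
  }
  await queue.add(task) // propagates the task's rejection to the caller
}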
View File

@@ -30,7 +30,7 @@ describe('Database - Replication', function () {
}
beforeEach(async () => {
[ipfs1, ipfs2] = await Promise.all([createHelia({ directory: './ipfs1' }), createHelia({ directory: './ipfs2' })])
await connectPeers(ipfs1, ipfs2)
await copy(testKeysPath, keysPath)
@@ -42,23 +42,21 @@ describe('Database - Replication', function () {
afterEach(async () => {
if (db1) {
await db1.drop()
await db1.close()
await rimraf('./orbitdb1')
}
if (db2) {
await db2.drop()
await db2.close()
await rimraf('./orbitdb2')
}
if (ipfs1) {
await ipfs1.blockstore.child.child.child.close()
await ipfs1.stop()
}
if (ipfs2) {
await ipfs2.blockstore.child.child.child.close()
await ipfs2.stop()
}

View File

@@ -77,9 +77,9 @@ describe('Database', function () {
await db.close()
const headsStorage = await LevelStorage({ path: headsPath, valueEncoding: 'json' })
deepStrictEqual(await headsStorage.get('heads'), [hash])
await headsStorage.close()
@@ -97,9 +97,9 @@ describe('Database', function () {
await db.close()
const headsStorage = await LevelStorage({ path: headsPath, valueEncoding: 'json' })
deepStrictEqual(await headsStorage.get('heads'), [hash])
await headsStorage.close()
@@ -113,7 +113,7 @@ describe('Database', function () {
const op = { op: 'PUT', key: 1, value: 'record 1 on db 1' }
const hash = await db.addOperation(op)
deepStrictEqual(JSON.parse(await headsStorage.get('heads')), [hash])
await db.close()
})

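The difference between the two assertion styles above: with valueEncoding: 'json' the Level store deserializes on read, so get('heads') returns the array itself, while with the default encoding the raw string comes back and the test parses it. The same distinction in isolation (plain JSON, no LevelStorage involved):

const stored = JSON.stringify(['zdpuAexample1']) // what ends up on disk

// valueEncoding: 'json': the storage parses for you on get()
console.log(JSON.parse(stored)) // [ 'zdpuAexample1' ]

// default encoding: get() returns the raw string
console.log(stored) // '["zdpuAexample1"]'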
View File

@@ -30,7 +30,7 @@ describe('Documents Database Replication', function () {
}
before(async () => {
[ipfs1, ipfs2] = await Promise.all([createHelia({ directory: './ipfs1' }), createHelia({ directory: './ipfs2' })])
await connectPeers(ipfs1, ipfs2)
await copy(testKeysPath, keysPath)
@@ -42,10 +42,12 @@ describe('Documents Database Replication', function () {
after(async () => {
if (ipfs1) {
await ipfs1.blockstore.child.child.child.close()
await ipfs1.stop()
}
if (ipfs2) {
await ipfs2.blockstore.child.child.child.close()
await ipfs2.stop()
}

View File

@@ -40,7 +40,7 @@ describe('Events Database Replication', function () {
]
before(async () => {
[ipfs1, ipfs2] = await Promise.all([createHelia({ directory: './ipfs1' }), createHelia({ directory: './ipfs2' })])
await connectPeers(ipfs1, ipfs2)
await copy(testKeysPath, keysPath)
@@ -52,10 +52,12 @@ describe('Events Database Replication', function () {
after(async () => {
if (ipfs1) {
await ipfs1.blockstore.child.child.child.close()
await ipfs1.stop()
}
if (ipfs2) {
await ipfs2.blockstore.child.child.child.close()
await ipfs2.stop()
}

View File

@@ -29,7 +29,7 @@ describe('KeyValueIndexed Database Replication', function () {
}
before(async () => {
[ipfs1, ipfs2] = await Promise.all([createHelia({ directory: './ipfs1' }), createHelia({ directory: './ipfs2' })])
await connectPeers(ipfs1, ipfs2)
await rimraf(keysPath)
@@ -47,10 +47,12 @@ describe('KeyValueIndexed Database Replication', function () {
after(async () => {
if (ipfs1) {
await ipfs1.blockstore.child.child.child.close()
await ipfs1.stop()
}
if (ipfs2) {
await ipfs2.blockstore.child.child.child.close()
await ipfs2.stop()
}

View File

@@ -29,7 +29,7 @@ describe('KeyValue Database Replication', function () {
}
before(async () => {
[ipfs1, ipfs2] = await Promise.all([createHelia({ directory: './ipfs1' }), createHelia({ directory: './ipfs2' })])
await connectPeers(ipfs1, ipfs2)
await copy(testKeysPath, keysPath)
@@ -41,10 +41,12 @@ describe('KeyValue Database Replication', function () {
after(async () => {
if (ipfs1) {
await ipfs1.blockstore.child.child.child.close()
await ipfs1.stop()
}
if (ipfs2) {
await ipfs2.blockstore.child.child.child.close()
await ipfs2.stop()
}

Binary file not shown.

View File

@@ -0,0 +1 @@
MANIFEST-000002

View File

View File

@@ -0,0 +1 @@
2024/11/29-11:58:49.407658 172883000 Delete type=3 #1

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1 @@
MANIFEST-000002

View File

View File

@@ -0,0 +1 @@
2024/11/29-11:58:49.433589 172883000 Delete type=3 #1

Binary file not shown.

View File

@@ -0,0 +1 @@
2024/11/29-11:58:49.466751 172077000 Delete type=3 #1

View File

@@ -0,0 +1 @@
2024/11/29-11:58:49.467617 17389b000 Delete type=3 #1

View File

@@ -0,0 +1,52 @@
import { deepStrictEqual } from 'assert'
import { copy } from 'fs-extra'
import { rimraf } from 'rimraf'
import { createOrbitDB } from '../src/index.js'
import createHelia from './utils/create-helia.js'
describe('Migrations', function () {
this.timeout(10000)
let ipfs
let orbitdb
const fixturesPath = './test/fixtures/pre-2.5.0'
const testDir = './test-heads-migration'
before(async () => {
await copy(fixturesPath, testDir)
ipfs = await createHelia({ directory: `${testDir}/ipfs` })
orbitdb = await createOrbitDB({ ipfs, id: 'user1', directory: `${testDir}/orbitdb` })
})
after(async () => {
await orbitdb.stop()
await ipfs.blockstore.child.child.child.close()
await ipfs.stop()
await rimraf(testDir)
})
it('migrates the heads database from pre 2.5.0 to 2.5.0 format', async () => {
const db = await orbitdb.open('/orbitdb/zdpuAoE5P3f5zsPGkNDVgK4XF61oyE5c5JY6Yz5d74oWFCYES')
const res = []
for await (const event of db.iterator()) {
res.unshift(event)
}
deepStrictEqual(res.length, 129)
})
it('can read the database after migration to 2.5.0 format', async () => {
const db = await orbitdb.open('/orbitdb/zdpuAoE5P3f5zsPGkNDVgK4XF61oyE5c5JY6Yz5d74oWFCYES')
const res = []
for await (const event of db.iterator()) {
res.unshift(event)
}
deepStrictEqual(res.length, 129)
})
})

View File

@@ -760,7 +760,7 @@ describe('Log - Join', async function () {
await log1.storage.merge(log0.storage)
await headsStorage1.put(e0.hash, e0.bytes)
await headsStorage1.put('heads', JSON.stringify([e0.hash]))
await log1.append('hello1')
await log1.append('hello2')

View File

@@ -14,8 +14,7 @@ describe('Replicating databases', function () {
let orbitdb1, orbitdb2
before(async () => {
[ipfs1, ipfs2] = await Promise.all([createHelia({ directory: './ipfs1' }), createHelia({ directory: './ipfs2' })])
await connectPeers(ipfs1, ipfs2)
orbitdb1 = await createOrbitDB({ ipfs: ipfs1, id: 'user1', directory: './orbitdb1' })
@@ -138,10 +137,6 @@ describe('Replicating databases', function () {
await orbitdb1.stop()
await orbitdb2.stop()
await ipfs1.blockstore.child.child.child.close()
await ipfs2.blockstore.child.child.child.close()
await ipfs1.stop()
@@ -153,10 +148,8 @@ describe('Replicating databases', function () {
await connectPeers(ipfs1, ipfs2)
orbitdb1 = await createOrbitDB({ ipfs: ipfs1, id: 'user1', directory: './orbitdb1' })
db1 = await orbitdb1.open('helloworld', { referencesCount: 0 })
console.time('query 2')
const eventsFromDb1 = []
@@ -180,6 +173,8 @@ describe('Replicating databases', function () {
replicated = true
}
orbitdb2 = await createOrbitDB({ ipfs: ipfs2, id: 'user2', directory: './orbitdb2' })
const db2 = await orbitdb2.open(db1.address)
db2.events.on('join', onJoin)