Merge remote-tracking branch 'origin' into docs/blockstore-config

This commit is contained in:
Hayden Young 2024-04-16 15:42:51 +01:00
commit a2454064b4
7 changed files with 339 additions and 67 deletions

View File

@ -0,0 +1,60 @@
import { createOrbitDB, KeyValueIndexed } from '../src/index.js'
import { rimraf as rmrf } from 'rimraf'
import createHelia from '../test/utils/create-helia.js'
import { EventEmitter } from 'events'
EventEmitter.defaultMaxListeners = 10000
;(async () => {
console.log('Starting benchmark...')
const entryCount = 1000
await rmrf('./ipfs')
await rmrf('./orbitdb')
const ipfs = await createHelia()
const orbitdb = await createOrbitDB({ ipfs })
console.log(`Set ${entryCount} keys/values`)
const db1 = await orbitdb.open('benchmark-keyvalue-indexed', { Database: KeyValueIndexed() })
const startTime1 = new Date().getTime()
for (let i = 0; i < entryCount; i++) {
await db1.set(i.toString(), 'hello' + i)
}
const endTime1 = new Date().getTime()
const duration1 = endTime1 - startTime1
const operationsPerSecond1 = Math.floor(entryCount / (duration1 / 1000))
const millisecondsPerOp1 = duration1 / entryCount
console.log(`Setting ${entryCount} key/values took ${duration1} ms, ${operationsPerSecond1} ops/s, ${millisecondsPerOp1} ms/op`)
console.log(`Iterate ${entryCount} key/values`)
const startTime2 = new Date().getTime()
const all = []
for await (const { key, value } of db1.iterator()) {
all.unshift({ key, value })
}
const endTime2 = new Date().getTime()
const duration2 = endTime2 - startTime2
const operationsPerSecond2 = Math.floor(entryCount / (duration2 / 1000))
const millisecondsPerOp2 = duration2 / entryCount
console.log(`Iterating ${all.length} key/values took ${duration2} ms, ${operationsPerSecond2} ops/s, ${millisecondsPerOp2} ms/op`)
await db1.drop()
await db1.close()
await orbitdb.stop()
await ipfs.stop()
await rmrf('./ipfs')
await rmrf('./orbitdb')
process.exit(0)
})()
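
To put these numbers in context, the same workload can be run against the plain, non-indexed KeyValue database and compared. A minimal sketch, assuming KeyValue is exported from src/index.js alongside KeyValueIndexed (the database name below is made up):

```js
import { createOrbitDB, KeyValue } from '../src/index.js' // assumes KeyValue is exported here
import { rimraf as rmrf } from 'rimraf'
import createHelia from '../test/utils/create-helia.js'

;(async () => {
  const entryCount = 1000
  await rmrf('./ipfs')
  await rmrf('./orbitdb')

  const ipfs = await createHelia()
  const orbitdb = await createOrbitDB({ ipfs })

  // Same loop as the benchmark above, but against the non-indexed KeyValue database.
  const db = await orbitdb.open('benchmark-keyvalue', { Database: KeyValue() })

  const startTime = new Date().getTime()
  for (let i = 0; i < entryCount; i++) {
    await db.set(i.toString(), 'hello' + i)
  }
  const duration = new Date().getTime() - startTime
  console.log(`Setting ${entryCount} key/values took ${duration} ms, ${Math.floor(entryCount / (duration / 1000))} ops/s`)

  await db.drop()
  await db.close()
  await orbitdb.stop()
  await ipfs.stop()
  await rmrf('./ipfs')
  await rmrf('./orbitdb')
  process.exit(0)
})()
```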

View File

@ -23,6 +23,84 @@ import pathJoin from '../utils/path-join.js'
const valueEncoding = 'json'
/**
* Defines an index for a KeyValue database.
* @param {string} [directory] A location for storing the index-related data
* @return {Index} An Index function.
*/
const Index = ({ directory } = {}) => async () => {
const index = await LevelStorage({ path: directory, valueEncoding })
const indexedEntries = await LevelStorage({ path: pathJoin(directory, '/_indexedEntries/'), valueEncoding })
const update = async (log, entry) => {
const keys = new Set()
const toBeIndexed = new Set()
const latest = entry.hash
// Function to check if a hash is in the entry index
const isIndexed = async (hash) => (await indexedEntries.get(hash)) === true
const isNotIndexed = async (hash) => !(await isIndexed(hash))
// Function to decide when the log traversal should be stopped
const shouldStopTraverse = async (entry) => {
// Go through the nexts of an entry and if any is not yet
// indexed, add it to the list of entries-to-be-indexed
for await (const hash of entry.next) {
if (await isNotIndexed(hash)) {
toBeIndexed.add(hash)
}
}
// If the latest entry and all its nexts are indexed and the to-be-indexed list is empty,
// we don't have anything more to process, so return true to stop the traversal
return await isIndexed(latest) && toBeIndexed.size === 0
}
// Traverse the log and stop when everything has been processed
for await (const entry of log.traverse(null, shouldStopTraverse)) {
const { hash, payload } = entry
// If an entry is not yet indexed, process it
if (await isNotIndexed(hash)) {
const { op, key } = payload
if (op === 'PUT' && !keys.has(key)) {
keys.add(key)
await index.put(key, entry)
await indexedEntries.put(hash, true)
} else if (op === 'DEL' && !keys.has(key)) {
keys.add(key)
await index.del(key)
await indexedEntries.put(hash, true)
}
// Remove the entry (hash) from the list of to-be-indexed entries
toBeIndexed.delete(hash)
}
}
}
/**
* Closes the index and its storages.
*/
const close = async () => {
await index.close()
await indexedEntries.close()
}
/**
* Drops all records from the index and its storages.
*/
const drop = async () => {
await index.clear()
await indexedEntries.clear()
}
return {
get: index.get,
iterator: index.iterator,
update,
close,
drop
}
}
/**
* Defines a KeyValueIndexed database.
* @param {module:Storage} [storage=LevelStorage] A compatible storage where
@ -31,36 +109,15 @@ const valueEncoding = 'json'
* function.
* @memberof module:Databases
*/
const KeyValueIndexed = ({ storage } = {}) => async ({ ipfs, identity, address, name, access, directory, meta, headsStorage, entryStorage, indexStorage, referencesCount, syncAutomatically, onUpdate }) => {
const indexDirectory = pathJoin(directory || './orbitdb', `./${address}/_index/`)
const index = storage || await LevelStorage({ path: indexDirectory, valueEncoding })
const KeyValueIndexed = () => async ({ ipfs, identity, address, name, access, directory, meta, headsStorage, entryStorage, indexStorage, referencesCount, syncAutomatically, onUpdate }) => {
// Set up the directory for an index
directory = pathJoin(directory || './orbitdb', `./${address}/_index/`)
let latestOplogHash
// Set up the index
const index = await Index({ directory })()
const _updateIndex = async (log, entry) => {
const keys = {}
const it = await log.iterator({ gt: latestOplogHash })
for await (const entry of it) {
const { op, key, value } = entry.payload
if (op === 'PUT' && !keys[key]) {
keys[key] = true
await index.put(key, value)
} else if (op === 'DEL' && !keys[key]) {
keys[key] = true
await index.del(key)
}
}
latestOplogHash = entry ? entry.hash : null
}
// Create the underlying KeyValue database
const keyValueStore = await KeyValue()({ ipfs, identity, address, name, access, directory, meta, headsStorage, entryStorage, indexStorage, referencesCount, syncAutomatically, onUpdate: _updateIndex })
// Compute the index
await _updateIndex(keyValueStore.log)
// Set up the underlying KeyValue database
const keyValueStore = await KeyValue()({ ipfs, identity, address, name, access, directory, meta, headsStorage, entryStorage, indexStorage, referencesCount, syncAutomatically, onUpdate: index.update })
/**
* Gets a value from the store by key.
@ -71,11 +128,10 @@ const KeyValueIndexed = ({ storage } = {}) => async ({ ipfs, identity, address,
* @instance
*/
const get = async (key) => {
const value = await index.get(key)
if (value) {
return value
const entry = await index.get(key)
if (entry) {
return entry.payload.value
}
return keyValueStore.get(key)
}
/**
@ -88,8 +144,12 @@ const KeyValueIndexed = ({ storage } = {}) => async ({ ipfs, identity, address,
* @instance
*/
const iterator = async function * ({ amount } = {}) {
const it = keyValueStore.iterator({ amount })
for await (const { key, value, hash } of it) {
const it = index.iterator({ amount, reverse: true })
for await (const record of it) {
// 'index' is a LevelStorage that returns a [key, value] pair
const entry = record[1]
const { key, value } = entry.payload
const hash = entry.hash
yield { key, value, hash }
}
}
@ -98,16 +158,16 @@ const KeyValueIndexed = ({ storage } = {}) => async ({ ipfs, identity, address,
* Closes the index and underlying storage.
*/
const close = async () => {
await index.close()
await keyValueStore.close()
await index.close()
}
/**
* Drops all records from the index and underlying storage.
*/
const drop = async () => {
await index.clear()
await keyValueStore.drop()
await index.drop()
}
return {
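
With the Index in place, get() is answered from the Level-backed index (falling back to the underlying KeyValue store when a key is not indexed), and iterator() walks the index in reverse, yielding { key, value, hash } records in descending key order. A minimal usage sketch, opening the database through orbitdb.open as the tests below do (the database name is made up, Helia setup borrowed from the benchmark above):

```js
import { createOrbitDB, KeyValueIndexed } from '../src/index.js'
import createHelia from '../test/utils/create-helia.js'

const ipfs = await createHelia()
const orbitdb = await createOrbitDB({ ipfs })
const db = await orbitdb.open('indexed-example', { Database: KeyValueIndexed() })

await db.set('alpha', 1)
await db.set('beta', 2)
await db.del('alpha')

console.log(await db.get('beta'))  // 2, served from the index
console.log(await db.get('alpha')) // undefined, the DEL removed it from the index

// The iterator reads the index in reverse, so keys arrive in descending key order.
for await (const { key, value, hash } of db.iterator()) {
  console.log(key, value, hash)
}

await db.close()
await orbitdb.stop()
await ipfs.stop()
```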

View File

@ -76,10 +76,11 @@ const ComposedStorage = async (storage1, storage2) => {
* @memberof module:Storage.Storage-Composed
* @instance
*/
const iterator = async function * () {
const iterator = async function * ({ amount, reverse } = {}) {
const keys = []
const iteratorOptions = { amount: amount || -1, reverse: reverse || false }
for (const storage of [storage1, storage2]) {
for await (const [key, value] of storage.iterator()) {
for await (const [key, value] of storage.iterator(iteratorOptions)) {
if (!keys[key]) {
keys[key] = true
yield [key, value]

View File

@ -78,8 +78,9 @@ const LevelStorage = async ({ path, valueEncoding } = {}) => {
* @memberof module:Storage.Storage-Level
* @instance
*/
const iterator = async function * () {
for await (const [key, value] of db.iterator()) {
const iterator = async function * ({ amount, reverse } = {}) {
const iteratorOptions = { limit: amount || -1, reverse: reverse || false }
for await (const [key, value] of db.iterator(iteratorOptions)) {
yield [key, value]
}
}
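
Both storage iterators now accept { amount, reverse }: LevelStorage maps them onto Level's limit and reverse options, and ComposedStorage (above) forwards the same options to both of its backing storages. A minimal sketch of the new options against a standalone LevelStorage, imported from src/index.js as the tests do (the path and keys below are made up):

```js
import { LevelStorage } from '../src/index.js'

const storage = await LevelStorage({ path: './level-example', valueEncoding: 'json' })

await storage.put('a', 1)
await storage.put('b', 2)
await storage.put('c', 3)

// amount becomes Level's limit and reverse flips to descending key order,
// so this yields ['c', 3] and then ['b', 2].
for await (const [key, value] of storage.iterator({ amount: 2, reverse: true })) {
  console.log(key, value)
}

await storage.close()
```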

View File

@ -287,7 +287,7 @@ describe('KeyValueIndexed Database', function () {
await db.put('key', 'value')
let result
for await (const [key, value] of storage.iterator()) {
for await (const { key, value } of db.iterator()) {
result = [key, value]
}

View File

@ -24,7 +24,7 @@ describe('KeyValueIndexed Database Replication', function () {
const accessController = {
canAppend: async (entry) => {
const identity = await identities.getIdentity(entry.identity)
return identity.id === testIdentity1.id
return identity.id === testIdentity1.id || identity.id === testIdentity2.id
}
}
@ -32,6 +32,12 @@ describe('KeyValueIndexed Database Replication', function () {
[ipfs1, ipfs2] = await Promise.all([createHelia(), createHelia()])
await connectPeers(ipfs1, ipfs2)
await rimraf(keysPath)
await rimraf('./orbitdb1')
await rimraf('./orbitdb2')
await rimraf('./ipfs1')
await rimraf('./ipfs2')
await copy(testKeysPath, keysPath)
keystore = await KeyStore({ path: keysPath })
identities = await Identities({ keystore })
@ -123,8 +129,8 @@ describe('KeyValueIndexed Database Replication', function () {
all2.push(keyValue)
}
deepStrictEqual(all2.map(e => { return { key: e.key, value: e.value } }), [
{ key: 'hello', value: 'friend3' },
{ key: 'init', value: true }
{ key: 'init', value: true },
{ key: 'hello', value: 'friend3' }
])
const all1 = []
@ -132,8 +138,8 @@ describe('KeyValueIndexed Database Replication', function () {
all1.push(keyValue)
}
deepStrictEqual(all1.map(e => { return { key: e.key, value: e.value } }), [
{ key: 'hello', value: 'friend3' },
{ key: 'init', value: true }
{ key: 'init', value: true },
{ key: 'hello', value: 'friend3' }
])
})
@ -196,8 +202,8 @@ describe('KeyValueIndexed Database Replication', function () {
all2.push(keyValue)
}
deepStrictEqual(all2.map(e => { return { key: e.key, value: e.value } }), [
{ key: 'hello', value: 'friend3' },
{ key: 'init', value: true }
{ key: 'init', value: true },
{ key: 'hello', value: 'friend3' }
])
const all1 = []
@ -205,8 +211,164 @@ describe('KeyValueIndexed Database Replication', function () {
all1.push(keyValue)
}
deepStrictEqual(all1.map(e => { return { key: e.key, value: e.value } }), [
{ key: 'hello', value: 'friend3' },
{ key: 'init', value: true }
{ key: 'init', value: true },
{ key: 'hello', value: 'friend3' }
])
})
it('indexes the database correctly', async () => {
let replicated1 = false
let replicated2 = false
let replicated3 = false
let expectedEntryHash1 = null
let expectedEntryHash2 = null
let expectedEntryHash3 = null
const onError = (err) => {
console.error(err)
deepStrictEqual(err, undefined)
}
const onUpdate = (entry) => {
replicated1 = expectedEntryHash1 !== null && entry.hash === expectedEntryHash1
}
kv1 = await KeyValueIndexed()({ ipfs: ipfs1, identity: testIdentity1, address: databaseId, accessController, directory: './orbitdb1' })
kv2 = await KeyValueIndexed()({ ipfs: ipfs2, identity: testIdentity2, address: databaseId, accessController, directory: './orbitdb2' })
kv2.events.on('update', onUpdate)
kv2.events.on('error', onError)
kv1.events.on('error', onError)
await kv1.set('init', true)
await kv1.set('hello', 'friend')
await kv1.del('hello')
await kv1.set('hello', 'friend2')
await kv1.del('hello')
await kv1.set('empty', '')
await kv1.del('empty')
expectedEntryHash1 = await kv1.set('hello', 'friend3')
await waitFor(() => replicated1, () => true)
await kv1.close()
await kv2.set('A', 'AAA')
await kv2.set('B', 'BBB')
expectedEntryHash3 = await kv2.set('C', 'CCC')
await kv2.close()
kv1 = await KeyValueIndexed()({ ipfs: ipfs1, identity: testIdentity1, address: databaseId, accessController, directory: './orbitdb1' })
const onUpdate3 = async (entry) => {
replicated3 = expectedEntryHash3 && entry.hash === expectedEntryHash3
}
kv1.events.on('update', onUpdate3)
kv1.events.on('error', onError)
await kv1.set('one', 1)
await kv1.set('two', 2)
await kv1.set('three', 3)
await kv1.del('three')
expectedEntryHash2 = await kv1.set('four', 4)
kv2 = await KeyValueIndexed()({ ipfs: ipfs2, identity: testIdentity2, address: databaseId, accessController, directory: './orbitdb2' })
const onUpdate2 = (entry) => {
replicated2 = expectedEntryHash2 && entry.hash === expectedEntryHash2
}
kv2.events.on('update', onUpdate2)
kv2.events.on('error', onError)
await waitFor(() => replicated2 && replicated3, () => true)
const all1 = []
for await (const keyValue of kv1.iterator()) {
all1.push(keyValue)
}
const all2 = []
for await (const keyValue of kv2.iterator()) {
all2.push(keyValue)
}
deepStrictEqual(all2.map(e => { return { key: e.key, value: e.value } }), [
{ key: 'two', value: 2 },
{ key: 'one', value: 1 },
{ key: 'init', value: true },
{ key: 'hello', value: 'friend3' },
{ key: 'four', value: 4 },
{ key: 'C', value: 'CCC' },
{ key: 'B', value: 'BBB' },
{ key: 'A', value: 'AAA' }
])
deepStrictEqual(all1.map(e => { return { key: e.key, value: e.value } }), [
{ key: 'two', value: 2 },
{ key: 'one', value: 1 },
{ key: 'init', value: true },
{ key: 'hello', value: 'friend3' },
{ key: 'four', value: 4 },
{ key: 'C', value: 'CCC' },
{ key: 'B', value: 'BBB' },
{ key: 'A', value: 'AAA' }
])
})
it('indexes deletes correctly', async () => {
let replicated = false
const onError = (err) => {
console.error(err)
deepStrictEqual(err, undefined)
}
kv1 = await KeyValueIndexed()({ ipfs: ipfs1, identity: testIdentity1, address: databaseId, accessController, directory: './orbitdb11' })
kv1.events.on('error', onError)
await kv1.set('init', true)
await kv1.set('hello', 'friend')
await kv1.del('delete')
await kv1.set('delete', 'this value')
await kv1.del('delete')
kv2 = await KeyValueIndexed()({ ipfs: ipfs2, identity: testIdentity2, address: databaseId, accessController, directory: './orbitdb22' })
const onUpdate = (entry) => {
replicated = true
}
kv2.events.on('update', onUpdate)
kv2.events.on('error', onError)
await waitFor(() => replicated, () => true)
const all1 = []
for await (const keyValue of kv1.iterator()) {
all1.push(keyValue)
}
const all2 = []
for await (const keyValue of kv2.iterator()) {
all2.push(keyValue)
}
deepStrictEqual(all2.map(e => { return { key: e.key, value: e.value } }), [
{ key: 'init', value: true },
{ key: 'hello', value: 'friend' }
])
deepStrictEqual(all1.map(e => { return { key: e.key, value: e.value } }), [
{ key: 'init', value: true },
{ key: 'hello', value: 'friend' }
])
await rimraf('./orbitdb11')
await rimraf('./orbitdb22')
})
})

View File

@ -2,7 +2,7 @@ import { deepStrictEqual, strictEqual, notStrictEqual } from 'assert'
import { rimraf } from 'rimraf'
import fs from 'fs'
import path from 'path'
import { createOrbitDB, isValidAddress, LevelStorage } from '../src/index.js'
import { createOrbitDB, isValidAddress } from '../src/index.js'
import KeyValueIndexed from '../src/databases/keyvalue-indexed.js'
import connectPeers from './utils/connect-nodes.js'
import waitFor from './utils/wait-for.js'
@ -419,14 +419,13 @@ describe('Open databases', function () {
})
describe('opening an indexed keyvalue database', () => {
let storage
let db, address
const amount = 10
before(async () => {
orbitdb1 = await createOrbitDB({ ipfs: ipfs1, id: 'user1' })
db = await orbitdb1.open('helloworld', { type: 'keyvalue' })
db = await orbitdb1.open('helloworld', { Database: KeyValueIndexed() })
address = db.address
for (let i = 0; i < amount; i++) {
@ -437,9 +436,6 @@ describe('Open databases', function () {
})
after(async () => {
if (storage) {
await storage.close()
}
if (db) {
await db.close()
}
@ -451,8 +447,7 @@ describe('Open databases', function () {
})
it('returns all entries in the database and in the index', async () => {
storage = await LevelStorage({ path: './index', valueEncoding: 'json' })
db = await orbitdb1.open(address, { Database: KeyValueIndexed({ storage }) })
db = await orbitdb1.open(address, { Database: KeyValueIndexed() })
strictEqual(db.address, address)
strictEqual(db.type, 'keyvalue')
@ -464,18 +459,11 @@ describe('Open databases', function () {
}
const result = []
for await (const [key, value] of storage.iterator()) {
result.push({ key, value })
for await (const { key, value } of db.iterator()) {
result.unshift({ key, value })
}
deepStrictEqual(result, expected)
const all = []
for await (const { key, value } of db.iterator()) {
all.unshift({ key, value })
}
deepStrictEqual(all, expected)
})
})