From 5a73b9cdfb909466ccf0e0f380de2411d2098923 Mon Sep 17 00:00:00 2001 From: haad Date: Fri, 16 Feb 2024 16:58:06 +0100 Subject: [PATCH 1/2] Add amount and reverse iterator options to storage iterators --- src/storage/composed.js | 5 +++-- src/storage/level.js | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/storage/composed.js b/src/storage/composed.js index bacd70d..91b9b8d 100644 --- a/src/storage/composed.js +++ b/src/storage/composed.js @@ -76,10 +76,11 @@ const ComposedStorage = async (storage1, storage2) => { * @memberof module:Storage.Storage-Composed * @instance */ - const iterator = async function * () { + const iterator = async function * ({ amount, reverse } = {}) { const keys = [] + const iteratorOptions = { amount: amount || -1, reverse: reverse || false } for (const storage of [storage1, storage2]) { - for await (const [key, value] of storage.iterator()) { + for await (const [key, value] of storage.iterator(iteratorOptions)) { if (!keys[key]) { keys[key] = true yield [key, value] diff --git a/src/storage/level.js b/src/storage/level.js index 02fe9c6..a279785 100644 --- a/src/storage/level.js +++ b/src/storage/level.js @@ -78,8 +78,9 @@ const LevelStorage = async ({ path, valueEncoding } = {}) => { * @memberof module:Storage.Storage-Level * @instance */ - const iterator = async function * () { - for await (const [key, value] of db.iterator()) { + const iterator = async function * ({ amount, reverse } = {}) { + const iteratorOptions = { limit: amount || -1, reverse: reverse || false } + for await (const [key, value] of db.iterator(iteratorOptions)) { yield [key, value] } } From 40b600d9e64c29217c340a559a77ea50e921c625 Mon Sep 17 00:00:00 2001 From: haad Date: Fri, 16 Feb 2024 17:01:13 +0100 Subject: [PATCH 2/2] Fix indexing in KeyValueIndexed database --- benchmarks/orbitdb-kv-indexed.js | 60 ++++++ src/databases/keyvalue-indexed.js | 132 +++++++++---- test/databases/keyvalue-indexed.js | 2 +- .../replication/keyvalue-indexed.test.js | 180 +++++++++++++++++- test/orbitdb-open.test.js | 22 +-- 5 files changed, 333 insertions(+), 63 deletions(-) create mode 100644 benchmarks/orbitdb-kv-indexed.js diff --git a/benchmarks/orbitdb-kv-indexed.js b/benchmarks/orbitdb-kv-indexed.js new file mode 100644 index 0000000..38dc69e --- /dev/null +++ b/benchmarks/orbitdb-kv-indexed.js @@ -0,0 +1,60 @@ +import { createOrbitDB, KeyValueIndexed } from '../src/index.js' +import { rimraf as rmrf } from 'rimraf' +import createHelia from '../test/utils/create-helia.js' + +import { EventEmitter } from 'events' +EventEmitter.defaultMaxListeners = 10000 + +;(async () => { + console.log('Starting benchmark...') + + const entryCount = 1000 + + await rmrf('./ipfs') + await rmrf('./orbitdb') + + const ipfs = await createHelia() + const orbitdb = await createOrbitDB({ ipfs }) + + console.log(`Set ${entryCount} keys/values`) + + const db1 = await orbitdb.open('benchmark-keyvalue-indexed', { Database: KeyValueIndexed() }) + + const startTime1 = new Date().getTime() + + for (let i = 0; i < entryCount; i++) { + await db1.set(i.toString(), 'hello' + i) + } + + const endTime1 = new Date().getTime() + const duration1 = endTime1 - startTime1 + const operationsPerSecond1 = Math.floor(entryCount / (duration1 / 1000)) + const millisecondsPerOp1 = duration1 / entryCount + console.log(`Setting ${entryCount} key/values took ${duration1} ms, ${operationsPerSecond1} ops/s, ${millisecondsPerOp1} ms/op`) + + console.log(`Iterate ${entryCount} key/values`) + const startTime2 = new Date().getTime() + + const all = [] + for await (const { key, value } of db1.iterator()) { + all.unshift({ key, value }) + } + + const endTime2 = new Date().getTime() + const duration2 = endTime2 - startTime2 + const operationsPerSecond2 = Math.floor(entryCount / (duration2 / 1000)) + const millisecondsPerOp2 = duration2 / entryCount + + console.log(`Iterating ${all.length} key/values took ${duration2} ms, ${operationsPerSecond2} ops/s, ${millisecondsPerOp2} ms/op`) + + await db1.drop() + await db1.close() + + await orbitdb.stop() + await ipfs.stop() + + await rmrf('./ipfs') + await rmrf('./orbitdb') + + process.exit(0) +})() diff --git a/src/databases/keyvalue-indexed.js b/src/databases/keyvalue-indexed.js index a35ac4c..ceff536 100644 --- a/src/databases/keyvalue-indexed.js +++ b/src/databases/keyvalue-indexed.js @@ -23,6 +23,84 @@ import pathJoin from '../utils/path-join.js' const valueEncoding = 'json' +/** + * Defines an index for a KeyValue database. + * @param {string} [directory] A location for storing the index-related data + * @return {Index} A Index function. + */ +const Index = ({ directory } = {}) => async () => { + const index = await LevelStorage({ path: directory, valueEncoding }) + const indexedEntries = await LevelStorage({ path: pathJoin(directory, '/_indexedEntries/'), valueEncoding }) + + const update = async (log, entry) => { + const keys = new Set() + const toBeIndexed = new Set() + const latest = entry.hash + + // Function to check if a hash is in the entry index + const isIndexed = async (hash) => (await indexedEntries.get(hash)) === true + const isNotIndexed = async (hash) => !(await isIndexed(hash)) + + // Function to decide when the log traversal should be stopped + const shoudStopTraverse = async (entry) => { + // Go through the nexts of an entry and if any is not yet + // indexed, add it to the list of entries-to-be-indexed + for await (const hash of entry.next) { + if (await isNotIndexed(hash)) { + toBeIndexed.add(hash) + } + } + // If the latest entry and all its nexts are indexed and to-be-indexed list is empty, + // we don't have anything more to process, so return true to stop the traversal + return await isIndexed(latest) && toBeIndexed.size === 0 + } + + // Traverse the log and stop when everything has been processed + for await (const entry of log.traverse(null, shoudStopTraverse)) { + const { hash, payload } = entry + // If an entry is not yet indexed, process it + if (await isNotIndexed(hash)) { + const { op, key } = payload + if (op === 'PUT' && !keys.has(key)) { + keys.add(key) + await index.put(key, entry) + await indexedEntries.put(hash, true) + } else if (op === 'DEL' && !keys.has(key)) { + keys.add(key) + await index.del(key) + await indexedEntries.put(hash, true) + } + // Remove the entry (hash) from the list of to-be-indexed entries + toBeIndexed.delete(hash) + } + } + } + + /** + * Closes the index and its storages. + */ + const close = async () => { + await index.close() + await indexedEntries.close() + } + + /** + * Drops all records from the index and its storages. + */ + const drop = async () => { + await index.clear() + await indexedEntries.clear() + } + + return { + get: index.get, + iterator: index.iterator, + update, + close, + drop + } +} + /** * Defines a KeyValueIndexed database. * @param {module:Storage} [storage=LevelStorage] A compatible storage where @@ -31,36 +109,15 @@ const valueEncoding = 'json' * function. * @memberof module:Databases */ -const KeyValueIndexed = ({ storage } = {}) => async ({ ipfs, identity, address, name, access, directory, meta, headsStorage, entryStorage, indexStorage, referencesCount, syncAutomatically, onUpdate }) => { - const indexDirectory = pathJoin(directory || './orbitdb', `./${address}/_index/`) - const index = storage || await LevelStorage({ path: indexDirectory, valueEncoding }) +const KeyValueIndexed = () => async ({ ipfs, identity, address, name, access, directory, meta, headsStorage, entryStorage, indexStorage, referencesCount, syncAutomatically, onUpdate }) => { + // Set up the directory for an index + directory = pathJoin(directory || './orbitdb', `./${address}/_index/`) - let latestOplogHash + // Set up the index + const index = await Index({ directory })() - const _updateIndex = async (log, entry) => { - const keys = {} - const it = await log.iterator({ gt: latestOplogHash }) - - for await (const entry of it) { - const { op, key, value } = entry.payload - - if (op === 'PUT' && !keys[key]) { - keys[key] = true - await index.put(key, value) - } else if (op === 'DEL' && !keys[key]) { - keys[key] = true - await index.del(key) - } - } - - latestOplogHash = entry ? entry.hash : null - } - - // Create the underlying KeyValue database - const keyValueStore = await KeyValue()({ ipfs, identity, address, name, access, directory, meta, headsStorage, entryStorage, indexStorage, referencesCount, syncAutomatically, onUpdate: _updateIndex }) - - // Compute the index - await _updateIndex(keyValueStore.log) + // Set up the underlying KeyValue database + const keyValueStore = await KeyValue()({ ipfs, identity, address, name, access, directory, meta, headsStorage, entryStorage, indexStorage, referencesCount, syncAutomatically, onUpdate: index.update }) /** * Gets a value from the store by key. @@ -71,11 +128,10 @@ const KeyValueIndexed = ({ storage } = {}) => async ({ ipfs, identity, address, * @instance */ const get = async (key) => { - const value = await index.get(key) - if (value) { - return value + const entry = await index.get(key) + if (entry) { + return entry.payload.value } - return keyValueStore.get(key) } /** @@ -88,8 +144,12 @@ const KeyValueIndexed = ({ storage } = {}) => async ({ ipfs, identity, address, * @instance */ const iterator = async function * ({ amount } = {}) { - const it = keyValueStore.iterator({ amount }) - for await (const { key, value, hash } of it) { + const it = index.iterator({ amount, reverse: true }) + for await (const record of it) { + // 'index' is a LevelStorage that returns a [key, value] pair + const entry = record[1] + const { key, value } = entry.payload + const hash = entry.hash yield { key, value, hash } } } @@ -98,16 +158,16 @@ const KeyValueIndexed = ({ storage } = {}) => async ({ ipfs, identity, address, * Closes the index and underlying storage. */ const close = async () => { - await index.close() await keyValueStore.close() + await index.close() } /** * Drops all records from the index and underlying storage. */ const drop = async () => { - await index.clear() await keyValueStore.drop() + await index.drop() } return { diff --git a/test/databases/keyvalue-indexed.js b/test/databases/keyvalue-indexed.js index 0fbe532..3e54a11 100644 --- a/test/databases/keyvalue-indexed.js +++ b/test/databases/keyvalue-indexed.js @@ -287,7 +287,7 @@ describe('KeyValueIndexed Database', function () { await db.put('key', 'value') let result - for await (const [key, value] of storage.iterator()) { + for await (const { key, value } of db.iterator()) { result = [key, value] } diff --git a/test/databases/replication/keyvalue-indexed.test.js b/test/databases/replication/keyvalue-indexed.test.js index f29dcbb..590bf68 100644 --- a/test/databases/replication/keyvalue-indexed.test.js +++ b/test/databases/replication/keyvalue-indexed.test.js @@ -24,7 +24,7 @@ describe('KeyValueIndexed Database Replication', function () { const accessController = { canAppend: async (entry) => { const identity = await identities.getIdentity(entry.identity) - return identity.id === testIdentity1.id + return identity.id === testIdentity1.id || identity.id === testIdentity2.id } } @@ -32,6 +32,12 @@ describe('KeyValueIndexed Database Replication', function () { [ipfs1, ipfs2] = await Promise.all([createHelia(), createHelia()]) await connectPeers(ipfs1, ipfs2) + await rimraf(keysPath) + await rimraf('./orbitdb1') + await rimraf('./orbitdb2') + await rimraf('./ipfs1') + await rimraf('./ipfs2') + await copy(testKeysPath, keysPath) keystore = await KeyStore({ path: keysPath }) identities = await Identities({ keystore }) @@ -123,8 +129,8 @@ describe('KeyValueIndexed Database Replication', function () { all2.push(keyValue) } deepStrictEqual(all2.map(e => { return { key: e.key, value: e.value } }), [ - { key: 'hello', value: 'friend3' }, - { key: 'init', value: true } + { key: 'init', value: true }, + { key: 'hello', value: 'friend3' } ]) const all1 = [] @@ -132,8 +138,8 @@ describe('KeyValueIndexed Database Replication', function () { all1.push(keyValue) } deepStrictEqual(all1.map(e => { return { key: e.key, value: e.value } }), [ - { key: 'hello', value: 'friend3' }, - { key: 'init', value: true } + { key: 'init', value: true }, + { key: 'hello', value: 'friend3' } ]) }) @@ -196,8 +202,8 @@ describe('KeyValueIndexed Database Replication', function () { all2.push(keyValue) } deepStrictEqual(all2.map(e => { return { key: e.key, value: e.value } }), [ - { key: 'hello', value: 'friend3' }, - { key: 'init', value: true } + { key: 'init', value: true }, + { key: 'hello', value: 'friend3' } ]) const all1 = [] @@ -205,8 +211,164 @@ describe('KeyValueIndexed Database Replication', function () { all1.push(keyValue) } deepStrictEqual(all1.map(e => { return { key: e.key, value: e.value } }), [ - { key: 'hello', value: 'friend3' }, - { key: 'init', value: true } + { key: 'init', value: true }, + { key: 'hello', value: 'friend3' } ]) }) + + it('indexes the database correctly', async () => { + let replicated1 = false + let replicated2 = false + let replicated3 = false + let expectedEntryHash1 = null + let expectedEntryHash2 = null + let expectedEntryHash3 = null + + const onError = (err) => { + console.error(err) + deepStrictEqual(err, undefined) + } + + const onUpdate = (entry) => { + replicated1 = expectedEntryHash1 !== null && entry.hash === expectedEntryHash1 + } + + kv1 = await KeyValueIndexed()({ ipfs: ipfs1, identity: testIdentity1, address: databaseId, accessController, directory: './orbitdb1' }) + kv2 = await KeyValueIndexed()({ ipfs: ipfs2, identity: testIdentity2, address: databaseId, accessController, directory: './orbitdb2' }) + + kv2.events.on('update', onUpdate) + + kv2.events.on('error', onError) + kv1.events.on('error', onError) + + await kv1.set('init', true) + await kv1.set('hello', 'friend') + await kv1.del('hello') + await kv1.set('hello', 'friend2') + await kv1.del('hello') + await kv1.set('empty', '') + await kv1.del('empty') + expectedEntryHash1 = await kv1.set('hello', 'friend3') + + await waitFor(() => replicated1, () => true) + + await kv1.close() + + await kv2.set('A', 'AAA') + await kv2.set('B', 'BBB') + expectedEntryHash3 = await kv2.set('C', 'CCC') + + await kv2.close() + + kv1 = await KeyValueIndexed()({ ipfs: ipfs1, identity: testIdentity1, address: databaseId, accessController, directory: './orbitdb1' }) + + const onUpdate3 = async (entry) => { + replicated3 = expectedEntryHash3 && entry.hash === expectedEntryHash3 + } + + kv1.events.on('update', onUpdate3) + kv1.events.on('error', onError) + + await kv1.set('one', 1) + await kv1.set('two', 2) + await kv1.set('three', 3) + await kv1.del('three') + expectedEntryHash2 = await kv1.set('four', 4) + + kv2 = await KeyValueIndexed()({ ipfs: ipfs2, identity: testIdentity2, address: databaseId, accessController, directory: './orbitdb2' }) + + const onUpdate2 = (entry) => { + replicated2 = expectedEntryHash2 && entry.hash === expectedEntryHash2 + } + + kv2.events.on('update', onUpdate2) + kv2.events.on('error', onError) + + await waitFor(() => replicated2 && replicated3, () => true) + + const all1 = [] + for await (const keyValue of kv1.iterator()) { + all1.push(keyValue) + } + + const all2 = [] + for await (const keyValue of kv2.iterator()) { + all2.push(keyValue) + } + + deepStrictEqual(all2.map(e => { return { key: e.key, value: e.value } }), [ + { key: 'two', value: 2 }, + { key: 'one', value: 1 }, + { key: 'init', value: true }, + { key: 'hello', value: 'friend3' }, + { key: 'four', value: 4 }, + { key: 'C', value: 'CCC' }, + { key: 'B', value: 'BBB' }, + { key: 'A', value: 'AAA' } + ]) + + deepStrictEqual(all1.map(e => { return { key: e.key, value: e.value } }), [ + { key: 'two', value: 2 }, + { key: 'one', value: 1 }, + { key: 'init', value: true }, + { key: 'hello', value: 'friend3' }, + { key: 'four', value: 4 }, + { key: 'C', value: 'CCC' }, + { key: 'B', value: 'BBB' }, + { key: 'A', value: 'AAA' } + ]) + }) + + it('indexes deletes correctly', async () => { + let replicated = false + + const onError = (err) => { + console.error(err) + deepStrictEqual(err, undefined) + } + + kv1 = await KeyValueIndexed()({ ipfs: ipfs1, identity: testIdentity1, address: databaseId, accessController, directory: './orbitdb11' }) + + kv1.events.on('error', onError) + + await kv1.set('init', true) + await kv1.set('hello', 'friend') + await kv1.del('delete') + await kv1.set('delete', 'this value') + await kv1.del('delete') + + kv2 = await KeyValueIndexed()({ ipfs: ipfs2, identity: testIdentity2, address: databaseId, accessController, directory: './orbitdb22' }) + + const onUpdate = (entry) => { + replicated = true + } + + kv2.events.on('update', onUpdate) + kv2.events.on('error', onError) + + await waitFor(() => replicated, () => true) + + const all1 = [] + for await (const keyValue of kv1.iterator()) { + all1.push(keyValue) + } + + const all2 = [] + for await (const keyValue of kv2.iterator()) { + all2.push(keyValue) + } + + deepStrictEqual(all2.map(e => { return { key: e.key, value: e.value } }), [ + { key: 'init', value: true }, + { key: 'hello', value: 'friend' } + ]) + + deepStrictEqual(all1.map(e => { return { key: e.key, value: e.value } }), [ + { key: 'init', value: true }, + { key: 'hello', value: 'friend' } + ]) + + await rimraf('./orbitdb11') + await rimraf('./orbitdb22') + }) }) diff --git a/test/orbitdb-open.test.js b/test/orbitdb-open.test.js index 6303309..165ba29 100644 --- a/test/orbitdb-open.test.js +++ b/test/orbitdb-open.test.js @@ -2,7 +2,7 @@ import { deepStrictEqual, strictEqual, notStrictEqual } from 'assert' import { rimraf } from 'rimraf' import fs from 'fs' import path from 'path' -import { createOrbitDB, isValidAddress, LevelStorage } from '../src/index.js' +import { createOrbitDB, isValidAddress } from '../src/index.js' import KeyValueIndexed from '../src/databases/keyvalue-indexed.js' import connectPeers from './utils/connect-nodes.js' import waitFor from './utils/wait-for.js' @@ -419,14 +419,13 @@ describe('Open databases', function () { }) describe('opening an indexed keyvalue database', () => { - let storage let db, address const amount = 10 before(async () => { orbitdb1 = await createOrbitDB({ ipfs: ipfs1, id: 'user1' }) - db = await orbitdb1.open('helloworld', { type: 'keyvalue' }) + db = await orbitdb1.open('helloworld', { Database: KeyValueIndexed() }) address = db.address for (let i = 0; i < amount; i++) { @@ -437,9 +436,6 @@ describe('Open databases', function () { }) after(async () => { - if (storage) { - await storage.close() - } if (db) { await db.close() } @@ -451,8 +447,7 @@ describe('Open databases', function () { }) it('returns all entries in the database and in the index', async () => { - storage = await LevelStorage({ path: './index', valueEncoding: 'json' }) - db = await orbitdb1.open(address, { Database: KeyValueIndexed({ storage }) }) + db = await orbitdb1.open(address, { Database: KeyValueIndexed() }) strictEqual(db.address, address) strictEqual(db.type, 'keyvalue') @@ -464,18 +459,11 @@ describe('Open databases', function () { } const result = [] - for await (const [key, value] of storage.iterator()) { - result.push({ key, value }) + for await (const { key, value } of db.iterator()) { + result.unshift({ key, value }) } deepStrictEqual(result, expected) - - const all = [] - for await (const { key, value } of db.iterator()) { - all.unshift({ key, value }) - } - - deepStrictEqual(all, expected) }) })