From 1fe62104c63070ac4d1c5ed795cbaeaf86f302b5 Mon Sep 17 00:00:00 2001 From: Hayden Young Date: Fri, 24 Feb 2023 02:51:56 +0000 Subject: [PATCH] refactor: queue writes to index. --- src/db/keyvalue-persisted.js | 20 +++- test/db/keyvalue-persisted.js | 188 ++++++++++++++++++++++++++++++++++ test/db/keyvalue.test.js | 40 ++++---- 3 files changed, 225 insertions(+), 23 deletions(-) create mode 100644 test/db/keyvalue-persisted.js diff --git a/src/db/keyvalue-persisted.js b/src/db/keyvalue-persisted.js index deaa629..ac173b0 100644 --- a/src/db/keyvalue-persisted.js +++ b/src/db/keyvalue-persisted.js @@ -1,4 +1,5 @@ import { Level } from 'level' +import PQueue from 'p-queue' const valueEncoding = 'json' @@ -6,6 +7,8 @@ const KeyValuePersisted = async ({ KeyValue, OpLog, Database, ipfs, identity, da const keyValueStore = await KeyValue({ OpLog, Database, ipfs, identity, databaseId, accessController, storage }) const { events, log } = keyValueStore + const queue = new PQueue({ concurrency: 1 }) + const path = `./${identity.id}/${databaseId}/_index` const index = new Level(path, { valueEncoding }) await index.open() @@ -14,8 +17,10 @@ const KeyValuePersisted = async ({ KeyValue, OpLog, Database, ipfs, identity, da const updateIndex = (index) => async (entry) => { const keys = {} + for await (const entry of log.iterator({ gt: latestOplogHash })) { const { op, key, value } = entry.payload + if (op === 'PUT' && !keys[key]) { keys[key] = true await index.put(key, value) @@ -28,6 +33,8 @@ const KeyValuePersisted = async ({ KeyValue, OpLog, Database, ipfs, identity, da } const get = async (key) => { + await queue.onIdle() + try { const value = await index.get(key) if (value) { @@ -36,32 +43,39 @@ const KeyValuePersisted = async ({ KeyValue, OpLog, Database, ipfs, identity, da } catch (e) { // LEVEL_NOT_FOUND (ie. key not found) } + return keyValueStore.get(key) } const iterator = async function * () { + await queue.onIdle() for await (const [key, value] of index.iterator()) { yield { key, value } } } + const task = async () => { + await queue.add(updateIndex(index)) + } // TODO: all() const close = async () => { - events.off('update', updateIndex(index)) + events.off('update', task) + await queue.onIdle() await index.close() await keyValueStore.close() } // TOD: rename to clear() const drop = async () => { - events.off('update', updateIndex(index)) + events.off('update', task) + await queue.onIdle() await index.clear() await keyValueStore.drop() } // Listen for update events from the database and update the index on every update - events.on('update', updateIndex(index)) + events.on('update', task) return { ...keyValueStore, diff --git a/test/db/keyvalue-persisted.js b/test/db/keyvalue-persisted.js new file mode 100644 index 0000000..3ea0bf6 --- /dev/null +++ b/test/db/keyvalue-persisted.js @@ -0,0 +1,188 @@ +import { deepStrictEqual, strictEqual } from 'assert' +import mapSeries from 'p-map-series' +import rimraf from 'rimraf' +import { Log, Entry } from '../../src/oplog/index.js' +import { KeyValuePersisted, KeyValue, Database } from '../../src/db/index.js' +import { IPFSBlockStorage, LevelStorage } from '../../src/storage/index.js' +import { config, testAPIs, startIpfs, stopIpfs } from 'orbit-db-test-utils' +import { createTestIdentities, cleanUpTestIdentities } from '../fixtures/orbit-db-identity-keys.js' + +const { sync: rmrf } = rimraf + +const OpLog = { Log, Entry, IPFSBlockStorage, LevelStorage } + +Object.keys(testAPIs).forEach((IPFS) => { + describe('KeyValuePersisted Database (' + IPFS + ')', function () { + this.timeout(config.timeout * 2) + + let ipfsd + let ipfs + let keystore, signingKeyStore + let accessController + let identities1 + let testIdentity1 + let db + + const databaseId = 'keyvalue-AAA' + + before(async () => { + // Start two IPFS instances + ipfsd = await startIpfs(IPFS, config.daemon1) + ipfs = ipfsd.api + + const [identities, testIdentities] = await createTestIdentities(ipfs) + identities1 = identities[0] + testIdentity1 = testIdentities[0] + + rmrf(testIdentity1.id) + }) + + after(async () => { + await cleanUpTestIdentities([identities1]) + + if (ipfsd) { + await stopIpfs(ipfsd) + } + if (keystore) { + await keystore.close() + } + if (signingKeyStore) { + await signingKeyStore.close() + } + if (testIdentity1) { + rmrf(testIdentity1.id) + } + }) + + beforeEach(async () => { + db = await KeyValuePersisted({ OpLog, KeyValue, Database, ipfs, identity: testIdentity1, databaseId, accessController }) + }) + + afterEach(async () => { + if (db) { + await db.drop() + await db.close() + } + }) + + it('creates a keyvalue store', async () => { + strictEqual(db.databaseId, databaseId) + strictEqual(db.type, 'keyvalue') + }) + + it('returns 0 items when it\'s a fresh database', async () => { + const all = [] + for await (const item of db.iterator()) { + all.unshift(item) + } + + strictEqual(all.length, 0) + }) + + it('sets a key/value pair', async () => { + const expected = 'zdpuAyRbzMUs1v7B1gqRRHe6rnxwYbHKzDhxh3rJanEjoucHt' + + const actual = await db.set('key1', 'value1') + strictEqual(actual, expected) + }) + + it('puts a key/value pair', async () => { + const expected = 'zdpuAyRbzMUs1v7B1gqRRHe6rnxwYbHKzDhxh3rJanEjoucHt' + + const actual = await db.put('key1', 'value1') + strictEqual(actual, expected) + }) + + it('gets a key/value pair\'s value', async () => { + const key = 'key1' + const expected = 'value1' + + const hash = await db.put(key, expected) + const actual = await db.get(key) + strictEqual(actual, expected) + }) + + it('get key\'s updated value when using put', async () => { + const key = 'key1' + const expected = 'hello2' + + await db.put(key, 'value1') + await db.put(key, expected) + const actual = await db.get(key) + strictEqual(actual, expected) + }) + + it('get key\'s updated value when using set', async () => { + const key = 'key1' + const expected = 'hello2' + + await db.set(key, 'value1') + await db.set(key, expected) + const actual = await db.get(key) + strictEqual(actual, expected) + }) + + it('get key\'s updated value when using set then put', async () => { + const key = 'key1' + const expected = 'hello2' + + await db.set(key, 'value1') + await db.put(key, expected) + const actual = await db.get(key) + strictEqual(actual, expected) + }) + + it('get key\'s updated value when using put then set', async () => { + const key = 'key1' + const expected = 'hello2' + + await db.put(key, 'value1') + await db.set(key, expected) + const actual = await db.get(key) + strictEqual(actual, expected) + }) + + it('deletes a key/value pair', async () => { + const key = 'key1' + const expected = undefined + + await db.put(key, 'value1') + const hash = await db.del(key) + + const actual = await db.get(hash) + strictEqual(actual, expected) + }) + + it('deletes a non-existent key/value pair', async () => { + const expected = undefined + + const del = await db.del('zdpuApFgnZNp6qQqeuHRLJhEKsmMnXEEJfSZofLc3ZZXEihWE') + + const actual = await db.get(del) + strictEqual(actual, expected) + }) + + it('returns all key/value pairs', async () => { + const keyvalue = [ + { key: 'key1', value: 'init' }, + { key: 'key2', value: true }, + { key: 'key3', value: 'hello' }, + { key: 'key4', value: 'friend' }, + { key: 'key5', value: '12345' }, + { key: 'key6', value: 'empty' }, + { key: 'key7', value: 'friend33' } + ] + + for (const { key, value } of Object.values(keyvalue)) { + await db.put(key, value) + } + + const all = [] + for await (const pair of db.iterator()) { + all.push(pair) + } + + deepStrictEqual(all, keyvalue) + }) + }) +}) diff --git a/test/db/keyvalue.test.js b/test/db/keyvalue.test.js index 20b5e1c..a8ade8e 100644 --- a/test/db/keyvalue.test.js +++ b/test/db/keyvalue.test.js @@ -69,16 +69,16 @@ Object.keys(testAPIs).forEach((IPFS) => { strictEqual(db.databaseId, databaseId) strictEqual(db.type, 'keyvalue') }) - + it('returns 0 items when it\'s a fresh database', async () => { const all = [] for await (const item of db.iterator()) { all.unshift(item) } - + strictEqual(all.length, 0) }) - + it('sets a key/value pair', async () => { const expected = 'zdpuAyRbzMUs1v7B1gqRRHe6rnxwYbHKzDhxh3rJanEjoucHt' @@ -91,7 +91,7 @@ Object.keys(testAPIs).forEach((IPFS) => { const actual = await db.put('key1', 'value1') strictEqual(actual, expected) - }) + }) it('gets a key/value pair\'s value', async () => { const key = 'key1' @@ -101,7 +101,7 @@ Object.keys(testAPIs).forEach((IPFS) => { const actual = await db.get(key) strictEqual(actual, expected) }) - + it('get key\'s updated value when using put', async () => { const key = 'key1' const expected = 'hello2' @@ -111,7 +111,7 @@ Object.keys(testAPIs).forEach((IPFS) => { const actual = await db.get(key) strictEqual(actual, expected) }) - + it('get key\'s updated value when using set', async () => { const key = 'key1' const expected = 'hello2' @@ -121,7 +121,7 @@ Object.keys(testAPIs).forEach((IPFS) => { const actual = await db.get(key) strictEqual(actual, expected) }) - + it('get key\'s updated value when using set then put', async () => { const key = 'key1' const expected = 'hello2' @@ -131,7 +131,7 @@ Object.keys(testAPIs).forEach((IPFS) => { const actual = await db.get(key) strictEqual(actual, expected) }) - + it('get key\'s updated value when using put then set', async () => { const key = 'key1' const expected = 'hello2' @@ -152,10 +152,10 @@ Object.keys(testAPIs).forEach((IPFS) => { const actual = await db.get(hash) strictEqual(actual, expected) }) - + it('deletes a non-existent key/value pair', async () => { const expected = undefined - + const del = await db.del('zdpuApFgnZNp6qQqeuHRLJhEKsmMnXEEJfSZofLc3ZZXEihWE') const actual = await db.get(del) @@ -163,16 +163,16 @@ Object.keys(testAPIs).forEach((IPFS) => { }) it('returns all key/value pairs', async () => { - const keyvalue = [ - { key: 'key1', value: 'init' }, - { key: 'key2', value: true }, - { key: 'key3', value: 'hello' }, - { key: 'key4', value: 'friend' }, - { key: 'key5', value: '12345' }, - { key: 'key6', value: 'empty' }, - { key: 'key7', value: 'friend33' } - ] - + const keyvalue = [ + { key: 'key1', value: 'init' }, + { key: 'key2', value: true }, + { key: 'key3', value: 'hello' }, + { key: 'key4', value: 'friend' }, + { key: 'key5', value: '12345' }, + { key: 'key6', value: 'empty' }, + { key: 'key7', value: 'friend33' } + ] + for (const { key, value } of Object.values(keyvalue)) { await db.put(key, value) }