refactor: queue writes to index.

This commit is contained in:
Hayden Young 2023-02-24 02:51:56 +00:00
parent 3f9afe627c
commit 1fe62104c6
3 changed files with 225 additions and 23 deletions

View File

@ -1,4 +1,5 @@
import { Level } from 'level'
import PQueue from 'p-queue'
const valueEncoding = 'json'
@ -6,6 +7,8 @@ const KeyValuePersisted = async ({ KeyValue, OpLog, Database, ipfs, identity, da
const keyValueStore = await KeyValue({ OpLog, Database, ipfs, identity, databaseId, accessController, storage })
const { events, log } = keyValueStore
const queue = new PQueue({ concurrency: 1 })
const path = `./${identity.id}/${databaseId}/_index`
const index = new Level(path, { valueEncoding })
await index.open()
@ -14,8 +17,10 @@ const KeyValuePersisted = async ({ KeyValue, OpLog, Database, ipfs, identity, da
const updateIndex = (index) => async (entry) => {
const keys = {}
for await (const entry of log.iterator({ gt: latestOplogHash })) {
const { op, key, value } = entry.payload
if (op === 'PUT' && !keys[key]) {
keys[key] = true
await index.put(key, value)
@ -28,6 +33,8 @@ const KeyValuePersisted = async ({ KeyValue, OpLog, Database, ipfs, identity, da
}
const get = async (key) => {
await queue.onIdle()
try {
const value = await index.get(key)
if (value) {
@ -36,32 +43,39 @@ const KeyValuePersisted = async ({ KeyValue, OpLog, Database, ipfs, identity, da
} catch (e) {
// LEVEL_NOT_FOUND (ie. key not found)
}
return keyValueStore.get(key)
}
const iterator = async function * () {
await queue.onIdle()
for await (const [key, value] of index.iterator()) {
yield { key, value }
}
}
const task = async () => {
await queue.add(updateIndex(index))
}
// TODO: all()
const close = async () => {
events.off('update', updateIndex(index))
events.off('update', task)
await queue.onIdle()
await index.close()
await keyValueStore.close()
}
// TOD: rename to clear()
const drop = async () => {
events.off('update', updateIndex(index))
events.off('update', task)
await queue.onIdle()
await index.clear()
await keyValueStore.drop()
}
// Listen for update events from the database and update the index on every update
events.on('update', updateIndex(index))
events.on('update', task)
return {
...keyValueStore,

View File

@ -0,0 +1,188 @@
import { deepStrictEqual, strictEqual } from 'assert'
import mapSeries from 'p-map-series'
import rimraf from 'rimraf'
import { Log, Entry } from '../../src/oplog/index.js'
import { KeyValuePersisted, KeyValue, Database } from '../../src/db/index.js'
import { IPFSBlockStorage, LevelStorage } from '../../src/storage/index.js'
import { config, testAPIs, startIpfs, stopIpfs } from 'orbit-db-test-utils'
import { createTestIdentities, cleanUpTestIdentities } from '../fixtures/orbit-db-identity-keys.js'
const { sync: rmrf } = rimraf
const OpLog = { Log, Entry, IPFSBlockStorage, LevelStorage }
Object.keys(testAPIs).forEach((IPFS) => {
describe('KeyValuePersisted Database (' + IPFS + ')', function () {
this.timeout(config.timeout * 2)
let ipfsd
let ipfs
let keystore, signingKeyStore
let accessController
let identities1
let testIdentity1
let db
const databaseId = 'keyvalue-AAA'
before(async () => {
// Start two IPFS instances
ipfsd = await startIpfs(IPFS, config.daemon1)
ipfs = ipfsd.api
const [identities, testIdentities] = await createTestIdentities(ipfs)
identities1 = identities[0]
testIdentity1 = testIdentities[0]
rmrf(testIdentity1.id)
})
after(async () => {
await cleanUpTestIdentities([identities1])
if (ipfsd) {
await stopIpfs(ipfsd)
}
if (keystore) {
await keystore.close()
}
if (signingKeyStore) {
await signingKeyStore.close()
}
if (testIdentity1) {
rmrf(testIdentity1.id)
}
})
beforeEach(async () => {
db = await KeyValuePersisted({ OpLog, KeyValue, Database, ipfs, identity: testIdentity1, databaseId, accessController })
})
afterEach(async () => {
if (db) {
await db.drop()
await db.close()
}
})
it('creates a keyvalue store', async () => {
strictEqual(db.databaseId, databaseId)
strictEqual(db.type, 'keyvalue')
})
it('returns 0 items when it\'s a fresh database', async () => {
const all = []
for await (const item of db.iterator()) {
all.unshift(item)
}
strictEqual(all.length, 0)
})
it('sets a key/value pair', async () => {
const expected = 'zdpuAyRbzMUs1v7B1gqRRHe6rnxwYbHKzDhxh3rJanEjoucHt'
const actual = await db.set('key1', 'value1')
strictEqual(actual, expected)
})
it('puts a key/value pair', async () => {
const expected = 'zdpuAyRbzMUs1v7B1gqRRHe6rnxwYbHKzDhxh3rJanEjoucHt'
const actual = await db.put('key1', 'value1')
strictEqual(actual, expected)
})
it('gets a key/value pair\'s value', async () => {
const key = 'key1'
const expected = 'value1'
const hash = await db.put(key, expected)
const actual = await db.get(key)
strictEqual(actual, expected)
})
it('get key\'s updated value when using put', async () => {
const key = 'key1'
const expected = 'hello2'
await db.put(key, 'value1')
await db.put(key, expected)
const actual = await db.get(key)
strictEqual(actual, expected)
})
it('get key\'s updated value when using set', async () => {
const key = 'key1'
const expected = 'hello2'
await db.set(key, 'value1')
await db.set(key, expected)
const actual = await db.get(key)
strictEqual(actual, expected)
})
it('get key\'s updated value when using set then put', async () => {
const key = 'key1'
const expected = 'hello2'
await db.set(key, 'value1')
await db.put(key, expected)
const actual = await db.get(key)
strictEqual(actual, expected)
})
it('get key\'s updated value when using put then set', async () => {
const key = 'key1'
const expected = 'hello2'
await db.put(key, 'value1')
await db.set(key, expected)
const actual = await db.get(key)
strictEqual(actual, expected)
})
it('deletes a key/value pair', async () => {
const key = 'key1'
const expected = undefined
await db.put(key, 'value1')
const hash = await db.del(key)
const actual = await db.get(hash)
strictEqual(actual, expected)
})
it('deletes a non-existent key/value pair', async () => {
const expected = undefined
const del = await db.del('zdpuApFgnZNp6qQqeuHRLJhEKsmMnXEEJfSZofLc3ZZXEihWE')
const actual = await db.get(del)
strictEqual(actual, expected)
})
it('returns all key/value pairs', async () => {
const keyvalue = [
{ key: 'key1', value: 'init' },
{ key: 'key2', value: true },
{ key: 'key3', value: 'hello' },
{ key: 'key4', value: 'friend' },
{ key: 'key5', value: '12345' },
{ key: 'key6', value: 'empty' },
{ key: 'key7', value: 'friend33' }
]
for (const { key, value } of Object.values(keyvalue)) {
await db.put(key, value)
}
const all = []
for await (const pair of db.iterator()) {
all.push(pair)
}
deepStrictEqual(all, keyvalue)
})
})
})

View File

@ -69,16 +69,16 @@ Object.keys(testAPIs).forEach((IPFS) => {
strictEqual(db.databaseId, databaseId)
strictEqual(db.type, 'keyvalue')
})
it('returns 0 items when it\'s a fresh database', async () => {
const all = []
for await (const item of db.iterator()) {
all.unshift(item)
}
strictEqual(all.length, 0)
})
it('sets a key/value pair', async () => {
const expected = 'zdpuAyRbzMUs1v7B1gqRRHe6rnxwYbHKzDhxh3rJanEjoucHt'
@ -91,7 +91,7 @@ Object.keys(testAPIs).forEach((IPFS) => {
const actual = await db.put('key1', 'value1')
strictEqual(actual, expected)
})
})
it('gets a key/value pair\'s value', async () => {
const key = 'key1'
@ -101,7 +101,7 @@ Object.keys(testAPIs).forEach((IPFS) => {
const actual = await db.get(key)
strictEqual(actual, expected)
})
it('get key\'s updated value when using put', async () => {
const key = 'key1'
const expected = 'hello2'
@ -111,7 +111,7 @@ Object.keys(testAPIs).forEach((IPFS) => {
const actual = await db.get(key)
strictEqual(actual, expected)
})
it('get key\'s updated value when using set', async () => {
const key = 'key1'
const expected = 'hello2'
@ -121,7 +121,7 @@ Object.keys(testAPIs).forEach((IPFS) => {
const actual = await db.get(key)
strictEqual(actual, expected)
})
it('get key\'s updated value when using set then put', async () => {
const key = 'key1'
const expected = 'hello2'
@ -131,7 +131,7 @@ Object.keys(testAPIs).forEach((IPFS) => {
const actual = await db.get(key)
strictEqual(actual, expected)
})
it('get key\'s updated value when using put then set', async () => {
const key = 'key1'
const expected = 'hello2'
@ -152,10 +152,10 @@ Object.keys(testAPIs).forEach((IPFS) => {
const actual = await db.get(hash)
strictEqual(actual, expected)
})
it('deletes a non-existent key/value pair', async () => {
const expected = undefined
const del = await db.del('zdpuApFgnZNp6qQqeuHRLJhEKsmMnXEEJfSZofLc3ZZXEihWE')
const actual = await db.get(del)
@ -163,16 +163,16 @@ Object.keys(testAPIs).forEach((IPFS) => {
})
it('returns all key/value pairs', async () => {
const keyvalue = [
{ key: 'key1', value: 'init' },
{ key: 'key2', value: true },
{ key: 'key3', value: 'hello' },
{ key: 'key4', value: 'friend' },
{ key: 'key5', value: '12345' },
{ key: 'key6', value: 'empty' },
{ key: 'key7', value: 'friend33' }
]
const keyvalue = [
{ key: 'key1', value: 'init' },
{ key: 'key2', value: true },
{ key: 'key3', value: 'hello' },
{ key: 'key4', value: 'friend' },
{ key: 'key5', value: '12345' },
{ key: 'key6', value: 'empty' },
{ key: 'key7', value: 'friend33' }
]
for (const { key, value } of Object.values(keyvalue)) {
await db.put(key, value)
}