More refactoring

This commit is contained in:
haad
2023-04-06 14:27:13 +03:00
parent eddf5b853b
commit 3030336db9
11 changed files with 67 additions and 81 deletions

View File

@@ -65,7 +65,7 @@ const OrbitDB = async ({ ipfs, id, identity, keystore, directory } = {}) => {
let databases = {}
const open = async (address, { type, meta, sync, Database, AccessController } = {}) => {
const open = async (address, { type, meta, sync, Database, AccessController, headsStorage, entryStorage, indexStorage, referencesCount } = {}) => {
let name, manifest, accessController
if (type && !databaseTypes[type]) {
@@ -108,7 +108,7 @@ const OrbitDB = async ({ ipfs, id, identity, keystore, directory } = {}) => {
if (!Database) {
throw new Error(`Unsupported database type: '${type}'`)
}
const db = await Database({ ipfs, identity, address: address.toString(), name, access: accessController, directory, meta, syncAutomatically: sync != null ? sync : true })
const db = await Database({ ipfs, identity, address: address.toString(), name, access: accessController, directory, meta, syncAutomatically: sync != null ? sync : true, headsStorage, entryStorage, indexStorage, referencesCount })
db.events.on('close', onDatabaseClosed(address.toString()))

View File

@@ -38,7 +38,7 @@ const Database = async ({ ipfs, identity, address, name, access, directory, meta
const entry = await log.append(op, { referencesCount })
await sync.add(entry)
if (onUpdate) {
await onUpdate(entry)
await onUpdate(log, entry)
}
events.emit('update', entry)
return entry.hash
@@ -55,7 +55,7 @@ const Database = async ({ ipfs, identity, address, name, access, directory, meta
const updated = await log.joinEntry(entry)
if (updated) {
if (onUpdate) {
await onUpdate(entry)
await onUpdate(log, entry)
}
events.emit('update', entry)
}

View File

@@ -2,8 +2,8 @@ import Database from '../database.js'
const DefaultOptions = { indexBy: '_id' }
const Documents = ({ indexBy } = DefaultOptions) => async ({ ipfs, identity, address, name, access, directory, storage, meta, syncAutomatically }) => {
const database = await Database({ ipfs, identity, address, name, access, directory, storage, meta, syncAutomatically })
const Documents = ({ indexBy } = DefaultOptions) => async ({ ipfs, identity, address, name, access, directory, meta, headsStorage, entryStorage, indexStorage, referencesCount, syncAutomatically, onUpdate }) => {
const database = await Database({ ipfs, identity, address, name, access, directory, meta, headsStorage, entryStorage, indexStorage, referencesCount, syncAutomatically })
const { addOperation, log } = database

View File

@@ -1,14 +1,10 @@
import Database from '../database.js'
const Events = () => async ({ ipfs, identity, address, name, access, directory, storage, meta, syncAutomatically }) => {
const database = await Database({ ipfs, identity, address, name, access, directory, storage, meta, syncAutomatically })
const Events = () => async ({ ipfs, identity, address, name, access, directory, meta, headsStorage, entryStorage, indexStorage, referencesCount, syncAutomatically, onUpdate }) => {
const database = await Database({ ipfs, identity, address, name, access, directory, meta, headsStorage, entryStorage, indexStorage, referencesCount, syncAutomatically, onUpdate })
const { addOperation, log } = database
const put = async (key = null, value) => {
return add(value)
}
const add = async (value) => {
return addOperation({ op: 'ADD', key: null, value })
}
@@ -38,7 +34,6 @@ const Events = () => async ({ ipfs, identity, address, name, access, directory,
return {
...database,
type: 'events',
put,
add,
get,
iterator,

View File

@@ -1,18 +1,20 @@
import { KeyValue } from './index.js'
import LevelStorage from '../storage/level.js'
import pathJoin from '../utils/path-join.js'
import PQueue from 'p-queue'
const valueEncoding = 'json'
const KeyValueIndexed = ({ indexStorage } = {}) => async ({ ipfs, identity, address, name, access, directory, storage, meta }) => {
directory = pathJoin(directory || './orbitdb', `./${address}/_index/`)
const index = indexStorage || await LevelStorage({ path: directory, valueEncoding })
const KeyValueIndexed = ({ storage } = {}) => async ({ ipfs, identity, address, name, access, directory, meta, headsStorage, entryStorage, indexStorage, referencesCount, syncAutomatically, onUpdate }) => {
const indexDirectory = pathJoin(directory || './orbitdb', `./${address}/_index/`)
const index = storage || await LevelStorage({ path: indexDirectory, valueEncoding })
const updateIndex = async (entry) => {
let latestOplogHash
const _updateIndex = async (log, entry) => {
const keys = {}
const it = await log.iterator({ gt: latestOplogHash })
for await (const entry of log.iterator({ gt: latestOplogHash })) {
for await (const entry of it) {
const { op, key, value } = entry.payload
if (op === 'PUT' && !keys[key]) {
@@ -23,18 +25,17 @@ const KeyValueIndexed = ({ indexStorage } = {}) => async ({ ipfs, identity, addr
await index.del(key)
}
}
latestOplogHash = entry.hash
latestOplogHash = entry ? entry.hash : null
}
const keyValueStore = await KeyValue()({ ipfs, identity, address, name, access, directory, storage, meta, onUpdate: updateIndex })
const { events, log } = keyValueStore
// Create the underlying KeyValue database
const keyValueStore = await KeyValue()({ ipfs, identity, address, name, access, directory, meta, headsStorage, entryStorage, indexStorage, referencesCount, syncAutomatically, onUpdate: _updateIndex })
const queue = new PQueue({ concurrency: 1 })
let latestOplogHash
// Compute the index
await _updateIndex(keyValueStore.log)
const get = async (key) => {
await queue.onIdle()
const value = await index.get(key)
if (value) {
return value
@@ -43,34 +44,22 @@ const KeyValueIndexed = ({ indexStorage } = {}) => async ({ ipfs, identity, addr
}
const iterator = async function * ({ amount } = {}) {
await queue.onIdle()
for await (const { hash, key, value } of keyValueStore.iterator({ amount })) {
yield { hash, key, value }
const it = keyValueStore.iterator({ amount })
for await (const { key, value, hash } of it) {
yield { key, value, hash }
}
}
const task = async () => {
await queue.add(updateIndex(index))
}
const close = async () => {
events.off('update', task)
await queue.onIdle()
await index.close()
await keyValueStore.close()
}
// TOD: rename to clear()
const drop = async () => {
events.off('update', task)
await queue.onIdle()
await index.clear()
await keyValueStore.drop()
}
// Listen for update events from the database and update the index on every update
// events.on('update', task)
return {
...keyValueStore,
get,

View File

@@ -1,7 +1,7 @@
import Database from '../database.js'
const KeyValue = () => async ({ ipfs, identity, address, name, access, directory, storage, meta, syncAutomatically, onUpdate }) => {
const database = await Database({ ipfs, identity, address, name, access, directory, storage, meta, syncAutomatically, onUpdate })
const KeyValue = () => async ({ ipfs, identity, address, name, access, directory, meta, headsStorage, entryStorage, indexStorage, referencesCount, syncAutomatically, onUpdate }) => {
const database = await Database({ ipfs, identity, address, name, access, directory, meta, headsStorage, entryStorage, indexStorage, referencesCount, syncAutomatically, onUpdate })
const { addOperation, log } = database
@@ -33,7 +33,7 @@ const KeyValue = () => async ({ ipfs, identity, address, name, access, directory
keys[key] = true
count++
const hash = entry.hash
yield { hash, key, value }
yield { key, value, hash }
} else if (op === 'DEL' && !keys[key]) {
keys[key] = true
}