This commit is contained in:
haad 2023-02-02 14:07:14 +02:00
parent f576795c3e
commit c5bfcb8bd9
17 changed files with 236 additions and 239 deletions

View File

@ -22,9 +22,12 @@ const queryLoop = async () => {
console.log('Starting benchmark...')
const identity = await IdentityProvider.createIdentity({ id: 'userA' })
// MemoeryStorage is the default storage for Log but defining them here
// in case we want to benchmark different storage modules
const storage = await MemoryStorage()
const stateStorage = await MemoryStorage()
log = await Log(identity, { logId: 'A', storage })
log = await Log(identity, { logId: 'A', storage, stateStorage })
// Output metrics at 1 second interval
setInterval(() => {

2
dist/ipfslog.min.js vendored

File diff suppressed because one or more lines are too long

View File

@ -2,14 +2,14 @@ import { Level } from 'level'
import { EventEmitter } from 'events'
const valueEncoding = 'view'
const defaultPointerCount = 64
const defaultPointerCount = 16
const Database = async ({ OpLog, ipfs, identity, databaseId, accessController, storage }) => {
const { Log, Entry, IPFSBlockStorage } = OpLog
storage = storage || await IPFSBlockStorage({ ipfs, pin: true })
const path = `./${identity.id}/${databaseId}_state`
const path = `./${identity.id}/${databaseId}/_state`
const stateStorage = new Level(path, { valueEncoding })
await stateStorage.open()
@ -26,23 +26,26 @@ const Database = async ({ OpLog, ipfs, identity, databaseId, accessController, s
const handleMessage = async (message) => {
const { id: peerId } = await ipfs.id()
const messageIsFromMe = (message) => String(peerId) === String(message.from)
const messageIsNotFromMe = (message) => String(peerId) !== String(message.from)
const messageHasData = (message) => message.data !== undefined
try {
if (!messageIsFromMe(message)) {
if (messageIsNotFromMe(message) && messageHasData(message)) {
await sync(message.data)
}
} catch (e) {
events.emit('error', e)
// console.error(e)
console.error(e)
}
}
const sync = async (bytes) => {
const entry = await Entry.decode(bytes)
events.emit('sync', entry)
const updated = await log.joinEntry(entry)
if (updated) {
events.emit('update', entry)
if (entry) {
events.emit('sync', entry)
const updated = await log.joinEntry(entry)
if (updated) {
events.emit('update', entry)
}
}
}
@ -53,11 +56,14 @@ const Database = async ({ OpLog, ipfs, identity, databaseId, accessController, s
events.emit('close')
}
// TODO: rename to clear()
const drop = async () => {
await stateStorage.clear()
await storage.clear()
}
const merge = async (other) => {}
// Automatically subscribe to the pubsub channel for this database
await ipfs.pubsub.subscribe(log.id, handleMessage)
@ -65,6 +71,7 @@ const Database = async ({ OpLog, ipfs, identity, databaseId, accessController, s
databaseId,
identity,
sync,
merge,
close,
drop,
addOperation,

View File

@ -3,17 +3,22 @@ const EventStore = async ({ OpLog, Database, ipfs, identity, databaseId, accessC
const { addOperation, log } = database
const put = async (key = null, value) => {
return add(value)
}
const add = async (value) => {
return addOperation({ op: 'ADD', key: null, value })
}
const get = async (hash) => {
const entry = await log.get(hash)
return entry.payload
return entry.payload.value
}
const iterator = async function * ({ gt, gte, lt, lte, amount } = {}) {
for await (const event of log.iterator({ gt, gte, lt, lte, amount })) {
const it = log.iterator({ gt, gte, lt, lte, amount })
for await (const event of it) {
yield event.payload.value
}
}
@ -29,6 +34,7 @@ const EventStore = async ({ OpLog, Database, ipfs, identity, databaseId, accessC
return {
...database,
type: 'events',
put,
add,
get,
iterator,

View File

@ -3,6 +3,10 @@ const Feed = async ({ OpLog, Database, ipfs, identity, databaseId, accessControl
const { addOperation, log } = database
const put = async (key = null, value) => {
return add(value)
}
const add = async (value) => {
return addOperation({ op: 'ADD', key: null, value })
}
@ -13,12 +17,13 @@ const Feed = async ({ OpLog, Database, ipfs, identity, databaseId, accessControl
const get = async (hash) => {
const entry = await log.get(hash)
return entry.payload
return entry.payload.value
}
const iterator = async function * ({ gt, gte, lt, lte, amount } = {}) {
const deleted = {}
for await (const entry of log.iterator({ gt, gte, lt, lte, amount })) {
const it = log.iterator({ gt, gte, lt, lte, amount })
for await (const entry of it) {
const { hash, payload } = entry
const { op, key, value } = payload
if (op === 'ADD' && !deleted[hash]) {
@ -40,6 +45,7 @@ const Feed = async ({ OpLog, Database, ipfs, identity, databaseId, accessControl
return {
...database,
type: 'feed',
put,
add,
del,
get,

View File

@ -3,7 +3,7 @@ import { base58btc } from 'multiformats/bases/base58'
const defaultTimeout = 30000
const IPFSBlockStorage = async ({ ipfs, timeout, pin }) => {
const IPFSBlockStorage = async ({ ipfs, timeout, pin } = {}) => {
timeout = timeout || defaultTimeout
const put = async (hash, data) => {
@ -18,6 +18,10 @@ const IPFSBlockStorage = async ({ ipfs, timeout, pin }) => {
})
}
const del = async (hash) => {
// TODO?
}
const get = async (hash) => {
const cid = CID.parse(hash, base58btc)
const block = await ipfs.block.get(cid, { timeout })
@ -28,6 +32,8 @@ const IPFSBlockStorage = async ({ ipfs, timeout, pin }) => {
const iterator = async function * () {}
// TODO: all()
const merge = async (other) => {}
const clear = async () => {}
@ -36,8 +42,10 @@ const IPFSBlockStorage = async ({ ipfs, timeout, pin }) => {
return {
put,
del,
get,
iterator,
// TODO: all,
merge,
clear,
close

View File

@ -6,7 +6,7 @@ const KeyValuePersisted = async ({ KeyValue, OpLog, Database, ipfs, identity, da
const keyValueStore = await KeyValue({ OpLog, Database, ipfs, identity, databaseId, accessController, storage })
const { events, log } = keyValueStore
const path = `./${identity.id}/${databaseId}_index`
const path = `./${identity.id}/${databaseId}/_index`
const index = new Level(path, { valueEncoding })
await index.open()
@ -45,12 +45,15 @@ const KeyValuePersisted = async ({ KeyValue, OpLog, Database, ipfs, identity, da
}
}
// TODO: all()
const close = async () => {
events.off('update', updateIndex(index))
await index.close()
await keyValueStore.close()
}
// TOD: rename to clear()
const drop = async () => {
events.off('update', updateIndex(index))
await index.clear()
@ -64,6 +67,7 @@ const KeyValuePersisted = async ({ KeyValue, OpLog, Database, ipfs, identity, da
...keyValueStore,
get,
iterator,
// TODO: all,
close,
drop
}

View File

@ -35,6 +35,8 @@ const KeyValue = async ({ OpLog, Database, ipfs, identity, databaseId, accessCon
}
}
// TODO: all()
return {
...database,
type: 'kv',
@ -43,6 +45,7 @@ const KeyValue = async ({ OpLog, Database, ipfs, identity, databaseId, accessCon
del,
get,
iterator
// TODO: all,
}
}

View File

@ -1,6 +1,6 @@
import { Level } from 'level'
const LevelStorage = async ({ path, valueEncoding } = {}, next) => {
const LevelStorage = async ({ path, valueEncoding } = {}) => {
path = path || './level'
// console.log("Path:", path)
@ -8,11 +8,16 @@ const LevelStorage = async ({ path, valueEncoding } = {}, next) => {
const db = new Level(path, { valueEncoding: valueEncoding || 'view', passive: true })
await db.open()
const add = async (hash, data) => {
await db.put(hash, data, { valueEncoding })
if (next) {
return next.add(data)
}
const put = async (key = null, value) => {
return add(null, value)
}
const add = async (hash, value) => {
await db.put(hash, value, { valueEncoding })
}
const del = async (hash) => {
await db.del(hash)
}
const get = async (hash) => {
@ -20,18 +25,9 @@ const LevelStorage = async ({ path, valueEncoding } = {}, next) => {
if (value !== undefined) {
return value
}
if (next) {
return next.get(hash)
}
}
const del = async (hash) => {
await db.del(hash)
if (next) {
return next.add(hash)
}
}
// TODO: rename to iterator()
// const values = async () => {
// const res = {}
// for await (const [key, value] of await db.iterator({ valueEncoding }).all()) {
@ -40,6 +36,8 @@ const LevelStorage = async ({ path, valueEncoding } = {}, next) => {
// return res
// }
// TODO: all()
const merge = async (other) => {}
const clear = async () => {
@ -52,9 +50,11 @@ const LevelStorage = async ({ path, valueEncoding } = {}, next) => {
return {
add,
get,
put,
del,
// values,
get,
// TODO: iterator,
// TODO: all,
merge,
clear,
close

View File

@ -5,6 +5,7 @@ import Sorting from './log-sorting.js'
import IPFSBlockStorage from './ipfs-block-storage.js'
import MemoryStorage from './memory-storage.js'
import LRUStorage from './lru-storage.js'
// import LevelStorage from './level-storage.js'
import ComposedStorage from './composed-storage.js'
import { isDefined } from './utils/index.js'
@ -16,6 +17,7 @@ const maxClockTimeReducer = (res, acc) => Math.max(res, acc.clock.time)
// Default storage for storing the Log and its entries. Default: Memory. Options: Memory, LRU, IPFS.
const DefaultStorage = MemoryStorage
// const DefaultStorage = LRUStorage
// const DefaultStorage = LevelStorage
// const DefaultStorage = IPFSBlockStorage
// Default AccessController for the Log.
@ -132,6 +134,11 @@ const Log = async (identity, { logId, logHeads, access, storage, stateStorage, s
* @return {Promise<Entry>} Entry that was appended
*/
const append = async (data, options = { pointerCount: 1 }) => {
// 1. Prepare entry
// 2. Authorize entry
// 3. Store entry
// 4. return Entry
// Get references (entry at every pow2 of distance)
const refs = await getReferences(options.pointerCount)
// Create the next pointers from heads
@ -252,7 +259,10 @@ const Log = async (identity, { logId, logHeads, access, storage, stateStorage, s
}
// Get the next entry from the stack
entry = stack.pop()
if (entry) {
// If we have an entry that we haven't traversed yet, process it
if (entry && !traversed[entry.hash]) {
// Add to the hashes we've traversed
traversed[entry.hash] = true
// Yield the current entry
yield entry
// Add hashes of next entries to the stack from entry's
@ -260,8 +270,6 @@ const Log = async (identity, { logId, logHeads, access, storage, stateStorage, s
for (const hash of [...entry.next, ...entry.refs]) {
// Check if we've already traversed this entry
if (!traversed[hash]) {
// Add to the hashes we've traversed
traversed[hash] = true
// Fetch the next entry
const next = await get(hash)
if (next) {
@ -424,6 +432,7 @@ const Log = async (identity, { logId, logHeads, access, storage, stateStorage, s
clock,
heads,
values,
all: values, // Alias for values()
get,
append,
join,

View File

@ -9,6 +9,10 @@ const LRUStorage = async ({ size } = {}) => {
lru.set(hash, data)
}
const del = async (hash) => {
lru.remove(hash)
}
const get = async (hash) => {
if (lru.peek(hash)) {
return lru.get(hash)
@ -22,6 +26,8 @@ const LRUStorage = async ({ size } = {}) => {
}
}
// TODO: all()
const merge = async (other) => {
if (other) {
for await (const [key, value] of other.iterator()) {
@ -38,8 +44,10 @@ const LRUStorage = async ({ size } = {}) => {
return {
put,
del,
get,
iterator,
// TODO: all,
merge,
clear,
close

View File

@ -5,6 +5,10 @@ const MemoryStorage = async () => {
memory[hash] = data
}
const del = async (hash) => {
delete memory[hash]
}
const get = async (hash) => {
if (memory[hash]) {
return memory[hash]
@ -25,14 +29,18 @@ const MemoryStorage = async () => {
}
}
// TODO: all()
const clear = async () => (memory = {})
const close = async () => {}
return {
put,
del,
get,
iterator,
// TODO: all,
merge,
clear,
close

View File

@ -10,6 +10,7 @@ import Database from '../src/database.js'
// Test utils
import { config, testAPIs, startIpfs, stopIpfs, getIpfsPeerId, waitForPeers } from 'orbit-db-test-utils'
import connectPeers from './utils/connect-nodes.js'
import waitFor from './utils/wait-for.js'
import { identityKeys, signingKeys } from './fixtures/orbit-db-identity-keys.js'
const { sync: rmrf } = rimraf
@ -29,6 +30,9 @@ Object.keys(testAPIs).forEach((IPFS) => {
const databaseId = 'events-AAA'
before(async () => {
rmrf('./keys_1')
rmrf('./keys_2')
// Start two IPFS instances
ipfsd1 = await startIpfs(IPFS, config.daemon1)
ipfsd2 = await startIpfs(IPFS, config.daemon2)
@ -56,6 +60,9 @@ Object.keys(testAPIs).forEach((IPFS) => {
// Create an identity for each peers
testIdentity1 = await createIdentity({ id: 'userA', keystore, signingKeystore })
testIdentity2 = await createIdentity({ id: 'userB', keystore, signingKeystore })
rmrf(testIdentity1.id)
rmrf(testIdentity2.id)
})
afterEach(async () => {
@ -96,29 +103,30 @@ Object.keys(testAPIs).forEach((IPFS) => {
describe('using database', () => {
it('returns all entries in the database', async () => {
let updateCount = 0
let syncCount = 0
// let syncCount = 0
const accessController = {
canAppend: (entry) => entry.identity.id === testIdentity1.id
}
const onUpdate = (entry) => {
updateCount++
}
const onSync = (entry) => {
syncCount++
// console.log(".", updateCount, entry.payload)
++updateCount
}
// const onSync = (entry) => {
// ++syncCount
// }
const onError = () => {
}
kv1 = await EventStore({ OpLog: Log, Database, ipfs: ipfs1, identity: testIdentity1, databaseId, accessController })
kv2 = await EventStore({ OpLog: Log, Database, ipfs: ipfs2, identity: testIdentity2, databaseId, accessController })
kv1.events.on('update', onUpdate)
// kv1.events.on('update', onUpdate)
kv2.events.on('update', onUpdate)
kv1.events.on('sync', onSync)
kv2.events.on('sync', onSync)
kv1.events.on('error', onError)
// kv1.events.on('sync', onSync)
// kv2.events.on('sync', onSync)
// kv1.events.on('error', onError)
kv2.events.on('error', onError)
strictEqual(kv1.type, 'events')
@ -139,40 +147,18 @@ Object.keys(testAPIs).forEach((IPFS) => {
await kv1.add('')
await kv1.add('friend33')
// const hash = await kv1.add('friend33')
// await kv1.set('init', true)
// await kv1.set('hello', 'friend')
// await kv1.del('hello')
// await kv1.set('hello', 'friend2')
// await kv1.del('hello')
// await kv1.set('empty', '')
// await kv1.del('empty')
// const hash = await kv1.set('hello', 'friend3')
// const lastEntry = await kv1.get(hash)
// const sleep = (time) => new Promise((resolve) => {
// setTimeout(() => {
// resolve()
// }, time)
// })
// await sleep(5000) // give some time for ipfs peers to sync
const waitForAllUpdates = async () => {
return new Promise((resolve) => {
const interval = setInterval(() => {
if (updateCount >= 8 * 2 || syncCount >= 8) {
clearInterval(interval)
resolve()
}
}, 100)
})
}
await waitForAllUpdates()
// sync() test
// console.time('sync')
// await kv2.sync(lastEntry.bytes)
// console.timeEnd('sync')
// await sleep(1000) // give some time for ipfs peers to sync
await waitFor(() => updateCount, () => 8)
// onUpdate test
strictEqual(updateCount, 8)
// // write access test
// let errorMessage
// try {
@ -219,16 +205,13 @@ Object.keys(testAPIs).forEach((IPFS) => {
// onError test
// notStrictEqual(error, undefined)
// strictEqual(error.message, 'CBOR decode error: too many terminals, data makes no sense')
// onUpdate test
strictEqual(updateCount, 8 * 2)
})
})
describe.skip('load database', () => {
describe('load database', () => {
it('returns all entries in the database', async () => {
let updateCount = 0
let syncCount = 0
// let syncCount = 0
const accessController = {
canAppend: (entry) => entry.identity.id === testIdentity1.id
@ -237,17 +220,17 @@ Object.keys(testAPIs).forEach((IPFS) => {
const onUpdate = (entry) => {
++updateCount
}
const onSync = (entry) => {
++syncCount
}
// const onSync = (entry) => {
// ++syncCount
// }
kv1 = await EventStore({ OpLog: Log, Database, ipfs: ipfs1, identity: testIdentity1, databaseId, accessController })
kv2 = await EventStore({ OpLog: Log, Database, ipfs: ipfs2, identity: testIdentity2, databaseId, accessController })
kv1.events.on('update', onUpdate)
// kv1.events.on('update', onUpdate)
kv2.events.on('update', onUpdate)
kv1.events.on('sync', onSync)
kv2.events.on('sync', onSync)
// kv1.events.on('sync', onSync)
// kv2.events.on('sync', onSync)
await waitForPeers(ipfs1, [peerId2], databaseId)
await waitForPeers(ipfs2, [peerId1], databaseId)
@ -263,34 +246,21 @@ Object.keys(testAPIs).forEach((IPFS) => {
// const hash = await kv1.add('friend33')
// const lastEntry = await kv1.log.get(hash)
// const sleep = (time) => new Promise((resolve) => {
// setTimeout(() => {
// resolve()
// }, time)
// })
// await sleep(10000) // give some time for ipfs peers to sync
const waitForAllUpdates = async () => {
return new Promise((resolve) => {
const interval = setInterval(() => {
if (updateCount >= 8 * 2 || syncCount >= 8) {
clearInterval(interval)
resolve()
}
}, 100)
})
}
await waitForAllUpdates()
// sync() test
// console.time('sync')
// await kv2.sync(lastEntry.bytes)
// console.timeEnd('sync')
// await kv1.close()
// await kv2.close()
await waitFor(() => updateCount, () => 8)
// kv1 = await EventStore({ OpLog: Log, Database, ipfs: ipfs1, identity: testIdentity1, databaseId, accessController })
// kv2 = await EventStore({ OpLog: Log, Database, ipfs: ipfs2, identity: testIdentity2, databaseId, accessController })
// onUpdate test
strictEqual(updateCount, 8)
await kv1.close()
await kv2.close()
kv1 = await EventStore({ OpLog: Log, Database, ipfs: ipfs1, identity: testIdentity1, databaseId, accessController })
kv2 = await EventStore({ OpLog: Log, Database, ipfs: ipfs2, identity: testIdentity2, databaseId, accessController })
// all() test
const all2 = []

View File

@ -10,6 +10,7 @@ import Database from '../src/database.js'
// Test utils
import { config, testAPIs, getIpfsPeerId, waitForPeers, startIpfs, stopIpfs } from 'orbit-db-test-utils'
import connectPeers from './utils/connect-nodes.js'
import waitFor from './utils/wait-for.js'
import { identityKeys, signingKeys } from './fixtures/orbit-db-identity-keys.js'
const { sync: rmrf } = rimraf
@ -56,6 +57,9 @@ Object.keys(testAPIs).forEach((IPFS) => {
// Create an identity for each peers
testIdentity1 = await createIdentity({ id: 'userA', keystore, signingKeystore })
testIdentity2 = await createIdentity({ id: 'userB', keystore, signingKeystore })
rmrf(testIdentity1.id)
rmrf(testIdentity2.id)
})
afterEach(async () => {
@ -96,7 +100,7 @@ Object.keys(testAPIs).forEach((IPFS) => {
describe('using database', () => {
it('returns all entries in the database', async () => {
let updateCount = 0
let syncCount = 0
// let syncCount = 0
const accessController = {
canAppend: (entry) => entry.identity.id === testIdentity1.id
@ -105,19 +109,19 @@ Object.keys(testAPIs).forEach((IPFS) => {
const onUpdate = (entry) => {
++updateCount
}
const onSync = (entry) => {
++syncCount
}
// const onSync = (entry) => {
// ++syncCount
// }
const onError = () => {
}
kv1 = await Feed({ OpLog: Log, Database, ipfs: ipfs1, identity: testIdentity1, databaseId, accessController })
kv2 = await Feed({ OpLog: Log, Database, ipfs: ipfs2, identity: testIdentity2, databaseId, accessController })
kv1.events.on('update', onUpdate)
// kv1.events.on('update', onUpdate)
kv2.events.on('update', onUpdate)
kv1.events.on('sync', onSync)
kv2.events.on('sync', onSync)
// kv1.events.on('sync', onSync)
// kv2.events.on('sync', onSync)
kv1.events.on('error', onError)
kv2.events.on('error', onError)
@ -141,30 +145,16 @@ Object.keys(testAPIs).forEach((IPFS) => {
// const hash = await kv1.add('friend33')
// const lastEntry = await kv1.get(hash)
// const sleep = (time) => new Promise((resolve) => {
// setTimeout(() => {
// resolve()
// }, time)
// })
// await sleep(10000) // give some time for ipfs peers to sync
const waitForAllUpdates = async () => {
return new Promise((resolve) => {
const interval = setInterval(() => {
if (updateCount >= 8 * 2 || syncCount >= 8) {
clearInterval(interval)
resolve()
}
}, 100)
})
}
await waitForAllUpdates()
// // sync() test
// console.time('sync')
// await kv2.sync(lastEntry.bytes)
// console.timeEnd('sync')
// await sleep(1000) // give some time for ipfs peers to sync
await waitFor(() => updateCount, () => 8)
// onUpdate test
strictEqual(updateCount, 8)
// // write access test
// let errorMessage
// try {
@ -211,16 +201,13 @@ Object.keys(testAPIs).forEach((IPFS) => {
// onError test
// notStrictEqual(error, undefined)
// strictEqual(error.message, 'CBOR decode error: too many terminals, data makes no sense')
// onUpdate test
strictEqual(updateCount, 8 * 2)
})
})
describe.skip('load database', () => {
describe('load database', () => {
it('returns all entries in the database', async () => {
let updateCount = 0
let syncCount = 0
// let syncCount = 0
const accessController = {
canAppend: (entry) => entry.identity.id === testIdentity1.id
@ -229,19 +216,19 @@ Object.keys(testAPIs).forEach((IPFS) => {
const onUpdate = (entry) => {
++updateCount
}
const onSync = (entry) => {
++syncCount
}
// const onSync = (entry) => {
// ++syncCount
// }
const onError = () => {
}
kv1 = await Feed({ OpLog: Log, Database, ipfs: ipfs1, identity: testIdentity1, databaseId, accessController })
kv2 = await Feed({ OpLog: Log, Database, ipfs: ipfs2, identity: testIdentity2, databaseId, accessController })
kv1.events.on('update', onUpdate)
// kv1.events.on('update', onUpdate)
kv2.events.on('update', onUpdate)
kv1.events.on('sync', onSync)
kv2.events.on('sync', onSync)
// kv1.events.on('sync', onSync)
// kv2.events.on('sync', onSync)
kv1.events.on('error', onError)
kv2.events.on('error', onError)
@ -262,36 +249,23 @@ Object.keys(testAPIs).forEach((IPFS) => {
// const hashX = await kv1.del(hash)
// const lastEntry = await kv1.log.get(hashX)
// const sleep = (time) => new Promise((resolve) => {
// setTimeout(() => {
// resolve()
// }, time)
// })
// await sleep(10000) // give some time for ipfs peers to sync
const waitForAllUpdates = async () => {
return new Promise((resolve) => {
const interval = setInterval(() => {
if (updateCount >= 8 * 2 || syncCount >= 8) {
clearInterval(interval)
resolve()
}
}, 100)
})
}
await waitForAllUpdates()
// sync() test
// console.time('sync')
// await kv2.sync(lastEntry.bytes)
// console.timeEnd('sync')
// await kv1.close()
// await kv2.close()
await waitFor(() => updateCount, () => 11)
// onUpdate test
strictEqual(updateCount, 11)
await kv1.close()
await kv2.close()
// // await sleep(1000) // give some time for ipfs peers to sync
// kv1 = await Feed({ OpLog: Log, Database, ipfs: ipfs1, identity: testIdentity1, databaseId, accessController })
// kv2 = await Feed({ OpLog: Log, Database, ipfs: ipfs2, identity: testIdentity2, databaseId, accessController })
kv1 = await Feed({ OpLog: Log, Database, ipfs: ipfs1, identity: testIdentity1, databaseId, accessController })
kv2 = await Feed({ OpLog: Log, Database, ipfs: ipfs2, identity: testIdentity2, databaseId, accessController })
// all() test
const all2 = []

View File

@ -11,6 +11,7 @@ import Database from '../src/database.js'
// Test utils
import { config, testAPIs, getIpfsPeerId, waitForPeers, startIpfs, stopIpfs } from 'orbit-db-test-utils'
import connectPeers from './utils/connect-nodes.js'
import waitFor from './utils/wait-for.js'
import { identityKeys, signingKeys } from './fixtures/orbit-db-identity-keys.js'
const { sync: rmrf } = rimraf
@ -57,6 +58,9 @@ Object.keys(testAPIs).forEach((IPFS) => {
// Create an identity for each peers
testIdentity1 = await createIdentity({ id: 'userA', keystore, signingKeystore })
testIdentity2 = await createIdentity({ id: 'userB', keystore, signingKeystore })
rmrf(testIdentity1.id)
rmrf(testIdentity2.id)
})
after(async () => {
@ -98,18 +102,18 @@ Object.keys(testAPIs).forEach((IPFS) => {
it('returns all entries in the database', async () => {
// let error
let updateCount = 0
let syncCount = 0
// const syncCount = 0
const accessController = {
canAppend: (entry) => entry.identity.id === testIdentity1.id
}
const onUpdate = (entry) => {
updateCount++
}
const onSync = (entry) => {
syncCount++
++updateCount
}
// const onSync = (entry) => {
// ++syncCount
// }
const onError = () => {
// error = err
}
@ -119,10 +123,10 @@ Object.keys(testAPIs).forEach((IPFS) => {
kv1 = await KeyValueStorePersisted({ KeyValue: KeyValueStore, OpLog: Log, Database, ipfs: ipfs1, identity: testIdentity1, databaseId, accessController })
kv2 = await KeyValueStorePersisted({ KeyValue: KeyValueStore, OpLog: Log, Database, ipfs: ipfs2, identity: testIdentity2, databaseId, accessController })
kv1.events.on('update', onUpdate)
// kv1.events.on('update', onUpdate)
kv2.events.on('update', onUpdate)
kv1.events.on('sync', onSync)
kv2.events.on('sync', onSync)
// kv1.events.on('sync', onSync)
// kv2.events.on('sync', onSync)
kv1.events.on('error', onError)
kv2.events.on('error', onError)
@ -146,29 +150,18 @@ Object.keys(testAPIs).forEach((IPFS) => {
// const hash = await kv1.set('hello', 'friend3')
// const lastEntry = await kv1.database.log.get(hash)
// const sleep = (time) => new Promise((resolve) => {
// setTimeout(() => {
// resolve()
// }, time)
// })
// await sleep(10000) // give some time for ipfs peers to sync
const waitForAllUpdates = async () => {
return new Promise((resolve) => {
const interval = setInterval(() => {
if (updateCount >= 8 * 2 || syncCount >= 8) {
clearInterval(interval)
resolve()
}
}, 100)
})
}
await waitForAllUpdates()
// sync() test
// console.time('sync')
// await kv2.sync(lastEntry.bytes)
// console.timeEnd('sync')
await waitFor(() => updateCount, () => 8)
// update event test
strictEqual(updateCount, 8)
// sync event test
// strictEqual(syncCount, 8)
// write access test
// let errorMessage
// try {
@ -229,39 +222,34 @@ Object.keys(testAPIs).forEach((IPFS) => {
// onError test
// notStrictEqual(error, undefined)
// strictEqual(error.message, 'CBOR decode error: too many terminals, data makes no sense')
// update event test
strictEqual(updateCount, 8 * 2)
// sync event test
strictEqual(syncCount, 8)
})
})
describe.skip('load database', () => {
describe('load database', () => {
it('returns all entries in the database', async () => {
let updateCount = 0
let syncCount = 0
// let syncCount = 0
const accessController = {
canAppend: (entry) => entry.identity.id === testIdentity1.id
}
const onUpdate = (entry) => {
updateCount++
}
const onSync = (entry) => {
syncCount++
++updateCount
}
// const onSync = (entry) => {
// ++syncCount
// }
// kv1 = await KeyValueStore({ KeyValue: KeyValueStore, OpLog: Log, Database, ipfs: ipfs1, identity: testIdentity1, databaseId, accessController })
// kv2 = await KeyValueStore({ KeyValue: KeyValueStore, OpLog: Log, Database, ipfs: ipfs2, identity: testIdentity2, databaseId, accessController })
kv1 = await KeyValueStorePersisted({ KeyValue: KeyValueStore, OpLog: Log, Database, ipfs: ipfs1, identity: testIdentity1, databaseId, accessController })
kv2 = await KeyValueStorePersisted({ KeyValue: KeyValueStore, OpLog: Log, Database, ipfs: ipfs2, identity: testIdentity2, databaseId, accessController })
kv1.events.on('update', onUpdate)
// kv1.events.on('update', onUpdate)
kv2.events.on('update', onUpdate)
kv1.events.on('sync', onSync)
kv2.events.on('sync', onSync)
// kv1.events.on('sync', onSync)
// kv2.events.on('sync', onSync)
await waitForPeers(ipfs1, [peerId2], databaseId)
await waitForPeers(ipfs2, [peerId1], databaseId)
@ -277,36 +265,21 @@ Object.keys(testAPIs).forEach((IPFS) => {
// const hash = await kv1.set('hello', 'friend3')
// const lastEntry = await kv1.log.get(hash)
// const sleep = (time) => new Promise((resolve) => {
// setTimeout(() => {
// resolve()
// }, time)
// })
// await sleep(10000) // give some time for ipfs peers to sync
const waitForAllUpdates = async () => {
return new Promise((resolve) => {
const interval = setInterval(() => {
if (updateCount >= 8 * 2 || syncCount >= 8) {
clearInterval(interval)
resolve()
}
}, 100)
})
}
await waitForAllUpdates()
// sync() test
// console.time('sync')
// await kv2.sync(lastEntry.bytes)
// console.timeEnd('sync')
// await kv1.close()
// await kv2.close()
await waitFor(() => updateCount, () => 8)
strictEqual(updateCount, 8)
await kv1.close()
await kv2.close()
// kv1 = await KeyValueStore({ KeyValue: KeyValueStore, OpLog: Log, Database, ipfs: ipfs1, identity: testIdentity1, databaseId, accessController })
// kv2 = await KeyValueStore({ KeyValue: KeyValueStore, OpLog: Log, Database, ipfs: ipfs2, identity: testIdentity2, databaseId, accessController })
// kv1 = await KeyValueStorePersisted({ KeyValue: KeyValueStore, OpLog: Log, Database, ipfs: ipfs1, identity: testIdentity1, databaseId, accessController })
// kv2 = await KeyValueStorePersisted({ KeyValue: KeyValueStore, OpLog: Log, Database, ipfs: ipfs2, identity: testIdentity2, databaseId, accessController })
kv1 = await KeyValueStorePersisted({ KeyValue: KeyValueStore, OpLog: Log, Database, ipfs: ipfs1, identity: testIdentity1, databaseId, accessController })
kv2 = await KeyValueStorePersisted({ KeyValue: KeyValueStore, OpLog: Log, Database, ipfs: ipfs2, identity: testIdentity2, databaseId, accessController })
console.time('get')
const value0 = await kv2.get('init')
@ -349,8 +322,6 @@ Object.keys(testAPIs).forEach((IPFS) => {
{ key: 'hello', value: 'friend3' },
{ key: 'init', value: true }
])
strictEqual(syncCount, 8)
})
})
})

View File

@ -1,7 +1,7 @@
import { strictEqual } from 'assert'
import rimraf from 'rimraf'
import { copy } from 'fs-extra'
import { Log } from '../src/log.js'
import { Log, MemoryStorage } from '../src/log.js'
import IdentityProvider from 'orbit-db-identity-provider'
import Keystore from '../src/Keystore.js'
@ -110,20 +110,26 @@ Object.keys(testAPIs).forEach((IPFS) => {
{ amount: 32, referenceCount: 16, refLength: 4 },
{ amount: 18, referenceCount: 32, refLength: 5 },
{ amount: 128, referenceCount: 32, refLength: 5 },
{ amount: 63, referenceCount: 64, refLength: 5 },
{ amount: 64, referenceCount: 64, refLength: 6 },
{ amount: 65, referenceCount: 64, refLength: 6 },
{ amount: 91, referenceCount: 64, refLength: 6 },
{ amount: 128, referenceCount: 64, refLength: 6 },
{ amount: 128, referenceCount: 1, refLength: 0 },
{ amount: 128, referenceCount: 2, refLength: 1 },
{ amount: 256, referenceCount: 1, refLength: 0 },
{ amount: 256, referenceCount: 256, refLength: 8 },
{ amount: 256, referenceCount: 1024, refLength: 8 }
{ amount: 256, referenceCount: 4, refLength: 2 },
{ amount: 256, referenceCount: 8, refLength: 3 },
{ amount: 256, referenceCount: 16, refLength: 4 },
{ amount: 256, referenceCount: 32, refLength: 5 },
{ amount: 1024, referenceCount: 2, refLength: 1 }
]
inputs.forEach(input => {
it(`has ${input.refLength} references, max distance ${input.referenceCount}, total of ${input.amount} entries`, async () => {
const test = async (amount, referenceCount, refLength) => {
const log1 = await Log(testIdentity, { logId: 'A' })
const storage = await MemoryStorage()
const log1 = await Log(testIdentity, { logId: 'A', storage })
for (let i = 0; i < amount; i++) {
await log1.append((i + 1).toString(), { pointerCount: referenceCount })
}
@ -140,19 +146,21 @@ Object.keys(testAPIs).forEach((IPFS) => {
// Check the first ref (distance 2)
if (values[idx].refs.length > 0) { strictEqual(values[idx].refs[0], values[idx - 2].hash) }
// Check the second ref (distance 2)
// Check the second ref (distance 4)
if (values[idx].refs.length > 1 && idx > referenceCount) { strictEqual(values[idx].refs[1], values[idx - 4].hash) }
// Check the third ref (distance 4)
// Check the third ref (distance 8)
if (values[idx].refs.length > 2 && idx > referenceCount) { strictEqual(values[idx].refs[2], values[idx - 8].hash) }
// Check the fourth ref (distance 8)
// Check the fourth ref (distance 16)
if (values[idx].refs.length > 3 && idx > referenceCount) { strictEqual(values[idx].refs[3], values[idx - 16].hash) }
// Check the fifth ref (distance 16)
// Check the fifth ref (distance 32)
if (values[idx].refs.length > 4 && idx > referenceCount) { strictEqual(values[idx].refs[4], values[idx - 32].hash) }
// Check the fifth ref (distance 64)
if (values[idx].refs.length > 5 && idx > referenceCount) { strictEqual(values[idx].refs[5], values[idx - 64].hash) }
// Check the reference of each entry
if (idx > referenceCount) { strictEqual(values[idx].refs.length, refLength) }
}

12
test/utils/wait-for.js Normal file
View File

@ -0,0 +1,12 @@
const waitFor = async (valueA, toBeValueB, pollInterval = 100) => {
return new Promise((resolve) => {
const interval = setInterval(() => {
if (valueA() === toBeValueB()) {
clearInterval(interval)
resolve()
}
}, pollInterval)
})
}
export default waitFor