Add Entry index to Log (#48)

* Turn off IPFS console.log output for tests

* Fix replication and sync tests

* Close keystore after tests in IPFSAccessController tests

* Fix error event test in Sync tests

* Fix race condition

* Add entry index to Log

* Clean up storage files

* Fix tests

* Fix linter
This commit is contained in:
Haad 2023-03-27 13:09:44 +03:00 committed by GitHub
parent f396d97a69
commit 4fe1b0c1a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 115 additions and 85 deletions

View File

@ -37,14 +37,17 @@ const queryLoop = async () => {
// in case we want to benchmark different storage modules
const entryStorage = await MemoryStorage()
const headsStorage = await MemoryStorage()
const indexStorage = await MemoryStorage()
// Test LRUStorage
// const entryStorage = await LRUStorage()
// const headsStorage = await LRUStorage()
// const indexStorage = await LRUStorage()
// Test LevelStorage
// const entryStorage = await LevelStorage({ path: './logA/entries' })
// const headsStorage = await LevelStorage({ path: './logA/heads' })
// const indexStorage = await LevelStorage({ path: './logA/index' })
log = await Log(testIdentity, { logId: 'A', entryStorage, headsStorage })
log = await Log(testIdentity, { logId: 'A', entryStorage, headsStorage, indexStorage })
// Output metrics at 1 second interval
const interval = setInterval(async () => {

View File

@ -15,14 +15,17 @@ import rmrf from 'rimraf'
// in case we want to benchmark different storage modules
const entryStorage = await MemoryStorage()
const headsStorage = await MemoryStorage()
const indexStorage = await MemoryStorage()
// Test LRUStorage
// const entryStorage = await LRUStorage()
// const headsStorage = await LRUStorage()
// const indexStorage = await LRUStorage()
// Test LevelStorage
// const entryStorage = await LevelStorage({ path: './logA/entries' })
// const headsStorage = await LevelStorage({ path: './logA/heads' })
// const indexStorage = await LevelStorage({ path: './logA/index' })
const log = await Log(testIdentity, { logId: 'A', entryStorage, headsStorage })
const log = await Log(testIdentity, { logId: 'A', entryStorage, headsStorage, indexStorage })
const entryCount = 10000

View File

@ -48,7 +48,7 @@ const OrbitDB = async ({ ipfs, id, identity, keystore, directory } = {}) => {
directory = directory || './orbitdb'
keystore = keystore || await KeyStore({ path: path.join(directory, './keystore') })
const identities = await Identities({ ipfs, keystore })
identity = identity || await identities.createIdentity({ id, keystore })
identity = identity || await identities.createIdentity({ id })
const manifestStorage = await ComposedStorage(
await LRUStorage({ size: 1000 }),

View File

@ -7,7 +7,7 @@ import { ComposedStorage, LRUStorage, IPFSBlockStorage, LevelStorage } from './s
const defaultPointerCount = 0
const defaultCacheSize = 1000
const Database = async ({ OpLog, ipfs, identity, address, name, accessController, directory, meta, headsStorage, entryStorage, pointerCount, syncAutomatically }) => {
const Database = async ({ OpLog, ipfs, identity, address, name, accessController, directory, meta, headsStorage, entryStorage, indexStorage, pointerCount, syncAutomatically }) => {
const { Log, Entry } = OpLog
directory = Path.join(directory || './orbitdb', `./${address}/`)
@ -24,10 +24,12 @@ const Database = async ({ OpLog, ipfs, identity, address, name, accessController
await LevelStorage({ path: Path.join(directory, '/log/_heads/') })
)
const log = await Log(identity, { logId: address, access: accessController, entryStorage, headsStorage })
indexStorage = indexStorage || await ComposedStorage(
await LRUStorage({ size: defaultCacheSize }),
await LevelStorage({ path: Path.join(directory, '/log/_index/') })
)
// const indexStorage = await LevelStorage({ path: Path.join(directory, '/log/_index/') })
// const log = await Log(identity, { logId: address.toString(), access: accessController, entryStorage, headsStorage, indexStorage })
const log = await Log(identity, { logId: address, access: accessController, entryStorage, headsStorage, indexStorage })
const events = new EventEmitter()
const queue = new PQueue({ concurrency: 1 })

View File

@ -21,7 +21,7 @@ const verifySignature = async (signature, publicKey, data) => {
}
if (!(data instanceof Uint8Array)) {
data = typeof data === "string" ? uint8ArrayFromString(data) : new Uint8Array(data)
data = typeof data === 'string' ? uint8ArrayFromString(data) : new Uint8Array(data)
}
const isValid = (key, msg, sig) => key.verify(msg, sig)
@ -46,8 +46,8 @@ const signMessage = async (key, data) => {
throw new Error('Given input data was undefined')
}
if (!(data instanceof Uint8Array)) {
data = typeof data === "string" ? uint8ArrayFromString(data) : new Uint8Array(data)
if (!(data instanceof Uint8Array)) {
data = typeof data === 'string' ? uint8ArrayFromString(data) : new Uint8Array(data)
}
return uint8ArrayToString(await key.sign(data), 'base16')

View File

@ -4,10 +4,6 @@ import Clock from './lamport-clock.js'
import Heads from './heads.js'
import Sorting from './sorting.js'
import MemoryStorage from '../storage/memory.js'
// import LRUStorage from './storage/lru.js'
// import LevelStorage from './storage/level.js'
// import IPFSBlockStorage from './storage/ipfs-block.js'
// import ComposedStorage from './storage/composed.js'
import { isDefined } from '../utils/index.js'
const { LastWriteWins, NoZeroes } = Sorting
@ -17,9 +13,6 @@ const maxClockTimeReducer = (res, acc) => Math.max(res, acc.clock.time)
// Default storage for storing the Log and its entries. Default: Memory. Options: Memory, LRU, IPFS.
const DefaultStorage = MemoryStorage
// const DefaultStorage = LRUStorage
// const DefaultStorage = LevelStorage
// const DefaultStorage = IPFSBlockStorage
// Default AccessController for the Log.
// Default policy is that anyone can write to the Log.
@ -69,7 +62,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
// Oplog entry storage
const _entries = entryStorage || await DefaultStorage()
// Entry index for keeping track which entries are already in the log
// const _index = indexStorage || await DefaultStorage()
const _index = indexStorage || await DefaultStorage()
// Heads storage
headsStorage = headsStorage || await DefaultStorage()
// Add heads to the state storage, ie. init the log state
@ -117,15 +110,14 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
const bytes = await _entries.get(hash)
if (bytes) {
const entry = await Entry.decode(bytes)
// await _index.put(hash, true)
await _index.put(hash, true)
return entry
}
}
const has = async (hash) => {
return false
// const entry = await _index.get(hash)
// return isDefined(entry)
const entry = await _index.get(hash)
return isDefined(entry)
}
/**
@ -163,7 +155,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
// Add entry to the entry storage
await _entries.put(entry.hash, entry.bytes)
// Add entry to the entry index
// await _index.put(entry.hash, true)
await _index.put(entry.hash, true)
// Return the appended entry
return entry
}
@ -207,6 +199,15 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
const isAlreadyInTheLog = await has(entry.hash)
if (isAlreadyInTheLog) {
return false
} else {
// Check that the entry is not an entry that hasn't been indexed
const it = traverse(await heads(), (e) => e.next.includes(entry.hash))
for await (const e of it) {
if (e.next.includes(entry.hash)) {
await _index.put(entry.hash, true)
return false
}
}
}
// Check that the Entry belongs to this Log
if (entry.id !== id) {
@ -233,7 +234,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
// Add the new entry to the entry storage
await _entries.put(entry.hash, entry.bytes)
// Add the new entry to the entry index
// await _index.put(entry.hash, true)
await _index.put(entry.hash, true)
// We've added the entry to the log
return true
}
@ -393,13 +394,13 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
}
const clear = async () => {
// await _index.clear()
await _index.clear()
await _heads.clear()
await _entries.clear()
}
const close = async () => {
// await _index.close()
await _index.close()
await _heads.close()
await _entries.close()
}
@ -451,6 +452,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
values,
all: values, // Alias for values()
get,
has,
append,
join,
joinEntry,

View File

@ -20,9 +20,7 @@ const IPFSBlockStorage = async ({ ipfs, timeout, pin } = {}) => {
})
}
const del = async (hash) => {
// TODO?
}
const del = async (hash) => {}
const get = async (hash) => {
const cid = CID.parse(hash, base58btc)
@ -34,8 +32,6 @@ const IPFSBlockStorage = async ({ ipfs, timeout, pin } = {}) => {
const iterator = async function * () {}
// TODO: all()
const merge = async (other) => {}
const clear = async () => {}
@ -47,7 +43,6 @@ const IPFSBlockStorage = async ({ ipfs, timeout, pin } = {}) => {
del,
get,
iterator,
// TODO: all,
merge,
clear,
close

View File

@ -34,8 +34,6 @@ const LevelStorage = async ({ path, valueEncoding } = {}) => {
}
}
// TODO: all()
const merge = async (other) => {}
const clear = async () => {
@ -51,7 +49,6 @@ const LevelStorage = async ({ path, valueEncoding } = {}) => {
del,
get,
iterator,
// TODO: all,
merge,
clear,
close

View File

@ -24,8 +24,6 @@ const LRUStorage = async ({ size } = {}) => {
}
}
// TODO: all()
const merge = async (other) => {
if (other) {
for await (const [key, value] of other.iterator()) {
@ -45,7 +43,6 @@ const LRUStorage = async ({ size } = {}) => {
del,
get,
iterator,
// TODO: all,
merge,
clear,
close

View File

@ -27,8 +27,6 @@ const MemoryStorage = async () => {
}
}
// TODO: all()
const clear = async () => {
memory = {}
}
@ -40,7 +38,6 @@ const MemoryStorage = async () => {
del,
get,
iterator,
// TODO: all,
merge,
clear,
close

View File

@ -14,6 +14,7 @@ describe('IPFSAccessController', function () {
this.timeout(config.timeout)
let ipfs1, ipfs2
let keystore1, keystore2
let identities1, identities2
let testIdentity1, testIdentity2
@ -22,8 +23,8 @@ describe('IPFSAccessController', function () {
ipfs2 = await IPFS.create({ ...config.daemon2, repo: './ipfs2' })
await connectPeers(ipfs1, ipfs2)
const keystore1 = await Keystore({ path: dbPath1 + '/keys' })
const keystore2 = await Keystore({ path: dbPath2 + '/keys' })
keystore1 = await Keystore({ path: dbPath1 + '/keys' })
keystore2 = await Keystore({ path: dbPath2 + '/keys' })
identities1 = await Identities({ keystore: keystore1 })
identities2 = await Identities({ keystore: keystore2 })
@ -41,6 +42,14 @@ describe('IPFSAccessController', function () {
await ipfs2.stop()
}
if (keystore1) {
await keystore1.close()
}
if (keystore2) {
await keystore2.close()
}
await rmrf('./orbitdb')
await rmrf('./ipfs1')
await rmrf('./ipfs2')

View File

@ -32,6 +32,7 @@ export default {
}
},
daemon1: {
silent: true,
EXPERIMENTAL: {
pubsub: true
},
@ -54,6 +55,7 @@ export default {
}
},
daemon2: {
silent: true,
EXPERIMENTAL: {
pubsub: true
},

View File

@ -83,27 +83,25 @@ describe('Database - Replication', function () {
})
it('replicates databases across two peers', async () => {
let connected1 = false
let connected2 = false
let replicated = false
let expectedEntryHash = null
const onConnected1 = (peerId, heads) => {
connected1 = true
const onConnected = (peerId, heads) => {
replicated = expectedEntryHash && heads.map(e => e.hash).includes(expectedEntryHash)
}
const onUpdate = (entry) => {
replicated = expectedEntryHash && entry.hash === expectedEntryHash
}
const onConnected2 = (peerId, heads) => {
connected2 = true
}
db1.events.on('join', onConnected1)
db2.events.on('join', onConnected2)
db2.events.on('join', onConnected)
db2.events.on('update', onUpdate)
await db1.addOperation({ op: 'PUT', key: 1, value: 'record 1 on db 1' })
await db1.addOperation({ op: 'PUT', key: 2, value: 'record 2 on db 1' })
await db1.addOperation({ op: 'PUT', key: 3, value: 'record 3 on db 1' })
await db1.addOperation({ op: 'PUT', key: 4, value: 'record 4 on db 1' })
expectedEntryHash = await db1.addOperation({ op: 'PUT', key: 4, value: 'record 4 on db 1' })
await waitFor(() => connected1, () => true)
await waitFor(() => connected2, () => true)
await waitFor(() => replicated, () => true)
const all1 = []
for await (const item of db1.log.iterator()) {
@ -119,19 +117,19 @@ describe('Database - Replication', function () {
})
it('replicates databases across two peers with delays', async () => {
let connected1 = false
let connected2 = false
let replicated = false
let expectedEntryHash = null
const onConnected1 = (peerId, heads) => {
connected1 = true
const onConnected = (peerId, heads) => {
replicated = expectedEntryHash && heads.map(e => e.hash).includes(expectedEntryHash)
}
const onConnected2 = (peerId, heads) => {
connected2 = true
const onUpdate = (entry) => {
replicated = expectedEntryHash && entry.hash === expectedEntryHash
}
db1.events.on('join', onConnected1)
db2.events.on('join', onConnected2)
db2.events.on('join', onConnected)
db2.events.on('update', onUpdate)
await db1.addOperation({ op: 'PUT', key: 1, value: 'record 1 on db 1' })
@ -146,10 +144,9 @@ describe('Database - Replication', function () {
setTimeout(() => resolve(), 1000)
})
await db1.addOperation({ op: 'PUT', key: 4, value: 'record 4 on db 1' })
expectedEntryHash = await db1.addOperation({ op: 'PUT', key: 4, value: 'record 4 on db 1' })
await waitFor(() => connected1, () => true)
await waitFor(() => connected2, () => true)
await waitFor(() => replicated, () => true)
const all1 = []
for await (const item of db1.log.iterator()) {
@ -166,6 +163,7 @@ describe('Database - Replication', function () {
it('adds an operation before db2 is instantiated', async () => {
let connected = false
const onConnected = (peerId, heads) => {
connected = true
}

View File

@ -426,4 +426,20 @@ describe('Log - Join', async function () {
strictEqual(values.length, 10)
deepStrictEqual(values.map((e) => e.payload), expectedData)
})
it('has correct heads after joining logs', async () => {
const e1 = await log1.append('hello1')
await log1.append('hello2')
const e3 = await log1.append('hello3')
await log2.join(log1)
const heads1 = await log2.heads()
deepStrictEqual(heads1, [e3])
await log2.joinEntry(e1)
const heads2 = await log2.heads()
deepStrictEqual(heads2, [e3])
})
})

View File

@ -97,10 +97,10 @@ describe('Log - Replication', function () {
}
beforeEach(async () => {
log1 = await Log(testIdentity1, { logId, storage: storage1 })
log2 = await Log(testIdentity2, { logId, storage: storage2 })
input1 = await Log(testIdentity1, { logId, storage: storage1 })
input2 = await Log(testIdentity2, { logId, storage: storage2 })
log1 = await Log(testIdentity1, { logId, entryStorage: storage1 })
log2 = await Log(testIdentity2, { logId, entryStorage: storage2 })
input1 = await Log(testIdentity1, { logId, entryStorage: storage1 })
input2 = await Log(testIdentity2, { logId, entryStorage: storage2 })
await ipfs1.pubsub.subscribe(logId, handleMessage1)
await ipfs2.pubsub.subscribe(logId, handleMessage2)
})
@ -141,7 +141,7 @@ describe('Log - Replication', function () {
await whileProcessingMessages(config.timeout)
const result = await Log(testIdentity1, { logId, storage: storage1 })
const result = await Log(testIdentity1, { logId, entryStorage: storage1 })
await result.join(log1)
await result.join(log2)

View File

@ -157,7 +157,15 @@ describe('orbit-db - Multiple Databases', function () {
}
// Function to check if all databases have been replicated
const allReplicated = () => remoteDatabases.every(isReplicated)
const allReplicated = async () => {
for (const db of remoteDatabases) {
const replicated = await isReplicated(db)
if (!replicated) {
return false
}
}
return true
}
console.log('Waiting for replication to finish')

View File

@ -142,7 +142,7 @@ describe('Sync protocol', function () {
const onSynced = async (bytes) => {
syncedHead = await Entry.decode(bytes)
log2.joinEntry(syncedHead)
await log2.joinEntry(syncedHead)
syncedEventFired = true
}
@ -178,16 +178,17 @@ describe('Sync protocol', function () {
strictEqual(sync1.peers.has(String(peerId2)), true)
})
it('eventually reaches consistency', async () => {
it('is eventually consistent', async () => {
await sync1.add(await log1.append('hello2'))
await sync1.add(await log1.append('hello3'))
await sync1.add(await log1.append('hello4'))
expectedEntry = await log1.append('hello5')
await sync1.add(expectedEntry)
const expected = await log1.append('hello5')
await waitFor(() => syncedEventFired, () => true)
await sync1.add(expected)
deepStrictEqual(syncedHead, expectedEntry)
await waitFor(() => Entry.isEqual(expected, syncedHead), () => true)
deepStrictEqual(syncedHead, expected)
deepStrictEqual(await log1.heads(), await log2.heads())
@ -348,8 +349,8 @@ describe('Sync protocol', function () {
log2 = await Log(testIdentity2, { logId: 'synclog1' })
const onSynced = async (bytes) => {
syncedHead = await Entry.decode(bytes)
if (expectedEntry) {
if (expectedEntry && !syncedEventFired) {
syncedHead = await Entry.decode(bytes)
syncedEventFired = expectedEntry.hash === syncedHead.hash
}
}
@ -491,8 +492,8 @@ describe('Sync protocol', function () {
let leavingPeerId
before(async () => {
const log1 = await Log(testIdentity1, { logId: 'synclog2' })
const log2 = await Log(testIdentity2, { logId: 'synclog2' })
const log1 = await Log(testIdentity1, { logId: 'synclog3' })
const log2 = await Log(testIdentity2, { logId: 'synclog3' })
const onJoin = (peerId, heads) => {
joinEventFired = true
@ -511,12 +512,12 @@ describe('Sync protocol', function () {
}
const onSynced = (bytes) => {
throw new Error('Sync Error')
sync2.events.emit('error', new Error('Sync Error'))
}
await log1.append('hello!')
sync1 = await Sync({ ipfs: ipfs1, log: log1 })
sync1 = await Sync({ ipfs: ipfs1, log: log1, onSynced })
sync2 = await Sync({ ipfs: ipfs2, log: log2, onSynced })
sync1.events.on('join', onJoin)
sync1.events.on('leave', onLeave)