Refactor joinEntry implementation

This commit is contained in:
haad 2023-11-22 12:43:53 +02:00
parent 000e072251
commit f675641982
6 changed files with 626 additions and 58 deletions

View File

@ -65,6 +65,8 @@ const ipfsConfig = {
const db2 = await orbitdb2.open(db1.address)
const startTime2 = new Date().getTime()
let connected = false
const onJoin = async (peerId) => (connected = true)
@ -74,7 +76,6 @@ const ipfsConfig = {
await waitFor(() => connected, () => true)
console.log(`Iterate ${entryCount} events to replicate them`)
const startTime2 = new Date().getTime()
const all = []
for await (const { value } of db2.iterator()) {

View File

@ -79,7 +79,7 @@ const create = async (identity, id, payload, clock = null, next = [], refs = [])
entry.identity = identity.hash
entry.sig = signature
return _encodeEntry(entry)
return encode(entry)
}
/**
@ -148,10 +148,17 @@ const isEqual = (a, b) => {
*/
const decode = async (bytes) => {
const { value } = await Block.decode({ bytes, codec, hasher })
return _encodeEntry(value)
return encode(value)
}
const _encodeEntry = async (entry) => {
/**
* Encodes an Entry and adds bytes field to it
* @param {Entry} entry
* @return {module:Log~Entry}
* @memberof module:Log~Entry
* @private
*/
const encode = async (entry) => {
const { cid, bytes } = await Block.encode({ value: entry, codec, hasher })
const hash = cid.toString(hashStringEncoding)
const clock = Clock(entry.clock.id, entry.clock.time)
@ -167,6 +174,7 @@ export default {
create,
verify,
decode,
encode,
isEntry,
isEqual
}

View File

@ -35,6 +35,12 @@ const Heads = async ({ storage, heads }) => {
return newHeads
}
const remove = async (hash) => {
const currentHeads = await all()
const newHeads = currentHeads.filter(e => e.hash !== hash)
await set(newHeads)
}
const iterator = async function * () {
const it = storage.iterator()
for await (const [, bytes] of it) {
@ -66,6 +72,7 @@ const Heads = async ({ storage, heads }) => {
put,
set,
add,
remove,
iterator,
all,
clear,

View File

@ -128,7 +128,6 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
const bytes = await _entries.get(hash)
if (bytes) {
const entry = await Entry.decode(bytes)
await _index.put(hash, true)
return entry
}
}
@ -206,13 +205,13 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
if (!isLog(log)) {
throw new Error('Given argument is not an instance of Log')
}
if (_entries.merge) {
await _entries.merge(log.storage)
}
const heads = await log.heads()
for (const entry of heads) {
await joinEntry(entry)
}
if (_entries.merge) {
await _entries.merge(log.storage)
}
}
/**
@ -222,54 +221,86 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
*
* @example
*
* await log.join(entry)
* await log.joinEntry(entry)
*
* @memberof module:Log~Log
* @instance
*/
const joinEntry = async (entry) => {
const { hash } = entry
// Check if the entry is already in the log and return early if it is
const isAlreadyInTheLog = await has(hash)
/* 1. Check if the entry is already in the log and return early if it is */
const isAlreadyInTheLog = await has(entry.hash)
if (isAlreadyInTheLog) {
return false
} else {
// Check that the entry is not an entry that hasn't been indexed
const it = traverse(await heads(), (e) => e.next.includes(hash) || entry.next.includes(e.hash))
for await (const e of it) {
if (e.next.includes(hash)) {
await _index.put(hash, true)
return false
}
}
const verifyEntry = async (entry) => {
// Check that the Entry belongs to this Log
if (entry.id !== id) {
throw new Error(`Entry's id (${entry.id}) doesn't match the log's id (${id}).`)
}
// Verify if entry is allowed to be added to the log
const canAppend = await access.canAppend(entry)
if (!canAppend) {
throw new Error(`Could not append entry:\nKey "${entry.identity}" is not allowed to write to the log`)
}
// Verify signature for the entry
const isValid = await Entry.verify(identity, entry)
if (!isValid) {
throw new Error(`Could not validate signature for entry "${entry.hash}"`)
}
}
// Check that the Entry belongs to this Log
if (entry.id !== id) {
throw new Error(`Entry's id (${entry.id}) doesn't match the log's id (${id}).`)
}
// Verify if entry is allowed to be added to the log
const canAppend = await access.canAppend(entry)
if (!canAppend) {
throw new Error(`Could not append entry:\nKey "${entry.identity}" is not allowed to write to the log`)
}
// Verify signature for the entry
const isValid = await Entry.verify(identity, entry)
if (!isValid) {
throw new Error(`Could not validate signature for entry "${hash}"`)
/* 2. Verify the entry */
await verifyEntry(entry)
/* 3. Find missing entries and connections (=path in the DAG) to the current heads */
const headsHashes = (await heads()).map(e => e.hash)
const hashesToAdd = new Set([entry.hash])
const hashesToGet = new Set([...entry.next, ...entry.refs])
const connectedHeads = new Set()
const traverseAndVerify = async () => {
const getEntries = Array.from(hashesToGet.values()).filter(has).map(get)
const entries = await Promise.all(getEntries)
for (const e of entries) {
hashesToGet.delete(e.hash)
await verifyEntry(e)
hashesToAdd.add(e.hash)
for (const hash of [...e.next, ...e.refs]) {
const isInTheLog = await has(hash)
if (!isInTheLog && !hashesToAdd.has(hash)) {
hashesToGet.add(hash)
} else if (headsHashes.includes(hash)) {
connectedHeads.add(hash)
}
}
}
if (hashesToGet.size > 0) {
await traverseAndVerify()
}
}
// Add the new entry to heads (union with current heads)
const newHeads = await _heads.add(entry)
await traverseAndVerify()
if (!newHeads) {
return false
/* 4. Add missing entries to the index (=to the log) */
for (const hash of hashesToAdd.values()) {
await _index.put(hash, true)
}
// Add the new entry to the entry storage
await _entries.put(hash, entry.bytes)
// Add the new entry to the entry index
await _index.put(hash, true)
// We've added the entry to the log
/* 5. Remove heads which new entries are connect to */
for (const hash of connectedHeads.values()) {
await _heads.remove(hash)
}
/* 6. Add the new entry to heads (=union with current heads) */
await _heads.add(entry)
return true
}
@ -330,7 +361,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
// Add the next and refs fields from the fetched entries to the next round
toFetch = nexts
.filter(e => e != null)
.filter(e => e !== null && e !== undefined)
.reduce((res, acc) => Array.from(new Set([...res, ...acc.next, ...(useRefs ? acc.refs : [])])), [])
.filter(notIndexed)
// Add the fetched entries to the stack to be processed

View File

@ -1,8 +1,9 @@
import { strictEqual, notStrictEqual, deepStrictEqual } from 'assert'
import { rimraf } from 'rimraf'
import { copy } from 'fs-extra'
import { Log, Identities, KeyStore } from '../../src/index.js'
import { Log, Entry, Identities, KeyStore } from '../../src/index.js'
import { Clock } from '../../src/oplog/log.js'
import { MemoryStorage } from '../../src/storage/index.js'
import testKeysPath from '../fixtures/test-keys-path.js'
const keysPath = './testkeys'
@ -427,19 +428,549 @@ describe('Log - Join', async function () {
deepStrictEqual(values.map((e) => e.payload), expectedData)
})
it('has correct heads after joining logs', async () => {
it('doesn\'t add the given entry to the log when the given entry is already in the log', async () => {
const e1 = await log1.append('hello1')
const e2 = await log1.append('hello2')
const e3 = await log1.append('hello3')
await log2.join(log1)
const all1 = await log2.all()
deepStrictEqual(all1.length, 3)
deepStrictEqual(all1[0], e1)
deepStrictEqual(all1[1], e2)
deepStrictEqual(all1[2], e3)
await log2.joinEntry(e1)
await log2.joinEntry(e2)
await log2.joinEntry(e3)
const all2 = await log2.all()
deepStrictEqual(all2.length, 3)
deepStrictEqual(all2[0], e1)
deepStrictEqual(all2[1], e2)
deepStrictEqual(all2[2], e3)
})
it('doesn\'t add the given entry to the heads when the given entry is already in the log', async () => {
const e1 = await log1.append('hello1')
const e2 = await log1.append('hello2')
const e3 = await log1.append('hello3')
await log2.join(log1)
const heads1 = await log2.heads()
deepStrictEqual(heads1, [e3])
await log2.joinEntry(e1)
await log2.joinEntry(e2)
await log2.joinEntry(e3)
const heads2 = await log2.heads()
strictEqual(heads2.length, 1)
deepStrictEqual(heads2[0].hash, e3.hash)
})
it('joinEntry returns false when the given entry is already in the log', async () => {
const e1 = await log1.append('hello1')
const e2 = await log1.append('hello2')
const e3 = await log1.append('hello3')
await log2.join(log1)
const heads1 = await log2.heads()
deepStrictEqual(heads1, [e3])
const r1 = await log2.joinEntry(e1)
const r2 = await log2.joinEntry(e2)
const r3 = await log2.joinEntry(e3)
deepStrictEqual([r1, r2, r3].every(e => e === false), true)
})
it('replaces the heads if the given entry is a new head and has a direct path to the old head', async () => {
await log1.append('hello1')
await log1.append('hello2')
const e3 = await log1.append('hello3')
await log2.join(log1)
const heads1 = await log2.heads()
deepStrictEqual(heads1, [e3])
await log2.joinEntry(e1)
await log1.append('hello4')
await log1.append('hello5')
const e6 = await log1.append('hello6')
await log2.storage.merge(log1.storage)
await log2.joinEntry(e6)
const heads2 = await log2.heads()
const all = await log2.all()
strictEqual(heads2.length, 1)
deepStrictEqual(heads2[0].hash, e6.hash)
strictEqual(all.length, 6)
deepStrictEqual(all.map(e => e.payload), ['hello1', 'hello2', 'hello3', 'hello4', 'hello5', 'hello6'])
})
it('replaces a head when given entry is a new head and there are multiple current heads', async () => {
await log1.append('hello1')
await log1.append('hello2')
const e3 = await log1.append('hello3')
await log2.append('helloA')
await log2.append('helloB')
const eC = await log2.append('helloC')
await log3.join(log1)
await log3.join(log2)
const heads1 = await log3.heads()
deepStrictEqual(heads1, [eC, e3])
await log1.append('hello4')
await log1.append('hello5')
const e6 = await log1.append('hello6')
await log1.storage.merge(log3.storage)
await log3.storage.merge(log1.storage)
await log2.storage.merge(log1.storage)
await log2.storage.merge(log3.storage)
await log3.joinEntry(e6)
const heads2 = await log3.heads()
strictEqual(heads2.length, 2)
deepStrictEqual(heads2[0].hash, e6.hash)
deepStrictEqual(heads2[1].hash, eC.hash)
})
it('replaces both heads when given entries are new heads and there are two current heads', async () => {
await log1.append('hello1')
await log1.append('hello2')
const e3 = await log1.append('hello3')
await log2.append('helloA')
await log2.append('helloB')
const eC = await log2.append('helloC')
await log3.join(log1)
await log3.join(log2)
const heads1 = await log3.heads()
deepStrictEqual(heads1, [eC, e3])
await log1.append('hello4')
await log1.append('hello5')
const e6 = await log1.append('hello6')
await log2.append('helloD')
await log2.append('helloE')
const eF = await log2.append('helloF')
await log3.storage.merge(log1.storage)
await log3.storage.merge(log2.storage)
await log3.joinEntry(e6)
await log3.joinEntry(eF)
const heads2 = await log3.heads()
strictEqual(heads2.length, 2)
deepStrictEqual(heads2[0].hash, eF.hash)
deepStrictEqual(heads2[1].hash, e6.hash)
})
it('adds the given entry to the heads when forked logs have multiple heads', async () => {
await log1.append('hello1')
await log1.append('hello2')
const e3 = await log1.append('hello3')
await log2.join(log1)
const heads1 = await log1.heads()
const heads2 = await log2.heads()
deepStrictEqual(heads1, [e3])
deepStrictEqual(heads2, [e3])
await log2.append('helloX')
const eY = await log2.append('helloY')
await log1.append('hello4')
await log1.append('hello5')
const e6 = await log1.append('hello6')
await log2.storage.merge(log1.storage)
await log2.joinEntry(e6)
const heads3 = await log2.heads()
strictEqual(heads3.length, 2)
deepStrictEqual(heads3[0].hash, e6.hash)
deepStrictEqual(heads3[1].hash, eY.hash)
})
it('replaces one head but not the other when forked logs have multiple heads', async () => {
await log1.append('hello1')
await log1.append('hello2')
const e3 = await log1.append('hello3')
await log2.join(log1)
const heads1 = await log1.heads()
const heads2 = await log2.heads()
deepStrictEqual(heads1, [e3])
deepStrictEqual(heads2, [e3])
await log2.append('helloX')
const eY = await log2.append('helloY')
await log1.append('hello4')
await log1.append('hello5')
const e6 = await log1.append('hello6')
await log2.storage.merge(log1.storage)
await log2.joinEntry(e6)
const heads3 = await log2.heads()
strictEqual(heads3.length, 2)
deepStrictEqual(heads3[0].hash, e6.hash)
deepStrictEqual(heads3[1].hash, eY.hash)
await log1.append('hello7')
const e8 = await log1.append('hello8')
await log2.storage.merge(log1.storage)
await log2.joinEntry(e8)
const heads4 = await log2.heads()
strictEqual(heads4.length, 2)
deepStrictEqual(heads4[0].hash, e8.hash)
deepStrictEqual(heads4[1].hash, eY.hash)
})
it('doesn\'t add the joined entry to the log when previously joined logs have forks and multiple heads', async () => {
const e1 = await log1.append('hello1')
const e2 = await log1.append('hello2')
const e3 = await log1.append('hello3')
await log2.join(log1)
const heads1 = await log1.heads()
const heads2 = await log2.heads()
deepStrictEqual(heads1, [e3])
deepStrictEqual(heads2, [e3])
await log2.append('helloX')
const eY = await log2.append('helloY')
const e4 = await log1.append('hello4')
const e5 = await log1.append('hello5')
const e6 = await log1.append('hello6')
await log2.storage.merge(log1.storage)
await log2.joinEntry(e6)
const res5 = await log2.joinEntry(e5)
const res4 = await log2.joinEntry(e4)
const res3 = await log2.joinEntry(e3)
const res2 = await log2.joinEntry(e2)
const res1 = await log2.joinEntry(e1)
strictEqual(res1, false)
strictEqual(res2, false)
strictEqual(res3, false)
strictEqual(res4, false)
strictEqual(res5, false)
const heads3 = await log2.heads()
strictEqual(heads3.length, 2)
deepStrictEqual(heads3[0].hash, e6.hash)
deepStrictEqual(heads3[1].hash, eY.hash)
})
it('replaces both heads when forked logs have multiple heads', async () => {
await log1.append('hello1')
await log1.append('hello2')
const e3 = await log1.append('hello3')
await log2.append('helloA')
await log2.append('helloB')
const eC = await log2.append('helloC')
await log2.storage.merge(log1.storage)
await log2.joinEntry(e3)
const heads1 = await log2.heads()
strictEqual(heads1.length, 2)
deepStrictEqual(heads1[0].hash, eC.hash)
deepStrictEqual(heads1[1].hash, e3.hash)
await log1.append('hello4')
await log1.append('hello5')
const e6 = await log1.append('hello6')
await log2.append('helloD')
await log2.append('helloE')
const eF = await log2.append('helloF')
await log2.storage.merge(log1.storage)
await log2.joinEntry(e6)
const heads2 = await log2.heads()
strictEqual(heads2.length, 2)
deepStrictEqual(heads2[0].hash, eF.hash)
deepStrictEqual(heads2[1].hash, e6.hash)
})
describe('trying to join an entry with invalid preceeding entries', () => {
it('throws an error if an entry belongs to another log', async () => {
const headsStorage1 = await MemoryStorage()
const log0 = await Log(testIdentity2, { logId: 'Y' })
log1 = await Log(testIdentity, { logId: 'X', headsStorage: headsStorage1 })
log2 = await Log(testIdentity2, { logId: 'X' })
const e0 = await log0.append('helloA')
await log1.storage.merge(log0.storage)
await headsStorage1.put(e0.hash, e0.bytes)
await log1.append('hello1')
await log1.append('hello2')
const e3 = await log1.append('hello3')
await log2.storage.merge(log1.storage)
let err
try {
await log2.joinEntry(e3)
} catch (e) {
err = e
}
notStrictEqual(err, undefined)
strictEqual(err.message, 'Entry\'s id (Y) doesn\'t match the log\'s id (X).')
deepStrictEqual(await log2.all(), [])
deepStrictEqual(await log2.heads(), [])
})
it('throws an error if an entry doesn\'t pass access controller #1', async () => {
const canAppend = (entry) => {
if (entry.payload === 'hello1') {
return false
}
return true
}
log1 = await Log(testIdentity, { logId: 'X' })
log2 = await Log(testIdentity2, { logId: 'X', access: { canAppend } })
await log1.append('hello1')
await log1.append('hello2')
const e3 = await log1.append('hello3')
await log2.storage.merge(log1.storage)
let err
try {
await log2.joinEntry(e3)
} catch (e) {
err = e
}
notStrictEqual(err, undefined)
strictEqual(err.message, 'Could not append entry:\nKey "zdpuAvqN22Rxwx5EEenq6EyeydVKPKn43MXHzauuicjLEp8jP" is not allowed to write to the log')
deepStrictEqual(await log2.all(), [])
deepStrictEqual(await log2.heads(), [])
})
it('throws an error if an entry doesn\'t pass access controller #2', async () => {
const canAppend = (entry) => {
if (entry.payload === 'hello2') {
return false
}
return true
}
log1 = await Log(testIdentity, { logId: 'X' })
log2 = await Log(testIdentity2, { logId: 'X' })
log3 = await Log(testIdentity3, { logId: 'X', access: { canAppend } })
await log1.append('hello1')
await log1.append('hello2')
const e3 = await log1.append('hello3')
await log2.append('helloA')
await log2.append('helloB')
const eC = await log2.append('helloC')
await log3.storage.merge(log1.storage)
await log3.storage.merge(log2.storage)
await log3.joinEntry(eC)
await log2.storage.merge(log1.storage)
await log2.joinEntry(e3)
await log2.append('helloD')
await log2.append('helloE')
const eF = await log2.append('helloF')
await log3.storage.merge(log1.storage)
await log3.storage.merge(log2.storage)
let err
try {
await log3.joinEntry(eF)
} catch (e) {
err = e
}
notStrictEqual(err, undefined)
strictEqual(err.message, 'Could not append entry:\nKey "zdpuAvqN22Rxwx5EEenq6EyeydVKPKn43MXHzauuicjLEp8jP" is not allowed to write to the log')
deepStrictEqual((await log3.all()).map(e => e.payload), ['helloA', 'helloB', 'helloC'])
deepStrictEqual((await log3.heads()).map(e => e.payload), ['helloC'])
})
})
describe('throws an error if verification of an entry in given entry\'s history fails', async () => {
let e1, e3
let headsStorage1, headsStorage2
before(async () => {
headsStorage1 = await MemoryStorage()
headsStorage2 = await MemoryStorage()
log1 = await Log(testIdentity, { logId: 'X', entryStorage: headsStorage1 })
log2 = await Log(testIdentity2, { logId: 'X', entryStorage: headsStorage2 })
e1 = await log1.append('hello1')
await log1.append('hello2')
e3 = await log1.append('hello3')
})
it('throws an error if an entry doesn\'t have a payload field', async () => {
const e = Object.assign({}, e1)
delete e.payload
delete e.bytes
delete e.hash
const ee = await Entry.encode(e)
await headsStorage1.put(e1.hash, ee.bytes)
await log2.storage.merge(headsStorage1)
let err
try {
await log2.joinEntry(e3)
} catch (e) {
err = e
}
notStrictEqual(err, undefined)
strictEqual(err.message, 'Invalid Log entry')
deepStrictEqual(await log2.all(), [])
deepStrictEqual(await log2.heads(), [])
})
it('throws an error if an entry doesn\'t have a key field', async () => {
const e = Object.assign({}, e1)
delete e.key
delete e.bytes
delete e.hash
const ee = await Entry.encode(e)
await headsStorage1.put(e1.hash, ee.bytes)
await log2.storage.merge(headsStorage1)
let err
try {
await log2.joinEntry(e3)
} catch (e) {
err = e
}
notStrictEqual(err, undefined)
strictEqual(err.message, 'Entry doesn\'t have a key')
deepStrictEqual(await log2.all(), [])
deepStrictEqual(await log2.heads(), [])
})
it('throws an error if an entry doesn\'t have a signature field', async () => {
const e = Object.assign({}, e1)
delete e.sig
delete e.bytes
delete e.hash
const ee = await Entry.encode(e)
await headsStorage1.put(e1.hash, ee.bytes)
await log2.storage.merge(headsStorage1)
let err
try {
await log2.joinEntry(e3)
} catch (e) {
err = e
}
notStrictEqual(err, undefined)
strictEqual(err.message, 'Entry doesn\'t have a signature')
deepStrictEqual(await log2.all(), [])
deepStrictEqual(await log2.heads(), [])
})
it('throws an error if an entry signature doesn\'t verify', async () => {
const e = Object.assign({}, e1)
e.sig = '1234567890'
delete e.bytes
delete e.hash
const ee = await Entry.encode(e)
await headsStorage1.put(e1.hash, ee.bytes)
await log2.storage.merge(headsStorage1)
let err
try {
await log2.joinEntry(e3)
} catch (e) {
err = e
}
notStrictEqual(err, undefined)
strictEqual(err.message, 'Could not validate signature for entry "zdpuAvkAJ8C46cnGdtFpcBratA5MqK7CcjqCJjjmuKuFvZir3"')
deepStrictEqual(await log2.all(), [])
deepStrictEqual(await log2.heads(), [])
})
})
})

View File

@ -7,7 +7,7 @@ import connectPeers from './utils/connect-nodes.js'
import waitFor from './utils/wait-for.js'
describe('Replicating databases', function () {
this.timeout(30000)
this.timeout(10000)
let ipfs1, ipfs2
let orbitdb1, orbitdb2
@ -62,16 +62,7 @@ describe('Replicating databases', function () {
let replicated = false
const onJoin = async (peerId, heads) => {
const head = (await db2.log.heads())[0]
if (head && head.clock.time === amount) {
replicated = true
}
}
const onUpdated = (entry) => {
if (entry.clock.time === amount) {
replicated = true
}
replicated = true
}
const onError = (err) => {
@ -81,7 +72,6 @@ describe('Replicating databases', function () {
db2 = await orbitdb2.open(db1.address)
db2.events.on('join', onJoin)
db2.events.on('update', onUpdated)
db2.events.on('error', onError)
db1.events.on('error', onError)