Process appends and joins through a queue

This commit is contained in:
haad 2024-02-21 08:57:05 +01:00
parent 734f50ed7e
commit 16b3358355

View File

@ -7,6 +7,7 @@
* ["Merkle-CRDTs: Merkle-DAGs meet CRDTs"]{@link https://arxiv.org/abs/2004.00107} * ["Merkle-CRDTs: Merkle-DAGs meet CRDTs"]{@link https://arxiv.org/abs/2004.00107}
*/ */
import LRU from 'lru' import LRU from 'lru'
import PQueue from 'p-queue'
import Entry from './entry.js' import Entry from './entry.js'
import Clock, { tickClock } from './clock.js' import Clock, { tickClock } from './clock.js'
import Heads from './heads.js' import Heads from './heads.js'
@ -81,6 +82,9 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
const _heads = await Heads({ storage: headsStorage, heads: logHeads }) const _heads = await Heads({ storage: headsStorage, heads: logHeads })
// Conflict-resolution sorting function // Conflict-resolution sorting function
sortFn = NoZeroes(sortFn || LastWriteWins) sortFn = NoZeroes(sortFn || LastWriteWins)
// Internal queues for processing appends and joins in their call-order
const appendQueue = new PQueue({ concurrency: 1 })
const joinQueue = new PQueue({ concurrency: 1 })
/** /**
* Returns the clock of the log. * Returns the clock of the log.
@ -153,40 +157,44 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
* @instance * @instance
*/ */
const append = async (data, options = { referencesCount: 0 }) => { const append = async (data, options = { referencesCount: 0 }) => {
// 1. Prepare entry const task = async () => {
// 2. Authorize entry // 1. Prepare entry
// 3. Store entry // 2. Authorize entry
// 4. return Entry // 3. Store entry
// Get current heads of the log // 4. return Entry
const heads_ = await heads() // Get current heads of the log
// Create the next pointers from heads const heads_ = await heads()
const nexts = heads_.map(entry => entry.hash) // Create the next pointers from heads
// Get references (pointers) to multiple entries in the past const nexts = heads_.map(entry => entry.hash)
// (skips the heads which are covered by the next field) // Get references (pointers) to multiple entries in the past
const refs = await getReferences(heads_, options.referencesCount + heads_.length) // (skips the heads which are covered by the next field)
// Create the entry const refs = await getReferences(heads_, options.referencesCount + heads_.length)
const entry = await Entry.create( // Create the entry
identity, const entry = await Entry.create(
id, identity,
data, id,
tickClock(await clock()), data,
nexts, tickClock(await clock()),
refs nexts,
) refs
// Authorize the entry )
const canAppend = await access.canAppend(entry) // Authorize the entry
if (!canAppend) { const canAppend = await access.canAppend(entry)
throw new Error(`Could not append entry:\nKey "${identity.hash}" is not allowed to write to the log`) if (!canAppend) {
throw new Error(`Could not append entry:\nKey "${identity.hash}" is not allowed to write to the log`)
}
// The appended entry is now the latest head
await _heads.set([entry])
// Add entry to the entry storage
await _entries.put(entry.hash, entry.bytes)
// Add entry to the entry index
await _index.put(entry.hash, true)
// Return the appended entry
return entry
} }
// The appended entry is now the latest head return appendQueue.add(task)
await _heads.set([entry])
// Add entry to the entry storage
await _entries.put(entry.hash, entry.bytes)
// Add entry to the entry index
await _index.put(entry.hash, true)
// Return the appended entry
return entry
} }
/** /**
@ -232,81 +240,85 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
* @instance * @instance
*/ */
const joinEntry = async (entry) => { const joinEntry = async (entry) => {
/* 1. Check if the entry is already in the log and return early if it is */ const task = async () => {
const isAlreadyInTheLog = await has(entry.hash) /* 1. Check if the entry is already in the log and return early if it is */
if (isAlreadyInTheLog) { const isAlreadyInTheLog = await has(entry.hash)
return false if (isAlreadyInTheLog) {
} return false
const verifyEntry = async (entry) => {
// Check that the Entry belongs to this Log
if (entry.id !== id) {
throw new Error(`Entry's id (${entry.id}) doesn't match the log's id (${id}).`)
} }
// Verify if entry is allowed to be added to the log
const canAppend = await access.canAppend(entry)
if (!canAppend) {
throw new Error(`Could not append entry:\nKey "${entry.identity}" is not allowed to write to the log`)
}
// Verify signature for the entry
const isValid = await Entry.verify(identity, entry)
if (!isValid) {
throw new Error(`Could not validate signature for entry "${entry.hash}"`)
}
}
/* 2. Verify the entry */ const verifyEntry = async (entry) => {
await verifyEntry(entry) // Check that the Entry belongs to this Log
if (entry.id !== id) {
/* 3. Find missing entries and connections (=path in the DAG) to the current heads */ throw new Error(`Entry's id (${entry.id}) doesn't match the log's id (${id}).`)
const headsHashes = (await heads()).map(e => e.hash) }
const hashesToAdd = new Set([entry.hash]) // Verify if entry is allowed to be added to the log
const hashesToGet = new Set([...entry.next, ...entry.refs]) const canAppend = await access.canAppend(entry)
const connectedHeads = new Set() if (!canAppend) {
throw new Error(`Could not append entry:\nKey "${entry.identity}" is not allowed to write to the log`)
const traverseAndVerify = async () => { }
const getEntries = Array.from(hashesToGet.values()).filter(has).map(get) // Verify signature for the entry
const entries = await Promise.all(getEntries) const isValid = await Entry.verify(identity, entry)
if (!isValid) {
for (const e of entries) { throw new Error(`Could not validate signature for entry "${entry.hash}"`)
hashesToGet.delete(e.hash)
await verifyEntry(e)
hashesToAdd.add(e.hash)
for (const hash of [...e.next, ...e.refs]) {
const isInTheLog = await has(hash)
if (!isInTheLog && !hashesToAdd.has(hash)) {
hashesToGet.add(hash)
} else if (headsHashes.includes(hash)) {
connectedHeads.add(hash)
}
} }
} }
if (hashesToGet.size > 0) { /* 2. Verify the entry */
await traverseAndVerify() await verifyEntry(entry)
/* 3. Find missing entries and connections (=path in the DAG) to the current heads */
const headsHashes = (await heads()).map(e => e.hash)
const hashesToAdd = new Set([entry.hash])
const hashesToGet = new Set([...entry.next, ...entry.refs])
const connectedHeads = new Set()
const traverseAndVerify = async () => {
const getEntries = Array.from(hashesToGet.values()).filter(has).map(get)
const entries = await Promise.all(getEntries)
for (const e of entries) {
hashesToGet.delete(e.hash)
await verifyEntry(e)
hashesToAdd.add(e.hash)
for (const hash of [...e.next, ...e.refs]) {
const isInTheLog = await has(hash)
if (!isInTheLog && !hashesToAdd.has(hash)) {
hashesToGet.add(hash)
} else if (headsHashes.includes(hash)) {
connectedHeads.add(hash)
}
}
}
if (hashesToGet.size > 0) {
await traverseAndVerify()
}
} }
await traverseAndVerify()
/* 4. Add missing entries to the index (=to the log) */
for (const hash of hashesToAdd.values()) {
await _index.put(hash, true)
}
/* 5. Remove heads which new entries are connect to */
for (const hash of connectedHeads.values()) {
await _heads.remove(hash)
}
/* 6. Add the new entry to heads (=union with current heads) */
await _heads.add(entry)
return true
} }
await traverseAndVerify() return joinQueue.add(task)
/* 4. Add missing entries to the index (=to the log) */
for (const hash of hashesToAdd.values()) {
await _index.put(hash, true)
}
/* 5. Remove heads which new entries are connect to */
for (const hash of connectedHeads.values()) {
await _heads.remove(hash)
}
/* 6. Add the new entry to heads (=union with current heads) */
await _heads.add(entry)
return true
} }
/** /**