From 16b335835504c65b0405f88927fcdd10994c06db Mon Sep 17 00:00:00 2001
From: haad
Date: Wed, 21 Feb 2024 08:57:05 +0100
Subject: [PATCH] Process appends and joins through a queue

---
 src/oplog/log.js | 210 +++++++++++++++++++++++++----------------------
 1 file changed, 111 insertions(+), 99 deletions(-)

diff --git a/src/oplog/log.js b/src/oplog/log.js
index 7197491..5b3750d 100644
--- a/src/oplog/log.js
+++ b/src/oplog/log.js
@@ -7,6 +7,7 @@
  * ["Merkle-CRDTs: Merkle-DAGs meet CRDTs"]{@link https://arxiv.org/abs/2004.00107}
  */
 import LRU from 'lru'
+import PQueue from 'p-queue'
 import Entry from './entry.js'
 import Clock, { tickClock } from './clock.js'
 import Heads from './heads.js'
@@ -81,6 +82,9 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
   const _heads = await Heads({ storage: headsStorage, heads: logHeads })
   // Conflict-resolution sorting function
   sortFn = NoZeroes(sortFn || LastWriteWins)
+  // Internal queues for processing appends and joins in their call order
+  const appendQueue = new PQueue({ concurrency: 1 })
+  const joinQueue = new PQueue({ concurrency: 1 })
 
   /**
    * Returns the clock of the log.
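The two queues created above are what the rest of the patch builds on: a p-queue instance with concurrency: 1 runs the functions passed to its add() method one at a time, in the order add() was called, and add() resolves with the task's own return value. A minimal sketch of that behaviour, using only p-queue's documented API (the 50 ms delay is an arbitrary illustration):

    import PQueue from 'p-queue'

    const queue = new PQueue({ concurrency: 1 })
    const results = []

    // Both tasks are enqueued immediately, but with concurrency: 1 the
    // second task does not start until the first one has resolved.
    await Promise.all([
      queue.add(async () => {
        await new Promise(resolve => setTimeout(resolve, 50))
        results.push('first')
      }),
      queue.add(async () => { results.push('second') })
    ])

    console.log(results) // ['first', 'second']: call order, not completion speed
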
@@ -153,40 +157,44 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
    * @instance
    */
   const append = async (data, options = { referencesCount: 0 }) => {
-    // 1. Prepare entry
-    // 2. Authorize entry
-    // 3. Store entry
-    // 4. return Entry
-    // Get current heads of the log
-    const heads_ = await heads()
-    // Create the next pointers from heads
-    const nexts = heads_.map(entry => entry.hash)
-    // Get references (pointers) to multiple entries in the past
-    // (skips the heads which are covered by the next field)
-    const refs = await getReferences(heads_, options.referencesCount + heads_.length)
-    // Create the entry
-    const entry = await Entry.create(
-      identity,
-      id,
-      data,
-      tickClock(await clock()),
-      nexts,
-      refs
-    )
-    // Authorize the entry
-    const canAppend = await access.canAppend(entry)
-    if (!canAppend) {
-      throw new Error(`Could not append entry:\nKey "${identity.hash}" is not allowed to write to the log`)
-    }
-
-    // The appended entry is now the latest head
-    await _heads.set([entry])
-    // Add entry to the entry storage
-    await _entries.put(entry.hash, entry.bytes)
-    // Add entry to the entry index
-    await _index.put(entry.hash, true)
-    // Return the appended entry
-    return entry
+    const task = async () => {
+      // 1. Prepare entry
+      // 2. Authorize entry
+      // 3. Store entry
+      // 4. return Entry
+      // Get current heads of the log
+      const heads_ = await heads()
+      // Create the next pointers from heads
+      const nexts = heads_.map(entry => entry.hash)
+      // Get references (pointers) to multiple entries in the past
+      // (skips the heads which are covered by the next field)
+      const refs = await getReferences(heads_, options.referencesCount + heads_.length)
+      // Create the entry
+      const entry = await Entry.create(
+        identity,
+        id,
+        data,
+        tickClock(await clock()),
+        nexts,
+        refs
+      )
+      // Authorize the entry
+      const canAppend = await access.canAppend(entry)
+      if (!canAppend) {
+        throw new Error(`Could not append entry:\nKey "${identity.hash}" is not allowed to write to the log`)
+      }
+
+      // The appended entry is now the latest head
+      await _heads.set([entry])
+      // Add entry to the entry storage
+      await _entries.put(entry.hash, entry.bytes)
+      // Add entry to the entry index
+      await _index.put(entry.hash, true)
+      // Return the appended entry
+      return entry
+    }
+
+    return appendQueue.add(task)
   }
 
   /**
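Routing append through appendQueue closes a read-modify-write race: each task reads the current heads, creates an entry whose next field points at them, and then replaces the heads, so two appends running concurrently could both read the same heads and produce two conflicting head entries. A small self-contained sketch of the race the queue prevents; makeLog here is an illustration only, not part of OrbitDB's API:

    import PQueue from 'p-queue'

    const makeLog = () => {
      let head = null
      const queue = new PQueue({ concurrency: 1 })
      const append = (value) =>
        queue.add(async () => {
          // Read the current head, then write the new one. Without the
          // queue, two concurrent appends could both observe the same head.
          const entry = { value, next: head ? [head.value] : [] }
          await new Promise(resolve => setTimeout(resolve, 0)) // simulated storage write
          head = entry
          return entry
        })
      return { append }
    }

    const log = makeLog()
    const [a, b] = await Promise.all([log.append('a'), log.append('b')])
    console.log(a.next, b.next) // [] and ['a']: the second append saw the first
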
@@ -232,81 +240,85 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
    * @instance
    */
   const joinEntry = async (entry) => {
-    /* 1. Check if the entry is already in the log and return early if it is */
-    const isAlreadyInTheLog = await has(entry.hash)
-    if (isAlreadyInTheLog) {
-      return false
-    }
-
-    const verifyEntry = async (entry) => {
-      // Check that the Entry belongs to this Log
-      if (entry.id !== id) {
-        throw new Error(`Entry's id (${entry.id}) doesn't match the log's id (${id}).`)
-      }
-      // Verify if entry is allowed to be added to the log
-      const canAppend = await access.canAppend(entry)
-      if (!canAppend) {
-        throw new Error(`Could not append entry:\nKey "${entry.identity}" is not allowed to write to the log`)
-      }
-      // Verify signature for the entry
-      const isValid = await Entry.verify(identity, entry)
-      if (!isValid) {
-        throw new Error(`Could not validate signature for entry "${entry.hash}"`)
-      }
-    }
-
-    /* 2. Verify the entry */
-    await verifyEntry(entry)
-
-    /* 3. Find missing entries and connections (=path in the DAG) to the current heads */
-    const headsHashes = (await heads()).map(e => e.hash)
-    const hashesToAdd = new Set([entry.hash])
-    const hashesToGet = new Set([...entry.next, ...entry.refs])
-    const connectedHeads = new Set()
-
-    const traverseAndVerify = async () => {
-      const getEntries = Array.from(hashesToGet.values()).filter(has).map(get)
-      const entries = await Promise.all(getEntries)
-
-      for (const e of entries) {
-        hashesToGet.delete(e.hash)
-
-        await verifyEntry(e)
-
-        hashesToAdd.add(e.hash)
-
-        for (const hash of [...e.next, ...e.refs]) {
-          const isInTheLog = await has(hash)
-
-          if (!isInTheLog && !hashesToAdd.has(hash)) {
-            hashesToGet.add(hash)
-          } else if (headsHashes.includes(hash)) {
-            connectedHeads.add(hash)
-          }
-        }
-      }
-
-      if (hashesToGet.size > 0) {
-        await traverseAndVerify()
-      }
-    }
-
-    await traverseAndVerify()
-
-    /* 4. Add missing entries to the index (=to the log) */
-    for (const hash of hashesToAdd.values()) {
-      await _index.put(hash, true)
-    }
-
-    /* 5. Remove heads which new entries are connect to */
-    for (const hash of connectedHeads.values()) {
-      await _heads.remove(hash)
-    }
-
-    /* 6. Add the new entry to heads (=union with current heads) */
-    await _heads.add(entry)
-
-    return true
+    const task = async () => {
+      /* 1. Check if the entry is already in the log and return early if it is */
+      const isAlreadyInTheLog = await has(entry.hash)
+      if (isAlreadyInTheLog) {
+        return false
+      }
+
+      const verifyEntry = async (entry) => {
+        // Check that the Entry belongs to this Log
+        if (entry.id !== id) {
+          throw new Error(`Entry's id (${entry.id}) doesn't match the log's id (${id}).`)
+        }
+        // Verify if entry is allowed to be added to the log
+        const canAppend = await access.canAppend(entry)
+        if (!canAppend) {
+          throw new Error(`Could not append entry:\nKey "${entry.identity}" is not allowed to write to the log`)
+        }
+        // Verify signature for the entry
+        const isValid = await Entry.verify(identity, entry)
+        if (!isValid) {
+          throw new Error(`Could not validate signature for entry "${entry.hash}"`)
+        }
+      }
+
+      /* 2. Verify the entry */
+      await verifyEntry(entry)
+
+      /* 3. Find missing entries and connections (=path in the DAG) to the current heads */
+      const headsHashes = (await heads()).map(e => e.hash)
+      const hashesToAdd = new Set([entry.hash])
+      const hashesToGet = new Set([...entry.next, ...entry.refs])
+      const connectedHeads = new Set()
+
+      const traverseAndVerify = async () => {
+        const getEntries = Array.from(hashesToGet.values()).filter(has).map(get)
+        const entries = await Promise.all(getEntries)
+
+        for (const e of entries) {
+          hashesToGet.delete(e.hash)
+
+          await verifyEntry(e)
+
+          hashesToAdd.add(e.hash)
+
+          for (const hash of [...e.next, ...e.refs]) {
+            const isInTheLog = await has(hash)
+
+            if (!isInTheLog && !hashesToAdd.has(hash)) {
+              hashesToGet.add(hash)
+            } else if (headsHashes.includes(hash)) {
+              connectedHeads.add(hash)
+            }
+          }
+        }
+
+        if (hashesToGet.size > 0) {
+          await traverseAndVerify()
+        }
+      }
+
+      await traverseAndVerify()
+
+      /* 4. Add missing entries to the index (=to the log) */
+      for (const hash of hashesToAdd.values()) {
+        await _index.put(hash, true)
+      }
+
+      /* 5. Remove heads which new entries are connected to */
+      for (const hash of connectedHeads.values()) {
+        await _heads.remove(hash)
+      }
+
+      /* 6. Add the new entry to heads (=union with current heads) */
+      await _heads.add(entry)
+
+      return true
+    }
+
+    return joinQueue.add(task)
   }
 
   /**
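Because joinEntry now returns joinQueue.add(task), the queue stays invisible to callers: queue.add resolves with the task's return value (true, or false when the entry is already in the log) and rejects if the task throws, so error handling at existing call sites is unchanged. A sketch of the caller's view, where log and entry stand in for a Log instance and a received entry:

    try {
      const joined = await log.joinEntry(entry)
      console.log(joined ? 'entry joined' : 'entry was already in the log')
    } catch (err) {
      // Verification failures thrown inside the queued task reject
      // the same promise the caller is awaiting.
      console.error('join failed:', err.message)
    }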