mirror of
https://github.com/orbitdb/orbitdb.git
synced 2025-03-30 15:08:28 +00:00
Merge pull request #1161 from orbitdb/fix/log-concurrency
Process appends and joins through a queue
This commit is contained in:
commit
16686c6a82
210
src/oplog/log.js
210
src/oplog/log.js
@ -7,6 +7,7 @@
|
||||
* ["Merkle-CRDTs: Merkle-DAGs meet CRDTs"]{@link https://arxiv.org/abs/2004.00107}
|
||||
*/
|
||||
import LRU from 'lru'
|
||||
import PQueue from 'p-queue'
|
||||
import Entry from './entry.js'
|
||||
import Clock, { tickClock } from './clock.js'
|
||||
import Heads from './heads.js'
|
||||
@ -81,6 +82,9 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
|
||||
const _heads = await Heads({ storage: headsStorage, heads: logHeads })
|
||||
// Conflict-resolution sorting function
|
||||
sortFn = NoZeroes(sortFn || LastWriteWins)
|
||||
// Internal queues for processing appends and joins in their call-order
|
||||
const appendQueue = new PQueue({ concurrency: 1 })
|
||||
const joinQueue = new PQueue({ concurrency: 1 })
|
||||
|
||||
/**
|
||||
* Returns the clock of the log.
|
||||
@ -153,40 +157,44 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
|
||||
* @instance
|
||||
*/
|
||||
const append = async (data, options = { referencesCount: 0 }) => {
|
||||
// 1. Prepare entry
|
||||
// 2. Authorize entry
|
||||
// 3. Store entry
|
||||
// 4. return Entry
|
||||
// Get current heads of the log
|
||||
const heads_ = await heads()
|
||||
// Create the next pointers from heads
|
||||
const nexts = heads_.map(entry => entry.hash)
|
||||
// Get references (pointers) to multiple entries in the past
|
||||
// (skips the heads which are covered by the next field)
|
||||
const refs = await getReferences(heads_, options.referencesCount + heads_.length)
|
||||
// Create the entry
|
||||
const entry = await Entry.create(
|
||||
identity,
|
||||
id,
|
||||
data,
|
||||
tickClock(await clock()),
|
||||
nexts,
|
||||
refs
|
||||
)
|
||||
// Authorize the entry
|
||||
const canAppend = await access.canAppend(entry)
|
||||
if (!canAppend) {
|
||||
throw new Error(`Could not append entry:\nKey "${identity.hash}" is not allowed to write to the log`)
|
||||
const task = async () => {
|
||||
// 1. Prepare entry
|
||||
// 2. Authorize entry
|
||||
// 3. Store entry
|
||||
// 4. return Entry
|
||||
// Get current heads of the log
|
||||
const heads_ = await heads()
|
||||
// Create the next pointers from heads
|
||||
const nexts = heads_.map(entry => entry.hash)
|
||||
// Get references (pointers) to multiple entries in the past
|
||||
// (skips the heads which are covered by the next field)
|
||||
const refs = await getReferences(heads_, options.referencesCount + heads_.length)
|
||||
// Create the entry
|
||||
const entry = await Entry.create(
|
||||
identity,
|
||||
id,
|
||||
data,
|
||||
tickClock(await clock()),
|
||||
nexts,
|
||||
refs
|
||||
)
|
||||
// Authorize the entry
|
||||
const canAppend = await access.canAppend(entry)
|
||||
if (!canAppend) {
|
||||
throw new Error(`Could not append entry:\nKey "${identity.hash}" is not allowed to write to the log`)
|
||||
}
|
||||
|
||||
// The appended entry is now the latest head
|
||||
await _heads.set([entry])
|
||||
// Add entry to the entry storage
|
||||
await _entries.put(entry.hash, entry.bytes)
|
||||
// Add entry to the entry index
|
||||
await _index.put(entry.hash, true)
|
||||
// Return the appended entry
|
||||
return entry
|
||||
}
|
||||
|
||||
// The appended entry is now the latest head
|
||||
await _heads.set([entry])
|
||||
// Add entry to the entry storage
|
||||
await _entries.put(entry.hash, entry.bytes)
|
||||
// Add entry to the entry index
|
||||
await _index.put(entry.hash, true)
|
||||
// Return the appended entry
|
||||
return entry
|
||||
return appendQueue.add(task)
|
||||
}
|
||||
|
||||
/**
|
||||
@ -232,81 +240,85 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
|
||||
* @instance
|
||||
*/
|
||||
const joinEntry = async (entry) => {
|
||||
/* 1. Check if the entry is already in the log and return early if it is */
|
||||
const isAlreadyInTheLog = await has(entry.hash)
|
||||
if (isAlreadyInTheLog) {
|
||||
return false
|
||||
}
|
||||
|
||||
const verifyEntry = async (entry) => {
|
||||
// Check that the Entry belongs to this Log
|
||||
if (entry.id !== id) {
|
||||
throw new Error(`Entry's id (${entry.id}) doesn't match the log's id (${id}).`)
|
||||
const task = async () => {
|
||||
/* 1. Check if the entry is already in the log and return early if it is */
|
||||
const isAlreadyInTheLog = await has(entry.hash)
|
||||
if (isAlreadyInTheLog) {
|
||||
return false
|
||||
}
|
||||
// Verify if entry is allowed to be added to the log
|
||||
const canAppend = await access.canAppend(entry)
|
||||
if (!canAppend) {
|
||||
throw new Error(`Could not append entry:\nKey "${entry.identity}" is not allowed to write to the log`)
|
||||
}
|
||||
// Verify signature for the entry
|
||||
const isValid = await Entry.verify(identity, entry)
|
||||
if (!isValid) {
|
||||
throw new Error(`Could not validate signature for entry "${entry.hash}"`)
|
||||
}
|
||||
}
|
||||
|
||||
/* 2. Verify the entry */
|
||||
await verifyEntry(entry)
|
||||
|
||||
/* 3. Find missing entries and connections (=path in the DAG) to the current heads */
|
||||
const headsHashes = (await heads()).map(e => e.hash)
|
||||
const hashesToAdd = new Set([entry.hash])
|
||||
const hashesToGet = new Set([...entry.next, ...entry.refs])
|
||||
const connectedHeads = new Set()
|
||||
|
||||
const traverseAndVerify = async () => {
|
||||
const getEntries = Array.from(hashesToGet.values()).filter(has).map(get)
|
||||
const entries = await Promise.all(getEntries)
|
||||
|
||||
for (const e of entries) {
|
||||
hashesToGet.delete(e.hash)
|
||||
|
||||
await verifyEntry(e)
|
||||
|
||||
hashesToAdd.add(e.hash)
|
||||
|
||||
for (const hash of [...e.next, ...e.refs]) {
|
||||
const isInTheLog = await has(hash)
|
||||
|
||||
if (!isInTheLog && !hashesToAdd.has(hash)) {
|
||||
hashesToGet.add(hash)
|
||||
} else if (headsHashes.includes(hash)) {
|
||||
connectedHeads.add(hash)
|
||||
}
|
||||
const verifyEntry = async (entry) => {
|
||||
// Check that the Entry belongs to this Log
|
||||
if (entry.id !== id) {
|
||||
throw new Error(`Entry's id (${entry.id}) doesn't match the log's id (${id}).`)
|
||||
}
|
||||
// Verify if entry is allowed to be added to the log
|
||||
const canAppend = await access.canAppend(entry)
|
||||
if (!canAppend) {
|
||||
throw new Error(`Could not append entry:\nKey "${entry.identity}" is not allowed to write to the log`)
|
||||
}
|
||||
// Verify signature for the entry
|
||||
const isValid = await Entry.verify(identity, entry)
|
||||
if (!isValid) {
|
||||
throw new Error(`Could not validate signature for entry "${entry.hash}"`)
|
||||
}
|
||||
}
|
||||
|
||||
if (hashesToGet.size > 0) {
|
||||
await traverseAndVerify()
|
||||
/* 2. Verify the entry */
|
||||
await verifyEntry(entry)
|
||||
|
||||
/* 3. Find missing entries and connections (=path in the DAG) to the current heads */
|
||||
const headsHashes = (await heads()).map(e => e.hash)
|
||||
const hashesToAdd = new Set([entry.hash])
|
||||
const hashesToGet = new Set([...entry.next, ...entry.refs])
|
||||
const connectedHeads = new Set()
|
||||
|
||||
const traverseAndVerify = async () => {
|
||||
const getEntries = Array.from(hashesToGet.values()).filter(has).map(get)
|
||||
const entries = await Promise.all(getEntries)
|
||||
|
||||
for (const e of entries) {
|
||||
hashesToGet.delete(e.hash)
|
||||
|
||||
await verifyEntry(e)
|
||||
|
||||
hashesToAdd.add(e.hash)
|
||||
|
||||
for (const hash of [...e.next, ...e.refs]) {
|
||||
const isInTheLog = await has(hash)
|
||||
|
||||
if (!isInTheLog && !hashesToAdd.has(hash)) {
|
||||
hashesToGet.add(hash)
|
||||
} else if (headsHashes.includes(hash)) {
|
||||
connectedHeads.add(hash)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (hashesToGet.size > 0) {
|
||||
await traverseAndVerify()
|
||||
}
|
||||
}
|
||||
|
||||
await traverseAndVerify()
|
||||
|
||||
/* 4. Add missing entries to the index (=to the log) */
|
||||
for (const hash of hashesToAdd.values()) {
|
||||
await _index.put(hash, true)
|
||||
}
|
||||
|
||||
/* 5. Remove heads which new entries are connect to */
|
||||
for (const hash of connectedHeads.values()) {
|
||||
await _heads.remove(hash)
|
||||
}
|
||||
|
||||
/* 6. Add the new entry to heads (=union with current heads) */
|
||||
await _heads.add(entry)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
await traverseAndVerify()
|
||||
|
||||
/* 4. Add missing entries to the index (=to the log) */
|
||||
for (const hash of hashesToAdd.values()) {
|
||||
await _index.put(hash, true)
|
||||
}
|
||||
|
||||
/* 5. Remove heads which new entries are connect to */
|
||||
for (const hash of connectedHeads.values()) {
|
||||
await _heads.remove(hash)
|
||||
}
|
||||
|
||||
/* 6. Add the new entry to heads (=union with current heads) */
|
||||
await _heads.add(entry)
|
||||
|
||||
return true
|
||||
return joinQueue.add(task)
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
x
Reference in New Issue
Block a user