Merge pull request #93 from orbitdb/fix/oplogrefs

Fix oplog references collection
This commit is contained in:
Hayden Young 2023-07-31 20:04:38 +08:00 committed by GitHub
commit f43413e281
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 26 additions and 10 deletions

View File

@ -16,7 +16,7 @@ let lastTenSeconds = 0
const benchmarkDuration = 20 // seconds
const queryLoop = async () => {
await log.append(totalQueries.toString())
await log.append(totalQueries.toString(), { referencesCount: 0 })
totalQueries++
lastTenSeconds++
queriesPerSecond++

View File

@ -155,14 +155,11 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
// 4. return Entry
// Get current heads of the log
const heads_ = await heads()
// Get references (we skip the heads which are covered by the next field)
let refs = []
for await (const { hash } of iterator({ amount: options.referencesCount + heads_.length })) {
refs.push(hash)
}
refs = refs.slice(heads_.length, options.referencesCount + heads_.length)
// Create the next pointers from heads
const nexts = heads_.map(entry => entry.hash)
// Get references (pointers) to multiple entries in the past
// (skips the heads which are covered by the next field)
const refs = await getReferences(heads_, options.referencesCount + heads_.length)
// Create the entry
const entry = await Entry.create(
identity,
@ -281,7 +278,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
* @memberof module:Log~Log
* @instance
*/
const traverse = async function * (rootEntries, shouldStopFn) {
const traverse = async function * (rootEntries, shouldStopFn, useRefs = true) {
// By default, we don't stop traversal and traverse
// until the end of the log
const defaultStopFn = () => false
@ -320,7 +317,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
fetched[hash] = true
// Add the next and refs hashes to the list of hashes to fetch next,
// filter out traversed and fetched hashes
toFetch = [...toFetch, ...next, ...refs].filter(notIndexed)
toFetch = [...toFetch, ...next, ...(useRefs ? refs : [])].filter(notIndexed)
// Function to fetch an entry and making sure it's not a duplicate (check the hash indices)
const fetchEntries = (hash) => {
if (!traversed[hash] && !fetched[hash]) {
@ -334,7 +331,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
// Add the next and refs fields from the fetched entries to the next round
toFetch = nexts
.filter(e => e != null)
.reduce((res, acc) => [...res, ...acc.next, ...acc.refs], [])
.reduce((res, acc) => Array.from(new Set([...res, ...acc.next, ...(useRefs ? acc.refs : [])])), [])
.filter(notIndexed)
// Add the fetched entries to the stack to be processed
stack = [...nexts, ...stack]
@ -486,6 +483,25 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
obj.storage !== undefined
}
/**
* Get an array of references to multiple entries in the past.
* @param {Array<Entry>} heads An array of Log heads starting rom which the references are collected from.
* @param {number} amount The number of references to return.
* @return {Array<string>}
* @private
*/
const getReferences = async (heads, amount = 0) => {
let refs = []
const shouldStopTraversal = async (entry) => {
return refs.length >= amount && amount !== -1
}
for await (const { hash } of traverse(heads, shouldStopTraversal, false)) {
refs.push(hash)
}
refs = refs.slice(heads.length + 1, amount)
return refs
}
return {
id,
clock,