Move oplog storages to their own module

This commit is contained in:
haad 2024-06-30 07:32:32 +03:00
parent f30789fece
commit 383420750e
8 changed files with 171 additions and 80 deletions

View File

@ -45,8 +45,10 @@ EventEmitter.defaultMaxListeners = 10000
let connected = false let connected = false
const onJoin = async (peerId) => (connected = true) const onJoin = async (peerId) => (connected = true)
const onError = async (err) => console.error(err)
db2.events.on('join', onJoin) db2.events.on('join', onJoin)
db2.events.on('error', onError)
await waitFor(() => connected, () => true) await waitFor(() => connected, () => true)

View File

@ -86,7 +86,7 @@ const create = async (identity, id, payload, encryptPayloadFn, clock = null, nex
entry.identity = identity.hash entry.identity = identity.hash
entry.sig = signature entry.sig = signature
entry.payload = payload entry.payload = payload
entry.encryptedPayload = encryptedPayload entry._payload = encryptedPayload
return entry return entry
} }
@ -100,7 +100,7 @@ const create = async (identity, id, payload, encryptPayloadFn, clock = null, nex
* @memberof module:Log~Entry * @memberof module:Log~Entry
* @private * @private
*/ */
const verify = async (identities, entry, encryptPayloadFn) => { const verify = async (identities, entry) => {
if (!identities) throw new Error('Identities is required, cannot verify entry') if (!identities) throw new Error('Identities is required, cannot verify entry')
if (!isEntry(entry)) throw new Error('Invalid Log entry') if (!isEntry(entry)) throw new Error('Invalid Log entry')
if (!entry.key) throw new Error("Entry doesn't have a key") if (!entry.key) throw new Error("Entry doesn't have a key")
@ -110,7 +110,7 @@ const verify = async (identities, entry, encryptPayloadFn) => {
const value = { const value = {
id: e.id, id: e.id,
payload: e.encryptedPayload || e.payload, payload: e._payload || e.payload,
next: e.next, next: e.next,
refs: e.refs, refs: e.refs,
clock: e.clock, clock: e.clock,
@ -147,7 +147,7 @@ const isEntry = (obj) => {
* @private * @private
*/ */
const isEqual = (a, b) => { const isEqual = (a, b) => {
return a && b && a.hash === b.hash return a && b && a.hash && a.hash === b.hash
} }
/** /**
@ -157,32 +157,36 @@ const isEqual = (a, b) => {
* @memberof module:Log~Entry * @memberof module:Log~Entry
* @private * @private
*/ */
const decode = async (bytes, decryptPayloadFn, decryptEntryFn) => { const decode = async (bytes, decryptEntryFn, decryptPayloadFn) => {
let cid let cid
if (decryptEntryFn) { if (decryptEntryFn) {
const encryptedEntry = await Block.decode({ bytes, codec, hasher }) try {
bytes = await decryptEntryFn(encryptedEntry.value) const encryptedEntry = await Block.decode({ bytes, codec, hasher })
cid = encryptedEntry.cid bytes = await decryptEntryFn(encryptedEntry.value)
cid = encryptedEntry.cid
} catch (e) {
throw new Error('Could not decrypt entry')
}
} }
const decodedEntry = await Block.decode({ bytes, codec, hasher }) const decodedEntry = await Block.decode({ bytes, codec, hasher })
const entry = decodedEntry.value const entry = decodedEntry.value
cid = cid || decodedEntry.cid
const hash = cid.toString(hashStringEncoding)
if (decryptPayloadFn) { if (decryptPayloadFn) {
try { try {
const decryptedPayloadBytes = await decryptPayloadFn(entry.payload) const decryptedPayloadBytes = await decryptPayloadFn(entry.payload)
const { value: decryptedPayload } = await Block.decode({ bytes: decryptedPayloadBytes, codec, hasher }) const { value: decryptedPayload } = await Block.decode({ bytes: decryptedPayloadBytes, codec, hasher })
entry.encryptedPayload = entry.payload entry._payload = entry.payload
entry.payload = decryptedPayload entry.payload = decryptedPayload
} catch (e) { } catch (e) {
throw new Error('Could not decrypt entry') throw new Error('Could not decrypt payload')
} }
} }
cid = cid || decodedEntry.cid
const hash = cid.toString(hashStringEncoding)
return { return {
...entry, ...entry,
hash hash
@ -200,10 +204,10 @@ const encode = async (entry, encryptEntryFn, encryptPayloadFn) => {
const e = Object.assign({}, entry) const e = Object.assign({}, entry)
if (encryptPayloadFn) { if (encryptPayloadFn) {
e.payload = e.encryptedPayload e.payload = e._payload
} }
delete e.encryptedPayload delete e._payload
delete e.hash delete e.hash
let { cid, bytes } = await Block.encode({ value: e, codec, hasher }) let { cid, bytes } = await Block.encode({ value: e, codec, hasher })

View File

@ -15,11 +15,13 @@ const Heads = async ({ storage, heads, decryptPayloadFn, decryptEntryFn }) => {
const put = async (heads) => { const put = async (heads) => {
heads = findHeads(heads) heads = findHeads(heads)
for (const head of heads) { for (const head of heads) {
await storage.put(head.hash, head.bytes) // Store the entry's hash and nexts
await storage.put(head.hash, head.next)
} }
} }
const set = async (heads) => { const set = async (heads) => {
// TODO: fix storage write fluctuation
await storage.clear() await storage.clear()
await put(heads) await put(heads)
} }
@ -42,9 +44,8 @@ const Heads = async ({ storage, heads, decryptPayloadFn, decryptEntryFn }) => {
const iterator = async function * () { const iterator = async function * () {
const it = storage.iterator() const it = storage.iterator()
for await (const [, bytes] of it) { for await (const [hash, next] of it) {
const head = await Entry.decode(bytes, decryptPayloadFn, decryptEntryFn) yield { hash, next }
yield head
} }
} }

View File

@ -10,18 +10,14 @@ import LRU from 'lru'
import PQueue from 'p-queue' import PQueue from 'p-queue'
import Entry from './entry.js' import Entry from './entry.js'
import Clock, { tickClock } from './clock.js' import Clock, { tickClock } from './clock.js'
import Heads from './heads.js'
import ConflictResolution from './conflict-resolution.js' import ConflictResolution from './conflict-resolution.js'
import MemoryStorage from '../storage/memory.js' import OplogIndex from './oplog-index.js'
const { LastWriteWins, NoZeroes } = ConflictResolution const { LastWriteWins, NoZeroes } = ConflictResolution
const randomId = () => new Date().getTime().toString() const randomId = () => new Date().getTime().toString()
const maxClockTimeReducer = (res, acc) => Math.max(res, acc.clock.time) const maxClockTimeReducer = (res, acc) => Math.max(res, acc.clock.time)
// Default storage for storing the Log and its entries. Default: Memory. Options: Memory, LRU, IPFS.
const DefaultStorage = MemoryStorage
// Default AccessController for the Log. // Default AccessController for the Log.
// Default policy is that anyone can write to the Log. // Default policy is that anyone can write to the Log.
// Signature of an entry will always be verified regardless of AccessController policy. // Signature of an entry will always be verified regardless of AccessController policy.
@ -68,21 +64,20 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
if (logHeads != null && !Array.isArray(logHeads)) { if (logHeads != null && !Array.isArray(logHeads)) {
throw new Error('\'logHeads\' argument must be an array') throw new Error('\'logHeads\' argument must be an array')
} }
// Set Log's id // Set Log's id
const id = logId || randomId() const id = logId || randomId()
// Encryption of entries and payloads // Encryption of entries and payloads
encryption = encryption || {} encryption = encryption || {}
const { encryptPayloadFn, decryptPayloadFn, encryptEntryFn, decryptEntryFn } = encryption const { encryptPayloadFn } = encryption
// Access Controller // Access Controller
access = access || await DefaultAccessController() access = access || await DefaultAccessController()
// Oplog entry storage
const _entries = entryStorage || await DefaultStorage() // Index and storage of entries for this Log
// Entry index for keeping track which entries are already in the log const index = await OplogIndex({ logHeads, entryStorage, indexStorage, headsStorage, encryption })
const _index = indexStorage || await DefaultStorage()
// Heads storage
headsStorage = headsStorage || await DefaultStorage()
// Add heads to the state storage, ie. init the log state
const _heads = await Heads({ storage: headsStorage, heads: logHeads, decryptPayloadFn, decryptEntryFn })
// Conflict-resolution sorting function // Conflict-resolution sorting function
sortFn = NoZeroes(sortFn || LastWriteWins) sortFn = NoZeroes(sortFn || LastWriteWins)
@ -110,8 +105,8 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
* @instance * @instance
*/ */
const heads = async () => { const heads = async () => {
const res = await _heads.all() const heads_ = await index.heads()
return res.sort(sortFn).reverse() return heads_.sort(sortFn).reverse()
} }
/** /**
@ -141,16 +136,11 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
if (!hash) { if (!hash) {
throw new Error('hash is required') throw new Error('hash is required')
} }
const bytes = await _entries.get(hash) return index.get(hash)
if (bytes) {
const entry = await Entry.decode(bytes, decryptPayloadFn, decryptEntryFn)
return entry
}
} }
const has = async (hash) => { const has = async (hash) => {
const entry = await _index.get(hash) return index.has(hash)
return entry != null
} }
/** /**
@ -169,6 +159,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
// 2. Authorize entry // 2. Authorize entry
// 3. Store entry // 3. Store entry
// 4. return Entry // 4. return Entry
// Get current heads of the log // Get current heads of the log
const heads_ = await heads() const heads_ = await heads()
// Create the next pointers from heads // Create the next pointers from heads
@ -187,19 +178,16 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
nexts, nexts,
refs refs
) )
// Authorize the entry // Authorize the entry
const canAppend = await access.canAppend(entry) const canAppend = await access.canAppend(entry)
if (!canAppend) { if (!canAppend) {
throw new Error(`Could not append entry:\nKey "${identity.hash}" is not allowed to write to the log`) throw new Error(`Could not append entry:\nKey "${identity.hash}" is not allowed to write to the log`)
} }
const { hash, bytes } = await Entry.encode(entry, encryptEntryFn, encryptPayloadFn) // Add the entry to the index (=store and index it)
// The appended entry is now the latest head const hash = await index.setHead(entry)
await _heads.set([{ hash, bytes, next: entry.next }])
// Add entry to the entry storage
await _entries.put(hash, bytes)
// Add entry to the entry index
await _index.put(hash, true)
// Return the appended entry // Return the appended entry
return { ...entry, hash } return { ...entry, hash }
} }
@ -228,9 +216,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
if (!isLog(log)) { if (!isLog(log)) {
throw new Error('Given argument is not an instance of Log') throw new Error('Given argument is not an instance of Log')
} }
if (_entries.merge) { await index.storage.merge(log.storage)
await _entries.merge(log.storage)
}
const heads = await log.heads() const heads = await log.heads()
for (const entry of heads) { for (const entry of heads) {
await joinEntry(entry) await joinEntry(entry)
@ -268,7 +254,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
throw new Error(`Could not append entry:\nKey "${entry.identity}" is not allowed to write to the log`) throw new Error(`Could not append entry:\nKey "${entry.identity}" is not allowed to write to the log`)
} }
// Verify signature for the entry // Verify signature for the entry
const isValid = await Entry.verify(identity, entry, encryptPayloadFn) const isValid = await Entry.verify(identity, entry)
if (!isValid) { if (!isValid) {
throw new Error(`Could not validate signature for entry "${entry.hash}"`) throw new Error(`Could not validate signature for entry "${entry.hash}"`)
} }
@ -313,19 +299,11 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
await traverseAndVerify() await traverseAndVerify()
/* 4. Add missing entries to the index (=to the log) */ /* 4. Add missing entries to the index (=to the log) */
for (const hash of hashesToAdd.values()) { await index.addVerified(hashesToAdd.values())
await _index.put(hash, true)
}
/* 5. Remove heads which new entries are connect to */ /* 5. Remove heads which new entries are connect to */
for (const hash of connectedHeads.values()) { await index.removeHeads(connectedHeads.values())
await _heads.remove(hash)
}
/* 6. Add the new entry to heads (=union with current heads) */ /* 6. Add the new entry to heads (=union with current heads) */
const { hash, next } = entry await index.addHead(entry)
const bytes = await _entries.get(hash)
await _heads.add({ hash, bytes, next })
// await _heads.add(entry)
return true return true
} }
@ -510,9 +488,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
* @instance * @instance
*/ */
const clear = async () => { const clear = async () => {
await _index.clear() await index.clear()
await _heads.clear()
await _entries.clear()
} }
/** /**
@ -521,9 +497,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
* @instance * @instance
*/ */
const close = async () => { const close = async () => {
await _index.close() await index.close()
await _heads.close()
await _entries.close()
} }
/** /**
@ -579,7 +553,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
close, close,
access, access,
identity, identity,
storage: _entries, storage: index.storage,
encryption encryption
} }
} }

106
src/oplog/oplog-index.js Normal file
View File

@ -0,0 +1,106 @@
import Entry from './entry.js'
import Heads from './heads.js'
import MemoryStorage from '../storage/memory.js'
// Default storage for storing the Log and its entries. Default: Memory. Options: Memory, LRU, IPFS.
const DefaultStorage = MemoryStorage
const OplogIndex = async ({ logHeads, entryStorage, headsStorage, indexStorage, encryption }) => {
encryption = encryption || {}
const { encryptPayloadFn, decryptPayloadFn, encryptEntryFn, decryptEntryFn } = encryption
// Oplog entry storage
const _entries = entryStorage || await DefaultStorage()
// Entry index for keeping track which entries are already in the log
const _index = indexStorage || await DefaultStorage()
// Heads storage
headsStorage = headsStorage || await DefaultStorage()
// Add heads to the state storage, ie. init the log state
const _heads = await Heads({ storage: headsStorage, heads: logHeads, decryptPayloadFn, decryptEntryFn })
const get = async (hash) => {
const bytes = await _entries.get(hash)
if (bytes) {
const entry = await Entry.decode(bytes, decryptEntryFn, decryptPayloadFn)
return entry
}
}
const getBytes = async (hash) => {
return _entries.get(hash)
}
const has = async (hash) => {
const entry = await _index.get(hash)
return entry != null
}
const heads = async () => {
const heads_ = []
for (const { hash } of await _heads.all()) {
const head = await get(hash)
heads_.push(head)
}
return heads_
}
const setHead = async (entry) => {
const { hash, bytes } = await Entry.encode(entry, encryptEntryFn, encryptPayloadFn)
// The appended entry is now the latest head
await _heads.set([{ hash, ...entry }])
// Add entry to the entry storage
await _entries.put(hash, bytes)
// Add entry to the entry index
await _index.put(hash, true)
return hash
}
const addHead = async (entry) => {
/* 6. Add the new entry to heads (=union with current heads) */
await _heads.add(entry)
return entry.hash
}
const removeHeads = async (hashes) => {
/* 5. Remove heads which new entries are connect to */
for (const hash of hashes) {
await _heads.remove(hash)
}
}
const addVerified = async (hashes) => {
/* 4. Add missing entries to the index (=to the log) */
for (const hash of hashes) {
await _index.put(hash, true)
}
}
const clear = async () => {
await _index.clear()
await _heads.clear()
await _entries.clear()
}
const close = async () => {
await _index.close()
await _heads.close()
await _entries.close()
}
return {
get,
getBytes,
has,
heads,
setHead,
addHead,
removeHeads,
addVerified,
storage: _entries,
clear,
close
}
}
export default OplogIndex

View File

@ -147,7 +147,8 @@ const Sync = async ({ ipfs, log, events, onSynced, start, timeout }) => {
const sendHeads = (source) => { const sendHeads = (source) => {
return (async function * () { return (async function * () {
const heads = await log.heads() const heads = await log.heads()
for await (const { bytes } of heads) { for await (const { hash } of heads) {
const bytes = await log.storage.get(hash)
yield bytes yield bytes
} }
})() })()
@ -157,7 +158,8 @@ const Sync = async ({ ipfs, log, events, onSynced, start, timeout }) => {
for await (const value of source) { for await (const value of source) {
const headBytes = value.subarray() const headBytes = value.subarray()
if (headBytes && onSynced) { if (headBytes && onSynced) {
await onSynced(headBytes) const entry = await Entry.decode(headBytes, log.encryption.decryptEntryFn, log.encryption.decryptPayloadFn)
await onSynced(entry)
} }
} }
if (started) { if (started) {
@ -221,7 +223,7 @@ const Sync = async ({ ipfs, log, events, onSynced, start, timeout }) => {
const task = async () => { const task = async () => {
try { try {
if (data && onSynced) { if (data && onSynced) {
const entry = await Entry.decode(data, log.encryption.decryptPayloadFn, log.encryption.decryptEntryFn) const entry = await Entry.decode(data, log.encryption.decryptEntryFn, log.encryption.decryptPayloadFn)
await onSynced(entry) await onSynced(entry)
} }
} catch (e) { } catch (e) {

View File

@ -9,7 +9,7 @@ import createHelia from '../utils/create-helia.js'
const keysPath = './testkeys' const keysPath = './testkeys'
describe('Log - Replication', function () { describe.only('Log - Replication', function () {
let ipfs1, ipfs2 let ipfs1, ipfs2
let id1, id2 let id1, id2
let keystore let keystore
@ -69,7 +69,7 @@ describe('Log - Replication', function () {
try { try {
if (!messageIsFromMe(message)) { if (!messageIsFromMe(message)) {
const entry = await Entry.decode(message.detail.data) const entry = await Entry.decode(message.detail.data)
await storage1.put(entry.hash, entry.bytes) await storage1.put(entry.hash, message.detail.data)
await log1.joinEntry(entry) await log1.joinEntry(entry)
} }
} catch (e) { } catch (e) {
@ -83,7 +83,7 @@ describe('Log - Replication', function () {
try { try {
if (!messageIsFromMe(message)) { if (!messageIsFromMe(message)) {
const entry = await Entry.decode(message.detail.data) const entry = await Entry.decode(message.detail.data)
await storage2.put(entry.hash, entry.bytes) await storage2.put(entry.hash, message.detail.data)
await log2.joinEntry(entry) await log2.joinEntry(entry)
} }
} catch (e) { } catch (e) {
@ -114,8 +114,10 @@ describe('Log - Replication', function () {
for (let i = 1; i <= amount; i++) { for (let i = 1; i <= amount; i++) {
const entry1 = await input1.append('A' + i) const entry1 = await input1.append('A' + i)
const entry2 = await input2.append('B' + i) const entry2 = await input2.append('B' + i)
await ipfs1.libp2p.services.pubsub.publish(logId, entry1.bytes) const bytes1 = await input1.storage.get(entry1.hash)
await ipfs2.libp2p.services.pubsub.publish(logId, entry2.bytes) const bytes2 = await input1.storage.get(entry2.hash)
await ipfs1.libp2p.services.pubsub.publish(logId, bytes1)
await ipfs2.libp2p.services.pubsub.publish(logId, bytes2)
} }
console.log('Messages sent') console.log('Messages sent')

View File

@ -76,7 +76,7 @@ describe.only('Encryption/Decryption', function () {
strictEqual(await db1.get(hash), 'record 1') strictEqual(await db1.get(hash), 'record 1')
}) })
it.only('encrypts/decrypts entry', async () => { it('encrypts/decrypts entry', async () => {
let connected = false let connected = false
let updated = false let updated = false
let error = false let error = false