diff --git a/benchmarks/orbitdb-replicate.js b/benchmarks/orbitdb-replicate.js index 9917036..edc50b4 100644 --- a/benchmarks/orbitdb-replicate.js +++ b/benchmarks/orbitdb-replicate.js @@ -45,8 +45,10 @@ EventEmitter.defaultMaxListeners = 10000 let connected = false const onJoin = async (peerId) => (connected = true) + const onError = async (err) => console.error(err) db2.events.on('join', onJoin) + db2.events.on('error', onError) await waitFor(() => connected, () => true) diff --git a/src/oplog/entry.js b/src/oplog/entry.js index a1d65d3..1183383 100644 --- a/src/oplog/entry.js +++ b/src/oplog/entry.js @@ -86,7 +86,7 @@ const create = async (identity, id, payload, encryptPayloadFn, clock = null, nex entry.identity = identity.hash entry.sig = signature entry.payload = payload - entry.encryptedPayload = encryptedPayload + entry._payload = encryptedPayload return entry } @@ -100,7 +100,7 @@ const create = async (identity, id, payload, encryptPayloadFn, clock = null, nex * @memberof module:Log~Entry * @private */ -const verify = async (identities, entry, encryptPayloadFn) => { +const verify = async (identities, entry) => { if (!identities) throw new Error('Identities is required, cannot verify entry') if (!isEntry(entry)) throw new Error('Invalid Log entry') if (!entry.key) throw new Error("Entry doesn't have a key") @@ -110,7 +110,7 @@ const verify = async (identities, entry, encryptPayloadFn) => { const value = { id: e.id, - payload: e.encryptedPayload || e.payload, + payload: e._payload || e.payload, next: e.next, refs: e.refs, clock: e.clock, @@ -147,7 +147,7 @@ const isEntry = (obj) => { * @private */ const isEqual = (a, b) => { - return a && b && a.hash === b.hash + return a && b && a.hash && a.hash === b.hash } /** @@ -157,32 +157,36 @@ const isEqual = (a, b) => { * @memberof module:Log~Entry * @private */ -const decode = async (bytes, decryptPayloadFn, decryptEntryFn) => { +const decode = async (bytes, decryptEntryFn, decryptPayloadFn) => { let cid if (decryptEntryFn) { - const encryptedEntry = await Block.decode({ bytes, codec, hasher }) - bytes = await decryptEntryFn(encryptedEntry.value) - cid = encryptedEntry.cid + try { + const encryptedEntry = await Block.decode({ bytes, codec, hasher }) + bytes = await decryptEntryFn(encryptedEntry.value) + cid = encryptedEntry.cid + } catch (e) { + throw new Error('Could not decrypt entry') + } } const decodedEntry = await Block.decode({ bytes, codec, hasher }) const entry = decodedEntry.value - cid = cid || decodedEntry.cid - - const hash = cid.toString(hashStringEncoding) if (decryptPayloadFn) { try { const decryptedPayloadBytes = await decryptPayloadFn(entry.payload) const { value: decryptedPayload } = await Block.decode({ bytes: decryptedPayloadBytes, codec, hasher }) - entry.encryptedPayload = entry.payload + entry._payload = entry.payload entry.payload = decryptedPayload } catch (e) { - throw new Error('Could not decrypt entry') + throw new Error('Could not decrypt payload') } } + cid = cid || decodedEntry.cid + const hash = cid.toString(hashStringEncoding) + return { ...entry, hash @@ -200,10 +204,10 @@ const encode = async (entry, encryptEntryFn, encryptPayloadFn) => { const e = Object.assign({}, entry) if (encryptPayloadFn) { - e.payload = e.encryptedPayload + e.payload = e._payload } - delete e.encryptedPayload + delete e._payload delete e.hash let { cid, bytes } = await Block.encode({ value: e, codec, hasher }) diff --git a/src/oplog/heads.js b/src/oplog/heads.js index 6603e6d..8e0483e 100644 --- a/src/oplog/heads.js +++ b/src/oplog/heads.js @@ -15,11 +15,13 @@ const Heads = async ({ storage, heads, decryptPayloadFn, decryptEntryFn }) => { const put = async (heads) => { heads = findHeads(heads) for (const head of heads) { - await storage.put(head.hash, head.bytes) + // Store the entry's hash and nexts + await storage.put(head.hash, head.next) } } const set = async (heads) => { + // TODO: fix storage write fluctuation await storage.clear() await put(heads) } @@ -42,9 +44,8 @@ const Heads = async ({ storage, heads, decryptPayloadFn, decryptEntryFn }) => { const iterator = async function * () { const it = storage.iterator() - for await (const [, bytes] of it) { - const head = await Entry.decode(bytes, decryptPayloadFn, decryptEntryFn) - yield head + for await (const [hash, next] of it) { + yield { hash, next } } } diff --git a/src/oplog/log.js b/src/oplog/log.js index 523e552..51cf0b5 100644 --- a/src/oplog/log.js +++ b/src/oplog/log.js @@ -10,18 +10,14 @@ import LRU from 'lru' import PQueue from 'p-queue' import Entry from './entry.js' import Clock, { tickClock } from './clock.js' -import Heads from './heads.js' import ConflictResolution from './conflict-resolution.js' -import MemoryStorage from '../storage/memory.js' +import OplogIndex from './oplog-index.js' const { LastWriteWins, NoZeroes } = ConflictResolution const randomId = () => new Date().getTime().toString() const maxClockTimeReducer = (res, acc) => Math.max(res, acc.clock.time) -// Default storage for storing the Log and its entries. Default: Memory. Options: Memory, LRU, IPFS. -const DefaultStorage = MemoryStorage - // Default AccessController for the Log. // Default policy is that anyone can write to the Log. // Signature of an entry will always be verified regardless of AccessController policy. @@ -68,21 +64,20 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora if (logHeads != null && !Array.isArray(logHeads)) { throw new Error('\'logHeads\' argument must be an array') } + // Set Log's id const id = logId || randomId() + // Encryption of entries and payloads encryption = encryption || {} - const { encryptPayloadFn, decryptPayloadFn, encryptEntryFn, decryptEntryFn } = encryption + const { encryptPayloadFn } = encryption + // Access Controller access = access || await DefaultAccessController() - // Oplog entry storage - const _entries = entryStorage || await DefaultStorage() - // Entry index for keeping track which entries are already in the log - const _index = indexStorage || await DefaultStorage() - // Heads storage - headsStorage = headsStorage || await DefaultStorage() - // Add heads to the state storage, ie. init the log state - const _heads = await Heads({ storage: headsStorage, heads: logHeads, decryptPayloadFn, decryptEntryFn }) + + // Index and storage of entries for this Log + const index = await OplogIndex({ logHeads, entryStorage, indexStorage, headsStorage, encryption }) + // Conflict-resolution sorting function sortFn = NoZeroes(sortFn || LastWriteWins) @@ -110,8 +105,8 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora * @instance */ const heads = async () => { - const res = await _heads.all() - return res.sort(sortFn).reverse() + const heads_ = await index.heads() + return heads_.sort(sortFn).reverse() } /** @@ -141,16 +136,11 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora if (!hash) { throw new Error('hash is required') } - const bytes = await _entries.get(hash) - if (bytes) { - const entry = await Entry.decode(bytes, decryptPayloadFn, decryptEntryFn) - return entry - } + return index.get(hash) } const has = async (hash) => { - const entry = await _index.get(hash) - return entry != null + return index.has(hash) } /** @@ -169,6 +159,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora // 2. Authorize entry // 3. Store entry // 4. return Entry + // Get current heads of the log const heads_ = await heads() // Create the next pointers from heads @@ -187,19 +178,16 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora nexts, refs ) + // Authorize the entry const canAppend = await access.canAppend(entry) if (!canAppend) { throw new Error(`Could not append entry:\nKey "${identity.hash}" is not allowed to write to the log`) } - const { hash, bytes } = await Entry.encode(entry, encryptEntryFn, encryptPayloadFn) - // The appended entry is now the latest head - await _heads.set([{ hash, bytes, next: entry.next }]) - // Add entry to the entry storage - await _entries.put(hash, bytes) - // Add entry to the entry index - await _index.put(hash, true) + // Add the entry to the index (=store and index it) + const hash = await index.setHead(entry) + // Return the appended entry return { ...entry, hash } } @@ -228,9 +216,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora if (!isLog(log)) { throw new Error('Given argument is not an instance of Log') } - if (_entries.merge) { - await _entries.merge(log.storage) - } + await index.storage.merge(log.storage) const heads = await log.heads() for (const entry of heads) { await joinEntry(entry) @@ -268,7 +254,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora throw new Error(`Could not append entry:\nKey "${entry.identity}" is not allowed to write to the log`) } // Verify signature for the entry - const isValid = await Entry.verify(identity, entry, encryptPayloadFn) + const isValid = await Entry.verify(identity, entry) if (!isValid) { throw new Error(`Could not validate signature for entry "${entry.hash}"`) } @@ -313,19 +299,11 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora await traverseAndVerify() /* 4. Add missing entries to the index (=to the log) */ - for (const hash of hashesToAdd.values()) { - await _index.put(hash, true) - } - + await index.addVerified(hashesToAdd.values()) /* 5. Remove heads which new entries are connect to */ - for (const hash of connectedHeads.values()) { - await _heads.remove(hash) - } + await index.removeHeads(connectedHeads.values()) /* 6. Add the new entry to heads (=union with current heads) */ - const { hash, next } = entry - const bytes = await _entries.get(hash) - await _heads.add({ hash, bytes, next }) - // await _heads.add(entry) + await index.addHead(entry) return true } @@ -510,9 +488,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora * @instance */ const clear = async () => { - await _index.clear() - await _heads.clear() - await _entries.clear() + await index.clear() } /** @@ -521,9 +497,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora * @instance */ const close = async () => { - await _index.close() - await _heads.close() - await _entries.close() + await index.close() } /** @@ -579,7 +553,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora close, access, identity, - storage: _entries, + storage: index.storage, encryption } } diff --git a/src/oplog/oplog-index.js b/src/oplog/oplog-index.js new file mode 100644 index 0000000..4ded634 --- /dev/null +++ b/src/oplog/oplog-index.js @@ -0,0 +1,106 @@ +import Entry from './entry.js' +import Heads from './heads.js' +import MemoryStorage from '../storage/memory.js' + +// Default storage for storing the Log and its entries. Default: Memory. Options: Memory, LRU, IPFS. +const DefaultStorage = MemoryStorage + +const OplogIndex = async ({ logHeads, entryStorage, headsStorage, indexStorage, encryption }) => { + encryption = encryption || {} + const { encryptPayloadFn, decryptPayloadFn, encryptEntryFn, decryptEntryFn } = encryption + + // Oplog entry storage + const _entries = entryStorage || await DefaultStorage() + // Entry index for keeping track which entries are already in the log + const _index = indexStorage || await DefaultStorage() + // Heads storage + headsStorage = headsStorage || await DefaultStorage() + // Add heads to the state storage, ie. init the log state + const _heads = await Heads({ storage: headsStorage, heads: logHeads, decryptPayloadFn, decryptEntryFn }) + + const get = async (hash) => { + const bytes = await _entries.get(hash) + if (bytes) { + const entry = await Entry.decode(bytes, decryptEntryFn, decryptPayloadFn) + return entry + } + } + + const getBytes = async (hash) => { + return _entries.get(hash) + } + + const has = async (hash) => { + const entry = await _index.get(hash) + return entry != null + } + + const heads = async () => { + const heads_ = [] + for (const { hash } of await _heads.all()) { + const head = await get(hash) + heads_.push(head) + } + return heads_ + } + + const setHead = async (entry) => { + const { hash, bytes } = await Entry.encode(entry, encryptEntryFn, encryptPayloadFn) + // The appended entry is now the latest head + await _heads.set([{ hash, ...entry }]) + // Add entry to the entry storage + await _entries.put(hash, bytes) + // Add entry to the entry index + await _index.put(hash, true) + + return hash + } + + const addHead = async (entry) => { + /* 6. Add the new entry to heads (=union with current heads) */ + await _heads.add(entry) + return entry.hash + } + + const removeHeads = async (hashes) => { + /* 5. Remove heads which new entries are connect to */ + for (const hash of hashes) { + await _heads.remove(hash) + } + } + + const addVerified = async (hashes) => { + /* 4. Add missing entries to the index (=to the log) */ + for (const hash of hashes) { + await _index.put(hash, true) + } + } + + const clear = async () => { + await _index.clear() + await _heads.clear() + await _entries.clear() + } + + const close = async () => { + await _index.close() + await _heads.close() + await _entries.close() + } + + return { + get, + getBytes, + has, + heads, + setHead, + addHead, + removeHeads, + addVerified, + storage: _entries, + clear, + close + } +} + +export default OplogIndex diff --git a/src/sync.js b/src/sync.js index ca6b640..21fe4c5 100644 --- a/src/sync.js +++ b/src/sync.js @@ -147,7 +147,8 @@ const Sync = async ({ ipfs, log, events, onSynced, start, timeout }) => { const sendHeads = (source) => { return (async function * () { const heads = await log.heads() - for await (const { bytes } of heads) { + for await (const { hash } of heads) { + const bytes = await log.storage.get(hash) yield bytes } })() @@ -157,7 +158,8 @@ const Sync = async ({ ipfs, log, events, onSynced, start, timeout }) => { for await (const value of source) { const headBytes = value.subarray() if (headBytes && onSynced) { - await onSynced(headBytes) + const entry = await Entry.decode(headBytes, log.encryption.decryptEntryFn, log.encryption.decryptPayloadFn) + await onSynced(entry) } } if (started) { @@ -221,7 +223,7 @@ const Sync = async ({ ipfs, log, events, onSynced, start, timeout }) => { const task = async () => { try { if (data && onSynced) { - const entry = await Entry.decode(data, log.encryption.decryptPayloadFn, log.encryption.decryptEntryFn) + const entry = await Entry.decode(data, log.encryption.decryptEntryFn, log.encryption.decryptPayloadFn) await onSynced(entry) } } catch (e) { diff --git a/test/oplog/replicate.test.js b/test/oplog/replicate.test.js index 010f77f..c70c9f4 100644 --- a/test/oplog/replicate.test.js +++ b/test/oplog/replicate.test.js @@ -9,7 +9,7 @@ import createHelia from '../utils/create-helia.js' const keysPath = './testkeys' -describe('Log - Replication', function () { +describe.only('Log - Replication', function () { let ipfs1, ipfs2 let id1, id2 let keystore @@ -69,7 +69,7 @@ describe('Log - Replication', function () { try { if (!messageIsFromMe(message)) { const entry = await Entry.decode(message.detail.data) - await storage1.put(entry.hash, entry.bytes) + await storage1.put(entry.hash, message.detail.data) await log1.joinEntry(entry) } } catch (e) { @@ -83,7 +83,7 @@ describe('Log - Replication', function () { try { if (!messageIsFromMe(message)) { const entry = await Entry.decode(message.detail.data) - await storage2.put(entry.hash, entry.bytes) + await storage2.put(entry.hash, message.detail.data) await log2.joinEntry(entry) } } catch (e) { @@ -114,8 +114,10 @@ describe('Log - Replication', function () { for (let i = 1; i <= amount; i++) { const entry1 = await input1.append('A' + i) const entry2 = await input2.append('B' + i) - await ipfs1.libp2p.services.pubsub.publish(logId, entry1.bytes) - await ipfs2.libp2p.services.pubsub.publish(logId, entry2.bytes) + const bytes1 = await input1.storage.get(entry1.hash) + const bytes2 = await input1.storage.get(entry2.hash) + await ipfs1.libp2p.services.pubsub.publish(logId, bytes1) + await ipfs2.libp2p.services.pubsub.publish(logId, bytes2) } console.log('Messages sent') diff --git a/test/orbitdb-encryption.test.js b/test/orbitdb-encryption.test.js index 255556c..f93a53a 100644 --- a/test/orbitdb-encryption.test.js +++ b/test/orbitdb-encryption.test.js @@ -76,7 +76,7 @@ describe.only('Encryption/Decryption', function () { strictEqual(await db1.get(hash), 'record 1') }) - it.only('encrypts/decrypts entry', async () => { + it('encrypts/decrypts entry', async () => { let connected = false let updated = false let error = false