diff --git a/benchmarks/log-append.js b/benchmarks/log-append.js index 894f1a4..48ab3b1 100644 --- a/benchmarks/log-append.js +++ b/benchmarks/log-append.js @@ -37,14 +37,17 @@ const queryLoop = async () => { // in case we want to benchmark different storage modules const entryStorage = await MemoryStorage() const headsStorage = await MemoryStorage() + const indexStorage = await MemoryStorage() // Test LRUStorage // const entryStorage = await LRUStorage() // const headsStorage = await LRUStorage() + // const indexStorage = await LRUStorage() // Test LevelStorage // const entryStorage = await LevelStorage({ path: './logA/entries' }) // const headsStorage = await LevelStorage({ path: './logA/heads' }) + // const indexStorage = await LevelStorage({ path: './logA/index' }) - log = await Log(testIdentity, { logId: 'A', entryStorage, headsStorage }) + log = await Log(testIdentity, { logId: 'A', entryStorage, headsStorage, indexStorage }) // Output metrics at 1 second interval const interval = setInterval(async () => { diff --git a/benchmarks/log-iterator.js b/benchmarks/log-iterator.js index d904074..4da047b 100644 --- a/benchmarks/log-iterator.js +++ b/benchmarks/log-iterator.js @@ -15,14 +15,17 @@ import rmrf from 'rimraf' // in case we want to benchmark different storage modules const entryStorage = await MemoryStorage() const headsStorage = await MemoryStorage() + const indexStorage = await MemoryStorage() // Test LRUStorage // const entryStorage = await LRUStorage() // const headsStorage = await LRUStorage() + // const indexStorage = await LRUStorage() // Test LevelStorage // const entryStorage = await LevelStorage({ path: './logA/entries' }) // const headsStorage = await LevelStorage({ path: './logA/heads' }) + // const indexStorage = await LevelStorage({ path: './logA/index' }) - const log = await Log(testIdentity, { logId: 'A', entryStorage, headsStorage }) + const log = await Log(testIdentity, { logId: 'A', entryStorage, headsStorage, indexStorage }) const entryCount = 10000 diff --git a/src/OrbitDB.js b/src/OrbitDB.js index 1f5c2c3..a41e0e5 100644 --- a/src/OrbitDB.js +++ b/src/OrbitDB.js @@ -48,7 +48,7 @@ const OrbitDB = async ({ ipfs, id, identity, keystore, directory } = {}) => { directory = directory || './orbitdb' keystore = keystore || await KeyStore({ path: path.join(directory, './keystore') }) const identities = await Identities({ ipfs, keystore }) - identity = identity || await identities.createIdentity({ id, keystore }) + identity = identity || await identities.createIdentity({ id }) const manifestStorage = await ComposedStorage( await LRUStorage({ size: 1000 }), diff --git a/src/database.js b/src/database.js index 9e485ef..5b7af4b 100644 --- a/src/database.js +++ b/src/database.js @@ -7,7 +7,7 @@ import { ComposedStorage, LRUStorage, IPFSBlockStorage, LevelStorage } from './s const defaultPointerCount = 0 const defaultCacheSize = 1000 -const Database = async ({ OpLog, ipfs, identity, address, name, accessController, directory, meta, headsStorage, entryStorage, pointerCount, syncAutomatically }) => { +const Database = async ({ OpLog, ipfs, identity, address, name, accessController, directory, meta, headsStorage, entryStorage, indexStorage, pointerCount, syncAutomatically }) => { const { Log, Entry } = OpLog directory = Path.join(directory || './orbitdb', `./${address}/`) @@ -24,10 +24,12 @@ const Database = async ({ OpLog, ipfs, identity, address, name, accessController await LevelStorage({ path: Path.join(directory, '/log/_heads/') }) ) - const log = await Log(identity, { logId: address, access: accessController, entryStorage, headsStorage }) + indexStorage = indexStorage || await ComposedStorage( + await LRUStorage({ size: defaultCacheSize }), + await LevelStorage({ path: Path.join(directory, '/log/_index/') }) + ) - // const indexStorage = await LevelStorage({ path: Path.join(directory, '/log/_index/') }) - // const log = await Log(identity, { logId: address.toString(), access: accessController, entryStorage, headsStorage, indexStorage }) + const log = await Log(identity, { logId: address, access: accessController, entryStorage, headsStorage, indexStorage }) const events = new EventEmitter() const queue = new PQueue({ concurrency: 1 }) diff --git a/src/key-store.js b/src/key-store.js index c73e91b..144d9bc 100644 --- a/src/key-store.js +++ b/src/key-store.js @@ -21,7 +21,7 @@ const verifySignature = async (signature, publicKey, data) => { } if (!(data instanceof Uint8Array)) { - data = typeof data === "string" ? uint8ArrayFromString(data) : new Uint8Array(data) + data = typeof data === 'string' ? uint8ArrayFromString(data) : new Uint8Array(data) } const isValid = (key, msg, sig) => key.verify(msg, sig) @@ -46,8 +46,8 @@ const signMessage = async (key, data) => { throw new Error('Given input data was undefined') } - if (!(data instanceof Uint8Array)) { - data = typeof data === "string" ? uint8ArrayFromString(data) : new Uint8Array(data) + if (!(data instanceof Uint8Array)) { + data = typeof data === 'string' ? uint8ArrayFromString(data) : new Uint8Array(data) } return uint8ArrayToString(await key.sign(data), 'base16') diff --git a/src/oplog/log.js b/src/oplog/log.js index a8351a2..11f057d 100644 --- a/src/oplog/log.js +++ b/src/oplog/log.js @@ -4,10 +4,6 @@ import Clock from './lamport-clock.js' import Heads from './heads.js' import Sorting from './sorting.js' import MemoryStorage from '../storage/memory.js' -// import LRUStorage from './storage/lru.js' -// import LevelStorage from './storage/level.js' -// import IPFSBlockStorage from './storage/ipfs-block.js' -// import ComposedStorage from './storage/composed.js' import { isDefined } from '../utils/index.js' const { LastWriteWins, NoZeroes } = Sorting @@ -17,9 +13,6 @@ const maxClockTimeReducer = (res, acc) => Math.max(res, acc.clock.time) // Default storage for storing the Log and its entries. Default: Memory. Options: Memory, LRU, IPFS. const DefaultStorage = MemoryStorage -// const DefaultStorage = LRUStorage -// const DefaultStorage = LevelStorage -// const DefaultStorage = IPFSBlockStorage // Default AccessController for the Log. // Default policy is that anyone can write to the Log. @@ -69,7 +62,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora // Oplog entry storage const _entries = entryStorage || await DefaultStorage() // Entry index for keeping track which entries are already in the log - // const _index = indexStorage || await DefaultStorage() + const _index = indexStorage || await DefaultStorage() // Heads storage headsStorage = headsStorage || await DefaultStorage() // Add heads to the state storage, ie. init the log state @@ -117,15 +110,14 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora const bytes = await _entries.get(hash) if (bytes) { const entry = await Entry.decode(bytes) - // await _index.put(hash, true) + await _index.put(hash, true) return entry } } const has = async (hash) => { - return false - // const entry = await _index.get(hash) - // return isDefined(entry) + const entry = await _index.get(hash) + return isDefined(entry) } /** @@ -163,7 +155,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora // Add entry to the entry storage await _entries.put(entry.hash, entry.bytes) // Add entry to the entry index - // await _index.put(entry.hash, true) + await _index.put(entry.hash, true) // Return the appended entry return entry } @@ -207,6 +199,15 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora const isAlreadyInTheLog = await has(entry.hash) if (isAlreadyInTheLog) { return false + } else { + // Check that the entry is not an entry that hasn't been indexed + const it = traverse(await heads(), (e) => e.next.includes(entry.hash)) + for await (const e of it) { + if (e.next.includes(entry.hash)) { + await _index.put(entry.hash, true) + return false + } + } } // Check that the Entry belongs to this Log if (entry.id !== id) { @@ -233,7 +234,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora // Add the new entry to the entry storage await _entries.put(entry.hash, entry.bytes) // Add the new entry to the entry index - // await _index.put(entry.hash, true) + await _index.put(entry.hash, true) // We've added the entry to the log return true } @@ -393,13 +394,13 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora } const clear = async () => { - // await _index.clear() + await _index.clear() await _heads.clear() await _entries.clear() } const close = async () => { - // await _index.close() + await _index.close() await _heads.close() await _entries.close() } @@ -451,6 +452,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora values, all: values, // Alias for values() get, + has, append, join, joinEntry, diff --git a/src/storage/ipfs-block.js b/src/storage/ipfs-block.js index d181056..ade6d20 100644 --- a/src/storage/ipfs-block.js +++ b/src/storage/ipfs-block.js @@ -20,9 +20,7 @@ const IPFSBlockStorage = async ({ ipfs, timeout, pin } = {}) => { }) } - const del = async (hash) => { - // TODO? - } + const del = async (hash) => {} const get = async (hash) => { const cid = CID.parse(hash, base58btc) @@ -34,8 +32,6 @@ const IPFSBlockStorage = async ({ ipfs, timeout, pin } = {}) => { const iterator = async function * () {} - // TODO: all() - const merge = async (other) => {} const clear = async () => {} @@ -47,7 +43,6 @@ const IPFSBlockStorage = async ({ ipfs, timeout, pin } = {}) => { del, get, iterator, - // TODO: all, merge, clear, close diff --git a/src/storage/level.js b/src/storage/level.js index 28f0e32..663b29f 100644 --- a/src/storage/level.js +++ b/src/storage/level.js @@ -34,8 +34,6 @@ const LevelStorage = async ({ path, valueEncoding } = {}) => { } } - // TODO: all() - const merge = async (other) => {} const clear = async () => { @@ -51,7 +49,6 @@ const LevelStorage = async ({ path, valueEncoding } = {}) => { del, get, iterator, - // TODO: all, merge, clear, close diff --git a/src/storage/lru.js b/src/storage/lru.js index 62d09b2..e6f0ff3 100644 --- a/src/storage/lru.js +++ b/src/storage/lru.js @@ -24,8 +24,6 @@ const LRUStorage = async ({ size } = {}) => { } } - // TODO: all() - const merge = async (other) => { if (other) { for await (const [key, value] of other.iterator()) { @@ -45,7 +43,6 @@ const LRUStorage = async ({ size } = {}) => { del, get, iterator, - // TODO: all, merge, clear, close diff --git a/src/storage/memory.js b/src/storage/memory.js index ac15145..a1ddbf7 100644 --- a/src/storage/memory.js +++ b/src/storage/memory.js @@ -27,8 +27,6 @@ const MemoryStorage = async () => { } } - // TODO: all() - const clear = async () => { memory = {} } @@ -40,7 +38,6 @@ const MemoryStorage = async () => { del, get, iterator, - // TODO: all, merge, clear, close diff --git a/test/access-controllers/ipfs-access-controller.test.js b/test/access-controllers/ipfs-access-controller.test.js index bfe160a..3a7e38e 100644 --- a/test/access-controllers/ipfs-access-controller.test.js +++ b/test/access-controllers/ipfs-access-controller.test.js @@ -14,6 +14,7 @@ describe('IPFSAccessController', function () { this.timeout(config.timeout) let ipfs1, ipfs2 + let keystore1, keystore2 let identities1, identities2 let testIdentity1, testIdentity2 @@ -22,8 +23,8 @@ describe('IPFSAccessController', function () { ipfs2 = await IPFS.create({ ...config.daemon2, repo: './ipfs2' }) await connectPeers(ipfs1, ipfs2) - const keystore1 = await Keystore({ path: dbPath1 + '/keys' }) - const keystore2 = await Keystore({ path: dbPath2 + '/keys' }) + keystore1 = await Keystore({ path: dbPath1 + '/keys' }) + keystore2 = await Keystore({ path: dbPath2 + '/keys' }) identities1 = await Identities({ keystore: keystore1 }) identities2 = await Identities({ keystore: keystore2 }) @@ -41,6 +42,14 @@ describe('IPFSAccessController', function () { await ipfs2.stop() } + if (keystore1) { + await keystore1.close() + } + + if (keystore2) { + await keystore2.close() + } + await rmrf('./orbitdb') await rmrf('./ipfs1') await rmrf('./ipfs2') diff --git a/test/config.js b/test/config.js index 258ac00..813e4f3 100644 --- a/test/config.js +++ b/test/config.js @@ -32,6 +32,7 @@ export default { } }, daemon1: { + silent: true, EXPERIMENTAL: { pubsub: true }, @@ -54,6 +55,7 @@ export default { } }, daemon2: { + silent: true, EXPERIMENTAL: { pubsub: true }, diff --git a/test/database-replication.test.js b/test/database-replication.test.js index 1dcfa37..2d15664 100644 --- a/test/database-replication.test.js +++ b/test/database-replication.test.js @@ -83,27 +83,25 @@ describe('Database - Replication', function () { }) it('replicates databases across two peers', async () => { - let connected1 = false - let connected2 = false + let replicated = false + let expectedEntryHash = null - const onConnected1 = (peerId, heads) => { - connected1 = true + const onConnected = (peerId, heads) => { + replicated = expectedEntryHash && heads.map(e => e.hash).includes(expectedEntryHash) + } + const onUpdate = (entry) => { + replicated = expectedEntryHash && entry.hash === expectedEntryHash } - const onConnected2 = (peerId, heads) => { - connected2 = true - } - - db1.events.on('join', onConnected1) - db2.events.on('join', onConnected2) + db2.events.on('join', onConnected) + db2.events.on('update', onUpdate) await db1.addOperation({ op: 'PUT', key: 1, value: 'record 1 on db 1' }) await db1.addOperation({ op: 'PUT', key: 2, value: 'record 2 on db 1' }) await db1.addOperation({ op: 'PUT', key: 3, value: 'record 3 on db 1' }) - await db1.addOperation({ op: 'PUT', key: 4, value: 'record 4 on db 1' }) + expectedEntryHash = await db1.addOperation({ op: 'PUT', key: 4, value: 'record 4 on db 1' }) - await waitFor(() => connected1, () => true) - await waitFor(() => connected2, () => true) + await waitFor(() => replicated, () => true) const all1 = [] for await (const item of db1.log.iterator()) { @@ -119,19 +117,19 @@ describe('Database - Replication', function () { }) it('replicates databases across two peers with delays', async () => { - let connected1 = false - let connected2 = false + let replicated = false + let expectedEntryHash = null - const onConnected1 = (peerId, heads) => { - connected1 = true + const onConnected = (peerId, heads) => { + replicated = expectedEntryHash && heads.map(e => e.hash).includes(expectedEntryHash) } - const onConnected2 = (peerId, heads) => { - connected2 = true + const onUpdate = (entry) => { + replicated = expectedEntryHash && entry.hash === expectedEntryHash } - db1.events.on('join', onConnected1) - db2.events.on('join', onConnected2) + db2.events.on('join', onConnected) + db2.events.on('update', onUpdate) await db1.addOperation({ op: 'PUT', key: 1, value: 'record 1 on db 1' }) @@ -146,10 +144,9 @@ describe('Database - Replication', function () { setTimeout(() => resolve(), 1000) }) - await db1.addOperation({ op: 'PUT', key: 4, value: 'record 4 on db 1' }) + expectedEntryHash = await db1.addOperation({ op: 'PUT', key: 4, value: 'record 4 on db 1' }) - await waitFor(() => connected1, () => true) - await waitFor(() => connected2, () => true) + await waitFor(() => replicated, () => true) const all1 = [] for await (const item of db1.log.iterator()) { @@ -166,6 +163,7 @@ describe('Database - Replication', function () { it('adds an operation before db2 is instantiated', async () => { let connected = false + const onConnected = (peerId, heads) => { connected = true } diff --git a/test/oplog/join.test.js b/test/oplog/join.test.js index 4dee235..524ce66 100644 --- a/test/oplog/join.test.js +++ b/test/oplog/join.test.js @@ -426,4 +426,20 @@ describe('Log - Join', async function () { strictEqual(values.length, 10) deepStrictEqual(values.map((e) => e.payload), expectedData) }) + + it('has correct heads after joining logs', async () => { + const e1 = await log1.append('hello1') + await log1.append('hello2') + const e3 = await log1.append('hello3') + + await log2.join(log1) + + const heads1 = await log2.heads() + deepStrictEqual(heads1, [e3]) + + await log2.joinEntry(e1) + + const heads2 = await log2.heads() + deepStrictEqual(heads2, [e3]) + }) }) diff --git a/test/oplog/replicate.test.js b/test/oplog/replicate.test.js index 68aebbb..b3fbd67 100644 --- a/test/oplog/replicate.test.js +++ b/test/oplog/replicate.test.js @@ -97,10 +97,10 @@ describe('Log - Replication', function () { } beforeEach(async () => { - log1 = await Log(testIdentity1, { logId, storage: storage1 }) - log2 = await Log(testIdentity2, { logId, storage: storage2 }) - input1 = await Log(testIdentity1, { logId, storage: storage1 }) - input2 = await Log(testIdentity2, { logId, storage: storage2 }) + log1 = await Log(testIdentity1, { logId, entryStorage: storage1 }) + log2 = await Log(testIdentity2, { logId, entryStorage: storage2 }) + input1 = await Log(testIdentity1, { logId, entryStorage: storage1 }) + input2 = await Log(testIdentity2, { logId, entryStorage: storage2 }) await ipfs1.pubsub.subscribe(logId, handleMessage1) await ipfs2.pubsub.subscribe(logId, handleMessage2) }) @@ -141,7 +141,7 @@ describe('Log - Replication', function () { await whileProcessingMessages(config.timeout) - const result = await Log(testIdentity1, { logId, storage: storage1 }) + const result = await Log(testIdentity1, { logId, entryStorage: storage1 }) await result.join(log1) await result.join(log2) diff --git a/test/orbitdb-multiple-databases.test.js b/test/orbitdb-multiple-databases.test.js index 8310617..564a016 100644 --- a/test/orbitdb-multiple-databases.test.js +++ b/test/orbitdb-multiple-databases.test.js @@ -157,7 +157,15 @@ describe('orbit-db - Multiple Databases', function () { } // Function to check if all databases have been replicated - const allReplicated = () => remoteDatabases.every(isReplicated) + const allReplicated = async () => { + for (const db of remoteDatabases) { + const replicated = await isReplicated(db) + if (!replicated) { + return false + } + } + return true + } console.log('Waiting for replication to finish') diff --git a/test/sync.test.js b/test/sync.test.js index 2673c0e..654a716 100644 --- a/test/sync.test.js +++ b/test/sync.test.js @@ -142,7 +142,7 @@ describe('Sync protocol', function () { const onSynced = async (bytes) => { syncedHead = await Entry.decode(bytes) - log2.joinEntry(syncedHead) + await log2.joinEntry(syncedHead) syncedEventFired = true } @@ -178,16 +178,17 @@ describe('Sync protocol', function () { strictEqual(sync1.peers.has(String(peerId2)), true) }) - it('eventually reaches consistency', async () => { + it('is eventually consistent', async () => { await sync1.add(await log1.append('hello2')) await sync1.add(await log1.append('hello3')) await sync1.add(await log1.append('hello4')) - expectedEntry = await log1.append('hello5') - await sync1.add(expectedEntry) + const expected = await log1.append('hello5') - await waitFor(() => syncedEventFired, () => true) + await sync1.add(expected) - deepStrictEqual(syncedHead, expectedEntry) + await waitFor(() => Entry.isEqual(expected, syncedHead), () => true) + + deepStrictEqual(syncedHead, expected) deepStrictEqual(await log1.heads(), await log2.heads()) @@ -348,8 +349,8 @@ describe('Sync protocol', function () { log2 = await Log(testIdentity2, { logId: 'synclog1' }) const onSynced = async (bytes) => { - syncedHead = await Entry.decode(bytes) - if (expectedEntry) { + if (expectedEntry && !syncedEventFired) { + syncedHead = await Entry.decode(bytes) syncedEventFired = expectedEntry.hash === syncedHead.hash } } @@ -491,8 +492,8 @@ describe('Sync protocol', function () { let leavingPeerId before(async () => { - const log1 = await Log(testIdentity1, { logId: 'synclog2' }) - const log2 = await Log(testIdentity2, { logId: 'synclog2' }) + const log1 = await Log(testIdentity1, { logId: 'synclog3' }) + const log2 = await Log(testIdentity2, { logId: 'synclog3' }) const onJoin = (peerId, heads) => { joinEventFired = true @@ -511,12 +512,12 @@ describe('Sync protocol', function () { } const onSynced = (bytes) => { - throw new Error('Sync Error') + sync2.events.emit('error', new Error('Sync Error')) } await log1.append('hello!') - sync1 = await Sync({ ipfs: ipfs1, log: log1 }) + sync1 = await Sync({ ipfs: ipfs1, log: log1, onSynced }) sync2 = await Sync({ ipfs: ipfs2, log: log2, onSynced }) sync1.events.on('join', onJoin) sync1.events.on('leave', onLeave)