Files
orbitdb/test/oplog/replicate.test.js
haad a063b3fb4a Refactor OrbitDB
Fix sync
Fix linter
Fix tests
Clean up
Set default references count to 0
Fix sync
Use address instead of databaseId
Sync protocol
Keep references to open databases in OrbitDB
Fix append benchmark
Initial version of heads exchange
Remove Feed
Fix KeyValuePersisted iterator
Refactor OrbitDBAddress a bit more
Add rest of the database types
Refactor OrbitDB addresses
Initial version for the full circle
Initial structure and tests for new OrbitDB
Make sure KeyStore is open when a Database is created
Re-organize OrbitDB
Use new databases and Log
More clean up
Add 'drop' event to Database
Clean up OrbitDB
Remove id from OrbitDB
Use new KeyStore and Identities
Remove storage from OrbitDB
Remove migrations from OrbitDB
Remove caches from OrbitDB
Remove pubsub from OrbitDB
2023-03-01 16:21:07 +02:00

154 lines
5.2 KiB
JavaScript

import { strictEqual } from 'assert'
import { Log, Entry } from '../../src/index.js'
import { IPFSBlockStorage } from '../../src/storage/index.js'
// Test utils
import { config, testAPIs, startIpfs, stopIpfs, getIpfsPeerId, waitForPeers, connectPeers } from 'orbit-db-test-utils'
import { createTestIdentities, cleanUpTestIdentities } from '../fixtures/orbit-db-identity-keys.js'
Object.keys(testAPIs).forEach((IPFS) => {
describe('ipfs-log - Replication (' + IPFS + ')', function () {
this.timeout(config.timeout * 2)
let ipfsd1, ipfsd2
let ipfs1, ipfs2
let id1, id2
let identities1, identities2
let testIdentity1, testIdentity2
let storage1, storage2
before(async () => {
// Start two IPFS instances
ipfsd1 = await startIpfs(IPFS, config.daemon1)
ipfsd2 = await startIpfs(IPFS, config.daemon2)
ipfs1 = ipfsd1.api
ipfs2 = ipfsd2.api
await connectPeers(ipfs1, ipfs2)
// Get the peer IDs
id1 = await getIpfsPeerId(ipfs1)
id2 = await getIpfsPeerId(ipfs2)
const [identities, testIdentities] = await createTestIdentities(ipfs1, ipfs2)
identities1 = identities[0]
identities2 = identities[1]
testIdentity2 = testIdentities[0]
testIdentity1 = testIdentities[1]
storage1 = await IPFSBlockStorage({ ipfs: ipfs1 })
storage2 = await IPFSBlockStorage({ ipfs: ipfs2 })
})
after(async () => {
await cleanUpTestIdentities([identities1, identities2])
await stopIpfs(ipfsd1)
await stopIpfs(ipfsd2)
await storage1.close()
await storage2.close()
})
describe('replicates logs deterministically', async function () {
const amount = 128 + 1
const logId = 'A'
let log1, log2, input1, input2
const handleMessage1 = async (message) => {
const { id: peerId } = await ipfs1.id()
const messageIsFromMe = (message) => String(peerId) === String(message.from)
try {
if (!messageIsFromMe(message)) {
const entry = await Entry.decode(message.data)
await storage1.put(entry.hash, entry.bytes)
await log1.joinEntry(entry)
}
} catch (e) {
console.error(e)
}
}
const handleMessage2 = async (message) => {
const { id: peerId } = await ipfs2.id()
const messageIsFromMe = (message) => String(peerId) === String(message.from)
try {
if (!messageIsFromMe(message)) {
const entry = await Entry.decode(message.data)
await storage2.put(entry.hash, entry.bytes)
await log2.joinEntry(entry)
}
} catch (e) {
console.error(e)
}
}
beforeEach(async () => {
log1 = await Log(testIdentity1, { logId, storage: storage1 })
log2 = await Log(testIdentity2, { logId, storage: storage2 })
input1 = await Log(testIdentity1, { logId, storage: storage1 })
input2 = await Log(testIdentity2, { logId, storage: storage2 })
await ipfs1.pubsub.subscribe(logId, handleMessage1)
await ipfs2.pubsub.subscribe(logId, handleMessage2)
})
afterEach(async () => {
await ipfs1.pubsub.unsubscribe(logId, handleMessage1)
await ipfs2.pubsub.unsubscribe(logId, handleMessage2)
})
it('replicates logs', async () => {
await waitForPeers(ipfs1, [id2], logId)
await waitForPeers(ipfs2, [id1], logId)
for (let i = 1; i <= amount; i++) {
const entry1 = await input1.append('A' + i)
const entry2 = await input2.append('B' + i)
await ipfs1.pubsub.publish(logId, entry1.bytes)
await ipfs2.pubsub.publish(logId, entry2.bytes)
}
console.log('Messages sent')
const whileProcessingMessages = (timeoutMs) => {
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error('timeout')), timeoutMs)
const timer = setInterval(async () => {
const valuesA = await log1.values()
const valuesB = await log2.values()
if (valuesA.length + valuesB.length === amount * 2) {
clearInterval(timer)
clearTimeout(timeout)
console.log('Messages received')
resolve()
}
}, 200)
})
}
await whileProcessingMessages(config.timeout)
const result = await Log(testIdentity1, { logId, storage: storage1 })
await result.join(log1)
await result.join(log2)
const values1 = await log1.values()
const values2 = await log2.values()
const values3 = await result.values()
strictEqual(values1.length, amount)
strictEqual(values2.length, amount)
strictEqual(values3.length, amount * 2)
strictEqual(values3[0].payload, 'A1')
strictEqual(values3[1].payload, 'B1')
strictEqual(values3[2].payload, 'A2')
strictEqual(values3[3].payload, 'B2')
strictEqual(values3[99].payload, 'B50')
strictEqual(values3[100].payload, 'A51')
strictEqual(values3[198].payload, 'A100')
strictEqual(values3[199].payload, 'B100')
})
})
})
})