// import assert from 'assert' // import mapSeries from 'p-each-series' // import rmrf from 'rimraf' // import OrbitDB from '../src/OrbitDB.js' // // Include test utilities // import { // config, // startIpfs, // stopIpfs, // testAPIs, // connectPeers, // waitForPeers, // } from 'orbit-db-test-utils' // const orbitdbPath1 = './orbitdb/tests/replication/1' // const orbitdbPath2 = './orbitdb/tests/replication/2' // const dbPath1 = './orbitdb/tests/replication/1/db1' // const dbPath2 = './orbitdb/tests/replication/2/db2' // Object.keys(testAPIs).forEach(API => { // describe(`orbit-db - Replication (${API})`, function() { // this.timeout(config.timeout * 2) // let ipfsd1, ipfsd2, ipfs1, ipfs2 // let orbitdb1, orbitdb2, db1, db2 // let timer // let options // before(async () => { // ipfsd1 = await startIpfs(API, config.daemon1) // ipfsd2 = await startIpfs(API, config.daemon2) // ipfs1 = ipfsd1.api // ipfs2 = ipfsd2.api // // Connect the peers manually to speed up test times // const isLocalhostAddress = (addr) => addr.toString().includes('127.0.0.1') // await connectPeers(ipfs1, ipfs2, { filter: isLocalhostAddress }) // console.log("Peers connected") // }) // after(async () => { // if (ipfsd1) // await stopIpfs(ipfsd1) // if (ipfsd2) // await stopIpfs(ipfsd2) // }) // beforeEach(async () => { // clearInterval(timer) // rmrf.sync(orbitdbPath1) // rmrf.sync(orbitdbPath2) // rmrf.sync(dbPath1) // rmrf.sync(dbPath2) // orbitdb1 = await OrbitDB.createInstance(ipfs1, { directory: orbitdbPath1 }) // orbitdb2 = await OrbitDB.createInstance(ipfs2, { directory: orbitdbPath2 }) // options = { // // Set write access for both clients // accessController: { // write: [ // orbitdb1.identity.id, // orbitdb2.identity.id // ] // } // } // options = Object.assign({}, options, { directory: dbPath1 }) // db1 = await orbitdb1.eventlog('replication-tests', options) // }) // afterEach(async () => { // clearInterval(timer) // options = {} // if (db1) // await db1.drop() // if (db2) // await db2.drop() // if(orbitdb1) // await orbitdb1.stop() // if(orbitdb2) // await orbitdb2.stop() // }) // it('replicates database of 1 entry', async () => { // console.log("Waiting for peers to connect") // await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString()) // // Set 'sync' flag on. It'll prevent creating a new local database and rather // // fetch the database from the network // options = Object.assign({}, options, { directory: dbPath2, sync: true }) // db2 = await orbitdb2.eventlog(db1.address.toString(), options) // let finished = false // await db1.add('hello') // return new Promise(resolve => { // let replicatedEventCount = 0 // db2.events.on('replicated', (address, length) => { // replicatedEventCount++ // // Once db2 has finished replication, make sure it has all elements // // and process to the asserts below // const all = db2.iterator({ limit: -1 }).collect().length // finished = (all === 1) // }) // timer = setInterval(() => { // if (finished) { // clearInterval(timer) // const entries = db2.iterator({ limit: -1 }).collect() // assert.equal(entries.length, 1) // assert.equal(entries[0].payload.value, 'hello') // assert.equal(replicatedEventCount, 1) // resolve() // } // }, 100) // }) // }) // it('replicates database of 100 entries', async () => { // console.log("Waiting for peers to connect") // await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString()) // options = Object.assign({}, options, { directory: dbPath2, sync: true }) // db2 = await orbitdb2.eventlog(db1.address.toString(), options) // let finished = false // const entryCount = 100 // const entryArr = [] // for (let i = 0; i < entryCount; i ++) // entryArr.push(i) // return new Promise(async (resolve, reject) => { // db2.events.on('replicated', () => { // // Once db2 has finished replication, make sure it has all elements // // and process to the asserts below // const all = db2.iterator({ limit: -1 }).collect().length // finished = (all === entryCount) // }) // try { // const add = i => db1.add('hello' + i) // await mapSeries(entryArr, add) // } catch (e) { // reject(e) // } // timer = setInterval(() => { // if (finished) { // clearInterval(timer) // const entries = db2.iterator({ limit: -1 }).collect() // assert.equal(entries.length, entryCount) // assert.equal(entries[0].payload.value, 'hello0') // assert.equal(entries[entries.length - 1].payload.value, 'hello99') // resolve() // } // }, 100) // }) // }) // it('emits correct replication info', async () => { // console.log("Waiting for peers to connect") // await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString()) // options = Object.assign({}, options, { directory: dbPath2, sync: true }) // db2 = await orbitdb2.eventlog(db1.address.toString(), options) // let finished = false // const entryCount = 99 // return new Promise(async (resolve, reject) => { // // Test that none of the entries gets into the replication queue twice // const replicateSet = new Set() // db2.events.on('replicate', (address, entry) => { // if (!replicateSet.has(entry.hash)) { // replicateSet.add(entry.hash) // } else { // reject(new Error('Shouldn\'t have started replication twice for entry ' + entry.hash + '\n' + entry.payload.value)) // } // }) // // Verify that progress count increases monotonically by saving // // each event's current progress into an array // const progressEvents = [] // db2.events.on('replicate.progress', () => { // progressEvents.push(db2.replicationStatus.progress) // }) // db2.events.on('replicated', (address, length) => { // // Once db2 has finished replication, make sure it has all elements // // and process to the asserts below // const all = db2.iterator({ limit: -1 }).collect().length // finished = (all === entryCount) // }) // try { // timer = setInterval(() => { // if (finished) { // clearInterval(timer) // // All entries should be in the database // assert.equal(db2.iterator({ limit: -1 }).collect().length, entryCount) // // progress events should increase monotonically // assert.equal(progressEvents.length, entryCount) // for (const [idx, e] of progressEvents.entries()) { // assert.equal(e, idx + 1) // } // // Verify replication status // assert.equal(db2.replicationStatus.progress, entryCount) // assert.equal(db2.replicationStatus.max, entryCount) // // Verify replicator state // assert.equal(db2._replicator.tasksRunning, 0) // assert.equal(db2._replicator.tasksQueued, 0) // assert.equal(db2._replicator.unfinished.length, 0) // // Replicator's internal caches should be empty // assert.equal(db2._replicator._logs.length, 0) // assert.equal(Object.keys(db2._replicator._fetching).length, 0) // resolve() // } // }, 1000) // } catch (e) { // reject(e) // } // // Trigger replication // let adds = [] // for (let i = 0; i < entryCount; i ++) { // adds.push(i) // } // await mapSeries(adds, i => db1.add('hello ' + i)) // }) // }) // it('emits correct replication info on fresh replication', async () => { // return new Promise(async (resolve, reject) => { // let finished = false // const entryCount = 512 // // Trigger replication // const adds = [] // for (let i = 0; i < entryCount; i ++) { // adds.push(i) // } // const add = async (i) => { // process.stdout.write("\rWriting " + (i + 1) + " / " + entryCount + " ") // await db1.add('hello ' + i) // } // await mapSeries(adds, add) // console.log() // // Open second instance again // options = { // directory: dbPath2, // overwrite: true, // sync: true, // } // db2 = await orbitdb2.eventlog(db1.address.toString(), options) // // Test that none of the entries gets into the replication queue twice // const replicateSet = new Set() // db2.events.on('replicate', (address, entry) => { // if (!replicateSet.has(entry.hash)) { // replicateSet.add(entry.hash) // } else { // reject(new Error('Shouldn\'t have started replication twice for entry ' + entry.hash)) // } // }) // // Verify that progress count increases monotonically by saving // // each event's current progress into an array // const progressEvents = [] // db2.events.on('replicate.progress', (address, hash, entry) => { // progressEvents.push(db2.replicationStatus.progress) // }) // let replicatedEventCount = 0 // db2.events.on('replicated', (address, length) => { // replicatedEventCount++ // // Once db2 has finished replication, make sure it has all elements // // and process to the asserts below // const all = db2.iterator({ limit: -1 }).collect().length // finished = (all === entryCount) // }) // timer = setInterval(async () => { // if (finished) { // clearInterval(timer) // try { // // All entries should be in the database // assert.equal(db2.iterator({ limit: -1 }).collect().length, entryCount) // // 'replicated' event should've been received only once // assert.equal(replicatedEventCount, 1) // // progress events should increase monotonically // assert.equal(progressEvents.length, entryCount) // for (const [idx, e] of progressEvents.entries()) { // assert.equal(e, idx + 1) // } // // Verify replication status // assert.equal(db2.replicationStatus.progress, entryCount) // assert.equal(db2.replicationStatus.max, entryCount) // // Verify replicator state // assert.equal(db2._replicator.tasksRunning, 0) // assert.equal(db2._replicator.tasksQueued, 0) // assert.equal(db2._replicator.unfinished.length, 0) // // Replicator's internal caches should be empty // assert.equal(db2._replicator._logs.length, 0) // assert.equal(Object.keys(db2._replicator._fetching).length, 0) // resolve() // } catch (e) { // reject(e) // } // } // }, 100) // }) // }) // it('emits correct replication info in two-way replication', async () => { // return new Promise(async (resolve, reject) => { // console.log("Waiting for peers to connect") // await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString()) // let finished = false // const entryCount = 100 // // Trigger replication // const adds = [] // for (let i = 0; i < entryCount; i ++) { // adds.push(i) // } // const add = async (i) => { // process.stdout.write("\rWriting " + (i + 1) + " / " + entryCount + " ") // await Promise.all([db1.add('hello-1-' + i), db2.add('hello-2-' + i)]) // } // // Open second instance again // let options = { // directory: dbPath2 + '2', // overwrite: true, // sync: true, // } // db2 = await orbitdb2.eventlog(db1.address.toString(), options) // assert.equal(db1.address.toString(), db2.address.toString()) // // Test that none of the entries gets into the replication queue twice // const replicateSet = new Set() // db2.events.on('replicate', (address, entry) => { // if (!replicateSet.has(entry.hash)) { // replicateSet.add(entry.hash) // } else { // reject(new Error('Shouldn\'t have started replication twice for entry ' + entry.hash)) // } // }) // db2.events.on('replicated', (address, length) => { // // Once db2 has finished replication, make sure it has all elements // // and process to the asserts below // const all = db2.iterator({ limit: -1 }).collect().length // finished = (all === entryCount * 2) // }) // try { // await mapSeries(adds, add) // console.log() // timer = setInterval(() => { // if (finished) { // clearInterval(timer) // // Database values should match // const values1 = db1.iterator({limit: -1}).collect() // const values2 = db2.iterator({limit: -1}).collect() // assert.equal(values1.length, values2.length) // assert.deepEqual(values1, values2) // // All entries should be in the database // assert.equal(values1.length, entryCount * 2) // assert.equal(values2.length, entryCount * 2) // // Verify replication status // assert.equal(db2.replicationStatus.progress, entryCount * 2) // assert.equal(db2.replicationStatus.max, entryCount * 2) // // Verify replicator state // assert.equal(db2._replicator.tasksRunning, 0) // assert.equal(db2._replicator.tasksQueued, 0) // assert.equal(db2._replicator.unfinished.length, 0) // // Replicator's internal caches should be empty // assert.equal(db2._replicator._logs.length, 0) // assert.equal(Object.keys(db2._replicator._fetching).length, 0) // resolve() // } // }, 500) // } catch (e) { // reject(e) // } // }) // }) // }) // })