From 327ef5e88d36380d81374b7e3e5874c0efd1f069 Mon Sep 17 00:00:00 2001 From: haad Date: Fri, 30 Mar 2018 12:42:09 +0200 Subject: [PATCH] Refactor replication benchmark to two separate sender/receiver processes --- benchmarks/benchmark-replication-sender.js | 166 +++++++++++++++++++++ benchmarks/benchmark-replication.js | 46 +++--- 2 files changed, 190 insertions(+), 22 deletions(-) create mode 100644 benchmarks/benchmark-replication-sender.js diff --git a/benchmarks/benchmark-replication-sender.js b/benchmarks/benchmark-replication-sender.js new file mode 100644 index 0000000..d998321 --- /dev/null +++ b/benchmarks/benchmark-replication-sender.js @@ -0,0 +1,166 @@ +'use strict' + +const IPFS = require('ipfs') +const IPFSRepo = require('ipfs-repo') +const DatastoreLevel = require('datastore-level') +const OrbitDB = require('../src/OrbitDB') +const startIpfs = require('../test/utils/start-ipfs') +const pMapSeries = require('p-map-series') + +// Metrics +let metrics1 = { + totalQueries: 0, + seconds: 0, + queriesPerSecond: 0, + lastTenSeconds: 0, +} + +let metrics2 = { + totalQueries: 0, + seconds: 0, + queriesPerSecond: 0, + lastTenSeconds: 0, +} + +const ipfsConf = { + Addresses: { + API: '/ip4/127.0.0.1/tcp/0', + Swarm: ['/ip4/0.0.0.0/tcp/0'], + Gateway: '/ip4/0.0.0.0/tcp/0' + }, + Bootstrap: [], + Discovery: { + MDNS: { + Enabled: true, + Interval: 1 + }, + }, +} + +const repoConf = { + storageBackends: { + blocks: DatastoreLevel, + }, +} + +const defaultConfig = Object.assign({}, { + start: true, + EXPERIMENTAL: { + pubsub: true, + sharding: false, + dht: false, + }, + config: ipfsConf +}) + +const conf1 = Object.assign({}, defaultConfig, { + repo: new IPFSRepo('./orbitdb/benchmarks/replication/client1/ipfs', repoConf) +}) + +const conf2 = Object.assign({}, defaultConfig, { + repo: new IPFSRepo('./orbitdb/benchmarks/replication/client2/ipfs', repoConf) +}) + +// Write loop +const queryLoop = async (db) => { + if (metrics1.totalQueries < updateCount) { + try { + await db.add(metrics1.totalQueries) + } catch (e) { + console.error(e) + } + metrics1.totalQueries ++ + metrics1.lastTenSeconds ++ + metrics1.queriesPerSecond ++ + setImmediate(() => queryLoop(db)) + } +} + +// Metrics output function +const outputMetrics = (name, db, metrics) => { + metrics.seconds ++ + console.log(`[${name}] ${metrics.queriesPerSecond} queries per second, ${metrics.totalQueries} queries in ${metrics.seconds} seconds (Oplog: ${db._oplog.length})`) + metrics.queriesPerSecond = 0 + + if(metrics.seconds % 10 === 0) { + console.log(`[${name}] --> Average of ${metrics.lastTenSeconds/10} q/s in the last 10 seconds`) + metrics.lastTenSeconds = 0 + } +} + +const database = 'benchmark-replication' +const updateCount = 20000 + +// Start +console.log("Starting IPFS daemons...") + +pMapSeries([conf1,], d => startIpfs(d)) + .then(async ([ipfs1]) => { + try { + // await ipfs2.swarm.connect(ipfs1._peerInfo.multiaddrs._multiaddrs[0].toString()) + // await ipfs1.swarm.connect(ipfs2._peerInfo.multiaddrs._multiaddrs[0].toString()) + + // Create the databases + const orbit1 = new OrbitDB(ipfs1, './orbitdb/benchmarks/replication/client1') + // const orbit2 = new OrbitDB(ipfs2, './orbitdb/benchmarks/replication/client2') + const db1 = await orbit1.eventlog(database, { overwrite: true }) + console.log(db1.address.toString()) + + // const db2 = await orbit2.eventlog(db1.address.toString()) + + let db1Connected = false + // let db2Connected = false + + console.log('Waiting for peers to connect...') + + db1.events.on('peer', () => { + db1Connected = true + console.log('Peer 1 connected') + }) + + // db2.events.on('peer', () => { + // db2Connected = true + // console.log('Peer 2 connected') + // }) + + const startInterval = setInterval(() => { + // if (db1Connected && db2Connected) { + if (db1Connected) { + clearInterval(startInterval) + // Start the write loop + queryLoop(db1) + + // Metrics output for the writer, once/sec + const writeInterval = setInterval(() => { + outputMetrics("WRITE", db1, metrics1) + if (metrics1.totalQueries === updateCount) { + clearInterval(writeInterval) + } + }, 1000) + + // Metrics output for the reader + let prevCount = 0 + // setInterval(() => { + // try { + // metrics2.totalQueries = db2._oplog.length + // metrics2.queriesPerSecond = metrics2.totalQueries - prevCount + // metrics2.lastTenSeconds += metrics2.queriesPerSecond + // prevCount = metrics2.totalQueries + + // outputMetrics("READ", db2, metrics2) + + // if (db2._oplog.length === updateCount) { + // console.log("Finished") + // process.exit(0) + // } + // } catch (e) { + // console.error(e) + // } + // }, 1000) + } + }, 100) + } catch (e) { + console.log(e) + process.exit(1) + } + }) diff --git a/benchmarks/benchmark-replication.js b/benchmarks/benchmark-replication.js index dbaddda..9c0c912 100644 --- a/benchmarks/benchmark-replication.js +++ b/benchmarks/benchmark-replication.js @@ -32,7 +32,7 @@ const ipfsConf = { Discovery: { MDNS: { Enabled: true, - Interval: 1 + Interval: 0 }, }, } @@ -94,27 +94,28 @@ const updateCount = 20000 // Start console.log("Starting IPFS daemons...") -pMapSeries([conf1, conf2], d => startIpfs(d)) - .then(async ([ipfs1, ipfs2]) => { +pMapSeries([conf2], d => startIpfs(d)) + .then(async ([ipfs2]) => { try { - await ipfs2.swarm.connect(ipfs1._peerInfo.multiaddrs._multiaddrs[0].toString()) - await ipfs1.swarm.connect(ipfs2._peerInfo.multiaddrs._multiaddrs[0].toString()) + // await ipfs2.swarm.connect(ipfs1._peerInfo.multiaddrs._multiaddrs[0].toString()) + // await ipfs1.swarm.connect(ipfs2._peerInfo.multiaddrs._multiaddrs[0].toString()) // Create the databases - const orbit1 = new OrbitDB(ipfs1, './orbitdb/benchmarks/replication/client1') + // const orbit1 = new OrbitDB(ipfs1, './orbitdb/benchmarks/replication/client1') const orbit2 = new OrbitDB(ipfs2, './orbitdb/benchmarks/replication/client2') - const db1 = await orbit1.eventlog(database, { overwrite: true }) - const db2 = await orbit2.eventlog(db1.address.toString()) + const address = process.argv[2] + // const db1 = await orbit1.eventlog(database, { overwrite: true }) + const db2 = await orbit2.eventlog(address) - let db1Connected = false + // let db1Connected = false let db2Connected = false console.log('Waiting for peers to connect...') - db1.events.on('peer', () => { - db1Connected = true - console.log('Peer 1 connected') - }) + // db1.events.on('peer', () => { + // db1Connected = true + // console.log('Peer 1 connected') + // }) db2.events.on('peer', () => { db2Connected = true @@ -122,18 +123,19 @@ pMapSeries([conf1, conf2], d => startIpfs(d)) }) const startInterval = setInterval(() => { - if (db1Connected && db2Connected) { + // if (db1Connected && db2Connected) { + if (db2Connected) { clearInterval(startInterval) // Start the write loop - queryLoop(db1) + // queryLoop(db1) - // Metrics output for the writer, once/sec - const writeInterval = setInterval(() => { - outputMetrics("WRITE", db1, metrics1) - if (metrics1.totalQueries === updateCount) { - clearInterval(writeInterval) - } - }, 1000) + // // Metrics output for the writer, once/sec + // const writeInterval = setInterval(() => { + // outputMetrics("WRITE", db1, metrics1) + // if (metrics1.totalQueries === updateCount) { + // clearInterval(writeInterval) + // } + // }, 1000) // Metrics output for the reader let prevCount = 0