From 78481cd96ad3bb3024a1c0c448d36a6e4da1f701 Mon Sep 17 00:00:00 2001 From: haad Date: Wed, 10 Feb 2016 13:43:07 +0100 Subject: [PATCH] WOrking version (WIP) --- examples/pubsubBenchmark.js | 56 ++++++++++++++++++++++ examples/pubsubReader.js | 67 +++++++++++++++++++++++++++ examples/readMessages.js | 80 ++++++++++++++++---------------- examples/writeMessages.js | 92 +++++++++++++++++++++++++------------ src/OrbitClient.js | 22 ++++----- src/PubSub.js | 63 ++++++++++++------------- test/orbit-client-tests.js | 4 +- 7 files changed, 268 insertions(+), 116 deletions(-) create mode 100644 examples/pubsubBenchmark.js create mode 100644 examples/pubsubReader.js diff --git a/examples/pubsubBenchmark.js b/examples/pubsubBenchmark.js new file mode 100644 index 0000000..5d5a83d --- /dev/null +++ b/examples/pubsubBenchmark.js @@ -0,0 +1,56 @@ +'use strict'; + +var async = require('asyncawait/async'); +var OrbitClient = require('../src/OrbitClient'); +var Timer = require('./Timer'); + +// Redis host +var host = '188.166.73.174'; +var port = 6379; + +var username = 'testrunner'; +var password = ''; + +let run = (async(() => { + try { + // Connect + var orbit = OrbitClient.connect(host, port, username, password); + + const id = process.argv[2] ? process.argv[2] : 'a'; + const channelName = 'c1'; + const channel = orbit.channel(channelName); + + // Metrics + let totalQueries = 0; + let seconds = 0; + let queriesPerSecond = 0; + let lastTenSeconds = 0; + + // Metrics output + setInterval(() => { + seconds ++; + + if(seconds % 10 === 0) { + console.log(`--> Average of ${lastTenSeconds/10} q/s in the last 10 seconds`) + lastTenSeconds = 0 + } + + console.log(`${queriesPerSecond} queries per second, ${totalQueries} queries in ${seconds} seconds`) + queriesPerSecond = 0; + }, 1000); + + while(true) { + channel.add(id + totalQueries); + totalQueries ++; + lastTenSeconds ++; + queriesPerSecond ++; + } + + } catch(e) { + console.error("error:", e); + console.error(e.stack); + process.exit(1); + } +}))(); + +module.exports = run; diff --git a/examples/pubsubReader.js b/examples/pubsubReader.js new file mode 100644 index 0000000..2588212 --- /dev/null +++ b/examples/pubsubReader.js @@ -0,0 +1,67 @@ +'use strict'; + +var async = require('asyncawait/async'); +var await = require('asyncawait/await'); +var OrbitClient = require('../src/OrbitClient'); +var Timer = require('./Timer'); + +var host = '188.166.73.174'; +var port = 6379; + +var username = 'LambOfGod'; +var password = ''; + +let run = (async(() => { + try { + var orbit = OrbitClient.connect(host, port, username, password); + const c1 = 'c1'; + const channel = orbit.channel(c1); + + let count = 1; + let id = 'Log: Query ' + let running = false; + + setInterval(async(() => { + if(!running) { + let timer = new Timer(true); + running = true; + + channel.add(id + count); + + let items = channel.iterator({ limit: 20 }).collect(); + + var g = items.filter((e) => e.item.Payload.startsWith(id)) + var prev = -1; + g.reverse().forEach((e) => { + var a = parseInt(e.item.Payload.replace(id, '')); + if(prev > -1 && prev + 1 !== a) { + console.log("MISSING VALUE!!!", prev + 1, items) + process.exit(1); + } + prev = a; + }) + + items = items.map((e) => { + return e.item.seq + " | " + e.item.Payload; + }); + + console.log("---------------------------------------------------") + console.log("Seq | Payload") + console.log("---------------------------------------------------") + console.log(items.join("\n")); + console.log("---------------------------------------------------") + console.log(`Query #${count} took ${timer.stop(true)} ms\n`); + + running = false; + count ++; + } + }), 1000); + + } catch(e) { + console.error("error:", e); + console.error(e.stack); + process.exit(1); + } +}))(); + +module.exports = run; diff --git a/examples/readMessages.js b/examples/readMessages.js index 277f7ee..7083470 100644 --- a/examples/readMessages.js +++ b/examples/readMessages.js @@ -4,24 +4,20 @@ var async = require('asyncawait/async'); var OrbitClient = require('../src/OrbitClient'); var Timer = require('./Timer'); -var host = 'localhost:3006'; +// Redis host +var host = 'localhost'; +var port = '6379' + var username = 'testrunner'; var password = ''; var util = require('util'); var exec = require('child_process').exec; -function clear(cb){ - exec('clear', function(error, stdout, stderr){ - util.puts(stdout); - cb(); - }); -} - let run = (async(() => { try { // Connect - var orbit = OrbitClient.connect(host, username, password); + var orbit = OrbitClient.connect(host, port, username, password); /* var timer = new Timer(true); @@ -60,43 +56,49 @@ let run = (async(() => { }); console.log(JSON.stringify(items, null, 2)); */ - const id = 'a'; + const id = process.argv[2] ? process.argv[2] : 'a'; const c1 = 'c1'; const cc = orbit.channel(c1); let i = 0; - let running = false; - let missCount = 0; - setInterval(async(() => { - if(!running) { - let timer = new Timer(true); - running = true; - // orbit.channel(c1).add("hello at #" + i); - cc.add(id + i); - i ++; - console.log(`Insert took ${timer.stop(true)} ms`); + let seconds = 0; + let round = 0; + let lastTen = 0; - let timer2 = new Timer(true); - var items = cc.iterator({ limit: 10 }).collect(); - console.log("Iterator took " + timer2.stop(true) + " ms"); - items = items.map((e) => { - return e.item; - }); + // Metrics + setInterval(() => { + seconds ++; - var g = items.filter((e) => e.Payload.startsWith(id)) - var prev = -1; - g.reverse().forEach((e) => { - var a = parseInt(e.Payload.replace(id, '')); - if(prev > -1 && prev + 1 !== a) { - console.log("Missing message: " + id, prev + 1) - } - prev = a; - }) - console.log(JSON.stringify(items.map((e) => e.seq + " - " + e.Payload), null, 2)); - // console.log("\n\n"); - running = false; + if(seconds % 10 === 0) { + console.log(`--> Average of ${lastTen/10} q/s in the last 10 seconds`) + lastTen = 0 } - }), 50); + + console.log(`${round} queries per second, ${i} queries in ${seconds} seconds`) + round = 0; + }, 1000); + + while(true) { + cc.add(id + i); + + i ++; + lastTen ++; + round ++; + + // let items = cc.iterator({ limit: 10 }).collect(); + // items = items.map((e) => e.item); + // let g = items.filter((e) => e.Payload.startsWith(id)) + // let prev = -1; + // g.reverse().forEach((e) => { + // const a = parseInt(e.Payload.replace(id, '')); + // if(prev > -1 && prev + 1 !== a) { + // console.log("!! Missing message: " + id, prev + 1) + // process.exit(1); + // } + // prev = a; + // }) + } + /* // You can also get the event based on its hash var value = orbit.channel(c1).get(hash2); diff --git a/examples/writeMessages.js b/examples/writeMessages.js index 69c4502..3877df0 100644 --- a/examples/writeMessages.js +++ b/examples/writeMessages.js @@ -1,10 +1,13 @@ 'use strict'; var async = require('asyncawait/async'); +var await = require('asyncawait/await'); var OrbitClient = require('../src/OrbitClient'); var Timer = require('./Timer'); -var host = 'localhost:3006'; +var host = 'localhost'; +var port = 6379; + var username = 'LambOfGod'; var password = ''; @@ -53,45 +56,74 @@ let run = (async(() => { // orbit.channel(channel, '').remove(items[items.length - 8].hash); // 9 */ - var orbit = OrbitClient.connect(host, username, password); + var orbit = OrbitClient.connect(host, port, username, password); const c1 = 'c1'; const cc = orbit.channel(c1); - let i = 0; + let i = 1; + let id = 'b' let running = false; setInterval(async(() => { - try { - if(!running) { - let timer = new Timer(true); - running = true; + if(!running) { + let timer = new Timer(true); + running = true; - cc.add("b" + i); + await(cc.add(id + i)); - let items = cc.iterator({ limit: 10 }).collect(); + let items = cc.iterator({ limit: 10 }).collect(); - var g = items.filter((e) => e.item.Payload.startsWith('b')) - var prev = -1; - g.reverse().forEach((e) => { - var a = parseInt(e.item.Payload.replace('b', '')); - if(prev > -1 && prev + 1 !== a) { - console.log("MISSSS!!!", prev + 1) - } - prev = a; - }) + var g = items.filter((e) => e.item.Payload.startsWith(id)) + var prev = -1; + g.reverse().forEach((e) => { + var a = parseInt(e.item.Payload.replace(id, '')); + if(prev > -1 && prev + 1 !== a) { + console.log("MISSSS!!!", prev + 1, items) + process.exit(1); + } + prev = a; + }) - items = items.map((e) => { - return e.item.seq + " - " + e.item.Payload; - }); - console.log(JSON.stringify(items, null, 2)); - console.log(`Query took ${timer.stop(true)} ms`); - running = false; - } - // console.log("\n\n"); - } catch(e) { - console.error(e); + items = items.map((e) => { + return e.item.seq + " - " + e.item.Payload; + }); + console.log(JSON.stringify(items, null, 2)); + console.log(`Query ${i} took ${timer.stop(true)} ms`); + running = false; + i ++; } - i ++; - }), 100); + // while(true) { + // } + }), 1000); + + // setInterval(async(() => { + // if(!running) { + // let timer = new Timer(true); + // running = true; + + // await(cc.add(id + i)); + + // let items = cc.iterator({ limit: 10 }).collect(); + + // var g = items.filter((e) => e.item.Payload.startsWith(id)) + // var prev = -1; + // g.reverse().forEach((e) => { + // var a = parseInt(e.item.Payload.replace(id, '')); + // if(prev > -1 && prev + 1 !== a) { + // console.log("MISSSS!!!", prev + 1, items) + // process.exit(1); + // } + // prev = a; + // }) + + // items = items.map((e) => { + // return e.item.seq + " - " + e.item.Payload; + // }); + // console.log(JSON.stringify(items, null, 2)); + // console.log(`Query took ${timer.stop(true)} ms`); + // running = false; + // i ++; + // } + // }), 36); } catch(e) { console.error("error:", e); diff --git a/src/OrbitClient.js b/src/OrbitClient.js index c591dfc..792c9c9 100644 --- a/src/OrbitClient.js +++ b/src/OrbitClient.js @@ -1,5 +1,6 @@ 'use strict'; +var a = require('async'); var async = require('asyncawait/async'); var await = require('asyncawait/await'); var Keystore = require('orbit-common/lib/Keystore'); @@ -135,6 +136,7 @@ class OrbitClient { // Get the current channel head and bump the sequence number let seq = this._info(channel, password).seq + 1; + let head = this._info(channel, password).head; // Create the hash cache item const hcItem = new HashCacheItem(operation, key, seq, target, metaInfo, null, pubkey, privkey, password); @@ -145,16 +147,16 @@ class OrbitClient { // If this is not the first item in the channel, patch with the previous (ie. link as next) if(seq > 0) - newHead = await (ipfsAPI.patchObject(this.ipfs, data.Hash, this._info(channel, password).head)); + newHead = await (ipfsAPI.patchObject(this.ipfs, data.Hash, head)); - return newHead; + return { hash: newHead, seq: seq }; } /* DB Operations */ _add(channel, password, data) { const post = this._publish(data); const key = post.Hash; - await(this._createOperation(channel, password, HashCacheOps.Add, key, post.Hash)); + await(this._createOperation(channel, password, HashCacheOps.Add, key, post.Hash, data)); return key; } @@ -169,14 +171,12 @@ class OrbitClient { return this._createOperation(channel, password, HashCacheOps.Delete, key, target); } - _createOperation(channel, password, operation, key, value) { + _createOperation(channel, password, operation, key, value, data) { let message, res = false; while(!res) { message = this._createMessage(channel, password, operation, key, value); - res = await(this._pubsub.publish(channel, message)); - if(!res) console.log("Retry <-->") + res = await(this._pubsub.publish(channel, message.hash, message.seq)); } - // console.log("Posted!") return message.Hash; } @@ -201,8 +201,8 @@ class OrbitClient { return l; } - _connect(host, username, password) { - this._pubsub = new PubSub(this.ipfs, host, username, password); + _connect(host, port, username, password) { + this._pubsub = new PubSub(this.ipfs, host, port, username, password); // this.client = this._pubsub._client; // this.user = this.client.info.user; this.user = { id: 'hello' } @@ -215,14 +215,14 @@ class OrbitClient { } class OrbitClientFactory { - static connect(host, username, password, ipfs) { + static connect(host, port, username, password, ipfs) { if(!ipfs) { let ipfsd = await(ipfsDaemon()); ipfs = ipfsd.daemon; } const client = new OrbitClient(ipfs); - await(client._connect(host, username, password)) + await(client._connect(host, port, username, password)) return client; } } diff --git a/src/PubSub.js b/src/PubSub.js index 6a8c450..2acd3ea 100644 --- a/src/PubSub.js +++ b/src/PubSub.js @@ -6,35 +6,13 @@ var redis = require("redis"); var Aggregator = require('./Aggregator'); class PubSub { - constructor(ipfs, host, username, password) { + constructor(ipfs, host, port, username, password) { this.ipfs = ipfs; this._subscriptions = {}; - this.client1 = redis.createClient(); - this.client2 = redis.createClient(); - this.currentPost = null; - - this.client1.on("message", async((hash, message) => { - const currentHead = this._subscriptions[hash] ? this._subscriptions[hash].head : null; - if(this._subscriptions[hash]) { - let item = Aggregator._fetchOne(this.ipfs, message, this._subscriptions[hash].password); - - if(item.seq > this._subscriptions[hash].seq) { - this._subscriptions[hash].seq = item.seq; - - if(currentHead !== message) - this._handleNewMessage(hash, message); - - if(this.currentPost) { - if(message === this.currentPost.hash) { - this.currentPost.callback(true); - this.currentPost = null; - } else { - this.currentPost.callback(false); - } - } - } - } - })); + this.client1 = redis.createClient({ host: host, port: port }); + this.client2 = redis.createClient({ host: host, port: port }); + this.publishQueue = []; + this.client1.on("message", this._handleMessage.bind(this)); } subscribe(hash, password, callback) { @@ -56,10 +34,10 @@ class PubSub { this.client2.unsubscribe(); } - publish(hash, message, callback) { + publish(hash, message, seq, callback) { return new Promise((resolve, reject) => { - this.currentPost = { hash: message.Hash, callback: resolve }; - this.client2.publish(hash, message.Hash); + this.publishQueue.splice(0, 0, { hash: message.Hash, callback: resolve }); + this.client2.publish(hash, JSON.stringify({ hash: message.Hash, seq: seq })); }); } @@ -71,10 +49,27 @@ class PubSub { delete this._subscriptions[hash]; } - _handleNewMessage(hash, newHead) { - this._subscriptions[hash].head = newHead; - if(this._subscriptions[hash].callback) - this._subscriptions[hash].callback(hash, newHead); + _handleMessage(hash, event) { + if(this._subscriptions[hash]) { + var e = JSON.parse(event) + var newHead = e.hash; + var seq = e.seq; + var isNewer = seq > this._subscriptions[hash].seq; + + var item = this.publishQueue[this.publishQueue.length - 1]; + if(item && item.hash === newHead) { + item.callback(isNewer); + this.publishQueue.pop(); + } + + if(isNewer) + this._updateSubscription(hash, newHead, seq); + } + } + + _updateSubscription(hash, message, seq) { + this._subscriptions[hash].seq = seq; + this._subscriptions[hash].head = message; } } diff --git a/test/orbit-client-tests.js b/test/orbit-client-tests.js index 2b134cc..3b879d9 100644 --- a/test/orbit-client-tests.js +++ b/test/orbit-client-tests.js @@ -20,7 +20,7 @@ var serverConfig = { // Orbit const host = 'localhost'; -const port = 3006; +const port = 6379; const username = 'testrunner'; const password = ''; @@ -47,7 +47,7 @@ describe('Orbit Client', () => { before(async((done) => { var initialize = () => new Promise(async((resolve, reject) => { - orbit = OrbitClient.connect(`${host}:${port}`, username, password); + orbit = OrbitClient.connect(host, port, username, password); orbit.channel(channel, '').delete(); resolve(); }));