WOrking version (WIP)

This commit is contained in:
haad
2016-02-10 13:43:07 +01:00
parent c39c8a9983
commit 78481cd96a
7 changed files with 268 additions and 116 deletions

View File

@@ -0,0 +1,56 @@
'use strict';
var async = require('asyncawait/async');
var OrbitClient = require('../src/OrbitClient');
var Timer = require('./Timer');
// Redis host
var host = '188.166.73.174';
var port = 6379;
var username = 'testrunner';
var password = '';
let run = (async(() => {
try {
// Connect
var orbit = OrbitClient.connect(host, port, username, password);
const id = process.argv[2] ? process.argv[2] : 'a';
const channelName = 'c1';
const channel = orbit.channel(channelName);
// Metrics
let totalQueries = 0;
let seconds = 0;
let queriesPerSecond = 0;
let lastTenSeconds = 0;
// Metrics output
setInterval(() => {
seconds ++;
if(seconds % 10 === 0) {
console.log(`--> Average of ${lastTenSeconds/10} q/s in the last 10 seconds`)
lastTenSeconds = 0
}
console.log(`${queriesPerSecond} queries per second, ${totalQueries} queries in ${seconds} seconds`)
queriesPerSecond = 0;
}, 1000);
while(true) {
channel.add(id + totalQueries);
totalQueries ++;
lastTenSeconds ++;
queriesPerSecond ++;
}
} catch(e) {
console.error("error:", e);
console.error(e.stack);
process.exit(1);
}
}))();
module.exports = run;

67
examples/pubsubReader.js Normal file
View File

@@ -0,0 +1,67 @@
'use strict';
var async = require('asyncawait/async');
var await = require('asyncawait/await');
var OrbitClient = require('../src/OrbitClient');
var Timer = require('./Timer');
var host = '188.166.73.174';
var port = 6379;
var username = 'LambOfGod';
var password = '';
let run = (async(() => {
try {
var orbit = OrbitClient.connect(host, port, username, password);
const c1 = 'c1';
const channel = orbit.channel(c1);
let count = 1;
let id = 'Log: Query '
let running = false;
setInterval(async(() => {
if(!running) {
let timer = new Timer(true);
running = true;
channel.add(id + count);
let items = channel.iterator({ limit: 20 }).collect();
var g = items.filter((e) => e.item.Payload.startsWith(id))
var prev = -1;
g.reverse().forEach((e) => {
var a = parseInt(e.item.Payload.replace(id, ''));
if(prev > -1 && prev + 1 !== a) {
console.log("MISSING VALUE!!!", prev + 1, items)
process.exit(1);
}
prev = a;
})
items = items.map((e) => {
return e.item.seq + " | " + e.item.Payload;
});
console.log("---------------------------------------------------")
console.log("Seq | Payload")
console.log("---------------------------------------------------")
console.log(items.join("\n"));
console.log("---------------------------------------------------")
console.log(`Query #${count} took ${timer.stop(true)} ms\n`);
running = false;
count ++;
}
}), 1000);
} catch(e) {
console.error("error:", e);
console.error(e.stack);
process.exit(1);
}
}))();
module.exports = run;

View File

@@ -4,24 +4,20 @@ var async = require('asyncawait/async');
var OrbitClient = require('../src/OrbitClient');
var Timer = require('./Timer');
var host = 'localhost:3006';
// Redis host
var host = 'localhost';
var port = '6379'
var username = 'testrunner';
var password = '';
var util = require('util');
var exec = require('child_process').exec;
function clear(cb){
exec('clear', function(error, stdout, stderr){
util.puts(stdout);
cb();
});
}
let run = (async(() => {
try {
// Connect
var orbit = OrbitClient.connect(host, username, password);
var orbit = OrbitClient.connect(host, port, username, password);
/* var timer = new Timer(true);
@@ -60,43 +56,49 @@ let run = (async(() => {
});
console.log(JSON.stringify(items, null, 2));
*/
const id = 'a';
const id = process.argv[2] ? process.argv[2] : 'a';
const c1 = 'c1';
const cc = orbit.channel(c1);
let i = 0;
let running = false;
let missCount = 0;
setInterval(async(() => {
if(!running) {
let timer = new Timer(true);
running = true;
// orbit.channel(c1).add("hello at #" + i);
cc.add(id + i);
i ++;
console.log(`Insert took ${timer.stop(true)} ms`);
let seconds = 0;
let round = 0;
let lastTen = 0;
let timer2 = new Timer(true);
var items = cc.iterator({ limit: 10 }).collect();
console.log("Iterator took " + timer2.stop(true) + " ms");
items = items.map((e) => {
return e.item;
});
// Metrics
setInterval(() => {
seconds ++;
var g = items.filter((e) => e.Payload.startsWith(id))
var prev = -1;
g.reverse().forEach((e) => {
var a = parseInt(e.Payload.replace(id, ''));
if(prev > -1 && prev + 1 !== a) {
console.log("Missing message: " + id, prev + 1)
}
prev = a;
})
console.log(JSON.stringify(items.map((e) => e.seq + " - " + e.Payload), null, 2));
// console.log("\n\n");
running = false;
if(seconds % 10 === 0) {
console.log(`--> Average of ${lastTen/10} q/s in the last 10 seconds`)
lastTen = 0
}
}), 50);
console.log(`${round} queries per second, ${i} queries in ${seconds} seconds`)
round = 0;
}, 1000);
while(true) {
cc.add(id + i);
i ++;
lastTen ++;
round ++;
// let items = cc.iterator({ limit: 10 }).collect();
// items = items.map((e) => e.item);
// let g = items.filter((e) => e.Payload.startsWith(id))
// let prev = -1;
// g.reverse().forEach((e) => {
// const a = parseInt(e.Payload.replace(id, ''));
// if(prev > -1 && prev + 1 !== a) {
// console.log("!! Missing message: " + id, prev + 1)
// process.exit(1);
// }
// prev = a;
// })
}
/*
// You can also get the event based on its hash
var value = orbit.channel(c1).get(hash2);

View File

@@ -1,10 +1,13 @@
'use strict';
var async = require('asyncawait/async');
var await = require('asyncawait/await');
var OrbitClient = require('../src/OrbitClient');
var Timer = require('./Timer');
var host = 'localhost:3006';
var host = 'localhost';
var port = 6379;
var username = 'LambOfGod';
var password = '';
@@ -53,45 +56,74 @@ let run = (async(() => {
// orbit.channel(channel, '').remove(items[items.length - 8].hash); // 9
*/
var orbit = OrbitClient.connect(host, username, password);
var orbit = OrbitClient.connect(host, port, username, password);
const c1 = 'c1';
const cc = orbit.channel(c1);
let i = 0;
let i = 1;
let id = 'b'
let running = false;
setInterval(async(() => {
try {
if(!running) {
let timer = new Timer(true);
running = true;
if(!running) {
let timer = new Timer(true);
running = true;
cc.add("b" + i);
await(cc.add(id + i));
let items = cc.iterator({ limit: 10 }).collect();
let items = cc.iterator({ limit: 10 }).collect();
var g = items.filter((e) => e.item.Payload.startsWith('b'))
var prev = -1;
g.reverse().forEach((e) => {
var a = parseInt(e.item.Payload.replace('b', ''));
if(prev > -1 && prev + 1 !== a) {
console.log("MISSSS!!!", prev + 1)
}
prev = a;
})
var g = items.filter((e) => e.item.Payload.startsWith(id))
var prev = -1;
g.reverse().forEach((e) => {
var a = parseInt(e.item.Payload.replace(id, ''));
if(prev > -1 && prev + 1 !== a) {
console.log("MISSSS!!!", prev + 1, items)
process.exit(1);
}
prev = a;
})
items = items.map((e) => {
return e.item.seq + " - " + e.item.Payload;
});
console.log(JSON.stringify(items, null, 2));
console.log(`Query took ${timer.stop(true)} ms`);
running = false;
}
// console.log("\n\n");
} catch(e) {
console.error(e);
items = items.map((e) => {
return e.item.seq + " - " + e.item.Payload;
});
console.log(JSON.stringify(items, null, 2));
console.log(`Query ${i} took ${timer.stop(true)} ms`);
running = false;
i ++;
}
i ++;
}), 100);
// while(true) {
// }
}), 1000);
// setInterval(async(() => {
// if(!running) {
// let timer = new Timer(true);
// running = true;
// await(cc.add(id + i));
// let items = cc.iterator({ limit: 10 }).collect();
// var g = items.filter((e) => e.item.Payload.startsWith(id))
// var prev = -1;
// g.reverse().forEach((e) => {
// var a = parseInt(e.item.Payload.replace(id, ''));
// if(prev > -1 && prev + 1 !== a) {
// console.log("MISSSS!!!", prev + 1, items)
// process.exit(1);
// }
// prev = a;
// })
// items = items.map((e) => {
// return e.item.seq + " - " + e.item.Payload;
// });
// console.log(JSON.stringify(items, null, 2));
// console.log(`Query took ${timer.stop(true)} ms`);
// running = false;
// i ++;
// }
// }), 36);
} catch(e) {
console.error("error:", e);

View File

@@ -1,5 +1,6 @@
'use strict';
var a = require('async');
var async = require('asyncawait/async');
var await = require('asyncawait/await');
var Keystore = require('orbit-common/lib/Keystore');
@@ -135,6 +136,7 @@ class OrbitClient {
// Get the current channel head and bump the sequence number
let seq = this._info(channel, password).seq + 1;
let head = this._info(channel, password).head;
// Create the hash cache item
const hcItem = new HashCacheItem(operation, key, seq, target, metaInfo, null, pubkey, privkey, password);
@@ -145,16 +147,16 @@ class OrbitClient {
// If this is not the first item in the channel, patch with the previous (ie. link as next)
if(seq > 0)
newHead = await (ipfsAPI.patchObject(this.ipfs, data.Hash, this._info(channel, password).head));
newHead = await (ipfsAPI.patchObject(this.ipfs, data.Hash, head));
return newHead;
return { hash: newHead, seq: seq };
}
/* DB Operations */
_add(channel, password, data) {
const post = this._publish(data);
const key = post.Hash;
await(this._createOperation(channel, password, HashCacheOps.Add, key, post.Hash));
await(this._createOperation(channel, password, HashCacheOps.Add, key, post.Hash, data));
return key;
}
@@ -169,14 +171,12 @@ class OrbitClient {
return this._createOperation(channel, password, HashCacheOps.Delete, key, target);
}
_createOperation(channel, password, operation, key, value) {
_createOperation(channel, password, operation, key, value, data) {
let message, res = false;
while(!res) {
message = this._createMessage(channel, password, operation, key, value);
res = await(this._pubsub.publish(channel, message));
if(!res) console.log("Retry <-->")
res = await(this._pubsub.publish(channel, message.hash, message.seq));
}
// console.log("Posted!")
return message.Hash;
}
@@ -201,8 +201,8 @@ class OrbitClient {
return l;
}
_connect(host, username, password) {
this._pubsub = new PubSub(this.ipfs, host, username, password);
_connect(host, port, username, password) {
this._pubsub = new PubSub(this.ipfs, host, port, username, password);
// this.client = this._pubsub._client;
// this.user = this.client.info.user;
this.user = { id: 'hello' }
@@ -215,14 +215,14 @@ class OrbitClient {
}
class OrbitClientFactory {
static connect(host, username, password, ipfs) {
static connect(host, port, username, password, ipfs) {
if(!ipfs) {
let ipfsd = await(ipfsDaemon());
ipfs = ipfsd.daemon;
}
const client = new OrbitClient(ipfs);
await(client._connect(host, username, password))
await(client._connect(host, port, username, password))
return client;
}
}

View File

@@ -6,35 +6,13 @@ var redis = require("redis");
var Aggregator = require('./Aggregator');
class PubSub {
constructor(ipfs, host, username, password) {
constructor(ipfs, host, port, username, password) {
this.ipfs = ipfs;
this._subscriptions = {};
this.client1 = redis.createClient();
this.client2 = redis.createClient();
this.currentPost = null;
this.client1.on("message", async((hash, message) => {
const currentHead = this._subscriptions[hash] ? this._subscriptions[hash].head : null;
if(this._subscriptions[hash]) {
let item = Aggregator._fetchOne(this.ipfs, message, this._subscriptions[hash].password);
if(item.seq > this._subscriptions[hash].seq) {
this._subscriptions[hash].seq = item.seq;
if(currentHead !== message)
this._handleNewMessage(hash, message);
if(this.currentPost) {
if(message === this.currentPost.hash) {
this.currentPost.callback(true);
this.currentPost = null;
} else {
this.currentPost.callback(false);
}
}
}
}
}));
this.client1 = redis.createClient({ host: host, port: port });
this.client2 = redis.createClient({ host: host, port: port });
this.publishQueue = [];
this.client1.on("message", this._handleMessage.bind(this));
}
subscribe(hash, password, callback) {
@@ -56,10 +34,10 @@ class PubSub {
this.client2.unsubscribe();
}
publish(hash, message, callback) {
publish(hash, message, seq, callback) {
return new Promise((resolve, reject) => {
this.currentPost = { hash: message.Hash, callback: resolve };
this.client2.publish(hash, message.Hash);
this.publishQueue.splice(0, 0, { hash: message.Hash, callback: resolve });
this.client2.publish(hash, JSON.stringify({ hash: message.Hash, seq: seq }));
});
}
@@ -71,10 +49,27 @@ class PubSub {
delete this._subscriptions[hash];
}
_handleNewMessage(hash, newHead) {
this._subscriptions[hash].head = newHead;
if(this._subscriptions[hash].callback)
this._subscriptions[hash].callback(hash, newHead);
_handleMessage(hash, event) {
if(this._subscriptions[hash]) {
var e = JSON.parse(event)
var newHead = e.hash;
var seq = e.seq;
var isNewer = seq > this._subscriptions[hash].seq;
var item = this.publishQueue[this.publishQueue.length - 1];
if(item && item.hash === newHead) {
item.callback(isNewer);
this.publishQueue.pop();
}
if(isNewer)
this._updateSubscription(hash, newHead, seq);
}
}
_updateSubscription(hash, message, seq) {
this._subscriptions[hash].seq = seq;
this._subscriptions[hash].head = message;
}
}

View File

@@ -20,7 +20,7 @@ var serverConfig = {
// Orbit
const host = 'localhost';
const port = 3006;
const port = 6379;
const username = 'testrunner';
const password = '';
@@ -47,7 +47,7 @@ describe('Orbit Client', () => {
before(async((done) => {
var initialize = () => new Promise(async((resolve, reject) => {
orbit = OrbitClient.connect(`${host}:${port}`, username, password);
orbit = OrbitClient.connect(host, port, username, password);
orbit.channel(channel, '').delete();
resolve();
}));