Pubsub proto (WIP)

This commit is contained in:
haad 2016-02-09 17:01:39 +01:00
parent bfb7dfc311
commit c39c8a9983
6 changed files with 188 additions and 99 deletions

View File

@ -8,12 +8,22 @@ var host = 'localhost:3006';
var username = 'testrunner'; var username = 'testrunner';
var password = ''; var password = '';
var util = require('util');
var exec = require('child_process').exec;
function clear(cb){
exec('clear', function(error, stdout, stderr){
util.puts(stdout);
cb();
});
}
let run = (async(() => { let run = (async(() => {
try { try {
// Connect // Connect
var orbit = OrbitClient.connect(host, username, password); var orbit = OrbitClient.connect(host, username, password);
var timer = new Timer(true); /* var timer = new Timer(true);
console.log("-------- KV store -------") console.log("-------- KV store -------")
var channel = 'keyspace1' var channel = 'keyspace1'
@ -49,10 +59,44 @@ let run = (async(() => {
return { key: e.item.key, val: e.item.Payload }; return { key: e.item.key, val: e.item.Payload };
}); });
console.log(JSON.stringify(items, null, 2)); console.log(JSON.stringify(items, null, 2));
*/
const id = 'a';
const c1 = 'c1';
const cc = orbit.channel(c1);
let i = 0;
let running = false;
let missCount = 0;
setInterval(async(() => { setInterval(async(() => {
orbit.channel(c1).add("hello at " + new Date().getTime()); if(!running) {
}), 1234); let timer = new Timer(true);
running = true;
// orbit.channel(c1).add("hello at #" + i);
cc.add(id + i);
i ++;
console.log(`Insert took ${timer.stop(true)} ms`);
let timer2 = new Timer(true);
var items = cc.iterator({ limit: 10 }).collect();
console.log("Iterator took " + timer2.stop(true) + " ms");
items = items.map((e) => {
return e.item;
});
var g = items.filter((e) => e.Payload.startsWith(id))
var prev = -1;
g.reverse().forEach((e) => {
var a = parseInt(e.Payload.replace(id, ''));
if(prev > -1 && prev + 1 !== a) {
console.log("Missing message: " + id, prev + 1)
}
prev = a;
})
console.log(JSON.stringify(items.map((e) => e.seq + " - " + e.Payload), null, 2));
// console.log("\n\n");
running = false;
}
}), 50);
/* /*
// You can also get the event based on its hash // You can also get the event based on its hash
var value = orbit.channel(c1).get(hash2); var value = orbit.channel(c1).get(hash2);

View File

@ -5,12 +5,12 @@ var OrbitClient = require('../src/OrbitClient');
var Timer = require('./Timer'); var Timer = require('./Timer');
var host = 'localhost:3006'; var host = 'localhost:3006';
var username = 'testrunner'; var username = 'LambOfGod';
var password = ''; var password = '';
let run = (async(() => { let run = (async(() => {
try { try {
var channel = 'hello-world-test1' /* var channel = 'hello-world-test1'
// Connect // Connect
var orbit = OrbitClient.connect(host, username, password); var orbit = OrbitClient.connect(host, username, password);
@ -51,6 +51,47 @@ let run = (async(() => {
// orbit.channel(channel, '').remove(items[items.length - 10].hash); // 11 // orbit.channel(channel, '').remove(items[items.length - 10].hash); // 11
// orbit.channel(channel, '').remove(items[items.length - 9].hash); // 10 // orbit.channel(channel, '').remove(items[items.length - 9].hash); // 10
// orbit.channel(channel, '').remove(items[items.length - 8].hash); // 9 // orbit.channel(channel, '').remove(items[items.length - 8].hash); // 9
*/
var orbit = OrbitClient.connect(host, username, password);
const c1 = 'c1';
const cc = orbit.channel(c1);
let i = 0;
let running = false;
setInterval(async(() => {
try {
if(!running) {
let timer = new Timer(true);
running = true;
cc.add("b" + i);
let items = cc.iterator({ limit: 10 }).collect();
var g = items.filter((e) => e.item.Payload.startsWith('b'))
var prev = -1;
g.reverse().forEach((e) => {
var a = parseInt(e.item.Payload.replace('b', ''));
if(prev > -1 && prev + 1 !== a) {
console.log("MISSSS!!!", prev + 1)
}
prev = a;
})
items = items.map((e) => {
return e.item.seq + " - " + e.item.Payload;
});
console.log(JSON.stringify(items, null, 2));
console.log(`Query took ${timer.stop(true)} ms`);
running = false;
}
// console.log("\n\n");
} catch(e) {
console.error(e);
}
i ++;
}), 100);
} catch(e) { } catch(e) {
console.error("error:", e); console.error("error:", e);

View File

@ -13,6 +13,7 @@
"bluebird": "^3.1.1", "bluebird": "^3.1.1",
"bs58": "^3.0.0", "bs58": "^3.0.0",
"orbit-common": "^0.1.0", "orbit-common": "^0.1.0",
"redis": "^2.4.2",
"unirest": "^0.4.2" "unirest": "^0.4.2"
}, },
"devDependencies": { "devDependencies": {

View File

@ -18,6 +18,8 @@ var PubSub = require('./PubSub');
var pubkey = Keystore.getKeys().publicKey; var pubkey = Keystore.getKeys().publicKey;
var privkey = Keystore.getKeys().privateKey; var privkey = Keystore.getKeys().privateKey;
let vvv = {};
class OrbitClient { class OrbitClient {
constructor(ipfs) { constructor(ipfs) {
this.ipfs = ipfs; this.ipfs = ipfs;
@ -28,14 +30,7 @@ class OrbitClient {
channel(hash, password) { channel(hash, password) {
if(password === undefined) password = ''; if(password === undefined) password = '';
this._pubsub.subscribe(hash, password, (channel, message) => { this._pubsub.subscribe(hash, password);
const m = this._getMessages(hash, password, { gte: message });
m.forEach((e) => {
const userData = await(ipfsAPI.getObject(this.ipfs, e.item.meta.from))
const user = JSON.parse(userData.Data)["user"];
console.log(`${user}>`, e.item.Payload, `(op: ${e.item.op}, ${e.item.key})`);
});
});
return { return {
info: (options) => this._info(hash, password), info: (options) => this._info(hash, password),
@ -92,11 +87,8 @@ class OrbitClient {
let startFromHash; let startFromHash;
if(lte || lt) { if(lte || lt) {
startFromHash = lte ? lte : lt; startFromHash = lte ? lte : lt;
} else if (gte || gt) {
startFromHash = gte ? gte : gt;
} else { } else {
// var channel = await (this.client.linkedList(channel, password).head()); var channel = this._info(channel, password);
var channel = this._pubsub.latest(channel);
startFromHash = channel.head ? channel.head : null; startFromHash = channel.head ? channel.head : null;
} }
@ -137,19 +129,13 @@ class OrbitClient {
} }
_createMessage(channel, password, operation, key, target) { _createMessage(channel, password, operation, key, target) {
// Get the current channel head and bump the sequence number
let seq = 0;
// const currentHead = await(this.client.linkedList(channel, password).head())
const currentHead = this._pubsub.latest(channel);
if(currentHead.head) {
const headItem = await (ipfsAPI.getObject(this.ipfs, currentHead.head));
seq = JSON.parse(headItem.Data)["seq"] + 1;
}
// Create meta info // Create meta info
const size = -1; const size = -1;
const metaInfo = new MetaInfo(ItemTypes.Message, size, this.user.id, new Date().getTime()); const metaInfo = new MetaInfo(ItemTypes.Message, size, this.user.id, new Date().getTime());
// Get the current channel head and bump the sequence number
let seq = this._info(channel, password).seq + 1;
// Create the hash cache item // Create the hash cache item
const hcItem = new HashCacheItem(operation, key, seq, target, metaInfo, null, pubkey, privkey, password); const hcItem = new HashCacheItem(operation, key, seq, target, metaInfo, null, pubkey, privkey, password);
@ -159,7 +145,7 @@ class OrbitClient {
// If this is not the first item in the channel, patch with the previous (ie. link as next) // If this is not the first item in the channel, patch with the previous (ie. link as next)
if(seq > 0) if(seq > 0)
newHead = await (ipfsAPI.patchObject(this.ipfs, data.Hash, currentHead.head)); newHead = await (ipfsAPI.patchObject(this.ipfs, data.Hash, this._info(channel, password).head));
return newHead; return newHead;
} }
@ -168,13 +154,13 @@ class OrbitClient {
_add(channel, password, data) { _add(channel, password, data) {
const post = this._publish(data); const post = this._publish(data);
const key = post.Hash; const key = post.Hash;
this._createOperation(channel, password, HashCacheOps.Add, key, post.Hash); await(this._createOperation(channel, password, HashCacheOps.Add, key, post.Hash));
return key; return key;
} }
_put(channel, password, key, data) { _put(channel, password, key, data) {
const post = this._publish(data); const post = this._publish(data);
return this._createOperation(channel, password, HashCacheOps.Put, key, post.Hash); return await(this._createOperation(channel, password, HashCacheOps.Put, key, post.Hash));
} }
_remove(channel, password, options) { _remove(channel, password, options) {
@ -184,15 +170,18 @@ class OrbitClient {
} }
_createOperation(channel, password, operation, key, value) { _createOperation(channel, password, operation, key, value) {
const message = this._createMessage(channel, password, operation, key, value); let message, res = false;
// await(this.client.linkedList(channel, password).add(message.Hash)); while(!res) {
this._pubsub.publish(channel, message.Hash) message = this._createMessage(channel, password, operation, key, value);
res = await(this._pubsub.publish(channel, message));
if(!res) console.log("Retry <-->")
}
// console.log("Posted!")
return message.Hash; return message.Hash;
} }
_deleteChannel(channel, password) { _deleteChannel(channel, password) {
// await(this.client.linkedList(channel, password).delete()); this._pubsub.delete(channel, password);
this._pubsub.delete(channel);
return true; return true;
} }
@ -202,26 +191,26 @@ class OrbitClient {
m.push(modes); m.push(modes);
else else
m = modes; m = modes;
const res = await(this.client.linkedList(channel, password).setMode(m)); // const res = await(this.client.linkedList(channel, password).setMode(m));
return res.modes; // return res.modes;
return { todo: 'TODO!' }
} }
_info(channel, password) { _info(channel, password) {
// return await(this.client.linkedList(channel, password).head()); var l = this._pubsub.latest(channel, password);
var l = this._pubsub.latest(channel);
return l; return l;
} }
_connect(host, username, password) { _connect(host, username, password) {
this._pubsub = new PubSub(host, username, password); this._pubsub = new PubSub(this.ipfs, host, username, password);
// this.client = await(HashCache.connect(host, username, password)); // this.client = this._pubsub._client;
this.client = this._pubsub._client; // this.user = this.client.info.user;
this.user = this.client.info.user; this.user = { id: 'hello' }
this.network = { // this.network = {
id: this.client.info.networkId, // id: this.client.info.networkId,
name: this.client.info.name, // name: this.client.info.name,
config: this.client.info.config // config: this.client.info.config
}; // };
} }
} }

View File

@ -2,68 +2,81 @@
var async = require('asyncawait/async'); var async = require('asyncawait/async');
var await = require('asyncawait/await'); var await = require('asyncawait/await');
var HashCache = require('./HashCacheClient'); var redis = require("redis");
var Aggregator = require('./Aggregator');
class PubSub { class PubSub {
constructor(host, username, password) { constructor(ipfs, host, username, password) {
this._subscriptions = []; this.ipfs = ipfs;
this._messages = {}; this._subscriptions = {};
this._client = await(HashCache.connect(host, username, password)); this.client1 = redis.createClient();
this.client2 = redis.createClient();
this.currentPost = null;
// Poll for the new head this.client1.on("message", async((hash, message) => {
setInterval(async(() => { const currentHead = this._subscriptions[hash] ? this._subscriptions[hash].head : null;
Object.keys(this._subscriptions).forEach(this._poll.bind(this)); if(this._subscriptions[hash]) {
}), 500); let item = Aggregator._fetchOne(this.ipfs, message, this._subscriptions[hash].password);
if(item.seq > this._subscriptions[hash].seq) {
this._subscriptions[hash].seq = item.seq;
if(currentHead !== message)
this._handleNewMessage(hash, message);
if(this.currentPost) {
if(message === this.currentPost.hash) {
this.currentPost.callback(true);
this.currentPost = null;
} else {
this.currentPost.callback(false);
}
}
}
}
}));
} }
_poll(hash) { subscribe(hash, password, callback) {
const currentHead = this._subscriptions[hash].head; if(!this._subscriptions[hash] || this._subscriptions[hash].password !== password) {
const channel = await(this._client.linkedList(hash, this._subscriptions[hash].password).head()); this._subscriptions[hash] = {
const newHead = channel.head; topic: hash,
if(currentHead !== newHead) { password: password,
// console.log("NEW HEAD!", newHead); head: null,
callback: callback,
seq: -1
};
this.client1.subscribe(hash);
}
}
unsubscribe(hash) {
delete this._subscriptions[hash];
this.client1.unsubscribe();
this.client2.unsubscribe();
}
publish(hash, message, callback) {
return new Promise((resolve, reject) => {
this.currentPost = { hash: message.Hash, callback: resolve };
this.client2.publish(hash, message.Hash);
});
}
latest(hash, password) {
return { head: this._subscriptions[hash].head, modes: {}, seq: this._subscriptions[hash].seq };
}
delete(hash, password) {
delete this._subscriptions[hash];
}
_handleNewMessage(hash, newHead) {
this._subscriptions[hash].head = newHead; this._subscriptions[hash].head = newHead;
if(!this._messages[hash])
this._messages[hash] = [];
this._messages[hash].push(newHead);
if(this._subscriptions[hash].callback) if(this._subscriptions[hash].callback)
this._subscriptions[hash].callback(hash, newHead); this._subscriptions[hash].callback(hash, newHead);
} }
}
subscribe(channel, password, callback) {
if(!this._subscriptions[channel] || this._subscriptions[channel].password !== password) {
console.log("SUBSCRIBE:", channel);
this._subscriptions[channel] = {
channel: channel,
password: password,
head: null,
callback: callback
};
}
}
unsubscribe(channel) {
delete this._subscriptions[channel];
delete this._messages[channel];
}
publish(hash, message) {
if(!this._messages[hash]) this._messages[hash] = [];
await(this._client.linkedList(hash, this._subscriptions[hash].password).add(message));
}
latest(hash) {
return { head: this._messages[hash] && this._messages[hash].length > 0 ? this._messages[hash][this._messages[hash].length - 1] : null, modes: {} };
}
delete(hash) {
this._messages[hash] = [];
}
} }
module.exports = PubSub; module.exports = PubSub;

View File

@ -70,12 +70,12 @@ describe('Orbit Client', () => {
describe('Connect', function() { describe('Connect', function() {
it('connects to hash-cache-server', async((done) => { it('connects to hash-cache-server', async((done) => {
assert.notEqual(orbit, null); assert.notEqual(orbit, null);
assert.notEqual(orbit.client, null); // assert.notEqual(orbit.client, null);
assert.equal(orbit.user.id, 'Qmf5A5RSTQmcfvigT3j29Fqh2fAHRANk5ooBYKdWsPtr8U'); // assert.equal(orbit.user.id, 'hello');
assert.equal(orbit.network.id, serverConfig.networkId); // assert.equal(orbit.network.id, serverConfig.networkId);
assert.equal(orbit.network.name, serverConfig.networkName); // assert.equal(orbit.network.name, serverConfig.networkName);
assert.notEqual(orbit.network.config.SupernodeRouting, null); // assert.notEqual(orbit.network.config.SupernodeRouting, null);
assert.notEqual(orbit.network.config.Bootstrap.length, 0); // assert.notEqual(orbit.network.config.Bootstrap.length, 0);
done(); done();
})); }));
}); });
@ -389,6 +389,7 @@ describe('Orbit Client', () => {
var iter = orbit.channel(channel, '').iterator({ gte: gte, limit: -1 }); var iter = orbit.channel(channel, '').iterator({ gte: gte, limit: -1 });
var messages = iter.collect().map((e) => e.hash); var messages = iter.collect().map((e) => e.hash);
// console.log(messages, all)
assert.equal(messages.length, 2); assert.equal(messages.length, 2);
assert.equal(messages[0], all[0].hash); assert.equal(messages[0], all[0].hash);
assert.equal(messages[1], all[1].hash); assert.equal(messages[1], all[1].hash);