This commit is contained in:
haad 2016-02-10 17:04:23 +01:00
parent 78481cd96a
commit 079ea9853b
4 changed files with 21 additions and 11 deletions

1
.gitignore vendored
View File

@ -2,3 +2,4 @@
node_modules/
debug.log
WIP/
.vagrant/

View File

@ -26,9 +26,11 @@ let run = (async(() => {
let timer = new Timer(true);
running = true;
channel.add(id + count);
// channel.add(id + count);
let items = channel.iterator({ limit: 20 }).collect();
console.log("Query...");
let items = channel.iterator({ limit: 1 }).collect();
console.log(`Found items ${items.length} items`);
var g = items.filter((e) => e.item.Payload.startsWith(id))
var prev = -1;

View File

@ -1,6 +1,5 @@
'use strict';
var a = require('async');
var async = require('asyncawait/async');
var await = require('asyncawait/await');
var Keystore = require('orbit-common/lib/Keystore');
@ -31,7 +30,10 @@ class OrbitClient {
channel(hash, password) {
if(password === undefined) password = '';
this._pubsub.subscribe(hash, password);
this._pubsub.subscribe(hash, password, async((hash, message, seq) => {
// let m = Aggregator._fetchOne(this.ipfs, message, password);
// console.log(">", message);
}));
return {
info: (options) => this._info(hash, password),
@ -206,6 +208,7 @@ class OrbitClient {
// this.client = this._pubsub._client;
// this.user = this.client.info.user;
this.user = { id: 'hello' }
console.log("Connected to redis")
// this.network = {
// id: this.client.info.networkId,
// name: this.client.info.name,

View File

@ -11,8 +11,8 @@ class PubSub {
this._subscriptions = {};
this.client1 = redis.createClient({ host: host, port: port });
this.client2 = redis.createClient({ host: host, port: port });
this.publishQueue = [];
this.client1.on("message", this._handleMessage.bind(this));
this.publishQueue = [];
}
subscribe(hash, password, callback) {
@ -51,17 +51,20 @@ class PubSub {
_handleMessage(hash, event) {
if(this._subscriptions[hash]) {
var e = JSON.parse(event)
var newHead = e.hash;
var seq = e.seq;
var message = JSON.parse(event)
var newHead = message.hash;
var seq = message.seq;
var isNewer = seq > this._subscriptions[hash].seq;
var item = this.publishQueue[this.publishQueue.length - 1];
var item = this.publishQueue[this.publishQueue.length - 1];
if(item && item.hash === newHead) {
item.callback(isNewer);
// console.log(".", newHead, item ? item.hash : '')
if(item) {
item.callback(isNewer && newHead === item.hash);
this.publishQueue.pop();
}
// console.log(isNewer, seq, this._subscriptions[hash].seq)
if(isNewer)
this._updateSubscription(hash, newHead, seq);
}
@ -70,6 +73,7 @@ class PubSub {
_updateSubscription(hash, message, seq) {
this._subscriptions[hash].seq = seq;
this._subscriptions[hash].head = message;
this._subscriptions[hash].callback(hash, message, seq);
}
}