diff --git a/.gitignore b/.gitignore index 1be490e..ea32c98 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ node_modules/ debug.log WIP/ +.vagrant/ diff --git a/examples/pubsubReader.js b/examples/pubsubReader.js index 2588212..acd532e 100644 --- a/examples/pubsubReader.js +++ b/examples/pubsubReader.js @@ -26,9 +26,11 @@ let run = (async(() => { let timer = new Timer(true); running = true; - channel.add(id + count); + // channel.add(id + count); - let items = channel.iterator({ limit: 20 }).collect(); + console.log("Query..."); + let items = channel.iterator({ limit: 1 }).collect(); + console.log(`Found items ${items.length} items`); var g = items.filter((e) => e.item.Payload.startsWith(id)) var prev = -1; diff --git a/src/OrbitClient.js b/src/OrbitClient.js index 792c9c9..c8e1f4e 100644 --- a/src/OrbitClient.js +++ b/src/OrbitClient.js @@ -1,6 +1,5 @@ 'use strict'; -var a = require('async'); var async = require('asyncawait/async'); var await = require('asyncawait/await'); var Keystore = require('orbit-common/lib/Keystore'); @@ -31,7 +30,10 @@ class OrbitClient { channel(hash, password) { if(password === undefined) password = ''; - this._pubsub.subscribe(hash, password); + this._pubsub.subscribe(hash, password, async((hash, message, seq) => { + // let m = Aggregator._fetchOne(this.ipfs, message, password); + // console.log(">", message); + })); return { info: (options) => this._info(hash, password), @@ -206,6 +208,7 @@ class OrbitClient { // this.client = this._pubsub._client; // this.user = this.client.info.user; this.user = { id: 'hello' } + console.log("Connected to redis") // this.network = { // id: this.client.info.networkId, // name: this.client.info.name, diff --git a/src/PubSub.js b/src/PubSub.js index 2acd3ea..51ceacc 100644 --- a/src/PubSub.js +++ b/src/PubSub.js @@ -11,8 +11,8 @@ class PubSub { this._subscriptions = {}; this.client1 = redis.createClient({ host: host, port: port }); this.client2 = redis.createClient({ host: host, port: port }); - this.publishQueue = []; this.client1.on("message", this._handleMessage.bind(this)); + this.publishQueue = []; } subscribe(hash, password, callback) { @@ -51,17 +51,20 @@ class PubSub { _handleMessage(hash, event) { if(this._subscriptions[hash]) { - var e = JSON.parse(event) - var newHead = e.hash; - var seq = e.seq; + var message = JSON.parse(event) + var newHead = message.hash; + var seq = message.seq; var isNewer = seq > this._subscriptions[hash].seq; + var item = this.publishQueue[this.publishQueue.length - 1]; - var item = this.publishQueue[this.publishQueue.length - 1]; - if(item && item.hash === newHead) { - item.callback(isNewer); + // console.log(".", newHead, item ? item.hash : '') + + if(item) { + item.callback(isNewer && newHead === item.hash); this.publishQueue.pop(); } + // console.log(isNewer, seq, this._subscriptions[hash].seq) if(isNewer) this._updateSubscription(hash, newHead, seq); } @@ -70,6 +73,7 @@ class PubSub { _updateSubscription(hash, message, seq) { this._subscriptions[hash].seq = seq; this._subscriptions[hash].head = message; + this._subscriptions[hash].callback(hash, message, seq); } }