Timeouts, save head in redis

This commit is contained in:
haad
2016-02-11 11:57:10 +01:00
parent d60f8d66a2
commit 61e063a7ed
3 changed files with 53 additions and 17 deletions

View File

@@ -26,10 +26,10 @@ let run = (async(() => {
let timer = new Timer(true);
running = true;
channel.add(id + count);
// channel.add(id + count);
console.log("Query...");
let items = channel.iterator({ limit: 3 }).collect();
let items = channel.iterator({ limit: 1 }).collect();
console.log(`Found items ${items.length} items`);
var g = items.filter((e) => e.item.Payload.startsWith(id))

View File

@@ -30,10 +30,11 @@ class OrbitClient {
channel(hash, password) {
if(password === undefined) password = '';
this._pubsub.subscribe(hash, password, async((hash, message, seq) => {
await(this._pubsub.subscribe(hash, password));
// this._pubsub.subscribe(hash, password, async((hash, message, seq) => {
// let m = Aggregator._fetchOne(this.ipfs, message, password);
// console.log(">", message);
}));
// }));
return {
info: (options) => this._info(hash, password),
@@ -179,7 +180,7 @@ class OrbitClient {
// console.log("posting...")
message = this._createMessage(channel, password, operation, key, value);
res = await(this._pubsub.publish(channel, message.hash, message.seq));
// if(!res) console.log("retry", message)
if(!res) console.log("retry", message)
}
// console.log("posted")
return message.Hash;
@@ -207,15 +208,20 @@ class OrbitClient {
}
_connect(host, port, username, password) {
this._pubsub = new PubSub(this.ipfs, host, port, username, password);
// this.client = this._pubsub._client;
// this.user = this.client.info.user;
this.user = { id: 'hello' }
// this.network = {
// id: this.client.info.networkId,
// name: this.client.info.name,
// config: this.client.info.config
// };
return new Promise((resolve, reject) => {
this._pubsub = new PubSub(this.ipfs, host, port, username, password, resolve);
// this.client = this._pubsub._client;
// this.user = this.client.info.user;
this.user = { id: 'hello' }
// this.network = {
// id: this.client.info.networkId,
// name: this.client.info.name,
// config: this.client.info.config
// };
// setTimeout(() => {
// resolve();
// }, 1000);
});
}
}

View File

@@ -6,13 +6,23 @@ var redis = require("redis");
var Aggregator = require('./Aggregator');
class PubSub {
constructor(ipfs, host, port, username, password) {
constructor(ipfs, host, port, username, password, resolve) {
this.ipfs = ipfs;
this._subscriptions = {};
this.client1 = redis.createClient({ host: host, port: port });
this.client2 = redis.createClient({ host: host, port: port });
this.client3 = redis.createClient({ host: host, port: port });
this.client1.on("message", this._handleMessage.bind(this));
this.publishQueue = [];
this.client1.on('connect', function() {
console.log('redis connected');
resolve();
});
this.client1.on("subscribe", function (channel, count) {
console.log("subscribed to pubsub topic '" + channel + "' (" + count + " peers)");
});
}
subscribe(hash, password, callback) {
@@ -24,8 +34,24 @@ class PubSub {
callback: callback,
seq: -1
};
this.client3.get("orbit." + hash, (err, reply) => {
if(reply) {
let d = JSON.parse(reply);
this._subscriptions[hash].seq = d.seq + 1;
this._subscriptions[hash].head = d.head;
if(err) console.log(err);
console.log(`head of '${hash}' is`, this._subscriptions[hash].head, "seq:", this._subscriptions[hash].seq);
}
});
this.client1.subscribe(hash);
}
return new Promise((resolve, reject) => {
setTimeout(() => {
console.log("pubsub initialized")
resolve();
}, 1000);
});
}
unsubscribe(hash) {
@@ -59,6 +85,7 @@ class PubSub {
}
_handleMessage(hash, event) {
// console.log(".")
if(this._subscriptions[hash]) {
var message = JSON.parse(event)
var newHead = message.hash;
@@ -66,7 +93,7 @@ class PubSub {
var isNewer = seq > this._subscriptions[hash].seq;
var item = this.publishQueue[this.publishQueue.length - 1];
// console.log(".", isNewer, newHead, item ? item.hash : '', seq, this._subscriptions[hash].seq)
// console.log(".", newHead, item ? item.hash : '', seq, this._subscriptions[hash].seq, isNewer)
// console.log(isNewer, seq, this._subscriptions[hash].seq)
if(item) {
@@ -80,9 +107,12 @@ class PubSub {
}
_updateSubscription(hash, message, seq) {
this.client3.set("orbit." + hash, JSON.stringify({ head: message, seq: seq }));
this._subscriptions[hash].seq = seq;
this._subscriptions[hash].head = message;
this._subscriptions[hash].callback(hash, message, seq);
if(this._subscriptions[hash].callback)
this._subscriptions[hash].callback(hash, message, seq);
}
}