This commit is contained in:
haad 2016-02-11 14:10:23 +01:00
parent 61e063a7ed
commit aa619a81c8
3 changed files with 27 additions and 15 deletions

View File

@ -39,12 +39,14 @@ let run = (async(() => {
queriesPerSecond = 0;
}, 1000);
while(true) {
setInterval(async(() => {
// while(true) {
channel.add(id + totalQueries);
totalQueries ++;
lastTenSeconds ++;
queriesPerSecond ++;
}
// }
}), 100);
} catch(e) {
console.error("error:", e);

View File

@ -30,7 +30,11 @@ class OrbitClient {
channel(hash, password) {
if(password === undefined) password = '';
await(this._pubsub.subscribe(hash, password));
await(this._pubsub.subscribe(hash, password, async((hash, message, seq) => {
let m = await(Aggregator._fetchOne(this.ipfs, message, password));
// console.log(">", m.key, m.seq, m.Payload);
})));
// await(this._pubsub.subscribe(hash, password));
// this._pubsub.subscribe(hash, password, async((hash, message, seq) => {
// let m = Aggregator._fetchOne(this.ipfs, message, password);
// console.log(">", message);
@ -149,8 +153,12 @@ class OrbitClient {
let newHead = { Hash: data.Hash };
// If this is not the first item in the channel, patch with the previous (ie. link as next)
if(seq > 0)
newHead = await (ipfsAPI.patchObject(this.ipfs, data.Hash, head));
try {
if(seq > 0)
newHead = await (ipfsAPI.patchObject(this.ipfs, data.Hash, head));
} catch(e) {
console.error("!!!!", e)
}
return { hash: newHead, seq: seq };
}
@ -180,7 +188,7 @@ class OrbitClient {
// console.log("posting...")
message = this._createMessage(channel, password, operation, key, value);
res = await(this._pubsub.publish(channel, message.hash, message.seq));
if(!res) console.log("retry", message)
if(!res) console.log("retry", message.hash, message.seq)
}
// console.log("posted")
return message.Hash;

View File

@ -37,7 +37,7 @@ class PubSub {
this.client3.get("orbit." + hash, (err, reply) => {
if(reply) {
let d = JSON.parse(reply);
this._subscriptions[hash].seq = d.seq + 1;
this._subscriptions[hash].seq = d.seq;
this._subscriptions[hash].head = d.head;
if(err) console.log(err);
console.log(`head of '${hash}' is`, this._subscriptions[hash].head, "seq:", this._subscriptions[hash].seq);
@ -62,17 +62,19 @@ class PubSub {
publish(hash, message, seq, callback) {
return new Promise((resolve, reject) => {
if(this.publishQueue.length === 0) {
this.publishQueue.splice(0, 0, { hash: message.Hash, callback: resolve });
this.client2.publish(hash, JSON.stringify({ hash: message.Hash, seq: seq }));
} else {
// console.log("too early")
// resolve(false);
}
// setTimeout(() => {
// console.log("timeout")
// this.publishQueue.pop();
// resolve(false);
// }, 200)
// }, 2000)
// if(this.publishQueue.length === 0) {
this.publishQueue.splice(0, 0, { hash: message.Hash, callback: resolve });
this.client2.publish(hash, JSON.stringify({ hash: message.Hash, seq: seq }));
// } else {
// console.log("too early")
// resolve(false);
// }
});
}