diff --git a/examples/pubsubBenchmark.js b/examples/pubsubBenchmark.js index 94e2273..34a7eb8 100644 --- a/examples/pubsubBenchmark.js +++ b/examples/pubsubBenchmark.js @@ -39,12 +39,14 @@ let run = (async(() => { queriesPerSecond = 0; }, 1000); - while(true) { + setInterval(async(() => { + // while(true) { channel.add(id + totalQueries); totalQueries ++; lastTenSeconds ++; queriesPerSecond ++; - } + // } + }), 100); } catch(e) { console.error("error:", e); diff --git a/src/OrbitClient.js b/src/OrbitClient.js index 5f96dcd..6770636 100644 --- a/src/OrbitClient.js +++ b/src/OrbitClient.js @@ -30,7 +30,11 @@ class OrbitClient { channel(hash, password) { if(password === undefined) password = ''; - await(this._pubsub.subscribe(hash, password)); + await(this._pubsub.subscribe(hash, password, async((hash, message, seq) => { + let m = await(Aggregator._fetchOne(this.ipfs, message, password)); + // console.log(">", m.key, m.seq, m.Payload); + }))); + // await(this._pubsub.subscribe(hash, password)); // this._pubsub.subscribe(hash, password, async((hash, message, seq) => { // let m = Aggregator._fetchOne(this.ipfs, message, password); // console.log(">", message); @@ -149,8 +153,12 @@ class OrbitClient { let newHead = { Hash: data.Hash }; // If this is not the first item in the channel, patch with the previous (ie. link as next) - if(seq > 0) - newHead = await (ipfsAPI.patchObject(this.ipfs, data.Hash, head)); + try { + if(seq > 0) + newHead = await (ipfsAPI.patchObject(this.ipfs, data.Hash, head)); + } catch(e) { + console.error("!!!!", e) + } return { hash: newHead, seq: seq }; } @@ -180,7 +188,7 @@ class OrbitClient { // console.log("posting...") message = this._createMessage(channel, password, operation, key, value); res = await(this._pubsub.publish(channel, message.hash, message.seq)); - if(!res) console.log("retry", message) + if(!res) console.log("retry", message.hash, message.seq) } // console.log("posted") return message.Hash; diff --git a/src/PubSub.js b/src/PubSub.js index 70df347..efba9d3 100644 --- a/src/PubSub.js +++ b/src/PubSub.js @@ -37,7 +37,7 @@ class PubSub { this.client3.get("orbit." + hash, (err, reply) => { if(reply) { let d = JSON.parse(reply); - this._subscriptions[hash].seq = d.seq + 1; + this._subscriptions[hash].seq = d.seq; this._subscriptions[hash].head = d.head; if(err) console.log(err); console.log(`head of '${hash}' is`, this._subscriptions[hash].head, "seq:", this._subscriptions[hash].seq); @@ -62,17 +62,19 @@ class PubSub { publish(hash, message, seq, callback) { return new Promise((resolve, reject) => { - if(this.publishQueue.length === 0) { - this.publishQueue.splice(0, 0, { hash: message.Hash, callback: resolve }); - this.client2.publish(hash, JSON.stringify({ hash: message.Hash, seq: seq })); - } else { - // console.log("too early") - // resolve(false); - } // setTimeout(() => { + // console.log("timeout") // this.publishQueue.pop(); // resolve(false); - // }, 200) + // }, 2000) + + // if(this.publishQueue.length === 0) { + this.publishQueue.splice(0, 0, { hash: message.Hash, callback: resolve }); + this.client2.publish(hash, JSON.stringify({ hash: message.Hash, seq: seq })); + // } else { + // console.log("too early") + // resolve(false); + // } }); }