diff --git a/examples/readMessages.js b/examples/readMessages.js index b427519..a789feb 100644 --- a/examples/readMessages.js +++ b/examples/readMessages.js @@ -41,8 +41,8 @@ let run = (async(() => { }); console.log(JSON.stringify(items, null, 2)); - console.log("--> remove", hash1); - orbit.channel(c1).remove({ key: hash1 }); + // console.log("--> remove", hash1); + // orbit.channel(c1).remove({ key: hash1 }); items = orbit.channel(c1).iterator({ limit: -1 }).collect(); items = items.map((e) => { @@ -50,15 +50,21 @@ let run = (async(() => { }); console.log(JSON.stringify(items, null, 2)); + setInterval(async(() => { + orbit.channel(c1).add("hello at " + new Date().getTime()); + }), 1234); +/* // You can also get the event based on its hash var value = orbit.channel(c1).get(hash2); console.log("key:", hash2, "value:", value); +*/ + // console.log("--> remove", hash2); + // orbit.channel(c1).remove({ key: hash2 }); - console.log("--> remove", hash2); - orbit.channel(c1).remove({ key: hash2 }); + // items = orbit.channel(c1).iterator({ limit: -1 }).collect(); + // console.log(JSON.stringify(items, null, 2)); - items = orbit.channel(c1).iterator({ limit: -1 }).collect(); - console.log(JSON.stringify(items, null, 2)); + // process.exit(0); } catch(e) { console.error("error:", e); diff --git a/src/OrbitClient.js b/src/OrbitClient.js index 19e8da5..d1c791e 100644 --- a/src/OrbitClient.js +++ b/src/OrbitClient.js @@ -27,6 +27,16 @@ class OrbitClient { channel(hash, password) { if(password === undefined) password = ''; + + this._pubsub.subscribe(hash, password, (channel, message) => { + const m = this._getMessages(hash, password, { gte: message }); + m.forEach((e) => { + const userData = await(ipfsAPI.getObject(this.ipfs, e.item.meta.from)) + const user = JSON.parse(userData.Data)["user"]; + console.log(`${user}>`, e.item.Payload, `(op: ${e.item.op}, ${e.item.key})`); + }); + }); + return { info: (options) => this._info(hash, password), delete: () => this._deleteChannel(hash, password), @@ -80,11 +90,13 @@ class OrbitClient { const key = options.key ? options.key : null; let startFromHash; - if(lt || lte) { + if(lte || lt) { startFromHash = lte ? lte : lt; + } else if (gte || gt) { + startFromHash = gte ? gte : gt; } else { // var channel = await (this.client.linkedList(channel, password).head()); - var channel = PubSub.latest(channel); + var channel = this._pubsub.latest(channel); startFromHash = channel.head ? channel.head : null; } @@ -128,7 +140,7 @@ class OrbitClient { // Get the current channel head and bump the sequence number let seq = 0; // const currentHead = await(this.client.linkedList(channel, password).head()) - const currentHead = PubSub.latest(channel); + const currentHead = this._pubsub.latest(channel); if(currentHead.head) { const headItem = await (ipfsAPI.getObject(this.ipfs, currentHead.head)); seq = JSON.parse(headItem.Data)["seq"] + 1; @@ -136,7 +148,7 @@ class OrbitClient { // Create meta info const size = -1; - const metaInfo = new MetaInfo(ItemTypes.Message, size, new Date().getTime()); + const metaInfo = new MetaInfo(ItemTypes.Message, size, this.user.id, new Date().getTime()); // Create the hash cache item const hcItem = new HashCacheItem(operation, key, seq, target, metaInfo, null, pubkey, privkey, password); @@ -174,13 +186,13 @@ class OrbitClient { _createOperation(channel, password, operation, key, value) { const message = this._createMessage(channel, password, operation, key, value); // await(this.client.linkedList(channel, password).add(message.Hash)); - PubSub.publish(channel, message.Hash) + this._pubsub.publish(channel, message.Hash) return message.Hash; } _deleteChannel(channel, password) { // await(this.client.linkedList(channel, password).delete()); - PubSub.delete(channel); + this._pubsub.delete(channel); return true; } @@ -196,12 +208,14 @@ class OrbitClient { _info(channel, password) { // return await(this.client.linkedList(channel, password).head()); - var l = PubSub.latest(channel); + var l = this._pubsub.latest(channel); return l; } _connect(host, username, password) { - this.client = await(HashCache.connect(host, username, password)); + this._pubsub = new PubSub(host, username, password); + // this.client = await(HashCache.connect(host, username, password)); + this.client = this._pubsub._client; this.user = this.client.info.user; this.network = { id: this.client.info.networkId, diff --git a/src/PubSub.js b/src/PubSub.js index ae113ff..37c97f3 100644 --- a/src/PubSub.js +++ b/src/PubSub.js @@ -1,39 +1,68 @@ 'use strict'; -let messages = {}; +var async = require('asyncawait/async'); +var await = require('asyncawait/await'); +var HashCache = require('./HashCacheClient'); class PubSub { - constructor() { + constructor(host, username, password) { + this._subscriptions = []; + this._messages = {}; + this._client = await(HashCache.connect(host, username, password)); + // Poll for the new head + setInterval(async(() => { + Object.keys(this._subscriptions).forEach(this._poll.bind(this)); + }), 500); } - static latest(hash) { - return { head: messages[hash] && messages[hash].length > 0 ? messages[hash][messages[hash].length - 1] : null, modes: {} }; + _poll(hash) { + const currentHead = this._subscriptions[hash].head; + const channel = await(this._client.linkedList(hash, this._subscriptions[hash].password).head()); + const newHead = channel.head; + if(currentHead !== newHead) { + // console.log("NEW HEAD!", newHead); + + this._subscriptions[hash].head = newHead; + + if(!this._messages[hash]) + this._messages[hash] = []; + + this._messages[hash].push(newHead); + + if(this._subscriptions[hash].callback) + this._subscriptions[hash].callback(hash, newHead); + } } - static publish(hash, message) { - if(!messages[hash]) messages[hash] = []; - messages[hash].push(message); + subscribe(channel, password, callback) { + if(!this._subscriptions[channel] || this._subscriptions[channel].password !== password) { + console.log("SUBSCRIBE:", channel); + this._subscriptions[channel] = { + channel: channel, + password: password, + head: null, + callback: callback + }; + } } - static delete(hash) { - messages[hash] = []; + unsubscribe(channel) { + delete this._subscriptions[channel]; + delete this._messages[channel]; } - onNewMessage(channel, message) { - /* - // From orbit-server: - var hash = req.params.hash; - var head = req.body.head; - if(!head) throw "Invalid request"; - var user = authorize(req, res); - var channel = await(Database.getChannel(hash)); - channel.authenticateRead(req.body.password); - var uid = await (ipfsAPI.putObject(ipfs, JSON.stringify(user.get()))); - channel.authenticateWrite(uid.Hash); - await(verifyMessage(head, channel)); - await(channel.updateHead(head)) - */ + publish(hash, message) { + if(!this._messages[hash]) this._messages[hash] = []; + await(this._client.linkedList(hash, this._subscriptions[hash].password).add(message)); + } + + latest(hash) { + return { head: this._messages[hash] && this._messages[hash].length > 0 ? this._messages[hash][this._messages[hash].length - 1] : null, modes: {} }; + } + + delete(hash) { + this._messages[hash] = []; } }