diff --git a/lib/wsp/Peer.js b/lib/wsp/Peer.js index 05f510b3..493ba8e7 100644 --- a/lib/wsp/Peer.js +++ b/lib/wsp/Peer.js @@ -1,10 +1,16 @@ +/* eslint-disable no-underscore-dangle */ +'use strict'; + var WebSocket = require('ws'); +var Emitter = require('events'); +var util = require('util'); /** * Calculates backoff instances. * @param {Object} [options] - Override the default settings. * @param {Object} options.time=50 - Initial backoff time. * @param {Object} options.factor=2 - How much to multiply the time by. + * @param {Object} options.max=1min - Maximum backoff time. * @class */ function Backoff (options) { @@ -19,7 +25,14 @@ function Backoff (options) { * @return {Number} - The next backoff time. */ Backoff.prototype.next = function () { - this.time *= this.factor; + var next = this.time * this.factor; + + if (next > this.max) { + this.time = this.max; + return this.max; + } + + this.time = next; return this.time; }; @@ -33,13 +46,29 @@ Backoff.prototype.reset = function () { this.time = options.time || 50; this.factor = options.factor || 2; + this.max = options.max || 1 * 60 * 1000; return this; }; /** - * Create a websocket client and handle reconnect backoff logic. - * @param {String} url - A preformatted url (starts with ws://) + * Schedules the next connection, according to the backoff. + * @param {Peer} peer - A peer instance. + * @return {Timeout} - The timeout value from `setTimeout`. + */ +function scheduleReconnect (peer) { + var backoff = peer.backoff; + var time = backoff.time; + backoff.next(); + + var reconnect = peer.connect.bind(peer); + + return setTimeout(reconnect, time); +} + +/** + * Handles reconnections and defers messages until the socket is ready. + * @param {String} url - The address to connect to. * @param {Object} [options] - Override how the socket is managed. * @param {Object} options.backoff - Backoff options (see the constructor). * @class @@ -49,14 +78,38 @@ function Peer (url, options) { return new Peer(url, options); } + // Extend EventEmitter. + Emitter.call(this); + this.setMaxListeners(Infinity); + this.options = options || {}; - // Messages sent while offline. - this.offline = []; + // Messages sent before the socket is ready. + this.deferredMsgs = []; this.url = Peer.formatURL(url); this.backoff = new Backoff(this.options.backoff); - this.retry(url); + + // Set up the websocket. + this.connect(); + + var peer = this; + var reconnect = scheduleReconnect.bind(null, peer); + + // Handle reconnection. + this.on('close', reconnect); + this.on('error', function (error) { + if (error.code === 'ECONNREFUSED') { + reconnect(); + } + }); + + // Send deferred messages. + this.on('open', function () { + peer.drainQueue(); + peer.backoff.reset(); + }); + } /** @@ -70,93 +123,48 @@ Peer.formatURL = function (url) { return url.replace('http', 'ws'); }; +util.inherits(Peer, Emitter); var API = Peer.prototype; /** * Attempts a websocket connection. - * @param {String} url - The websocket URL. * @return {WebSocket} - The new websocket instance. */ -API.retry = function () { +API.connect = function () { var url = this.url; + // Open a new websocket. var socket = new WebSocket(url); + + // Re-use the previous listeners. + socket._events = this._events; + this.socket = socket; - this.retryOnDisconnect(socket); - - this.sendOnConnection(); - return socket; }; /** - * Sends the messages that couldn't be sent before once - * the connection is open. + * Sends all the messages in the deferred queue. * @return {Peer} - The context. */ -API.sendOnConnection = function () { +API.drainQueue = function () { var peer = this; - var queue = this.offline; - var socket = this.socket; - // Wait for the socket to connect. - socket.once('open', function () { - queue.forEach(function (msg) { - socket.send(msg); - }); - - peer.offline = []; + this.deferredMsgs.forEach(function (msg) { + peer.send(msg); }); + // Reset the queue. + this.deferredMsgs = []; + return this; }; -/** - * Schedules the next retry, according to the backoff. - * @param {Peer} peer - A peer instance. - * @return {Timeout} - The timeout value from `setTimeout`. - */ -function schedule (peer) { - var backoff = peer.backoff; - var time = backoff.time; - backoff.next(); - - return setTimeout(function () { - var socket = peer.retry(); - - // Successfully reconnected? Reset the backoff. - socket.once('open', backoff.reset.bind(backoff)); - }, time); -} - -/** - * Attaches handlers to the socket, attempting reconnection - * when it's closed. - * @param {WebSocket} socket - The websocket instance to bind to. - * @return {WebSocket} - The same websocket. - */ -API.retryOnDisconnect = function (socket) { - var peer = this; - - // Listen for socket close events. - socket.once('close', function () { - schedule(peer); - }); - - socket.on('error', function (error) { - if (error.code === 'ECONNREFUSED') { - schedule(peer); - } - }); - - return socket; -}; - /** * Send data through the socket, or add it to a queue - * of offline requests if it's not ready yet. - * @param {String} msg - The data to send. + * of deferred messages if it's not ready yet. + * @param {Mixed} msg - String, or anything that JSON can handle. * @return {Peer} - The context. */ API.send = function (msg) { @@ -164,10 +172,16 @@ API.send = function (msg) { var state = socket.readyState; var ready = socket.OPEN; + // Make sure it's a string. + if (typeof msg !== 'string') { + msg = JSON.stringify(msg); + } + + // Make sure the socket is ready. if (state === ready) { socket.send(msg); } else { - this.offline.push(msg); + this.deferredMsgs.push(msg); } return this; diff --git a/lib/wsp/Pool.js b/lib/wsp/Pool.js new file mode 100644 index 00000000..542c0482 --- /dev/null +++ b/lib/wsp/Pool.js @@ -0,0 +1,101 @@ +'use strict'; + +/** + * Simpler interface over a collection of sockets. Works with + * WebSocket clients, or sockets from a WebSocket server. + * @class + */ +function Pool () { + if (!(this instanceof Pool)) { + return new Pool(); + } + + // Maps IDs to sockets. + this.sockets = {}; +} + +var API = Pool.prototype; + +/** + * Returns the socket by the given ID. + * @param {String} id - The unique socket ID. + * @return {WebSocket|Null} - The WebSocket, if found. + */ +API.get = function (id) { + return this.sockets[id] || null; +}; + +/** + * Adds a socket to the pool. + * @param {String} id - The socket ID. + * @param {WebSocket} socket - A websocket instance. + * @return {Pool} - The context. + */ +API.add = function (id, socket) { + this.sockets[id] = socket; + + return this; +}; + +/** + * Removes a socket from the pool. + * @param {String} id - The ID of the socket to remove. + * @return {Boolean} - Whether the pool contained the socket. + */ +API.remove = function (id) { + var sockets = this.sockets; + var hasSocket = sockets.hasOwnProperty(id); + + if (hasSocket) { + delete sockets[id]; + } + + return hasSocket; +}; + +/** + * Creates a filtered pool of sockets. Works the same as Array#filter. + * @param {Function} fn - Called for each socket in the pool. + * @param {Mixed} [_this] - The `this` context to use when invoking + * the callback. + * @return {Pool} - A new, filtered socket pool. + */ +API.filter = function (fn, _this) { + var filtered = Pool(); + var pool = this; + + _this = _this || pool; + + Object.keys(this.sockets).forEach(function (id) { + var socket = pool.sockets[id]; + + var shouldAdd = fn.call(_this, socket, id, pool); + + // Add it to the new pool. + if (shouldAdd) { + filtered.add(id, socket); + } + }); + + return filtered; +}; + +/** + * Send a message through each socket in the pool. + * @param {String} msg - The message to send. + * @return {Number} - How many sockets the message was sent to. + */ +API.send = function (msg) { + var pool = this; + + var ids = Object.keys(this.sockets); + + ids.forEach(function (id) { + var socket = pool.sockets[id]; + socket.send(msg); + }); + + return ids.length; +}; + +module.exports = Pool; diff --git a/lib/wsp/client.js b/lib/wsp/client.js index d39a53df..db8acfea 100644 --- a/lib/wsp/client.js +++ b/lib/wsp/client.js @@ -1,245 +1,65 @@ -/* eslint-env node*/ /* eslint-disable - require-jsdoc, no-warning-comments, no-underscore-dangle, - max-params, */ 'use strict'; var Gun = require('../../gun'); -var WS = require('ws'); +var Socket = require('./Peer'); +var Pool = require('./Pool'); +var duplicate = require('./duplicate'); -var Tab = {}; -Tab.on = Gun.on; -Tab.peers = (function () { +// Maps URLs to sockets. +// Shared between all gun instances. +var sockets = Pool(); +var emitter = { on: Gun.on }; +var server = { - function Peer (peers) { - if (!Peer.is(this)) { - return new Peer(peers); - } + // Session id. + sid: Gun.text.random(), - this.peers = peers; - } + // Request handlers. + handlers: [], - Peer.is = function (peer) { - return peer instanceof Peer; - }; - - function map (peer, url) { - var msg = this.msg; - var opt = this.opt || {}; - opt.out = true; - Peer.request(url, msg, null, opt); - } - - Peer.prototype.send = function (msg, opt) { - Peer.request.each(this.peers, map, { - msg: msg, - opt: opt, + // Call handlers. + handle: function (req, res) { + server.handlers.forEach(function (server) { + server(req, res); }); - }; + }, +}; - Peer.request = (function () { +/** + * Take a map of URLs pointing to options and ensure the + * urls are using the WS protocol. + * @param {Object} peers - Any object with URLs as keys. + * @return {Object} - Object with normalized URL keys. + */ +function normalizeURLs (peers) { + var formatted = {}; - function request (base, body, cb, opt) { + Object.keys(peers).forEach(function (url) { + var options = peers[url]; + var id = Socket.formatURL(url); + formatted[id] = options; + }); - var obj = base.length ? { base: base } : {}; - obj.base = opt.base || base; - obj.body = opt.body || body; - obj.headers = opt.headers; - obj.url = opt.url; - obj.out = opt.out; - cb = cb || function () {}; + return formatted; +} - if (!obj.base) { - return; - } +/** + * Turns a map of URLs into a socket pool. + * @param {Object} peers - Any object with URLs as keys. + * @return {Pool} - A pool of sockets corresponding to the URLs. + */ +function getSocketSubset (peers) { + var urls = normalizeURLs(peers); - request.transport(obj, cb); - } - - request.createServer = function (fn) { - request.createServer.list.push(fn); - }; - - request.createServer.ing = function (req, cb) { - var index = request.createServer.list.length; - var server; - while (index) { - index -= 1; - server = request.createServer.list[index] || function () {}; - server(req, cb); - } - }; - - request.createServer.list = []; - request.back = 2; - request.backoff = 2; - - request.transport = function (opt, cb) { - if (request.ws(opt, cb)) { - return; - } - }; - - request.ws = function (opt, cb, req) { - var ws; - if (!WS) { - return false; - } - - ws = request.ws.peers[opt.base]; - if (ws) { - req = req || {}; - if (opt.headers) { - req.headers = opt.headers; - } - if (opt.body) { - req.body = opt.body; - } - - if (opt.url) { - req.url = opt.url; - } - - req.headers = req.headers || {}; - - if (!opt.out && !ws.cbs[req.headers['ws-rid']]) { - var rid = 'WS' + - new Date().getTime() + - '.' + - Math.floor((Math.random() * 65535) + 1); - - req.headers['ws-rid'] = rid; - - ws.cbs[rid] = function (err, res) { - if (!res || res.body || res.end) { - delete ws.cbs[req.headers['ws-rid']]; - } - - cb(err, res); - }; - } - - if (!ws.readyState) { - setTimeout(function () { - request.ws(opt, cb, req); - }, 100); - - return true; - } - - ws.sending = true; - ws.send(JSON.stringify(req)); - return true; - } - - if (ws === false) { - return false; - } - - var wsURL = opt.base.replace('http', 'ws'); - - ws = request.ws.peers[opt.base] = new WS(wsURL); - ws.cbs = {}; - - ws.onopen = function () { - request.back = 2; - request.ws(opt, cb); - }; - - ws.onclose = function (event) { - - if (!ws || !event) { - return; - } - - if (ws.close instanceof Function) { - ws.close(); - } - - if (!ws.sending) { - ws = request.ws.peers[opt.base] = false; - request.transport(opt, cb); - return; - } - - request.each(ws.cbs, function (cb) { - cb({ - err: 'WebSocket disconnected!', - code: ws.sending ? (ws || {}).err || event.code : -1, - }); - }); - - // This will make the next request try to reconnect - ws = request.ws.peers[opt.base] = null; - - // TODO: Have the driver handle this! - setTimeout(function () { - - // opt here is a race condition, - // is it not? Does this matter? - request.ws(opt, function () {}); - }, request.back *= request.backoff); - }; - - ws.onmessage = function (msg) { - var res; - if (!msg || !msg.data) { - return; - } - try { - res = JSON.parse(msg.data); - } catch (error) { - return; - } - if (!res) { - return; - } - res.headers = res.headers || {}; - if (res.headers['ws-rid']) { - var cb = ws.cbs[res.headers['ws-rid']] || function () {}; - cb(null, res); - return; - } - - // emit extra events. - if (res.body) { - request.createServer.ing(res, function (res) { - res.out = true; - request(opt.base, null, null, res); - }); - } - }; - - ws.onerror = function (error) { - (ws || {}).err = error; - }; - - return true; - }; - request.ws.peers = {}; - request.ws.cbs = {}; - - request.each = function (obj, cb, as) { - if (!obj || !cb) { - return; - } - - for (var key in obj) { - if (obj.hasOwnProperty(key)) { - cb.call(as, obj[key], key); - } - } - }; - - return request; - }()); - - return Peer; -}()); + return sockets.filter(function (socket) { + return urls.hasOwnProperty(socket.url); + }); +} // Handle read requests. Gun.on('get', function (at) { @@ -248,17 +68,20 @@ Gun.on('get', function (at) { var peers = opt.peers || gun.Back('opt.peers'); if (!peers || Gun.obj.empty(peers)) { - Gun.log.once('peers', 'Warning! You have no peers to connect to!'); - at.gun.Back(-1).on('in', {'@': at['#']}); + at.gun.Back(Infinity).on('in', { + '@': at['#'], + }); return; } + var id = at['#'] || Gun.text.random(9); + // Create a new message. var msg = { // msg ID - '#': at['#'] || Gun.text.random(9), + '#': id, // msg BODY '$': at.get, @@ -266,7 +89,7 @@ Gun.on('get', function (at) { // Listen for a response. // TODO: ONE? PERF! Clear out listeners, maybe with setTimeout? - Tab.on(msg['#'], function (err, data) { + emitter.on(id, function (err, data) { var obj = { '@': at['#'], err: err, @@ -280,11 +103,12 @@ Gun.on('get', function (at) { } }); - // Broadcast to all other peers. - Tab.peers(peers).send(msg, { - headers: { - 'gun-sid': Tab.server.sid, - }, + var subset = getSocketSubset(peers); + + // Broadcast to the connected peers. + subset.send({ + headers: { 'gun-sid': server.sid }, + body: msg, }); }); @@ -293,26 +117,40 @@ Gun.on('put', function (at) { if (at['@']) { return; } - var opt = at.gun.Back('opt') || {}, peers = opt.peers; + + var peers = at.gun.Back('opt.peers'); + var enabled = at.gun.Back('opt.websocket'); + var options = at.opt || {}; + if (!peers || Gun.obj.empty(peers)) { - Gun.log.once('peers', 'Warning! You have no peers to save to!'); - at.gun.Back(-1).on('in', {'@': at['#']}); + + // TODO: What about wsp/server clients? Maybe we shouldn't + // immediately assume there's no data to be found. + at.gun.Back(-1).on('in', { + '@': at['#'], + }); + return; } - if (opt.websocket === false || (at.opt && at.opt.websocket === false)) { + + if (options.websocket === false || enabled === false) { return; } + + var id = at['#'] || Gun.text.random(9); + var msg = { - // msg ID - '#': at['#'] || Gun.text.random(9), + // Message ID. + '#': id, - // msg BODY + // Message body. '$': at.put, }; // TODO: ONE? PERF! Clear out listeners, maybe with setTimeout? - Tab.on(msg['#'], function (err, ok) { + // Listen for acknowledgement(s). + Gun.on(id, function (err, ok) { at.gun.Back(-1).on('in', { '@': at['#'], err: err, @@ -320,28 +158,157 @@ Gun.on('put', function (at) { }); }); - Tab.peers(peers).send(msg, { - headers: { - 'gun-sid': Tab.server.sid, - }, + var subset = getSocketSubset(peers); + + subset.send({ + headers: { 'gun-sid': server.sid }, + body: msg, + }); +}); + +// Open any new sockets listed, +// adding them to the global pool. +Gun.on('opt', function (context) { + var gun = context.gun; + + var peers = gun.Back('opt.peers') || {}; + + Gun.obj.map(peers, function (options, url) { + if (sockets[url]) { + return; + } + + var socket = Socket(url, options); + sockets.add(url, socket); + + /** + * Handle responses to requests, adding default headers. + * @param {Object} reply - A gun reply object. + * @return {undefined} + */ + function respond (reply) { + + // Make sure headers are defined. + var headers = reply.headers = reply.headers || {}; + + // Add 'gun-sid' if it doesn't exist. + headers['gun-sid'] = headers['gun-sid'] || server.sid; + + socket.send(reply); + } + + socket.on('message', function (msg) { + var request; + + try { + request = JSON.parse(msg); + } catch (error) { + return; + } + + // Validate the request. + if (!request || !request.body) { + return; + } + + request.headers = request.headers || {}; + + // emit extra events. + server.handle(request, respond); + }); }); }); -// REVIEW: Do I need this on a server client? -// browser/client side Server! -// TODO: BUG! Does not respect separate instances!!! Gun.on('opt', function (at) { - if (Tab.server) { + var gun = at.gun; + var root = gun.Back(Infinity); + var options = (root._.opt = root._.opt || {}); + + // Only register once per gun instance. + if (options['@client']) { return; } - var gun = at.gun; - var server = Tab.server = Tab.server || {}; - var tmp; + var driver = options['@client'] = { - server.sid = Gun.text.random(); + /** + * Handles get requests sent from other peers. + * @param {Object} req - The get request. + * @param {Function} cb - Handles replies. + * @return {undefined} + */ + get: function (req, cb) { + var body = req.body; + var lex = body.$; + var graph = gun._.root._.graph; + var node = graph[lex['#']]; - Tab.peers.request.createServer(function (req, res) { + // TODO: Reply even if it's not in memory. + if (!node) { + return; + } + + cb({ + body: { + '#': duplicate.track.newID(), + '@': body['#'], + '$': node, + }, + }); + }, + + /** + * Handles put requests sent from other peers. + * @param {Object} req - The put request. + * @param {Function} cb - A response callback. + * @return {undefined} + */ + put: function (req, cb) { + var body = req.body; + var graph = body.$; + + // Cached gun paths. + var path = gun._.root._.path || {}; + + graph = Gun.obj.map(graph, function (node, soul, map) { + if (!path[soul]) { + return; + } + map(soul, node); + }); + + // filter out what we don't have in memory. + if (!graph) { + return; + } + + var id = Gun.on.ask(function (ack, event) { + if (!ack) { + return; + } + + event.off(); + + cb({ + body: { + '#': duplicate.track.newID(), + '@': body['#'], + '$': ack, + '!': ack.err, + }, + }); + }); + + gun.on('out', { + '#': duplicate.track(id), + gun: gun, + opt: { websocket: false }, + put: graph, + }); + }, + }; + + server.handlers.push(function (req, res) { // Validate request. if (!req || !res || !req.body || !req.headers) { @@ -350,111 +317,26 @@ Gun.on('opt', function (at) { var msg = req.body; - // AUTH for non-replies. - if (server.msg(msg['#'])) { + if (duplicate(msg['#'])) { return; } - // no need to process. + // It's a response, no need to reply. if (msg['@']) { - if (Tab.ons[tmp = msg['@'] || msg['#']]) { - Tab.on(tmp, [msg['!'], msg.$]); - } + var reqID = msg['@']; + + emitter.on(reqID, [ + msg['!'], + msg.$, + ]); + return; } - if (msg.$ && msg.$['#']) { - server.get(req, res); - return; - } + var isLex = msg.$ && msg.$['#']; + var method = isLex ? 'get' : 'put'; - server.put(req, res); + driver[method](req, res); }); - server.get = function (req, cb) { - var body = req.body; - var lex = body.$; - var graph = gun._.root._.graph; - var node; - - // Don't reply to data we don't have it in memory. - // TODO: Add localStorage? - if (!(node = graph[lex['#']])) { - return; - } - - cb({ - body: { - '#': server.msg(), - '@': body['#'], - '$': node, - }, - }); - }; - - server.put = function (req, cb) { - var body = req.body, graph = body.$; - var __ = gun._.root._; - - // filter out what we don't have in memory. - if (!(graph = Gun.obj.map(graph, function (node, soul, map) { - if (!__.path[soul]) { - return; - } - map(soul, node); - }))) { - return; - } - gun.on('out', { - gun: gun, - opt: { - websocket: false, - }, - put: graph, - '#': Gun.on.ask(function (ack, ev) { - if (!ack) { - return undefined; - } - ev.off(); - return cb({ - body: { - '#': server.msg(), - '@': body['#'], - '$': ack, - '!': ack.err, - }, - }); - }), - }); - }; - - server.msg = function (id) { - if (!id) { - id = Gun.text.random(9); - server.msg.debounce[id] = Gun.time.is(); - return id; - } - - clearTimeout(server.msg.clear); - server.msg.clear = setTimeout(function () { - var now = Gun.time.is(); - Gun.obj.map(server.msg.debounce, function (time, id) { - if ((now - time) < (1000 * 60 * 5)) { - return; - } - - Gun.obj.del(server.msg.debounce, id); - }); - }, 500); - - if (server.msg.debounce[id]) { - server.msg.debounce[id] = Gun.time.is(); - return id; - } - - server.msg.debounce[id] = Gun.time.is(); - return undefined; - }; - - server.msg.debounce = server.msg.debounce || {}; }); diff --git a/lib/wsp/duplicate.js b/lib/wsp/duplicate.js new file mode 100644 index 00000000..9080fee7 --- /dev/null +++ b/lib/wsp/duplicate.js @@ -0,0 +1,90 @@ +'use strict'; + +var Gun = require('../../gun'); + +var cache = {}; +var timeout = null; + +/** + * Remove all entries in the cache older than 5 minutes. + * Reschedules itself to run again when the oldest item + * might be too old. + * @return {undefined} + */ +function gc () { + var now = Date.now(); + var oldest = now; + var maxAge = 5 * 60 * 1000; + + Gun.obj.map(cache, function (time, id) { + oldest = Math.min(now, time); + + if ((now - time) < maxAge) { + return; + } + + delete cache[id]; + }); + + var done = Gun.obj.empty(cache); + + // Disengage GC. + if (done) { + timeout = null; + return; + } + + // Just how old? + var elapsed = now - oldest; + + // How long before it's too old? + var nextGC = maxAge - elapsed; + + // Schedule the next GC event. + timeout = setTimeout(gc, nextGC); +} + +/** + * Checks a memory-efficient cache to see if a string has been seen before. + * @param {String} id - A string to keep track of. + * @return {Boolean} - Whether it's been seen recently. + */ +function duplicate (id) { + + // Have we seen this ID recently? + var existing = cache.hasOwnProperty(id); + + // Add it to the cache. + duplicate.track(id); + + return existing; +} + +/** + * Starts tracking an ID as a possible future duplicate. + * @param {String} id - The ID to track. + * @return {String} - The same ID. + */ +duplicate.track = function (id) { + cache[id] = Date.now(); + + // Engage GC. + if (!timeout) { + gc(); + } + + return id; +}; + +/** + * Generate a new ID and start tracking it. + * @param {Number} [chars] - The number of characters to use. + * @return {String} - The newly created ID. + */ +duplicate.track.newID = function (chars) { + var id = Gun.text.random(chars); + + return duplicate.track(id); +}; + +module.exports = duplicate; diff --git a/lib/wsp/server-push.js b/lib/wsp/server-push.js index e4c77f7e..55093ff6 100644 --- a/lib/wsp/server-push.js +++ b/lib/wsp/server-push.js @@ -45,12 +45,19 @@ function ready (socket, cb) { * @return {undefined} */ function request (context, clients, cb) { + var id = context['#'] || Gun.text.random(9); + + Gun.on(id, function (err, data, event) { + cb(err, data); + event.off(); + }); + Gun.obj.map(clients, function (client) { ready(client, function () { var msg = { headers: {}, body: { - '#': Gun.on.ask(cb), + '#': id, '$': context.get, }, }; @@ -69,12 +76,19 @@ function request (context, clients, cb) { * @return {undefined} */ function update (context, clients, cb) { + var id = context['#'] || Gun.text.random(9); + + Gun.on(id, function (err, data, event) { + cb(err, data); + event.off(); + }); + Gun.obj.map(clients, function (client) { ready(client, function () { var msg = { headers: {}, body: { - '#': Gun.on.ask(cb), + '#': id, '$': context.put, }, }; @@ -125,6 +139,7 @@ function attach (gun, server) { if (!isUsingServer(context.gun, server)) { return; } + request(context, pool, function (err, data) { var response = { '@': context['#'], @@ -143,6 +158,10 @@ function attach (gun, server) { return; } + if (context.nopush) { + return; + } + update(context, pool, function (err, data) { var ack = { '!': err || null, diff --git a/lib/wsp/server.js b/lib/wsp/server.js index b4e6fd16..c3fa0385 100644 --- a/lib/wsp/server.js +++ b/lib/wsp/server.js @@ -152,8 +152,24 @@ Gun.on('opt', function(at){ // TODO: BUG! server put should push. } tran.get = function(req, cb){ - var body = req.body, lex = body['$'], reply = {headers: {'Content-Type': tran.json}}, opt; - gun.on('out', {gun: gun, get: lex, req: 1, '#': Gun.on.ask(function(at, ev){ + var body = req.body, lex = body['$'], reply = {headers: {'Content-Type': tran.json}}; + + var graph = gun.Back(Infinity)._.graph; + var node = graph[lex['#']]; + var result = Gun.graph.ify(node); + + if (node) { + return cb({ + headers: reply.headers, + body: { + '#': gun.wsp.msg(), + '@': body['#'], + '$': result, + }, + }); + } + + gun.on('out', {gun: gun, get: lex, req: 1, '#': body['#'] || Gun.on.ask(function(at, ev){ ev.off(); var graph = at.put; return cb({headers: reply.headers, body: {