From adbea081204c1ebdbda6ff59fa4e72a8826706e9 Mon Sep 17 00:00:00 2001 From: Jesse Gibson Date: Wed, 26 Oct 2016 14:52:28 -0600 Subject: [PATCH 01/24] Add basic websocket logic Servers will now try to initiate a connection using websockets if the `peers` option is set. Currently, it'll either start throwing errors, or generate a broadcasting storm. Still work to be done... This marks a milestone of getting the servers to connect to each other. Now to have those messages make sense. Committing so I have an easy rollback point. --- lib/wsp-client.js | 460 ++++++++++++++++++++++++++++++++++++++++++++++ lib/wsp.js | 412 ++++++++++++++++++----------------------- 2 files changed, 643 insertions(+), 229 deletions(-) create mode 100644 lib/wsp-client.js diff --git a/lib/wsp-client.js b/lib/wsp-client.js new file mode 100644 index 00000000..8640d917 --- /dev/null +++ b/lib/wsp-client.js @@ -0,0 +1,460 @@ +/* eslint-env node*/ +/* + eslint-disable + require-jsdoc, + no-warning-comments, + no-underscore-dangle, + max-params, +*/ +'use strict'; + +var Gun = require('../gun'); +var WS = require('ws'); + +var Tab = {}; +Tab.on = Gun.on; +Tab.peers = (function () { + + function Peer (peers) { + if (!Peer.is(this)) { + return new Peer(peers); + } + + this.peers = peers; + } + + Peer.is = function (peer) { + return peer instanceof Peer; + }; + + function map (peer, url) { + var msg = this.msg; + var opt = this.opt || {}; + opt.out = true; + Peer.request(url, msg, null, opt); + } + + Peer.prototype.send = function (msg, opt) { + Peer.request.each(this.peers, map, { + msg: msg, + opt: opt, + }); + }; + + Peer.request = (function () { + + function request (base, body, cb, opt) { + + var obj = base.length ? { base: base } : {}; + obj.base = opt.base || base; + obj.body = opt.body || body; + obj.headers = opt.headers; + obj.url = opt.url; + obj.out = opt.out; + cb = cb || function () {}; + + if (!obj.base) { + return; + } + + request.transport(obj, cb); + } + + request.createServer = function (fn) { + request.createServer.list.push(fn); + }; + + request.createServer.ing = function (req, cb) { + var index = request.createServer.list.length; + var server; + while (index) { + index -= 1; + server = request.createServer.list[index] || function () {}; + server(req, cb); + } + }; + + request.createServer.list = []; + request.back = 2; + request.backoff = 2; + + request.transport = function (opt, cb) { + if (request.ws(opt, cb)) { + return; + } + }; + + request.ws = function (opt, cb, req) { + var ws; + if (!WS) { + return false; + } + + ws = request.ws.peers[opt.base]; + if (ws) { + req = req || {}; + if (opt.headers) { + req.headers = opt.headers; + } + if (opt.body) { + req.body = opt.body; + } + + if (opt.url) { + req.url = opt.url; + } + + req.headers = req.headers || {}; + + if (!opt.out && !ws.cbs[req.headers['ws-rid']]) { + var rid = 'WS' + + new Date().getTime() + + '.' + + Math.floor((Math.random() * 65535) + 1); + + req.headers['ws-rid'] = rid; + + ws.cbs[rid] = function (err, res) { + if (!res || res.body || res.end) { + delete ws.cbs[req.headers['ws-rid']]; + } + + cb(err, res); + }; + } + + if (!ws.readyState) { + setTimeout(function () { + request.ws(opt, cb, req); + }, 100); + + return true; + } + + ws.sending = true; + ws.send(JSON.stringify(req)); + return true; + } + + if (ws === false) { + return false; + } + + var wsURL = opt.base.replace('http', 'ws'); + + ws = request.ws.peers[opt.base] = new WS(wsURL); + ws.cbs = {}; + + ws.onopen = function () { + request.back = 2; + request.ws(opt, cb); + }; + + ws.onclose = function (event) { + + if (!ws || !event) { + return; + } + + if (ws.close instanceof Function) { + ws.close(); + } + + if (!ws.sending) { + ws = request.ws.peers[opt.base] = false; + request.transport(opt, cb); + return; + } + + request.each(ws.cbs, function (cb) { + cb({ + err: 'WebSocket disconnected!', + code: ws.sending ? (ws || {}).err || event.code : -1, + }); + }); + + // This will make the next request try to reconnect + ws = request.ws.peers[opt.base] = null; + + // TODO: Have the driver handle this! + setTimeout(function () { + + // opt here is a race condition, + // is it not? Does this matter? + request.ws(opt, function () {}); + }, request.back *= request.backoff); + }; + + ws.onmessage = function (msg) { + var res; + if (!msg || !msg.data) { + return; + } + try { + res = JSON.parse(msg.data); + } catch (error) { + return; + } + if (!res) { + return; + } + res.headers = res.headers || {}; + if (res.headers['ws-rid']) { + var cb = ws.cbs[res.headers['ws-rid']] || function () {}; + cb(null, res); + return; + } + + // emit extra events. + if (res.body) { + request.createServer.ing(res, function (res) { + res.out = true; + request(opt.base, null, null, res); + }); + } + }; + + ws.onerror = function (error) { + (ws || {}).err = error; + }; + + return true; + }; + request.ws.peers = {}; + request.ws.cbs = {}; + + request.each = function (obj, cb, as) { + if (!obj || !cb) { + return; + } + + for (var key in obj) { + if (obj.hasOwnProperty(key)) { + cb.call(as, obj[key], key); + } + } + }; + + return request; + }()); + + return Peer; +}()); + +// Handle read requests. +Gun.on('get', function (at) { + var gun = at.gun; + var opt = at.opt || {}; + var peers = opt.peers || gun.Back('opt.peers'); + + if (!peers || Gun.obj.empty(peers)) { + Gun.log.once('peers', 'Warning! You have no peers to connect to!'); + at.gun.Back(-1).on('in', {'@': at['#']}); + + return; + } + + // Create a new message. + var msg = { + + // msg ID + '#': at['#'] || Gun.text.random(9), + + // msg BODY + '$': at.get, + }; + + // Listen for a response. + // TODO: ONE? PERF! Clear out listeners, maybe with setTimeout? + Tab.on(msg['#'], function (err, data) { + var obj = { + '@': at['#'], + err: err, + put: data, + }; + + if (data) { + at.gun.Back(-1).on('out', obj); + } else { + at.gun.Back(-1).on('in', obj); + } + }); + + // Broadcast to all other peers. + Tab.peers(peers).send(msg, { + headers: { + 'gun-sid': Tab.server.sid, + }, + }); +}); + +// Handle write requests. +Gun.on('put', function (at) { + if (at['@']) { + return; + } + var opt = at.gun.Back('opt') || {}, peers = opt.peers; + if (!peers || Gun.obj.empty(peers)) { + Gun.log.once('peers', 'Warning! You have no peers to save to!'); + at.gun.Back(-1).on('in', {'@': at['#']}); + return; + } + if (opt.websocket === false || (at.opt && at.opt.websocket === false)) { + return; + } + var msg = { + + // msg ID + '#': at['#'] || Gun.text.random(9), + + // msg BODY + '$': at.put, + }; + + // TODO: ONE? PERF! Clear out listeners, maybe with setTimeout? + Tab.on(msg['#'], function (err, ok) { + at.gun.Back(-1).on('in', { + '@': at['#'], + err: err, + ok: ok, + }); + }); + + Tab.peers(peers).send(msg, { + headers: { + 'gun-sid': Tab.server.sid, + }, + }); +}); + +// REVIEW: Do I need this on a server client? +// browser/client side Server! +// TODO: BUG! Does not respect separate instances!!! +Gun.on('opt', function (at) { + if (Tab.server) { + return; + } + + var gun = at.gun; + var server = Tab.server = Tab.server || {}; + var tmp; + + server.sid = Gun.text.random(); + + Tab.peers.request.createServer(function (req, res) { + + // Validate request. + if (!req || !res || !req.body || !req.headers) { + return; + } + + var msg = req.body; + + // AUTH for non-replies. + if (server.msg(msg['#'])) { + return; + } + + // no need to process. + if (msg['@']) { + if (Tab.ons[tmp = msg['@'] || msg['#']]) { + Tab.on(tmp, [msg['!'], msg.$]); + } + return; + } + + if (msg.$ && msg.$['#']) { + server.get(req, res); + return; + } + + server.put(req, res); + }); + + server.get = function (req, cb) { + var body = req.body; + var lex = body.$; + var graph = gun._.root._.graph; + var node; + + // Don't reply to data we don't have it in memory. + // TODO: Add localStorage? + if (!(node = graph[lex['#']])) { + return; + } + + cb({ + body: { + '#': server.msg(), + '@': body['#'], + '$': node, + }, + }); + }; + + server.put = function (req, cb) { + var body = req.body, graph = body.$; + var __ = gun._.root._; + + // filter out what we don't have in memory. + if (!(graph = Gun.obj.map(graph, function (node, soul, map) { + if (!__.path[soul]) { + return; + } + map(soul, node); + }))) { + return; + } + gun.on('out', { + gun: gun, + opt: { + websocket: false, + }, + put: graph, + '#': Gun.on.ask(function (ack, ev) { + if (!ack) { + return undefined; + } + ev.off(); + return cb({ + body: { + '#': server.msg(), + '@': body['#'], + '$': ack, + '!': ack.err, + }, + }); + }), + }); + }; + + server.msg = function (id) { + if (!id) { + id = Gun.text.random(9); + server.msg.debounce[id] = Gun.time.is(); + return id; + } + + clearTimeout(server.msg.clear); + server.msg.clear = setTimeout(function () { + var now = Gun.time.is(); + Gun.obj.map(server.msg.debounce, function (time, id) { + if ((now - time) < (1000 * 60 * 5)) { + return; + } + + Gun.obj.del(server.msg.debounce, id); + }); + }, 500); + + if (server.msg.debounce[id]) { + server.msg.debounce[id] = Gun.time.is(); + return id; + } + + server.msg.debounce[id] = Gun.time.is(); + return undefined; + }; + + server.msg.debounce = server.msg.debounce || {}; +}); diff --git a/lib/wsp.js b/lib/wsp.js index 7768fa86..7ac4bbac 100644 --- a/lib/wsp.js +++ b/lib/wsp.js @@ -1,230 +1,184 @@ -;(function(wsp){ - /* - TODO: SERVER PUSH! - TODO: SERVER GET! - TODO: SERVER PUSH! - TODO: SERVER GET! - TODO: SERVER PUSH! - TODO: SERVER GET! - TODO: SERVER PUSH! - TODO: SERVER GET! - TODO: SERVER PUSH! - TODO: SERVER GET! - TODO: SERVER PUSH! - TODO: SERVER GET! - */ - var Gun = require('../gun') - , formidable = require('formidable') - , ws = require('ws').Server - , http = require('./http') - , url = require('url'); - Gun.on('opt', function(at){ - var gun = at.gun, opt = at.opt; - gun.__ = at.root._; - gun.__.opt.ws = opt.ws = gun.__.opt.ws || opt.ws || {}; - function start(server, port, app){ - if(app && app.use){ app.use(gun.wsp.server) } - server = gun.__.opt.ws.server = gun.__.opt.ws.server || opt.ws.server || server; - require('./ws')(gun.wsp.ws = gun.wsp.ws || new ws(gun.__.opt.ws), function(req, res){ - var ws = this; - req.headers['gun-sid'] = ws.sid = ws.sid? ws.sid : req.headers['gun-sid']; - ws.sub = ws.sub || gun.wsp.on('network', function(msg, ev){ - if(!ws || !ws.send || !ws._socket || !ws._socket.writable){ return ev.off() } - if(!msg || (msg.headers && msg.headers['gun-sid'] === ws.sid)){ return } - if(msg && msg.headers){ delete msg.headers['ws-rid'] } - // TODO: BUG? ^ What if other peers want to ack? Do they use the ws-rid or a gun declared id? - try{ws.send(Gun.text.ify(msg)); - }catch(e){} // juuuust in case. - }); - gun.wsp.wire(req, res); - }, {headers: {'ws-rid': 1, 'gun-sid': 1}}); - gun.__.opt.ws.port = gun.__.opt.ws.port || opt.ws.port || port || 80; - } - var wsp = gun.wsp = gun.wsp || function(server){ - if(!server){ return gun } - if(Gun.fns.is(server.address)){ - if(server.address()){ - start(server, server.address().port); - return gun; - } - } - if(Gun.fns.is(server.get) && server.get('port')){ - start(server, server.get('port')); - return gun; - } - var listen = server.listen; - server.listen = function(port){ - var serve = listen.apply(server, arguments); - start(serve, port, server); - return serve; - } - return gun; - } - gun.wsp.on = gun.wsp.on || Gun.on; - gun.wsp.regex = gun.wsp.regex || opt.route || opt.path || /^\/gun/i; - gun.wsp.poll = gun.wsp.poll || opt.poll || 1; - gun.wsp.pull = gun.wsp.pull || opt.pull || gun.wsp.poll * 1000; - gun.wsp.server = gun.wsp.server || function(req, res, next){ // http - next = next || function(){}; - if(!req || !res){ return next(), false } - if(!req.url){ return next(), false } - if(!req.method){ return next(), false } - var msg = {}; - msg.url = url.parse(req.url, true); - if(!gun.wsp.regex.test(msg.url.pathname)){ return next(), false } // TODO: BUG! If the option isn't a regex then this will fail! - if(msg.url.pathname.replace(gun.wsp.regex,'').slice(0,3).toLowerCase() === '.js'){ - res.writeHead(200, {'Content-Type': 'text/javascript'}); - res.end(gun.wsp.js = gun.wsp.js || require('fs').readFileSync(__dirname + '/../gun.js')); // gun server is caching the gun library for the client - return true; +/* eslint-disable*/ +var Gun = require('../gun') +, formidable = require('formidable') +, http = require('./http') +, url = require('url') +, wsp = {} +, WS = require('ws') +, ws = WS.Server; + +// Handles server to server sync. +require('./wsp-client.js'); + +Gun.on('opt', function(at){ + var gun = at.gun, opt = at.opt; + gun.__ = at.root._; + gun.__.opt.ws = opt.ws = gun.__.opt.ws || opt.ws || {}; + + function start(server, port, app){ + if(app && app.use){ app.use(gun.wsp.server) } + server = gun.__.opt.ws.server = gun.__.opt.ws.server || opt.ws.server || server; + require('./ws')(gun.wsp.ws = gun.wsp.ws || new ws(gun.__.opt.ws), function(req, res){ + var ws = this; + req.headers['gun-sid'] = ws.sid = ws.sid? ws.sid : req.headers['gun-sid']; + ws.sub = ws.sub || gun.wsp.on('network', function(msg, ev){ + if(!ws || !ws.send || !ws._socket || !ws._socket.writable){ return ev.off() } + if(!msg || (msg.headers && msg.headers['gun-sid'] === ws.sid)){ return } + if(msg && msg.headers){ delete msg.headers['ws-rid'] } + // TODO: BUG? ^ What if other peers want to ack? Do they use the ws-rid or a gun declared id? + try{ws.send(Gun.text.ify(msg)); + }catch(e){} // juuuust in case. + }); + gun.wsp.wire(req, res); + }, {headers: {'ws-rid': 1, 'gun-sid': 1}}); + gun.__.opt.ws.port = gun.__.opt.ws.port || opt.ws.port || port || 80; + } + var wsp = gun.wsp = gun.wsp || function(server){ + if(!server){ return gun } + if(Gun.fns.is(server.address)){ + if(server.address()){ + start(server, server.address().port); + return gun; } - if(!req.upgrade){ return next(), false } - return http(req, res, function(req, res){ - if(!req){ return next() } - var stream, cb = res = require('./jsonp')(req, res); - if(req.headers && (stream = req.headers['gun-sid'])){ - stream = (gun.wsp.peers = gun.wsp.peers || {})[stream] = gun.wsp.peers[stream] || {sid: stream}; - stream.drain = stream.drain || function(res){ - if(!res || !stream || !stream.queue || !stream.queue.length){ return } - res({headers: {'gun-sid': stream.sid}, body: stream.queue }); - stream.off = setTimeout(function(){ stream = null }, gun.wsp.pull); - stream.reply = stream.queue = null; - return true; - } - stream.sub = stream.sub || gun.wsp.on('network', function(req, ev){ - if(!stream){ return ev.off() } // self cleans up after itself! - if(!req || (req.headers && req.headers['gun-sid'] === stream.sid)){ return } - (stream.queue = stream.queue || []).push(req); - stream.drain(stream.reply); - }); - cb = function(r){ (r.headers||{}).poll = gun.wsp.poll; res(r) } - clearTimeout(stream.off); - if(req.headers.pull){ - if(stream.drain(cb)){ return } - return stream.reply = cb; - } - } - gun.wsp.wire(req, cb); - }), true; - } - if((gun.__.opt.maxSockets = opt.maxSockets || gun.__.opt.maxSockets) !== false){ - require('https').globalAgent.maxSockets = require('http').globalAgent.maxSockets = gun.__.opt.maxSockets || Infinity; - } - gun.wsp.msg = gun.wsp.msg || function(id){ - if(!id){ - return gun.wsp.msg.debounce[id = Gun.text.random(9)] = Gun.time.is(), id; - } - clearTimeout(gun.wsp.msg.clear); - gun.wsp.msg.clear = setTimeout(function(){ - var now = Gun.time.is(); - Gun.obj.map(gun.wsp.msg.debounce, function(t,id){ - if((now - t) < (1000 * 60 * 5)){ return } - Gun.obj.del(gun.wsp.msg.debounce, id); - }); - },500); - if(id = gun.wsp.msg.debounce[id]){ - return gun.wsp.msg.debounce[id] = Gun.time.is(), id; - } - gun.wsp.msg.debounce[id] = Gun.time.is(); - return; - }; - gun.wsp.msg.debounce = gun.wsp.msg.debounce || {}; - gun.wsp.wire = gun.wsp.wire || (function(){ - // all streams, technically PATCH but implemented as PUT or POST, are forwarded to other trusted peers - // except for the ones that are listed in the message as having already been sending to. - // all states, implemented with GET, are replied to the source that asked for it. - function tran(req, res){ - if(!req || !res || !req.body || !req.headers){ return } - if(req.url){ req.url = url.format(req.url) } - var msg = req.body; - // AUTH for non-replies. - if(gun.wsp.msg(msg['#'])){ return } - gun.wsp.on('network', Gun.obj.copy(req)); - if(msg['@']){ return } // no need to process. - if(msg['$'] && msg['$']['#']){ return tran.get(req, res) } - //if(Gun.is.lex(msg['$'])){ return tran.get(req, res) } - else { return tran.put(req, res) } - cb({body: {hello: 'world'}}); - // TODO: BUG! server put should push. - } - tran.get = function(req, cb){ - var body = req.body, lex = body['$'], reply = {headers: {'Content-Type': tran.json}}, opt; - gun.on('out', {gun: gun, get: lex, req: 1, '#': Gun.on.ask(function(at, ev){ - ev.off(); - var graph = at.put; - return cb({headers: reply.headers, body: { - '#': gun.wsp.msg(), - '@': body['#'], - '$': graph, - '!': at.err - }}); - return; - if(Gun.obj.empty(node)){ - return cb({headers: reply.headers, body: node}); - } // we're out of stuff! - /* - (function(chunks){ // FEATURE! Stream chunks if the nodes are large! - var max = 10, count = 0, soul = Gun.is.node.soul(node); - if(Object.keys(node).length > max){ - var n = Gun.is.node.soul.ify({}, soul); - Gun.obj.map(node, function(val, field){ - if(!(++count % max)){ - cb({headers: reply.headers, chunk: n}); // send node chunks - n = Gun.is.node.soul.ify({}, soul); - } - Gun.is.node.state.ify([n, node], field, val); - }); - if(count % max){ // finish off the last chunk - cb({headers: reply.headers, chunk: n}); - } - } else { - cb({headers: reply.headers, chunk: node}); // send full node - } - }([])); - */ - cb({headers: reply.headers, chunk: node }); // Use this if you don't want streaming chunks feature. - })}); - } - tran.put = function(req, cb){ - //console.log("tran.put", req); - // NOTE: It is highly recommended you do your own PUT/POSTs through your own API that then saves to gun manually. - // This will give you much more fine-grain control over security, transactions, and what not. - var body = req.body, graph = body['$'], reply = {headers: {'Content-Type': tran.json}}, opt; - gun.on('out', {gun: gun, put: graph, '#': Gun.on.ask(function(ack, ev){ - //Gun.on('put', {gun: gun, put: graph, '#': Gun.on.ask(function(ack, ev){ - ev.off(); - return cb({headers: reply.headers, body: { - '#': gun.wsp.msg(), - '@': body['#'], - '$': ack, - '!': ack.err - }}); - })}); - return; - if(Gun.is.graph(req.body)){ - if(req.err = Gun.union(gun, req.body, function(err, ctx){ // TODO: BUG? Probably should give me ctx.graph - if(err){ return cb({headers: reply.headers, body: {err: err || "Union failed."}}) } - var ctx = ctx || {}; ctx.graph = {}; - Gun.is.graph(req.body, function(node, soul){ - ctx.graph[soul] = gun.__.graph[soul]; - }); - (gun.__.opt.wire.put || function(g,cb){cb("No save.")})(ctx.graph, function(err, ok){ - if(err){ return cb({headers: reply.headers, body: {err: err || "Failed."}}) } // TODO: err should already be an error object? - cb({headers: reply.headers, body: {ok: ok || "Persisted."}}); - //console.log("tran.put <------------------------", ok); - }); - }).err){ cb({headers: reply.headers, body: {err: req.err || "Union failed."}}) } - } else { - cb({headers: reply.headers, body: {err: "Not a valid graph!"}}); - } - } - gun.wsp.on('network', function(req){ - // TODO: MARK! You should move the networking events to here, not in WSS only. - }); - tran.json = 'application/json'; - return tran; - }()); - if(opt.server){ - wsp(opt.server); - } - }); -}({})); + } + if(Gun.fns.is(server.get) && server.get('port')){ + start(server, server.get('port')); + return gun; + } + var listen = server.listen; + server.listen = function(port){ + var serve = listen.apply(server, arguments); + start(serve, port, server); + return serve; + } + return gun; + } + gun.wsp.on = gun.wsp.on || Gun.on; + gun.wsp.regex = gun.wsp.regex || opt.route || opt.path || /^\/gun/i; + gun.wsp.poll = gun.wsp.poll || opt.poll || 1; + gun.wsp.pull = gun.wsp.pull || opt.pull || gun.wsp.poll * 1000; + gun.wsp.server = gun.wsp.server || function(req, res, next){ // http + next = next || function(){}; + if(!req || !res){ return next(), false } + if(!req.url){ return next(), false } + if(!req.method){ return next(), false } + var msg = {}; + msg.url = url.parse(req.url, true); + if(!gun.wsp.regex.test(msg.url.pathname)){ return next(), false } // TODO: BUG! If the option isn't a regex then this will fail! + if(msg.url.pathname.replace(gun.wsp.regex,'').slice(0,3).toLowerCase() === '.js'){ + res.writeHead(200, {'Content-Type': 'text/javascript'}); + res.end(gun.wsp.js = gun.wsp.js || require('fs').readFileSync(__dirname + '/../gun.js')); // gun server is caching the gun library for the client + return true; + } + + if(!req.upgrade){ + next(); + return false; + } + + return http(req, res, function(req, res){ + if(!req){ return next() } + var stream, cb = res = require('./jsonp')(req, res); + if(req.headers && (stream = req.headers['gun-sid'])){ + stream = (gun.wsp.peers = gun.wsp.peers || {})[stream] = gun.wsp.peers[stream] || {sid: stream}; + stream.drain = stream.drain || function(res){ + if(!res || !stream || !stream.queue || !stream.queue.length){ return } + res({headers: {'gun-sid': stream.sid}, body: stream.queue }); + stream.off = setTimeout(function(){ stream = null }, gun.wsp.pull); + stream.reply = stream.queue = null; + return true; + } + stream.sub = stream.sub || gun.wsp.on('network', function(req, ev){ + if(!stream){ return ev.off() } // self cleans up after itself! + if(!req || (req.headers && req.headers['gun-sid'] === stream.sid)){ return } + (stream.queue = stream.queue || []).push(req); + stream.drain(stream.reply); + }); + cb = function(r){ (r.headers||{}).poll = gun.wsp.poll; res(r) } + clearTimeout(stream.off); + if(req.headers.pull){ + if(stream.drain(cb)){ return } + return stream.reply = cb; + } + } + gun.wsp.wire(req, cb); + }), true; + } + if((gun.__.opt.maxSockets = opt.maxSockets || gun.__.opt.maxSockets) !== false){ + require('https').globalAgent.maxSockets = require('http').globalAgent.maxSockets = gun.__.opt.maxSockets || Infinity; + } + gun.wsp.msg = gun.wsp.msg || function(id){ + if(!id){ + return gun.wsp.msg.debounce[id = Gun.text.random(9)] = Gun.time.is(), id; + } + clearTimeout(gun.wsp.msg.clear); + gun.wsp.msg.clear = setTimeout(function(){ + var now = Gun.time.is(); + Gun.obj.map(gun.wsp.msg.debounce, function(t,id){ + if((now - t) < (1000 * 60 * 5)){ return } + Gun.obj.del(gun.wsp.msg.debounce, id); + }); + },500); + if(id = gun.wsp.msg.debounce[id]){ + return gun.wsp.msg.debounce[id] = Gun.time.is(), id; + } + gun.wsp.msg.debounce[id] = Gun.time.is(); + return; + }; + gun.wsp.msg.debounce = gun.wsp.msg.debounce || {}; + gun.wsp.wire = gun.wsp.wire || (function(){ + // all streams, technically PATCH but implemented as PUT or POST, are forwarded to other trusted peers + // except for the ones that are listed in the message as having already been sending to. + // all states, implemented with GET, are replied to the source that asked for it. + function tran(req, res){ + if(!req || !res || !req.body || !req.headers){ return } + if(req.url){ req.url = url.format(req.url) } + var msg = req.body; + // AUTH for non-replies. + if(gun.wsp.msg(msg['#'])){ return } + gun.wsp.on('network', Gun.obj.copy(req)); + if(msg['@']){ return } // no need to process. + if(msg['$'] && msg['$']['#']){ return tran.get(req, res) } + //if(Gun.is.lex(msg['$'])){ return tran.get(req, res) } + else { return tran.put(req, res) } + cb({body: {hello: 'world'}}); + // TODO: BUG! server put should push. + } + tran.get = function(req, cb){ + var body = req.body, lex = body['$'], reply = {headers: {'Content-Type': tran.json}}, opt; + gun.on('out', {gun: gun, get: lex, req: 1, '#': Gun.on.ask(function(at, ev){ + ev.off(); + var graph = at.put; + return cb({headers: reply.headers, body: { + '#': gun.wsp.msg(), + '@': body['#'], + '$': graph, + '!': at.err + }}); + })}); + } + tran.put = function(req, cb){ + // NOTE: It is highly recommended you do your own PUT/POSTs through your own API that then saves to gun manually. + // This will give you much more fine-grain control over security, transactions, and what not. + var body = req.body, graph = body['$'], reply = {headers: {'Content-Type': tran.json}}, opt; + gun.on('out', {gun: gun, put: graph, '#': Gun.on.ask(function(ack, ev){ + //Gun.on('put', {gun: gun, put: graph, '#': Gun.on.ask(function(ack, ev){ + ev.off(); + return cb({headers: reply.headers, body: { + '#': gun.wsp.msg(), + '@': body['#'], + '$': ack, + '!': ack.err + }}); + })}); + } + gun.wsp.on('network', function(req){ + // TODO: MARK! You should move the networking events to here, not in WSS only. + }); + tran.json = 'application/json'; + return tran; + }()); + if(opt.server){ + wsp(opt.server); + } +}); From e36e78f532f9be55130b50a04d85936b92106b32 Mon Sep 17 00:00:00 2001 From: Jesse Gibson Date: Thu, 10 Nov 2016 10:09:18 -0700 Subject: [PATCH 02/24] Isolate websocket logic WebSocket logic has it's own folder now, `wsp`. --- lib/server.js | 2 +- lib/{wsp-client.js => wsp/client.js} | 2 +- lib/{wsp.js => wsp/server.js} | 12 ++++++------ lib/{ => wsp}/ws.js | 4 ++-- 4 files changed, 10 insertions(+), 10 deletions(-) rename lib/{wsp-client.js => wsp/client.js} (99%) rename lib/{wsp.js => wsp/server.js} (96%) rename lib/{ => wsp}/ws.js (92%) diff --git a/lib/server.js b/lib/server.js index af0f28b5..a6082d4c 100644 --- a/lib/server.js +++ b/lib/server.js @@ -3,7 +3,7 @@ var Gun = require('../gun'); console.log("TODO: MARK! UPDATE S3 DRIVER BEFORE PUBLISHING!") //require('./s3'); - require('./wsp'); + require('./wsp/server'); require('./file'); module.exports = Gun; }()); diff --git a/lib/wsp-client.js b/lib/wsp/client.js similarity index 99% rename from lib/wsp-client.js rename to lib/wsp/client.js index 8640d917..d39a53df 100644 --- a/lib/wsp-client.js +++ b/lib/wsp/client.js @@ -8,7 +8,7 @@ */ 'use strict'; -var Gun = require('../gun'); +var Gun = require('../../gun'); var WS = require('ws'); var Tab = {}; diff --git a/lib/wsp.js b/lib/wsp/server.js similarity index 96% rename from lib/wsp.js rename to lib/wsp/server.js index 7ac4bbac..2dfdd27c 100644 --- a/lib/wsp.js +++ b/lib/wsp/server.js @@ -1,14 +1,14 @@ /* eslint-disable*/ -var Gun = require('../gun') +var Gun = require('../../gun') , formidable = require('formidable') -, http = require('./http') +, http = require('../http') , url = require('url') , wsp = {} , WS = require('ws') , ws = WS.Server; // Handles server to server sync. -require('./wsp-client.js'); +require('./client.js'); Gun.on('opt', function(at){ var gun = at.gun, opt = at.opt; @@ -67,7 +67,7 @@ Gun.on('opt', function(at){ if(!gun.wsp.regex.test(msg.url.pathname)){ return next(), false } // TODO: BUG! If the option isn't a regex then this will fail! if(msg.url.pathname.replace(gun.wsp.regex,'').slice(0,3).toLowerCase() === '.js'){ res.writeHead(200, {'Content-Type': 'text/javascript'}); - res.end(gun.wsp.js = gun.wsp.js || require('fs').readFileSync(__dirname + '/../gun.js')); // gun server is caching the gun library for the client + res.end(gun.wsp.js = gun.wsp.js || require('fs').readFileSync(__dirname + '/../../gun.js')); // gun server is caching the gun library for the client return true; } @@ -78,7 +78,7 @@ Gun.on('opt', function(at){ return http(req, res, function(req, res){ if(!req){ return next() } - var stream, cb = res = require('./jsonp')(req, res); + var stream, cb = res = require('../jsonp')(req, res); if(req.headers && (stream = req.headers['gun-sid'])){ stream = (gun.wsp.peers = gun.wsp.peers || {})[stream] = gun.wsp.peers[stream] || {sid: stream}; stream.drain = stream.drain || function(res){ @@ -172,7 +172,7 @@ Gun.on('opt', function(at){ }}); })}); } - gun.wsp.on('network', function(req){ + gun.wsp.on('network', function(rq){ // TODO: MARK! You should move the networking events to here, not in WSS only. }); tran.json = 'application/json'; diff --git a/lib/ws.js b/lib/wsp/ws.js similarity index 92% rename from lib/ws.js rename to lib/wsp/ws.js index 270ea0b4..45dc100a 100644 --- a/lib/ws.js +++ b/lib/wsp/ws.js @@ -1,4 +1,4 @@ -var Gun = require('../gun') +var Gun = require('../../gun') , url = require('url'); module.exports = function(wss, server, opt){ wss.on('connection', function(ws){ @@ -27,7 +27,7 @@ module.exports = function(wss, server, opt){ (reply.headers = reply.headers || {})['ws-rid'] = msg.headers['ws-rid']; } try{ws.send(Gun.text.ify(reply)); - }catch(e){} // juuuust in case. + }catch(e){} // juuuust in case. }); }); ws.off = function(m){ From 4cd2d434d109b71004f9b4ca23e901044c485234 Mon Sep 17 00:00:00 2001 From: Jesse Gibson Date: Thu, 10 Nov 2016 10:11:01 -0700 Subject: [PATCH 03/24] Add websocket backoff/retry logic New library handles websocket reconnection logic and queues messages that were sent while offline. --- lib/wsp/Peer.js | 176 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 176 insertions(+) create mode 100644 lib/wsp/Peer.js diff --git a/lib/wsp/Peer.js b/lib/wsp/Peer.js new file mode 100644 index 00000000..05f510b3 --- /dev/null +++ b/lib/wsp/Peer.js @@ -0,0 +1,176 @@ +var WebSocket = require('ws'); + +/** + * Calculates backoff instances. + * @param {Object} [options] - Override the default settings. + * @param {Object} options.time=50 - Initial backoff time. + * @param {Object} options.factor=2 - How much to multiply the time by. + * @class + */ +function Backoff (options) { + this.options = options || {}; + + // Sets the initial backoff settings. + this.reset(); +} + +/** + * Increments the time by the factor. + * @return {Number} - The next backoff time. + */ +Backoff.prototype.next = function () { + this.time *= this.factor; + + return this.time; +}; + +/** + * Resets the backoff state to it's original condition. + * @return {Backoff} - The context. + */ +Backoff.prototype.reset = function () { + var options = this.options; + + this.time = options.time || 50; + this.factor = options.factor || 2; + + return this; +}; + +/** + * Create a websocket client and handle reconnect backoff logic. + * @param {String} url - A preformatted url (starts with ws://) + * @param {Object} [options] - Override how the socket is managed. + * @param {Object} options.backoff - Backoff options (see the constructor). + * @class + */ +function Peer (url, options) { + if (!(this instanceof Peer)) { + return new Peer(url, options); + } + + this.options = options || {}; + + // Messages sent while offline. + this.offline = []; + + this.url = Peer.formatURL(url); + this.backoff = new Backoff(this.options.backoff); + this.retry(url); +} + +/** + * Turns http URLs into WebSocket URLs. + * @param {String} url - The url to format. + * @return {String} - A correctly formatted WebSocket URL. + */ +Peer.formatURL = function (url) { + + // Works for `https` and `wss` URLs, too. + return url.replace('http', 'ws'); +}; + +var API = Peer.prototype; + +/** + * Attempts a websocket connection. + * @param {String} url - The websocket URL. + * @return {WebSocket} - The new websocket instance. + */ +API.retry = function () { + var url = this.url; + + var socket = new WebSocket(url); + this.socket = socket; + + this.retryOnDisconnect(socket); + + this.sendOnConnection(); + + return socket; +}; + +/** + * Sends the messages that couldn't be sent before once + * the connection is open. + * @return {Peer} - The context. + */ +API.sendOnConnection = function () { + var peer = this; + var queue = this.offline; + var socket = this.socket; + + // Wait for the socket to connect. + socket.once('open', function () { + queue.forEach(function (msg) { + socket.send(msg); + }); + + peer.offline = []; + }); + + return this; +}; + +/** + * Schedules the next retry, according to the backoff. + * @param {Peer} peer - A peer instance. + * @return {Timeout} - The timeout value from `setTimeout`. + */ +function schedule (peer) { + var backoff = peer.backoff; + var time = backoff.time; + backoff.next(); + + return setTimeout(function () { + var socket = peer.retry(); + + // Successfully reconnected? Reset the backoff. + socket.once('open', backoff.reset.bind(backoff)); + }, time); +} + +/** + * Attaches handlers to the socket, attempting reconnection + * when it's closed. + * @param {WebSocket} socket - The websocket instance to bind to. + * @return {WebSocket} - The same websocket. + */ +API.retryOnDisconnect = function (socket) { + var peer = this; + + // Listen for socket close events. + socket.once('close', function () { + schedule(peer); + }); + + socket.on('error', function (error) { + if (error.code === 'ECONNREFUSED') { + schedule(peer); + } + }); + + return socket; +}; + +/** + * Send data through the socket, or add it to a queue + * of offline requests if it's not ready yet. + * @param {String} msg - The data to send. + * @return {Peer} - The context. + */ +API.send = function (msg) { + var socket = this.socket; + var state = socket.readyState; + var ready = socket.OPEN; + + if (state === ready) { + socket.send(msg); + } else { + this.offline.push(msg); + } + + return this; +}; + +module.exports = Peer; From e8194887e06e9931292c9929a2823d0110e89eb9 Mon Sep 17 00:00:00 2001 From: Jesse Gibson Date: Fri, 11 Nov 2016 17:48:56 -0700 Subject: [PATCH 04/24] Add basic server get handling Servers now dispatch requests to clients and listen for responses, plugging them into gun. --- lib/wsp/server-push.js | 123 +++++++++++++++++++++++++++++++++++++++++ lib/wsp/server.js | 13 ++++- 2 files changed, 133 insertions(+), 3 deletions(-) create mode 100644 lib/wsp/server-push.js diff --git a/lib/wsp/server-push.js b/lib/wsp/server-push.js new file mode 100644 index 00000000..ac3efa4d --- /dev/null +++ b/lib/wsp/server-push.js @@ -0,0 +1,123 @@ +'use strict'; +var Gun = require('../../gun.js'); + +/** + * Whether the gun instance is attached to a socket server. + * @param {Gun} gun - The gun instance in question. + * @param {WebSocket.Server} server - A socket server gun might be attached to. + * @return {Boolean} - Whether it's attached. + */ +function isUsingServer (gun, server) { + var servers = gun.Back(-1)._.servers; + + return servers ? servers.indexOf(server) !== -1 : false; +} + +/** + * Calls a function when (or if) a socket is ready for messages. + * @param {WebSocket} socket - A websocket connection. + * @param {Function} cb - Called if or when the socket is ready. + * @return {Boolean} - Whether the socket is able to take messages. + */ +function ready (socket, cb) { + var state = socket.readyState; + + // The socket is ready. + if (state === socket.OPEN) { + cb(); + return true; + } + + // Still opening. + if (state === socket.OPENING) { + socket.once('open', cb); + } + + // Nope, closing or closed. + return false; +} + +/** + * Send a request to a list of clients. + * @param {Obejct} context - A gun request context. + * @param {Object} clients - IDs mapped to socket instances. + * @param {Function} cb - Called for each response. + * @return {Object} - The context object. + */ +function request (context, clients, cb) { + Gun.obj.map(clients, function (client) { + ready(client, function () { + var msg = { + headers: {}, + body: { + '#': Gun.on.ask(cb), + '$': context.get, + }, + }; + + var serialized = JSON.stringify(msg); + client.send(serialized); + }); + }); +} + +/** * Attaches server push middleware to gun. + * @param {Gun} gun - The gun instance to attach to. + * @param {WebSocket.Server} server - A websocket server instance. + * @return {server} - The socket server. + */ +function attach (gun, server) { + var root = gun.Back(-1); + root._.servers = root._.servers || []; + root._.servers.push(server); + var pool = {}; + + server.on('connection', function (socket) { + socket.id = socket.id || Gun.text.random(10); + pool[socket.id] = socket; + + socket.on('message', function (message) { + var data = Gun.obj.ify(message); + + if (!data || !data.body) { + return; + } + + var msg = data.body; + + if (msg['@']) { + Gun.on.ack(msg['@'], [msg['!'], msg.$]); + return; + } + }); + + socket.once('close', function () { + delete pool[socket.id]; + }); + }); + + Gun.on('get', function (context) { + if (!isUsingServer(context.gun, server)) { + return; + } + request(context, pool, function (err, data) { + var response = { + '@': context['#'], + put: data, + err: err, + }; + + var root = context.gun.Back(Infinity); + + root.on(data ? 'out' : 'in', response); + }); + }); + + Gun.on('put', function (context) { + if (!isUsingServer(context.gun, server)) { + return; + } + }); +} + +module.exports = attach; diff --git a/lib/wsp/server.js b/lib/wsp/server.js index 2dfdd27c..b4e6fd16 100644 --- a/lib/wsp/server.js +++ b/lib/wsp/server.js @@ -1,11 +1,11 @@ -/* eslint-disable*/ var Gun = require('../../gun') , formidable = require('formidable') , http = require('../http') , url = require('url') , wsp = {} , WS = require('ws') -, ws = WS.Server; +, WSS = WS.Server +, attach = require('./server-push'); // Handles server to server sync. require('./client.js'); @@ -18,7 +18,14 @@ Gun.on('opt', function(at){ function start(server, port, app){ if(app && app.use){ app.use(gun.wsp.server) } server = gun.__.opt.ws.server = gun.__.opt.ws.server || opt.ws.server || server; - require('./ws')(gun.wsp.ws = gun.wsp.ws || new ws(gun.__.opt.ws), function(req, res){ + + if (!gun.wsp.ws) { + gun.wsp.ws = new WSS(gun.__.opt.ws); + attach(gun, gun.wsp.ws); + } + + gun.wsp.ws = gun.wsp.ws || new WSS(gun.__.opt.ws); + require('./ws')(gun.wsp.ws, function(req, res){ var ws = this; req.headers['gun-sid'] = ws.sid = ws.sid? ws.sid : req.headers['gun-sid']; ws.sub = ws.sub || gun.wsp.on('network', function(msg, ev){ From ba43dcac17cbf387736e9f2ac11a21c1205af4b4 Mon Sep 17 00:00:00 2001 From: Jesse Gibson Date: Mon, 14 Nov 2016 13:40:55 -0700 Subject: [PATCH 05/24] Add server push Pushes graph updates to connected clients, listening for acknowledgements. --- lib/wsp/server-push.js | 35 ++++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/lib/wsp/server-push.js b/lib/wsp/server-push.js index ac3efa4d..e4c77f7e 100644 --- a/lib/wsp/server-push.js +++ b/lib/wsp/server-push.js @@ -42,7 +42,7 @@ function ready (socket, cb) { * @param {Obejct} context - A gun request context. * @param {Object} clients - IDs mapped to socket instances. * @param {Function} cb - Called for each response. - * @return {Object} - The context object. + * @return {undefined} */ function request (context, clients, cb) { Gun.obj.map(clients, function (client) { @@ -61,6 +61,31 @@ function request (context, clients, cb) { }); } +/** + * Pushes a graph update to a collection of clients. + * @param {Object} context - The context object passed by gun. + * @param {Object} clients - An object mapping URLs to clients. + * @param {Function} cb - Invoked on each client response. + * @return {undefined} + */ +function update (context, clients, cb) { + Gun.obj.map(clients, function (client) { + ready(client, function () { + var msg = { + headers: {}, + body: { + '#': Gun.on.ask(cb), + '$': context.put, + }, + }; + + var serialized = JSON.stringify(msg); + + client.send(serialized); + }); + }); +} + /** * Attaches server push middleware to gun. * @param {Gun} gun - The gun instance to attach to. * @param {WebSocket.Server} server - A websocket server instance. @@ -117,6 +142,14 @@ function attach (gun, server) { if (!isUsingServer(context.gun, server)) { return; } + + update(context, pool, function (err, data) { + var ack = { + '!': err || null, + '$': data.$, + }; + Gun.on.ack(context, ack); + }); }); } From 3e66aff985ff93022a866b6f2170cde2676e2c50 Mon Sep 17 00:00:00 2001 From: Jesse Gibson Date: Mon, 14 Nov 2016 16:31:45 -0700 Subject: [PATCH 06/24] Allow opting out of file.json If the most recent gun options disable the file module, then it won't try to read/write from the json. Previously it would. Also, now you can override the behavior by passing `{ file: false }` as the options in `gun.put`. --- lib/file.js | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/lib/file.js b/lib/file.js index 51a6ed2c..0ee6a169 100644 --- a/lib/file.js +++ b/lib/file.js @@ -6,6 +6,13 @@ var Gun = require('../gun'), fs = require('fs'), file = {}; +function isUsingFileJS (context) { + var gun = context.gun; + var opt = context.opt || gun.Back('opt') || {}; + + return opt.file !== false; +} + // queue writes, adapted from https://github.com/toolness/jsondown/blob/master/jsondown.js var isWriting = false, queuedWrites = []; function writeFile(path, disk, at){ @@ -24,6 +31,9 @@ function writeFile(path, disk, at){ } Gun.on('put', function(at){ + if (isUsingFileJS(at) === false) { + return; + } var gun = at.gun, graph = at.put, opt = at.opt || {}; var __ = gun._.root._; Gun.obj.map(graph, function(node, soul){ @@ -32,7 +42,10 @@ Gun.on('put', function(at){ writeFile(opt.file || file.file, file.disk, at); }); Gun.on('get', function(at){ - var gun = at.gun, lex = at.get, opt = at.opt; + if (isUsingFileJS(at) === false) { + return; + } + var gun = at.gun, lex = at.get; if(!lex){return} gun.Back(-1).on('in', {'@': at['#'], put: Gun.graph.node(file.disk.graph[lex['#']])}); //at.cb(null, file.disk.graph[lex['#']]); @@ -43,7 +56,11 @@ Gun.on('opt', function(at){ if ((opts.file === false) || (opts.s3 && opts.s3.key)) { return; // don't use this plugin if S3 is being used. } - console.log("WARNING! This `file.js` module for gun is intended only for local development testing!") + Gun.log.once( + 'file-warning', + 'WARNING! This `file.js` module for gun is ' + + 'intended only for local development testing!' + ); file.file = opts.file || file.file || 'data.json'; file.raw = file.raw || (fs.existsSync || require('path').existsSync)(opts.file) ? fs.readFileSync(opts.file).toString() : null; file.disk = file.disk || Gun.obj.ify(file.raw || {graph: {}}); From ddf272bbb05f633aca36675044b78858fb7902ad Mon Sep 17 00:00:00 2001 From: Jesse Gibson Date: Mon, 14 Nov 2016 17:00:06 -0700 Subject: [PATCH 07/24] Allow greeting messages to be opt-out If you use gun a bunch, you've probably noticed the messages like "Hello wonderful person :)" and "WARNING: This file.js module...". This PR allows you to silence them. Use `Gun.log.off = true` to bring peace back to your workflow. > **Note:** great when used with file watchers like nodemon." --- lib/server.js | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/lib/server.js b/lib/server.js index a6082d4c..fbe858a1 100644 --- a/lib/server.js +++ b/lib/server.js @@ -1,9 +1,13 @@ -;(function(){ - console.log("Hello wonderful person! :) I'm mark@gunDB.io, message me for help or with hatemail. I want to hear from you! <3"); - var Gun = require('../gun'); - console.log("TODO: MARK! UPDATE S3 DRIVER BEFORE PUBLISHING!") +;(function(){ + var Gun = require('../gun'); //require('./s3'); require('./wsp/server'); - require('./file'); - module.exports = Gun; + require('./file'); + Gun.log( + 'Hello wonderful person! :)\n' + + 'I\'m mark@gunDB.io, message me for help or with hatemail. ' + + 'I want to hear from you! <3' + ); + Gun.log('TODO: MARK! UPDATE S3 DRIVER BEFORE PUBLISHING!'); + module.exports = Gun; }()); From e7162aa09845fd835c1bf2092b950496217d33fc Mon Sep 17 00:00:00 2001 From: Jesse Gibson Date: Tue, 15 Nov 2016 13:10:00 -0700 Subject: [PATCH 08/24] Reliably check file.js options The FileJS module can be passed options in two ways, and this commit ensures they're treated in the right way. Previously, options passed as .put or .get parameters would be favored over those used on the chain, even if `file` wasn't specified. Now, the module will only use the method options if `file` is mentioned, falling back to the chain options. This was a mistake on my part with the first PR (#268), I failed to notice notice that edge case. --- lib/file.js | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/lib/file.js b/lib/file.js index 0ee6a169..36459601 100644 --- a/lib/file.js +++ b/lib/file.js @@ -7,10 +7,20 @@ var Gun = require('../gun'), file = {}; function isUsingFileJS (context) { - var gun = context.gun; - var opt = context.opt || gun.Back('opt') || {}; - return opt.file !== false; + // Options passed via .get or .put. + var methodOptions = context.opt || {}; + + // Options set on the gun chain. + var chainOptions = context.gun.Back('opt') || {}; + + // Favor method options over chain options. + var file = methodOptions.hasOwnProperty('file') + ? methodOptions.file + : chainOptions.file; + + // Return whether the module is disabled. + return file !== false; } // queue writes, adapted from https://github.com/toolness/jsondown/blob/master/jsondown.js From 126a668bf3fb6c35ae39ffa8f9c41482ec4c99aa Mon Sep 17 00:00:00 2001 From: Mark Nadal Date: Tue, 15 Nov 2016 14:39:24 -0800 Subject: [PATCH 09/24] Update gun.js --- gun.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gun.js b/gun.js index a9857102..125a56dc 100644 --- a/gun.js +++ b/gun.js @@ -751,7 +751,7 @@ if(!f){ at.node = at.node || n || {}; if(obj_has(v, Node._)){ - at.node._ = Gun.obj.copy(v._); + at.node._ = obj_copy(v._); } at.node = Node.soul.ify(at.node, Val.rel.is(at.rel)); } From f999e5f2ea8736ceaa45e4e59717f9c0ba91ec93 Mon Sep 17 00:00:00 2001 From: Jesse Gibson Date: Tue, 15 Nov 2016 16:20:18 -0700 Subject: [PATCH 10/24] Fix chain option search If you're reading this commit history, please avert your eyes. 3 commits to fix this admittedly simple problem is more than my pride can handle. The `gun.Back()` function was only searching for `opt`, not `opt.file`. It would stop at the first mention of `opt` even if it didn't have settings for `file`. Now it won't. --- lib/file.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/file.js b/lib/file.js index 36459601..26b0bea3 100644 --- a/lib/file.js +++ b/lib/file.js @@ -12,12 +12,12 @@ function isUsingFileJS (context) { var methodOptions = context.opt || {}; // Options set on the gun chain. - var chainOptions = context.gun.Back('opt') || {}; + var chainOption = context.gun.Back('opt.file'); // Favor method options over chain options. var file = methodOptions.hasOwnProperty('file') ? methodOptions.file - : chainOptions.file; + : chainOption; // Return whether the module is disabled. return file !== false; From 37696e1ada5fa2a566bf5fbe61a477f61520c1e2 Mon Sep 17 00:00:00 2001 From: Jesse Gibson Date: Wed, 16 Nov 2016 16:40:14 -0700 Subject: [PATCH 11/24] Prevent most broadcast storms Better de-duplication on messages sent for GET and PUT requests, allowing full circular connections without blowing up your computer. Sadly, this broke some things against the previous version, so per @amark's request I'm publishing so he can debug. --- lib/wsp/client.js | 22 +++++++++++++++++----- lib/wsp/server-push.js | 23 +++++++++++++++++++++-- lib/wsp/server.js | 20 ++++++++++++++++++-- 3 files changed, 56 insertions(+), 9 deletions(-) diff --git a/lib/wsp/client.js b/lib/wsp/client.js index d39a53df..808eb91b 100644 --- a/lib/wsp/client.js +++ b/lib/wsp/client.js @@ -246,6 +246,11 @@ Gun.on('get', function (at) { var gun = at.gun; var opt = at.opt || {}; var peers = opt.peers || gun.Back('opt.peers'); + var server = Tab.server || {}; + + var duplicated = server.msg || function () { + return false; + }; if (!peers || Gun.obj.empty(peers)) { Gun.log.once('peers', 'Warning! You have no peers to connect to!'); @@ -264,20 +269,27 @@ Gun.on('get', function (at) { '$': at.get, }; + if (duplicated(msg['#'])) { + return; + } + // Listen for a response. // TODO: ONE? PERF! Clear out listeners, maybe with setTimeout? Tab.on(msg['#'], function (err, data) { + var id = Gun.text.random(); + var root = at.gun.Back(Infinity); + var obj = { + '#': id, '@': at['#'], err: err, put: data, + + // Flag, prevents rebroadcast. + nopush: true, }; - if (data) { - at.gun.Back(-1).on('out', obj); - } else { - at.gun.Back(-1).on('in', obj); - } + root.on(data ? 'out' : 'in', obj); }); // Broadcast to all other peers. diff --git a/lib/wsp/server-push.js b/lib/wsp/server-push.js index e4c77f7e..55093ff6 100644 --- a/lib/wsp/server-push.js +++ b/lib/wsp/server-push.js @@ -45,12 +45,19 @@ function ready (socket, cb) { * @return {undefined} */ function request (context, clients, cb) { + var id = context['#'] || Gun.text.random(9); + + Gun.on(id, function (err, data, event) { + cb(err, data); + event.off(); + }); + Gun.obj.map(clients, function (client) { ready(client, function () { var msg = { headers: {}, body: { - '#': Gun.on.ask(cb), + '#': id, '$': context.get, }, }; @@ -69,12 +76,19 @@ function request (context, clients, cb) { * @return {undefined} */ function update (context, clients, cb) { + var id = context['#'] || Gun.text.random(9); + + Gun.on(id, function (err, data, event) { + cb(err, data); + event.off(); + }); + Gun.obj.map(clients, function (client) { ready(client, function () { var msg = { headers: {}, body: { - '#': Gun.on.ask(cb), + '#': id, '$': context.put, }, }; @@ -125,6 +139,7 @@ function attach (gun, server) { if (!isUsingServer(context.gun, server)) { return; } + request(context, pool, function (err, data) { var response = { '@': context['#'], @@ -143,6 +158,10 @@ function attach (gun, server) { return; } + if (context.nopush) { + return; + } + update(context, pool, function (err, data) { var ack = { '!': err || null, diff --git a/lib/wsp/server.js b/lib/wsp/server.js index b4e6fd16..c3fa0385 100644 --- a/lib/wsp/server.js +++ b/lib/wsp/server.js @@ -152,8 +152,24 @@ Gun.on('opt', function(at){ // TODO: BUG! server put should push. } tran.get = function(req, cb){ - var body = req.body, lex = body['$'], reply = {headers: {'Content-Type': tran.json}}, opt; - gun.on('out', {gun: gun, get: lex, req: 1, '#': Gun.on.ask(function(at, ev){ + var body = req.body, lex = body['$'], reply = {headers: {'Content-Type': tran.json}}; + + var graph = gun.Back(Infinity)._.graph; + var node = graph[lex['#']]; + var result = Gun.graph.ify(node); + + if (node) { + return cb({ + headers: reply.headers, + body: { + '#': gun.wsp.msg(), + '@': body['#'], + '$': result, + }, + }); + } + + gun.on('out', {gun: gun, get: lex, req: 1, '#': body['#'] || Gun.on.ask(function(at, ev){ ev.off(); var graph = at.put; return cb({headers: reply.headers, body: { From 86a5abd550b6559f00790e7b63357daaa6d7138f Mon Sep 17 00:00:00 2001 From: Cole Albon Date: Wed, 16 Nov 2016 19:14:31 -0700 Subject: [PATCH 12/24] #271 one liner fix error while assigning console --- gun.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gun.js b/gun.js index 125a56dc..5e6fe139 100644 --- a/gun.js +++ b/gun.js @@ -6,7 +6,7 @@ if(typeof window !== "undefined"){ root = window } if(typeof global !== "undefined"){ root = global } root = root || {}; - var console = root.console = root.console || {log: function(){}}; + var console = root.console || {log: function(){}}; function require(arg){ return arg.slice? require[resolve(arg)] : function(mod, path){ arg(mod = {exports: {}}); From e8f8047cb6043519e8f530cb77fffc604dcf80d9 Mon Sep 17 00:00:00 2001 From: Jesse Gibson Date: Thu, 17 Nov 2016 14:07:26 -0700 Subject: [PATCH 13/24] Expose websocket events The Peer "class" now extends EventEmitter. Listening to any websocket events (e.g., "message", "close", "open", etc.) will not only subscribe to the current websocket, but all future websockets. This provides a much needed abstraction, since reconnection replaces the socket, which would typically destroy your listeners. --- lib/wsp/Peer.js | 132 ++++++++++++++++++++++++------------------------ 1 file changed, 66 insertions(+), 66 deletions(-) diff --git a/lib/wsp/Peer.js b/lib/wsp/Peer.js index 05f510b3..aafd3513 100644 --- a/lib/wsp/Peer.js +++ b/lib/wsp/Peer.js @@ -1,4 +1,9 @@ +/* eslint-disable no-underscore-dangle */ +'use strict'; + var WebSocket = require('ws'); +var Emitter = require('events'); +var util = require('util'); /** * Calculates backoff instances. @@ -38,8 +43,23 @@ Backoff.prototype.reset = function () { }; /** - * Create a websocket client and handle reconnect backoff logic. - * @param {String} url - A preformatted url (starts with ws://) + * Schedules the next connection, according to the backoff. + * @param {Peer} peer - A peer instance. + * @return {Timeout} - The timeout value from `setTimeout`. + */ +function scheduleReconnect (peer) { + var backoff = peer.backoff; + var time = backoff.time; + backoff.next(); + + var reconnect = peer.connect.bind(peer); + + return setTimeout(reconnect, time); +} + +/** + * Handles reconnections and defers messages until the socket is ready. + * @param {String} url - The address to connect to. * @param {Object} [options] - Override how the socket is managed. * @param {Object} options.backoff - Backoff options (see the constructor). * @class @@ -49,14 +69,38 @@ function Peer (url, options) { return new Peer(url, options); } + // Extend EventEmitter. + Emitter.call(this); + this.setMaxListeners(Infinity); + this.options = options || {}; - // Messages sent while offline. - this.offline = []; + // Messages sent before the socket is ready. + this.deferredMsgs = []; this.url = Peer.formatURL(url); this.backoff = new Backoff(this.options.backoff); - this.retry(url); + + // Set up the websocket. + this.connect(); + + var peer = this; + var reconnect = scheduleReconnect.bind(null, peer); + + // Handle reconnection. + this.on('close', reconnect); + this.on('error', function (error) { + if (error.code === 'ECONNREFUSED') { + reconnect(); + } + }); + + // Send deferred messages. + this.on('open', function () { + peer.drainQueue(); + peer.backoff.reset(); + }); + } /** @@ -70,92 +114,47 @@ Peer.formatURL = function (url) { return url.replace('http', 'ws'); }; +util.inherits(Peer, Emitter); var API = Peer.prototype; /** * Attempts a websocket connection. - * @param {String} url - The websocket URL. * @return {WebSocket} - The new websocket instance. */ -API.retry = function () { +API.connect = function () { var url = this.url; + // Open a new websocket. var socket = new WebSocket(url); + + // Re-use the previous listeners. + socket._events = this._events; + this.socket = socket; - this.retryOnDisconnect(socket); - - this.sendOnConnection(); - return socket; }; /** - * Sends the messages that couldn't be sent before once - * the connection is open. + * Sends all the messages in the deferred queue. * @return {Peer} - The context. */ -API.sendOnConnection = function () { +API.drainQueue = function () { var peer = this; - var queue = this.offline; - var socket = this.socket; - // Wait for the socket to connect. - socket.once('open', function () { - queue.forEach(function (msg) { - socket.send(msg); - }); - - peer.offline = []; + this.deferredMsgs.forEach(function (msg) { + peer.send(msg); }); + // Reset the queue. + this.deferredMsgs = []; + return this; }; -/** - * Schedules the next retry, according to the backoff. - * @param {Peer} peer - A peer instance. - * @return {Timeout} - The timeout value from `setTimeout`. - */ -function schedule (peer) { - var backoff = peer.backoff; - var time = backoff.time; - backoff.next(); - - return setTimeout(function () { - var socket = peer.retry(); - - // Successfully reconnected? Reset the backoff. - socket.once('open', backoff.reset.bind(backoff)); - }, time); -} - -/** - * Attaches handlers to the socket, attempting reconnection - * when it's closed. - * @param {WebSocket} socket - The websocket instance to bind to. - * @return {WebSocket} - The same websocket. - */ -API.retryOnDisconnect = function (socket) { - var peer = this; - - // Listen for socket close events. - socket.once('close', function () { - schedule(peer); - }); - - socket.on('error', function (error) { - if (error.code === 'ECONNREFUSED') { - schedule(peer); - } - }); - - return socket; -}; - /** * Send data through the socket, or add it to a queue - * of offline requests if it's not ready yet. + * of deferred messages if it's not ready yet. * @param {String} msg - The data to send. * @return {Peer} - The context. */ @@ -164,10 +163,11 @@ API.send = function (msg) { var state = socket.readyState; var ready = socket.OPEN; + // Make sure the socket is ready. if (state === ready) { socket.send(msg); } else { - this.offline.push(msg); + this.deferredMsgs.push(msg); } return this; From 186b237ea5ba698dd31a7956bc5e1e562976522e Mon Sep 17 00:00:00 2001 From: Eric Schapp Date: Fri, 18 Nov 2016 10:53:00 -0600 Subject: [PATCH 14/24] Update README.md Just a quick typo fix. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 7449fb4d..b5beea73 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ Try the [interactive tutorial](http://gun.js.org/think.html) in the browser (**5 ``` Then visit [http://localhost:8080](http://localhost:8080) in your browser. -### Hiroku +### Heroku ```bash git clone https://github.com/amark/gun.git cd gun From e402e3966ee0669c430b72e0315933da99ec6478 Mon Sep 17 00:00:00 2001 From: Jesse Gibson Date: Fri, 18 Nov 2016 11:42:24 -0700 Subject: [PATCH 15/24] Optimistically open client sockets Changes behavior from only opening sockets when absolutely necessary to keeping them open for as long as possible. Key differences: - Much higher success rate for messages sent from the connected server. - Process no longer shuts down if nothing is done with gun, instead listens for incoming messages on client sockets. Socket reconnect handle by Peer instances, meaning better handling for deferred messages and predictable backoff. The client.js logic has been significantly refactored. Among the improvements, GET/PUT requests now respect the `peers` option for each gun instance, only sending requests to the URLs listed. --- lib/wsp/Peer.js | 7 +- lib/wsp/Pool.js | 101 ++++++++ lib/wsp/client.js | 584 +++++++++++++++++-------------------------- lib/wsp/duplicate.js | 90 +++++++ 4 files changed, 430 insertions(+), 352 deletions(-) create mode 100644 lib/wsp/Pool.js create mode 100644 lib/wsp/duplicate.js diff --git a/lib/wsp/Peer.js b/lib/wsp/Peer.js index aafd3513..35b04f28 100644 --- a/lib/wsp/Peer.js +++ b/lib/wsp/Peer.js @@ -155,7 +155,7 @@ API.drainQueue = function () { /** * Send data through the socket, or add it to a queue * of deferred messages if it's not ready yet. - * @param {String} msg - The data to send. + * @param {Mixed} msg - String, or anything that JSON can handle. * @return {Peer} - The context. */ API.send = function (msg) { @@ -163,6 +163,11 @@ API.send = function (msg) { var state = socket.readyState; var ready = socket.OPEN; + // Make sure it's a string. + if (typeof msg !== 'string') { + msg = JSON.stringify(msg); + } + // Make sure the socket is ready. if (state === ready) { socket.send(msg); diff --git a/lib/wsp/Pool.js b/lib/wsp/Pool.js new file mode 100644 index 00000000..542c0482 --- /dev/null +++ b/lib/wsp/Pool.js @@ -0,0 +1,101 @@ +'use strict'; + +/** + * Simpler interface over a collection of sockets. Works with + * WebSocket clients, or sockets from a WebSocket server. + * @class + */ +function Pool () { + if (!(this instanceof Pool)) { + return new Pool(); + } + + // Maps IDs to sockets. + this.sockets = {}; +} + +var API = Pool.prototype; + +/** + * Returns the socket by the given ID. + * @param {String} id - The unique socket ID. + * @return {WebSocket|Null} - The WebSocket, if found. + */ +API.get = function (id) { + return this.sockets[id] || null; +}; + +/** + * Adds a socket to the pool. + * @param {String} id - The socket ID. + * @param {WebSocket} socket - A websocket instance. + * @return {Pool} - The context. + */ +API.add = function (id, socket) { + this.sockets[id] = socket; + + return this; +}; + +/** + * Removes a socket from the pool. + * @param {String} id - The ID of the socket to remove. + * @return {Boolean} - Whether the pool contained the socket. + */ +API.remove = function (id) { + var sockets = this.sockets; + var hasSocket = sockets.hasOwnProperty(id); + + if (hasSocket) { + delete sockets[id]; + } + + return hasSocket; +}; + +/** + * Creates a filtered pool of sockets. Works the same as Array#filter. + * @param {Function} fn - Called for each socket in the pool. + * @param {Mixed} [_this] - The `this` context to use when invoking + * the callback. + * @return {Pool} - A new, filtered socket pool. + */ +API.filter = function (fn, _this) { + var filtered = Pool(); + var pool = this; + + _this = _this || pool; + + Object.keys(this.sockets).forEach(function (id) { + var socket = pool.sockets[id]; + + var shouldAdd = fn.call(_this, socket, id, pool); + + // Add it to the new pool. + if (shouldAdd) { + filtered.add(id, socket); + } + }); + + return filtered; +}; + +/** + * Send a message through each socket in the pool. + * @param {String} msg - The message to send. + * @return {Number} - How many sockets the message was sent to. + */ +API.send = function (msg) { + var pool = this; + + var ids = Object.keys(this.sockets); + + ids.forEach(function (id) { + var socket = pool.sockets[id]; + socket.send(msg); + }); + + return ids.length; +}; + +module.exports = Pool; diff --git a/lib/wsp/client.js b/lib/wsp/client.js index d39a53df..db8acfea 100644 --- a/lib/wsp/client.js +++ b/lib/wsp/client.js @@ -1,245 +1,65 @@ -/* eslint-env node*/ /* eslint-disable - require-jsdoc, no-warning-comments, no-underscore-dangle, - max-params, */ 'use strict'; var Gun = require('../../gun'); -var WS = require('ws'); +var Socket = require('./Peer'); +var Pool = require('./Pool'); +var duplicate = require('./duplicate'); -var Tab = {}; -Tab.on = Gun.on; -Tab.peers = (function () { +// Maps URLs to sockets. +// Shared between all gun instances. +var sockets = Pool(); +var emitter = { on: Gun.on }; +var server = { - function Peer (peers) { - if (!Peer.is(this)) { - return new Peer(peers); - } + // Session id. + sid: Gun.text.random(), - this.peers = peers; - } + // Request handlers. + handlers: [], - Peer.is = function (peer) { - return peer instanceof Peer; - }; - - function map (peer, url) { - var msg = this.msg; - var opt = this.opt || {}; - opt.out = true; - Peer.request(url, msg, null, opt); - } - - Peer.prototype.send = function (msg, opt) { - Peer.request.each(this.peers, map, { - msg: msg, - opt: opt, + // Call handlers. + handle: function (req, res) { + server.handlers.forEach(function (server) { + server(req, res); }); - }; + }, +}; - Peer.request = (function () { +/** + * Take a map of URLs pointing to options and ensure the + * urls are using the WS protocol. + * @param {Object} peers - Any object with URLs as keys. + * @return {Object} - Object with normalized URL keys. + */ +function normalizeURLs (peers) { + var formatted = {}; - function request (base, body, cb, opt) { + Object.keys(peers).forEach(function (url) { + var options = peers[url]; + var id = Socket.formatURL(url); + formatted[id] = options; + }); - var obj = base.length ? { base: base } : {}; - obj.base = opt.base || base; - obj.body = opt.body || body; - obj.headers = opt.headers; - obj.url = opt.url; - obj.out = opt.out; - cb = cb || function () {}; + return formatted; +} - if (!obj.base) { - return; - } +/** + * Turns a map of URLs into a socket pool. + * @param {Object} peers - Any object with URLs as keys. + * @return {Pool} - A pool of sockets corresponding to the URLs. + */ +function getSocketSubset (peers) { + var urls = normalizeURLs(peers); - request.transport(obj, cb); - } - - request.createServer = function (fn) { - request.createServer.list.push(fn); - }; - - request.createServer.ing = function (req, cb) { - var index = request.createServer.list.length; - var server; - while (index) { - index -= 1; - server = request.createServer.list[index] || function () {}; - server(req, cb); - } - }; - - request.createServer.list = []; - request.back = 2; - request.backoff = 2; - - request.transport = function (opt, cb) { - if (request.ws(opt, cb)) { - return; - } - }; - - request.ws = function (opt, cb, req) { - var ws; - if (!WS) { - return false; - } - - ws = request.ws.peers[opt.base]; - if (ws) { - req = req || {}; - if (opt.headers) { - req.headers = opt.headers; - } - if (opt.body) { - req.body = opt.body; - } - - if (opt.url) { - req.url = opt.url; - } - - req.headers = req.headers || {}; - - if (!opt.out && !ws.cbs[req.headers['ws-rid']]) { - var rid = 'WS' + - new Date().getTime() + - '.' + - Math.floor((Math.random() * 65535) + 1); - - req.headers['ws-rid'] = rid; - - ws.cbs[rid] = function (err, res) { - if (!res || res.body || res.end) { - delete ws.cbs[req.headers['ws-rid']]; - } - - cb(err, res); - }; - } - - if (!ws.readyState) { - setTimeout(function () { - request.ws(opt, cb, req); - }, 100); - - return true; - } - - ws.sending = true; - ws.send(JSON.stringify(req)); - return true; - } - - if (ws === false) { - return false; - } - - var wsURL = opt.base.replace('http', 'ws'); - - ws = request.ws.peers[opt.base] = new WS(wsURL); - ws.cbs = {}; - - ws.onopen = function () { - request.back = 2; - request.ws(opt, cb); - }; - - ws.onclose = function (event) { - - if (!ws || !event) { - return; - } - - if (ws.close instanceof Function) { - ws.close(); - } - - if (!ws.sending) { - ws = request.ws.peers[opt.base] = false; - request.transport(opt, cb); - return; - } - - request.each(ws.cbs, function (cb) { - cb({ - err: 'WebSocket disconnected!', - code: ws.sending ? (ws || {}).err || event.code : -1, - }); - }); - - // This will make the next request try to reconnect - ws = request.ws.peers[opt.base] = null; - - // TODO: Have the driver handle this! - setTimeout(function () { - - // opt here is a race condition, - // is it not? Does this matter? - request.ws(opt, function () {}); - }, request.back *= request.backoff); - }; - - ws.onmessage = function (msg) { - var res; - if (!msg || !msg.data) { - return; - } - try { - res = JSON.parse(msg.data); - } catch (error) { - return; - } - if (!res) { - return; - } - res.headers = res.headers || {}; - if (res.headers['ws-rid']) { - var cb = ws.cbs[res.headers['ws-rid']] || function () {}; - cb(null, res); - return; - } - - // emit extra events. - if (res.body) { - request.createServer.ing(res, function (res) { - res.out = true; - request(opt.base, null, null, res); - }); - } - }; - - ws.onerror = function (error) { - (ws || {}).err = error; - }; - - return true; - }; - request.ws.peers = {}; - request.ws.cbs = {}; - - request.each = function (obj, cb, as) { - if (!obj || !cb) { - return; - } - - for (var key in obj) { - if (obj.hasOwnProperty(key)) { - cb.call(as, obj[key], key); - } - } - }; - - return request; - }()); - - return Peer; -}()); + return sockets.filter(function (socket) { + return urls.hasOwnProperty(socket.url); + }); +} // Handle read requests. Gun.on('get', function (at) { @@ -248,17 +68,20 @@ Gun.on('get', function (at) { var peers = opt.peers || gun.Back('opt.peers'); if (!peers || Gun.obj.empty(peers)) { - Gun.log.once('peers', 'Warning! You have no peers to connect to!'); - at.gun.Back(-1).on('in', {'@': at['#']}); + at.gun.Back(Infinity).on('in', { + '@': at['#'], + }); return; } + var id = at['#'] || Gun.text.random(9); + // Create a new message. var msg = { // msg ID - '#': at['#'] || Gun.text.random(9), + '#': id, // msg BODY '$': at.get, @@ -266,7 +89,7 @@ Gun.on('get', function (at) { // Listen for a response. // TODO: ONE? PERF! Clear out listeners, maybe with setTimeout? - Tab.on(msg['#'], function (err, data) { + emitter.on(id, function (err, data) { var obj = { '@': at['#'], err: err, @@ -280,11 +103,12 @@ Gun.on('get', function (at) { } }); - // Broadcast to all other peers. - Tab.peers(peers).send(msg, { - headers: { - 'gun-sid': Tab.server.sid, - }, + var subset = getSocketSubset(peers); + + // Broadcast to the connected peers. + subset.send({ + headers: { 'gun-sid': server.sid }, + body: msg, }); }); @@ -293,26 +117,40 @@ Gun.on('put', function (at) { if (at['@']) { return; } - var opt = at.gun.Back('opt') || {}, peers = opt.peers; + + var peers = at.gun.Back('opt.peers'); + var enabled = at.gun.Back('opt.websocket'); + var options = at.opt || {}; + if (!peers || Gun.obj.empty(peers)) { - Gun.log.once('peers', 'Warning! You have no peers to save to!'); - at.gun.Back(-1).on('in', {'@': at['#']}); + + // TODO: What about wsp/server clients? Maybe we shouldn't + // immediately assume there's no data to be found. + at.gun.Back(-1).on('in', { + '@': at['#'], + }); + return; } - if (opt.websocket === false || (at.opt && at.opt.websocket === false)) { + + if (options.websocket === false || enabled === false) { return; } + + var id = at['#'] || Gun.text.random(9); + var msg = { - // msg ID - '#': at['#'] || Gun.text.random(9), + // Message ID. + '#': id, - // msg BODY + // Message body. '$': at.put, }; // TODO: ONE? PERF! Clear out listeners, maybe with setTimeout? - Tab.on(msg['#'], function (err, ok) { + // Listen for acknowledgement(s). + Gun.on(id, function (err, ok) { at.gun.Back(-1).on('in', { '@': at['#'], err: err, @@ -320,28 +158,157 @@ Gun.on('put', function (at) { }); }); - Tab.peers(peers).send(msg, { - headers: { - 'gun-sid': Tab.server.sid, - }, + var subset = getSocketSubset(peers); + + subset.send({ + headers: { 'gun-sid': server.sid }, + body: msg, + }); +}); + +// Open any new sockets listed, +// adding them to the global pool. +Gun.on('opt', function (context) { + var gun = context.gun; + + var peers = gun.Back('opt.peers') || {}; + + Gun.obj.map(peers, function (options, url) { + if (sockets[url]) { + return; + } + + var socket = Socket(url, options); + sockets.add(url, socket); + + /** + * Handle responses to requests, adding default headers. + * @param {Object} reply - A gun reply object. + * @return {undefined} + */ + function respond (reply) { + + // Make sure headers are defined. + var headers = reply.headers = reply.headers || {}; + + // Add 'gun-sid' if it doesn't exist. + headers['gun-sid'] = headers['gun-sid'] || server.sid; + + socket.send(reply); + } + + socket.on('message', function (msg) { + var request; + + try { + request = JSON.parse(msg); + } catch (error) { + return; + } + + // Validate the request. + if (!request || !request.body) { + return; + } + + request.headers = request.headers || {}; + + // emit extra events. + server.handle(request, respond); + }); }); }); -// REVIEW: Do I need this on a server client? -// browser/client side Server! -// TODO: BUG! Does not respect separate instances!!! Gun.on('opt', function (at) { - if (Tab.server) { + var gun = at.gun; + var root = gun.Back(Infinity); + var options = (root._.opt = root._.opt || {}); + + // Only register once per gun instance. + if (options['@client']) { return; } - var gun = at.gun; - var server = Tab.server = Tab.server || {}; - var tmp; + var driver = options['@client'] = { - server.sid = Gun.text.random(); + /** + * Handles get requests sent from other peers. + * @param {Object} req - The get request. + * @param {Function} cb - Handles replies. + * @return {undefined} + */ + get: function (req, cb) { + var body = req.body; + var lex = body.$; + var graph = gun._.root._.graph; + var node = graph[lex['#']]; - Tab.peers.request.createServer(function (req, res) { + // TODO: Reply even if it's not in memory. + if (!node) { + return; + } + + cb({ + body: { + '#': duplicate.track.newID(), + '@': body['#'], + '$': node, + }, + }); + }, + + /** + * Handles put requests sent from other peers. + * @param {Object} req - The put request. + * @param {Function} cb - A response callback. + * @return {undefined} + */ + put: function (req, cb) { + var body = req.body; + var graph = body.$; + + // Cached gun paths. + var path = gun._.root._.path || {}; + + graph = Gun.obj.map(graph, function (node, soul, map) { + if (!path[soul]) { + return; + } + map(soul, node); + }); + + // filter out what we don't have in memory. + if (!graph) { + return; + } + + var id = Gun.on.ask(function (ack, event) { + if (!ack) { + return; + } + + event.off(); + + cb({ + body: { + '#': duplicate.track.newID(), + '@': body['#'], + '$': ack, + '!': ack.err, + }, + }); + }); + + gun.on('out', { + '#': duplicate.track(id), + gun: gun, + opt: { websocket: false }, + put: graph, + }); + }, + }; + + server.handlers.push(function (req, res) { // Validate request. if (!req || !res || !req.body || !req.headers) { @@ -350,111 +317,26 @@ Gun.on('opt', function (at) { var msg = req.body; - // AUTH for non-replies. - if (server.msg(msg['#'])) { + if (duplicate(msg['#'])) { return; } - // no need to process. + // It's a response, no need to reply. if (msg['@']) { - if (Tab.ons[tmp = msg['@'] || msg['#']]) { - Tab.on(tmp, [msg['!'], msg.$]); - } + var reqID = msg['@']; + + emitter.on(reqID, [ + msg['!'], + msg.$, + ]); + return; } - if (msg.$ && msg.$['#']) { - server.get(req, res); - return; - } + var isLex = msg.$ && msg.$['#']; + var method = isLex ? 'get' : 'put'; - server.put(req, res); + driver[method](req, res); }); - server.get = function (req, cb) { - var body = req.body; - var lex = body.$; - var graph = gun._.root._.graph; - var node; - - // Don't reply to data we don't have it in memory. - // TODO: Add localStorage? - if (!(node = graph[lex['#']])) { - return; - } - - cb({ - body: { - '#': server.msg(), - '@': body['#'], - '$': node, - }, - }); - }; - - server.put = function (req, cb) { - var body = req.body, graph = body.$; - var __ = gun._.root._; - - // filter out what we don't have in memory. - if (!(graph = Gun.obj.map(graph, function (node, soul, map) { - if (!__.path[soul]) { - return; - } - map(soul, node); - }))) { - return; - } - gun.on('out', { - gun: gun, - opt: { - websocket: false, - }, - put: graph, - '#': Gun.on.ask(function (ack, ev) { - if (!ack) { - return undefined; - } - ev.off(); - return cb({ - body: { - '#': server.msg(), - '@': body['#'], - '$': ack, - '!': ack.err, - }, - }); - }), - }); - }; - - server.msg = function (id) { - if (!id) { - id = Gun.text.random(9); - server.msg.debounce[id] = Gun.time.is(); - return id; - } - - clearTimeout(server.msg.clear); - server.msg.clear = setTimeout(function () { - var now = Gun.time.is(); - Gun.obj.map(server.msg.debounce, function (time, id) { - if ((now - time) < (1000 * 60 * 5)) { - return; - } - - Gun.obj.del(server.msg.debounce, id); - }); - }, 500); - - if (server.msg.debounce[id]) { - server.msg.debounce[id] = Gun.time.is(); - return id; - } - - server.msg.debounce[id] = Gun.time.is(); - return undefined; - }; - - server.msg.debounce = server.msg.debounce || {}; }); diff --git a/lib/wsp/duplicate.js b/lib/wsp/duplicate.js new file mode 100644 index 00000000..9080fee7 --- /dev/null +++ b/lib/wsp/duplicate.js @@ -0,0 +1,90 @@ +'use strict'; + +var Gun = require('../../gun'); + +var cache = {}; +var timeout = null; + +/** + * Remove all entries in the cache older than 5 minutes. + * Reschedules itself to run again when the oldest item + * might be too old. + * @return {undefined} + */ +function gc () { + var now = Date.now(); + var oldest = now; + var maxAge = 5 * 60 * 1000; + + Gun.obj.map(cache, function (time, id) { + oldest = Math.min(now, time); + + if ((now - time) < maxAge) { + return; + } + + delete cache[id]; + }); + + var done = Gun.obj.empty(cache); + + // Disengage GC. + if (done) { + timeout = null; + return; + } + + // Just how old? + var elapsed = now - oldest; + + // How long before it's too old? + var nextGC = maxAge - elapsed; + + // Schedule the next GC event. + timeout = setTimeout(gc, nextGC); +} + +/** + * Checks a memory-efficient cache to see if a string has been seen before. + * @param {String} id - A string to keep track of. + * @return {Boolean} - Whether it's been seen recently. + */ +function duplicate (id) { + + // Have we seen this ID recently? + var existing = cache.hasOwnProperty(id); + + // Add it to the cache. + duplicate.track(id); + + return existing; +} + +/** + * Starts tracking an ID as a possible future duplicate. + * @param {String} id - The ID to track. + * @return {String} - The same ID. + */ +duplicate.track = function (id) { + cache[id] = Date.now(); + + // Engage GC. + if (!timeout) { + gc(); + } + + return id; +}; + +/** + * Generate a new ID and start tracking it. + * @param {Number} [chars] - The number of characters to use. + * @return {String} - The newly created ID. + */ +duplicate.track.newID = function (chars) { + var id = Gun.text.random(chars); + + return duplicate.track(id); +}; + +module.exports = duplicate; From 6a63b46d2bb2944177d239036c021d41ed313b5b Mon Sep 17 00:00:00 2001 From: Jesse Gibson Date: Fri, 18 Nov 2016 12:49:58 -0700 Subject: [PATCH 16/24] Add max reconnect backoff Sockets will try to reconnect upon disconnection with an exponentially rising backoff (configurable). However, if it's unbounded, at a certain point does it need to even try? Probably not. Instead there's now a `max` option that defaults to a minute, and the backoff will never exceed that time. --- lib/wsp/Peer.js | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/lib/wsp/Peer.js b/lib/wsp/Peer.js index 35b04f28..493ba8e7 100644 --- a/lib/wsp/Peer.js +++ b/lib/wsp/Peer.js @@ -10,6 +10,7 @@ var util = require('util'); * @param {Object} [options] - Override the default settings. * @param {Object} options.time=50 - Initial backoff time. * @param {Object} options.factor=2 - How much to multiply the time by. + * @param {Object} options.max=1min - Maximum backoff time. * @class */ function Backoff (options) { @@ -24,7 +25,14 @@ function Backoff (options) { * @return {Number} - The next backoff time. */ Backoff.prototype.next = function () { - this.time *= this.factor; + var next = this.time * this.factor; + + if (next > this.max) { + this.time = this.max; + return this.max; + } + + this.time = next; return this.time; }; @@ -38,6 +46,7 @@ Backoff.prototype.reset = function () { this.time = options.time || 50; this.factor = options.factor || 2; + this.max = options.max || 1 * 60 * 1000; return this; }; From c8e919f287d738334a208de63fde90668bad5f02 Mon Sep 17 00:00:00 2001 From: Mark Nadal Date: Tue, 22 Nov 2016 18:07:49 -0800 Subject: [PATCH 17/24] fix chain --- gun.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/gun.js b/gun.js index 5e6fe139..f248568c 100644 --- a/gun.js +++ b/gun.js @@ -845,6 +845,7 @@ ;require(function(module){ function Gun(o){ + if(o instanceof Gun){ return this } if(!(this instanceof Gun)){ return Gun.create(o) } this._ = {gun: this}; } @@ -892,7 +893,7 @@ */ (Gun.chain = Gun.prototype).chain = function(){ var chain = new this.constructor(), _; - _ = chain._ || (chain._ = {}); + _ = chain._ || (chain._ = {gun: chain}); _.root = this._.root; _.back = this; return chain; From 9d9dea25533123e850e4789e4edf9c5deda897a7 Mon Sep 17 00:00:00 2001 From: Jesse Gibson Date: Fri, 25 Nov 2016 09:49:48 -0700 Subject: [PATCH 18/24] Add envelope plugin system @amark made these changes, I'm just committing them. --- gun.js | 151 ++++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 106 insertions(+), 45 deletions(-) diff --git a/gun.js b/gun.js index f248568c..ca8cb142 100644 --- a/gun.js +++ b/gun.js @@ -1,4 +1,5 @@ -//console.log("!!!!!!!!!!!!!!!! WARNING THIS IS GUN 0.5 !!!!!!!!!!!!!!!!!!!!!!"); +/* eslint-disable */ +//console.log("!!!!!!!!!!!!!!!! WARNING THIS IS GUN 0.5 !!!!!!!!!!!!!!!!!!!!!!"); ;(function(){ /* UNBUILD */ @@ -73,7 +74,7 @@ Type.list.map = function(l, c, _){ return obj_map(l, c, _) } Type.list.index = 1; // change this to 0 if you want non-logical, non-mathematical, non-matrix, non-convenient array notation Type.obj = {is: function(o){ return o? (o instanceof Object && o.constructor === Object) || Object.prototype.toString.call(o).match(/^\[object (\w+)\]$/)[1] === 'Object' : false }} - Type.obj.put = function(o, f, v){ return (o||{})[f] = v, o } + Type.obj.put = function(o, f, v){ return (o||{})[f] = v, o } Type.obj.has = function(o, f){ return o && Object.prototype.hasOwnProperty.call(o, f) } Type.obj.del = function(o, k){ if(!o){ return } @@ -163,7 +164,7 @@ var obj = Type.obj, obj_is = obj.is, obj_has = obj.has, obj_map = obj.map; module.exports = Type; })(require, './type'); - + ;require(function(module){ // On event emitter generic javascript utility. function Scope(){ @@ -295,7 +296,7 @@ ;require(function(module){ var On = require('./on'); - + function Chain(create, opt){ opt = opt || {}; opt.id = opt.id || '#'; @@ -312,7 +313,7 @@ return; } if(at.stun === stun){ - delete at.stun; + delete at.stun; } off = true; var i = 0, q = res.queue, l = q.length, c, v; @@ -476,11 +477,11 @@ } if(incomingState < currentState){ return {historical: true}; // the incoming value is within the boundary of the machine's state, but not within the range. - + } if(currentState < incomingState){ return {converge: true, incoming: true}; // the incoming value is within both the boundary and the range of the machine's state. - + } if(incomingState === currentState){ if(incomingValue === currentValue){ // Note: while these are practically the same, the deltas could be technically different @@ -620,7 +621,7 @@ if(o.node){ o.node[f] = tmp } return; } - if(Val.is(v)){ + if(Val.is(v)){ o.node[f] = v; } } @@ -705,8 +706,8 @@ } function map(n, s){ // we invert this because the way we check for this is via a negation. if(!n || s !== Node.soul(n) || !Node.is(n, this.fn)){ return true } // it is true that this is an invalid graph. - if(!fn_is(this.cb)){ return } - nf.n = n; nf.as = this.as; + if(!fn_is(this.cb)){ return } + nf.n = n; nf.as = this.as; this.cb.call(nf.as, n, s, nf); } }()); @@ -715,7 +716,7 @@ var at = {path: [], obj: obj}; if(!env){ env = {}; - } else + } else if(typeof env === 'string'){ env = {soul: env}; } else @@ -885,7 +886,7 @@ Gun.graph = require('./graph'); Gun.on = require('./onify')(); - + /* var opt = {chain: 'in', back: 'out', extend: 'root', id: Gun._.soul}; Gun.chain = require('./chain')(Gun, opt); @@ -930,6 +931,10 @@ } function output(at){ var cat = this, gun = cat.gun, tmp; + console.log("OUT!", Gun.obj.to(at, {gun: null})); + if(at['#']){ + dedup.track(at['#']); + } if(at.put){ cat.on('in', obj_to(at, {gun: cat.gun})); } @@ -938,7 +943,7 @@ } if(at.put){ Gun.on('put', at) } if(at.get){ get(at, cat) } - Gun.on('out', at); + Gun.on('out', at); return; if(!cat.back){ return } cat.back.on('out', at); } @@ -958,15 +963,31 @@ Gun.on('get', at); } function input(at){ var cat = this; + console.log("IN", at); + if(at['@']){ + if(!at['#']){ + at['#'] = Gun.text.random(); + dedup.track(at['#']); + cat.on('out', at); + } + } + if(at['#'] && dedup.check(at['#'])){ return } + /* if(at['@'] || at.err || u === at.put){ at.gun = at.gun || cat.gun; Gun.on.ack(at['@'], at); return; } - if(cat.graph){ - Gun.obj.map(at.put, ham, {at: at, cat: this}); // all unions must happen first, sadly. + */ + if(at.put){ + if(cat.graph){ + Gun.obj.map(at.put, ham, {at: at, cat: this}); // all unions must happen first, sadly. + } + Gun.obj.map(at.put, map, {at: at, cat: this}); + Gun.on('put', at); } - Gun.obj.map(at.put, map, {at: at, cat: this}); + if(!at.gun){ at.gun = cat.gun } + if(at.get){ Gun.on('get', at) } } function ham(data, key){ var cat = this.cat, graph = cat.graph; @@ -987,6 +1008,52 @@ via: this.at }); } + function dedup(){} + dedup.cache = {}; + dedup.track = function (id) { + dedup.cache[id] = Gun.time.is(); + // Engage GC. + if (!dedup.to) { + dedup.gc(); + } + return id; + }; + dedup.check = function(id){ + // Have we seen this ID recently? + return Gun.obj.has(dedup.cache, id); + } + dedup.gc = function(){ + var now = Gun.time.is(); + var oldest = now; + var maxAge = 5 * 60 * 1000; + // TODO: Gun.scheduler already does this? Reuse that. + Gun.obj.map(dedup.cache, function (time, id) { + oldest = Math.min(now, time); + + if ((now - time) < maxAge) { + return; + } + + delete dedup.cache[id]; + }); + + var done = Gun.obj.empty(dedup.cache); + + // Disengage GC. + if (done) { + dedup.to = null; + return; + } + + // Just how old? + var elapsed = now - oldest; + + // How long before it's too old? + var nextGC = maxAge - elapsed; + + // Schedule the next GC event. + dedup.to = setTimeout(dedup.gc, nextGC); + } }()); var text = Type.text, text_is = text.is, text_random = text.random; var list = Type.list, list_is = list.is; @@ -1020,7 +1087,7 @@ var is = state_is(node, field), cs = state_is(vertex, field); if(u === is || u === cs){ return true } // it is true that this is an invalid HAM comparison. var iv = rel_is(value) || value, cv = rel_is(vertex[field]) || vertex[field]; - + @@ -1090,7 +1157,7 @@ var obj = Gun.obj, obj_is = obj.is, obj_put = obj.put, obj_map = obj.map, obj_empty = obj.empty; var num = Gun.num, num_is = num.is; var _soul = Gun.val.rel._, _field = '.'; - + ;(function(){ var obj = {}, u; Gun.chain.Back = function(n, opt){ var tmp; if(-1 === n || Infinity === n){ @@ -1221,7 +1288,7 @@ as.batch(); } - function any(at, ev){ + function any(at, ev){ function implicit(at){ // TODO: CLEAN UP!!!!! if(!at || !at.get){ return } // TODO: CLEAN UP!!!!! as.data = obj_put({}, tmp = at.get, as.data); // TODO: CLEAN UP!!!!! @@ -1232,9 +1299,9 @@ implicit(at); // TODO: CLEAN UP!!!!! } // TODO: CLEAN UP!!!!! var as = this; - if(at.err){ + if(at.err){ console.log("Please report this as an issue! Put.any.err"); - return + return } var cat = as.ref._, data = at.put, opt = as.opt, root, tmp; if(u === data){ @@ -1353,7 +1420,7 @@ var cat = back._, path = cat.path, gun = back.chain(), at = gun._; if(!path){ path = cat.path = {} } path[at.get = key] = gun; - at.stun = at.stun || cat.stun; // TODO: BUG! Clean up! This is kinda ugly. These need to be attached all the way down regardless of whether a gun chain has been cached or not for the first time. + at.stun = at.stun || cat.stun; // TODO: BUG! Clean up! This is kinda ugly. These need to be attached all the way down regardless of whether a gun chain has been cached or not for the first time. Gun.on('path', at); //gun.on('in', input, at); // For 'in' if I add my own listeners to each then I MUST do it before in gets called. If I listen globally for all incoming data instead though, regardless of individual listeners, I can transform the data there and then as well. gun.on('out', output, at); // However for output, there isn't really the global option. I must listen by adding my own listener individually BEFORE this one is ever called. @@ -1364,7 +1431,6 @@ if(!at.gun){ at.gun = gun; } - console.debug(10, 'out', cat.get, at.get); if(at.get && !at.get[_soul]){ if(typeof at.get === 'string'){ // request for soul! if(cat.ask){ @@ -1411,7 +1477,6 @@ at.gun.on('out', tmp); return; } - console.debug(7, 'out', cat.get, at.get, cat.ask); cat.back.on('out', { gun: cat.gun, get: cat.get @@ -1432,14 +1497,12 @@ tmp['#'] = Gun.on.ask(ack, tmp); cat.back.on('out', tmp); } else { - console.debug(6, 'out', cat.get); cat.back.on('out', { gun: cat.gun, get: cat.get }); } } - console.debug(9, 'out', cat.get); if(cat.stun && cat.stun(at)){ return } gun.on('in', at.get, at); return; @@ -1453,7 +1516,6 @@ console.log("Please report this as an issue! In.err"); // TODO: BUG! return; } - console.debug(10, 'input', at, cat.get); if(value.call(cat, at, ev)){ return; } @@ -1474,7 +1536,6 @@ return true; } if(!cat.link && Gun.node.soul(put) && (rel = Gun.node.soul(at.put))){ - console.debug(11, 'value', put); ask(cat, rel); return false; } @@ -1577,7 +1638,6 @@ if(!any){ return this } var chain = this, cat = chain._, opt = opt || {}, last = {};//function(){}; if(opt.change){ opt.change = 1 } - console.debug(5, 'any'); chain.on('out', {get: function(at, ev){ //console.log("any!", at); if(!at.gun){ console.log('%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%EXPLODE%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%', at) } @@ -1590,7 +1650,7 @@ at = obj_to(at, {put: data = cat.change = cat.put = Gun.state.ify(Gun.node.ify({}, tmp))}); } // TODO: BUG! Need to use at.put > cat.put for merged cache? - if(tmp = opt.change){ // TODO: BUG! Opt is outer scope, gun/cat/data might be iterative and thus only inner scope? Aka, we can't use it for all of them. + if(tmp = opt.change){ // TODO: BUG! Opt is outer scope, gun/cat/data might be iterative and thus only inner scope? Aka, we can't use it for all of them. if(1 === tmp){ opt.change = true; } else { @@ -1603,11 +1663,11 @@ if(last[id] == data && obj_has(last, id)){ return } last[id] = data; // TODO: PERF! Memory optimizaiton? Can we avoid this. */ - + if(last.put === data && last.get === id){ return } last.get = id; last.put = data; - + cat.last = data; if(opt.as){ any.call(opt.as, at, ev); @@ -1838,8 +1898,6 @@ gun = back; var i = 0, l = field.length; for(i; i < l; i++){ - console.debug(3, 'path', field[i]); - console.debug(2, 'path', field[i]); gun = gun.get(field[i], (i+1 === l)? cb : null, opt); } gun.back = back; // TODO: API change! @@ -1893,7 +1951,7 @@ } //if(obj_empty(value, Gun._.meta) && !(opt && opt.empty)){ // TODO: PERF! Deprecate!??? - + //} else { //console.log("value", value); //if(!(value||empty)['#']/* || !val_rel_is(value)*/){ // TODO: Performance hit!???? // TODO: BUG! WE should avoid this. So that way it is usable with gun plugin chains. @@ -1912,7 +1970,6 @@ if(cb){ (opt = opt || {}).ok = cb; opt.cat = at; - console.debug(4, 'val', at); gun.any(val, {as: opt}); opt.async = true; } @@ -1986,18 +2043,15 @@ var list = (cat = chain._).list = cat.list || {}; (ons[ons.length] = chain.on('in')).map = {}; ons[ons.length] = chain.on('out', function(at){ - console.debug(8, 'map out', at); if(at.get instanceof Function){ ons[ons.length] = chain.on('in', at.get, at); return; } else { - console.debug(9, 'map out', at); ons[ons.length] = chain.on('in', gun.get.input, at.gun._); } }); if(opt !== false){ ons[ons.length] = gun.on(map, {change: true, as: cat}); - console.debug(1, 'map'); } } if(cb){ @@ -2056,7 +2110,7 @@ ;require(function(module){ if(typeof JSON === 'undefined'){ throw new Error("Include JSON first: ajax.cdnjs.com/ajax/libs/json2/20110223/json2.js") } // for old IE use if(typeof Gun === 'undefined'){ return } // TODO: localStorage is Browser only. But it would be nice if it could somehow plugin into NodeJS compatible localStorage APIs? - + var root, noop = function(){}; if(typeof window !== 'undefined'){ root = window } var store = root.localStorage || {setItem: noop, removeItem: noop, getItem: noop}; @@ -2086,7 +2140,7 @@ Gun.on('put', put); Gun.on('get', get); })(require, './adapters/localStorage'); - + ;require(function(module){ function r(base, body, cb, opt){ var o = base.length? {base: base} : {}; @@ -2128,6 +2182,7 @@ } if(!ws.readyState){ return setTimeout(function(){ r.ws(opt, cb, req) },100), true } ws.sending = true; + console.log("websocket out", req); ws.send(JSON.stringify(req)); return true; } @@ -2269,7 +2324,7 @@ ;require(function(module){ if(typeof JSON === 'undefined'){ throw new Error("Include JSON first: ajax.cdnjs.com/ajax/libs/json2/20110223/json2.js") } // for old IE use if(typeof Gun === 'undefined'){ return } // TODO: window.Websocket is Browser only. But it would be nice if it could somehow merge it with lib/WSP? - + var root, noop = function(){}; if(typeof window !== 'undefined'){ root = window } @@ -2285,10 +2340,13 @@ //},100); return; } + var msg = at; + /* var msg = { '#': at['#'] || Gun.text.random(9), // msg ID '$': at.get // msg BODY }; + */ Tab.on(msg['#'], function(err, data){ // TODO: ONE? PERF! Clear out listeners, maybe with setTimeout? if(data){ at.gun.Back(-1).on('out', {'@': at['#'], err: err, put: data}); @@ -2324,6 +2382,9 @@ Tab.peers.request.createServer(function(req, res){ if(!req || !res || !req.body || !req.headers){ return } var msg = req.body; + console.log("SERVER", req); + gun.on('in', req.body); + return; // AUTH for non-replies. if(server.msg(msg['#'])){ return } //server.on('network', Gun.obj.copy(req)); // Unless we have WebRTC, not needed. @@ -2331,7 +2392,7 @@ if(Tab.ons[tmp = msg['@'] || msg['#']]){ Tab.on(tmp, [msg['!'], msg['$']]); } - return + return } if(msg['$'] && msg['$']['#']){ return server.get(req, res) } else { return server.put(req, res) } @@ -2376,12 +2437,12 @@ Gun.obj.del(server.msg.debounce, id); }); },500); - if(server.msg.debounce[id]){ + if(server.msg.debounce[id]){ return server.msg.debounce[id] = Gun.time.is(), id; } server.msg.debounce[id] = Gun.time.is(); return; - }; + }; server.msg.debounce = server.msg.debounce || {}; }); From 484849353059069b3fe66755db723c74eb6dd589 Mon Sep 17 00:00:00 2001 From: Jesse Gibson Date: Fri, 25 Nov 2016 14:38:03 -0700 Subject: [PATCH 19/24] Add envelope-system server sync Using gun's new envelope system (where routing and de-duplication happen inside gun core), server-to-server synchronization has been implemented. However, it comes with this warning: the chain isn't yet ready and you'll have difficulty reading or writing data via the chain. --- gun.js | 39 +++--- lib/wsp/client.js | 262 ++--------------------------------------- lib/wsp/server-push.js | 99 ++-------------- lib/wsp/server.js | 253 ++++++++++++++++++++++----------------- 4 files changed, 188 insertions(+), 465 deletions(-) diff --git a/gun.js b/gun.js index ca8cb142..eb7382a0 100644 --- a/gun.js +++ b/gun.js @@ -1,4 +1,5 @@ /* eslint-disable */ +/* eslint-enable no-console */ //console.log("!!!!!!!!!!!!!!!! WARNING THIS IS GUN 0.5 !!!!!!!!!!!!!!!!!!!!!!"); ;(function(){ @@ -929,21 +930,26 @@ gun.on('in', input, at); gun.on('out', output, at); } - function output(at){ - var cat = this, gun = cat.gun, tmp; - console.log("OUT!", Gun.obj.to(at, {gun: null})); - if(at['#']){ - dedup.track(at['#']); + function output(at){ + var cat = this, gun = cat.gun, tmp; + if(at['#']){ + dedup.track(at['#']); } - if(at.put){ + if(at.put){ cat.on('in', obj_to(at, {gun: cat.gun})); } if(!at.gun){ at = Gun.obj.to(at, {gun: gun}); } - if(at.put){ Gun.on('put', at) } - if(at.get){ get(at, cat) } - Gun.on('out', at); return; + if(at.put){ Gun.on('put', at) } + if(at.get){ get(at, cat) } + + // Reads and writes both trigger output. + if (at.put !== undefined || at.get !== undefined) { + Gun.on('out', at); + } + // Gun.on('out', at); + return; if(!cat.back){ return } cat.back.on('out', at); } @@ -963,8 +969,7 @@ Gun.on('get', at); } function input(at){ var cat = this; - console.log("IN", at); - if(at['@']){ + if(at['@']){ if(!at['#']){ at['#'] = Gun.text.random(); dedup.track(at['#']); @@ -979,15 +984,15 @@ return; } */ - if(at.put){ + if(!at.gun){ at.gun = cat.gun } + if(at.put){ if(cat.graph){ Gun.obj.map(at.put, ham, {at: at, cat: this}); // all unions must happen first, sadly. } Gun.obj.map(at.put, map, {at: at, cat: this}); Gun.on('put', at); } - if(!at.gun){ at.gun = cat.gun } - if(at.get){ Gun.on('get', at) } + if(at.get){ Gun.on('get', at) } } function ham(data, key){ var cat = this.cat, graph = cat.graph; @@ -2182,8 +2187,7 @@ } if(!ws.readyState){ return setTimeout(function(){ r.ws(opt, cb, req) },100), true } ws.sending = true; - console.log("websocket out", req); - ws.send(JSON.stringify(req)); + ws.send(JSON.stringify(req)); return true; } if(ws === false){ return } @@ -2382,8 +2386,7 @@ Tab.peers.request.createServer(function(req, res){ if(!req || !res || !req.body || !req.headers){ return } var msg = req.body; - console.log("SERVER", req); - gun.on('in', req.body); + gun.on('in', req.body); return; // AUTH for non-replies. if(server.msg(msg['#'])){ return } diff --git a/lib/wsp/client.js b/lib/wsp/client.js index db8acfea..d1c1082a 100644 --- a/lib/wsp/client.js +++ b/lib/wsp/client.js @@ -8,27 +8,11 @@ var Gun = require('../../gun'); var Socket = require('./Peer'); var Pool = require('./Pool'); -var duplicate = require('./duplicate'); // Maps URLs to sockets. // Shared between all gun instances. var sockets = Pool(); -var emitter = { on: Gun.on }; -var server = { - - // Session id. - sid: Gun.text.random(), - - // Request handlers. - handlers: [], - - // Call handlers. - handle: function (req, res) { - server.handlers.forEach(function (server) { - server(req, res); - }); - }, -}; +var sid = Gun.text.random(); /** * Take a map of URLs pointing to options and ensure the @@ -61,108 +45,20 @@ function getSocketSubset (peers) { }); } -// Handle read requests. -Gun.on('get', function (at) { - var gun = at.gun; - var opt = at.opt || {}; +Gun.on('out', function (ctx) { + var gun = ctx.gun; + var opt = ctx.opt || {}; var peers = opt.peers || gun.Back('opt.peers'); - if (!peers || Gun.obj.empty(peers)) { - at.gun.Back(Infinity).on('in', { - '@': at['#'], - }); - + if (!peers) { return; } - var id = at['#'] || Gun.text.random(9); - - // Create a new message. - var msg = { - - // msg ID - '#': id, - - // msg BODY - '$': at.get, - }; - - // Listen for a response. - // TODO: ONE? PERF! Clear out listeners, maybe with setTimeout? - emitter.on(id, function (err, data) { - var obj = { - '@': at['#'], - err: err, - put: data, - }; - - if (data) { - at.gun.Back(-1).on('out', obj); - } else { - at.gun.Back(-1).on('in', obj); - } - }); - - var subset = getSocketSubset(peers); - - // Broadcast to the connected peers. - subset.send({ - headers: { 'gun-sid': server.sid }, - body: msg, - }); -}); - -// Handle write requests. -Gun.on('put', function (at) { - if (at['@']) { - return; - } - - var peers = at.gun.Back('opt.peers'); - var enabled = at.gun.Back('opt.websocket'); - var options = at.opt || {}; - - if (!peers || Gun.obj.empty(peers)) { - - // TODO: What about wsp/server clients? Maybe we shouldn't - // immediately assume there's no data to be found. - at.gun.Back(-1).on('in', { - '@': at['#'], - }); - - return; - } - - if (options.websocket === false || enabled === false) { - return; - } - - var id = at['#'] || Gun.text.random(9); - - var msg = { - - // Message ID. - '#': id, - - // Message body. - '$': at.put, - }; - - // TODO: ONE? PERF! Clear out listeners, maybe with setTimeout? - // Listen for acknowledgement(s). - Gun.on(id, function (err, ok) { - at.gun.Back(-1).on('in', { - '@': at['#'], - err: err, - ok: ok, - }); - }); - var subset = getSocketSubset(peers); subset.send({ - headers: { 'gun-sid': server.sid }, - body: msg, + headers: { 'gun-sid': sid }, + body: ctx, }); }); @@ -170,6 +66,7 @@ Gun.on('put', function (at) { // adding them to the global pool. Gun.on('opt', function (context) { var gun = context.gun; + var root = gun.Back(Infinity); var peers = gun.Back('opt.peers') || {}; @@ -181,22 +78,6 @@ Gun.on('opt', function (context) { var socket = Socket(url, options); sockets.add(url, socket); - /** - * Handle responses to requests, adding default headers. - * @param {Object} reply - A gun reply object. - * @return {undefined} - */ - function respond (reply) { - - // Make sure headers are defined. - var headers = reply.headers = reply.headers || {}; - - // Add 'gun-sid' if it doesn't exist. - headers['gun-sid'] = headers['gun-sid'] || server.sid; - - socket.send(reply); - } - socket.on('message', function (msg) { var request; @@ -211,132 +92,7 @@ Gun.on('opt', function (context) { return; } - request.headers = request.headers || {}; - - // emit extra events. - server.handle(request, respond); + root.on('in', request.body); }); }); }); - -Gun.on('opt', function (at) { - var gun = at.gun; - var root = gun.Back(Infinity); - var options = (root._.opt = root._.opt || {}); - - // Only register once per gun instance. - if (options['@client']) { - return; - } - - var driver = options['@client'] = { - - /** - * Handles get requests sent from other peers. - * @param {Object} req - The get request. - * @param {Function} cb - Handles replies. - * @return {undefined} - */ - get: function (req, cb) { - var body = req.body; - var lex = body.$; - var graph = gun._.root._.graph; - var node = graph[lex['#']]; - - // TODO: Reply even if it's not in memory. - if (!node) { - return; - } - - cb({ - body: { - '#': duplicate.track.newID(), - '@': body['#'], - '$': node, - }, - }); - }, - - /** - * Handles put requests sent from other peers. - * @param {Object} req - The put request. - * @param {Function} cb - A response callback. - * @return {undefined} - */ - put: function (req, cb) { - var body = req.body; - var graph = body.$; - - // Cached gun paths. - var path = gun._.root._.path || {}; - - graph = Gun.obj.map(graph, function (node, soul, map) { - if (!path[soul]) { - return; - } - map(soul, node); - }); - - // filter out what we don't have in memory. - if (!graph) { - return; - } - - var id = Gun.on.ask(function (ack, event) { - if (!ack) { - return; - } - - event.off(); - - cb({ - body: { - '#': duplicate.track.newID(), - '@': body['#'], - '$': ack, - '!': ack.err, - }, - }); - }); - - gun.on('out', { - '#': duplicate.track(id), - gun: gun, - opt: { websocket: false }, - put: graph, - }); - }, - }; - - server.handlers.push(function (req, res) { - - // Validate request. - if (!req || !res || !req.body || !req.headers) { - return; - } - - var msg = req.body; - - if (duplicate(msg['#'])) { - return; - } - - // It's a response, no need to reply. - if (msg['@']) { - var reqID = msg['@']; - - emitter.on(reqID, [ - msg['!'], - msg.$, - ]); - - return; - } - - var isLex = msg.$ && msg.$['#']; - var method = isLex ? 'get' : 'put'; - - driver[method](req, res); - }); - -}); diff --git a/lib/wsp/server-push.js b/lib/wsp/server-push.js index 55093ff6..5cd20470 100644 --- a/lib/wsp/server-push.js +++ b/lib/wsp/server-push.js @@ -38,68 +38,20 @@ function ready (socket, cb) { } /** - * Send a request to a list of clients. - * @param {Obejct} context - A gun request context. + * Send a message to a group of clients. + * @param {Obejct} msg - An http envelope-like message. * @param {Object} clients - IDs mapped to socket instances. - * @param {Function} cb - Called for each response. * @return {undefined} */ -function request (context, clients, cb) { - var id = context['#'] || Gun.text.random(9); - - Gun.on(id, function (err, data, event) { - cb(err, data); - event.off(); - }); - +function send (msg, clients) { Gun.obj.map(clients, function (client) { ready(client, function () { - var msg = { - headers: {}, - body: { - '#': id, - '$': context.get, - }, - }; - var serialized = JSON.stringify(msg); client.send(serialized); }); }); } -/** - * Pushes a graph update to a collection of clients. - * @param {Object} context - The context object passed by gun. - * @param {Object} clients - An object mapping URLs to clients. - * @param {Function} cb - Invoked on each client response. - * @return {undefined} - */ -function update (context, clients, cb) { - var id = context['#'] || Gun.text.random(9); - - Gun.on(id, function (err, data, event) { - cb(err, data); - event.off(); - }); - - Gun.obj.map(clients, function (client) { - ready(client, function () { - var msg = { - headers: {}, - body: { - '#': id, - '$': context.put, - }, - }; - - var serialized = JSON.stringify(msg); - - client.send(serialized); - }); - }); -} - /** * Attaches server push middleware to gun. * @param {Gun} gun - The gun instance to attach to. * @param {WebSocket.Server} server - A websocket server instance. @@ -110,6 +62,7 @@ function attach (gun, server) { root._.servers = root._.servers || []; root._.servers.push(server); var pool = {}; + var sid = Gun.text.random(); server.on('connection', function (socket) { socket.id = socket.id || Gun.text.random(10); @@ -122,12 +75,7 @@ function attach (gun, server) { return; } - var msg = data.body; - - if (msg['@']) { - Gun.on.ack(msg['@'], [msg['!'], msg.$]); - return; - } + root.on('in', data.body); }); socket.once('close', function () { @@ -135,40 +83,17 @@ function attach (gun, server) { }); }); - Gun.on('get', function (context) { - if (!isUsingServer(context.gun, server)) { + Gun.on('out', function (context) { + if (!isUsingServer(context.gun, server) || Gun.obj.empty(pool)) { return; } - request(context, pool, function (err, data) { - var response = { - '@': context['#'], - put: data, - err: err, - }; + var msg = { + headers: { 'gun-sid': sid }, + body: context, + }; - var root = context.gun.Back(Infinity); - - root.on(data ? 'out' : 'in', response); - }); - }); - - Gun.on('put', function (context) { - if (!isUsingServer(context.gun, server)) { - return; - } - - if (context.nopush) { - return; - } - - update(context, pool, function (err, data) { - var ack = { - '!': err || null, - '$': data.$, - }; - Gun.on.ack(context, ack); - }); + send(msg, pool); }); } diff --git a/lib/wsp/server.js b/lib/wsp/server.js index c3fa0385..9c486f64 100644 --- a/lib/wsp/server.js +++ b/lib/wsp/server.js @@ -1,22 +1,24 @@ -var Gun = require('../../gun') -, formidable = require('formidable') -, http = require('../http') -, url = require('url') -, wsp = {} -, WS = require('ws') -, WSS = WS.Server -, attach = require('./server-push'); +/* eslint-disable require-jsdoc, no-underscore-dangle */ +'use strict'; +var Gun = require('../../gun'); +var http = require('../http'); +var url = require('url'); +var WS = require('ws'); +var WSS = WS.Server; +var attach = require('./server-push'); // Handles server to server sync. require('./client.js'); -Gun.on('opt', function(at){ +Gun.on('opt', function (at) { var gun = at.gun, opt = at.opt; gun.__ = at.root._; gun.__.opt.ws = opt.ws = gun.__.opt.ws || opt.ws || {}; - function start(server, port, app){ - if(app && app.use){ app.use(gun.wsp.server) } + function start (server, port, app) { + if (app && app.use) { + app.use(gun.wsp.server); + } server = gun.__.opt.ws.server = gun.__.opt.ws.server || opt.ws.server || server; if (!gun.wsp.ws) { @@ -25,141 +27,155 @@ Gun.on('opt', function(at){ } gun.wsp.ws = gun.wsp.ws || new WSS(gun.__.opt.ws); - require('./ws')(gun.wsp.ws, function(req, res){ + require('./ws')(gun.wsp.ws, function (req, res) { var ws = this; - req.headers['gun-sid'] = ws.sid = ws.sid? ws.sid : req.headers['gun-sid']; - ws.sub = ws.sub || gun.wsp.on('network', function(msg, ev){ - if(!ws || !ws.send || !ws._socket || !ws._socket.writable){ return ev.off() } - if(!msg || (msg.headers && msg.headers['gun-sid'] === ws.sid)){ return } - if(msg && msg.headers){ delete msg.headers['ws-rid'] } + req.headers['gun-sid'] = ws.sid = ws.sid ? ws.sid : req.headers['gun-sid']; + ws.sub = ws.sub || gun.wsp.on('network', function (msg, ev) { + if (!ws || !ws.send || !ws._socket || !ws._socket.writable) { return ev.off(); } + if (!msg || (msg.headers && msg.headers['gun-sid'] === ws.sid)) { return; } + if (msg && msg.headers) { delete msg.headers['ws-rid']; } // TODO: BUG? ^ What if other peers want to ack? Do they use the ws-rid or a gun declared id? - try{ws.send(Gun.text.ify(msg)); - }catch(e){} // juuuust in case. + try { ws.send(Gun.text.ify(msg)); + } catch (e) {} // juuuust in case. }); gun.wsp.wire(req, res); }, {headers: {'ws-rid': 1, 'gun-sid': 1}}); gun.__.opt.ws.port = gun.__.opt.ws.port || opt.ws.port || port || 80; } - var wsp = gun.wsp = gun.wsp || function(server){ - if(!server){ return gun } - if(Gun.fns.is(server.address)){ - if(server.address()){ + var wsp = gun.wsp = gun.wsp || function (server) { + if (!server) { return gun; } + if (Gun.fns.is(server.address)) { + if (server.address()) { start(server, server.address().port); return gun; } } - if(Gun.fns.is(server.get) && server.get('port')){ + if (Gun.fns.is(server.get) && server.get('port')) { start(server, server.get('port')); return gun; } var listen = server.listen; - server.listen = function(port){ + server.listen = function (port) { var serve = listen.apply(server, arguments); start(serve, port, server); return serve; - } + }; return gun; - } + }; gun.wsp.on = gun.wsp.on || Gun.on; gun.wsp.regex = gun.wsp.regex || opt.route || opt.path || /^\/gun/i; gun.wsp.poll = gun.wsp.poll || opt.poll || 1; gun.wsp.pull = gun.wsp.pull || opt.pull || gun.wsp.poll * 1000; - gun.wsp.server = gun.wsp.server || function(req, res, next){ // http - next = next || function(){}; - if(!req || !res){ return next(), false } - if(!req.url){ return next(), false } - if(!req.method){ return next(), false } + gun.wsp.server = gun.wsp.server || function (req, res, next) { // http + next = next || function () {}; + if (!req || !res) { return next(), false; } + if (!req.url) { return next(), false; } + if (!req.method) { return next(), false; } var msg = {}; msg.url = url.parse(req.url, true); - if(!gun.wsp.regex.test(msg.url.pathname)){ return next(), false } // TODO: BUG! If the option isn't a regex then this will fail! - if(msg.url.pathname.replace(gun.wsp.regex,'').slice(0,3).toLowerCase() === '.js'){ + if (!gun.wsp.regex.test(msg.url.pathname)) { return next(), false; } // TODO: BUG! If the option isn't a regex then this will fail! + if (msg.url.pathname.replace(gun.wsp.regex, '').slice(0, 3).toLowerCase() === '.js') { res.writeHead(200, {'Content-Type': 'text/javascript'}); res.end(gun.wsp.js = gun.wsp.js || require('fs').readFileSync(__dirname + '/../../gun.js')); // gun server is caching the gun library for the client return true; } - if(!req.upgrade){ + if (!req.upgrade) { next(); return false; } - return http(req, res, function(req, res){ - if(!req){ return next() } + return http(req, res, function (req, res) { + if (!req) { return next(); } var stream, cb = res = require('../jsonp')(req, res); - if(req.headers && (stream = req.headers['gun-sid'])){ + if (req.headers && (stream = req.headers['gun-sid'])) { stream = (gun.wsp.peers = gun.wsp.peers || {})[stream] = gun.wsp.peers[stream] || {sid: stream}; - stream.drain = stream.drain || function(res){ - if(!res || !stream || !stream.queue || !stream.queue.length){ return } + stream.drain = stream.drain || function (res) { + if (!res || !stream || !stream.queue || !stream.queue.length) { return; } res({headers: {'gun-sid': stream.sid}, body: stream.queue }); - stream.off = setTimeout(function(){ stream = null }, gun.wsp.pull); + stream.off = setTimeout(function () { stream = null; }, gun.wsp.pull); stream.reply = stream.queue = null; return true; - } - stream.sub = stream.sub || gun.wsp.on('network', function(req, ev){ - if(!stream){ return ev.off() } // self cleans up after itself! - if(!req || (req.headers && req.headers['gun-sid'] === stream.sid)){ return } + }; + stream.sub = stream.sub || gun.wsp.on('network', function (req, ev) { + if (!stream) { return ev.off(); } // self cleans up after itself! + if (!req || (req.headers && req.headers['gun-sid'] === stream.sid)) { return; } (stream.queue = stream.queue || []).push(req); stream.drain(stream.reply); }); - cb = function(r){ (r.headers||{}).poll = gun.wsp.poll; res(r) } + cb = function (r) { (r.headers || {}).poll = gun.wsp.poll; res(r); }; clearTimeout(stream.off); - if(req.headers.pull){ - if(stream.drain(cb)){ return } + if (req.headers.pull) { + if (stream.drain(cb)) { return; } return stream.reply = cb; } } gun.wsp.wire(req, cb); }), true; - } - if((gun.__.opt.maxSockets = opt.maxSockets || gun.__.opt.maxSockets) !== false){ + }; + if ((gun.__.opt.maxSockets = opt.maxSockets || gun.__.opt.maxSockets) !== false) { require('https').globalAgent.maxSockets = require('http').globalAgent.maxSockets = gun.__.opt.maxSockets || Infinity; } - gun.wsp.msg = gun.wsp.msg || function(id){ - if(!id){ + gun.wsp.msg = gun.wsp.msg || function (id) { + if (!id) { return gun.wsp.msg.debounce[id = Gun.text.random(9)] = Gun.time.is(), id; } clearTimeout(gun.wsp.msg.clear); - gun.wsp.msg.clear = setTimeout(function(){ + gun.wsp.msg.clear = setTimeout(function () { var now = Gun.time.is(); - Gun.obj.map(gun.wsp.msg.debounce, function(t,id){ - if((now - t) < (1000 * 60 * 5)){ return } + Gun.obj.map(gun.wsp.msg.debounce, function (t, id) { + if ((now - t) < (1000 * 60 * 5)) { return; } Gun.obj.del(gun.wsp.msg.debounce, id); }); - },500); - if(id = gun.wsp.msg.debounce[id]){ + }, 500); + if (id = gun.wsp.msg.debounce[id]) { return gun.wsp.msg.debounce[id] = Gun.time.is(), id; } gun.wsp.msg.debounce[id] = Gun.time.is(); return; }; gun.wsp.msg.debounce = gun.wsp.msg.debounce || {}; - gun.wsp.wire = gun.wsp.wire || (function(){ - // all streams, technically PATCH but implemented as PUT or POST, are forwarded to other trusted peers - // except for the ones that are listed in the message as having already been sending to. - // all states, implemented with GET, are replied to the source that asked for it. - function tran(req, res){ - if(!req || !res || !req.body || !req.headers){ return } - if(req.url){ req.url = url.format(req.url) } - var msg = req.body; - // AUTH for non-replies. - if(gun.wsp.msg(msg['#'])){ return } - gun.wsp.on('network', Gun.obj.copy(req)); - if(msg['@']){ return } // no need to process. - if(msg['$'] && msg['$']['#']){ return tran.get(req, res) } - //if(Gun.is.lex(msg['$'])){ return tran.get(req, res) } - else { return tran.put(req, res) } - cb({body: {hello: 'world'}}); - // TODO: BUG! server put should push. + gun.wsp.wire = gun.wsp.wire || (function () { + // all streams, technically PATCH but implemented as + // PUT or POST, are forwarded to other trusted peers + // except for the ones that are listed in the message + // as having already been sent to. + // all states, implemented with GET, are replied to the + // source that asked for it. + function tran (req, res) { + if (!req || !res || !req.body || !req.headers) { + return; + } + if (req.url) { + req.url = url.format(req.url); + } + // var msg = req.body; + // console.log('SERVER', req); + gun.on('in', req.body); + // console.log('-----------------'); + // // AUTH for non-replies. + // if(gun.wsp.msg(msg['#'])){ return } + // gun.wsp.on('network', Gun.obj.copy(req)); + // if(msg['@']){ return } // no need to process. + // if(msg['$'] && msg['$']['#']){ return tran.get(req, res) } + // //if(Gun.is.lex(msg['$'])){ return tran.get(req, res) } + // else { return tran.put(req, res) } + // cb({body: {hello: 'world'}}); + // // TODO: BUG! server put should push. } - tran.get = function(req, cb){ - var body = req.body, lex = body['$'], reply = {headers: {'Content-Type': tran.json}}; + tran.get = function (req, cb) { + var body = req.body; + var lex = body.$; + var reply = { + headers: { 'Content-Type': tran.json }, + }; var graph = gun.Back(Infinity)._.graph; var node = graph[lex['#']]; var result = Gun.graph.ify(node); if (node) { - return cb({ + cb({ headers: reply.headers, body: { '#': gun.wsp.msg(), @@ -167,41 +183,64 @@ Gun.on('opt', function(at){ '$': result, }, }); + + return; } - gun.on('out', {gun: gun, get: lex, req: 1, '#': body['#'] || Gun.on.ask(function(at, ev){ - ev.off(); - var graph = at.put; - return cb({headers: reply.headers, body: { - '#': gun.wsp.msg(), - '@': body['#'], - '$': graph, - '!': at.err - }}); - })}); - } - tran.put = function(req, cb){ - // NOTE: It is highly recommended you do your own PUT/POSTs through your own API that then saves to gun manually. - // This will give you much more fine-grain control over security, transactions, and what not. - var body = req.body, graph = body['$'], reply = {headers: {'Content-Type': tran.json}}, opt; - gun.on('out', {gun: gun, put: graph, '#': Gun.on.ask(function(ack, ev){ - //Gun.on('put', {gun: gun, put: graph, '#': Gun.on.ask(function(ack, ev){ - ev.off(); - return cb({headers: reply.headers, body: { - '#': gun.wsp.msg(), - '@': body['#'], - '$': ack, - '!': ack.err - }}); - })}); - } - gun.wsp.on('network', function(rq){ - // TODO: MARK! You should move the networking events to here, not in WSS only. - }); + gun.on('out', { + gun: gun, + get: lex, + req: 1, + '#': body['#'] || Gun.on.ask(function (at, ev) { + ev.off(); + var graph = at.put; + return cb({ + headers: reply.headers, + body: { + '#': gun.wsp.msg(), + '@': body['#'], + '$': graph, + '!': at.err, + }, + }); + }), + }); + }; + + tran.put = function (req, cb) { + // NOTE: It is highly recommended you do your own PUT/POSTs + // through your own API that then saves to gun manually. + // This will give you much more fine-grain control over + // security, transactions, and what not. + var body = req.body; + var graph = body.$; + var reply = { + headers: { 'Content-Type': tran.json }, + }; + + gun.on('out', { + gun: gun, + put: graph, + '#': Gun.on.ask(function (ack, ev) { + ev.off(); + return cb({ + headers: reply.headers, + body: { + '#': gun.wsp.msg(), + '@': body['#'], + '$': ack, + '!': ack.err, + }, + }); + }), + }); + }; + tran.json = 'application/json'; return tran; }()); - if(opt.server){ + + if (opt.server) { wsp(opt.server); } }); From 7c0c7f3605d55a82bf3e2a73dab7ec1bee4c9579 Mon Sep 17 00:00:00 2001 From: hillct Date: Sun, 27 Nov 2016 13:04:37 -0500 Subject: [PATCH 20/24] Minify Gun at package publish-time --- .gitignore | 1 + package.json | 6 ++++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 2cbe713e..56f45cc2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ node_modules/* npm-debug.log +gun.min.js yarn.lock *data.json *.db diff --git a/package.json b/package.json index 18e8b3f1..b5969385 100644 --- a/package.json +++ b/package.json @@ -5,6 +5,7 @@ "main": "index.js", "scripts": { "start": "node examples/http.js 8080", + "prepublish": "uglifyjs2 gun.js -o gun.min.js -c -m", "test": "mocha", "unbuild": "node lib/unbuild.js" }, @@ -49,9 +50,10 @@ "ws": "~>1.0.1" }, "devDependencies": { - "mocha": "~>1.9.0", "express": "~>4.13.4", + "mocha": "~>1.9.0", "panic-server": "~>0.3.0", - "selenium-webdriver": "~>2.53.2" + "selenium-webdriver": "~>2.53.2", + "uglify-js2": "^2.1.11" } } From 9abe3fb75fc975ec3fea89d14899eae42f7c06cf Mon Sep 17 00:00:00 2001 From: Mark Nadal Date: Mon, 28 Nov 2016 17:57:20 -0800 Subject: [PATCH 21/24] fix for d3x0r and Stefdv --- gun.js | 188 ++++++++++++++++++++++++----------------- lib/wsp/client.js | 2 +- lib/wsp/server-push.js | 7 +- lib/wsp/server.js | 2 - lib/wsp/ws.js | 4 +- 5 files changed, 116 insertions(+), 87 deletions(-) diff --git a/gun.js b/gun.js index eb7382a0..4e23322d 100644 --- a/gun.js +++ b/gun.js @@ -1,6 +1,6 @@ -/* eslint-disable */ -/* eslint-enable no-console */ -//console.log("!!!!!!!!!!!!!!!! WARNING THIS IS GUN 0.5 !!!!!!!!!!!!!!!!!!!!!!"); +/* eslint-disable */ +/* eslint-enable no-console */ +//console.log("!!!!!!!!!!!!!!!! WARNING THIS IS GUN 0.5 !!!!!!!!!!!!!!!!!!!!!!"); ;(function(){ /* UNBUILD */ @@ -75,7 +75,7 @@ Type.list.map = function(l, c, _){ return obj_map(l, c, _) } Type.list.index = 1; // change this to 0 if you want non-logical, non-mathematical, non-matrix, non-convenient array notation Type.obj = {is: function(o){ return o? (o instanceof Object && o.constructor === Object) || Object.prototype.toString.call(o).match(/^\[object (\w+)\]$/)[1] === 'Object' : false }} - Type.obj.put = function(o, f, v){ return (o||{})[f] = v, o } + Type.obj.put = function(o, f, v){ return (o||{})[f] = v, o } Type.obj.has = function(o, f){ return o && Object.prototype.hasOwnProperty.call(o, f) } Type.obj.del = function(o, k){ if(!o){ return } @@ -165,7 +165,7 @@ var obj = Type.obj, obj_is = obj.is, obj_has = obj.has, obj_map = obj.map; module.exports = Type; })(require, './type'); - + ;require(function(module){ // On event emitter generic javascript utility. function Scope(){ @@ -297,7 +297,7 @@ ;require(function(module){ var On = require('./on'); - + function Chain(create, opt){ opt = opt || {}; opt.id = opt.id || '#'; @@ -314,7 +314,7 @@ return; } if(at.stun === stun){ - delete at.stun; + delete at.stun; } off = true; var i = 0, q = res.queue, l = q.length, c, v; @@ -348,6 +348,7 @@ on.ack = function(at, reply){ if(!at || !reply || !ask.on){ return } var id = at[opt.id] || at; + if(!ask.ons[id]){ return } ask.on(id, reply); return true; } @@ -478,11 +479,11 @@ } if(incomingState < currentState){ return {historical: true}; // the incoming value is within the boundary of the machine's state, but not within the range. - + } if(currentState < incomingState){ return {converge: true, incoming: true}; // the incoming value is within both the boundary and the range of the machine's state. - + } if(incomingState === currentState){ if(incomingValue === currentValue){ // Note: while these are practically the same, the deltas could be technically different @@ -622,7 +623,7 @@ if(o.node){ o.node[f] = tmp } return; } - if(Val.is(v)){ + if(Val.is(v)){ o.node[f] = v; } } @@ -707,8 +708,8 @@ } function map(n, s){ // we invert this because the way we check for this is via a negation. if(!n || s !== Node.soul(n) || !Node.is(n, this.fn)){ return true } // it is true that this is an invalid graph. - if(!fn_is(this.cb)){ return } - nf.n = n; nf.as = this.as; + if(!fn_is(this.cb)){ return } + nf.n = n; nf.as = this.as; this.cb.call(nf.as, n, s, nf); } }()); @@ -717,7 +718,7 @@ var at = {path: [], obj: obj}; if(!env){ env = {}; - } else + } else if(typeof env === 'string'){ env = {soul: env}; } else @@ -887,7 +888,7 @@ Gun.graph = require('./graph'); Gun.on = require('./onify')(); - + /* var opt = {chain: 'in', back: 'out', extend: 'root', id: Gun._.soul}; Gun.chain = require('./chain')(Gun, opt); @@ -930,28 +931,28 @@ gun.on('in', input, at); gun.on('out', output, at); } - function output(at){ - var cat = this, gun = cat.gun, tmp; - if(at['#']){ - dedup.track(at['#']); - } - if(at.put){ - cat.on('in', obj_to(at, {gun: cat.gun})); + function output(at){ + var cat = this, gun = cat.gun, tmp; + // TODO: BUG! Outgoing `get` to read from in memory!!! + if(at.get && get(at, cat)){ return } + //if(at.put){ + cat.on('in', obj_to(at, {gun: cat.gun})); // TODO: PERF! input now goes to output so it would be nice to reduce the circularity here for perf purposes. + //} + if(at['#']){ + dedup.track(at['#']); } if(!at.gun){ at = Gun.obj.to(at, {gun: gun}); } - if(at.put){ Gun.on('put', at) } - if(at.get){ get(at, cat) } - - // Reads and writes both trigger output. - if (at.put !== undefined || at.get !== undefined) { - Gun.on('out', at); - } - // Gun.on('out', at); - return; - if(!cat.back){ return } - cat.back.on('out', at); + //if(at.put){ Gun.on('put', at) } + //if(at.get){ get(at, cat) } + // Reads and writes both trigger output. // that should be intended. + //if (at.put !== undefined || at.get !== undefined) { + Gun.on('out', at); + //} + // Gun.on('out', at); + //if(!cat.back){ return } + //cat.back.on('out', at); } function get(at, cat){ var soul = at.get[_soul], node = cat.graph[soul], field = at.get[_field]; @@ -961,38 +962,36 @@ node = Gun.obj.put({_: node._}, field, node[field]); } cat.on('in', { - '@': at.req? at['#'] : 0, // temporary hack + '@': at['#'], put: Gun.graph.node(node) // TODO: BUG! Clone node! }); - return; + return true; } - Gun.on('get', at); + //Gun.on('get', at); } function input(at){ var cat = this; - if(at['@']){ + if(!at.gun){ at.gun = cat.gun } + if(at['@']){ if(!at['#']){ - at['#'] = Gun.text.random(); + at['#'] = Gun.text.random(); // TODO: Use what is used other places instead. dedup.track(at['#']); cat.on('out', at); } + //if(at.err || u === at.put){ + Gun.on.ack(at['@'], at); + //return; + //} } if(at['#'] && dedup.check(at['#'])){ return } - /* - if(at['@'] || at.err || u === at.put){ - at.gun = at.gun || cat.gun; - Gun.on.ack(at['@'], at); - return; - } - */ - if(!at.gun){ at.gun = cat.gun } - if(at.put){ + if(at.put){ if(cat.graph){ Gun.obj.map(at.put, ham, {at: at, cat: this}); // all unions must happen first, sadly. } Gun.obj.map(at.put, map, {at: at, cat: this}); + if(0 === at['@']){ return } // TODO: UNCLEAN! Temporary hack for now. Gun.on('put', at); } - if(at.get){ Gun.on('get', at) } + if(at.get){ Gun.on('get', at) } } function ham(data, key){ var cat = this.cat, graph = cat.graph; @@ -1092,7 +1091,7 @@ var is = state_is(node, field), cs = state_is(vertex, field); if(u === is || u === cs){ return true } // it is true that this is an invalid HAM comparison. var iv = rel_is(value) || value, cv = rel_is(vertex[field]) || vertex[field]; - + @@ -1162,7 +1161,7 @@ var obj = Gun.obj, obj_is = obj.is, obj_put = obj.put, obj_map = obj.map, obj_empty = obj.empty; var num = Gun.num, num_is = num.is; var _soul = Gun.val.rel._, _field = '.'; - + ;(function(){ var obj = {}, u; Gun.chain.Back = function(n, opt){ var tmp; if(-1 === n || Infinity === n){ @@ -1258,10 +1257,9 @@ as.ref.on('out', { gun: as.ref, put: as.out = as.env.graph, opt: as.opt, '#': Gun.on.ask(function(ack, ev){ - if(ack && 0 === ack.ok){ return } - ev.off(); // One response is good enough for us currently. Later we may want to adjust this. + ev.off(); // One response is good enough for us currently. Later we may want to provide an option to adjust this. if(!as.opt.any){ return } - as.opt.any.call(as.opt.as || as.gun, ack.err, ack.ok); + as.opt.any.call(as.opt.as || as.gun, ack.err, ack.ok, ev); }, as.opt) }); if(as.res){ as.res() } @@ -1293,7 +1291,7 @@ as.batch(); } - function any(at, ev){ + function any(at, ev){ function implicit(at){ // TODO: CLEAN UP!!!!! if(!at || !at.get){ return } // TODO: CLEAN UP!!!!! as.data = obj_put({}, tmp = at.get, as.data); // TODO: CLEAN UP!!!!! @@ -1304,9 +1302,9 @@ implicit(at); // TODO: CLEAN UP!!!!! } // TODO: CLEAN UP!!!!! var as = this; - if(at.err){ + if(at.err){ console.log("Please report this as an issue! Put.any.err"); - return + return } var cat = as.ref._, data = at.put, opt = as.opt, root, tmp; if(u === data){ @@ -1425,7 +1423,7 @@ var cat = back._, path = cat.path, gun = back.chain(), at = gun._; if(!path){ path = cat.path = {} } path[at.get = key] = gun; - at.stun = at.stun || cat.stun; // TODO: BUG! Clean up! This is kinda ugly. These need to be attached all the way down regardless of whether a gun chain has been cached or not for the first time. + at.stun = at.stun || cat.stun; // TODO: BUG! Clean up! This is kinda ugly. These need to be attached all the way down regardless of whether a gun chain has been cached or not for the first time. Gun.on('path', at); //gun.on('in', input, at); // For 'in' if I add my own listeners to each then I MUST do it before in gets called. If I listen globally for all incoming data instead though, regardless of individual listeners, I can transform the data there and then as well. gun.on('out', output, at); // However for output, there isn't really the global option. I must listen by adding my own listener individually BEFORE this one is ever called. @@ -1655,7 +1653,7 @@ at = obj_to(at, {put: data = cat.change = cat.put = Gun.state.ify(Gun.node.ify({}, tmp))}); } // TODO: BUG! Need to use at.put > cat.put for merged cache? - if(tmp = opt.change){ // TODO: BUG! Opt is outer scope, gun/cat/data might be iterative and thus only inner scope? Aka, we can't use it for all of them. + if(tmp = opt.change){ // TODO: BUG! Opt is outer scope, gun/cat/data might be iterative and thus only inner scope? Aka, we can't use it for all of them. if(1 === tmp){ opt.change = true; } else { @@ -1668,11 +1666,11 @@ if(last[id] == data && obj_has(last, id)){ return } last[id] = data; // TODO: PERF! Memory optimizaiton? Can we avoid this. */ - + if(last.put === data && last.get === id){ return } last.get = id; last.put = data; - + cat.last = data; if(opt.as){ any.call(opt.as, at, ev); @@ -1689,6 +1687,7 @@ if(!f || obj_has(tmp[s], f)){ ev.off(); at['@'] = 0; + at['#'] = 0; return root.on('in', at); } /* @@ -1731,6 +1730,37 @@ via: at }); } + + function ackk(at, ev){ var gun = this.gun; + var cat = gun._; + if(u !== cat.change){ return ev.off() } + // TODO: PERF! Memory. If somebody `gun.off()` we should clean up these requests. + // TODO: PERF! Memory. If peers only reply with `not` (or we never get replies) these event listeners will be left hanging - even if we get push updates that the data does exist. + if(cat.root === cat.back){ + //at.gun = cat.gun; + if(at.gun === cat.gun){ return } + at = { + get: cat.get, + gun: cat.gun, + via: at, + put: at.put[cat.get] + } + + } else { + if(obj_has(at.put, cat.get)){ return ev.off() } + at = { + get: cat.get, + gun: gun, + via: at.via? at : { + get: cat.back._.get, + gun: cat.back, + via: at + } + } + } + //at.get = at.get || cat.get; + cat.on('in', at); + } var obj = Gun.obj, obj_has = obj.has, obj_to = obj.to; var empty = {}, u; var _soul = Gun._.soul, _field = Gun._.field, _sid = Gun.on.ask._, _rid = Gun.on.ack._; @@ -1956,7 +1986,7 @@ } //if(obj_empty(value, Gun._.meta) && !(opt && opt.empty)){ // TODO: PERF! Deprecate!??? - + //} else { //console.log("value", value); //if(!(value||empty)['#']/* || !val_rel_is(value)*/){ // TODO: Performance hit!???? // TODO: BUG! WE should avoid this. So that way it is usable with gun plugin chains. @@ -2115,20 +2145,22 @@ ;require(function(module){ if(typeof JSON === 'undefined'){ throw new Error("Include JSON first: ajax.cdnjs.com/ajax/libs/json2/20110223/json2.js") } // for old IE use if(typeof Gun === 'undefined'){ return } // TODO: localStorage is Browser only. But it would be nice if it could somehow plugin into NodeJS compatible localStorage APIs? - + var root, noop = function(){}; if(typeof window !== 'undefined'){ root = window } var store = root.localStorage || {setItem: noop, removeItem: noop, getItem: noop}; function put(at){ var err, id, opt, root = at.gun._.root; - (opt = at.opt || {}).prefix = opt.prefix || at.gun.Back('opt.prefix') || 'gun/'; + (opt = {}).prefix = (at.opt || opt).prefix || at.gun.Back('opt.prefix') || 'gun/'; Gun.graph.is(at.put, function(node, soul){ //try{store.setItem(opt.prefix + soul, Gun.text.ify(node)); try{store.setItem(opt.prefix + soul, Gun.text.ify(root._.graph[soul]||node)); }catch(e){ err = e || "localStorage failure" } }); //console.log('@@@@@@@@@@local put!'); - Gun.on.ack(at, {err: err, ok: 0}); // TODO: Reliability! Are we sure we want to have localStorage ack? + if(Gun.obj.empty(gun.Back('opt.peers'))){ + Gun.on.ack(at, {err: err, ok: 0}); // only ack if there are no peers. + } } function get(at){ var gun = at.gun, lex = at.get, soul, data, opt, u; @@ -2136,7 +2168,12 @@ (opt = at.opt || {}).prefix = opt.prefix || at.gun.Back('opt.prefix') || 'gun/'; if(!lex || !(soul = lex[Gun._.soul])){ return } data = Gun.obj.ify(store.getItem(opt.prefix + soul) || null); - if(!data){ return } // localStorage isn't trustworthy to say "not found". + if(!data){ // localStorage isn't trustworthy to say "not found". + if(Gun.obj.empty(gun.Back('opt.peers'))){ + gun.Back(-1).on('in', {'@': at['#']}); + } + return; + } if(Gun.obj.has(lex, '.')){var tmp = data[lex['.']];data = {_: data._};if(u !== tmp){data[lex['.']] = tmp}} //console.log('@@@@@@@@@@@@local get', data, at); gun.Back(-1).on('in', {'@': at['#'], put: Gun.graph.node(data)}); @@ -2145,7 +2182,7 @@ Gun.on('put', put); Gun.on('get', get); })(require, './adapters/localStorage'); - + ;require(function(module){ function r(base, body, cb, opt){ var o = base.length? {base: base} : {}; @@ -2187,7 +2224,7 @@ } if(!ws.readyState){ return setTimeout(function(){ r.ws(opt, cb, req) },100), true } ws.sending = true; - ws.send(JSON.stringify(req)); + ws.send(JSON.stringify(req)); return true; } if(ws === false){ return } @@ -2328,20 +2365,18 @@ ;require(function(module){ if(typeof JSON === 'undefined'){ throw new Error("Include JSON first: ajax.cdnjs.com/ajax/libs/json2/20110223/json2.js") } // for old IE use if(typeof Gun === 'undefined'){ return } // TODO: window.Websocket is Browser only. But it would be nice if it could somehow merge it with lib/WSP? - + var root, noop = function(){}; if(typeof window !== 'undefined'){ root = window } var Tab = {}; Tab.on = Gun.on;//Gun.on.create(); Tab.peers = require('../polyfill/peer'); - Gun.on('get', function(at){ + Gun.on('out', function(at){ + if(at.put){ return } // TODO: BUG! Doing this for now, to debug. However puts are handled below anyways, but it would be nice if we could switch over to this for both? var gun = at.gun, opt = at.opt || {}, peers = opt.peers || gun.Back('opt.peers'); if(!peers || Gun.obj.empty(peers)){ - //setTimeout(function(){ Gun.log.once('peers', "Warning! You have no peers to connect to!"); - at.gun.Back(-1).on('in', {'@': at['#']}); - //},100); return; } var msg = at; @@ -2365,11 +2400,10 @@ var opt = at.gun.Back('opt') || {}, peers = opt.peers; if(!peers || Gun.obj.empty(peers)){ Gun.log.once('peers', "Warning! You have no peers to save to!"); - at.gun.Back(-1).on('in', {'@': at['#']}); return; } if(false === opt.websocket || (at.opt && false === at.opt.websocket)){ return } - var msg = { + var msg = at || { '#': at['#'] || Gun.text.random(9), // msg ID '$': at.put // msg BODY }; @@ -2386,7 +2420,7 @@ Tab.peers.request.createServer(function(req, res){ if(!req || !res || !req.body || !req.headers){ return } var msg = req.body; - gun.on('in', req.body); + gun.on('in', req.body); return; // AUTH for non-replies. if(server.msg(msg['#'])){ return } @@ -2395,7 +2429,7 @@ if(Tab.ons[tmp = msg['@'] || msg['#']]){ Tab.on(tmp, [msg['!'], msg['$']]); } - return + return } if(msg['$'] && msg['$']['#']){ return server.get(req, res) } else { return server.put(req, res) } @@ -2440,12 +2474,12 @@ Gun.obj.del(server.msg.debounce, id); }); },500); - if(server.msg.debounce[id]){ + if(server.msg.debounce[id]){ return server.msg.debounce[id] = Gun.time.is(), id; } server.msg.debounce[id] = Gun.time.is(); return; - }; + }; server.msg.debounce = server.msg.debounce || {}; }); diff --git a/lib/wsp/client.js b/lib/wsp/client.js index d1c1082a..80b408f6 100644 --- a/lib/wsp/client.js +++ b/lib/wsp/client.js @@ -91,7 +91,7 @@ Gun.on('opt', function (context) { if (!request || !request.body) { return; } - + root.on('in', request.body); }); }); diff --git a/lib/wsp/server-push.js b/lib/wsp/server-push.js index 5cd20470..727f8d1e 100644 --- a/lib/wsp/server-push.js +++ b/lib/wsp/server-push.js @@ -63,21 +63,19 @@ function attach (gun, server) { root._.servers.push(server); var pool = {}; var sid = Gun.text.random(); - server.on('connection', function (socket) { socket.id = socket.id || Gun.text.random(10); pool[socket.id] = socket; - + /* socket.on('message', function (message) { var data = Gun.obj.ify(message); if (!data || !data.body) { return; } - root.on('in', data.body); }); - + */ socket.once('close', function () { delete pool[socket.id]; }); @@ -92,7 +90,6 @@ function attach (gun, server) { headers: { 'gun-sid': sid }, body: context, }; - send(msg, pool); }); } diff --git a/lib/wsp/server.js b/lib/wsp/server.js index 9c486f64..3c56a425 100644 --- a/lib/wsp/server.js +++ b/lib/wsp/server.js @@ -150,9 +150,7 @@ Gun.on('opt', function (at) { req.url = url.format(req.url); } // var msg = req.body; - // console.log('SERVER', req); gun.on('in', req.body); - // console.log('-----------------'); // // AUTH for non-replies. // if(gun.wsp.msg(msg['#'])){ return } // gun.wsp.on('network', Gun.obj.copy(req)); diff --git a/lib/wsp/ws.js b/lib/wsp/ws.js index 45dc100a..5b95932d 100644 --- a/lib/wsp/ws.js +++ b/lib/wsp/ws.js @@ -1,4 +1,4 @@ -var Gun = require('../../gun') +var Gun = require('../../gun') , url = require('url'); module.exports = function(wss, server, opt){ wss.on('connection', function(ws){ @@ -27,7 +27,7 @@ module.exports = function(wss, server, opt){ (reply.headers = reply.headers || {})['ws-rid'] = msg.headers['ws-rid']; } try{ws.send(Gun.text.ify(reply)); - }catch(e){} // juuuust in case. + }catch(e){} // juuuust in case. }); }); ws.off = function(m){ From 3e2bf996fa8bab6f6886a0b5a8c762754d32ce06 Mon Sep 17 00:00:00 2001 From: Mark Nadal Date: Mon, 28 Nov 2016 19:40:39 -0800 Subject: [PATCH 22/24] final touches --- gun.js | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/gun.js b/gun.js index 4e23322d..5fa4d101 100644 --- a/gun.js +++ b/gun.js @@ -971,27 +971,26 @@ } function input(at){ var cat = this; if(!at.gun){ at.gun = cat.gun } - if(at['@']){ - if(!at['#']){ - at['#'] = Gun.text.random(); // TODO: Use what is used other places instead. - dedup.track(at['#']); - cat.on('out', at); - } - //if(at.err || u === at.put){ - Gun.on.ack(at['@'], at); - //return; - //} + if(!at['#'] && at['@']){ + at['#'] = Gun.text.random(); // TODO: Use what is used other places instead. + Gun.on.ack(at['@'], at); + dedup.track(at['#']); + cat.on('out', at); + return; } if(at['#'] && dedup.check(at['#'])){ return } + dedup.track(at['#']); + Gun.on.ack(at['@'], at); if(at.put){ if(cat.graph){ Gun.obj.map(at.put, ham, {at: at, cat: this}); // all unions must happen first, sadly. } Gun.obj.map(at.put, map, {at: at, cat: this}); - if(0 === at['@']){ return } // TODO: UNCLEAN! Temporary hack for now. + //if(0 === at['@']){ return } // TODO: UNCLEAN! Temporary hack for now. Gun.on('put', at); } if(at.get){ Gun.on('get', at) } + Gun.on('out', at); } function ham(data, key){ var cat = this.cat, graph = cat.graph; @@ -1387,6 +1386,7 @@ } else if(!lex && 0 != lex){ (gun = back.chain())._.err = {err: Gun.log('Invalid get request!', lex)}; + asdf; if(cb){ cb.call(gun, gun._.err) } return gun; } else @@ -1686,8 +1686,8 @@ if(tmp = at.put){ if(!f || obj_has(tmp[s], f)){ ev.off(); - at['@'] = 0; - at['#'] = 0; + //at['@'] = 0; + //at['#'] = 0; return root.on('in', at); } /* @@ -1712,7 +1712,7 @@ return; } } - if(gun._.put){ + if(gun._.put && !(null === f)){ gun = gun.get(f, null, {path:true}); gun.on('in', { err: at.err, From ce32d8414bdaabd6b4e6341d86b951a32c07ad8e Mon Sep 17 00:00:00 2001 From: Mark Nadal Date: Mon, 28 Nov 2016 20:31:27 -0800 Subject: [PATCH 23/24] ahem, thank you d3x0r --- gun.js | 1 - 1 file changed, 1 deletion(-) diff --git a/gun.js b/gun.js index 5fa4d101..c5a9c688 100644 --- a/gun.js +++ b/gun.js @@ -1386,7 +1386,6 @@ } else if(!lex && 0 != lex){ (gun = back.chain())._.err = {err: Gun.log('Invalid get request!', lex)}; - asdf; if(cb){ cb.call(gun, gun._.err) } return gun; } else From bd960b4d67f74d57a9eef45803af07755c05e23b Mon Sep 17 00:00:00 2001 From: Mark Nadal Date: Tue, 29 Nov 2016 11:35:20 -0800 Subject: [PATCH 24/24] thanks to @Stefdv #284 --- gun.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gun.js b/gun.js index c5a9c688..49daa5e8 100644 --- a/gun.js +++ b/gun.js @@ -2157,7 +2157,7 @@ }catch(e){ err = e || "localStorage failure" } }); //console.log('@@@@@@@@@@local put!'); - if(Gun.obj.empty(gun.Back('opt.peers'))){ + if(Gun.obj.empty(at.gun.Back('opt.peers'))){ Gun.on.ack(at, {err: err, ok: 0}); // only ack if there are no peers. } }