From 01c93157cb0a89211c0eb10610dce7bbf131490e Mon Sep 17 00:00:00 2001 From: Jesse Gibson Date: Mon, 12 Dec 2016 14:37:09 -0700 Subject: [PATCH] Fix msg passing, add reconnect & backoff Fixed broadcast storm caused by re-enveloping the same message and re-emitting it (I passed the envelope to gun, not just the msg body). Sockets will attempt an exponential reconnect if the message queue isn't empty, otherwise they're still just lazy bums that only open when needed. JSONP wasn't working before I took on this project, but now it's completely disconnected. Once it's working, it should go in the `request` function as a websocket fallback. --- gun.js | 412 ++++++++++++++++++++++++++------------------------------- 1 file changed, 188 insertions(+), 224 deletions(-) diff --git a/gun.js b/gun.js index 304d4659..6bbd937e 100644 --- a/gun.js +++ b/gun.js @@ -1,6 +1,4 @@ -/* eslint-disable */ -/* eslint-enable no-console */ -//console.log("!!!!!!!!!!!!!!!! WARNING THIS IS GUN 0.5 !!!!!!!!!!!!!!!!!!!!!!"); +//console.log("!!!!!!!!!!!!!!!! WARNING THIS IS GUN 0.5 !!!!!!!!!!!!!!!!!!!!!!"); ;(function(){ /* UNBUILD */ @@ -121,7 +119,7 @@ t.r[k] = v; return; } t.r = t.r || []; - t.r.push(k); + t.r.push(k); }; Type.obj.map = function(l, c, _){ var u, i = 0, x, r, ll, lle, f = fn_is(c); @@ -1029,7 +1027,8 @@ Dedup.prototype.gc = function(){ var now = Gun.time.is(); var oldest = now; - var maxAge = 5 * 60 * 1000; + var maxAge = 5 * 60 * 1000; + var cache = this.cache; // TODO: Gun.scheduler already does this? Reuse that. Gun.obj.map(this.cache, function (time, id) { oldest = Math.min(now, time); @@ -1038,7 +1037,7 @@ return; } - delete this.cache[id]; + delete cache[id]; }); var done = Gun.obj.empty(this.cache); @@ -2185,15 +2184,19 @@ })(require, './adapters/localStorage'); ;require(function(module){ - /* eslint-enable */ - /* eslint-env browser */ - /* eslint-disable require-jsdoc */ - /* eslint "comma-dangle": ["error", "never"] */ var Gun = require('./gun'); - function Client (url) { + // Check for stone-age browsers. + if (typeof JSON === 'undefined') { + throw new Error( + 'Gun depends on JSON. Please load it first:\n' + + 'ajax.cdnjs.com/ajax/libs/json2/20110223/json2.js' + ); + } + + function Client (url, options) { if (!(this instanceof Client)) { - return new Client(url); + return new Client(url, options); } this.url = Client.formatURL(url); @@ -2202,6 +2205,9 @@ this.sid = Gun.text.random(10); this.on = Gun.on; + + this.options = options || {}; + this.resetBackoff(); } Client.prototype = { @@ -2232,6 +2238,15 @@ client.on('message', msg); }); + socket.addEventListener('close', function () { + + // Attempt reconnect if requests are pending. + if (client.queue.length) { + client.scheduleReconnect(); + } + + }); + // Send the messages in the queue. this.ready(function () { client.drainQueue(); @@ -2240,6 +2255,45 @@ return socket; }, + resetBackoff: function () { + var backoff = this.options; + + this.backoff = { + time: backoff.time || 100, + max: backoff.max || 30000, + factor: backoff.factor || 2 + }; + + return this.backoff; + }, + + nextBackoff: function () { + var backoff = this.backoff; + var next = backoff.time * backoff.factor; + var max = backoff.max; + + if (next > max) { + next = max; + } + + return (backoff.time = next); + }, + + // Try to efficiently reconnect. + scheduleReconnect: function () { + var client = this; + var time = this.backoff.time; + this.nextBackoff(); + + setTimeout(function () { + client.connect(); + + client.ready(function () { + client.resetBackoff(); + }); + }, time); + }, + isClosed: function () { var socket = this.socket; @@ -2348,6 +2402,7 @@ // Define client instances as gun needs them. // Sockets will not be opened until absolutely necessary. Gun.on('opt', function (ctx) { + var gun = ctx.gun; var peers = gun.Back('opt.peers') || {}; @@ -2376,11 +2431,17 @@ return; } - gun.on('in', data); + gun.on('in', data.body); }); }); }); + function request (peers, ctx) { + if (Client.isSupported) { + Client.broadcast(peers, { body: ctx }); + } + } + // Broadcast the messages. Gun.on('out', function (ctx) { var gun = ctx.gun; @@ -2392,219 +2453,122 @@ return; } - var msg = Gun.text.ify(ctx); - - if (Client.isSupported) { - Client.broadcast(peers, { body: msg }); - } + request(peers, ctx); }); - function r () {} - r.jsonp = function(opt, cb){ - r.jsonp.ify(opt, function(url){ - if(!url){ return } - r.jsonp.send(url, function(err, reply){ - cb(err, reply); - r.jsonp.poll(opt, reply); - }, opt.jsonp); - }); - } - r.jsonp.send = function(url, cb, id){ - var js = document.createElement('script'); - js.src = url; - js.onerror = function(c){ - (window[js.id]||function(){})(null, {err: "JSONP failed!"}); - } - window[js.id = id] = function(res, err){ - cb(err, res); - cb.id = js.id; - js.parentNode.removeChild(js); - window[cb.id] = null; // TODO: BUG: This needs to handle chunking! - try{delete window[cb.id]; - }catch(e){} - } - js.async = true; - document.getElementsByTagName('head')[0].appendChild(js); - return js; - } - r.jsonp.poll = function(opt, res){ - if(!opt || !opt.base || !res || !res.headers || !res.headers.poll){ return } - (r.jsonp.poll.s = r.jsonp.poll.s || {})[opt.base] = r.jsonp.poll.s[opt.base] || setTimeout(function(){ // TODO: Need to optimize for Chrome's 6 req limit? - //Gun.log("polling again"); - var o = {base: opt.base, headers: {pull: 1}}; - r.each(opt.headers, function(v,i){ o.headers[i] = v }) - r.jsonp(o, function(err, reply){ - delete r.jsonp.poll.s[opt.base]; - while(reply.body && reply.body.length && reply.body.shift){ // we're assuming an array rather than chunk encoding. :( - var res = reply.body.shift(); - if(res && res.body){ r.createServer.ing(res, function(){ r(opt.base, null, null, res) }) } // emit extra events. - } - }); - }, res.headers.poll); - } - r.jsonp.ify = function(opt, cb){ - var uri = encodeURIComponent, q = '?'; - if(opt.url && opt.url.pathname){ q = opt.url.pathname + q; } - q = opt.base + q; - r.each((opt.url||{}).query, function(v, i){ q += uri(i) + '=' + uri(v) + '&' }); - if(opt.headers){ q += uri('`') + '=' + uri(JSON.stringify(opt.headers)) + '&' } - if(r.jsonp.max < q.length){ return cb() } - q += uri('jsonp') + '=' + uri(opt.jsonp = 'P'+Math.floor((Math.random()*65535)+1)); - if(opt.body){ - q += '&'; - var w = opt.body, wls = function(w,l,s){ - return uri('%') + '=' + uri(w+'-'+(l||w)+'/'+(s||w)) + '&' + uri('$') + '='; + request.jsonp = function (opt, cb) { + request.jsonp.ify(opt, function (url) { + if (!url) { + return; + } + request.jsonp.send(url, function (err, reply) { + cb(err, reply); + request.jsonp.poll(opt, reply); + }, opt.jsonp); + }); + }; + request.jsonp.send = function (url, cb, id) { + var js = document.createElement('script'); + js.src = url; + js.onerror = function () { + (window[js.id] || function () {})(null, { + err: 'JSONP failed!' + }); + }; + window[js.id = id] = function (res, err) { + cb(err, res); + cb.id = js.id; + js.parentNode.removeChild(js); + delete window[cb.id]; + }; + js.async = true; + document.getElementsByTagName('head')[0].appendChild(js); + return js; + }; + request.jsonp.poll = function (opt, res) { + if (!opt || !opt.base || !res || !res.headers || !res.headers.poll) { + return; + } + var polls = request.jsonp.poll.s = request.jsonp.poll.s || {}; + polls[opt.base] = polls[opt.base] || setTimeout(function () { + var msg = { + base: opt.base, + headers: { pull: 1 } + }; + + request.each(opt.headers, function (header, name) { + msg.headers[name] = header; + }); + + request.jsonp(msg, function (err, reply) { + delete polls[opt.base]; + + var body = reply.body || []; + while (body.length && body.shift) { + var res = reply.body.shift(); + if (res && res.body) { + request.createServer.ing(res, function () { + request(opt.base, null, null, res); + }); + } + } + }); + }, res.headers.poll); + }; + request.jsonp.ify = function (opt, cb) { + var uri = encodeURIComponent, query = '?'; + if (opt.url && opt.url.pathname) { + query = opt.url.pathname + query; + } + query = opt.base + query; + request.each((opt.url || {}).query, function (value, key) { + query += (uri(key) + '=' + uri(value) + '&'); + }); + if (opt.headers) { + query += uri('`') + '=' + uri( + JSON.stringify(opt.headers) + ) + '&'; + } + if (request.jsonp.max < query.length) { + return cb(); + } + var random = Math.floor(Math.random() * (0xffff + 1)); + query += (uri('jsonp') + '=' + uri(opt.jsonp = 'P' + random)); + if (opt.body) { + query += '&'; + var w = opt.body, wls = function (w, l, s) { + return uri('%') + '=' + uri(w+'-'+(l||w)+'/'+(s||w)) + '&' + uri('$') + '='; } - if(typeof w != 'string'){ - w = JSON.stringify(w); - q += uri('^') + '=' + uri('json') + '&'; - } - w = uri(w); - var i = 0, l = w.length - , s = r.jsonp.max - (q.length + wls(l.toString()).length); - if(s < 0){ return cb() } - while(w){ - cb(q + wls(i, (i = i + s), l) + w.slice(0, i)); - w = w.slice(i); - } - } else { - cb(q); - } - } - r.jsonp.max = 2000; - r.each = function(obj, cb, as){ - if(!obj || !cb){ return } - for(var i in obj){ - if(obj.hasOwnProperty(i)){ - cb.call(as, obj[i], i); - } - } - } - module.exports = r; - /* eslint-disable */ + if (typeof w != 'string') { + w = JSON.stringify(w); + query += uri('^') + '=' + uri('json') + '&'; + } + w = uri(w); + var i = 0, l = w.length + , s = request.jsonp.max - (query.length + wls(l.toString()).length); + if (s < 0){ + return cb(); + } + while (w) { + cb(query + wls(i, (i += s), l) + w.slice(0, i)); + w = w.slice(i); + } + } else { + cb(query); + } + }; + request.jsonp.max = 2000; + request.each = function (obj, cb, as) { + if (!obj || !cb) { + return; + } + for (var key in obj) { + if (obj.hasOwnProperty(key)) { + cb.call(as, obj[key], key); + } + } + }; + module.exports = Client; })(require, './polyfill/request'); - - ;require(function(module){ - if(typeof JSON === 'undefined'){ throw new Error("Include JSON first: ajax.cdnjs.com/ajax/libs/json2/20110223/json2.js") } // for old IE use - if(typeof Gun === 'undefined'){ return } // TODO: window.Websocket is Browser only. But it would be nice if it could somehow merge it with lib/WSP? - - var root, noop = function(){}; - if(typeof window !== 'undefined'){ root = window } - - var Tab = {}; - Tab.on = Gun.on;//Gun.on.create(); - Tab.peers = require('../polyfill/peer'); - Gun.on('out', function(at){ - if(at.put){ return } // TODO: BUG! Doing this for now, to debug. However puts are handled below anyways, but it would be nice if we could switch over to this for both? - var gun = at.gun, opt = at.opt || {}, peers = opt.peers || gun.Back('opt.peers'); - if(!peers || Gun.obj.empty(peers)){ - Gun.log.once('peers', "Warning! You have no peers to connect to!"); - return; - } - var msg = at; - /* - var msg = { - '#': at['#'] || Gun.text.random(9), // msg ID - '$': at.get // msg BODY - }; - */ - Tab.on(msg['#'], function(err, data){ // TODO: ONE? PERF! Clear out listeners, maybe with setTimeout? - if(data){ - at.gun.Back(-1).on('out', {'@': at['#'], err: err, put: data}); - } else { - at.gun.Back(-1).on('in', {'@': at['#'], err: err, put: data}); - } - }); - Tab.peers(peers).send(msg, {headers: {'gun-sid': Tab.server.sid}}); - }); - Gun.on('put', function(at){ - if(at['@']){ return } - var opt = at.gun.Back('opt') || {}, peers = opt.peers; - if(!peers || Gun.obj.empty(peers)){ - Gun.log.once('peers', "Warning! You have no peers to save to!"); - return; - } - if(false === opt.websocket || (at.opt && false === at.opt.websocket)){ return } - var msg = at || { - '#': at['#'] || Gun.text.random(9), // msg ID - '$': at.put // msg BODY - }; - Tab.on(msg['#'], function(err, ok){ // TODO: ONE? PERF! Clear out listeners, maybe with setTimeout? - at.gun.Back(-1).on('in', {'@': at['#'], err: err, ok: ok}); - }); - Tab.peers(peers).send(msg, {headers: {'gun-sid': Tab.server.sid}}); - }); - // browser/client side Server! - Gun.on('opt', function(at){ // TODO: BUG! Does not respect separate instances!!! - if(Tab.server){ return } - var gun = at.gun, server = Tab.server = {}, tmp; - server.sid = Gun.text.random(); - Tab.peers.request.createServer(function(req, res){ - if(!req || !res || !req.body || !req.headers){ return } - var msg = req.body; - gun.on('in', req.body); - return; - // AUTH for non-replies. - if(server.msg(msg['#'])){ return } - //server.on('network', Gun.obj.copy(req)); // Unless we have WebRTC, not needed. - if(msg['@']){ // no need to process. - if(Tab.ons[tmp = msg['@'] || msg['#']]){ - Tab.on(tmp, [msg['!'], msg['$']]); - } - return - } - if(msg['$'] && msg['$']['#']){ return server.get(req, res) } - else { return server.put(req, res) } - }); - server.get = function(req, cb){ - var body = req.body, lex = body['$'], opt; - var graph = gun._.root._.graph, node; - if(!(node = graph[lex['#']])){ return } // Don't reply to data we don't have it in memory. TODO: Add localStorage? - cb({body: { - '#': server.msg(), - '@': body['#'], - '$': node - }}); - } - server.put = function(req, cb){ - var body = req.body, graph = body['$']; - var __ = gun._.root._; - if(!(graph = Gun.obj.map(graph, function(node, soul, map){ // filter out what we don't have in memory. - if(!__.path[soul]){ return } - map(soul, node); - }))){ return } - gun.on('out', {gun: gun, opt: {websocket: false}, put: graph, '#': Gun.on.ask(function(ack, ev){ - if(!ack){ return } - ev.off(); - return cb({body: { - '#': server.msg(), - '@': body['#'], - '$': ack, - '!': ack.err - }}); - })}); - } - server.msg = function(id){ - if(!id){ - return server.msg.debounce[id = Gun.text.random(9)] = Gun.time.is(), id; - } - clearTimeout(server.msg.clear); - server.msg.clear = setTimeout(function(){ - var now = Gun.time.is(); - Gun.obj.map(server.msg.debounce, function(t,id){ - if((now - t) < (1000 * 60 * 5)){ return } - Gun.obj.del(server.msg.debounce, id); - }); - },500); - if(server.msg.debounce[id]){ - return server.msg.debounce[id] = Gun.time.is(), id; - } - server.msg.debounce[id] = Gun.time.is(); - return; - }; - server.msg.debounce = server.msg.debounce || {}; - }); - - })(require, './adapters/wsp'); - + }());