diff --git a/.gitignore b/.gitignore index 2cbe713e..56f45cc2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ node_modules/* npm-debug.log +gun.min.js yarn.lock *data.json *.db diff --git a/README.md b/README.md index 7449fb4d..b5beea73 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ Try the [interactive tutorial](http://gun.js.org/think.html) in the browser (**5 ``` Then visit [http://localhost:8080](http://localhost:8080) in your browser. -### Hiroku +### Heroku ```bash git clone https://github.com/amark/gun.git cd gun diff --git a/gun.js b/gun.js index a9857102..49daa5e8 100644 --- a/gun.js +++ b/gun.js @@ -1,3 +1,5 @@ +/* eslint-disable */ +/* eslint-enable no-console */ //console.log("!!!!!!!!!!!!!!!! WARNING THIS IS GUN 0.5 !!!!!!!!!!!!!!!!!!!!!!"); ;(function(){ @@ -6,7 +8,7 @@ if(typeof window !== "undefined"){ root = window } if(typeof global !== "undefined"){ root = global } root = root || {}; - var console = root.console = root.console || {log: function(){}}; + var console = root.console || {log: function(){}}; function require(arg){ return arg.slice? require[resolve(arg)] : function(mod, path){ arg(mod = {exports: {}}); @@ -73,7 +75,7 @@ Type.list.map = function(l, c, _){ return obj_map(l, c, _) } Type.list.index = 1; // change this to 0 if you want non-logical, non-mathematical, non-matrix, non-convenient array notation Type.obj = {is: function(o){ return o? (o instanceof Object && o.constructor === Object) || Object.prototype.toString.call(o).match(/^\[object (\w+)\]$/)[1] === 'Object' : false }} - Type.obj.put = function(o, f, v){ return (o||{})[f] = v, o } + Type.obj.put = function(o, f, v){ return (o||{})[f] = v, o } Type.obj.has = function(o, f){ return o && Object.prototype.hasOwnProperty.call(o, f) } Type.obj.del = function(o, k){ if(!o){ return } @@ -163,7 +165,7 @@ var obj = Type.obj, obj_is = obj.is, obj_has = obj.has, obj_map = obj.map; module.exports = Type; })(require, './type'); - + ;require(function(module){ // On event emitter generic javascript utility. function Scope(){ @@ -295,7 +297,7 @@ ;require(function(module){ var On = require('./on'); - + function Chain(create, opt){ opt = opt || {}; opt.id = opt.id || '#'; @@ -312,7 +314,7 @@ return; } if(at.stun === stun){ - delete at.stun; + delete at.stun; } off = true; var i = 0, q = res.queue, l = q.length, c, v; @@ -346,6 +348,7 @@ on.ack = function(at, reply){ if(!at || !reply || !ask.on){ return } var id = at[opt.id] || at; + if(!ask.ons[id]){ return } ask.on(id, reply); return true; } @@ -476,11 +479,11 @@ } if(incomingState < currentState){ return {historical: true}; // the incoming value is within the boundary of the machine's state, but not within the range. - + } if(currentState < incomingState){ return {converge: true, incoming: true}; // the incoming value is within both the boundary and the range of the machine's state. - + } if(incomingState === currentState){ if(incomingValue === currentValue){ // Note: while these are practically the same, the deltas could be technically different @@ -620,7 +623,7 @@ if(o.node){ o.node[f] = tmp } return; } - if(Val.is(v)){ + if(Val.is(v)){ o.node[f] = v; } } @@ -705,8 +708,8 @@ } function map(n, s){ // we invert this because the way we check for this is via a negation. if(!n || s !== Node.soul(n) || !Node.is(n, this.fn)){ return true } // it is true that this is an invalid graph. - if(!fn_is(this.cb)){ return } - nf.n = n; nf.as = this.as; + if(!fn_is(this.cb)){ return } + nf.n = n; nf.as = this.as; this.cb.call(nf.as, n, s, nf); } }()); @@ -715,7 +718,7 @@ var at = {path: [], obj: obj}; if(!env){ env = {}; - } else + } else if(typeof env === 'string'){ env = {soul: env}; } else @@ -751,7 +754,7 @@ if(!f){ at.node = at.node || n || {}; if(obj_has(v, Node._)){ - at.node._ = Gun.obj.copy(v._); + at.node._ = obj_copy(v._); } at.node = Node.soul.ify(at.node, Val.rel.is(at.rel)); } @@ -845,6 +848,7 @@ ;require(function(module){ function Gun(o){ + if(o instanceof Gun){ return this } if(!(this instanceof Gun)){ return Gun.create(o) } this._ = {gun: this}; } @@ -884,7 +888,7 @@ Gun.graph = require('./graph'); Gun.on = require('./onify')(); - + /* var opt = {chain: 'in', back: 'out', extend: 'root', id: Gun._.soul}; Gun.chain = require('./chain')(Gun, opt); @@ -892,7 +896,7 @@ */ (Gun.chain = Gun.prototype).chain = function(){ var chain = new this.constructor(), _; - _ = chain._ || (chain._ = {}); + _ = chain._ || (chain._ = {gun: chain}); _.root = this._.root; _.back = this; return chain; @@ -929,17 +933,26 @@ } function output(at){ var cat = this, gun = cat.gun, tmp; - if(at.put){ - cat.on('in', obj_to(at, {gun: cat.gun})); + // TODO: BUG! Outgoing `get` to read from in memory!!! + if(at.get && get(at, cat)){ return } + //if(at.put){ + cat.on('in', obj_to(at, {gun: cat.gun})); // TODO: PERF! input now goes to output so it would be nice to reduce the circularity here for perf purposes. + //} + if(at['#']){ + dedup.track(at['#']); } if(!at.gun){ at = Gun.obj.to(at, {gun: gun}); } - if(at.put){ Gun.on('put', at) } - if(at.get){ get(at, cat) } - Gun.on('out', at); - if(!cat.back){ return } - cat.back.on('out', at); + //if(at.put){ Gun.on('put', at) } + //if(at.get){ get(at, cat) } + // Reads and writes both trigger output. // that should be intended. + //if (at.put !== undefined || at.get !== undefined) { + Gun.on('out', at); + //} + // Gun.on('out', at); + //if(!cat.back){ return } + //cat.back.on('out', at); } function get(at, cat){ var soul = at.get[_soul], node = cat.graph[soul], field = at.get[_field]; @@ -949,23 +962,35 @@ node = Gun.obj.put({_: node._}, field, node[field]); } cat.on('in', { - '@': at.req? at['#'] : 0, // temporary hack + '@': at['#'], put: Gun.graph.node(node) // TODO: BUG! Clone node! }); - return; + return true; } - Gun.on('get', at); + //Gun.on('get', at); } function input(at){ var cat = this; - if(at['@'] || at.err || u === at.put){ - at.gun = at.gun || cat.gun; + if(!at.gun){ at.gun = cat.gun } + if(!at['#'] && at['@']){ + at['#'] = Gun.text.random(); // TODO: Use what is used other places instead. Gun.on.ack(at['@'], at); + dedup.track(at['#']); + cat.on('out', at); return; } - if(cat.graph){ - Gun.obj.map(at.put, ham, {at: at, cat: this}); // all unions must happen first, sadly. + if(at['#'] && dedup.check(at['#'])){ return } + dedup.track(at['#']); + Gun.on.ack(at['@'], at); + if(at.put){ + if(cat.graph){ + Gun.obj.map(at.put, ham, {at: at, cat: this}); // all unions must happen first, sadly. + } + Gun.obj.map(at.put, map, {at: at, cat: this}); + //if(0 === at['@']){ return } // TODO: UNCLEAN! Temporary hack for now. + Gun.on('put', at); } - Gun.obj.map(at.put, map, {at: at, cat: this}); + if(at.get){ Gun.on('get', at) } + Gun.on('out', at); } function ham(data, key){ var cat = this.cat, graph = cat.graph; @@ -986,6 +1011,52 @@ via: this.at }); } + function dedup(){} + dedup.cache = {}; + dedup.track = function (id) { + dedup.cache[id] = Gun.time.is(); + // Engage GC. + if (!dedup.to) { + dedup.gc(); + } + return id; + }; + dedup.check = function(id){ + // Have we seen this ID recently? + return Gun.obj.has(dedup.cache, id); + } + dedup.gc = function(){ + var now = Gun.time.is(); + var oldest = now; + var maxAge = 5 * 60 * 1000; + // TODO: Gun.scheduler already does this? Reuse that. + Gun.obj.map(dedup.cache, function (time, id) { + oldest = Math.min(now, time); + + if ((now - time) < maxAge) { + return; + } + + delete dedup.cache[id]; + }); + + var done = Gun.obj.empty(dedup.cache); + + // Disengage GC. + if (done) { + dedup.to = null; + return; + } + + // Just how old? + var elapsed = now - oldest; + + // How long before it's too old? + var nextGC = maxAge - elapsed; + + // Schedule the next GC event. + dedup.to = setTimeout(dedup.gc, nextGC); + } }()); var text = Type.text, text_is = text.is, text_random = text.random; var list = Type.list, list_is = list.is; @@ -1019,7 +1090,7 @@ var is = state_is(node, field), cs = state_is(vertex, field); if(u === is || u === cs){ return true } // it is true that this is an invalid HAM comparison. var iv = rel_is(value) || value, cv = rel_is(vertex[field]) || vertex[field]; - + @@ -1089,7 +1160,7 @@ var obj = Gun.obj, obj_is = obj.is, obj_put = obj.put, obj_map = obj.map, obj_empty = obj.empty; var num = Gun.num, num_is = num.is; var _soul = Gun.val.rel._, _field = '.'; - + ;(function(){ var obj = {}, u; Gun.chain.Back = function(n, opt){ var tmp; if(-1 === n || Infinity === n){ @@ -1185,10 +1256,9 @@ as.ref.on('out', { gun: as.ref, put: as.out = as.env.graph, opt: as.opt, '#': Gun.on.ask(function(ack, ev){ - if(ack && 0 === ack.ok){ return } - ev.off(); // One response is good enough for us currently. Later we may want to adjust this. + ev.off(); // One response is good enough for us currently. Later we may want to provide an option to adjust this. if(!as.opt.any){ return } - as.opt.any.call(as.opt.as || as.gun, ack.err, ack.ok); + as.opt.any.call(as.opt.as || as.gun, ack.err, ack.ok, ev); }, as.opt) }); if(as.res){ as.res() } @@ -1220,7 +1290,7 @@ as.batch(); } - function any(at, ev){ + function any(at, ev){ function implicit(at){ // TODO: CLEAN UP!!!!! if(!at || !at.get){ return } // TODO: CLEAN UP!!!!! as.data = obj_put({}, tmp = at.get, as.data); // TODO: CLEAN UP!!!!! @@ -1231,9 +1301,9 @@ implicit(at); // TODO: CLEAN UP!!!!! } // TODO: CLEAN UP!!!!! var as = this; - if(at.err){ + if(at.err){ console.log("Please report this as an issue! Put.any.err"); - return + return } var cat = as.ref._, data = at.put, opt = as.opt, root, tmp; if(u === data){ @@ -1352,7 +1422,7 @@ var cat = back._, path = cat.path, gun = back.chain(), at = gun._; if(!path){ path = cat.path = {} } path[at.get = key] = gun; - at.stun = at.stun || cat.stun; // TODO: BUG! Clean up! This is kinda ugly. These need to be attached all the way down regardless of whether a gun chain has been cached or not for the first time. + at.stun = at.stun || cat.stun; // TODO: BUG! Clean up! This is kinda ugly. These need to be attached all the way down regardless of whether a gun chain has been cached or not for the first time. Gun.on('path', at); //gun.on('in', input, at); // For 'in' if I add my own listeners to each then I MUST do it before in gets called. If I listen globally for all incoming data instead though, regardless of individual listeners, I can transform the data there and then as well. gun.on('out', output, at); // However for output, there isn't really the global option. I must listen by adding my own listener individually BEFORE this one is ever called. @@ -1363,7 +1433,6 @@ if(!at.gun){ at.gun = gun; } - console.debug(10, 'out', cat.get, at.get); if(at.get && !at.get[_soul]){ if(typeof at.get === 'string'){ // request for soul! if(cat.ask){ @@ -1410,7 +1479,6 @@ at.gun.on('out', tmp); return; } - console.debug(7, 'out', cat.get, at.get, cat.ask); cat.back.on('out', { gun: cat.gun, get: cat.get @@ -1431,14 +1499,12 @@ tmp['#'] = Gun.on.ask(ack, tmp); cat.back.on('out', tmp); } else { - console.debug(6, 'out', cat.get); cat.back.on('out', { gun: cat.gun, get: cat.get }); } } - console.debug(9, 'out', cat.get); if(cat.stun && cat.stun(at)){ return } gun.on('in', at.get, at); return; @@ -1452,7 +1518,6 @@ console.log("Please report this as an issue! In.err"); // TODO: BUG! return; } - console.debug(10, 'input', at, cat.get); if(value.call(cat, at, ev)){ return; } @@ -1473,7 +1538,6 @@ return true; } if(!cat.link && Gun.node.soul(put) && (rel = Gun.node.soul(at.put))){ - console.debug(11, 'value', put); ask(cat, rel); return false; } @@ -1576,7 +1640,6 @@ if(!any){ return this } var chain = this, cat = chain._, opt = opt || {}, last = {};//function(){}; if(opt.change){ opt.change = 1 } - console.debug(5, 'any'); chain.on('out', {get: function(at, ev){ //console.log("any!", at); if(!at.gun){ console.log('%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%EXPLODE%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%', at) } @@ -1589,7 +1652,7 @@ at = obj_to(at, {put: data = cat.change = cat.put = Gun.state.ify(Gun.node.ify({}, tmp))}); } // TODO: BUG! Need to use at.put > cat.put for merged cache? - if(tmp = opt.change){ // TODO: BUG! Opt is outer scope, gun/cat/data might be iterative and thus only inner scope? Aka, we can't use it for all of them. + if(tmp = opt.change){ // TODO: BUG! Opt is outer scope, gun/cat/data might be iterative and thus only inner scope? Aka, we can't use it for all of them. if(1 === tmp){ opt.change = true; } else { @@ -1602,11 +1665,11 @@ if(last[id] == data && obj_has(last, id)){ return } last[id] = data; // TODO: PERF! Memory optimizaiton? Can we avoid this. */ - + if(last.put === data && last.get === id){ return } last.get = id; last.put = data; - + cat.last = data; if(opt.as){ any.call(opt.as, at, ev); @@ -1622,7 +1685,8 @@ if(tmp = at.put){ if(!f || obj_has(tmp[s], f)){ ev.off(); - at['@'] = 0; + //at['@'] = 0; + //at['#'] = 0; return root.on('in', at); } /* @@ -1647,7 +1711,7 @@ return; } } - if(gun._.put){ + if(gun._.put && !(null === f)){ gun = gun.get(f, null, {path:true}); gun.on('in', { err: at.err, @@ -1665,6 +1729,37 @@ via: at }); } + + function ackk(at, ev){ var gun = this.gun; + var cat = gun._; + if(u !== cat.change){ return ev.off() } + // TODO: PERF! Memory. If somebody `gun.off()` we should clean up these requests. + // TODO: PERF! Memory. If peers only reply with `not` (or we never get replies) these event listeners will be left hanging - even if we get push updates that the data does exist. + if(cat.root === cat.back){ + //at.gun = cat.gun; + if(at.gun === cat.gun){ return } + at = { + get: cat.get, + gun: cat.gun, + via: at, + put: at.put[cat.get] + } + + } else { + if(obj_has(at.put, cat.get)){ return ev.off() } + at = { + get: cat.get, + gun: gun, + via: at.via? at : { + get: cat.back._.get, + gun: cat.back, + via: at + } + } + } + //at.get = at.get || cat.get; + cat.on('in', at); + } var obj = Gun.obj, obj_has = obj.has, obj_to = obj.to; var empty = {}, u; var _soul = Gun._.soul, _field = Gun._.field, _sid = Gun.on.ask._, _rid = Gun.on.ack._; @@ -1837,8 +1932,6 @@ gun = back; var i = 0, l = field.length; for(i; i < l; i++){ - console.debug(3, 'path', field[i]); - console.debug(2, 'path', field[i]); gun = gun.get(field[i], (i+1 === l)? cb : null, opt); } gun.back = back; // TODO: API change! @@ -1892,7 +1985,7 @@ } //if(obj_empty(value, Gun._.meta) && !(opt && opt.empty)){ // TODO: PERF! Deprecate!??? - + //} else { //console.log("value", value); //if(!(value||empty)['#']/* || !val_rel_is(value)*/){ // TODO: Performance hit!???? // TODO: BUG! WE should avoid this. So that way it is usable with gun plugin chains. @@ -1911,7 +2004,6 @@ if(cb){ (opt = opt || {}).ok = cb; opt.cat = at; - console.debug(4, 'val', at); gun.any(val, {as: opt}); opt.async = true; } @@ -1985,18 +2077,15 @@ var list = (cat = chain._).list = cat.list || {}; (ons[ons.length] = chain.on('in')).map = {}; ons[ons.length] = chain.on('out', function(at){ - console.debug(8, 'map out', at); if(at.get instanceof Function){ ons[ons.length] = chain.on('in', at.get, at); return; } else { - console.debug(9, 'map out', at); ons[ons.length] = chain.on('in', gun.get.input, at.gun._); } }); if(opt !== false){ ons[ons.length] = gun.on(map, {change: true, as: cat}); - console.debug(1, 'map'); } } if(cb){ @@ -2055,20 +2144,22 @@ ;require(function(module){ if(typeof JSON === 'undefined'){ throw new Error("Include JSON first: ajax.cdnjs.com/ajax/libs/json2/20110223/json2.js") } // for old IE use if(typeof Gun === 'undefined'){ return } // TODO: localStorage is Browser only. But it would be nice if it could somehow plugin into NodeJS compatible localStorage APIs? - + var root, noop = function(){}; if(typeof window !== 'undefined'){ root = window } var store = root.localStorage || {setItem: noop, removeItem: noop, getItem: noop}; function put(at){ var err, id, opt, root = at.gun._.root; - (opt = at.opt || {}).prefix = opt.prefix || at.gun.Back('opt.prefix') || 'gun/'; + (opt = {}).prefix = (at.opt || opt).prefix || at.gun.Back('opt.prefix') || 'gun/'; Gun.graph.is(at.put, function(node, soul){ //try{store.setItem(opt.prefix + soul, Gun.text.ify(node)); try{store.setItem(opt.prefix + soul, Gun.text.ify(root._.graph[soul]||node)); }catch(e){ err = e || "localStorage failure" } }); //console.log('@@@@@@@@@@local put!'); - Gun.on.ack(at, {err: err, ok: 0}); // TODO: Reliability! Are we sure we want to have localStorage ack? + if(Gun.obj.empty(at.gun.Back('opt.peers'))){ + Gun.on.ack(at, {err: err, ok: 0}); // only ack if there are no peers. + } } function get(at){ var gun = at.gun, lex = at.get, soul, data, opt, u; @@ -2076,7 +2167,12 @@ (opt = at.opt || {}).prefix = opt.prefix || at.gun.Back('opt.prefix') || 'gun/'; if(!lex || !(soul = lex[Gun._.soul])){ return } data = Gun.obj.ify(store.getItem(opt.prefix + soul) || null); - if(!data){ return } // localStorage isn't trustworthy to say "not found". + if(!data){ // localStorage isn't trustworthy to say "not found". + if(Gun.obj.empty(gun.Back('opt.peers'))){ + gun.Back(-1).on('in', {'@': at['#']}); + } + return; + } if(Gun.obj.has(lex, '.')){var tmp = data[lex['.']];data = {_: data._};if(u !== tmp){data[lex['.']] = tmp}} //console.log('@@@@@@@@@@@@local get', data, at); gun.Back(-1).on('in', {'@': at['#'], put: Gun.graph.node(data)}); @@ -2085,7 +2181,7 @@ Gun.on('put', put); Gun.on('get', get); })(require, './adapters/localStorage'); - + ;require(function(module){ function r(base, body, cb, opt){ var o = base.length? {base: base} : {}; @@ -2268,26 +2364,27 @@ ;require(function(module){ if(typeof JSON === 'undefined'){ throw new Error("Include JSON first: ajax.cdnjs.com/ajax/libs/json2/20110223/json2.js") } // for old IE use if(typeof Gun === 'undefined'){ return } // TODO: window.Websocket is Browser only. But it would be nice if it could somehow merge it with lib/WSP? - + var root, noop = function(){}; if(typeof window !== 'undefined'){ root = window } var Tab = {}; Tab.on = Gun.on;//Gun.on.create(); Tab.peers = require('../polyfill/peer'); - Gun.on('get', function(at){ + Gun.on('out', function(at){ + if(at.put){ return } // TODO: BUG! Doing this for now, to debug. However puts are handled below anyways, but it would be nice if we could switch over to this for both? var gun = at.gun, opt = at.opt || {}, peers = opt.peers || gun.Back('opt.peers'); if(!peers || Gun.obj.empty(peers)){ - //setTimeout(function(){ Gun.log.once('peers', "Warning! You have no peers to connect to!"); - at.gun.Back(-1).on('in', {'@': at['#']}); - //},100); return; } + var msg = at; + /* var msg = { '#': at['#'] || Gun.text.random(9), // msg ID '$': at.get // msg BODY }; + */ Tab.on(msg['#'], function(err, data){ // TODO: ONE? PERF! Clear out listeners, maybe with setTimeout? if(data){ at.gun.Back(-1).on('out', {'@': at['#'], err: err, put: data}); @@ -2302,11 +2399,10 @@ var opt = at.gun.Back('opt') || {}, peers = opt.peers; if(!peers || Gun.obj.empty(peers)){ Gun.log.once('peers', "Warning! You have no peers to save to!"); - at.gun.Back(-1).on('in', {'@': at['#']}); return; } if(false === opt.websocket || (at.opt && false === at.opt.websocket)){ return } - var msg = { + var msg = at || { '#': at['#'] || Gun.text.random(9), // msg ID '$': at.put // msg BODY }; @@ -2323,6 +2419,8 @@ Tab.peers.request.createServer(function(req, res){ if(!req || !res || !req.body || !req.headers){ return } var msg = req.body; + gun.on('in', req.body); + return; // AUTH for non-replies. if(server.msg(msg['#'])){ return } //server.on('network', Gun.obj.copy(req)); // Unless we have WebRTC, not needed. @@ -2330,7 +2428,7 @@ if(Tab.ons[tmp = msg['@'] || msg['#']]){ Tab.on(tmp, [msg['!'], msg['$']]); } - return + return } if(msg['$'] && msg['$']['#']){ return server.get(req, res) } else { return server.put(req, res) } @@ -2375,12 +2473,12 @@ Gun.obj.del(server.msg.debounce, id); }); },500); - if(server.msg.debounce[id]){ + if(server.msg.debounce[id]){ return server.msg.debounce[id] = Gun.time.is(), id; } server.msg.debounce[id] = Gun.time.is(); return; - }; + }; server.msg.debounce = server.msg.debounce || {}; }); diff --git a/lib/file.js b/lib/file.js index 51a6ed2c..26b0bea3 100644 --- a/lib/file.js +++ b/lib/file.js @@ -6,6 +6,23 @@ var Gun = require('../gun'), fs = require('fs'), file = {}; +function isUsingFileJS (context) { + + // Options passed via .get or .put. + var methodOptions = context.opt || {}; + + // Options set on the gun chain. + var chainOption = context.gun.Back('opt.file'); + + // Favor method options over chain options. + var file = methodOptions.hasOwnProperty('file') + ? methodOptions.file + : chainOption; + + // Return whether the module is disabled. + return file !== false; +} + // queue writes, adapted from https://github.com/toolness/jsondown/blob/master/jsondown.js var isWriting = false, queuedWrites = []; function writeFile(path, disk, at){ @@ -24,6 +41,9 @@ function writeFile(path, disk, at){ } Gun.on('put', function(at){ + if (isUsingFileJS(at) === false) { + return; + } var gun = at.gun, graph = at.put, opt = at.opt || {}; var __ = gun._.root._; Gun.obj.map(graph, function(node, soul){ @@ -32,7 +52,10 @@ Gun.on('put', function(at){ writeFile(opt.file || file.file, file.disk, at); }); Gun.on('get', function(at){ - var gun = at.gun, lex = at.get, opt = at.opt; + if (isUsingFileJS(at) === false) { + return; + } + var gun = at.gun, lex = at.get; if(!lex){return} gun.Back(-1).on('in', {'@': at['#'], put: Gun.graph.node(file.disk.graph[lex['#']])}); //at.cb(null, file.disk.graph[lex['#']]); @@ -43,7 +66,11 @@ Gun.on('opt', function(at){ if ((opts.file === false) || (opts.s3 && opts.s3.key)) { return; // don't use this plugin if S3 is being used. } - console.log("WARNING! This `file.js` module for gun is intended only for local development testing!") + Gun.log.once( + 'file-warning', + 'WARNING! This `file.js` module for gun is ' + + 'intended only for local development testing!' + ); file.file = opts.file || file.file || 'data.json'; file.raw = file.raw || (fs.existsSync || require('path').existsSync)(opts.file) ? fs.readFileSync(opts.file).toString() : null; file.disk = file.disk || Gun.obj.ify(file.raw || {graph: {}}); diff --git a/lib/server.js b/lib/server.js index af0f28b5..fbe858a1 100644 --- a/lib/server.js +++ b/lib/server.js @@ -1,9 +1,13 @@ -;(function(){ - console.log("Hello wonderful person! :) I'm mark@gunDB.io, message me for help or with hatemail. I want to hear from you! <3"); - var Gun = require('../gun'); - console.log("TODO: MARK! UPDATE S3 DRIVER BEFORE PUBLISHING!") +;(function(){ + var Gun = require('../gun'); //require('./s3'); - require('./wsp'); - require('./file'); - module.exports = Gun; + require('./wsp/server'); + require('./file'); + Gun.log( + 'Hello wonderful person! :)\n' + + 'I\'m mark@gunDB.io, message me for help or with hatemail. ' + + 'I want to hear from you! <3' + ); + Gun.log('TODO: MARK! UPDATE S3 DRIVER BEFORE PUBLISHING!'); + module.exports = Gun; }()); diff --git a/lib/wsp.js b/lib/wsp.js deleted file mode 100644 index 7768fa86..00000000 --- a/lib/wsp.js +++ /dev/null @@ -1,230 +0,0 @@ -;(function(wsp){ - /* - TODO: SERVER PUSH! - TODO: SERVER GET! - TODO: SERVER PUSH! - TODO: SERVER GET! - TODO: SERVER PUSH! - TODO: SERVER GET! - TODO: SERVER PUSH! - TODO: SERVER GET! - TODO: SERVER PUSH! - TODO: SERVER GET! - TODO: SERVER PUSH! - TODO: SERVER GET! - */ - var Gun = require('../gun') - , formidable = require('formidable') - , ws = require('ws').Server - , http = require('./http') - , url = require('url'); - Gun.on('opt', function(at){ - var gun = at.gun, opt = at.opt; - gun.__ = at.root._; - gun.__.opt.ws = opt.ws = gun.__.opt.ws || opt.ws || {}; - function start(server, port, app){ - if(app && app.use){ app.use(gun.wsp.server) } - server = gun.__.opt.ws.server = gun.__.opt.ws.server || opt.ws.server || server; - require('./ws')(gun.wsp.ws = gun.wsp.ws || new ws(gun.__.opt.ws), function(req, res){ - var ws = this; - req.headers['gun-sid'] = ws.sid = ws.sid? ws.sid : req.headers['gun-sid']; - ws.sub = ws.sub || gun.wsp.on('network', function(msg, ev){ - if(!ws || !ws.send || !ws._socket || !ws._socket.writable){ return ev.off() } - if(!msg || (msg.headers && msg.headers['gun-sid'] === ws.sid)){ return } - if(msg && msg.headers){ delete msg.headers['ws-rid'] } - // TODO: BUG? ^ What if other peers want to ack? Do they use the ws-rid or a gun declared id? - try{ws.send(Gun.text.ify(msg)); - }catch(e){} // juuuust in case. - }); - gun.wsp.wire(req, res); - }, {headers: {'ws-rid': 1, 'gun-sid': 1}}); - gun.__.opt.ws.port = gun.__.opt.ws.port || opt.ws.port || port || 80; - } - var wsp = gun.wsp = gun.wsp || function(server){ - if(!server){ return gun } - if(Gun.fns.is(server.address)){ - if(server.address()){ - start(server, server.address().port); - return gun; - } - } - if(Gun.fns.is(server.get) && server.get('port')){ - start(server, server.get('port')); - return gun; - } - var listen = server.listen; - server.listen = function(port){ - var serve = listen.apply(server, arguments); - start(serve, port, server); - return serve; - } - return gun; - } - gun.wsp.on = gun.wsp.on || Gun.on; - gun.wsp.regex = gun.wsp.regex || opt.route || opt.path || /^\/gun/i; - gun.wsp.poll = gun.wsp.poll || opt.poll || 1; - gun.wsp.pull = gun.wsp.pull || opt.pull || gun.wsp.poll * 1000; - gun.wsp.server = gun.wsp.server || function(req, res, next){ // http - next = next || function(){}; - if(!req || !res){ return next(), false } - if(!req.url){ return next(), false } - if(!req.method){ return next(), false } - var msg = {}; - msg.url = url.parse(req.url, true); - if(!gun.wsp.regex.test(msg.url.pathname)){ return next(), false } // TODO: BUG! If the option isn't a regex then this will fail! - if(msg.url.pathname.replace(gun.wsp.regex,'').slice(0,3).toLowerCase() === '.js'){ - res.writeHead(200, {'Content-Type': 'text/javascript'}); - res.end(gun.wsp.js = gun.wsp.js || require('fs').readFileSync(__dirname + '/../gun.js')); // gun server is caching the gun library for the client - return true; - } - if(!req.upgrade){ return next(), false } - return http(req, res, function(req, res){ - if(!req){ return next() } - var stream, cb = res = require('./jsonp')(req, res); - if(req.headers && (stream = req.headers['gun-sid'])){ - stream = (gun.wsp.peers = gun.wsp.peers || {})[stream] = gun.wsp.peers[stream] || {sid: stream}; - stream.drain = stream.drain || function(res){ - if(!res || !stream || !stream.queue || !stream.queue.length){ return } - res({headers: {'gun-sid': stream.sid}, body: stream.queue }); - stream.off = setTimeout(function(){ stream = null }, gun.wsp.pull); - stream.reply = stream.queue = null; - return true; - } - stream.sub = stream.sub || gun.wsp.on('network', function(req, ev){ - if(!stream){ return ev.off() } // self cleans up after itself! - if(!req || (req.headers && req.headers['gun-sid'] === stream.sid)){ return } - (stream.queue = stream.queue || []).push(req); - stream.drain(stream.reply); - }); - cb = function(r){ (r.headers||{}).poll = gun.wsp.poll; res(r) } - clearTimeout(stream.off); - if(req.headers.pull){ - if(stream.drain(cb)){ return } - return stream.reply = cb; - } - } - gun.wsp.wire(req, cb); - }), true; - } - if((gun.__.opt.maxSockets = opt.maxSockets || gun.__.opt.maxSockets) !== false){ - require('https').globalAgent.maxSockets = require('http').globalAgent.maxSockets = gun.__.opt.maxSockets || Infinity; - } - gun.wsp.msg = gun.wsp.msg || function(id){ - if(!id){ - return gun.wsp.msg.debounce[id = Gun.text.random(9)] = Gun.time.is(), id; - } - clearTimeout(gun.wsp.msg.clear); - gun.wsp.msg.clear = setTimeout(function(){ - var now = Gun.time.is(); - Gun.obj.map(gun.wsp.msg.debounce, function(t,id){ - if((now - t) < (1000 * 60 * 5)){ return } - Gun.obj.del(gun.wsp.msg.debounce, id); - }); - },500); - if(id = gun.wsp.msg.debounce[id]){ - return gun.wsp.msg.debounce[id] = Gun.time.is(), id; - } - gun.wsp.msg.debounce[id] = Gun.time.is(); - return; - }; - gun.wsp.msg.debounce = gun.wsp.msg.debounce || {}; - gun.wsp.wire = gun.wsp.wire || (function(){ - // all streams, technically PATCH but implemented as PUT or POST, are forwarded to other trusted peers - // except for the ones that are listed in the message as having already been sending to. - // all states, implemented with GET, are replied to the source that asked for it. - function tran(req, res){ - if(!req || !res || !req.body || !req.headers){ return } - if(req.url){ req.url = url.format(req.url) } - var msg = req.body; - // AUTH for non-replies. - if(gun.wsp.msg(msg['#'])){ return } - gun.wsp.on('network', Gun.obj.copy(req)); - if(msg['@']){ return } // no need to process. - if(msg['$'] && msg['$']['#']){ return tran.get(req, res) } - //if(Gun.is.lex(msg['$'])){ return tran.get(req, res) } - else { return tran.put(req, res) } - cb({body: {hello: 'world'}}); - // TODO: BUG! server put should push. - } - tran.get = function(req, cb){ - var body = req.body, lex = body['$'], reply = {headers: {'Content-Type': tran.json}}, opt; - gun.on('out', {gun: gun, get: lex, req: 1, '#': Gun.on.ask(function(at, ev){ - ev.off(); - var graph = at.put; - return cb({headers: reply.headers, body: { - '#': gun.wsp.msg(), - '@': body['#'], - '$': graph, - '!': at.err - }}); - return; - if(Gun.obj.empty(node)){ - return cb({headers: reply.headers, body: node}); - } // we're out of stuff! - /* - (function(chunks){ // FEATURE! Stream chunks if the nodes are large! - var max = 10, count = 0, soul = Gun.is.node.soul(node); - if(Object.keys(node).length > max){ - var n = Gun.is.node.soul.ify({}, soul); - Gun.obj.map(node, function(val, field){ - if(!(++count % max)){ - cb({headers: reply.headers, chunk: n}); // send node chunks - n = Gun.is.node.soul.ify({}, soul); - } - Gun.is.node.state.ify([n, node], field, val); - }); - if(count % max){ // finish off the last chunk - cb({headers: reply.headers, chunk: n}); - } - } else { - cb({headers: reply.headers, chunk: node}); // send full node - } - }([])); - */ - cb({headers: reply.headers, chunk: node }); // Use this if you don't want streaming chunks feature. - })}); - } - tran.put = function(req, cb){ - //console.log("tran.put", req); - // NOTE: It is highly recommended you do your own PUT/POSTs through your own API that then saves to gun manually. - // This will give you much more fine-grain control over security, transactions, and what not. - var body = req.body, graph = body['$'], reply = {headers: {'Content-Type': tran.json}}, opt; - gun.on('out', {gun: gun, put: graph, '#': Gun.on.ask(function(ack, ev){ - //Gun.on('put', {gun: gun, put: graph, '#': Gun.on.ask(function(ack, ev){ - ev.off(); - return cb({headers: reply.headers, body: { - '#': gun.wsp.msg(), - '@': body['#'], - '$': ack, - '!': ack.err - }}); - })}); - return; - if(Gun.is.graph(req.body)){ - if(req.err = Gun.union(gun, req.body, function(err, ctx){ // TODO: BUG? Probably should give me ctx.graph - if(err){ return cb({headers: reply.headers, body: {err: err || "Union failed."}}) } - var ctx = ctx || {}; ctx.graph = {}; - Gun.is.graph(req.body, function(node, soul){ - ctx.graph[soul] = gun.__.graph[soul]; - }); - (gun.__.opt.wire.put || function(g,cb){cb("No save.")})(ctx.graph, function(err, ok){ - if(err){ return cb({headers: reply.headers, body: {err: err || "Failed."}}) } // TODO: err should already be an error object? - cb({headers: reply.headers, body: {ok: ok || "Persisted."}}); - //console.log("tran.put <------------------------", ok); - }); - }).err){ cb({headers: reply.headers, body: {err: req.err || "Union failed."}}) } - } else { - cb({headers: reply.headers, body: {err: "Not a valid graph!"}}); - } - } - gun.wsp.on('network', function(req){ - // TODO: MARK! You should move the networking events to here, not in WSS only. - }); - tran.json = 'application/json'; - return tran; - }()); - if(opt.server){ - wsp(opt.server); - } - }); -}({})); diff --git a/lib/wsp/Peer.js b/lib/wsp/Peer.js new file mode 100644 index 00000000..493ba8e7 --- /dev/null +++ b/lib/wsp/Peer.js @@ -0,0 +1,190 @@ +/* eslint-disable no-underscore-dangle */ +'use strict'; + +var WebSocket = require('ws'); +var Emitter = require('events'); +var util = require('util'); + +/** + * Calculates backoff instances. + * @param {Object} [options] - Override the default settings. + * @param {Object} options.time=50 - Initial backoff time. + * @param {Object} options.factor=2 - How much to multiply the time by. + * @param {Object} options.max=1min - Maximum backoff time. + * @class + */ +function Backoff (options) { + this.options = options || {}; + + // Sets the initial backoff settings. + this.reset(); +} + +/** + * Increments the time by the factor. + * @return {Number} - The next backoff time. + */ +Backoff.prototype.next = function () { + var next = this.time * this.factor; + + if (next > this.max) { + this.time = this.max; + return this.max; + } + + this.time = next; + + return this.time; +}; + +/** + * Resets the backoff state to it's original condition. + * @return {Backoff} - The context. + */ +Backoff.prototype.reset = function () { + var options = this.options; + + this.time = options.time || 50; + this.factor = options.factor || 2; + this.max = options.max || 1 * 60 * 1000; + + return this; +}; + +/** + * Schedules the next connection, according to the backoff. + * @param {Peer} peer - A peer instance. + * @return {Timeout} - The timeout value from `setTimeout`. + */ +function scheduleReconnect (peer) { + var backoff = peer.backoff; + var time = backoff.time; + backoff.next(); + + var reconnect = peer.connect.bind(peer); + + return setTimeout(reconnect, time); +} + +/** + * Handles reconnections and defers messages until the socket is ready. + * @param {String} url - The address to connect to. + * @param {Object} [options] - Override how the socket is managed. + * @param {Object} options.backoff - Backoff options (see the constructor). + * @class + */ +function Peer (url, options) { + if (!(this instanceof Peer)) { + return new Peer(url, options); + } + + // Extend EventEmitter. + Emitter.call(this); + this.setMaxListeners(Infinity); + + this.options = options || {}; + + // Messages sent before the socket is ready. + this.deferredMsgs = []; + + this.url = Peer.formatURL(url); + this.backoff = new Backoff(this.options.backoff); + + // Set up the websocket. + this.connect(); + + var peer = this; + var reconnect = scheduleReconnect.bind(null, peer); + + // Handle reconnection. + this.on('close', reconnect); + this.on('error', function (error) { + if (error.code === 'ECONNREFUSED') { + reconnect(); + } + }); + + // Send deferred messages. + this.on('open', function () { + peer.drainQueue(); + peer.backoff.reset(); + }); + +} + +/** + * Turns http URLs into WebSocket URLs. + * @param {String} url - The url to format. + * @return {String} - A correctly formatted WebSocket URL. + */ +Peer.formatURL = function (url) { + + // Works for `https` and `wss` URLs, too. + return url.replace('http', 'ws'); +}; + +util.inherits(Peer, Emitter); +var API = Peer.prototype; + +/** + * Attempts a websocket connection. + * @return {WebSocket} - The new websocket instance. + */ +API.connect = function () { + var url = this.url; + + // Open a new websocket. + var socket = new WebSocket(url); + + // Re-use the previous listeners. + socket._events = this._events; + + this.socket = socket; + + return socket; +}; + +/** + * Sends all the messages in the deferred queue. + * @return {Peer} - The context. + */ +API.drainQueue = function () { + var peer = this; + + this.deferredMsgs.forEach(function (msg) { + peer.send(msg); + }); + + // Reset the queue. + this.deferredMsgs = []; + + return this; +}; + +/** + * Send data through the socket, or add it to a queue + * of deferred messages if it's not ready yet. + * @param {Mixed} msg - String, or anything that JSON can handle. + * @return {Peer} - The context. + */ +API.send = function (msg) { + var socket = this.socket; + var state = socket.readyState; + var ready = socket.OPEN; + + // Make sure it's a string. + if (typeof msg !== 'string') { + msg = JSON.stringify(msg); + } + + // Make sure the socket is ready. + if (state === ready) { + socket.send(msg); + } else { + this.deferredMsgs.push(msg); + } + + return this; +}; + +module.exports = Peer; diff --git a/lib/wsp/Pool.js b/lib/wsp/Pool.js new file mode 100644 index 00000000..542c0482 --- /dev/null +++ b/lib/wsp/Pool.js @@ -0,0 +1,101 @@ +'use strict'; + +/** + * Simpler interface over a collection of sockets. Works with + * WebSocket clients, or sockets from a WebSocket server. + * @class + */ +function Pool () { + if (!(this instanceof Pool)) { + return new Pool(); + } + + // Maps IDs to sockets. + this.sockets = {}; +} + +var API = Pool.prototype; + +/** + * Returns the socket by the given ID. + * @param {String} id - The unique socket ID. + * @return {WebSocket|Null} - The WebSocket, if found. + */ +API.get = function (id) { + return this.sockets[id] || null; +}; + +/** + * Adds a socket to the pool. + * @param {String} id - The socket ID. + * @param {WebSocket} socket - A websocket instance. + * @return {Pool} - The context. + */ +API.add = function (id, socket) { + this.sockets[id] = socket; + + return this; +}; + +/** + * Removes a socket from the pool. + * @param {String} id - The ID of the socket to remove. + * @return {Boolean} - Whether the pool contained the socket. + */ +API.remove = function (id) { + var sockets = this.sockets; + var hasSocket = sockets.hasOwnProperty(id); + + if (hasSocket) { + delete sockets[id]; + } + + return hasSocket; +}; + +/** + * Creates a filtered pool of sockets. Works the same as Array#filter. + * @param {Function} fn - Called for each socket in the pool. + * @param {Mixed} [_this] - The `this` context to use when invoking + * the callback. + * @return {Pool} - A new, filtered socket pool. + */ +API.filter = function (fn, _this) { + var filtered = Pool(); + var pool = this; + + _this = _this || pool; + + Object.keys(this.sockets).forEach(function (id) { + var socket = pool.sockets[id]; + + var shouldAdd = fn.call(_this, socket, id, pool); + + // Add it to the new pool. + if (shouldAdd) { + filtered.add(id, socket); + } + }); + + return filtered; +}; + +/** + * Send a message through each socket in the pool. + * @param {String} msg - The message to send. + * @return {Number} - How many sockets the message was sent to. + */ +API.send = function (msg) { + var pool = this; + + var ids = Object.keys(this.sockets); + + ids.forEach(function (id) { + var socket = pool.sockets[id]; + socket.send(msg); + }); + + return ids.length; +}; + +module.exports = Pool; diff --git a/lib/wsp/client.js b/lib/wsp/client.js new file mode 100644 index 00000000..80b408f6 --- /dev/null +++ b/lib/wsp/client.js @@ -0,0 +1,98 @@ +/* + eslint-disable + no-warning-comments, + no-underscore-dangle, +*/ +'use strict'; + +var Gun = require('../../gun'); +var Socket = require('./Peer'); +var Pool = require('./Pool'); + +// Maps URLs to sockets. +// Shared between all gun instances. +var sockets = Pool(); +var sid = Gun.text.random(); + +/** + * Take a map of URLs pointing to options and ensure the + * urls are using the WS protocol. + * @param {Object} peers - Any object with URLs as keys. + * @return {Object} - Object with normalized URL keys. + */ +function normalizeURLs (peers) { + var formatted = {}; + + Object.keys(peers).forEach(function (url) { + var options = peers[url]; + var id = Socket.formatURL(url); + formatted[id] = options; + }); + + return formatted; +} + +/** + * Turns a map of URLs into a socket pool. + * @param {Object} peers - Any object with URLs as keys. + * @return {Pool} - A pool of sockets corresponding to the URLs. + */ +function getSocketSubset (peers) { + var urls = normalizeURLs(peers); + + return sockets.filter(function (socket) { + return urls.hasOwnProperty(socket.url); + }); +} + +Gun.on('out', function (ctx) { + var gun = ctx.gun; + var opt = ctx.opt || {}; + var peers = opt.peers || gun.Back('opt.peers'); + + if (!peers) { + return; + } + + var subset = getSocketSubset(peers); + + subset.send({ + headers: { 'gun-sid': sid }, + body: ctx, + }); +}); + +// Open any new sockets listed, +// adding them to the global pool. +Gun.on('opt', function (context) { + var gun = context.gun; + var root = gun.Back(Infinity); + + var peers = gun.Back('opt.peers') || {}; + + Gun.obj.map(peers, function (options, url) { + if (sockets[url]) { + return; + } + + var socket = Socket(url, options); + sockets.add(url, socket); + + socket.on('message', function (msg) { + var request; + + try { + request = JSON.parse(msg); + } catch (error) { + return; + } + + // Validate the request. + if (!request || !request.body) { + return; + } + + root.on('in', request.body); + }); + }); +}); diff --git a/lib/wsp/duplicate.js b/lib/wsp/duplicate.js new file mode 100644 index 00000000..9080fee7 --- /dev/null +++ b/lib/wsp/duplicate.js @@ -0,0 +1,90 @@ +'use strict'; + +var Gun = require('../../gun'); + +var cache = {}; +var timeout = null; + +/** + * Remove all entries in the cache older than 5 minutes. + * Reschedules itself to run again when the oldest item + * might be too old. + * @return {undefined} + */ +function gc () { + var now = Date.now(); + var oldest = now; + var maxAge = 5 * 60 * 1000; + + Gun.obj.map(cache, function (time, id) { + oldest = Math.min(now, time); + + if ((now - time) < maxAge) { + return; + } + + delete cache[id]; + }); + + var done = Gun.obj.empty(cache); + + // Disengage GC. + if (done) { + timeout = null; + return; + } + + // Just how old? + var elapsed = now - oldest; + + // How long before it's too old? + var nextGC = maxAge - elapsed; + + // Schedule the next GC event. + timeout = setTimeout(gc, nextGC); +} + +/** + * Checks a memory-efficient cache to see if a string has been seen before. + * @param {String} id - A string to keep track of. + * @return {Boolean} - Whether it's been seen recently. + */ +function duplicate (id) { + + // Have we seen this ID recently? + var existing = cache.hasOwnProperty(id); + + // Add it to the cache. + duplicate.track(id); + + return existing; +} + +/** + * Starts tracking an ID as a possible future duplicate. + * @param {String} id - The ID to track. + * @return {String} - The same ID. + */ +duplicate.track = function (id) { + cache[id] = Date.now(); + + // Engage GC. + if (!timeout) { + gc(); + } + + return id; +}; + +/** + * Generate a new ID and start tracking it. + * @param {Number} [chars] - The number of characters to use. + * @return {String} - The newly created ID. + */ +duplicate.track.newID = function (chars) { + var id = Gun.text.random(chars); + + return duplicate.track(id); +}; + +module.exports = duplicate; diff --git a/lib/wsp/server-push.js b/lib/wsp/server-push.js new file mode 100644 index 00000000..727f8d1e --- /dev/null +++ b/lib/wsp/server-push.js @@ -0,0 +1,97 @@ +'use strict'; +var Gun = require('../../gun.js'); + +/** + * Whether the gun instance is attached to a socket server. + * @param {Gun} gun - The gun instance in question. + * @param {WebSocket.Server} server - A socket server gun might be attached to. + * @return {Boolean} - Whether it's attached. + */ +function isUsingServer (gun, server) { + var servers = gun.Back(-1)._.servers; + + return servers ? servers.indexOf(server) !== -1 : false; +} + +/** + * Calls a function when (or if) a socket is ready for messages. + * @param {WebSocket} socket - A websocket connection. + * @param {Function} cb - Called if or when the socket is ready. + * @return {Boolean} - Whether the socket is able to take messages. + */ +function ready (socket, cb) { + var state = socket.readyState; + + // The socket is ready. + if (state === socket.OPEN) { + cb(); + return true; + } + + // Still opening. + if (state === socket.OPENING) { + socket.once('open', cb); + } + + // Nope, closing or closed. + return false; +} + +/** + * Send a message to a group of clients. + * @param {Obejct} msg - An http envelope-like message. + * @param {Object} clients - IDs mapped to socket instances. + * @return {undefined} + */ +function send (msg, clients) { + Gun.obj.map(clients, function (client) { + ready(client, function () { + var serialized = JSON.stringify(msg); + client.send(serialized); + }); + }); +} + +/** * Attaches server push middleware to gun. + * @param {Gun} gun - The gun instance to attach to. + * @param {WebSocket.Server} server - A websocket server instance. + * @return {server} - The socket server. + */ +function attach (gun, server) { + var root = gun.Back(-1); + root._.servers = root._.servers || []; + root._.servers.push(server); + var pool = {}; + var sid = Gun.text.random(); + server.on('connection', function (socket) { + socket.id = socket.id || Gun.text.random(10); + pool[socket.id] = socket; + /* + socket.on('message', function (message) { + var data = Gun.obj.ify(message); + + if (!data || !data.body) { + return; + } + root.on('in', data.body); + }); + */ + socket.once('close', function () { + delete pool[socket.id]; + }); + }); + + Gun.on('out', function (context) { + if (!isUsingServer(context.gun, server) || Gun.obj.empty(pool)) { + return; + } + + var msg = { + headers: { 'gun-sid': sid }, + body: context, + }; + send(msg, pool); + }); +} + +module.exports = attach; diff --git a/lib/wsp/server.js b/lib/wsp/server.js new file mode 100644 index 00000000..3c56a425 --- /dev/null +++ b/lib/wsp/server.js @@ -0,0 +1,244 @@ +/* eslint-disable require-jsdoc, no-underscore-dangle */ +'use strict'; +var Gun = require('../../gun'); +var http = require('../http'); +var url = require('url'); +var WS = require('ws'); +var WSS = WS.Server; +var attach = require('./server-push'); + +// Handles server to server sync. +require('./client.js'); + +Gun.on('opt', function (at) { + var gun = at.gun, opt = at.opt; + gun.__ = at.root._; + gun.__.opt.ws = opt.ws = gun.__.opt.ws || opt.ws || {}; + + function start (server, port, app) { + if (app && app.use) { + app.use(gun.wsp.server); + } + server = gun.__.opt.ws.server = gun.__.opt.ws.server || opt.ws.server || server; + + if (!gun.wsp.ws) { + gun.wsp.ws = new WSS(gun.__.opt.ws); + attach(gun, gun.wsp.ws); + } + + gun.wsp.ws = gun.wsp.ws || new WSS(gun.__.opt.ws); + require('./ws')(gun.wsp.ws, function (req, res) { + var ws = this; + req.headers['gun-sid'] = ws.sid = ws.sid ? ws.sid : req.headers['gun-sid']; + ws.sub = ws.sub || gun.wsp.on('network', function (msg, ev) { + if (!ws || !ws.send || !ws._socket || !ws._socket.writable) { return ev.off(); } + if (!msg || (msg.headers && msg.headers['gun-sid'] === ws.sid)) { return; } + if (msg && msg.headers) { delete msg.headers['ws-rid']; } + // TODO: BUG? ^ What if other peers want to ack? Do they use the ws-rid or a gun declared id? + try { ws.send(Gun.text.ify(msg)); + } catch (e) {} // juuuust in case. + }); + gun.wsp.wire(req, res); + }, {headers: {'ws-rid': 1, 'gun-sid': 1}}); + gun.__.opt.ws.port = gun.__.opt.ws.port || opt.ws.port || port || 80; + } + var wsp = gun.wsp = gun.wsp || function (server) { + if (!server) { return gun; } + if (Gun.fns.is(server.address)) { + if (server.address()) { + start(server, server.address().port); + return gun; + } + } + if (Gun.fns.is(server.get) && server.get('port')) { + start(server, server.get('port')); + return gun; + } + var listen = server.listen; + server.listen = function (port) { + var serve = listen.apply(server, arguments); + start(serve, port, server); + return serve; + }; + return gun; + }; + gun.wsp.on = gun.wsp.on || Gun.on; + gun.wsp.regex = gun.wsp.regex || opt.route || opt.path || /^\/gun/i; + gun.wsp.poll = gun.wsp.poll || opt.poll || 1; + gun.wsp.pull = gun.wsp.pull || opt.pull || gun.wsp.poll * 1000; + gun.wsp.server = gun.wsp.server || function (req, res, next) { // http + next = next || function () {}; + if (!req || !res) { return next(), false; } + if (!req.url) { return next(), false; } + if (!req.method) { return next(), false; } + var msg = {}; + msg.url = url.parse(req.url, true); + if (!gun.wsp.regex.test(msg.url.pathname)) { return next(), false; } // TODO: BUG! If the option isn't a regex then this will fail! + if (msg.url.pathname.replace(gun.wsp.regex, '').slice(0, 3).toLowerCase() === '.js') { + res.writeHead(200, {'Content-Type': 'text/javascript'}); + res.end(gun.wsp.js = gun.wsp.js || require('fs').readFileSync(__dirname + '/../../gun.js')); // gun server is caching the gun library for the client + return true; + } + + if (!req.upgrade) { + next(); + return false; + } + + return http(req, res, function (req, res) { + if (!req) { return next(); } + var stream, cb = res = require('../jsonp')(req, res); + if (req.headers && (stream = req.headers['gun-sid'])) { + stream = (gun.wsp.peers = gun.wsp.peers || {})[stream] = gun.wsp.peers[stream] || {sid: stream}; + stream.drain = stream.drain || function (res) { + if (!res || !stream || !stream.queue || !stream.queue.length) { return; } + res({headers: {'gun-sid': stream.sid}, body: stream.queue }); + stream.off = setTimeout(function () { stream = null; }, gun.wsp.pull); + stream.reply = stream.queue = null; + return true; + }; + stream.sub = stream.sub || gun.wsp.on('network', function (req, ev) { + if (!stream) { return ev.off(); } // self cleans up after itself! + if (!req || (req.headers && req.headers['gun-sid'] === stream.sid)) { return; } + (stream.queue = stream.queue || []).push(req); + stream.drain(stream.reply); + }); + cb = function (r) { (r.headers || {}).poll = gun.wsp.poll; res(r); }; + clearTimeout(stream.off); + if (req.headers.pull) { + if (stream.drain(cb)) { return; } + return stream.reply = cb; + } + } + gun.wsp.wire(req, cb); + }), true; + }; + if ((gun.__.opt.maxSockets = opt.maxSockets || gun.__.opt.maxSockets) !== false) { + require('https').globalAgent.maxSockets = require('http').globalAgent.maxSockets = gun.__.opt.maxSockets || Infinity; + } + gun.wsp.msg = gun.wsp.msg || function (id) { + if (!id) { + return gun.wsp.msg.debounce[id = Gun.text.random(9)] = Gun.time.is(), id; + } + clearTimeout(gun.wsp.msg.clear); + gun.wsp.msg.clear = setTimeout(function () { + var now = Gun.time.is(); + Gun.obj.map(gun.wsp.msg.debounce, function (t, id) { + if ((now - t) < (1000 * 60 * 5)) { return; } + Gun.obj.del(gun.wsp.msg.debounce, id); + }); + }, 500); + if (id = gun.wsp.msg.debounce[id]) { + return gun.wsp.msg.debounce[id] = Gun.time.is(), id; + } + gun.wsp.msg.debounce[id] = Gun.time.is(); + return; + }; + gun.wsp.msg.debounce = gun.wsp.msg.debounce || {}; + gun.wsp.wire = gun.wsp.wire || (function () { + // all streams, technically PATCH but implemented as + // PUT or POST, are forwarded to other trusted peers + // except for the ones that are listed in the message + // as having already been sent to. + // all states, implemented with GET, are replied to the + // source that asked for it. + function tran (req, res) { + if (!req || !res || !req.body || !req.headers) { + return; + } + if (req.url) { + req.url = url.format(req.url); + } + // var msg = req.body; + gun.on('in', req.body); + // // AUTH for non-replies. + // if(gun.wsp.msg(msg['#'])){ return } + // gun.wsp.on('network', Gun.obj.copy(req)); + // if(msg['@']){ return } // no need to process. + // if(msg['$'] && msg['$']['#']){ return tran.get(req, res) } + // //if(Gun.is.lex(msg['$'])){ return tran.get(req, res) } + // else { return tran.put(req, res) } + // cb({body: {hello: 'world'}}); + // // TODO: BUG! server put should push. + } + tran.get = function (req, cb) { + var body = req.body; + var lex = body.$; + var reply = { + headers: { 'Content-Type': tran.json }, + }; + + var graph = gun.Back(Infinity)._.graph; + var node = graph[lex['#']]; + var result = Gun.graph.ify(node); + + if (node) { + cb({ + headers: reply.headers, + body: { + '#': gun.wsp.msg(), + '@': body['#'], + '$': result, + }, + }); + + return; + } + + gun.on('out', { + gun: gun, + get: lex, + req: 1, + '#': body['#'] || Gun.on.ask(function (at, ev) { + ev.off(); + var graph = at.put; + return cb({ + headers: reply.headers, + body: { + '#': gun.wsp.msg(), + '@': body['#'], + '$': graph, + '!': at.err, + }, + }); + }), + }); + }; + + tran.put = function (req, cb) { + // NOTE: It is highly recommended you do your own PUT/POSTs + // through your own API that then saves to gun manually. + // This will give you much more fine-grain control over + // security, transactions, and what not. + var body = req.body; + var graph = body.$; + var reply = { + headers: { 'Content-Type': tran.json }, + }; + + gun.on('out', { + gun: gun, + put: graph, + '#': Gun.on.ask(function (ack, ev) { + ev.off(); + return cb({ + headers: reply.headers, + body: { + '#': gun.wsp.msg(), + '@': body['#'], + '$': ack, + '!': ack.err, + }, + }); + }), + }); + }; + + tran.json = 'application/json'; + return tran; + }()); + + if (opt.server) { + wsp(opt.server); + } +}); diff --git a/lib/ws.js b/lib/wsp/ws.js similarity index 92% rename from lib/ws.js rename to lib/wsp/ws.js index 270ea0b4..5b95932d 100644 --- a/lib/ws.js +++ b/lib/wsp/ws.js @@ -1,4 +1,4 @@ -var Gun = require('../gun') +var Gun = require('../../gun') , url = require('url'); module.exports = function(wss, server, opt){ wss.on('connection', function(ws){ @@ -27,7 +27,7 @@ module.exports = function(wss, server, opt){ (reply.headers = reply.headers || {})['ws-rid'] = msg.headers['ws-rid']; } try{ws.send(Gun.text.ify(reply)); - }catch(e){} // juuuust in case. + }catch(e){} // juuuust in case. }); }); ws.off = function(m){ diff --git a/package.json b/package.json index 18e8b3f1..b5969385 100644 --- a/package.json +++ b/package.json @@ -5,6 +5,7 @@ "main": "index.js", "scripts": { "start": "node examples/http.js 8080", + "prepublish": "uglifyjs2 gun.js -o gun.min.js -c -m", "test": "mocha", "unbuild": "node lib/unbuild.js" }, @@ -49,9 +50,10 @@ "ws": "~>1.0.1" }, "devDependencies": { - "mocha": "~>1.9.0", "express": "~>4.13.4", + "mocha": "~>1.9.0", "panic-server": "~>0.3.0", - "selenium-webdriver": "~>2.53.2" + "selenium-webdriver": "~>2.53.2", + "uglify-js2": "^2.1.11" } }