diff --git a/lib/server.js b/lib/server.js index af0f28b5..a6082d4c 100644 --- a/lib/server.js +++ b/lib/server.js @@ -3,7 +3,7 @@ var Gun = require('../gun'); console.log("TODO: MARK! UPDATE S3 DRIVER BEFORE PUBLISHING!") //require('./s3'); - require('./wsp'); + require('./wsp/server'); require('./file'); module.exports = Gun; }()); diff --git a/lib/wsp.js b/lib/wsp.js deleted file mode 100644 index 7768fa86..00000000 --- a/lib/wsp.js +++ /dev/null @@ -1,230 +0,0 @@ -;(function(wsp){ - /* - TODO: SERVER PUSH! - TODO: SERVER GET! - TODO: SERVER PUSH! - TODO: SERVER GET! - TODO: SERVER PUSH! - TODO: SERVER GET! - TODO: SERVER PUSH! - TODO: SERVER GET! - TODO: SERVER PUSH! - TODO: SERVER GET! - TODO: SERVER PUSH! - TODO: SERVER GET! - */ - var Gun = require('../gun') - , formidable = require('formidable') - , ws = require('ws').Server - , http = require('./http') - , url = require('url'); - Gun.on('opt', function(at){ - var gun = at.gun, opt = at.opt; - gun.__ = at.root._; - gun.__.opt.ws = opt.ws = gun.__.opt.ws || opt.ws || {}; - function start(server, port, app){ - if(app && app.use){ app.use(gun.wsp.server) } - server = gun.__.opt.ws.server = gun.__.opt.ws.server || opt.ws.server || server; - require('./ws')(gun.wsp.ws = gun.wsp.ws || new ws(gun.__.opt.ws), function(req, res){ - var ws = this; - req.headers['gun-sid'] = ws.sid = ws.sid? ws.sid : req.headers['gun-sid']; - ws.sub = ws.sub || gun.wsp.on('network', function(msg, ev){ - if(!ws || !ws.send || !ws._socket || !ws._socket.writable){ return ev.off() } - if(!msg || (msg.headers && msg.headers['gun-sid'] === ws.sid)){ return } - if(msg && msg.headers){ delete msg.headers['ws-rid'] } - // TODO: BUG? ^ What if other peers want to ack? Do they use the ws-rid or a gun declared id? - try{ws.send(Gun.text.ify(msg)); - }catch(e){} // juuuust in case. - }); - gun.wsp.wire(req, res); - }, {headers: {'ws-rid': 1, 'gun-sid': 1}}); - gun.__.opt.ws.port = gun.__.opt.ws.port || opt.ws.port || port || 80; - } - var wsp = gun.wsp = gun.wsp || function(server){ - if(!server){ return gun } - if(Gun.fns.is(server.address)){ - if(server.address()){ - start(server, server.address().port); - return gun; - } - } - if(Gun.fns.is(server.get) && server.get('port')){ - start(server, server.get('port')); - return gun; - } - var listen = server.listen; - server.listen = function(port){ - var serve = listen.apply(server, arguments); - start(serve, port, server); - return serve; - } - return gun; - } - gun.wsp.on = gun.wsp.on || Gun.on; - gun.wsp.regex = gun.wsp.regex || opt.route || opt.path || /^\/gun/i; - gun.wsp.poll = gun.wsp.poll || opt.poll || 1; - gun.wsp.pull = gun.wsp.pull || opt.pull || gun.wsp.poll * 1000; - gun.wsp.server = gun.wsp.server || function(req, res, next){ // http - next = next || function(){}; - if(!req || !res){ return next(), false } - if(!req.url){ return next(), false } - if(!req.method){ return next(), false } - var msg = {}; - msg.url = url.parse(req.url, true); - if(!gun.wsp.regex.test(msg.url.pathname)){ return next(), false } // TODO: BUG! If the option isn't a regex then this will fail! - if(msg.url.pathname.replace(gun.wsp.regex,'').slice(0,3).toLowerCase() === '.js'){ - res.writeHead(200, {'Content-Type': 'text/javascript'}); - res.end(gun.wsp.js = gun.wsp.js || require('fs').readFileSync(__dirname + '/../gun.js')); // gun server is caching the gun library for the client - return true; - } - if(!req.upgrade){ return next(), false } - return http(req, res, function(req, res){ - if(!req){ return next() } - var stream, cb = res = require('./jsonp')(req, res); - if(req.headers && (stream = req.headers['gun-sid'])){ - stream = (gun.wsp.peers = gun.wsp.peers || {})[stream] = gun.wsp.peers[stream] || {sid: stream}; - stream.drain = stream.drain || function(res){ - if(!res || !stream || !stream.queue || !stream.queue.length){ return } - res({headers: {'gun-sid': stream.sid}, body: stream.queue }); - stream.off = setTimeout(function(){ stream = null }, gun.wsp.pull); - stream.reply = stream.queue = null; - return true; - } - stream.sub = stream.sub || gun.wsp.on('network', function(req, ev){ - if(!stream){ return ev.off() } // self cleans up after itself! - if(!req || (req.headers && req.headers['gun-sid'] === stream.sid)){ return } - (stream.queue = stream.queue || []).push(req); - stream.drain(stream.reply); - }); - cb = function(r){ (r.headers||{}).poll = gun.wsp.poll; res(r) } - clearTimeout(stream.off); - if(req.headers.pull){ - if(stream.drain(cb)){ return } - return stream.reply = cb; - } - } - gun.wsp.wire(req, cb); - }), true; - } - if((gun.__.opt.maxSockets = opt.maxSockets || gun.__.opt.maxSockets) !== false){ - require('https').globalAgent.maxSockets = require('http').globalAgent.maxSockets = gun.__.opt.maxSockets || Infinity; - } - gun.wsp.msg = gun.wsp.msg || function(id){ - if(!id){ - return gun.wsp.msg.debounce[id = Gun.text.random(9)] = Gun.time.is(), id; - } - clearTimeout(gun.wsp.msg.clear); - gun.wsp.msg.clear = setTimeout(function(){ - var now = Gun.time.is(); - Gun.obj.map(gun.wsp.msg.debounce, function(t,id){ - if((now - t) < (1000 * 60 * 5)){ return } - Gun.obj.del(gun.wsp.msg.debounce, id); - }); - },500); - if(id = gun.wsp.msg.debounce[id]){ - return gun.wsp.msg.debounce[id] = Gun.time.is(), id; - } - gun.wsp.msg.debounce[id] = Gun.time.is(); - return; - }; - gun.wsp.msg.debounce = gun.wsp.msg.debounce || {}; - gun.wsp.wire = gun.wsp.wire || (function(){ - // all streams, technically PATCH but implemented as PUT or POST, are forwarded to other trusted peers - // except for the ones that are listed in the message as having already been sending to. - // all states, implemented with GET, are replied to the source that asked for it. - function tran(req, res){ - if(!req || !res || !req.body || !req.headers){ return } - if(req.url){ req.url = url.format(req.url) } - var msg = req.body; - // AUTH for non-replies. - if(gun.wsp.msg(msg['#'])){ return } - gun.wsp.on('network', Gun.obj.copy(req)); - if(msg['@']){ return } // no need to process. - if(msg['$'] && msg['$']['#']){ return tran.get(req, res) } - //if(Gun.is.lex(msg['$'])){ return tran.get(req, res) } - else { return tran.put(req, res) } - cb({body: {hello: 'world'}}); - // TODO: BUG! server put should push. - } - tran.get = function(req, cb){ - var body = req.body, lex = body['$'], reply = {headers: {'Content-Type': tran.json}}, opt; - gun.on('out', {gun: gun, get: lex, req: 1, '#': Gun.on.ask(function(at, ev){ - ev.off(); - var graph = at.put; - return cb({headers: reply.headers, body: { - '#': gun.wsp.msg(), - '@': body['#'], - '$': graph, - '!': at.err - }}); - return; - if(Gun.obj.empty(node)){ - return cb({headers: reply.headers, body: node}); - } // we're out of stuff! - /* - (function(chunks){ // FEATURE! Stream chunks if the nodes are large! - var max = 10, count = 0, soul = Gun.is.node.soul(node); - if(Object.keys(node).length > max){ - var n = Gun.is.node.soul.ify({}, soul); - Gun.obj.map(node, function(val, field){ - if(!(++count % max)){ - cb({headers: reply.headers, chunk: n}); // send node chunks - n = Gun.is.node.soul.ify({}, soul); - } - Gun.is.node.state.ify([n, node], field, val); - }); - if(count % max){ // finish off the last chunk - cb({headers: reply.headers, chunk: n}); - } - } else { - cb({headers: reply.headers, chunk: node}); // send full node - } - }([])); - */ - cb({headers: reply.headers, chunk: node }); // Use this if you don't want streaming chunks feature. - })}); - } - tran.put = function(req, cb){ - //console.log("tran.put", req); - // NOTE: It is highly recommended you do your own PUT/POSTs through your own API that then saves to gun manually. - // This will give you much more fine-grain control over security, transactions, and what not. - var body = req.body, graph = body['$'], reply = {headers: {'Content-Type': tran.json}}, opt; - gun.on('out', {gun: gun, put: graph, '#': Gun.on.ask(function(ack, ev){ - //Gun.on('put', {gun: gun, put: graph, '#': Gun.on.ask(function(ack, ev){ - ev.off(); - return cb({headers: reply.headers, body: { - '#': gun.wsp.msg(), - '@': body['#'], - '$': ack, - '!': ack.err - }}); - })}); - return; - if(Gun.is.graph(req.body)){ - if(req.err = Gun.union(gun, req.body, function(err, ctx){ // TODO: BUG? Probably should give me ctx.graph - if(err){ return cb({headers: reply.headers, body: {err: err || "Union failed."}}) } - var ctx = ctx || {}; ctx.graph = {}; - Gun.is.graph(req.body, function(node, soul){ - ctx.graph[soul] = gun.__.graph[soul]; - }); - (gun.__.opt.wire.put || function(g,cb){cb("No save.")})(ctx.graph, function(err, ok){ - if(err){ return cb({headers: reply.headers, body: {err: err || "Failed."}}) } // TODO: err should already be an error object? - cb({headers: reply.headers, body: {ok: ok || "Persisted."}}); - //console.log("tran.put <------------------------", ok); - }); - }).err){ cb({headers: reply.headers, body: {err: req.err || "Union failed."}}) } - } else { - cb({headers: reply.headers, body: {err: "Not a valid graph!"}}); - } - } - gun.wsp.on('network', function(req){ - // TODO: MARK! You should move the networking events to here, not in WSS only. - }); - tran.json = 'application/json'; - return tran; - }()); - if(opt.server){ - wsp(opt.server); - } - }); -}({})); diff --git a/lib/wsp/Peer.js b/lib/wsp/Peer.js new file mode 100644 index 00000000..05f510b3 --- /dev/null +++ b/lib/wsp/Peer.js @@ -0,0 +1,176 @@ +var WebSocket = require('ws'); + +/** + * Calculates backoff instances. + * @param {Object} [options] - Override the default settings. + * @param {Object} options.time=50 - Initial backoff time. + * @param {Object} options.factor=2 - How much to multiply the time by. + * @class + */ +function Backoff (options) { + this.options = options || {}; + + // Sets the initial backoff settings. + this.reset(); +} + +/** + * Increments the time by the factor. + * @return {Number} - The next backoff time. + */ +Backoff.prototype.next = function () { + this.time *= this.factor; + + return this.time; +}; + +/** + * Resets the backoff state to it's original condition. + * @return {Backoff} - The context. + */ +Backoff.prototype.reset = function () { + var options = this.options; + + this.time = options.time || 50; + this.factor = options.factor || 2; + + return this; +}; + +/** + * Create a websocket client and handle reconnect backoff logic. + * @param {String} url - A preformatted url (starts with ws://) + * @param {Object} [options] - Override how the socket is managed. + * @param {Object} options.backoff - Backoff options (see the constructor). + * @class + */ +function Peer (url, options) { + if (!(this instanceof Peer)) { + return new Peer(url, options); + } + + this.options = options || {}; + + // Messages sent while offline. + this.offline = []; + + this.url = Peer.formatURL(url); + this.backoff = new Backoff(this.options.backoff); + this.retry(url); +} + +/** + * Turns http URLs into WebSocket URLs. + * @param {String} url - The url to format. + * @return {String} - A correctly formatted WebSocket URL. + */ +Peer.formatURL = function (url) { + + // Works for `https` and `wss` URLs, too. + return url.replace('http', 'ws'); +}; + +var API = Peer.prototype; + +/** + * Attempts a websocket connection. + * @param {String} url - The websocket URL. + * @return {WebSocket} - The new websocket instance. + */ +API.retry = function () { + var url = this.url; + + var socket = new WebSocket(url); + this.socket = socket; + + this.retryOnDisconnect(socket); + + this.sendOnConnection(); + + return socket; +}; + +/** + * Sends the messages that couldn't be sent before once + * the connection is open. + * @return {Peer} - The context. + */ +API.sendOnConnection = function () { + var peer = this; + var queue = this.offline; + var socket = this.socket; + + // Wait for the socket to connect. + socket.once('open', function () { + queue.forEach(function (msg) { + socket.send(msg); + }); + + peer.offline = []; + }); + + return this; +}; + +/** + * Schedules the next retry, according to the backoff. + * @param {Peer} peer - A peer instance. + * @return {Timeout} - The timeout value from `setTimeout`. + */ +function schedule (peer) { + var backoff = peer.backoff; + var time = backoff.time; + backoff.next(); + + return setTimeout(function () { + var socket = peer.retry(); + + // Successfully reconnected? Reset the backoff. + socket.once('open', backoff.reset.bind(backoff)); + }, time); +} + +/** + * Attaches handlers to the socket, attempting reconnection + * when it's closed. + * @param {WebSocket} socket - The websocket instance to bind to. + * @return {WebSocket} - The same websocket. + */ +API.retryOnDisconnect = function (socket) { + var peer = this; + + // Listen for socket close events. + socket.once('close', function () { + schedule(peer); + }); + + socket.on('error', function (error) { + if (error.code === 'ECONNREFUSED') { + schedule(peer); + } + }); + + return socket; +}; + +/** + * Send data through the socket, or add it to a queue + * of offline requests if it's not ready yet. + * @param {String} msg - The data to send. + * @return {Peer} - The context. + */ +API.send = function (msg) { + var socket = this.socket; + var state = socket.readyState; + var ready = socket.OPEN; + + if (state === ready) { + socket.send(msg); + } else { + this.offline.push(msg); + } + + return this; +}; + +module.exports = Peer; diff --git a/lib/wsp/client.js b/lib/wsp/client.js new file mode 100644 index 00000000..d39a53df --- /dev/null +++ b/lib/wsp/client.js @@ -0,0 +1,460 @@ +/* eslint-env node*/ +/* + eslint-disable + require-jsdoc, + no-warning-comments, + no-underscore-dangle, + max-params, +*/ +'use strict'; + +var Gun = require('../../gun'); +var WS = require('ws'); + +var Tab = {}; +Tab.on = Gun.on; +Tab.peers = (function () { + + function Peer (peers) { + if (!Peer.is(this)) { + return new Peer(peers); + } + + this.peers = peers; + } + + Peer.is = function (peer) { + return peer instanceof Peer; + }; + + function map (peer, url) { + var msg = this.msg; + var opt = this.opt || {}; + opt.out = true; + Peer.request(url, msg, null, opt); + } + + Peer.prototype.send = function (msg, opt) { + Peer.request.each(this.peers, map, { + msg: msg, + opt: opt, + }); + }; + + Peer.request = (function () { + + function request (base, body, cb, opt) { + + var obj = base.length ? { base: base } : {}; + obj.base = opt.base || base; + obj.body = opt.body || body; + obj.headers = opt.headers; + obj.url = opt.url; + obj.out = opt.out; + cb = cb || function () {}; + + if (!obj.base) { + return; + } + + request.transport(obj, cb); + } + + request.createServer = function (fn) { + request.createServer.list.push(fn); + }; + + request.createServer.ing = function (req, cb) { + var index = request.createServer.list.length; + var server; + while (index) { + index -= 1; + server = request.createServer.list[index] || function () {}; + server(req, cb); + } + }; + + request.createServer.list = []; + request.back = 2; + request.backoff = 2; + + request.transport = function (opt, cb) { + if (request.ws(opt, cb)) { + return; + } + }; + + request.ws = function (opt, cb, req) { + var ws; + if (!WS) { + return false; + } + + ws = request.ws.peers[opt.base]; + if (ws) { + req = req || {}; + if (opt.headers) { + req.headers = opt.headers; + } + if (opt.body) { + req.body = opt.body; + } + + if (opt.url) { + req.url = opt.url; + } + + req.headers = req.headers || {}; + + if (!opt.out && !ws.cbs[req.headers['ws-rid']]) { + var rid = 'WS' + + new Date().getTime() + + '.' + + Math.floor((Math.random() * 65535) + 1); + + req.headers['ws-rid'] = rid; + + ws.cbs[rid] = function (err, res) { + if (!res || res.body || res.end) { + delete ws.cbs[req.headers['ws-rid']]; + } + + cb(err, res); + }; + } + + if (!ws.readyState) { + setTimeout(function () { + request.ws(opt, cb, req); + }, 100); + + return true; + } + + ws.sending = true; + ws.send(JSON.stringify(req)); + return true; + } + + if (ws === false) { + return false; + } + + var wsURL = opt.base.replace('http', 'ws'); + + ws = request.ws.peers[opt.base] = new WS(wsURL); + ws.cbs = {}; + + ws.onopen = function () { + request.back = 2; + request.ws(opt, cb); + }; + + ws.onclose = function (event) { + + if (!ws || !event) { + return; + } + + if (ws.close instanceof Function) { + ws.close(); + } + + if (!ws.sending) { + ws = request.ws.peers[opt.base] = false; + request.transport(opt, cb); + return; + } + + request.each(ws.cbs, function (cb) { + cb({ + err: 'WebSocket disconnected!', + code: ws.sending ? (ws || {}).err || event.code : -1, + }); + }); + + // This will make the next request try to reconnect + ws = request.ws.peers[opt.base] = null; + + // TODO: Have the driver handle this! + setTimeout(function () { + + // opt here is a race condition, + // is it not? Does this matter? + request.ws(opt, function () {}); + }, request.back *= request.backoff); + }; + + ws.onmessage = function (msg) { + var res; + if (!msg || !msg.data) { + return; + } + try { + res = JSON.parse(msg.data); + } catch (error) { + return; + } + if (!res) { + return; + } + res.headers = res.headers || {}; + if (res.headers['ws-rid']) { + var cb = ws.cbs[res.headers['ws-rid']] || function () {}; + cb(null, res); + return; + } + + // emit extra events. + if (res.body) { + request.createServer.ing(res, function (res) { + res.out = true; + request(opt.base, null, null, res); + }); + } + }; + + ws.onerror = function (error) { + (ws || {}).err = error; + }; + + return true; + }; + request.ws.peers = {}; + request.ws.cbs = {}; + + request.each = function (obj, cb, as) { + if (!obj || !cb) { + return; + } + + for (var key in obj) { + if (obj.hasOwnProperty(key)) { + cb.call(as, obj[key], key); + } + } + }; + + return request; + }()); + + return Peer; +}()); + +// Handle read requests. +Gun.on('get', function (at) { + var gun = at.gun; + var opt = at.opt || {}; + var peers = opt.peers || gun.Back('opt.peers'); + + if (!peers || Gun.obj.empty(peers)) { + Gun.log.once('peers', 'Warning! You have no peers to connect to!'); + at.gun.Back(-1).on('in', {'@': at['#']}); + + return; + } + + // Create a new message. + var msg = { + + // msg ID + '#': at['#'] || Gun.text.random(9), + + // msg BODY + '$': at.get, + }; + + // Listen for a response. + // TODO: ONE? PERF! Clear out listeners, maybe with setTimeout? + Tab.on(msg['#'], function (err, data) { + var obj = { + '@': at['#'], + err: err, + put: data, + }; + + if (data) { + at.gun.Back(-1).on('out', obj); + } else { + at.gun.Back(-1).on('in', obj); + } + }); + + // Broadcast to all other peers. + Tab.peers(peers).send(msg, { + headers: { + 'gun-sid': Tab.server.sid, + }, + }); +}); + +// Handle write requests. +Gun.on('put', function (at) { + if (at['@']) { + return; + } + var opt = at.gun.Back('opt') || {}, peers = opt.peers; + if (!peers || Gun.obj.empty(peers)) { + Gun.log.once('peers', 'Warning! You have no peers to save to!'); + at.gun.Back(-1).on('in', {'@': at['#']}); + return; + } + if (opt.websocket === false || (at.opt && at.opt.websocket === false)) { + return; + } + var msg = { + + // msg ID + '#': at['#'] || Gun.text.random(9), + + // msg BODY + '$': at.put, + }; + + // TODO: ONE? PERF! Clear out listeners, maybe with setTimeout? + Tab.on(msg['#'], function (err, ok) { + at.gun.Back(-1).on('in', { + '@': at['#'], + err: err, + ok: ok, + }); + }); + + Tab.peers(peers).send(msg, { + headers: { + 'gun-sid': Tab.server.sid, + }, + }); +}); + +// REVIEW: Do I need this on a server client? +// browser/client side Server! +// TODO: BUG! Does not respect separate instances!!! +Gun.on('opt', function (at) { + if (Tab.server) { + return; + } + + var gun = at.gun; + var server = Tab.server = Tab.server || {}; + var tmp; + + server.sid = Gun.text.random(); + + Tab.peers.request.createServer(function (req, res) { + + // Validate request. + if (!req || !res || !req.body || !req.headers) { + return; + } + + var msg = req.body; + + // AUTH for non-replies. + if (server.msg(msg['#'])) { + return; + } + + // no need to process. + if (msg['@']) { + if (Tab.ons[tmp = msg['@'] || msg['#']]) { + Tab.on(tmp, [msg['!'], msg.$]); + } + return; + } + + if (msg.$ && msg.$['#']) { + server.get(req, res); + return; + } + + server.put(req, res); + }); + + server.get = function (req, cb) { + var body = req.body; + var lex = body.$; + var graph = gun._.root._.graph; + var node; + + // Don't reply to data we don't have it in memory. + // TODO: Add localStorage? + if (!(node = graph[lex['#']])) { + return; + } + + cb({ + body: { + '#': server.msg(), + '@': body['#'], + '$': node, + }, + }); + }; + + server.put = function (req, cb) { + var body = req.body, graph = body.$; + var __ = gun._.root._; + + // filter out what we don't have in memory. + if (!(graph = Gun.obj.map(graph, function (node, soul, map) { + if (!__.path[soul]) { + return; + } + map(soul, node); + }))) { + return; + } + gun.on('out', { + gun: gun, + opt: { + websocket: false, + }, + put: graph, + '#': Gun.on.ask(function (ack, ev) { + if (!ack) { + return undefined; + } + ev.off(); + return cb({ + body: { + '#': server.msg(), + '@': body['#'], + '$': ack, + '!': ack.err, + }, + }); + }), + }); + }; + + server.msg = function (id) { + if (!id) { + id = Gun.text.random(9); + server.msg.debounce[id] = Gun.time.is(); + return id; + } + + clearTimeout(server.msg.clear); + server.msg.clear = setTimeout(function () { + var now = Gun.time.is(); + Gun.obj.map(server.msg.debounce, function (time, id) { + if ((now - time) < (1000 * 60 * 5)) { + return; + } + + Gun.obj.del(server.msg.debounce, id); + }); + }, 500); + + if (server.msg.debounce[id]) { + server.msg.debounce[id] = Gun.time.is(); + return id; + } + + server.msg.debounce[id] = Gun.time.is(); + return undefined; + }; + + server.msg.debounce = server.msg.debounce || {}; +}); diff --git a/lib/wsp/server-push.js b/lib/wsp/server-push.js new file mode 100644 index 00000000..e4c77f7e --- /dev/null +++ b/lib/wsp/server-push.js @@ -0,0 +1,156 @@ +'use strict'; +var Gun = require('../../gun.js'); + +/** + * Whether the gun instance is attached to a socket server. + * @param {Gun} gun - The gun instance in question. + * @param {WebSocket.Server} server - A socket server gun might be attached to. + * @return {Boolean} - Whether it's attached. + */ +function isUsingServer (gun, server) { + var servers = gun.Back(-1)._.servers; + + return servers ? servers.indexOf(server) !== -1 : false; +} + +/** + * Calls a function when (or if) a socket is ready for messages. + * @param {WebSocket} socket - A websocket connection. + * @param {Function} cb - Called if or when the socket is ready. + * @return {Boolean} - Whether the socket is able to take messages. + */ +function ready (socket, cb) { + var state = socket.readyState; + + // The socket is ready. + if (state === socket.OPEN) { + cb(); + return true; + } + + // Still opening. + if (state === socket.OPENING) { + socket.once('open', cb); + } + + // Nope, closing or closed. + return false; +} + +/** + * Send a request to a list of clients. + * @param {Obejct} context - A gun request context. + * @param {Object} clients - IDs mapped to socket instances. + * @param {Function} cb - Called for each response. + * @return {undefined} + */ +function request (context, clients, cb) { + Gun.obj.map(clients, function (client) { + ready(client, function () { + var msg = { + headers: {}, + body: { + '#': Gun.on.ask(cb), + '$': context.get, + }, + }; + + var serialized = JSON.stringify(msg); + client.send(serialized); + }); + }); +} + +/** + * Pushes a graph update to a collection of clients. + * @param {Object} context - The context object passed by gun. + * @param {Object} clients - An object mapping URLs to clients. + * @param {Function} cb - Invoked on each client response. + * @return {undefined} + */ +function update (context, clients, cb) { + Gun.obj.map(clients, function (client) { + ready(client, function () { + var msg = { + headers: {}, + body: { + '#': Gun.on.ask(cb), + '$': context.put, + }, + }; + + var serialized = JSON.stringify(msg); + + client.send(serialized); + }); + }); +} + +/** * Attaches server push middleware to gun. + * @param {Gun} gun - The gun instance to attach to. + * @param {WebSocket.Server} server - A websocket server instance. + * @return {server} - The socket server. + */ +function attach (gun, server) { + var root = gun.Back(-1); + root._.servers = root._.servers || []; + root._.servers.push(server); + var pool = {}; + + server.on('connection', function (socket) { + socket.id = socket.id || Gun.text.random(10); + pool[socket.id] = socket; + + socket.on('message', function (message) { + var data = Gun.obj.ify(message); + + if (!data || !data.body) { + return; + } + + var msg = data.body; + + if (msg['@']) { + Gun.on.ack(msg['@'], [msg['!'], msg.$]); + return; + } + }); + + socket.once('close', function () { + delete pool[socket.id]; + }); + }); + + Gun.on('get', function (context) { + if (!isUsingServer(context.gun, server)) { + return; + } + request(context, pool, function (err, data) { + var response = { + '@': context['#'], + put: data, + err: err, + }; + + var root = context.gun.Back(Infinity); + + root.on(data ? 'out' : 'in', response); + }); + }); + + Gun.on('put', function (context) { + if (!isUsingServer(context.gun, server)) { + return; + } + + update(context, pool, function (err, data) { + var ack = { + '!': err || null, + '$': data.$, + }; + Gun.on.ack(context, ack); + }); + }); +} + +module.exports = attach; diff --git a/lib/wsp/server.js b/lib/wsp/server.js new file mode 100644 index 00000000..b4e6fd16 --- /dev/null +++ b/lib/wsp/server.js @@ -0,0 +1,191 @@ +var Gun = require('../../gun') +, formidable = require('formidable') +, http = require('../http') +, url = require('url') +, wsp = {} +, WS = require('ws') +, WSS = WS.Server +, attach = require('./server-push'); + +// Handles server to server sync. +require('./client.js'); + +Gun.on('opt', function(at){ + var gun = at.gun, opt = at.opt; + gun.__ = at.root._; + gun.__.opt.ws = opt.ws = gun.__.opt.ws || opt.ws || {}; + + function start(server, port, app){ + if(app && app.use){ app.use(gun.wsp.server) } + server = gun.__.opt.ws.server = gun.__.opt.ws.server || opt.ws.server || server; + + if (!gun.wsp.ws) { + gun.wsp.ws = new WSS(gun.__.opt.ws); + attach(gun, gun.wsp.ws); + } + + gun.wsp.ws = gun.wsp.ws || new WSS(gun.__.opt.ws); + require('./ws')(gun.wsp.ws, function(req, res){ + var ws = this; + req.headers['gun-sid'] = ws.sid = ws.sid? ws.sid : req.headers['gun-sid']; + ws.sub = ws.sub || gun.wsp.on('network', function(msg, ev){ + if(!ws || !ws.send || !ws._socket || !ws._socket.writable){ return ev.off() } + if(!msg || (msg.headers && msg.headers['gun-sid'] === ws.sid)){ return } + if(msg && msg.headers){ delete msg.headers['ws-rid'] } + // TODO: BUG? ^ What if other peers want to ack? Do they use the ws-rid or a gun declared id? + try{ws.send(Gun.text.ify(msg)); + }catch(e){} // juuuust in case. + }); + gun.wsp.wire(req, res); + }, {headers: {'ws-rid': 1, 'gun-sid': 1}}); + gun.__.opt.ws.port = gun.__.opt.ws.port || opt.ws.port || port || 80; + } + var wsp = gun.wsp = gun.wsp || function(server){ + if(!server){ return gun } + if(Gun.fns.is(server.address)){ + if(server.address()){ + start(server, server.address().port); + return gun; + } + } + if(Gun.fns.is(server.get) && server.get('port')){ + start(server, server.get('port')); + return gun; + } + var listen = server.listen; + server.listen = function(port){ + var serve = listen.apply(server, arguments); + start(serve, port, server); + return serve; + } + return gun; + } + gun.wsp.on = gun.wsp.on || Gun.on; + gun.wsp.regex = gun.wsp.regex || opt.route || opt.path || /^\/gun/i; + gun.wsp.poll = gun.wsp.poll || opt.poll || 1; + gun.wsp.pull = gun.wsp.pull || opt.pull || gun.wsp.poll * 1000; + gun.wsp.server = gun.wsp.server || function(req, res, next){ // http + next = next || function(){}; + if(!req || !res){ return next(), false } + if(!req.url){ return next(), false } + if(!req.method){ return next(), false } + var msg = {}; + msg.url = url.parse(req.url, true); + if(!gun.wsp.regex.test(msg.url.pathname)){ return next(), false } // TODO: BUG! If the option isn't a regex then this will fail! + if(msg.url.pathname.replace(gun.wsp.regex,'').slice(0,3).toLowerCase() === '.js'){ + res.writeHead(200, {'Content-Type': 'text/javascript'}); + res.end(gun.wsp.js = gun.wsp.js || require('fs').readFileSync(__dirname + '/../../gun.js')); // gun server is caching the gun library for the client + return true; + } + + if(!req.upgrade){ + next(); + return false; + } + + return http(req, res, function(req, res){ + if(!req){ return next() } + var stream, cb = res = require('../jsonp')(req, res); + if(req.headers && (stream = req.headers['gun-sid'])){ + stream = (gun.wsp.peers = gun.wsp.peers || {})[stream] = gun.wsp.peers[stream] || {sid: stream}; + stream.drain = stream.drain || function(res){ + if(!res || !stream || !stream.queue || !stream.queue.length){ return } + res({headers: {'gun-sid': stream.sid}, body: stream.queue }); + stream.off = setTimeout(function(){ stream = null }, gun.wsp.pull); + stream.reply = stream.queue = null; + return true; + } + stream.sub = stream.sub || gun.wsp.on('network', function(req, ev){ + if(!stream){ return ev.off() } // self cleans up after itself! + if(!req || (req.headers && req.headers['gun-sid'] === stream.sid)){ return } + (stream.queue = stream.queue || []).push(req); + stream.drain(stream.reply); + }); + cb = function(r){ (r.headers||{}).poll = gun.wsp.poll; res(r) } + clearTimeout(stream.off); + if(req.headers.pull){ + if(stream.drain(cb)){ return } + return stream.reply = cb; + } + } + gun.wsp.wire(req, cb); + }), true; + } + if((gun.__.opt.maxSockets = opt.maxSockets || gun.__.opt.maxSockets) !== false){ + require('https').globalAgent.maxSockets = require('http').globalAgent.maxSockets = gun.__.opt.maxSockets || Infinity; + } + gun.wsp.msg = gun.wsp.msg || function(id){ + if(!id){ + return gun.wsp.msg.debounce[id = Gun.text.random(9)] = Gun.time.is(), id; + } + clearTimeout(gun.wsp.msg.clear); + gun.wsp.msg.clear = setTimeout(function(){ + var now = Gun.time.is(); + Gun.obj.map(gun.wsp.msg.debounce, function(t,id){ + if((now - t) < (1000 * 60 * 5)){ return } + Gun.obj.del(gun.wsp.msg.debounce, id); + }); + },500); + if(id = gun.wsp.msg.debounce[id]){ + return gun.wsp.msg.debounce[id] = Gun.time.is(), id; + } + gun.wsp.msg.debounce[id] = Gun.time.is(); + return; + }; + gun.wsp.msg.debounce = gun.wsp.msg.debounce || {}; + gun.wsp.wire = gun.wsp.wire || (function(){ + // all streams, technically PATCH but implemented as PUT or POST, are forwarded to other trusted peers + // except for the ones that are listed in the message as having already been sending to. + // all states, implemented with GET, are replied to the source that asked for it. + function tran(req, res){ + if(!req || !res || !req.body || !req.headers){ return } + if(req.url){ req.url = url.format(req.url) } + var msg = req.body; + // AUTH for non-replies. + if(gun.wsp.msg(msg['#'])){ return } + gun.wsp.on('network', Gun.obj.copy(req)); + if(msg['@']){ return } // no need to process. + if(msg['$'] && msg['$']['#']){ return tran.get(req, res) } + //if(Gun.is.lex(msg['$'])){ return tran.get(req, res) } + else { return tran.put(req, res) } + cb({body: {hello: 'world'}}); + // TODO: BUG! server put should push. + } + tran.get = function(req, cb){ + var body = req.body, lex = body['$'], reply = {headers: {'Content-Type': tran.json}}, opt; + gun.on('out', {gun: gun, get: lex, req: 1, '#': Gun.on.ask(function(at, ev){ + ev.off(); + var graph = at.put; + return cb({headers: reply.headers, body: { + '#': gun.wsp.msg(), + '@': body['#'], + '$': graph, + '!': at.err + }}); + })}); + } + tran.put = function(req, cb){ + // NOTE: It is highly recommended you do your own PUT/POSTs through your own API that then saves to gun manually. + // This will give you much more fine-grain control over security, transactions, and what not. + var body = req.body, graph = body['$'], reply = {headers: {'Content-Type': tran.json}}, opt; + gun.on('out', {gun: gun, put: graph, '#': Gun.on.ask(function(ack, ev){ + //Gun.on('put', {gun: gun, put: graph, '#': Gun.on.ask(function(ack, ev){ + ev.off(); + return cb({headers: reply.headers, body: { + '#': gun.wsp.msg(), + '@': body['#'], + '$': ack, + '!': ack.err + }}); + })}); + } + gun.wsp.on('network', function(rq){ + // TODO: MARK! You should move the networking events to here, not in WSS only. + }); + tran.json = 'application/json'; + return tran; + }()); + if(opt.server){ + wsp(opt.server); + } +}); diff --git a/lib/ws.js b/lib/wsp/ws.js similarity index 92% rename from lib/ws.js rename to lib/wsp/ws.js index 270ea0b4..45dc100a 100644 --- a/lib/ws.js +++ b/lib/wsp/ws.js @@ -1,4 +1,4 @@ -var Gun = require('../gun') +var Gun = require('../../gun') , url = require('url'); module.exports = function(wss, server, opt){ wss.on('connection', function(ws){ @@ -27,7 +27,7 @@ module.exports = function(wss, server, opt){ (reply.headers = reply.headers || {})['ws-rid'] = msg.headers['ws-rid']; } try{ws.send(Gun.text.ify(reply)); - }catch(e){} // juuuust in case. + }catch(e){} // juuuust in case. }); }); ws.off = function(m){