From b352cd742aca4f6eb276529f2b903daafe4f4e07 Mon Sep 17 00:00:00 2001 From: Jesse Gibson Date: Thu, 1 Dec 2016 16:51:35 -0700 Subject: [PATCH] Update client WebSocket plugin Lovely little abstraction layer over websockets sends messages using gun's new envelope system. Exponential backoff has not been implemented yet. If the socket fails to connect, it won't retry (yet). Also, this update left some dead code with the jsonp implementation. Soon that should be ported over to the new envelope system too, but now it's disconnected from gun. --- gun.js | 331 +++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 226 insertions(+), 105 deletions(-) diff --git a/gun.js b/gun.js index 00c635e2..304d4659 100644 --- a/gun.js +++ b/gun.js @@ -932,7 +932,7 @@ return gun; } function output(at){ - var cat = this, gun = cat.gun, tmp; + var cat = this, gun = cat.gun, tmp; // TODO: BUG! Outgoing `get` to read from in memory!!! if(at.get && get(at, cat)){ return } //if(at.put){ @@ -1431,8 +1431,8 @@ return gun; } function output(at){ - var cat = this, gun = cat.gun, root = gun.Back(-1), put, get, tmp; - if(!at.gun){ + var cat = this, gun = cat.gun, root = gun.Back(-1), put, get, tmp; + if(!at.gun){ at.gun = gun; } if(at.get && !at.get[_soul]){ @@ -1746,7 +1746,7 @@ via: at, put: at.put[cat.get] } - + } else { if(obj_has(at.put, cat.get)){ return ev.off() } at = { @@ -2174,7 +2174,7 @@ gun.Back(-1).on('in', {'@': at['#']}); } return; - } + } if(Gun.obj.has(lex, '.')){var tmp = data[lex['.']];data = {_: data._};if(u !== tmp){data[lex['.']] = tmp}} //console.log('@@@@@@@@@@@@local get', data, at); gun.Back(-1).on('in', {'@': at['#'], put: Gun.graph.node(data)}); @@ -2184,83 +2184,223 @@ Gun.on('get', get); })(require, './adapters/localStorage'); - ;require(function(module){ - function r(base, body, cb, opt){ - var o = base.length? {base: base} : {}; - o.base = opt.base || base; - o.body = opt.body || body; - o.headers = opt.headers; - o.url = opt.url; - o.out = opt.out; - cb = cb || function(){}; - if(!o.base){ return } - r.transport(o, cb); - } - r.createServer = function(fn){ r.createServer.s.push(fn) } - r.createServer.ing = function(req, cb){ - var i = r.createServer.s.length; - while(i--){ (r.createServer.s[i] || function(){})(req, cb) } - } - r.createServer.s = []; - r.back = 2; r.backoff = 2; - r.transport = function(opt, cb){ - //Gun.log("TRANSPORT:", opt); - if(r.ws(opt, cb)){ return } - r.jsonp(opt, cb); - } - r.ws = function(opt, cb, req){ - var ws, WS = window.WebSocket || window.mozWebSocket || window.webkitWebSocket; - if(!WS){ return } - if(ws = r.ws.peers[opt.base]){ - req = req || {}; - if(opt.headers){ req.headers = opt.headers } - if(opt.body){ req.body = opt.body } - if(opt.url){ req.url = opt.url } - req.headers = req.headers || {}; - if(!opt.out && !ws.cbs[req.headers['ws-rid']]){ - ws.cbs[req.headers['ws-rid'] = 'WS' + (+ new Date()) + '.' + Math.floor((Math.random()*65535)+1)] = function(err,res){ - if(!res || res.body || res.end){ delete ws.cbs[req.headers['ws-rid']] } - cb(err,res); - } - } - if(!ws.readyState){ return setTimeout(function(){ r.ws(opt, cb, req) },100), true } - ws.sending = true; - ws.send(JSON.stringify(req)); - return true; - } - if(ws === false){ return } - (ws = r.ws.peers[opt.base] = new WS(opt.base.replace('http','ws'))).cbs = {}; - ws.onopen = function(o){ r.back = 2; r.ws(opt, cb) }; - ws.onclose = window.onbeforeunload = function(c){ - if(!ws || !c){ return } - if(ws.close instanceof Function){ ws.close() } - if(!ws.sending){ - ws = r.ws.peers[opt.base] = false; - return r.transport(opt, cb); - } - r.each(ws.cbs, function(cb){ - cb({err: "WebSocket disconnected!", code: !ws.sending? -1 : (ws||{}).err || c.code}); - }); - ws = r.ws.peers[opt.base] = null; // this will make the next request try to reconnect - setTimeout(function(){ // TODO: Have the driver handle this! - r.ws(opt, function(){}); // opt here is a race condition, is it not? Does this matter? - }, r.back *= r.backoff); - }; - ws.onmessage = function(m){ var res; - if(!m || !m.data){ return } - try{res = JSON.parse(m.data); - }catch(e){ return } - if(!res){ return } - res.headers = res.headers || {}; - if(res.headers['ws-rid']){ return (ws.cbs[res.headers['ws-rid']]||function(){})(null, res) } - if(res.body){ r.createServer.ing(res, function(res){ res.out = true; r(opt.base, null, null, res)}) } // emit extra events. - }; - ws.onerror = function(e){ (ws||{}).err = e }; - return true; - } - r.ws.peers = {}; - r.ws.cbs = {}; - r.jsonp = function(opt, cb){ + ;require(function(module){ + /* eslint-enable */ + /* eslint-env browser */ + /* eslint-disable require-jsdoc */ + /* eslint "comma-dangle": ["error", "never"] */ + var Gun = require('./gun'); + + function Client (url) { + if (!(this instanceof Client)) { + return new Client(url); + } + + this.url = Client.formatURL(url); + this.socket = null; + this.queue = []; + this.sid = Gun.text.random(10); + + this.on = Gun.on; + } + + Client.prototype = { + constructor: Client, + + drainQueue: function () { + var queue = this.queue; + var client = this; + + // Reset the queue. + this.queue = []; + + // Send each message. + queue.forEach(function (msg) { + client.send(msg); + }); + + return queue.length; + }, + + connect: function () { + var client = this; + var socket = new Client.WebSocket(this.url); + this.socket = socket; + + // Forward messages into the emitter. + socket.addEventListener('message', function (msg) { + client.on('message', msg); + }); + + // Send the messages in the queue. + this.ready(function () { + client.drainQueue(); + }); + + return socket; + }, + + isClosed: function () { + var socket = this.socket; + + if (!socket) { + return true; + } + + var state = socket.readyState; + + if (state === socket.CLOSING || state === socket.CLOSED) { + return true; + } + + return false; + }, + + ready: function (callback) { + var socket = this.socket; + var state = socket.readyState; + + if (state === socket.OPEN) { + callback(); + return; + } + + if (state === socket.CONNECTING) { + socket.addEventListener('open', callback); + } + }, + + send: function (msg) { + if (this.isClosed()) { + this.queue.push(msg); + + // Will send once connected. + this.connect(); + return false; + } + + var socket = this.socket; + + // Make sure the socket is open. + this.ready(function () { + socket.send(msg); + }); + + return true; + } + }; + + if (typeof window !== 'undefined') { + Client.WebSocket = window.WebSocket || + window.webkitWebSocket || + window.mozWebSocket || + null; + } + + Client.isSupported = Client.WebSocket !== null; + + // Ensure the protocol is correct. + Client.formatURL = function (url) { + return url.replace('http', 'ws'); + }; + + // Send a message to a group of peers. + Client.broadcast = function (urls, msg) { + var pool = Client.pool; + msg.headers = msg.headers || {}; + + Gun.obj.map(urls, function (options, addr) { + + var url = Client.formatURL(addr); + + var peer = pool[url]; + + var envelope = { + headers: Gun.obj.to(msg.headers, { + 'gun-sid': peer.sid + }), + body: msg.body + }; + + var serialized = Gun.text.ify(envelope); + + peer.send(serialized); + }); + + }; + + // A map of URLs to client instances. + Client.pool = {}; + + // Close all WebSockets when the window closes. + if (typeof window !== 'undefined') { + window.addEventListener('unload', function () { + Gun.obj.map(Client.pool, function (client) { + if (client.isClosed()) { + return; + } + + client.socket.close(); + }); + }); + } + + // Define client instances as gun needs them. + // Sockets will not be opened until absolutely necessary. + Gun.on('opt', function (ctx) { + var gun = ctx.gun; + var peers = gun.Back('opt.peers') || {}; + + Gun.obj.map(peers, function (options, addr) { + var url = Client.formatURL(addr); + + // Ignore clients we've seen before. + if (Client.pool.hasOwnProperty(url)) { + return; + } + + var client = Client.pool[url] = new Client(url); + + // Listen to incoming messages. + client.on('message', function (msg) { + var data; + + try { + data = Gun.obj.ify(msg.data); + } catch (err) { + // Invalid message, discard it. + return; + } + + if (!data || !data.body) { + return; + } + + gun.on('in', data); + }); + }); + }); + + // Broadcast the messages. + Gun.on('out', function (ctx) { + var gun = ctx.gun; + + var peers = gun.Back('opt.peers') || {}; + + // Validate. + if (Gun.obj.empty(peers) || !WebSocket) { + return; + } + + var msg = Gun.text.ify(ctx); + + if (Client.isSupported) { + Client.broadcast(peers, { body: msg }); + } + }); + + function r () {} + r.jsonp = function(opt, cb){ r.jsonp.ify(opt, function(url){ if(!url){ return } r.jsonp.send(url, function(err, reply){ @@ -2341,29 +2481,10 @@ } } module.exports = r; - })(require, './polyfill/request'); + /* eslint-disable */ + })(require, './polyfill/request'); - ;require(function(module){ - P.request = require('./request'); - function P(p){ - if(!P.is(this)){ return new P(p) } - this.peers = p; - } - P.is = function(p){ return (p instanceof P) } - P.chain = P.prototype; - function map(peer, url){ - var msg = this.msg; - var opt = this.opt || {}; - opt.out = true; - P.request(url, msg, null, opt); - } - P.chain.send = function(msg, opt){ - P.request.each(this.peers, map, {msg: msg, opt: opt}); - } - module.exports = P; - })(require, './polyfill/peer'); - - ;require(function(module){ + ;require(function(module){ if(typeof JSON === 'undefined'){ throw new Error("Include JSON first: ajax.cdnjs.com/ajax/libs/json2/20110223/json2.js") } // for old IE use if(typeof Gun === 'undefined'){ return } // TODO: window.Websocket is Browser only. But it would be nice if it could somehow merge it with lib/WSP? @@ -2486,4 +2607,4 @@ })(require, './adapters/wsp'); -}()); \ No newline at end of file +}());