mirror of
https://github.com/amark/gun.git
synced 2025-06-24 15:02:33 +00:00
Merge pull request #267 from PsychoLlama/s2s-sync
Add server-to-server sync
This commit is contained in:
commit
14482abf0c
@ -3,7 +3,7 @@
|
||||
var Gun = require('../gun');
|
||||
console.log("TODO: MARK! UPDATE S3 DRIVER BEFORE PUBLISHING!")
|
||||
//require('./s3');
|
||||
require('./wsp');
|
||||
require('./wsp/server');
|
||||
require('./file');
|
||||
module.exports = Gun;
|
||||
}());
|
||||
|
230
lib/wsp.js
230
lib/wsp.js
@ -1,230 +0,0 @@
|
||||
;(function(wsp){
|
||||
/*
|
||||
TODO: SERVER PUSH!
|
||||
TODO: SERVER GET!
|
||||
TODO: SERVER PUSH!
|
||||
TODO: SERVER GET!
|
||||
TODO: SERVER PUSH!
|
||||
TODO: SERVER GET!
|
||||
TODO: SERVER PUSH!
|
||||
TODO: SERVER GET!
|
||||
TODO: SERVER PUSH!
|
||||
TODO: SERVER GET!
|
||||
TODO: SERVER PUSH!
|
||||
TODO: SERVER GET!
|
||||
*/
|
||||
var Gun = require('../gun')
|
||||
, formidable = require('formidable')
|
||||
, ws = require('ws').Server
|
||||
, http = require('./http')
|
||||
, url = require('url');
|
||||
Gun.on('opt', function(at){
|
||||
var gun = at.gun, opt = at.opt;
|
||||
gun.__ = at.root._;
|
||||
gun.__.opt.ws = opt.ws = gun.__.opt.ws || opt.ws || {};
|
||||
function start(server, port, app){
|
||||
if(app && app.use){ app.use(gun.wsp.server) }
|
||||
server = gun.__.opt.ws.server = gun.__.opt.ws.server || opt.ws.server || server;
|
||||
require('./ws')(gun.wsp.ws = gun.wsp.ws || new ws(gun.__.opt.ws), function(req, res){
|
||||
var ws = this;
|
||||
req.headers['gun-sid'] = ws.sid = ws.sid? ws.sid : req.headers['gun-sid'];
|
||||
ws.sub = ws.sub || gun.wsp.on('network', function(msg, ev){
|
||||
if(!ws || !ws.send || !ws._socket || !ws._socket.writable){ return ev.off() }
|
||||
if(!msg || (msg.headers && msg.headers['gun-sid'] === ws.sid)){ return }
|
||||
if(msg && msg.headers){ delete msg.headers['ws-rid'] }
|
||||
// TODO: BUG? ^ What if other peers want to ack? Do they use the ws-rid or a gun declared id?
|
||||
try{ws.send(Gun.text.ify(msg));
|
||||
}catch(e){} // juuuust in case.
|
||||
});
|
||||
gun.wsp.wire(req, res);
|
||||
}, {headers: {'ws-rid': 1, 'gun-sid': 1}});
|
||||
gun.__.opt.ws.port = gun.__.opt.ws.port || opt.ws.port || port || 80;
|
||||
}
|
||||
var wsp = gun.wsp = gun.wsp || function(server){
|
||||
if(!server){ return gun }
|
||||
if(Gun.fns.is(server.address)){
|
||||
if(server.address()){
|
||||
start(server, server.address().port);
|
||||
return gun;
|
||||
}
|
||||
}
|
||||
if(Gun.fns.is(server.get) && server.get('port')){
|
||||
start(server, server.get('port'));
|
||||
return gun;
|
||||
}
|
||||
var listen = server.listen;
|
||||
server.listen = function(port){
|
||||
var serve = listen.apply(server, arguments);
|
||||
start(serve, port, server);
|
||||
return serve;
|
||||
}
|
||||
return gun;
|
||||
}
|
||||
gun.wsp.on = gun.wsp.on || Gun.on;
|
||||
gun.wsp.regex = gun.wsp.regex || opt.route || opt.path || /^\/gun/i;
|
||||
gun.wsp.poll = gun.wsp.poll || opt.poll || 1;
|
||||
gun.wsp.pull = gun.wsp.pull || opt.pull || gun.wsp.poll * 1000;
|
||||
gun.wsp.server = gun.wsp.server || function(req, res, next){ // http
|
||||
next = next || function(){};
|
||||
if(!req || !res){ return next(), false }
|
||||
if(!req.url){ return next(), false }
|
||||
if(!req.method){ return next(), false }
|
||||
var msg = {};
|
||||
msg.url = url.parse(req.url, true);
|
||||
if(!gun.wsp.regex.test(msg.url.pathname)){ return next(), false } // TODO: BUG! If the option isn't a regex then this will fail!
|
||||
if(msg.url.pathname.replace(gun.wsp.regex,'').slice(0,3).toLowerCase() === '.js'){
|
||||
res.writeHead(200, {'Content-Type': 'text/javascript'});
|
||||
res.end(gun.wsp.js = gun.wsp.js || require('fs').readFileSync(__dirname + '/../gun.js')); // gun server is caching the gun library for the client
|
||||
return true;
|
||||
}
|
||||
if(!req.upgrade){ return next(), false }
|
||||
return http(req, res, function(req, res){
|
||||
if(!req){ return next() }
|
||||
var stream, cb = res = require('./jsonp')(req, res);
|
||||
if(req.headers && (stream = req.headers['gun-sid'])){
|
||||
stream = (gun.wsp.peers = gun.wsp.peers || {})[stream] = gun.wsp.peers[stream] || {sid: stream};
|
||||
stream.drain = stream.drain || function(res){
|
||||
if(!res || !stream || !stream.queue || !stream.queue.length){ return }
|
||||
res({headers: {'gun-sid': stream.sid}, body: stream.queue });
|
||||
stream.off = setTimeout(function(){ stream = null }, gun.wsp.pull);
|
||||
stream.reply = stream.queue = null;
|
||||
return true;
|
||||
}
|
||||
stream.sub = stream.sub || gun.wsp.on('network', function(req, ev){
|
||||
if(!stream){ return ev.off() } // self cleans up after itself!
|
||||
if(!req || (req.headers && req.headers['gun-sid'] === stream.sid)){ return }
|
||||
(stream.queue = stream.queue || []).push(req);
|
||||
stream.drain(stream.reply);
|
||||
});
|
||||
cb = function(r){ (r.headers||{}).poll = gun.wsp.poll; res(r) }
|
||||
clearTimeout(stream.off);
|
||||
if(req.headers.pull){
|
||||
if(stream.drain(cb)){ return }
|
||||
return stream.reply = cb;
|
||||
}
|
||||
}
|
||||
gun.wsp.wire(req, cb);
|
||||
}), true;
|
||||
}
|
||||
if((gun.__.opt.maxSockets = opt.maxSockets || gun.__.opt.maxSockets) !== false){
|
||||
require('https').globalAgent.maxSockets = require('http').globalAgent.maxSockets = gun.__.opt.maxSockets || Infinity;
|
||||
}
|
||||
gun.wsp.msg = gun.wsp.msg || function(id){
|
||||
if(!id){
|
||||
return gun.wsp.msg.debounce[id = Gun.text.random(9)] = Gun.time.is(), id;
|
||||
}
|
||||
clearTimeout(gun.wsp.msg.clear);
|
||||
gun.wsp.msg.clear = setTimeout(function(){
|
||||
var now = Gun.time.is();
|
||||
Gun.obj.map(gun.wsp.msg.debounce, function(t,id){
|
||||
if((now - t) < (1000 * 60 * 5)){ return }
|
||||
Gun.obj.del(gun.wsp.msg.debounce, id);
|
||||
});
|
||||
},500);
|
||||
if(id = gun.wsp.msg.debounce[id]){
|
||||
return gun.wsp.msg.debounce[id] = Gun.time.is(), id;
|
||||
}
|
||||
gun.wsp.msg.debounce[id] = Gun.time.is();
|
||||
return;
|
||||
};
|
||||
gun.wsp.msg.debounce = gun.wsp.msg.debounce || {};
|
||||
gun.wsp.wire = gun.wsp.wire || (function(){
|
||||
// all streams, technically PATCH but implemented as PUT or POST, are forwarded to other trusted peers
|
||||
// except for the ones that are listed in the message as having already been sending to.
|
||||
// all states, implemented with GET, are replied to the source that asked for it.
|
||||
function tran(req, res){
|
||||
if(!req || !res || !req.body || !req.headers){ return }
|
||||
if(req.url){ req.url = url.format(req.url) }
|
||||
var msg = req.body;
|
||||
// AUTH for non-replies.
|
||||
if(gun.wsp.msg(msg['#'])){ return }
|
||||
gun.wsp.on('network', Gun.obj.copy(req));
|
||||
if(msg['@']){ return } // no need to process.
|
||||
if(msg['$'] && msg['$']['#']){ return tran.get(req, res) }
|
||||
//if(Gun.is.lex(msg['$'])){ return tran.get(req, res) }
|
||||
else { return tran.put(req, res) }
|
||||
cb({body: {hello: 'world'}});
|
||||
// TODO: BUG! server put should push.
|
||||
}
|
||||
tran.get = function(req, cb){
|
||||
var body = req.body, lex = body['$'], reply = {headers: {'Content-Type': tran.json}}, opt;
|
||||
gun.on('out', {gun: gun, get: lex, req: 1, '#': Gun.on.ask(function(at, ev){
|
||||
ev.off();
|
||||
var graph = at.put;
|
||||
return cb({headers: reply.headers, body: {
|
||||
'#': gun.wsp.msg(),
|
||||
'@': body['#'],
|
||||
'$': graph,
|
||||
'!': at.err
|
||||
}});
|
||||
return;
|
||||
if(Gun.obj.empty(node)){
|
||||
return cb({headers: reply.headers, body: node});
|
||||
} // we're out of stuff!
|
||||
/*
|
||||
(function(chunks){ // FEATURE! Stream chunks if the nodes are large!
|
||||
var max = 10, count = 0, soul = Gun.is.node.soul(node);
|
||||
if(Object.keys(node).length > max){
|
||||
var n = Gun.is.node.soul.ify({}, soul);
|
||||
Gun.obj.map(node, function(val, field){
|
||||
if(!(++count % max)){
|
||||
cb({headers: reply.headers, chunk: n}); // send node chunks
|
||||
n = Gun.is.node.soul.ify({}, soul);
|
||||
}
|
||||
Gun.is.node.state.ify([n, node], field, val);
|
||||
});
|
||||
if(count % max){ // finish off the last chunk
|
||||
cb({headers: reply.headers, chunk: n});
|
||||
}
|
||||
} else {
|
||||
cb({headers: reply.headers, chunk: node}); // send full node
|
||||
}
|
||||
}([]));
|
||||
*/
|
||||
cb({headers: reply.headers, chunk: node }); // Use this if you don't want streaming chunks feature.
|
||||
})});
|
||||
}
|
||||
tran.put = function(req, cb){
|
||||
//console.log("tran.put", req);
|
||||
// NOTE: It is highly recommended you do your own PUT/POSTs through your own API that then saves to gun manually.
|
||||
// This will give you much more fine-grain control over security, transactions, and what not.
|
||||
var body = req.body, graph = body['$'], reply = {headers: {'Content-Type': tran.json}}, opt;
|
||||
gun.on('out', {gun: gun, put: graph, '#': Gun.on.ask(function(ack, ev){
|
||||
//Gun.on('put', {gun: gun, put: graph, '#': Gun.on.ask(function(ack, ev){
|
||||
ev.off();
|
||||
return cb({headers: reply.headers, body: {
|
||||
'#': gun.wsp.msg(),
|
||||
'@': body['#'],
|
||||
'$': ack,
|
||||
'!': ack.err
|
||||
}});
|
||||
})});
|
||||
return;
|
||||
if(Gun.is.graph(req.body)){
|
||||
if(req.err = Gun.union(gun, req.body, function(err, ctx){ // TODO: BUG? Probably should give me ctx.graph
|
||||
if(err){ return cb({headers: reply.headers, body: {err: err || "Union failed."}}) }
|
||||
var ctx = ctx || {}; ctx.graph = {};
|
||||
Gun.is.graph(req.body, function(node, soul){
|
||||
ctx.graph[soul] = gun.__.graph[soul];
|
||||
});
|
||||
(gun.__.opt.wire.put || function(g,cb){cb("No save.")})(ctx.graph, function(err, ok){
|
||||
if(err){ return cb({headers: reply.headers, body: {err: err || "Failed."}}) } // TODO: err should already be an error object?
|
||||
cb({headers: reply.headers, body: {ok: ok || "Persisted."}});
|
||||
//console.log("tran.put <------------------------", ok);
|
||||
});
|
||||
}).err){ cb({headers: reply.headers, body: {err: req.err || "Union failed."}}) }
|
||||
} else {
|
||||
cb({headers: reply.headers, body: {err: "Not a valid graph!"}});
|
||||
}
|
||||
}
|
||||
gun.wsp.on('network', function(req){
|
||||
// TODO: MARK! You should move the networking events to here, not in WSS only.
|
||||
});
|
||||
tran.json = 'application/json';
|
||||
return tran;
|
||||
}());
|
||||
if(opt.server){
|
||||
wsp(opt.server);
|
||||
}
|
||||
});
|
||||
}({}));
|
176
lib/wsp/Peer.js
Normal file
176
lib/wsp/Peer.js
Normal file
@ -0,0 +1,176 @@
|
||||
var WebSocket = require('ws');
|
||||
|
||||
/**
|
||||
* Calculates backoff instances.
|
||||
* @param {Object} [options] - Override the default settings.
|
||||
* @param {Object} options.time=50 - Initial backoff time.
|
||||
* @param {Object} options.factor=2 - How much to multiply the time by.
|
||||
* @class
|
||||
*/
|
||||
function Backoff (options) {
|
||||
this.options = options || {};
|
||||
|
||||
// Sets the initial backoff settings.
|
||||
this.reset();
|
||||
}
|
||||
|
||||
/**
|
||||
* Increments the time by the factor.
|
||||
* @return {Number} - The next backoff time.
|
||||
*/
|
||||
Backoff.prototype.next = function () {
|
||||
this.time *= this.factor;
|
||||
|
||||
return this.time;
|
||||
};
|
||||
|
||||
/**
|
||||
* Resets the backoff state to it's original condition.
|
||||
* @return {Backoff} - The context.
|
||||
*/
|
||||
Backoff.prototype.reset = function () {
|
||||
var options = this.options;
|
||||
|
||||
this.time = options.time || 50;
|
||||
this.factor = options.factor || 2;
|
||||
|
||||
return this;
|
||||
};
|
||||
|
||||
/**
|
||||
* Create a websocket client and handle reconnect backoff logic.
|
||||
* @param {String} url - A preformatted url (starts with ws://)
|
||||
* @param {Object} [options] - Override how the socket is managed.
|
||||
* @param {Object} options.backoff - Backoff options (see the constructor).
|
||||
* @class
|
||||
*/
|
||||
function Peer (url, options) {
|
||||
if (!(this instanceof Peer)) {
|
||||
return new Peer(url, options);
|
||||
}
|
||||
|
||||
this.options = options || {};
|
||||
|
||||
// Messages sent while offline.
|
||||
this.offline = [];
|
||||
|
||||
this.url = Peer.formatURL(url);
|
||||
this.backoff = new Backoff(this.options.backoff);
|
||||
this.retry(url);
|
||||
}
|
||||
|
||||
/**
|
||||
* Turns http URLs into WebSocket URLs.
|
||||
* @param {String} url - The url to format.
|
||||
* @return {String} - A correctly formatted WebSocket URL.
|
||||
*/
|
||||
Peer.formatURL = function (url) {
|
||||
|
||||
// Works for `https` and `wss` URLs, too.
|
||||
return url.replace('http', 'ws');
|
||||
};
|
||||
|
||||
var API = Peer.prototype;
|
||||
|
||||
/**
|
||||
* Attempts a websocket connection.
|
||||
* @param {String} url - The websocket URL.
|
||||
* @return {WebSocket} - The new websocket instance.
|
||||
*/
|
||||
API.retry = function () {
|
||||
var url = this.url;
|
||||
|
||||
var socket = new WebSocket(url);
|
||||
this.socket = socket;
|
||||
|
||||
this.retryOnDisconnect(socket);
|
||||
|
||||
this.sendOnConnection();
|
||||
|
||||
return socket;
|
||||
};
|
||||
|
||||
/**
|
||||
* Sends the messages that couldn't be sent before once
|
||||
* the connection is open.
|
||||
* @return {Peer} - The context.
|
||||
*/
|
||||
API.sendOnConnection = function () {
|
||||
var peer = this;
|
||||
var queue = this.offline;
|
||||
var socket = this.socket;
|
||||
|
||||
// Wait for the socket to connect.
|
||||
socket.once('open', function () {
|
||||
queue.forEach(function (msg) {
|
||||
socket.send(msg);
|
||||
});
|
||||
|
||||
peer.offline = [];
|
||||
});
|
||||
|
||||
return this;
|
||||
};
|
||||
|
||||
/**
|
||||
* Schedules the next retry, according to the backoff.
|
||||
* @param {Peer} peer - A peer instance.
|
||||
* @return {Timeout} - The timeout value from `setTimeout`.
|
||||
*/
|
||||
function schedule (peer) {
|
||||
var backoff = peer.backoff;
|
||||
var time = backoff.time;
|
||||
backoff.next();
|
||||
|
||||
return setTimeout(function () {
|
||||
var socket = peer.retry();
|
||||
|
||||
// Successfully reconnected? Reset the backoff.
|
||||
socket.once('open', backoff.reset.bind(backoff));
|
||||
}, time);
|
||||
}
|
||||
|
||||
/**
|
||||
* Attaches handlers to the socket, attempting reconnection
|
||||
* when it's closed.
|
||||
* @param {WebSocket} socket - The websocket instance to bind to.
|
||||
* @return {WebSocket} - The same websocket.
|
||||
*/
|
||||
API.retryOnDisconnect = function (socket) {
|
||||
var peer = this;
|
||||
|
||||
// Listen for socket close events.
|
||||
socket.once('close', function () {
|
||||
schedule(peer);
|
||||
});
|
||||
|
||||
socket.on('error', function (error) {
|
||||
if (error.code === 'ECONNREFUSED') {
|
||||
schedule(peer);
|
||||
}
|
||||
});
|
||||
|
||||
return socket;
|
||||
};
|
||||
|
||||
/**
|
||||
* Send data through the socket, or add it to a queue
|
||||
* of offline requests if it's not ready yet.
|
||||
* @param {String} msg - The data to send.
|
||||
* @return {Peer} - The context.
|
||||
*/
|
||||
API.send = function (msg) {
|
||||
var socket = this.socket;
|
||||
var state = socket.readyState;
|
||||
var ready = socket.OPEN;
|
||||
|
||||
if (state === ready) {
|
||||
socket.send(msg);
|
||||
} else {
|
||||
this.offline.push(msg);
|
||||
}
|
||||
|
||||
return this;
|
||||
};
|
||||
|
||||
module.exports = Peer;
|
460
lib/wsp/client.js
Normal file
460
lib/wsp/client.js
Normal file
@ -0,0 +1,460 @@
|
||||
/* eslint-env node*/
|
||||
/*
|
||||
eslint-disable
|
||||
require-jsdoc,
|
||||
no-warning-comments,
|
||||
no-underscore-dangle,
|
||||
max-params,
|
||||
*/
|
||||
'use strict';
|
||||
|
||||
var Gun = require('../../gun');
|
||||
var WS = require('ws');
|
||||
|
||||
var Tab = {};
|
||||
Tab.on = Gun.on;
|
||||
Tab.peers = (function () {
|
||||
|
||||
function Peer (peers) {
|
||||
if (!Peer.is(this)) {
|
||||
return new Peer(peers);
|
||||
}
|
||||
|
||||
this.peers = peers;
|
||||
}
|
||||
|
||||
Peer.is = function (peer) {
|
||||
return peer instanceof Peer;
|
||||
};
|
||||
|
||||
function map (peer, url) {
|
||||
var msg = this.msg;
|
||||
var opt = this.opt || {};
|
||||
opt.out = true;
|
||||
Peer.request(url, msg, null, opt);
|
||||
}
|
||||
|
||||
Peer.prototype.send = function (msg, opt) {
|
||||
Peer.request.each(this.peers, map, {
|
||||
msg: msg,
|
||||
opt: opt,
|
||||
});
|
||||
};
|
||||
|
||||
Peer.request = (function () {
|
||||
|
||||
function request (base, body, cb, opt) {
|
||||
|
||||
var obj = base.length ? { base: base } : {};
|
||||
obj.base = opt.base || base;
|
||||
obj.body = opt.body || body;
|
||||
obj.headers = opt.headers;
|
||||
obj.url = opt.url;
|
||||
obj.out = opt.out;
|
||||
cb = cb || function () {};
|
||||
|
||||
if (!obj.base) {
|
||||
return;
|
||||
}
|
||||
|
||||
request.transport(obj, cb);
|
||||
}
|
||||
|
||||
request.createServer = function (fn) {
|
||||
request.createServer.list.push(fn);
|
||||
};
|
||||
|
||||
request.createServer.ing = function (req, cb) {
|
||||
var index = request.createServer.list.length;
|
||||
var server;
|
||||
while (index) {
|
||||
index -= 1;
|
||||
server = request.createServer.list[index] || function () {};
|
||||
server(req, cb);
|
||||
}
|
||||
};
|
||||
|
||||
request.createServer.list = [];
|
||||
request.back = 2;
|
||||
request.backoff = 2;
|
||||
|
||||
request.transport = function (opt, cb) {
|
||||
if (request.ws(opt, cb)) {
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
request.ws = function (opt, cb, req) {
|
||||
var ws;
|
||||
if (!WS) {
|
||||
return false;
|
||||
}
|
||||
|
||||
ws = request.ws.peers[opt.base];
|
||||
if (ws) {
|
||||
req = req || {};
|
||||
if (opt.headers) {
|
||||
req.headers = opt.headers;
|
||||
}
|
||||
if (opt.body) {
|
||||
req.body = opt.body;
|
||||
}
|
||||
|
||||
if (opt.url) {
|
||||
req.url = opt.url;
|
||||
}
|
||||
|
||||
req.headers = req.headers || {};
|
||||
|
||||
if (!opt.out && !ws.cbs[req.headers['ws-rid']]) {
|
||||
var rid = 'WS' +
|
||||
new Date().getTime() +
|
||||
'.' +
|
||||
Math.floor((Math.random() * 65535) + 1);
|
||||
|
||||
req.headers['ws-rid'] = rid;
|
||||
|
||||
ws.cbs[rid] = function (err, res) {
|
||||
if (!res || res.body || res.end) {
|
||||
delete ws.cbs[req.headers['ws-rid']];
|
||||
}
|
||||
|
||||
cb(err, res);
|
||||
};
|
||||
}
|
||||
|
||||
if (!ws.readyState) {
|
||||
setTimeout(function () {
|
||||
request.ws(opt, cb, req);
|
||||
}, 100);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
ws.sending = true;
|
||||
ws.send(JSON.stringify(req));
|
||||
return true;
|
||||
}
|
||||
|
||||
if (ws === false) {
|
||||
return false;
|
||||
}
|
||||
|
||||
var wsURL = opt.base.replace('http', 'ws');
|
||||
|
||||
ws = request.ws.peers[opt.base] = new WS(wsURL);
|
||||
ws.cbs = {};
|
||||
|
||||
ws.onopen = function () {
|
||||
request.back = 2;
|
||||
request.ws(opt, cb);
|
||||
};
|
||||
|
||||
ws.onclose = function (event) {
|
||||
|
||||
if (!ws || !event) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (ws.close instanceof Function) {
|
||||
ws.close();
|
||||
}
|
||||
|
||||
if (!ws.sending) {
|
||||
ws = request.ws.peers[opt.base] = false;
|
||||
request.transport(opt, cb);
|
||||
return;
|
||||
}
|
||||
|
||||
request.each(ws.cbs, function (cb) {
|
||||
cb({
|
||||
err: 'WebSocket disconnected!',
|
||||
code: ws.sending ? (ws || {}).err || event.code : -1,
|
||||
});
|
||||
});
|
||||
|
||||
// This will make the next request try to reconnect
|
||||
ws = request.ws.peers[opt.base] = null;
|
||||
|
||||
// TODO: Have the driver handle this!
|
||||
setTimeout(function () {
|
||||
|
||||
// opt here is a race condition,
|
||||
// is it not? Does this matter?
|
||||
request.ws(opt, function () {});
|
||||
}, request.back *= request.backoff);
|
||||
};
|
||||
|
||||
ws.onmessage = function (msg) {
|
||||
var res;
|
||||
if (!msg || !msg.data) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
res = JSON.parse(msg.data);
|
||||
} catch (error) {
|
||||
return;
|
||||
}
|
||||
if (!res) {
|
||||
return;
|
||||
}
|
||||
res.headers = res.headers || {};
|
||||
if (res.headers['ws-rid']) {
|
||||
var cb = ws.cbs[res.headers['ws-rid']] || function () {};
|
||||
cb(null, res);
|
||||
return;
|
||||
}
|
||||
|
||||
// emit extra events.
|
||||
if (res.body) {
|
||||
request.createServer.ing(res, function (res) {
|
||||
res.out = true;
|
||||
request(opt.base, null, null, res);
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
ws.onerror = function (error) {
|
||||
(ws || {}).err = error;
|
||||
};
|
||||
|
||||
return true;
|
||||
};
|
||||
request.ws.peers = {};
|
||||
request.ws.cbs = {};
|
||||
|
||||
request.each = function (obj, cb, as) {
|
||||
if (!obj || !cb) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (var key in obj) {
|
||||
if (obj.hasOwnProperty(key)) {
|
||||
cb.call(as, obj[key], key);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
return request;
|
||||
}());
|
||||
|
||||
return Peer;
|
||||
}());
|
||||
|
||||
// Handle read requests.
|
||||
Gun.on('get', function (at) {
|
||||
var gun = at.gun;
|
||||
var opt = at.opt || {};
|
||||
var peers = opt.peers || gun.Back('opt.peers');
|
||||
|
||||
if (!peers || Gun.obj.empty(peers)) {
|
||||
Gun.log.once('peers', 'Warning! You have no peers to connect to!');
|
||||
at.gun.Back(-1).on('in', {'@': at['#']});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// Create a new message.
|
||||
var msg = {
|
||||
|
||||
// msg ID
|
||||
'#': at['#'] || Gun.text.random(9),
|
||||
|
||||
// msg BODY
|
||||
'$': at.get,
|
||||
};
|
||||
|
||||
// Listen for a response.
|
||||
// TODO: ONE? PERF! Clear out listeners, maybe with setTimeout?
|
||||
Tab.on(msg['#'], function (err, data) {
|
||||
var obj = {
|
||||
'@': at['#'],
|
||||
err: err,
|
||||
put: data,
|
||||
};
|
||||
|
||||
if (data) {
|
||||
at.gun.Back(-1).on('out', obj);
|
||||
} else {
|
||||
at.gun.Back(-1).on('in', obj);
|
||||
}
|
||||
});
|
||||
|
||||
// Broadcast to all other peers.
|
||||
Tab.peers(peers).send(msg, {
|
||||
headers: {
|
||||
'gun-sid': Tab.server.sid,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
// Handle write requests.
|
||||
Gun.on('put', function (at) {
|
||||
if (at['@']) {
|
||||
return;
|
||||
}
|
||||
var opt = at.gun.Back('opt') || {}, peers = opt.peers;
|
||||
if (!peers || Gun.obj.empty(peers)) {
|
||||
Gun.log.once('peers', 'Warning! You have no peers to save to!');
|
||||
at.gun.Back(-1).on('in', {'@': at['#']});
|
||||
return;
|
||||
}
|
||||
if (opt.websocket === false || (at.opt && at.opt.websocket === false)) {
|
||||
return;
|
||||
}
|
||||
var msg = {
|
||||
|
||||
// msg ID
|
||||
'#': at['#'] || Gun.text.random(9),
|
||||
|
||||
// msg BODY
|
||||
'$': at.put,
|
||||
};
|
||||
|
||||
// TODO: ONE? PERF! Clear out listeners, maybe with setTimeout?
|
||||
Tab.on(msg['#'], function (err, ok) {
|
||||
at.gun.Back(-1).on('in', {
|
||||
'@': at['#'],
|
||||
err: err,
|
||||
ok: ok,
|
||||
});
|
||||
});
|
||||
|
||||
Tab.peers(peers).send(msg, {
|
||||
headers: {
|
||||
'gun-sid': Tab.server.sid,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
// REVIEW: Do I need this on a server client?
|
||||
// browser/client side Server!
|
||||
// TODO: BUG! Does not respect separate instances!!!
|
||||
Gun.on('opt', function (at) {
|
||||
if (Tab.server) {
|
||||
return;
|
||||
}
|
||||
|
||||
var gun = at.gun;
|
||||
var server = Tab.server = Tab.server || {};
|
||||
var tmp;
|
||||
|
||||
server.sid = Gun.text.random();
|
||||
|
||||
Tab.peers.request.createServer(function (req, res) {
|
||||
|
||||
// Validate request.
|
||||
if (!req || !res || !req.body || !req.headers) {
|
||||
return;
|
||||
}
|
||||
|
||||
var msg = req.body;
|
||||
|
||||
// AUTH for non-replies.
|
||||
if (server.msg(msg['#'])) {
|
||||
return;
|
||||
}
|
||||
|
||||
// no need to process.
|
||||
if (msg['@']) {
|
||||
if (Tab.ons[tmp = msg['@'] || msg['#']]) {
|
||||
Tab.on(tmp, [msg['!'], msg.$]);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.$ && msg.$['#']) {
|
||||
server.get(req, res);
|
||||
return;
|
||||
}
|
||||
|
||||
server.put(req, res);
|
||||
});
|
||||
|
||||
server.get = function (req, cb) {
|
||||
var body = req.body;
|
||||
var lex = body.$;
|
||||
var graph = gun._.root._.graph;
|
||||
var node;
|
||||
|
||||
// Don't reply to data we don't have it in memory.
|
||||
// TODO: Add localStorage?
|
||||
if (!(node = graph[lex['#']])) {
|
||||
return;
|
||||
}
|
||||
|
||||
cb({
|
||||
body: {
|
||||
'#': server.msg(),
|
||||
'@': body['#'],
|
||||
'$': node,
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
server.put = function (req, cb) {
|
||||
var body = req.body, graph = body.$;
|
||||
var __ = gun._.root._;
|
||||
|
||||
// filter out what we don't have in memory.
|
||||
if (!(graph = Gun.obj.map(graph, function (node, soul, map) {
|
||||
if (!__.path[soul]) {
|
||||
return;
|
||||
}
|
||||
map(soul, node);
|
||||
}))) {
|
||||
return;
|
||||
}
|
||||
gun.on('out', {
|
||||
gun: gun,
|
||||
opt: {
|
||||
websocket: false,
|
||||
},
|
||||
put: graph,
|
||||
'#': Gun.on.ask(function (ack, ev) {
|
||||
if (!ack) {
|
||||
return undefined;
|
||||
}
|
||||
ev.off();
|
||||
return cb({
|
||||
body: {
|
||||
'#': server.msg(),
|
||||
'@': body['#'],
|
||||
'$': ack,
|
||||
'!': ack.err,
|
||||
},
|
||||
});
|
||||
}),
|
||||
});
|
||||
};
|
||||
|
||||
server.msg = function (id) {
|
||||
if (!id) {
|
||||
id = Gun.text.random(9);
|
||||
server.msg.debounce[id] = Gun.time.is();
|
||||
return id;
|
||||
}
|
||||
|
||||
clearTimeout(server.msg.clear);
|
||||
server.msg.clear = setTimeout(function () {
|
||||
var now = Gun.time.is();
|
||||
Gun.obj.map(server.msg.debounce, function (time, id) {
|
||||
if ((now - time) < (1000 * 60 * 5)) {
|
||||
return;
|
||||
}
|
||||
|
||||
Gun.obj.del(server.msg.debounce, id);
|
||||
});
|
||||
}, 500);
|
||||
|
||||
if (server.msg.debounce[id]) {
|
||||
server.msg.debounce[id] = Gun.time.is();
|
||||
return id;
|
||||
}
|
||||
|
||||
server.msg.debounce[id] = Gun.time.is();
|
||||
return undefined;
|
||||
};
|
||||
|
||||
server.msg.debounce = server.msg.debounce || {};
|
||||
});
|
156
lib/wsp/server-push.js
Normal file
156
lib/wsp/server-push.js
Normal file
@ -0,0 +1,156 @@
|
||||
'use strict';
|
||||
var Gun = require('../../gun.js');
|
||||
|
||||
/**
|
||||
* Whether the gun instance is attached to a socket server.
|
||||
* @param {Gun} gun - The gun instance in question.
|
||||
* @param {WebSocket.Server} server - A socket server gun might be attached to.
|
||||
* @return {Boolean} - Whether it's attached.
|
||||
*/
|
||||
function isUsingServer (gun, server) {
|
||||
var servers = gun.Back(-1)._.servers;
|
||||
|
||||
return servers ? servers.indexOf(server) !== -1 : false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls a function when (or if) a socket is ready for messages.
|
||||
* @param {WebSocket} socket - A websocket connection.
|
||||
* @param {Function} cb - Called if or when the socket is ready.
|
||||
* @return {Boolean} - Whether the socket is able to take messages.
|
||||
*/
|
||||
function ready (socket, cb) {
|
||||
var state = socket.readyState;
|
||||
|
||||
// The socket is ready.
|
||||
if (state === socket.OPEN) {
|
||||
cb();
|
||||
return true;
|
||||
}
|
||||
|
||||
// Still opening.
|
||||
if (state === socket.OPENING) {
|
||||
socket.once('open', cb);
|
||||
}
|
||||
|
||||
// Nope, closing or closed.
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a request to a list of clients.
|
||||
* @param {Obejct} context - A gun request context.
|
||||
* @param {Object} clients - IDs mapped to socket instances.
|
||||
* @param {Function} cb - Called for each response.
|
||||
* @return {undefined}
|
||||
*/
|
||||
function request (context, clients, cb) {
|
||||
Gun.obj.map(clients, function (client) {
|
||||
ready(client, function () {
|
||||
var msg = {
|
||||
headers: {},
|
||||
body: {
|
||||
'#': Gun.on.ask(cb),
|
||||
'$': context.get,
|
||||
},
|
||||
};
|
||||
|
||||
var serialized = JSON.stringify(msg);
|
||||
client.send(serialized);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Pushes a graph update to a collection of clients.
|
||||
* @param {Object} context - The context object passed by gun.
|
||||
* @param {Object} clients - An object mapping URLs to clients.
|
||||
* @param {Function} cb - Invoked on each client response.
|
||||
* @return {undefined}
|
||||
*/
|
||||
function update (context, clients, cb) {
|
||||
Gun.obj.map(clients, function (client) {
|
||||
ready(client, function () {
|
||||
var msg = {
|
||||
headers: {},
|
||||
body: {
|
||||
'#': Gun.on.ask(cb),
|
||||
'$': context.put,
|
||||
},
|
||||
};
|
||||
|
||||
var serialized = JSON.stringify(msg);
|
||||
|
||||
client.send(serialized);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/** * Attaches server push middleware to gun.
|
||||
* @param {Gun} gun - The gun instance to attach to.
|
||||
* @param {WebSocket.Server} server - A websocket server instance.
|
||||
* @return {server} - The socket server.
|
||||
*/
|
||||
function attach (gun, server) {
|
||||
var root = gun.Back(-1);
|
||||
root._.servers = root._.servers || [];
|
||||
root._.servers.push(server);
|
||||
var pool = {};
|
||||
|
||||
server.on('connection', function (socket) {
|
||||
socket.id = socket.id || Gun.text.random(10);
|
||||
pool[socket.id] = socket;
|
||||
|
||||
socket.on('message', function (message) {
|
||||
var data = Gun.obj.ify(message);
|
||||
|
||||
if (!data || !data.body) {
|
||||
return;
|
||||
}
|
||||
|
||||
var msg = data.body;
|
||||
|
||||
if (msg['@']) {
|
||||
Gun.on.ack(msg['@'], [msg['!'], msg.$]);
|
||||
return;
|
||||
}
|
||||
});
|
||||
|
||||
socket.once('close', function () {
|
||||
delete pool[socket.id];
|
||||
});
|
||||
});
|
||||
|
||||
Gun.on('get', function (context) {
|
||||
if (!isUsingServer(context.gun, server)) {
|
||||
return;
|
||||
}
|
||||
request(context, pool, function (err, data) {
|
||||
var response = {
|
||||
'@': context['#'],
|
||||
put: data,
|
||||
err: err,
|
||||
};
|
||||
|
||||
var root = context.gun.Back(Infinity);
|
||||
|
||||
root.on(data ? 'out' : 'in', response);
|
||||
});
|
||||
});
|
||||
|
||||
Gun.on('put', function (context) {
|
||||
if (!isUsingServer(context.gun, server)) {
|
||||
return;
|
||||
}
|
||||
|
||||
update(context, pool, function (err, data) {
|
||||
var ack = {
|
||||
'!': err || null,
|
||||
'$': data.$,
|
||||
};
|
||||
Gun.on.ack(context, ack);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
module.exports = attach;
|
191
lib/wsp/server.js
Normal file
191
lib/wsp/server.js
Normal file
@ -0,0 +1,191 @@
|
||||
var Gun = require('../../gun')
|
||||
, formidable = require('formidable')
|
||||
, http = require('../http')
|
||||
, url = require('url')
|
||||
, wsp = {}
|
||||
, WS = require('ws')
|
||||
, WSS = WS.Server
|
||||
, attach = require('./server-push');
|
||||
|
||||
// Handles server to server sync.
|
||||
require('./client.js');
|
||||
|
||||
Gun.on('opt', function(at){
|
||||
var gun = at.gun, opt = at.opt;
|
||||
gun.__ = at.root._;
|
||||
gun.__.opt.ws = opt.ws = gun.__.opt.ws || opt.ws || {};
|
||||
|
||||
function start(server, port, app){
|
||||
if(app && app.use){ app.use(gun.wsp.server) }
|
||||
server = gun.__.opt.ws.server = gun.__.opt.ws.server || opt.ws.server || server;
|
||||
|
||||
if (!gun.wsp.ws) {
|
||||
gun.wsp.ws = new WSS(gun.__.opt.ws);
|
||||
attach(gun, gun.wsp.ws);
|
||||
}
|
||||
|
||||
gun.wsp.ws = gun.wsp.ws || new WSS(gun.__.opt.ws);
|
||||
require('./ws')(gun.wsp.ws, function(req, res){
|
||||
var ws = this;
|
||||
req.headers['gun-sid'] = ws.sid = ws.sid? ws.sid : req.headers['gun-sid'];
|
||||
ws.sub = ws.sub || gun.wsp.on('network', function(msg, ev){
|
||||
if(!ws || !ws.send || !ws._socket || !ws._socket.writable){ return ev.off() }
|
||||
if(!msg || (msg.headers && msg.headers['gun-sid'] === ws.sid)){ return }
|
||||
if(msg && msg.headers){ delete msg.headers['ws-rid'] }
|
||||
// TODO: BUG? ^ What if other peers want to ack? Do they use the ws-rid or a gun declared id?
|
||||
try{ws.send(Gun.text.ify(msg));
|
||||
}catch(e){} // juuuust in case.
|
||||
});
|
||||
gun.wsp.wire(req, res);
|
||||
}, {headers: {'ws-rid': 1, 'gun-sid': 1}});
|
||||
gun.__.opt.ws.port = gun.__.opt.ws.port || opt.ws.port || port || 80;
|
||||
}
|
||||
var wsp = gun.wsp = gun.wsp || function(server){
|
||||
if(!server){ return gun }
|
||||
if(Gun.fns.is(server.address)){
|
||||
if(server.address()){
|
||||
start(server, server.address().port);
|
||||
return gun;
|
||||
}
|
||||
}
|
||||
if(Gun.fns.is(server.get) && server.get('port')){
|
||||
start(server, server.get('port'));
|
||||
return gun;
|
||||
}
|
||||
var listen = server.listen;
|
||||
server.listen = function(port){
|
||||
var serve = listen.apply(server, arguments);
|
||||
start(serve, port, server);
|
||||
return serve;
|
||||
}
|
||||
return gun;
|
||||
}
|
||||
gun.wsp.on = gun.wsp.on || Gun.on;
|
||||
gun.wsp.regex = gun.wsp.regex || opt.route || opt.path || /^\/gun/i;
|
||||
gun.wsp.poll = gun.wsp.poll || opt.poll || 1;
|
||||
gun.wsp.pull = gun.wsp.pull || opt.pull || gun.wsp.poll * 1000;
|
||||
gun.wsp.server = gun.wsp.server || function(req, res, next){ // http
|
||||
next = next || function(){};
|
||||
if(!req || !res){ return next(), false }
|
||||
if(!req.url){ return next(), false }
|
||||
if(!req.method){ return next(), false }
|
||||
var msg = {};
|
||||
msg.url = url.parse(req.url, true);
|
||||
if(!gun.wsp.regex.test(msg.url.pathname)){ return next(), false } // TODO: BUG! If the option isn't a regex then this will fail!
|
||||
if(msg.url.pathname.replace(gun.wsp.regex,'').slice(0,3).toLowerCase() === '.js'){
|
||||
res.writeHead(200, {'Content-Type': 'text/javascript'});
|
||||
res.end(gun.wsp.js = gun.wsp.js || require('fs').readFileSync(__dirname + '/../../gun.js')); // gun server is caching the gun library for the client
|
||||
return true;
|
||||
}
|
||||
|
||||
if(!req.upgrade){
|
||||
next();
|
||||
return false;
|
||||
}
|
||||
|
||||
return http(req, res, function(req, res){
|
||||
if(!req){ return next() }
|
||||
var stream, cb = res = require('../jsonp')(req, res);
|
||||
if(req.headers && (stream = req.headers['gun-sid'])){
|
||||
stream = (gun.wsp.peers = gun.wsp.peers || {})[stream] = gun.wsp.peers[stream] || {sid: stream};
|
||||
stream.drain = stream.drain || function(res){
|
||||
if(!res || !stream || !stream.queue || !stream.queue.length){ return }
|
||||
res({headers: {'gun-sid': stream.sid}, body: stream.queue });
|
||||
stream.off = setTimeout(function(){ stream = null }, gun.wsp.pull);
|
||||
stream.reply = stream.queue = null;
|
||||
return true;
|
||||
}
|
||||
stream.sub = stream.sub || gun.wsp.on('network', function(req, ev){
|
||||
if(!stream){ return ev.off() } // self cleans up after itself!
|
||||
if(!req || (req.headers && req.headers['gun-sid'] === stream.sid)){ return }
|
||||
(stream.queue = stream.queue || []).push(req);
|
||||
stream.drain(stream.reply);
|
||||
});
|
||||
cb = function(r){ (r.headers||{}).poll = gun.wsp.poll; res(r) }
|
||||
clearTimeout(stream.off);
|
||||
if(req.headers.pull){
|
||||
if(stream.drain(cb)){ return }
|
||||
return stream.reply = cb;
|
||||
}
|
||||
}
|
||||
gun.wsp.wire(req, cb);
|
||||
}), true;
|
||||
}
|
||||
if((gun.__.opt.maxSockets = opt.maxSockets || gun.__.opt.maxSockets) !== false){
|
||||
require('https').globalAgent.maxSockets = require('http').globalAgent.maxSockets = gun.__.opt.maxSockets || Infinity;
|
||||
}
|
||||
gun.wsp.msg = gun.wsp.msg || function(id){
|
||||
if(!id){
|
||||
return gun.wsp.msg.debounce[id = Gun.text.random(9)] = Gun.time.is(), id;
|
||||
}
|
||||
clearTimeout(gun.wsp.msg.clear);
|
||||
gun.wsp.msg.clear = setTimeout(function(){
|
||||
var now = Gun.time.is();
|
||||
Gun.obj.map(gun.wsp.msg.debounce, function(t,id){
|
||||
if((now - t) < (1000 * 60 * 5)){ return }
|
||||
Gun.obj.del(gun.wsp.msg.debounce, id);
|
||||
});
|
||||
},500);
|
||||
if(id = gun.wsp.msg.debounce[id]){
|
||||
return gun.wsp.msg.debounce[id] = Gun.time.is(), id;
|
||||
}
|
||||
gun.wsp.msg.debounce[id] = Gun.time.is();
|
||||
return;
|
||||
};
|
||||
gun.wsp.msg.debounce = gun.wsp.msg.debounce || {};
|
||||
gun.wsp.wire = gun.wsp.wire || (function(){
|
||||
// all streams, technically PATCH but implemented as PUT or POST, are forwarded to other trusted peers
|
||||
// except for the ones that are listed in the message as having already been sending to.
|
||||
// all states, implemented with GET, are replied to the source that asked for it.
|
||||
function tran(req, res){
|
||||
if(!req || !res || !req.body || !req.headers){ return }
|
||||
if(req.url){ req.url = url.format(req.url) }
|
||||
var msg = req.body;
|
||||
// AUTH for non-replies.
|
||||
if(gun.wsp.msg(msg['#'])){ return }
|
||||
gun.wsp.on('network', Gun.obj.copy(req));
|
||||
if(msg['@']){ return } // no need to process.
|
||||
if(msg['$'] && msg['$']['#']){ return tran.get(req, res) }
|
||||
//if(Gun.is.lex(msg['$'])){ return tran.get(req, res) }
|
||||
else { return tran.put(req, res) }
|
||||
cb({body: {hello: 'world'}});
|
||||
// TODO: BUG! server put should push.
|
||||
}
|
||||
tran.get = function(req, cb){
|
||||
var body = req.body, lex = body['$'], reply = {headers: {'Content-Type': tran.json}}, opt;
|
||||
gun.on('out', {gun: gun, get: lex, req: 1, '#': Gun.on.ask(function(at, ev){
|
||||
ev.off();
|
||||
var graph = at.put;
|
||||
return cb({headers: reply.headers, body: {
|
||||
'#': gun.wsp.msg(),
|
||||
'@': body['#'],
|
||||
'$': graph,
|
||||
'!': at.err
|
||||
}});
|
||||
})});
|
||||
}
|
||||
tran.put = function(req, cb){
|
||||
// NOTE: It is highly recommended you do your own PUT/POSTs through your own API that then saves to gun manually.
|
||||
// This will give you much more fine-grain control over security, transactions, and what not.
|
||||
var body = req.body, graph = body['$'], reply = {headers: {'Content-Type': tran.json}}, opt;
|
||||
gun.on('out', {gun: gun, put: graph, '#': Gun.on.ask(function(ack, ev){
|
||||
//Gun.on('put', {gun: gun, put: graph, '#': Gun.on.ask(function(ack, ev){
|
||||
ev.off();
|
||||
return cb({headers: reply.headers, body: {
|
||||
'#': gun.wsp.msg(),
|
||||
'@': body['#'],
|
||||
'$': ack,
|
||||
'!': ack.err
|
||||
}});
|
||||
})});
|
||||
}
|
||||
gun.wsp.on('network', function(rq){
|
||||
// TODO: MARK! You should move the networking events to here, not in WSS only.
|
||||
});
|
||||
tran.json = 'application/json';
|
||||
return tran;
|
||||
}());
|
||||
if(opt.server){
|
||||
wsp(opt.server);
|
||||
}
|
||||
});
|
@ -1,4 +1,4 @@
|
||||
var Gun = require('../gun')
|
||||
var Gun = require('../../gun')
|
||||
, url = require('url');
|
||||
module.exports = function(wss, server, opt){
|
||||
wss.on('connection', function(ws){
|
||||
@ -27,7 +27,7 @@ module.exports = function(wss, server, opt){
|
||||
(reply.headers = reply.headers || {})['ws-rid'] = msg.headers['ws-rid'];
|
||||
}
|
||||
try{ws.send(Gun.text.ify(reply));
|
||||
}catch(e){} // juuuust in case.
|
||||
}catch(e){} // juuuust in case.
|
||||
});
|
||||
});
|
||||
ws.off = function(m){
|
Loading…
x
Reference in New Issue
Block a user