mirror of
https://github.com/amark/gun.git
synced 2025-03-30 15:08:33 +00:00
276 lines
12 KiB
JavaScript
276 lines
12 KiB
JavaScript
;(function(wsp){
|
|
var Gun = require('../gun')
|
|
, ws = require('ws').Server
|
|
, http = require('./http')
|
|
, url = require('url');
|
|
Gun.on('opt').event(function(gun, opt){
|
|
gun.__.opt.ws = opt.ws = gun.__.opt.ws || opt.ws || {};
|
|
function start(server, port, app){
|
|
if(app && app.use){ app.use(gun.wsp.server) }
|
|
server = gun.__.opt.ws.server = gun.__.opt.ws.server || opt.ws.server || server;
|
|
require('./ws')(gun.wsp.ws = gun.wsp.ws || new ws(gun.__.opt.ws), function(req, res){
|
|
var ws = this;
|
|
req.headers['gun-sid'] = ws.sid = (ws.sid? ws.sid : req.headers['gun-sid']);
|
|
ws.sub = ws.sub || gun.wsp.on('network').event(function(msg){
|
|
if(!ws || !ws.send || !ws._socket || !ws._socket.writable){ return this.off() }
|
|
if(!msg || (ws.sid && msg.headers && msg.headers['gun-sid'] === ws.sid)){ return }
|
|
if(msg && msg.headers){ delete msg.headers['ws-rid'] }
|
|
// TODO: BUG? ^ What if other peers want to ack? Do they use the ws-rid or a gun declared id?
|
|
try{ws.send(Gun.text.ify(msg));
|
|
}catch(e){} // juuuust in case.
|
|
});
|
|
gun.wsp.wire(req, res);
|
|
});
|
|
gun.__.opt.ws.port = gun.__.opt.ws.port || opt.ws.port || port || 80;
|
|
}
|
|
var wsp = gun.wsp = gun.wsp || function(server, auth){
|
|
gun.wsp.auth = auth;
|
|
if(!server){ return gun }
|
|
if(Gun.fns.is(server.address)){
|
|
if(server.address()){
|
|
start(server, server.address().port);
|
|
return gun;
|
|
}
|
|
}
|
|
if(Gun.fns.is(server.get) && server.get('port')){
|
|
start(server, server.get('port'));
|
|
return gun;
|
|
}
|
|
var listen = server.listen;
|
|
server.listen = function(port){
|
|
var serve = listen.apply(server, arguments);
|
|
start(serve, port, server);
|
|
return serve;
|
|
}
|
|
return gun;
|
|
}
|
|
gun.wsp.on = gun.wsp.on || Gun.on.create();
|
|
gun.wsp.regex = gun.wsp.regex || opt.route || opt.path || /^\/gun/i;
|
|
gun.wsp.poll = gun.wsp.poll || opt.poll || 1;
|
|
gun.wsp.pull = gun.wsp.pull || opt.pull || gun.wsp.poll * 1000;
|
|
gun.wsp.server = gun.wsp.server || function(req, res, next){ // http
|
|
next = next || function(){};
|
|
if(!req || !res){ return next(), false }
|
|
if(!req.url){ return next(), false }
|
|
if(!req.method){ return next(), false }
|
|
var msg = {};
|
|
msg.url = url.parse(req.url, true);
|
|
if(!gun.wsp.regex.test(msg.url.pathname)){ return next(), false } // TODO: BUG! If the option isn't a regex then this will fail!
|
|
if(msg.url.pathname.replace(gun.wsp.regex,'').slice(0,3).toLowerCase() === '.js'){
|
|
res.writeHead(200, {'Content-Type': 'text/javascript'});
|
|
res.end(gun.wsp.js = gun.wsp.js || require('fs').readFileSync(__dirname + '/../gun.js')); // gun server is caching the gun library for the client
|
|
return true;
|
|
}
|
|
return http(req, res, function(req, res){
|
|
if(!req){ return next() }
|
|
var stream, cb = res = require('./jsonp')(req, res);
|
|
if(req.headers && (stream = req.headers['gun-sid'])){
|
|
stream = (gun.wsp.peers = gun.wsp.peers || {})[stream] = gun.wsp.peers[stream] || {sid: stream};
|
|
stream.sub = stream.sub || gun.wsp.on('network').event(function(req){
|
|
if(!stream){ return this.off() } // self cleans up after itself!
|
|
if(!req || (req.headers && req.headers['gun-sid'] === stream.sid)){ return }
|
|
(stream.queue = stream.queue || []).push(req);
|
|
stream.drain(stream.reply);
|
|
});
|
|
cb = function(r){ (r.headers||{}).poll = gun.wsp.poll; res(r) }
|
|
stream.drain = stream.drain || function(res){
|
|
if(!res || !stream || !stream.queue || !stream.queue.length){ return }
|
|
res({headers: {'gun-sid': stream.sid}, body: stream.queue });
|
|
stream.off = setTimeout(function(){ stream = null }, gun.wsp.pull);
|
|
stream.reply = stream.queue = null;
|
|
return true;
|
|
}
|
|
clearTimeout(stream.off);
|
|
if(req.headers.pull){
|
|
if(stream.drain(cb)){ return }
|
|
return stream.reply = cb;
|
|
}
|
|
}
|
|
gun.wsp.wire(req, cb);
|
|
}), true;
|
|
}
|
|
if((gun.__.opt.maxSockets = opt.maxSockets || gun.__.opt.maxSockets) !== false){
|
|
require('https').globalAgent.maxSockets = require('http').globalAgent.maxSockets = gun.__.opt.maxSockets || Infinity;
|
|
}
|
|
gun.wsp.msg = gun.wsp.msg || function(id){
|
|
if(!id){
|
|
return gun.wsp.msg.debounce[id = Gun.text.random(9)] = Gun.time.is(), id;
|
|
}
|
|
clearTimeout(gun.wsp.msg.clear);
|
|
gun.wsp.msg.clear = setTimeout(function(){
|
|
var now = Gun.time.is();
|
|
Gun.obj.map(gun.wsp.msg.debounce, function(t,id){
|
|
if((now - t) < (1000 * 60 * 5)){ return }
|
|
Gun.obj.del(gun.wsp.msg.debounce, id);
|
|
});
|
|
},500);
|
|
if(id = gun.wsp.msg.debounce[id]){
|
|
return gun.wsp.msg.debounce[id] = Gun.time.is(), id;
|
|
}
|
|
};
|
|
gun.wsp.msg.debounce = gun.wsp.msg.debounce || {};
|
|
gun.wsp.wire = gun.wsp.wire || (function(){
|
|
// all streams, technically PATCH but implemented as PUT or POST, are forwarded to other trusted peers
|
|
// except for the ones that are listed in the message as having already been sending to.
|
|
// all states, implemented with GET, are replied to the source that asked for it.
|
|
function flow(req, res){
|
|
if (!req.auth || req.headers.broadcast) {
|
|
gun.wsp.on('network').emit(Gun.obj.copy(req));
|
|
}
|
|
if(req.headers.rid){ return } // no need to process.
|
|
if(Gun.is.lex(req.body)){ return tran.get(req, res) }
|
|
else { return tran.put(req, res) }
|
|
}
|
|
function tran(req, res){
|
|
if(!req || !res || !req.body || !req.headers || !req.headers.id){ return }
|
|
if(gun.wsp.msg(req.headers.id)){ return }
|
|
req.method = (req.body && !Gun.is.lex(req.body))? 'put' : 'get';
|
|
if(gun.wsp.auth){ return gun.wsp.auth(req, function(reply){
|
|
if(!reply.headers){ reply.headers = {} }
|
|
if(!reply.headers['Content-Type']){ reply.headers['Content-Type'] = tran.json }
|
|
if(!reply.rid){ reply.headers.rid = req.headers.id }
|
|
if(!reply.id){ reply.headers.id = gun.wsp.msg() }
|
|
res(reply);
|
|
}, flow) }
|
|
else { return flow(req, res) }
|
|
}
|
|
tran.get = function(req, cb){
|
|
var key = req.url.key
|
|
, reply = {headers: {'Content-Type': tran.json, rid: req.headers.id, id: gun.wsp.msg()}};
|
|
//Gun.log(req);
|
|
// NTS HACK! SHOULD BE ITS OWN ISOLATED MODULE! //
|
|
if(req && req.url && req.url.pathname && req.url.pathname.indexOf('gun.nts') >= 0){
|
|
return cb({headers: reply.headers, body: {time: Gun.time.is() }});
|
|
}
|
|
// NTS END! SHOULD HAVE BEEN ITS OWN MODULE //
|
|
// ALL HACK! SHOULD BE ITS OWN MODULE OR CORE? //
|
|
if(req && req.url && Gun.obj.has(req.url.query, '*')){
|
|
return gun.all(req.url.key + req.url.search, function(err, list){
|
|
cb({headers: reply.headers, body: (err? (err.err? err : {err: err || "Unknown error."}) : list || null ) })
|
|
});
|
|
}
|
|
//Gun.log("GET!", req);
|
|
key = req.body;
|
|
//Gun.log("tran.get", key);
|
|
var opt = {key: false, local: true};
|
|
//gun.get(key, function(err, node){
|
|
(gun.__.opt.wire.get||function(key, cb){cb(null,null)})(key, function(err, node){
|
|
//Gun.log("tran.get", key, "<---", err, node);
|
|
reply.headers.id = gun.wsp.msg();
|
|
if(err || !node){
|
|
if(opt.on && opt.on.off){ opt.on.off() }
|
|
return cb({headers: reply.headers, body: (err? (err.err? err : {err: err || "Unknown error."}) : null)});
|
|
}
|
|
if(Gun.obj.empty(node)){
|
|
if(opt.on && opt.on.off){ opt.on.off() }
|
|
return cb({headers: reply.headers, body: node});
|
|
} // we're out of stuff!
|
|
/*
|
|
(function(chunks){ // FEATURE! Stream chunks if the nodes are large!
|
|
var max = 10, count = 0, soul = Gun.is.node.soul(node);
|
|
if(Object.keys(node).length > max){
|
|
var n = Gun.is.node.soul.ify({}, soul);
|
|
Gun.obj.map(node, function(val, field){
|
|
if(!(++count % max)){
|
|
cb({headers: reply.headers, chunk: n}); // send node chunks
|
|
n = Gun.is.node.soul.ify({}, soul);
|
|
}
|
|
Gun.is.node.state.ify([n, node], field, val);
|
|
});
|
|
if(count % max){ // finish off the last chunk
|
|
cb({headers: reply.headers, chunk: n});
|
|
}
|
|
} else {
|
|
cb({headers: reply.headers, chunk: node}); // send full node
|
|
}
|
|
}([]));
|
|
*/
|
|
cb({headers: reply.headers, chunk: node }); // Use this if you don't want streaming chunks feature.
|
|
}, opt);
|
|
}
|
|
tran.put = function(req, cb){
|
|
// NOTE: It is highly recommended you do your own PUT/POSTs through your own API that then saves to gun manually.
|
|
// This will give you much more fine-grain control over security, transactions, and what not.
|
|
var reply = {headers: {'Content-Type': tran.json, rid: req.headers.id, id: gun.wsp.msg()}};
|
|
if(!req.body){ return cb({headers: reply.headers, body: {err: "No body"}}) }
|
|
//Gun.log("\n\ntran.put ----------------->", req.body);
|
|
if(Gun.is.graph(req.body)){
|
|
if(req.err = Gun.union(gun, req.body, function(err, ctx){ // TODO: BUG? Probably should give me ctx.graph
|
|
if(err){ return cb({headers: reply.headers, body: {err: err || "Union failed."}}) }
|
|
var ctx = ctx || {}; ctx.graph = {};
|
|
Gun.is.graph(req.body, function(node, soul){
|
|
ctx.graph[soul] = gun.__.graph[soul];
|
|
});
|
|
(gun.__.opt.wire.put || function(g,cb){cb("No save.")})(ctx.graph, function(err, ok){
|
|
if(err){ return cb({headers: reply.headers, body: {err: err || "Failed."}}) } // TODO: err should already be an error object?
|
|
cb({headers: reply.headers, body: {ok: ok || "Persisted."}});
|
|
//Gun.log("tran.put <------------------------", ok);
|
|
}, {local: true});
|
|
}).err){ cb({headers: reply.headers, body: {err: req.err || "Union failed."}}) }
|
|
} else {
|
|
cb({headers: reply.headers, body: {err: "Not a valid graph!"}});
|
|
}
|
|
}
|
|
gun.wsp.on('network').event(function(req){
|
|
// TODO: MARK! You should move the networking events to here, not in WSS only.
|
|
});
|
|
tran.json = 'application/json';
|
|
return tran;
|
|
}());
|
|
if(opt.server){
|
|
wsp(opt.server);
|
|
}
|
|
|
|
if(gun.wsp.driver){ return }
|
|
var driver = gun.wsp.driver = {};
|
|
var noop = function(){};
|
|
var get = gun.__.opt.wire.get || noop;
|
|
var put = gun.__.opt.wire.put || noop;
|
|
var driver = {
|
|
put: function(graph, cb, opt){
|
|
put(graph, cb, opt);
|
|
opt = opt || {};
|
|
if(opt.local){ return }
|
|
var id = gun.wsp.msg();
|
|
gun.wsp.on('network').emit({ // sent to dynamic peers!
|
|
headers: {'Content-Type': 'application/json', id: id},
|
|
body: graph
|
|
});
|
|
var ropt = {headers:{}, WebSocket: WebSocket};
|
|
ropt.headers.id = id;
|
|
Gun.obj.map(opt.peers || gun.__.opt.peers, function(peer, url){
|
|
Gun.request(url, graph, function(err, reply){
|
|
reply.body = reply.body || reply.chunk || reply.end || reply.write;
|
|
if(err || !reply || (err = reply.body && reply.body.err)){
|
|
return cb({err: Gun.log(err || "Put failed.") });
|
|
}
|
|
cb(null, reply.body);
|
|
}, ropt);
|
|
});
|
|
},
|
|
get: function(lex, cb, opt){
|
|
get(lex, cb, opt);
|
|
opt = opt || {};
|
|
if(opt.local){ return }
|
|
if(!Gun.request){ return console.log("Server could not find default network abstraction.") }
|
|
var ropt = {headers:{}};
|
|
ropt.headers.id = gun.wsp.msg();
|
|
Gun.obj.map(opt.peers || gun.__.opt.peers, function(peer, url){
|
|
Gun.request(url, lex, function(err, reply){
|
|
reply.body = reply.body || reply.chunk || reply.end || reply.write;
|
|
if(err || !reply || (err = reply.body && reply.body.err)){
|
|
return cb({err: Gun.log(err || "Get failed.") });
|
|
}
|
|
cb(null, reply.body);
|
|
}, ropt);
|
|
});
|
|
}
|
|
}
|
|
var WebSocket = require('ws');
|
|
Gun.request.WebSocket = WebSocket;
|
|
Gun.request.createServer(gun.wsp.wire);
|
|
gun.__.opt.wire = driver;
|
|
gun.opt({wire: driver}, true);
|
|
});
|
|
}({}));
|