mid conflict merge

This commit is contained in:
Mark Nadal 2016-11-14 16:44:00 -08:00
commit 1966a09ac2
19 changed files with 1147 additions and 286 deletions

4
.dockerignore Normal file
View File

@ -0,0 +1,4 @@
node_modules
.git
.gitignore
*.md

1
.gitignore vendored
View File

@ -1,5 +1,6 @@
node_modules/*
npm-debug.log
yarn.lock
*data.json
*.db
.idea/

3
Dockerfile Normal file
View File

@ -0,0 +1,3 @@
FROM node:6-onbuild
ENV NPM_CONFIG_LOGLEVEL warn
EXPOSE 8080

View File

@ -43,6 +43,37 @@ Try the [interactive tutorial](http://gun.js.org/think.html) in the browser (**5
If that did not work it is probably because npm installed it to a global directory. To fix that try `mkdir node_modules` in your desired directory and re-run the above commands. You also might have to add `sudo` in front of the commands.
## Quick dev/test Deployments
- To quickly spin up a Gun test server for your development team, uilize eiher [Heroku](http://heroku.com) or [Docker](http://docker.com) or any variant thereof ([Dokku](http://dokku.viewdocs.io/dokku/), [Flynn.io](http://flynn.io), [now.sh](https://zeit.co/now), etc)
### Docker
```bash
git clone https://github.com/amark/gun.git
cd gun
docker build -t myrepo/gundb:v1 .
docker run -p 8080:8080 myrepo/gundb:v1
```
Then visit [http://localhost:8080](http://localhost:8080) in your browser.
### Hiroku
```bash
git clone https://github.com/amark/gun.git
cd gun
heroku create
git push -f heroku HEAD:master
```
Then visit the URL in the output of the 'heroku create' step, in your browser.
### Now.sh
```bash
npm install -g now
git clone https://github.com/amark/gun.git
cd gun
now --npm
```
Then visit the URL in the output of the 'now --npm' step, in your browser.
### Videos
- [Fault tolerance](https://www.youtube.com/watch?v=-i-11T5ZI9o&feature=youtu.be) (01:01)
- [Saving relational or document based data](https://www.youtube.com/watch?v=cOO6wz1rZVY&feature=youtu.be) (06:59)

View File

@ -1,10 +1,10 @@
console.log("If modules not found, run `npm install` in /example folder!"); // git subtree push -P examples heroku master // OR // git subtree split -P examples master && git push heroku [['HASH']]:master --force
var port = process.env.OPENSHIFT_NODEJS_PORT || process.env.VCAP_APP_PORT || process.env.PORT || process.argv[2] || 80;
var port = process.env.OPENSHIFT_NODEJS_PORT || process.env.VCAP_APP_PORT || process.env.PORT || process.argv[2] || 8080;
var express = require('express');
var app = express();
var Gun = require('gun');
var Gun = require('../');
var gun = Gun({
file: 'data.json',
s3: {
@ -17,4 +17,4 @@ var gun = Gun({
gun.wsp(app);
app.use(express.static(__dirname)).listen(port);
console.log('Server started on port ' + port + ' with /gun');
console.log('Server started on port ' + port + ' with /gun');

View File

@ -1,5 +1,6 @@
var Gun = require('gun');
var Gun = require('../');
var gun = Gun({
file: 'data.json',
s3: {
key: '', // AWS Access Key
secret: '', // AWS Secret Token

View File

@ -1,6 +1,6 @@
var port = process.env.OPENSHIFT_NODEJS_PORT || process.env.VCAP_APP_PORT || process.env.PORT || process.argv[2] || 80;
var port = process.env.OPENSHIFT_NODEJS_PORT || process.env.VCAP_APP_PORT || process.env.PORT || process.argv[2] || 8080;
var Gun = require('gun');
var Gun = require('../');
var gun = Gun({
file: 'data.json',
s3: {
@ -22,4 +22,4 @@ var server = require('http').createServer(function(req, res){
gun.wsp(server);
server.listen(port);
console.log('Server started on port ' + port + ' with /gun');
console.log('Server started on port ' + port + ' with /gun');

View File

@ -1,17 +0,0 @@
{
"name": "examples",
"main": "http.js",
"description": "Example gun apps"
, "version": "0.0.3"
, "engines": {
"node": "~>0.10.x"
}
, "dependencies": {
"express": "~>4.13.4",
"gun": "~>0.3.0"
}
, "scripts": {
"start": "node http.js",
"test": "mocha"
}
}

81
gun.js
View File

@ -372,12 +372,20 @@
}
if(last){
if(act.on.map){
var map = act.on.map, v;
for(var f in map){ v = map[f];
if(v[1]){
emit(v[1], act, event, v[2]);
}
}
/*
Gun.obj.map(act.on.map, function(v,f){ // TODO: BUG! Gun is not available in this module.
//emit(v[0], act, event, v[1]); // below enables more control
//console.log("boooooooo", f,v);
emit(v, act, event);
//emit(v[1], act, event, v[2]);
});
*/
} else {
emit(last, act, event);
}
@ -880,19 +888,19 @@
opt = opt || {};
var gun = this, at = gun._, tmp, u;
if(!at.root){ root(at) }
tmp = at.opt = at.opt || {};
at.opt = at.opt || {};
if(text_is(opt)){ opt = {peers: opt} }
else if(list_is(opt)){ opt = {peers: opt} }
if(text_is(opt.peers)){ opt.peers = [opt.peers] }
if(list_is(opt.peers)){ opt.peers = obj_map(opt.peers, function(n,f,m){m(n,{})}) }
obj_map(opt, function map(v,f){
if(obj_is(v)){
tmp = tmp[f] || (tmp[f] = {}); // TODO: Bug? Be careful of falsey values getting overwritten?
obj_map(v, map);
obj_map(v, map, this[f] || (this[f] = {})); // TODO: Bug? Be careful of falsey values getting overwritten?
return;
}
tmp[f] = v;
});
this[f] = v;
}, at.opt);
Gun.on('opt', at);
return gun;
}
function root(at){
@ -2037,7 +2045,9 @@
if(typeof field === 'string'){
tmp = field.split(opt.split || '.');
if(1 === tmp.length){
return back.get(field, cb, opt);
gun = back.get(field, cb, opt);
gun._.opt = opt;
return gun;
}
field = tmp;
}
@ -2052,25 +2062,34 @@
} else {
gun = back.get(field[0], cb, opt);
}
gun._.opt = opt;
return gun;
}
if(!field && 0 != field){
return back;
}
return back.get(''+field, cb, opt);
gun = back.get(''+field, cb, opt);
gun._.opt = opt;
return gun;
}
;(function(){
Gun.chain.on = function(tag, arg, eas, as){
var gun = this, at = gun._, tmp;
var gun = this, at = gun._, tmp, act, off;
if(!at.on){ at.on = Gun.on }
if(typeof tag === 'string'){
if(!arg){ return at.on(tag) }
at.on(tag, arg, eas || at, as);
act = at.on(tag, arg, eas || at, as);
off = function() {
if (act && act.off) act.off();
off.off();
};
off.off = gun.off.bind(gun) || noop;
gun.off = off;
return gun;
}
var opt = arg;
opt = (true === opt)? {change: true} : opt || {};
opt = (true === opt)? {change: true} : opt || {};
opt.ok = tag;
opt.last = {};
gun.any(ok, {as: opt, change: opt.change}); // TODO: PERF! Event listener leak!!!????
@ -2169,6 +2188,7 @@
;(function(){
Gun.chain.map = function(cb, opt, t){
<<<<<<< HEAD
var gun = this, cat = gun._, chain = cat.map;
//cb = cb || function(){ return this } // TODO: API BREAKING CHANGE! 0.5 Will behave more like other people's usage of `map` where the passed callback is a transform function. By default though, if no callback is specified then it will use a transform function that returns the same thing it received.
if(!chain){
@ -2177,11 +2197,40 @@
chain.on('in').map = {};
if(opt !== false){
gun.on(map, {change: true, as: cat});
=======
var gun = this, cat = gun._, chain = cat.map, ons = [], act, off;
if(!chain){
chain = cat.map = gun.chain();
var list = (cat = chain._).list = cat.list || {};
(ons[ons.length] = chain.on('in')).map = {};
ons[ons.length] = chain.on('out', function(at){
console.debug(8, 'map out', at);
if(at.get instanceof Function){
ons[ons.length] = chain.on('in', at.get, at);
return;
} else {
console.debug(9, 'map out', at);
ons[ons.length] = chain.on('in', gun.get.input, at.gun._);
}
});
if(opt !== false){
ons[ons.length] = gun.on(map, {change: true, as: cat});
console.debug(1, 'map');
>>>>>>> a4b64feef07d625d5ec7c37e151ed842c1c09dfa
}
}
if(cb){
chain.on(cb);
ons[ons.length] = chain.on(cb);
}
off = function() {
while (ons.length) {
act = ons.pop();
if (act && act.off) act.off();
}
return off.off();
};
off.off = chain.off.bind(chain) || noop;
chain.off = off;
return chain;
}
function map(at,ev){
@ -2208,8 +2257,10 @@
;(function(){
Gun.chain.set = function(item, cb, opt){
var gun = this;
var gun = this, soul;
cb = cb || function(){};
if (soul = Gun.node.soul(item)) return gun.set(gun.get(soul), cb, opt);
if (Gun.obj.is(item) && !Gun.is(item)) return gun.set(gun._.root.put(item), cb, opt);
return item.val(function(node){
var put = {}, soul = Gun.node.soul(node);
if(!soul){ return cb.call(gun, {err: Gun.log('Only a node can be linked! Not "' + node + '"!')}) }
@ -2228,7 +2279,7 @@
var store = root.localStorage || {setItem: noop, removeItem: noop, getItem: noop};
function put(at){ var err, id, opt, root = at.gun._.root;
(opt = at.opt || {}).prefix = opt.prefix || 'gun/';
(opt = at.opt || {}).prefix = opt.prefix || at.gun.Back('opt.prefix') || 'gun/';
Gun.graph.is(at.put, function(node, soul){
//try{store.setItem(opt.prefix + soul, Gun.text.ify(node));
try{store.setItem(opt.prefix + soul, Gun.text.ify(root._.graph[soul]||node));
@ -2240,7 +2291,7 @@
function get(at){
var gun = at.gun, lex = at.get, soul, data, opt, u;
//setTimeout(function(){
(opt = at.opt || {}).prefix = opt.prefix || 'gun/';
(opt = at.opt || {}).prefix = opt.prefix || at.gun.Back('opt.prefix') || 'gun/';
if(!lex || !(soul = lex[Gun._.soul])){ return }
data = Gun.obj.ify(store.getItem(opt.prefix + soul) || null);
if(!data){ return } // localStorage isn't trustworthy to say "not found".
@ -2443,7 +2494,7 @@
Tab.on = Gun.on;//Gun.on.create();
Tab.peers = require('../polyfill/peer');
Gun.on('get', function(at){
var gun = at.gun, opt = gun.Back('opt') || {}, peers = opt.peers;
var gun = at.gun, opt = at.opt || {}, peers = opt.peers || gun.Back('opt.peers');
if(!peers || Gun.obj.empty(peers)){
//setTimeout(function(){
Gun.log.once('peers', "Warning! You have no peers to connect to!");

View File

@ -6,6 +6,13 @@ var Gun = require('../gun'),
fs = require('fs'),
file = {};
function isUsingFileJS (context) {
var gun = context.gun;
var opt = context.opt || gun.Back('opt') || {};
return opt.file !== false;
}
// queue writes, adapted from https://github.com/toolness/jsondown/blob/master/jsondown.js
var isWriting = false, queuedWrites = [];
function writeFile(path, disk, at){
@ -24,6 +31,9 @@ function writeFile(path, disk, at){
}
Gun.on('put', function(at){
if (isUsingFileJS(at) === false) {
return;
}
var gun = at.gun, graph = at.put, opt = at.opt || {};
var __ = gun._.root._;
Gun.obj.map(graph, function(node, soul){
@ -32,7 +42,10 @@ Gun.on('put', function(at){
writeFile(opt.file || file.file, file.disk, at);
});
Gun.on('get', function(at){
var gun = at.gun, lex = at.get, opt = at.opt;
if (isUsingFileJS(at) === false) {
return;
}
var gun = at.gun, lex = at.get;
if(!lex){return}
gun.Back(-1).on('in', {'@': at['#'], put: Gun.graph.node(file.disk.graph[lex['#']])});
//at.cb(null, file.disk.graph[lex['#']]);
@ -43,7 +56,11 @@ Gun.on('opt', function(at){
if ((opts.file === false) || (opts.s3 && opts.s3.key)) {
return; // don't use this plugin if S3 is being used.
}
console.log("WARNING! This `file.js` module for gun is intended only for local development testing!")
Gun.log.once(
'file-warning',
'WARNING! This `file.js` module for gun is ' +
'intended only for local development testing!'
);
file.file = opts.file || file.file || 'data.json';
file.raw = file.raw || (fs.existsSync || require('path').existsSync)(opts.file) ? fs.readFileSync(opts.file).toString() : null;
file.disk = file.disk || Gun.obj.ify(file.raw || {graph: {}});

View File

@ -1,9 +1,13 @@
;(function(){
console.log("Hello wonderful person! :) I'm mark@gunDB.io, message me for help or with hatemail. I want to hear from you! <3");
var Gun = require('../gun');
console.log("TODO: MARK! UPDATE S3 DRIVER BEFORE PUBLISHING!")
;(function(){
var Gun = require('../gun');
//require('./s3');
require('./wsp');
require('./file');
module.exports = Gun;
require('./wsp/server');
require('./file');
Gun.log(
'Hello wonderful person! :)\n' +
'I\'m mark@gunDB.io, message me for help or with hatemail. ' +
'I want to hear from you! <3'
);
Gun.log('TODO: MARK! UPDATE S3 DRIVER BEFORE PUBLISHING!');
module.exports = Gun;
}());

View File

@ -1,230 +0,0 @@
;(function(wsp){
/*
TODO: SERVER PUSH!
TODO: SERVER GET!
TODO: SERVER PUSH!
TODO: SERVER GET!
TODO: SERVER PUSH!
TODO: SERVER GET!
TODO: SERVER PUSH!
TODO: SERVER GET!
TODO: SERVER PUSH!
TODO: SERVER GET!
TODO: SERVER PUSH!
TODO: SERVER GET!
*/
var Gun = require('../gun')
, formidable = require('formidable')
, ws = require('ws').Server
, http = require('./http')
, url = require('url');
Gun.on('opt', function(at){
var gun = at.gun, opt = at.opt;
gun.__ = at.root._;
gun.__.opt.ws = opt.ws = gun.__.opt.ws || opt.ws || {};
function start(server, port, app){
if(app && app.use){ app.use(gun.wsp.server) }
server = gun.__.opt.ws.server = gun.__.opt.ws.server || opt.ws.server || server;
require('./ws')(gun.wsp.ws = gun.wsp.ws || new ws(gun.__.opt.ws), function(req, res){
var ws = this;
req.headers['gun-sid'] = ws.sid = ws.sid? ws.sid : req.headers['gun-sid'];
ws.sub = ws.sub || gun.wsp.on('network', function(msg, ev){
if(!ws || !ws.send || !ws._socket || !ws._socket.writable){ return ev.off() }
if(!msg || (msg.headers && msg.headers['gun-sid'] === ws.sid)){ return }
if(msg && msg.headers){ delete msg.headers['ws-rid'] }
// TODO: BUG? ^ What if other peers want to ack? Do they use the ws-rid or a gun declared id?
try{ws.send(Gun.text.ify(msg));
}catch(e){} // juuuust in case.
});
gun.wsp.wire(req, res);
}, {headers: {'ws-rid': 1, 'gun-sid': 1}});
gun.__.opt.ws.port = gun.__.opt.ws.port || opt.ws.port || port || 80;
}
var wsp = gun.wsp = gun.wsp || function(server){
if(!server){ return gun }
if(Gun.fns.is(server.address)){
if(server.address()){
start(server, server.address().port);
return gun;
}
}
if(Gun.fns.is(server.get) && server.get('port')){
start(server, server.get('port'));
return gun;
}
var listen = server.listen;
server.listen = function(port){
var serve = listen.apply(server, arguments);
start(serve, port, server);
return serve;
}
return gun;
}
gun.wsp.on = gun.wsp.on || Gun.on;
gun.wsp.regex = gun.wsp.regex || opt.route || opt.path || /^\/gun/i;
gun.wsp.poll = gun.wsp.poll || opt.poll || 1;
gun.wsp.pull = gun.wsp.pull || opt.pull || gun.wsp.poll * 1000;
gun.wsp.server = gun.wsp.server || function(req, res, next){ // http
next = next || function(){};
if(!req || !res){ return next(), false }
if(!req.upgrade){ return next(), false }
if(!req.url){ return next(), false }
if(!req.method){ return next(), false }
var msg = {};
msg.url = url.parse(req.url, true);
if(!gun.wsp.regex.test(msg.url.pathname)){ return next(), false } // TODO: BUG! If the option isn't a regex then this will fail!
if(msg.url.pathname.replace(gun.wsp.regex,'').slice(0,3).toLowerCase() === '.js'){
res.writeHead(200, {'Content-Type': 'text/javascript'});
res.end(gun.wsp.js = gun.wsp.js || require('fs').readFileSync(__dirname + '/../gun.js')); // gun server is caching the gun library for the client
return true;
}
return http(req, res, function(req, res){
if(!req){ return next() }
var stream, cb = res = require('./jsonp')(req, res);
if(req.headers && (stream = req.headers['gun-sid'])){
stream = (gun.wsp.peers = gun.wsp.peers || {})[stream] = gun.wsp.peers[stream] || {sid: stream};
stream.drain = stream.drain || function(res){
if(!res || !stream || !stream.queue || !stream.queue.length){ return }
res({headers: {'gun-sid': stream.sid}, body: stream.queue });
stream.off = setTimeout(function(){ stream = null }, gun.wsp.pull);
stream.reply = stream.queue = null;
return true;
}
stream.sub = stream.sub || gun.wsp.on('network', function(req, ev){
if(!stream){ return ev.off() } // self cleans up after itself!
if(!req || (req.headers && req.headers['gun-sid'] === stream.sid)){ return }
(stream.queue = stream.queue || []).push(req);
stream.drain(stream.reply);
});
cb = function(r){ (r.headers||{}).poll = gun.wsp.poll; res(r) }
clearTimeout(stream.off);
if(req.headers.pull){
if(stream.drain(cb)){ return }
return stream.reply = cb;
}
}
gun.wsp.wire(req, cb);
}), true;
}
if((gun.__.opt.maxSockets = opt.maxSockets || gun.__.opt.maxSockets) !== false){
require('https').globalAgent.maxSockets = require('http').globalAgent.maxSockets = gun.__.opt.maxSockets || Infinity;
}
gun.wsp.msg = gun.wsp.msg || function(id){
if(!id){
return gun.wsp.msg.debounce[id = Gun.text.random(9)] = Gun.time.is(), id;
}
clearTimeout(gun.wsp.msg.clear);
gun.wsp.msg.clear = setTimeout(function(){
var now = Gun.time.is();
Gun.obj.map(gun.wsp.msg.debounce, function(t,id){
if((now - t) < (1000 * 60 * 5)){ return }
Gun.obj.del(gun.wsp.msg.debounce, id);
});
},500);
if(id = gun.wsp.msg.debounce[id]){
return gun.wsp.msg.debounce[id] = Gun.time.is(), id;
}
gun.wsp.msg.debounce[id] = Gun.time.is();
return;
};
gun.wsp.msg.debounce = gun.wsp.msg.debounce || {};
gun.wsp.wire = gun.wsp.wire || (function(){
// all streams, technically PATCH but implemented as PUT or POST, are forwarded to other trusted peers
// except for the ones that are listed in the message as having already been sending to.
// all states, implemented with GET, are replied to the source that asked for it.
function tran(req, res){
if(!req || !res || !req.body || !req.headers){ return }
if(req.url){ req.url = url.format(req.url) }
var msg = req.body;
// AUTH for non-replies.
if(gun.wsp.msg(msg['#'])){ return }
gun.wsp.on('network', Gun.obj.copy(req));
if(msg['@']){ return } // no need to process.
if(msg['$'] && msg['$']['#']){ return tran.get(req, res) }
//if(Gun.is.lex(msg['$'])){ return tran.get(req, res) }
else { return tran.put(req, res) }
cb({body: {hello: 'world'}});
// TODO: BUG! server put should push.
}
tran.get = function(req, cb){
var body = req.body, lex = body['$'], reply = {headers: {'Content-Type': tran.json}}, opt;
gun.on('out', {gun: gun, get: lex, req: 1, '#': Gun.on.ask(function(at, ev){
ev.off();
var graph = at.put;
return cb({headers: reply.headers, body: {
'#': gun.wsp.msg(),
'@': body['#'],
'$': graph,
'!': at.err
}});
return;
if(Gun.obj.empty(node)){
return cb({headers: reply.headers, body: node});
} // we're out of stuff!
/*
(function(chunks){ // FEATURE! Stream chunks if the nodes are large!
var max = 10, count = 0, soul = Gun.is.node.soul(node);
if(Object.keys(node).length > max){
var n = Gun.is.node.soul.ify({}, soul);
Gun.obj.map(node, function(val, field){
if(!(++count % max)){
cb({headers: reply.headers, chunk: n}); // send node chunks
n = Gun.is.node.soul.ify({}, soul);
}
Gun.is.node.state.ify([n, node], field, val);
});
if(count % max){ // finish off the last chunk
cb({headers: reply.headers, chunk: n});
}
} else {
cb({headers: reply.headers, chunk: node}); // send full node
}
}([]));
*/
cb({headers: reply.headers, chunk: node }); // Use this if you don't want streaming chunks feature.
})});
}
tran.put = function(req, cb){
//console.log("tran.put", req);
// NOTE: It is highly recommended you do your own PUT/POSTs through your own API that then saves to gun manually.
// This will give you much more fine-grain control over security, transactions, and what not.
var body = req.body, graph = body['$'], reply = {headers: {'Content-Type': tran.json}}, opt;
gun.on('out', {gun: gun, put: graph, '#': Gun.on.ask(function(ack, ev){
//Gun.on('put', {gun: gun, put: graph, '#': Gun.on.ask(function(ack, ev){
ev.off();
return cb({headers: reply.headers, body: {
'#': gun.wsp.msg(),
'@': body['#'],
'$': ack,
'!': ack.err
}});
})});
return;
if(Gun.is.graph(req.body)){
if(req.err = Gun.union(gun, req.body, function(err, ctx){ // TODO: BUG? Probably should give me ctx.graph
if(err){ return cb({headers: reply.headers, body: {err: err || "Union failed."}}) }
var ctx = ctx || {}; ctx.graph = {};
Gun.is.graph(req.body, function(node, soul){
ctx.graph[soul] = gun.__.graph[soul];
});
(gun.__.opt.wire.put || function(g,cb){cb("No save.")})(ctx.graph, function(err, ok){
if(err){ return cb({headers: reply.headers, body: {err: err || "Failed."}}) } // TODO: err should already be an error object?
cb({headers: reply.headers, body: {ok: ok || "Persisted."}});
//console.log("tran.put <------------------------", ok);
});
}).err){ cb({headers: reply.headers, body: {err: req.err || "Union failed."}}) }
} else {
cb({headers: reply.headers, body: {err: "Not a valid graph!"}});
}
}
gun.wsp.on('network', function(req){
// TODO: MARK! You should move the networking events to here, not in WSS only.
});
tran.json = 'application/json';
return tran;
}());
if(opt.server){
wsp(opt.server);
}
});
}({}));

176
lib/wsp/Peer.js Normal file
View File

@ -0,0 +1,176 @@
var WebSocket = require('ws');
/**
* Calculates backoff instances.
* @param {Object} [options] - Override the default settings.
* @param {Object} options.time=50 - Initial backoff time.
* @param {Object} options.factor=2 - How much to multiply the time by.
* @class
*/
function Backoff (options) {
this.options = options || {};
// Sets the initial backoff settings.
this.reset();
}
/**
* Increments the time by the factor.
* @return {Number} - The next backoff time.
*/
Backoff.prototype.next = function () {
this.time *= this.factor;
return this.time;
};
/**
* Resets the backoff state to it's original condition.
* @return {Backoff} - The context.
*/
Backoff.prototype.reset = function () {
var options = this.options;
this.time = options.time || 50;
this.factor = options.factor || 2;
return this;
};
/**
* Create a websocket client and handle reconnect backoff logic.
* @param {String} url - A preformatted url (starts with ws://)
* @param {Object} [options] - Override how the socket is managed.
* @param {Object} options.backoff - Backoff options (see the constructor).
* @class
*/
function Peer (url, options) {
if (!(this instanceof Peer)) {
return new Peer(url, options);
}
this.options = options || {};
// Messages sent while offline.
this.offline = [];
this.url = Peer.formatURL(url);
this.backoff = new Backoff(this.options.backoff);
this.retry(url);
}
/**
* Turns http URLs into WebSocket URLs.
* @param {String} url - The url to format.
* @return {String} - A correctly formatted WebSocket URL.
*/
Peer.formatURL = function (url) {
// Works for `https` and `wss` URLs, too.
return url.replace('http', 'ws');
};
var API = Peer.prototype;
/**
* Attempts a websocket connection.
* @param {String} url - The websocket URL.
* @return {WebSocket} - The new websocket instance.
*/
API.retry = function () {
var url = this.url;
var socket = new WebSocket(url);
this.socket = socket;
this.retryOnDisconnect(socket);
this.sendOnConnection();
return socket;
};
/**
* Sends the messages that couldn't be sent before once
* the connection is open.
* @return {Peer} - The context.
*/
API.sendOnConnection = function () {
var peer = this;
var queue = this.offline;
var socket = this.socket;
// Wait for the socket to connect.
socket.once('open', function () {
queue.forEach(function (msg) {
socket.send(msg);
});
peer.offline = [];
});
return this;
};
/**
* Schedules the next retry, according to the backoff.
* @param {Peer} peer - A peer instance.
* @return {Timeout} - The timeout value from `setTimeout`.
*/
function schedule (peer) {
var backoff = peer.backoff;
var time = backoff.time;
backoff.next();
return setTimeout(function () {
var socket = peer.retry();
// Successfully reconnected? Reset the backoff.
socket.once('open', backoff.reset.bind(backoff));
}, time);
}
/**
* Attaches handlers to the socket, attempting reconnection
* when it's closed.
* @param {WebSocket} socket - The websocket instance to bind to.
* @return {WebSocket} - The same websocket.
*/
API.retryOnDisconnect = function (socket) {
var peer = this;
// Listen for socket close events.
socket.once('close', function () {
schedule(peer);
});
socket.on('error', function (error) {
if (error.code === 'ECONNREFUSED') {
schedule(peer);
}
});
return socket;
};
/**
* Send data through the socket, or add it to a queue
* of offline requests if it's not ready yet.
* @param {String} msg - The data to send.
* @return {Peer} - The context.
*/
API.send = function (msg) {
var socket = this.socket;
var state = socket.readyState;
var ready = socket.OPEN;
if (state === ready) {
socket.send(msg);
} else {
this.offline.push(msg);
}
return this;
};
module.exports = Peer;

460
lib/wsp/client.js Normal file
View File

@ -0,0 +1,460 @@
/* eslint-env node*/
/*
eslint-disable
require-jsdoc,
no-warning-comments,
no-underscore-dangle,
max-params,
*/
'use strict';
var Gun = require('../../gun');
var WS = require('ws');
var Tab = {};
Tab.on = Gun.on;
Tab.peers = (function () {
function Peer (peers) {
if (!Peer.is(this)) {
return new Peer(peers);
}
this.peers = peers;
}
Peer.is = function (peer) {
return peer instanceof Peer;
};
function map (peer, url) {
var msg = this.msg;
var opt = this.opt || {};
opt.out = true;
Peer.request(url, msg, null, opt);
}
Peer.prototype.send = function (msg, opt) {
Peer.request.each(this.peers, map, {
msg: msg,
opt: opt,
});
};
Peer.request = (function () {
function request (base, body, cb, opt) {
var obj = base.length ? { base: base } : {};
obj.base = opt.base || base;
obj.body = opt.body || body;
obj.headers = opt.headers;
obj.url = opt.url;
obj.out = opt.out;
cb = cb || function () {};
if (!obj.base) {
return;
}
request.transport(obj, cb);
}
request.createServer = function (fn) {
request.createServer.list.push(fn);
};
request.createServer.ing = function (req, cb) {
var index = request.createServer.list.length;
var server;
while (index) {
index -= 1;
server = request.createServer.list[index] || function () {};
server(req, cb);
}
};
request.createServer.list = [];
request.back = 2;
request.backoff = 2;
request.transport = function (opt, cb) {
if (request.ws(opt, cb)) {
return;
}
};
request.ws = function (opt, cb, req) {
var ws;
if (!WS) {
return false;
}
ws = request.ws.peers[opt.base];
if (ws) {
req = req || {};
if (opt.headers) {
req.headers = opt.headers;
}
if (opt.body) {
req.body = opt.body;
}
if (opt.url) {
req.url = opt.url;
}
req.headers = req.headers || {};
if (!opt.out && !ws.cbs[req.headers['ws-rid']]) {
var rid = 'WS' +
new Date().getTime() +
'.' +
Math.floor((Math.random() * 65535) + 1);
req.headers['ws-rid'] = rid;
ws.cbs[rid] = function (err, res) {
if (!res || res.body || res.end) {
delete ws.cbs[req.headers['ws-rid']];
}
cb(err, res);
};
}
if (!ws.readyState) {
setTimeout(function () {
request.ws(opt, cb, req);
}, 100);
return true;
}
ws.sending = true;
ws.send(JSON.stringify(req));
return true;
}
if (ws === false) {
return false;
}
var wsURL = opt.base.replace('http', 'ws');
ws = request.ws.peers[opt.base] = new WS(wsURL);
ws.cbs = {};
ws.onopen = function () {
request.back = 2;
request.ws(opt, cb);
};
ws.onclose = function (event) {
if (!ws || !event) {
return;
}
if (ws.close instanceof Function) {
ws.close();
}
if (!ws.sending) {
ws = request.ws.peers[opt.base] = false;
request.transport(opt, cb);
return;
}
request.each(ws.cbs, function (cb) {
cb({
err: 'WebSocket disconnected!',
code: ws.sending ? (ws || {}).err || event.code : -1,
});
});
// This will make the next request try to reconnect
ws = request.ws.peers[opt.base] = null;
// TODO: Have the driver handle this!
setTimeout(function () {
// opt here is a race condition,
// is it not? Does this matter?
request.ws(opt, function () {});
}, request.back *= request.backoff);
};
ws.onmessage = function (msg) {
var res;
if (!msg || !msg.data) {
return;
}
try {
res = JSON.parse(msg.data);
} catch (error) {
return;
}
if (!res) {
return;
}
res.headers = res.headers || {};
if (res.headers['ws-rid']) {
var cb = ws.cbs[res.headers['ws-rid']] || function () {};
cb(null, res);
return;
}
// emit extra events.
if (res.body) {
request.createServer.ing(res, function (res) {
res.out = true;
request(opt.base, null, null, res);
});
}
};
ws.onerror = function (error) {
(ws || {}).err = error;
};
return true;
};
request.ws.peers = {};
request.ws.cbs = {};
request.each = function (obj, cb, as) {
if (!obj || !cb) {
return;
}
for (var key in obj) {
if (obj.hasOwnProperty(key)) {
cb.call(as, obj[key], key);
}
}
};
return request;
}());
return Peer;
}());
// Handle read requests.
Gun.on('get', function (at) {
var gun = at.gun;
var opt = at.opt || {};
var peers = opt.peers || gun.Back('opt.peers');
if (!peers || Gun.obj.empty(peers)) {
Gun.log.once('peers', 'Warning! You have no peers to connect to!');
at.gun.Back(-1).on('in', {'@': at['#']});
return;
}
// Create a new message.
var msg = {
// msg ID
'#': at['#'] || Gun.text.random(9),
// msg BODY
'$': at.get,
};
// Listen for a response.
// TODO: ONE? PERF! Clear out listeners, maybe with setTimeout?
Tab.on(msg['#'], function (err, data) {
var obj = {
'@': at['#'],
err: err,
put: data,
};
if (data) {
at.gun.Back(-1).on('out', obj);
} else {
at.gun.Back(-1).on('in', obj);
}
});
// Broadcast to all other peers.
Tab.peers(peers).send(msg, {
headers: {
'gun-sid': Tab.server.sid,
},
});
});
// Handle write requests.
Gun.on('put', function (at) {
if (at['@']) {
return;
}
var opt = at.gun.Back('opt') || {}, peers = opt.peers;
if (!peers || Gun.obj.empty(peers)) {
Gun.log.once('peers', 'Warning! You have no peers to save to!');
at.gun.Back(-1).on('in', {'@': at['#']});
return;
}
if (opt.websocket === false || (at.opt && at.opt.websocket === false)) {
return;
}
var msg = {
// msg ID
'#': at['#'] || Gun.text.random(9),
// msg BODY
'$': at.put,
};
// TODO: ONE? PERF! Clear out listeners, maybe with setTimeout?
Tab.on(msg['#'], function (err, ok) {
at.gun.Back(-1).on('in', {
'@': at['#'],
err: err,
ok: ok,
});
});
Tab.peers(peers).send(msg, {
headers: {
'gun-sid': Tab.server.sid,
},
});
});
// REVIEW: Do I need this on a server client?
// browser/client side Server!
// TODO: BUG! Does not respect separate instances!!!
Gun.on('opt', function (at) {
if (Tab.server) {
return;
}
var gun = at.gun;
var server = Tab.server = Tab.server || {};
var tmp;
server.sid = Gun.text.random();
Tab.peers.request.createServer(function (req, res) {
// Validate request.
if (!req || !res || !req.body || !req.headers) {
return;
}
var msg = req.body;
// AUTH for non-replies.
if (server.msg(msg['#'])) {
return;
}
// no need to process.
if (msg['@']) {
if (Tab.ons[tmp = msg['@'] || msg['#']]) {
Tab.on(tmp, [msg['!'], msg.$]);
}
return;
}
if (msg.$ && msg.$['#']) {
server.get(req, res);
return;
}
server.put(req, res);
});
server.get = function (req, cb) {
var body = req.body;
var lex = body.$;
var graph = gun._.root._.graph;
var node;
// Don't reply to data we don't have it in memory.
// TODO: Add localStorage?
if (!(node = graph[lex['#']])) {
return;
}
cb({
body: {
'#': server.msg(),
'@': body['#'],
'$': node,
},
});
};
server.put = function (req, cb) {
var body = req.body, graph = body.$;
var __ = gun._.root._;
// filter out what we don't have in memory.
if (!(graph = Gun.obj.map(graph, function (node, soul, map) {
if (!__.path[soul]) {
return;
}
map(soul, node);
}))) {
return;
}
gun.on('out', {
gun: gun,
opt: {
websocket: false,
},
put: graph,
'#': Gun.on.ask(function (ack, ev) {
if (!ack) {
return undefined;
}
ev.off();
return cb({
body: {
'#': server.msg(),
'@': body['#'],
'$': ack,
'!': ack.err,
},
});
}),
});
};
server.msg = function (id) {
if (!id) {
id = Gun.text.random(9);
server.msg.debounce[id] = Gun.time.is();
return id;
}
clearTimeout(server.msg.clear);
server.msg.clear = setTimeout(function () {
var now = Gun.time.is();
Gun.obj.map(server.msg.debounce, function (time, id) {
if ((now - time) < (1000 * 60 * 5)) {
return;
}
Gun.obj.del(server.msg.debounce, id);
});
}, 500);
if (server.msg.debounce[id]) {
server.msg.debounce[id] = Gun.time.is();
return id;
}
server.msg.debounce[id] = Gun.time.is();
return undefined;
};
server.msg.debounce = server.msg.debounce || {};
});

156
lib/wsp/server-push.js Normal file
View File

@ -0,0 +1,156 @@
'use strict';
var Gun = require('../../gun.js');
/**
* Whether the gun instance is attached to a socket server.
* @param {Gun} gun - The gun instance in question.
* @param {WebSocket.Server} server - A socket server gun might be attached to.
* @return {Boolean} - Whether it's attached.
*/
function isUsingServer (gun, server) {
var servers = gun.Back(-1)._.servers;
return servers ? servers.indexOf(server) !== -1 : false;
}
/**
* Calls a function when (or if) a socket is ready for messages.
* @param {WebSocket} socket - A websocket connection.
* @param {Function} cb - Called if or when the socket is ready.
* @return {Boolean} - Whether the socket is able to take messages.
*/
function ready (socket, cb) {
var state = socket.readyState;
// The socket is ready.
if (state === socket.OPEN) {
cb();
return true;
}
// Still opening.
if (state === socket.OPENING) {
socket.once('open', cb);
}
// Nope, closing or closed.
return false;
}
/**
* Send a request to a list of clients.
* @param {Obejct} context - A gun request context.
* @param {Object} clients - IDs mapped to socket instances.
* @param {Function} cb - Called for each response.
* @return {undefined}
*/
function request (context, clients, cb) {
Gun.obj.map(clients, function (client) {
ready(client, function () {
var msg = {
headers: {},
body: {
'#': Gun.on.ask(cb),
'$': context.get,
},
};
var serialized = JSON.stringify(msg);
client.send(serialized);
});
});
}
/**
* Pushes a graph update to a collection of clients.
* @param {Object} context - The context object passed by gun.
* @param {Object} clients - An object mapping URLs to clients.
* @param {Function} cb - Invoked on each client response.
* @return {undefined}
*/
function update (context, clients, cb) {
Gun.obj.map(clients, function (client) {
ready(client, function () {
var msg = {
headers: {},
body: {
'#': Gun.on.ask(cb),
'$': context.put,
},
};
var serialized = JSON.stringify(msg);
client.send(serialized);
});
});
}
/** * Attaches server push middleware to gun.
* @param {Gun} gun - The gun instance to attach to.
* @param {WebSocket.Server} server - A websocket server instance.
* @return {server} - The socket server.
*/
function attach (gun, server) {
var root = gun.Back(-1);
root._.servers = root._.servers || [];
root._.servers.push(server);
var pool = {};
server.on('connection', function (socket) {
socket.id = socket.id || Gun.text.random(10);
pool[socket.id] = socket;
socket.on('message', function (message) {
var data = Gun.obj.ify(message);
if (!data || !data.body) {
return;
}
var msg = data.body;
if (msg['@']) {
Gun.on.ack(msg['@'], [msg['!'], msg.$]);
return;
}
});
socket.once('close', function () {
delete pool[socket.id];
});
});
Gun.on('get', function (context) {
if (!isUsingServer(context.gun, server)) {
return;
}
request(context, pool, function (err, data) {
var response = {
'@': context['#'],
put: data,
err: err,
};
var root = context.gun.Back(Infinity);
root.on(data ? 'out' : 'in', response);
});
});
Gun.on('put', function (context) {
if (!isUsingServer(context.gun, server)) {
return;
}
update(context, pool, function (err, data) {
var ack = {
'!': err || null,
'$': data.$,
};
Gun.on.ack(context, ack);
});
});
}
module.exports = attach;

191
lib/wsp/server.js Normal file
View File

@ -0,0 +1,191 @@
var Gun = require('../../gun')
, formidable = require('formidable')
, http = require('../http')
, url = require('url')
, wsp = {}
, WS = require('ws')
, WSS = WS.Server
, attach = require('./server-push');
// Handles server to server sync.
require('./client.js');
Gun.on('opt', function(at){
var gun = at.gun, opt = at.opt;
gun.__ = at.root._;
gun.__.opt.ws = opt.ws = gun.__.opt.ws || opt.ws || {};
function start(server, port, app){
if(app && app.use){ app.use(gun.wsp.server) }
server = gun.__.opt.ws.server = gun.__.opt.ws.server || opt.ws.server || server;
if (!gun.wsp.ws) {
gun.wsp.ws = new WSS(gun.__.opt.ws);
attach(gun, gun.wsp.ws);
}
gun.wsp.ws = gun.wsp.ws || new WSS(gun.__.opt.ws);
require('./ws')(gun.wsp.ws, function(req, res){
var ws = this;
req.headers['gun-sid'] = ws.sid = ws.sid? ws.sid : req.headers['gun-sid'];
ws.sub = ws.sub || gun.wsp.on('network', function(msg, ev){
if(!ws || !ws.send || !ws._socket || !ws._socket.writable){ return ev.off() }
if(!msg || (msg.headers && msg.headers['gun-sid'] === ws.sid)){ return }
if(msg && msg.headers){ delete msg.headers['ws-rid'] }
// TODO: BUG? ^ What if other peers want to ack? Do they use the ws-rid or a gun declared id?
try{ws.send(Gun.text.ify(msg));
}catch(e){} // juuuust in case.
});
gun.wsp.wire(req, res);
}, {headers: {'ws-rid': 1, 'gun-sid': 1}});
gun.__.opt.ws.port = gun.__.opt.ws.port || opt.ws.port || port || 80;
}
var wsp = gun.wsp = gun.wsp || function(server){
if(!server){ return gun }
if(Gun.fns.is(server.address)){
if(server.address()){
start(server, server.address().port);
return gun;
}
}
if(Gun.fns.is(server.get) && server.get('port')){
start(server, server.get('port'));
return gun;
}
var listen = server.listen;
server.listen = function(port){
var serve = listen.apply(server, arguments);
start(serve, port, server);
return serve;
}
return gun;
}
gun.wsp.on = gun.wsp.on || Gun.on;
gun.wsp.regex = gun.wsp.regex || opt.route || opt.path || /^\/gun/i;
gun.wsp.poll = gun.wsp.poll || opt.poll || 1;
gun.wsp.pull = gun.wsp.pull || opt.pull || gun.wsp.poll * 1000;
gun.wsp.server = gun.wsp.server || function(req, res, next){ // http
next = next || function(){};
if(!req || !res){ return next(), false }
if(!req.url){ return next(), false }
if(!req.method){ return next(), false }
var msg = {};
msg.url = url.parse(req.url, true);
if(!gun.wsp.regex.test(msg.url.pathname)){ return next(), false } // TODO: BUG! If the option isn't a regex then this will fail!
if(msg.url.pathname.replace(gun.wsp.regex,'').slice(0,3).toLowerCase() === '.js'){
res.writeHead(200, {'Content-Type': 'text/javascript'});
res.end(gun.wsp.js = gun.wsp.js || require('fs').readFileSync(__dirname + '/../../gun.js')); // gun server is caching the gun library for the client
return true;
}
if(!req.upgrade){
next();
return false;
}
return http(req, res, function(req, res){
if(!req){ return next() }
var stream, cb = res = require('../jsonp')(req, res);
if(req.headers && (stream = req.headers['gun-sid'])){
stream = (gun.wsp.peers = gun.wsp.peers || {})[stream] = gun.wsp.peers[stream] || {sid: stream};
stream.drain = stream.drain || function(res){
if(!res || !stream || !stream.queue || !stream.queue.length){ return }
res({headers: {'gun-sid': stream.sid}, body: stream.queue });
stream.off = setTimeout(function(){ stream = null }, gun.wsp.pull);
stream.reply = stream.queue = null;
return true;
}
stream.sub = stream.sub || gun.wsp.on('network', function(req, ev){
if(!stream){ return ev.off() } // self cleans up after itself!
if(!req || (req.headers && req.headers['gun-sid'] === stream.sid)){ return }
(stream.queue = stream.queue || []).push(req);
stream.drain(stream.reply);
});
cb = function(r){ (r.headers||{}).poll = gun.wsp.poll; res(r) }
clearTimeout(stream.off);
if(req.headers.pull){
if(stream.drain(cb)){ return }
return stream.reply = cb;
}
}
gun.wsp.wire(req, cb);
}), true;
}
if((gun.__.opt.maxSockets = opt.maxSockets || gun.__.opt.maxSockets) !== false){
require('https').globalAgent.maxSockets = require('http').globalAgent.maxSockets = gun.__.opt.maxSockets || Infinity;
}
gun.wsp.msg = gun.wsp.msg || function(id){
if(!id){
return gun.wsp.msg.debounce[id = Gun.text.random(9)] = Gun.time.is(), id;
}
clearTimeout(gun.wsp.msg.clear);
gun.wsp.msg.clear = setTimeout(function(){
var now = Gun.time.is();
Gun.obj.map(gun.wsp.msg.debounce, function(t,id){
if((now - t) < (1000 * 60 * 5)){ return }
Gun.obj.del(gun.wsp.msg.debounce, id);
});
},500);
if(id = gun.wsp.msg.debounce[id]){
return gun.wsp.msg.debounce[id] = Gun.time.is(), id;
}
gun.wsp.msg.debounce[id] = Gun.time.is();
return;
};
gun.wsp.msg.debounce = gun.wsp.msg.debounce || {};
gun.wsp.wire = gun.wsp.wire || (function(){
// all streams, technically PATCH but implemented as PUT or POST, are forwarded to other trusted peers
// except for the ones that are listed in the message as having already been sending to.
// all states, implemented with GET, are replied to the source that asked for it.
function tran(req, res){
if(!req || !res || !req.body || !req.headers){ return }
if(req.url){ req.url = url.format(req.url) }
var msg = req.body;
// AUTH for non-replies.
if(gun.wsp.msg(msg['#'])){ return }
gun.wsp.on('network', Gun.obj.copy(req));
if(msg['@']){ return } // no need to process.
if(msg['$'] && msg['$']['#']){ return tran.get(req, res) }
//if(Gun.is.lex(msg['$'])){ return tran.get(req, res) }
else { return tran.put(req, res) }
cb({body: {hello: 'world'}});
// TODO: BUG! server put should push.
}
tran.get = function(req, cb){
var body = req.body, lex = body['$'], reply = {headers: {'Content-Type': tran.json}}, opt;
gun.on('out', {gun: gun, get: lex, req: 1, '#': Gun.on.ask(function(at, ev){
ev.off();
var graph = at.put;
return cb({headers: reply.headers, body: {
'#': gun.wsp.msg(),
'@': body['#'],
'$': graph,
'!': at.err
}});
})});
}
tran.put = function(req, cb){
// NOTE: It is highly recommended you do your own PUT/POSTs through your own API that then saves to gun manually.
// This will give you much more fine-grain control over security, transactions, and what not.
var body = req.body, graph = body['$'], reply = {headers: {'Content-Type': tran.json}}, opt;
gun.on('out', {gun: gun, put: graph, '#': Gun.on.ask(function(ack, ev){
//Gun.on('put', {gun: gun, put: graph, '#': Gun.on.ask(function(ack, ev){
ev.off();
return cb({headers: reply.headers, body: {
'#': gun.wsp.msg(),
'@': body['#'],
'$': ack,
'!': ack.err
}});
})});
}
gun.wsp.on('network', function(rq){
// TODO: MARK! You should move the networking events to here, not in WSS only.
});
tran.json = 'application/json';
return tran;
}());
if(opt.server){
wsp(opt.server);
}
});

View File

@ -1,4 +1,4 @@
var Gun = require('../gun')
var Gun = require('../../gun')
, url = require('url');
module.exports = function(wss, server, opt){
wss.on('connection', function(ws){
@ -27,7 +27,7 @@ module.exports = function(wss, server, opt){
(reply.headers = reply.headers || {})['ws-rid'] = msg.headers['ws-rid'];
}
try{ws.send(Gun.text.ify(reply));
}catch(e){} // juuuust in case.
}catch(e){} // juuuust in case.
});
});
ws.off = function(m){

View File

@ -50,6 +50,7 @@
},
"devDependencies": {
"mocha": "~>1.9.0",
"express": "~>4.13.4",
"panic-server": "~>0.3.0",
"selenium-webdriver": "~>2.53.2"
}

View File

@ -6755,12 +6755,19 @@ describe('Gun', function(){
var alice = gun.put({name: 'alice', birth: Math.random()}).key('person/alice');
var bob = gun.put({name: 'bob', birth: Math.random()}).key('person/bob');
var carl = gun.put({name: 'carl', birth: Math.random()}).key('person/carl');
var dave = gun.put({name: 'dave', birth: Math.random()}).key('person/dave');
var dave = gun.put({name: 'dave', birth: Math.random()}).key('person/dave');
// Test set with new object
var alan = users.set({name: 'alan', birth: Math.random()}).key('person/alan');
alan.val(function(alan) {
// Test set with node
dave.path('friends').set(alan);
});
users.set(alice);
users.set(bob);
users.set(carl);
users.set(dave);
users.set(dave);
alice.path('friends').set(bob).back.set(carl);
bob.path('friends').set(alice);
@ -6768,7 +6775,8 @@ describe('Gun', function(){
var team = gun.get('team/lions').put({name: "Lions"});
team.path('members').set(alice);
team.path('members').set(bob);
team.path('members').set(bob);
team.path('members').set(alan); // Test set with set
alice.path('team').put(team);
bob.path('team').put(team);
@ -6780,10 +6788,14 @@ describe('Gun', function(){
} else
if('bob' === member.name){
done.bob = true;
} else {
} else
if('alan' === member.name){
done.alan = true;
} else
{
expect(member).to.not.be.ok();
}
if(done.alice && done.bob){
if(done.alice && done.bob && done.alan){
setTimeout(function(){
done();
},10);