mirror of
https://github.com/amark/gun.git
synced 2025-03-30 15:08:33 +00:00
Merge pull request #276 from PsychoLlama/s2s-sync
Optimistically open client sockets
This commit is contained in:
commit
42003cb82f
150
lib/wsp/Peer.js
150
lib/wsp/Peer.js
@ -1,10 +1,16 @@
|
||||
/* eslint-disable no-underscore-dangle */
|
||||
'use strict';
|
||||
|
||||
var WebSocket = require('ws');
|
||||
var Emitter = require('events');
|
||||
var util = require('util');
|
||||
|
||||
/**
|
||||
* Calculates backoff instances.
|
||||
* @param {Object} [options] - Override the default settings.
|
||||
* @param {Object} options.time=50 - Initial backoff time.
|
||||
* @param {Object} options.factor=2 - How much to multiply the time by.
|
||||
* @param {Object} options.max=1min - Maximum backoff time.
|
||||
* @class
|
||||
*/
|
||||
function Backoff (options) {
|
||||
@ -19,7 +25,14 @@ function Backoff (options) {
|
||||
* @return {Number} - The next backoff time.
|
||||
*/
|
||||
Backoff.prototype.next = function () {
|
||||
this.time *= this.factor;
|
||||
var next = this.time * this.factor;
|
||||
|
||||
if (next > this.max) {
|
||||
this.time = this.max;
|
||||
return this.max;
|
||||
}
|
||||
|
||||
this.time = next;
|
||||
|
||||
return this.time;
|
||||
};
|
||||
@ -33,13 +46,29 @@ Backoff.prototype.reset = function () {
|
||||
|
||||
this.time = options.time || 50;
|
||||
this.factor = options.factor || 2;
|
||||
this.max = options.max || 1 * 60 * 1000;
|
||||
|
||||
return this;
|
||||
};
|
||||
|
||||
/**
|
||||
* Create a websocket client and handle reconnect backoff logic.
|
||||
* @param {String} url - A preformatted url (starts with ws://)
|
||||
* Schedules the next connection, according to the backoff.
|
||||
* @param {Peer} peer - A peer instance.
|
||||
* @return {Timeout} - The timeout value from `setTimeout`.
|
||||
*/
|
||||
function scheduleReconnect (peer) {
|
||||
var backoff = peer.backoff;
|
||||
var time = backoff.time;
|
||||
backoff.next();
|
||||
|
||||
var reconnect = peer.connect.bind(peer);
|
||||
|
||||
return setTimeout(reconnect, time);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles reconnections and defers messages until the socket is ready.
|
||||
* @param {String} url - The address to connect to.
|
||||
* @param {Object} [options] - Override how the socket is managed.
|
||||
* @param {Object} options.backoff - Backoff options (see the constructor).
|
||||
* @class
|
||||
@ -49,14 +78,38 @@ function Peer (url, options) {
|
||||
return new Peer(url, options);
|
||||
}
|
||||
|
||||
// Extend EventEmitter.
|
||||
Emitter.call(this);
|
||||
this.setMaxListeners(Infinity);
|
||||
|
||||
this.options = options || {};
|
||||
|
||||
// Messages sent while offline.
|
||||
this.offline = [];
|
||||
// Messages sent before the socket is ready.
|
||||
this.deferredMsgs = [];
|
||||
|
||||
this.url = Peer.formatURL(url);
|
||||
this.backoff = new Backoff(this.options.backoff);
|
||||
this.retry(url);
|
||||
|
||||
// Set up the websocket.
|
||||
this.connect();
|
||||
|
||||
var peer = this;
|
||||
var reconnect = scheduleReconnect.bind(null, peer);
|
||||
|
||||
// Handle reconnection.
|
||||
this.on('close', reconnect);
|
||||
this.on('error', function (error) {
|
||||
if (error.code === 'ECONNREFUSED') {
|
||||
reconnect();
|
||||
}
|
||||
});
|
||||
|
||||
// Send deferred messages.
|
||||
this.on('open', function () {
|
||||
peer.drainQueue();
|
||||
peer.backoff.reset();
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
@ -70,93 +123,48 @@ Peer.formatURL = function (url) {
|
||||
return url.replace('http', 'ws');
|
||||
};
|
||||
|
||||
util.inherits(Peer, Emitter);
|
||||
var API = Peer.prototype;
|
||||
|
||||
/**
|
||||
* Attempts a websocket connection.
|
||||
* @param {String} url - The websocket URL.
|
||||
* @return {WebSocket} - The new websocket instance.
|
||||
*/
|
||||
API.retry = function () {
|
||||
API.connect = function () {
|
||||
var url = this.url;
|
||||
|
||||
// Open a new websocket.
|
||||
var socket = new WebSocket(url);
|
||||
|
||||
// Re-use the previous listeners.
|
||||
socket._events = this._events;
|
||||
|
||||
this.socket = socket;
|
||||
|
||||
this.retryOnDisconnect(socket);
|
||||
|
||||
this.sendOnConnection();
|
||||
|
||||
return socket;
|
||||
};
|
||||
|
||||
/**
|
||||
* Sends the messages that couldn't be sent before once
|
||||
* the connection is open.
|
||||
* Sends all the messages in the deferred queue.
|
||||
* @return {Peer} - The context.
|
||||
*/
|
||||
API.sendOnConnection = function () {
|
||||
API.drainQueue = function () {
|
||||
var peer = this;
|
||||
var queue = this.offline;
|
||||
var socket = this.socket;
|
||||
|
||||
// Wait for the socket to connect.
|
||||
socket.once('open', function () {
|
||||
queue.forEach(function (msg) {
|
||||
socket.send(msg);
|
||||
});
|
||||
|
||||
peer.offline = [];
|
||||
this.deferredMsgs.forEach(function (msg) {
|
||||
peer.send(msg);
|
||||
});
|
||||
|
||||
// Reset the queue.
|
||||
this.deferredMsgs = [];
|
||||
|
||||
return this;
|
||||
};
|
||||
|
||||
/**
|
||||
* Schedules the next retry, according to the backoff.
|
||||
* @param {Peer} peer - A peer instance.
|
||||
* @return {Timeout} - The timeout value from `setTimeout`.
|
||||
*/
|
||||
function schedule (peer) {
|
||||
var backoff = peer.backoff;
|
||||
var time = backoff.time;
|
||||
backoff.next();
|
||||
|
||||
return setTimeout(function () {
|
||||
var socket = peer.retry();
|
||||
|
||||
// Successfully reconnected? Reset the backoff.
|
||||
socket.once('open', backoff.reset.bind(backoff));
|
||||
}, time);
|
||||
}
|
||||
|
||||
/**
|
||||
* Attaches handlers to the socket, attempting reconnection
|
||||
* when it's closed.
|
||||
* @param {WebSocket} socket - The websocket instance to bind to.
|
||||
* @return {WebSocket} - The same websocket.
|
||||
*/
|
||||
API.retryOnDisconnect = function (socket) {
|
||||
var peer = this;
|
||||
|
||||
// Listen for socket close events.
|
||||
socket.once('close', function () {
|
||||
schedule(peer);
|
||||
});
|
||||
|
||||
socket.on('error', function (error) {
|
||||
if (error.code === 'ECONNREFUSED') {
|
||||
schedule(peer);
|
||||
}
|
||||
});
|
||||
|
||||
return socket;
|
||||
};
|
||||
|
||||
/**
|
||||
* Send data through the socket, or add it to a queue
|
||||
* of offline requests if it's not ready yet.
|
||||
* @param {String} msg - The data to send.
|
||||
* of deferred messages if it's not ready yet.
|
||||
* @param {Mixed} msg - String, or anything that JSON can handle.
|
||||
* @return {Peer} - The context.
|
||||
*/
|
||||
API.send = function (msg) {
|
||||
@ -164,10 +172,16 @@ API.send = function (msg) {
|
||||
var state = socket.readyState;
|
||||
var ready = socket.OPEN;
|
||||
|
||||
// Make sure it's a string.
|
||||
if (typeof msg !== 'string') {
|
||||
msg = JSON.stringify(msg);
|
||||
}
|
||||
|
||||
// Make sure the socket is ready.
|
||||
if (state === ready) {
|
||||
socket.send(msg);
|
||||
} else {
|
||||
this.offline.push(msg);
|
||||
this.deferredMsgs.push(msg);
|
||||
}
|
||||
|
||||
return this;
|
||||
|
101
lib/wsp/Pool.js
Normal file
101
lib/wsp/Pool.js
Normal file
@ -0,0 +1,101 @@
|
||||
'use strict';
|
||||
|
||||
/**
|
||||
* Simpler interface over a collection of sockets. Works with
|
||||
* WebSocket clients, or sockets from a WebSocket server.
|
||||
* @class
|
||||
*/
|
||||
function Pool () {
|
||||
if (!(this instanceof Pool)) {
|
||||
return new Pool();
|
||||
}
|
||||
|
||||
// Maps IDs to sockets.
|
||||
this.sockets = {};
|
||||
}
|
||||
|
||||
var API = Pool.prototype;
|
||||
|
||||
/**
|
||||
* Returns the socket by the given ID.
|
||||
* @param {String} id - The unique socket ID.
|
||||
* @return {WebSocket|Null} - The WebSocket, if found.
|
||||
*/
|
||||
API.get = function (id) {
|
||||
return this.sockets[id] || null;
|
||||
};
|
||||
|
||||
/**
|
||||
* Adds a socket to the pool.
|
||||
* @param {String} id - The socket ID.
|
||||
* @param {WebSocket} socket - A websocket instance.
|
||||
* @return {Pool} - The context.
|
||||
*/
|
||||
API.add = function (id, socket) {
|
||||
this.sockets[id] = socket;
|
||||
|
||||
return this;
|
||||
};
|
||||
|
||||
/**
|
||||
* Removes a socket from the pool.
|
||||
* @param {String} id - The ID of the socket to remove.
|
||||
* @return {Boolean} - Whether the pool contained the socket.
|
||||
*/
|
||||
API.remove = function (id) {
|
||||
var sockets = this.sockets;
|
||||
var hasSocket = sockets.hasOwnProperty(id);
|
||||
|
||||
if (hasSocket) {
|
||||
delete sockets[id];
|
||||
}
|
||||
|
||||
return hasSocket;
|
||||
};
|
||||
|
||||
/**
|
||||
* Creates a filtered pool of sockets. Works the same as Array#filter.
|
||||
* @param {Function} fn - Called for each socket in the pool.
|
||||
* @param {Mixed} [_this] - The `this` context to use when invoking
|
||||
* the callback.
|
||||
* @return {Pool} - A new, filtered socket pool.
|
||||
*/
|
||||
API.filter = function (fn, _this) {
|
||||
var filtered = Pool();
|
||||
var pool = this;
|
||||
|
||||
_this = _this || pool;
|
||||
|
||||
Object.keys(this.sockets).forEach(function (id) {
|
||||
var socket = pool.sockets[id];
|
||||
|
||||
var shouldAdd = fn.call(_this, socket, id, pool);
|
||||
|
||||
// Add it to the new pool.
|
||||
if (shouldAdd) {
|
||||
filtered.add(id, socket);
|
||||
}
|
||||
});
|
||||
|
||||
return filtered;
|
||||
};
|
||||
|
||||
/**
|
||||
* Send a message through each socket in the pool.
|
||||
* @param {String} msg - The message to send.
|
||||
* @return {Number} - How many sockets the message was sent to.
|
||||
*/
|
||||
API.send = function (msg) {
|
||||
var pool = this;
|
||||
|
||||
var ids = Object.keys(this.sockets);
|
||||
|
||||
ids.forEach(function (id) {
|
||||
var socket = pool.sockets[id];
|
||||
socket.send(msg);
|
||||
});
|
||||
|
||||
return ids.length;
|
||||
};
|
||||
|
||||
module.exports = Pool;
|
@ -1,245 +1,65 @@
|
||||
/* eslint-env node*/
|
||||
/*
|
||||
eslint-disable
|
||||
require-jsdoc,
|
||||
no-warning-comments,
|
||||
no-underscore-dangle,
|
||||
max-params,
|
||||
*/
|
||||
'use strict';
|
||||
|
||||
var Gun = require('../../gun');
|
||||
var WS = require('ws');
|
||||
var Socket = require('./Peer');
|
||||
var Pool = require('./Pool');
|
||||
var duplicate = require('./duplicate');
|
||||
|
||||
var Tab = {};
|
||||
Tab.on = Gun.on;
|
||||
Tab.peers = (function () {
|
||||
// Maps URLs to sockets.
|
||||
// Shared between all gun instances.
|
||||
var sockets = Pool();
|
||||
var emitter = { on: Gun.on };
|
||||
var server = {
|
||||
|
||||
function Peer (peers) {
|
||||
if (!Peer.is(this)) {
|
||||
return new Peer(peers);
|
||||
}
|
||||
// Session id.
|
||||
sid: Gun.text.random(),
|
||||
|
||||
this.peers = peers;
|
||||
}
|
||||
// Request handlers.
|
||||
handlers: [],
|
||||
|
||||
Peer.is = function (peer) {
|
||||
return peer instanceof Peer;
|
||||
};
|
||||
|
||||
function map (peer, url) {
|
||||
var msg = this.msg;
|
||||
var opt = this.opt || {};
|
||||
opt.out = true;
|
||||
Peer.request(url, msg, null, opt);
|
||||
}
|
||||
|
||||
Peer.prototype.send = function (msg, opt) {
|
||||
Peer.request.each(this.peers, map, {
|
||||
msg: msg,
|
||||
opt: opt,
|
||||
// Call handlers.
|
||||
handle: function (req, res) {
|
||||
server.handlers.forEach(function (server) {
|
||||
server(req, res);
|
||||
});
|
||||
};
|
||||
},
|
||||
};
|
||||
|
||||
Peer.request = (function () {
|
||||
/**
|
||||
* Take a map of URLs pointing to options and ensure the
|
||||
* urls are using the WS protocol.
|
||||
* @param {Object} peers - Any object with URLs as keys.
|
||||
* @return {Object} - Object with normalized URL keys.
|
||||
*/
|
||||
function normalizeURLs (peers) {
|
||||
var formatted = {};
|
||||
|
||||
function request (base, body, cb, opt) {
|
||||
Object.keys(peers).forEach(function (url) {
|
||||
var options = peers[url];
|
||||
var id = Socket.formatURL(url);
|
||||
formatted[id] = options;
|
||||
});
|
||||
|
||||
var obj = base.length ? { base: base } : {};
|
||||
obj.base = opt.base || base;
|
||||
obj.body = opt.body || body;
|
||||
obj.headers = opt.headers;
|
||||
obj.url = opt.url;
|
||||
obj.out = opt.out;
|
||||
cb = cb || function () {};
|
||||
return formatted;
|
||||
}
|
||||
|
||||
if (!obj.base) {
|
||||
return;
|
||||
}
|
||||
/**
|
||||
* Turns a map of URLs into a socket pool.
|
||||
* @param {Object} peers - Any object with URLs as keys.
|
||||
* @return {Pool} - A pool of sockets corresponding to the URLs.
|
||||
*/
|
||||
function getSocketSubset (peers) {
|
||||
var urls = normalizeURLs(peers);
|
||||
|
||||
request.transport(obj, cb);
|
||||
}
|
||||
|
||||
request.createServer = function (fn) {
|
||||
request.createServer.list.push(fn);
|
||||
};
|
||||
|
||||
request.createServer.ing = function (req, cb) {
|
||||
var index = request.createServer.list.length;
|
||||
var server;
|
||||
while (index) {
|
||||
index -= 1;
|
||||
server = request.createServer.list[index] || function () {};
|
||||
server(req, cb);
|
||||
}
|
||||
};
|
||||
|
||||
request.createServer.list = [];
|
||||
request.back = 2;
|
||||
request.backoff = 2;
|
||||
|
||||
request.transport = function (opt, cb) {
|
||||
if (request.ws(opt, cb)) {
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
request.ws = function (opt, cb, req) {
|
||||
var ws;
|
||||
if (!WS) {
|
||||
return false;
|
||||
}
|
||||
|
||||
ws = request.ws.peers[opt.base];
|
||||
if (ws) {
|
||||
req = req || {};
|
||||
if (opt.headers) {
|
||||
req.headers = opt.headers;
|
||||
}
|
||||
if (opt.body) {
|
||||
req.body = opt.body;
|
||||
}
|
||||
|
||||
if (opt.url) {
|
||||
req.url = opt.url;
|
||||
}
|
||||
|
||||
req.headers = req.headers || {};
|
||||
|
||||
if (!opt.out && !ws.cbs[req.headers['ws-rid']]) {
|
||||
var rid = 'WS' +
|
||||
new Date().getTime() +
|
||||
'.' +
|
||||
Math.floor((Math.random() * 65535) + 1);
|
||||
|
||||
req.headers['ws-rid'] = rid;
|
||||
|
||||
ws.cbs[rid] = function (err, res) {
|
||||
if (!res || res.body || res.end) {
|
||||
delete ws.cbs[req.headers['ws-rid']];
|
||||
}
|
||||
|
||||
cb(err, res);
|
||||
};
|
||||
}
|
||||
|
||||
if (!ws.readyState) {
|
||||
setTimeout(function () {
|
||||
request.ws(opt, cb, req);
|
||||
}, 100);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
ws.sending = true;
|
||||
ws.send(JSON.stringify(req));
|
||||
return true;
|
||||
}
|
||||
|
||||
if (ws === false) {
|
||||
return false;
|
||||
}
|
||||
|
||||
var wsURL = opt.base.replace('http', 'ws');
|
||||
|
||||
ws = request.ws.peers[opt.base] = new WS(wsURL);
|
||||
ws.cbs = {};
|
||||
|
||||
ws.onopen = function () {
|
||||
request.back = 2;
|
||||
request.ws(opt, cb);
|
||||
};
|
||||
|
||||
ws.onclose = function (event) {
|
||||
|
||||
if (!ws || !event) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (ws.close instanceof Function) {
|
||||
ws.close();
|
||||
}
|
||||
|
||||
if (!ws.sending) {
|
||||
ws = request.ws.peers[opt.base] = false;
|
||||
request.transport(opt, cb);
|
||||
return;
|
||||
}
|
||||
|
||||
request.each(ws.cbs, function (cb) {
|
||||
cb({
|
||||
err: 'WebSocket disconnected!',
|
||||
code: ws.sending ? (ws || {}).err || event.code : -1,
|
||||
});
|
||||
});
|
||||
|
||||
// This will make the next request try to reconnect
|
||||
ws = request.ws.peers[opt.base] = null;
|
||||
|
||||
// TODO: Have the driver handle this!
|
||||
setTimeout(function () {
|
||||
|
||||
// opt here is a race condition,
|
||||
// is it not? Does this matter?
|
||||
request.ws(opt, function () {});
|
||||
}, request.back *= request.backoff);
|
||||
};
|
||||
|
||||
ws.onmessage = function (msg) {
|
||||
var res;
|
||||
if (!msg || !msg.data) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
res = JSON.parse(msg.data);
|
||||
} catch (error) {
|
||||
return;
|
||||
}
|
||||
if (!res) {
|
||||
return;
|
||||
}
|
||||
res.headers = res.headers || {};
|
||||
if (res.headers['ws-rid']) {
|
||||
var cb = ws.cbs[res.headers['ws-rid']] || function () {};
|
||||
cb(null, res);
|
||||
return;
|
||||
}
|
||||
|
||||
// emit extra events.
|
||||
if (res.body) {
|
||||
request.createServer.ing(res, function (res) {
|
||||
res.out = true;
|
||||
request(opt.base, null, null, res);
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
ws.onerror = function (error) {
|
||||
(ws || {}).err = error;
|
||||
};
|
||||
|
||||
return true;
|
||||
};
|
||||
request.ws.peers = {};
|
||||
request.ws.cbs = {};
|
||||
|
||||
request.each = function (obj, cb, as) {
|
||||
if (!obj || !cb) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (var key in obj) {
|
||||
if (obj.hasOwnProperty(key)) {
|
||||
cb.call(as, obj[key], key);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
return request;
|
||||
}());
|
||||
|
||||
return Peer;
|
||||
}());
|
||||
return sockets.filter(function (socket) {
|
||||
return urls.hasOwnProperty(socket.url);
|
||||
});
|
||||
}
|
||||
|
||||
// Handle read requests.
|
||||
Gun.on('get', function (at) {
|
||||
@ -248,17 +68,20 @@ Gun.on('get', function (at) {
|
||||
var peers = opt.peers || gun.Back('opt.peers');
|
||||
|
||||
if (!peers || Gun.obj.empty(peers)) {
|
||||
Gun.log.once('peers', 'Warning! You have no peers to connect to!');
|
||||
at.gun.Back(-1).on('in', {'@': at['#']});
|
||||
at.gun.Back(Infinity).on('in', {
|
||||
'@': at['#'],
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
var id = at['#'] || Gun.text.random(9);
|
||||
|
||||
// Create a new message.
|
||||
var msg = {
|
||||
|
||||
// msg ID
|
||||
'#': at['#'] || Gun.text.random(9),
|
||||
'#': id,
|
||||
|
||||
// msg BODY
|
||||
'$': at.get,
|
||||
@ -266,7 +89,7 @@ Gun.on('get', function (at) {
|
||||
|
||||
// Listen for a response.
|
||||
// TODO: ONE? PERF! Clear out listeners, maybe with setTimeout?
|
||||
Tab.on(msg['#'], function (err, data) {
|
||||
emitter.on(id, function (err, data) {
|
||||
var obj = {
|
||||
'@': at['#'],
|
||||
err: err,
|
||||
@ -280,11 +103,12 @@ Gun.on('get', function (at) {
|
||||
}
|
||||
});
|
||||
|
||||
// Broadcast to all other peers.
|
||||
Tab.peers(peers).send(msg, {
|
||||
headers: {
|
||||
'gun-sid': Tab.server.sid,
|
||||
},
|
||||
var subset = getSocketSubset(peers);
|
||||
|
||||
// Broadcast to the connected peers.
|
||||
subset.send({
|
||||
headers: { 'gun-sid': server.sid },
|
||||
body: msg,
|
||||
});
|
||||
});
|
||||
|
||||
@ -293,26 +117,40 @@ Gun.on('put', function (at) {
|
||||
if (at['@']) {
|
||||
return;
|
||||
}
|
||||
var opt = at.gun.Back('opt') || {}, peers = opt.peers;
|
||||
|
||||
var peers = at.gun.Back('opt.peers');
|
||||
var enabled = at.gun.Back('opt.websocket');
|
||||
var options = at.opt || {};
|
||||
|
||||
if (!peers || Gun.obj.empty(peers)) {
|
||||
Gun.log.once('peers', 'Warning! You have no peers to save to!');
|
||||
at.gun.Back(-1).on('in', {'@': at['#']});
|
||||
|
||||
// TODO: What about wsp/server clients? Maybe we shouldn't
|
||||
// immediately assume there's no data to be found.
|
||||
at.gun.Back(-1).on('in', {
|
||||
'@': at['#'],
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
if (opt.websocket === false || (at.opt && at.opt.websocket === false)) {
|
||||
|
||||
if (options.websocket === false || enabled === false) {
|
||||
return;
|
||||
}
|
||||
|
||||
var id = at['#'] || Gun.text.random(9);
|
||||
|
||||
var msg = {
|
||||
|
||||
// msg ID
|
||||
'#': at['#'] || Gun.text.random(9),
|
||||
// Message ID.
|
||||
'#': id,
|
||||
|
||||
// msg BODY
|
||||
// Message body.
|
||||
'$': at.put,
|
||||
};
|
||||
|
||||
// TODO: ONE? PERF! Clear out listeners, maybe with setTimeout?
|
||||
Tab.on(msg['#'], function (err, ok) {
|
||||
// Listen for acknowledgement(s).
|
||||
Gun.on(id, function (err, ok) {
|
||||
at.gun.Back(-1).on('in', {
|
||||
'@': at['#'],
|
||||
err: err,
|
||||
@ -320,28 +158,157 @@ Gun.on('put', function (at) {
|
||||
});
|
||||
});
|
||||
|
||||
Tab.peers(peers).send(msg, {
|
||||
headers: {
|
||||
'gun-sid': Tab.server.sid,
|
||||
},
|
||||
var subset = getSocketSubset(peers);
|
||||
|
||||
subset.send({
|
||||
headers: { 'gun-sid': server.sid },
|
||||
body: msg,
|
||||
});
|
||||
});
|
||||
|
||||
// Open any new sockets listed,
|
||||
// adding them to the global pool.
|
||||
Gun.on('opt', function (context) {
|
||||
var gun = context.gun;
|
||||
|
||||
var peers = gun.Back('opt.peers') || {};
|
||||
|
||||
Gun.obj.map(peers, function (options, url) {
|
||||
if (sockets[url]) {
|
||||
return;
|
||||
}
|
||||
|
||||
var socket = Socket(url, options);
|
||||
sockets.add(url, socket);
|
||||
|
||||
/**
|
||||
* Handle responses to requests, adding default headers.
|
||||
* @param {Object} reply - A gun reply object.
|
||||
* @return {undefined}
|
||||
*/
|
||||
function respond (reply) {
|
||||
|
||||
// Make sure headers are defined.
|
||||
var headers = reply.headers = reply.headers || {};
|
||||
|
||||
// Add 'gun-sid' if it doesn't exist.
|
||||
headers['gun-sid'] = headers['gun-sid'] || server.sid;
|
||||
|
||||
socket.send(reply);
|
||||
}
|
||||
|
||||
socket.on('message', function (msg) {
|
||||
var request;
|
||||
|
||||
try {
|
||||
request = JSON.parse(msg);
|
||||
} catch (error) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Validate the request.
|
||||
if (!request || !request.body) {
|
||||
return;
|
||||
}
|
||||
|
||||
request.headers = request.headers || {};
|
||||
|
||||
// emit extra events.
|
||||
server.handle(request, respond);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
// REVIEW: Do I need this on a server client?
|
||||
// browser/client side Server!
|
||||
// TODO: BUG! Does not respect separate instances!!!
|
||||
Gun.on('opt', function (at) {
|
||||
if (Tab.server) {
|
||||
var gun = at.gun;
|
||||
var root = gun.Back(Infinity);
|
||||
var options = (root._.opt = root._.opt || {});
|
||||
|
||||
// Only register once per gun instance.
|
||||
if (options['@client']) {
|
||||
return;
|
||||
}
|
||||
|
||||
var gun = at.gun;
|
||||
var server = Tab.server = Tab.server || {};
|
||||
var tmp;
|
||||
var driver = options['@client'] = {
|
||||
|
||||
server.sid = Gun.text.random();
|
||||
/**
|
||||
* Handles get requests sent from other peers.
|
||||
* @param {Object} req - The get request.
|
||||
* @param {Function} cb - Handles replies.
|
||||
* @return {undefined}
|
||||
*/
|
||||
get: function (req, cb) {
|
||||
var body = req.body;
|
||||
var lex = body.$;
|
||||
var graph = gun._.root._.graph;
|
||||
var node = graph[lex['#']];
|
||||
|
||||
Tab.peers.request.createServer(function (req, res) {
|
||||
// TODO: Reply even if it's not in memory.
|
||||
if (!node) {
|
||||
return;
|
||||
}
|
||||
|
||||
cb({
|
||||
body: {
|
||||
'#': duplicate.track.newID(),
|
||||
'@': body['#'],
|
||||
'$': node,
|
||||
},
|
||||
});
|
||||
},
|
||||
|
||||
/**
|
||||
* Handles put requests sent from other peers.
|
||||
* @param {Object} req - The put request.
|
||||
* @param {Function} cb - A response callback.
|
||||
* @return {undefined}
|
||||
*/
|
||||
put: function (req, cb) {
|
||||
var body = req.body;
|
||||
var graph = body.$;
|
||||
|
||||
// Cached gun paths.
|
||||
var path = gun._.root._.path || {};
|
||||
|
||||
graph = Gun.obj.map(graph, function (node, soul, map) {
|
||||
if (!path[soul]) {
|
||||
return;
|
||||
}
|
||||
map(soul, node);
|
||||
});
|
||||
|
||||
// filter out what we don't have in memory.
|
||||
if (!graph) {
|
||||
return;
|
||||
}
|
||||
|
||||
var id = Gun.on.ask(function (ack, event) {
|
||||
if (!ack) {
|
||||
return;
|
||||
}
|
||||
|
||||
event.off();
|
||||
|
||||
cb({
|
||||
body: {
|
||||
'#': duplicate.track.newID(),
|
||||
'@': body['#'],
|
||||
'$': ack,
|
||||
'!': ack.err,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
gun.on('out', {
|
||||
'#': duplicate.track(id),
|
||||
gun: gun,
|
||||
opt: { websocket: false },
|
||||
put: graph,
|
||||
});
|
||||
},
|
||||
};
|
||||
|
||||
server.handlers.push(function (req, res) {
|
||||
|
||||
// Validate request.
|
||||
if (!req || !res || !req.body || !req.headers) {
|
||||
@ -350,111 +317,26 @@ Gun.on('opt', function (at) {
|
||||
|
||||
var msg = req.body;
|
||||
|
||||
// AUTH for non-replies.
|
||||
if (server.msg(msg['#'])) {
|
||||
if (duplicate(msg['#'])) {
|
||||
return;
|
||||
}
|
||||
|
||||
// no need to process.
|
||||
// It's a response, no need to reply.
|
||||
if (msg['@']) {
|
||||
if (Tab.ons[tmp = msg['@'] || msg['#']]) {
|
||||
Tab.on(tmp, [msg['!'], msg.$]);
|
||||
}
|
||||
var reqID = msg['@'];
|
||||
|
||||
emitter.on(reqID, [
|
||||
msg['!'],
|
||||
msg.$,
|
||||
]);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.$ && msg.$['#']) {
|
||||
server.get(req, res);
|
||||
return;
|
||||
}
|
||||
var isLex = msg.$ && msg.$['#'];
|
||||
var method = isLex ? 'get' : 'put';
|
||||
|
||||
server.put(req, res);
|
||||
driver[method](req, res);
|
||||
});
|
||||
|
||||
server.get = function (req, cb) {
|
||||
var body = req.body;
|
||||
var lex = body.$;
|
||||
var graph = gun._.root._.graph;
|
||||
var node;
|
||||
|
||||
// Don't reply to data we don't have it in memory.
|
||||
// TODO: Add localStorage?
|
||||
if (!(node = graph[lex['#']])) {
|
||||
return;
|
||||
}
|
||||
|
||||
cb({
|
||||
body: {
|
||||
'#': server.msg(),
|
||||
'@': body['#'],
|
||||
'$': node,
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
server.put = function (req, cb) {
|
||||
var body = req.body, graph = body.$;
|
||||
var __ = gun._.root._;
|
||||
|
||||
// filter out what we don't have in memory.
|
||||
if (!(graph = Gun.obj.map(graph, function (node, soul, map) {
|
||||
if (!__.path[soul]) {
|
||||
return;
|
||||
}
|
||||
map(soul, node);
|
||||
}))) {
|
||||
return;
|
||||
}
|
||||
gun.on('out', {
|
||||
gun: gun,
|
||||
opt: {
|
||||
websocket: false,
|
||||
},
|
||||
put: graph,
|
||||
'#': Gun.on.ask(function (ack, ev) {
|
||||
if (!ack) {
|
||||
return undefined;
|
||||
}
|
||||
ev.off();
|
||||
return cb({
|
||||
body: {
|
||||
'#': server.msg(),
|
||||
'@': body['#'],
|
||||
'$': ack,
|
||||
'!': ack.err,
|
||||
},
|
||||
});
|
||||
}),
|
||||
});
|
||||
};
|
||||
|
||||
server.msg = function (id) {
|
||||
if (!id) {
|
||||
id = Gun.text.random(9);
|
||||
server.msg.debounce[id] = Gun.time.is();
|
||||
return id;
|
||||
}
|
||||
|
||||
clearTimeout(server.msg.clear);
|
||||
server.msg.clear = setTimeout(function () {
|
||||
var now = Gun.time.is();
|
||||
Gun.obj.map(server.msg.debounce, function (time, id) {
|
||||
if ((now - time) < (1000 * 60 * 5)) {
|
||||
return;
|
||||
}
|
||||
|
||||
Gun.obj.del(server.msg.debounce, id);
|
||||
});
|
||||
}, 500);
|
||||
|
||||
if (server.msg.debounce[id]) {
|
||||
server.msg.debounce[id] = Gun.time.is();
|
||||
return id;
|
||||
}
|
||||
|
||||
server.msg.debounce[id] = Gun.time.is();
|
||||
return undefined;
|
||||
};
|
||||
|
||||
server.msg.debounce = server.msg.debounce || {};
|
||||
});
|
||||
|
90
lib/wsp/duplicate.js
Normal file
90
lib/wsp/duplicate.js
Normal file
@ -0,0 +1,90 @@
|
||||
'use strict';
|
||||
|
||||
var Gun = require('../../gun');
|
||||
|
||||
var cache = {};
|
||||
var timeout = null;
|
||||
|
||||
/**
|
||||
* Remove all entries in the cache older than 5 minutes.
|
||||
* Reschedules itself to run again when the oldest item
|
||||
* might be too old.
|
||||
* @return {undefined}
|
||||
*/
|
||||
function gc () {
|
||||
var now = Date.now();
|
||||
var oldest = now;
|
||||
var maxAge = 5 * 60 * 1000;
|
||||
|
||||
Gun.obj.map(cache, function (time, id) {
|
||||
oldest = Math.min(now, time);
|
||||
|
||||
if ((now - time) < maxAge) {
|
||||
return;
|
||||
}
|
||||
|
||||
delete cache[id];
|
||||
});
|
||||
|
||||
var done = Gun.obj.empty(cache);
|
||||
|
||||
// Disengage GC.
|
||||
if (done) {
|
||||
timeout = null;
|
||||
return;
|
||||
}
|
||||
|
||||
// Just how old?
|
||||
var elapsed = now - oldest;
|
||||
|
||||
// How long before it's too old?
|
||||
var nextGC = maxAge - elapsed;
|
||||
|
||||
// Schedule the next GC event.
|
||||
timeout = setTimeout(gc, nextGC);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks a memory-efficient cache to see if a string has been seen before.
|
||||
* @param {String} id - A string to keep track of.
|
||||
* @return {Boolean} - Whether it's been seen recently.
|
||||
*/
|
||||
function duplicate (id) {
|
||||
|
||||
// Have we seen this ID recently?
|
||||
var existing = cache.hasOwnProperty(id);
|
||||
|
||||
// Add it to the cache.
|
||||
duplicate.track(id);
|
||||
|
||||
return existing;
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts tracking an ID as a possible future duplicate.
|
||||
* @param {String} id - The ID to track.
|
||||
* @return {String} - The same ID.
|
||||
*/
|
||||
duplicate.track = function (id) {
|
||||
cache[id] = Date.now();
|
||||
|
||||
// Engage GC.
|
||||
if (!timeout) {
|
||||
gc();
|
||||
}
|
||||
|
||||
return id;
|
||||
};
|
||||
|
||||
/**
|
||||
* Generate a new ID and start tracking it.
|
||||
* @param {Number} [chars] - The number of characters to use.
|
||||
* @return {String} - The newly created ID.
|
||||
*/
|
||||
duplicate.track.newID = function (chars) {
|
||||
var id = Gun.text.random(chars);
|
||||
|
||||
return duplicate.track(id);
|
||||
};
|
||||
|
||||
module.exports = duplicate;
|
@ -45,12 +45,19 @@ function ready (socket, cb) {
|
||||
* @return {undefined}
|
||||
*/
|
||||
function request (context, clients, cb) {
|
||||
var id = context['#'] || Gun.text.random(9);
|
||||
|
||||
Gun.on(id, function (err, data, event) {
|
||||
cb(err, data);
|
||||
event.off();
|
||||
});
|
||||
|
||||
Gun.obj.map(clients, function (client) {
|
||||
ready(client, function () {
|
||||
var msg = {
|
||||
headers: {},
|
||||
body: {
|
||||
'#': Gun.on.ask(cb),
|
||||
'#': id,
|
||||
'$': context.get,
|
||||
},
|
||||
};
|
||||
@ -69,12 +76,19 @@ function request (context, clients, cb) {
|
||||
* @return {undefined}
|
||||
*/
|
||||
function update (context, clients, cb) {
|
||||
var id = context['#'] || Gun.text.random(9);
|
||||
|
||||
Gun.on(id, function (err, data, event) {
|
||||
cb(err, data);
|
||||
event.off();
|
||||
});
|
||||
|
||||
Gun.obj.map(clients, function (client) {
|
||||
ready(client, function () {
|
||||
var msg = {
|
||||
headers: {},
|
||||
body: {
|
||||
'#': Gun.on.ask(cb),
|
||||
'#': id,
|
||||
'$': context.put,
|
||||
},
|
||||
};
|
||||
@ -125,6 +139,7 @@ function attach (gun, server) {
|
||||
if (!isUsingServer(context.gun, server)) {
|
||||
return;
|
||||
}
|
||||
|
||||
request(context, pool, function (err, data) {
|
||||
var response = {
|
||||
'@': context['#'],
|
||||
@ -143,6 +158,10 @@ function attach (gun, server) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (context.nopush) {
|
||||
return;
|
||||
}
|
||||
|
||||
update(context, pool, function (err, data) {
|
||||
var ack = {
|
||||
'!': err || null,
|
||||
|
@ -152,8 +152,24 @@ Gun.on('opt', function(at){
|
||||
// TODO: BUG! server put should push.
|
||||
}
|
||||
tran.get = function(req, cb){
|
||||
var body = req.body, lex = body['$'], reply = {headers: {'Content-Type': tran.json}}, opt;
|
||||
gun.on('out', {gun: gun, get: lex, req: 1, '#': Gun.on.ask(function(at, ev){
|
||||
var body = req.body, lex = body['$'], reply = {headers: {'Content-Type': tran.json}};
|
||||
|
||||
var graph = gun.Back(Infinity)._.graph;
|
||||
var node = graph[lex['#']];
|
||||
var result = Gun.graph.ify(node);
|
||||
|
||||
if (node) {
|
||||
return cb({
|
||||
headers: reply.headers,
|
||||
body: {
|
||||
'#': gun.wsp.msg(),
|
||||
'@': body['#'],
|
||||
'$': result,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
gun.on('out', {gun: gun, get: lex, req: 1, '#': body['#'] || Gun.on.ask(function(at, ev){
|
||||
ev.off();
|
||||
var graph = at.put;
|
||||
return cb({headers: reply.headers, body: {
|
||||
|
Loading…
x
Reference in New Issue
Block a user