mirror of
https://github.com/amark/gun.git
synced 2025-03-30 15:08:33 +00:00
Optimistically open client sockets
Changes behavior from only opening sockets when absolutely necessary to keeping them open for as long as possible. Key differences: - Much higher success rate for messages sent from the connected server. - Process no longer shuts down if nothing is done with gun, instead listens for incoming messages on client sockets. Socket reconnect handle by Peer instances, meaning better handling for deferred messages and predictable backoff. The client.js logic has been significantly refactored. Among the improvements, GET/PUT requests now respect the `peers` option for each gun instance, only sending requests to the URLs listed.
This commit is contained in:
parent
e8f8047cb6
commit
e402e3966e
@ -155,7 +155,7 @@ API.drainQueue = function () {
|
||||
/**
|
||||
* Send data through the socket, or add it to a queue
|
||||
* of deferred messages if it's not ready yet.
|
||||
* @param {String} msg - The data to send.
|
||||
* @param {Mixed} msg - String, or anything that JSON can handle.
|
||||
* @return {Peer} - The context.
|
||||
*/
|
||||
API.send = function (msg) {
|
||||
@ -163,6 +163,11 @@ API.send = function (msg) {
|
||||
var state = socket.readyState;
|
||||
var ready = socket.OPEN;
|
||||
|
||||
// Make sure it's a string.
|
||||
if (typeof msg !== 'string') {
|
||||
msg = JSON.stringify(msg);
|
||||
}
|
||||
|
||||
// Make sure the socket is ready.
|
||||
if (state === ready) {
|
||||
socket.send(msg);
|
||||
|
101
lib/wsp/Pool.js
Normal file
101
lib/wsp/Pool.js
Normal file
@ -0,0 +1,101 @@
|
||||
'use strict';
|
||||
|
||||
/**
|
||||
* Simpler interface over a collection of sockets. Works with
|
||||
* WebSocket clients, or sockets from a WebSocket server.
|
||||
* @class
|
||||
*/
|
||||
function Pool () {
|
||||
if (!(this instanceof Pool)) {
|
||||
return new Pool();
|
||||
}
|
||||
|
||||
// Maps IDs to sockets.
|
||||
this.sockets = {};
|
||||
}
|
||||
|
||||
var API = Pool.prototype;
|
||||
|
||||
/**
|
||||
* Returns the socket by the given ID.
|
||||
* @param {String} id - The unique socket ID.
|
||||
* @return {WebSocket|Null} - The WebSocket, if found.
|
||||
*/
|
||||
API.get = function (id) {
|
||||
return this.sockets[id] || null;
|
||||
};
|
||||
|
||||
/**
|
||||
* Adds a socket to the pool.
|
||||
* @param {String} id - The socket ID.
|
||||
* @param {WebSocket} socket - A websocket instance.
|
||||
* @return {Pool} - The context.
|
||||
*/
|
||||
API.add = function (id, socket) {
|
||||
this.sockets[id] = socket;
|
||||
|
||||
return this;
|
||||
};
|
||||
|
||||
/**
|
||||
* Removes a socket from the pool.
|
||||
* @param {String} id - The ID of the socket to remove.
|
||||
* @return {Boolean} - Whether the pool contained the socket.
|
||||
*/
|
||||
API.remove = function (id) {
|
||||
var sockets = this.sockets;
|
||||
var hasSocket = sockets.hasOwnProperty(id);
|
||||
|
||||
if (hasSocket) {
|
||||
delete sockets[id];
|
||||
}
|
||||
|
||||
return hasSocket;
|
||||
};
|
||||
|
||||
/**
|
||||
* Creates a filtered pool of sockets. Works the same as Array#filter.
|
||||
* @param {Function} fn - Called for each socket in the pool.
|
||||
* @param {Mixed} [_this] - The `this` context to use when invoking
|
||||
* the callback.
|
||||
* @return {Pool} - A new, filtered socket pool.
|
||||
*/
|
||||
API.filter = function (fn, _this) {
|
||||
var filtered = Pool();
|
||||
var pool = this;
|
||||
|
||||
_this = _this || pool;
|
||||
|
||||
Object.keys(this.sockets).forEach(function (id) {
|
||||
var socket = pool.sockets[id];
|
||||
|
||||
var shouldAdd = fn.call(_this, socket, id, pool);
|
||||
|
||||
// Add it to the new pool.
|
||||
if (shouldAdd) {
|
||||
filtered.add(id, socket);
|
||||
}
|
||||
});
|
||||
|
||||
return filtered;
|
||||
};
|
||||
|
||||
/**
|
||||
* Send a message through each socket in the pool.
|
||||
* @param {String} msg - The message to send.
|
||||
* @return {Number} - How many sockets the message was sent to.
|
||||
*/
|
||||
API.send = function (msg) {
|
||||
var pool = this;
|
||||
|
||||
var ids = Object.keys(this.sockets);
|
||||
|
||||
ids.forEach(function (id) {
|
||||
var socket = pool.sockets[id];
|
||||
socket.send(msg);
|
||||
});
|
||||
|
||||
return ids.length;
|
||||
};
|
||||
|
||||
module.exports = Pool;
|
@ -1,245 +1,65 @@
|
||||
/* eslint-env node*/
|
||||
/*
|
||||
eslint-disable
|
||||
require-jsdoc,
|
||||
no-warning-comments,
|
||||
no-underscore-dangle,
|
||||
max-params,
|
||||
*/
|
||||
'use strict';
|
||||
|
||||
var Gun = require('../../gun');
|
||||
var WS = require('ws');
|
||||
var Socket = require('./Peer');
|
||||
var Pool = require('./Pool');
|
||||
var duplicate = require('./duplicate');
|
||||
|
||||
var Tab = {};
|
||||
Tab.on = Gun.on;
|
||||
Tab.peers = (function () {
|
||||
// Maps URLs to sockets.
|
||||
// Shared between all gun instances.
|
||||
var sockets = Pool();
|
||||
var emitter = { on: Gun.on };
|
||||
var server = {
|
||||
|
||||
function Peer (peers) {
|
||||
if (!Peer.is(this)) {
|
||||
return new Peer(peers);
|
||||
}
|
||||
// Session id.
|
||||
sid: Gun.text.random(),
|
||||
|
||||
this.peers = peers;
|
||||
}
|
||||
// Request handlers.
|
||||
handlers: [],
|
||||
|
||||
Peer.is = function (peer) {
|
||||
return peer instanceof Peer;
|
||||
};
|
||||
|
||||
function map (peer, url) {
|
||||
var msg = this.msg;
|
||||
var opt = this.opt || {};
|
||||
opt.out = true;
|
||||
Peer.request(url, msg, null, opt);
|
||||
}
|
||||
|
||||
Peer.prototype.send = function (msg, opt) {
|
||||
Peer.request.each(this.peers, map, {
|
||||
msg: msg,
|
||||
opt: opt,
|
||||
// Call handlers.
|
||||
handle: function (req, res) {
|
||||
server.handlers.forEach(function (server) {
|
||||
server(req, res);
|
||||
});
|
||||
};
|
||||
},
|
||||
};
|
||||
|
||||
Peer.request = (function () {
|
||||
/**
|
||||
* Take a map of URLs pointing to options and ensure the
|
||||
* urls are using the WS protocol.
|
||||
* @param {Object} peers - Any object with URLs as keys.
|
||||
* @return {Object} - Object with normalized URL keys.
|
||||
*/
|
||||
function normalizeURLs (peers) {
|
||||
var formatted = {};
|
||||
|
||||
function request (base, body, cb, opt) {
|
||||
Object.keys(peers).forEach(function (url) {
|
||||
var options = peers[url];
|
||||
var id = Socket.formatURL(url);
|
||||
formatted[id] = options;
|
||||
});
|
||||
|
||||
var obj = base.length ? { base: base } : {};
|
||||
obj.base = opt.base || base;
|
||||
obj.body = opt.body || body;
|
||||
obj.headers = opt.headers;
|
||||
obj.url = opt.url;
|
||||
obj.out = opt.out;
|
||||
cb = cb || function () {};
|
||||
return formatted;
|
||||
}
|
||||
|
||||
if (!obj.base) {
|
||||
return;
|
||||
}
|
||||
/**
|
||||
* Turns a map of URLs into a socket pool.
|
||||
* @param {Object} peers - Any object with URLs as keys.
|
||||
* @return {Pool} - A pool of sockets corresponding to the URLs.
|
||||
*/
|
||||
function getSocketSubset (peers) {
|
||||
var urls = normalizeURLs(peers);
|
||||
|
||||
request.transport(obj, cb);
|
||||
}
|
||||
|
||||
request.createServer = function (fn) {
|
||||
request.createServer.list.push(fn);
|
||||
};
|
||||
|
||||
request.createServer.ing = function (req, cb) {
|
||||
var index = request.createServer.list.length;
|
||||
var server;
|
||||
while (index) {
|
||||
index -= 1;
|
||||
server = request.createServer.list[index] || function () {};
|
||||
server(req, cb);
|
||||
}
|
||||
};
|
||||
|
||||
request.createServer.list = [];
|
||||
request.back = 2;
|
||||
request.backoff = 2;
|
||||
|
||||
request.transport = function (opt, cb) {
|
||||
if (request.ws(opt, cb)) {
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
request.ws = function (opt, cb, req) {
|
||||
var ws;
|
||||
if (!WS) {
|
||||
return false;
|
||||
}
|
||||
|
||||
ws = request.ws.peers[opt.base];
|
||||
if (ws) {
|
||||
req = req || {};
|
||||
if (opt.headers) {
|
||||
req.headers = opt.headers;
|
||||
}
|
||||
if (opt.body) {
|
||||
req.body = opt.body;
|
||||
}
|
||||
|
||||
if (opt.url) {
|
||||
req.url = opt.url;
|
||||
}
|
||||
|
||||
req.headers = req.headers || {};
|
||||
|
||||
if (!opt.out && !ws.cbs[req.headers['ws-rid']]) {
|
||||
var rid = 'WS' +
|
||||
new Date().getTime() +
|
||||
'.' +
|
||||
Math.floor((Math.random() * 65535) + 1);
|
||||
|
||||
req.headers['ws-rid'] = rid;
|
||||
|
||||
ws.cbs[rid] = function (err, res) {
|
||||
if (!res || res.body || res.end) {
|
||||
delete ws.cbs[req.headers['ws-rid']];
|
||||
}
|
||||
|
||||
cb(err, res);
|
||||
};
|
||||
}
|
||||
|
||||
if (!ws.readyState) {
|
||||
setTimeout(function () {
|
||||
request.ws(opt, cb, req);
|
||||
}, 100);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
ws.sending = true;
|
||||
ws.send(JSON.stringify(req));
|
||||
return true;
|
||||
}
|
||||
|
||||
if (ws === false) {
|
||||
return false;
|
||||
}
|
||||
|
||||
var wsURL = opt.base.replace('http', 'ws');
|
||||
|
||||
ws = request.ws.peers[opt.base] = new WS(wsURL);
|
||||
ws.cbs = {};
|
||||
|
||||
ws.onopen = function () {
|
||||
request.back = 2;
|
||||
request.ws(opt, cb);
|
||||
};
|
||||
|
||||
ws.onclose = function (event) {
|
||||
|
||||
if (!ws || !event) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (ws.close instanceof Function) {
|
||||
ws.close();
|
||||
}
|
||||
|
||||
if (!ws.sending) {
|
||||
ws = request.ws.peers[opt.base] = false;
|
||||
request.transport(opt, cb);
|
||||
return;
|
||||
}
|
||||
|
||||
request.each(ws.cbs, function (cb) {
|
||||
cb({
|
||||
err: 'WebSocket disconnected!',
|
||||
code: ws.sending ? (ws || {}).err || event.code : -1,
|
||||
});
|
||||
});
|
||||
|
||||
// This will make the next request try to reconnect
|
||||
ws = request.ws.peers[opt.base] = null;
|
||||
|
||||
// TODO: Have the driver handle this!
|
||||
setTimeout(function () {
|
||||
|
||||
// opt here is a race condition,
|
||||
// is it not? Does this matter?
|
||||
request.ws(opt, function () {});
|
||||
}, request.back *= request.backoff);
|
||||
};
|
||||
|
||||
ws.onmessage = function (msg) {
|
||||
var res;
|
||||
if (!msg || !msg.data) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
res = JSON.parse(msg.data);
|
||||
} catch (error) {
|
||||
return;
|
||||
}
|
||||
if (!res) {
|
||||
return;
|
||||
}
|
||||
res.headers = res.headers || {};
|
||||
if (res.headers['ws-rid']) {
|
||||
var cb = ws.cbs[res.headers['ws-rid']] || function () {};
|
||||
cb(null, res);
|
||||
return;
|
||||
}
|
||||
|
||||
// emit extra events.
|
||||
if (res.body) {
|
||||
request.createServer.ing(res, function (res) {
|
||||
res.out = true;
|
||||
request(opt.base, null, null, res);
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
ws.onerror = function (error) {
|
||||
(ws || {}).err = error;
|
||||
};
|
||||
|
||||
return true;
|
||||
};
|
||||
request.ws.peers = {};
|
||||
request.ws.cbs = {};
|
||||
|
||||
request.each = function (obj, cb, as) {
|
||||
if (!obj || !cb) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (var key in obj) {
|
||||
if (obj.hasOwnProperty(key)) {
|
||||
cb.call(as, obj[key], key);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
return request;
|
||||
}());
|
||||
|
||||
return Peer;
|
||||
}());
|
||||
return sockets.filter(function (socket) {
|
||||
return urls.hasOwnProperty(socket.url);
|
||||
});
|
||||
}
|
||||
|
||||
// Handle read requests.
|
||||
Gun.on('get', function (at) {
|
||||
@ -248,17 +68,20 @@ Gun.on('get', function (at) {
|
||||
var peers = opt.peers || gun.Back('opt.peers');
|
||||
|
||||
if (!peers || Gun.obj.empty(peers)) {
|
||||
Gun.log.once('peers', 'Warning! You have no peers to connect to!');
|
||||
at.gun.Back(-1).on('in', {'@': at['#']});
|
||||
at.gun.Back(Infinity).on('in', {
|
||||
'@': at['#'],
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
var id = at['#'] || Gun.text.random(9);
|
||||
|
||||
// Create a new message.
|
||||
var msg = {
|
||||
|
||||
// msg ID
|
||||
'#': at['#'] || Gun.text.random(9),
|
||||
'#': id,
|
||||
|
||||
// msg BODY
|
||||
'$': at.get,
|
||||
@ -266,7 +89,7 @@ Gun.on('get', function (at) {
|
||||
|
||||
// Listen for a response.
|
||||
// TODO: ONE? PERF! Clear out listeners, maybe with setTimeout?
|
||||
Tab.on(msg['#'], function (err, data) {
|
||||
emitter.on(id, function (err, data) {
|
||||
var obj = {
|
||||
'@': at['#'],
|
||||
err: err,
|
||||
@ -280,11 +103,12 @@ Gun.on('get', function (at) {
|
||||
}
|
||||
});
|
||||
|
||||
// Broadcast to all other peers.
|
||||
Tab.peers(peers).send(msg, {
|
||||
headers: {
|
||||
'gun-sid': Tab.server.sid,
|
||||
},
|
||||
var subset = getSocketSubset(peers);
|
||||
|
||||
// Broadcast to the connected peers.
|
||||
subset.send({
|
||||
headers: { 'gun-sid': server.sid },
|
||||
body: msg,
|
||||
});
|
||||
});
|
||||
|
||||
@ -293,26 +117,40 @@ Gun.on('put', function (at) {
|
||||
if (at['@']) {
|
||||
return;
|
||||
}
|
||||
var opt = at.gun.Back('opt') || {}, peers = opt.peers;
|
||||
|
||||
var peers = at.gun.Back('opt.peers');
|
||||
var enabled = at.gun.Back('opt.websocket');
|
||||
var options = at.opt || {};
|
||||
|
||||
if (!peers || Gun.obj.empty(peers)) {
|
||||
Gun.log.once('peers', 'Warning! You have no peers to save to!');
|
||||
at.gun.Back(-1).on('in', {'@': at['#']});
|
||||
|
||||
// TODO: What about wsp/server clients? Maybe we shouldn't
|
||||
// immediately assume there's no data to be found.
|
||||
at.gun.Back(-1).on('in', {
|
||||
'@': at['#'],
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
if (opt.websocket === false || (at.opt && at.opt.websocket === false)) {
|
||||
|
||||
if (options.websocket === false || enabled === false) {
|
||||
return;
|
||||
}
|
||||
|
||||
var id = at['#'] || Gun.text.random(9);
|
||||
|
||||
var msg = {
|
||||
|
||||
// msg ID
|
||||
'#': at['#'] || Gun.text.random(9),
|
||||
// Message ID.
|
||||
'#': id,
|
||||
|
||||
// msg BODY
|
||||
// Message body.
|
||||
'$': at.put,
|
||||
};
|
||||
|
||||
// TODO: ONE? PERF! Clear out listeners, maybe with setTimeout?
|
||||
Tab.on(msg['#'], function (err, ok) {
|
||||
// Listen for acknowledgement(s).
|
||||
Gun.on(id, function (err, ok) {
|
||||
at.gun.Back(-1).on('in', {
|
||||
'@': at['#'],
|
||||
err: err,
|
||||
@ -320,28 +158,157 @@ Gun.on('put', function (at) {
|
||||
});
|
||||
});
|
||||
|
||||
Tab.peers(peers).send(msg, {
|
||||
headers: {
|
||||
'gun-sid': Tab.server.sid,
|
||||
},
|
||||
var subset = getSocketSubset(peers);
|
||||
|
||||
subset.send({
|
||||
headers: { 'gun-sid': server.sid },
|
||||
body: msg,
|
||||
});
|
||||
});
|
||||
|
||||
// Open any new sockets listed,
|
||||
// adding them to the global pool.
|
||||
Gun.on('opt', function (context) {
|
||||
var gun = context.gun;
|
||||
|
||||
var peers = gun.Back('opt.peers') || {};
|
||||
|
||||
Gun.obj.map(peers, function (options, url) {
|
||||
if (sockets[url]) {
|
||||
return;
|
||||
}
|
||||
|
||||
var socket = Socket(url, options);
|
||||
sockets.add(url, socket);
|
||||
|
||||
/**
|
||||
* Handle responses to requests, adding default headers.
|
||||
* @param {Object} reply - A gun reply object.
|
||||
* @return {undefined}
|
||||
*/
|
||||
function respond (reply) {
|
||||
|
||||
// Make sure headers are defined.
|
||||
var headers = reply.headers = reply.headers || {};
|
||||
|
||||
// Add 'gun-sid' if it doesn't exist.
|
||||
headers['gun-sid'] = headers['gun-sid'] || server.sid;
|
||||
|
||||
socket.send(reply);
|
||||
}
|
||||
|
||||
socket.on('message', function (msg) {
|
||||
var request;
|
||||
|
||||
try {
|
||||
request = JSON.parse(msg);
|
||||
} catch (error) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Validate the request.
|
||||
if (!request || !request.body) {
|
||||
return;
|
||||
}
|
||||
|
||||
request.headers = request.headers || {};
|
||||
|
||||
// emit extra events.
|
||||
server.handle(request, respond);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
// REVIEW: Do I need this on a server client?
|
||||
// browser/client side Server!
|
||||
// TODO: BUG! Does not respect separate instances!!!
|
||||
Gun.on('opt', function (at) {
|
||||
if (Tab.server) {
|
||||
var gun = at.gun;
|
||||
var root = gun.Back(Infinity);
|
||||
var options = (root._.opt = root._.opt || {});
|
||||
|
||||
// Only register once per gun instance.
|
||||
if (options['@client']) {
|
||||
return;
|
||||
}
|
||||
|
||||
var gun = at.gun;
|
||||
var server = Tab.server = Tab.server || {};
|
||||
var tmp;
|
||||
var driver = options['@client'] = {
|
||||
|
||||
server.sid = Gun.text.random();
|
||||
/**
|
||||
* Handles get requests sent from other peers.
|
||||
* @param {Object} req - The get request.
|
||||
* @param {Function} cb - Handles replies.
|
||||
* @return {undefined}
|
||||
*/
|
||||
get: function (req, cb) {
|
||||
var body = req.body;
|
||||
var lex = body.$;
|
||||
var graph = gun._.root._.graph;
|
||||
var node = graph[lex['#']];
|
||||
|
||||
Tab.peers.request.createServer(function (req, res) {
|
||||
// TODO: Reply even if it's not in memory.
|
||||
if (!node) {
|
||||
return;
|
||||
}
|
||||
|
||||
cb({
|
||||
body: {
|
||||
'#': duplicate.track.newID(),
|
||||
'@': body['#'],
|
||||
'$': node,
|
||||
},
|
||||
});
|
||||
},
|
||||
|
||||
/**
|
||||
* Handles put requests sent from other peers.
|
||||
* @param {Object} req - The put request.
|
||||
* @param {Function} cb - A response callback.
|
||||
* @return {undefined}
|
||||
*/
|
||||
put: function (req, cb) {
|
||||
var body = req.body;
|
||||
var graph = body.$;
|
||||
|
||||
// Cached gun paths.
|
||||
var path = gun._.root._.path || {};
|
||||
|
||||
graph = Gun.obj.map(graph, function (node, soul, map) {
|
||||
if (!path[soul]) {
|
||||
return;
|
||||
}
|
||||
map(soul, node);
|
||||
});
|
||||
|
||||
// filter out what we don't have in memory.
|
||||
if (!graph) {
|
||||
return;
|
||||
}
|
||||
|
||||
var id = Gun.on.ask(function (ack, event) {
|
||||
if (!ack) {
|
||||
return;
|
||||
}
|
||||
|
||||
event.off();
|
||||
|
||||
cb({
|
||||
body: {
|
||||
'#': duplicate.track.newID(),
|
||||
'@': body['#'],
|
||||
'$': ack,
|
||||
'!': ack.err,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
gun.on('out', {
|
||||
'#': duplicate.track(id),
|
||||
gun: gun,
|
||||
opt: { websocket: false },
|
||||
put: graph,
|
||||
});
|
||||
},
|
||||
};
|
||||
|
||||
server.handlers.push(function (req, res) {
|
||||
|
||||
// Validate request.
|
||||
if (!req || !res || !req.body || !req.headers) {
|
||||
@ -350,111 +317,26 @@ Gun.on('opt', function (at) {
|
||||
|
||||
var msg = req.body;
|
||||
|
||||
// AUTH for non-replies.
|
||||
if (server.msg(msg['#'])) {
|
||||
if (duplicate(msg['#'])) {
|
||||
return;
|
||||
}
|
||||
|
||||
// no need to process.
|
||||
// It's a response, no need to reply.
|
||||
if (msg['@']) {
|
||||
if (Tab.ons[tmp = msg['@'] || msg['#']]) {
|
||||
Tab.on(tmp, [msg['!'], msg.$]);
|
||||
}
|
||||
var reqID = msg['@'];
|
||||
|
||||
emitter.on(reqID, [
|
||||
msg['!'],
|
||||
msg.$,
|
||||
]);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.$ && msg.$['#']) {
|
||||
server.get(req, res);
|
||||
return;
|
||||
}
|
||||
var isLex = msg.$ && msg.$['#'];
|
||||
var method = isLex ? 'get' : 'put';
|
||||
|
||||
server.put(req, res);
|
||||
driver[method](req, res);
|
||||
});
|
||||
|
||||
server.get = function (req, cb) {
|
||||
var body = req.body;
|
||||
var lex = body.$;
|
||||
var graph = gun._.root._.graph;
|
||||
var node;
|
||||
|
||||
// Don't reply to data we don't have it in memory.
|
||||
// TODO: Add localStorage?
|
||||
if (!(node = graph[lex['#']])) {
|
||||
return;
|
||||
}
|
||||
|
||||
cb({
|
||||
body: {
|
||||
'#': server.msg(),
|
||||
'@': body['#'],
|
||||
'$': node,
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
server.put = function (req, cb) {
|
||||
var body = req.body, graph = body.$;
|
||||
var __ = gun._.root._;
|
||||
|
||||
// filter out what we don't have in memory.
|
||||
if (!(graph = Gun.obj.map(graph, function (node, soul, map) {
|
||||
if (!__.path[soul]) {
|
||||
return;
|
||||
}
|
||||
map(soul, node);
|
||||
}))) {
|
||||
return;
|
||||
}
|
||||
gun.on('out', {
|
||||
gun: gun,
|
||||
opt: {
|
||||
websocket: false,
|
||||
},
|
||||
put: graph,
|
||||
'#': Gun.on.ask(function (ack, ev) {
|
||||
if (!ack) {
|
||||
return undefined;
|
||||
}
|
||||
ev.off();
|
||||
return cb({
|
||||
body: {
|
||||
'#': server.msg(),
|
||||
'@': body['#'],
|
||||
'$': ack,
|
||||
'!': ack.err,
|
||||
},
|
||||
});
|
||||
}),
|
||||
});
|
||||
};
|
||||
|
||||
server.msg = function (id) {
|
||||
if (!id) {
|
||||
id = Gun.text.random(9);
|
||||
server.msg.debounce[id] = Gun.time.is();
|
||||
return id;
|
||||
}
|
||||
|
||||
clearTimeout(server.msg.clear);
|
||||
server.msg.clear = setTimeout(function () {
|
||||
var now = Gun.time.is();
|
||||
Gun.obj.map(server.msg.debounce, function (time, id) {
|
||||
if ((now - time) < (1000 * 60 * 5)) {
|
||||
return;
|
||||
}
|
||||
|
||||
Gun.obj.del(server.msg.debounce, id);
|
||||
});
|
||||
}, 500);
|
||||
|
||||
if (server.msg.debounce[id]) {
|
||||
server.msg.debounce[id] = Gun.time.is();
|
||||
return id;
|
||||
}
|
||||
|
||||
server.msg.debounce[id] = Gun.time.is();
|
||||
return undefined;
|
||||
};
|
||||
|
||||
server.msg.debounce = server.msg.debounce || {};
|
||||
});
|
||||
|
90
lib/wsp/duplicate.js
Normal file
90
lib/wsp/duplicate.js
Normal file
@ -0,0 +1,90 @@
|
||||
'use strict';
|
||||
|
||||
var Gun = require('../../gun');
|
||||
|
||||
var cache = {};
|
||||
var timeout = null;
|
||||
|
||||
/**
|
||||
* Remove all entries in the cache older than 5 minutes.
|
||||
* Reschedules itself to run again when the oldest item
|
||||
* might be too old.
|
||||
* @return {undefined}
|
||||
*/
|
||||
function gc () {
|
||||
var now = Date.now();
|
||||
var oldest = now;
|
||||
var maxAge = 5 * 60 * 1000;
|
||||
|
||||
Gun.obj.map(cache, function (time, id) {
|
||||
oldest = Math.min(now, time);
|
||||
|
||||
if ((now - time) < maxAge) {
|
||||
return;
|
||||
}
|
||||
|
||||
delete cache[id];
|
||||
});
|
||||
|
||||
var done = Gun.obj.empty(cache);
|
||||
|
||||
// Disengage GC.
|
||||
if (done) {
|
||||
timeout = null;
|
||||
return;
|
||||
}
|
||||
|
||||
// Just how old?
|
||||
var elapsed = now - oldest;
|
||||
|
||||
// How long before it's too old?
|
||||
var nextGC = maxAge - elapsed;
|
||||
|
||||
// Schedule the next GC event.
|
||||
timeout = setTimeout(gc, nextGC);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks a memory-efficient cache to see if a string has been seen before.
|
||||
* @param {String} id - A string to keep track of.
|
||||
* @return {Boolean} - Whether it's been seen recently.
|
||||
*/
|
||||
function duplicate (id) {
|
||||
|
||||
// Have we seen this ID recently?
|
||||
var existing = cache.hasOwnProperty(id);
|
||||
|
||||
// Add it to the cache.
|
||||
duplicate.track(id);
|
||||
|
||||
return existing;
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts tracking an ID as a possible future duplicate.
|
||||
* @param {String} id - The ID to track.
|
||||
* @return {String} - The same ID.
|
||||
*/
|
||||
duplicate.track = function (id) {
|
||||
cache[id] = Date.now();
|
||||
|
||||
// Engage GC.
|
||||
if (!timeout) {
|
||||
gc();
|
||||
}
|
||||
|
||||
return id;
|
||||
};
|
||||
|
||||
/**
|
||||
* Generate a new ID and start tracking it.
|
||||
* @param {Number} [chars] - The number of characters to use.
|
||||
* @return {String} - The newly created ID.
|
||||
*/
|
||||
duplicate.track.newID = function (chars) {
|
||||
var id = Gun.text.random(chars);
|
||||
|
||||
return duplicate.track(id);
|
||||
};
|
||||
|
||||
module.exports = duplicate;
|
Loading…
x
Reference in New Issue
Block a user