AXE - using a garbage collector to remove subscribes from superpeer and resubscribe from peers (browser).

This commit is contained in:
Adriano Rogowski 2018-11-30 21:50:40 -02:00
parent 1e8bece479
commit e2a7dba29f
2 changed files with 82 additions and 17 deletions

83
axe.js
View File

@ -63,26 +63,62 @@
console.log("axe", at.opt);
if(at.opt.super){
function verify(msg, send, at) {
var peers = Object.keys(p), puts = Object.keys(msg.put), i, j, peer;
var peers = Object.values(p), puts = Object.keys(msg.put), i, j, peer;
var soul = puts[0]; /// TODO: verify all souls in puts. Copy the msg only with subscribed souls?
for (i=0; i < peers.length; ++i) {
peer = p[peers[i]];
//if (peer.url) {console.log('AXE do not reject superpeers'); send(msg, peer); continue;} /// always send to superpeers?
if (!peer.id) {console.log('AXE peer without id: ', peer); continue;}
if (!Gun.subscribe[soul] || !Gun.subscribe[soul][peer.id]) { console.log('AXE SAY reject msg to peer: %s, soul: %s', peer.id, soul); continue; }
var subs = Gun.subscribe[soul];
if (!subs) { return; }
for (i=0; i < subs.length; ++i) {
peer = subs[i];
send(msg, peer);
}
}
AXE.say = function(msg, send, at) {
if (!msg.put) { send(msg); return; }
console.log('AXE HOOK!! ', msg);
//console.log('AXE HOOK!! ', msg);
verify(msg, send, at);
};
/// TODO: remove peer from all Gun.subscribe. On `mesh.bye` event?
}
if(at.opt.super){
at.on('in', USE('./lib/super', 1), at);
var timerGC, queueGC;
at.on('bye', function(peer) {
console.log('Garbage collector triggered by peer.id: ', peer.id);
if (timerGC) { queueGC = true; return; }
timerGC = throttleGC();
});
var throttleGC = function() {
return setTimeout(function() {
GC(Gun.subscribe, p);
clearTimeout(timerGC);
timerGC = null;
if (queueGC) {
queueGC = false;
timerGC = throttleGC();
}
}, 1);
};
} else {
var connections = 0;
at.on('hi', function(opt) {
this.to.next(opt);
console.log('AXE PEER [HI]', new Date(), opt.pid);
connections++;
/// The first connection don't need to resubscribe the nodes.
if (connections === 1) { return; }
/// TODO: resync all nodes in gun/gap
/// Resubscribe all nodes.
setTimeout(function() {
var souls = Object.keys(at.graph);
for (var i=0; i < souls.length; ++i) {
//at.gun.get(souls[i]).off();
at.next[souls[i]].ack = 0;
at.gun.get(souls[i]).once(function(){});
}
//location.reload();
}, 500);
}, at);
//at.on('in', input, at);
}
}
@ -91,8 +127,39 @@
function input(msg){
var at = this.as, to = this.to;
console.log('AXE PEER [IN]: ', msg);
this.to.next(msg);
}
/// Garbage collector to remove peers subscriptions when disconnect
// var peers = [1,3,5,7,9];
// function shuffle(array) { var tmp, current, top = array.length; if(top) while(--top) { current = Math.floor(Math.random() * (top + 1)); tmp = array[current]; array[current] = array[top]; array[top] = tmp;} return array;}
// for (var peers=[],i=0;i<7000;++i) peers[i]=i;
// peers = shuffle(peers).slice(2000);
// var subscribes = {soula: [1,2,3,4,5,6,7,8,9,0], soulb: [1,2,3,4,5,6,7,8,9,0], soulc: [1,2,3,4,5,6,7,8,9,0]};
// var subscribes = {}; for (var i=0;i<1000;++i) {subscribes['soul_'+i]= (function() {var a=[]; for(var i=0;i<100;i++){a.push(i)} return a;})()}
function GC(subscribes, peers) {
console.time('AXE GC');
var souls = Object.keys(subscribes), soul, i;
var peers = Object.values(peers);
if (souls.length === 0) {return;}
var removed = {};
for (i=0; i < souls.length; ++i) {
soul = souls[i];
// removed[soul] = 0;
var pidx = subscribes[soul].length;
while (pidx--) {
if (peers.indexOf(subscribes[soul][pidx]) === -1) {
// console.log('REMOVED: Soul: %s, peer: ', soul, pidx, subpeers[pidx]);
subscribes[soul].splice(pidx, 1);
// removed[soul]++;
}
}
if (subscribes[soul].length === 0) { delete subscribes[soul]; }
}
console.timeEnd('AXE GC');
// console.log('[AXE GC] Removed: ', removed);
}
module.exports = AXE;
})(USE, './axe');

View File

@ -1,28 +1,26 @@
;(function(){
var Gun = (typeof window !== "undefined")? window.Gun : require('../gun');
var Rad = (Gun.window||{}).Radix || require('./radix');
/// Store the subscribes
Gun.subscribe = {}; /// TODO: use Rad instead of plain object?
function input(msg){
var at = this.as, to = this.to, peer = (msg.mesh||empty).via;
var get = msg.get, soul, key;
if(!peer || !get){ return to.next(msg) }
console.log("super", msg);
// console.log("super", msg);
if(soul = get['#']){
if(key = get['.']){
} else {
}
subscribe(soul, peer, msg);
Gun.subscribe[soul] = Gun.subscribe[soul] || [];
if (Gun.subscribe[soul].indexOf(peer) === -1) {
Gun.subscribe[soul].push(peer);
}
}
to.next(msg);
}
/// Store the subscribes
Gun.subscribe = {}; /// TODO: use Rad instead of plain object?
function subscribe(soul, peer, msg) {
if (!peer.id) { console.log('super jump peer without id: ', peer, msg); return; } /// TODO: this occurs in first subscription. Use peer reference or peer.wire.id?
Gun.subscribe[soul] = Gun.subscribe[soul] || {};
Gun.subscribe[soul][peer.id] = 1;
}
var empty = {}, u;
if(Gun.window){ return }
try{module.exports = input}catch(e){}