restructure acks

This commit is contained in:
Mark Nadal 2020-08-30 07:21:56 -07:00
parent 14688298c9
commit cdca331555
2 changed files with 108 additions and 55 deletions

133
gun.js
View File

@ -41,7 +41,7 @@
}
String.hash = function(s, c){ // via SO
if(typeof s !== 'string'){ return }
c = c || 0;
c = c || 0; // CPU schedule hashing by
if(!s.length){ return c }
for(var i=0,l=s.length,n; i<l; ++i){
n = s.charCodeAt(i);
@ -62,27 +62,34 @@
return l;
}
;(function(){ // max ~1ms or before stack overflow
var u, l = 0, c = 0, sI = (typeof setImmediate !== ''+u && setImmediate) || setTimeout;
setTimeout.poll = function(f){
var u, sT = setTimeout, l = 0, c = 0, sI = (typeof setImmediate !== ''+u && setImmediate) || sT;
sT.poll = sT.poll || function(f){
if((1 >= (+new Date - l)) && c++ < 3333){ f(); return }
sI(function(){ l = +new Date; f() },c=0)
}
}());
;(function(){ // Too many polls block, this "threads" them in turns over a single thread in time.
var p = setTimeout.poll, s = [], i = 0, fn;
setTimeout.turn = function(f){ 1 == s.push(f) && p(turn) };
function turn(){
if(fn = s[i++]){ fn() }
var sT = setTimeout, t = sT.turn = sT.turn || function(f){ 1 == s.push(f) && p(T) }
, s = t.s = [], p = sT.poll, i = 0, f, T = function(){
if(f = s[i++]){ f() }
if(i == s.length || 99 == i){
s = s.slice(i);
s = t.s = s.slice(i);
i = 0;
}
if(s.length){ p(turn) }
if(s.length){ p(T) }
}
}());
// JSON should not block CPU, this should be a standard. But browsers do not have it, so we are stuck polyfilling with the blocking version.
//JSON.parseAsync = JSON.parseAsync || function(t,cb,r){ var u; try{ cb(u, JSON.parse(t,r)) }catch(e){ cb(e) } }
//JSON.stringifyAsync = JSON.stringifyAsync || function(v,cb,r,s){ var u; try{ cb(u, JSON.stringify(v,r,s)) }catch(e){ cb(e) } }
;(function(){
var u, sT = setTimeout, T = sT.turn;
sT.each = sT.each || function(l,f,e,S){ S = S || 9; (function t(s,L,r){
if(L = (s = (l||[]).splice(0,S)).length){
for(var i = 0; i < L; i++){
if(u !== (r = f(s[i]))){ break }
}
if(u === r){ T(t); return }
} e && e(r);
}())}
}());
})(USE, './shim');
;USE(function(module){
@ -272,7 +279,7 @@
if(dup.check(tmp)){ return } dup.track(tmp);
tmp = msg._; msg._ = ('function' == typeof tmp)? tmp : function(){};
(msg.$ && (msg.$ === (msg.$._||'').$)) || (msg.$ = gun);
console.only(5, "UNI", msg);
if(msg['@'] && !msg.put){ ack(msg) }
if(!at.ask(msg['@'], msg)){ // is this machine listening for an ack?
DBG && (DBG.u = +new Date);
if(msg.get){ Gun.on._get(msg, gun) }
@ -292,11 +299,13 @@
root.on('out', msg);
return;
}
var put = msg.put, id = msg['#'];
var put = msg.put;
var DBG = ctx.DBG = msg.DBG;
if(put['#'] && put['.']){ root.on('put', msg); return } // TODO: BUG! This needs to call HAM instead.
DBG && (DBG.p = S);
ctx['#'] = msg['#'];
ctx.msg = msg;
ctx.all = 0;
ctx.stun = 1;
var nl = Object.keys(put).sort(); // TODO: This is unbounded operation, large graphs will be slower. Write our own CPU scheduled sort? Or somehow do it in below?
var ni = 0, nj, kl, soul, node, states, err, tmp;
@ -323,6 +332,7 @@
var val = node[key], state = states[key];
if(u === state){ err = ERR+cut(key)+"on"+cut(soul)+"no state."; break }
if(!valid(val)){ err = ERR+cut(key)+"on"+cut(soul)+"bad "+(typeof val)+cut(val); break }
ctx.all++; //ctx.ack[soul+key] = '';
ham(val, key, soul, state, msg);
}
if((kl = kl.slice(i)).length){ turn(pop); return }
@ -340,7 +350,7 @@
var vertex = graph[soul] || empty, was = state_is(vertex, key, 1), known = vertex[key];
var now = State(),u;
if(state > now){ setTo; return }
if(state < was){ old; if(!ctx.miss){ return } } // but some chains have a cache miss that need to re-fire. // TODO: Improve in future.
if(state < was){ old; if(!ctx.miss){ return } } // but some chains have a cache miss that need to re-fire. // TODO: Improve in future. // for AXE this would reduce rebroadcast, but GUN does it on message forwarding.
if(state === was && (val === known || L(val) <= L(known))){ return } // same
/*if(!is.incoming){
if(is.defer){
@ -355,14 +365,21 @@
}*/
//(lot = ctx.lot||'').s++; lot.more++;
//(ctx.stun || (ctx.stun = {}))[soul+key] = 1;
ctx.stun++; // TODO: 'forget' feature in SEA tied to this, bad approach, but hacked in for now. Any changes here must update there.
//ctx.stun++; // TODO: 'forget' feature in SEA tied to this, bad approach, but hacked in for now. Any changes here must update there.
var aid = msg['#']+ctx.stun++, id = {toString: function(){ return aid }, _: ctx}; // this *trick* makes it compatible between old & new versions.
var DBG = ctx.DBG; DBG && (DBG.ph = DBG.ph || +new Date);
root.on('put', {'#': msg['#'], '@': msg['@'], put: {'#': soul, '.': key, ':': val, '>': state}, _: ctx});
root.on('put', {'#': id, '@': msg['@'], put: {'#': soul, '.': key, ':': val, '>': state}, _: ctx});
}
function map(msg){
var DBG; if(DBG = (msg._||'').DBG){ DBG.pa = +new Date; DBG.pm = DBG.pm || +new Date}
var eve = this, root = eve.as, graph = root.graph, ctx = msg._, put = msg.put, soul = put['#'], key = put['.'], val = put[':'], state = put['>'], id = msg['#'], tmp;
graph[soul] = state_ify(graph[soul], key, state, val, soul); // TODO: Only put in graph if subscribed? Relays vs Browsers?
/* // manhattan:
var $ = root.$.get(soul), _ = $._;
_.put = state_ify(_.put, key, state, (u !== (tmp = put['=']))? tmp : val, soul);
console.log(soul, _.put);
eve.to.next(msg);
return; // eom */
chain(ctx, soul, key, (u !== (tmp = put['=']))? tmp : val, state); // TODO: This should NOT be how the API works, this should be done at an extension layer, but hacky solution to migrate with old code for now.
if((tmp = ctx.out) && (tmp = tmp.put)){
tmp[soul] = state_ify(tmp[soul], key, state, val, soul); // TODO: Hacky, fix & come back later, for actual pushing messages.
@ -401,6 +418,16 @@
var ERR = "Error: Invalid graph!";
var cut = function(s){ return " '"+(''+s).slice(0,9)+"...' " }
var L = JSON.stringify, MD = 2147483647, State = Gun.state;
function ack(msg){ // aggregate ACKs.
var id = msg['@'] || '', tmp;
if(!(tmp = id._)){ console.log("TODO: handle ack id."); return }
tmp.acks = (tmp.acks||0) + 1;
if(0 == tmp.stun && tmp.acks == tmp.all){ // TODO: if ack is synchronous this may not work?
console.log("@@@@@@ DONE @@@@@@", id);
((msg.$||'')._||'').root.on('in', {'@': tmp['#'], err: msg.err, ok: 'shard'});
}
}
}());
;(function(){
@ -421,6 +448,7 @@
} else {
//node = node;//Gun.window? Gun.obj.copy(node) : node; // HNPERF: If !browser bump Performance? Is this too dangerous to reference root graph? Copy / shallow copy too expensive for big nodes. Gun.obj.to(node); // 1 layer deep copy // Gun.obj.copy(node); // too slow on big nodes
var ack = msg['#'], id = text_rand(9), keys = Object.keys(node||{}), soul = node._['#'];
// PERF: Consider commenting this out to force disk-only reads for perf testing?
(function got(){
var i = 0, k, put = {};
while(i < 9 && (k = keys[i++])){
@ -430,14 +458,15 @@
(tmp = {})[soul] = put; put = tmp;
var faith = function(){}; faith.ram = faith.faith = true; // HNPERF: We're testing performance improvement by skipping going through security again, but this should be audited.
tmp = keys.length;
DBG && (DBG.ga = +new Date);
root.on('in', {'@': ack, '#': id, put: put, '%': (tmp? (id = text_rand(9)) : u), ram: 1, $: gun, _: faith});
if(!tmp){ return }
setTimeout.turn(got);
}());
//root.on('get', msg);
root.on('get', msg); // send GET to storage adapters.
return;
}
//console.log("GOT", Object.keys(node).length);
/*//console.log("GOT", Object.keys(node).length);
(tmp = {})[node._['#']] = node; node = tmp;
tmp = (at||empty).ack;
var faith = function(){}; faith.ram = faith.faith = true; // HNPERF: We're testing performance improvement by skipping going through security again, but this should be audited.
@ -454,6 +483,7 @@
//if(0 < tmp){ return }
root.on('get', msg);
DBG && (DBG.gd = +new Date);
*/
}
}());
@ -633,6 +663,7 @@
function input(msg){
var eve = this, cat = eve.as, root = cat.root, gun = msg.$, at = (gun||empty)._ || empty, change = msg.put, rel, tmp;
//console.log("IN:", cat.get, cat.has, change, msg);//return;
if(cat.get && msg.get !== cat.get){
tmp = {get: cat.get}; Object.keys(msg).forEach(function(k){ tmp[k] = msg[k] });
}
@ -643,14 +674,6 @@
//cat.ack = cat.ack || at.ack;
}
}
console.only(12, "in:", at.get, change);
console.only(11, "in:", at.get, change);
console.only(10, "in:", at.get, change);
console.only(9, "in:", at.get, change);
console.only(8, "in:", at.get, change);
console.only(7, "in:", at.get, change);
console.only(4, "in:", at.get, change);
console.only(3, "in:", at.get, change);
if(u === change){
tmp = at.put;
eve.to.next(msg);
@ -668,7 +691,6 @@
eve.to.next(msg);
echo(cat, msg, eve);
if(cat.next){ Object.keys(change).forEach(map, {msg: msg, cat: cat}) }
console.only(6, "in:", at.get, change);
return;
}
//if(!(rel = Gun.val.link.is(change))){
@ -708,7 +730,6 @@
tmp.is = tmp.is || at.put;
tmp[cat.id] = at.put || true;
//if(root.stop){
console.only(13, "in:", at.get, change);
eve.to.next(msg)
//}
relate(cat, msg, at, rel);
@ -965,7 +986,6 @@
msg = o;
}
}
console.only(14, 'use', data, );
if((tmp = root.mum) && at.id){ // TODO: can we delete mum entirely now?
var id = at.id + (eve.id || (eve.id = String.random(9)));
if(tmp[id]){ return }
@ -1028,7 +1048,7 @@
to.push(cat);
// ---------------
var id = seen.length;
(as.wait || (as.wait = {}))[id] = 1;
(as.wait || (as.wait = {}))[id] = '';
cat.ref = (k? at.ref.get(k) : at.ref);
cat.ref.get(function(soul, tmp, msg){
if(!soul){
@ -1062,12 +1082,7 @@
if(!as.ack){ return }
as.ack(ack, this);
}, as.opt), acks = 0;
//var tmp = cat.root.now; obj.del(cat.root, 'now');
//var mum = cat.root.mum; cat.root.mum = {};
(as.via._).on('out', {put: as.out = as.graph, opt: as.opt, '#': ask});
//cat.root.mum = mum? obj.to(mum, cat.root.mum) : mum;
//cat.root.now = tmp;
//as.via._.on('res', {}); delete as.via._.tag.res; // emitting causes mem leak?
}
function get(as){
@ -1142,6 +1157,10 @@
opt.ok.call(gun, data, msg.get, msg, ev);
}
}
// Rules:
// 1. If cached, should be fast, but not read while write.
// 2. Should not retrigger other listeners, should get triggered even if nothing found.
// 3. If the same callback passed to many different once chains, each should resolve - an unsubscribe from the same callback should not effect the state of the other resolving chains, if you do want to cancel them all early you should mutate the callback itself with a flag & check for it at top of callback
Gun.chain.once = function(cb, opt){
var gun = this, at = gun._, data = at.put;
console.only(1, 'once');
@ -1165,10 +1184,18 @@
}
return gun;
}
/*Gun.chain.once = function(cb, opt){
var gun = this, at = gun._, data = at.put;
at.on('out', {get: {'#':'ASDF'}, '#': at.root.ask(function(msg){
})});
return gun;
}*/
function val(msg, eve, to){
if(!msg.$){ eve.off(); return }
var opt = this.as, cat = opt.at, gun = msg.$, at = gun._, data = at.put || msg.put, link, tmp;
console.log(cat.get, 'ONCE:', msg);
if(tmp = msg.$$){
link = tmp = (msg.$$._);
if(u !== link.put){
@ -1181,7 +1208,9 @@
if(!to && u === data && !at.root.opt.super && eve.ack <= (opt.acks || Object.keys(at.root.opt.peers).length)){ return }
if((!to && (u === data || at.soul || at.link || (link && !(0 < link.ack))))
|| (u === data && !at.root.opt.super && (tmp = Object.keys(at.root.opt.peers).length) && (!to && (link||at).ack < tmp))){
console.log("time-ooooo");
tmp = (eve.wait = {})[at.id] = setTimeout(function(){
console.log("TIME!!!!");
val.call({as:opt}, msg, eve, tmp || 1);
}, opt.wait || 99);
return;
@ -1678,11 +1707,11 @@
}
Gun.on('create', function lg(root){
this.to.next(root);
var opt = root.opt, graph = root.graph, acks = {}, disk, to;
var opt = root.opt, graph = root.graph, acks = [], disk, to;
if(false === opt.localStorage){ return }
opt.prefix = opt.file || 'gun/';
try{ disk = lg[opt.prefix] = lg[opt.prefix] || JSON.parse(store.getItem(opt.prefix));
}catch(e){ disk = lg[opt.prefix] = {} }
try{ disk = lg[opt.prefix] = lg[opt.prefix] || JSON.parse(store.getItem(opt.prefix)) || {};
}catch(e){ disk = lg[opt.prefix] = {}; }
root.on('get', function(msg){
this.to.next(msg);
@ -1693,29 +1722,29 @@
data = Gun.state.ify({}, tmp, Gun.state.is(data, tmp), data[tmp], soul);
}
if(data){ (tmp = {})[soul] = data } // back into a graph.
console.only(6, 'lS get', tmp, JSON.parse(JSON.stringify(disk)));
console.only(2, 'lS get', tmp, JSON.parse(JSON.stringify(disk)));
console.only(2, 'lS got', tmp);
root.on('in', {'@': msg['#'], put: tmp, lS:1});// || root.$});
});
root.on('put', function(msg){
this.to.next(msg);
var put = msg.put, soul = put['#'], key = put['.'], val = put[':'], state = put['>'], tmp;
//console.log("...lSput:::", soul, key, val);
disk[soul] = Gun.state.ify(disk[soul], key, state, val, soul);
if(!msg['@']){ acks[msg['#']] = 1; } // only ack non-acks.
if(to){ return } // already batched
to = setTimeout(flush, opt.wait || 1);
this.to.next(msg); // remember to call next middleware adapter
var put = msg.put, soul = put['#'], key = put['.'], tmp; // pull data off wire envelope
disk[soul] = Gun.state.ify(disk[soul], key, put['>'], put[':'], soul); // merge into disk object
if(!msg['@']){ acks.push(msg['#']) } // then ack any non-ack write. // TODO: use batch id.
if(to){ return }
to = setTimeout(flush, opt.wait || 1); // that gets saved as a whole to disk every 1ms
});
function flush(){
var err, ack = acks; clearTimeout(to); to = false; acks = {};
var err, ack = acks; clearTimeout(to); to = false; acks = [];
try{store.setItem(opt.prefix, JSON.stringify(disk));
}catch(e){
Gun.log(err = (e || "localStorage failure") + " Consider using GUN's IndexedDB plugin for RAD for more storage space, https://gun.eco/docs/RAD#install");
Gun.log((err = (e || "localStorage failure")) + " Consider using GUN's IndexedDB plugin for RAD for more storage space, https://gun.eco/docs/RAD#install");
root.on('localStorage:error', {err: err, get: opt.prefix, put: disk});
}
if(!err && !Object.empty(opt.peers)){ return } // only ack if there are no peers.
Object.keys(ack).forEach(function(id){ root.on('in', {'@': id, err: err, ok: 0}) }); // localStorage isn't reliable, so make its `ok` code be a low number.
if(!err && !Object.empty(opt.peers)){ return } // only ack if there are no peers. // Switch this to probabilistic mode
setTimeout.each(ack, function(id){
root.on('in', {'@': id, err: err, ok: 0}); // localStorage isn't reliable, so make its `ok` code be a low number.
});
}
});

View File

@ -2657,22 +2657,46 @@ describe('Gun', function(){
}, 25);
});
/*it('get recursive map', function(done){
var teams = {red: {}, blue: {}};
var alice = {age: 27, name: "Alice"};
var bob = {age: 29, name: "Bob"};
alice.spouse = bob;
bob.spouse = alice;
var carl = {age: 31, name: "Carl"};
teams.blue.alice = alice;
teams.blue.carl = carl;
teams.red.bob = bob;
teams.red.carl = carl;
console.only.i=1;console.log("===============");
Gun.statedisk(teams, 'gerema', function(ack){
console.log("VVVVVVVVVVVVVVVVVVV", ack);
setTimeout(function(){
gun.get('gerema').map().map().get('spouse').on(function(data){
console.only(2,'hi', data);
console.only(1,'hi', data);
console.log("*****************", data);return;
});
},500);});
});*/
it.only('get node after recursive field', function(done){
var bob = {age: 29, name: "Bob!"};
var cat = {name: "Fluffy", species: "kitty"};
var user = {bob: bob};
bob.pet = cat;
cat.slave = bob;
Gun.statedisk(user, 'node/circle', function(){
Gun.statedisk(user, 'nodecircle', function(){
console.only.i=1;console.log("===============");
gun.get('node/circle').get('bob').get('pet').get('slave').once(function(data){
gun.get('nodecircle').get('bob').get('pet').get('slave').once(function(data){
//clearTimeout(done.to);
//setTimeout(function(){
console.log("*****************", data);return;
expect(data.age).to.be(29);
expect(data.name).to.be("Bob!");
expect(Gun.val.link.is(data.pet)).to.ok();
expect('string' == typeof Gun.valid(data.pet)).to.ok();
done();
//},300);
});