fix synchronous runs, don't skip wait, fire first, aggregate but error if need fire,

This commit is contained in:
Mark Nadal 2021-08-23 00:49:22 -07:00
parent 6336cc66ae
commit dac9e8d059
2 changed files with 29 additions and 20 deletions

47
gun.js
View File

@ -63,7 +63,7 @@
}
;(function(){ // max ~1ms or before stack overflow
var u, sT = setTimeout, l = 0, c = 0, sI = (typeof setImmediate !== ''+u && setImmediate) || sT; // queueMicrotask faster but blocks UI
sT.poll = sT.poll || function(f){
sT.poll = sT.poll || function(f){ //f(); return; // for testing
if((1 >= (+new Date - l)) && c++ < 3333){ f(); return }
sI(function(){ l = +new Date; f() },c=0)
}
@ -222,7 +222,7 @@
}
return true;
}
var id = (as && as['#']) || Math.random().toString(36).slice(2);
var id = (as && as['#']) || random(9);
if(!cb){ return id }
var to = this.on(id, cb, as);
to.err = to.err || setTimeout(function(){ to.off();
@ -230,6 +230,7 @@
}, lack);
return id;
}
var random = String.random || function(){ return Math.random().toString(36).slice(2) }
})(USE, './ask');
;USE(function(module){
@ -273,6 +274,7 @@
return gun;
}
function universe(msg){
//if(!F){ var eve = this; setTimeout(function(){ universe.call(eve, msg,1) },Math.random() * 100);return; } // ADD F TO PARAMS!
if(!msg){ return }
if(msg.out === universe){ this.to.next(msg); return }
var eve = this, as = eve.as, at = as.at || as, gun = at.$, dup = at.dup, tmp, DBG = msg.DBG;
@ -373,8 +375,8 @@
if((tmp = ctx.msg) && (tmp = tmp.put) && (tmp = tmp[soul])){ state_ify(tmp, key, state, val, soul) } // necessary! or else out messages do not get SEA transforms.
graph[soul] = state_ify(graph[soul], key, state, val, soul);
if(tmp = (root.next||'')[soul]){ tmp.on('in', msg) }
eve.to.next(msg);
fire(ctx);
eve.to.next(msg);
}
function fire(ctx, msg){ var root;
if(ctx.stop){ return }
@ -390,19 +392,20 @@
ctx.root.on('out', msg);
}
function ack(msg){ // aggregate ACKs.
var id = msg['@'] || '', root = (msg.$._||'').root, tmp;
// TODO: check for the sharded message err and transfer it onto the original batch?
if(!(tmp = id._)){ /*console.log("TODO: handle ack id.");*/ return }
tmp.acks = (tmp.acks||0) + 1;
if(tmp.err = msg.err){
msg['@'] = tmp['#'];
--tmp.stun;
}
if(0 == tmp.stun && tmp.acks == tmp.all){ // TODO: if ack is synchronous this may not work?
root && root.on('in', {'@': tmp['#'], err: msg.err, ok: msg.err? u : 'shard'});
msg.err && fire(tmp);
return;
var id = msg['@'] || '', ctx;
if(!(ctx = id._)){ return }
ctx.acks = (ctx.acks||0) + 1;
if(ctx.err = msg.err){
msg['@'] = ctx['#'];
fire(ctx); // TODO: BUG? How it skips/stops propagation of msg if any 1 item is error, this would assume a whole batch/resync has same malicious intent.
}
if(!ctx.stop && !ctx.crack){ ctx.crack = ctx.match && ctx.match.push(function(){back(ctx)}) } // handle synchronous acks
back(ctx);
}
function back(ctx){
if(!ctx || !ctx.root){ return }
if(ctx.stun || ctx.acks !== ctx.all){ return }
ctx.root.on('in', {'@': ctx['#'], err: ctx.err, ok: ctx.err? u : {'':1}});
}
var ERR = "Error: Invalid graph!";
@ -877,6 +880,7 @@
}; wait = {}; // end quick hack.
}
// call:
if(root.pass){ if(root.pass[id+at.id]){ return } root.pass[id+at.id] = 1 }
if(opt.on){ opt.ok.call(at.$, data, at.get, msg, eve || any); return } // TODO: Also consider breaking `this` since a lot of people do `=>` these days and `.call(` has slower performance.
if(opt.v2020){ opt.ok(msg, eve || any); return }
Object.keys(msg).forEach(function(k){ tmp[k] = msg[k] }, tmp = {}); msg = tmp; msg.put = data; // 2019 COMPATIBILITY! TODO: GET RID OF THIS!
@ -983,6 +987,7 @@
as.todo = [{it: as.data, ref: as.$}];
as.turn = as.turn || turn;
as.ran = as.ran || ran;
//var path = []; as.via.back(at => { at.get && path.push(at.get.slice(0,9)) }); path = path.reverse().join('.');
// TODO: Perf! We only need to stun chains that are being modified, not necessarily written to.
(function walk(){
var to = as.todo, at = to.pop(), d = at.it, cid = at.ref && at.ref._.id, v, k, cat, tmp, g;
@ -1007,12 +1012,13 @@
(as.wait || (as.wait = {}))[id] = '';
tmp = (cat.ref = (g? d : k? at.ref.get(k) : at.ref))._;
(tmp = (d && (d._||'')['#']) || tmp.soul || tmp.link)? resolve({soul: tmp}) : cat.ref.get(resolve, {run: as.run, /*hatch: 0,*/ v2020:1, out:{get:{'.':' '}}}); // TODO: BUG! This should be resolve ONLY soul to prevent full data from being loaded. // Fixed now?
//setTimeout(function(){ if(F){ return } console.log("I HAVE NOT BEEN CALLED!", path, id, cat.ref._.id, k) }, 9000); var F; // MAKE SURE TO ADD F = 1 below!
function resolve(msg, eve){
if(cat.link['#']){ return as.ran(as) }
var end = cat.link['#'];
if(eve){ eve.off(); eve.rid(msg) } // TODO: Too early! Check all peers ack not found.
// TODO: BUG maybe? Make sure this does not pick up a link change wipe, that it uses the changign link instead.
var soul = msg.soul || (tmp = (msg.$$||msg.$)._||'').soul || tmp.link || ((tmp = tmp.put||'')._||'')['#'] || tmp['#'] || (((tmp = msg.put||'') && msg.$$)? tmp['#'] : (tmp['=']||tmp[':']||'')['#']);
stun(as, msg.$);
var soul = end || msg.soul || (tmp = (msg.$$||msg.$)._||'').soul || tmp.link || ((tmp = tmp.put||'')._||'')['#'] || tmp['#'] || (((tmp = msg.put||'') && msg.$$)? tmp['#'] : (tmp['=']||tmp[':']||'')['#']);
!end && stun(as, msg.$);
if(!soul && !at.link['#']){ // check soul link above us
(at.wait || (at.wait = [])).push(function(){ resolve(msg, eve) }) // wait
return;
@ -1074,7 +1080,6 @@
(tmp = function(){ // this is not official yet, but quick solution to hack in for now.
if(!stun){ return }
ran.end(stun, root);
//console.log("PUT HATCH END", as.run, Object.keys(stun.add||''));
setTimeout.each(Object.keys(stun = stun.add||''), function(cb){ if(cb = stun[cb]){cb()} }); // resume the stunned reads // Any perf reasons to CPU schedule this .keys( ?
}).hatch = tmp; // this is not official yet ^
//console.log(1, "PUT", as.run, as.graph);
@ -1641,6 +1646,9 @@
var Gun = USE('../index');
Gun.Mesh = USE('./mesh');
// TODO: resync upon reconnect online/offline
//window.ononline = window.onoffline = function(){ console.log('online?', navigator.onLine) }
Gun.on('opt', function(root){
this.to.next(root);
if(root.once){ return }
@ -1731,6 +1739,7 @@
disk[soul] = Gun.state.ify(disk[soul], key, put['>'], put[':'], soul); // merge into disk object
if(!msg['@']){ acks.push(msg['#']) } // then ack any non-ack write. // TODO: use batch id.
if(to){ return }
//flush();return;
to = setTimeout(flush, opt.wait || 1); // that gets saved as a whole to disk every 1ms
});
function flush(){

View File

@ -71,7 +71,7 @@ function Store(opt){
//console.log("RS3 GOT <----", err, file, cbs.length, ((ack||{}).Body||'').length);//.toString().slice(0,20));
delete c.g[file];//Gun.obj.del(c.g, file);
var data, data = (ack||'').Body;
console.log(1, process.memoryUsage().heapUsed);
//console.log(1, process.memoryUsage().heapUsed);
var i = 0, cba; while(cba = cbs[i++]){ cba && cba(err, data) }//Gun.obj.map(cbs, cbe);
});
};