FIX CHAIN EMIT / ON, add more stats

This commit is contained in:
Mark Nadal 2020-02-12 06:35:17 -08:00
parent 82d8192700
commit a3383e5617
3 changed files with 64 additions and 30 deletions

63
gun.js
View File

@ -641,6 +641,7 @@
if(it && (age || opt.age) > (now - it.was)){ return }
delete s[id];
});
console.log("!!! DROP DUP !!!", +new Date - now); // REMEMBER TO DELETE THIS, IT LOGS TO EVERYONE & NOT STATS. NEED TO LOG TO STATS!
dup.to = null;
}
return dup;
@ -729,14 +730,18 @@
if(!(tmp = msg['#'])){ tmp = msg['#'] = text_rand(9) }
if(dup.check(tmp)){ return } dup.track(tmp);
tmp = msg._; msg._ = ('function' == typeof tmp)? tmp : function(){};
if(msg.get){ Gun.on.get(msg, gun) }
;(console.STAT||'').r2s = +new Date;
if(msg.get){ Gun.on._get(msg, gun) }
if(msg.put){ put(msg, gun) }
;(console.STAT||'').r2m = +new Date;
eve.to.next(msg);
msg.out = root2; at.on('out', msg); // EXPERIMENTAL!
;(console.STAT||'').r2e = +new Date;
}
function put(msg, gun){
var ctx = msg._, root = ctx.root = gun._, put = msg.put, id = msg['#'], mid = 1;
if(put['#']){ root.on('put2', msg); return }
var S = +new Date;
var all = ctx.all = function(err, ok){
if(!id){ return }
if(all.err = all.err || err){
@ -745,15 +750,18 @@
}
if(mid){ return }
if(!obj_empty(all.s)){ return } // keep waiting
Gun.log(S, +new Date - S, 'put');
root.on('in', {'@': id, ok: ok || 1});
id = u;
};
var set = root.set || (root.set = {'':1});
var set = ctx.set = {'':1};
;(console.STAT||'').p = +new Date;
all.err = obj_map(put, valid, msg);
ctx.node = ctx.state = u;
mid = msg = ctx = u;
;(console.STAT||'').pe = +new Date;
mid = ctx.node = ctx.state = u;
all(); // if synchronous
fire(root, ''); // if synchronous
fire(ctx, ''); // if synchronous
msg = ctx = u;
} Gun.on.put = put;
function valid(node, soul){
if(!node){ return ERR+cut(soul)+"no node." }
@ -777,7 +785,6 @@
var vertex = graph[soul] || empty, was = state_is(vertex, key, 1), known = vertex[key];
var machine = State(), is = HAM(machine, state, was, val, known), u;
(alls = all.s || (all.s = {}))[id] = 1;
(root.set || (root.set = {}))[id] = 1; // tmp code;
if(!is.incoming){
if(is.defer){
var to = is.defer - machine;
@ -790,22 +797,24 @@
all(u, -1);
return;
}
(ctx.set || (ctx.set = {}))[id] = 1; // tmp code;
root.on('put2', {put: {'#': soul, '.': key, ':': val, '>': state}, ack: function(err, ok){
delete alls[id];
all(err, ok);
}});
}, _: ctx});
}
function map(msg){
var eve = this, root = eve.as, graph = root.graph, put = msg.put, soul = put['#'], key = put['.'], val = put[':'], state = put['>'], id = msg['#'], tmp;
var eve = this, root = eve.as, graph = root.graph, ctx = msg._, put = msg.put, soul = put['#'], key = put['.'], val = put[':'], state = put['>'], id = msg['#'], tmp;
graph[soul] = state_ify(graph[soul], key, state, val, soul);
chain(root, soul, key, (u !== (tmp = put['=']))? tmp : val, state); // TODO: This should NOT be how the API works, this should be done at an extension layer, but hacky solution to migrate with old code for now.
fire(root, soul+key); // ^
chain(ctx, soul, key, (u !== (tmp = put['=']))? tmp : val, state); // TODO: This should NOT be how the API works, this should be done at an extension layer, but hacky solution to migrate with old code for now.
fire(ctx, soul+key); // ^
eve.to.next(msg);
}
function chain(root, soul, key,val, state){ var tmp, put;
function chain(ctx, soul, key,val, state){ var tmp, put;
var root = ctx.root;
(root.opt||'').super && root.$.get(soul); // I think we need super for now, but since we are rewriting, should consider getting rid of it.
if(!root || !(tmp = root.next) || !(tmp = tmp[soul]) || !tmp.$){ return }
(put = root.put || (root.put = {}))[soul] = state_ify(put[soul], key, state, val, soul);
(put = ctx.put || (ctx.put = {}))[soul] = state_ify(put[soul], key, state, val, soul);
tmp.put = state_ify(tmp.put, key, state, val, soul);
return;
tmp.change = state_ify(tmp.change, key, state, val, soul);
@ -819,17 +828,19 @@
},0);
}
function fire(root, id){ var set;
if(set = root.set){ delete set[id] }
function fire(ctx, id){ var set;
if(set = ctx.set){ delete set[id] }
if(!obj_empty(set)){ return }
var stop = {};
var next = root.next||'', put = root.put; root.put = root.set = null;
var root = ctx.root, next = root.next||'', put = ctx.put;
;(console.STAT||'').f = +new Date;
Gun.graph.is(put, function(node,soul){ var tmp;
if(!(tmp = next[soul]) || !tmp.$){ return }
root.stop = stop; // temporary fix till a better solution?
tmp.on('in', {$: tmp.$, get: soul, put: node});
root.stop = null; // temporary fix till a better solution?
})
});
;(console.STAT||'').fe = +new Date;
}
var ERR = "Error: Invalid graph!";
var cut = function(s){ return " '"+(''+s).slice(0,9)+"...' " }
@ -934,8 +945,10 @@
} else {
node = Gun.window? Gun.obj.copy(node) : node; // HNPERF: If !browser bump Performance? Is this too dangerous to reference root graph? Copy / shallow copy too expensive for big nodes. Gun.obj.to(node); // 1 layer deep copy // Gun.obj.copy(node); // too slow on big nodes
}
;(console.STAT||'').gns = Object.keys(node||{}).length;
node = Gun.graph.node(node);
tmp = (at||empty).ack;
;(console.STAT||'').g = +new Date;
var faith = function(){}; faith.ram = faith.faith = true; // HNPERF: We're testing performance improvement by skipping going through security again, but this should be audited.
root.on('in', {
'@': msg['#'],
@ -944,6 +957,7 @@
$: gun,
_: faith
});
;(console.STAT||'').ga = +new Date;
//if(0 < tmp){ return }
root.on('get', msg);
}
@ -2113,10 +2127,12 @@
return;
}
if('{' === tmp || (obj_is(raw) && (msg = raw))){
console.STAT = {h: +new Date};
try{msg = msg || JSON.parse(raw);
console.STAT.hp = +new Date;
}catch(e){return opt.log('DAM JSON parse error', e)}
if(!msg){ return }
if(msg.DBG_s){ opt.log(+new Date - msg.DBG_s, 'to hear', msg['#']) }
if(msg.DBG_s){ opt.log(+new Date - (console.STAT.ps = msg.DBG_s), 'to hear', msg['#']) }
if(!(id = msg['#'])){ id = msg['#'] = Type.text.random(9) }
if(tmp = dup_check(id)){ return }
/*if(!(hash = msg['##']) && u !== msg.put){ hash = msg['##'] = Type.obj.hash(msg.put) }
@ -2134,11 +2150,13 @@
dup_track(id);
return;
}
var S, ST; LOG && (S = +new Date); console.STAT = {};
var S, ST; LOG && (console.STAT.is = S = +new Date); LOG && (console.STAT.msg = msg);
msg.put? root.on('in2', msg) : root.on('in', msg); // come on, pretty epic live upgrade path for tens of millions of users, no? Oh boy, this is a TEMPORARY hack, make sure future versions are stable. // REMEMBER!!! When you "finish" this, check all adapters, especially RAD, SEA, AXE, UDP & others!!!
dup_track(id).via = peer;
mesh.leap = null; // warning! mesh.leap could be buggy.
if(LOG && !msg.nts && (ST = +new Date - S) > 9){ opt.log(S, ST, 'msg', msg['#'], JSON.stringify(console.STAT)) }
//if(LOG && !msg.nts && (ST = (console.STAT.hd = +new Date) - S) > 9){ opt.log(S, ST, 'msg', msg['#'], JSON.stringify(console.STAT)) }
if(LOG && !msg.nts && (ST = (console.STAT.hd = +new Date) - S) > 9){ opt.log(S, ST, 'msg', msg['#']); if(ST > 500){ try{ require('./lib/email').send({text: ""+ST+"ms "+JSON.stringify(msg)+" | "+JSON.stringify(console.STAT), from: "mark@gun.eco", to: "mark@gun.eco", subject: "GUN MSG"}, noop); }catch(e){} } } // this is ONLY turned on if ENV CONFIGS have email/password to send out from.
return;
}
}
@ -2158,6 +2176,7 @@
var meta = msg._||(msg._=function(){});
if(!(id = msg['#'])){ id = msg['#'] = Type.text.random(9) }
//if(!(hash = msg['##']) && u !== msg.put){ hash = msg['##'] = Type.obj.hash(msg.put) }
;(console.STAT||'').s = +new Date;
if(!(raw = meta.raw)){
raw = mesh.raw(msg);
/*if(hash && (tmp = msg['@'])){
@ -2168,6 +2187,7 @@
}
}*/
}
;(console.STAT||'').sr = +new Date;
//LOG && opt.log(S, +new Date - S, 'say prep');
dup_track(id);//.it = it(msg); // track for 9 seconds, default. Earth<->Mars would need more!
if(!peer && (tmp = msg['@'])){ peer = ((tmp = dup.s[tmp]) && (tmp.via || ((tmp = tmp.it) && (tmp = tmp._) && tmp.via))) || mesh.leap } // warning! mesh.leap could be buggy!
@ -2179,6 +2199,7 @@
if(!peer || !peer.id){ message = msg;
if(!Type.obj.is(peer || opt.peers)){ return false }
//var S; LOG && (S = +new Date);
;(console.STAT||'').pn = Object.keys(peer || opt.peers || {}).length;
Type.obj.map(peer || opt.peers, each); // in case peer is a peer list.
//LOG && opt.log(S, +new Date - S, 'say loop');
return;
@ -2200,7 +2221,7 @@
peer.batch = '['; // TODO: Prevent double JSON!
var S, ST; LOG && (S = +new Date);
setTimeout(function(){
LOG && (ST = +new Date - S) > 9 && opt.log(S, ST, '1ms TO');
LOG && (ST = +new Date - S) > 9 && opt.log(S, ST, '0ms TO', peer.id);
flush(peer);
}, opt.gap);
send(raw, peer);
@ -2231,7 +2252,7 @@
if(wire.send){
wire.send(raw);
}
LOG && (ST = +new Date - S) > 9 && opt.log(S, ST, 'wire send', raw.length);
LOG && ((console.STAT||'').ps = ST = +new Date - S) > 9 && opt.log(S, ST, 'wire send', raw.length);
mesh.say.d += raw.length||0; ++mesh.say.c; // STATS!
}catch(e){
(peer.queue = peer.queue || []).push(raw);

View File

@ -84,9 +84,9 @@
}
s.ack = function(err, ok){
var q = s.q || [], i = 0, ack;
//var S = +new Date;
var S = +new Date;
while(ack = q[i++]){ ack(err, ok) }
//console.log('acks:', +new Date - S, s.file, q.length);
LOG && opt.log(S, +new Date - S, 'rad acks', q.length, ename(s.file));
}
r.find(key, s.find);
}
@ -120,11 +120,13 @@
});
}
f.split = function(){
var S = +new Date;
f.text = '';
if(!f.count){ f.count = 0;
Radix.map(rad, function count(){ f.count++ }); // TODO: Perf? Any faster way to get total length?
}
f.limit = Math.ceil(f.count/2);
var SC = f.count;
f.count = 0;
f.sub = Radix();
Radix.map(rad, f.slice, {reverse: 1}); // IMPORTANT: DO THIS IN REVERSE, SO LAST HALF OF DATA MOVED TO NEW FILE BEFORE DROPPING FROM CURRENT FILE.
@ -132,6 +134,7 @@
f.hub = Radix();
Radix.map(rad, f.stop);
r.write(rad.file, f.hub, f.both, o);
LOG && opt.log(S, +new Date - S, "rad split", ename(rad.file), SC);
return true;
}
f.slice = function(val, key){
@ -221,12 +224,13 @@
if(!o.more){ cb(g.err, data, o); return }
if(data){ cb(g.err, data, o) }
if(o.parsed >= o.limit){ return }
r.parse(o.next, g.check);
r.parse(o.next, g.check); // put in setTimeout to puff?
}
g.check = function(err, disk, info){
g.get(err, disk, info);
if(disk.check){ return } disk.check = 1;
if(!disk || disk.check){ return } disk.check = 1;
(info || (info = {})).file || (info.file = g.file);
var S = (console.STAT||'').rcg = +new Date;
Radix.map(disk, function(val, key){
// assume in memory for now, since both write/read already call r.find which will init it.
r.find(key, function(file){
@ -239,6 +243,8 @@
});
})
});
ST = (console.STAT||'').rce = +new Date;
LOG && opt.log(S, ST - S, "rad check");
}
r.find(key, g.find);
}
@ -264,6 +270,7 @@
p.read = function(err, data){ var tmp;
LOG && opt.log(S, +new Date - S, 'read disk', JSON.stringify(file), ++RPC, 'total all parses.');
delete Q[file];
LOG && opt.log(S, q.length, "read queue #");
if((p.err = err) || (p.not = !data)){ map(q, p.ack); return }
if('string' !== typeof data){
try{
@ -344,6 +351,7 @@
if(r.disk){ raw || (raw = (r.disk[file]||'').raw) }
if(raw){ return p.read(u, raw) }
opt.store.get(ename(file), p.read);
// TODO: What if memory disk gets filled with updates, and we get an old one back?
}
}());

View File

@ -16,11 +16,14 @@ Gun.on('create', function(root){
this.to.next(msg);
var id = msg['#'], put = msg.put, soul = put['#'], key = put['.'], val = put[':'], state = put['>'], tmp;
tmp = soul+esc+key; // soul+key; // be nice to move away from escaping
var S = (console.STAT||'').rp = +new Date;
dare(tmp, {':': val, '>': state}, function(err, ok){
LOG && Gun.log(S, +new Date - S, 'put2');
if(msg.ack){ msg.ack(err, ok || 1) } return; // REVISE THIS IN FUTURE!!!
if(err){ root.on('in', {'@': id, err: err}); return }
root.on('in', {'@': id, ok: ok});
});
(console.STAT||'').rps = +new Date;
});
root.on('put', function(msg){
@ -68,6 +71,7 @@ Gun.on('create', function(root){
root.on('get', function(msg){
this.to.next(msg);
(console.STAT||'').rg = +new Date;
var id = msg['#'], get = msg.get, soul = msg.get['#'], has = msg.get['.']||'', o = {}, graph, lex, key, tmp, force;
if('string' == typeof soul){
key = soul;
@ -112,7 +116,7 @@ Gun.on('create', function(root){
if(err){ opt.store.stats.get.err = err }
}catch(e){} // STATS!
//if(u === data && info.chunks > 1){ return } // if we already sent a chunk, ignore ending empty responses. // this causes tests to fail.
LOG && Gun.log(S, +new Date - S, 'got', JSON.stringify(key)); S = +new Date;
LOG && Gun.log(S, +new Date - S, 'got', JSON.stringify(key)); S = +new Date; (console.STAT||'').rgp = S;
info = info || '';
var va, ve;
if(info.unit && data && u !== (va = data[':']) && u !== (ve = data['>'])){ // new format
@ -131,15 +135,16 @@ Gun.on('create', function(root){
}
if(!graph && data){ each(data, '') }
}
console.STAT && (console.STAT.radgetcount = C);
if(LOG && (ST = +new Date - S) > 9){ Gun.log(S, ST, 'got prep time'); Gun.log(S, C, 'got prep #') } C = 0; S = +new Date;
(console.STAT||'').rgc = C;
// TODO: PERF NOTES! Can you cache these preps? May not be relevant anymore with new changes.
if(LOG && (ST = +new Date - S) > 9){ Gun.log(S, ST, 'got prep time'); Gun.log(S, C, 'got prep #') } C = 0; S = +new Date; (console.STAT||'').rgi = S;
var faith = function(){}; faith.faith = true; faith.rad = get; // HNPERF: We're testing performance improvement by skipping going through security again, but this should be audited.
root.on('in', {'@': id, put: graph, '%': info.more? 1 : u, err: err? err : u, _: faith});
LOG && (ST = +new Date - S) > 9 && Gun.log(S, ST, 'got emit', Object.keys(graph||{}).length);
LOG && (ST = ((console.STAT||'').rge = +new Date) - S) > 9 && Gun.log(S, ST, 'got emit', Object.keys(graph||{}).length);
graph = u; // each is outside our scope, we have to reset graph to nothing!
}, o);
LOG && (ST = +new Date - S) > 9 && Gun.log(S, ST, 'get call');
console.STAT && (console.STAT.radget = ST);
(console.STAT||'').rgs = ST;
function each(val, has, a,b){ // TODO: THIS CODE NEEDS TO BE FASTER!!!!
C++;
if(!val){ return }