From 89b24d3862f6dfee1f17cf98314f6bf02f75ceae Mon Sep 17 00:00:00 2001 From: Mark Nadal Date: Fri, 12 Aug 2022 18:17:01 -0700 Subject: [PATCH] subscribe only on backpropagation --- gun.js | 23 ++++++++++++----------- lib/axe.js | 30 +++++++++++++++++++++--------- 2 files changed, 33 insertions(+), 20 deletions(-) diff --git a/gun.js b/gun.js index e21fa5c2..174b4a98 100644 --- a/gun.js +++ b/gun.js @@ -195,6 +195,7 @@ var it = s[id] || (s[id] = {}); it.was = dup.now = +new Date; if(!dup.to){ dup.to = setTimeout(dup.drop, opt.age + 9) } + if(dt.ed){ dt.ed(id) } return it; } dup.drop = function(age){ @@ -1449,12 +1450,17 @@ if(tmp = msg.ok){ msg._.near = tmp['/'] } var S = +new Date; DBG && (DBG.is = S); peer.SI = id; + dup_track.ed = function(d){ + if(id !== d){ return } + dup_track.ed = 0; + if(!(d = dup.s[id])){ return } + d.via = peer; + if(msg.get){ d.it = msg } + } root.on('in', mesh.last = msg); - //ECHO = msg.put || ECHO; !(msg.ok !== -3740) && mesh.say({ok: -3740, put: ECHO, '@': msg['#']}, peer); DBG && (DBG.hd = +new Date); console.STAT && console.STAT(S, +new Date - S, msg.get? 'msg get' : msg.put? 'msg put' : 'msg'); - (tmp = dup_track(id)).via = peer; // don't dedup message ID till after, cause GUN has internal dedup check. - if(msg.get){ tmp.it = msg } + dup_track(id); // in case 'in' does not call track. if(ash){ dup_track(ash) } //dup.track(tmp+hash, true).it = it(msg); mesh.leap = mesh.last = null; // warning! mesh.leap could be buggy. } @@ -1494,13 +1500,13 @@ !loop && dup_track(id);//.it = it(msg); // track for 9 seconds, default. Earth<->Mars would need more! // always track, maybe move this to the 'after' logic if we split function. //if(msg.put && (msg.err || (dup.s[id]||'').err)){ return false } // TODO: in theory we should not be able to stun a message, but for now going to check if it can help network performance preventing invalid data to relay. if(!(hash = msg['##']) && u !== msg.put && !meta.via && ack){ mesh.hash(msg, peer); return } // TODO: Should broadcasts be hashed? - if(!peer && ack){ peer = ((tmp = dup.s[ack]) && (tmp.via || ((tmp = tmp.it) && (tmp = tmp._) && tmp.via))) || ((tmp = mesh.last) && ack === tmp['#'] && mesh.leap) } // warning! mesh.leap could be buggy! mesh last check reduces this. + if(!peer && ack){ peer = ((tmp = dup.s[ack]) && (tmp.via || ((tmp = tmp.it) && (tmp = tmp._) && tmp.via))) || ((tmp = mesh.last) && ack === tmp['#'] && mesh.leap) } // warning! mesh.leap could be buggy! mesh last check reduces this. // TODO: CLEAN UP THIS LINE NOW? `.it` should be reliable. if(!peer && ack){ // still no peer, then ack daisy chain 'tunnel' got lost. if(dup.s[ack]){ return } // in dups but no peer hints that this was ack to ourself, ignore. console.STAT && console.STAT(+new Date, ++SMIA, 'total no peer to ack to'); // TODO: Delete this now. Dropping lost ACKs is protocol fine now. return false; } // TODO: Temporary? If ack via trace has been lost, acks will go to all peers, which trashes browser bandwidth. Not relaying the ack will force sender to ask for ack again. Note, this is technically wrong for mesh behavior. - if(ack && !msg.put && !hash && (ackit(ack)||'')['##']){ return false } // If we're saying 'not found' but a relay had data, do not bother sending our not found. // Is this correct, return false? // NOTE: ADD PANIC TEST FOR THIS! + if(ack && !msg.put && !hash && ((dup.s[ack]||'').it||'')['##']){ return false } // If we're saying 'not found' but a relay had data, do not bother sending our not found. // Is this correct, return false? // NOTE: ADD PANIC TEST FOR THIS! if(!peer && mesh.way){ return mesh.way(msg) } DBG && (DBG.yh = +new Date); if(!(raw = meta.raw)){ mesh.raw(msg, peer); return } @@ -1552,10 +1558,6 @@ console.STAT && (ack === peer.SI) && console.STAT(S, +new Date - peer.SH, 'say ack'); } mesh.say.c = mesh.say.d = 0; - function ackit(ack){ - var tmp = (dup.s[ack]||'').it || mesh.last || ''; - if(ack === tmp['#']){ return tmp } - } // TODO: this caused a out-of-memory crash! mesh.raw = function(msg, peer){ // TODO: Clean this up / delete it / move logic out! if(!msg){ return '' } @@ -1565,8 +1567,7 @@ var hash = msg['##'], ack = msg['@']; if(hash && ack){ if(!meta.via && dup_check(ack+hash)){ return false } // for our own out messages, memory & storage may ack the same thing, so dedup that. Tho if via another peer, we already tracked it upon hearing, so this will always trigger false positives, so don't do that! - if((tmp = (dup.s[ack]||'').it) || ((tmp = mesh.last) && ack === tmp['#'])){ - //if(tmp = ackit(ack)){ // REPLACE ABOVE LINE WITH THIS? CHECK IF SAFE FIRST! + if(tmp = (dup.s[ack]||'').it){ if(hash === tmp['##']){ return false } // if ask has a matching hash, acking is optional. if(!tmp['##']){ tmp['##'] = hash } // if none, add our hash to ask so anyone we relay to can dedup. // NOTE: May only check against 1st ack chunk, 2nd+ won't know and still stream back to relaying peers which may then dedup. Any way to fix this wasted bandwidth? I guess force rate limiting breaking change, that asking peer has to ask for next lexical chunk. } diff --git a/lib/axe.js b/lib/axe.js index 3f06c848..7a895f7f 100644 --- a/lib/axe.js +++ b/lib/axe.js @@ -27,13 +27,9 @@ function start(root){ if(!msg){ return } var via = (msg._||'').via, soul, has, tmp, ref; if(!via || !via.id){ return fall(msg) } - var sub = (via.sub || (via.sub = new Object.Map)); - if('string' == typeof (soul = msg.get['#'])){ ref = root.$.get(soul) } - if('string' == typeof (tmp = msg.get['.'])){ has = tmp } else { has = '' } - ref && (sub.get(soul) || (sub.set(soul, tmp = new Object.Map) && tmp)).set(has, 1); // {soul: {'':1, has: 1}} - if(!(ref = (ref||'')._)){ return fall(msg) } + // SUBSCRIPTION LOGIC MOVED TO GET'S ACK REPLY. + if(!(ref = REF(msg)._)){ return fall(msg) } ref.asked = +new Date; - (ref.route || (ref.route = new Object.Map)).set(via.id, via); // this approach is not gonna scale how I want it to, but try for now. GET.turn(msg, ref.route, 0); } GET.turn = function(msg, route, turn){ @@ -51,7 +47,7 @@ function start(root){ } setTimeout.each(next, function(id){ var peer = opt.peers[id]; turn++; - if(!peer || !peer.wire){ route && route.delete(id); return } // bye! + if(!peer || !peer.wire){ route && route.delete(id); return } // bye! // TODO: CHECK IF 0 OTHER PEERS & UNSUBSCRIBE if(mesh.say(msg, peer) === false){ return } // was self if(0 == (turn % 3)){ return 1 } }, function(){ @@ -60,10 +56,26 @@ function start(root){ }, 3); } function fall(msg){ mesh.say(msg, opt.peers) } + function REF(msg){ + var ref = '', soul, has, tmp; + if(!msg || !msg.get){ return ref } + if('string' == typeof (soul = msg.get['#'])){ ref = root.$.get(soul) } + if('string' == typeof (tmp = msg.get['.'])){ has = tmp } else { has = '' } + return ref; + } + function LEX(lex){ return (lex = lex || '')['='] || lex['*'] || lex['>'] || lex } root.on('in', function(msg){ var to = this.to, tmp; if((tmp = msg['@']) && (tmp = dup.s[tmp])){ tmp.ack = (tmp.ack || 0) + 1; // count remote ACKs to GET. // TODO: If mismatch, should trigger next asks. + if(tmp.it && tmp.it.get && msg.put){ // WHEN SEEING A PUT REPLY TO A GET... + var get = tmp.it.get||'', ref = REF(tmp.it)._, via = (tmp.it._||'').via||'', sub; + if(via && ref){ // SUBSCRIBE THE PEER WHO ASKED VIA FOR IT: + via.id && (ref.route || (ref.route = new Object.Map)).set(via.id, via); + sub = (via.sub || (via.sub = new Object.Map)); + ref && (sub.get(LEX(get['#'])) || (sub.set(LEX(get['#']), sub = new Object.Map) && sub)).set(LEX(get['.']), 1); // {soul: {'':1, has: 1}} + } + } if((tmp = tmp.back)){ // backtrack OKs since AXE splits PUTs up. setTimeout.each(Object.keys(tmp), function(id){ to.next({'#': msg['#'], '@': id, ok: msg.ok}); @@ -74,7 +86,8 @@ function start(root){ to.next(msg); }); - root.on('create', function(){ + root.on('create', function(root){ + this.to.next(root); var Q = {}; root.on('put', function(msg){ var eve = this, at = eve.as, put = msg.put, soul = put['#'], has = put['.'], val = put[':'], state = put['>'], q, tmp; @@ -82,7 +95,6 @@ function start(root){ if(msg['@']){ return } // acks send existing data, not updates, so no need to resend to others. if(!soul || !has){ return } var ref = root.$.get(soul)._, route = (ref||'').route; - //'test' === soul && console.log(Object.port, ''+msg['#'], has, val, route && route.keys()); if(!route){ return } if(ref.skip){ ref.skip.now = msg['#']; return } (ref.skip = {now: msg['#']}).to = setTimeout(function(){