mirror of
https://github.com/amark/gun.git
synced 2025-11-24 14:35:55 +00:00
subscribe only on backpropagation
This commit is contained in:
parent
d5c8a02980
commit
89b24d3862
23
gun.js
23
gun.js
@ -195,6 +195,7 @@
|
||||
var it = s[id] || (s[id] = {});
|
||||
it.was = dup.now = +new Date;
|
||||
if(!dup.to){ dup.to = setTimeout(dup.drop, opt.age + 9) }
|
||||
if(dt.ed){ dt.ed(id) }
|
||||
return it;
|
||||
}
|
||||
dup.drop = function(age){
|
||||
@ -1449,12 +1450,17 @@
|
||||
if(tmp = msg.ok){ msg._.near = tmp['/'] }
|
||||
var S = +new Date;
|
||||
DBG && (DBG.is = S); peer.SI = id;
|
||||
dup_track.ed = function(d){
|
||||
if(id !== d){ return }
|
||||
dup_track.ed = 0;
|
||||
if(!(d = dup.s[id])){ return }
|
||||
d.via = peer;
|
||||
if(msg.get){ d.it = msg }
|
||||
}
|
||||
root.on('in', mesh.last = msg);
|
||||
//ECHO = msg.put || ECHO; !(msg.ok !== -3740) && mesh.say({ok: -3740, put: ECHO, '@': msg['#']}, peer);
|
||||
DBG && (DBG.hd = +new Date);
|
||||
console.STAT && console.STAT(S, +new Date - S, msg.get? 'msg get' : msg.put? 'msg put' : 'msg');
|
||||
(tmp = dup_track(id)).via = peer; // don't dedup message ID till after, cause GUN has internal dedup check.
|
||||
if(msg.get){ tmp.it = msg }
|
||||
dup_track(id); // in case 'in' does not call track.
|
||||
if(ash){ dup_track(ash) } //dup.track(tmp+hash, true).it = it(msg);
|
||||
mesh.leap = mesh.last = null; // warning! mesh.leap could be buggy.
|
||||
}
|
||||
@ -1494,13 +1500,13 @@
|
||||
!loop && dup_track(id);//.it = it(msg); // track for 9 seconds, default. Earth<->Mars would need more! // always track, maybe move this to the 'after' logic if we split function.
|
||||
//if(msg.put && (msg.err || (dup.s[id]||'').err)){ return false } // TODO: in theory we should not be able to stun a message, but for now going to check if it can help network performance preventing invalid data to relay.
|
||||
if(!(hash = msg['##']) && u !== msg.put && !meta.via && ack){ mesh.hash(msg, peer); return } // TODO: Should broadcasts be hashed?
|
||||
if(!peer && ack){ peer = ((tmp = dup.s[ack]) && (tmp.via || ((tmp = tmp.it) && (tmp = tmp._) && tmp.via))) || ((tmp = mesh.last) && ack === tmp['#'] && mesh.leap) } // warning! mesh.leap could be buggy! mesh last check reduces this.
|
||||
if(!peer && ack){ peer = ((tmp = dup.s[ack]) && (tmp.via || ((tmp = tmp.it) && (tmp = tmp._) && tmp.via))) || ((tmp = mesh.last) && ack === tmp['#'] && mesh.leap) } // warning! mesh.leap could be buggy! mesh last check reduces this. // TODO: CLEAN UP THIS LINE NOW? `.it` should be reliable.
|
||||
if(!peer && ack){ // still no peer, then ack daisy chain 'tunnel' got lost.
|
||||
if(dup.s[ack]){ return } // in dups but no peer hints that this was ack to ourself, ignore.
|
||||
console.STAT && console.STAT(+new Date, ++SMIA, 'total no peer to ack to'); // TODO: Delete this now. Dropping lost ACKs is protocol fine now.
|
||||
return false;
|
||||
} // TODO: Temporary? If ack via trace has been lost, acks will go to all peers, which trashes browser bandwidth. Not relaying the ack will force sender to ask for ack again. Note, this is technically wrong for mesh behavior.
|
||||
if(ack && !msg.put && !hash && (ackit(ack)||'')['##']){ return false } // If we're saying 'not found' but a relay had data, do not bother sending our not found. // Is this correct, return false? // NOTE: ADD PANIC TEST FOR THIS!
|
||||
if(ack && !msg.put && !hash && ((dup.s[ack]||'').it||'')['##']){ return false } // If we're saying 'not found' but a relay had data, do not bother sending our not found. // Is this correct, return false? // NOTE: ADD PANIC TEST FOR THIS!
|
||||
if(!peer && mesh.way){ return mesh.way(msg) }
|
||||
DBG && (DBG.yh = +new Date);
|
||||
if(!(raw = meta.raw)){ mesh.raw(msg, peer); return }
|
||||
@ -1552,10 +1558,6 @@
|
||||
console.STAT && (ack === peer.SI) && console.STAT(S, +new Date - peer.SH, 'say ack');
|
||||
}
|
||||
mesh.say.c = mesh.say.d = 0;
|
||||
function ackit(ack){
|
||||
var tmp = (dup.s[ack]||'').it || mesh.last || '';
|
||||
if(ack === tmp['#']){ return tmp }
|
||||
}
|
||||
// TODO: this caused a out-of-memory crash!
|
||||
mesh.raw = function(msg, peer){ // TODO: Clean this up / delete it / move logic out!
|
||||
if(!msg){ return '' }
|
||||
@ -1565,8 +1567,7 @@
|
||||
var hash = msg['##'], ack = msg['@'];
|
||||
if(hash && ack){
|
||||
if(!meta.via && dup_check(ack+hash)){ return false } // for our own out messages, memory & storage may ack the same thing, so dedup that. Tho if via another peer, we already tracked it upon hearing, so this will always trigger false positives, so don't do that!
|
||||
if((tmp = (dup.s[ack]||'').it) || ((tmp = mesh.last) && ack === tmp['#'])){
|
||||
//if(tmp = ackit(ack)){ // REPLACE ABOVE LINE WITH THIS? CHECK IF SAFE FIRST!
|
||||
if(tmp = (dup.s[ack]||'').it){
|
||||
if(hash === tmp['##']){ return false } // if ask has a matching hash, acking is optional.
|
||||
if(!tmp['##']){ tmp['##'] = hash } // if none, add our hash to ask so anyone we relay to can dedup. // NOTE: May only check against 1st ack chunk, 2nd+ won't know and still stream back to relaying peers which may then dedup. Any way to fix this wasted bandwidth? I guess force rate limiting breaking change, that asking peer has to ask for next lexical chunk.
|
||||
}
|
||||
|
||||
30
lib/axe.js
30
lib/axe.js
@ -27,13 +27,9 @@ function start(root){
|
||||
if(!msg){ return }
|
||||
var via = (msg._||'').via, soul, has, tmp, ref;
|
||||
if(!via || !via.id){ return fall(msg) }
|
||||
var sub = (via.sub || (via.sub = new Object.Map));
|
||||
if('string' == typeof (soul = msg.get['#'])){ ref = root.$.get(soul) }
|
||||
if('string' == typeof (tmp = msg.get['.'])){ has = tmp } else { has = '' }
|
||||
ref && (sub.get(soul) || (sub.set(soul, tmp = new Object.Map) && tmp)).set(has, 1); // {soul: {'':1, has: 1}}
|
||||
if(!(ref = (ref||'')._)){ return fall(msg) }
|
||||
// SUBSCRIPTION LOGIC MOVED TO GET'S ACK REPLY.
|
||||
if(!(ref = REF(msg)._)){ return fall(msg) }
|
||||
ref.asked = +new Date;
|
||||
(ref.route || (ref.route = new Object.Map)).set(via.id, via); // this approach is not gonna scale how I want it to, but try for now.
|
||||
GET.turn(msg, ref.route, 0);
|
||||
}
|
||||
GET.turn = function(msg, route, turn){
|
||||
@ -51,7 +47,7 @@ function start(root){
|
||||
}
|
||||
setTimeout.each(next, function(id){
|
||||
var peer = opt.peers[id]; turn++;
|
||||
if(!peer || !peer.wire){ route && route.delete(id); return } // bye!
|
||||
if(!peer || !peer.wire){ route && route.delete(id); return } // bye! // TODO: CHECK IF 0 OTHER PEERS & UNSUBSCRIBE
|
||||
if(mesh.say(msg, peer) === false){ return } // was self
|
||||
if(0 == (turn % 3)){ return 1 }
|
||||
}, function(){
|
||||
@ -60,10 +56,26 @@ function start(root){
|
||||
}, 3);
|
||||
}
|
||||
function fall(msg){ mesh.say(msg, opt.peers) }
|
||||
function REF(msg){
|
||||
var ref = '', soul, has, tmp;
|
||||
if(!msg || !msg.get){ return ref }
|
||||
if('string' == typeof (soul = msg.get['#'])){ ref = root.$.get(soul) }
|
||||
if('string' == typeof (tmp = msg.get['.'])){ has = tmp } else { has = '' }
|
||||
return ref;
|
||||
}
|
||||
function LEX(lex){ return (lex = lex || '')['='] || lex['*'] || lex['>'] || lex }
|
||||
|
||||
root.on('in', function(msg){ var to = this.to, tmp;
|
||||
if((tmp = msg['@']) && (tmp = dup.s[tmp])){
|
||||
tmp.ack = (tmp.ack || 0) + 1; // count remote ACKs to GET. // TODO: If mismatch, should trigger next asks.
|
||||
if(tmp.it && tmp.it.get && msg.put){ // WHEN SEEING A PUT REPLY TO A GET...
|
||||
var get = tmp.it.get||'', ref = REF(tmp.it)._, via = (tmp.it._||'').via||'', sub;
|
||||
if(via && ref){ // SUBSCRIBE THE PEER WHO ASKED VIA FOR IT:
|
||||
via.id && (ref.route || (ref.route = new Object.Map)).set(via.id, via);
|
||||
sub = (via.sub || (via.sub = new Object.Map));
|
||||
ref && (sub.get(LEX(get['#'])) || (sub.set(LEX(get['#']), sub = new Object.Map) && sub)).set(LEX(get['.']), 1); // {soul: {'':1, has: 1}}
|
||||
}
|
||||
}
|
||||
if((tmp = tmp.back)){ // backtrack OKs since AXE splits PUTs up.
|
||||
setTimeout.each(Object.keys(tmp), function(id){
|
||||
to.next({'#': msg['#'], '@': id, ok: msg.ok});
|
||||
@ -74,7 +86,8 @@ function start(root){
|
||||
to.next(msg);
|
||||
});
|
||||
|
||||
root.on('create', function(){
|
||||
root.on('create', function(root){
|
||||
this.to.next(root);
|
||||
var Q = {};
|
||||
root.on('put', function(msg){
|
||||
var eve = this, at = eve.as, put = msg.put, soul = put['#'], has = put['.'], val = put[':'], state = put['>'], q, tmp;
|
||||
@ -82,7 +95,6 @@ function start(root){
|
||||
if(msg['@']){ return } // acks send existing data, not updates, so no need to resend to others.
|
||||
if(!soul || !has){ return }
|
||||
var ref = root.$.get(soul)._, route = (ref||'').route;
|
||||
//'test' === soul && console.log(Object.port, ''+msg['#'], has, val, route && route.keys());
|
||||
if(!route){ return }
|
||||
if(ref.skip){ ref.skip.now = msg['#']; return }
|
||||
(ref.skip = {now: msg['#']}).to = setTimeout(function(){
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user