This commit is contained in:
Mark Nadal 2022-05-29 19:43:24 -07:00
parent 55682b6f4b
commit 2beb258b4c
5 changed files with 57 additions and 36 deletions

View File

@ -1,10 +1,11 @@
<!DOCTYPE html>
<center>
<img id="img" width="100%"><br>
<img id="img" width="100%"><br/>
Stream <select id="select"><option id="from">from</option></select>
add <input id="pass" placeholder="password" type="password">
resolution <input id="res" value="240" type="number" step="16">
resolution <input id="res" value="240" step="32" max="1080" type="number" style="width:3em;">
or <input id="upload" type="file">
</center>
<video id="video" width="100%" controls autoplay style="display: none;"></video>
<canvas id="canvas" width="0" style="display: none;"></canvas>
@ -27,14 +28,15 @@ stream = canvas.getContext('2d'), stream.from = navigator.mediaDevices;
});
$('select').on('change', async eve => { $(from).text('Off'); // update label
if('Off' == select.value){ return video.srcObject = null }
if('Off' == select.value){ return video.srcObject.getTracks()[0].stop() }
video.srcObject = await stream.from.getUserMedia({ audio: false,
video: (select.value && {deviceId: {exact: select.value}}) || {facingMode: "environment"}
});
});
$('#upload').on('change', async eve => { console.log("Check ./upload.html") })
setInterval(async tmp => {
if(!video.srcObject){ return }
if(!(video.srcObject||'').active){ return }
var size = parseInt(res.value);
stream.drawImage(video, 0,0,
canvas.width = size || video.videoWidth * 0.1,
@ -47,7 +49,7 @@ setInterval(async tmp => {
gun.get('test').get('video').on(async data => {
if(pass.value){ data = await SEA.decrypt(data, pass.value) }
img.src = data;
img.src = data; // Beware: Some browsers memory leak fast src updates.
});
}());</script>

27
gun.js
View File

@ -372,7 +372,7 @@
var aid = msg['#']+ctx.all++, id = {toString: function(){ return aid }, _: ctx}; id.toJSON = id.toString; // this *trick* makes it compatible between old & new versions.
root.dup.track(id)['#'] = msg['#']; // fixes new OK acks for RPC like RTC.
DBG && (DBG.ph = DBG.ph || +new Date);
root.on('put', {'#': id, '@': msg['@'], put: {'#': soul, '.': key, ':': val, '>': state}, _: ctx});
root.on('put', {'#': id, '@': msg['@'], put: {'#': soul, '.': key, ':': val, '>': state}, ok: msg.ok, _: ctx});
}
function map(msg){
var DBG; if(DBG = (msg._||'').DBG){ DBG.pa = +new Date; DBG.pm = DBG.pm || +new Date}
@ -399,11 +399,11 @@
CF(); // courtesy check;
}
function ack(msg){ // aggregate ACKs.
var id = msg['@'] || '', ctx;
var id = msg['@'] || '', ctx, ok, tmp;
if(!(ctx = id._)){
var dup = (dup = msg.$) && (dup = dup._) && (dup = dup.root) && (dup = dup.dup);
if(!(dup = dup.check(id))){ return }
msg['@'] = dup['#'] || msg['@'];
msg['@'] = dup['#'] || msg['@']; // This doesn't do anything anymore, backtrack it to something else?
return;
}
ctx.acks = (ctx.acks||0) + 1;
@ -411,13 +411,14 @@
msg['@'] = ctx['#'];
fire(ctx); // TODO: BUG? How it skips/stops propagation of msg if any 1 item is error, this would assume a whole batch/resync has same malicious intent.
}
ctx.ok = msg.ok || ctx.ok;
if(!ctx.stop && !ctx.crack){ ctx.crack = ctx.match && ctx.match.push(function(){back(ctx)}) } // handle synchronous acks. NOTE: If a storage peer ACKs synchronously then the PUT loop has not even counted up how many items need to be processed, so ctx.STOP flags this and adds only 1 callback to the end of the PUT loop.
back(ctx);
}
function back(ctx){
if(!ctx || !ctx.root){ return }
if(ctx.stun || ctx.acks !== ctx.all){ return }
ctx.root.on('in', {'@': ctx['#'], err: ctx.err, ok: ctx.err? u : {'':1}});
ctx.root.on('in', {'@': ctx['#'], err: ctx.err, ok: ctx.err? u : ctx.ok || {'':1}});
}
var ERR = "Error: Invalid graph!";
@ -1087,7 +1088,7 @@
if(as.todo.length || as.end || !Object.empty(as.wait)){ return } as.end = 1;
var cat = (as.$.back(-1)._), root = cat.root, ask = cat.ask(function(ack){
root.on('ack', ack);
if(ack.err){ Gun.log(ack) }
if(ack.err && as.ok){ Gun.log(ack) }
if(++acks > (as.acks || 0)){ this.off() } // Adjustable ACKs! Only 1 by default.
if(!as.ack){ return }
as.ack(ack, this);
@ -1098,7 +1099,7 @@
setTimeout.each(Object.keys(stun = stun.add||''), function(cb){ if(cb = stun[cb]){cb()} }); // resume the stunned reads // Any perf reasons to CPU schedule this .keys( ?
}).hatch = tmp; // this is not official yet ^
//console.log(1, "PUT", as.run, as.graph);
(as.via._).on('out', {put: as.out = as.graph, ok: as.ok || as.opt, opt: as.opt, '#': ask, _: tmp});
(as.via._).on('out', {put: as.out = as.graph, ok: as.ok && {'@': as.ok+1}, opt: as.opt, '#': ask, _: tmp});
}; ran.end = function(stun,root){
stun.end = noop; // like with the earlier id, cheaper to make this flag a function so below callbacks do not have to do an extra type check.
if(stun.the.to === stun && stun === stun.the.last){ delete root.stun }
@ -1428,6 +1429,7 @@
dup_track(id);
return;
}
if(tmp = msg.ok){ msg._.near = tmp['/'] }
var S = +new Date;
DBG && (DBG.is = S); peer.SI = id;
root.on('in', mesh.last = msg);
@ -1473,14 +1475,15 @@
var DBG = msg.DBG, S = +new Date; meta.y = meta.y || S; if(!peer){ DBG && (DBG.y = S) }
if(!(id = msg['#'])){ id = msg['#'] = String.random(9) }
!loop && dup_track(id);//.it = it(msg); // track for 9 seconds, default. Earth<->Mars would need more! // always track, maybe move this to the 'after' logic if we split function.
if(msg.put && (msg.err || (dup.s[id]||'').err)){ return false } // TODO: in theory we should not be able to stun a message, but for now going to check if it can help network performance preventing invalid data to relay.
//if(msg.put && (msg.err || (dup.s[id]||'').err)){ return false } // TODO: in theory we should not be able to stun a message, but for now going to check if it can help network performance preventing invalid data to relay.
if(!(hash = msg['##']) && u !== msg.put && !meta.via && ack){ mesh.hash(msg, peer); return } // TODO: Should broadcasts be hashed?
if(!peer && ack){ peer = ((tmp = dup.s[ack]) && (tmp.via || ((tmp = tmp.it) && (tmp = tmp._) && tmp.via))) || ((tmp = mesh.last) && ack === tmp['#'] && mesh.leap) } // warning! mesh.leap could be buggy! mesh last check reduces this.
if(!peer && ack){ // still no peer, then ack daisy chain lost.
if(!peer && ack){ // still no peer, then ack daisy chain 'tunnel' got lost.
if(dup.s[ack]){ return } // in dups but no peer hints that this was ack to self, ignore.
console.STAT && console.STAT(+new Date, ++SMIA, 'total no peer to ack to');
return false;
} // TODO: Temporary? If ack via trace has been lost, acks will go to all peers, which trashes browser bandwidth. Not relaying the ack will force sender to ask for ack again. Note, this is technically wrong for mesh behavior.
if(msg.put && (tmp = msg.ok)){ msg.ok = {'@':(tmp['@']||1)-1, '/': (tmp['/']==msg._.near)? mesh.near : tmp['/']}; }
if(!peer && mesh.way){ return mesh.way(msg) }
DBG && (DBG.yh = +new Date);
if(!(raw = meta.raw)){ mesh.raw(msg, peer); return }
@ -1546,7 +1549,7 @@
if(!tmp['##']){ tmp['##'] = hash } // if none, add our hash to ask so anyone we relay to can dedup. // NOTE: May only check against 1st ack chunk, 2nd+ won't know and still stream back to relaying peers which may then dedup. Any way to fix this wasted bandwidth? I guess force rate limiting breaking change, that asking peer has to ask for next lexical chunk.
}
}
if(!msg.dam){
if(!msg.dam && !msg['@']){
var i = 0, to = []; tmp = opt.peers;
for(var k in tmp){ var p = tmp[k]; // TODO: Make it up peers instead!
to.push(p.url || p.pid || p.id);
@ -1600,6 +1603,7 @@
(peer.queue = peer.queue || []).push(raw);
}}
mesh.near = 0; // TODO: BUG! Mesh.near buggy, FIX!
mesh.hi = function(peer){
var wire = peer.wire, tmp;
if(!wire){ mesh.wire((peer.length && {url: peer, id: peer}) || peer); return }
@ -1620,6 +1624,7 @@
//Type.obj.native && Type.obj.native(); // dirty place to check if other JS polluted.
}
mesh.bye = function(peer){
--mesh.near;
root.on('bye', peer);
var tmp = +(new Date); tmp = (tmp - (peer.met||tmp));
mesh.bye.time = ((mesh.bye.time || tmp) + tmp) / 2;
@ -1650,12 +1655,12 @@
var gets = {};
root.on('bye', function(peer, tmp){ this.to.next(peer);
if(tmp = console.STAT){ tmp.peers = (tmp.peers || 0) - 1; }
if(tmp = console.STAT){ tmp.peers = mesh.near; }
if(!(tmp = peer.url)){ return } gets[tmp] = true;
setTimeout(function(){ delete gets[tmp] },opt.lack || 9000);
});
root.on('hi', function(peer, tmp){ this.to.next(peer);
if(tmp = console.STAT){ tmp.peers = (tmp.peers || 0) + 1 }
if(tmp = console.STAT){ tmp.peers = mesh.near }
if(!(tmp = peer.url) || !gets[tmp]){ return } delete gets[tmp];
if(opt.super){ return } // temporary (?) until we have better fix/solution?
setTimeout.each(Object.keys(root.next), function(soul){ var node = root.next[soul]; // TODO: .keys( is slow

View File

@ -5,6 +5,8 @@
var Gun = (typeof window !== "undefined")? window.Gun : require('../gun'), u;
Gun.on('opt', function(at){ start(at); this.to.next(at) }); // make sure to call the "next" middleware adapter.
// For Future WebRTC notes: Chrome 500 max limit, however 256 likely - FF "none", webtorrent does 55 per torrent (thanks Feross! How'd you come to this number?).
function start(root){
if(root.axe){ return }
var opt = root.opt, peers = opt.peers;
@ -60,27 +62,34 @@ function start(root){
}
function fall(msg){ mesh.say(msg, opt.peers) } // fallback relay: send msg to every configured peer (presumably used when no better route is known — TODO confirm against caller).
root.on('in', function(msg){ var tmp;
root.on('in', function(msg){ var to = this.to, tmp;
if((tmp = msg['@']) && (tmp = dup.s[tmp])){
tmp.ack = (tmp.ack || 0) + 1; // count remote ACKs to GET. // TODO: If mismatch, should trigger next asks.
if((tmp = tmp.back)){ // backtrack OKs since AXE splits PUTs up.
setTimeout.each(Object.keys(tmp), function(id){
to.next({'#': msg['#'], '@': id, ok: msg.ok});
});
return;
}
}
this.to.next(msg);
to.next(msg);
});
root.on('create', function(){
var Q = {};
root.on('put', function(msg){
var eve = this, at = eve.as, put = msg.put, soul = put['#'], has = put['.'], val = put[':'], state = put['>'], id = msg['#'], tmp;
var eve = this, at = eve.as, put = msg.put, soul = put['#'], has = put['.'], val = put[':'], state = put['>'], q, tmp;
eve.to.next(msg);
if(msg['@']){ return } // acks send existing data, not updates, so no need to resend to others.
if(!soul || !has){ return }
var ref = root.$.get(soul)._, route = (ref||'').route;
//'test' === soul && console.log(Object.port, ''+msg['#'], has, val, route && route.keys());
if(!route){ return }
if(Q[soul+has]){ return; } (Q[soul+has] = setTimeout(function(){ delete Q[soul+has]; // TODO: add debounce here!? hmm, scope would need sub. // Q is a quick hack!
setTimeout.each(Object.maps(route), function(id){ var peer, tmp;
if(!(peer = route.get(id))){ return }
if(!peer.wire){ route.delete(id); return } // bye!
q = Q[tmp = soul+has] || (Q[tmp] = {});
if(!Q[soul+has]){ setTimeout(function(){ id = Q[soul+has]; delete Q[soul+has]; // TODO: add debounce here!? hmm, scope would need sub. // Q is a quick hack!
setTimeout.each(Object.maps(route), function(pid){ var peer, tmp;
if(!(peer = route.get(pid))){ return }
if(!peer.wire){ route.delete(pid); return } // bye!
var sub = (peer.sub || (peer.sub = new Object.Map)).get(soul);
if(!sub){ return }
if(!sub.get(has) && !sub.get('')){ return }
@ -91,15 +100,19 @@ function start(root){
val = tmp;
}
put[soul] = state_ify(put[soul], has, state, val, soul);
tmp = dup.track(peer.next = peer.next || String.random(9));
(tmp.back || (tmp.back = {}))[''+id] = 1;
if(peer.to){ return }
peer.to = setTimeout(function(){ flush(peer) }, opt.gap);
});
}, 9));
}) }, 9) }
Q[soul+has] = msg['#'];
});
});
function flush(peer){
var msg = {put: peer.put};
peer.put = peer.to = null;
var msg = {'#': peer.next, put: peer.put, ok: {'@': 4, '/': mesh.near}}; // BUG: TODO: sub count!
// TODO: what about DAM's >< dedup? Current thinking is not to use it; however, you could store the first msg# & the latest msg#, and if by this point latest === first, then it is likely the same >< case, so if(firstMsg['><'][peer.id]){ return } and don't send.
peer.next = peer.put = peer.to = null;
mesh.say(msg, peer);
}
var state_ify = Gun.state.ify, state_is = Gun.state.is;
@ -128,7 +141,7 @@ function start(root){
}());
;(function(){ // THIS IS THE MOB MODULE;
return; // unfinished
return; // WORK IN PROGRESS, TEST FINALIZED, NEED TO MAKE STABLE.
/*
AXE should have a couple of threshold items...
let's pretend there is a variable max peers connected
@ -147,14 +160,12 @@ function start(root){
root.on('hi', function(peer){
this.to.next(peer);
if(peer.url){ return } // I am assuming that if we are wanting to make an outbound connection to them, that we don't ever want to drop them unless our actual config settings change.
var count = Object.keys(opt.peers).length;
var count = Object.keys(opt.peers).length || mesh.near; // TODO: BUG! This is slow, use .near, but near is buggy right now, fix in DAM.
if(opt.mob >= count){ return } // TODO: Make dynamic based on RAM/CPU also. Or possibly even weird stuff like opt.mob / axe.up length?
var peers = [];Object.keys(axe.up).forEach(function(p){ p = axe.up[p]; p.url && peers.push(p.url) });
//console.log(Object.port, 'mobbed?', peer.pid, opt.mob, count, Object.keys(opt.peers)+'', 'bye', peer.pid || peer.id);
if(!peers.length){ return }
var peers = {};Object.keys(axe.up).forEach(function(p){ p = axe.up[p]; p.url && (peers[p.url]={}) });
// TODO: BUG!!! An infinite reconnection loop happens if there are not enough relays, or if some are missing. For instance, :8766 says to connect to :8767, which then says to connect back to :8766. To avoid a DDoS when the system is overloaded, figure out a clever way to tell peers that the network does not have enough capacity and that they should retry later?
mesh.say({dam: 'mob', mob: count, peers: peers}, peer);
//setTimeout(function(){ mesh.bye(peer) }, 9); // something with better perf? // UNCOMMENT WHEN WE ACTIVATE THIS FEATURE
setTimeout(function(){ mesh.bye(peer) }, 9); // something with better perf?
});
root.on('bye', function(peer){
this.to.next(peer);

View File

@ -86,7 +86,7 @@ describe("Put ACK", function(){
}
}
console.log(port, " connect to ", peers);
var gun = Gun({file: env.i+'data', peers: peers, web: server, axe: false}); // not working with axe currently!
var gun = Gun({file: env.i+'data', peers: peers, web: server}); // Note: test with AXE on & off.
server.listen(port, function(){
test.done();
});

View File

@ -25,6 +25,8 @@ We want to then see the peers move to the other relays, such that the 3 have 100
If it does, the test passes.
(Note: At the end of this test, it uses GUN to sync data about which peers are connected to whom. While this is useful in that it also verifies that sync between b1 <-> b3 works regardless of whether the connections are direct or indirect, it could as a result produce misleading errors: if GUN has a bug, the AXE test may fail even when AXE is not at fault. Likewise, because the usage of GUN in this test is contrived, the test passing has no correlation with GUN correctly handling the sync logic. In fact, assume it does not, and make sure you use another test to verify that.)
Thanks @Drader for helping with these tests!!!!
*/
// <-- PANIC template, copy & paste, tweak a few settings if needed...
@ -133,9 +135,10 @@ describe("Mob test.", function(){
mesh.hear['mob'] = function(msg, peer){
// TODO: NOTE, code AXE DHT to aggressively drop new peers AFTER superpeer sends this rebalance/disconnect message that contains some other superpeers.
clearTimeout(gun.TO); gun.TO = setTimeout(end, 2000);
console.log("getting mobbed", msg);
if(!msg.peers){ return }
var one = msg.peers[Math.floor(Math.random()*msg.peers.length)];
console.log('Browser', env.i, 'chooses', one, 'from', JSON.stringify(msg.peers), 'that', peer.url, 'suggested, because it is mobbed.');//, 'from', msg.peers+'');
var peers = Object.keys(msg.peers), one = peers[Math.floor(Math.random()*peers.length)];
console.log('Browser', env.i, 'chooses', one, 'from', JSON.stringify(peers), 'that', peer.url, 'suggested, because it is mobbed.');//, 'from', msg.peers+'');
mesh.bye(peer); // Idea: Should keep track of failed ones to reduce repeats. For another feature/module that deserves its own separate test.
mesh.hi(one);
}