ok ack + webrtc

This commit is contained in:
Mark Nadal 2022-05-18 18:10:36 -07:00
parent 94ab05b032
commit 904b2f8e7f
4 changed files with 161 additions and 24 deletions

View File

@ -0,0 +1,53 @@
<!DOCTYPE html>
<center>
<img id="img" width="100%"><br>
Stream <select id="select"><option id="from">from</option></select>
add <input id="pass" placeholder="password" type="password">
resolution <input id="res" value="240" type="number" step="16">
</center>
<video id="video" width="100%" controls autoplay style="display: none;"></video>
<canvas id="canvas" width="0" style="display: none;"></canvas>
<script src="../jquery.js"></script>
<script src="../../../gun/gun.js"></script>
<script src="../../../gun/sea.js"></script>
<script src="../../../gun/lib/webrtc.js"></script>
<script>;(async function(){
gun = Gun(location.origin + '/gun'); //gun = GUN();
stream = canvas.getContext('2d'), stream.from = navigator.mediaDevices;
(await stream.from.enumerateDevices()).forEach((device,i) => {
if('videoinput' !== device.kind){ return }
var opt = $(from).clone().prependTo('select').get(0);
$(opt).text(opt.id = device.label || 'Camera '+i);
opt.value = device.deviceId;
});
$('select').on('change', async eve => { $(from).text('Off'); // update label
if('Off' == select.value){ return video.srcObject = null }
video.srcObject = await stream.from.getUserMedia({ audio: false,
video: (select.value && {deviceId: {exact: select.value}}) || {facingMode: "environment"}
});
});
setInterval(async tmp => {
if(!video.srcObject){ return }
var size = parseInt(res.value);
stream.drawImage(video, 0,0,
canvas.width = size || video.videoWidth * 0.1,
canvas.height = (size * (video.videoHeight/video.videoWidth)) || video.videoHeight * 0.1
);
var b64 = canvas.toDataURL('image/jpeg');
if(pass.value){ b64 = await SEA.encrypt(b64, pass.value) }
gun.get('test').get('video').put(b64);
}, 99);
gun.get('test').get('video').on(async data => {
if(pass.value){ data = await SEA.decrypt(data, pass.value) }
img.src = data;
});
}());</script>

12
gun.js
View File

@ -370,6 +370,7 @@
}
ctx.stun++; // TODO: 'forget' feature in SEA tied to this, bad approach, but hacked in for now. Any changes here must update there.
var aid = msg['#']+ctx.all++, id = {toString: function(){ return aid }, _: ctx}; id.toJSON = id.toString; // this *trick* makes it compatible between old & new versions.
root.dup.track(id)['#'] = msg['#']; // fixes new OK acks for RPC like RTC.
DBG && (DBG.ph = DBG.ph || +new Date);
root.on('put', {'#': id, '@': msg['@'], put: {'#': soul, '.': key, ':': val, '>': state}, _: ctx});
}
@ -399,13 +400,18 @@
}
function ack(msg){ // aggregate ACKs.
var id = msg['@'] || '', ctx;
if(!(ctx = id._)){ return }
if(!(ctx = id._)){
var dup = (dup = msg.$) && (dup = dup._) && (dup = dup.root) && (dup = dup.dup);
if(!(dup = dup.check(id))){ return }
msg['@'] = dup['#'] || msg['@'];
return;
}
ctx.acks = (ctx.acks||0) + 1;
if(ctx.err = msg.err){
msg['@'] = ctx['#'];
fire(ctx); // TODO: BUG? How it skips/stops propagation of msg if any 1 item is error, this would assume a whole batch/resync has same malicious intent.
}
if(!ctx.stop && !ctx.crack){ ctx.crack = ctx.match && ctx.match.push(function(){back(ctx)}) } // handle synchronous acks
if(!ctx.stop && !ctx.crack){ ctx.crack = ctx.match && ctx.match.push(function(){back(ctx)}) } // handle synchronous acks. NOTE: If a storage peer ACKs synchronously then the PUT loop has not even counted up how many items need to be processed, so ctx.STOP flags this and adds only 1 callback to the end of the PUT loop.
back(ctx);
}
function back(ctx){
@ -1092,7 +1098,7 @@
setTimeout.each(Object.keys(stun = stun.add||''), function(cb){ if(cb = stun[cb]){cb()} }); // resume the stunned reads // Any perf reasons to CPU schedule this .keys( ?
}).hatch = tmp; // this is not official yet ^
//console.log(1, "PUT", as.run, as.graph);
(as.via._).on('out', {put: as.out = as.graph, opt: as.opt, '#': ask, _: tmp});
(as.via._).on('out', {put: as.out = as.graph, ok: as.ok || as.opt, opt: as.opt, '#': ask, _: tmp});
}; ran.end = function(stun,root){
stun.end = noop; // like with the earlier id, cheaper to make this flag a function so below callbacks do not have to do an extra type check.
if(stun.the.to === stun && stun === stun.the.last){ delete root.stun }

View File

@ -33,30 +33,38 @@
// The above change corrects at least firefox RTC Peer handler where it **throws** on over 6 ice servers, and updates url: to urls: removing deprecation warning
opt.rtc.dataChannel = opt.rtc.dataChannel || {ordered: false, maxRetransmits: 2};
opt.rtc.sdp = opt.rtc.sdp || {mandatory: {OfferToReceiveAudio: false, OfferToReceiveVideo: false}};
opt.rtc.max = opt.rtc.max || 55; // is this a magic number?
opt.rtc.room = opt.rtc.room || Gun.window && (location.hash.slice(1) || location.pathname.slice(1));
opt.announce = function(to){
root.on('out', {rtc: {id: opt.pid, to:to}}); // announce ourself
opt.rtc.start = +new Date; // handle room logic:
root.$.get('/RTC/'+opt.rtc.room+'<?99').get('+').put(opt.pid, function(ack){
if(!ack.ok || !ack.ok.rtc){ return }
open(ack);
}, {acks: opt.rtc.max}).on(function(last,key, msg){
if(last === opt.pid || opt.rtc.start > msg.put['>']){ return }
open({'#': ''+msg['#'], ok: {rtc: {id: last}}});
});
};
var mesh = opt.mesh = opt.mesh || Gun.Mesh(root);
root.on('create', function(at){
this.to.next(at);
setTimeout(opt.announce, 1);
});
root.on('in', function(msg){
if(msg.rtc){ open(msg) }
this.to.next(msg);
});
function open(msg){
var rtc = msg.rtc, peer, tmp;
if(!rtc || !rtc.id){ return }
delete opt.announce[rtc.id]; /// remove after connect
if(this && this.off){ this.off() } // Ignore this, because of ask / ack.
if(!msg.ok){ return }
var rtc = msg.ok.rtc, peer, tmp;
if(!rtc || !rtc.id || rtc.id === opt.pid){ return }
//console.log("webrtc:", rtc);
if(tmp = rtc.answer){
if(!(peer = opt.peers[rtc.id] || open[rtc.id]) || peer.remoteSet){ return }
tmp.sdp = tmp.sdp.replace(/\\r\\n/g, '\r\n')
tmp.sdp = tmp.sdp.replace(/\\r\\n/g, '\r\n');
return peer.setRemoteDescription(peer.remoteSet = new opt.RTCSessionDescription(tmp));
}
if(tmp = rtc.candidate){
peer = opt.peers[rtc.id] || open[rtc.id] || open({rtc: {id: rtc.id}});
peer = opt.peers[rtc.id] || open[rtc.id] || open({ok: {rtc: {id: rtc.id}}});
return peer.addIceCandidate(new opt.RTCIceCandidate(tmp));
}
//if(opt.peers[rtc.id]){ return }
@ -64,23 +72,23 @@
(peer = new opt.RTCPeerConnection(opt.rtc)).id = rtc.id;
var wire = peer.wire = peer.createDataChannel('dc', opt.rtc.dataChannel);
open[rtc.id] = peer;
wire.onclose = function(){
delete open[rtc.id];
mesh.bye(peer);
//reconnect(peer);
};
wire.onerror = function(err){};
wire.to = setTimeout(function(){delete open[rtc.id]},1000*60);
wire.onclose = function(){ mesh.bye(peer) };
wire.onerror = function(err){ };
wire.onopen = function(e){
//delete open[rtc.id];
delete open[rtc.id];
mesh.hi(peer);
//clearTimeout(wire.to);
//delete open[rtc.id];
}
wire.onmessage = function(msg){
if(!msg){ return }
console.log(opt.pid, "HEARD FROM WEBRTC:");
mesh.hear(msg.data || msg, peer);
};
peer.onicecandidate = function(e){ // source: EasyRTC!
if(!e.candidate){ return }
root.on('out', {'@': msg['#'], rtc: {candidate: e.candidate, id: opt.pid}});
root.on('out', {'@': msg['#'], ok: {rtc: {candidate: e.candidate, id: opt.pid}}});
}
peer.ondatachannel = function(e){
var rc = e.channel;
@ -93,16 +101,16 @@
peer.setRemoteDescription(new opt.RTCSessionDescription(tmp));
peer.createAnswer(function(answer){
peer.setLocalDescription(answer);
root.on('out', {'@': msg['#'], rtc: {answer: answer, id: opt.pid}});
root.on('out', {'@': msg['#'], ok: {rtc: {answer: answer, id: opt.pid}}});
}, function(){}, opt.rtc.sdp);
return;
}
peer.createOffer(function(offer){
peer.setLocalDescription(offer);
root.on('out', {'@': msg['#'], rtc: {offer: offer, id: opt.pid}});
root.on('out', {'@': msg['#'], '#': root.ask(open), ok: {rtc: {offer: offer, id: opt.pid}}});
}, function(){}, opt.rtc.sdp);
return peer;
}
});
var noop = function(){};
}());
}());

View File

@ -4045,6 +4045,76 @@ describe('Gun', function(){
nopasstun(0, gunB);
nopasstun(done, gunC);
}, 100);
});
it('ack aggregation bypass', function(done){
var alice = GUN({localStorage: false});
var bob = GUN({localStorage: false});
var carl = GUN({localStorage: false});
var adam = alice.back('opt.mesh');
var asay = adam.say;
var bdam = bob.back('opt.mesh');
var bsay = bdam.say;
var cdam = carl.back('opt.mesh');
var csay = cdam.say;
//console.only.i = 1;
adam.say = function(raw, peer){
console.only(2, 'adam says:', raw);
console.only(1, '...');
bdam.hear((raw.length && raw) || JSON.stringify(raw), {});
asay(raw, peer);
}
bdam.say = function(raw, peer){
console.only(7, "bob the relay is like YO", raw);
adam.hear((raw.length && raw) || JSON.stringify(raw), {});
cdam.hear((raw.length && raw) || JSON.stringify(raw), {});
bsay(raw, peer);
}
cdam.say = function(raw, peer){
console.only(4, "carl speaks out:", raw);
console.only(3, "...");
bdam.hear((raw.length && raw) || JSON.stringify(raw), {});
csay(raw, peer);
}
carl.on('put', async function(msg){
this.to.next(msg);
var tmp = msg.put;
//if(Math.random() > 0.5){ return; }
//console.log(msg.put);
//localStorage[tmp['#']+tmp['.']] = tmp[':'];
setTimeout(function(){
carl.on('out', {'@': msg['#']+'', ok: {BANANA: 9}});
}, 10);
});
alice.on('get', function(msg){ setTimeout(function(){ Gun.on.get.ack(msg); },9) })
setTimeout(async function(){
var pair = await SEA.pair();
var user = alice.user();
setTimeout(function(){
var c = 0;
//alice.on('auth', function(){
alice.get('test').put({a: 1, b: 2, c: 3}, function(ack){
//console.log("my data got saved?", ack);
if(ack.ok.BANANA && ++c === c){
done();
}
}, {acks: 99});
//}); user.auth(pair);
},10);
}, 100);
});
/*it.only('Make sure circular contexts are not copied', function(done){