diff --git a/examples/basic/stream.html b/examples/basic/stream.html
new file mode 100644
index 00000000..ae86f548
--- /dev/null
+++ b/examples/basic/stream.html
@@ -0,0 +1,53 @@
+
+
+
+ ![]()
+ Stream
+ add
+ resolution
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/gun.js b/gun.js
index 705dcf4d..f1bd67d5 100644
--- a/gun.js
+++ b/gun.js
@@ -370,6 +370,7 @@
}
ctx.stun++; // TODO: 'forget' feature in SEA tied to this, bad approach, but hacked in for now. Any changes here must update there.
var aid = msg['#']+ctx.all++, id = {toString: function(){ return aid }, _: ctx}; id.toJSON = id.toString; // this *trick* makes it compatible between old & new versions.
+ root.dup.track(id)['#'] = msg['#']; // fixes new OK acks for RPC like RTC.
DBG && (DBG.ph = DBG.ph || +new Date);
root.on('put', {'#': id, '@': msg['@'], put: {'#': soul, '.': key, ':': val, '>': state}, _: ctx});
}
@@ -399,13 +400,18 @@
}
function ack(msg){ // aggregate ACKs.
var id = msg['@'] || '', ctx;
- if(!(ctx = id._)){ return }
+ if(!(ctx = id._)){
+ var dup = (dup = msg.$) && (dup = dup._) && (dup = dup.root) && (dup = dup.dup);
+ if(!(dup = dup.check(id))){ return }
+ msg['@'] = dup['#'] || msg['@'];
+ return;
+ }
ctx.acks = (ctx.acks||0) + 1;
if(ctx.err = msg.err){
msg['@'] = ctx['#'];
fire(ctx); // TODO: BUG? How it skips/stops propagation of msg if any 1 item is error, this would assume a whole batch/resync has same malicious intent.
}
- if(!ctx.stop && !ctx.crack){ ctx.crack = ctx.match && ctx.match.push(function(){back(ctx)}) } // handle synchronous acks
+ if(!ctx.stop && !ctx.crack){ ctx.crack = ctx.match && ctx.match.push(function(){back(ctx)}) } // handle synchronous acks. NOTE: If a storage peer ACKs synchronously then the PUT loop has not even counted up how many items need to be processed, so ctx.STOP flags this and adds only 1 callback to the end of the PUT loop.
back(ctx);
}
function back(ctx){
@@ -1092,7 +1098,7 @@
setTimeout.each(Object.keys(stun = stun.add||''), function(cb){ if(cb = stun[cb]){cb()} }); // resume the stunned reads // Any perf reasons to CPU schedule this .keys( ?
}).hatch = tmp; // this is not official yet ^
//console.log(1, "PUT", as.run, as.graph);
- (as.via._).on('out', {put: as.out = as.graph, opt: as.opt, '#': ask, _: tmp});
+ (as.via._).on('out', {put: as.out = as.graph, ok: as.ok || as.opt, opt: as.opt, '#': ask, _: tmp});
}; ran.end = function(stun,root){
stun.end = noop; // like with the earlier id, cheaper to make this flag a function so below callbacks do not have to do an extra type check.
if(stun.the.to === stun && stun === stun.the.last){ delete root.stun }
diff --git a/lib/webrtc.js b/lib/webrtc.js
index abc887cb..07bb5d71 100644
--- a/lib/webrtc.js
+++ b/lib/webrtc.js
@@ -33,30 +33,38 @@
// The above change corrects at least firefox RTC Peer handler where it **throws** on over 6 ice servers, and updates url: to urls: removing deprecation warning
opt.rtc.dataChannel = opt.rtc.dataChannel || {ordered: false, maxRetransmits: 2};
opt.rtc.sdp = opt.rtc.sdp || {mandatory: {OfferToReceiveAudio: false, OfferToReceiveVideo: false}};
+ opt.rtc.max = opt.rtc.max || 55; // is this a magic number?
+ opt.rtc.room = opt.rtc.room || Gun.window && (location.hash.slice(1) || location.pathname.slice(1));
opt.announce = function(to){
- root.on('out', {rtc: {id: opt.pid, to:to}}); // announce ourself
+ opt.rtc.start = +new Date; // handle room logic:
+ root.$.get('/RTC/'+opt.rtc.room+'99').get('+').put(opt.pid, function(ack){
+ if(!ack.ok || !ack.ok.rtc){ return }
+ open(ack);
+ }, {acks: opt.rtc.max}).on(function(last,key, msg){
+ if(last === opt.pid || opt.rtc.start > msg.put['>']){ return }
+ open({'#': ''+msg['#'], ok: {rtc: {id: last}}});
+ });
};
+
var mesh = opt.mesh = opt.mesh || Gun.Mesh(root);
root.on('create', function(at){
this.to.next(at);
setTimeout(opt.announce, 1);
});
- root.on('in', function(msg){
- if(msg.rtc){ open(msg) }
- this.to.next(msg);
- });
function open(msg){
- var rtc = msg.rtc, peer, tmp;
- if(!rtc || !rtc.id){ return }
- delete opt.announce[rtc.id]; /// remove after connect
+ if(this && this.off){ this.off() } // Ignore this, because of ask / ack.
+ if(!msg.ok){ return }
+ var rtc = msg.ok.rtc, peer, tmp;
+ if(!rtc || !rtc.id || rtc.id === opt.pid){ return }
+ //console.log("webrtc:", rtc);
if(tmp = rtc.answer){
if(!(peer = opt.peers[rtc.id] || open[rtc.id]) || peer.remoteSet){ return }
- tmp.sdp = tmp.sdp.replace(/\\r\\n/g, '\r\n')
+ tmp.sdp = tmp.sdp.replace(/\\r\\n/g, '\r\n');
return peer.setRemoteDescription(peer.remoteSet = new opt.RTCSessionDescription(tmp));
}
if(tmp = rtc.candidate){
- peer = opt.peers[rtc.id] || open[rtc.id] || open({rtc: {id: rtc.id}});
+ peer = opt.peers[rtc.id] || open[rtc.id] || open({ok: {rtc: {id: rtc.id}}});
return peer.addIceCandidate(new opt.RTCIceCandidate(tmp));
}
//if(opt.peers[rtc.id]){ return }
@@ -64,23 +72,23 @@
(peer = new opt.RTCPeerConnection(opt.rtc)).id = rtc.id;
var wire = peer.wire = peer.createDataChannel('dc', opt.rtc.dataChannel);
open[rtc.id] = peer;
- wire.onclose = function(){
- delete open[rtc.id];
- mesh.bye(peer);
- //reconnect(peer);
- };
- wire.onerror = function(err){};
+ wire.to = setTimeout(function(){delete open[rtc.id]},1000*60);
+ wire.onclose = function(){ mesh.bye(peer) };
+ wire.onerror = function(err){ };
wire.onopen = function(e){
- //delete open[rtc.id];
+ delete open[rtc.id];
mesh.hi(peer);
+ //clearTimeout(wire.to);
+ //delete open[rtc.id];
}
wire.onmessage = function(msg){
if(!msg){ return }
+ console.log(opt.pid, "HEARD FROM WEBRTC:");
mesh.hear(msg.data || msg, peer);
};
peer.onicecandidate = function(e){ // source: EasyRTC!
if(!e.candidate){ return }
- root.on('out', {'@': msg['#'], rtc: {candidate: e.candidate, id: opt.pid}});
+ root.on('out', {'@': msg['#'], ok: {rtc: {candidate: e.candidate, id: opt.pid}}});
}
peer.ondatachannel = function(e){
var rc = e.channel;
@@ -93,16 +101,16 @@
peer.setRemoteDescription(new opt.RTCSessionDescription(tmp));
peer.createAnswer(function(answer){
peer.setLocalDescription(answer);
- root.on('out', {'@': msg['#'], rtc: {answer: answer, id: opt.pid}});
+ root.on('out', {'@': msg['#'], ok: {rtc: {answer: answer, id: opt.pid}}});
}, function(){}, opt.rtc.sdp);
return;
}
peer.createOffer(function(offer){
peer.setLocalDescription(offer);
- root.on('out', {'@': msg['#'], rtc: {offer: offer, id: opt.pid}});
+ root.on('out', {'@': msg['#'], '#': root.ask(open), ok: {rtc: {offer: offer, id: opt.pid}}});
}, function(){}, opt.rtc.sdp);
return peer;
}
});
var noop = function(){};
-}());
+}());
\ No newline at end of file
diff --git a/test/common.js b/test/common.js
index a6b1392d..79356785 100644
--- a/test/common.js
+++ b/test/common.js
@@ -4045,6 +4045,76 @@ describe('Gun', function(){
nopasstun(0, gunB);
nopasstun(done, gunC);
}, 100);
+ });
+
+ it('ack aggregation bypass', function(done){
+ var alice = GUN({localStorage: false});
+ var bob = GUN({localStorage: false});
+ var carl = GUN({localStorage: false});
+
+ var adam = alice.back('opt.mesh');
+ var asay = adam.say;
+
+ var bdam = bob.back('opt.mesh');
+ var bsay = bdam.say;
+
+ var cdam = carl.back('opt.mesh');
+ var csay = cdam.say;
+
+ //console.only.i = 1;
+ adam.say = function(raw, peer){
+ console.only(2, 'adam says:', raw);
+ console.only(1, '...');
+ bdam.hear((raw.length && raw) || JSON.stringify(raw), {});
+ asay(raw, peer);
+ }
+ bdam.say = function(raw, peer){
+ console.only(7, "bob the relay is like YO", raw);
+ adam.hear((raw.length && raw) || JSON.stringify(raw), {});
+ cdam.hear((raw.length && raw) || JSON.stringify(raw), {});
+ bsay(raw, peer);
+ }
+ cdam.say = function(raw, peer){
+ console.only(4, "carl speaks out:", raw);
+ console.only(3, "...");
+ bdam.hear((raw.length && raw) || JSON.stringify(raw), {});
+ csay(raw, peer);
+ }
+
+ carl.on('put', async function(msg){
+ this.to.next(msg);
+
+ var tmp = msg.put;
+
+ //if(Math.random() > 0.5){ return; }
+ //console.log(msg.put);
+
+ //localStorage[tmp['#']+tmp['.']] = tmp[':'];
+
+ setTimeout(function(){
+ carl.on('out', {'@': msg['#']+'', ok: {BANANA: 9}});
+ }, 10);
+ });
+
+ alice.on('get', function(msg){ setTimeout(function(){ Gun.on.get.ack(msg); },9) })
+
+
+ setTimeout(async function(){
+ var pair = await SEA.pair();
+ var user = alice.user();
+ setTimeout(function(){
+ var c = 0;
+ //alice.on('auth', function(){
+ alice.get('test').put({a: 1, b: 2, c: 3}, function(ack){
+ //console.log("my data got saved?", ack);
+
+ if(ack.ok.BANANA && ++c === c){
+ done();
+ }
+ }, {acks: 99});
+ //}); user.auth(pair);
+ },10);
+ }, 100);
});
/*it.only('Make sure circular contexts are not copied', function(done){