Merge pull request #695 from rogowski/master

AXE with DHT (by soul/peer.id) and radix
This commit is contained in:
Mark Nadal 2019-01-29 04:45:35 -08:00 committed by GitHub
commit 7fb53aa3da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 347 additions and 207 deletions

158
axe.js
View File

@ -39,11 +39,10 @@
var AXE = USE('./root'), Gun = (AXE.window||{}).Gun || USE('./gun', 1);
(Gun.AXE = AXE).GUN = AXE.Gun = Gun;
Gun.on('opt', function(at){
if(!at.axe){
at.axe = {};
var p = at.opt.peers, tmp;
var peers = at.opt.peers, tmp;
// 1. If any remembered peers or from last cache or extension
// 2. Fallback to use hard coded peers from dApp
// 3. Or any offered peers.
@ -61,53 +60,72 @@
// in case the p2p linear latency is high.
// Or there could be plenty of other better options.
console.log("axe");
if(at.opt.super){
function verify(msg, send, at) {
var peers = Object.values(p), puts = Object.keys(msg.put), i, j, peer;
function verify(dht, msg, send, at) {
var puts = Object.keys(msg.put);
var soul = puts[0]; /// TODO: verify all souls in puts. Copy the msg only with subscribed souls?
var subs = Gun.subscribe[soul];
var subs = dht(soul);
// console.log('[AXE] VERIFY soul: %s, subs: %s, Peers: %s, msg: ', soul, subs, Object.keys(peers), msg);
if (!subs) { return; }
for (i=0; i < subs.length; ++i) {
peer = subs[i];
send(msg, peer);
var tmp = [];
Gun.obj.map(subs.split(','), function(pid) {
if (pid in peers) {
tmp.push(pid);
// console.log('[AXE] SEND TO >>>>> ', pid, msg.put.bob || msg.put);
send(msg, peers[pid]);
}
});
/// Only connected peers in the tmp array.
if (at.on.opt.super) {
dht(soul, tmp.join(','));
}
}
var Rad = (Gun.window||{}).Radix || USE('./lib/radix', 1);
at.opt.dht = Rad();
at.on('in', input/*USE('./lib/super', 1)*/, at);
// at.on('out', function(msg, a) {
// this.to.next(msg);
// console.log('[AXE] out:', msg, a);
// }, at);
if(at.opt.super){
AXE.say = function(msg, send, at) {
if (msg.webrtc) {
// console.log('[AXE] MSG WEBRTC: ', msg.webrtc);
if (msg.webrtc.to) {
/// Send announce to one peer only if the msg have 'to' attr
var peer = (at.on.opt.peers) ? at.on.opt.peers[msg.webrtc.to] : null;
// if (peer) { at.on.opt.mesh.say(msg, peer); }
if (peer) { send(msg, peer); }
return;
}
}
if (!msg.put) { send(msg); return; }
//console.log('AXE HOOK!! ', msg);
verify(msg, send, at);
};
}
if(at.opt.super){
at.on('in', USE('./lib/super', 1), at);
var timerGC, queueGC;
at.on('bye', function(peer) {
console.log('Garbage collector triggered by peer.id: ', peer.id);
if (timerGC) { queueGC = true; return; }
timerGC = throttleGC();
});
var throttleGC = function() {
return setTimeout(function() {
GC(Gun.subscribe, p);
clearTimeout(timerGC);
timerGC = null;
if (queueGC) {
queueGC = false;
timerGC = throttleGC();
}
}, 1);
verify(at.on.opt.dht, msg, send, at);
};
} else {
AXE.say = function(msg, send, at) {
if (msg.webrtc) {
// console.log('[AXE] MSG WEBRTC: ', msg.webrtc);
}
if (!msg.put) { send(msg); return; }
verify(at.on.opt.dht, msg, send, at);
/// Always send to superpeers?
Gun.obj.map(at.on.opt.peers, function(peer) {
if (peer.url) {
// console.log('SEND TO SUPERPEER', msg);
send(msg, peer);
}
});
};
var connections = 0;
at.on('hi', function(opt) {
this.to.next(opt);
console.log('AXE PEER [HI]', new Date(), opt.pid);
//console.log('AXE PEER [HI]', new Date(), opt);
connections++;
/// The first connection don't need to resubscribe the nodes.
if (connections === 1) { return; }
/// TODO: resync all nodes in gun/gap
/// Resubscribe all nodes.
setTimeout(function() {
var souls = Object.keys(at.graph);
@ -119,48 +137,56 @@
//location.reload();
}, 500);
}, at);
//at.on('in', input, at);
}
}
this.to.next(at); // make sure to call the "next" middleware adapter.
});
function joindht(dht, soul, pids) {
if (!pids || !soul || !dht) { return; }
var subs = dht(soul);
var tmp = subs ? subs.split(',') : [];
Gun.obj.map(pids.split(','), function(pid) {
if (pid && tmp.indexOf(pid) === -1) { tmp.push(pid); }
});
tmp = tmp.join(',');
dht(soul, tmp);
return tmp;
}
function input(msg){
var at = this.as, to = this.to;
console.log('AXE PEER [IN]: ', msg);
this.to.next(msg);
}
// console.log('[AXE] input: ', msg);
var at = this.as, to = this.to, peer = (msg.mesh||{}).via;
var opt = at.opt;
var dht = opt.dht;
var get = msg.get, soul, key;
if(peer && get){
if(soul = get['#']){
if(key = get['.']){
/// Garbage collector to remove peers subscriptions when disconnect
// var peers = [1,3,5,7,9];
// function shuffle(array) { var tmp, current, top = array.length; if(top) while(--top) { current = Math.floor(Math.random() * (top + 1)); tmp = array[current]; array[current] = array[top]; array[top] = tmp;} return array;}
// for (var peers=[],i=0;i<7000;++i) peers[i]=i;
// peers = shuffle(peers).slice(2000);
// var subscribes = {soula: [1,2,3,4,5,6,7,8,9,0], soulb: [1,2,3,4,5,6,7,8,9,0], soulc: [1,2,3,4,5,6,7,8,9,0]};
// var subscribes = {}; for (var i=0;i<1000;++i) {subscribes['soul_'+i]= (function() {var a=[]; for(var i=0;i<100;i++){a.push(i)} return a;})()}
function GC(subscribes, peers) {
console.time('AXE GC');
var souls = Object.keys(subscribes), soul, i;
var peers = Object.values(peers);
if (souls.length === 0) {return;}
var removed = {};
for (i=0; i < souls.length; ++i) {
soul = souls[i];
// removed[soul] = 0;
var pidx = subscribes[soul].length;
while (pidx--) {
if (peers.indexOf(subscribes[soul][pidx]) === -1) {
// console.log('REMOVED: Soul: %s, peer: ', soul, pidx, subpeers[pidx]);
subscribes[soul].splice(pidx, 1);
// removed[soul]++;
} else {
}
if (!peer.id) {console.log('[*** WARN] no peer.id %s', soul);}
var pids = joindht(dht, soul, peer.id);
if (pids) {
var dht = {};
dht[soul] = pids;
at.opt.mesh.say({dht:dht}, opt.peers[peer.id]);
}
}
if (subscribes[soul].length === 0) { delete subscribes[soul]; }
}
console.timeEnd('AXE GC');
// console.log('[AXE GC] Removed: ', removed);
to.next(msg);
if (opt.webrtc && msg.dht) {
Gun.obj.map(msg.dht, function(pids, soul) {
dht(soul, pids);
Gun.obj.map(pids.split(','), function(pid) {
/// TODO: here we can put an algorithm of who must connect?
if (!pid || pid in opt.peers || pid === opt.pid) { return; }
opt.announce(pid);
});
});
}
}
module.exports = AXE;
})(USE, './axe');
}());

View File

@ -8,8 +8,11 @@
</head>
<body>
<h3 id="pid"></h3>
<script src="../gun.js"></script>
<!-- <script src="../axe.js"></script> -->
<script src="../gun/axe.js"></script>
<script src="../gun/lib/radix.js"></script>
<script src="../gun/lib/webrtc.js"></script>
<!-- <script src="../sea.js"></script> -->
<script>
var pid = location.hash.slice(1);
@ -23,14 +26,17 @@
Gun.on('opt', function(ctx) {
this.to.next(ctx);
ctx.on('hi', function(opt) {
console.log('HI!! PEER', new Date(), opt.pid);
// console.log('HI!! PEER', new Date(), opt.pid);
setTimeout(function() {
document.getElementById('pid').innerHTML = gun._.opt.pid;
});
if (pid) {
ctx.on('out', function(msg) {
msg.pid = pid;
this.to.next(msg);
});
}
// if (pid) {
// ctx.on('out', function(msg) {
// msg.pid = pid;
// this.to.next(msg);
// });
// }
});
var gun = Gun(opt);

22
gun.js
View File

@ -1946,7 +1946,7 @@
return;
}
// add hook for AXE?
if (Gun.AXE && opt && opt.super) { Gun.AXE.say(msg, mesh.say, this); return; } // rogowski
if (Gun.AXE) { Gun.AXE.say(msg, mesh.say, this); return; }
mesh.say(msg);
}
@ -2118,26 +2118,30 @@
tmp = tmp.id = tmp.id || Type.text.random(9);
mesh.say({dam: '?'}, opt.peers[tmp] = peer);
}
if(!tmp.hied){ ctx.on(tmp.hied = 'hi', peer) }
tmp = peer.queue; peer.queue = [];
Type.obj.map(tmp, function(msg){
mesh.say(msg, peer);
});
if(!tmp.hied){ ctx.on(tmp.hied = 'hi', peer); }
// tmp = peer.queue; peer.queue = [];
// Type.obj.map(tmp, function(msg){
// mesh.say(msg, peer);
// });
}
mesh.bye = function(peer){
Type.obj.del(opt.peers, peer.id); // assume if peer.url then reconnect
ctx.on('bye', peer);
}
mesh.hear['!'] = function(msg, peer){ opt.log('Error:', msg.err) }
mesh.hear['?'] = function(msg, peer){
if(!msg.pid){
return mesh.say({dam: '?', pid: opt.pid, '@': msg['#']}, peer);
// return mesh.say({dam: '?', pid: opt.pid, '@': msg['#']}, peer);
mesh.say({dam: '?', pid: opt.pid, '@': msg['#']}, peer);
var tmp = peer.queue; peer.queue = [];
Type.obj.map(tmp, function(msg){
mesh.say(msg, peer);
});
return;
}
peer.id = peer.id || msg.pid;
mesh.hi(peer);
}
return mesh;
}

View File

@ -16,7 +16,7 @@
var rtcpc = opt.RTCPeerConnection || env.RTCPeerConnection || env.webkitRTCPeerConnection || env.mozRTCPeerConnection;
var rtcsd = opt.RTCSessionDescription || env.RTCSessionDescription || env.webkitRTCSessionDescription || env.mozRTCSessionDescription;
var rtcic = opt.RTCIceCandidate || env.RTCIceCandidate || env.webkitRTCIceCandidate || env.mozRTCIceCandidate;
if(!rtcpc || !rtcsd || !rtcic){ return }
if(!rtcpc || !rtcsd || !rtcic){ console.log('[ERROR] WebRTC not found!'); return;}
opt.RTCPeerConnection = rtcpc;
opt.RTCSessionDescription = rtcsd;
opt.RTCIceCandidate = rtcic;
@ -30,12 +30,17 @@
]};
opt.webrtc.dataChannel = opt.webrtc.dataChannel || {ordered: false, maxRetransmits: 2};
opt.webrtc.sdp = opt.webrtc.sdp || {mandatory: {OfferToReceiveAudio: false, OfferToReceiveVideo: false}};
opt.announce = function(to){
//setTimeout(function() {
console.log('[WEBRTC] announce ', opt.pid, to); root.on('out', {webrtc: {id: opt.pid, to:to}});
// }, 1);
}; // announce ourself
var mesh = opt.mesh = opt.mesh || Gun.Mesh(root);
root.on('create', function(at){
this.to.next(at);
setTimeout(function(){ root.on('out', {webrtc: {id: opt.pid}}) },1); // announce ourself
});
// root.on('create', function(at){
// this.to.next(at);
// setTimeout(function(){ root.on('out', {webrtc: {id: opt.pid}}) },1); // announce ourself
// });
root.on('in', function(msg){
if(msg.webrtc){ open(msg) }
this.to.next(msg);
@ -54,6 +59,7 @@
}
if(opt.peers[rtc.id]){ return }
(peer = new opt.RTCPeerConnection(opt.webrtc)).id = rtc.id;
// var wire = peer.wire = peer.createDataChannel(((opt.pid + '->' + msg.webrtc.id) || 'dc'), opt.webrtc.dataChannel);
var wire = peer.wire = peer.createDataChannel('dc', opt.webrtc.dataChannel);
mesh.hi(peer);
wire.onclose = function(){

View File

@ -2,8 +2,8 @@
* AXE test 1
* What we want here: (1) Superpeer and (n) peers
* - The peers receives only the requested data.
* - If the Superpeer crash, must recreate all subscriptions and update the peers.
* - If some peer crash or go offline, when connected again must receive the changes made by others while out.
* - If the Superpeer crash, after restart, must recreate all subscriptions and update the peers.
* - If some peer crash or go offline, must receive the changes via RTC.
*
* Tip: to run this `npm run testaxe`
* Tip 2: if you clone the gun repo, you need to create a link do gun package. Do `npm install && cd node_modules && ln -s ../ gun`
@ -13,15 +13,16 @@ var config = {
IP: require('ip').address(),
port: 8765,
servers: 2,
browsers: 2,
browsers: 3,
route: {
'/': __dirname + '/index.html',
'/gun.js': __dirname + '/../../gun.js',
'/gun/axe.js': __dirname + '/../../axe.js',
'/gun/lib/radix.js': __dirname + '/../../lib/radix.js',
'/gun/lib/webrtc.js': __dirname + '/../../lib/webrtc.js',
'/jquery.js': __dirname + '/../../examples/jquery.js'
}
}
var panic = require('panic-server');
panic.server().on('request', function(req, res){
config.route[req.url] && require('fs').createReadStream(config.route[req.url]).pipe(res);
@ -45,10 +46,10 @@ var server2 = servers.excluding(server).pluck(1);
var browsers = clients.excluding(servers);
var alice = browsers.pluck(1);
var bob = browsers.excluding(alice).pluck(1);
var john = browsers.excluding(alice).excluding(bob).pluck(1);
var again = {};
describe("The Holy Grail AXE Test!", function(){
console.time('TOTAL TEST TIME');
this.timeout(5 * 60 * 1000);
// this.timeout(10 * 60 * 1000);
@ -89,25 +90,35 @@ describe("The Holy Grail AXE Test!", function(){
tests.push(client.run(function(test){
localStorage.clear(); console.log('Clear localStorage!!!');
var env = test.props;
var gun = window.gun = Gun({peers:['http://'+ env.config.IP + ':' + (env.config.port + 1) + '/gun'], wait: 1000});
var opt = {peers:['http://'+ env.config.IP + ':' + (env.config.port + 1) + '/gun'], wait: 1000};
var pid = location.hash.slice(1);
if (pid) { opt.pid = pid; }
Gun.on('opt', function(ctx) {
this.to.next(ctx);
ctx.on('hi', function(opt) {
document.getElementById('pid').innerHTML = (document.getElementById('pid').innerHTML || "-") + ', ' + this.on.opt.pid;
});
});
var gun = window.gun = Gun(opt);
window.ref = gun.get('holy').get('grail');
}, {i: i += 1, config: config}));
});
return Promise.all(tests);
});
it("Wait for Alice and Bob...", function(done){
it("Wait for Alice, Bob and John...", function(done){
setTimeout(done, 1000);
});
it("Alice Write: Hi Bob!", function(){
return alice.run(function(test){
console.log("I AM ALICE");
$('#name').text('Alice');
test.async();
ref.once(function() { // TODO: Need `.once` first for subscription. If Alice do a `.put` before a `.once`, Alice will get old data from localStorage if Bob update
ref.put('Hi Bob!', function(ack) {
console.log(ack);
setTimeout(test.done, 10000);
setTimeout(test.done, 2000);
});
});
});
@ -116,6 +127,7 @@ describe("The Holy Grail AXE Test!", function(){
it("Bob receive ONCE from Alice: Hi Bob!", function(){
return bob.run(function(test){
console.log("I AM BOB");
$('#name').text('Bob');
test.async();
ref.once(function(data){
if('Hi Bob!' === data){
@ -157,10 +169,29 @@ describe("The Holy Grail AXE Test!", function(){
})
});
it("Jhon Read what Bob say to Alice: Hi Alice!", function(){
return john.run(function(test){
test.async();
console.log("I AM JOHN");
$('#name').text('John');
ref.once(function(data){
if('Hi Alice!' === data){
console.log('[OK] John receive the data: ', data);
return test.done();
} else {
//TODO: aqui em duvida.. está pegando do localStorage, mas Bob alterou o dado.
var err = '[FAIL] John receive wrong data: "' + data + '" and must be "Hi Alice!"';
console.log(err);
return test.fail(err);
}
})
})
});
it("Bob Write in some data, Alice not subscribed", function(){
return bob.run(function(test){
test.async();
gun.get('bob').get('mine').put('Alice dont want this data!', function() {
gun.get('bob').get('mine').put('Alice dont want this data now!', function() {
setTimeout(test.done, 2000);
});
});
@ -172,13 +203,17 @@ describe("The Holy Grail AXE Test!", function(){
/// This must be empty, because alice don't make a subscription to this node.
var bobdata = JSON.parse(localStorage.getItem('gun/')).bob;
if (bobdata) {
var err = '[FAIL] Alice receive not subscribed data: ' + JSON.stringify(bobdata);
var err = '[FAIL] Alice receive not subscribed data in localStorage: ' + JSON.stringify(bobdata);
console.log(err);
return test.fail(err);
} else {
}
if (gun._.graph.bob) {
var err = '[FAIL] Alice receive not subscribed data in in graph: ' + JSON.stringify(gun._.graph.bob);
console.log(err);
return test.fail(err);
}
console.log('[OK] Alice Read must NOT receive data from Bob: ', bobdata);
return test.done();
}
})
});
@ -202,12 +237,12 @@ describe("The Holy Grail AXE Test!", function(){
return bob.run(function(test){
test.async();
gun.get('bob').get('mine').put('Alice WANT this data now!', function() {
setTimeout(test.done, 2000);
setTimeout(test.done, 5000);
});
});
});
it("Alice must receive updates from Bob node", function(){
it("Alice must receive 'Alice WANT this data now!' from Bob node", function(){
return alice.run(function(test){
test.async();
if (gun._.graph.bob && gun._.graph.bob.mine === 'Alice WANT this data now!') {
@ -223,7 +258,6 @@ describe("The Holy Grail AXE Test!", function(){
it("Server has crashed!", function(){
return server.run(function(test){
console.log(3);
// var env = test.props;
// try{ require('fs').unlinkSync(env.i+'data'); }catch(e){}
process.exit(0);
@ -231,11 +265,10 @@ describe("The Holy Grail AXE Test!", function(){
});
it("Wait...", function(done){
console.log(4);
setTimeout(done, 2000);
});
it("Alice update the data (superpeer crashed yet).", function(){
it("Alice change the data (superpeer crashed yet).", function(){
return alice.run(function(test){
var env = test.props;
if(window.WebSocket){
@ -247,17 +280,17 @@ describe("The Holy Grail AXE Test!", function(){
}
test.async()
ref.put("Superpeer? Where are you?", function() {
setTimeout(test.done, 100);
setTimeout(test.done, 1000);
});
}, {config: config});
});
it("Bob can't see what Alice change because Superpeer is out.", function(){
it("Bob receive what Alice change via WebRTC.", function(){
return bob.run(function(test){
test.async();
ref.once(function(data){
if('Superpeer? Where are you?' !== data){
console.log('[OK] Bob have old data: ', data);
if('Superpeer? Where are you?' === data){
console.log('[OK] Bob received data via WebRTC: ', data);
return test.done();
} else {
var err = '[FAIL] Bob MUST not receive: "Superpeer? Where are you?", but receive: ' + data;
@ -268,6 +301,39 @@ describe("The Holy Grail AXE Test!", function(){
})
});
it("Bob change the data again (superpeer crashed yet).", function(){
return alice.run(function(test){
var env = test.props;
if(window.WebSocket){
var err;
try{ new WebSocket('http://'+ env.config.IP + ':' + (env.config.port + 2) + '/gun') }catch(e){ err = e }
if(!err){
test.fail("Server did not crash.");
}
}
test.async()
ref.put("Alice, can you hear me?", function() {
setTimeout(test.done, 1000);
});
}, {config: config});
});
it("Alice MUST receive 'Alice, can you hear me?' via WebRTC.", function(){
return bob.run(function(test){
test.async();
ref.once(function(data){
if('Alice, can you hear me?' === data){
console.log('[OK] Alice received data via WebRTC: ', data);
return test.done();
} else {
var err = '[FAIL] Alice MUST not receive: "Superpeer? Where are you?", but receive: ' + data;
console.log(err);
return test.fail(err);
}
})
})
});
it("Superpeer come started again!", function(){
return server2.run(function(test){
var env = test.props;
@ -291,19 +357,28 @@ describe("The Holy Grail AXE Test!", function(){
});
it("Wait sync...", function(done){
console.log(4);
setTimeout(done, 5000);
});
it("Bob now receive what Alice change because Superpeer is on.", function(){
it("Alice change the data again (superpeer is UP!).", function(){
return alice.run(function(test){
var env = test.props;
test.async()
ref.put("Yes Bob! Thanks for asking!", function() {
setTimeout(test.done, 1000);
});
}, {config: config});
});
it("Bob MUST receive 'Yes Bob! Thanks for asking!'", function(){
return bob.run(function(test){
test.async();
ref.once(function(data){
if('Superpeer? Where are you?' === data){
console.log('[OK] Bob have old data: ', data);
if('Yes Bob! Thanks for asking!' === data){
console.log('[OK] Bob received the data change: ', data);
return test.done();
} else {
var err = '[FAIL] Bob MUST not receive: "Superpeer? Where are you?", but receive: ' + data;
var err = '[FAIL] Bob MUST not receive: "Yes Bob! Thanks for asking!", but receive: ' + data;
console.log(err);
return test.fail(err);
}
@ -311,6 +386,26 @@ describe("The Holy Grail AXE Test!", function(){
})
});
it("John dont want to know what Bob say in his node!", function(){
return john.run(function(test){
test.async();
/// This must be empty, because John don't make a subscription to this node.
var bobdata = JSON.parse(localStorage.getItem('gun/')).bob;
if (bobdata) {
var err = '[FAIL] John receive not subscribed data: ' + JSON.stringify(bobdata);
console.log(err);
return test.fail(err);
}
if (gun._.graph.bob) {
var err = '[FAIL] John receive not subscribed data in in graph: ' + JSON.stringify(gun._.graph.bob);
console.log(err);
return test.fail(err);
}
console.log('[OK] John Read must NOT receive data from Bob: ', bobdata, gun._.graph.bob);
return test.done();
})
});
it("All finished!", function(done){
console.log("Done! Cleaning things up...");
setTimeout(function(){
@ -326,6 +421,5 @@ describe("The Holy Grail AXE Test!", function(){
return servers.run(function(){
process.exit();
});
console.timeEnd('TOTAL TEST TIME');
});
});

View File

@ -2,8 +2,12 @@
<script>panic.server(location.origin)</script>
<script src='gun.js'></script>
<script src='gun/axe.js'></script>
<script src="gun/lib/radix.js"></script>
<script src='gun/lib/webrtc.js'></script>
<script src='jquery.js'></script>
<meta charset="utf-8"/>
<h1>Running AXE Tests.</h1>
<h1>Running AXE Tests</h1>
<div><span>Name: </span><span id="name"></span></div>
<div><span>PID: </span><span id="pid"></span></div>
<div id="log"></div>
<!-- <textarea id="print" style="width: 100%; height: 90%; border: 0;"></textarea> -->