Merge pull request #735 from amark/feature-multicast

Multicast transport
This commit is contained in:
Mark Nadal 2019-04-16 16:45:42 -07:00 committed by GitHub
commit 1e20fd5761
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 143 additions and 33 deletions

11
gun.js
View File

@ -1976,9 +1976,8 @@
if(!raw){ return }
var dup = ctx.dup, id, hash, msg, tmp = raw[0];
if(opt.pack <= raw.length){ return mesh.say({dam: '!', err: "Message too big!"}, peer) }
try{msg = JSON.parse(raw);
}catch(e){opt.log('DAM JSON parse error', e)}
if('{' === tmp){
try{msg = JSON.parse(raw);}catch(e){opt.log('DAM JSON parse error', e)}
if(!msg){ return }
if(dup.check(id = msg['#'])){ return }
dup.track(id, true).it = msg; // GUN core also dedups, so `true` is needed.
@ -2004,6 +2003,7 @@
return;
} else
if('[' === tmp){
try{msg = JSON.parse(raw);}catch(e){opt.log('DAM JSON parse error', e)}
if(!msg){ return }
var i = 0, m;
while(m = msg[i++]){
@ -2063,11 +2063,12 @@
function send(raw, peer){
var wire = peer.wire;
try{
if(wire.send){
wire.send(raw);
} else
if(peer.say){
peer.say(raw);
} else
if(wire.send){
if(wire.readyState && 1 != wire.readyState){ return }
wire.send(raw);
}
}catch(e){
(peer.queue = peer.queue || []).push(raw);

45
lib/ipfs.js Normal file
View File

@ -0,0 +1,45 @@
var opt = gun._.opt, u;
if (u === opt.ipfs.directory) {
opt.ipfs.directory = '/gun';
}
opt.store = {};
opt.store.put = function(file, data, cb){
var uri = opt.ipfs.directory + '/' + file;
opt.ipfs.instance.files.write(uri, Buffer.from(JSON.stringify(data)), {create:true})
.then(res => {
console.log('File written to IPFS directory', uri, res);
return opt.ipfs.instance.files.stat(opt.ipfs.directory, {hash:true});
}).then(res => {
console.log('Directory hash:', res.hash);
return opt.ipfs.instance.name.publish(res.hash);
// currently throws "This command must be run in online mode. Try running 'ipfs daemon' first." for some reason, maybe js-ipfs IPNS not ready yet
}).then(res => {
console.log('IPFS put request successful:', res);
cb(undefined, 1);
}).catch(error => {
console.error('IPFS put request failed', error);
});
}
opt.store.get = function(file, cb){
var uri = opt.ipfs.directory + '/' + file;
opt.ipfs.instance.files.read(uri, {})
.then(res => {
var data = JSON.parse(res.toString());
console.log(uri + ' was loaded from ipfs:', data);
cb(data);
});
}
opt.store.list = function(cb){
var stream = opt.ipfs.files.lsReadableStream(opt.ipfs.directory);
stream.on('data', (file) => {
console.log('ls', file.name);
if (cb(file.name)) {
stream.destroy();
}
});
stream.on('finish', () => {
cb();
});
}

63
lib/multicast.js Normal file
View File

@ -0,0 +1,63 @@
var Gun = (typeof window !== "undefined")? window.Gun : require('../gun');
Gun.on('create', function(root){
this.to.next(root);
var opt = root.opt;
if(false === opt.multicast){ return }
var udp = opt.multicast = opt.multicast || {};
udp.address = udp.address || '233.255.255.255';
udp.pack = udp.pack || 50000; // UDP messages limited to 65KB.
udp.port = udp.port || 23456;
var noop = function(){}, port;
var dgram = require("dgram");
var socket = dgram.createSocket({type: "udp4", reuseAddr: true});
socket.bind(udp.port);
socket.on("listening", function() {
socket.addMembership(udp.address);
udp.peer = {url: udp.address + ':' + udp.port, wire: socket};
udp.peer.say = function(raw){
var buf = Buffer.from(raw, 'utf8');
if(udp.pack <= buf.length){ // message too big!!!
return;
}
socket.send(buf, 0, buf.length, udp.port, udp.address, noop);
}
opt.mesh.hi(udp.peer);
console.log('multicasting on', udp.peer.url);
return; // below code only needed for when WebSocket connections desired!
setInterval(function broadcast(){
port = port || (opt.web && opt.web.address()||{}).port;
if(!port){ return }
udp.peer.say(JSON.stringify({id: opt.pid || (opt.pid = Math.random().toString(36).slice(2)), port: port}));
}, 1000);
});
socket.on("message", function(raw, info) { try {
if(!raw){ return }
raw = raw.toString('utf8');
opt.mesh.hear(raw, udp.peer);
return; // below code only needed for when WebSocket connections desired!
var message;
message = JSON.parse(raw.toString('utf8'));
if(opt.pid === message.id){ return } // ignore self
var url = 'http://' + info.address + ':' + (port || (opt.web && opt.web.address()||{}).port) + '/gun';
if(root.opt.peers[url]){ return }
console.log('discovered', url, message, info);
root.$.opt(url);
} catch(e){
console.log('multicast error', e, raw);
return;
} });
});

View File

@ -16,6 +16,7 @@
//try{require('../axe');}catch(e){}
require('./file');
require('./evict');
require('./multicast');
if('debug' === process.env.GUN_ENV){ require('./debug') }
module.exports = Gun;
}());

View File

@ -74,7 +74,7 @@ Gun.on('opt', function(root){
wire.on('message', function(msg){
opt.mesh.hear(msg.data || msg, peer);
});
wire.on('close', function(){
wire.on('close', function(a,b,c){
opt.mesh.bye(peer);
});
wire.on('error', function(e){});

8
package-lock.json generated
View File

@ -1,6 +1,6 @@
{
"name": "gun",
"version": "0.9.999999",
"version": "0.2019.413",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
@ -1246,9 +1246,9 @@
"dev": true
},
"ws": {
"version": "5.2.0",
"resolved": "https://registry.npmjs.org/ws/-/ws-5.2.0.tgz",
"integrity": "sha512-c18dMeW+PEQdDFzkhDsnBAlS4Z8KGStBQQUcQ5mf7Nf689jyGk0594L+i9RaQuf4gog6SvWLJorz2NfSaqxZ7w==",
"version": "6.2.1",
"resolved": "https://registry.npmjs.org/ws/-/ws-6.2.1.tgz",
"integrity": "sha512-GIyAXC2cB7LjvpgMt9EKS2ldqr0MTrORaleiOno6TweZ6r3TKtoFQWay/2PceJ3RuBasOHzXNn5Lrw1X0bEjqA==",
"requires": {
"async-limiter": "~1.0.0"
}

View File

@ -50,7 +50,7 @@
"node": ">=0.8.4"
},
"dependencies": {
"ws": "~>5.2.0"
"ws": "~>6.2.1"
},
"optionalDependencies": {
"text-encoding": "^0.7.0",