IPFS Pubsub (WIP)

This commit is contained in:
haad
2016-09-12 16:19:53 +02:00
parent 1a1029fc84
commit 3211fb86bb
8 changed files with 124 additions and 156 deletions

View File

@@ -1,8 +1,9 @@
'use strict';
// const ipfsd = require('ipfsd-ctl');
const IPFS = require('ipfs')
const ipfsd = require('ipfsd-ctl');
// const IPFS = require('ipfs')
// const ipfsd = require('ipfsd-ctl');
const IpfsApi = require('ipfs-api')
const OrbitDB = require('../src/OrbitDB');
const Timer = require('./Timer');
@@ -16,17 +17,17 @@ const channelName = process.argv[3] ? process.argv[3] : 'c1';
const startIpfs = () => {
return new Promise((resolve, reject) => {
ipfsd.disposableApi((err, ipfs) => {
if(err) console.error(err);
resolve(ipfs);
});
// ipfsd.local((err, node) => {
// if(err) reject(err);
// node.startDaemon((err, ipfs) => {
// if(err) reject(err);
// resolve(ipfs);
// });
// ipfsd.disposableApi((err, ipfs) => {
// if(err) console.error(err);
// resolve(ipfs);
// });
ipfsd.local((err, node) => {
if(err) reject(err);
node.startDaemon((err, ipfs) => {
if(err) reject(err);
resolve(ipfs);
});
});
// const ipfs = new IPFS('/tmp/benchmark')
// ipfs.goOnline(() => {
// resolve(ipfs)
@@ -41,7 +42,6 @@ let totalQueries = 0;
let seconds = 0;
let queriesPerSecond = 0;
let lastTenSeconds = 0;
let store;
const queryLoop = (db) => {
// let timer = new Timer();
@@ -52,38 +52,32 @@ const queryLoop = (db) => {
totalQueries ++;
lastTenSeconds ++;
queriesPerSecond ++;
process.nextTick(() => queryLoop(db));
process.nextTick(() => queryLoop(db))
});
};
let run = (() => {
// Connect
console.log(`Connecting...`)
startIpfs()
.then((ipfs) => OrbitDB.connect(network, username, password, ipfs))
.then((orbit) => orbit.eventlog(channelName))
.then((db) => {
const ipfs = IpfsApi('localhost', '5002')
const orbit = new OrbitDB(ipfs, 'benchmark')
const db = orbit.eventlog(channelName)
queryLoop(db);
// Metrics output
setInterval(() => {
seconds ++;
if(seconds % 10 === 0) {
console.log(`--> Average of ${lastTenSeconds/10} q/s in the last 10 seconds`);
if(lastTenSeconds === 0)
throw new Error("Problems!");
lastTenSeconds = 0;
}
console.log(`${queriesPerSecond} queries per second, ${totalQueries} queries in ${seconds} seconds`);
queriesPerSecond = 0;
}, 1000);
// Metrics output
setInterval(() => {
seconds ++;
if(seconds % 10 === 0) {
console.log(`--> Average of ${lastTenSeconds/10} q/s in the last 10 seconds`);
if(lastTenSeconds === 0)
throw new Error("Problems!");
lastTenSeconds = 0;
}
console.log(`${queriesPerSecond} queries per second, ${totalQueries} queries in ${seconds} seconds`);
queriesPerSecond = 0;
}, 1000);
})
.catch((e) => {
console.error("error:", e);
console.error(e.stack);
process.exit(1);
})
// Start
queryLoop(db);
})();
module.exports = run;

View File

@@ -9,7 +9,7 @@ const Timer = require('./Timer');
// usage: reader.js <network hash> <username> <channel> <interval in ms>
// orbit-server
const network = 'QmYPobvobKsyoCKTw476yTui611XABf927KxUPCf4gRLRr'; // 'localhost:3333'
const network = '178.62.241.75:3333'; // 'localhost:3333'
const username = process.argv[2] ? process.argv[2] : 'testrunner';
const password = '';
const channelName = process.argv[3] ? process.argv[3] : 'c2';
@@ -28,7 +28,7 @@ let run = (async(() => {
try {
const ipfs = await(startIpfs());
const orbit = await(OrbitDB.connect(network, username, password, ipfs));
const db = await(orbit.eventlog(channelName));
const db = orbit.eventlog(channelName);
let count = 1;
let running = false;
@@ -42,7 +42,7 @@ let run = (async(() => {
console.log("---------------------------------------------------")
console.log("Timestamp | Value")
console.log("---------------------------------------------------")
console.log(items.map((e) => `${e.meta.ts} | ${e.value}`).join("\n"));
console.log(items.map((e) => `${e.payload.meta.ts} | ${e.payload.value}`).join("\n"));
console.log("---------------------------------------------------")
console.log(`Query #${count} took ${timer2.stop(true)} ms\n`);

View File

@@ -9,7 +9,7 @@
"url": "https://github.com/haadcode/orbit-db"
},
"engines": {
"node": "^4.x.x"
"node": "^6.x.x"
},
"main": "src/OrbitDB.js",
"dependencies": {
@@ -18,7 +18,7 @@
"orbit-db-eventstore": "0.1.1",
"orbit-db-feedstore": "0.1.1",
"orbit-db-kvstore": "0.1.1",
"socket.io-client": "^1.4.5"
"orbit-db-pubsub": "0.0.1"
},
"devDependencies": {
"asyncawait": "^1.0.6",
@@ -27,8 +27,8 @@
"babel-plugin-transform-runtime": "^6.8.0",
"babel-preset-es2015": "^6.6.0",
"exports-loader": "^0.6.3",
"ipfs": "^0.13.0",
"ipfs-api": "^6.0.3",
"ipfs": "^0.15.0",
"ipfs-api": "https://github.com/haadcode/js-ipfs-api.git",
"ipfsd-ctl": "^0.14.0",
"json-loader": "^0.5.4",
"lodash": "^4.3.0",

View File

@@ -7,14 +7,14 @@ const EventStore = require('orbit-db-eventstore');
const FeedStore = require('orbit-db-feedstore');
const KeyValueStore = require('orbit-db-kvstore');
const CounterStore = require('orbit-db-counterstore');
const PubSub = require('./PubSub');
const Pubsub = require('orbit-db-pubsub');
const Cache = require('./Cache');
class OrbitDB {
constructor(ipfs) {
constructor(ipfs, id, options) {
this._ipfs = ipfs;
this._pubsub = null;
this.user = null;
this._pubsub = options && options.broker ? new options.broker(ipfs) : new Pubsub(ipfs)
this.user = { id: id }
this.network = null;
this.events = new EventEmitter();
this.stores = {};
@@ -50,17 +50,16 @@ class OrbitDB {
this.network = null;
}
_createStore(Store, dbname, options) {
if(!options) options = {};
const replicate = options.subscribe !== undefined ? options.subscribe : true;
const store = new Store(this._ipfs, this.user.username, dbname, options);
_createStore(Store, dbname, options = { subscribe: true }) {
// if(!options) options = {};
// const replicate = options.subscribe !== undefined ? options.subscribe : true;
const store = new Store(this._ipfs, this.user.id, dbname, options);
this.stores[dbname] = store;
return this._subscribe(store, dbname, replicate, options);
return this._subscribe(store, dbname, options.subscribe, options);
}
_subscribe(store, dbname, subscribe, options) {
if(subscribe === undefined) subscribe = true
_subscribe(store, dbname, subscribe = true, options) {
// if(subscribe === undefined) subscribe = true
store.events.on('data', this._onData.bind(this))
store.events.on('write', this._onWrite.bind(this))
store.events.on('close', this._onClose.bind(this))
@@ -139,7 +138,6 @@ class OrbitDB {
port = this.network.publishers[0].split(":")[1];
})
.then(() => {
this._pubsub = new PubSub();
logger.debug(`Connecting to network ${hash} (${host}:${port})`);
return this._pubsub.connect(host, port, username, password)
})
@@ -162,18 +160,19 @@ class OrbitDB {
}
class OrbitClientFactory {
static connect(network, username, password, ipfs, options) {
if(!options) options = { allowOffline: false };
static connect(host, username, password, ipfs, options = { allowOffline: false }) {
// if(!options) options = { allowOffline: false };
if(!ipfs) {
logger.error("IPFS instance not provided");
throw new Error("IPFS instance not provided");
}
const client = new OrbitDB(ipfs);
return client._connect(network, username, password, options.allowOffline)
.then(() => client)
const client = new OrbitDB(ipfs, options);
client.user = { username: username, id: username } // TODO: user id from ipfs hash
return Promise.resolve(client)
// return client._connect(host, username, password, options.allowOffline)
// .then(() => client)
}
}
module.exports = OrbitClientFactory;
module.exports = OrbitDB;

View File

@@ -1,63 +0,0 @@
'use strict';
const io = require('socket.io-client');
const logger = require('logplease').create("orbit-db.Pubsub");
class Pubsub {
constructor() {
this._socket = null;
this._subscriptions = {};
}
connect(host, port, username, password) {
return new Promise((resolve, reject) => {
if(!this._socket)
this._socket = io.connect(`http://${host}:${port}`, { 'forceNew': true });
this._socket.on('connect', resolve);
this._socket.on('connect_error', (err) => reject(new Error(`Connection refused to Pubsub at '${host}:${port}'`)));
this._socket.on('disconnect', (socket) => logger.warn(`Disconnected from Pubsub at 'http://${host}:${port}'`));
this._socket.on('error', (e) => logger.error('Pubsub socket error:', e));
this._socket.on('message', this._handleMessage.bind(this));
this._socket.on('subscribed', this._handleSubscribed.bind(this));
});
}
disconnect() {
if(this._socket)
this._socket.disconnect();
}
subscribe(hash, callback, onSubscribed, fetchHistory) {
if(!this._subscriptions[hash]) {
this._subscriptions[hash] = { callback: callback, history: fetchHistory, onSubscribed: onSubscribed };
this._socket.emit('subscribe', { channel: hash }); // calls back with 'subscribed' event
}
}
unsubscribe(hash) {
if(this._subscriptions[hash]) {
this._socket.emit('unsubscribe', { channel: hash });
delete this._subscriptions[hash];
}
}
publish(hash, message) {
if(this._subscriptions[hash])
this._socket.send(JSON.stringify({ channel: hash, message: message }));
}
_handleMessage(hash, message) {
const subscription = this._subscriptions[hash];
if(subscription && subscription.callback)
subscription.callback(hash, message);
}
_handleSubscribed(hash, message) {
const subscription = this._subscriptions[hash];
if(subscription && subscription.history && subscription.onSubscribed)
subscription.onSubscribed(hash, message)
}
}
module.exports = Pubsub;

38
src/cli/index.js Normal file
View File

@@ -0,0 +1,38 @@
'use strict'
const IpfsApi = require('ipfs-api')
const OrbitDB = require('../../src/OrbitDB')
// Usage: node index <username> <dbname>
const username = process.argv[2] || 'testrunner'
const dbname = process.argv[3] || 'testdb'
// const data = process.argv[4] || 'hello world ' + new Date().getTime()
console.log(">", username, dbname)
const ipfs = IpfsApi('localhost', '5002')
const id = username
const client = new OrbitDB(ipfs, id)
const db = client.eventlog(dbname)
setInterval(() => {
const data = {
value: "Hello at " + new Date().getTime(),
from: username
}
db.add(data).then((hash) => {
const result = db.iterator({ limit: 5 })
.collect()
.map((e) => {
return e.payload.value
})
console.log("RESULT:")
result.forEach((e) => {
console.log(JSON.stringify(e))
})
console.log("")
})
}, 1000)

View File

@@ -7,7 +7,7 @@ const assert = require('assert');
const async = require('asyncawait/async');
const await = require('asyncawait/await');
const ipfsd = require('ipfsd-ctl');
const IPFS = require('ipfs')
// const IPFS = require('ipfs')
const OrbitDB = require('../src/OrbitDB');
const OrbitServer = require('orbit-server/src/server');
@@ -21,40 +21,40 @@ const password = '';
let ipfs, ipfsDaemon;
const IpfsApis = [
{
// js-ipfs
name: 'js-ipfs',
start: () => {
return new Promise((resolve, reject) => {
const IPFS = require('ipfs')
const ipfs = new IPFS('/tmp/orbitdbtest');
resolve(ipfs);
// ipfs.goOnline((err) => {
// if(err) reject(err)
// resolve(ipfs)
// });
});
},
stop: () => Promise.resolve()
// stop: () => new Promise((resolve, reject) => ipfs.goOffline(resolve))
},
// {
// // js-ipfs
// name: 'js-ipfs',
// start: () => {
// return new Promise((resolve, reject) => {
// const IPFS = require('ipfs')
// const ipfs = new IPFS('/tmp/orbitdbtest');
// resolve(ipfs);
// // ipfs.goOnline((err) => {
// // if(err) reject(err)
// // resolve(ipfs)
// // });
// });
// },
// stop: () => Promise.resolve()
// // stop: () => new Promise((resolve, reject) => ipfs.goOffline(resolve))
// },
{
// js-ipfs-api via local daemon
name: 'js-ipfs-api',
start: () => {
return new Promise((resolve, reject) => {
ipfsd.disposableApi((err, ipfs) => {
if(err) reject(err);
resolve(ipfs);
});
// ipfsd.local((err, node) => {
// ipfsd.disposableApi((err, ipfs) => {
// if(err) reject(err);
// ipfsDaemon = node;
// ipfsDaemon.startDaemon((err, ipfs) => {
// if(err) reject(err);
// resolve(ipfs);
// });
// resolve(ipfs);
// });
ipfsd.local((err, node) => {
if(err) reject(err);
ipfsDaemon = node;
ipfsDaemon.startDaemon((err, ipfs) => {
if(err) reject(err);
resolve(ipfs);
});
});
});
},
stop: () => Promise.resolve()

View File

@@ -8,7 +8,7 @@ const rimraf = require('rimraf')
const OrbitDB = require('../src/OrbitDB');
const OrbitServer = require('orbit-server/src/server');
const ipfsd = require('ipfsd-ctl');
const IPFS = require('ipfs')
// const IPFS = require('ipfs')
// Mute logging
require('logplease').setLogLevel('ERROR');