From 3211fb86bbc9c0a1fb395ea86a83eb769d834cd9 Mon Sep 17 00:00:00 2001 From: haad Date: Mon, 12 Sep 2016 16:19:53 +0200 Subject: [PATCH] IPFS Pubsub (WIP) --- examples/benchmark.js | 68 +++++++++++++++++-------------------- examples/eventlog-reader.js | 6 ++-- package.json | 8 ++--- src/OrbitDB.js | 39 +++++++++++---------- src/PubSub.js | 63 ---------------------------------- src/cli/index.js | 38 +++++++++++++++++++++ test/client.test.js | 56 +++++++++++++++--------------- test/counterdb.test.js | 2 +- 8 files changed, 124 insertions(+), 156 deletions(-) delete mode 100644 src/PubSub.js create mode 100644 src/cli/index.js diff --git a/examples/benchmark.js b/examples/benchmark.js index eadc4da..b30df9a 100644 --- a/examples/benchmark.js +++ b/examples/benchmark.js @@ -1,8 +1,9 @@ 'use strict'; // const ipfsd = require('ipfsd-ctl'); -const IPFS = require('ipfs') -const ipfsd = require('ipfsd-ctl'); +// const IPFS = require('ipfs') +// const ipfsd = require('ipfsd-ctl'); +const IpfsApi = require('ipfs-api') const OrbitDB = require('../src/OrbitDB'); const Timer = require('./Timer'); @@ -16,17 +17,17 @@ const channelName = process.argv[3] ? process.argv[3] : 'c1'; const startIpfs = () => { return new Promise((resolve, reject) => { - ipfsd.disposableApi((err, ipfs) => { - if(err) console.error(err); - resolve(ipfs); - }); - // ipfsd.local((err, node) => { - // if(err) reject(err); - // node.startDaemon((err, ipfs) => { - // if(err) reject(err); - // resolve(ipfs); - // }); + // ipfsd.disposableApi((err, ipfs) => { + // if(err) console.error(err); + // resolve(ipfs); // }); + ipfsd.local((err, node) => { + if(err) reject(err); + node.startDaemon((err, ipfs) => { + if(err) reject(err); + resolve(ipfs); + }); + }); // const ipfs = new IPFS('/tmp/benchmark') // ipfs.goOnline(() => { // resolve(ipfs) @@ -41,7 +42,6 @@ let totalQueries = 0; let seconds = 0; let queriesPerSecond = 0; let lastTenSeconds = 0; -let store; const queryLoop = (db) => { // let timer = new Timer(); @@ -52,38 +52,32 @@ const queryLoop = (db) => { totalQueries ++; lastTenSeconds ++; queriesPerSecond ++; - process.nextTick(() => queryLoop(db)); + process.nextTick(() => queryLoop(db)) }); }; let run = (() => { // Connect console.log(`Connecting...`) - startIpfs() - .then((ipfs) => OrbitDB.connect(network, username, password, ipfs)) - .then((orbit) => orbit.eventlog(channelName)) - .then((db) => { + const ipfs = IpfsApi('localhost', '5002') + const orbit = new OrbitDB(ipfs, 'benchmark') + const db = orbit.eventlog(channelName) - queryLoop(db); + // Metrics output + setInterval(() => { + seconds ++; + if(seconds % 10 === 0) { + console.log(`--> Average of ${lastTenSeconds/10} q/s in the last 10 seconds`); + if(lastTenSeconds === 0) + throw new Error("Problems!"); + lastTenSeconds = 0; + } + console.log(`${queriesPerSecond} queries per second, ${totalQueries} queries in ${seconds} seconds`); + queriesPerSecond = 0; + }, 1000); - // Metrics output - setInterval(() => { - seconds ++; - if(seconds % 10 === 0) { - console.log(`--> Average of ${lastTenSeconds/10} q/s in the last 10 seconds`); - if(lastTenSeconds === 0) - throw new Error("Problems!"); - lastTenSeconds = 0; - } - console.log(`${queriesPerSecond} queries per second, ${totalQueries} queries in ${seconds} seconds`); - queriesPerSecond = 0; - }, 1000); - }) - .catch((e) => { - console.error("error:", e); - console.error(e.stack); - process.exit(1); - }) + // Start + queryLoop(db); })(); module.exports = run; diff --git a/examples/eventlog-reader.js b/examples/eventlog-reader.js index 7df894a..7b1cff1 100644 --- a/examples/eventlog-reader.js +++ b/examples/eventlog-reader.js @@ -9,7 +9,7 @@ const Timer = require('./Timer'); // usage: reader.js // orbit-server -const network = 'QmYPobvobKsyoCKTw476yTui611XABf927KxUPCf4gRLRr'; // 'localhost:3333' +const network = '178.62.241.75:3333'; // 'localhost:3333' const username = process.argv[2] ? process.argv[2] : 'testrunner'; const password = ''; const channelName = process.argv[3] ? process.argv[3] : 'c2'; @@ -28,7 +28,7 @@ let run = (async(() => { try { const ipfs = await(startIpfs()); const orbit = await(OrbitDB.connect(network, username, password, ipfs)); - const db = await(orbit.eventlog(channelName)); + const db = orbit.eventlog(channelName); let count = 1; let running = false; @@ -42,7 +42,7 @@ let run = (async(() => { console.log("---------------------------------------------------") console.log("Timestamp | Value") console.log("---------------------------------------------------") - console.log(items.map((e) => `${e.meta.ts} | ${e.value}`).join("\n")); + console.log(items.map((e) => `${e.payload.meta.ts} | ${e.payload.value}`).join("\n")); console.log("---------------------------------------------------") console.log(`Query #${count} took ${timer2.stop(true)} ms\n`); diff --git a/package.json b/package.json index 2ac3616..210581b 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,7 @@ "url": "https://github.com/haadcode/orbit-db" }, "engines": { - "node": "^4.x.x" + "node": "^6.x.x" }, "main": "src/OrbitDB.js", "dependencies": { @@ -18,7 +18,7 @@ "orbit-db-eventstore": "0.1.1", "orbit-db-feedstore": "0.1.1", "orbit-db-kvstore": "0.1.1", - "socket.io-client": "^1.4.5" + "orbit-db-pubsub": "0.0.1" }, "devDependencies": { "asyncawait": "^1.0.6", @@ -27,8 +27,8 @@ "babel-plugin-transform-runtime": "^6.8.0", "babel-preset-es2015": "^6.6.0", "exports-loader": "^0.6.3", - "ipfs": "^0.13.0", - "ipfs-api": "^6.0.3", + "ipfs": "^0.15.0", + "ipfs-api": "https://github.com/haadcode/js-ipfs-api.git", "ipfsd-ctl": "^0.14.0", "json-loader": "^0.5.4", "lodash": "^4.3.0", diff --git a/src/OrbitDB.js b/src/OrbitDB.js index 7f32dde..8606605 100644 --- a/src/OrbitDB.js +++ b/src/OrbitDB.js @@ -7,14 +7,14 @@ const EventStore = require('orbit-db-eventstore'); const FeedStore = require('orbit-db-feedstore'); const KeyValueStore = require('orbit-db-kvstore'); const CounterStore = require('orbit-db-counterstore'); -const PubSub = require('./PubSub'); +const Pubsub = require('orbit-db-pubsub'); const Cache = require('./Cache'); class OrbitDB { - constructor(ipfs) { + constructor(ipfs, id, options) { this._ipfs = ipfs; - this._pubsub = null; - this.user = null; + this._pubsub = options && options.broker ? new options.broker(ipfs) : new Pubsub(ipfs) + this.user = { id: id } this.network = null; this.events = new EventEmitter(); this.stores = {}; @@ -50,17 +50,16 @@ class OrbitDB { this.network = null; } - _createStore(Store, dbname, options) { - if(!options) options = {}; - const replicate = options.subscribe !== undefined ? options.subscribe : true; - const store = new Store(this._ipfs, this.user.username, dbname, options); + _createStore(Store, dbname, options = { subscribe: true }) { + // if(!options) options = {}; + // const replicate = options.subscribe !== undefined ? options.subscribe : true; + const store = new Store(this._ipfs, this.user.id, dbname, options); this.stores[dbname] = store; - return this._subscribe(store, dbname, replicate, options); + return this._subscribe(store, dbname, options.subscribe, options); } - _subscribe(store, dbname, subscribe, options) { - if(subscribe === undefined) subscribe = true - + _subscribe(store, dbname, subscribe = true, options) { + // if(subscribe === undefined) subscribe = true store.events.on('data', this._onData.bind(this)) store.events.on('write', this._onWrite.bind(this)) store.events.on('close', this._onClose.bind(this)) @@ -139,7 +138,6 @@ class OrbitDB { port = this.network.publishers[0].split(":")[1]; }) .then(() => { - this._pubsub = new PubSub(); logger.debug(`Connecting to network ${hash} (${host}:${port})`); return this._pubsub.connect(host, port, username, password) }) @@ -162,18 +160,19 @@ class OrbitDB { } class OrbitClientFactory { - static connect(network, username, password, ipfs, options) { - if(!options) options = { allowOffline: false }; - + static connect(host, username, password, ipfs, options = { allowOffline: false }) { + // if(!options) options = { allowOffline: false }; if(!ipfs) { logger.error("IPFS instance not provided"); throw new Error("IPFS instance not provided"); } - const client = new OrbitDB(ipfs); - return client._connect(network, username, password, options.allowOffline) - .then(() => client) + const client = new OrbitDB(ipfs, options); + client.user = { username: username, id: username } // TODO: user id from ipfs hash + return Promise.resolve(client) + // return client._connect(host, username, password, options.allowOffline) + // .then(() => client) } } -module.exports = OrbitClientFactory; +module.exports = OrbitDB; diff --git a/src/PubSub.js b/src/PubSub.js deleted file mode 100644 index 5a5efce..0000000 --- a/src/PubSub.js +++ /dev/null @@ -1,63 +0,0 @@ -'use strict'; - -const io = require('socket.io-client'); -const logger = require('logplease').create("orbit-db.Pubsub"); - -class Pubsub { - constructor() { - this._socket = null; - this._subscriptions = {}; - } - - connect(host, port, username, password) { - return new Promise((resolve, reject) => { - if(!this._socket) - this._socket = io.connect(`http://${host}:${port}`, { 'forceNew': true }); - - this._socket.on('connect', resolve); - this._socket.on('connect_error', (err) => reject(new Error(`Connection refused to Pubsub at '${host}:${port}'`))); - this._socket.on('disconnect', (socket) => logger.warn(`Disconnected from Pubsub at 'http://${host}:${port}'`)); - this._socket.on('error', (e) => logger.error('Pubsub socket error:', e)); - this._socket.on('message', this._handleMessage.bind(this)); - this._socket.on('subscribed', this._handleSubscribed.bind(this)); - }); - } - - disconnect() { - if(this._socket) - this._socket.disconnect(); - } - - subscribe(hash, callback, onSubscribed, fetchHistory) { - if(!this._subscriptions[hash]) { - this._subscriptions[hash] = { callback: callback, history: fetchHistory, onSubscribed: onSubscribed }; - this._socket.emit('subscribe', { channel: hash }); // calls back with 'subscribed' event - } - } - - unsubscribe(hash) { - if(this._subscriptions[hash]) { - this._socket.emit('unsubscribe', { channel: hash }); - delete this._subscriptions[hash]; - } - } - - publish(hash, message) { - if(this._subscriptions[hash]) - this._socket.send(JSON.stringify({ channel: hash, message: message })); - } - - _handleMessage(hash, message) { - const subscription = this._subscriptions[hash]; - if(subscription && subscription.callback) - subscription.callback(hash, message); - } - - _handleSubscribed(hash, message) { - const subscription = this._subscriptions[hash]; - if(subscription && subscription.history && subscription.onSubscribed) - subscription.onSubscribed(hash, message) - } -} - -module.exports = Pubsub; diff --git a/src/cli/index.js b/src/cli/index.js new file mode 100644 index 0000000..05cb8be --- /dev/null +++ b/src/cli/index.js @@ -0,0 +1,38 @@ +'use strict' + +const IpfsApi = require('ipfs-api') +const OrbitDB = require('../../src/OrbitDB') + +// Usage: node index + +const username = process.argv[2] || 'testrunner' +const dbname = process.argv[3] || 'testdb' +// const data = process.argv[4] || 'hello world ' + new Date().getTime() + +console.log(">", username, dbname) + +const ipfs = IpfsApi('localhost', '5002') +const id = username +const client = new OrbitDB(ipfs, id) +const db = client.eventlog(dbname) + +setInterval(() => { + const data = { + value: "Hello at " + new Date().getTime(), + from: username + } + + db.add(data).then((hash) => { + const result = db.iterator({ limit: 5 }) + .collect() + .map((e) => { + return e.payload.value + }) + + console.log("RESULT:") + result.forEach((e) => { + console.log(JSON.stringify(e)) + }) + console.log("") + }) +}, 1000) diff --git a/test/client.test.js b/test/client.test.js index 2c3a5f4..812cddd 100644 --- a/test/client.test.js +++ b/test/client.test.js @@ -7,7 +7,7 @@ const assert = require('assert'); const async = require('asyncawait/async'); const await = require('asyncawait/await'); const ipfsd = require('ipfsd-ctl'); -const IPFS = require('ipfs') +// const IPFS = require('ipfs') const OrbitDB = require('../src/OrbitDB'); const OrbitServer = require('orbit-server/src/server'); @@ -21,40 +21,40 @@ const password = ''; let ipfs, ipfsDaemon; const IpfsApis = [ -{ - // js-ipfs - name: 'js-ipfs', - start: () => { - return new Promise((resolve, reject) => { - const IPFS = require('ipfs') - const ipfs = new IPFS('/tmp/orbitdbtest'); - resolve(ipfs); - // ipfs.goOnline((err) => { - // if(err) reject(err) - // resolve(ipfs) - // }); - }); - }, - stop: () => Promise.resolve() - // stop: () => new Promise((resolve, reject) => ipfs.goOffline(resolve)) -}, +// { +// // js-ipfs +// name: 'js-ipfs', +// start: () => { +// return new Promise((resolve, reject) => { +// const IPFS = require('ipfs') +// const ipfs = new IPFS('/tmp/orbitdbtest'); +// resolve(ipfs); +// // ipfs.goOnline((err) => { +// // if(err) reject(err) +// // resolve(ipfs) +// // }); +// }); +// }, +// stop: () => Promise.resolve() +// // stop: () => new Promise((resolve, reject) => ipfs.goOffline(resolve)) +// }, { // js-ipfs-api via local daemon name: 'js-ipfs-api', start: () => { return new Promise((resolve, reject) => { - ipfsd.disposableApi((err, ipfs) => { - if(err) reject(err); - resolve(ipfs); - }); - // ipfsd.local((err, node) => { + // ipfsd.disposableApi((err, ipfs) => { // if(err) reject(err); - // ipfsDaemon = node; - // ipfsDaemon.startDaemon((err, ipfs) => { - // if(err) reject(err); - // resolve(ipfs); - // }); + // resolve(ipfs); // }); + ipfsd.local((err, node) => { + if(err) reject(err); + ipfsDaemon = node; + ipfsDaemon.startDaemon((err, ipfs) => { + if(err) reject(err); + resolve(ipfs); + }); + }); }); }, stop: () => Promise.resolve() diff --git a/test/counterdb.test.js b/test/counterdb.test.js index 6f95960..c8d6955 100644 --- a/test/counterdb.test.js +++ b/test/counterdb.test.js @@ -8,7 +8,7 @@ const rimraf = require('rimraf') const OrbitDB = require('../src/OrbitDB'); const OrbitServer = require('orbit-server/src/server'); const ipfsd = require('ipfsd-ctl'); -const IPFS = require('ipfs') +// const IPFS = require('ipfs') // Mute logging require('logplease').setLogLevel('ERROR');