Move events to Store from Oplog

This commit is contained in:
haad
2016-05-03 15:12:55 +02:00
parent 6427e57b2a
commit 5706090f35
7 changed files with 121 additions and 89 deletions

45
dist/orbitdb.min.js vendored
View File

@@ -153,9 +153,6 @@ var OrbitDB =
return logger.error(e.stack);
});
}
// TODO: FIX EVENTS!!
}, {
key: '_onWrite',
value: function _onWrite(dbname, hash) {
@@ -10737,7 +10734,7 @@ var OrbitDB =
var _this = this;
this.events.emit('load', this.dbname);
this._oplog = new OperationsLog(this._ipfs, this.dbname, this.events, this.options);
this._oplog = new OperationsLog(this._ipfs, this.dbname, this.options);
return this._oplog.load(id).then(function (merged) {
return _this._index.updateIndex(_this._oplog, merged);
}).then(function () {
@@ -10827,11 +10824,10 @@ var OrbitDB =
var Cache = __webpack_require__(239);
var OperationsLog = function () {
function OperationsLog(ipfs, dbname, events, opts) {
function OperationsLog(ipfs, dbname, opts) {
(0, _classCallCheck3.default)(this, OperationsLog);
this.dbname = dbname;
this.events = events;
this.options = opts || { cacheFile: null };
this._lastWrite = null;
this._ipfs = ipfs;
@@ -37538,6 +37534,10 @@ var OrbitDB =
var _stringify2 = _interopRequireDefault(_stringify);
var _promise = __webpack_require__(1);
var _promise2 = _interopRequireDefault(_promise);
var _classCallCheck2 = __webpack_require__(56);
var _classCallCheck3 = _interopRequireDefault(_classCallCheck2);
@@ -37565,8 +37565,15 @@ var OrbitDB =
(0, _createClass3.default)(Cache, null, [{
key: 'set',
value: function set(key, value) {
cache[key] = value;
if (filePath) fs.writeFileSync(filePath, (0, _stringify2.default)(cache, null, 2) + "\n");
return new _promise2.default(function (resolve, reject) {
cache[key] = value;
if (filePath) {
// fs.writeFileSync(filePath, JSON.stringify(cache, null, 2) + "\n");
fs.writeFile(filePath, (0, _stringify2.default)(cache, null, 2) + "\n", resolve);
} else {
resolve();
}
});
}
}, {
key: 'get',
@@ -37576,12 +37583,22 @@ var OrbitDB =
}, {
key: 'loadCache',
value: function loadCache(cacheFile) {
// filePath = cacheFile ? cacheFile : defaultFilepath;
if (cacheFile && fs.existsSync(cacheFile)) {
filePath = cacheFile;
logger.debug('Load cache from ' + cacheFile);
cache = JSON.parse(fs.readFileSync(cacheFile));
}
return new _promise2.default(function (resolve, reject) {
// filePath = cacheFile ? cacheFile : defaultFilepath;
if (cacheFile) {
fs.exists(cacheFile, function (err, res) {
if (res) {
filePath = cacheFile;
logger.debug('Load cache from ' + cacheFile);
cache = JSON.parse(fs.readFileSync(cacheFile));
} else {
resolve();
}
});
} else {
resolve();
}
});
}
}]);
return Cache;

View File

@@ -1,7 +1,5 @@
'use strict';
const await = require('asyncawait/await');
const async = require('asyncawait/async');
const ipfsd = require('ipfsd-ctl');
const OrbitDB = require('../src/OrbitDB');
const Timer = require('./Timer');
@@ -11,10 +9,8 @@ const Timer = require('./Timer');
// orbit-server
const host = process.argv[2] ? process.argv[2] : 'localhost'
const port = 3333;
const username = process.argv[3] ? process.argv[3] : 'testrunner';
const password = '';
const channelName = process.argv[4] ? process.argv[4] : 'c1';
const startIpfs = () => {
@@ -26,59 +22,47 @@ const startIpfs = () => {
});
};
let run = (async(() => {
try {
// Connect
const ipfs = await(startIpfs());
const orbit = await(OrbitDB.connect(host, port, username, password, ipfs));
const db = await(orbit.eventlog(channelName));
let run = (() => {
// Metrics
let totalQueries = 0;
let seconds = 0;
let queriesPerSecond = 0;
let lastTenSeconds = 0;
// Metrics output
setInterval(() => {
seconds ++;
if(seconds % 10 === 0) {
console.log(`--> Average of ${lastTenSeconds/10} q/s in the last 10 seconds`)
if(lastTenSeconds === 0)
throw new Error("Problems!");
lastTenSeconds = 0
}
console.log(`${queriesPerSecond} queries per second, ${totalQueries} queries in ${seconds} seconds`)
queriesPerSecond = 0;
}, 1000);
const query = async(() => {
// let timer = new Timer();
// timer.start();
try {
await(db.add(username + totalQueries));
// console.log(`${timer.stop(true)} ms`);
const queryLoop = (db) => {
db.add(username + totalQueries).then(() => {
totalQueries ++;
lastTenSeconds ++;
queriesPerSecond ++;
} catch(e) {
console.log(e);
}
process.nextTick(query);
});
process.nextTick(() => queryLoop(db));
});
};
query();
} catch(e) {
console.error("error:", e);
console.error(e.stack);
process.exit(1);
}
}))();
// Connect
console.log(`Connecting...`)
startIpfs()
.then((ipfs) => OrbitDB.connect(host, port, username, password, ipfs))
.then((orbit) => orbit.eventlog(channelName))
.then(queryLoop)
.then(() => {
// Metrics output
setInterval(() => {
seconds ++;
if(seconds % 10 === 0) {
console.log(`--> Average of ${lastTenSeconds/10} q/s in the last 10 seconds`)
if(lastTenSeconds === 0)
throw new Error("Problems!");
lastTenSeconds = 0
}
console.log(`${queriesPerSecond} queries per second, ${totalQueries} queries in ${seconds} seconds`)
queriesPerSecond = 0;
}, 1000);
})
.catch((e) => {
console.error("error:", e);
console.error(e.stack);
process.exit(1);
})
})();
module.exports = run;

View File

@@ -26609,9 +26609,6 @@
return logger.error(e.stack);
});
}
// TODO: FIX EVENTS!!
}, {
key: '_onWrite',
value: function _onWrite(dbname, hash) {
@@ -35550,7 +35547,7 @@
var _this = this;
this.events.emit('load', this.dbname);
this._oplog = new OperationsLog(this._ipfs, this.dbname, this.events, this.options);
this._oplog = new OperationsLog(this._ipfs, this.dbname, this.options);
return this._oplog.load(id).then(function (merged) {
return _this._index.updateIndex(_this._oplog, merged);
}).then(function () {
@@ -35640,11 +35637,10 @@
var Cache = __webpack_require__(475);
var OperationsLog = function () {
function OperationsLog(ipfs, dbname, events, opts) {
function OperationsLog(ipfs, dbname, opts) {
(0, _classCallCheck3.default)(this, OperationsLog);
this.dbname = dbname;
this.events = events;
this.options = opts || { cacheFile: null };
this._lastWrite = null;
this._ipfs = ipfs;
@@ -62291,6 +62287,10 @@
var _stringify2 = _interopRequireDefault(_stringify);
var _promise = __webpack_require__(297);
var _promise2 = _interopRequireDefault(_promise);
var _classCallCheck2 = __webpack_require__(327);
var _classCallCheck3 = _interopRequireDefault(_classCallCheck2);
@@ -62318,8 +62318,15 @@
(0, _createClass3.default)(Cache, null, [{
key: 'set',
value: function set(key, value) {
cache[key] = value;
if (filePath) fs.writeFileSync(filePath, (0, _stringify2.default)(cache, null, 2) + "\n");
return new _promise2.default(function (resolve, reject) {
cache[key] = value;
if (filePath) {
// fs.writeFileSync(filePath, JSON.stringify(cache, null, 2) + "\n");
fs.writeFile(filePath, (0, _stringify2.default)(cache, null, 2) + "\n", resolve);
} else {
resolve();
}
});
}
}, {
key: 'get',
@@ -62329,12 +62336,22 @@
}, {
key: 'loadCache',
value: function loadCache(cacheFile) {
// filePath = cacheFile ? cacheFile : defaultFilepath;
if (cacheFile && fs.existsSync(cacheFile)) {
filePath = cacheFile;
logger.debug('Load cache from ' + cacheFile);
cache = JSON.parse(fs.readFileSync(cacheFile));
}
return new _promise2.default(function (resolve, reject) {
// filePath = cacheFile ? cacheFile : defaultFilepath;
if (cacheFile) {
fs.exists(cacheFile, function (err, res) {
if (res) {
filePath = cacheFile;
logger.debug('Load cache from ' + cacheFile);
cache = JSON.parse(fs.readFileSync(cacheFile));
} else {
resolve();
}
});
} else {
resolve();
}
});
}
}]);
return Cache;

View File

@@ -69,7 +69,6 @@ class OrbitDB {
store.sync(message).catch((e) => logger.error(e.stack));
}
// TODO: FIX EVENTS!!
_onWrite(dbname, hash) {
this._pubsub.publish(dbname, hash);
this.events.emit('data', dbname, hash);

View File

@@ -11,9 +11,15 @@ let cache = {};
class Cache {
static set(key, value) {
cache[key] = value;
if(filePath)
fs.writeFileSync(filePath, JSON.stringify(cache, null, 2) + "\n");
return new Promise((resolve, reject) => {
cache[key] = value;
if(filePath) {
// fs.writeFileSync(filePath, JSON.stringify(cache, null, 2) + "\n");
fs.writeFile(filePath, JSON.stringify(cache, null, 2) + "\n", resolve);
} else {
resolve();
}
})
}
static get(key) {
@@ -21,12 +27,22 @@ class Cache {
}
static loadCache(cacheFile) {
// filePath = cacheFile ? cacheFile : defaultFilepath;
if(cacheFile && fs.existsSync(cacheFile)) {
filePath = cacheFile;
logger.debug('Load cache from ' + cacheFile);
cache = JSON.parse(fs.readFileSync(cacheFile));
}
return new Promise((resolve, reject) => {
// filePath = cacheFile ? cacheFile : defaultFilepath;
if(cacheFile) {
fs.exists(cacheFile, (err, res) => {
if(res) {
filePath = cacheFile;
logger.debug('Load cache from ' + cacheFile);
cache = JSON.parse(fs.readFileSync(cacheFile));
} else {
resolve();
}
});
} else {
resolve();
}
});
}
}

View File

@@ -4,9 +4,8 @@ const Log = require('ipfs-log');
const Cache = require('./Cache');
class OperationsLog {
constructor(ipfs, dbname, events, opts) {
constructor(ipfs, dbname, opts) {
this.dbname = dbname;
this.events = events;
this.options = opts || { cacheFile: null };
this._lastWrite = null;
this._ipfs = ipfs;

View File

@@ -16,7 +16,7 @@ class Store {
use(id) {
this.events.emit('load', this.dbname);
this._oplog = new OperationsLog(this._ipfs, this.dbname, this.events, this.options);
this._oplog = new OperationsLog(this._ipfs, this.dbname, this.options);
return this._oplog.load(id)
.then((merged) => this._index.updateIndex(this._oplog, merged))
.then(() => this.events.emit('readable', this.dbname))