Fix a perf bottleneck by commiting lists in batches. 100 tests!

This commit is contained in:
haad 2016-02-22 11:33:38 +02:00
parent 8caf8cbb9a
commit 4c0e8f6a9e
8 changed files with 91 additions and 48 deletions

View File

@ -22,25 +22,28 @@ let run = (async(() => {
let id = 'Log: Query ' let id = 'Log: Query '
let running = false; let running = false;
setInterval(async(() => { // setInterval(async(() => {
if(!running) { // if(!running) {
while(true) {
running = true; running = true;
// let timer = new Timer(true); const key = "Lamb";
channel.put("lamb", "of god" + count); let timer = new Timer(true);
// console.log(`Query #${count} took ${timer.stop(true)} ms\n`); channel.put(key, "Of God " + count);
let v = channel.get("lamb"); let v = channel.get(key);
console.log(`Query #${count} took ${timer.stop(true)} ms\n`);
console.log("---------------------------------------------------") console.log("---------------------------------------------------")
console.log("Id | Seq | Ver | Data") console.log("Key | Value")
console.log("---------------------------------------------------") console.log("---------------------------------------------------")
console.log(v); console.log(`${key} | ${v}`);
console.log("---------------------------------------------------") console.log("---------------------------------------------------")
running = false; running = false;
count ++; count ++;
} }
}), 500); // }
// }), 500);
} catch(e) { } catch(e) {
console.error("error:", e); console.error("error:", e);

View File

@ -7,6 +7,8 @@ const ipfsAPI = require('orbit-common/lib/ipfs-api-promised');
const OrbitList = require('./list/OrbitList'); const OrbitList = require('./list/OrbitList');
const HashCacheOps = require('./HashCacheOps'); const HashCacheOps = require('./HashCacheOps');
var Timer = require('../examples/Timer');
const DefaultAmount = 1; const DefaultAmount = 1;
class DataStore { class DataStore {
@ -16,7 +18,7 @@ class DataStore {
} }
add(hash) { add(hash) {
this.list.add(hash); return this.list.add(hash);
} }
join(other) { join(other) {

View File

@ -124,12 +124,9 @@ class OrbitClient {
} }
_createMessage(channel, password, operation, key, value) { _createMessage(channel, password, operation, key, value) {
// Create meta info
const size = -1; const size = -1;
const metaInfo = new MetaInfo(ItemTypes.Message, size, this.user.id, new Date().getTime()); const meta = new MetaInfo(ItemTypes.Message, size, this.user.id, new Date().getTime());
// Create the hash cache item const item = new OrbitDBItem(operation, key, value, meta);
const item = new OrbitDBItem(operation, key, value, metaInfo);
// Save the item to ipfs
const data = await (ipfsAPI.putObject(this._ipfs, JSON.stringify(item))); const data = await (ipfsAPI.putObject(this._ipfs, JSON.stringify(item)));
return data.Hash; return data.Hash;
} }
@ -151,11 +148,12 @@ class OrbitClient {
} }
_createOperation(channel, password, operation, key, value, data) { _createOperation(channel, password, operation, key, value, data) {
let hash = this._createMessage(channel, password, operation, key, value); const hash = this._createMessage(channel, password, operation, key, value);
this._store.add(hash); const res = await(this._store.add(hash));
const listHash = await(this._store.list.getIpfsHash()); const listHash = await(this._store.list.getIpfsHash());
await(this._pubsub.publish(channel, listHash)); await(this._pubsub.publish(channel, listHash));
return key; return key;
// return res;
} }
_deleteChannel(channel, password) { _deleteChannel(channel, password) {

View File

@ -24,13 +24,8 @@ class List {
} }
join(other) { join(other) {
if(other.seq && other.seq > this.seq) { this.seq = (other.seq && other.seq > this.seq ? other.seq : this.seq) + 1;
this.seq = other.seq + 1; this.ver = 0;
this.ver = 0;
} else {
this.seq = this.seq + 1;
this.ver = 0;
}
const current = _.differenceWith(this._currentBatch, this._items, this._equals); const current = _.differenceWith(this._currentBatch, this._items, this._equals);
const others = _.differenceWith(other.items, this._items, this._equals); const others = _.differenceWith(other.items, this._items, this._equals);
const final = _.unionWith(current, others, this._equals); const final = _.unionWith(current, others, this._equals);

View File

@ -7,6 +7,8 @@ const ipfsAPI = require('orbit-common/lib/ipfs-api-promised');
const List = require('./List'); const List = require('./List');
const Node = require('./OrbitNode'); const Node = require('./OrbitNode');
const MaxBatchSize = 200;
class OrbitList extends List { class OrbitList extends List {
constructor(id, ipfs) { constructor(id, ipfs) {
super(id); super(id);
@ -19,6 +21,11 @@ class OrbitList extends List {
const node = new Node(this._ipfs, this.id, this.seq, this.ver, data, heads); const node = new Node(this._ipfs, this.id, this.seq, this.ver, data, heads);
this._currentBatch.push(node); this._currentBatch.push(node);
this.ver ++; this.ver ++;
if(this.ver >= MaxBatchSize)
this._commit();
return node.ipfsHash;
} }
clear() { clear() {
@ -27,27 +34,36 @@ class OrbitList extends List {
} }
getIpfsHash() { getIpfsHash() {
return new Promise(async((resolve, reject) => { const list = await(ipfsAPI.putObject(this._ipfs, JSON.stringify(this.toJson())));
const list = await(ipfsAPI.putObject(this._ipfs, JSON.stringify(this.toJson()))); return list.Hash;
resolve(list.Hash);
}));
} }
static fromIpfsHash(ipfs, hash) { static fromIpfsHash(ipfs, hash) {
return new Promise(async((resolve, reject) => { const l = await(ipfsAPI.getObject(ipfs, hash));
const l = await(ipfsAPI.getObject(ipfs, hash)); const list = OrbitList.fromJson(ipfs, JSON.parse(l.Data));
const list = OrbitList.fromJson(ipfs, JSON.parse(l.Data)); return list;
resolve(list);
}));
} }
static fromJson(ipfs, json) { static fromJson(ipfs, json) {
let list = new List(json.id); let list = new List(json.id);
list.seq = json.seq; list.seq = json.seq;
list.ver = json.ver; list.ver = json.ver;
list._items = _.uniqWith(json.items.map((f) => new Node(ipfs, f.id, f.seq, f.ver, f.data, f.next)), _.isEqual); // list._items = _.uniqWith(json.items.map((f) => new Node(ipfs, f.id, f.seq, f.ver, f.data, f.next)), _.isEqual);
list._items = json.items.map((f) => new Node(ipfs, f.id, f.seq, f.ver, f.data, f.next));
return list; return list;
} }
static get batchSize() {
return MaxBatchSize;
}
_commit() {
const current = _.differenceWith(this._currentBatch, this._items, this._equals);
this._items = this._items.concat(current);
this._currentBatch = [];
this.ver = 0;
this.seq ++;
}
} }
module.exports = OrbitList; module.exports = OrbitList;

View File

@ -21,19 +21,23 @@ class OrbitNode extends Node {
return "" + this.id + "." + this.seq + "." + this.ver + "." + this.hash; return "" + this.id + "." + this.seq + "." + this.ver + "." + this.hash;
} }
get ipfsHash() {
if(!this.hash) {
const t = this.compact();
const r = await(ipfsAPI.putObject(this._ipfs, JSON.stringify(t)));
this.hash = r.Hash;
}
return this.hash;
}
getPayload() { getPayload() {
if(!this.Payload) { if(!this.Payload) {
return new Promise(async((resolve, reject) => { const payload = await(ipfsAPI.getObject(this._ipfs, this.data));
const payload = await(ipfsAPI.getObject(this._ipfs, this.data)); this.Payload = JSON.parse(payload.Data);
this.Payload = JSON.parse(payload.Data); if(this.Payload.value) {
if(this.Payload.value) { const value = await(ipfsAPI.getObject(this._ipfs, this.Payload.value));
const value = await(ipfsAPI.getObject(this._ipfs, this.Payload.value)); this.Payload.value = JSON.parse(value.Data)["content"];
this.Payload.value = JSON.parse(value.Data)["content"]; }
}
resolve(this);
}));
} else {
return this;
} }
} }

View File

@ -70,6 +70,7 @@ describe('Orbit Client', () => {
it('adds five items', async((done) => { it('adds five items', async((done) => {
for(let i = 0; i < 5; i ++) { for(let i = 0; i < 5; i ++) {
let hash = db.add('hello'); let hash = db.add('hello');
// console.log(hash)
assert.notEqual(hash, null); assert.notEqual(hash, null);
assert.equal(hash.startsWith('Qm'), true); assert.equal(hash.startsWith('Qm'), true);
assert.equal(hash.length, 46); assert.equal(hash.length, 46);
@ -78,7 +79,7 @@ describe('Orbit Client', () => {
})); }));
it('adds an item that is > 256 bytes', async((done) => { it('adds an item that is > 256 bytes', async((done) => {
let msg = new Buffer(512); let msg = new Buffer(1024);
msg.fill('a') msg.fill('a')
const hash = db.add(msg.toString()); const hash = db.add(msg.toString());
assert.notEqual(hash, null); assert.notEqual(hash, null);

View File

@ -231,6 +231,30 @@ describe('OrbitList', async(function() {
done(); done();
})); }));
it('commits a list after batch size was reached', async((done) => {
const list = new List('A', ipfs);
for(let i = 1; i <= List.batchSize; i ++) {
list.add("hello" + i);
}
assert.equal(list.id, 'A');
assert.equal(list.seq, 1);
assert.equal(list.ver, 0);
assert.equal(list.items.length, List.batchSize);
assert.equal(list._currentBatch.length, 0);
assert.equal(list._items.length, List.batchSize);
const item = list.items[list.items.length - 1];
assert.equal(item.id, 'A');
assert.equal(item.seq, 0);
assert.equal(item.ver, List.batchSize - 1);
assert.equal(item.data, 'hello' + List.batchSize);
assert.equal(item.next, 'A.0.198.QmRKrcfkejCvxTxApZACjHpxzAKKGnCtFi2rD31CT7RkBS');
done();
}));
}); });
describe('join', () => { describe('join', () => {