Move LWW query away from OrbitList to OrbitDB. OrbitList is now just a transport layer.

This commit is contained in:
haad 2016-03-05 10:10:46 +01:00
parent 9c1e811dfe
commit db7e799a87
6 changed files with 117 additions and 121 deletions

View File

@ -38,7 +38,7 @@ let run = (async(() => {
console.log("---------------------------------------------------")
console.log("Key | Value")
console.log("---------------------------------------------------")
console.log(items.map((e) => `${e.payload.key} | ${e.payload.value}`).join("\n"));
console.log(items.map((e) => `${e.key} | ${e.value}`).join("\n"));
console.log("---------------------------------------------------")
console.log(`Query 2 #${count} took ${timer2.stop(true)} ms\n`);

View File

@ -38,7 +38,7 @@ class OrbitClient {
put: (key, data) => this.db.put(channel, password, key, data),
get: (key, options) => {
const items = this._iterator(channel, password, { key: key }).collect();
return items[0] ? items[0].payload.value : null;
return items[0] ? items[0].value : null;
},
leave: () => this._pubsub.unsubscribe(channel)
}

View File

@ -1,5 +1,6 @@
'use strict';
const Lazy = require('lazy.js');
const EventEmitter = require('events').EventEmitter;
const async = require('asyncawait/async');
const await = require('asyncawait/await');
@ -33,12 +34,27 @@ class OrbitDB {
}
/* DB Operations */
read(channel, password, options) {
let opts = options || {};
Object.assign(opts, { amount: opts.limit || 1 });
let messages = await(this._logs[channel].find(opts));
if(opts.reverse) messages.reverse();
return messages;
read(channel, password, opts) {
if(!opts) opts = {};
const operations = Lazy(this._logs[channel].items);
const amount = opts.limit ? (opts.limit > -1 ? opts.limit : this._logs[channel].items.length) : 1;
let result = [];
if(opts.key) {
// Key-Value, search latest key first
result = this._query(operations.reverse(), opts.key, 1, true);
} else if(opts.gt || opts.gte) {
// Greater than case
result = this._query(operations, opts.gt ? opts.gt : opts.gte, amount, opts.gte || opts.lte);
} else {
// Lower than and lastN case, search latest first by reversing the sequence
result = this._query(operations.reverse(), opts.lt ? opts.lt : opts.lte, amount, opts.lte || !opts.lt).reverse();
}
if(opts.reverse) result.reverse();
return result.toArray();
}
add(channel, password, data) {
@ -62,6 +78,28 @@ class OrbitDB {
}
/* Private methods */
// The LWW-set
_query(sequence, key, amount, inclusive) {
// Last-Write-Wins, ie. use only the first occurance of the key
let handled = [];
const _createLWWSet = (item) => {
const wasHandled = Lazy(handled).indexOf(item.key) > -1;
if(!wasHandled) handled.push(item.key);
if(Operations.isUpdate(item.op) && !wasHandled) return item;
return null;
};
// Find an items from the sequence (list of operations)
return sequence
.map((f) => await(f.fetchPayload())) // IO - fetch the actual OP from ipfs. consider merging with LL.
.skipWhile((f) => key && f.key !== key) // Drop elements until we have the first one requested
.drop(!inclusive ? 1 : 0) // Drop the 'gt/lt' item, include 'gte/lte' item
.map(_createLWWSet) // Return items as LWW (ignore values after the first found)
.filter((f) => f !== null) // Make sure we don't have empty ones
.take(amount)
}
_createOperation(channel, password, operation, key, value, data) {
var createOperation = async(() => {
return new Promise(async((resolve, reject) => {

View File

@ -6,8 +6,7 @@ const async = require('asyncawait/async');
const await = require('asyncawait/await');
const ipfsAPI = require('orbit-common/lib/ipfs-api-promised');
const List = require('./List');
const Node = require('./OrbitNode');
const Operations = require('./Operations');
const OrbitNode = require('./OrbitNode');
const MaxBatchSize = 10; // How many items per sequence. Saves a snapshot to ipfs in batches of this many items.
const MaxHistory = 1000; // How many items to fetch in the chain per join
@ -24,7 +23,7 @@ class OrbitList extends List {
this._commit();
const heads = List.findHeads(this.items);
const node = new Node(this._ipfs, this.id, this.seq, this.ver, data, heads);
const node = new OrbitNode(this._ipfs, this.id, this.seq, this.ver, data, heads);
this._currentBatch.push(node);
this.ver ++;
}
@ -34,51 +33,13 @@ class OrbitList extends List {
this._fetchHistory(other.items);
}
// The LWW-set query interface
find(opts) {
let list = Lazy(this.items);
const hash = (opts.gt ? opts.gt : (opts.gte ? opts.gte : (opts.lt ? opts.lt : opts.lte)));
const amount = opts.amount ? (opts.amount && opts.amount > -1 ? opts.amount : this.items.length) : 1;
// Last-Write-Wins set
let handled = [];
const _createLWWSet = (f) => {
const wasHandled = Lazy(handled).indexOf(f.payload.key) > -1;
if(!wasHandled) handled.push(f.payload.key);
if(Operations.isUpdate(f.payload.op) && !wasHandled) return f;
return null;
};
// Find an item from the lazy sequence (list)
const _findFrom = (sequence, key, amount, inclusive) => {
return sequence
.map((f) => await(f.fetchPayload())) // IO - fetch the actual OP from ipfs. consider merging with LL.
.skipWhile((f) => key && f.payload.key !== key) // Drop elements until we have the first one requested
.drop(!inclusive ? 1 : 0) // Drop the 'gt/lt' item, include 'gte/lte' item
.map(_createLWWSet) // Return items as LWW (ignore values after the first found)
.filter((f) => f !== null) // Make sure we don't have empty ones
.take(amount)
};
// Key-Value
if(opts.key)
return _findFrom(list.reverse(), opts.key, 1, true).toArray();
// Greater than case
if(opts.gt || opts.gte)
return _findFrom(list, hash, amount, opts.gte || opts.lte).toArray();
// Lower than and lastN case, search latest first by reversing the sequence
return _findFrom(list.reverse(), hash, amount, opts.lte || !opts.lt).reverse().toArray();
}
/* Private methods */
_fetchHistory(items) {
let allHashes = this._items.map((a) => a.hash);
const res = Lazy(items)
.reverse() // Start from the latest item
.map((f) => f.heads).flatten() // Go through all heads
.filter((f) => !(f instanceof Node === true)) // OrbitNode vs. {}, filter out instances (we already have them in mem)
.filter((f) => !(f instanceof OrbitNode === true)) // OrbitNode vs. {}, filter out instances (we already have them in mem)
.map((f) => this._fetchRecursive(f, allHashes)).flatten() // IO - get the data from IPFS
.map((f) => this._insert(f)) // Insert to the list
.take(MaxHistory) // How many items from the history we should fetch
@ -86,12 +47,13 @@ class OrbitList extends List {
// console.log("--> Fetched", res.length, "items from the history\n");
}
// Fetch items in the linked list recursively
_fetchRecursive(hash, all) {
const isReferenced = (list, item) => Lazy(list).find((f) => f === item) !== undefined;
let result = [];
if(!isReferenced(all, hash)) {
all.push(hash);
const item = await(Node.fromIpfsHash(this._ipfs, hash)); // IO - get from IPFS
const item = await(OrbitNode.fromIpfsHash(this._ipfs, hash)); // IO - get from IPFS
result.push(item);
result = result.concat(Lazy(item.heads)
.map((f) => this._fetchRecursive(f, all))
@ -146,7 +108,7 @@ class OrbitList extends List {
}
static fromJson(ipfs, json) {
const items = Object.keys(json.items).map((f) => Node.fromIpfsHash(ipfs, json.items[f]));
const items = Object.keys(json.items).map((f) => await(OrbitNode.fromIpfsHash(ipfs, json.items[f])));
return new OrbitList(ipfs, json.id, json.seq, json.ver, items);
}

View File

@ -14,12 +14,35 @@ class OrbitNode extends Node {
this.hash = hash ? hash : this.ipfsHash;
}
fetchPayload() {
return new Promise(async((resolve, reject) => {
if(!this.Payload) {
const payload = await(ipfsAPI.getObject(this._ipfs, this.data));
this.Payload = JSON.parse(payload.Data);
if(this.Payload.value) {
const value = await(ipfsAPI.getObject(this._ipfs, this.Payload.value));
this.Payload.value = JSON.parse(value.Data)["content"];
}
}
let res = this.Payload;
Object.assign(res, { hash: this.data });
resolve(res);
}));
}
_commit() {
if(!this.hash) {
const r = await(ipfsAPI.putObject(this._ipfs, JSON.stringify(this.asJson)));
this.hash = r.Hash;
}
}
get ipfsHash() {
this._commit();
await(this._commit());
return this.hash;
}
compact() {
get asJson() {
let res = { id: this.id, seq: this.seq, ver: this.ver, data: this.data }
let items = {};
this.next.forEach((f) => Object.defineProperty(items, f.compactId.toString(), { value: f.ipfsHash, enumerable: true }));
@ -27,32 +50,6 @@ class OrbitNode extends Node {
return res;
}
fetchPayload() {
return new Promise(async((resolve, reject) => {
await(this._getPayload());
resolve({ hash: this.data, payload: this.Payload });
}));
}
_getPayload() {
if(!this.Payload) {
const payload = await(ipfsAPI.getObject(this._ipfs, this.data));
this.Payload = JSON.parse(payload.Data);
if(this.Payload.value) {
const value = await(ipfsAPI.getObject(this._ipfs, this.Payload.value));
this.Payload.value = JSON.parse(value.Data)["content"];
}
}
return this.hash;
}
_commit() {
if(!this.hash) {
const r = await(ipfsAPI.putObject(this._ipfs, JSON.stringify(this.compact())));
this.hash = r.Hash;
}
}
static fromIpfsHash(ipfs, hash) {
const createNode = async(() => {
return new Promise(async((resolve, reject) => {

View File

@ -111,10 +111,9 @@ describe('Orbit Client', function() {
db.del(head);
const items = db.iterator().collect();
assert.equal(items.length, 1);
assert.equal(items[0].hash.startsWith('Qm'), true);
assert.equal(items[0].payload.op, 'ADD');
assert.equal(items[0].payload.value, 'hello1');
assert.notEqual(items[0].payload.meta, null);
assert.equal(items[0].op, 'ADD');
assert.equal(items[0].value, 'hello1');
assert.notEqual(items[0].meta, null);
done();
}));
@ -126,9 +125,9 @@ describe('Orbit Client', function() {
const items = db.iterator().collect();
assert.equal(items.length, 1);
assert.equal(items[0].hash.startsWith('Qm'), true);
assert.equal(items[0].payload.op, 'ADD');
assert.equal(items[0].payload.value, 'hello3');
assert.notEqual(items[0].payload.meta, null);
assert.equal(items[0].op, 'ADD');
assert.equal(items[0].value, 'hello3');
assert.notEqual(items[0].meta, null);
done();
}));
});
@ -169,11 +168,11 @@ describe('Orbit Client', function() {
assert.notEqual(next, null);
assert.notEqual(next.hash, null);
assert.equal(next.hash.startsWith('Qm'), true);
assert.notEqual(next.payload, null);
assert.equal(next.payload.op, 'ADD');
assert.equal(next.payload.key.startsWith('Qm'), true);
assert.equal(next.payload.value, 'hello4');
assert.notEqual(next.payload.meta, null);
assert.notEqual(next, null);
assert.equal(next.op, 'ADD');
assert.equal(next.key.startsWith('Qm'), true);
assert.equal(next.value, 'hello4');
assert.notEqual(next.meta, null);
done();
}));
@ -198,9 +197,9 @@ describe('Orbit Client', function() {
const iter = db.iterator();
const first = iter.next().value;
const second = iter.next().value;
assert.equal(first.payload.key, items2[items2.length - 1]);
assert.equal(first.key, items2[items2.length - 1]);
assert.equal(second, null);
assert.equal(first.payload.value, 'hello4');
assert.equal(first.value, 'hello4');
done();
}));
});
@ -226,8 +225,8 @@ describe('Orbit Client', function() {
it('returns all items', async((done) => {
const messages = db.iterator({ limit: -1 }).collect();
assert.equal(messages.length, items.length);
assert.equal(messages[0].payload.value, 'hello0');
assert.equal(messages[messages.length - 1].payload.value, 'hello4');
assert.equal(messages[0].value, 'hello0');
assert.equal(messages[messages.length - 1].value, 'hello4');
done();
}));
@ -266,7 +265,7 @@ describe('Orbit Client', function() {
const iter = db.iterator({ limit: 1 });
const first = iter.next().value;
const second = iter.next().value;
assert.equal(first.payload.key, items2[items.length - 1]);
assert.equal(first.key, items2[items.length - 1]);
assert.equal(second, null);
done();
}));
@ -275,7 +274,7 @@ describe('Orbit Client', function() {
const iter = db.iterator({ limit: 1 });
const first = iter.next().value;
const second = iter.next().value;
assert.equal(first.payload.key, items2[items.length - 1]);
assert.equal(first.key, items2[items.length - 1]);
assert.equal(second, null);
done();
}));
@ -286,9 +285,9 @@ describe('Orbit Client', function() {
const second = iter.next().value;
const third = iter.next().value;
const fourth = iter.next().value;
assert.equal(first.payload.key, items2[items.length - 3]);
assert.equal(second.payload.key, items2[items.length - 2]);
assert.equal(third.payload.key, items2[items.length - 1]);
assert.equal(first.key, items2[items.length - 3]);
assert.equal(second.key, items2[items.length - 2]);
assert.equal(third.key, items2[items.length - 1]);
assert.equal(fourth, null);
done();
}));
@ -296,7 +295,7 @@ describe('Orbit Client', function() {
it('returns all items', async((done) => {
const messages = db.iterator({ limit: -1 })
.collect()
.map((e) => e.payload.key);
.map((e) => e.key);
messages.reverse();
assert.equal(messages.length, items2.length);
@ -307,7 +306,7 @@ describe('Orbit Client', function() {
it('returns all items when limit is bigger than -1', async((done) => {
const messages = db.iterator({ limit: -300 })
.collect()
.map((e) => e.payload.key);
.map((e) => e.key);
assert.equal(messages.length, items2.length);
assert.equal(messages[0], items2[0]);
@ -317,7 +316,7 @@ describe('Orbit Client', function() {
it('returns all items when limit is bigger than number of items', async((done) => {
const messages = db.iterator({ limit: 300 })
.collect()
.map((e) => e.payload.key);
.map((e) => e.key);
assert.equal(messages.length, items2.length);
assert.equal(messages[0], items2[0]);
@ -346,10 +345,10 @@ describe('Orbit Client', function() {
it('returns all items reversed', async((done) => {
const messages = db.iterator({ limit: -1, reverse: true })
.collect()
.map((e) => e.payload.key);
.map((e) => e.key);
assert.equal(messages.length, items2.length);
assert.equal(messages[0], items2[items.length - 1]);
assert.equal(messages[0], items2[0]);
done();
}));
});
@ -376,7 +375,7 @@ describe('Orbit Client', function() {
it('returns 1 item when gte is the head', async((done) => {
const messages = db.iterator({ gte: _.last(items2), limit: -1 })
.collect()
.map((e) => e.payload.key);
.map((e) => e.key);
assert.equal(messages.length, 1);
assert.equal(messages[0], items2[items.length -1]);
@ -393,7 +392,7 @@ describe('Orbit Client', function() {
const gte = items2[items2.length - 2];
const messages = db.iterator({ gte: gte, limit: -1 })
.collect()
.map((e) => e.payload.key);
.map((e) => e.key);
assert.equal(messages.length, 2);
assert.equal(messages[0], items2[items2.length - 2]);
@ -404,7 +403,7 @@ describe('Orbit Client', function() {
it('returns all items when gte is the root item', async((done) => {
const messages = db.iterator({ gte: items2[0], limit: -1 })
.collect()
.map((e) => e.payload.key);
.map((e) => e.key);
assert.equal(messages.length, items2.length);
assert.equal(messages[0], items2[0]);
@ -415,7 +414,7 @@ describe('Orbit Client', function() {
it('returns items when gt is the root item', async((done) => {
const messages = db.iterator({ gt: items2[0], limit: -1 })
.collect()
.map((e) => e.payload.key);
.map((e) => e.key);
assert.equal(messages.length, itemCount - 1);
assert.equal(messages[0], items2[1]);
@ -426,13 +425,13 @@ describe('Orbit Client', function() {
it('returns items when gt is defined', async((done) => {
const messages = db.iterator({ limit: -1})
.collect()
.map((e) => e.payload.key);
.map((e) => e.key);
const gt = messages[2];
const messages2 = db.iterator({ gt: gt, limit: 100 })
.collect()
.map((e) => e.payload.key);
.map((e) => e.key);
assert.equal(messages2.length, 2);
assert.equal(messages2[0], messages[messages.length - 2]);
@ -445,7 +444,7 @@ describe('Orbit Client', function() {
it('returns one item after head when lt is the head', async((done) => {
const messages = db.iterator({ lt: _.last(items2) })
.collect()
.map((e) => e.payload.key);
.map((e) => e.key);
assert.equal(messages.length, 1);
assert.equal(messages[0], items2[items2.length - 2]);
@ -455,7 +454,7 @@ describe('Orbit Client', function() {
it('returns all items when lt is head and limit is -1', async((done) => {
const messages = db.iterator({ lt: _.last(items2), limit: -1 })
.collect()
.map((e) => e.payload.key);
.map((e) => e.key);
assert.equal(messages.length, items2.length - 1);
assert.equal(messages[0], items2[0]);
@ -466,7 +465,7 @@ describe('Orbit Client', function() {
it('returns 3 items when lt is head and limit is 3', async((done) => {
const messages = db.iterator({ lt: _.last(items2), limit: 3 })
.collect()
.map((e) => e.payload.key);
.map((e) => e.key);
assert.equal(messages.length, 3);
assert.equal(messages[0], items2[items2.length - 4]);
@ -483,7 +482,7 @@ describe('Orbit Client', function() {
it('returns one item when lte is the root item', async((done) => {
const messages = db.iterator({ lte: items2[0] })
.collect()
.map((e) => e.payload.key);
.map((e) => e.key);
assert.equal(messages.length, 1);
assert.equal(messages[0], items2[0]);
@ -493,7 +492,7 @@ describe('Orbit Client', function() {
it('returns all items when lte is the head', async((done) => {
const messages = db.iterator({ lte: _.last(items2), limit: -1 })
.collect()
.map((e) => e.payload.key);
.map((e) => e.key);
assert.equal(messages.length, itemCount);
assert.equal(messages[0], items2[0]);
@ -504,7 +503,7 @@ describe('Orbit Client', function() {
it('returns 3 items when lte is the head', async((done) => {
const messages = db.iterator({ lte: _.last(items2), limit: 3 })
.collect()
.map((e) => e.payload.key);
.map((e) => e.key);
assert.equal(messages.length, 3);
assert.equal(messages[0], items2[items2.length - 3]);
@ -538,9 +537,9 @@ describe('Orbit Client', function() {
let all = db.iterator().collect();
assert.equal(all.length, 1);
assert.equal(all[0].hash.startsWith('Qm'), true);
assert.equal(all[0].payload.key, 'key1');
assert.equal(all[0].payload.op, 'PUT');
assert.notEqual(all[0].payload.meta, null);
assert.equal(all[0].key, 'key1');
assert.equal(all[0].op, 'PUT');
assert.notEqual(all[0].meta, null);
done();
}));