From db7e799a875f345fa18645e84d19be00f2135f6d Mon Sep 17 00:00:00 2001 From: haad Date: Sat, 5 Mar 2016 10:10:46 +0100 Subject: [PATCH] Move LWW query away from OrbitList to OrbitDB. OrbitList is now just a transport layer. --- examples/reader.js | 2 +- src/OrbitClient.js | 2 +- src/OrbitDB.js | 50 ++++++++++++++++++++--- src/list/OrbitList.js | 50 +++-------------------- src/list/OrbitNode.js | 53 ++++++++++++------------- test/orbit-client-tests.js | 81 +++++++++++++++++++------------------- 6 files changed, 117 insertions(+), 121 deletions(-) diff --git a/examples/reader.js b/examples/reader.js index a7a19e4..7a1037e 100644 --- a/examples/reader.js +++ b/examples/reader.js @@ -38,7 +38,7 @@ let run = (async(() => { console.log("---------------------------------------------------") console.log("Key | Value") console.log("---------------------------------------------------") - console.log(items.map((e) => `${e.payload.key} | ${e.payload.value}`).join("\n")); + console.log(items.map((e) => `${e.key} | ${e.value}`).join("\n")); console.log("---------------------------------------------------") console.log(`Query 2 #${count} took ${timer2.stop(true)} ms\n`); diff --git a/src/OrbitClient.js b/src/OrbitClient.js index 1e78db4..5255450 100644 --- a/src/OrbitClient.js +++ b/src/OrbitClient.js @@ -38,7 +38,7 @@ class OrbitClient { put: (key, data) => this.db.put(channel, password, key, data), get: (key, options) => { const items = this._iterator(channel, password, { key: key }).collect(); - return items[0] ? items[0].payload.value : null; + return items[0] ? items[0].value : null; }, leave: () => this._pubsub.unsubscribe(channel) } diff --git a/src/OrbitDB.js b/src/OrbitDB.js index 90e0a37..1ab8509 100644 --- a/src/OrbitDB.js +++ b/src/OrbitDB.js @@ -1,5 +1,6 @@ 'use strict'; +const Lazy = require('lazy.js'); const EventEmitter = require('events').EventEmitter; const async = require('asyncawait/async'); const await = require('asyncawait/await'); @@ -33,12 +34,27 @@ class OrbitDB { } /* DB Operations */ - read(channel, password, options) { - let opts = options || {}; - Object.assign(opts, { amount: opts.limit || 1 }); - let messages = await(this._logs[channel].find(opts)); - if(opts.reverse) messages.reverse(); - return messages; + read(channel, password, opts) { + if(!opts) opts = {}; + + const operations = Lazy(this._logs[channel].items); + const amount = opts.limit ? (opts.limit > -1 ? opts.limit : this._logs[channel].items.length) : 1; + + let result = []; + + if(opts.key) { + // Key-Value, search latest key first + result = this._query(operations.reverse(), opts.key, 1, true); + } else if(opts.gt || opts.gte) { + // Greater than case + result = this._query(operations, opts.gt ? opts.gt : opts.gte, amount, opts.gte || opts.lte); + } else { + // Lower than and lastN case, search latest first by reversing the sequence + result = this._query(operations.reverse(), opts.lt ? opts.lt : opts.lte, amount, opts.lte || !opts.lt).reverse(); + } + + if(opts.reverse) result.reverse(); + return result.toArray(); } add(channel, password, data) { @@ -62,6 +78,28 @@ class OrbitDB { } /* Private methods */ + + // The LWW-set + _query(sequence, key, amount, inclusive) { + // Last-Write-Wins, ie. use only the first occurance of the key + let handled = []; + const _createLWWSet = (item) => { + const wasHandled = Lazy(handled).indexOf(item.key) > -1; + if(!wasHandled) handled.push(item.key); + if(Operations.isUpdate(item.op) && !wasHandled) return item; + return null; + }; + + // Find an items from the sequence (list of operations) + return sequence + .map((f) => await(f.fetchPayload())) // IO - fetch the actual OP from ipfs. consider merging with LL. + .skipWhile((f) => key && f.key !== key) // Drop elements until we have the first one requested + .drop(!inclusive ? 1 : 0) // Drop the 'gt/lt' item, include 'gte/lte' item + .map(_createLWWSet) // Return items as LWW (ignore values after the first found) + .filter((f) => f !== null) // Make sure we don't have empty ones + .take(amount) + } + _createOperation(channel, password, operation, key, value, data) { var createOperation = async(() => { return new Promise(async((resolve, reject) => { diff --git a/src/list/OrbitList.js b/src/list/OrbitList.js index 126cb71..6cee262 100644 --- a/src/list/OrbitList.js +++ b/src/list/OrbitList.js @@ -6,8 +6,7 @@ const async = require('asyncawait/async'); const await = require('asyncawait/await'); const ipfsAPI = require('orbit-common/lib/ipfs-api-promised'); const List = require('./List'); -const Node = require('./OrbitNode'); -const Operations = require('./Operations'); +const OrbitNode = require('./OrbitNode'); const MaxBatchSize = 10; // How many items per sequence. Saves a snapshot to ipfs in batches of this many items. const MaxHistory = 1000; // How many items to fetch in the chain per join @@ -24,7 +23,7 @@ class OrbitList extends List { this._commit(); const heads = List.findHeads(this.items); - const node = new Node(this._ipfs, this.id, this.seq, this.ver, data, heads); + const node = new OrbitNode(this._ipfs, this.id, this.seq, this.ver, data, heads); this._currentBatch.push(node); this.ver ++; } @@ -34,51 +33,13 @@ class OrbitList extends List { this._fetchHistory(other.items); } - // The LWW-set query interface - find(opts) { - let list = Lazy(this.items); - const hash = (opts.gt ? opts.gt : (opts.gte ? opts.gte : (opts.lt ? opts.lt : opts.lte))); - const amount = opts.amount ? (opts.amount && opts.amount > -1 ? opts.amount : this.items.length) : 1; - - // Last-Write-Wins set - let handled = []; - const _createLWWSet = (f) => { - const wasHandled = Lazy(handled).indexOf(f.payload.key) > -1; - if(!wasHandled) handled.push(f.payload.key); - if(Operations.isUpdate(f.payload.op) && !wasHandled) return f; - return null; - }; - - // Find an item from the lazy sequence (list) - const _findFrom = (sequence, key, amount, inclusive) => { - return sequence - .map((f) => await(f.fetchPayload())) // IO - fetch the actual OP from ipfs. consider merging with LL. - .skipWhile((f) => key && f.payload.key !== key) // Drop elements until we have the first one requested - .drop(!inclusive ? 1 : 0) // Drop the 'gt/lt' item, include 'gte/lte' item - .map(_createLWWSet) // Return items as LWW (ignore values after the first found) - .filter((f) => f !== null) // Make sure we don't have empty ones - .take(amount) - }; - - // Key-Value - if(opts.key) - return _findFrom(list.reverse(), opts.key, 1, true).toArray(); - - // Greater than case - if(opts.gt || opts.gte) - return _findFrom(list, hash, amount, opts.gte || opts.lte).toArray(); - - // Lower than and lastN case, search latest first by reversing the sequence - return _findFrom(list.reverse(), hash, amount, opts.lte || !opts.lt).reverse().toArray(); - } - /* Private methods */ _fetchHistory(items) { let allHashes = this._items.map((a) => a.hash); const res = Lazy(items) .reverse() // Start from the latest item .map((f) => f.heads).flatten() // Go through all heads - .filter((f) => !(f instanceof Node === true)) // OrbitNode vs. {}, filter out instances (we already have them in mem) + .filter((f) => !(f instanceof OrbitNode === true)) // OrbitNode vs. {}, filter out instances (we already have them in mem) .map((f) => this._fetchRecursive(f, allHashes)).flatten() // IO - get the data from IPFS .map((f) => this._insert(f)) // Insert to the list .take(MaxHistory) // How many items from the history we should fetch @@ -86,12 +47,13 @@ class OrbitList extends List { // console.log("--> Fetched", res.length, "items from the history\n"); } + // Fetch items in the linked list recursively _fetchRecursive(hash, all) { const isReferenced = (list, item) => Lazy(list).find((f) => f === item) !== undefined; let result = []; if(!isReferenced(all, hash)) { all.push(hash); - const item = await(Node.fromIpfsHash(this._ipfs, hash)); // IO - get from IPFS + const item = await(OrbitNode.fromIpfsHash(this._ipfs, hash)); // IO - get from IPFS result.push(item); result = result.concat(Lazy(item.heads) .map((f) => this._fetchRecursive(f, all)) @@ -146,7 +108,7 @@ class OrbitList extends List { } static fromJson(ipfs, json) { - const items = Object.keys(json.items).map((f) => Node.fromIpfsHash(ipfs, json.items[f])); + const items = Object.keys(json.items).map((f) => await(OrbitNode.fromIpfsHash(ipfs, json.items[f]))); return new OrbitList(ipfs, json.id, json.seq, json.ver, items); } diff --git a/src/list/OrbitNode.js b/src/list/OrbitNode.js index 9c0224e..a77a1ad 100644 --- a/src/list/OrbitNode.js +++ b/src/list/OrbitNode.js @@ -14,12 +14,35 @@ class OrbitNode extends Node { this.hash = hash ? hash : this.ipfsHash; } + fetchPayload() { + return new Promise(async((resolve, reject) => { + if(!this.Payload) { + const payload = await(ipfsAPI.getObject(this._ipfs, this.data)); + this.Payload = JSON.parse(payload.Data); + if(this.Payload.value) { + const value = await(ipfsAPI.getObject(this._ipfs, this.Payload.value)); + this.Payload.value = JSON.parse(value.Data)["content"]; + } + } + let res = this.Payload; + Object.assign(res, { hash: this.data }); + resolve(res); + })); + } + + _commit() { + if(!this.hash) { + const r = await(ipfsAPI.putObject(this._ipfs, JSON.stringify(this.asJson))); + this.hash = r.Hash; + } + } + get ipfsHash() { - this._commit(); + await(this._commit()); return this.hash; } - compact() { + get asJson() { let res = { id: this.id, seq: this.seq, ver: this.ver, data: this.data } let items = {}; this.next.forEach((f) => Object.defineProperty(items, f.compactId.toString(), { value: f.ipfsHash, enumerable: true })); @@ -27,32 +50,6 @@ class OrbitNode extends Node { return res; } - fetchPayload() { - return new Promise(async((resolve, reject) => { - await(this._getPayload()); - resolve({ hash: this.data, payload: this.Payload }); - })); - } - - _getPayload() { - if(!this.Payload) { - const payload = await(ipfsAPI.getObject(this._ipfs, this.data)); - this.Payload = JSON.parse(payload.Data); - if(this.Payload.value) { - const value = await(ipfsAPI.getObject(this._ipfs, this.Payload.value)); - this.Payload.value = JSON.parse(value.Data)["content"]; - } - } - return this.hash; - } - - _commit() { - if(!this.hash) { - const r = await(ipfsAPI.putObject(this._ipfs, JSON.stringify(this.compact()))); - this.hash = r.Hash; - } - } - static fromIpfsHash(ipfs, hash) { const createNode = async(() => { return new Promise(async((resolve, reject) => { diff --git a/test/orbit-client-tests.js b/test/orbit-client-tests.js index 9107d65..9ea02a9 100644 --- a/test/orbit-client-tests.js +++ b/test/orbit-client-tests.js @@ -111,10 +111,9 @@ describe('Orbit Client', function() { db.del(head); const items = db.iterator().collect(); assert.equal(items.length, 1); - assert.equal(items[0].hash.startsWith('Qm'), true); - assert.equal(items[0].payload.op, 'ADD'); - assert.equal(items[0].payload.value, 'hello1'); - assert.notEqual(items[0].payload.meta, null); + assert.equal(items[0].op, 'ADD'); + assert.equal(items[0].value, 'hello1'); + assert.notEqual(items[0].meta, null); done(); })); @@ -126,9 +125,9 @@ describe('Orbit Client', function() { const items = db.iterator().collect(); assert.equal(items.length, 1); assert.equal(items[0].hash.startsWith('Qm'), true); - assert.equal(items[0].payload.op, 'ADD'); - assert.equal(items[0].payload.value, 'hello3'); - assert.notEqual(items[0].payload.meta, null); + assert.equal(items[0].op, 'ADD'); + assert.equal(items[0].value, 'hello3'); + assert.notEqual(items[0].meta, null); done(); })); }); @@ -169,11 +168,11 @@ describe('Orbit Client', function() { assert.notEqual(next, null); assert.notEqual(next.hash, null); assert.equal(next.hash.startsWith('Qm'), true); - assert.notEqual(next.payload, null); - assert.equal(next.payload.op, 'ADD'); - assert.equal(next.payload.key.startsWith('Qm'), true); - assert.equal(next.payload.value, 'hello4'); - assert.notEqual(next.payload.meta, null); + assert.notEqual(next, null); + assert.equal(next.op, 'ADD'); + assert.equal(next.key.startsWith('Qm'), true); + assert.equal(next.value, 'hello4'); + assert.notEqual(next.meta, null); done(); })); @@ -198,9 +197,9 @@ describe('Orbit Client', function() { const iter = db.iterator(); const first = iter.next().value; const second = iter.next().value; - assert.equal(first.payload.key, items2[items2.length - 1]); + assert.equal(first.key, items2[items2.length - 1]); assert.equal(second, null); - assert.equal(first.payload.value, 'hello4'); + assert.equal(first.value, 'hello4'); done(); })); }); @@ -226,8 +225,8 @@ describe('Orbit Client', function() { it('returns all items', async((done) => { const messages = db.iterator({ limit: -1 }).collect(); assert.equal(messages.length, items.length); - assert.equal(messages[0].payload.value, 'hello0'); - assert.equal(messages[messages.length - 1].payload.value, 'hello4'); + assert.equal(messages[0].value, 'hello0'); + assert.equal(messages[messages.length - 1].value, 'hello4'); done(); })); @@ -266,7 +265,7 @@ describe('Orbit Client', function() { const iter = db.iterator({ limit: 1 }); const first = iter.next().value; const second = iter.next().value; - assert.equal(first.payload.key, items2[items.length - 1]); + assert.equal(first.key, items2[items.length - 1]); assert.equal(second, null); done(); })); @@ -275,7 +274,7 @@ describe('Orbit Client', function() { const iter = db.iterator({ limit: 1 }); const first = iter.next().value; const second = iter.next().value; - assert.equal(first.payload.key, items2[items.length - 1]); + assert.equal(first.key, items2[items.length - 1]); assert.equal(second, null); done(); })); @@ -286,9 +285,9 @@ describe('Orbit Client', function() { const second = iter.next().value; const third = iter.next().value; const fourth = iter.next().value; - assert.equal(first.payload.key, items2[items.length - 3]); - assert.equal(second.payload.key, items2[items.length - 2]); - assert.equal(third.payload.key, items2[items.length - 1]); + assert.equal(first.key, items2[items.length - 3]); + assert.equal(second.key, items2[items.length - 2]); + assert.equal(third.key, items2[items.length - 1]); assert.equal(fourth, null); done(); })); @@ -296,7 +295,7 @@ describe('Orbit Client', function() { it('returns all items', async((done) => { const messages = db.iterator({ limit: -1 }) .collect() - .map((e) => e.payload.key); + .map((e) => e.key); messages.reverse(); assert.equal(messages.length, items2.length); @@ -307,7 +306,7 @@ describe('Orbit Client', function() { it('returns all items when limit is bigger than -1', async((done) => { const messages = db.iterator({ limit: -300 }) .collect() - .map((e) => e.payload.key); + .map((e) => e.key); assert.equal(messages.length, items2.length); assert.equal(messages[0], items2[0]); @@ -317,7 +316,7 @@ describe('Orbit Client', function() { it('returns all items when limit is bigger than number of items', async((done) => { const messages = db.iterator({ limit: 300 }) .collect() - .map((e) => e.payload.key); + .map((e) => e.key); assert.equal(messages.length, items2.length); assert.equal(messages[0], items2[0]); @@ -346,10 +345,10 @@ describe('Orbit Client', function() { it('returns all items reversed', async((done) => { const messages = db.iterator({ limit: -1, reverse: true }) .collect() - .map((e) => e.payload.key); + .map((e) => e.key); assert.equal(messages.length, items2.length); - assert.equal(messages[0], items2[items.length - 1]); + assert.equal(messages[0], items2[0]); done(); })); }); @@ -376,7 +375,7 @@ describe('Orbit Client', function() { it('returns 1 item when gte is the head', async((done) => { const messages = db.iterator({ gte: _.last(items2), limit: -1 }) .collect() - .map((e) => e.payload.key); + .map((e) => e.key); assert.equal(messages.length, 1); assert.equal(messages[0], items2[items.length -1]); @@ -393,7 +392,7 @@ describe('Orbit Client', function() { const gte = items2[items2.length - 2]; const messages = db.iterator({ gte: gte, limit: -1 }) .collect() - .map((e) => e.payload.key); + .map((e) => e.key); assert.equal(messages.length, 2); assert.equal(messages[0], items2[items2.length - 2]); @@ -404,7 +403,7 @@ describe('Orbit Client', function() { it('returns all items when gte is the root item', async((done) => { const messages = db.iterator({ gte: items2[0], limit: -1 }) .collect() - .map((e) => e.payload.key); + .map((e) => e.key); assert.equal(messages.length, items2.length); assert.equal(messages[0], items2[0]); @@ -415,7 +414,7 @@ describe('Orbit Client', function() { it('returns items when gt is the root item', async((done) => { const messages = db.iterator({ gt: items2[0], limit: -1 }) .collect() - .map((e) => e.payload.key); + .map((e) => e.key); assert.equal(messages.length, itemCount - 1); assert.equal(messages[0], items2[1]); @@ -426,13 +425,13 @@ describe('Orbit Client', function() { it('returns items when gt is defined', async((done) => { const messages = db.iterator({ limit: -1}) .collect() - .map((e) => e.payload.key); + .map((e) => e.key); const gt = messages[2]; const messages2 = db.iterator({ gt: gt, limit: 100 }) .collect() - .map((e) => e.payload.key); + .map((e) => e.key); assert.equal(messages2.length, 2); assert.equal(messages2[0], messages[messages.length - 2]); @@ -445,7 +444,7 @@ describe('Orbit Client', function() { it('returns one item after head when lt is the head', async((done) => { const messages = db.iterator({ lt: _.last(items2) }) .collect() - .map((e) => e.payload.key); + .map((e) => e.key); assert.equal(messages.length, 1); assert.equal(messages[0], items2[items2.length - 2]); @@ -455,7 +454,7 @@ describe('Orbit Client', function() { it('returns all items when lt is head and limit is -1', async((done) => { const messages = db.iterator({ lt: _.last(items2), limit: -1 }) .collect() - .map((e) => e.payload.key); + .map((e) => e.key); assert.equal(messages.length, items2.length - 1); assert.equal(messages[0], items2[0]); @@ -466,7 +465,7 @@ describe('Orbit Client', function() { it('returns 3 items when lt is head and limit is 3', async((done) => { const messages = db.iterator({ lt: _.last(items2), limit: 3 }) .collect() - .map((e) => e.payload.key); + .map((e) => e.key); assert.equal(messages.length, 3); assert.equal(messages[0], items2[items2.length - 4]); @@ -483,7 +482,7 @@ describe('Orbit Client', function() { it('returns one item when lte is the root item', async((done) => { const messages = db.iterator({ lte: items2[0] }) .collect() - .map((e) => e.payload.key); + .map((e) => e.key); assert.equal(messages.length, 1); assert.equal(messages[0], items2[0]); @@ -493,7 +492,7 @@ describe('Orbit Client', function() { it('returns all items when lte is the head', async((done) => { const messages = db.iterator({ lte: _.last(items2), limit: -1 }) .collect() - .map((e) => e.payload.key); + .map((e) => e.key); assert.equal(messages.length, itemCount); assert.equal(messages[0], items2[0]); @@ -504,7 +503,7 @@ describe('Orbit Client', function() { it('returns 3 items when lte is the head', async((done) => { const messages = db.iterator({ lte: _.last(items2), limit: 3 }) .collect() - .map((e) => e.payload.key); + .map((e) => e.key); assert.equal(messages.length, 3); assert.equal(messages[0], items2[items2.length - 3]); @@ -538,9 +537,9 @@ describe('Orbit Client', function() { let all = db.iterator().collect(); assert.equal(all.length, 1); assert.equal(all[0].hash.startsWith('Qm'), true); - assert.equal(all[0].payload.key, 'key1'); - assert.equal(all[0].payload.op, 'PUT'); - assert.notEqual(all[0].payload.meta, null); + assert.equal(all[0].key, 'key1'); + assert.equal(all[0].op, 'PUT'); + assert.notEqual(all[0].meta, null); done(); }));