From c4e245d3d175c01ac857300f615b21afefd280b3 Mon Sep 17 00:00:00 2001 From: haad Date: Fri, 6 May 2016 13:22:41 +0200 Subject: [PATCH] Merge oplog to store --- src/OrbitDB.js | 16 ++++--- src/PubSub.js | 3 +- src/oplog/Cache.js | 10 ++++- src/oplog/OperationsLog.js | 3 +- src/stores/Store.js | 66 +++++++++++++++++++++++++---- src/stores/counters/CounterIndex.js | 11 +++-- src/stores/eventlog/EventIndex.js | 12 +++--- src/stores/kvstore/KeyValueIndex.js | 12 +++--- test/counterdb.test.js | 4 +- 9 files changed, 98 insertions(+), 39 deletions(-) diff --git a/src/OrbitDB.js b/src/OrbitDB.js index 464f27b..8e85ebd 100644 --- a/src/OrbitDB.js +++ b/src/OrbitDB.js @@ -8,7 +8,7 @@ const KeyValueStore = require('./stores/kvstore/KeyValueStore'); const EventStore = require('./stores/eventlog/EventStore'); class OrbitDB { - constructor(ipfs, options) { + constructor(ipfs) { this._ipfs = ipfs; this._pubsub = null; this.user = null; @@ -18,7 +18,9 @@ class OrbitDB { } eventlog(dbname, options) { - if(!options) options = { subscribe: true }; + if(!options) options = {}; + if(options.subscribe === undefined) Object.assign(options, { subscribe: true }); + const store = new EventStore(this._ipfs, this.user.username, dbname, options); return this._subscribe(store, dbname, options.subscribe) .then(() => this.stores[dbname] = store) @@ -26,7 +28,9 @@ class OrbitDB { } kvstore(dbname, options) { - if(!options) options = { subscribe: true }; + if(!options) options = {}; + if(options.subscribe === undefined) Object.assign(options, { subscribe: true }); + const store = new KeyValueStore(this._ipfs, this.user.username, dbname, options); return this._subscribe(store, dbname, options.subscribe) .then(() => this.stores[dbname] = store) @@ -34,7 +38,9 @@ class OrbitDB { } counter(dbname, options) { - if(!options) options = { subscribe: true }; + if(!options) options = {}; + if(options.subscribe === undefined) Object.assign(options, { subscribe: true }); + const store = new CounterStore(this._ipfs, this.user.username, dbname, options); return this._subscribe(store, dbname, options.subscribe) .then(() => this.stores[dbname] = store) @@ -144,7 +150,7 @@ class OrbitClientFactory { throw new Error("IPFS instance not provided"); } - const client = new OrbitDB(ipfs, options); + const client = new OrbitDB(ipfs); return client._connect(network, username, password, options.allowOffline) .then(() => client) } diff --git a/src/PubSub.js b/src/PubSub.js index 1b85d87..6954609 100644 --- a/src/PubSub.js +++ b/src/PubSub.js @@ -41,7 +41,8 @@ class Pubsub { } publish(hash, message) { - this._socket.send(JSON.stringify({ channel: hash, message: message })); + if(this._subscriptions[hash]) + this._socket.send(JSON.stringify({ channel: hash, message: message })); } _handleMessage(hash, message) { diff --git a/src/oplog/Cache.js b/src/oplog/Cache.js index 69ae8a4..d5e8452 100644 --- a/src/oplog/Cache.js +++ b/src/oplog/Cache.js @@ -30,11 +30,13 @@ class Cache { return new Promise((resolve, reject) => { // filePath = cacheFile ? cacheFile : defaultFilepath; if(cacheFile) { - fs.exists(cacheFile, (err, res) => { + filePath = cacheFile; + fs.exists(cacheFile, (res) => { + logger.debug(res); if(res) { - filePath = cacheFile; logger.debug('Load cache from ' + cacheFile); cache = JSON.parse(fs.readFileSync(cacheFile)); + resolve(); } else { resolve(); } @@ -44,6 +46,10 @@ class Cache { } }); } + + static reset() { + cache = {}; + } } module.exports = Cache; \ No newline at end of file diff --git a/src/oplog/OperationsLog.js b/src/oplog/OperationsLog.js index c3aa2fa..3c27767 100644 --- a/src/oplog/OperationsLog.js +++ b/src/oplog/OperationsLog.js @@ -31,7 +31,6 @@ class OperationsLog extends Log { // ts: new Date().getTime() // } // }; - let node, logHash; return super.add(entry) .then((op) => node = op) @@ -66,7 +65,7 @@ class OperationsLog extends Log { // if(!hash || hash === this._lastWrite) // return Promise.resolve([]); - const oldCount = this.items.length; + // const oldCount = this.items.length; let newItems = []; // return Log.fromIpfsHash(this._ipfs, hash) // .then((other) => super.join(other)) diff --git a/src/stores/Store.js b/src/stores/Store.js index 434cb48..b927ee1 100644 --- a/src/stores/Store.js +++ b/src/stores/Store.js @@ -1,8 +1,10 @@ 'use strict'; const EventEmitter = require('events').EventEmitter; -const OperationsLog = require('../oplog/OperationsLog'); +const Log = require('ipfs-log'); +// const OperationsLog = require('../oplog/OperationsLog'); const DefaultIndex = require('./DefaultIndex'); +const Cache = require('../oplog/Cache'); class Store { constructor(ipfs, id, dbname, options) { @@ -12,21 +14,37 @@ class Store { if(!options) options = {}; if(!options.Index) Object.assign(options, { Index: DefaultIndex }); - if(!options.Log) Object.assign(options, { Log: OperationsLog }); + if(!options.Log) Object.assign(options, { Log: Log }); + if(!options.cacheFile) Object.assign(options, { cacheFile: null }); this.options = options; this._index = new this.options.Index(this.id); this._oplog = null; this._ipfs = ipfs; + this._lastWrite = null; } use() { this.events.emit('load', this.dbname); this._oplog = new this.options.Log(this._ipfs, this.id, this.dbname, this.options); - return this._oplog.load() - .then((merged) => this._index.updateIndex(this._oplog, merged)) - .then(() => this.events.emit('readable', this.dbname)) - .then(() => this.events); + Cache.reset(); + return Cache.loadCache(this.options.cacheFile).then(() => { + const cached = Cache.get(this.dbname); + // return this._oplog.load() + if(cached) { + return this.options.Log.fromIpfsHash(this._ipfs, cached) + .then((log) => this._oplog.join(log)) + .then((merged) => this._index.updateIndex(this._oplog, merged)) + .then(() => this.events.emit('readable', this.dbname)) + .then(() => this.events); + } + + return Promise.resolve(this.events); + // return this._oplog.load() + // .then((merged) => this._index.updateIndex(this._oplog, merged)) + // .then(() => this.events.emit('readable', this.dbname)) + // .then(() => this.events); + }); } close() { @@ -34,20 +52,30 @@ class Store { } sync(hash) { - if(!hash || hash === this._oplog._lastWrite || !this._oplog) + // if(!hash || hash === this._oplog._lastWrite || !this._oplog) + if(!hash || hash === this._lastWrite) return Promise.resolve([]); - let newItems; + const oldCount = this._oplog.items.length; + let newItems = []; + // let newItems; this.events.emit('load', this.dbname); return this.options.Log.fromIpfsHash(this._ipfs, hash) .then((log) => this._oplog.join(log)) .then((merged) => newItems = merged) + .then(() => Log.getIpfsHash(this._ipfs, this._oplog)) + .then((hash) => Cache.set(this.dbname, hash)) .then(() => this._index.updateIndex(this._oplog, newItems)) .then(() => { if(newItems.length > 0) this.events.emit('readable', this.dbname); }) .then(() => newItems) + + // .then(() => Log.getIpfsHash(this._ipfs, this)) + // .then((hash) => Cache.set(this.name, hash)) + // .then(() => newItems.forEach((f) => Object.assign(f.payload, { hash: f.hash }))) + // .then(() => newItems.map((f) => f.payload)) } delete() { @@ -58,12 +86,32 @@ class Store { // _addOperation(type, key, data) { _addOperation(data) { + // let node, logHash; + // return super.add(entry) + // .then((op) => node = op) + // .then(() => Object.assign(node.payload, { hash: node.hash })) + // .then(() => Log.getIpfsHash(this._ipfs, this)) + // .then((hash) => logHash = hash) + // .then(() => this._lastWrite = logHash) + // .then(() => Cache.set(this.name, logHash)) + // .then(() => { + // return node.payload; + // }) + let result, logHash; if(this._oplog) { return this._oplog.add(data) - .then((op) => result = op) + // .then((op) => result = op) + .then((res) => { + result = res; + Object.assign(result.payload, { hash: res.hash }) + return result; + }) + // .then(() => Object.assign(result.payload, { hash: node.hash })) .then(() => this.options.Log.getIpfsHash(this._ipfs, this._oplog)) .then((hash) => logHash = hash) + .then(() => this._lastWrite = logHash) + .then(() => Cache.set(this.dbname, logHash)) .then(() => this._index.updateIndex(this._oplog, [result])) .then(() => this.events.emit('data', this.dbname, logHash)) .then(() => result.hash); diff --git a/src/stores/counters/CounterIndex.js b/src/stores/counters/CounterIndex.js index 259d555..57d4b23 100644 --- a/src/stores/counters/CounterIndex.js +++ b/src/stores/counters/CounterIndex.js @@ -12,14 +12,13 @@ class CounterIndex { } updateIndex(oplog, added) { - // console.log("ADDED", added) if(this._counter) { - // added.filter((f) => f && f.payload.op === 'COUNTER') - // .map((f) => Counter.from(f.payload.value)) - // .forEach((f) => this._counter.merge(f)) - added.filter((f) => f && f.op === 'COUNTER') - .map((f) => Counter.from(f.value)) + added.filter((f) => f && f.payload.op === 'COUNTER') + .map((f) => Counter.from(f.payload.value)) .forEach((f) => this._counter.merge(f)) + // added.filter((f) => f && f.op === 'COUNTER') + // .map((f) => Counter.from(f.value)) + // .forEach((f) => this._counter.merge(f)) } } } diff --git a/src/stores/eventlog/EventIndex.js b/src/stores/eventlog/EventIndex.js index fd3cc87..66dbbf5 100644 --- a/src/stores/eventlog/EventIndex.js +++ b/src/stores/eventlog/EventIndex.js @@ -9,14 +9,14 @@ class EventIndex { return Object.keys(this._index).map((f) => this._index[f]); } - updateIndex(oplog, updated) { - updated.reduce((handled, item) => { + updateIndex(oplog, added) { + added.reduce((handled, item) => { if(handled.indexOf(item.hash) === -1) { handled.push(item.hash); - if(item.op === 'ADD') { - this._index[item.hash] = item - } else if(item.op === 'DEL') { - delete this._index[item.value]; + if(item.payload.op === 'ADD') { + this._index[item.hash] = item.payload + } else if(item.payload.op === 'DEL') { + delete this._index[item.payload.value]; } } return handled; diff --git a/src/stores/kvstore/KeyValueIndex.js b/src/stores/kvstore/KeyValueIndex.js index 4ea3d7f..7e50dde 100644 --- a/src/stores/kvstore/KeyValueIndex.js +++ b/src/stores/kvstore/KeyValueIndex.js @@ -11,12 +11,12 @@ class KeyValueIndex { updateIndex(oplog, updated) { updated.reverse().reduce((handled, item) => { - if(handled.indexOf(item.key) === -1) { - handled.push(item.key); - if(item.op === 'PUT') { - this._index[item.key] = item.value - } else if(item.op === 'DEL') { - delete this._index[item.key]; + if(handled.indexOf(item.payload.key) === -1) { + handled.push(item.payload.key); + if(item.payload.op === 'PUT') { + this._index[item.payload.key] = item.payload.value + } else if(item.payload.op === 'DEL') { + delete this._index[item.payload.key]; } } return handled; diff --git a/test/counterdb.test.js b/test/counterdb.test.js index d794ef4..f4a253b 100644 --- a/test/counterdb.test.js +++ b/test/counterdb.test.js @@ -70,7 +70,7 @@ describe('CounterStore', function() { describe('counters', function() { it('increases a counter value', (done) => { - client1.counter('counter test', false).then((counter) => { + client1.counter('counter test', { subscribe: false, cacheFile: './orbit-db-cache.json' }).then((counter) => { Promise.map([13, 1], (f) => counter.inc(f), { concurrency: 1 }).then(() => { assert.equal(counter.value(), 14); done(); @@ -87,7 +87,7 @@ describe('CounterStore', function() { }); it('creates a new counter from cached data', function(done) { - client1.counter('counter test', false).then((counter) => { + client1.counter('counter test', { subscribe: false, cacheFile: './orbit-db-cache.json' }).then((counter) => { assert.equal(counter.value(), 14); done(); }).catch((e) => {