diff --git a/src/Client.js b/src/Client.js index 6524c07..d2aee86 100644 --- a/src/Client.js +++ b/src/Client.js @@ -83,22 +83,17 @@ class Client { } _onMessage(channel, message) { - console.log("<--", channel, message) - this.eventlogDB.sync(channel, message); - this.counterDB.sync(channel, message).catch((e) => { - logger.error(e.stack); - }) - this.keyvalueDB.sync(channel, message); + this.eventlogDB.sync(channel, message).catch((e) => logger.error(e.stack)); + this.counterDB.sync(channel, message).catch((e) => logger.error(e.stack)); + this.keyvalueDB.sync(channel, message).catch((e) => logger.error(e.stack)); } _onWrite(channel, hash) { - console.log("-->", channel, hash) this._pubsub.publish(channel, hash); this.events.emit('data', channel, hash); } _onSync(channel, hash) { - console.log("synced", channel, hash) this.events.emit('data', channel, hash); } diff --git a/src/db/CounterDB.js b/src/db/CounterDB.js index fa2dac8..6ab6d65 100644 --- a/src/db/CounterDB.js +++ b/src/db/CounterDB.js @@ -1,64 +1,26 @@ 'use strict'; -const Lazy = require('lazy.js'); -const OrbitDB = require('./OrbitDB'); -const OpTypes = require('./Operation').Types; -const Counter = require('./GCounter'); - -class CounterIndex { - constructor() { - this._index = {}; - } - - createCounter(key, id) { - this._index[key] = new Counter(id); - } - - get(key) { - return this._index[key]; - } - - updateIndex(oplog) { - console.log("UPDATE IDNEX!", JSON.stringify(oplog.ops, null, 2)); - const counter = this._index[oplog.dbname]; - if(counter) { - Lazy(oplog.ops) - .map((f) => Counter.from(f.value)) - .each((f) => counter.merge(f)) - - this._index[oplog.dbname] = counter; - } - } -} +const OrbitDB = require('./OrbitDB'); +const OpTypes = require('./Operation').Types; +const CounterIndex = require('./CounterIndex'); class CounterDB extends OrbitDB { constructor(ipfs, options) { super(ipfs, options) - // this._counters = {}; this._index = new CounterIndex(); } use(dbname, id) { - // this._counters[dbname] = new Counter(id); this._index.createCounter(dbname, id); return super.use(dbname, id); } - // sync(dbname, hash) { - // const counter = this._counters[dbname]; - // if(counter) { - // return super.sync(dbname, hash).then((oplog) => { - // console.log("OPFS", oplog) - // return Lazy(oplog.ops) - // .map((f) => Counter.from(f.value)) - // .map((f) => counter.merge(f)) - // .toArray(); - // }); - // } - // } + delete(dbname) { + super.delete(dbname); + this._index = new CounterIndex(); + } query(dbname) { - // return this._counters[dbname].value; return this._index.get(dbname).value; } diff --git a/src/db/CounterIndex.js b/src/db/CounterIndex.js new file mode 100644 index 0000000..70782dc --- /dev/null +++ b/src/db/CounterIndex.js @@ -0,0 +1,31 @@ +'use strict'; + +const Counter = require('./GCounter'); + +class CounterIndex { + constructor() { + this._index = {}; + } + + createCounter(key, id) { + this._index[key] = new Counter(id); + } + + get(key) { + return this._index[key]; + } + + updateIndex(oplog) { + const counter = this._index[oplog.dbname]; + if(counter) { + oplog.ops + .filter((f) => f !== undefined) + .map((f) => Counter.from(f.value)) + .forEach((f) => counter.merge(f)) + + this._index[oplog.dbname] = counter; + } + } +} + +module.exports = CounterIndex; diff --git a/src/db/DefaultIndex.js b/src/db/DefaultIndex.js new file mode 100644 index 0000000..ec69c41 --- /dev/null +++ b/src/db/DefaultIndex.js @@ -0,0 +1,17 @@ +'use strict'; + +class DefaultIndex { + constructor() { + this._index = []; + } + + get() { + return this._index; + } + + updateIndex(oplog) { + this._index = oplog.ops; + } +} + +module.exports = DefaultIndex; diff --git a/src/db/EventLogDB.js b/src/db/EventLogDB.js index 569ab8b..a616e59 100644 --- a/src/db/EventLogDB.js +++ b/src/db/EventLogDB.js @@ -53,7 +53,7 @@ class EventLogDB extends OrbitDB { if(opts.gt || opts.gte) { // Greater than case - result = this._read(this._index.get().reverse(), opts.gt ? opts.gt : opts.gte, amount, opts.gte ? opts.gte : false) + result = this._read(this._index.get().reverse(), opts.gt ? opts.gt : opts.gte, amount, opts.gte ? true : false) } else { // Lower than and lastN case, search latest first by reversing the sequence result = this._read(this._index.get(), opts.lt ? opts.lt : opts.lte, amount, opts.lte || !opts.lt).reverse() @@ -66,8 +66,8 @@ class EventLogDB extends OrbitDB { _read(ops, key, amount, inclusive) { return Lazy(ops) - .skipWhile((f) => key && f.key !== key) // Drop elements until we have the first one requested - .drop(inclusive ? 0 : 1) // Drop the 'gt/lt' item, include 'gte/lte' item + .skipWhile((f) => key && f.key !== key) + .drop(inclusive ? 0 : 1) .take(amount); } } diff --git a/src/db/EventLogIndex.js b/src/db/EventLogIndex.js index b592796..fc7e4b7 100644 --- a/src/db/EventLogIndex.js +++ b/src/db/EventLogIndex.js @@ -1,6 +1,5 @@ 'use strict'; -const Lazy = require('lazy.js'); const OpTypes = require('./Operation').Types; class EventLogIndex { @@ -15,7 +14,7 @@ class EventLogIndex { updateIndex(oplog) { let handled = []; const _createLWWSet = (item) => { - if(Lazy(handled).indexOf(item.key) === -1) { + if(handled.indexOf(item.key) === -1) { handled.push(item.key); if(OpTypes.isInsert(item.op)) return item; @@ -23,13 +22,10 @@ class EventLogIndex { return null; }; - const items = Lazy(oplog.ops.reverse()) - .map(_createLWWSet) // Return items as LWW (ignore values after the first found) - .compact() // Remove nulls - // .take(oplog.ops.length) - .toArray(); - - this._index = items; + this._index = oplog.ops + .reverse() + .map(_createLWWSet) + .filter((f) => f !== null); } } diff --git a/src/db/GCounter.js b/src/db/GCounter.js index fdce486..a4e002c 100644 --- a/src/db/GCounter.js +++ b/src/db/GCounter.js @@ -32,7 +32,6 @@ class GCounter { } merge(other) { - console.log("MERGE", other, this) Object.keys(other._counters).forEach((f) => { this._counters[f] = Math.max(this._counters[f] ? this._counters[f] : 0, other._counters[f]); }); diff --git a/src/db/KVIndex.js b/src/db/KVIndex.js index 6c54729..777c252 100644 --- a/src/db/KVIndex.js +++ b/src/db/KVIndex.js @@ -1,6 +1,5 @@ 'use strict'; -const Lazy = require('lazy.js'); const OpTypes = require('./Operation').Types; class KVIndex { @@ -15,7 +14,7 @@ class KVIndex { updateIndex(oplog) { let handled = []; const _createLWWSet = (item) => { - if(Lazy(handled).indexOf(item.key) === -1) { + if(handled.indexOf(item.key) === -1) { handled.push(item.key); if(OpTypes.isInsert(item.op)) return item; @@ -24,11 +23,11 @@ class KVIndex { }; this._index = {}; - Lazy(oplog.ops.reverse()) + oplog.ops + .reverse() .map(_createLWWSet) - .compact() - // .take(oplog.ops.length) - .each((f) => this._index[f.key] = f.value); + .filter((f) => f !== null) + .forEach((f) => this._index[f.key] = f.value); } } diff --git a/src/db/OperationsLog.js b/src/db/OperationsLog.js index bd090b3..194a469 100644 --- a/src/db/OperationsLog.js +++ b/src/db/OperationsLog.js @@ -35,10 +35,8 @@ class OperationsLog { return; }) .then(() => { - if(this.options.cacheFile) { - console.log("from cache", this.dbname) + if(this.options.cacheFile) return this.sync(Cache.get(this.dbname)) - } return; }) @@ -50,31 +48,25 @@ class OperationsLog { } sync(hash) { - console.log("0", hash, this.lastWrite) if(!hash || hash === this.lastWrite || !this._log) return Promise.resolve(); this.events.emit('load', this.dbname); - console.log("1") const oldCount = this._log.items.length; return Log.fromIpfsHash(this._ipfs, hash) .then((other) => this._log.join(other)) .then((merged) => { - console.log("2") if(this._log.items.length - oldCount === 0) return; return this._cacheInMemory(this._log); }) .then(() => { - console.log("3") Cache.set(this.dbname, hash) this.events.emit('sync', this.dbname, hash) return this; }) - // .then(() => this.events.emit('sync', this.dbname, hash)) - // .then(() => this) } addOperation(operation, key, value) { @@ -93,7 +85,6 @@ class OperationsLog { return Log.getIpfsHash(this._ipfs, this._log).then((hash) => { this.lastWrite = hash; Cache.set(this.dbname, hash); - console.log("----------------- write ------------------", this.id, hash) this.events.emit('write', this.dbname, hash); return result.op.hash; }); diff --git a/src/db/OrbitDB.js b/src/db/OrbitDB.js index 828a8af..49d9aaa 100644 --- a/src/db/OrbitDB.js +++ b/src/db/OrbitDB.js @@ -2,11 +2,12 @@ const EventEmitter = require('events').EventEmitter; const OperationsLog = require('./OperationsLog'); +const DefaultIndex = require('./DefaultIndex'); class OrbitDB { constructor(ipfs, options) { this._ipfs = ipfs; - this._index = null; + this._index = new DefaultIndex();; this._oplogs = {}; this.events = {}; this.options = options || {}; @@ -17,8 +18,7 @@ class OrbitDB { const oplog = new OperationsLog(this._ipfs, dbname, this.events[dbname], this.options); return oplog.create(id) .then(() => { - if(this._index) - this._index.updateIndex(oplog); + this._index.updateIndex(oplog); this._oplogs[dbname] = oplog; return this; }); @@ -27,13 +27,9 @@ class OrbitDB { sync(dbname, hash) { const oplog = this._oplogs[dbname]; if(hash && oplog) { - console.log("sync", dbname, hash, oplog.id) return oplog.sync(hash) .then((result) => { - console.log("synced", dbname, hash, oplog.id) - console.log("res", result) - if(this._index) - this._index.updateIndex(oplog); + this._index.updateIndex(oplog); return this; }); } @@ -51,8 +47,7 @@ class OrbitDB { if(oplog) { return oplog.addOperation(type, key, data) .then((result) => { - if(this._index) - this._index.updateIndex(oplog); + this._index.updateIndex(oplog); return result; }); } diff --git a/test/counterdb.test.js b/test/counterdb.test.js index b9e9b28..6b02adb 100644 --- a/test/counterdb.test.js +++ b/test/counterdb.test.js @@ -80,7 +80,6 @@ describe('Orbit Client', function() { it('creates a new counter from cached data', function(done) { client1.counter('counter test', false).then((counter) => { - console.log("COUNTER", counter) assert.equal(counter.value(), 14); done(); }).catch((e) => {