WIP commit

This commit is contained in:
haad 2016-04-26 13:02:55 +02:00
parent abe957890d
commit a858eabbe3
11 changed files with 77 additions and 93 deletions

View File

@ -83,22 +83,17 @@ class Client {
} }
_onMessage(channel, message) { _onMessage(channel, message) {
console.log("<--", channel, message) this.eventlogDB.sync(channel, message).catch((e) => logger.error(e.stack));
this.eventlogDB.sync(channel, message); this.counterDB.sync(channel, message).catch((e) => logger.error(e.stack));
this.counterDB.sync(channel, message).catch((e) => { this.keyvalueDB.sync(channel, message).catch((e) => logger.error(e.stack));
logger.error(e.stack);
})
this.keyvalueDB.sync(channel, message);
} }
_onWrite(channel, hash) { _onWrite(channel, hash) {
console.log("-->", channel, hash)
this._pubsub.publish(channel, hash); this._pubsub.publish(channel, hash);
this.events.emit('data', channel, hash); this.events.emit('data', channel, hash);
} }
_onSync(channel, hash) { _onSync(channel, hash) {
console.log("synced", channel, hash)
this.events.emit('data', channel, hash); this.events.emit('data', channel, hash);
} }

View File

@ -1,64 +1,26 @@
'use strict'; 'use strict';
const Lazy = require('lazy.js'); const OrbitDB = require('./OrbitDB');
const OrbitDB = require('./OrbitDB'); const OpTypes = require('./Operation').Types;
const OpTypes = require('./Operation').Types; const CounterIndex = require('./CounterIndex');
const Counter = require('./GCounter');
class CounterIndex {
constructor() {
this._index = {};
}
createCounter(key, id) {
this._index[key] = new Counter(id);
}
get(key) {
return this._index[key];
}
updateIndex(oplog) {
console.log("UPDATE IDNEX!", JSON.stringify(oplog.ops, null, 2));
const counter = this._index[oplog.dbname];
if(counter) {
Lazy(oplog.ops)
.map((f) => Counter.from(f.value))
.each((f) => counter.merge(f))
this._index[oplog.dbname] = counter;
}
}
}
class CounterDB extends OrbitDB { class CounterDB extends OrbitDB {
constructor(ipfs, options) { constructor(ipfs, options) {
super(ipfs, options) super(ipfs, options)
// this._counters = {};
this._index = new CounterIndex(); this._index = new CounterIndex();
} }
use(dbname, id) { use(dbname, id) {
// this._counters[dbname] = new Counter(id);
this._index.createCounter(dbname, id); this._index.createCounter(dbname, id);
return super.use(dbname, id); return super.use(dbname, id);
} }
// sync(dbname, hash) { delete(dbname) {
// const counter = this._counters[dbname]; super.delete(dbname);
// if(counter) { this._index = new CounterIndex();
// return super.sync(dbname, hash).then((oplog) => { }
// console.log("OPFS", oplog)
// return Lazy(oplog.ops)
// .map((f) => Counter.from(f.value))
// .map((f) => counter.merge(f))
// .toArray();
// });
// }
// }
query(dbname) { query(dbname) {
// return this._counters[dbname].value;
return this._index.get(dbname).value; return this._index.get(dbname).value;
} }

31
src/db/CounterIndex.js Normal file
View File

@ -0,0 +1,31 @@
'use strict';
const Counter = require('./GCounter');
class CounterIndex {
constructor() {
this._index = {};
}
createCounter(key, id) {
this._index[key] = new Counter(id);
}
get(key) {
return this._index[key];
}
updateIndex(oplog) {
const counter = this._index[oplog.dbname];
if(counter) {
oplog.ops
.filter((f) => f !== undefined)
.map((f) => Counter.from(f.value))
.forEach((f) => counter.merge(f))
this._index[oplog.dbname] = counter;
}
}
}
module.exports = CounterIndex;

17
src/db/DefaultIndex.js Normal file
View File

@ -0,0 +1,17 @@
'use strict';
class DefaultIndex {
constructor() {
this._index = [];
}
get() {
return this._index;
}
updateIndex(oplog) {
this._index = oplog.ops;
}
}
module.exports = DefaultIndex;

View File

@ -53,7 +53,7 @@ class EventLogDB extends OrbitDB {
if(opts.gt || opts.gte) { if(opts.gt || opts.gte) {
// Greater than case // Greater than case
result = this._read(this._index.get().reverse(), opts.gt ? opts.gt : opts.gte, amount, opts.gte ? opts.gte : false) result = this._read(this._index.get().reverse(), opts.gt ? opts.gt : opts.gte, amount, opts.gte ? true : false)
} else { } else {
// Lower than and lastN case, search latest first by reversing the sequence // Lower than and lastN case, search latest first by reversing the sequence
result = this._read(this._index.get(), opts.lt ? opts.lt : opts.lte, amount, opts.lte || !opts.lt).reverse() result = this._read(this._index.get(), opts.lt ? opts.lt : opts.lte, amount, opts.lte || !opts.lt).reverse()
@ -66,8 +66,8 @@ class EventLogDB extends OrbitDB {
_read(ops, key, amount, inclusive) { _read(ops, key, amount, inclusive) {
return Lazy(ops) return Lazy(ops)
.skipWhile((f) => key && f.key !== key) // Drop elements until we have the first one requested .skipWhile((f) => key && f.key !== key)
.drop(inclusive ? 0 : 1) // Drop the 'gt/lt' item, include 'gte/lte' item .drop(inclusive ? 0 : 1)
.take(amount); .take(amount);
} }
} }

View File

@ -1,6 +1,5 @@
'use strict'; 'use strict';
const Lazy = require('lazy.js');
const OpTypes = require('./Operation').Types; const OpTypes = require('./Operation').Types;
class EventLogIndex { class EventLogIndex {
@ -15,7 +14,7 @@ class EventLogIndex {
updateIndex(oplog) { updateIndex(oplog) {
let handled = []; let handled = [];
const _createLWWSet = (item) => { const _createLWWSet = (item) => {
if(Lazy(handled).indexOf(item.key) === -1) { if(handled.indexOf(item.key) === -1) {
handled.push(item.key); handled.push(item.key);
if(OpTypes.isInsert(item.op)) if(OpTypes.isInsert(item.op))
return item; return item;
@ -23,13 +22,10 @@ class EventLogIndex {
return null; return null;
}; };
const items = Lazy(oplog.ops.reverse()) this._index = oplog.ops
.map(_createLWWSet) // Return items as LWW (ignore values after the first found) .reverse()
.compact() // Remove nulls .map(_createLWWSet)
// .take(oplog.ops.length) .filter((f) => f !== null);
.toArray();
this._index = items;
} }
} }

View File

@ -32,7 +32,6 @@ class GCounter {
} }
merge(other) { merge(other) {
console.log("MERGE", other, this)
Object.keys(other._counters).forEach((f) => { Object.keys(other._counters).forEach((f) => {
this._counters[f] = Math.max(this._counters[f] ? this._counters[f] : 0, other._counters[f]); this._counters[f] = Math.max(this._counters[f] ? this._counters[f] : 0, other._counters[f]);
}); });

View File

@ -1,6 +1,5 @@
'use strict'; 'use strict';
const Lazy = require('lazy.js');
const OpTypes = require('./Operation').Types; const OpTypes = require('./Operation').Types;
class KVIndex { class KVIndex {
@ -15,7 +14,7 @@ class KVIndex {
updateIndex(oplog) { updateIndex(oplog) {
let handled = []; let handled = [];
const _createLWWSet = (item) => { const _createLWWSet = (item) => {
if(Lazy(handled).indexOf(item.key) === -1) { if(handled.indexOf(item.key) === -1) {
handled.push(item.key); handled.push(item.key);
if(OpTypes.isInsert(item.op)) if(OpTypes.isInsert(item.op))
return item; return item;
@ -24,11 +23,11 @@ class KVIndex {
}; };
this._index = {}; this._index = {};
Lazy(oplog.ops.reverse()) oplog.ops
.reverse()
.map(_createLWWSet) .map(_createLWWSet)
.compact() .filter((f) => f !== null)
// .take(oplog.ops.length) .forEach((f) => this._index[f.key] = f.value);
.each((f) => this._index[f.key] = f.value);
} }
} }

View File

@ -35,10 +35,8 @@ class OperationsLog {
return; return;
}) })
.then(() => { .then(() => {
if(this.options.cacheFile) { if(this.options.cacheFile)
console.log("from cache", this.dbname)
return this.sync(Cache.get(this.dbname)) return this.sync(Cache.get(this.dbname))
}
return; return;
}) })
@ -50,31 +48,25 @@ class OperationsLog {
} }
sync(hash) { sync(hash) {
console.log("0", hash, this.lastWrite)
if(!hash || hash === this.lastWrite || !this._log) if(!hash || hash === this.lastWrite || !this._log)
return Promise.resolve(); return Promise.resolve();
this.events.emit('load', this.dbname); this.events.emit('load', this.dbname);
console.log("1")
const oldCount = this._log.items.length; const oldCount = this._log.items.length;
return Log.fromIpfsHash(this._ipfs, hash) return Log.fromIpfsHash(this._ipfs, hash)
.then((other) => this._log.join(other)) .then((other) => this._log.join(other))
.then((merged) => { .then((merged) => {
console.log("2")
if(this._log.items.length - oldCount === 0) if(this._log.items.length - oldCount === 0)
return; return;
return this._cacheInMemory(this._log); return this._cacheInMemory(this._log);
}) })
.then(() => { .then(() => {
console.log("3")
Cache.set(this.dbname, hash) Cache.set(this.dbname, hash)
this.events.emit('sync', this.dbname, hash) this.events.emit('sync', this.dbname, hash)
return this; return this;
}) })
// .then(() => this.events.emit('sync', this.dbname, hash))
// .then(() => this)
} }
addOperation(operation, key, value) { addOperation(operation, key, value) {
@ -93,7 +85,6 @@ class OperationsLog {
return Log.getIpfsHash(this._ipfs, this._log).then((hash) => { return Log.getIpfsHash(this._ipfs, this._log).then((hash) => {
this.lastWrite = hash; this.lastWrite = hash;
Cache.set(this.dbname, hash); Cache.set(this.dbname, hash);
console.log("----------------- write ------------------", this.id, hash)
this.events.emit('write', this.dbname, hash); this.events.emit('write', this.dbname, hash);
return result.op.hash; return result.op.hash;
}); });

View File

@ -2,11 +2,12 @@
const EventEmitter = require('events').EventEmitter; const EventEmitter = require('events').EventEmitter;
const OperationsLog = require('./OperationsLog'); const OperationsLog = require('./OperationsLog');
const DefaultIndex = require('./DefaultIndex');
class OrbitDB { class OrbitDB {
constructor(ipfs, options) { constructor(ipfs, options) {
this._ipfs = ipfs; this._ipfs = ipfs;
this._index = null; this._index = new DefaultIndex();;
this._oplogs = {}; this._oplogs = {};
this.events = {}; this.events = {};
this.options = options || {}; this.options = options || {};
@ -17,8 +18,7 @@ class OrbitDB {
const oplog = new OperationsLog(this._ipfs, dbname, this.events[dbname], this.options); const oplog = new OperationsLog(this._ipfs, dbname, this.events[dbname], this.options);
return oplog.create(id) return oplog.create(id)
.then(() => { .then(() => {
if(this._index) this._index.updateIndex(oplog);
this._index.updateIndex(oplog);
this._oplogs[dbname] = oplog; this._oplogs[dbname] = oplog;
return this; return this;
}); });
@ -27,13 +27,9 @@ class OrbitDB {
sync(dbname, hash) { sync(dbname, hash) {
const oplog = this._oplogs[dbname]; const oplog = this._oplogs[dbname];
if(hash && oplog) { if(hash && oplog) {
console.log("sync", dbname, hash, oplog.id)
return oplog.sync(hash) return oplog.sync(hash)
.then((result) => { .then((result) => {
console.log("synced", dbname, hash, oplog.id) this._index.updateIndex(oplog);
console.log("res", result)
if(this._index)
this._index.updateIndex(oplog);
return this; return this;
}); });
} }
@ -51,8 +47,7 @@ class OrbitDB {
if(oplog) { if(oplog) {
return oplog.addOperation(type, key, data) return oplog.addOperation(type, key, data)
.then((result) => { .then((result) => {
if(this._index) this._index.updateIndex(oplog);
this._index.updateIndex(oplog);
return result; return result;
}); });
} }

View File

@ -80,7 +80,6 @@ describe('Orbit Client', function() {
it('creates a new counter from cached data', function(done) { it('creates a new counter from cached data', function(done) {
client1.counter('counter test', false).then((counter) => { client1.counter('counter test', false).then((counter) => {
console.log("COUNTER", counter)
assert.equal(counter.value(), 14); assert.equal(counter.value(), 14);
done(); done();
}).catch((e) => { }).catch((e) => {