Merge oplog to store

This commit is contained in:
haad
2016-05-06 13:22:41 +02:00
parent 3ccab7b17d
commit c4e245d3d1
9 changed files with 98 additions and 39 deletions

View File

@@ -8,7 +8,7 @@ const KeyValueStore = require('./stores/kvstore/KeyValueStore');
const EventStore = require('./stores/eventlog/EventStore');
class OrbitDB {
constructor(ipfs, options) {
constructor(ipfs) {
this._ipfs = ipfs;
this._pubsub = null;
this.user = null;
@@ -18,7 +18,9 @@ class OrbitDB {
}
eventlog(dbname, options) {
if(!options) options = { subscribe: true };
if(!options) options = {};
if(options.subscribe === undefined) Object.assign(options, { subscribe: true });
const store = new EventStore(this._ipfs, this.user.username, dbname, options);
return this._subscribe(store, dbname, options.subscribe)
.then(() => this.stores[dbname] = store)
@@ -26,7 +28,9 @@ class OrbitDB {
}
kvstore(dbname, options) {
if(!options) options = { subscribe: true };
if(!options) options = {};
if(options.subscribe === undefined) Object.assign(options, { subscribe: true });
const store = new KeyValueStore(this._ipfs, this.user.username, dbname, options);
return this._subscribe(store, dbname, options.subscribe)
.then(() => this.stores[dbname] = store)
@@ -34,7 +38,9 @@ class OrbitDB {
}
counter(dbname, options) {
if(!options) options = { subscribe: true };
if(!options) options = {};
if(options.subscribe === undefined) Object.assign(options, { subscribe: true });
const store = new CounterStore(this._ipfs, this.user.username, dbname, options);
return this._subscribe(store, dbname, options.subscribe)
.then(() => this.stores[dbname] = store)
@@ -144,7 +150,7 @@ class OrbitClientFactory {
throw new Error("IPFS instance not provided");
}
const client = new OrbitDB(ipfs, options);
const client = new OrbitDB(ipfs);
return client._connect(network, username, password, options.allowOffline)
.then(() => client)
}

View File

@@ -41,7 +41,8 @@ class Pubsub {
}
publish(hash, message) {
this._socket.send(JSON.stringify({ channel: hash, message: message }));
if(this._subscriptions[hash])
this._socket.send(JSON.stringify({ channel: hash, message: message }));
}
_handleMessage(hash, message) {

View File

@@ -30,11 +30,13 @@ class Cache {
return new Promise((resolve, reject) => {
// filePath = cacheFile ? cacheFile : defaultFilepath;
if(cacheFile) {
fs.exists(cacheFile, (err, res) => {
filePath = cacheFile;
fs.exists(cacheFile, (res) => {
logger.debug(res);
if(res) {
filePath = cacheFile;
logger.debug('Load cache from ' + cacheFile);
cache = JSON.parse(fs.readFileSync(cacheFile));
resolve();
} else {
resolve();
}
@@ -44,6 +46,10 @@ class Cache {
}
});
}
static reset() {
cache = {};
}
}
module.exports = Cache;

View File

@@ -31,7 +31,6 @@ class OperationsLog extends Log {
// ts: new Date().getTime()
// }
// };
let node, logHash;
return super.add(entry)
.then((op) => node = op)
@@ -66,7 +65,7 @@ class OperationsLog extends Log {
// if(!hash || hash === this._lastWrite)
// return Promise.resolve([]);
const oldCount = this.items.length;
// const oldCount = this.items.length;
let newItems = [];
// return Log.fromIpfsHash(this._ipfs, hash)
// .then((other) => super.join(other))

View File

@@ -1,8 +1,10 @@
'use strict';
const EventEmitter = require('events').EventEmitter;
const OperationsLog = require('../oplog/OperationsLog');
const Log = require('ipfs-log');
// const OperationsLog = require('../oplog/OperationsLog');
const DefaultIndex = require('./DefaultIndex');
const Cache = require('../oplog/Cache');
class Store {
constructor(ipfs, id, dbname, options) {
@@ -12,21 +14,37 @@ class Store {
if(!options) options = {};
if(!options.Index) Object.assign(options, { Index: DefaultIndex });
if(!options.Log) Object.assign(options, { Log: OperationsLog });
if(!options.Log) Object.assign(options, { Log: Log });
if(!options.cacheFile) Object.assign(options, { cacheFile: null });
this.options = options;
this._index = new this.options.Index(this.id);
this._oplog = null;
this._ipfs = ipfs;
this._lastWrite = null;
}
use() {
this.events.emit('load', this.dbname);
this._oplog = new this.options.Log(this._ipfs, this.id, this.dbname, this.options);
return this._oplog.load()
.then((merged) => this._index.updateIndex(this._oplog, merged))
.then(() => this.events.emit('readable', this.dbname))
.then(() => this.events);
Cache.reset();
return Cache.loadCache(this.options.cacheFile).then(() => {
const cached = Cache.get(this.dbname);
// return this._oplog.load()
if(cached) {
return this.options.Log.fromIpfsHash(this._ipfs, cached)
.then((log) => this._oplog.join(log))
.then((merged) => this._index.updateIndex(this._oplog, merged))
.then(() => this.events.emit('readable', this.dbname))
.then(() => this.events);
}
return Promise.resolve(this.events);
// return this._oplog.load()
// .then((merged) => this._index.updateIndex(this._oplog, merged))
// .then(() => this.events.emit('readable', this.dbname))
// .then(() => this.events);
});
}
close() {
@@ -34,20 +52,30 @@ class Store {
}
sync(hash) {
if(!hash || hash === this._oplog._lastWrite || !this._oplog)
// if(!hash || hash === this._oplog._lastWrite || !this._oplog)
if(!hash || hash === this._lastWrite)
return Promise.resolve([]);
let newItems;
const oldCount = this._oplog.items.length;
let newItems = [];
// let newItems;
this.events.emit('load', this.dbname);
return this.options.Log.fromIpfsHash(this._ipfs, hash)
.then((log) => this._oplog.join(log))
.then((merged) => newItems = merged)
.then(() => Log.getIpfsHash(this._ipfs, this._oplog))
.then((hash) => Cache.set(this.dbname, hash))
.then(() => this._index.updateIndex(this._oplog, newItems))
.then(() => {
if(newItems.length > 0)
this.events.emit('readable', this.dbname);
})
.then(() => newItems)
// .then(() => Log.getIpfsHash(this._ipfs, this))
// .then((hash) => Cache.set(this.name, hash))
// .then(() => newItems.forEach((f) => Object.assign(f.payload, { hash: f.hash })))
// .then(() => newItems.map((f) => f.payload))
}
delete() {
@@ -58,12 +86,32 @@ class Store {
// _addOperation(type, key, data) {
_addOperation(data) {
// let node, logHash;
// return super.add(entry)
// .then((op) => node = op)
// .then(() => Object.assign(node.payload, { hash: node.hash }))
// .then(() => Log.getIpfsHash(this._ipfs, this))
// .then((hash) => logHash = hash)
// .then(() => this._lastWrite = logHash)
// .then(() => Cache.set(this.name, logHash))
// .then(() => {
// return node.payload;
// })
let result, logHash;
if(this._oplog) {
return this._oplog.add(data)
.then((op) => result = op)
// .then((op) => result = op)
.then((res) => {
result = res;
Object.assign(result.payload, { hash: res.hash })
return result;
})
// .then(() => Object.assign(result.payload, { hash: node.hash }))
.then(() => this.options.Log.getIpfsHash(this._ipfs, this._oplog))
.then((hash) => logHash = hash)
.then(() => this._lastWrite = logHash)
.then(() => Cache.set(this.dbname, logHash))
.then(() => this._index.updateIndex(this._oplog, [result]))
.then(() => this.events.emit('data', this.dbname, logHash))
.then(() => result.hash);

View File

@@ -12,14 +12,13 @@ class CounterIndex {
}
updateIndex(oplog, added) {
// console.log("ADDED", added)
if(this._counter) {
// added.filter((f) => f && f.payload.op === 'COUNTER')
// .map((f) => Counter.from(f.payload.value))
// .forEach((f) => this._counter.merge(f))
added.filter((f) => f && f.op === 'COUNTER')
.map((f) => Counter.from(f.value))
added.filter((f) => f && f.payload.op === 'COUNTER')
.map((f) => Counter.from(f.payload.value))
.forEach((f) => this._counter.merge(f))
// added.filter((f) => f && f.op === 'COUNTER')
// .map((f) => Counter.from(f.value))
// .forEach((f) => this._counter.merge(f))
}
}
}

View File

@@ -9,14 +9,14 @@ class EventIndex {
return Object.keys(this._index).map((f) => this._index[f]);
}
updateIndex(oplog, updated) {
updated.reduce((handled, item) => {
updateIndex(oplog, added) {
added.reduce((handled, item) => {
if(handled.indexOf(item.hash) === -1) {
handled.push(item.hash);
if(item.op === 'ADD') {
this._index[item.hash] = item
} else if(item.op === 'DEL') {
delete this._index[item.value];
if(item.payload.op === 'ADD') {
this._index[item.hash] = item.payload
} else if(item.payload.op === 'DEL') {
delete this._index[item.payload.value];
}
}
return handled;

View File

@@ -11,12 +11,12 @@ class KeyValueIndex {
updateIndex(oplog, updated) {
updated.reverse().reduce((handled, item) => {
if(handled.indexOf(item.key) === -1) {
handled.push(item.key);
if(item.op === 'PUT') {
this._index[item.key] = item.value
} else if(item.op === 'DEL') {
delete this._index[item.key];
if(handled.indexOf(item.payload.key) === -1) {
handled.push(item.payload.key);
if(item.payload.op === 'PUT') {
this._index[item.payload.key] = item.payload.value
} else if(item.payload.op === 'DEL') {
delete this._index[item.payload.key];
}
}
return handled;

View File

@@ -70,7 +70,7 @@ describe('CounterStore', function() {
describe('counters', function() {
it('increases a counter value', (done) => {
client1.counter('counter test', false).then((counter) => {
client1.counter('counter test', { subscribe: false, cacheFile: './orbit-db-cache.json' }).then((counter) => {
Promise.map([13, 1], (f) => counter.inc(f), { concurrency: 1 }).then(() => {
assert.equal(counter.value(), 14);
done();
@@ -87,7 +87,7 @@ describe('CounterStore', function() {
});
it('creates a new counter from cached data', function(done) {
client1.counter('counter test', false).then((counter) => {
client1.counter('counter test', { subscribe: false, cacheFile: './orbit-db-cache.json' }).then((counter) => {
assert.equal(counter.value(), 14);
done();
}).catch((e) => {