Refactor stores to be individual databases

This commit is contained in:
haad 2016-05-03 10:38:47 +02:00
parent 9573f24bc7
commit aece96b49c
10 changed files with 123 additions and 119 deletions

View File

@ -14,50 +14,33 @@ class OrbitDB {
this.user = null; this.user = null;
this.network = null; this.network = null;
this.events = new EventEmitter(); this.events = new EventEmitter();
this.kvStore = new KeyValueStore(this._ipfs, options); this.stores = {};
this.eventStore = new EventStore(this._ipfs, options);
this.counterStore = new CounterStore(this._ipfs, options);
} }
eventlog(dbname, subscribe) { eventlog(dbname, options) {
const db = this.eventStore; if(!options) options = { subscribe: true };
const api = { const store = new EventStore(this._ipfs, dbname, options);
iterator: (options) => db.iterator(dbname, options), return this._subscribe(store, dbname, options.subscribe)
add: (data) => db.add(dbname, data), .then(() => this.stores[dbname] = store)
del: (hash) => db.remove(dbname, hash), .then(() => store);
delete: () => db.delete(dbname),
close: () => this._pubsub.unsubscribe(dbname)
}
return this._subscribe(db, dbname, subscribe).then(() => api);
} }
kvstore(dbname, subscribe) { kvstore(dbname, options) {
const db = this.kvStore; if(!options) options = { subscribe: true };
const api = { const store = new KeyValueStore(this._ipfs, dbname, options);
put: (key, value) => db.put(dbname, key, value), if(this.stores[dbname]) this.stores[dbname].close();
set: (key, value) => db.set(dbname, key, value), // alias for put()
get: (key) => db.get(dbname, key),
del: (key) => db.del(dbname, key),
delete: () => db.delete(dbname),
close: () => this._pubsub.unsubscribe(dbname),
sync: (hash) => db.sync(dbname, hash)
}
return this._subscribe(db, dbname, subscribe).then(() => api); return this._subscribe(store, dbname, options.subscribe)
.then(() => this.stores[dbname] = store)
.then(() => store);
} }
counter(dbname, subscribe) { counter(dbname, options) {
const db = this.counterStore; if(!options) options = { subscribe: true };
const api = { const store = new CounterStore(this._ipfs, dbname, options);
value: () => db.query(dbname), return this._subscribe(store, dbname, options.subscribe)
inc: (amount) => db.inc(dbname, amount), .then(() => this.stores[dbname] = store)
dec: (amount) => console.log("dec() not implemented yet"), .then(() => store);
delete: () => db.delete(dbname),
close: () => this._pubsub.unsubscribe(dbname),
}
return this._subscribe(db, dbname, subscribe).then(() => api);
} }
disconnect() { disconnect() {
@ -70,10 +53,11 @@ class OrbitDB {
_subscribe(store, dbname, subscribe, callback) { _subscribe(store, dbname, subscribe, callback) {
if(subscribe === undefined) subscribe = true; if(subscribe === undefined) subscribe = true;
return store.use(dbname, this.user.username).then((events) => { return store.use(this.user.username).then((events) => {
events.on('readable', this._onSync.bind(this)); events.on('readable', this._onSync.bind(this));
events.on('data', this._onWrite.bind(this)); events.on('data', this._onWrite.bind(this));
events.on('load', this._onLoad.bind(this)); events.on('load', this._onLoad.bind(this));
events.on('close', this._onClose.bind(this));
if(subscribe) if(subscribe)
this._pubsub.subscribe(dbname, '', this._onMessage.bind(this)); this._pubsub.subscribe(dbname, '', this._onMessage.bind(this));
@ -82,12 +66,12 @@ class OrbitDB {
}); });
} }
_onMessage(channel, message) { _onMessage(dbname, message) {
[this.eventStore, this.kvStore, this.counterStore].forEach((store) => { const store = this.stores[dbname];
store.sync(channel, message).catch((e) => logger.error(e.stack)); store.sync(message).catch((e) => logger.error(e.stack));
})
} }
// TODO: FIX EVENTS!!
_onWrite(channel, hash) { _onWrite(channel, hash) {
this._pubsub.publish(channel, hash); this._pubsub.publish(channel, hash);
this.events.emit('data', channel, hash); this.events.emit('data', channel, hash);
@ -101,6 +85,12 @@ class OrbitDB {
this.events.emit('load', channel, hash); this.events.emit('load', channel, hash);
} }
_onClose(dbname) {
this._pubsub.unsubscribe(dbname);
delete this.stores[dbname];
this.events.emit('closed', dbname);
}
_connect(host, port, username, password, allowOffline) { _connect(host, port, username, password, allowOffline) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
if(allowOffline === undefined) allowOffline = false; if(allowOffline === undefined) allowOffline = false;

View File

@ -57,8 +57,12 @@ class OperationsLog {
const oldCount = this._log.items.length; const oldCount = this._log.items.length;
let newItems = []; let newItems = [];
return Log.fromIpfsHash(this._ipfs, hash) return Log.fromIpfsHash(this._ipfs, hash)
.then((other) => this._log.join(other)) .then((other) => {
.then((merged) => newItems = merged) return this._log.join(other)
})
.then((merged) => {
newItems = merged
})
.then(() => Cache.set(this.dbname, hash)) .then(() => Cache.set(this.dbname, hash))
.then(() => newItems.map((f) => f.payload)) .then(() => newItems.map((f) => f.payload))
} }

View File

@ -5,44 +5,49 @@ const OperationsLog = require('../oplog/OperationsLog');
const DefaultIndex = require('./DefaultIndex'); const DefaultIndex = require('./DefaultIndex');
class Store { class Store {
constructor(ipfs, options) { constructor(ipfs, dbname, options) {
this._ipfs = ipfs; this.dbname = dbname;
this._index = new DefaultIndex(); this.events = null;
this._oplogs = {};
this.events = {};
this.options = options || {}; this.options = options || {};
this._index = new DefaultIndex();
this._oplog = null;
this._ipfs = ipfs;
} }
use(dbname, id) { use(id) {
this.events[dbname] = new EventEmitter(); this.events = new EventEmitter();
const oplog = new OperationsLog(this._ipfs, dbname, this.events[dbname], this.options); const oplog = new OperationsLog(this._ipfs, this.dbname, this.events, this.options);
return oplog.load(id) return oplog.load(id)
.then((merged) => this._index.updateIndex(oplog, merged)) .then((merged) => this._index.updateIndex(oplog, merged))
.then(() => this._oplogs[dbname] = oplog) .then(() => this._oplog = oplog)
.then(() => this.events[dbname]); .then(() => this.events);
} }
sync(dbname, hash) { close() {
const oplog = this._oplogs[dbname]; this.events.emit('close', this.dbname);
}
sync(hash) {
const oplog = this._oplog;
let newItems; let newItems;
if(hash && oplog) { if(hash && oplog) {
return oplog.merge(hash) return oplog.merge(hash)
.then((merged) => newItems = merged) .then((merged) => newItems = merged)
.then(() => this._index.updateIndex(oplog, newItems)) .then(() => this._index.updateIndex(oplog, newItems))
.then(() => this.events[dbname].emit('readable', dbname)) .then(() => this.events.emit('readable', this.dbname))
.then(() => newItems) .then(() => newItems)
} }
return Promise.resolve([]); return Promise.resolve([]);
} }
delete(dbname) { delete() {
if(this._oplogs[dbname]) if(this._oplog)
this._oplogs[dbname].delete(); this._oplog.delete();
} }
_addOperation(dbname, type, key, data) { _addOperation(type, key, data) {
const oplog = this._oplogs[dbname]; const oplog = this._oplog;
let result; let result;
if(oplog) { if(oplog) {
return oplog.addOperation(type, key, data) return oplog.addOperation(type, key, data)

View File

@ -4,25 +4,25 @@ const Counter = require('../../crdts/GCounter');
class CounterIndex { class CounterIndex {
constructor() { constructor() {
this._index = {}; this._index = null;
} }
createCounter(dbname, id) { createCounter(id) {
this._index[dbname] = new Counter(id); this._index = new Counter(id);
} }
get(dbname) { get() {
return this._index[dbname]; return this._index;
} }
updateIndex(oplog, updated) { updateIndex(oplog, updated) {
const counter = this._index[oplog.dbname]; const counter = this._index;
if(counter) { if(counter) {
updated.filter((f) => f && f.op === 'COUNTER') updated.filter((f) => f && f.op === 'COUNTER')
.map((f) => Counter.from(f.value)) .map((f) => Counter.from(f.value))
.forEach((f) => counter.merge(f)) .forEach((f) => counter.merge(f))
this._index[oplog.dbname] = counter; this._index = counter;
} }
} }
} }

View File

@ -4,30 +4,30 @@ const Store = require('../Store');
const CounterIndex = require('./CounterIndex'); const CounterIndex = require('./CounterIndex');
class CounterStore extends Store { class CounterStore extends Store {
constructor(ipfs, options) { constructor(ipfs, dbname, options) {
super(ipfs, options) super(ipfs, dbname, options)
this._index = new CounterIndex(); this._index = new CounterIndex();
} }
use(dbname, id) { use(id) {
this._index.createCounter(dbname, id); this._index.createCounter(id);
return super.use(dbname, id); return super.use(id);
} }
delete(dbname) { delete() {
super.delete(dbname); super.delete();
this._index = new CounterIndex(); this._index = new CounterIndex();
} }
query(dbname) { value() {
return this._index.get(dbname).value; return this._index.get().value;
} }
inc(dbname, amount) { inc(amount) {
const counter = this._index.get(dbname); const counter = this._index.get();
if(counter) { if(counter) {
counter.increment(amount); counter.increment(amount);
return this._addOperation(dbname, 'COUNTER', null, counter.payload); return this._addOperation('COUNTER', null, counter.payload);
} }
} }
} }

View File

@ -5,26 +5,26 @@ const Store = require('../Store');
const EventLogIndex = require('./EventIndex'); const EventLogIndex = require('./EventIndex');
class EventStore extends Store { class EventStore extends Store {
constructor(ipfs, options) { constructor(ipfs, dbname, options) {
super(ipfs, options) super(ipfs, dbname, options)
this._index = new EventLogIndex(); this._index = new EventLogIndex();
} }
delete(dbname) { delete(dbname) {
super.delete(dbname); super.delete();
this._index = new EventLogIndex(); this._index = new EventLogIndex();
} }
add(dbname, data) { add(data) {
return this._addOperation(dbname, 'ADD', null, data); return this._addOperation('ADD', null, data);
} }
remove(dbname, hash) { remove(hash) {
return this._addOperation(dbname, 'DELETE', hash); return this._addOperation('DELETE', hash);
} }
iterator(dbname, options) { iterator(options) {
const messages = this._query(dbname, options); const messages = this._query(this.dbname, options);
let currentIndex = 0; let currentIndex = 0;
let iterator = { let iterator = {
[Symbol.iterator]() { [Symbol.iterator]() {

View File

@ -15,10 +15,11 @@ class KeyValueIndex {
updated.reverse().forEach((item) => { updated.reverse().forEach((item) => {
if(handled.indexOf(item.key) === -1) { if(handled.indexOf(item.key) === -1) {
handled.push(item.key); handled.push(item.key);
if(item.op === 'PUT') if(item.op === 'PUT') {
this._index[item.key] = item.value this._index[item.key] = item.value
else if (item.op === 'DELETE') } else if (item.op === 'DELETE') {
delete this._index[item.key]; delete this._index[item.key];
}
} }
}); });
} }

View File

@ -4,30 +4,30 @@ const Store = require('../Store');
const KVIndex = require('./KeyValueIndex'); const KVIndex = require('./KeyValueIndex');
class KeyValueStore extends Store { class KeyValueStore extends Store {
constructor(ipfs, options) { constructor(ipfs, dbname, options) {
super(ipfs, options) super(ipfs, dbname, options)
this._index = new KVIndex(); this._index = new KVIndex();
} }
delete(dbname) { delete() {
super.delete(dbname); super.delete();
this._index = new KVIndex(); this._index = new KVIndex();
} }
get(dbname, key) { get(key) {
return this._index.get(key); return this._index.get(key);
} }
set(dbname, key, data) { set(key, data) {
this.put(dbname, key, data); this.put(key, data);
} }
put(dbname, key, data) { put(key, data) {
return this._addOperation(dbname, 'PUT', key, data); return this._addOperation('PUT', key, data);
} }
del(dbname, key) { del(key) {
return this._addOperation(dbname, 'DELETE', key); return this._addOperation('DELETE', key);
} }
} }

View File

@ -22,13 +22,13 @@ const startIpfs = () => {
if(err) console.error(err); if(err) console.error(err);
resolve(ipfs); resolve(ipfs);
}); });
ipfsd.local((err, node) => { // ipfsd.local((err, node) => {
if(err) reject(err); // if(err) reject(err);
node.startDaemon((err, ipfs) => { // node.startDaemon((err, ipfs) => {
if(err) reject(err); // if(err) reject(err);
resolve(ipfs); // resolve(ipfs);
}); // });
}); // });
}); });
}; };
@ -134,7 +134,7 @@ describe('Orbit Client', function() {
describe('Add events', function() { describe('Add events', function() {
beforeEach(async((done) => { beforeEach(async((done) => {
db = await(client.eventlog(channel, false)); db = await(client.eventlog(channel, { subscribe: false }));
db.delete(); db.delete();
done(); done();
})); }));
@ -187,7 +187,7 @@ describe('Orbit Client', function() {
it('deletes an item when only one item in the database', async((done) => { it('deletes an item when only one item in the database', async((done) => {
const head = await(db.add('hello1')); const head = await(db.add('hello1'));
const delop = await(db.del(head)); const delop = await(db.remove(head));
const items = db.iterator().collect(); const items = db.iterator().collect();
assert.equal(delop.startsWith('Qm'), true); assert.equal(delop.startsWith('Qm'), true);
assert.equal(items.length, 0); assert.equal(items.length, 0);
@ -197,7 +197,7 @@ describe('Orbit Client', function() {
it('deletes an item when two items in the database', async((done) => { it('deletes an item when two items in the database', async((done) => {
await(db.add('hello1')); await(db.add('hello1'));
const head = await(db.add('hello2')); const head = await(db.add('hello2'));
await(db.del(head)); await(db.remove(head));
const items = db.iterator({ limit: -1 }).collect(); const items = db.iterator({ limit: -1 }).collect();
assert.equal(items.length, 1); assert.equal(items.length, 1);
assert.equal(items[0].value, 'hello1'); assert.equal(items[0].value, 'hello1');
@ -207,7 +207,7 @@ describe('Orbit Client', function() {
it('deletes an item between adds', async((done) => { it('deletes an item between adds', async((done) => {
const head = await(db.add('hello1')); const head = await(db.add('hello1'));
await(db.add('hello2')); await(db.add('hello2'));
db.del(head); db.remove(head);
await(db.add('hello3')); await(db.add('hello3'));
const items = db.iterator().collect(); const items = db.iterator().collect();
assert.equal(items.length, 1); assert.equal(items.length, 1);
@ -224,7 +224,7 @@ describe('Orbit Client', function() {
beforeEach(async((done) => { beforeEach(async((done) => {
items = []; items = [];
db = await(client.eventlog(channel, false)); db = await(client.eventlog(channel, { subscribe: false }));
db.delete(); db.delete();
for(let i = 0; i < itemCount; i ++) { for(let i = 0; i < itemCount; i ++) {
const hash = await(db.add('hello' + i)); const hash = await(db.add('hello' + i));
@ -529,15 +529,18 @@ describe('Orbit Client', function() {
describe('Key-Value Store', function() { describe('Key-Value Store', function() {
beforeEach(async((done) => { beforeEach(async((done) => {
db = await(client.kvstore(channel, false)); db = await(client.kvstore(channel, { subscribe: false }));
db.delete(); db.delete();
done(); done();
})); }));
afterEach((done) => { afterEach((done) => {
db.delete(); db.delete();
client.events.on('closed', (dbname) => {
client.events.removeAllListeners('closed')
done()
});
db.close(); db.close();
done();
}); });
it('put', async((done) => { it('put', async((done) => {
@ -617,7 +620,8 @@ describe('Orbit Client', function() {
})); }));
it('syncs databases', async((done) => { it('syncs databases', async((done) => {
const db2 = await(client2.kvstore(channel, false)); const db2 = await(client2.kvstore(channel, { subscribe: false }));
db2.delete();
await(db.put('key1', 'hello1')); await(db.put('key1', 'hello1'));
await(db2.put('key1', 'hello2')); await(db2.put('key1', 'hello2'));
await(db.sync('QmNtELU2N3heY9cFgRuLWavgov7NTXibNyZCxcTCYjw1TM')) await(db.sync('QmNtELU2N3heY9cFgRuLWavgov7NTXibNyZCxcTCYjw1TM'))

View File

@ -17,6 +17,7 @@ const ipfsPath = '/tmp/orbittests';
const startIpfs = () => { const startIpfs = () => {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
OrbitServer.start();
// ipfsd.local(ipfsPath, (err, node) => { // ipfsd.local(ipfsPath, (err, node) => {
// if(err) reject(err); // if(err) reject(err);
// node.startDaemon((err, ipfs) => { // node.startDaemon((err, ipfs) => {
@ -24,7 +25,6 @@ const startIpfs = () => {
// resolve(ipfs); // resolve(ipfs);
// }); // });
// }); // });
OrbitServer.start();
ipfsd.disposableApi((err, ipfs) => { ipfsd.disposableApi((err, ipfs) => {
if(err) reject(err); if(err) reject(err);
resolve(ipfs); resolve(ipfs);