diff --git a/src/OrbitDB.js b/src/OrbitDB.js index 1980f50..620032e 100644 --- a/src/OrbitDB.js +++ b/src/OrbitDB.js @@ -14,50 +14,33 @@ class OrbitDB { this.user = null; this.network = null; this.events = new EventEmitter(); - this.kvStore = new KeyValueStore(this._ipfs, options); - this.eventStore = new EventStore(this._ipfs, options); - this.counterStore = new CounterStore(this._ipfs, options); + this.stores = {}; } - eventlog(dbname, subscribe) { - const db = this.eventStore; - const api = { - iterator: (options) => db.iterator(dbname, options), - add: (data) => db.add(dbname, data), - del: (hash) => db.remove(dbname, hash), - delete: () => db.delete(dbname), - close: () => this._pubsub.unsubscribe(dbname) - } - - return this._subscribe(db, dbname, subscribe).then(() => api); + eventlog(dbname, options) { + if(!options) options = { subscribe: true }; + const store = new EventStore(this._ipfs, dbname, options); + return this._subscribe(store, dbname, options.subscribe) + .then(() => this.stores[dbname] = store) + .then(() => store); } - kvstore(dbname, subscribe) { - const db = this.kvStore; - const api = { - put: (key, value) => db.put(dbname, key, value), - set: (key, value) => db.set(dbname, key, value), // alias for put() - get: (key) => db.get(dbname, key), - del: (key) => db.del(dbname, key), - delete: () => db.delete(dbname), - close: () => this._pubsub.unsubscribe(dbname), - sync: (hash) => db.sync(dbname, hash) - } + kvstore(dbname, options) { + if(!options) options = { subscribe: true }; + const store = new KeyValueStore(this._ipfs, dbname, options); + if(this.stores[dbname]) this.stores[dbname].close(); - return this._subscribe(db, dbname, subscribe).then(() => api); + return this._subscribe(store, dbname, options.subscribe) + .then(() => this.stores[dbname] = store) + .then(() => store); } - counter(dbname, subscribe) { - const db = this.counterStore; - const api = { - value: () => db.query(dbname), - inc: (amount) => db.inc(dbname, amount), - dec: (amount) => console.log("dec() not implemented yet"), - delete: () => db.delete(dbname), - close: () => this._pubsub.unsubscribe(dbname), - } - - return this._subscribe(db, dbname, subscribe).then(() => api); + counter(dbname, options) { + if(!options) options = { subscribe: true }; + const store = new CounterStore(this._ipfs, dbname, options); + return this._subscribe(store, dbname, options.subscribe) + .then(() => this.stores[dbname] = store) + .then(() => store); } disconnect() { @@ -70,10 +53,11 @@ class OrbitDB { _subscribe(store, dbname, subscribe, callback) { if(subscribe === undefined) subscribe = true; - return store.use(dbname, this.user.username).then((events) => { + return store.use(this.user.username).then((events) => { events.on('readable', this._onSync.bind(this)); events.on('data', this._onWrite.bind(this)); events.on('load', this._onLoad.bind(this)); + events.on('close', this._onClose.bind(this)); if(subscribe) this._pubsub.subscribe(dbname, '', this._onMessage.bind(this)); @@ -82,12 +66,12 @@ class OrbitDB { }); } - _onMessage(channel, message) { - [this.eventStore, this.kvStore, this.counterStore].forEach((store) => { - store.sync(channel, message).catch((e) => logger.error(e.stack)); - }) + _onMessage(dbname, message) { + const store = this.stores[dbname]; + store.sync(message).catch((e) => logger.error(e.stack)); } + // TODO: FIX EVENTS!! _onWrite(channel, hash) { this._pubsub.publish(channel, hash); this.events.emit('data', channel, hash); @@ -101,6 +85,12 @@ class OrbitDB { this.events.emit('load', channel, hash); } + _onClose(dbname) { + this._pubsub.unsubscribe(dbname); + delete this.stores[dbname]; + this.events.emit('closed', dbname); + } + _connect(host, port, username, password, allowOffline) { return new Promise((resolve, reject) => { if(allowOffline === undefined) allowOffline = false; diff --git a/src/oplog/OperationsLog.js b/src/oplog/OperationsLog.js index 7b4d3c8..d94cd3c 100644 --- a/src/oplog/OperationsLog.js +++ b/src/oplog/OperationsLog.js @@ -57,8 +57,12 @@ class OperationsLog { const oldCount = this._log.items.length; let newItems = []; return Log.fromIpfsHash(this._ipfs, hash) - .then((other) => this._log.join(other)) - .then((merged) => newItems = merged) + .then((other) => { + return this._log.join(other) + }) + .then((merged) => { + newItems = merged + }) .then(() => Cache.set(this.dbname, hash)) .then(() => newItems.map((f) => f.payload)) } diff --git a/src/stores/Store.js b/src/stores/Store.js index c5e0ee2..5c34418 100644 --- a/src/stores/Store.js +++ b/src/stores/Store.js @@ -5,44 +5,49 @@ const OperationsLog = require('../oplog/OperationsLog'); const DefaultIndex = require('./DefaultIndex'); class Store { - constructor(ipfs, options) { - this._ipfs = ipfs; - this._index = new DefaultIndex(); - this._oplogs = {}; - this.events = {}; + constructor(ipfs, dbname, options) { + this.dbname = dbname; + this.events = null; this.options = options || {}; + this._index = new DefaultIndex(); + this._oplog = null; + this._ipfs = ipfs; } - use(dbname, id) { - this.events[dbname] = new EventEmitter(); - const oplog = new OperationsLog(this._ipfs, dbname, this.events[dbname], this.options); + use(id) { + this.events = new EventEmitter(); + const oplog = new OperationsLog(this._ipfs, this.dbname, this.events, this.options); return oplog.load(id) .then((merged) => this._index.updateIndex(oplog, merged)) - .then(() => this._oplogs[dbname] = oplog) - .then(() => this.events[dbname]); + .then(() => this._oplog = oplog) + .then(() => this.events); } - sync(dbname, hash) { - const oplog = this._oplogs[dbname]; + close() { + this.events.emit('close', this.dbname); + } + + sync(hash) { + const oplog = this._oplog; let newItems; if(hash && oplog) { return oplog.merge(hash) .then((merged) => newItems = merged) .then(() => this._index.updateIndex(oplog, newItems)) - .then(() => this.events[dbname].emit('readable', dbname)) + .then(() => this.events.emit('readable', this.dbname)) .then(() => newItems) } return Promise.resolve([]); } - delete(dbname) { - if(this._oplogs[dbname]) - this._oplogs[dbname].delete(); + delete() { + if(this._oplog) + this._oplog.delete(); } - _addOperation(dbname, type, key, data) { - const oplog = this._oplogs[dbname]; + _addOperation(type, key, data) { + const oplog = this._oplog; let result; if(oplog) { return oplog.addOperation(type, key, data) diff --git a/src/stores/counters/CounterIndex.js b/src/stores/counters/CounterIndex.js index 05ca89a..ab67d6e 100644 --- a/src/stores/counters/CounterIndex.js +++ b/src/stores/counters/CounterIndex.js @@ -4,25 +4,25 @@ const Counter = require('../../crdts/GCounter'); class CounterIndex { constructor() { - this._index = {}; + this._index = null; } - createCounter(dbname, id) { - this._index[dbname] = new Counter(id); + createCounter(id) { + this._index = new Counter(id); } - get(dbname) { - return this._index[dbname]; + get() { + return this._index; } updateIndex(oplog, updated) { - const counter = this._index[oplog.dbname]; + const counter = this._index; if(counter) { updated.filter((f) => f && f.op === 'COUNTER') .map((f) => Counter.from(f.value)) .forEach((f) => counter.merge(f)) - this._index[oplog.dbname] = counter; + this._index = counter; } } } diff --git a/src/stores/counters/CounterStore.js b/src/stores/counters/CounterStore.js index 805e222..c0c0291 100644 --- a/src/stores/counters/CounterStore.js +++ b/src/stores/counters/CounterStore.js @@ -4,30 +4,30 @@ const Store = require('../Store'); const CounterIndex = require('./CounterIndex'); class CounterStore extends Store { - constructor(ipfs, options) { - super(ipfs, options) + constructor(ipfs, dbname, options) { + super(ipfs, dbname, options) this._index = new CounterIndex(); } - use(dbname, id) { - this._index.createCounter(dbname, id); - return super.use(dbname, id); + use(id) { + this._index.createCounter(id); + return super.use(id); } - delete(dbname) { - super.delete(dbname); + delete() { + super.delete(); this._index = new CounterIndex(); } - query(dbname) { - return this._index.get(dbname).value; + value() { + return this._index.get().value; } - inc(dbname, amount) { - const counter = this._index.get(dbname); + inc(amount) { + const counter = this._index.get(); if(counter) { counter.increment(amount); - return this._addOperation(dbname, 'COUNTER', null, counter.payload); + return this._addOperation('COUNTER', null, counter.payload); } } } diff --git a/src/stores/eventlog/EventStore.js b/src/stores/eventlog/EventStore.js index dc15101..bebee31 100644 --- a/src/stores/eventlog/EventStore.js +++ b/src/stores/eventlog/EventStore.js @@ -5,26 +5,26 @@ const Store = require('../Store'); const EventLogIndex = require('./EventIndex'); class EventStore extends Store { - constructor(ipfs, options) { - super(ipfs, options) + constructor(ipfs, dbname, options) { + super(ipfs, dbname, options) this._index = new EventLogIndex(); } delete(dbname) { - super.delete(dbname); + super.delete(); this._index = new EventLogIndex(); } - add(dbname, data) { - return this._addOperation(dbname, 'ADD', null, data); + add(data) { + return this._addOperation('ADD', null, data); } - remove(dbname, hash) { - return this._addOperation(dbname, 'DELETE', hash); + remove(hash) { + return this._addOperation('DELETE', hash); } - iterator(dbname, options) { - const messages = this._query(dbname, options); + iterator(options) { + const messages = this._query(this.dbname, options); let currentIndex = 0; let iterator = { [Symbol.iterator]() { diff --git a/src/stores/kvstore/KeyValueIndex.js b/src/stores/kvstore/KeyValueIndex.js index 4f78fc8..237473f 100644 --- a/src/stores/kvstore/KeyValueIndex.js +++ b/src/stores/kvstore/KeyValueIndex.js @@ -15,10 +15,11 @@ class KeyValueIndex { updated.reverse().forEach((item) => { if(handled.indexOf(item.key) === -1) { handled.push(item.key); - if(item.op === 'PUT') + if(item.op === 'PUT') { this._index[item.key] = item.value - else if (item.op === 'DELETE') + } else if (item.op === 'DELETE') { delete this._index[item.key]; + } } }); } diff --git a/src/stores/kvstore/KeyValueStore.js b/src/stores/kvstore/KeyValueStore.js index 376a480..8090432 100644 --- a/src/stores/kvstore/KeyValueStore.js +++ b/src/stores/kvstore/KeyValueStore.js @@ -4,30 +4,30 @@ const Store = require('../Store'); const KVIndex = require('./KeyValueIndex'); class KeyValueStore extends Store { - constructor(ipfs, options) { - super(ipfs, options) + constructor(ipfs, dbname, options) { + super(ipfs, dbname, options) this._index = new KVIndex(); } - delete(dbname) { - super.delete(dbname); + delete() { + super.delete(); this._index = new KVIndex(); } - get(dbname, key) { + get(key) { return this._index.get(key); } - set(dbname, key, data) { - this.put(dbname, key, data); + set(key, data) { + this.put(key, data); } - put(dbname, key, data) { - return this._addOperation(dbname, 'PUT', key, data); + put(key, data) { + return this._addOperation('PUT', key, data); } - del(dbname, key) { - return this._addOperation(dbname, 'DELETE', key); + del(key) { + return this._addOperation('DELETE', key); } } diff --git a/test/client.test.js b/test/client.test.js index 28b62b5..e640468 100644 --- a/test/client.test.js +++ b/test/client.test.js @@ -22,13 +22,13 @@ const startIpfs = () => { if(err) console.error(err); resolve(ipfs); }); - ipfsd.local((err, node) => { - if(err) reject(err); - node.startDaemon((err, ipfs) => { - if(err) reject(err); - resolve(ipfs); - }); - }); + // ipfsd.local((err, node) => { + // if(err) reject(err); + // node.startDaemon((err, ipfs) => { + // if(err) reject(err); + // resolve(ipfs); + // }); + // }); }); }; @@ -134,7 +134,7 @@ describe('Orbit Client', function() { describe('Add events', function() { beforeEach(async((done) => { - db = await(client.eventlog(channel, false)); + db = await(client.eventlog(channel, { subscribe: false })); db.delete(); done(); })); @@ -187,7 +187,7 @@ describe('Orbit Client', function() { it('deletes an item when only one item in the database', async((done) => { const head = await(db.add('hello1')); - const delop = await(db.del(head)); + const delop = await(db.remove(head)); const items = db.iterator().collect(); assert.equal(delop.startsWith('Qm'), true); assert.equal(items.length, 0); @@ -197,7 +197,7 @@ describe('Orbit Client', function() { it('deletes an item when two items in the database', async((done) => { await(db.add('hello1')); const head = await(db.add('hello2')); - await(db.del(head)); + await(db.remove(head)); const items = db.iterator({ limit: -1 }).collect(); assert.equal(items.length, 1); assert.equal(items[0].value, 'hello1'); @@ -207,7 +207,7 @@ describe('Orbit Client', function() { it('deletes an item between adds', async((done) => { const head = await(db.add('hello1')); await(db.add('hello2')); - db.del(head); + db.remove(head); await(db.add('hello3')); const items = db.iterator().collect(); assert.equal(items.length, 1); @@ -224,7 +224,7 @@ describe('Orbit Client', function() { beforeEach(async((done) => { items = []; - db = await(client.eventlog(channel, false)); + db = await(client.eventlog(channel, { subscribe: false })); db.delete(); for(let i = 0; i < itemCount; i ++) { const hash = await(db.add('hello' + i)); @@ -529,15 +529,18 @@ describe('Orbit Client', function() { describe('Key-Value Store', function() { beforeEach(async((done) => { - db = await(client.kvstore(channel, false)); + db = await(client.kvstore(channel, { subscribe: false })); db.delete(); done(); })); afterEach((done) => { db.delete(); + client.events.on('closed', (dbname) => { + client.events.removeAllListeners('closed') + done() + }); db.close(); - done(); }); it('put', async((done) => { @@ -617,7 +620,8 @@ describe('Orbit Client', function() { })); it('syncs databases', async((done) => { - const db2 = await(client2.kvstore(channel, false)); + const db2 = await(client2.kvstore(channel, { subscribe: false })); + db2.delete(); await(db.put('key1', 'hello1')); await(db2.put('key1', 'hello2')); await(db.sync('QmNtELU2N3heY9cFgRuLWavgov7NTXibNyZCxcTCYjw1TM')) diff --git a/test/counterdb.test.js b/test/counterdb.test.js index d24ac14..cc33218 100644 --- a/test/counterdb.test.js +++ b/test/counterdb.test.js @@ -17,6 +17,7 @@ const ipfsPath = '/tmp/orbittests'; const startIpfs = () => { return new Promise((resolve, reject) => { + OrbitServer.start(); // ipfsd.local(ipfsPath, (err, node) => { // if(err) reject(err); // node.startDaemon((err, ipfs) => { @@ -24,7 +25,6 @@ const startIpfs = () => { // resolve(ipfs); // }); // }); - OrbitServer.start(); ipfsd.disposableApi((err, ipfs) => { if(err) reject(err); resolve(ipfs);