diff --git a/package.json b/package.json index 8ec6f58..8bcfe33 100644 --- a/package.json +++ b/package.json @@ -14,7 +14,7 @@ "main": "src/Client.js", "dependencies": { "asyncawait": "^1.0.1", - "ipfs-log": "^1.1.3", + "ipfs-log": "^1.1.4", "lazy.js": "^0.4.2", "lodash": "^4.3.0", "log4js": "^0.6.33", diff --git a/src/Cache.js b/src/Cache.js index 924e91a..42833fe 100644 --- a/src/Cache.js +++ b/src/Cache.js @@ -11,9 +11,7 @@ let cache = {}; class Cache { static set(key, value) { cache[key] = value; - fs.writeFile(filePath, JSON.stringify(cache, null, 2) + "\n", (err) => { - if (err) throw err; - }); + fs.writeFileSync(filePath, JSON.stringify(cache, null, 2) + "\n"); } static get(key) { diff --git a/src/OrbitDB.js b/src/OrbitDB.js index fcd887d..fd95933 100644 --- a/src/OrbitDB.js +++ b/src/OrbitDB.js @@ -17,48 +17,61 @@ class OrbitDB { this.options = options || {}; this.lastWrite = null; this._cached = []; + this._state = {}; } /* Public methods */ use(channel, user) { this.user = user; + this._state[channel] = true; return new Promise((resolve, reject) => { Log.create(this._ipfs, this.user.username).then((log) => { this._logs[channel] = log; this.events[channel] = new EventEmitter(); if(this.options.cacheFile) { Cache.loadCache(this.options.cacheFile); - this.sync(channel, Cache.get(channel)); + this.sync(channel, Cache.get(channel)).then(() => { + this._state[channel] = false; + resolve(); + }).catch(reject); + } else { + resolve(); } - resolve(); }).catch(reject); }); } sync(channel, hash) { // console.log("--> Head:", hash) - if(hash && hash !== this.lastWrite && this._logs[channel]) { - this.events[channel].emit('load', 'sync', channel); - const oldCount = this._logs[channel].items.length; - Log.fromIpfsHash(this._ipfs, hash).then((other) => { - this._logs[channel].join(other).then(() => { - // Only emit the event if something was added - const joinedCount = this._logs[channel].items.length - oldCount; - if(joinedCount > 0) { - this.events[channel].emit('sync', channel, hash); - Cache.set(channel, hash); - // Cache the payloads - this._cacheOperations(other) - .then(() => this.events[channel].emit('loaded', 'sync', channel)) - .catch((e) => this.events[channel].emit('error', e.message)); - } else { - this.events[channel].emit('loaded', 'sync', channel); - } + return new Promise((resolve, reject) => { + if(hash && hash !== this.lastWrite && this._logs[channel]) { + this.events[channel].emit('load', 'sync', channel); + const oldCount = this._logs[channel].items.length; + Log.fromIpfsHash(this._ipfs, hash).then((other) => { + this._logs[channel].join(other).then((merged) => { + // Only emit the event if something was added + const joinedCount = this._logs[channel].items.length - oldCount; + if(joinedCount > 0) { + this.events[channel].emit('sync', channel, hash); + Cache.set(channel, hash); + // Cache the payloads + this._cacheOperations(this._logs[channel]) + .then(() => { + this.events[channel].emit('loaded', 'sync', channel); + resolve(); + }) + .catch(reject); + } else { + this.events[channel].emit('loaded', 'sync', channel); + resolve(); + } + }); }); - }); - } else { - this.events[channel].emit('loaded', 'sync', channel); - } + } else { + this.events[channel].emit('loaded', 'sync', channel); + resolve(); + } + }); } /* DB Operations */ @@ -66,7 +79,7 @@ class OrbitDB { // Get items from the db query(channel, password, opts) { this.events[channel].emit('load', 'query', channel); - // console.log("--> Query:", channel, opts); + // console.log("--> Query:", channel, opts, this._logs[channel].items.length); if(!opts) opts = {}; if(!this._cached) this._cached = []; @@ -80,11 +93,9 @@ class OrbitDB { // Key-Value, search latest key first result = this._read(operations.reverse(), opts.key, 1, true).map((f) => f.value); } else if(opts.gt || opts.gte) { - // console.log(1) // Greater than case result = this._read(operations, opts.gt ? opts.gt : opts.gte, amount, opts.gte ? opts.gte : false) } else { - // console.log(2) // Lower than and lastN case, search latest first by reversing the sequence result = this._read(operations.reverse(), opts.lt ? opts.lt : opts.lte, amount, opts.lte || !opts.lt).reverse() } @@ -92,7 +103,7 @@ class OrbitDB { // console.log("++", result.toArray()) if(opts.reverse) result.reverse(); const res = result.toArray(); - // console.log("--> Found", res.length, "items"); + // console.log("--> Found", res.length, "items", this._logs[channel].items.length); this.events[channel].emit('loaded', 'query', channel); return res; } @@ -143,6 +154,7 @@ class OrbitDB { // console.log("e", e, f) return e.hash === f.payload })) + .compact() // Remove nulls .skipWhile((f) => key && f.key !== key) // Drop elements until we have the first one requested .map((f) => { // console.log("f", f, "key", key); @@ -179,7 +191,10 @@ class OrbitDB { // Cache DB operation entries in memory from a log _cacheOperations(log) { return new Promise((resolve, reject) => { - const payloadHashes = log.items.map((f) => f.payload); + const payloadHashes = log.items + .map((f) => f.payload) + .filter((f) => Lazy(this._cached).find((e) => e.hash === f.payload) === undefined); + Promise.map(payloadHashes, (f) => OrbitDB.fetchPayload(this._ipfs, f), { concurrency: 4 }) .then((payloads) => { payloads.forEach((f) => this._cached.push(f)); diff --git a/test/client.test.js b/test/client.test.js index 078215d..c1c88ff 100644 --- a/test/client.test.js +++ b/test/client.test.js @@ -15,7 +15,7 @@ const username = 'testrunner'; const password = ''; describe('Orbit Client', function() { - this.timeout(2000); + this.timeout(20000); let client, db; let channel = 'abcdefgh'; diff --git a/test/orbit-db.test.js b/test/orbit-db.test.js index 185f335..5d787cd 100644 --- a/test/orbit-db.test.js +++ b/test/orbit-db.test.js @@ -12,7 +12,7 @@ const OrbitDB = require('../src/OrbitDB'); const Log = require('ipfs-log'); // Mute logging -require('log4js').setGlobalLogLevel('ERROR'); +// require('log4js').setGlobalLogLevel('ERROR'); // Orbit const username = 'testrunner'; @@ -173,11 +173,10 @@ describe('OrbitDB', function() { }); it('throws an error if fetching went wrong', (done) => { - db.events[channel].once('error', (e) => { - assert.equal(e, 'invalid ipfs ref path'); + db.sync(channel, otherLogHash).catch((e) => { + assert.equal(e.message, 'invalid ipfs ref path'); done(); - }); - db.sync(channel, otherLogHash); + }) }); });