Fix joins

This commit is contained in:
haad 2016-04-12 20:48:50 +02:00
parent 940731a5ea
commit c68a304fc8
5 changed files with 50 additions and 38 deletions

View File

@ -14,7 +14,7 @@
"main": "src/Client.js", "main": "src/Client.js",
"dependencies": { "dependencies": {
"asyncawait": "^1.0.1", "asyncawait": "^1.0.1",
"ipfs-log": "^1.1.3", "ipfs-log": "^1.1.4",
"lazy.js": "^0.4.2", "lazy.js": "^0.4.2",
"lodash": "^4.3.0", "lodash": "^4.3.0",
"log4js": "^0.6.33", "log4js": "^0.6.33",

View File

@ -11,9 +11,7 @@ let cache = {};
class Cache { class Cache {
static set(key, value) { static set(key, value) {
cache[key] = value; cache[key] = value;
fs.writeFile(filePath, JSON.stringify(cache, null, 2) + "\n", (err) => { fs.writeFileSync(filePath, JSON.stringify(cache, null, 2) + "\n");
if (err) throw err;
});
} }
static get(key) { static get(key) {

View File

@ -17,48 +17,61 @@ class OrbitDB {
this.options = options || {}; this.options = options || {};
this.lastWrite = null; this.lastWrite = null;
this._cached = []; this._cached = [];
this._state = {};
} }
/* Public methods */ /* Public methods */
use(channel, user) { use(channel, user) {
this.user = user; this.user = user;
this._state[channel] = true;
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
Log.create(this._ipfs, this.user.username).then((log) => { Log.create(this._ipfs, this.user.username).then((log) => {
this._logs[channel] = log; this._logs[channel] = log;
this.events[channel] = new EventEmitter(); this.events[channel] = new EventEmitter();
if(this.options.cacheFile) { if(this.options.cacheFile) {
Cache.loadCache(this.options.cacheFile); Cache.loadCache(this.options.cacheFile);
this.sync(channel, Cache.get(channel)); this.sync(channel, Cache.get(channel)).then(() => {
this._state[channel] = false;
resolve();
}).catch(reject);
} else {
resolve();
} }
resolve();
}).catch(reject); }).catch(reject);
}); });
} }
sync(channel, hash) { sync(channel, hash) {
// console.log("--> Head:", hash) // console.log("--> Head:", hash)
if(hash && hash !== this.lastWrite && this._logs[channel]) { return new Promise((resolve, reject) => {
this.events[channel].emit('load', 'sync', channel); if(hash && hash !== this.lastWrite && this._logs[channel]) {
const oldCount = this._logs[channel].items.length; this.events[channel].emit('load', 'sync', channel);
Log.fromIpfsHash(this._ipfs, hash).then((other) => { const oldCount = this._logs[channel].items.length;
this._logs[channel].join(other).then(() => { Log.fromIpfsHash(this._ipfs, hash).then((other) => {
// Only emit the event if something was added this._logs[channel].join(other).then((merged) => {
const joinedCount = this._logs[channel].items.length - oldCount; // Only emit the event if something was added
if(joinedCount > 0) { const joinedCount = this._logs[channel].items.length - oldCount;
this.events[channel].emit('sync', channel, hash); if(joinedCount > 0) {
Cache.set(channel, hash); this.events[channel].emit('sync', channel, hash);
// Cache the payloads Cache.set(channel, hash);
this._cacheOperations(other) // Cache the payloads
.then(() => this.events[channel].emit('loaded', 'sync', channel)) this._cacheOperations(this._logs[channel])
.catch((e) => this.events[channel].emit('error', e.message)); .then(() => {
} else { this.events[channel].emit('loaded', 'sync', channel);
this.events[channel].emit('loaded', 'sync', channel); resolve();
} })
.catch(reject);
} else {
this.events[channel].emit('loaded', 'sync', channel);
resolve();
}
});
}); });
}); } else {
} else { this.events[channel].emit('loaded', 'sync', channel);
this.events[channel].emit('loaded', 'sync', channel); resolve();
} }
});
} }
/* DB Operations */ /* DB Operations */
@ -66,7 +79,7 @@ class OrbitDB {
// Get items from the db // Get items from the db
query(channel, password, opts) { query(channel, password, opts) {
this.events[channel].emit('load', 'query', channel); this.events[channel].emit('load', 'query', channel);
// console.log("--> Query:", channel, opts); // console.log("--> Query:", channel, opts, this._logs[channel].items.length);
if(!opts) opts = {}; if(!opts) opts = {};
if(!this._cached) this._cached = []; if(!this._cached) this._cached = [];
@ -80,11 +93,9 @@ class OrbitDB {
// Key-Value, search latest key first // Key-Value, search latest key first
result = this._read(operations.reverse(), opts.key, 1, true).map((f) => f.value); result = this._read(operations.reverse(), opts.key, 1, true).map((f) => f.value);
} else if(opts.gt || opts.gte) { } else if(opts.gt || opts.gte) {
// console.log(1)
// Greater than case // Greater than case
result = this._read(operations, opts.gt ? opts.gt : opts.gte, amount, opts.gte ? opts.gte : false) result = this._read(operations, opts.gt ? opts.gt : opts.gte, amount, opts.gte ? opts.gte : false)
} else { } else {
// console.log(2)
// Lower than and lastN case, search latest first by reversing the sequence // Lower than and lastN case, search latest first by reversing the sequence
result = this._read(operations.reverse(), opts.lt ? opts.lt : opts.lte, amount, opts.lte || !opts.lt).reverse() result = this._read(operations.reverse(), opts.lt ? opts.lt : opts.lte, amount, opts.lte || !opts.lt).reverse()
} }
@ -92,7 +103,7 @@ class OrbitDB {
// console.log("++", result.toArray()) // console.log("++", result.toArray())
if(opts.reverse) result.reverse(); if(opts.reverse) result.reverse();
const res = result.toArray(); const res = result.toArray();
// console.log("--> Found", res.length, "items"); // console.log("--> Found", res.length, "items", this._logs[channel].items.length);
this.events[channel].emit('loaded', 'query', channel); this.events[channel].emit('loaded', 'query', channel);
return res; return res;
} }
@ -143,6 +154,7 @@ class OrbitDB {
// console.log("e", e, f) // console.log("e", e, f)
return e.hash === f.payload return e.hash === f.payload
})) }))
.compact() // Remove nulls
.skipWhile((f) => key && f.key !== key) // Drop elements until we have the first one requested .skipWhile((f) => key && f.key !== key) // Drop elements until we have the first one requested
.map((f) => { .map((f) => {
// console.log("f", f, "key", key); // console.log("f", f, "key", key);
@ -179,7 +191,10 @@ class OrbitDB {
// Cache DB operation entries in memory from a log // Cache DB operation entries in memory from a log
_cacheOperations(log) { _cacheOperations(log) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const payloadHashes = log.items.map((f) => f.payload); const payloadHashes = log.items
.map((f) => f.payload)
.filter((f) => Lazy(this._cached).find((e) => e.hash === f.payload) === undefined);
Promise.map(payloadHashes, (f) => OrbitDB.fetchPayload(this._ipfs, f), { concurrency: 4 }) Promise.map(payloadHashes, (f) => OrbitDB.fetchPayload(this._ipfs, f), { concurrency: 4 })
.then((payloads) => { .then((payloads) => {
payloads.forEach((f) => this._cached.push(f)); payloads.forEach((f) => this._cached.push(f));

View File

@ -15,7 +15,7 @@ const username = 'testrunner';
const password = ''; const password = '';
describe('Orbit Client', function() { describe('Orbit Client', function() {
this.timeout(2000); this.timeout(20000);
let client, db; let client, db;
let channel = 'abcdefgh'; let channel = 'abcdefgh';

View File

@ -12,7 +12,7 @@ const OrbitDB = require('../src/OrbitDB');
const Log = require('ipfs-log'); const Log = require('ipfs-log');
// Mute logging // Mute logging
require('log4js').setGlobalLogLevel('ERROR'); // require('log4js').setGlobalLogLevel('ERROR');
// Orbit // Orbit
const username = 'testrunner'; const username = 'testrunner';
@ -173,11 +173,10 @@ describe('OrbitDB', function() {
}); });
it('throws an error if fetching went wrong', (done) => { it('throws an error if fetching went wrong', (done) => {
db.events[channel].once('error', (e) => { db.sync(channel, otherLogHash).catch((e) => {
assert.equal(e, 'invalid ipfs ref path'); assert.equal(e.message, 'invalid ipfs ref path');
done(); done();
}); })
db.sync(channel, otherLogHash);
}); });
}); });