Refactor everything

This commit is contained in:
haad 2016-04-26 16:22:19 +02:00
parent a858eabbe3
commit 2ddba23db0
22 changed files with 16734 additions and 15790 deletions

16625
dist/orbitdb.min.js vendored

File diff suppressed because it is too large Load Diff

View File

@ -16,7 +16,7 @@ const value = 'Hello world';
try {
const ipfs = ipfsAPI();
OrbitDB.connect(host, port, username, password, ipfs).then((orbit) => {
orbit.channel(channel).then((db) => {
orbit.kvstore(channel).then((db) => {
let count = 1;
const query = () => {
const startTime = new Date().getTime();

File diff suppressed because it is too large Load Diff

View File

@ -31,7 +31,7 @@ let run = (async(() => {
try {
const ipfs = await(startIpfs());
const orbit = await(OrbitDB.connect(host, port, username, password, ipfs));
const db = await(orbit.channel(channelName));
const db = await(orbit.eventlog(channelName));
let count = 1;
let running = false;

View File

@ -30,7 +30,7 @@ let run = (async(() => {
try {
const ipfs = await(startIpfs());
const orbit = await(OrbitDB.connect(host, port, username, password, ipfs));
const db = await(orbit.channel(channel));
const db = await(orbit.kvstore(channel));
let count = 1;

View File

@ -4,14 +4,16 @@ const fs = require('fs');
const path = require('path');
const logger = require('logplease').create("orbit-db.Cache");
const defaultFilepath = path.resolve('./orbit-db-cache.json');
let filePath = defaultFilepath;
// const defaultFilepath = path.resolve('./orbit-db-cache.json');
// let filePath = defaultFilepath;
let filePath;
let cache = {};
class Cache {
static set(key, value) {
cache[key] = value;
fs.writeFileSync(filePath, JSON.stringify(cache, null, 2) + "\n");
if(filePath)
fs.writeFileSync(filePath, JSON.stringify(cache, null, 2) + "\n");
}
static get(key) {
@ -19,10 +21,11 @@ class Cache {
}
static loadCache(cacheFile) {
filePath = cacheFile ? cacheFile : defaultFilepath;
if(fs.existsSync(filePath)) {
logger.debug('Load cache from ' + filePath);
cache = JSON.parse(fs.readFileSync(filePath));
// filePath = cacheFile ? cacheFile : defaultFilepath;
if(cacheFile && fs.existsSync(cacheFile)) {
filePath = cacheFile;
logger.debug('Load cache from ' + cacheFile);
cache = JSON.parse(fs.readFileSync(cacheFile));
}
}
}

View File

@ -1,11 +1,11 @@
'use strict';
const EventEmitter = require('events').EventEmitter;
const logger = require('logplease').create("orbit-db.Client");
const PubSub = require('./PubSub');
const CounterDB = require('./db/CounterDB');
const KeyValueDB = require('./db/KeyValueDB');
const EventLogDB = require('./db/EventLogDB');
const EventEmitter = require('events').EventEmitter;
const logger = require('logplease').create("orbit-db.Client");
const PubSub = require('./PubSub');
const CounterStore = require('./db/CounterStore');
const KeyValueStore = require('./db/KeyValueStore');
const EventStore = require('./db/EventStore');
class Client {
constructor(ipfs, options) {
@ -14,13 +14,13 @@ class Client {
this.user = null;
this.network = null;
this.events = new EventEmitter();
this.eventlogDB = new EventLogDB(this._ipfs, options);
this.counterDB = new CounterDB(this._ipfs, options);
this.keyvalueDB = new KeyValueDB(this._ipfs, options);
this.kvStore = new KeyValueStore(this._ipfs, options);
this.eventStore = new EventStore(this._ipfs, options);
this.counterStore = new CounterStore(this._ipfs, options);
}
eventlog(dbname, subscribe) {
const db = this.eventlogDB;
const db = this.eventStore;
const api = {
iterator: (options) => db.iterator(dbname, options),
add: (data) => db.add(dbname, data),
@ -33,7 +33,7 @@ class Client {
}
kvstore(dbname, subscribe) {
const db = this.keyvalueDB;
const db = this.kvStore;
const api = {
put: (key, value) => db.put(dbname, key, value),
set: (key, value) => db.set(dbname, key, value), // alias for put()
@ -47,7 +47,7 @@ class Client {
}
counter(dbname, subscribe) {
const db = this.counterDB;
const db = this.counterStore;
const api = {
value: () => db.query(dbname),
inc: (amount) => db.inc(dbname, amount),
@ -69,11 +69,10 @@ class Client {
_subscribe(db, dbname, subscribe, callback) {
if(subscribe === undefined) subscribe = true;
return db.use(dbname, this.user.username).then(() => {
db.events[dbname].on('write', this._onWrite.bind(this));
db.events[dbname].on('sync', this._onSync.bind(this));
db.events[dbname].on('load', this._onLoad.bind(this));
db.events[dbname].on('loaded', this._onLoaded.bind(this));
return db.use(dbname, this.user.username).then((events) => {
events.on('readable', this._onSync.bind(this));
events.on('data', this._onWrite.bind(this));
events.on('load', this._onLoad.bind(this));
if(subscribe)
this._pubsub.subscribe(dbname, '', this._onMessage.bind(this));
@ -83,9 +82,9 @@ class Client {
}
_onMessage(channel, message) {
this.eventlogDB.sync(channel, message).catch((e) => logger.error(e.stack));
this.counterDB.sync(channel, message).catch((e) => logger.error(e.stack));
this.keyvalueDB.sync(channel, message).catch((e) => logger.error(e.stack));
[this.eventStore, this.kvStore, this.counterStore].forEach((db) => {
db.sync(channel, message).catch((e) => logger.error(e.stack));
})
}
_onWrite(channel, hash) {
@ -94,17 +93,13 @@ class Client {
}
_onSync(channel, hash) {
this.events.emit('data', channel, hash);
this.events.emit('readable', channel, hash);
}
_onLoad(channel, hash) {
this.events.emit('load', channel, hash);
}
_onLoaded(channel, hash) {
this.events.emit('loaded', channel, hash);
}
_connect(host, port, username, password, allowOffline) {
return new Promise((resolve, reject) => {
if(allowOffline === undefined) allowOffline = false;

View File

@ -1,77 +0,0 @@
'use strict';
const Lazy = require('lazy.js');
const EventEmitter = require('events').EventEmitter;
const logger = require('logplease').create("orbit-db.OrbitDB");
const Log = require('ipfs-log');
const DBOperation = require('./db/Operation');
const Post = require('./post/Post');
const Cache = require('./Cache');
const Counter = require('./GCounter.js');
class CounterDB {
constructor(ipfs, options) {
this._ipfs = ipfs;
this.options = options || {};
this.events = {};
this._counters = {};
this._aggregators = {};
}
/* Public methods */
use(channel, user) {
this.user = user;
this.events[channel] = new EventEmitter();
this._counters[channel] = new Counter(user.username);
this._aggregators[channel] = new LogAggregator(this._ipfs, this.options);
return this._aggregators[channel].create(user)
.then(() => Cache.loadCache(this.options.cacheFile))
.then(() => this.sync(channel, Cache.get(channel)));
}
sync(channel, hash) {
// console.log("--> Head:", hash, this.user.username)
const aggregator = this._aggregators[channel];
const counter = this._counters[channel];
return aggregator.sync(hash)
.then(() => {
return Lazy(aggregator.cached)
.map((f) => Counter.from(f.value))
.map((f) => counter.merge(f))
.toArray();
}).then(() => Cache.set(channel, hash));
}
_write(channel, password, operation, key, value) {
const aggregator = this._aggregators[channel];
const log = aggregator._log;
return DBOperation.create(this._ipfs, log, this.user, operation, key, value)
.then((result) => {
aggregator._cachePayload(result.node.payload, result.op);
return result;
}).then((result) => {
return Log.getIpfsHash(this._ipfs, log)
.then((listHash) => {
aggregator.lastWrite = listHash;
Cache.set(channel, listHash);
this.events[channel].emit('write', channel, listHash);
return result;
});
}).then((result) => result.node.payload);
}
/* Operations */
query(channel) {
return this._counters[channel].value;
}
inc(channel, amount) {
const counter = this._counters[channel];
if(counter) {
counter.increment(amount);
return this._write(channel, '', DBOperation.Types.Inc, null, counter.payload);
}
}
}
module.exports = CounterDB;

View File

@ -1,216 +0,0 @@
'use strict';
const Lazy = require('lazy.js');
const EventEmitter = require('events').EventEmitter;
const logger = require('logplease').create("orbit-db.OrbitDB");
const Log = require('ipfs-log');
const DBOperation = require('./db/Operation');
const Post = require('./post/Post');
const Cache = require('./Cache');
class OrbitDB {
constructor(ipfs, options) {
this._ipfs = ipfs;
this._logs = {};
this.events = {};
this.options = options || {};
this.lastWrite = null;
this._cached = [];
this._state = {};
}
/* Public methods */
use(channel, user) {
this.user = user;
this._state[channel] = true;
return new Promise((resolve, reject) => {
Log.create(this._ipfs, this.user.username).then((log) => {
this._logs[channel] = log;
this.events[channel] = new EventEmitter();
if(this.options.cacheFile) {
Cache.loadCache(this.options.cacheFile);
this.sync(channel, Cache.get(channel)).then(() => {
this._state[channel] = false;
resolve();
}).catch(reject);
} else {
resolve();
}
}).catch(reject);
});
}
sync(channel, hash) {
// console.log("--> Head:", hash)
return new Promise((resolve, reject) => {
if(hash && hash !== this.lastWrite && this._logs[channel]) {
this.events[channel].emit('load', 'sync', channel);
const oldCount = this._logs[channel].items.length;
Log.fromIpfsHash(this._ipfs, hash).then((other) => {
this._logs[channel].join(other).then((merged) => {
// Only emit the event if something was added
const joinedCount = this._logs[channel].items.length - oldCount;
if(joinedCount > 0) {
Cache.set(channel, hash);
// Cache the payloads
this._cacheOperations(this._logs[channel])
.then(() => {
this.events[channel].emit('sync', channel, hash);
this.events[channel].emit('loaded', 'sync', channel);
resolve();
})
.catch(reject);
} else {
this.events[channel].emit('loaded', 'sync', channel);
resolve();
}
});
});
} else {
this.events[channel].emit('loaded', 'sync', channel);
resolve();
}
});
}
/* DB Operations */
// Get items from the db
query(channel, password, opts) {
this.events[channel].emit('load', 'query', channel);
// console.log("--> Query:", channel, opts, this._logs[channel].items.length);
if(!opts) opts = {};
if(!this._cached) this._cached = [];
const operations = Lazy(this._logs[channel].items);
const amount = opts.limit ? (opts.limit > -1 ? opts.limit : this._logs[channel].items.length) : 1; // Return 1 if no limit is provided
let result = [];
if(opts.key) {
// Key-Value, search latest key first
result = this._read(operations.reverse(), opts.key, 1, true).map((f) => f.value);
} else if(opts.gt || opts.gte) {
// Greater than case
result = this._read(operations, opts.gt ? opts.gt : opts.gte, amount, opts.gte ? opts.gte : false)
} else {
// Lower than and lastN case, search latest first by reversing the sequence
result = this._read(operations.reverse(), opts.lt ? opts.lt : opts.lte, amount, opts.lte || !opts.lt).reverse()
}
if(opts.reverse) result.reverse();
const res = result.toArray();
// console.log("--> Found", res.length, "items", this._logs[channel].items.length);
this.events[channel].emit('loaded', 'query', channel);
return res;
}
// Adds an event to the log
add(channel, password, data) {
return this._write(channel, password, DBOperation.Types.Add, null, data);
}
// Sets a key-value pair
put(channel, password, key, data) {
return this._write(channel, password, DBOperation.Types.Put, key, data);
}
// Deletes an event based on hash (of the operation) or 'key' of a key/val pair
del(channel, password, key) {
return this._write(channel, password, DBOperation.Types.Delete, key);
}
deleteChannel(channel, password) {
if(this._logs[channel]) {
this._logs[channel].clear();
return true;
}
return false;
}
/* Private methods */
// Cache DB operation entries in memory from a log
_cacheOperations(log) {
return new Promise((resolve, reject) => {
const payloadHashes = log.items
.map((f) => f.payload)
.filter((f) => Lazy(this._cached).find((e) => e.hash === f.payload) === undefined);
const promises = payloadHashes.map((f) => OrbitDB.fetchPayload(this._ipfs, f));
Promise.all(promises).then((payloads) => {
payloads.forEach((f) => this._cached.push(f));
resolve();
}).catch(reject);
});
}
// LWW-element-set
_read(sequence, key, amount, inclusive) {
// Last-Write-Wins, ie. use only the first occurance of the key
let handled = [];
const _createLWWSet = (item) => {
if(Lazy(handled).indexOf(item.key) === -1) {
handled.push(item.key);
if(DBOperation.Types.isInsert(item.op))
return item;
}
return null;
};
// Find the items from the sequence (list of operations)
return sequence
.map((f) => Lazy(this._cached).find((e) => {
return e.hash === f.payload
}))
.compact() // Remove nulls
.skipWhile((f) => key && f.key !== key) // Drop elements until we have the first one requested
.map((f) => {
return f;
})
.map(_createLWWSet) // Return items as LWW (ignore values after the first found)
.compact() // Remove nulls
.drop(inclusive ? 0 : 1) // Drop the 'gt/lt' item, include 'gte/lte' item
.take(amount);
}
// Write an op to the db
_write(channel, password, operation, key, value) {
return new Promise((resolve, reject) => {
DBOperation.create(this._ipfs, this._logs[channel], this.user, operation, key, value)
.then((res) => {
Log.getIpfsHash(this._ipfs, this._logs[channel]).then((listHash) => {
this.lastWrite = listHash;
Cache.set(channel, listHash);
// Cache the payload
let op = JSON.parse(JSON.stringify(res.op));
Object.assign(op, { hash: res.node.payload });
if(op.key === null) Object.assign(op, { key: res.node.payload });
this._cached.push(op);
this.events[channel].emit('write', channel, listHash);
resolve(res.node.payload);
})
}).catch(reject);
});
}
static fetchPayload(ipfs, hash) {
return new Promise((resolve, reject) => {
ipfs.object.get(hash)
.then((payload) => {
let data = JSON.parse(payload.Data);
Object.assign(data, { hash: hash });
if(data.key === null) Object.assign(data, { key: hash });
resolve(data);
})
.catch(reject);
});
}
}
// TODO: move to where this is needed
module.exports = OrbitDB;

View File

@ -1,6 +1,6 @@
'use strict';
const Counter = require('./GCounter');
const Counter = require('./crdts/GCounter');
class CounterIndex {
constructor() {

View File

@ -1,10 +1,10 @@
'use strict';
const OrbitDB = require('./OrbitDB');
const OpTypes = require('./Operation').Types;
const Store = require('./Store');
const CounterIndex = require('./CounterIndex');
const OpTypes = require('./Operation').Types;
class CounterDB extends OrbitDB {
class CounterStore extends Store {
constructor(ipfs, options) {
super(ipfs, options)
this._index = new CounterIndex();
@ -33,4 +33,4 @@ class CounterDB extends OrbitDB {
}
}
module.exports = CounterDB;
module.exports = CounterStore;

View File

@ -10,7 +10,7 @@ class DefaultIndex {
}
updateIndex(oplog) {
this._index = oplog.ops;
this._index = oplog.ops
}
}

View File

@ -1,11 +1,11 @@
'use strict';
const Lazy = require('lazy.js');
const OrbitDB = require('./OrbitDB');
const Store = require('./Store');
const OpTypes = require('./Operation').Types;
const EventLogIndex = require('./EventLogIndex');
class EventLogDB extends OrbitDB {
class EventStore extends Store {
constructor(ipfs, options) {
super(ipfs, options)
this._index = new EventLogIndex();
@ -72,4 +72,4 @@ class EventLogDB extends OrbitDB {
}
}
module.exports = EventLogDB;
module.exports = EventStore;

View File

@ -2,7 +2,7 @@
const OpTypes = require('./Operation').Types;
class KVIndex {
class KeyValueIndex {
constructor() {
this._index = {};
}
@ -31,4 +31,4 @@ class KVIndex {
}
}
module.exports = KVIndex;
module.exports = KeyValueIndex;

View File

@ -1,10 +1,10 @@
'use strict';
const OrbitDB = require('./OrbitDB');
const Store = require('./Store');
const KVIndex = require('./KeyValueIndex');
const OpTypes = require('./Operation').Types;
const KVIndex = require('./KVIndex');
class KeyValueDB extends OrbitDB {
class KeyValueStore extends Store {
constructor(ipfs, options) {
super(ipfs, options)
this._index = new KVIndex();
@ -32,4 +32,4 @@ class KeyValueDB extends OrbitDB {
}
}
module.exports = KeyValueDB;
module.exports = KeyValueStore;

View File

@ -1,57 +0,0 @@
'use strict';
const isEqual = require('./utils').isEqual;
class GSet {
constructor(id, payload) {
this.id = id;
this._items = payload ? payload : [];
// this._counters[this.id] = this._counters[this.id] ? this._counters[this.id] : 0;
}
add(data) {
this._items.push(data);
}
// remove(data) {
// }
// increment(amount) {
// if(!amount) amount = 1;
// this._counters[this.id] = this._counters[this.id] + amount;
// }
// get value() {
// return Object.keys(this._counters)
// .map((f) => this._counters[f])
// .reduce((previousValue, currentValue) => previousValue + currentValue, 0);
// }
query() {
return this._items;
}
get payload() {
return { id: this.id, items: this._items };
}
// compare(other) {
// if(other.id !== this.id)
// return false;
// return isEqual(other._counters, this._counters);
// }
// merge(other) {
// Object.keys(other._counters).forEach((f) => {
// this._counters[f] = Math.max(this._counters[f] ? this._counters[f] : 0, other._counters[f]);
// });
// }
// static from(payload) {
// return new LWWSet(payload.id, payload.counters);
// }
}
module.exports = GSet;

View File

@ -5,30 +5,13 @@ const OrbitDBItem = require('../post/OrbitDBItem');
const Post = require('../post/Post');
class Operation {
static create(ipfs, log, user, operation, key, value, data) {
// return new Promise((resolve, reject) => {
let post;
return Operation._createOperation(ipfs, user, operation, key, value)
// .then((op) => {
// post = op.Post;
// return log.add(op.Hash);
// })
// .then((node) => resolve({ node: node, op: post }))
// .catch(reject);
// });
}
static _createOperation(ipfs, user, operation, key, value) {
return new Promise((resolve, reject) => {
const data = {
operation: operation,
key: key,
value: value
};
Post.create(ipfs, Post.Types.OrbitDBItem, data)
.then(resolve)
.catch(reject);
});
static create(ipfs, operation, key, value) {
const data = {
operation: operation,
key: key,
value: value
};
return Post.create(ipfs, Post.Types.OrbitDBItem, data);
}
static get Types() {

View File

@ -1,6 +1,5 @@
'use strict';
const Lazy = require('lazy.js');
const Log = require('ipfs-log');
const Cache = require('../Cache');
const DBOperation = require('./Operation');
@ -18,36 +17,24 @@ class OperationsLog {
}
get ops() {
return Lazy(this._log.items)
.map((f) => this._cached[f.payload])
.toArray();
return this._log.items.map((f) => this._cached[f.payload]);
}
create(id) {
this.events.emit('load', this);
this.events.emit('load', this.dbname);
this.id = id;
return Log.create(this._ipfs, id)
.then((log) => this._log = log)
.then(() => {
if(this.options.cacheFile)
return Cache.loadCache(this.options.cacheFile)
return;
})
.then(() => {
if(this.options.cacheFile)
return this.sync(Cache.get(this.dbname))
return;
})
.then(() => this)
.then(() => Cache.loadCache(this.options.cacheFile))
.then(() => this.merge(Cache.get(this.dbname)))
.then(() => this);
}
delete() {
this._log.clear();
}
sync(hash) {
merge(hash) {
if(!hash || hash === this.lastWrite || !this._log)
return Promise.resolve();
@ -64,14 +51,14 @@ class OperationsLog {
})
.then(() => {
Cache.set(this.dbname, hash)
this.events.emit('sync', this.dbname, hash)
this.events.emit('readable', this.dbname, hash)
return this;
})
}
addOperation(operation, key, value) {
let post;
return DBOperation.create(this._ipfs, this._log, this.user, operation, key, value)
return DBOperation.create(this._ipfs, operation, key, value)
.then((result) => {
return this._log.add(result.Hash).then((node) => {
return { node: node, op: result.Post };
@ -85,7 +72,7 @@ class OperationsLog {
return Log.getIpfsHash(this._ipfs, this._log).then((hash) => {
this.lastWrite = hash;
Cache.set(this.dbname, hash);
this.events.emit('write', this.dbname, hash);
this.events.emit('data', this.dbname, hash);
return result.op.hash;
});
})

View File

@ -4,10 +4,10 @@ const EventEmitter = require('events').EventEmitter;
const OperationsLog = require('./OperationsLog');
const DefaultIndex = require('./DefaultIndex');
class OrbitDB {
class Store {
constructor(ipfs, options) {
this._ipfs = ipfs;
this._index = new DefaultIndex();;
this._index = new DefaultIndex();
this._oplogs = {};
this.events = {};
this.options = options || {};
@ -17,21 +17,17 @@ class OrbitDB {
this.events[dbname] = new EventEmitter();
const oplog = new OperationsLog(this._ipfs, dbname, this.events[dbname], this.options);
return oplog.create(id)
.then(() => {
this._index.updateIndex(oplog);
this._oplogs[dbname] = oplog;
return this;
});
.then(() => this._oplogs[dbname] = oplog)
.then(() => this._index.updateIndex(oplog))
.then(() => this.events[dbname]);
}
sync(dbname, hash) {
const oplog = this._oplogs[dbname];
if(hash && oplog) {
return oplog.sync(hash)
.then((result) => {
this._index.updateIndex(oplog);
return this;
});
return oplog.merge(hash)
.then(() => this._index.updateIndex(oplog))
.then(() => this);
}
return Promise.resolve(this);
@ -44,14 +40,14 @@ class OrbitDB {
_addOperation(dbname, type, key, data) {
const oplog = this._oplogs[dbname];
let result;
if(oplog) {
return oplog.addOperation(type, key, data)
.then((result) => {
this._index.updateIndex(oplog);
return result;
});
.then((op) => result = op)
.then(() => this._index.updateIndex(oplog))
.then(() => result);
}
}
}
module.exports = OrbitDB;
module.exports = Store;