Refactor db operations. Rename methods to _read() and _write(). Add Post.publish().

This commit is contained in:
haad 2016-03-05 11:54:57 +01:00
parent be2e1ae32a
commit 47c6e1f4cb
6 changed files with 80 additions and 60 deletions

View File

@ -49,11 +49,10 @@ class OrbitClient {
this._store = {}; this._store = {};
this.user = null; this.user = null;
this.network = null; this.network = null;
this.db = null;
} }
_iterator(channel, password, options) { _iterator(channel, password, options) {
const messages = this.db.read(channel, password, options); const messages = this.db.query(channel, password, options);
let currentIndex = 0; let currentIndex = 0;
let iterator = { let iterator = {
[Symbol.iterator]() { [Symbol.iterator]() {

View File

@ -5,11 +5,9 @@ const EventEmitter = require('events').EventEmitter;
const async = require('asyncawait/async'); const async = require('asyncawait/async');
const await = require('asyncawait/await'); const await = require('asyncawait/await');
const ipfsAPI = require('orbit-common/lib/ipfs-api-promised'); const ipfsAPI = require('orbit-common/lib/ipfs-api-promised');
const Operations = require('./list/Operations'); const OrbitList = require('./list/OrbitList');
const List = require('./list/OrbitList'); const Operation = require('./db/Operation');
const OrbitDBItem = require('./db/OrbitDBItem'); const OpTypes = require('./db/OpTypes');
const ItemTypes = require('./db/ItemTypes');
const MetaInfo = require('./db/MetaInfo');
const Post = require('./db/Post'); const Post = require('./db/Post');
class OrbitDB { class OrbitDB {
@ -22,54 +20,59 @@ class OrbitDB {
/* Public methods */ /* Public methods */
use(channel, user, password) { use(channel, user, password) {
this.user = user; this.user = user;
this._logs[channel] = new List(this._ipfs, this.user.username); this._logs[channel] = new OrbitList(this._ipfs, this.user.username);
} }
sync(channel, hash) { sync(channel, hash) {
console.log("--> Head:", hash) console.log("--> Head:", hash)
if(hash && this._logs[channel]) { if(hash && this._logs[channel]) {
const other = List.fromIpfsHash(this._ipfs, hash); const other = OrbitList.fromIpfsHash(this._ipfs, hash);
this._logs[channel].join(other); this._logs[channel].join(other);
} }
} }
/* DB Operations */ /* DB Operations */
read(channel, password, opts) {
// Get items from the db
query(channel, password, opts) {
if(!opts) opts = {}; if(!opts) opts = {};
const operations = Lazy(this._logs[channel].items); const operations = Lazy(this._logs[channel].items);
const amount = opts.limit ? (opts.limit > -1 ? opts.limit : this._logs[channel].items.length) : 1; const amount = opts.limit ? (opts.limit > -1 ? opts.limit : this._logs[channel].items.length) : 1; // Return 1 if no limit is provided
let result = []; let result = [];
if(opts.key) { if(opts.key) {
// Key-Value, search latest key first // Key-Value, search latest key first
result = this._query(operations.reverse(), opts.key, 1, true); result = this._read(operations.reverse(), opts.key, 1, true);
} else if(opts.gt || opts.gte) { } else if(opts.gt || opts.gte) {
// Greater than case // Greater than case
result = this._query(operations, opts.gt ? opts.gt : opts.gte, amount, opts.gte || opts.lte); result = this._read(operations, opts.gt ? opts.gt : opts.gte, amount, opts.gte || opts.lte);
} else { } else {
// Lower than and lastN case, search latest first by reversing the sequence // Lower than and lastN case, search latest first by reversing the sequence
result = this._query(operations.reverse(), opts.lt ? opts.lt : opts.lte, amount, opts.lte || !opts.lt).reverse(); result = this._read(operations.reverse(), opts.lt ? opts.lt : opts.lte, amount, opts.lte || !opts.lt).reverse();
} }
if(opts.reverse) result.reverse(); if(opts.reverse) result.reverse();
return result.toArray(); return result.toArray();
} }
// Adds an event to the log
add(channel, password, data) { add(channel, password, data) {
const post = await(this._publish(data)); const post = await(Post.publish(this._ipfs, data));
const key = post.Hash; const key = post.Hash;
return await(this._createOperation(channel, password, Operations.Add, key, post.Hash, data)); return await(this._write(channel, password, OpTypes.Add, key, post.Hash, data));
} }
// Sets a key-value pair
put(channel, password, key, data) { put(channel, password, key, data) {
const post = await(this._publish(data)); const post = await(Post.publish(this._ipfs, data));
return await(this._createOperation(channel, password, Operations.Put, key, post.Hash)); return await(this._write(channel, password, OpTypes.Put, key, post.Hash));
} }
// Deletes an event based on hash (of the operation)
del(channel, password, hash) { del(channel, password, hash) {
return await(this._createOperation(channel, password, Operations.Delete, hash, null)); return await(this._write(channel, password, OpTypes.Delete, hash, null));
} }
deleteChannel(channel, password) { deleteChannel(channel, password) {
@ -79,14 +82,14 @@ class OrbitDB {
/* Private methods */ /* Private methods */
// The LWW-set // LWW-element-set
_query(sequence, key, amount, inclusive) { _read(sequence, key, amount, inclusive) {
// Last-Write-Wins, ie. use only the first occurance of the key // Last-Write-Wins, ie. use only the first occurance of the key
let handled = []; let handled = [];
const _createLWWSet = (item) => { const _createLWWSet = (item) => {
const wasHandled = Lazy(handled).indexOf(item.key) > -1; const wasHandled = Lazy(handled).indexOf(item.key) > -1;
if(!wasHandled) handled.push(item.key); if(!wasHandled) handled.push(item.key);
if(Operations.isUpdate(item.op) && !wasHandled) return item; if(OpTypes.isInsert(item.op) && !wasHandled) return item;
return null; return null;
}; };
@ -100,37 +103,12 @@ class OrbitDB {
.take(amount) .take(amount)
} }
_createOperation(channel, password, operation, key, value, data) { // Write an op to the db
var createOperation = async(() => { _write(channel, password, operation, key, value, data) {
return new Promise(async((resolve, reject) => { const hash = await(Operation.create(this._ipfs, this._logs[channel], this.user, operation, key, value));
const hash = this._createMessage(channel, password, operation, key, value);
const res = await(this._logs[channel].add(hash));
const listHash = await(this._logs[channel].ipfsHash);
resolve(listHash);
}));
})
const hash = await(createOperation());
this.events.emit('data', hash); this.events.emit('data', hash);
return key; return key;
} }
_createMessage(channel, password, operation, key, value) {
const size = -1;
const meta = new MetaInfo(ItemTypes.Message, size, this.user.username, new Date().getTime());
const item = new OrbitDBItem(operation, key, value, meta);
const data = await (ipfsAPI.putObject(this._ipfs, JSON.stringify(item)));
return data.Hash;
}
_publish(data) {
return new Promise((resolve, reject) => {
let post = new Post(data);
// post.encrypt(privkey, pubkey);
const res = await (ipfsAPI.putObject(this._ipfs, JSON.stringify(post)));
resolve(res);
})
}
} }
module.exports = OrbitDB; module.exports = OrbitDB;

10
src/db/OpTypes.js Normal file
View File

@ -0,0 +1,10 @@
'use strict';
const OpTypes = {
Add: "ADD",
Put: "PUT",
Delete: "DELETE",
isInsert: (op) => op === "ADD" || op === "PUT"
};
module.exports = OpTypes;

32
src/db/Operation.js Normal file
View File

@ -0,0 +1,32 @@
'use strict';
const async = require('asyncawait/async');
const await = require('asyncawait/await');
const ipfsAPI = require('orbit-common/lib/ipfs-api-promised');
const OrbitDBItem = require('./OrbitDBItem');
const ItemTypes = require('./ItemTypes');
const MetaInfo = require('./MetaInfo');
class Operation {
static create(ipfs, log, user, operation, key, value, data) {
var createOperation = async(() => {
return new Promise(async((resolve, reject) => {
const hash = Operation._createOperation(ipfs, user, operation, key, value);
const res = await(log.add(hash));
const listHash = await(log.ipfsHash);
resolve(listHash);
}));
})
return await(createOperation());
}
static _createOperation(ipfs, user, operation, key, value) {
const size = -1;
const meta = new MetaInfo(ItemTypes.Message, size, user.username, new Date().getTime());
const item = new OrbitDBItem(operation, key, value, meta);
const data = await (ipfsAPI.putObject(ipfs, JSON.stringify(item)));
return data.Hash;
}
}
module.exports = Operation;

View File

@ -1,5 +1,8 @@
'use strict'; 'use strict';
const async = require('asyncawait/async');
const await = require('asyncawait/await');
const ipfsAPI = require('orbit-common/lib/ipfs-api-promised');
const Encryption = require('orbit-common/lib/Encryption'); const Encryption = require('orbit-common/lib/Encryption');
class Post { class Post {
@ -11,6 +14,14 @@ class Post {
encrypt(privkey, pubkey) { encrypt(privkey, pubkey) {
this.content = Encryption.encrypt(this.content, privkey, pubkey); this.content = Encryption.encrypt(this.content, privkey, pubkey);
} }
static publish(ipfs, data) {
return new Promise((resolve, reject) => {
let post = new Post(data);
const res = await (ipfsAPI.putObject(ipfs, JSON.stringify(post)));
resolve(res);
})
}
} }
module.exports = Post; module.exports = Post;

View File

@ -1,10 +0,0 @@
'use strict';
const DBOperations = {
Add: "ADD",
Put: "PUT",
Delete: "DELETE",
isUpdate: (op) => op === "ADD" || op === "PUT"
};
module.exports = DBOperations;