Refactor kv-store

This commit is contained in:
haad 2016-01-19 21:59:53 +08:00
parent 67fd17984e
commit 11dc672e31
3 changed files with 166 additions and 168 deletions

View File

@ -9,8 +9,9 @@ const HashCacheOps = {
}; };
class HashCacheItem { class HashCacheItem {
constructor(operation, sequenceNumber, targetHash, metaInfo) { constructor(operation, key, sequenceNumber, targetHash, metaInfo) {
this.op = operation; this.op = operation;
this.key = key;
this.seq = sequenceNumber; this.seq = sequenceNumber;
this.target = targetHash; this.target = targetHash;
this.meta = metaInfo; this.meta = metaInfo;
@ -18,8 +19,8 @@ class HashCacheItem {
} }
class EncryptedHashCacheItem extends HashCacheItem { class EncryptedHashCacheItem extends HashCacheItem {
constructor(operation, sequenceNumber, targetHash, metaInfo, publicKey, privateKey, salt) { constructor(operation, key, sequenceNumber, targetHash, metaInfo, publicKey, privateKey, salt) {
super(operation, sequenceNumber, targetHash, metaInfo); super(operation, key, sequenceNumber, targetHash, metaInfo);
this.pubkey = publicKey; this.pubkey = publicKey;
try { try {
this.target = encryption.encrypt(targetHash, privateKey, publicKey); this.target = encryption.encrypt(targetHash, privateKey, publicKey);
@ -32,16 +33,8 @@ class EncryptedHashCacheItem extends HashCacheItem {
} }
} }
class KeyedEncryptedHashCacheItem extends EncryptedHashCacheItem {
constructor(operation, key, sequenceNumber, targetHash, metaInfo, publicKey, privateKey, salt) {
super(operation, sequenceNumber, targetHash, metaInfo, publicKey, privateKey, salt);
this.key = key;
}
}
module.exports = { module.exports = {
HashCacheOps: HashCacheOps, HashCacheOps: HashCacheOps,
HashCacheItem: HashCacheItem, HashCacheItem: HashCacheItem,
EncryptedHashCacheItem: EncryptedHashCacheItem, EncryptedHashCacheItem: EncryptedHashCacheItem
KeyedEncryptedHashCacheItem: KeyedEncryptedHashCacheItem
}; };

View File

@ -6,7 +6,6 @@ var ipfsDaemon = require('./ipfs-daemon');
var ipfsAPI = require('./ipfs-api-promised'); var ipfsAPI = require('./ipfs-api-promised');
var HashCache = require('./HashCacheClient'); var HashCache = require('./HashCacheClient');
var HashCacheItem = require('./HashCacheItem').EncryptedHashCacheItem; var HashCacheItem = require('./HashCacheItem').EncryptedHashCacheItem;
var KeyedHashCacheItem = require('./HashCacheItem').KeyedEncryptedHashCacheItem;
var HashCacheOps = require('./HashCacheItem').HashCacheOps; var HashCacheOps = require('./HashCacheItem').HashCacheOps;
var MetaInfo = require('./MetaInfo'); var MetaInfo = require('./MetaInfo');
var ItemTypes = require('./ItemTypes'); var ItemTypes = require('./ItemTypes');
@ -19,7 +18,6 @@ var privkey = Keystore.getKeys().privateKey;
class OrbitClient { class OrbitClient {
constructor(ipfs) { constructor(ipfs) {
this.sequences = {};
this.ipfs = ipfs; this.ipfs = ipfs;
this.network = {}; this.network = {};
this.user = null; this.user = null;
@ -29,41 +27,20 @@ class OrbitClient {
if(password === undefined) password = ''; if(password === undefined) password = '';
return { return {
info: (options) => this._info(hash, password), info: (options) => this._info(hash, password),
delete: () => this._deleteChannel(hash, password),
iterator: (options) => this._iterator(hash, password, options), iterator: (options) => this._iterator(hash, password, options),
add: (text, options) => { setMode: (mode) => this._setMode(hash, password, mode),
// TODO: create updateChannelSequence(), move the update to send() and remove() add: (data) => this._add(hash, password, data),
this.sequences[hash] = !this.sequences[hash] ? this._getChannelSequence(hash, password) : this.sequences[hash] + 1; //TODO: tests
return this._send(hash, password, text, options); remove: (options) => this._remove(hash, password, options),
}, put: (key, data) => this._put(hash, password, key, data),
put: (key, data, options) => {
this.sequences[hash] = !this.sequences[hash] ? this._getChannelSequence(hash, password) : this.sequences[hash] + 1;
return this._put(hash, password, key, data, options);
},
get: (key, options) => { get: (key, options) => {
options = options ? Object.assign(options, { key: key }) : { key: key } const items = this._iterator(hash, password, { key: key }).collect();
// console.log(JSON.stringify(this._iterator(hash, password, options).collect()));
const items = this._iterator(hash, password, options).collect();
return items[0] ? items[0].item.Payload : null; return items[0] ? items[0].item.Payload : null;
}, },
remove: (options) => {
this.sequences[hash] = !this.sequences[hash] ? this._getChannelSequence(hash, password) : this.sequences[hash] + 1;
return this._remove(hash, password, options);
},
delete: () => this._delete(hash, password),
setMode: (mode) => this._setMode(hash, password, mode)
} }
} }
_getChannelSequence(channel, password) {
var seq = 0
var item = await(this.client.linkedList(channel, password).head())
if(item.head) {
var headItem = await (ipfsAPI.getObject(this.ipfs, item.head));
seq = JSON.parse(headItem.Data)["seq"] + 1;
}
return seq;
}
_iterator(channel, password, options) { _iterator(channel, password, options) {
const messages = this._getMessages(channel, password, options); const messages = this._getMessages(channel, password, options);
@ -88,24 +65,24 @@ class OrbitClient {
} }
_getMessages(channel, password, options) { _getMessages(channel, password, options) {
var messages = []; let messages = [];
if(!options) options = {}; if(!options) options = {};
// Options // Options
var limit = options.limit ? options.limit : 1; let limit = options.limit ? options.limit : 1;
var gt = options.gt ? options.gt : null; const gt = options.gt ? options.gt : null;
var gte = options.gte ? options.gte : null; const gte = options.gte ? options.gte : null;
var lt = options.lt ? options.lt : null; const lt = options.lt ? options.lt : null;
var lte = options.lte ? options.lte : null; const lte = options.lte ? options.lte : null;
var reverse = options.reverse ? options.reverse : false; const reverse = options.reverse ? options.reverse : false;
var key = options.key ? options.key : null; const key = options.key ? options.key : null;
var startFromHash; let startFromHash;
if(lt || lte) { if(lt || lte) {
startFromHash = lte ? lte : lt; startFromHash = lte ? lte : lt;
} else { } else {
var channel = await (this.client.linkedList(channel, password).head()) var channel = await (this.client.linkedList(channel, password).head())
startFromHash = channel.head ? channel.head : null; startFromHash = channel.head ? channel.head : null;
} }
@ -113,11 +90,16 @@ class OrbitClient {
if(startFromHash) { if(startFromHash) {
// Get messages // Get messages
messages = this._fetchRecursive(startFromHash, password, limit, gte ? gte : gt, 0, [], key); const opts = {
amount: limit,
last: gte ? gte : gt,
key: key
};
messages = this._fetchRecursive(startFromHash, password, opts);
// Slice the array // Slice the array
var startIndex = 0; let startIndex = 0;
var endIndex = messages.length; let endIndex = messages.length;
if(limit < 0) { if(limit < 0) {
endIndex = messages.length - (gt ? 1 : 0); endIndex = messages.length - (gt ? 1 : 0);
} else { } else {
@ -134,31 +116,30 @@ class OrbitClient {
} }
_fetchOne(hash, password) { _fetchOne(hash, password) {
let item = null; let item;
if(hash) { item = await (ipfsAPI.getObject(this.ipfs, hash));
item = await (ipfsAPI.getObject(this.ipfs, hash)); let data = JSON.parse(item.Data);
let data = JSON.parse(item.Data);
// verify signature // verify signature
const verified = Encryption.verify(data.target, data.pubkey, data.sig, data.seq, password); const verified = Encryption.verify(data.target, data.pubkey, data.sig, data.seq, password);
if(!verified) throw "Item '" + hash + "' has the wrong signature" if(!verified) throw "Item '" + hash + "' has the wrong signature"
// decrypt data structure // decrypt data structure
const targetDec = Encryption.decrypt(data.target, privkey, 'TODO: pubkey'); const targetDec = Encryption.decrypt(data.target, privkey, 'TODO: pubkey');
const metaDec = Encryption.decrypt(data.meta, privkey, 'TODO: pubkey'); const metaDec = Encryption.decrypt(data.meta, privkey, 'TODO: pubkey');
data.target = targetDec; data.target = targetDec;
data.meta = JSON.parse(metaDec); data.meta = JSON.parse(metaDec);
// fetch and decrypt content // fetch and decrypt content
if(data.op === HashCacheOps.Add || data.op === HashCacheOps.Put) { // TODO: add possibility to fetch content separately
const payload = await (ipfsAPI.getObject(this.ipfs, data.target)); if(data.op === HashCacheOps.Add || data.op === HashCacheOps.Put) {
const contentEnc = JSON.parse(payload.Data)["content"]; const payload = await (ipfsAPI.getObject(this.ipfs, data.target));
const contentDec = Encryption.decrypt(contentEnc, privkey, 'TODO: pubkey'); const contentEnc = JSON.parse(payload.Data)["content"];
item.Payload = contentDec; const contentDec = Encryption.decrypt(contentEnc, privkey, 'TODO: pubkey');
} item.Payload = contentDec;
item.Data = data;
} }
item.Data = data;
return item; return item;
} }
@ -171,117 +152,125 @@ class OrbitClient {
return contains; return contains;
} }
_fetchRecursive(hash, password, amount, last, currentDepth, deleted, key) { _fetchRecursive(hash, password, options, currentDepth, deleted) {
var res = []; const opts = {
var deletedItems = deleted ? deleted : []; amount: options.amount ? options.amount : 1,
last: options.last ? options.last : null,
key: options.key ? options.key : null
};
let res = [];
let deletedItems = deleted ? deleted : [];
if(!currentDepth) currentDepth = 0; if(!currentDepth) currentDepth = 0;
var message = await (this._fetchOne(hash, password)); const message = await (this._fetchOne(hash, password));
// console.log(message); // TODO: test this part
if(message.Data.op === HashCacheOps.Delete) {
if(message.Data.op === HashCacheOps.Add && !this._contains(deletedItems, hash)) { deletedItems.push(message.Data.target);
} else if(message.Data.op === HashCacheOps.Add && !this._contains(deletedItems, hash)) {
res.push({ hash: hash, item: message }); res.push({ hash: hash, item: message });
currentDepth ++; currentDepth ++;
} else if(message.Data.op === HashCacheOps.Put && !this._contains(deletedItems, message.Data.key)) { } else if(message.Data.op === HashCacheOps.Put && !this._contains(deletedItems, message.Data.key)) {
if(!key || key && key === message.Data.key) { if(!opts.key || opts.key && opts.key === message.Data.key) {
res.push({ hash: hash, item: message }); res.push({ hash: hash, item: message });
currentDepth ++; currentDepth ++;
deletedItems.push(message.Data.key); deletedItems.push(message.Data.key);
} }
} else if(message.Data.op === HashCacheOps.Delete) {
deletedItems.push(message.Data.target);
} }
if(key && message.Data.key === key) if(opts.key && message.Data.key === opts.key)
return res; return res;
if(last && hash === last) if(opts.last && hash === opts.last)
return res; return res;
if(!last && amount > -1 && currentDepth >= amount) if(!opts.last && opts.amount > -1 && currentDepth >= opts.amount)
return res; return res;
if(message && message.Links[0]) { if(message && message.Links[0]) {
var next = this._fetchRecursive(message.Links[0].Hash, password, amount, last, currentDepth, deletedItems, key); const next = this._fetchRecursive(message.Links[0].Hash, password, opts, currentDepth, deletedItems);
res = res.concat(next); res = res.concat(next);
} }
return res; return res;
} }
_publish(text) { _publish(data) {
var post = new Post(text); let post = new Post(data);
post.encrypt(privkey, pubkey); post.encrypt(privkey, pubkey);
return await (ipfsAPI.putObject(this.ipfs, JSON.stringify(post))); return await (ipfsAPI.putObject(this.ipfs, JSON.stringify(post)));
} }
_createMessage(channel, password, key, target, operation, options) { _createMessage(channel, password, operation, key, target) {
var seq = this.sequences[channel]; // Get the current channel head and bump the sequence number
var size = -1; let seq = 0;
var metaInfo = new MetaInfo(ItemTypes.Message, size, new Date().getTime()); const currentHead = await(this.client.linkedList(channel, password).head())
var hcItem; if(currentHead.head) {
if(operation === HashCacheOps.Put) const headItem = await (ipfsAPI.getObject(this.ipfs, currentHead.head));
hcItem = new KeyedHashCacheItem(operation, key, seq, target, metaInfo, pubkey, privkey, password);
else
hcItem = new HashCacheItem(operation, seq, target, metaInfo, pubkey, privkey, password);
var item = await (ipfsAPI.putObject(this.ipfs, JSON.stringify(hcItem)));
var newHead = { Hash: item.Hash };
if(seq > 0) {
var prevHead = await(this.client.linkedList(channel, password).head());
var headItem = await (ipfsAPI.getObject(this.ipfs, prevHead.head));
seq = JSON.parse(headItem.Data)["seq"] + 1; seq = JSON.parse(headItem.Data)["seq"] + 1;
newHead = await (ipfsAPI.patchObject(this.ipfs, item.Hash, prevHead.head))
this.sequences[channel] = seq;
} }
// Create meta info
const size = -1;
const metaInfo = new MetaInfo(ItemTypes.Message, size, new Date().getTime());
// Create the hash cache item
const hcItem = new HashCacheItem(operation, key, seq, target, metaInfo, pubkey, privkey, password);
// Save the item to ipfs
const data = await (ipfsAPI.putObject(this.ipfs, JSON.stringify(hcItem)));
let newHead = { Hash: data.Hash };
// If this is not the first item in the channel, patch with the previous (ie. link as next)
if(seq > 0)
newHead = await (ipfsAPI.patchObject(this.ipfs, data.Hash, currentHead.head));
return newHead; return newHead;
} }
_send(channel, password, text, options) { /* DB Operations */
// TODO: check options for what type to publish as (text, snippet, file, etc.) _add(channel, password, data) {
var post = this._publish(text); const key = null;
var message = this._createMessage(channel, password, null, post.Hash, HashCacheOps.Add); const post = this._publish(data);
await(this.client.linkedList(channel, password).add(message.Hash)); return this._createOperation(channel, password, HashCacheOps.Add, key, post.Hash);
return message.Hash;
} }
// WIP _put(channel, password, key, data) {
_put(channel, password, key, data, options) { const post = this._publish(data);
// TODO: options return this._createOperation(channel, password, HashCacheOps.Put, key, post.Hash);
var post = this._publish(data);
var message = this._createMessage(channel, password, key, post.Hash, HashCacheOps.Put);
await(this.client.linkedList(channel, password).add(message.Hash));
return message.Hash;
} }
_remove(channel, password, options) { _remove(channel, password, options) {
const target = options.key ? options.key : (options.hash ? options.hash : null); const key = null;
const message = this._createMessage(channel, password, null, target, HashCacheOps.Delete); const target = options.key ? options.key : (options.hash ? options.hash : null);
await(this.client.linkedList(channel, password).add(message.Hash)) return this._createOperation(channel, password, HashCacheOps.Delete, key, target);
}
_createOperation(channel, password, operation, key, value) {
const message = this._createMessage(channel, password, operation, key, value);
await(this.client.linkedList(channel, password).add(message.Hash));
return message.Hash; return message.Hash;
} }
_delete(channel, password) { _deleteChannel(channel, password) {
await(this.client.linkedList(channel, password).delete()) await(this.client.linkedList(channel, password).delete());
delete this.sequences[channel];
return true; return true;
} }
_setMode(channel, password, modes) { _setMode(channel, password, modes) {
var m = [] let m = [];
if(typeof modes !== 'Array') if(typeof modes !== 'Array')
m.push(modes); m.push(modes);
else else
m = modes; m = modes;
var res = await(this.client.linkedList(channel, password).setMode(m)); const res = await(this.client.linkedList(channel, password).setMode(m));
return res.modes; return res.modes;
} }
_info(channel, password) { _info(channel, password) {
return await(this.client.linkedList(channel, password).head()) return await(this.client.linkedList(channel, password).head());
} }
_connect(host, username, password) { _connect(host, username, password) {
@ -298,11 +287,11 @@ class OrbitClient {
class OrbitClientFactory { class OrbitClientFactory {
static connect(host, username, password, ipfs) { static connect(host, username, password, ipfs) {
if(!ipfs) { if(!ipfs) {
var ipfsd = await(ipfsDaemon()); let ipfsd = await(ipfsDaemon());
ipfs = ipfsd.daemon; ipfs = ipfsd.daemon;
} }
var client = new OrbitClient(ipfs); const client = new OrbitClient(ipfs);
await(client._connect(host, username, password)) await(client._connect(host, username, password))
return client; return client;
} }

View File

@ -2,36 +2,52 @@
## Introduction ## Introduction
***VERY MUCH WIP! WILL NOT WORK WHEN CLONED, orbit-server REQUIRED!*** Key-Value Store and Event Store on IPFS.
Client library to interact with orbit-server. Implements the levelDOWN API without get(key, cb). ***VERY MUCH WIP! WILL NOT WORK WHEN CLONED, orbit-server (not released yet) REQUIRED!***
orbit-server uses linked lists on top of IPFS. orbit-server not *yet* released, working on it. ## Features
- Distributed kv-store and event log database
- Stores all data in IPFS
- Data encrypted on the wire and at rest
- Per channel access rights
### TODO _Channel maps to "table", "keyspace", "topic" or "feed" in similar systems_
- Tests for .remove(...)
- Local caching of messages
- Use HTTPS instead of HTTP (channel password are sent in plaintext atm)
- API for fetching user info
- OrbitNetwork
+ channel system (join, part, pub/sub)
## API ## API
connect(host, username, password) connect(host, username, password)
channel(name, password) channel(name, password)
.add(data: String) .add(data: String) // Insert an event to a channel, returns <ipfs-hash> of the event
.put(key, data: String) .iterator([options]) // Returns an iterator of events
.remove(hash or key) // options : {
// gt: <ipfs-hash>, // Return events newer than <ipfs-hash>
// gte: <ipfs-hash>, // Return events newer then <ipfs-hash> (inclusive)
// lt: <ipfs-hash>, // Return events older than <ipfs-hash>
// lte: <ipfs-hash>, // Return events older than <ipfs-hash> (inclusive)
// limit: -1, // Number of events to return, -1 returns all, default 1
// reverse: true // Return items oldest first, default latest first
// }
.iterator([options]) .put(key, data: String) // Insert (key,value) to a channel
.setMode(modes) .get(key) // Retrieve value
.delete() .remove({ key: <key>, hash: <event's ipfs-hash> }) // Remove entry (use one option)
.setMode(modes) // Set channel modes, can be an object or an array of objects
// { mode: "+r", params: { password: password } } // Set read mode
// { mode: "-r" } // Remove read-mode
// { mode: "+w", params: { ops: [orbit.user.id] } } // Set write-mode, only users in ops can write
// { mode: "-w" } // Remove write-mode
.info() // Returns channel's current head and modes
.delete() // Deletes the channel, all data will be "removed" (unassociated with the channel, actual data is not deleted)
## Usage ## Usage
```javascript ```javascript
@ -42,41 +58,35 @@ var host = 'localhost:3006'; // orbit-server address
async(() => { async(() => {
// Connect // Connect
const orbit = OrbitClient.connect(host, username, password); // OrbitClient const orbit = OrbitClient.connect(host, username, password);
const channelName = 'hello-world'; const channelName = 'hello-world';
// Send a message // Send an event
const head = orbit.channel(channelName).send('hello'); // <ipfs-hash> const head = orbit.channel(channelName).add('hello'); // <ipfs-hash>
// Delete a message // Delete an event
orbit.channel(channelName).remove(head); orbit.channel(channelName).remove(head);
// Iterator options // Iterator options
const options = { limit: -1 }; // fetch all messages, default is 1 const options = { limit: -1 }; // fetch all messages
// {
// gt: <hash>,
// gte: <hash>,
// lt: <hash>,
// lte: <hash>,
// limit: 10,
// reverse: true
// }
// Get messages // Get events
const iter = orbit.channel(channelName).iterator(options); // Symbol.iterator const iter = orbit.channel(channelName).iterator(options); // Symbol.iterator
const next = iter.next(); // { value: <item>, done: false|true} const next = iter.next(); // { value: <item>, done: false|true}
// OR: // OR:
// var all = iter.collect(); // returns all elements as an array // var all = iter.collect(); // returns all elements as an array
// OR: // OR:
// for(let i of iter) // for(let i of iter)
// console.log(i.hash, i.item.Data.seq); // console.log(i.hash, i.item);
// Remove element // KV-store
orbit.channel(channelName).remove(next.value.hash); // remove first element iterator returns orbit.channel(channelName).put("key1", "hello world");
orbit.channel(channelName).get("key1"); // returns "hello world"
// Set modes // Modes
const password = 'hello'; const password = 'hello';
const channelModes; const channelModes;
channelModes = orbit.channel(channel).setMode({ mode: "+r", params: { password: password } }); // { modes: { r: { password: 'hello' } } } channelModes = orbit.channel(channel).setMode({ mode: "+r", params: { password: password } }); // { modes: { r: { password: 'hello' } } }
@ -88,3 +98,9 @@ async(() => {
const result = orbit.channel(channelName, channelPwd).delete(); // true | false const result = orbit.channel(channelName, channelPwd).delete(); // true | false
})(); })();
``` ```
### TODO
- Tests for remove(), put() and get()
- Local caching of messages
- Possibility to fetch content separately from data structure
- Use HTTPS instead of HTTP (channel password are sent in plaintext atm)