From 11dc672e319c54ea95a2194d03e764d2ec0b2be3 Mon Sep 17 00:00:00 2001 From: haad Date: Tue, 19 Jan 2016 21:59:53 +0800 Subject: [PATCH] Refactor kv-store --- HashCacheItem.js | 17 +--- OrbitClient.js | 233 ++++++++++++++++++++++------------------------- README.md | 84 ++++++++++------- 3 files changed, 166 insertions(+), 168 deletions(-) diff --git a/HashCacheItem.js b/HashCacheItem.js index 23d215f..4f40197 100644 --- a/HashCacheItem.js +++ b/HashCacheItem.js @@ -9,8 +9,9 @@ const HashCacheOps = { }; class HashCacheItem { - constructor(operation, sequenceNumber, targetHash, metaInfo) { + constructor(operation, key, sequenceNumber, targetHash, metaInfo) { this.op = operation; + this.key = key; this.seq = sequenceNumber; this.target = targetHash; this.meta = metaInfo; @@ -18,8 +19,8 @@ class HashCacheItem { } class EncryptedHashCacheItem extends HashCacheItem { - constructor(operation, sequenceNumber, targetHash, metaInfo, publicKey, privateKey, salt) { - super(operation, sequenceNumber, targetHash, metaInfo); + constructor(operation, key, sequenceNumber, targetHash, metaInfo, publicKey, privateKey, salt) { + super(operation, key, sequenceNumber, targetHash, metaInfo); this.pubkey = publicKey; try { this.target = encryption.encrypt(targetHash, privateKey, publicKey); @@ -32,16 +33,8 @@ class EncryptedHashCacheItem extends HashCacheItem { } } -class KeyedEncryptedHashCacheItem extends EncryptedHashCacheItem { - constructor(operation, key, sequenceNumber, targetHash, metaInfo, publicKey, privateKey, salt) { - super(operation, sequenceNumber, targetHash, metaInfo, publicKey, privateKey, salt); - this.key = key; - } -} - module.exports = { HashCacheOps: HashCacheOps, HashCacheItem: HashCacheItem, - EncryptedHashCacheItem: EncryptedHashCacheItem, - KeyedEncryptedHashCacheItem: KeyedEncryptedHashCacheItem + EncryptedHashCacheItem: EncryptedHashCacheItem }; diff --git a/OrbitClient.js b/OrbitClient.js index c4cf521..0b8f26b 100644 --- a/OrbitClient.js +++ b/OrbitClient.js @@ -6,7 +6,6 @@ var ipfsDaemon = require('./ipfs-daemon'); var ipfsAPI = require('./ipfs-api-promised'); var HashCache = require('./HashCacheClient'); var HashCacheItem = require('./HashCacheItem').EncryptedHashCacheItem; -var KeyedHashCacheItem = require('./HashCacheItem').KeyedEncryptedHashCacheItem; var HashCacheOps = require('./HashCacheItem').HashCacheOps; var MetaInfo = require('./MetaInfo'); var ItemTypes = require('./ItemTypes'); @@ -19,7 +18,6 @@ var privkey = Keystore.getKeys().privateKey; class OrbitClient { constructor(ipfs) { - this.sequences = {}; this.ipfs = ipfs; this.network = {}; this.user = null; @@ -29,41 +27,20 @@ class OrbitClient { if(password === undefined) password = ''; return { info: (options) => this._info(hash, password), + delete: () => this._deleteChannel(hash, password), iterator: (options) => this._iterator(hash, password, options), - add: (text, options) => { - // TODO: create updateChannelSequence(), move the update to send() and remove() - this.sequences[hash] = !this.sequences[hash] ? this._getChannelSequence(hash, password) : this.sequences[hash] + 1; - return this._send(hash, password, text, options); - }, - put: (key, data, options) => { - this.sequences[hash] = !this.sequences[hash] ? this._getChannelSequence(hash, password) : this.sequences[hash] + 1; - return this._put(hash, password, key, data, options); - }, + setMode: (mode) => this._setMode(hash, password, mode), + add: (data) => this._add(hash, password, data), + //TODO: tests + remove: (options) => this._remove(hash, password, options), + put: (key, data) => this._put(hash, password, key, data), get: (key, options) => { - options = options ? Object.assign(options, { key: key }) : { key: key } - // console.log(JSON.stringify(this._iterator(hash, password, options).collect())); - const items = this._iterator(hash, password, options).collect(); + const items = this._iterator(hash, password, { key: key }).collect(); return items[0] ? items[0].item.Payload : null; }, - remove: (options) => { - this.sequences[hash] = !this.sequences[hash] ? this._getChannelSequence(hash, password) : this.sequences[hash] + 1; - return this._remove(hash, password, options); - }, - delete: () => this._delete(hash, password), - setMode: (mode) => this._setMode(hash, password, mode) } } - _getChannelSequence(channel, password) { - var seq = 0 - var item = await(this.client.linkedList(channel, password).head()) - if(item.head) { - var headItem = await (ipfsAPI.getObject(this.ipfs, item.head)); - seq = JSON.parse(headItem.Data)["seq"] + 1; - } - return seq; - } - _iterator(channel, password, options) { const messages = this._getMessages(channel, password, options); @@ -88,24 +65,24 @@ class OrbitClient { } _getMessages(channel, password, options) { - var messages = []; + let messages = []; if(!options) options = {}; // Options - var limit = options.limit ? options.limit : 1; - var gt = options.gt ? options.gt : null; - var gte = options.gte ? options.gte : null; - var lt = options.lt ? options.lt : null; - var lte = options.lte ? options.lte : null; - var reverse = options.reverse ? options.reverse : false; - var key = options.key ? options.key : null; + let limit = options.limit ? options.limit : 1; + const gt = options.gt ? options.gt : null; + const gte = options.gte ? options.gte : null; + const lt = options.lt ? options.lt : null; + const lte = options.lte ? options.lte : null; + const reverse = options.reverse ? options.reverse : false; + const key = options.key ? options.key : null; - var startFromHash; + let startFromHash; if(lt || lte) { startFromHash = lte ? lte : lt; } else { - var channel = await (this.client.linkedList(channel, password).head()) + var channel = await (this.client.linkedList(channel, password).head()) startFromHash = channel.head ? channel.head : null; } @@ -113,11 +90,16 @@ class OrbitClient { if(startFromHash) { // Get messages - messages = this._fetchRecursive(startFromHash, password, limit, gte ? gte : gt, 0, [], key); + const opts = { + amount: limit, + last: gte ? gte : gt, + key: key + }; + messages = this._fetchRecursive(startFromHash, password, opts); // Slice the array - var startIndex = 0; - var endIndex = messages.length; + let startIndex = 0; + let endIndex = messages.length; if(limit < 0) { endIndex = messages.length - (gt ? 1 : 0); } else { @@ -134,31 +116,30 @@ class OrbitClient { } _fetchOne(hash, password) { - let item = null; - if(hash) { - item = await (ipfsAPI.getObject(this.ipfs, hash)); - let data = JSON.parse(item.Data); + let item; + item = await (ipfsAPI.getObject(this.ipfs, hash)); + let data = JSON.parse(item.Data); - // verify signature - const verified = Encryption.verify(data.target, data.pubkey, data.sig, data.seq, password); - if(!verified) throw "Item '" + hash + "' has the wrong signature" + // verify signature + const verified = Encryption.verify(data.target, data.pubkey, data.sig, data.seq, password); + if(!verified) throw "Item '" + hash + "' has the wrong signature" - // decrypt data structure - const targetDec = Encryption.decrypt(data.target, privkey, 'TODO: pubkey'); - const metaDec = Encryption.decrypt(data.meta, privkey, 'TODO: pubkey'); - data.target = targetDec; - data.meta = JSON.parse(metaDec); + // decrypt data structure + const targetDec = Encryption.decrypt(data.target, privkey, 'TODO: pubkey'); + const metaDec = Encryption.decrypt(data.meta, privkey, 'TODO: pubkey'); + data.target = targetDec; + data.meta = JSON.parse(metaDec); - // fetch and decrypt content - if(data.op === HashCacheOps.Add || data.op === HashCacheOps.Put) { - const payload = await (ipfsAPI.getObject(this.ipfs, data.target)); - const contentEnc = JSON.parse(payload.Data)["content"]; - const contentDec = Encryption.decrypt(contentEnc, privkey, 'TODO: pubkey'); - item.Payload = contentDec; - } - - item.Data = data; + // fetch and decrypt content + // TODO: add possibility to fetch content separately + if(data.op === HashCacheOps.Add || data.op === HashCacheOps.Put) { + const payload = await (ipfsAPI.getObject(this.ipfs, data.target)); + const contentEnc = JSON.parse(payload.Data)["content"]; + const contentDec = Encryption.decrypt(contentEnc, privkey, 'TODO: pubkey'); + item.Payload = contentDec; } + + item.Data = data; return item; } @@ -171,117 +152,125 @@ class OrbitClient { return contains; } - _fetchRecursive(hash, password, amount, last, currentDepth, deleted, key) { - var res = []; - var deletedItems = deleted ? deleted : []; + _fetchRecursive(hash, password, options, currentDepth, deleted) { + const opts = { + amount: options.amount ? options.amount : 1, + last: options.last ? options.last : null, + key: options.key ? options.key : null + }; + + let res = []; + let deletedItems = deleted ? deleted : []; if(!currentDepth) currentDepth = 0; - var message = await (this._fetchOne(hash, password)); + const message = await (this._fetchOne(hash, password)); - // console.log(message); - - if(message.Data.op === HashCacheOps.Add && !this._contains(deletedItems, hash)) { + // TODO: test this part + if(message.Data.op === HashCacheOps.Delete) { + deletedItems.push(message.Data.target); + } else if(message.Data.op === HashCacheOps.Add && !this._contains(deletedItems, hash)) { res.push({ hash: hash, item: message }); currentDepth ++; } else if(message.Data.op === HashCacheOps.Put && !this._contains(deletedItems, message.Data.key)) { - if(!key || key && key === message.Data.key) { + if(!opts.key || opts.key && opts.key === message.Data.key) { res.push({ hash: hash, item: message }); currentDepth ++; deletedItems.push(message.Data.key); } - } else if(message.Data.op === HashCacheOps.Delete) { - deletedItems.push(message.Data.target); } - if(key && message.Data.key === key) + if(opts.key && message.Data.key === opts.key) return res; - if(last && hash === last) + if(opts.last && hash === opts.last) return res; - if(!last && amount > -1 && currentDepth >= amount) + if(!opts.last && opts.amount > -1 && currentDepth >= opts.amount) return res; if(message && message.Links[0]) { - var next = this._fetchRecursive(message.Links[0].Hash, password, amount, last, currentDepth, deletedItems, key); + const next = this._fetchRecursive(message.Links[0].Hash, password, opts, currentDepth, deletedItems); res = res.concat(next); } return res; } - _publish(text) { - var post = new Post(text); + _publish(data) { + let post = new Post(data); post.encrypt(privkey, pubkey); return await (ipfsAPI.putObject(this.ipfs, JSON.stringify(post))); } - _createMessage(channel, password, key, target, operation, options) { - var seq = this.sequences[channel]; - var size = -1; - var metaInfo = new MetaInfo(ItemTypes.Message, size, new Date().getTime()); - var hcItem; - if(operation === HashCacheOps.Put) - hcItem = new KeyedHashCacheItem(operation, key, seq, target, metaInfo, pubkey, privkey, password); - else - hcItem = new HashCacheItem(operation, seq, target, metaInfo, pubkey, privkey, password); - - var item = await (ipfsAPI.putObject(this.ipfs, JSON.stringify(hcItem))); - var newHead = { Hash: item.Hash }; - - if(seq > 0) { - var prevHead = await(this.client.linkedList(channel, password).head()); - var headItem = await (ipfsAPI.getObject(this.ipfs, prevHead.head)); + _createMessage(channel, password, operation, key, target) { + // Get the current channel head and bump the sequence number + let seq = 0; + const currentHead = await(this.client.linkedList(channel, password).head()) + if(currentHead.head) { + const headItem = await (ipfsAPI.getObject(this.ipfs, currentHead.head)); seq = JSON.parse(headItem.Data)["seq"] + 1; - newHead = await (ipfsAPI.patchObject(this.ipfs, item.Hash, prevHead.head)) - this.sequences[channel] = seq; } + + // Create meta info + const size = -1; + const metaInfo = new MetaInfo(ItemTypes.Message, size, new Date().getTime()); + + // Create the hash cache item + const hcItem = new HashCacheItem(operation, key, seq, target, metaInfo, pubkey, privkey, password); + + // Save the item to ipfs + const data = await (ipfsAPI.putObject(this.ipfs, JSON.stringify(hcItem))); + let newHead = { Hash: data.Hash }; + + // If this is not the first item in the channel, patch with the previous (ie. link as next) + if(seq > 0) + newHead = await (ipfsAPI.patchObject(this.ipfs, data.Hash, currentHead.head)); + return newHead; } - _send(channel, password, text, options) { - // TODO: check options for what type to publish as (text, snippet, file, etc.) - var post = this._publish(text); - var message = this._createMessage(channel, password, null, post.Hash, HashCacheOps.Add); - await(this.client.linkedList(channel, password).add(message.Hash)); - return message.Hash; + /* DB Operations */ + _add(channel, password, data) { + const key = null; + const post = this._publish(data); + return this._createOperation(channel, password, HashCacheOps.Add, key, post.Hash); } - // WIP - _put(channel, password, key, data, options) { - // TODO: options - var post = this._publish(data); - var message = this._createMessage(channel, password, key, post.Hash, HashCacheOps.Put); - await(this.client.linkedList(channel, password).add(message.Hash)); - return message.Hash; + _put(channel, password, key, data) { + const post = this._publish(data); + return this._createOperation(channel, password, HashCacheOps.Put, key, post.Hash); } _remove(channel, password, options) { - const target = options.key ? options.key : (options.hash ? options.hash : null); - const message = this._createMessage(channel, password, null, target, HashCacheOps.Delete); - await(this.client.linkedList(channel, password).add(message.Hash)) + const key = null; + const target = options.key ? options.key : (options.hash ? options.hash : null); + return this._createOperation(channel, password, HashCacheOps.Delete, key, target); + } + + _createOperation(channel, password, operation, key, value) { + const message = this._createMessage(channel, password, operation, key, value); + await(this.client.linkedList(channel, password).add(message.Hash)); return message.Hash; } - _delete(channel, password) { - await(this.client.linkedList(channel, password).delete()) - delete this.sequences[channel]; + _deleteChannel(channel, password) { + await(this.client.linkedList(channel, password).delete()); return true; } _setMode(channel, password, modes) { - var m = [] + let m = []; if(typeof modes !== 'Array') m.push(modes); else m = modes; - var res = await(this.client.linkedList(channel, password).setMode(m)); + const res = await(this.client.linkedList(channel, password).setMode(m)); return res.modes; } _info(channel, password) { - return await(this.client.linkedList(channel, password).head()) + return await(this.client.linkedList(channel, password).head()); } _connect(host, username, password) { @@ -298,11 +287,11 @@ class OrbitClient { class OrbitClientFactory { static connect(host, username, password, ipfs) { if(!ipfs) { - var ipfsd = await(ipfsDaemon()); + let ipfsd = await(ipfsDaemon()); ipfs = ipfsd.daemon; } - var client = new OrbitClient(ipfs); + const client = new OrbitClient(ipfs); await(client._connect(host, username, password)) return client; } diff --git a/README.md b/README.md index 9b59489..f406703 100644 --- a/README.md +++ b/README.md @@ -2,36 +2,52 @@ ## Introduction -***VERY MUCH WIP! WILL NOT WORK WHEN CLONED, orbit-server REQUIRED!*** +Key-Value Store and Event Store on IPFS. -Client library to interact with orbit-server. Implements the levelDOWN API without get(key, cb). +***VERY MUCH WIP! WILL NOT WORK WHEN CLONED, orbit-server (not released yet) REQUIRED!*** -orbit-server uses linked lists on top of IPFS. orbit-server not *yet* released, working on it. +## Features +- Distributed kv-store and event log database +- Stores all data in IPFS +- Data encrypted on the wire and at rest +- Per channel access rights -### TODO -- Tests for .remove(...) -- Local caching of messages -- Use HTTPS instead of HTTP (channel password are sent in plaintext atm) -- API for fetching user info -- OrbitNetwork - + channel system (join, part, pub/sub) +_Channel maps to "table", "keyspace", "topic" or "feed" in similar systems_ ## API connect(host, username, password) channel(name, password) - .add(data: String) + .add(data: String) // Insert an event to a channel, returns of the event - .put(key, data: String) + .iterator([options]) // Returns an iterator of events - .remove(hash or key) + // options : { + // gt: , // Return events newer than + // gte: , // Return events newer then (inclusive) + // lt: , // Return events older than + // lte: , // Return events older than (inclusive) + // limit: -1, // Number of events to return, -1 returns all, default 1 + // reverse: true // Return items oldest first, default latest first + // } - .iterator([options]) + .put(key, data: String) // Insert (key,value) to a channel - .setMode(modes) + .get(key) // Retrieve value - .delete() + .remove({ key: , hash: }) // Remove entry (use one option) + + .setMode(modes) // Set channel modes, can be an object or an array of objects + + // { mode: "+r", params: { password: password } } // Set read mode + // { mode: "-r" } // Remove read-mode + // { mode: "+w", params: { ops: [orbit.user.id] } } // Set write-mode, only users in ops can write + // { mode: "-w" } // Remove write-mode + + .info() // Returns channel's current head and modes + + .delete() // Deletes the channel, all data will be "removed" (unassociated with the channel, actual data is not deleted) ## Usage ```javascript @@ -42,41 +58,35 @@ var host = 'localhost:3006'; // orbit-server address async(() => { // Connect - const orbit = OrbitClient.connect(host, username, password); // OrbitClient + const orbit = OrbitClient.connect(host, username, password); const channelName = 'hello-world'; - // Send a message - const head = orbit.channel(channelName).send('hello'); // + // Send an event + const head = orbit.channel(channelName).add('hello'); // - // Delete a message + // Delete an event orbit.channel(channelName).remove(head); // Iterator options - const options = { limit: -1 }; // fetch all messages, default is 1 - // { - // gt: , - // gte: , - // lt: , - // lte: , - // limit: 10, - // reverse: true - // } + const options = { limit: -1 }; // fetch all messages - // Get messages + // Get events const iter = orbit.channel(channelName).iterator(options); // Symbol.iterator const next = iter.next(); // { value: , done: false|true} + // OR: // var all = iter.collect(); // returns all elements as an array // OR: // for(let i of iter) - // console.log(i.hash, i.item.Data.seq); + // console.log(i.hash, i.item); - // Remove element - orbit.channel(channelName).remove(next.value.hash); // remove first element iterator returns + // KV-store + orbit.channel(channelName).put("key1", "hello world"); + orbit.channel(channelName).get("key1"); // returns "hello world" - // Set modes + // Modes const password = 'hello'; const channelModes; channelModes = orbit.channel(channel).setMode({ mode: "+r", params: { password: password } }); // { modes: { r: { password: 'hello' } } } @@ -88,3 +98,9 @@ async(() => { const result = orbit.channel(channelName, channelPwd).delete(); // true | false })(); ``` + +### TODO +- Tests for remove(), put() and get() +- Local caching of messages +- Possibility to fetch content separately from data structure +- Use HTTPS instead of HTTP (channel password are sent in plaintext atm)