From 7712c1943f99dfea4223ee2940374a5be4daaf8b Mon Sep 17 00:00:00 2001 From: haad Date: Fri, 4 Mar 2016 23:18:08 +0100 Subject: [PATCH] Split OrbitClient to OrbitClient and OrbitDB --- src/OrbitClient.js | 112 ++++++++---------------------------------- src/OrbitDB.js | 98 ++++++++++++++++++++++++++++++++++++ src/list/OrbitList.js | 7 ++- 3 files changed, 121 insertions(+), 96 deletions(-) create mode 100644 src/OrbitDB.js diff --git a/src/OrbitClient.js b/src/OrbitClient.js index a88d85e..1e78db4 100644 --- a/src/OrbitClient.js +++ b/src/OrbitClient.js @@ -4,55 +4,43 @@ const EventEmitter = require('events').EventEmitter; const async = require('asyncawait/async'); const await = require('asyncawait/await'); const ipfsDaemon = require('orbit-common/lib/ipfs-daemon'); -const ipfsAPI = require('orbit-common/lib/ipfs-api-promised'); -const Operations = require('./list/Operations'); -const List = require('./list/OrbitList'); -const OrbitDBItem = require('./db/OrbitDBItem'); -const ItemTypes = require('./db/ItemTypes'); -const MetaInfo = require('./db/MetaInfo'); -const Post = require('./db/Post'); const PubSub = require('./PubSub'); +const OrbitDB = require('./OrbitDB'); -class OrbitDB { +class OrbitClient { constructor(ipfs, daemon) { this._ipfs = ipfs; - this._store = {}; this._pubsub = null; this.user = null; this.network = null; this.events = new EventEmitter(); + this.db = new OrbitDB(this._ipfs); } - channel(hash, password, subscribe) { + channel(channel, password, subscribe) { if(password === undefined) password = ''; if(subscribe === undefined) subscribe = true; - this._store[hash] = new List(this._ipfs, this.user.username); - - const onMessage = async((hash, message) => { - // console.log("--> Head:", message) - if(message && this._store[hash]) { - const other = List.fromIpfsHash(this._ipfs, message); - this._store[hash].join(other); - } - this.events.emit('data', hash, message); - }); + this.db.use(channel, this.user, password); + this.db.events.on('data', async((hash) => { + await(this._pubsub.publish(channel, hash)); + this.events.emit('data', channel, hash); + })); if(subscribe) - this._pubsub.subscribe(hash, password, onMessage, onMessage); + this._pubsub.subscribe(channel, password, async((channel, message) => this.db.sync(channel, message))); return { - iterator: (options) => this._iterator(hash, password, options), - delete: () => this._deleteChannel(hash, password), - add: (data) => this._add(hash, password, data), - del: (key) => this._remove(hash, password, key), - put: (key, data) => this._put(hash, password, key, data), + iterator: (options) => this._iterator(channel, password, options), + delete: () => this.db.deleteChannel(channel, password), + del: (key) => this.db.del(channel, password, key), + add: (data) => this.db.add(channel, password, data), + put: (key, data) => this.db.put(channel, password, key, data), get: (key, options) => { - const items = this._iterator(hash, password, { key: key }).collect(); + const items = this._iterator(channel, password, { key: key }).collect(); return items[0] ? items[0].payload.value : null; }, - //TODO: tests - leave: () => this._pubsub.unsubscribe(hash) + leave: () => this._pubsub.unsubscribe(channel) } } @@ -61,10 +49,11 @@ class OrbitDB { this._store = {}; this.user = null; this.network = null; + this.db = null; } _iterator(channel, password, options) { - const messages = this._getMessages(channel, password, options); + const messages = this.db.read(channel, password, options); let currentIndex = 0; let iterator = { [Symbol.iterator]() { @@ -84,67 +73,6 @@ class OrbitDB { return iterator; } - _getMessages(channel, password, options) { - let opts = options || {}; - Object.assign(opts, { amount: opts.limit || 1 }); - let messages = await(this._store[channel].findAll(opts)); - if(opts.reverse) messages.reverse(); - return messages; - } - - _publish(data) { - return new Promise((resolve, reject) => { - let post = new Post(data); - // post.encrypt(privkey, pubkey); - const res = await (ipfsAPI.putObject(this._ipfs, JSON.stringify(post))); - resolve(res); - }) - } - - _createMessage(channel, password, operation, key, value) { - const size = -1; - const meta = new MetaInfo(ItemTypes.Message, size, this.user.username, new Date().getTime()); - const item = new OrbitDBItem(operation, key, value, meta); - const data = await (ipfsAPI.putObject(this._ipfs, JSON.stringify(item))); - return data.Hash; - } - - /* DB Operations */ - _add(channel, password, data) { - const post = await(this._publish(data)); - const key = post.Hash; - return await(this._createOperation(channel, password, Operations.Add, key, post.Hash, data)); - } - - _put(channel, password, key, data) { - const post = await(this._publish(data)); - return await(this._createOperation(channel, password, Operations.Put, key, post.Hash)); - } - - _remove(channel, password, hash) { - return await(this._createOperation(channel, password, Operations.Delete, hash, null)); - } - - _createOperation(channel, password, operation, key, value, data) { - var createOperation = async(() => { - return new Promise(async((resolve, reject) => { - const hash = this._createMessage(channel, password, operation, key, value); - const res = await(this._store[channel].add(hash)); - const listHash = await(this._store[channel].ipfsHash); - await(this._pubsub.publish(channel, listHash)); - resolve(); - })); - }) - await(createOperation()); - return key; - // return res; - } - - _deleteChannel(channel, password) { - this._store[channel].clear(); - return true; - } - _connect(host, port, username, password, allowOffline) { if(allowOffline === undefined) allowOffline = false; try { @@ -165,7 +93,7 @@ class OrbitClientFactory { ipfs = ipfsd.ipfs; } - const client = new OrbitDB(ipfs); + const client = new OrbitClient(ipfs); await(client._connect(host, port, username, password, allowOffline)) return client; } diff --git a/src/OrbitDB.js b/src/OrbitDB.js new file mode 100644 index 0000000..90e0a37 --- /dev/null +++ b/src/OrbitDB.js @@ -0,0 +1,98 @@ +'use strict'; + +const EventEmitter = require('events').EventEmitter; +const async = require('asyncawait/async'); +const await = require('asyncawait/await'); +const ipfsAPI = require('orbit-common/lib/ipfs-api-promised'); +const Operations = require('./list/Operations'); +const List = require('./list/OrbitList'); +const OrbitDBItem = require('./db/OrbitDBItem'); +const ItemTypes = require('./db/ItemTypes'); +const MetaInfo = require('./db/MetaInfo'); +const Post = require('./db/Post'); + +class OrbitDB { + constructor(ipfs) { + this._ipfs = ipfs; + this._logs = {}; + this.events = new EventEmitter(); + } + + /* Public methods */ + use(channel, user, password) { + this.user = user; + this._logs[channel] = new List(this._ipfs, this.user.username); + } + + sync(channel, hash) { + console.log("--> Head:", hash) + if(hash && this._logs[channel]) { + const other = List.fromIpfsHash(this._ipfs, hash); + this._logs[channel].join(other); + } + } + + /* DB Operations */ + read(channel, password, options) { + let opts = options || {}; + Object.assign(opts, { amount: opts.limit || 1 }); + let messages = await(this._logs[channel].find(opts)); + if(opts.reverse) messages.reverse(); + return messages; + } + + add(channel, password, data) { + const post = await(this._publish(data)); + const key = post.Hash; + return await(this._createOperation(channel, password, Operations.Add, key, post.Hash, data)); + } + + put(channel, password, key, data) { + const post = await(this._publish(data)); + return await(this._createOperation(channel, password, Operations.Put, key, post.Hash)); + } + + del(channel, password, hash) { + return await(this._createOperation(channel, password, Operations.Delete, hash, null)); + } + + deleteChannel(channel, password) { + this._logs[channel].clear(); + return true; + } + + /* Private methods */ + _createOperation(channel, password, operation, key, value, data) { + var createOperation = async(() => { + return new Promise(async((resolve, reject) => { + const hash = this._createMessage(channel, password, operation, key, value); + const res = await(this._logs[channel].add(hash)); + const listHash = await(this._logs[channel].ipfsHash); + resolve(listHash); + })); + }) + const hash = await(createOperation()); + this.events.emit('data', hash); + return key; + } + + _createMessage(channel, password, operation, key, value) { + const size = -1; + const meta = new MetaInfo(ItemTypes.Message, size, this.user.username, new Date().getTime()); + const item = new OrbitDBItem(operation, key, value, meta); + const data = await (ipfsAPI.putObject(this._ipfs, JSON.stringify(item))); + return data.Hash; + } + + _publish(data) { + return new Promise((resolve, reject) => { + let post = new Post(data); + // post.encrypt(privkey, pubkey); + const res = await (ipfsAPI.putObject(this._ipfs, JSON.stringify(post))); + resolve(res); + }) + } + +} + +module.exports = OrbitDB; diff --git a/src/list/OrbitList.js b/src/list/OrbitList.js index 6b31148..4f22679 100644 --- a/src/list/OrbitList.js +++ b/src/list/OrbitList.js @@ -25,7 +25,6 @@ class OrbitList extends List { const heads = List.findHeads(this.items); const node = new Node(this._ipfs, this.id, this.seq, this.ver, data, heads); - // node._commit(); // TODO: obsolete? this._currentBatch.push(node); this.ver ++; } @@ -36,7 +35,7 @@ class OrbitList extends List { } // The LWW-set query interface - findAll(opts) { + find(opts) { let list = Lazy(this.items); const hash = (opts.gt ? opts.gt : (opts.gte ? opts.gte : (opts.lt ? opts.lt : opts.lte))); const amount = opts.amount ? (opts.amount && opts.amount > -1 ? opts.amount : this.items.length) : 1; @@ -77,7 +76,7 @@ class OrbitList extends List { _fetchHistory(items) { let allHashes = this._items.map((a) => a.hash); const res = Lazy(items) - .reverse() + .reverse() // Start from the latest item .map((f) => f.heads).flatten() // Go through all heads .filter((f) => !(f instanceof Node === true)) // OrbitNode vs. {}, filter out instances (we already have them in mem) .map((f) => this._fetchRecursive(f, MaxHistory, allHashes)).flatten() // IO - get the data from IPFS @@ -92,7 +91,7 @@ class OrbitList extends List { let result = []; if(!isReferenced(all, hash)) { all.push(hash); - const item = await(Node.fromIpfsHash(this._ipfs, hash)); + const item = await(Node.fromIpfsHash(this._ipfs, hash)); // IO - get from IPFS result.push(item); result = result.concat(Lazy(item.heads) .map((f) => this._fetchRecursive(f, amount, all))