mirror of
https://github.com/orbitdb/orbitdb.git
synced 2025-05-22 14:56:38 +00:00
Split OrbitClient to OrbitClient and OrbitDB
This commit is contained in:
parent
0f592c65db
commit
7712c1943f
@ -4,55 +4,43 @@ const EventEmitter = require('events').EventEmitter;
|
||||
const async = require('asyncawait/async');
|
||||
const await = require('asyncawait/await');
|
||||
const ipfsDaemon = require('orbit-common/lib/ipfs-daemon');
|
||||
const ipfsAPI = require('orbit-common/lib/ipfs-api-promised');
|
||||
const Operations = require('./list/Operations');
|
||||
const List = require('./list/OrbitList');
|
||||
const OrbitDBItem = require('./db/OrbitDBItem');
|
||||
const ItemTypes = require('./db/ItemTypes');
|
||||
const MetaInfo = require('./db/MetaInfo');
|
||||
const Post = require('./db/Post');
|
||||
const PubSub = require('./PubSub');
|
||||
const OrbitDB = require('./OrbitDB');
|
||||
|
||||
class OrbitDB {
|
||||
class OrbitClient {
|
||||
constructor(ipfs, daemon) {
|
||||
this._ipfs = ipfs;
|
||||
this._store = {};
|
||||
this._pubsub = null;
|
||||
this.user = null;
|
||||
this.network = null;
|
||||
this.events = new EventEmitter();
|
||||
this.db = new OrbitDB(this._ipfs);
|
||||
}
|
||||
|
||||
channel(hash, password, subscribe) {
|
||||
channel(channel, password, subscribe) {
|
||||
if(password === undefined) password = '';
|
||||
if(subscribe === undefined) subscribe = true;
|
||||
|
||||
this._store[hash] = new List(this._ipfs, this.user.username);
|
||||
|
||||
const onMessage = async((hash, message) => {
|
||||
// console.log("--> Head:", message)
|
||||
if(message && this._store[hash]) {
|
||||
const other = List.fromIpfsHash(this._ipfs, message);
|
||||
this._store[hash].join(other);
|
||||
}
|
||||
this.events.emit('data', hash, message);
|
||||
});
|
||||
this.db.use(channel, this.user, password);
|
||||
this.db.events.on('data', async((hash) => {
|
||||
await(this._pubsub.publish(channel, hash));
|
||||
this.events.emit('data', channel, hash);
|
||||
}));
|
||||
|
||||
if(subscribe)
|
||||
this._pubsub.subscribe(hash, password, onMessage, onMessage);
|
||||
this._pubsub.subscribe(channel, password, async((channel, message) => this.db.sync(channel, message)));
|
||||
|
||||
return {
|
||||
iterator: (options) => this._iterator(hash, password, options),
|
||||
delete: () => this._deleteChannel(hash, password),
|
||||
add: (data) => this._add(hash, password, data),
|
||||
del: (key) => this._remove(hash, password, key),
|
||||
put: (key, data) => this._put(hash, password, key, data),
|
||||
iterator: (options) => this._iterator(channel, password, options),
|
||||
delete: () => this.db.deleteChannel(channel, password),
|
||||
del: (key) => this.db.del(channel, password, key),
|
||||
add: (data) => this.db.add(channel, password, data),
|
||||
put: (key, data) => this.db.put(channel, password, key, data),
|
||||
get: (key, options) => {
|
||||
const items = this._iterator(hash, password, { key: key }).collect();
|
||||
const items = this._iterator(channel, password, { key: key }).collect();
|
||||
return items[0] ? items[0].payload.value : null;
|
||||
},
|
||||
//TODO: tests
|
||||
leave: () => this._pubsub.unsubscribe(hash)
|
||||
leave: () => this._pubsub.unsubscribe(channel)
|
||||
}
|
||||
}
|
||||
|
||||
@ -61,10 +49,11 @@ class OrbitDB {
|
||||
this._store = {};
|
||||
this.user = null;
|
||||
this.network = null;
|
||||
this.db = null;
|
||||
}
|
||||
|
||||
_iterator(channel, password, options) {
|
||||
const messages = this._getMessages(channel, password, options);
|
||||
const messages = this.db.read(channel, password, options);
|
||||
let currentIndex = 0;
|
||||
let iterator = {
|
||||
[Symbol.iterator]() {
|
||||
@ -84,67 +73,6 @@ class OrbitDB {
|
||||
return iterator;
|
||||
}
|
||||
|
||||
_getMessages(channel, password, options) {
|
||||
let opts = options || {};
|
||||
Object.assign(opts, { amount: opts.limit || 1 });
|
||||
let messages = await(this._store[channel].findAll(opts));
|
||||
if(opts.reverse) messages.reverse();
|
||||
return messages;
|
||||
}
|
||||
|
||||
_publish(data) {
|
||||
return new Promise((resolve, reject) => {
|
||||
let post = new Post(data);
|
||||
// post.encrypt(privkey, pubkey);
|
||||
const res = await (ipfsAPI.putObject(this._ipfs, JSON.stringify(post)));
|
||||
resolve(res);
|
||||
})
|
||||
}
|
||||
|
||||
_createMessage(channel, password, operation, key, value) {
|
||||
const size = -1;
|
||||
const meta = new MetaInfo(ItemTypes.Message, size, this.user.username, new Date().getTime());
|
||||
const item = new OrbitDBItem(operation, key, value, meta);
|
||||
const data = await (ipfsAPI.putObject(this._ipfs, JSON.stringify(item)));
|
||||
return data.Hash;
|
||||
}
|
||||
|
||||
/* DB Operations */
|
||||
_add(channel, password, data) {
|
||||
const post = await(this._publish(data));
|
||||
const key = post.Hash;
|
||||
return await(this._createOperation(channel, password, Operations.Add, key, post.Hash, data));
|
||||
}
|
||||
|
||||
_put(channel, password, key, data) {
|
||||
const post = await(this._publish(data));
|
||||
return await(this._createOperation(channel, password, Operations.Put, key, post.Hash));
|
||||
}
|
||||
|
||||
_remove(channel, password, hash) {
|
||||
return await(this._createOperation(channel, password, Operations.Delete, hash, null));
|
||||
}
|
||||
|
||||
_createOperation(channel, password, operation, key, value, data) {
|
||||
var createOperation = async(() => {
|
||||
return new Promise(async((resolve, reject) => {
|
||||
const hash = this._createMessage(channel, password, operation, key, value);
|
||||
const res = await(this._store[channel].add(hash));
|
||||
const listHash = await(this._store[channel].ipfsHash);
|
||||
await(this._pubsub.publish(channel, listHash));
|
||||
resolve();
|
||||
}));
|
||||
})
|
||||
await(createOperation());
|
||||
return key;
|
||||
// return res;
|
||||
}
|
||||
|
||||
_deleteChannel(channel, password) {
|
||||
this._store[channel].clear();
|
||||
return true;
|
||||
}
|
||||
|
||||
_connect(host, port, username, password, allowOffline) {
|
||||
if(allowOffline === undefined) allowOffline = false;
|
||||
try {
|
||||
@ -165,7 +93,7 @@ class OrbitClientFactory {
|
||||
ipfs = ipfsd.ipfs;
|
||||
}
|
||||
|
||||
const client = new OrbitDB(ipfs);
|
||||
const client = new OrbitClient(ipfs);
|
||||
await(client._connect(host, port, username, password, allowOffline))
|
||||
return client;
|
||||
}
|
||||
|
98
src/OrbitDB.js
Normal file
98
src/OrbitDB.js
Normal file
@ -0,0 +1,98 @@
|
||||
'use strict';
|
||||
|
||||
const EventEmitter = require('events').EventEmitter;
|
||||
const async = require('asyncawait/async');
|
||||
const await = require('asyncawait/await');
|
||||
const ipfsAPI = require('orbit-common/lib/ipfs-api-promised');
|
||||
const Operations = require('./list/Operations');
|
||||
const List = require('./list/OrbitList');
|
||||
const OrbitDBItem = require('./db/OrbitDBItem');
|
||||
const ItemTypes = require('./db/ItemTypes');
|
||||
const MetaInfo = require('./db/MetaInfo');
|
||||
const Post = require('./db/Post');
|
||||
|
||||
class OrbitDB {
|
||||
constructor(ipfs) {
|
||||
this._ipfs = ipfs;
|
||||
this._logs = {};
|
||||
this.events = new EventEmitter();
|
||||
}
|
||||
|
||||
/* Public methods */
|
||||
use(channel, user, password) {
|
||||
this.user = user;
|
||||
this._logs[channel] = new List(this._ipfs, this.user.username);
|
||||
}
|
||||
|
||||
sync(channel, hash) {
|
||||
console.log("--> Head:", hash)
|
||||
if(hash && this._logs[channel]) {
|
||||
const other = List.fromIpfsHash(this._ipfs, hash);
|
||||
this._logs[channel].join(other);
|
||||
}
|
||||
}
|
||||
|
||||
/* DB Operations */
|
||||
read(channel, password, options) {
|
||||
let opts = options || {};
|
||||
Object.assign(opts, { amount: opts.limit || 1 });
|
||||
let messages = await(this._logs[channel].find(opts));
|
||||
if(opts.reverse) messages.reverse();
|
||||
return messages;
|
||||
}
|
||||
|
||||
add(channel, password, data) {
|
||||
const post = await(this._publish(data));
|
||||
const key = post.Hash;
|
||||
return await(this._createOperation(channel, password, Operations.Add, key, post.Hash, data));
|
||||
}
|
||||
|
||||
put(channel, password, key, data) {
|
||||
const post = await(this._publish(data));
|
||||
return await(this._createOperation(channel, password, Operations.Put, key, post.Hash));
|
||||
}
|
||||
|
||||
del(channel, password, hash) {
|
||||
return await(this._createOperation(channel, password, Operations.Delete, hash, null));
|
||||
}
|
||||
|
||||
deleteChannel(channel, password) {
|
||||
this._logs[channel].clear();
|
||||
return true;
|
||||
}
|
||||
|
||||
/* Private methods */
|
||||
_createOperation(channel, password, operation, key, value, data) {
|
||||
var createOperation = async(() => {
|
||||
return new Promise(async((resolve, reject) => {
|
||||
const hash = this._createMessage(channel, password, operation, key, value);
|
||||
const res = await(this._logs[channel].add(hash));
|
||||
const listHash = await(this._logs[channel].ipfsHash);
|
||||
resolve(listHash);
|
||||
}));
|
||||
})
|
||||
const hash = await(createOperation());
|
||||
this.events.emit('data', hash);
|
||||
return key;
|
||||
}
|
||||
|
||||
_createMessage(channel, password, operation, key, value) {
|
||||
const size = -1;
|
||||
const meta = new MetaInfo(ItemTypes.Message, size, this.user.username, new Date().getTime());
|
||||
const item = new OrbitDBItem(operation, key, value, meta);
|
||||
const data = await (ipfsAPI.putObject(this._ipfs, JSON.stringify(item)));
|
||||
return data.Hash;
|
||||
}
|
||||
|
||||
_publish(data) {
|
||||
return new Promise((resolve, reject) => {
|
||||
let post = new Post(data);
|
||||
// post.encrypt(privkey, pubkey);
|
||||
const res = await (ipfsAPI.putObject(this._ipfs, JSON.stringify(post)));
|
||||
resolve(res);
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
module.exports = OrbitDB;
|
@ -25,7 +25,6 @@ class OrbitList extends List {
|
||||
|
||||
const heads = List.findHeads(this.items);
|
||||
const node = new Node(this._ipfs, this.id, this.seq, this.ver, data, heads);
|
||||
// node._commit(); // TODO: obsolete?
|
||||
this._currentBatch.push(node);
|
||||
this.ver ++;
|
||||
}
|
||||
@ -36,7 +35,7 @@ class OrbitList extends List {
|
||||
}
|
||||
|
||||
// The LWW-set query interface
|
||||
findAll(opts) {
|
||||
find(opts) {
|
||||
let list = Lazy(this.items);
|
||||
const hash = (opts.gt ? opts.gt : (opts.gte ? opts.gte : (opts.lt ? opts.lt : opts.lte)));
|
||||
const amount = opts.amount ? (opts.amount && opts.amount > -1 ? opts.amount : this.items.length) : 1;
|
||||
@ -77,7 +76,7 @@ class OrbitList extends List {
|
||||
_fetchHistory(items) {
|
||||
let allHashes = this._items.map((a) => a.hash);
|
||||
const res = Lazy(items)
|
||||
.reverse()
|
||||
.reverse() // Start from the latest item
|
||||
.map((f) => f.heads).flatten() // Go through all heads
|
||||
.filter((f) => !(f instanceof Node === true)) // OrbitNode vs. {}, filter out instances (we already have them in mem)
|
||||
.map((f) => this._fetchRecursive(f, MaxHistory, allHashes)).flatten() // IO - get the data from IPFS
|
||||
@ -92,7 +91,7 @@ class OrbitList extends List {
|
||||
let result = [];
|
||||
if(!isReferenced(all, hash)) {
|
||||
all.push(hash);
|
||||
const item = await(Node.fromIpfsHash(this._ipfs, hash));
|
||||
const item = await(Node.fromIpfsHash(this._ipfs, hash)); // IO - get from IPFS
|
||||
result.push(item);
|
||||
result = result.concat(Lazy(item.heads)
|
||||
.map((f) => this._fetchRecursive(f, amount, all))
|
||||
|
Loading…
x
Reference in New Issue
Block a user