diff --git a/examples/pubsubKeyValue.js b/examples/pubsubKeyValue.js index 746daf9..0fb697e 100644 --- a/examples/pubsubKeyValue.js +++ b/examples/pubsubKeyValue.js @@ -22,25 +22,28 @@ let run = (async(() => { let id = 'Log: Query ' let running = false; - setInterval(async(() => { - if(!running) { + // setInterval(async(() => { + // if(!running) { + while(true) { running = true; - // let timer = new Timer(true); - channel.put("lamb", "of god" + count); - // console.log(`Query #${count} took ${timer.stop(true)} ms\n`); - let v = channel.get("lamb"); + const key = "Lamb"; + let timer = new Timer(true); + channel.put(key, "Of God " + count); + let v = channel.get(key); + console.log(`Query #${count} took ${timer.stop(true)} ms\n`); console.log("---------------------------------------------------") - console.log("Id | Seq | Ver | Data") + console.log("Key | Value") console.log("---------------------------------------------------") - console.log(v); + console.log(`${key} | ${v}`); console.log("---------------------------------------------------") running = false; count ++; - } - }), 500); + } + // } + // }), 500); } catch(e) { console.error("error:", e); diff --git a/src/DataStore.js b/src/DataStore.js index 28ccacf..cc67a12 100644 --- a/src/DataStore.js +++ b/src/DataStore.js @@ -7,6 +7,8 @@ const ipfsAPI = require('orbit-common/lib/ipfs-api-promised'); const OrbitList = require('./list/OrbitList'); const HashCacheOps = require('./HashCacheOps'); +var Timer = require('../examples/Timer'); + const DefaultAmount = 1; class DataStore { @@ -16,7 +18,7 @@ class DataStore { } add(hash) { - this.list.add(hash); + return this.list.add(hash); } join(other) { diff --git a/src/OrbitClient.js b/src/OrbitClient.js index ee7470a..42b083f 100644 --- a/src/OrbitClient.js +++ b/src/OrbitClient.js @@ -124,12 +124,9 @@ class OrbitClient { } _createMessage(channel, password, operation, key, value) { - // Create meta info const size = -1; - const metaInfo = new MetaInfo(ItemTypes.Message, size, this.user.id, new Date().getTime()); - // Create the hash cache item - const item = new OrbitDBItem(operation, key, value, metaInfo); - // Save the item to ipfs + const meta = new MetaInfo(ItemTypes.Message, size, this.user.id, new Date().getTime()); + const item = new OrbitDBItem(operation, key, value, meta); const data = await (ipfsAPI.putObject(this._ipfs, JSON.stringify(item))); return data.Hash; } @@ -151,11 +148,12 @@ class OrbitClient { } _createOperation(channel, password, operation, key, value, data) { - let hash = this._createMessage(channel, password, operation, key, value); - this._store.add(hash); + const hash = this._createMessage(channel, password, operation, key, value); + const res = await(this._store.add(hash)); const listHash = await(this._store.list.getIpfsHash()); await(this._pubsub.publish(channel, listHash)); return key; + // return res; } _deleteChannel(channel, password) { diff --git a/src/list/List.js b/src/list/List.js index fbf8736..10ca246 100644 --- a/src/list/List.js +++ b/src/list/List.js @@ -24,13 +24,8 @@ class List { } join(other) { - if(other.seq && other.seq > this.seq) { - this.seq = other.seq + 1; - this.ver = 0; - } else { - this.seq = this.seq + 1; - this.ver = 0; - } + this.seq = (other.seq && other.seq > this.seq ? other.seq : this.seq) + 1; + this.ver = 0; const current = _.differenceWith(this._currentBatch, this._items, this._equals); const others = _.differenceWith(other.items, this._items, this._equals); const final = _.unionWith(current, others, this._equals); diff --git a/src/list/OrbitList.js b/src/list/OrbitList.js index 627beb3..db8c071 100644 --- a/src/list/OrbitList.js +++ b/src/list/OrbitList.js @@ -7,6 +7,8 @@ const ipfsAPI = require('orbit-common/lib/ipfs-api-promised'); const List = require('./List'); const Node = require('./OrbitNode'); +const MaxBatchSize = 200; + class OrbitList extends List { constructor(id, ipfs) { super(id); @@ -19,6 +21,11 @@ class OrbitList extends List { const node = new Node(this._ipfs, this.id, this.seq, this.ver, data, heads); this._currentBatch.push(node); this.ver ++; + + if(this.ver >= MaxBatchSize) + this._commit(); + + return node.ipfsHash; } clear() { @@ -27,27 +34,36 @@ class OrbitList extends List { } getIpfsHash() { - return new Promise(async((resolve, reject) => { - const list = await(ipfsAPI.putObject(this._ipfs, JSON.stringify(this.toJson()))); - resolve(list.Hash); - })); + const list = await(ipfsAPI.putObject(this._ipfs, JSON.stringify(this.toJson()))); + return list.Hash; } static fromIpfsHash(ipfs, hash) { - return new Promise(async((resolve, reject) => { - const l = await(ipfsAPI.getObject(ipfs, hash)); - const list = OrbitList.fromJson(ipfs, JSON.parse(l.Data)); - resolve(list); - })); + const l = await(ipfsAPI.getObject(ipfs, hash)); + const list = OrbitList.fromJson(ipfs, JSON.parse(l.Data)); + return list; } static fromJson(ipfs, json) { let list = new List(json.id); list.seq = json.seq; list.ver = json.ver; - list._items = _.uniqWith(json.items.map((f) => new Node(ipfs, f.id, f.seq, f.ver, f.data, f.next)), _.isEqual); + // list._items = _.uniqWith(json.items.map((f) => new Node(ipfs, f.id, f.seq, f.ver, f.data, f.next)), _.isEqual); + list._items = json.items.map((f) => new Node(ipfs, f.id, f.seq, f.ver, f.data, f.next)); return list; } + + static get batchSize() { + return MaxBatchSize; + } + + _commit() { + const current = _.differenceWith(this._currentBatch, this._items, this._equals); + this._items = this._items.concat(current); + this._currentBatch = []; + this.ver = 0; + this.seq ++; + } } module.exports = OrbitList; diff --git a/src/list/OrbitNode.js b/src/list/OrbitNode.js index 38cfc5d..14cc9e8 100644 --- a/src/list/OrbitNode.js +++ b/src/list/OrbitNode.js @@ -21,19 +21,23 @@ class OrbitNode extends Node { return "" + this.id + "." + this.seq + "." + this.ver + "." + this.hash; } + get ipfsHash() { + if(!this.hash) { + const t = this.compact(); + const r = await(ipfsAPI.putObject(this._ipfs, JSON.stringify(t))); + this.hash = r.Hash; + } + return this.hash; + } + getPayload() { - if(!this.Payload) { - return new Promise(async((resolve, reject) => { - const payload = await(ipfsAPI.getObject(this._ipfs, this.data)); - this.Payload = JSON.parse(payload.Data); - if(this.Payload.value) { - const value = await(ipfsAPI.getObject(this._ipfs, this.Payload.value)); - this.Payload.value = JSON.parse(value.Data)["content"]; - } - resolve(this); - })); - } else { - return this; + if(!this.Payload) { + const payload = await(ipfsAPI.getObject(this._ipfs, this.data)); + this.Payload = JSON.parse(payload.Data); + if(this.Payload.value) { + const value = await(ipfsAPI.getObject(this._ipfs, this.Payload.value)); + this.Payload.value = JSON.parse(value.Data)["content"]; + } } } diff --git a/test/orbit-client-tests.js b/test/orbit-client-tests.js index 9437013..d700486 100644 --- a/test/orbit-client-tests.js +++ b/test/orbit-client-tests.js @@ -70,6 +70,7 @@ describe('Orbit Client', () => { it('adds five items', async((done) => { for(let i = 0; i < 5; i ++) { let hash = db.add('hello'); + // console.log(hash) assert.notEqual(hash, null); assert.equal(hash.startsWith('Qm'), true); assert.equal(hash.length, 46); @@ -78,7 +79,7 @@ describe('Orbit Client', () => { })); it('adds an item that is > 256 bytes', async((done) => { - let msg = new Buffer(512); + let msg = new Buffer(1024); msg.fill('a') const hash = db.add(msg.toString()); assert.notEqual(hash, null); diff --git a/test/orbit-list-tests.js b/test/orbit-list-tests.js index af905f2..036277a 100644 --- a/test/orbit-list-tests.js +++ b/test/orbit-list-tests.js @@ -231,6 +231,30 @@ describe('OrbitList', async(function() { done(); })); + + it('commits a list after batch size was reached', async((done) => { + const list = new List('A', ipfs); + + for(let i = 1; i <= List.batchSize; i ++) { + list.add("hello" + i); + } + + assert.equal(list.id, 'A'); + assert.equal(list.seq, 1); + assert.equal(list.ver, 0); + assert.equal(list.items.length, List.batchSize); + assert.equal(list._currentBatch.length, 0); + assert.equal(list._items.length, List.batchSize); + + const item = list.items[list.items.length - 1]; + assert.equal(item.id, 'A'); + assert.equal(item.seq, 0); + assert.equal(item.ver, List.batchSize - 1); + assert.equal(item.data, 'hello' + List.batchSize); + assert.equal(item.next, 'A.0.198.QmRKrcfkejCvxTxApZACjHpxzAKKGnCtFi2rD31CT7RkBS'); + + done(); + })); }); describe('join', () => {