Use ipfs-pubsub-1on1 to exchange heads between connected peers

This commit is contained in:
haad 2018-03-17 08:11:02 +02:00
parent af6f3faa1d
commit d191876ea9
4 changed files with 1081 additions and 657 deletions

1672
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -13,15 +13,16 @@
},
"main": "src/OrbitDB.js",
"dependencies": {
"ipfs-pubsub-1on1": "~0.0.2",
"logplease": "^1.2.14",
"multihashes": "^0.4.12",
"orbit-db-cache": "~0.2.2",
"orbit-db-counterstore": "~1.2.0",
"orbit-db-docstore": "~1.2.0",
"orbit-db-eventstore": "~1.2.0",
"orbit-db-feedstore": "~1.2.0",
"orbit-db-counterstore": "~1.3.0",
"orbit-db-docstore": "~1.3.0",
"orbit-db-eventstore": "~1.3.0",
"orbit-db-feedstore": "~1.3.0",
"orbit-db-keystore": "~0.1.0",
"orbit-db-kvstore": "~1.2.0",
"orbit-db-kvstore": "~1.3.0",
"orbit-db-pubsub": "~0.4.0"
},
"devDependencies": {

View File

@ -7,6 +7,7 @@ const KeyValueStore = require('orbit-db-kvstore')
const CounterStore = require('orbit-db-counterstore')
const DocumentStore = require('orbit-db-docstore')
const Pubsub = require('orbit-db-pubsub')
const Channel = require('ipfs-pubsub-1on1')
const Cache = require('orbit-db-cache')
const Keystore = require('orbit-db-keystore')
const AccessController = require('./ipfs-access-controller')
@ -37,6 +38,7 @@ class OrbitDB {
this.directory = directory || './orbitdb'
this.keystore = options.keystore || Keystore.create(path.join(this.directory, this.id, '/keystore'))
this.key = this.keystore.getKey(this.id) || this.keystore.createKey(this.id)
this._directConnections = {}
}
/* Databases */
@ -85,6 +87,12 @@ class OrbitDB {
delete this.stores[db.address.toString()]
}
// Close all direct connections to peers
Object.keys(this._directConnections).forEach(e => {
this._directConnections[e].close()
delete this._directConnections[e]
})
// Disconnect from pubsub
if (this._pubsub)
this._pubsub.disconnect()
@ -118,11 +126,11 @@ class OrbitDB {
accessController: accessController,
keystore: this.keystore,
cache: cache,
onClose: this._onClose.bind(this),
})
const store = new Store(this._ipfs, this.id, address, opts)
store.events.on('write', this._onWrite.bind(this))
store.events.on('closed', this._onClosed.bind(this))
// ID of the store is the address as a string
const addr = address.toString()
@ -145,33 +153,49 @@ class OrbitDB {
const store = this.stores[address]
try {
logger.debug(`Received ${heads.length} heads for '${address}':\n`, JSON.stringify(heads.map(e => e.hash), null, 2))
await store.sync(heads)
if (store)
await store.sync(heads)
} catch (e) {
logger.error(e)
}
}
// Callback for when a peer connected to a database
_onPeerConnected (address, peer, room) {
async _onPeerConnected (address, peer, room) {
logger.debug(`New peer '${peer}' connected to '${address}'`)
const store = this.stores[address]
if (store) {
// Create a direct channel to the connected peer
let channel = this._directConnections[peer]
if (!channel) {
try {
logger.debug(`Create a channel`)
channel = await Channel.open(this._ipfs, peer)
channel.on('message', (message) => this._onMessage(address, JSON.parse(message.data)))
this._directConnections[peer] = channel
logger.debug(`Channel created`)
} catch (e) {
console.error(e)
logger.error(e)
}
}
// Send the newly connected peer our latest heads
let heads = store._oplog.heads
if (heads.length > 0) {
// Wait for the direct channel to be fully connected
await channel.connect()
logger.debug(`Send latest heads of '${address}':\n`, JSON.stringify(heads.map(e => e.hash), null, 2))
room.sendTo(peer, JSON.stringify(heads))
channel.send(JSON.stringify(heads))
}
store.events.emit('peer', peer)
} else {
logger.error(`Database '${address}' is not open!`)
}
}
// Callback when database was closed
_onClosed (address) {
logger.debug(`Database '${address}' was closed`)
// Remove the callback from the database
this.stores[address].events.removeAllListeners('closed')
_onClose (address) {
logger.debug(`Close ${address}`)
// Unsubscribe from pubsub
if(this._pubsub)

View File

@ -41,16 +41,11 @@ describe('orbit-db - Automatic Replication', function() {
if(orbitdb2)
await orbitdb2.stop()
return new Promise((resolve) => {
setTimeout(async () => {
if (ipfs1)
await ipfs1.stop()
if (ipfs1)
await ipfs1.stop()
if (ipfs2)
await ipfs2.stop()
resolve()
}, 2000)
})
if (ipfs2)
await ipfs2.stop()
})
beforeEach(async () => {