Merge pull request #763 from 3box/update-peer-exchanged

Update peer exchanged
This commit is contained in:
Mark Robert Henderson 2020-08-24 12:21:57 -04:00 committed by GitHub
commit bd777c53b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 2209 additions and 1596 deletions

8
API.md
View File

@ -65,6 +65,7 @@ Read the **[GETTING STARTED](https://github.com/orbitdb/orbit-db/blob/master/GUI
* [`write`](#write) * [`write`](#write)
* [`peer`](#peer) * [`peer`](#peer)
* [`closed`](#closed) * [`closed`](#closed)
* [`peer.exchanged`](#peerexchanged)
<!-- tocstop --> <!-- tocstop -->
@ -642,3 +643,10 @@ Emitted once the database has finished closing.
```javascript ```javascript
db.events.on('closed', (dbname) => ... ) db.events.on('closed', (dbname) => ... )
``` ```
### `peer.exchanged`
```javascript
db.events.on('peer.exchanged', (peer, address, heads) => ... )
```
Emitted after heads have been exchanged with a peer for a specific database. This will be emitted after every exchange, even if no heads are received from the peer, or if all received heads are already present. (This is in contrast with the `replicated` event, which will only fire when new heads have been received.) Note that `heads` here contains heads *received* as part of the exchange, not heads sent.

3742
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -250,12 +250,15 @@ class OrbitDB {
} }
// Callback for receiving a message from the network // Callback for receiving a message from the network
async _onMessage (address, heads) { async _onMessage (address, heads, peer) {
const store = this.stores[address] const store = this.stores[address]
try { try {
logger.debug(`Received ${heads.length} heads for '${address}':\n`, JSON.stringify(heads.map(e => e.hash), null, 2)) logger.debug(`Received ${heads.length} heads for '${address}':\n`, JSON.stringify(heads.map(e => e.hash), null, 2))
if (store && heads && heads.length > 0) { if (store && heads) {
await store.sync(heads) if (heads.length > 0) {
await store.sync(heads)
}
store.events.emit('peer.exchanged', peer, address, heads)
} }
} catch (e) { } catch (e) {
logger.error(e) logger.error(e)
@ -270,7 +273,7 @@ class OrbitDB {
const getDirectConnection = peer => this._directConnections[peer] const getDirectConnection = peer => this._directConnections[peer]
const onChannelCreated = channel => { this._directConnections[channel._receiverID] = channel } const onChannelCreated = channel => { this._directConnections[channel._receiverID] = channel }
const onMessage = (address, heads) => this._onMessage(address, heads) const onMessage = (address, heads) => this._onMessage(address, heads, peer)
await exchangeHeads( await exchangeHeads(
this._ipfs, this._ipfs,

View File

@ -152,9 +152,10 @@ Object.keys(testAPIs).forEach(API => {
await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString()) await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString())
let finished = false let finished = false
let eventCount = { 'replicate': 0, 'replicate.progress': 0, 'replicated': 0 } let eventCount = { 'replicate': 0, 'replicate.progress': 0, 'replicated': 0, 'peer.exchanged': 0 }
let events = [] let events = []
let expectedEventCount = 99 let expectedEventCount = 99
let expectedPeerExchangeCount = 99
db2.events.on('replicate', (address, entry) => { db2.events.on('replicate', (address, entry) => {
eventCount['replicate'] ++ eventCount['replicate'] ++
@ -178,6 +179,15 @@ Object.keys(testAPIs).forEach(API => {
}) })
}) })
db2.events.on('peer.exchanged', (address, entry) => {
eventCount['peer.exchanged'] ++
events.push({
event: 'peer.exchanged',
count: eventCount['peer.exchanged'],
entry: entry,
})
})
db2.events.on('replicated', (address) => { db2.events.on('replicated', (address) => {
eventCount['replicated'] ++ eventCount['replicated'] ++
events.push({ events.push({
@ -192,7 +202,7 @@ Object.keys(testAPIs).forEach(API => {
// don't receive more than one event // don't receive more than one event
setTimeout(() => { setTimeout(() => {
finished = db2.iterator({ limit: -1 }).collect().length === expectedEventCount finished = db2.iterator({ limit: -1 }).collect().length === expectedEventCount
}, 200) }, 1000)
}) })
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
@ -203,6 +213,7 @@ Object.keys(testAPIs).forEach(API => {
assert.equal(eventCount['replicate'], expectedEventCount) assert.equal(eventCount['replicate'], expectedEventCount)
assert.equal(eventCount['replicate.progress'], expectedEventCount) assert.equal(eventCount['replicate.progress'], expectedEventCount)
assert.equal(eventCount['peer.exchanged'] >= expectedPeerExchangeCount, true, 'insuficcient peer.exchanged events fired')
const replicateEvents = events.filter(e => e.event === 'replicate') const replicateEvents = events.filter(e => e.event === 'replicate')
const minClock = Math.min(...replicateEvents.filter(e => !!e.entry.clock).map(e => e.entry.clock.time)) const minClock = Math.min(...replicateEvents.filter(e => !!e.entry.clock).map(e => e.entry.clock.time))
@ -242,9 +253,10 @@ Object.keys(testAPIs).forEach(API => {
it('emits correct replication info on fresh replication', async () => { it('emits correct replication info on fresh replication', async () => {
return new Promise(async (resolve, reject) => { return new Promise(async (resolve, reject) => {
let finished = false let finished = false
let eventCount = { 'replicate': 0, 'replicate.progress': 0, 'replicated': 0 } let eventCount = { 'replicate': 0, 'replicate.progress': 0, 'replicated': 0, 'peer.exchanged': 0 }
let events = [] let events = []
let expectedEventCount = 512 let expectedEventCount = 512
let expectedPeerExchangeCount = 1
// Trigger replication // Trigger replication
let adds = [] let adds = []
@ -293,6 +305,16 @@ Object.keys(testAPIs).forEach(API => {
}) })
}) })
db2.events.on('peer.exchanged', (address, entry) => {
eventCount['peer.exchanged'] ++
// console.log("[replicate] ", '#' + eventCount['replicate'] + ':', db2.replicationStatus.progress, '/', db2.replicationStatus.max, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished)
events.push({
event: 'peer.exchanged',
count: eventCount['peer.exchanged'],
entry: entry,
})
})
db2.events.on('replicated', (address, length) => { db2.events.on('replicated', (address, length) => {
eventCount['replicated'] += length eventCount['replicated'] += length
// console.log("[replicated]", '#' + eventCount['replicated'] + ':', db2.replicationStatus.progress, '/', db2.replicationStatus.max, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished, "|") // console.log("[replicated]", '#' + eventCount['replicated'] + ':', db2.replicationStatus.progress, '/', db2.replicationStatus.max, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished, "|")
@ -319,7 +341,7 @@ Object.keys(testAPIs).forEach(API => {
if (eventCount['replicated'] === expectedEventCount) { if (eventCount['replicated'] === expectedEventCount) {
finished = true finished = true
} }
}, 100) }, 1000)
}) })
const st = new Date().getTime() const st = new Date().getTime()
@ -333,6 +355,7 @@ Object.keys(testAPIs).forEach(API => {
try { try {
assert.equal(eventCount['replicate'], expectedEventCount) assert.equal(eventCount['replicate'], expectedEventCount)
assert.equal(eventCount['replicate.progress'], expectedEventCount) assert.equal(eventCount['replicate.progress'], expectedEventCount)
assert.equal(eventCount['peer.exchanged'], expectedPeerExchangeCount)
const replicateEvents = events.filter(e => e.event === 'replicate') const replicateEvents = events.filter(e => e.event === 'replicate')
const maxClock = Math.max(...replicateEvents.filter(e => !!e.entry.clock).map(e => e.entry.clock.time)) const maxClock = Math.max(...replicateEvents.filter(e => !!e.entry.clock).map(e => e.entry.clock.time))
@ -366,9 +389,10 @@ Object.keys(testAPIs).forEach(API => {
it('emits correct replication info in two-way replication', async () => { it('emits correct replication info in two-way replication', async () => {
return new Promise(async (resolve, reject) => { return new Promise(async (resolve, reject) => {
let finished = false let finished = false
let eventCount = { 'replicate': 0, 'replicate.progress': 0, 'replicated': 0 } let eventCount = { 'replicate': 0, 'replicate.progress': 0, 'replicated': 0, 'peer.exchanged': 0 }
let events = [] let events = []
let expectedEventCount = 100 let expectedEventCount = 100
let expectedPeerExchangeCount = 100
// Trigger replication // Trigger replication
let adds = [] let adds = []
@ -418,6 +442,15 @@ Object.keys(testAPIs).forEach(API => {
}) })
}) })
db2.events.on('peer.exchanged', (address, entry) => {
eventCount['peer.exchanged'] ++
events.push({
event: 'peer.exchanged',
count: eventCount['peer.exchanged'],
entry: entry,
})
})
db2.events.on('replicated', (address, length) => { db2.events.on('replicated', (address, length) => {
eventCount['replicated'] += length eventCount['replicated'] += length
const values = db2.iterator({limit: -1}).collect() const values = db2.iterator({limit: -1}).collect()
@ -457,6 +490,7 @@ Object.keys(testAPIs).forEach(API => {
assert.equal(eventCount['replicate'], expectedEventCount) assert.equal(eventCount['replicate'], expectedEventCount)
assert.equal(eventCount['replicate.progress'], expectedEventCount) assert.equal(eventCount['replicate.progress'], expectedEventCount)
assert.equal(eventCount['replicated'], expectedEventCount) assert.equal(eventCount['replicated'], expectedEventCount)
assert.equal(eventCount['peer.exchanged'] >= expectedPeerExchangeCount, true, 'insuficcient peer.exchanged events fired')
const replicateEvents = events.filter(e => e.event === 'replicate') const replicateEvents = events.filter(e => e.event === 'replicate')
assert.equal(replicateEvents.length, expectedEventCount) assert.equal(replicateEvents.length, expectedEventCount)