[NOD-858] Don't switch sync peer if the syncing process hasn't yet started with the current sync peer (#700)

* [NOD-858] Don't switch sync peer if the syncing process hasn't yet started with the current sync peer

* [NOD-858] SetShouldSendBlockLocator(false) on OnBlockLocator

* [NOD-858] Rename shouldSendBlockLocator->wasBlockLocatorRequested

* [NOD-858] Move panic to shouldReplaceSyncPeer
This commit is contained in:
Ori Newman 2020-04-13 15:49:46 +03:00 committed by GitHub
parent 2f255952b7
commit 3fd647b291
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 48 additions and 16 deletions

View File

@ -157,6 +157,7 @@ type SyncManager struct {
msgChan chan interface{}
wg sync.WaitGroup
quit chan struct{}
syncPeerLock sync.Mutex
// These fields should only be accessed from the messageHandler thread
rejectedTxns map[daghash.TxID]struct{}
@ -170,6 +171,8 @@ type SyncManager struct {
// download/sync the blockDAG from. When syncing is already running, it
// simply returns. It also examines the candidates for any which are no longer
// candidates and removes them as needed.
//
// This function MUST be called with the sync peer lock held.
func (sm *SyncManager) startSync() {
// Return now if we're already syncing.
if sm.syncPeer != nil {
@ -189,6 +192,7 @@ func (sm *SyncManager) startSync() {
// TODO(davec): Use a better algorithm to choose the sync peer.
// For now, just pick the first available candidate.
syncPeer = peer
break
}
// Start syncing from the sync peer if one was selected.
@ -294,8 +298,8 @@ func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
}
// Start syncing by choosing the best candidate if needed.
if isSyncCandidate && sm.syncPeer == nil {
sm.startSync()
if isSyncCandidate {
sm.restartSyncIfNeeded()
}
}
@ -337,7 +341,7 @@ func (sm *SyncManager) stopSyncFromPeer(peer *peerpkg.Peer) {
// sync peer.
if sm.syncPeer == peer {
sm.syncPeer = nil
sm.startSync()
sm.restartSyncIfNeeded()
}
}
@ -427,24 +431,34 @@ func (sm *SyncManager) current() bool {
// restartSyncIfNeeded finds a new sync candidate if we're not expecting any
// blocks from the current one.
func (sm *SyncManager) restartSyncIfNeeded() {
if sm.syncPeer != nil {
syncPeerState, exists := sm.peerStates[sm.syncPeer]
if exists {
isWaitingForBlocks := func() bool {
syncPeerState.requestQueueMtx.Lock()
defer syncPeerState.requestQueueMtx.Unlock()
return len(syncPeerState.requestedBlocks) != 0 || len(syncPeerState.requestQueues[wire.InvTypeSyncBlock].queue) != 0
}()
if isWaitingForBlocks {
return
}
}
sm.syncPeerLock.Lock()
defer sm.syncPeerLock.Unlock()
if !sm.shouldReplaceSyncPeer() {
return
}
sm.syncPeer = nil
sm.startSync()
}
func (sm *SyncManager) shouldReplaceSyncPeer() bool {
if sm.syncPeer == nil {
return true
}
syncPeerState, exists := sm.peerStates[sm.syncPeer]
if !exists {
panic(errors.Errorf("no peer state for sync peer %s", sm.syncPeer))
}
syncPeerState.requestQueueMtx.Lock()
defer syncPeerState.requestQueueMtx.Unlock()
return len(syncPeerState.requestedBlocks) == 0 &&
len(syncPeerState.requestQueues[wire.InvTypeSyncBlock].queue) == 0 &&
!sm.syncPeer.WasBlockLocatorRequested()
}
// handleBlockMsg handles block messages from all peers.
func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
peer := bmsg.peer
@ -905,7 +919,7 @@ func (sm *SyncManager) handleSelectedTipMsg(msg *selectedTipMsg) {
return
}
peer.SetSelectedTipHash(selectedTipHash)
sm.startSync()
sm.restartSyncIfNeeded()
}
// messageHandler is the main handler for the sync manager. It must be run as a

View File

@ -414,6 +414,8 @@ type Peer struct {
prevGetBlockInvsLow *daghash.Hash
prevGetBlockInvsHigh *daghash.Hash
wasBlockLocatorRequested bool
// These fields keep track of statistics for the peer and are protected
// by the statsMtx mutex.
statsMtx sync.RWMutex
@ -435,6 +437,20 @@ type Peer struct {
quit chan struct{}
}
// WasBlockLocatorRequested returns whether the node
// is expecting to get a block locator from this
// peer.
func (p *Peer) WasBlockLocatorRequested() bool {
return p.wasBlockLocatorRequested
}
// SetWasBlockLocatorRequested sets whether the node
// is expecting to get a block locator from this
// peer.
func (p *Peer) SetWasBlockLocatorRequested(wasBlockLocatorRequested bool) {
p.wasBlockLocatorRequested = wasBlockLocatorRequested
}
// String returns the peer's address and directionality as a human-readable
// string.
//
@ -775,6 +791,7 @@ func (p *Peer) PushAddrMsg(addresses []*wire.NetAddress, subnetworkID *subnetwor
//
// This function is safe for concurrent access.
func (p *Peer) PushGetBlockLocatorMsg(highHash, lowHash *daghash.Hash) {
p.SetWasBlockLocatorRequested(true)
msg := wire.NewMsgGetBlockLocator(highHash, lowHash)
p.QueueMessage(msg, nil)
}

View File

@ -8,6 +8,7 @@ import (
// OnBlockLocator is invoked when a peer receives a locator kaspa
// message.
func (sp *Peer) OnBlockLocator(_ *peer.Peer, msg *wire.MsgBlockLocator) {
sp.SetWasBlockLocatorRequested(false)
// Find the highest known shared block between the peers, and asks
// the block and its future from the peer. If the block is not
// found, create a lower resolution block locator and send it to