[NOD-858] Don't switch sync peer if the syncing process hasn't yet started with the current sync peer (#700)

* [NOD-858] Don't switch sync peer if the syncing process hasn't yet started with the current sync peer

* [NOD-858] SetShouldSendBlockLocator(false) on OnBlockLocator

* [NOD-858] Rename shouldSendBlockLocator->wasBlockLocatorRequested

* [NOD-858] Move panic to shouldReplaceSyncPeer
This commit is contained in:
Ori Newman 2020-04-13 15:49:46 +03:00 committed by Mike Zak
parent d015286f65
commit 291df8bfef
3 changed files with 48 additions and 16 deletions

View File

@ -157,6 +157,7 @@ type SyncManager struct {
msgChan chan interface{} msgChan chan interface{}
wg sync.WaitGroup wg sync.WaitGroup
quit chan struct{} quit chan struct{}
syncPeerLock sync.Mutex
// These fields should only be accessed from the messageHandler thread // These fields should only be accessed from the messageHandler thread
rejectedTxns map[daghash.TxID]struct{} rejectedTxns map[daghash.TxID]struct{}
@ -170,6 +171,8 @@ type SyncManager struct {
// download/sync the blockDAG from. When syncing is already running, it // download/sync the blockDAG from. When syncing is already running, it
// simply returns. It also examines the candidates for any which are no longer // simply returns. It also examines the candidates for any which are no longer
// candidates and removes them as needed. // candidates and removes them as needed.
//
// This function MUST be called with the sync peer lock held.
func (sm *SyncManager) startSync() { func (sm *SyncManager) startSync() {
// Return now if we're already syncing. // Return now if we're already syncing.
if sm.syncPeer != nil { if sm.syncPeer != nil {
@ -189,6 +192,7 @@ func (sm *SyncManager) startSync() {
// TODO(davec): Use a better algorithm to choose the sync peer. // TODO(davec): Use a better algorithm to choose the sync peer.
// For now, just pick the first available candidate. // For now, just pick the first available candidate.
syncPeer = peer syncPeer = peer
break
} }
// Start syncing from the sync peer if one was selected. // Start syncing from the sync peer if one was selected.
@ -294,8 +298,8 @@ func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
} }
// Start syncing by choosing the best candidate if needed. // Start syncing by choosing the best candidate if needed.
if isSyncCandidate && sm.syncPeer == nil { if isSyncCandidate {
sm.startSync() sm.restartSyncIfNeeded()
} }
} }
@ -337,7 +341,7 @@ func (sm *SyncManager) stopSyncFromPeer(peer *peerpkg.Peer) {
// sync peer. // sync peer.
if sm.syncPeer == peer { if sm.syncPeer == peer {
sm.syncPeer = nil sm.syncPeer = nil
sm.startSync() sm.restartSyncIfNeeded()
} }
} }
@ -427,24 +431,34 @@ func (sm *SyncManager) current() bool {
// restartSyncIfNeeded finds a new sync candidate if we're not expecting any // restartSyncIfNeeded finds a new sync candidate if we're not expecting any
// blocks from the current one. // blocks from the current one.
func (sm *SyncManager) restartSyncIfNeeded() { func (sm *SyncManager) restartSyncIfNeeded() {
if sm.syncPeer != nil { sm.syncPeerLock.Lock()
syncPeerState, exists := sm.peerStates[sm.syncPeer] defer sm.syncPeerLock.Unlock()
if exists {
isWaitingForBlocks := func() bool { if !sm.shouldReplaceSyncPeer() {
syncPeerState.requestQueueMtx.Lock() return
defer syncPeerState.requestQueueMtx.Unlock()
return len(syncPeerState.requestedBlocks) != 0 || len(syncPeerState.requestQueues[wire.InvTypeSyncBlock].queue) != 0
}()
if isWaitingForBlocks {
return
}
}
} }
sm.syncPeer = nil sm.syncPeer = nil
sm.startSync() sm.startSync()
} }
func (sm *SyncManager) shouldReplaceSyncPeer() bool {
if sm.syncPeer == nil {
return true
}
syncPeerState, exists := sm.peerStates[sm.syncPeer]
if !exists {
panic(errors.Errorf("no peer state for sync peer %s", sm.syncPeer))
}
syncPeerState.requestQueueMtx.Lock()
defer syncPeerState.requestQueueMtx.Unlock()
return len(syncPeerState.requestedBlocks) == 0 &&
len(syncPeerState.requestQueues[wire.InvTypeSyncBlock].queue) == 0 &&
!sm.syncPeer.WasBlockLocatorRequested()
}
// handleBlockMsg handles block messages from all peers. // handleBlockMsg handles block messages from all peers.
func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
peer := bmsg.peer peer := bmsg.peer
@ -905,7 +919,7 @@ func (sm *SyncManager) handleSelectedTipMsg(msg *selectedTipMsg) {
return return
} }
peer.SetSelectedTipHash(selectedTipHash) peer.SetSelectedTipHash(selectedTipHash)
sm.startSync() sm.restartSyncIfNeeded()
} }
// messageHandler is the main handler for the sync manager. It must be run as a // messageHandler is the main handler for the sync manager. It must be run as a

View File

@ -414,6 +414,8 @@ type Peer struct {
prevGetBlockInvsLow *daghash.Hash prevGetBlockInvsLow *daghash.Hash
prevGetBlockInvsHigh *daghash.Hash prevGetBlockInvsHigh *daghash.Hash
wasBlockLocatorRequested bool
// These fields keep track of statistics for the peer and are protected // These fields keep track of statistics for the peer and are protected
// by the statsMtx mutex. // by the statsMtx mutex.
statsMtx sync.RWMutex statsMtx sync.RWMutex
@ -435,6 +437,20 @@ type Peer struct {
quit chan struct{} quit chan struct{}
} }
// WasBlockLocatorRequested returns whether the node
// is expecting to get a block locator from this
// peer.
func (p *Peer) WasBlockLocatorRequested() bool {
return p.wasBlockLocatorRequested
}
// SetWasBlockLocatorRequested sets whether the node
// is expecting to get a block locator from this
// peer.
func (p *Peer) SetWasBlockLocatorRequested(wasBlockLocatorRequested bool) {
p.wasBlockLocatorRequested = wasBlockLocatorRequested
}
// String returns the peer's address and directionality as a human-readable // String returns the peer's address and directionality as a human-readable
// string. // string.
// //
@ -775,6 +791,7 @@ func (p *Peer) PushAddrMsg(addresses []*wire.NetAddress, subnetworkID *subnetwor
// //
// This function is safe for concurrent access. // This function is safe for concurrent access.
func (p *Peer) PushGetBlockLocatorMsg(highHash, lowHash *daghash.Hash) { func (p *Peer) PushGetBlockLocatorMsg(highHash, lowHash *daghash.Hash) {
p.SetWasBlockLocatorRequested(true)
msg := wire.NewMsgGetBlockLocator(highHash, lowHash) msg := wire.NewMsgGetBlockLocator(highHash, lowHash)
p.QueueMessage(msg, nil) p.QueueMessage(msg, nil)
} }

View File

@ -8,6 +8,7 @@ import (
// OnBlockLocator is invoked when a peer receives a locator kaspa // OnBlockLocator is invoked when a peer receives a locator kaspa
// message. // message.
func (sp *Peer) OnBlockLocator(_ *peer.Peer, msg *wire.MsgBlockLocator) { func (sp *Peer) OnBlockLocator(_ *peer.Peer, msg *wire.MsgBlockLocator) {
sp.SetWasBlockLocatorRequested(false)
// Find the highest known shared block between the peers, and asks // Find the highest known shared block between the peers, and asks
// the block and its future from the peer. If the block is not // the block and its future from the peer. If the block is not
// found, create a lower resolution block locator and send it to // found, create a lower resolution block locator and send it to