diff --git a/netsync/manager.go b/netsync/manager.go index bc1cefb20..d10e4e066 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -157,6 +157,7 @@ type SyncManager struct { msgChan chan interface{} wg sync.WaitGroup quit chan struct{} + syncPeerLock sync.Mutex // These fields should only be accessed from the messageHandler thread rejectedTxns map[daghash.TxID]struct{} @@ -170,6 +171,8 @@ type SyncManager struct { // download/sync the blockDAG from. When syncing is already running, it // simply returns. It also examines the candidates for any which are no longer // candidates and removes them as needed. +// +// This function MUST be called with the sync peer lock held. func (sm *SyncManager) startSync() { // Return now if we're already syncing. if sm.syncPeer != nil { @@ -189,6 +192,7 @@ func (sm *SyncManager) startSync() { // TODO(davec): Use a better algorithm to choose the sync peer. // For now, just pick the first available candidate. syncPeer = peer + break } // Start syncing from the sync peer if one was selected. @@ -294,8 +298,8 @@ func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) { } // Start syncing by choosing the best candidate if needed. - if isSyncCandidate && sm.syncPeer == nil { - sm.startSync() + if isSyncCandidate { + sm.restartSyncIfNeeded() } } @@ -337,7 +341,7 @@ func (sm *SyncManager) stopSyncFromPeer(peer *peerpkg.Peer) { // sync peer. if sm.syncPeer == peer { sm.syncPeer = nil - sm.startSync() + sm.restartSyncIfNeeded() } } @@ -427,24 +431,34 @@ func (sm *SyncManager) current() bool { // restartSyncIfNeeded finds a new sync candidate if we're not expecting any // blocks from the current one. func (sm *SyncManager) restartSyncIfNeeded() { - if sm.syncPeer != nil { - syncPeerState, exists := sm.peerStates[sm.syncPeer] - if exists { - isWaitingForBlocks := func() bool { - syncPeerState.requestQueueMtx.Lock() - defer syncPeerState.requestQueueMtx.Unlock() - return len(syncPeerState.requestedBlocks) != 0 || len(syncPeerState.requestQueues[wire.InvTypeSyncBlock].queue) != 0 - }() - if isWaitingForBlocks { - return - } - } + sm.syncPeerLock.Lock() + defer sm.syncPeerLock.Unlock() + + if !sm.shouldReplaceSyncPeer() { + return } sm.syncPeer = nil sm.startSync() } +func (sm *SyncManager) shouldReplaceSyncPeer() bool { + if sm.syncPeer == nil { + return true + } + + syncPeerState, exists := sm.peerStates[sm.syncPeer] + if !exists { + panic(errors.Errorf("no peer state for sync peer %s", sm.syncPeer)) + } + + syncPeerState.requestQueueMtx.Lock() + defer syncPeerState.requestQueueMtx.Unlock() + return len(syncPeerState.requestedBlocks) == 0 && + len(syncPeerState.requestQueues[wire.InvTypeSyncBlock].queue) == 0 && + !sm.syncPeer.WasBlockLocatorRequested() +} + // handleBlockMsg handles block messages from all peers. func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { peer := bmsg.peer @@ -905,7 +919,7 @@ func (sm *SyncManager) handleSelectedTipMsg(msg *selectedTipMsg) { return } peer.SetSelectedTipHash(selectedTipHash) - sm.startSync() + sm.restartSyncIfNeeded() } // messageHandler is the main handler for the sync manager. It must be run as a diff --git a/peer/peer.go b/peer/peer.go index 6074aa794..a9559d684 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -414,6 +414,8 @@ type Peer struct { prevGetBlockInvsLow *daghash.Hash prevGetBlockInvsHigh *daghash.Hash + wasBlockLocatorRequested bool + // These fields keep track of statistics for the peer and are protected // by the statsMtx mutex. statsMtx sync.RWMutex @@ -435,6 +437,20 @@ type Peer struct { quit chan struct{} } +// WasBlockLocatorRequested returns whether the node +// is expecting to get a block locator from this +// peer. +func (p *Peer) WasBlockLocatorRequested() bool { + return p.wasBlockLocatorRequested +} + +// SetWasBlockLocatorRequested sets whether the node +// is expecting to get a block locator from this +// peer. +func (p *Peer) SetWasBlockLocatorRequested(wasBlockLocatorRequested bool) { + p.wasBlockLocatorRequested = wasBlockLocatorRequested +} + // String returns the peer's address and directionality as a human-readable // string. // @@ -775,6 +791,7 @@ func (p *Peer) PushAddrMsg(addresses []*wire.NetAddress, subnetworkID *subnetwor // // This function is safe for concurrent access. func (p *Peer) PushGetBlockLocatorMsg(highHash, lowHash *daghash.Hash) { + p.SetWasBlockLocatorRequested(true) msg := wire.NewMsgGetBlockLocator(highHash, lowHash) p.QueueMessage(msg, nil) } diff --git a/server/p2p/on_block_locator.go b/server/p2p/on_block_locator.go index d043ffbb8..6934b16e9 100644 --- a/server/p2p/on_block_locator.go +++ b/server/p2p/on_block_locator.go @@ -8,6 +8,7 @@ import ( // OnBlockLocator is invoked when a peer receives a locator kaspa // message. func (sp *Peer) OnBlockLocator(_ *peer.Peer, msg *wire.MsgBlockLocator) { + sp.SetWasBlockLocatorRequested(false) // Find the highest known shared block between the peers, and asks // the block and its future from the peer. If the block is not // found, create a lower resolution block locator and send it to