[NOD-970] Add isSyncing flag (#747)

* [NOD-970] Add isSyncing flag

* [NOD-970] Rename shouldSendSelectedTip->peerShouldSendSelectedTip
This commit is contained in:
Ori Newman 2020-06-07 16:31:17 +03:00 committed by GitHub
parent 96930bd6ea
commit 35c733a4c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -132,13 +132,13 @@ type requestQueueAndSet struct {
// peerSyncState stores additional information that the SyncManager tracks
// about a peer.
type peerSyncState struct {
syncCandidate bool
lastSelectedTipRequest time.Time
isPendingForSelectedTip bool
requestQueueMtx sync.Mutex
requestQueues map[wire.InvType]*requestQueueAndSet
requestedTxns map[daghash.TxID]struct{}
requestedBlocks map[daghash.Hash]struct{}
syncCandidate bool
lastSelectedTipRequest time.Time
peerShouldSendSelectedTip bool
requestQueueMtx sync.Mutex
requestQueues map[wire.InvType]*requestQueueAndSet
requestedTxns map[daghash.TxID]struct{}
requestedBlocks map[daghash.Hash]struct{}
}
// SyncManager is used to communicate block related messages with peers. The
@ -158,6 +158,7 @@ type SyncManager struct {
wg sync.WaitGroup
quit chan struct{}
syncPeerLock sync.Mutex
isSyncing bool
// These fields should only be accessed from the messageHandler thread
rejectedTxns map[daghash.TxID]struct{}
@ -206,13 +207,21 @@ func (sm *SyncManager) startSync() {
syncPeer.SelectedTipHash(), syncPeer.Addr())
syncPeer.PushGetBlockLocatorMsg(syncPeer.SelectedTipHash(), sm.dagParams.GenesisHash)
sm.isSyncing = true
sm.syncPeer = syncPeer
return
}
pendingForSelectedTips := false
if sm.shouldQueryPeerSelectedTips() {
sm.isSyncing = true
hasSyncCandidates := false
for peer, state := range sm.peerStates {
if state.peerShouldSendSelectedTip {
pendingForSelectedTips = true
continue
}
if !state.syncCandidate {
continue
}
@ -222,21 +231,26 @@ func (sm *SyncManager) startSync() {
continue
}
queueMsgGetSelectedTip(peer, state)
sm.queueMsgGetSelectedTip(peer, state)
pendingForSelectedTips = true
}
if !hasSyncCandidates {
log.Warnf("No sync peer candidates available")
}
}
if !pendingForSelectedTips {
sm.isSyncing = false
}
}
func (sm *SyncManager) shouldQueryPeerSelectedTips() bool {
return sm.dag.Now().Sub(sm.dag.CalcPastMedianTime()) > minDAGTimeDelay
}
func queueMsgGetSelectedTip(peer *peerpkg.Peer, state *peerSyncState) {
func (sm *SyncManager) queueMsgGetSelectedTip(peer *peerpkg.Peer, state *peerSyncState) {
state.lastSelectedTipRequest = time.Now()
state.isPendingForSelectedTip = true
state.peerShouldSendSelectedTip = true
peer.QueueMessage(wire.NewMsgGetSelectedTip(), nil)
}
@ -417,17 +431,6 @@ func (sm *SyncManager) handleTxMsg(tmsg *txMsg) {
sm.peerNotifier.AnnounceNewTransactions(acceptedTxs)
}
// current returns true if we believe we are synced with our peers, false if we
// still have blocks to check
//
// We consider ourselves current iff both of the following are true:
// 1. there's no syncPeer, a.k.a. all connected peers are at the same tip
// 2. the DAG considers itself current - to prevent attacks where a peer sends an
// unknown tip but never lets us sync to it.
func (sm *SyncManager) current() bool {
return sm.syncPeer == nil && sm.dag.IsCurrent()
}
// restartSyncIfNeeded finds a new sync candidate if we're not expecting any
// blocks from the current one.
func (sm *SyncManager) restartSyncIfNeeded() {
@ -763,7 +766,7 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
log.Errorf("Failed to send invs from queue: %s", err)
}
if haveUnknownInvBlock && !sm.current() {
if haveUnknownInvBlock && !sm.isSyncing {
// If one of the inv messages is an unknown block
// it is an indication that one of our peers has more
// up-to-date data than us.
@ -848,7 +851,8 @@ func (sm *SyncManager) sendInvsFromRequestQueue(peer *peerpkg.Peer, state *peerS
if err != nil {
return err
}
if sm.syncPeer == nil || sm.isSynced() {
if !sm.isSyncing || sm.isSynced() {
log.Criticalf("wtf? sm.isSyncing: %t sm.isSynced: %t", sm.isSyncing, sm.isSynced())
err := sm.addInvsToGetDataMessageFromQueue(gdmsg, state, wire.InvTypeBlock, wire.MaxInvPerGetDataMsg)
if err != nil {
return err
@ -918,12 +922,12 @@ func (sm *SyncManager) handleSelectedTipMsg(msg *selectedTipMsg) {
peer := msg.peer
selectedTipHash := msg.selectedTipHash
state := sm.peerStates[peer]
if !state.isPendingForSelectedTip {
if !state.peerShouldSendSelectedTip {
log.Warnf("Got unrequested selected tip message from %s -- "+
"disconnecting", peer.Addr())
peer.Disconnect()
}
state.isPendingForSelectedTip = false
state.peerShouldSendSelectedTip = false
if selectedTipHash.IsEqual(peer.SelectedTipHash()) {
return
}
@ -1028,7 +1032,7 @@ func (sm *SyncManager) handleBlockDAGNotification(notification *blockdag.Notific
// Relay if we are current and the block was not just now unorphaned.
// Otherwise peers that are current should already know about it
if sm.current() && !data.WasUnorphaned {
if sm.isSynced() && !data.WasUnorphaned {
iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
sm.peerNotifier.RelayInventory(iv, block.MsgBlock().Header)
}