[NOD-1038] Give higher priority for requesting missing ancestors when sending a getdata message (#767)

This commit is contained in:
Ori Newman 2020-06-17 11:29:39 +03:00 committed by Mike Zak
parent bc0227b49b
commit 1271d2f113
6 changed files with 53 additions and 26 deletions

View File

@ -298,7 +298,7 @@ func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
// Initialize the peer state
isSyncCandidate := sm.isSyncCandidate(peer)
requestQueues := make(map[wire.InvType]*requestQueueAndSet)
requestQueueInvTypes := []wire.InvType{wire.InvTypeTx, wire.InvTypeBlock, wire.InvTypeSyncBlock}
requestQueueInvTypes := []wire.InvType{wire.InvTypeTx, wire.InvTypeBlock, wire.InvTypeSyncBlock, wire.InvTypeMissingAncestor}
for _, invType := range requestQueueInvTypes {
requestQueues[invType] = &requestQueueAndSet{
set: make(map[daghash.Hash]struct{}),
@ -351,8 +351,6 @@ func (sm *SyncManager) handleDonePeerMsg(peer *peerpkg.Peer) {
}
func (sm *SyncManager) stopSyncFromPeer(peer *peerpkg.Peer) {
// Attempt to find a new peer to sync from if the quitting peer is the
// sync peer.
if sm.syncPeer == peer {
sm.syncPeer = nil
sm.restartSyncIfNeeded()
@ -558,7 +556,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
blockHash, err)
return
}
sm.addBlocksToRequestQueue(state, missingAncestors, false)
sm.addBlocksToRequestQueue(state, missingAncestors, wire.InvTypeMissingAncestor)
} else {
// When the block is not an orphan, log information about it and
// update the DAG state.
@ -584,15 +582,11 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
}
}
func (sm *SyncManager) addBlocksToRequestQueue(state *peerSyncState, hashes []*daghash.Hash, isRelayedInv bool) {
func (sm *SyncManager) addBlocksToRequestQueue(state *peerSyncState, hashes []*daghash.Hash, invType wire.InvType) {
state.requestQueueMtx.Lock()
defer state.requestQueueMtx.Unlock()
for _, hash := range hashes {
if _, exists := sm.requestedBlocks[*hash]; !exists {
invType := wire.InvTypeSyncBlock
if isRelayedInv {
invType = wire.InvTypeBlock
}
iv := wire.NewInvVect(invType, hash)
state.addInvToRequestQueueNoLock(iv)
}
@ -604,10 +598,13 @@ func (state *peerSyncState) addInvToRequestQueueNoLock(iv *wire.InvVect) {
if !ok {
panic(errors.Errorf("got unsupported inventory type %s", iv.Type))
}
if _, exists := requestQueue.set[*iv.Hash]; !exists {
requestQueue.set[*iv.Hash] = struct{}{}
requestQueue.queue = append(requestQueue.queue, iv)
if _, exists := requestQueue.set[*iv.Hash]; exists {
return
}
requestQueue.set[*iv.Hash] = struct{}{}
requestQueue.queue = append(requestQueue.queue, iv)
}
func (state *peerSyncState) addInvToRequestQueue(iv *wire.InvVect) {
@ -623,6 +620,8 @@ func (state *peerSyncState) addInvToRequestQueue(iv *wire.InvVect) {
// (either the main pool or orphan pool).
func (sm *SyncManager) haveInventory(invVect *wire.InvVect) (bool, error) {
switch invVect.Type {
case wire.InvTypeMissingAncestor:
fallthrough
case wire.InvTypeSyncBlock:
fallthrough
case wire.InvTypeBlock:
@ -698,6 +697,7 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
case wire.InvTypeSyncBlock:
case wire.InvTypeTx:
default:
log.Warnf("got unsupported inv type %s from %s", iv.Type, peer)
continue
}
@ -737,7 +737,8 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
if iv.IsBlockOrSyncBlock() {
if sm.dag.IsKnownInvalid(iv.Hash) {
peer.AddBanScoreAndPushRejectMsg(imsg.inv.Command(), wire.RejectInvalid, iv.Hash, peerpkg.BanScoreInvalidInvBlock, 0, fmt.Sprintf("sent inv of invalid block %s", iv.Hash))
peer.AddBanScoreAndPushRejectMsg(imsg.inv.Command(), wire.RejectInvalid, iv.Hash,
peerpkg.BanScoreInvalidInvBlock, 0, fmt.Sprintf("sent inv of invalid block %s", iv.Hash))
return
}
// The block is an orphan block that we already have.
@ -751,13 +752,22 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
// to signal there are more missing blocks that need to
// be requested.
if sm.dag.IsKnownOrphan(iv.Hash) {
if iv.Type == wire.InvTypeSyncBlock {
peer.AddBanScoreAndPushRejectMsg(imsg.inv.Command(), wire.RejectInvalid, iv.Hash,
peerpkg.BanScoreOrphanInvAsPartOfNetsync, 0,
fmt.Sprintf("sent inv of orphan block %s as part of netsync", iv.Hash))
// Whether the peer will be banned or not, syncing from a node that doesn't follow
// the netsync protocol is undesired.
sm.stopSyncFromPeer(peer)
return
}
missingAncestors, err := sm.dag.GetOrphanMissingAncestorHashes(iv.Hash)
if err != nil {
log.Errorf("Failed to find missing ancestors for block %s: %s",
iv.Hash, err)
return
}
sm.addBlocksToRequestQueue(state, missingAncestors, iv.Type != wire.InvTypeSyncBlock)
sm.addBlocksToRequestQueue(state, missingAncestors, wire.InvTypeMissingAncestor)
continue
}
@ -831,6 +841,8 @@ func (sm *SyncManager) addInvsToGetDataMessageFromQueue(gdmsg *wire.MsgGetData,
for _, iv := range invsToAdd {
delete(requestQueue.set, *iv.Hash)
switch invType {
case wire.InvTypeMissingAncestor:
addBlockInv(iv)
case wire.InvTypeSyncBlock:
addBlockInv(iv)
case wire.InvTypeBlock:
@ -859,13 +871,21 @@ func (sm *SyncManager) addInvsToGetDataMessageFromQueue(gdmsg *wire.MsgGetData,
func (sm *SyncManager) sendInvsFromRequestQueue(peer *peerpkg.Peer, state *peerSyncState) error {
state.requestQueueMtx.Lock()
defer state.requestQueueMtx.Unlock()
if len(sm.requestedBlocks) != 0 {
return nil
}
gdmsg := wire.NewMsgGetData()
err := sm.addInvsToGetDataMessageFromQueue(gdmsg, state, wire.InvTypeSyncBlock, wire.MaxSyncBlockInvPerGetDataMsg)
if err != nil {
return err
}
if !sm.isSyncing || sm.isSynced() {
err := sm.addInvsToGetDataMessageFromQueue(gdmsg, state, wire.InvTypeBlock, wire.MaxInvPerGetDataMsg)
err := sm.addInvsToGetDataMessageFromQueue(gdmsg, state, wire.InvTypeMissingAncestor, wire.MaxInvPerGetDataMsg)
if err != nil {
return err
}
err = sm.addInvsToGetDataMessageFromQueue(gdmsg, state, wire.InvTypeBlock, wire.MaxInvPerGetDataMsg)
if err != nil {
return err
}

View File

@ -5,6 +5,7 @@ const (
BanScoreUnrequestedBlock = 100
BanScoreInvalidBlock = 100
BanScoreInvalidInvBlock = 100
BanScoreOrphanInvAsPartOfNetsync = 100
BanScoreMalformedBlueScoreInOrphan = 100
BanScoreUnrequestedSelectedTip = 20

View File

@ -46,6 +46,8 @@ func invSummary(invList []*wire.InvVect) string {
return fmt.Sprintf("block %s", iv.Hash)
case wire.InvTypeSyncBlock:
return fmt.Sprintf("sync block %s", iv.Hash)
case wire.InvTypeMissingAncestor:
return fmt.Sprintf("missing ancestor %s", iv.Hash)
case wire.InvTypeTx:
return fmt.Sprintf("tx %s", iv.Hash)
}

View File

@ -1558,7 +1558,7 @@ out:
// No handshake? They'll find out soon enough.
if p.VersionKnown() {
// If this is a new block, then we'll blast it
// out immediately, sipping the inv trickle
// out immediately, skipping the inv trickle
// queue.
if iv.Type == wire.InvTypeBlock {
invMsg := wire.NewMsgInvSizeHint(1)

View File

@ -44,6 +44,8 @@ func (sp *Peer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) {
err = sp.server.pushTxMsg(sp, (*daghash.TxID)(iv.Hash), c, waitChan)
case wire.InvTypeSyncBlock:
fallthrough
case wire.InvTypeMissingAncestor:
fallthrough
case wire.InvTypeBlock:
err = sp.server.pushBlockMsg(sp, iv.Hash, c, waitChan)
case wire.InvTypeFilteredBlock:

View File

@ -33,20 +33,22 @@ type InvType uint32
// These constants define the various supported inventory vector types.
const (
InvTypeError InvType = 0
InvTypeTx InvType = 1
InvTypeBlock InvType = 2
InvTypeFilteredBlock InvType = 3
InvTypeSyncBlock InvType = 4
InvTypeError InvType = 0
InvTypeTx InvType = 1
InvTypeBlock InvType = 2
InvTypeFilteredBlock InvType = 3
InvTypeSyncBlock InvType = 4
InvTypeMissingAncestor InvType = 5
)
// Map of service flags back to their constant names for pretty printing.
var ivStrings = map[InvType]string{
InvTypeError: "ERROR",
InvTypeTx: "MSG_TX",
InvTypeBlock: "MSG_BLOCK",
InvTypeFilteredBlock: "MSG_FILTERED_BLOCK",
InvTypeSyncBlock: "MSG_SYNC_BLOCK",
InvTypeError: "ERROR",
InvTypeTx: "MSG_TX",
InvTypeBlock: "MSG_BLOCK",
InvTypeFilteredBlock: "MSG_FILTERED_BLOCK",
InvTypeSyncBlock: "MSG_SYNC_BLOCK",
InvTypeMissingAncestor: "MSG_MISSING_ANCESTOR",
}
// String returns the InvType in human-readable form.