diff --git a/app/protocol/flows/v3/blockrelay/block_locator.go b/app/protocol/flows/v3/blockrelay/block_locator.go index bf395b4c5..70b18e6ea 100644 --- a/app/protocol/flows/v3/blockrelay/block_locator.go +++ b/app/protocol/flows/v3/blockrelay/block_locator.go @@ -13,15 +13,21 @@ func (flow *handleRelayInvsFlow) sendGetBlockLocator(highHash *externalapi.Domai } func (flow *handleRelayInvsFlow) receiveBlockLocator() (blockLocatorHashes []*externalapi.DomainHash, err error) { - message, err := flow.dequeueIncomingMessageAndSkipInvs(common.DefaultTimeout) - if err != nil { - return nil, err + for { + message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) + if err != nil { + return nil, err + } + + switch message := message.(type) { + case *appmessage.MsgInvRelayBlock: + flow.invsQueue = append(flow.invsQueue, message) + case *appmessage.MsgBlockLocator: + return message.BlockLocatorHashes, nil + default: + return nil, + protocolerrors.Errorf(true, "received unexpected message type. "+ + "expected: %s, got: %s", appmessage.CmdBlockLocator, message.Command()) + } } - msgBlockLocator, ok := message.(*appmessage.MsgBlockLocator) - if !ok { - return nil, - protocolerrors.Errorf(true, "received unexpected message type. "+ - "expected: %s, got: %s", appmessage.CmdBlockLocator, message.Command()) - } - return msgBlockLocator.BlockLocatorHashes, nil } diff --git a/app/protocol/flows/v3/blockrelay/handle_relay_invs.go b/app/protocol/flows/v3/blockrelay/handle_relay_invs.go index 6143608ff..227416f9e 100644 --- a/app/protocol/flows/v3/blockrelay/handle_relay_invs.go +++ b/app/protocol/flows/v3/blockrelay/handle_relay_invs.go @@ -33,8 +33,6 @@ type RelayInvsContext interface { GetOrphanRoots(orphanHash *externalapi.DomainHash) ([]*externalapi.DomainHash, bool, error) IsOrphan(blockHash *externalapi.DomainHash) bool IsIBDRunning() bool - TrySetIBDRunning(ibdPeer *peerpkg.Peer) bool - UnsetIBDRunning() IsRecoverableError(err error) bool } @@ -57,7 +55,10 @@ func HandleRelayInvs(context RelayInvsContext, incomingRoute *router.Route, outg peer: peer, invsQueue: make([]*appmessage.MsgInvRelayBlock, 0), } - return flow.start() + err := flow.start() + // Currently, HandleRelayInvs flow is the only place where IBD is triggered, so the channel can be closed now + close(peer.IBDRequestChannel()) + return err } func (flow *handleRelayInvsFlow) start() error { @@ -306,7 +307,14 @@ func (flow *handleRelayInvsFlow) processOrphan(block *externalapi.DomainBlock) e // Start IBD unless we already are in IBD log.Debugf("Block %s is out of orphan resolution range. "+ "Attempting to start IBD against it.", blockHash) - return flow.runIBDIfNotRunning(block) + + // Send the block to IBD flow via the IBDRequestChannel. + // Note that this is a non-blocking send, since if IBD is already running, there is no need to trigger it + select { + case flow.peer.IBDRequestChannel() <- block: + default: + } + return nil } func (flow *handleRelayInvsFlow) isGenesisVirtualSelectedParent() (bool, error) { diff --git a/app/protocol/flows/v3/blockrelay/ibd.go b/app/protocol/flows/v3/blockrelay/ibd.go index 5d22aa000..9f76d82e3 100644 --- a/app/protocol/flows/v3/blockrelay/ibd.go +++ b/app/protocol/flows/v3/blockrelay/ibd.go @@ -1,22 +1,69 @@ package blockrelay import ( - "time" - - "github.com/kaspanet/kaspad/infrastructure/logger" - - "github.com/kaspanet/kaspad/domain/consensus/model" - "github.com/kaspanet/kaspad/app/appmessage" "github.com/kaspanet/kaspad/app/protocol/common" + peerpkg "github.com/kaspanet/kaspad/app/protocol/peer" "github.com/kaspanet/kaspad/app/protocol/protocolerrors" + "github.com/kaspanet/kaspad/domain" + "github.com/kaspanet/kaspad/domain/consensus/model" "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" "github.com/kaspanet/kaspad/domain/consensus/ruleerrors" "github.com/kaspanet/kaspad/domain/consensus/utils/consensushashing" + "github.com/kaspanet/kaspad/infrastructure/config" + "github.com/kaspanet/kaspad/infrastructure/logger" + "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" "github.com/pkg/errors" + "time" ) -func (flow *handleRelayInvsFlow) runIBDIfNotRunning(block *externalapi.DomainBlock) error { +// IBDContext is the interface for the context needed for the HandleIBD flow. +type IBDContext interface { + Domain() domain.Domain + Config() *config.Config + OnNewBlock(block *externalapi.DomainBlock, virtualChangeSet *externalapi.VirtualChangeSet) error + OnVirtualChange(virtualChangeSet *externalapi.VirtualChangeSet) error + OnPruningPointUTXOSetOverride() error + IsIBDRunning() bool + TrySetIBDRunning(ibdPeer *peerpkg.Peer) bool + UnsetIBDRunning() + IsRecoverableError(err error) bool +} + +type handleIBDFlow struct { + IBDContext + incomingRoute, outgoingRoute *router.Route + peer *peerpkg.Peer +} + +// HandleIBD handles IBD +func HandleIBD(context IBDContext, incomingRoute *router.Route, outgoingRoute *router.Route, + peer *peerpkg.Peer) error { + + flow := &handleIBDFlow{ + IBDContext: context, + incomingRoute: incomingRoute, + outgoingRoute: outgoingRoute, + peer: peer, + } + return flow.start() +} + +func (flow *handleIBDFlow) start() error { + for { + // Wait for IBD requests triggered by other flows + block, ok := <-flow.peer.IBDRequestChannel() + if !ok { + return nil + } + err := flow.runIBDIfNotRunning(block) + if err != nil { + return err + } + } +} + +func (flow *handleIBDFlow) runIBDIfNotRunning(block *externalapi.DomainBlock) error { wasIBDNotRunning := flow.TrySetIBDRunning(flow.peer) if !wasIBDNotRunning { log.Debugf("IBD is already running") @@ -84,7 +131,16 @@ func (flow *handleRelayInvsFlow) runIBDIfNotRunning(block *externalapi.DomainBlo return nil } -func (flow *handleRelayInvsFlow) logIBDFinished(isFinishedSuccessfully bool) { +func (flow *handleIBDFlow) isGenesisVirtualSelectedParent() (bool, error) { + virtualSelectedParent, err := flow.Domain().Consensus().GetVirtualSelectedParent() + if err != nil { + return false, err + } + + return virtualSelectedParent.Equal(flow.Config().NetParams().GenesisHash), nil +} + +func (flow *handleIBDFlow) logIBDFinished(isFinishedSuccessfully bool) { successString := "successfully" if !isFinishedSuccessfully { successString = "(interrupted)" @@ -95,7 +151,7 @@ func (flow *handleRelayInvsFlow) logIBDFinished(isFinishedSuccessfully bool) { // findHighestSharedBlock attempts to find the highest shared block between the peer // and this node. This method may fail because the peer and us have conflicting pruning // points. In that case we return (nil, false, nil) so that we may stop IBD gracefully. -func (flow *handleRelayInvsFlow) findHighestSharedBlockHash( +func (flow *handleIBDFlow) findHighestSharedBlockHash( targetHash *externalapi.DomainHash) (*externalapi.DomainHash, bool, error) { log.Debugf("Sending a blockLocator to %s between pruning point and headers selected tip", flow.peer) @@ -138,7 +194,7 @@ func (flow *handleRelayInvsFlow) findHighestSharedBlockHash( } } -func (flow *handleRelayInvsFlow) nextBlockLocator(lowHash, highHash *externalapi.DomainHash) (externalapi.BlockLocator, error) { +func (flow *handleIBDFlow) nextBlockLocator(lowHash, highHash *externalapi.DomainHash) (externalapi.BlockLocator, error) { log.Debugf("Sending a blockLocator to %s between %s and %s", flow.peer, lowHash, highHash) blockLocator, err := flow.Domain().Consensus().CreateHeadersSelectedChainBlockLocator(lowHash, highHash) if err != nil { @@ -156,7 +212,7 @@ func (flow *handleRelayInvsFlow) nextBlockLocator(lowHash, highHash *externalapi return blockLocator, nil } -func (flow *handleRelayInvsFlow) findHighestHashIndex( +func (flow *handleIBDFlow) findHighestHashIndex( highestHash *externalapi.DomainHash, blockLocator externalapi.BlockLocator) (int, error) { highestHashIndex := 0 @@ -181,7 +237,7 @@ func (flow *handleRelayInvsFlow) findHighestHashIndex( // fetchHighestHash attempts to fetch the highest hash the peer knows amongst the given // blockLocator. This method may fail because the peer and us have conflicting pruning // points. In that case we return (nil, false, nil) so that we may stop IBD gracefully. -func (flow *handleRelayInvsFlow) fetchHighestHash( +func (flow *handleIBDFlow) fetchHighestHash( targetHash *externalapi.DomainHash, blockLocator externalapi.BlockLocator) (*externalapi.DomainHash, bool, error) { ibdBlockLocatorMessage := appmessage.NewMsgIBDBlockLocator(targetHash, blockLocator) @@ -189,7 +245,7 @@ func (flow *handleRelayInvsFlow) fetchHighestHash( if err != nil { return nil, false, err } - message, err := flow.dequeueIncomingMessageAndSkipInvs(common.DefaultTimeout) + message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) if err != nil { return nil, false, err } @@ -209,7 +265,7 @@ func (flow *handleRelayInvsFlow) fetchHighestHash( } } -func (flow *handleRelayInvsFlow) syncPruningPointFutureHeaders(consensus externalapi.Consensus, highestSharedBlockHash *externalapi.DomainHash, +func (flow *handleIBDFlow) syncPruningPointFutureHeaders(consensus externalapi.Consensus, highestSharedBlockHash *externalapi.DomainHash, highHash *externalapi.DomainHash) error { log.Infof("Downloading headers from %s", flow.peer) @@ -273,15 +329,15 @@ func (flow *handleRelayInvsFlow) syncPruningPointFutureHeaders(consensus externa } } -func (flow *handleRelayInvsFlow) sendRequestHeaders(highestSharedBlockHash *externalapi.DomainHash, +func (flow *handleIBDFlow) sendRequestHeaders(highestSharedBlockHash *externalapi.DomainHash, peerSelectedTipHash *externalapi.DomainHash) error { msgGetBlockInvs := appmessage.NewMsgRequstHeaders(highestSharedBlockHash, peerSelectedTipHash) return flow.outgoingRoute.Enqueue(msgGetBlockInvs) } -func (flow *handleRelayInvsFlow) receiveHeaders() (msgIBDBlock *appmessage.BlockHeadersMessage, doneHeaders bool, err error) { - message, err := flow.dequeueIncomingMessageAndSkipInvs(common.DefaultTimeout) +func (flow *handleIBDFlow) receiveHeaders() (msgIBDBlock *appmessage.BlockHeadersMessage, doneHeaders bool, err error) { + message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) if err != nil { return nil, false, err } @@ -300,7 +356,7 @@ func (flow *handleRelayInvsFlow) receiveHeaders() (msgIBDBlock *appmessage.Block } } -func (flow *handleRelayInvsFlow) processHeader(consensus externalapi.Consensus, msgBlockHeader *appmessage.MsgBlockHeader) error { +func (flow *handleIBDFlow) processHeader(consensus externalapi.Consensus, msgBlockHeader *appmessage.MsgBlockHeader) error { header := appmessage.BlockHeaderToDomainBlockHeader(msgBlockHeader) block := &externalapi.DomainBlock{ Header: header, @@ -333,7 +389,7 @@ func (flow *handleRelayInvsFlow) processHeader(consensus externalapi.Consensus, return nil } -func (flow *handleRelayInvsFlow) validatePruningPointFutureHeaderTimestamps() error { +func (flow *handleIBDFlow) validatePruningPointFutureHeaderTimestamps() error { headerSelectedTipHash, err := flow.Domain().StagingConsensus().GetHeadersSelectedTip() if err != nil { return err @@ -367,7 +423,7 @@ func (flow *handleRelayInvsFlow) validatePruningPointFutureHeaderTimestamps() er return nil } -func (flow *handleRelayInvsFlow) receiveAndInsertPruningPointUTXOSet( +func (flow *handleIBDFlow) receiveAndInsertPruningPointUTXOSet( consensus externalapi.Consensus, pruningPointHash *externalapi.DomainHash) (bool, error) { onEnd := logger.LogAndMeasureExecutionTime(log, "receiveAndInsertPruningPointUTXOSet") @@ -376,7 +432,7 @@ func (flow *handleRelayInvsFlow) receiveAndInsertPruningPointUTXOSet( receivedChunkCount := 0 receivedUTXOCount := 0 for { - message, err := flow.dequeueIncomingMessageAndSkipInvs(common.DefaultTimeout) + message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) if err != nil { return false, err } @@ -422,7 +478,7 @@ func (flow *handleRelayInvsFlow) receiveAndInsertPruningPointUTXOSet( } } -func (flow *handleRelayInvsFlow) syncMissingBlockBodies(highHash *externalapi.DomainHash) error { +func (flow *handleIBDFlow) syncMissingBlockBodies(highHash *externalapi.DomainHash) error { hashes, err := flow.Domain().Consensus().GetMissingBlockBodyHashes(highHash) if err != nil { return err @@ -449,7 +505,7 @@ func (flow *handleRelayInvsFlow) syncMissingBlockBodies(highHash *externalapi.Do } for _, expectedHash := range hashesToRequest { - message, err := flow.dequeueIncomingMessageAndSkipInvs(common.DefaultTimeout) + message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) if err != nil { return err } @@ -489,7 +545,16 @@ func (flow *handleRelayInvsFlow) syncMissingBlockBodies(highHash *externalapi.Do return flow.resolveVirtual() } -func (flow *handleRelayInvsFlow) resolveVirtual() error { +func (flow *handleIBDFlow) banIfBlockIsHeaderOnly(block *externalapi.DomainBlock) error { + if len(block.Transactions) == 0 { + return protocolerrors.Errorf(true, "sent header of %s block where expected block with body", + consensushashing.BlockHash(block)) + } + + return nil +} + +func (flow *handleIBDFlow) resolveVirtual() error { for i := 0; ; i++ { if i%10 == 0 { log.Infof("Resolving virtual. This may take some time...") @@ -510,18 +575,3 @@ func (flow *handleRelayInvsFlow) resolveVirtual() error { } } } - -// dequeueIncomingMessageAndSkipInvs is a convenience method to be used during -// IBD. Inv messages are expected to arrive at any given moment, but should be -// ignored while we're in IBD -func (flow *handleRelayInvsFlow) dequeueIncomingMessageAndSkipInvs(timeout time.Duration) (appmessage.Message, error) { - for { - message, err := flow.incomingRoute.DequeueWithTimeout(timeout) - if err != nil { - return nil, err - } - if _, ok := message.(*appmessage.MsgInvRelayBlock); !ok { - return message, nil - } - } -} diff --git a/app/protocol/flows/v3/blockrelay/ibd_with_headers_proof.go b/app/protocol/flows/v3/blockrelay/ibd_with_headers_proof.go index 97a1a1d9e..e0b0d300f 100644 --- a/app/protocol/flows/v3/blockrelay/ibd_with_headers_proof.go +++ b/app/protocol/flows/v3/blockrelay/ibd_with_headers_proof.go @@ -11,7 +11,7 @@ import ( "github.com/pkg/errors" ) -func (flow *handleRelayInvsFlow) ibdWithHeadersProof(highHash *externalapi.DomainHash) error { +func (flow *handleIBDFlow) ibdWithHeadersProof(highHash *externalapi.DomainHash) error { err := flow.Domain().InitStagingConsensus() if err != nil { return err @@ -44,7 +44,7 @@ func (flow *handleRelayInvsFlow) ibdWithHeadersProof(highHash *externalapi.Domai return nil } -func (flow *handleRelayInvsFlow) shouldSyncAndShouldDownloadHeadersProof(highBlock *externalapi.DomainBlock, +func (flow *handleIBDFlow) shouldSyncAndShouldDownloadHeadersProof(highBlock *externalapi.DomainBlock, highestSharedBlockFound bool) (shouldDownload, shouldSync bool, err error) { if !highestSharedBlockFound { @@ -63,7 +63,7 @@ func (flow *handleRelayInvsFlow) shouldSyncAndShouldDownloadHeadersProof(highBlo return false, true, nil } -func (flow *handleRelayInvsFlow) checkIfHighHashHasMoreBlueWorkThanSelectedTipAndPruningDepthMoreBlueScore(highBlock *externalapi.DomainBlock) (bool, error) { +func (flow *handleIBDFlow) checkIfHighHashHasMoreBlueWorkThanSelectedTipAndPruningDepthMoreBlueScore(highBlock *externalapi.DomainBlock) (bool, error) { headersSelectedTip, err := flow.Domain().Consensus().GetHeadersSelectedTip() if err != nil { return false, err @@ -81,13 +81,13 @@ func (flow *handleRelayInvsFlow) checkIfHighHashHasMoreBlueWorkThanSelectedTipAn return highBlock.Header.BlueWork().Cmp(headersSelectedTipInfo.BlueWork) > 0, nil } -func (flow *handleRelayInvsFlow) syncAndValidatePruningPointProof() (*externalapi.DomainHash, error) { +func (flow *handleIBDFlow) syncAndValidatePruningPointProof() (*externalapi.DomainHash, error) { log.Infof("Downloading the pruning point proof from %s", flow.peer) err := flow.outgoingRoute.Enqueue(appmessage.NewMsgRequestPruningPointProof()) if err != nil { return nil, err } - message, err := flow.dequeueIncomingMessageAndSkipInvs(common.DefaultTimeout) + message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) if err != nil { return nil, err } @@ -113,7 +113,7 @@ func (flow *handleRelayInvsFlow) syncAndValidatePruningPointProof() (*externalap return consensushashing.HeaderHash(pruningPointProof.Headers[0][len(pruningPointProof.Headers[0])-1]), nil } -func (flow *handleRelayInvsFlow) downloadHeadersAndPruningUTXOSet(highHash *externalapi.DomainHash) error { +func (flow *handleIBDFlow) downloadHeadersAndPruningUTXOSet(highHash *externalapi.DomainHash) error { proofPruningPoint, err := flow.syncAndValidatePruningPointProof() if err != nil { return err @@ -164,7 +164,7 @@ func (flow *handleRelayInvsFlow) downloadHeadersAndPruningUTXOSet(highHash *exte return nil } -func (flow *handleRelayInvsFlow) syncPruningPointsAndPruningPointAnticone(proofPruningPoint *externalapi.DomainHash) error { +func (flow *handleIBDFlow) syncPruningPointsAndPruningPointAnticone(proofPruningPoint *externalapi.DomainHash) error { log.Infof("Downloading the past pruning points and the pruning point anticone from %s", flow.peer) err := flow.outgoingRoute.Enqueue(appmessage.NewMsgRequestPruningPointAndItsAnticone()) if err != nil { @@ -214,15 +214,15 @@ func (flow *handleRelayInvsFlow) syncPruningPointsAndPruningPointAnticone(proofP return nil } -func (flow *handleRelayInvsFlow) processBlockWithTrustedData( +func (flow *handleIBDFlow) processBlockWithTrustedData( consensus externalapi.Consensus, block *appmessage.MsgBlockWithTrustedData) error { _, err := consensus.ValidateAndInsertBlockWithTrustedData(appmessage.BlockWithTrustedDataToDomainBlockWithTrustedData(block), false) return err } -func (flow *handleRelayInvsFlow) receiveBlockWithTrustedData() (*appmessage.MsgBlockWithTrustedData, bool, error) { - message, err := flow.dequeueIncomingMessageAndSkipInvs(common.DefaultTimeout) +func (flow *handleIBDFlow) receiveBlockWithTrustedData() (*appmessage.MsgBlockWithTrustedData, bool, error) { + message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) if err != nil { return nil, false, err } @@ -242,8 +242,8 @@ func (flow *handleRelayInvsFlow) receiveBlockWithTrustedData() (*appmessage.MsgB } } -func (flow *handleRelayInvsFlow) receivePruningPoints() (*appmessage.MsgPruningPoints, error) { - message, err := flow.dequeueIncomingMessageAndSkipInvs(common.DefaultTimeout) +func (flow *handleIBDFlow) receivePruningPoints() (*appmessage.MsgPruningPoints, error) { + message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) if err != nil { return nil, err } @@ -258,7 +258,7 @@ func (flow *handleRelayInvsFlow) receivePruningPoints() (*appmessage.MsgPruningP return msgPruningPoints, nil } -func (flow *handleRelayInvsFlow) validateAndInsertPruningPoints(proofPruningPoint *externalapi.DomainHash) error { +func (flow *handleIBDFlow) validateAndInsertPruningPoints(proofPruningPoint *externalapi.DomainHash) error { currentPruningPoint, err := flow.Domain().Consensus().PruningPoint() if err != nil { return err @@ -302,7 +302,7 @@ func (flow *handleRelayInvsFlow) validateAndInsertPruningPoints(proofPruningPoin return nil } -func (flow *handleRelayInvsFlow) syncPruningPointUTXOSet(consensus externalapi.Consensus, +func (flow *handleIBDFlow) syncPruningPointUTXOSet(consensus externalapi.Consensus, pruningPoint *externalapi.DomainHash) (bool, error) { log.Infof("Checking if the suggested pruning point %s is compatible to the node DAG", pruningPoint) @@ -330,7 +330,7 @@ func (flow *handleRelayInvsFlow) syncPruningPointUTXOSet(consensus externalapi.C return true, nil } -func (flow *handleRelayInvsFlow) fetchMissingUTXOSet(consensus externalapi.Consensus, pruningPointHash *externalapi.DomainHash) (succeed bool, err error) { +func (flow *handleIBDFlow) fetchMissingUTXOSet(consensus externalapi.Consensus, pruningPointHash *externalapi.DomainHash) (succeed bool, err error) { defer func() { err := flow.Domain().StagingConsensus().ClearImportedPruningPointData() if err != nil { diff --git a/app/protocol/flows/v3/register.go b/app/protocol/flows/v3/register.go index f3efd1a42..3f55a9a2b 100644 --- a/app/protocol/flows/v3/register.go +++ b/app/protocol/flows/v3/register.go @@ -58,6 +58,14 @@ func registerBlockRelayFlows(m protocolManager, router *routerpkg.Router, isStop m.RegisterFlow("HandleRelayInvs", router, []appmessage.MessageCommand{ appmessage.CmdInvRelayBlock, appmessage.CmdBlock, appmessage.CmdBlockLocator, + }, + isStopping, errChan, func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error { + return blockrelay.HandleRelayInvs(m.Context(), incomingRoute, + outgoingRoute, peer) + }, + ), + + m.RegisterFlow("HandleIBD", router, []appmessage.MessageCommand{ appmessage.CmdDoneHeaders, appmessage.CmdUnexpectedPruningPoint, appmessage.CmdPruningPointUTXOSetChunk, appmessage.CmdBlockHeaders, appmessage.CmdIBDBlockLocatorHighestHash, appmessage.CmdBlockWithTrustedData, appmessage.CmdDoneBlocksWithTrustedData, appmessage.CmdIBDBlockLocatorHighestHashNotFound, @@ -65,7 +73,7 @@ func registerBlockRelayFlows(m protocolManager, router *routerpkg.Router, isStop appmessage.CmdPruningPointProof, }, isStopping, errChan, func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error { - return blockrelay.HandleRelayInvs(m.Context(), incomingRoute, + return blockrelay.HandleIBD(m.Context(), incomingRoute, outgoingRoute, peer) }, ), diff --git a/app/protocol/flows/v4/blockrelay/block_locator.go b/app/protocol/flows/v4/blockrelay/block_locator.go index bf395b4c5..70b18e6ea 100644 --- a/app/protocol/flows/v4/blockrelay/block_locator.go +++ b/app/protocol/flows/v4/blockrelay/block_locator.go @@ -13,15 +13,21 @@ func (flow *handleRelayInvsFlow) sendGetBlockLocator(highHash *externalapi.Domai } func (flow *handleRelayInvsFlow) receiveBlockLocator() (blockLocatorHashes []*externalapi.DomainHash, err error) { - message, err := flow.dequeueIncomingMessageAndSkipInvs(common.DefaultTimeout) - if err != nil { - return nil, err + for { + message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) + if err != nil { + return nil, err + } + + switch message := message.(type) { + case *appmessage.MsgInvRelayBlock: + flow.invsQueue = append(flow.invsQueue, message) + case *appmessage.MsgBlockLocator: + return message.BlockLocatorHashes, nil + default: + return nil, + protocolerrors.Errorf(true, "received unexpected message type. "+ + "expected: %s, got: %s", appmessage.CmdBlockLocator, message.Command()) + } } - msgBlockLocator, ok := message.(*appmessage.MsgBlockLocator) - if !ok { - return nil, - protocolerrors.Errorf(true, "received unexpected message type. "+ - "expected: %s, got: %s", appmessage.CmdBlockLocator, message.Command()) - } - return msgBlockLocator.BlockLocatorHashes, nil } diff --git a/app/protocol/flows/v4/blockrelay/handle_relay_invs.go b/app/protocol/flows/v4/blockrelay/handle_relay_invs.go index 6143608ff..227416f9e 100644 --- a/app/protocol/flows/v4/blockrelay/handle_relay_invs.go +++ b/app/protocol/flows/v4/blockrelay/handle_relay_invs.go @@ -33,8 +33,6 @@ type RelayInvsContext interface { GetOrphanRoots(orphanHash *externalapi.DomainHash) ([]*externalapi.DomainHash, bool, error) IsOrphan(blockHash *externalapi.DomainHash) bool IsIBDRunning() bool - TrySetIBDRunning(ibdPeer *peerpkg.Peer) bool - UnsetIBDRunning() IsRecoverableError(err error) bool } @@ -57,7 +55,10 @@ func HandleRelayInvs(context RelayInvsContext, incomingRoute *router.Route, outg peer: peer, invsQueue: make([]*appmessage.MsgInvRelayBlock, 0), } - return flow.start() + err := flow.start() + // Currently, HandleRelayInvs flow is the only place where IBD is triggered, so the channel can be closed now + close(peer.IBDRequestChannel()) + return err } func (flow *handleRelayInvsFlow) start() error { @@ -306,7 +307,14 @@ func (flow *handleRelayInvsFlow) processOrphan(block *externalapi.DomainBlock) e // Start IBD unless we already are in IBD log.Debugf("Block %s is out of orphan resolution range. "+ "Attempting to start IBD against it.", blockHash) - return flow.runIBDIfNotRunning(block) + + // Send the block to IBD flow via the IBDRequestChannel. + // Note that this is a non-blocking send, since if IBD is already running, there is no need to trigger it + select { + case flow.peer.IBDRequestChannel() <- block: + default: + } + return nil } func (flow *handleRelayInvsFlow) isGenesisVirtualSelectedParent() (bool, error) { diff --git a/app/protocol/flows/v4/blockrelay/ibd.go b/app/protocol/flows/v4/blockrelay/ibd.go index 5d22aa000..9f76d82e3 100644 --- a/app/protocol/flows/v4/blockrelay/ibd.go +++ b/app/protocol/flows/v4/blockrelay/ibd.go @@ -1,22 +1,69 @@ package blockrelay import ( - "time" - - "github.com/kaspanet/kaspad/infrastructure/logger" - - "github.com/kaspanet/kaspad/domain/consensus/model" - "github.com/kaspanet/kaspad/app/appmessage" "github.com/kaspanet/kaspad/app/protocol/common" + peerpkg "github.com/kaspanet/kaspad/app/protocol/peer" "github.com/kaspanet/kaspad/app/protocol/protocolerrors" + "github.com/kaspanet/kaspad/domain" + "github.com/kaspanet/kaspad/domain/consensus/model" "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" "github.com/kaspanet/kaspad/domain/consensus/ruleerrors" "github.com/kaspanet/kaspad/domain/consensus/utils/consensushashing" + "github.com/kaspanet/kaspad/infrastructure/config" + "github.com/kaspanet/kaspad/infrastructure/logger" + "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" "github.com/pkg/errors" + "time" ) -func (flow *handleRelayInvsFlow) runIBDIfNotRunning(block *externalapi.DomainBlock) error { +// IBDContext is the interface for the context needed for the HandleIBD flow. +type IBDContext interface { + Domain() domain.Domain + Config() *config.Config + OnNewBlock(block *externalapi.DomainBlock, virtualChangeSet *externalapi.VirtualChangeSet) error + OnVirtualChange(virtualChangeSet *externalapi.VirtualChangeSet) error + OnPruningPointUTXOSetOverride() error + IsIBDRunning() bool + TrySetIBDRunning(ibdPeer *peerpkg.Peer) bool + UnsetIBDRunning() + IsRecoverableError(err error) bool +} + +type handleIBDFlow struct { + IBDContext + incomingRoute, outgoingRoute *router.Route + peer *peerpkg.Peer +} + +// HandleIBD handles IBD +func HandleIBD(context IBDContext, incomingRoute *router.Route, outgoingRoute *router.Route, + peer *peerpkg.Peer) error { + + flow := &handleIBDFlow{ + IBDContext: context, + incomingRoute: incomingRoute, + outgoingRoute: outgoingRoute, + peer: peer, + } + return flow.start() +} + +func (flow *handleIBDFlow) start() error { + for { + // Wait for IBD requests triggered by other flows + block, ok := <-flow.peer.IBDRequestChannel() + if !ok { + return nil + } + err := flow.runIBDIfNotRunning(block) + if err != nil { + return err + } + } +} + +func (flow *handleIBDFlow) runIBDIfNotRunning(block *externalapi.DomainBlock) error { wasIBDNotRunning := flow.TrySetIBDRunning(flow.peer) if !wasIBDNotRunning { log.Debugf("IBD is already running") @@ -84,7 +131,16 @@ func (flow *handleRelayInvsFlow) runIBDIfNotRunning(block *externalapi.DomainBlo return nil } -func (flow *handleRelayInvsFlow) logIBDFinished(isFinishedSuccessfully bool) { +func (flow *handleIBDFlow) isGenesisVirtualSelectedParent() (bool, error) { + virtualSelectedParent, err := flow.Domain().Consensus().GetVirtualSelectedParent() + if err != nil { + return false, err + } + + return virtualSelectedParent.Equal(flow.Config().NetParams().GenesisHash), nil +} + +func (flow *handleIBDFlow) logIBDFinished(isFinishedSuccessfully bool) { successString := "successfully" if !isFinishedSuccessfully { successString = "(interrupted)" @@ -95,7 +151,7 @@ func (flow *handleRelayInvsFlow) logIBDFinished(isFinishedSuccessfully bool) { // findHighestSharedBlock attempts to find the highest shared block between the peer // and this node. This method may fail because the peer and us have conflicting pruning // points. In that case we return (nil, false, nil) so that we may stop IBD gracefully. -func (flow *handleRelayInvsFlow) findHighestSharedBlockHash( +func (flow *handleIBDFlow) findHighestSharedBlockHash( targetHash *externalapi.DomainHash) (*externalapi.DomainHash, bool, error) { log.Debugf("Sending a blockLocator to %s between pruning point and headers selected tip", flow.peer) @@ -138,7 +194,7 @@ func (flow *handleRelayInvsFlow) findHighestSharedBlockHash( } } -func (flow *handleRelayInvsFlow) nextBlockLocator(lowHash, highHash *externalapi.DomainHash) (externalapi.BlockLocator, error) { +func (flow *handleIBDFlow) nextBlockLocator(lowHash, highHash *externalapi.DomainHash) (externalapi.BlockLocator, error) { log.Debugf("Sending a blockLocator to %s between %s and %s", flow.peer, lowHash, highHash) blockLocator, err := flow.Domain().Consensus().CreateHeadersSelectedChainBlockLocator(lowHash, highHash) if err != nil { @@ -156,7 +212,7 @@ func (flow *handleRelayInvsFlow) nextBlockLocator(lowHash, highHash *externalapi return blockLocator, nil } -func (flow *handleRelayInvsFlow) findHighestHashIndex( +func (flow *handleIBDFlow) findHighestHashIndex( highestHash *externalapi.DomainHash, blockLocator externalapi.BlockLocator) (int, error) { highestHashIndex := 0 @@ -181,7 +237,7 @@ func (flow *handleRelayInvsFlow) findHighestHashIndex( // fetchHighestHash attempts to fetch the highest hash the peer knows amongst the given // blockLocator. This method may fail because the peer and us have conflicting pruning // points. In that case we return (nil, false, nil) so that we may stop IBD gracefully. -func (flow *handleRelayInvsFlow) fetchHighestHash( +func (flow *handleIBDFlow) fetchHighestHash( targetHash *externalapi.DomainHash, blockLocator externalapi.BlockLocator) (*externalapi.DomainHash, bool, error) { ibdBlockLocatorMessage := appmessage.NewMsgIBDBlockLocator(targetHash, blockLocator) @@ -189,7 +245,7 @@ func (flow *handleRelayInvsFlow) fetchHighestHash( if err != nil { return nil, false, err } - message, err := flow.dequeueIncomingMessageAndSkipInvs(common.DefaultTimeout) + message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) if err != nil { return nil, false, err } @@ -209,7 +265,7 @@ func (flow *handleRelayInvsFlow) fetchHighestHash( } } -func (flow *handleRelayInvsFlow) syncPruningPointFutureHeaders(consensus externalapi.Consensus, highestSharedBlockHash *externalapi.DomainHash, +func (flow *handleIBDFlow) syncPruningPointFutureHeaders(consensus externalapi.Consensus, highestSharedBlockHash *externalapi.DomainHash, highHash *externalapi.DomainHash) error { log.Infof("Downloading headers from %s", flow.peer) @@ -273,15 +329,15 @@ func (flow *handleRelayInvsFlow) syncPruningPointFutureHeaders(consensus externa } } -func (flow *handleRelayInvsFlow) sendRequestHeaders(highestSharedBlockHash *externalapi.DomainHash, +func (flow *handleIBDFlow) sendRequestHeaders(highestSharedBlockHash *externalapi.DomainHash, peerSelectedTipHash *externalapi.DomainHash) error { msgGetBlockInvs := appmessage.NewMsgRequstHeaders(highestSharedBlockHash, peerSelectedTipHash) return flow.outgoingRoute.Enqueue(msgGetBlockInvs) } -func (flow *handleRelayInvsFlow) receiveHeaders() (msgIBDBlock *appmessage.BlockHeadersMessage, doneHeaders bool, err error) { - message, err := flow.dequeueIncomingMessageAndSkipInvs(common.DefaultTimeout) +func (flow *handleIBDFlow) receiveHeaders() (msgIBDBlock *appmessage.BlockHeadersMessage, doneHeaders bool, err error) { + message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) if err != nil { return nil, false, err } @@ -300,7 +356,7 @@ func (flow *handleRelayInvsFlow) receiveHeaders() (msgIBDBlock *appmessage.Block } } -func (flow *handleRelayInvsFlow) processHeader(consensus externalapi.Consensus, msgBlockHeader *appmessage.MsgBlockHeader) error { +func (flow *handleIBDFlow) processHeader(consensus externalapi.Consensus, msgBlockHeader *appmessage.MsgBlockHeader) error { header := appmessage.BlockHeaderToDomainBlockHeader(msgBlockHeader) block := &externalapi.DomainBlock{ Header: header, @@ -333,7 +389,7 @@ func (flow *handleRelayInvsFlow) processHeader(consensus externalapi.Consensus, return nil } -func (flow *handleRelayInvsFlow) validatePruningPointFutureHeaderTimestamps() error { +func (flow *handleIBDFlow) validatePruningPointFutureHeaderTimestamps() error { headerSelectedTipHash, err := flow.Domain().StagingConsensus().GetHeadersSelectedTip() if err != nil { return err @@ -367,7 +423,7 @@ func (flow *handleRelayInvsFlow) validatePruningPointFutureHeaderTimestamps() er return nil } -func (flow *handleRelayInvsFlow) receiveAndInsertPruningPointUTXOSet( +func (flow *handleIBDFlow) receiveAndInsertPruningPointUTXOSet( consensus externalapi.Consensus, pruningPointHash *externalapi.DomainHash) (bool, error) { onEnd := logger.LogAndMeasureExecutionTime(log, "receiveAndInsertPruningPointUTXOSet") @@ -376,7 +432,7 @@ func (flow *handleRelayInvsFlow) receiveAndInsertPruningPointUTXOSet( receivedChunkCount := 0 receivedUTXOCount := 0 for { - message, err := flow.dequeueIncomingMessageAndSkipInvs(common.DefaultTimeout) + message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) if err != nil { return false, err } @@ -422,7 +478,7 @@ func (flow *handleRelayInvsFlow) receiveAndInsertPruningPointUTXOSet( } } -func (flow *handleRelayInvsFlow) syncMissingBlockBodies(highHash *externalapi.DomainHash) error { +func (flow *handleIBDFlow) syncMissingBlockBodies(highHash *externalapi.DomainHash) error { hashes, err := flow.Domain().Consensus().GetMissingBlockBodyHashes(highHash) if err != nil { return err @@ -449,7 +505,7 @@ func (flow *handleRelayInvsFlow) syncMissingBlockBodies(highHash *externalapi.Do } for _, expectedHash := range hashesToRequest { - message, err := flow.dequeueIncomingMessageAndSkipInvs(common.DefaultTimeout) + message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) if err != nil { return err } @@ -489,7 +545,16 @@ func (flow *handleRelayInvsFlow) syncMissingBlockBodies(highHash *externalapi.Do return flow.resolveVirtual() } -func (flow *handleRelayInvsFlow) resolveVirtual() error { +func (flow *handleIBDFlow) banIfBlockIsHeaderOnly(block *externalapi.DomainBlock) error { + if len(block.Transactions) == 0 { + return protocolerrors.Errorf(true, "sent header of %s block where expected block with body", + consensushashing.BlockHash(block)) + } + + return nil +} + +func (flow *handleIBDFlow) resolveVirtual() error { for i := 0; ; i++ { if i%10 == 0 { log.Infof("Resolving virtual. This may take some time...") @@ -510,18 +575,3 @@ func (flow *handleRelayInvsFlow) resolveVirtual() error { } } } - -// dequeueIncomingMessageAndSkipInvs is a convenience method to be used during -// IBD. Inv messages are expected to arrive at any given moment, but should be -// ignored while we're in IBD -func (flow *handleRelayInvsFlow) dequeueIncomingMessageAndSkipInvs(timeout time.Duration) (appmessage.Message, error) { - for { - message, err := flow.incomingRoute.DequeueWithTimeout(timeout) - if err != nil { - return nil, err - } - if _, ok := message.(*appmessage.MsgInvRelayBlock); !ok { - return message, nil - } - } -} diff --git a/app/protocol/flows/v4/blockrelay/ibd_with_headers_proof.go b/app/protocol/flows/v4/blockrelay/ibd_with_headers_proof.go index 215f7a044..e2f42a47a 100644 --- a/app/protocol/flows/v4/blockrelay/ibd_with_headers_proof.go +++ b/app/protocol/flows/v4/blockrelay/ibd_with_headers_proof.go @@ -11,7 +11,7 @@ import ( "github.com/pkg/errors" ) -func (flow *handleRelayInvsFlow) ibdWithHeadersProof(highHash *externalapi.DomainHash) error { +func (flow *handleIBDFlow) ibdWithHeadersProof(highHash *externalapi.DomainHash) error { err := flow.Domain().InitStagingConsensus() if err != nil { return err @@ -44,7 +44,7 @@ func (flow *handleRelayInvsFlow) ibdWithHeadersProof(highHash *externalapi.Domai return nil } -func (flow *handleRelayInvsFlow) shouldSyncAndShouldDownloadHeadersProof(highBlock *externalapi.DomainBlock, +func (flow *handleIBDFlow) shouldSyncAndShouldDownloadHeadersProof(highBlock *externalapi.DomainBlock, highestSharedBlockFound bool) (shouldDownload, shouldSync bool, err error) { if !highestSharedBlockFound { @@ -63,7 +63,7 @@ func (flow *handleRelayInvsFlow) shouldSyncAndShouldDownloadHeadersProof(highBlo return false, true, nil } -func (flow *handleRelayInvsFlow) checkIfHighHashHasMoreBlueWorkThanSelectedTipAndPruningDepthMoreBlueScore(highBlock *externalapi.DomainBlock) (bool, error) { +func (flow *handleIBDFlow) checkIfHighHashHasMoreBlueWorkThanSelectedTipAndPruningDepthMoreBlueScore(highBlock *externalapi.DomainBlock) (bool, error) { headersSelectedTip, err := flow.Domain().Consensus().GetHeadersSelectedTip() if err != nil { return false, err @@ -81,13 +81,13 @@ func (flow *handleRelayInvsFlow) checkIfHighHashHasMoreBlueWorkThanSelectedTipAn return highBlock.Header.BlueWork().Cmp(headersSelectedTipInfo.BlueWork) > 0, nil } -func (flow *handleRelayInvsFlow) syncAndValidatePruningPointProof() (*externalapi.DomainHash, error) { +func (flow *handleIBDFlow) syncAndValidatePruningPointProof() (*externalapi.DomainHash, error) { log.Infof("Downloading the pruning point proof from %s", flow.peer) err := flow.outgoingRoute.Enqueue(appmessage.NewMsgRequestPruningPointProof()) if err != nil { return nil, err } - message, err := flow.dequeueIncomingMessageAndSkipInvs(common.DefaultTimeout) + message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) if err != nil { return nil, err } @@ -113,7 +113,7 @@ func (flow *handleRelayInvsFlow) syncAndValidatePruningPointProof() (*externalap return consensushashing.HeaderHash(pruningPointProof.Headers[0][len(pruningPointProof.Headers[0])-1]), nil } -func (flow *handleRelayInvsFlow) downloadHeadersAndPruningUTXOSet(highHash *externalapi.DomainHash) error { +func (flow *handleIBDFlow) downloadHeadersAndPruningUTXOSet(highHash *externalapi.DomainHash) error { proofPruningPoint, err := flow.syncAndValidatePruningPointProof() if err != nil { return err @@ -164,7 +164,7 @@ func (flow *handleRelayInvsFlow) downloadHeadersAndPruningUTXOSet(highHash *exte return nil } -func (flow *handleRelayInvsFlow) syncPruningPointsAndPruningPointAnticone(proofPruningPoint *externalapi.DomainHash) error { +func (flow *handleIBDFlow) syncPruningPointsAndPruningPointAnticone(proofPruningPoint *externalapi.DomainHash) error { log.Infof("Downloading the past pruning points and the pruning point anticone from %s", flow.peer) err := flow.outgoingRoute.Enqueue(appmessage.NewMsgRequestPruningPointAndItsAnticone()) if err != nil { @@ -176,7 +176,7 @@ func (flow *handleRelayInvsFlow) syncPruningPointsAndPruningPointAnticone(proofP return err } - message, err := flow.dequeueIncomingMessageAndSkipInvs(common.DefaultTimeout) + message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) if err != nil { return err } @@ -225,7 +225,7 @@ func (flow *handleRelayInvsFlow) syncPruningPointsAndPruningPointAnticone(proofP return nil } -func (flow *handleRelayInvsFlow) processBlockWithTrustedData( +func (flow *handleIBDFlow) processBlockWithTrustedData( consensus externalapi.Consensus, block *appmessage.MsgBlockWithTrustedDataV4, data *appmessage.MsgTrustedData) error { blockWithTrustedData := &externalapi.BlockWithTrustedData{ @@ -246,8 +246,8 @@ func (flow *handleRelayInvsFlow) processBlockWithTrustedData( return err } -func (flow *handleRelayInvsFlow) receiveBlockWithTrustedData() (*appmessage.MsgBlockWithTrustedDataV4, bool, error) { - message, err := flow.dequeueIncomingMessageAndSkipInvs(common.DefaultTimeout) +func (flow *handleIBDFlow) receiveBlockWithTrustedData() (*appmessage.MsgBlockWithTrustedDataV4, bool, error) { + message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) if err != nil { return nil, false, err } @@ -267,8 +267,8 @@ func (flow *handleRelayInvsFlow) receiveBlockWithTrustedData() (*appmessage.MsgB } } -func (flow *handleRelayInvsFlow) receivePruningPoints() (*appmessage.MsgPruningPoints, error) { - message, err := flow.dequeueIncomingMessageAndSkipInvs(common.DefaultTimeout) +func (flow *handleIBDFlow) receivePruningPoints() (*appmessage.MsgPruningPoints, error) { + message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) if err != nil { return nil, err } @@ -283,7 +283,7 @@ func (flow *handleRelayInvsFlow) receivePruningPoints() (*appmessage.MsgPruningP return msgPruningPoints, nil } -func (flow *handleRelayInvsFlow) validateAndInsertPruningPoints(proofPruningPoint *externalapi.DomainHash) error { +func (flow *handleIBDFlow) validateAndInsertPruningPoints(proofPruningPoint *externalapi.DomainHash) error { currentPruningPoint, err := flow.Domain().Consensus().PruningPoint() if err != nil { return err @@ -327,7 +327,7 @@ func (flow *handleRelayInvsFlow) validateAndInsertPruningPoints(proofPruningPoin return nil } -func (flow *handleRelayInvsFlow) syncPruningPointUTXOSet(consensus externalapi.Consensus, +func (flow *handleIBDFlow) syncPruningPointUTXOSet(consensus externalapi.Consensus, pruningPoint *externalapi.DomainHash) (bool, error) { log.Infof("Checking if the suggested pruning point %s is compatible to the node DAG", pruningPoint) @@ -355,7 +355,7 @@ func (flow *handleRelayInvsFlow) syncPruningPointUTXOSet(consensus externalapi.C return true, nil } -func (flow *handleRelayInvsFlow) fetchMissingUTXOSet(consensus externalapi.Consensus, pruningPointHash *externalapi.DomainHash) (succeed bool, err error) { +func (flow *handleIBDFlow) fetchMissingUTXOSet(consensus externalapi.Consensus, pruningPointHash *externalapi.DomainHash) (succeed bool, err error) { defer func() { err := flow.Domain().StagingConsensus().ClearImportedPruningPointData() if err != nil { diff --git a/app/protocol/flows/v4/register.go b/app/protocol/flows/v4/register.go index 4b8385216..596ca9d92 100644 --- a/app/protocol/flows/v4/register.go +++ b/app/protocol/flows/v4/register.go @@ -66,6 +66,14 @@ func registerBlockRelayFlows(m protocolManager, router *routerpkg.Router, isStop m.RegisterFlow("HandleRelayInvs", router, []appmessage.MessageCommand{ appmessage.CmdInvRelayBlock, appmessage.CmdBlock, appmessage.CmdBlockLocator, + }, + isStopping, errChan, func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error { + return blockrelay.HandleRelayInvs(m.Context(), incomingRoute, + outgoingRoute, peer) + }, + ), + + m.RegisterFlow("HandleIBD", router, []appmessage.MessageCommand{ appmessage.CmdDoneHeaders, appmessage.CmdUnexpectedPruningPoint, appmessage.CmdPruningPointUTXOSetChunk, appmessage.CmdBlockHeaders, appmessage.CmdIBDBlockLocatorHighestHash, appmessage.CmdBlockWithTrustedDataV4, appmessage.CmdDoneBlocksWithTrustedData, appmessage.CmdIBDBlockLocatorHighestHashNotFound, @@ -74,7 +82,7 @@ func registerBlockRelayFlows(m protocolManager, router *routerpkg.Router, isStop appmessage.CmdTrustedData, }, isStopping, errChan, func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error { - return blockrelay.HandleRelayInvs(m.Context(), incomingRoute, + return blockrelay.HandleIBD(m.Context(), incomingRoute, outgoingRoute, peer) }, ), diff --git a/app/protocol/peer/peer.go b/app/protocol/peer/peer.go index 9567ea90b..be3a128df 100644 --- a/app/protocol/peer/peer.go +++ b/app/protocol/peer/peer.go @@ -31,6 +31,8 @@ type Peer struct { lastPingNonce uint64 // The nonce of the last ping we sent lastPingTime time.Time // Time we sent last ping lastPingDuration time.Duration // Time for last ping to return + + ibdRequestChannel chan *externalapi.DomainBlock // A channel used to communicate IBD requests between flows } // New returns a new Peer @@ -38,6 +40,7 @@ func New(connection *netadapter.NetConnection) *Peer { return &Peer{ connection: connection, connectionStarted: time.Now(), + ibdRequestChannel: make(chan *externalapi.DomainBlock), } } @@ -143,3 +146,8 @@ func (p *Peer) LastPingDuration() time.Duration { return p.lastPingDuration } + +// IBDRequestChannel returns the channel used in order to communicate an IBD request between peer flows +func (p *Peer) IBDRequestChannel() chan *externalapi.DomainBlock { + return p.ibdRequestChannel +} diff --git a/testing/integration/ibd_test.go b/testing/integration/ibd_test.go index c1c5b4773..ab8b53fdd 100644 --- a/testing/integration/ibd_test.go +++ b/testing/integration/ibd_test.go @@ -47,6 +47,8 @@ func TestIBD(t *testing.T) { } disableOnBlockAddedHandler = true + // Wait for syncee to exit IBD + time.Sleep(time.Second) // This should trigger resolving the syncee virtual mineNextBlock(t, syncer) time.Sleep(time.Second) @@ -87,7 +89,7 @@ func TestIBDWithPruning(t *testing.T) { start := time.Now() for range ticker.C { - if time.Since(start) > defaultTimeout { + if time.Since(start) > 2*defaultTimeout { t.Fatalf("Timeout waiting for IBD to finish.") } @@ -121,6 +123,7 @@ func TestIBDWithPruning(t *testing.T) { // This should trigger resolving the syncee virtual syncerTip := mineNextBlockWithMockTimestamps(t, syncer, rand.New(rand.NewSource(time.Now().UnixNano()))) time.Sleep(time.Second) + synceeSelectedTip, err := syncee.rpcClient.GetSelectedTipHash() if err != nil { t.Fatalf("Error getting tip for syncee")