diff --git a/app/protocol/flows/blockrelay/handle_request_headers.go b/app/protocol/flows/blockrelay/handle_request_headers.go index 0559bba60..5e9f9776d 100644 --- a/app/protocol/flows/blockrelay/handle_request_headers.go +++ b/app/protocol/flows/blockrelay/handle_request_headers.go @@ -1,10 +1,11 @@ package blockrelay import ( + "github.com/kaspanet/kaspad/app/protocol/peer" + "github.com/kaspanet/kaspad/app/protocol/protocolerrors" "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" "github.com/kaspanet/kaspad/app/appmessage" - "github.com/kaspanet/kaspad/app/protocol/protocolerrors" "github.com/kaspanet/kaspad/domain" "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" ) @@ -19,14 +20,18 @@ type RequestIBDBlocksContext interface { type handleRequestBlocksFlow struct { RequestIBDBlocksContext incomingRoute, outgoingRoute *router.Route + peer *peer.Peer } // HandleRequestHeaders handles RequestHeaders messages -func HandleRequestHeaders(context RequestIBDBlocksContext, incomingRoute *router.Route, outgoingRoute *router.Route) error { +func HandleRequestHeaders(context RequestIBDBlocksContext, incomingRoute *router.Route, + outgoingRoute *router.Route, peer *peer.Peer) error { + flow := &handleRequestBlocksFlow{ RequestIBDBlocksContext: context, incomingRoute: incomingRoute, outgoingRoute: outgoingRoute, + peer: peer, } return flow.start() } @@ -38,53 +43,69 @@ func (flow *handleRequestBlocksFlow) start() error { return err } - // GetHashesBetween is a relatively heavy operation so we limit it. - // We expect that if the other peer did not receive all the headers - // they requested, they'd re-request a block locator and re-request - // headers with a higher lowHash - const maxBlueScoreDifference = 1 << 10 - blockHashes, err := flow.Domain().Consensus().GetHashesBetween(lowHash, highHash, maxBlueScoreDifference) - if err != nil { - return err - } - - for offset := 0; offset < len(blockHashes); offset += ibdBatchSize { - end := offset + ibdBatchSize - if end > len(blockHashes) { - end = len(blockHashes) - } - - blocksHashesToSend := blockHashes[offset:end] - - msgBlockHeadersToSend := make([]*appmessage.MsgBlockHeader, len(blocksHashesToSend)) - for i, blockHash := range blocksHashesToSend { - header, err := flow.Domain().Consensus().GetBlockHeader(blockHash) - if err != nil { - return err - } - msgBlockHeadersToSend[i] = appmessage.DomainBlockHeaderToBlockHeader(header) - } - err = flow.sendHeaders(msgBlockHeadersToSend) - if err != nil { - return nil - } - - // Exit the loop and don't wait for the GetNextIBDBlocks message if the last batch was - // less than ibdBatchSize. - if len(blocksHashesToSend) < ibdBatchSize { - break - } - - message, err := flow.incomingRoute.Dequeue() + batchBlockHeaders := make([]*appmessage.MsgBlockHeader, 0, ibdBatchSize) + for !lowHash.Equal(highHash) { + log.Debugf("Getting block hashes between %s and %s to %s", lowHash, highHash, flow.peer) + // GetHashesBetween is a relatively heavy operation so we limit it + // in order to avoid locking the consensus for too long + const maxBlueScoreDifference = 1 << 10 + blockHashes, err := flow.Domain().Consensus().GetHashesBetween(lowHash, highHash, maxBlueScoreDifference) if err != nil { return err } + log.Debugf("Got %d headers hashes lowHash %s", len(blockHashes), lowHash) - if _, ok := message.(*appmessage.MsgRequestNextHeaders); !ok { - return protocolerrors.Errorf(true, "received unexpected message type. "+ - "expected: %s, got: %s", appmessage.CmdRequestNextHeaders, message.Command()) + offset := 0 + for offset < len(blockHashes) { + for len(batchBlockHeaders) < ibdBatchSize { + hashAtOffset := blockHashes[offset] + blockHeader, err := flow.Domain().Consensus().GetBlockHeader(hashAtOffset) + if err != nil { + return err + } + blockHeaderMessage := appmessage.DomainBlockHeaderToBlockHeader(blockHeader) + batchBlockHeaders = append(batchBlockHeaders, blockHeaderMessage) + + offset++ + if offset == len(blockHashes) { + break + } + } + + if len(batchBlockHeaders) < ibdBatchSize { + break + } + + err = flow.sendHeaders(batchBlockHeaders) + if err != nil { + return nil + } + log.Debugf("Sent %d headers to peer %s", len(batchBlockHeaders), flow.peer) + + batchBlockHeaders = make([]*appmessage.MsgBlockHeader, 0, ibdBatchSize) + + message, err := flow.incomingRoute.Dequeue() + if err != nil { + return err + } + if _, ok := message.(*appmessage.MsgRequestNextHeaders); !ok { + return protocolerrors.Errorf(true, "received unexpected message type. "+ + "expected: %s, got: %s", appmessage.CmdRequestNextHeaders, message.Command()) + } } + + // The next lowHash is the last element in blockHashes + lowHash = blockHashes[len(blockHashes)-1] } + + if len(batchBlockHeaders) > 0 { + err = flow.sendHeaders(batchBlockHeaders) + if err != nil { + return nil + } + log.Debugf("Sent %d headers to peer %s", len(batchBlockHeaders), flow.peer) + } + err = flow.outgoingRoute.Enqueue(appmessage.NewMsgDoneHeaders()) if err != nil { return err diff --git a/app/protocol/protocol.go b/app/protocol/protocol.go index aaa4d4d6c..1bb0c0e5b 100644 --- a/app/protocol/protocol.go +++ b/app/protocol/protocol.go @@ -159,7 +159,7 @@ func (m *Manager) registerBlockRelayFlows(router *routerpkg.Router, isStopping * m.registerFlow("HandleRequestHeaders", router, []appmessage.MessageCommand{appmessage.CmdRequestHeaders, appmessage.CmdRequestNextHeaders}, isStopping, errChan, func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error { - return blockrelay.HandleRequestHeaders(m.context, incomingRoute, outgoingRoute) + return blockrelay.HandleRequestHeaders(m.context, incomingRoute, outgoingRoute, peer) }, ),