Send headers continuously without needing to run the BlockLocator protocol after every ~maxBlueScoreDifference blocks (#1339)

* Send headers continuously without needing to run the BlockLocator protocol after ever ~maxBlueScoreDifference blocks

* Add logging.

* Make logs more descriptive.
This commit is contained in:
stasatdaglabs 2021-01-03 15:50:21 +02:00 committed by GitHub
parent 97fddeff4b
commit e8cad2b2f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 65 additions and 44 deletions

View File

@ -1,10 +1,11 @@
package blockrelay package blockrelay
import ( import (
"github.com/kaspanet/kaspad/app/protocol/peer"
"github.com/kaspanet/kaspad/app/protocol/protocolerrors"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi" "github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"github.com/kaspanet/kaspad/app/appmessage" "github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/protocol/protocolerrors"
"github.com/kaspanet/kaspad/domain" "github.com/kaspanet/kaspad/domain"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
) )
@ -19,14 +20,18 @@ type RequestIBDBlocksContext interface {
type handleRequestBlocksFlow struct { type handleRequestBlocksFlow struct {
RequestIBDBlocksContext RequestIBDBlocksContext
incomingRoute, outgoingRoute *router.Route incomingRoute, outgoingRoute *router.Route
peer *peer.Peer
} }
// HandleRequestHeaders handles RequestHeaders messages // HandleRequestHeaders handles RequestHeaders messages
func HandleRequestHeaders(context RequestIBDBlocksContext, incomingRoute *router.Route, outgoingRoute *router.Route) error { func HandleRequestHeaders(context RequestIBDBlocksContext, incomingRoute *router.Route,
outgoingRoute *router.Route, peer *peer.Peer) error {
flow := &handleRequestBlocksFlow{ flow := &handleRequestBlocksFlow{
RequestIBDBlocksContext: context, RequestIBDBlocksContext: context,
incomingRoute: incomingRoute, incomingRoute: incomingRoute,
outgoingRoute: outgoingRoute, outgoingRoute: outgoingRoute,
peer: peer,
} }
return flow.start() return flow.start()
} }
@ -38,53 +43,69 @@ func (flow *handleRequestBlocksFlow) start() error {
return err return err
} }
// GetHashesBetween is a relatively heavy operation so we limit it. batchBlockHeaders := make([]*appmessage.MsgBlockHeader, 0, ibdBatchSize)
// We expect that if the other peer did not receive all the headers for !lowHash.Equal(highHash) {
// they requested, they'd re-request a block locator and re-request log.Debugf("Getting block hashes between %s and %s to %s", lowHash, highHash, flow.peer)
// headers with a higher lowHash // GetHashesBetween is a relatively heavy operation so we limit it
const maxBlueScoreDifference = 1 << 10 // in order to avoid locking the consensus for too long
blockHashes, err := flow.Domain().Consensus().GetHashesBetween(lowHash, highHash, maxBlueScoreDifference) const maxBlueScoreDifference = 1 << 10
if err != nil { blockHashes, err := flow.Domain().Consensus().GetHashesBetween(lowHash, highHash, maxBlueScoreDifference)
return err
}
for offset := 0; offset < len(blockHashes); offset += ibdBatchSize {
end := offset + ibdBatchSize
if end > len(blockHashes) {
end = len(blockHashes)
}
blocksHashesToSend := blockHashes[offset:end]
msgBlockHeadersToSend := make([]*appmessage.MsgBlockHeader, len(blocksHashesToSend))
for i, blockHash := range blocksHashesToSend {
header, err := flow.Domain().Consensus().GetBlockHeader(blockHash)
if err != nil {
return err
}
msgBlockHeadersToSend[i] = appmessage.DomainBlockHeaderToBlockHeader(header)
}
err = flow.sendHeaders(msgBlockHeadersToSend)
if err != nil {
return nil
}
// Exit the loop and don't wait for the GetNextIBDBlocks message if the last batch was
// less than ibdBatchSize.
if len(blocksHashesToSend) < ibdBatchSize {
break
}
message, err := flow.incomingRoute.Dequeue()
if err != nil { if err != nil {
return err return err
} }
log.Debugf("Got %d headers hashes lowHash %s", len(blockHashes), lowHash)
if _, ok := message.(*appmessage.MsgRequestNextHeaders); !ok { offset := 0
return protocolerrors.Errorf(true, "received unexpected message type. "+ for offset < len(blockHashes) {
"expected: %s, got: %s", appmessage.CmdRequestNextHeaders, message.Command()) for len(batchBlockHeaders) < ibdBatchSize {
hashAtOffset := blockHashes[offset]
blockHeader, err := flow.Domain().Consensus().GetBlockHeader(hashAtOffset)
if err != nil {
return err
}
blockHeaderMessage := appmessage.DomainBlockHeaderToBlockHeader(blockHeader)
batchBlockHeaders = append(batchBlockHeaders, blockHeaderMessage)
offset++
if offset == len(blockHashes) {
break
}
}
if len(batchBlockHeaders) < ibdBatchSize {
break
}
err = flow.sendHeaders(batchBlockHeaders)
if err != nil {
return nil
}
log.Debugf("Sent %d headers to peer %s", len(batchBlockHeaders), flow.peer)
batchBlockHeaders = make([]*appmessage.MsgBlockHeader, 0, ibdBatchSize)
message, err := flow.incomingRoute.Dequeue()
if err != nil {
return err
}
if _, ok := message.(*appmessage.MsgRequestNextHeaders); !ok {
return protocolerrors.Errorf(true, "received unexpected message type. "+
"expected: %s, got: %s", appmessage.CmdRequestNextHeaders, message.Command())
}
} }
// The next lowHash is the last element in blockHashes
lowHash = blockHashes[len(blockHashes)-1]
} }
if len(batchBlockHeaders) > 0 {
err = flow.sendHeaders(batchBlockHeaders)
if err != nil {
return nil
}
log.Debugf("Sent %d headers to peer %s", len(batchBlockHeaders), flow.peer)
}
err = flow.outgoingRoute.Enqueue(appmessage.NewMsgDoneHeaders()) err = flow.outgoingRoute.Enqueue(appmessage.NewMsgDoneHeaders())
if err != nil { if err != nil {
return err return err

View File

@ -159,7 +159,7 @@ func (m *Manager) registerBlockRelayFlows(router *routerpkg.Router, isStopping *
m.registerFlow("HandleRequestHeaders", router, m.registerFlow("HandleRequestHeaders", router,
[]appmessage.MessageCommand{appmessage.CmdRequestHeaders, appmessage.CmdRequestNextHeaders}, isStopping, errChan, []appmessage.MessageCommand{appmessage.CmdRequestHeaders, appmessage.CmdRequestNextHeaders}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error { func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandleRequestHeaders(m.context, incomingRoute, outgoingRoute) return blockrelay.HandleRequestHeaders(m.context, incomingRoute, outgoingRoute, peer)
}, },
), ),