[NOD-1212] Request IBD blocks in batches (#835)

* [NOD-1212] Request IBD blocks in batches

* [NOD-1212] Fix condition

* [NOD-1212] Remove redundant functions

* [NOD-1212] gofmt

* [NOD-1212] Fix condition

* [NOD-1212] Fix off by one error and add messages to messages.proto

* [NOD-1212] Refactor downloadBlocks

* [NOD-1212] Fix comment

* [NOD-1212] Return DefaultTimeout to original value
This commit is contained in:
Ori Newman 2020-08-02 13:46:07 +03:00 committed by GitHub
parent 20b547984e
commit e70561141d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 695 additions and 402 deletions

View File

@ -22,13 +22,17 @@ func TestIBD(t *testing.T) {
blockAddedWG := sync.WaitGroup{}
blockAddedWG.Add(numBlocks)
setOnBlockAddedHandler(t, syncee, func(header *wire.BlockHeader) { blockAddedWG.Done() })
receivedBlocks := 0
setOnBlockAddedHandler(t, syncee, func(header *wire.BlockHeader) {
receivedBlocks++
blockAddedWG.Done()
})
connect(t, syncer, syncee)
select {
case <-time.After(defaultTimeout):
t.Fatalf("Timeout waiting for IBD to finish")
t.Fatalf("Timeout waiting for IBD to finish. Received %d blocks out of %d", receivedBlocks, numBlocks)
case <-locks.ReceiveFromChanWhenDone(func() { blockAddedWG.Wait() }):
}

View File

@ -11,7 +11,8 @@ import (
)
const (
defaultMaxMessages = 100
// DefaultMaxMessages is the default capacity for a route with a capacity defined
DefaultMaxMessages = 100
)
var (
@ -40,7 +41,7 @@ type Route struct {
// NewRoute create a new Route
func NewRoute() *Route {
return newRouteWithCapacity(defaultMaxMessages)
return newRouteWithCapacity(DefaultMaxMessages)
}
func newRouteWithCapacity(capacity int) *Route {
@ -58,7 +59,7 @@ func (r *Route) Enqueue(message wire.Message) error {
if r.closed {
return errors.WithStack(ErrRouteClosed)
}
if len(r.channel) == defaultMaxMessages {
if len(r.channel) == DefaultMaxMessages {
r.onCapacityReachedHandler()
}
r.channel <- message

View File

@ -7,7 +7,7 @@ import (
"github.com/pkg/errors"
)
const outgoingRouteMaxMessages = wire.MaxInvPerMsg + defaultMaxMessages
const outgoingRouteMaxMessages = wire.MaxInvPerMsg + DefaultMaxMessages
// OnRouteCapacityReachedHandler is a function that is to
// be called when one of the routes reaches capacity.

View File

@ -0,0 +1,11 @@
package protowire
import "github.com/kaspanet/kaspad/wire"
func (x *KaspadMessage_DoneIBDBlocks) toWireMessage() (wire.Message, error) {
return &wire.MsgDoneIBDBlocks{}, nil
}
func (x *KaspadMessage_DoneIBDBlocks) fromWireMessage(_ *wire.MsgDoneIBDBlocks) error {
return nil
}

View File

@ -0,0 +1,11 @@
package protowire
import "github.com/kaspanet/kaspad/wire"
func (x *KaspadMessage_RequestNextIBDBlocks) toWireMessage() (wire.Message, error) {
return &wire.MsgRequestNextIBDBlocks{}, nil
}
func (x *KaspadMessage_RequestNextIBDBlocks) fromWireMessage(_ *wire.MsgRequestNextIBDBlocks) error {
return nil
}

File diff suppressed because it is too large Load Diff

View File

@ -12,17 +12,19 @@ message KaspadMessage {
BlockLocatorMessage blockLocator = 5;
RequestAddressesMessage requestAddresses = 6;
RequestIBDBlocksMessage requestIBDBlocks = 7;
RequestRelayBlocksMessage requestRelayBlocks = 8;
RequestSelectedTipMessage requestSelectedTip = 9;
RequestTransactionsMessage requestTransactions = 10;
BlockMessage ibdBlock = 11;
InvRelayBlockMessage invRelayBlock = 12;
InvTransactionsMessage invTransactions = 13;
PingMessage ping = 14;
PongMessage pong = 15;
SelectedTipMessage selectedTip = 16;
VerackMessage verack = 17;
VersionMessage version = 18;
RequestNextIBDBlocksMessage requestNextIBDBlocks = 8;
DoneIBDBlocksMessage DoneIBDBlocks = 9;
RequestRelayBlocksMessage requestRelayBlocks = 10;
RequestSelectedTipMessage requestSelectedTip = 11;
RequestTransactionsMessage requestTransactions = 12;
BlockMessage ibdBlock = 13;
InvRelayBlockMessage invRelayBlock = 14;
InvTransactionsMessage invTransactions = 15;
PingMessage ping = 16;
PongMessage pong = 17;
SelectedTipMessage selectedTip = 18;
VerackMessage verack = 19;
VersionMessage version = 20;
}
}
@ -127,6 +129,16 @@ message RequestIBDBlocksMessage{
}
// GetBlocksMessage end
// RequestNextIBDBlocksMessage start
message RequestNextIBDBlocksMessage{
}
// RequestNextIBDBlocksMessage end
// DoneIBDBlocksMessage start
message DoneIBDBlocksMessage{
}
// DoneIBDBlocksMessage end
// GetRelayBlocksMessage start
message RequestRelayBlocksMessage{
repeated Hash hashes = 1;

View File

@ -69,6 +69,20 @@ func toPayload(message wire.Message) (isKaspadMessage_Payload, error) {
return nil, err
}
return payload, nil
case *wire.MsgRequestNextIBDBlocks:
payload := new(KaspadMessage_RequestNextIBDBlocks)
err := payload.fromWireMessage(message)
if err != nil {
return nil, err
}
return payload, nil
case *wire.MsgDoneIBDBlocks:
payload := new(KaspadMessage_DoneIBDBlocks)
err := payload.fromWireMessage(message)
if err != nil {
return nil, err
}
return payload, nil
case *wire.MsgRequestRelayBlocks:
payload := new(KaspadMessage_RequestRelayBlocks)
err := payload.fromWireMessage(message)

View File

@ -3,10 +3,13 @@ package ibd
import (
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/netadapter/router"
"github.com/kaspanet/kaspad/protocol/protocolerrors"
"github.com/kaspanet/kaspad/util/daghash"
"github.com/kaspanet/kaspad/wire"
)
const ibdBatchSize = router.DefaultMaxMessages
// RequestIBDBlocksContext is the interface for the context needed for the HandleRequestIBDBlocks flow.
type RequestIBDBlocksContext interface {
DAG() *blockdag.BlockDAG
@ -39,9 +42,37 @@ func (flow *handleRequestBlocksFlow) start() error {
return err
}
err = flow.sendMsgIBDBlocks(msgIBDBlocks)
for offset := 0; offset < len(msgIBDBlocks); offset += ibdBatchSize {
end := offset + ibdBatchSize
if end > len(msgIBDBlocks) {
end = len(msgIBDBlocks)
}
blocksToSend := msgIBDBlocks[offset:end]
err = flow.sendMsgIBDBlocks(blocksToSend)
if err != nil {
return nil
}
// Exit the loop and don't wait for the GetNextIBDBlocks message if the last batch was
// less than ibdBatchSize.
if len(blocksToSend) < ibdBatchSize {
break
}
message, err := flow.incomingRoute.Dequeue()
if err != nil {
return err
}
if _, ok := message.(*wire.MsgRequestNextIBDBlocks); !ok {
return protocolerrors.Errorf(true, "received unexpected message type. "+
"expected: %s, got: %s", wire.CmdRequestNextIBDBlocks, message.Command())
}
}
err = flow.outgoingRoute.Enqueue(wire.NewMsgDoneIBDBlocks())
if err != nil {
return nil
return err
}
}
}

View File

@ -122,17 +122,28 @@ func (flow *handleIBDFlow) downloadBlocks(highestSharedBlockHash *daghash.Hash,
return err
}
blocksReceived := 0
for {
msgIBDBlock, err := flow.receiveIBDBlock()
msgIBDBlock, doneIBD, err := flow.receiveIBDBlock()
if err != nil {
return err
}
if doneIBD {
return nil
}
err = flow.processIBDBlock(msgIBDBlock)
if err != nil {
return err
}
if msgIBDBlock.BlockHash().IsEqual(peerSelectedTipHash) {
return nil
blocksReceived++
if blocksReceived%ibdBatchSize == 0 {
err = flow.outgoingRoute.Enqueue(wire.NewMsgRequestNextIBDBlocks())
if err != nil {
return err
}
}
}
}
@ -144,18 +155,21 @@ func (flow *handleIBDFlow) sendGetBlocks(highestSharedBlockHash *daghash.Hash,
return flow.outgoingRoute.Enqueue(msgGetBlockInvs)
}
func (flow *handleIBDFlow) receiveIBDBlock() (msgIBDBlock *wire.MsgIBDBlock, err error) {
func (flow *handleIBDFlow) receiveIBDBlock() (msgIBDBlock *wire.MsgIBDBlock, doneIBD bool, err error) {
message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
if err != nil {
return nil, err
return nil, false, err
}
msgIBDBlock, ok := message.(*wire.MsgIBDBlock)
if !ok {
return nil,
switch message := message.(type) {
case *wire.MsgIBDBlock:
return message, false, nil
case *wire.MsgDoneIBDBlocks:
return nil, true, nil
default:
return nil, false,
protocolerrors.Errorf(true, "received unexpected message type. "+
"expected: %s, got: %s", wire.CmdIBDBlock, message.Command())
}
return msgIBDBlock, nil
}
func (flow *handleIBDFlow) processIBDBlock(msgIBDBlock *wire.MsgIBDBlock) error {

View File

@ -161,7 +161,8 @@ func (m *Manager) registerIBDFlows(router *routerpkg.Router, isStopping *uint32,
outgoingRoute := router.OutgoingRoute()
return []*flow{
m.registerFlow("HandleIBD", router, []wire.MessageCommand{wire.CmdBlockLocator, wire.CmdIBDBlock}, isStopping, errChan,
m.registerFlow("HandleIBD", router, []wire.MessageCommand{wire.CmdBlockLocator, wire.CmdIBDBlock,
wire.CmdDoneIBDBlocks}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return ibd.HandleIBD(m.context, incomingRoute, outgoingRoute, peer)
},
@ -185,7 +186,7 @@ func (m *Manager) registerIBDFlows(router *routerpkg.Router, isStopping *uint32,
},
),
m.registerFlow("HandleRequestIBDBlocks", router, []wire.MessageCommand{wire.CmdRequestIBDBlocks}, isStopping, errChan,
m.registerFlow("HandleRequestIBDBlocks", router, []wire.MessageCommand{wire.CmdRequestIBDBlocks, wire.CmdRequestNextIBDBlocks}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return ibd.HandleRequestIBDBlocks(m.context, incomingRoute, outgoingRoute)
},

View File

@ -43,27 +43,31 @@ const (
CmdInvTransaction
CmdRequestTransactions
CmdIBDBlock
CmdRequestNextIBDBlocks
CmdDoneIBDBlocks
)
var messageCommandToString = map[MessageCommand]string{
CmdVersion: "Version",
CmdVerAck: "VerAck",
CmdRequestAddresses: "RequestAddresses",
CmdAddresses: "Addresses",
CmdRequestIBDBlocks: "RequestBlocks",
CmdBlock: "Block",
CmdTx: "Tx",
CmdPing: "Ping",
CmdPong: "Pong",
CmdRequestBlockLocator: "RequestBlockLocator",
CmdBlockLocator: "BlockLocator",
CmdSelectedTip: "SelectedTip",
CmdRequestSelectedTip: "RequestSelectedTip",
CmdInvRelayBlock: "InvRelayBlock",
CmdRequestRelayBlocks: "RequestRelayBlocks",
CmdInvTransaction: "InvTransaction",
CmdRequestTransactions: "RequestTransactions",
CmdIBDBlock: "IBDBlock",
CmdVersion: "Version",
CmdVerAck: "VerAck",
CmdRequestAddresses: "RequestAddresses",
CmdAddresses: "Addresses",
CmdRequestIBDBlocks: "RequestBlocks",
CmdBlock: "Block",
CmdTx: "Tx",
CmdPing: "Ping",
CmdPong: "Pong",
CmdRequestBlockLocator: "RequestBlockLocator",
CmdBlockLocator: "BlockLocator",
CmdSelectedTip: "SelectedTip",
CmdRequestSelectedTip: "RequestSelectedTip",
CmdInvRelayBlock: "InvRelayBlock",
CmdRequestRelayBlocks: "RequestRelayBlocks",
CmdInvTransaction: "InvTransaction",
CmdRequestTransactions: "RequestTransactions",
CmdIBDBlock: "IBDBlock",
CmdRequestNextIBDBlocks: "RequestNextIBDBlocks",
CmdDoneIBDBlocks: "DoneIBDBlocks",
}
// Message is an interface that describes a kaspa message. A type that

20
wire/msgdoneibdblocks.go Normal file
View File

@ -0,0 +1,20 @@
package wire
// MsgDoneIBDBlocks implements the Message interface and represents a kaspa
// DoneIBDBlocks message. It is used to notify the IBD syncing peer that the
// syncer sent all the requested blocks.
//
// This message has no payload.
type MsgDoneIBDBlocks struct{}
// Command returns the protocol command string for the message. This is part
// of the Message interface implementation.
func (msg *MsgDoneIBDBlocks) Command() MessageCommand {
return CmdDoneIBDBlocks
}
// NewMsgDoneIBDBlocks returns a new kaspa DoneIBDBlocks message that conforms to the
// Message interface.
func NewMsgDoneIBDBlocks() *MsgDoneIBDBlocks {
return &MsgDoneIBDBlocks{}
}

View File

@ -0,0 +1,20 @@
package wire
// MsgRequestNextIBDBlocks implements the Message interface and represents a kaspa
// RequestNextIBDBlocks message. It is used to notify the IBD syncer peer to send
// more blocks.
//
// This message has no payload.
type MsgRequestNextIBDBlocks struct{}
// Command returns the protocol command string for the message. This is part
// of the Message interface implementation.
func (msg *MsgRequestNextIBDBlocks) Command() MessageCommand {
return CmdRequestNextIBDBlocks
}
// NewMsgRequestNextIBDBlocks returns a new kaspa RequestNextIBDBlocks message that conforms to the
// Message interface.
func NewMsgRequestNextIBDBlocks() *MsgRequestNextIBDBlocks {
return &MsgRequestNextIBDBlocks{}
}