diff --git a/app/protocol/flows/blockrelay/handle_relay_invs.go b/app/protocol/flows/blockrelay/handle_relay_invs.go index 09264ca22..703fd5f3d 100644 --- a/app/protocol/flows/blockrelay/handle_relay_invs.go +++ b/app/protocol/flows/blockrelay/handle_relay_invs.go @@ -1,6 +1,8 @@ package blockrelay import ( + "time" + "github.com/kaspanet/kaspad/app/appmessage" "github.com/kaspanet/kaspad/app/protocol/common" peerpkg "github.com/kaspanet/kaspad/app/protocol/peer" @@ -11,6 +13,7 @@ import ( "github.com/kaspanet/kaspad/domain/consensus/utils/consensushashing" "github.com/kaspanet/kaspad/infrastructure/config" "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" + "github.com/kaspanet/kaspad/util/mstime" "github.com/pkg/errors" ) @@ -109,6 +112,8 @@ func (flow *handleRelayInvsFlow) start() error { if err != nil { return err } + msTime := mstime.UnixMilliseconds(block.Header.TimeInMilliseconds()) + time.Sleep(time.Until(msTime.ToNativeTime().Add(config.DelayDuration))) log.Debugf("Processing block %s", inv.Hash) missingParents, blockInsertionResult, err := flow.processBlock(block) diff --git a/infrastructure/config/config.go b/infrastructure/config/config.go index d130d34ca..a064119ba 100644 --- a/infrastructure/config/config.go +++ b/infrastructure/config/config.go @@ -27,6 +27,9 @@ import ( "github.com/pkg/errors" ) +// DelayDuration a duration for the delay, global for testing +var DelayDuration time.Duration + const ( defaultConfigFilename = "kaspad.conf" defaultDataDirname = "data" @@ -122,6 +125,7 @@ type Flags struct { UTXOIndex bool `long:"utxoindex" description:"Enable the UTXO index"` IsArchivalNode bool `long:"archival" description:"Run as an archival node: don't delete old block data when moving the pruning point (Warning: heavy disk usage)'"` EnableSanityCheckPruningUTXOSet bool `long:"enable-sanity-check-pruning-utxo" hidden:"true" description:"When moving the pruning point - check that the utxo set matches the utxo commitment"` + Delay float32 `long:"delay" description:"Provide a delay in seconds as a floating point"` NetworkFlags ServiceOptions *ServiceOptions } @@ -577,6 +581,8 @@ func LoadConfig() (*Config, error) { if configFileError != nil { log.Warnf("%s", configFileError) } + + DelayDuration = time.Duration(cfg.Delay * float32(time.Second)) return cfg, nil } diff --git a/infrastructure/network/netadapter/netadapter.go b/infrastructure/network/netadapter/netadapter.go index a2111e52a..784f8b57b 100644 --- a/infrastructure/network/netadapter/netadapter.go +++ b/infrastructure/network/netadapter/netadapter.go @@ -3,6 +3,7 @@ package netadapter import ( "sync" "sync/atomic" + "time" "github.com/kaspanet/kaspad/app/appmessage" "github.com/kaspanet/kaspad/infrastructure/config" @@ -175,18 +176,20 @@ func (na *NetAdapter) ID() *id.ID { // P2PBroadcast sends the given `message` to every peer corresponding // to each NetConnection in the given netConnections func (na *NetAdapter) P2PBroadcast(netConnections []*NetConnection, message appmessage.Message) error { - na.p2pConnectionsLock.RLock() - defer na.p2pConnectionsLock.RUnlock() - - for _, netConnection := range netConnections { - err := netConnection.router.OutgoingRoute().Enqueue(message) - if err != nil { - if errors.Is(err, routerpkg.ErrRouteClosed) { - log.Debugf("Cannot enqueue message to %s: router is closed", netConnection) - continue + go func() { + time.Sleep(config.DelayDuration) + na.p2pConnectionsLock.RLock() + defer na.p2pConnectionsLock.RUnlock() + for _, netConnection := range netConnections { + err := netConnection.router.OutgoingRoute().Enqueue(message) + if err != nil { + if errors.Is(err, routerpkg.ErrRouteClosed) { + log.Debugf("Cannot enqueue message to %s: router is closed", netConnection) + continue + } + log.Error(err) } - return err } - } + }() return nil }