From d80144ef15be8a899684fbcab2d28e0c308d7c3f Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Tue, 30 Mar 2021 13:39:45 +0300 Subject: [PATCH] Increase the route capacity of InvTransaction messages. (#1603) (#1637) Co-authored-by: stasatdaglabs <39559713+stasatdaglabs@users.noreply.github.com> --- app/protocol/protocol.go | 20 +++++++++++++++- .../network/netadapter/router/router.go | 23 +++++++++++++++++-- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/app/protocol/protocol.go b/app/protocol/protocol.go index 2d4f48e04..ee5fadd2c 100644 --- a/app/protocol/protocol.go +++ b/app/protocol/protocol.go @@ -238,7 +238,7 @@ func (m *Manager) registerTransactionRelayFlow(router *routerpkg.Router, isStopp outgoingRoute := router.OutgoingRoute() return []*flow{ - m.registerFlow("HandleRelayedTransactions", router, + m.registerFlowWithCapacity("HandleRelayedTransactions", 10_000, router, []appmessage.MessageCommand{appmessage.CmdInvTransaction, appmessage.CmdTx, appmessage.CmdTransactionNotFound}, isStopping, errChan, func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error { return transactionrelay.HandleRelayedTransactions(m.context, incomingRoute, outgoingRoute) @@ -274,6 +274,24 @@ func (m *Manager) registerFlow(name string, router *routerpkg.Router, messageTyp panic(err) } + return m.registerFlowForRoute(route, name, isStopping, errChan, initializeFunc) +} + +func (m *Manager) registerFlowWithCapacity(name string, capacity int, router *routerpkg.Router, + messageTypes []appmessage.MessageCommand, isStopping *uint32, + errChan chan error, initializeFunc flowInitializeFunc) *flow { + + route, err := router.AddIncomingRouteWithCapacity(capacity, messageTypes) + if err != nil { + panic(err) + } + + return m.registerFlowForRoute(route, name, isStopping, errChan, initializeFunc) +} + +func (m *Manager) registerFlowForRoute(route *routerpkg.Route, name string, isStopping *uint32, + errChan chan error, initializeFunc flowInitializeFunc) *flow { + return &flow{ name: name, executeFunc: func(peer *peerpkg.Peer) { diff --git a/infrastructure/network/netadapter/router/router.go b/infrastructure/network/netadapter/router/router.go index 92157c485..51089f3d8 100644 --- a/infrastructure/network/netadapter/router/router.go +++ b/infrastructure/network/netadapter/router/router.go @@ -35,13 +35,32 @@ func NewRouter() *Router { // be routed to the given `route` func (r *Router) AddIncomingRoute(messageTypes []appmessage.MessageCommand) (*Route, error) { route := NewRoute() + err := r.initializeIncomingRoute(route, messageTypes) + if err != nil { + return nil, err + } + return route, nil +} + +// AddIncomingRouteWithCapacity registers the messages of types `messageTypes` to +// be routed to the given `route` with a capacity of `capacity` +func (r *Router) AddIncomingRouteWithCapacity(capacity int, messageTypes []appmessage.MessageCommand) (*Route, error) { + route := newRouteWithCapacity(capacity) + err := r.initializeIncomingRoute(route, messageTypes) + if err != nil { + return nil, err + } + return route, nil +} + +func (r *Router) initializeIncomingRoute(route *Route, messageTypes []appmessage.MessageCommand) error { for _, messageType := range messageTypes { if r.doesIncomingRouteExist(messageType) { - return nil, errors.Errorf("a route for '%s' already exists", messageType) + return errors.Errorf("a route for '%s' already exists", messageType) } r.setIncomingRoute(messageType, route) } - return route, nil + return nil } // RemoveRoute unregisters the messages of types `messageTypes` from