Increase the route capacity of InvTransaction messages. (#1603)

This commit is contained in:
stasatdaglabs 2021-03-14 13:02:55 +02:00 committed by GitHub
parent b5933bc4fe
commit e4e3541a30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 3 deletions

View File

@ -238,7 +238,7 @@ func (m *Manager) registerTransactionRelayFlow(router *routerpkg.Router, isStopp
outgoingRoute := router.OutgoingRoute()
return []*flow{
m.registerFlow("HandleRelayedTransactions", router,
m.registerFlowWithCapacity("HandleRelayedTransactions", 10_000, router,
[]appmessage.MessageCommand{appmessage.CmdInvTransaction, appmessage.CmdTx, appmessage.CmdTransactionNotFound}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return transactionrelay.HandleRelayedTransactions(m.context, incomingRoute, outgoingRoute)
@ -274,6 +274,24 @@ func (m *Manager) registerFlow(name string, router *routerpkg.Router, messageTyp
panic(err)
}
return m.registerFlowForRoute(route, name, isStopping, errChan, initializeFunc)
}
func (m *Manager) registerFlowWithCapacity(name string, capacity int, router *routerpkg.Router,
messageTypes []appmessage.MessageCommand, isStopping *uint32,
errChan chan error, initializeFunc flowInitializeFunc) *flow {
route, err := router.AddIncomingRouteWithCapacity(capacity, messageTypes)
if err != nil {
panic(err)
}
return m.registerFlowForRoute(route, name, isStopping, errChan, initializeFunc)
}
func (m *Manager) registerFlowForRoute(route *routerpkg.Route, name string, isStopping *uint32,
errChan chan error, initializeFunc flowInitializeFunc) *flow {
return &flow{
name: name,
executeFunc: func(peer *peerpkg.Peer) {

View File

@ -35,13 +35,32 @@ func NewRouter() *Router {
// be routed to the given `route`
func (r *Router) AddIncomingRoute(messageTypes []appmessage.MessageCommand) (*Route, error) {
route := NewRoute()
err := r.initializeIncomingRoute(route, messageTypes)
if err != nil {
return nil, err
}
return route, nil
}
// AddIncomingRouteWithCapacity registers the messages of types `messageTypes` to
// be routed to the given `route` with a capacity of `capacity`
func (r *Router) AddIncomingRouteWithCapacity(capacity int, messageTypes []appmessage.MessageCommand) (*Route, error) {
route := newRouteWithCapacity(capacity)
err := r.initializeIncomingRoute(route, messageTypes)
if err != nil {
return nil, err
}
return route, nil
}
func (r *Router) initializeIncomingRoute(route *Route, messageTypes []appmessage.MessageCommand) error {
for _, messageType := range messageTypes {
if r.doesIncomingRouteExist(messageType) {
return nil, errors.Errorf("a route for '%s' already exists", messageType)
return errors.Errorf("a route for '%s' already exists", messageType)
}
r.setIncomingRoute(messageType, route)
}
return route, nil
return nil
}
// RemoveRoute unregisters the messages of types `messageTypes` from