mirror of
https://github.com/kaspanet/kaspad.git
synced 2025-03-30 15:08:33 +00:00

* add names to nameless routes * Update router.go Co-authored-by: Ori Newman <orinewman1@gmail.com>
139 lines
4.0 KiB
Go
139 lines
4.0 KiB
Go
package router
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/kaspanet/kaspad/app/appmessage"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
const outgoingRouteMaxMessages = appmessage.MaxInvPerMsg + DefaultMaxMessages
|
|
|
|
// OnRouteCapacityReachedHandler is a function that is to
|
|
// be called when one of the routes reaches capacity.
|
|
type OnRouteCapacityReachedHandler func()
|
|
|
|
// Router routes messages by type to their respective
|
|
// input channels
|
|
type Router struct {
|
|
incomingRoutes map[appmessage.MessageCommand]*Route
|
|
incomingRoutesLock sync.RWMutex
|
|
|
|
outgoingRoute *Route
|
|
}
|
|
|
|
// NewRouter creates a new empty router
|
|
func NewRouter(name string) *Router {
|
|
router := Router{
|
|
incomingRoutes: make(map[appmessage.MessageCommand]*Route),
|
|
outgoingRoute: newRouteWithCapacity(fmt.Sprintf("%s - outgoing", name), outgoingRouteMaxMessages),
|
|
}
|
|
return &router
|
|
}
|
|
|
|
// AddIncomingRoute registers the messages of types `messageTypes` to
|
|
// be routed to the given `route`
|
|
func (r *Router) AddIncomingRoute(name string, messageTypes []appmessage.MessageCommand) (*Route, error) {
|
|
route := NewRoute(fmt.Sprintf("%s - incoming", name))
|
|
err := r.initializeIncomingRoute(route, messageTypes)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return route, nil
|
|
}
|
|
|
|
// AddIncomingRouteWithCapacity registers the messages of types `messageTypes` to
|
|
// be routed to the given `route` with a capacity of `capacity`
|
|
func (r *Router) AddIncomingRouteWithCapacity(name string, capacity int, messageTypes []appmessage.MessageCommand) (*Route, error) {
|
|
route := newRouteWithCapacity(fmt.Sprintf("%s - incoming", name), capacity)
|
|
err := r.initializeIncomingRoute(route, messageTypes)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return route, nil
|
|
}
|
|
|
|
func (r *Router) initializeIncomingRoute(route *Route, messageTypes []appmessage.MessageCommand) error {
|
|
for _, messageType := range messageTypes {
|
|
if r.doesIncomingRouteExist(messageType) {
|
|
return errors.Errorf("a route for '%s' already exists", messageType)
|
|
}
|
|
r.setIncomingRoute(messageType, route)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RemoveRoute unregisters the messages of types `messageTypes` from
|
|
// the router
|
|
func (r *Router) RemoveRoute(messageTypes []appmessage.MessageCommand) error {
|
|
for _, messageType := range messageTypes {
|
|
if !r.doesIncomingRouteExist(messageType) {
|
|
return errors.Errorf("a route for '%s' does not exist", messageType)
|
|
}
|
|
r.deleteIncomingRoute(messageType)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// EnqueueIncomingMessage enqueues the given message to the
|
|
// appropriate route
|
|
func (r *Router) EnqueueIncomingMessage(message appmessage.Message) error {
|
|
route, ok := r.incomingRoute(message.Command())
|
|
if !ok {
|
|
return errors.Errorf("a route for '%s' does not exist", message.Command())
|
|
}
|
|
return route.Enqueue(message)
|
|
}
|
|
|
|
// OutgoingRoute returns the outgoing route
|
|
func (r *Router) OutgoingRoute() *Route {
|
|
return r.outgoingRoute
|
|
}
|
|
|
|
// Close shuts down the router by closing all registered
|
|
// incoming routes and the outgoing route
|
|
func (r *Router) Close() {
|
|
r.incomingRoutesLock.Lock()
|
|
defer r.incomingRoutesLock.Unlock()
|
|
|
|
incomingRoutes := make(map[*Route]struct{})
|
|
for _, route := range r.incomingRoutes {
|
|
incomingRoutes[route] = struct{}{}
|
|
}
|
|
for route := range incomingRoutes {
|
|
route.Close()
|
|
}
|
|
r.outgoingRoute.Close()
|
|
}
|
|
|
|
func (r *Router) incomingRoute(messageType appmessage.MessageCommand) (*Route, bool) {
|
|
r.incomingRoutesLock.RLock()
|
|
defer r.incomingRoutesLock.RUnlock()
|
|
|
|
route, ok := r.incomingRoutes[messageType]
|
|
return route, ok
|
|
}
|
|
|
|
func (r *Router) doesIncomingRouteExist(messageType appmessage.MessageCommand) bool {
|
|
r.incomingRoutesLock.RLock()
|
|
defer r.incomingRoutesLock.RUnlock()
|
|
|
|
_, ok := r.incomingRoutes[messageType]
|
|
return ok
|
|
}
|
|
|
|
func (r *Router) setIncomingRoute(messageType appmessage.MessageCommand, route *Route) {
|
|
r.incomingRoutesLock.Lock()
|
|
defer r.incomingRoutesLock.Unlock()
|
|
|
|
r.incomingRoutes[messageType] = route
|
|
}
|
|
|
|
func (r *Router) deleteIncomingRoute(messageType appmessage.MessageCommand) {
|
|
r.incomingRoutesLock.Lock()
|
|
defer r.incomingRoutesLock.Unlock()
|
|
|
|
delete(r.incomingRoutes, messageType)
|
|
}
|