Compare commits

...

1 Commits

Author SHA1 Message Date
Ori Newman
225291d451 return error on capacity reached 2020-09-02 18:32:26 +03:00
3 changed files with 9 additions and 32 deletions

View File

@@ -45,10 +45,6 @@ func newNetConnection(connection server.Connection, routerInitializer RouterInit
netConnection.invalidMessageChan <- err
})
router.SetOnRouteCapacityReachedHandler(func() {
netConnection.Disconnect()
})
routerInitializer(router, netConnection)
return netConnection

View File

@@ -21,11 +21,10 @@ var (
// ErrRouteClosed indicates that a route was closed while reading/writing.
ErrRouteClosed = errors.New("route is closed")
)
// onCapacityReachedHandler is a function that is to be
// called when a route reaches capacity.
type onCapacityReachedHandler func()
// ErrRouteCapacityReached indicates that route's capacity has reached
ErrRouteCapacityReached = protocolerrors.New(false, "route capacity has reached")
)
// Route represents an incoming or outgoing Router route
type Route struct {
@@ -34,8 +33,7 @@ type Route struct {
// reads use the channel's built-in mechanism to check if the channel is closed
closed bool
closeLock sync.Mutex
onCapacityReachedHandler onCapacityReachedHandler
capacity int
}
// NewRoute create a new Route
@@ -45,8 +43,9 @@ func NewRoute() *Route {
func newRouteWithCapacity(capacity int) *Route {
return &Route{
channel: make(chan appmessage.Message, capacity),
closed: false,
channel: make(chan appmessage.Message, capacity),
closed: false,
capacity: capacity,
}
}
@@ -58,8 +57,8 @@ func (r *Route) Enqueue(message appmessage.Message) error {
if r.closed {
return errors.WithStack(ErrRouteClosed)
}
if len(r.channel) == DefaultMaxMessages {
r.onCapacityReachedHandler()
if len(r.channel) == r.capacity {
return errors.Wrapf(ErrRouteCapacityReached, "reached capacity of %d", r.capacity)
}
r.channel <- message
return nil
@@ -88,10 +87,6 @@ func (r *Route) DequeueWithTimeout(timeout time.Duration) (appmessage.Message, e
}
}
func (r *Route) setOnCapacityReachedHandler(onCapacityReachedHandler onCapacityReachedHandler) {
r.onCapacityReachedHandler = onCapacityReachedHandler
}
// Close closes this route
func (r *Route) Close() {
r.closeLock.Lock()

View File

@@ -20,8 +20,6 @@ type Router struct {
incomingRoutesLock sync.RWMutex
outgoingRoute *Route
onRouteCapacityReachedHandler OnRouteCapacityReachedHandler
}
// NewRouter creates a new empty router
@@ -30,18 +28,9 @@ func NewRouter() *Router {
incomingRoutes: make(map[appmessage.MessageCommand]*Route),
outgoingRoute: newRouteWithCapacity(outgoingRouteMaxMessages),
}
router.outgoingRoute.setOnCapacityReachedHandler(func() {
router.onRouteCapacityReachedHandler()
})
return &router
}
// SetOnRouteCapacityReachedHandler sets the onRouteCapacityReachedHandler
// function for this router
func (r *Router) SetOnRouteCapacityReachedHandler(onRouteCapacityReachedHandler OnRouteCapacityReachedHandler) {
r.onRouteCapacityReachedHandler = onRouteCapacityReachedHandler
}
// AddIncomingRoute registers the messages of types `messageTypes` to
// be routed to the given `route`
func (r *Router) AddIncomingRoute(messageTypes []appmessage.MessageCommand) (*Route, error) {
@@ -52,9 +41,6 @@ func (r *Router) AddIncomingRoute(messageTypes []appmessage.MessageCommand) (*Ro
}
r.setIncomingRoute(messageType, route)
}
route.setOnCapacityReachedHandler(func() {
r.onRouteCapacityReachedHandler()
})
return route, nil
}