mirror of
https://github.com/kaspanet/kaspad.git
synced 2026-02-21 03:03:08 +00:00
Compare commits
1 Commits
v0.7.2-dev
...
big-capaci
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
225291d451 |
@@ -45,10 +45,6 @@ func newNetConnection(connection server.Connection, routerInitializer RouterInit
|
||||
netConnection.invalidMessageChan <- err
|
||||
})
|
||||
|
||||
router.SetOnRouteCapacityReachedHandler(func() {
|
||||
netConnection.Disconnect()
|
||||
})
|
||||
|
||||
routerInitializer(router, netConnection)
|
||||
|
||||
return netConnection
|
||||
|
||||
@@ -21,11 +21,10 @@ var (
|
||||
|
||||
// ErrRouteClosed indicates that a route was closed while reading/writing.
|
||||
ErrRouteClosed = errors.New("route is closed")
|
||||
)
|
||||
|
||||
// onCapacityReachedHandler is a function that is to be
|
||||
// called when a route reaches capacity.
|
||||
type onCapacityReachedHandler func()
|
||||
// ErrRouteCapacityReached indicates that route's capacity has reached
|
||||
ErrRouteCapacityReached = protocolerrors.New(false, "route capacity has reached")
|
||||
)
|
||||
|
||||
// Route represents an incoming or outgoing Router route
|
||||
type Route struct {
|
||||
@@ -34,8 +33,7 @@ type Route struct {
|
||||
// reads use the channel's built-in mechanism to check if the channel is closed
|
||||
closed bool
|
||||
closeLock sync.Mutex
|
||||
|
||||
onCapacityReachedHandler onCapacityReachedHandler
|
||||
capacity int
|
||||
}
|
||||
|
||||
// NewRoute create a new Route
|
||||
@@ -45,8 +43,9 @@ func NewRoute() *Route {
|
||||
|
||||
func newRouteWithCapacity(capacity int) *Route {
|
||||
return &Route{
|
||||
channel: make(chan appmessage.Message, capacity),
|
||||
closed: false,
|
||||
channel: make(chan appmessage.Message, capacity),
|
||||
closed: false,
|
||||
capacity: capacity,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,8 +57,8 @@ func (r *Route) Enqueue(message appmessage.Message) error {
|
||||
if r.closed {
|
||||
return errors.WithStack(ErrRouteClosed)
|
||||
}
|
||||
if len(r.channel) == DefaultMaxMessages {
|
||||
r.onCapacityReachedHandler()
|
||||
if len(r.channel) == r.capacity {
|
||||
return errors.Wrapf(ErrRouteCapacityReached, "reached capacity of %d", r.capacity)
|
||||
}
|
||||
r.channel <- message
|
||||
return nil
|
||||
@@ -88,10 +87,6 @@ func (r *Route) DequeueWithTimeout(timeout time.Duration) (appmessage.Message, e
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Route) setOnCapacityReachedHandler(onCapacityReachedHandler onCapacityReachedHandler) {
|
||||
r.onCapacityReachedHandler = onCapacityReachedHandler
|
||||
}
|
||||
|
||||
// Close closes this route
|
||||
func (r *Route) Close() {
|
||||
r.closeLock.Lock()
|
||||
|
||||
@@ -20,8 +20,6 @@ type Router struct {
|
||||
incomingRoutesLock sync.RWMutex
|
||||
|
||||
outgoingRoute *Route
|
||||
|
||||
onRouteCapacityReachedHandler OnRouteCapacityReachedHandler
|
||||
}
|
||||
|
||||
// NewRouter creates a new empty router
|
||||
@@ -30,18 +28,9 @@ func NewRouter() *Router {
|
||||
incomingRoutes: make(map[appmessage.MessageCommand]*Route),
|
||||
outgoingRoute: newRouteWithCapacity(outgoingRouteMaxMessages),
|
||||
}
|
||||
router.outgoingRoute.setOnCapacityReachedHandler(func() {
|
||||
router.onRouteCapacityReachedHandler()
|
||||
})
|
||||
return &router
|
||||
}
|
||||
|
||||
// SetOnRouteCapacityReachedHandler sets the onRouteCapacityReachedHandler
|
||||
// function for this router
|
||||
func (r *Router) SetOnRouteCapacityReachedHandler(onRouteCapacityReachedHandler OnRouteCapacityReachedHandler) {
|
||||
r.onRouteCapacityReachedHandler = onRouteCapacityReachedHandler
|
||||
}
|
||||
|
||||
// AddIncomingRoute registers the messages of types `messageTypes` to
|
||||
// be routed to the given `route`
|
||||
func (r *Router) AddIncomingRoute(messageTypes []appmessage.MessageCommand) (*Route, error) {
|
||||
@@ -52,9 +41,6 @@ func (r *Router) AddIncomingRoute(messageTypes []appmessage.MessageCommand) (*Ro
|
||||
}
|
||||
r.setIncomingRoute(messageType, route)
|
||||
}
|
||||
route.setOnCapacityReachedHandler(func() {
|
||||
r.onRouteCapacityReachedHandler()
|
||||
})
|
||||
return route, nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user