Ori Newman 4dd7113dc5
Increase virtualChangeChan to 100e3 (#2056)
* Increase virtualChangeChan to 100e3
Don't crash when sending UTXO RPC notification to a closed route
Throw error if virtualChangeChan is full

* Use MaybeEnqueue in more places

* Remove comment

* Ignore capacity reached errors on MaybeEnqueue
2022-05-20 19:31:13 +03:00

117 lines
3.1 KiB
Go

package router
import (
"sync"
"time"
"github.com/kaspanet/kaspad/app/protocol/protocolerrors"
"github.com/kaspanet/kaspad/app/appmessage"
"github.com/pkg/errors"
)
const (
// DefaultMaxMessages is the default capacity for a route with a capacity defined
DefaultMaxMessages = 200
)
var (
// ErrTimeout signifies that one of the router functions had a timeout.
ErrTimeout = protocolerrors.New(false, "timeout expired")
// ErrRouteClosed indicates that a route was closed while reading/writing.
ErrRouteClosed = errors.New("route is closed")
// ErrRouteCapacityReached indicates that route's capacity has been reached
ErrRouteCapacityReached = protocolerrors.New(false, "route capacity has been reached")
)
// Route represents an incoming or outgoing Router route
type Route struct {
name string
channel chan appmessage.Message
// closed and closeLock are used to protect us from writing to a closed channel
// reads use the channel's built-in mechanism to check if the channel is closed
closed bool
closeLock sync.Mutex
capacity int
}
// NewRoute create a new Route
func NewRoute(name string) *Route {
return newRouteWithCapacity(name, DefaultMaxMessages)
}
func newRouteWithCapacity(name string, capacity int) *Route {
return &Route{
name: name,
channel: make(chan appmessage.Message, capacity),
closed: false,
capacity: capacity,
}
}
// Enqueue enqueues a message to the Route
func (r *Route) Enqueue(message appmessage.Message) error {
r.closeLock.Lock()
defer r.closeLock.Unlock()
if r.closed {
return errors.WithStack(ErrRouteClosed)
}
if len(r.channel) == r.capacity {
return errors.Wrapf(ErrRouteCapacityReached, "route '%s' reached capacity of %d", r.name, r.capacity)
}
r.channel <- message
return nil
}
// MaybeEnqueue enqueues a message to the route, but doesn't throw an error
// if it's closed or its capacity has been reached.
func (r *Route) MaybeEnqueue(message appmessage.Message) error {
err := r.Enqueue(message)
if errors.Is(err, ErrRouteClosed) {
log.Infof("Couldn't send message to closed route '%s'", r.name)
return nil
}
if errors.Is(err, ErrRouteCapacityReached) {
log.Infof("Capacity (%d) of route '%s' has been reached. Couldn't send message", r.capacity, r.name)
return nil
}
return err
}
// Dequeue dequeues a message from the Route
func (r *Route) Dequeue() (appmessage.Message, error) {
message, isOpen := <-r.channel
if !isOpen {
return nil, errors.Wrapf(ErrRouteClosed, "route '%s' is closed", r.name)
}
return message, nil
}
// DequeueWithTimeout attempts to dequeue a message from the Route
// and returns an error if the given timeout expires first.
func (r *Route) DequeueWithTimeout(timeout time.Duration) (appmessage.Message, error) {
select {
case <-time.After(timeout):
return nil, errors.Wrapf(ErrTimeout, "route '%s' got timeout after %s", r.name, timeout)
case message, isOpen := <-r.channel:
if !isOpen {
return nil, errors.WithStack(ErrRouteClosed)
}
return message, nil
}
}
// Close closes this route
func (r *Route) Close() {
r.closeLock.Lock()
defer r.closeLock.Unlock()
r.closed = true
close(r.channel)
}