mirror of
https://github.com/kaspanet/kaspad.git
synced 2025-06-10 08:06:44 +00:00

* [NOD-1162] Separate kaspad to it's own package, so that I can use it out of integration test * [NOD-1162] Begin integration tests * [NOD-1162] [FIX] Assign cfg to RPCServer * [NOD-1162] Basic integration test ready * [NOD-1162] Wait for connection for real * [NOD-1162] [FIX] Connection manager should run the moment it adds a request * [NOD-1162] Make connect something that can be invoked in middle of test * [NOD-1162] Complete first integration test * [NOD-1162] Undo refactor error * [NOD-1162] Rename Kaspad to App * [NOD-1162] Convert checking connection to polling * [NOD-1162] [FIX] Set peerID on handshake * [NOD-1162] [FIX] Broadcast should send to outgoing route, not incoming * [NOD-1162] [FIX] Add CmdInvRelayBlock to MakeEmptyMessage * [NOD-1162] [FIX] Initialize Hash before decoding MsgInvRelayBlock * [NOD-1162] [FIX] Invert condition * [NOD-1162] [FIX] Fixes to encoding of MsgGetRelayBlocks * [NOD-1162] [FIX] Add MsgGetRelayBlocks to MakeEmptyMessage * [NOD-1162] [FIX] Connection manager should run the moment it adds a request * [NOD-1162] [FIX] Set peerID on handshake * [NOD-1162] [FIX] Broadcast should send to outgoing route, not incoming * [NOD-1162] [FIX] Add CmdInvRelayBlock to MakeEmptyMessage * [NOD-1162] [FIX] Initialize Hash before decoding MsgInvRelayBlock * [NOD-1162] [FIX] Invert condition * [NOD-1162] [FIX] Fixes to encoding of MsgGetRelayBlocks * [NOD-1162] [FIX] Add MsgGetRelayBlocks to MakeEmptyMessage * [NOD-1162] Add comment * [NOD-1162] Added support for 3 nodes and clients in integration tests * [NOD-1162] Add third node to integration test * [NOD-1192] Use lock-less functions in TxPool.HandleNewBlock * [NOD-1192] Broadcast transactions only if there's more then 0 * [NOD-1162] Removed double waitTillNextIteration * [NOD-1192] Rename: broadcastTransactions -> broadcastTransactionsAfterBlockAdded * [NOD-1162] Call NotifyBlocks on client3 as well * [NOD-1162] ErrTimeout and ErrRouteClosed should be ProtocolErrors * [NOD-1162] Added comment 
and removed redundant type PeerAddedCallback * [NOD-1162] Revert overly eager rename * [NOD-1162] Move DisalbeTLS to common config + minimize call for ioutil.TempDir() * [NOD-1162] Add some clarifications in code * [NOD-1193] Skip closed connections in NetAdapter.Broadcast * [NOD-1193] Make sure to protect connectionsToRouters from concurrent access * [NOD-1162] Add _test to all files in integration package * [NOD-1162] Introduced appHarness to better encapsulate a single node * [NOD-1162] Removed onChainChanged handler * [NOD-1162] Remove redundant closure * [NOD-1162] Correctly mark integration_test config as Simnet * [NOD-1162] Rename app.ID -> app.P2PNodeID
103 lines
2.5 KiB
Go
103 lines
2.5 KiB
Go
package router
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/kaspanet/kaspad/protocol/protocolerrors"
|
|
|
|
"github.com/kaspanet/kaspad/wire"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
// defaultMaxMessages is the channel capacity that NewRoute passes to
// newRouteWithCapacity.
const defaultMaxMessages = 100
|
|
|
|
var (
|
|
// ErrTimeout signifies that one of the router functions had a timeout.
|
|
ErrTimeout = protocolerrors.New(false, "timeout expired")
|
|
|
|
// ErrRouteClosed indicates that a route was closed while reading/writing.
|
|
// TODO(libp2p): Remove protocol error here
|
|
ErrRouteClosed = protocolerrors.New(false, "route is closed")
|
|
)
|
|
|
|
// onCapacityReachedHandler is the callback signature a Route invokes
// when it detects that its message channel is full. It takes no
// arguments and returns nothing.
type onCapacityReachedHandler func()
|
|
|
|
// Route represents an incoming or outgoing Router route
|
|
type Route struct {
|
|
channel chan wire.Message
|
|
// closed and closeLock are used to protect us from writing to a closed channel
|
|
// reads use the channel's built-in mechanism to check if the channel is closed
|
|
closed bool
|
|
closeLock sync.Mutex
|
|
|
|
onCapacityReachedHandler onCapacityReachedHandler
|
|
}
|
|
|
|
// NewRoute create a new Route
|
|
func NewRoute() *Route {
|
|
return newRouteWithCapacity(defaultMaxMessages)
|
|
}
|
|
|
|
func newRouteWithCapacity(capacity int) *Route {
|
|
return &Route{
|
|
channel: make(chan wire.Message, capacity),
|
|
closed: false,
|
|
}
|
|
}
|
|
|
|
// Enqueue enqueues a message to the Route
|
|
func (r *Route) Enqueue(message wire.Message) error {
|
|
r.closeLock.Lock()
|
|
defer r.closeLock.Unlock()
|
|
|
|
if r.closed {
|
|
return errors.WithStack(ErrRouteClosed)
|
|
}
|
|
if len(r.channel) == defaultMaxMessages {
|
|
r.onCapacityReachedHandler()
|
|
}
|
|
r.channel <- message
|
|
return nil
|
|
}
|
|
|
|
// Dequeue dequeues a message from the Route
|
|
func (r *Route) Dequeue() (wire.Message, error) {
|
|
message, isOpen := <-r.channel
|
|
if !isOpen {
|
|
return nil, errors.WithStack(ErrRouteClosed)
|
|
}
|
|
return message, nil
|
|
}
|
|
|
|
// DequeueWithTimeout attempts to dequeue a message from the Route
|
|
// and returns an error if the given timeout expires first.
|
|
func (r *Route) DequeueWithTimeout(timeout time.Duration) (wire.Message, error) {
|
|
select {
|
|
case <-time.After(timeout):
|
|
return nil, errors.Wrapf(ErrTimeout, "got timeout after %s", timeout)
|
|
case message, isOpen := <-r.channel:
|
|
if !isOpen {
|
|
return nil, errors.WithStack(ErrRouteClosed)
|
|
}
|
|
return message, nil
|
|
}
|
|
}
|
|
|
|
func (r *Route) setOnCapacityReachedHandler(onCapacityReachedHandler onCapacityReachedHandler) {
|
|
r.onCapacityReachedHandler = onCapacityReachedHandler
|
|
}
|
|
|
|
// Close closes this route
|
|
func (r *Route) Close() {
|
|
r.closeLock.Lock()
|
|
defer r.closeLock.Unlock()
|
|
|
|
r.closed = true
|
|
close(r.channel)
|
|
}
|