diff --git a/infrastructure/network/connmanager/incoming_connections.go b/infrastructure/network/connmanager/incoming_connections.go index 587421de3..9c206a2e1 100644 --- a/infrastructure/network/connmanager/incoming_connections.go +++ b/infrastructure/network/connmanager/incoming_connections.go @@ -8,8 +8,12 @@ func (c *ConnectionManager) checkIncomingConnections(incomingConnectionSet conne } numConnectionsOverMax := len(incomingConnectionSet) - c.maxIncoming + log.Debugf("Got %d incoming connections while only %d are allowed. Disconnecting "+ + "%d", len(incomingConnectionSet), c.maxIncoming, numConnectionsOverMax) + // randomly disconnect nodes until the number of incoming connections is smaller than maxIncoming for _, connection := range incomingConnectionSet { + log.Debugf("Disconnecting %s due to exceeding incoming connections", connection) connection.Disconnect() numConnectionsOverMax-- diff --git a/infrastructure/network/netadapter/netconnection.go b/infrastructure/network/netadapter/netconnection.go index aa45def53..3c286018c 100644 --- a/infrastructure/network/netadapter/netconnection.go +++ b/infrastructure/network/netadapter/netconnection.go @@ -5,6 +5,7 @@ import ( "github.com/kaspanet/kaspad/app/appmessage" routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" "github.com/pkg/errors" + "sync/atomic" "github.com/kaspanet/kaspad/infrastructure/network/netadapter/id" "github.com/kaspanet/kaspad/infrastructure/network/netadapter/server" @@ -17,7 +18,7 @@ type NetConnection struct { router *routerpkg.Router invalidMessageChan chan error onDisconnectedHandler server.OnDisconnectedHandler - isConnected uint32 + isRouterClosed uint32 } func newNetConnection(connection server.Connection, routerInitializer RouterInitializer) *NetConnection { @@ -30,7 +31,12 @@ func newNetConnection(connection server.Connection, routerInitializer RouterInit } netConnection.connection.SetOnDisconnectedHandler(func() { - router.Close() + // If the disconnection came because of a network error and not because of the application layer, we + // need to close the router as well. + if atomic.AddUint32(&netConnection.isRouterClosed, 1) == 1 { + netConnection.router.Close() + } + close(netConnection.invalidMessageChan) netConnection.onDisconnectedHandler() }) @@ -91,7 +97,9 @@ func (c *NetConnection) setOnDisconnectedHandler(onDisconnectedHandler server.On // Disconnect disconnects the given connection func (c *NetConnection) Disconnect() { - c.connection.Disconnect() + if atomic.AddUint32(&c.isRouterClosed, 1) == 1 { + c.router.Close() + } } // DequeueInvalidMessage dequeues the next invalid message diff --git a/infrastructure/network/netadapter/router/route.go b/infrastructure/network/netadapter/router/route.go index 6a04cab93..a1296722b 100644 --- a/infrastructure/network/netadapter/router/route.go +++ b/infrastructure/network/netadapter/router/route.go @@ -100,8 +100,3 @@ func (r *Route) Close() { r.closed = true close(r.channel) } - -// IsEmpty returns true if the route doesn't have any pending messages -func (r *Route) IsEmpty() bool { - return len(r.channel) == 0 -} diff --git a/infrastructure/network/netadapter/server/grpcserver/connection_loops.go b/infrastructure/network/netadapter/server/grpcserver/connection_loops.go index a6b3905e3..c7eaf8232 100644 --- a/infrastructure/network/netadapter/server/grpcserver/connection_loops.go +++ b/infrastructure/network/netadapter/server/grpcserver/connection_loops.go @@ -33,8 +33,7 @@ func (c *gRPCConnection) connectionLoops() error { func (c *gRPCConnection) sendLoop() error { outgoingRoute := c.router.OutgoingRoute() - // Once the connection is closed, the send loop empties the remaining messages in the channel. - for c.IsConnected() || !outgoingRoute.IsEmpty() { + for c.IsConnected() { message, err := outgoingRoute.Dequeue() if err != nil { if errors.Is(err, routerpkg.ErrRouteClosed) { @@ -57,7 +56,6 @@ func (c *gRPCConnection) sendLoop() error { if err != nil { return err } - } return nil } diff --git a/infrastructure/network/netadapter/server/grpcserver/grpc_connection.go b/infrastructure/network/netadapter/server/grpcserver/grpc_connection.go index ba8144066..af145fffb 100644 --- a/infrastructure/network/netadapter/server/grpcserver/grpc_connection.go +++ b/infrastructure/network/netadapter/server/grpcserver/grpc_connection.go @@ -1,13 +1,11 @@ package grpcserver import ( + "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" + "github.com/kaspanet/kaspad/infrastructure/network/netadapter/server/grpcserver/protowire" "github.com/pkg/errors" "net" "sync/atomic" - "time" - - "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" - "github.com/kaspanet/kaspad/infrastructure/network/netadapter/server/grpcserver/protowire" "github.com/kaspanet/kaspad/infrastructure/network/netadapter/server" "google.golang.org/grpc" @@ -93,14 +91,9 @@ func (c *gRPCConnection) Disconnect() { close(c.stopChan) if c.isOutbound { - spawn("gRPCConnection.Disconnect-clientStream.CloseSend", func() { - // Wait a second before closing the stream, to let the send queue to get emptied. - const finishSendDuration = time.Second - time.Sleep(finishSendDuration) - clientStream := c.stream.(protowire.P2P_MessageStreamClient) - _ = clientStream.CloseSend() // ignore error because we don't really know what's the status of the connection - log.Debugf("Disconnected from %s", c) - }) + clientStream := c.stream.(protowire.P2P_MessageStreamClient) + _ = clientStream.CloseSend() // ignore error because we don't really know what's the status of the connection + log.Debugf("Disconnected from %s", c) } log.Debugf("Disconnecting from %s", c)