mirror of
https://github.com/kaspanet/kaspad.git
synced 2025-06-06 14:16:43 +00:00
[NOD-1286] Close router from netConnection.Disconnect (#881)
* [NOD-1286] Close router from netConnection.Disconnect * [NOD-1286] Close router in grpc errors as well * [NOD-1286] Fix typo * [NOD-1286] Rename isConnected->isRouterClosed
This commit is contained in:
parent
5a4cafe342
commit
fcae491e6d
@ -8,8 +8,12 @@ func (c *ConnectionManager) checkIncomingConnections(incomingConnectionSet conne
|
|||||||
}
|
}
|
||||||
|
|
||||||
numConnectionsOverMax := len(incomingConnectionSet) - c.maxIncoming
|
numConnectionsOverMax := len(incomingConnectionSet) - c.maxIncoming
|
||||||
|
log.Debugf("Got %d incoming connections while only %d are allowed. Disconnecting "+
|
||||||
|
"%d", len(incomingConnectionSet), c.maxIncoming, numConnectionsOverMax)
|
||||||
|
|
||||||
// randomly disconnect nodes until the number of incoming connections is smaller than maxIncoming
|
// randomly disconnect nodes until the number of incoming connections is smaller than maxIncoming
|
||||||
for _, connection := range incomingConnectionSet {
|
for _, connection := range incomingConnectionSet {
|
||||||
|
log.Debugf("Disconnecting %s due to exceeding incoming connections", connection)
|
||||||
connection.Disconnect()
|
connection.Disconnect()
|
||||||
|
|
||||||
numConnectionsOverMax--
|
numConnectionsOverMax--
|
||||||
|
@ -5,6 +5,7 @@ import (
|
|||||||
"github.com/kaspanet/kaspad/app/appmessage"
|
"github.com/kaspanet/kaspad/app/appmessage"
|
||||||
routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
|
routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/id"
|
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/id"
|
||||||
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/server"
|
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/server"
|
||||||
@ -17,7 +18,7 @@ type NetConnection struct {
|
|||||||
router *routerpkg.Router
|
router *routerpkg.Router
|
||||||
invalidMessageChan chan error
|
invalidMessageChan chan error
|
||||||
onDisconnectedHandler server.OnDisconnectedHandler
|
onDisconnectedHandler server.OnDisconnectedHandler
|
||||||
isConnected uint32
|
isRouterClosed uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
func newNetConnection(connection server.Connection, routerInitializer RouterInitializer) *NetConnection {
|
func newNetConnection(connection server.Connection, routerInitializer RouterInitializer) *NetConnection {
|
||||||
@ -30,7 +31,12 @@ func newNetConnection(connection server.Connection, routerInitializer RouterInit
|
|||||||
}
|
}
|
||||||
|
|
||||||
netConnection.connection.SetOnDisconnectedHandler(func() {
|
netConnection.connection.SetOnDisconnectedHandler(func() {
|
||||||
router.Close()
|
// If the disconnection came because of a network error and not because of the application layer, we
|
||||||
|
// need to close the router as well.
|
||||||
|
if atomic.AddUint32(&netConnection.isRouterClosed, 1) == 1 {
|
||||||
|
netConnection.router.Close()
|
||||||
|
}
|
||||||
|
|
||||||
close(netConnection.invalidMessageChan)
|
close(netConnection.invalidMessageChan)
|
||||||
netConnection.onDisconnectedHandler()
|
netConnection.onDisconnectedHandler()
|
||||||
})
|
})
|
||||||
@ -91,7 +97,9 @@ func (c *NetConnection) setOnDisconnectedHandler(onDisconnectedHandler server.On
|
|||||||
|
|
||||||
// Disconnect disconnects the given connection
|
// Disconnect disconnects the given connection
|
||||||
func (c *NetConnection) Disconnect() {
|
func (c *NetConnection) Disconnect() {
|
||||||
c.connection.Disconnect()
|
if atomic.AddUint32(&c.isRouterClosed, 1) == 1 {
|
||||||
|
c.router.Close()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// DequeueInvalidMessage dequeues the next invalid message
|
// DequeueInvalidMessage dequeues the next invalid message
|
||||||
|
@ -100,8 +100,3 @@ func (r *Route) Close() {
|
|||||||
r.closed = true
|
r.closed = true
|
||||||
close(r.channel)
|
close(r.channel)
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsEmpty returns true if the route doesn't have any pending messages
|
|
||||||
func (r *Route) IsEmpty() bool {
|
|
||||||
return len(r.channel) == 0
|
|
||||||
}
|
|
||||||
|
@ -33,8 +33,7 @@ func (c *gRPCConnection) connectionLoops() error {
|
|||||||
|
|
||||||
func (c *gRPCConnection) sendLoop() error {
|
func (c *gRPCConnection) sendLoop() error {
|
||||||
outgoingRoute := c.router.OutgoingRoute()
|
outgoingRoute := c.router.OutgoingRoute()
|
||||||
// Once the connection is closed, the send loop empties the remaining messages in the channel.
|
for c.IsConnected() {
|
||||||
for c.IsConnected() || !outgoingRoute.IsEmpty() {
|
|
||||||
message, err := outgoingRoute.Dequeue()
|
message, err := outgoingRoute.Dequeue()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, routerpkg.ErrRouteClosed) {
|
if errors.Is(err, routerpkg.ErrRouteClosed) {
|
||||||
@ -57,7 +56,6 @@ func (c *gRPCConnection) sendLoop() error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -1,13 +1,11 @@
|
|||||||
package grpcserver
|
package grpcserver
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
|
||||||
|
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/server/grpcserver/protowire"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"net"
|
"net"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
|
|
||||||
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/server/grpcserver/protowire"
|
|
||||||
|
|
||||||
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/server"
|
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/server"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
@ -93,14 +91,9 @@ func (c *gRPCConnection) Disconnect() {
|
|||||||
close(c.stopChan)
|
close(c.stopChan)
|
||||||
|
|
||||||
if c.isOutbound {
|
if c.isOutbound {
|
||||||
spawn("gRPCConnection.Disconnect-clientStream.CloseSend", func() {
|
clientStream := c.stream.(protowire.P2P_MessageStreamClient)
|
||||||
// Wait a second before closing the stream, to let the send queue to get emptied.
|
_ = clientStream.CloseSend() // ignore error because we don't really know what's the status of the connection
|
||||||
const finishSendDuration = time.Second
|
log.Debugf("Disconnected from %s", c)
|
||||||
time.Sleep(finishSendDuration)
|
|
||||||
clientStream := c.stream.(protowire.P2P_MessageStreamClient)
|
|
||||||
_ = clientStream.CloseSend() // ignore error because we don't really know what's the status of the connection
|
|
||||||
log.Debugf("Disconnected from %s", c)
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("Disconnecting from %s", c)
|
log.Debugf("Disconnecting from %s", c)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user