From ce17348175bfe80e7959cd82826f93c5a8f97d30 Mon Sep 17 00:00:00 2001
From: stasatdaglabs <39559713+stasatdaglabs@users.noreply.github.com>
Date: Thu, 12 Aug 2021 14:40:49 +0300
Subject: [PATCH] Limit the amount of inbound RPC connections (#1818)

* Limit the amount of inbound RPC connections.

* Increment/decrement the right variable.

* Implement TestRPCMaxInboundConnections.

* Make go vet happy.

* Increase RPCMaxInboundConnections to 128.

* Set NUM_CLIENTS=128 in the rpc-idle-clients stability test.

* Explain why the P2P server has unlimited inbound connections.
---
 .../server/grpcserver/grpc_server.go          | 46 ++++++++++++---
 .../netadapter/server/grpcserver/p2pserver.go |  8 ++-
 .../netadapter/server/grpcserver/rpcserver.go |  5 +-
 infrastructure/network/rpcclient/rpcclient.go | 27 +++++----
 stability-tests/rpc-idle-clients/run/run.sh   |  2 +-
 testing/integration/rpc_test.go               | 58 +++++++++++++++++++
 6 files changed, 124 insertions(+), 22 deletions(-)

diff --git a/infrastructure/network/netadapter/server/grpcserver/grpc_server.go b/infrastructure/network/netadapter/server/grpcserver/grpc_server.go
index 1c445f350..c18a47357 100644
--- a/infrastructure/network/netadapter/server/grpcserver/grpc_server.go
+++ b/infrastructure/network/netadapter/server/grpcserver/grpc_server.go
@@ -9,6 +9,7 @@ import (
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/peer"
 	"net"
+	"sync"
 	"time"
 )
 
@@ -17,15 +18,22 @@ type gRPCServer struct {
 	listeningAddresses []string
 	server             *grpc.Server
 	name               string
+
+	maxInboundConnections      int
+	inboundConnectionCount     int
+	inboundConnectionCountLock *sync.Mutex
 }
 
 // newGRPCServer creates a gRPC server
-func newGRPCServer(listeningAddresses []string, maxMessageSize int, name string) *gRPCServer {
-	log.Debugf("Created new %s GRPC server with maxMessageSize %d", name, maxMessageSize)
+func newGRPCServer(listeningAddresses []string, maxMessageSize int, maxInboundConnections int, name string) *gRPCServer {
+	log.Debugf("Created new %s GRPC server with maxMessageSize %d and maxInboundConnections %d", name, maxMessageSize, maxInboundConnections)
 	return &gRPCServer{
-		server:             grpc.NewServer(grpc.MaxRecvMsgSize(maxMessageSize), grpc.MaxSendMsgSize(maxMessageSize)),
-		listeningAddresses: listeningAddresses,
-		name:               name,
+		server:                     grpc.NewServer(grpc.MaxRecvMsgSize(maxMessageSize), grpc.MaxSendMsgSize(maxMessageSize)),
+		listeningAddresses:         listeningAddresses,
+		name:                       name,
+		maxInboundConnections:      maxInboundConnections,
+		inboundConnectionCount:     0,
+		inboundConnectionCountLock: &sync.Mutex{},
 	}
 }
 
@@ -86,6 +94,11 @@ func (s *gRPCServer) SetOnConnectedHandler(onConnectedHandler server.OnConnected
 }
 
 func (s *gRPCServer) handleInboundConnection(ctx context.Context, stream grpcStream) error {
+	err := s.incrementInboundConnectionCountAndLimitIfRequired()
+	if err != nil {
+		return err
+	}
+
 	peerInfo, ok := peer.FromContext(ctx)
 	if !ok {
 		return errors.Errorf("Error getting stream peer info from context")
@@ -97,7 +110,7 @@ func (s *gRPCServer) handleInboundConnection(ctx context.Context, stream grpcStr
 
 	connection := newConnection(s, tcpAddress, stream, nil)
 
-	err := s.onConnectedHandler(connection)
+	err = s.onConnectedHandler(connection)
 	if err != nil {
 		return err
 	}
@@ -105,6 +118,25 @@ func (s *gRPCServer) handleInboundConnection(ctx context.Context, stream grpcStr
 	log.Infof("%s Incoming connection from %s", s.name, peerInfo.Addr)
 
 	<-connection.stopChan
-
+	s.decrementInboundConnectionCount()
 	return nil
 }
+
+func (s *gRPCServer) incrementInboundConnectionCountAndLimitIfRequired() error {
+	s.inboundConnectionCountLock.Lock()
+	defer s.inboundConnectionCountLock.Unlock()
+
+	if s.maxInboundConnections > 0 && s.inboundConnectionCount == s.maxInboundConnections {
+		return errors.Errorf("limit of %d inbound connections has been exceeded", s.maxInboundConnections)
+	}
+
+	s.inboundConnectionCount++
+	return nil
+}
+
+func (s *gRPCServer) decrementInboundConnectionCount() {
+	s.inboundConnectionCountLock.Lock()
+	defer s.inboundConnectionCountLock.Unlock()
+
+	s.inboundConnectionCount--
+}
diff --git a/infrastructure/network/netadapter/server/grpcserver/p2pserver.go b/infrastructure/network/netadapter/server/grpcserver/p2pserver.go
index 9b61d9efb..a27f1f10a 100644
--- a/infrastructure/network/netadapter/server/grpcserver/p2pserver.go
+++ b/infrastructure/network/netadapter/server/grpcserver/p2pserver.go
@@ -20,9 +20,15 @@ type p2pServer struct {
 
 const p2pMaxMessageSize = 10 * 1024 * 1024 // 10MB
 
+// p2pMaxInboundConnections is the max amount of inbound connections for the P2P server.
+// Note that inbound connections are not limited by the gRPC server. (A value of 0 means
+// unlimited inbound connections.) The P2P limiting logic is application-level, and as
+// such is handled in the ConnectionManager instead.
+const p2pMaxInboundConnections = 0
+
 // NewP2PServer creates a new P2PServer
 func NewP2PServer(listeningAddresses []string) (server.P2PServer, error) {
-	gRPCServer := newGRPCServer(listeningAddresses, p2pMaxMessageSize, "P2P")
+	gRPCServer := newGRPCServer(listeningAddresses, p2pMaxMessageSize, p2pMaxInboundConnections, "P2P")
 	p2pServer := &p2pServer{gRPCServer: *gRPCServer}
 	protowire.RegisterP2PServer(gRPCServer.server, p2pServer)
 	return p2pServer, nil
diff --git a/infrastructure/network/netadapter/server/grpcserver/rpcserver.go b/infrastructure/network/netadapter/server/grpcserver/rpcserver.go
index 233129ecd..516463956 100644
--- a/infrastructure/network/netadapter/server/grpcserver/rpcserver.go
+++ b/infrastructure/network/netadapter/server/grpcserver/rpcserver.go
@@ -14,9 +14,12 @@ type rpcServer struct {
 // RPCMaxMessageSize is the max message size for the RPC server to send and receive
 const RPCMaxMessageSize = 1024 * 1024 * 1024 // 1 GB
 
+// RPCMaxInboundConnections is the max amount of inbound connections for the RPC server
+const RPCMaxInboundConnections = 128
+
 // NewRPCServer creates a new RPCServer
 func NewRPCServer(listeningAddresses []string) (server.Server, error) {
-	gRPCServer := newGRPCServer(listeningAddresses, RPCMaxMessageSize, "RPC")
+	gRPCServer := newGRPCServer(listeningAddresses, RPCMaxMessageSize, RPCMaxInboundConnections, "RPC")
 	rpcServer := &rpcServer{gRPCServer: *gRPCServer}
 	protowire.RegisterRPCServer(gRPCServer.server, rpcServer)
 	return rpcServer, nil
diff --git a/infrastructure/network/rpcclient/rpcclient.go b/infrastructure/network/rpcclient/rpcclient.go
index 1bfd7b163..13a0ccb85 100644
--- a/infrastructure/network/rpcclient/rpcclient.go
+++ b/infrastructure/network/rpcclient/rpcclient.go
@@ -18,11 +18,12 @@ const defaultTimeout = 30 * time.Second
 type RPCClient struct {
 	*grpcclient.GRPCClient
 
-	rpcAddress     string
-	rpcRouter      *rpcRouter
-	isConnected    uint32
-	isClosed       uint32
-	isReconnecting uint32
+	rpcAddress           string
+	rpcRouter            *rpcRouter
+	isConnected          uint32
+	isClosed             uint32
+	isReconnecting       uint32
+	lastDisconnectedTime time.Time
 
 	timeout time.Duration
 }
@@ -112,14 +113,15 @@ func (c *RPCClient) Reconnect() error {
 
 	// Attempt to connect until we succeed
 	for {
-		err := c.connect()
-		if err == nil {
-			return nil
-		}
-		log.Warnf("Could not automatically reconnect to %s: %s", c.rpcAddress, err)
 		const retryDelay = 10 * time.Second
-		log.Warnf("Retrying in %s", retryDelay)
+		if time.Since(c.lastDisconnectedTime) > retryDelay {
+			err := c.connect()
+			if err == nil {
+				return nil
+			}
+			log.Warnf("Could not automatically reconnect to %s: %s", c.rpcAddress, err)
+			log.Warnf("Retrying in %s", retryDelay)
+		}
 		time.Sleep(retryDelay)
 	}
 }
@@ -127,6 +129,7 @@ func (c *RPCClient) handleClientDisconnected() {
 	atomic.StoreUint32(&c.isConnected, 0)
 	if atomic.LoadUint32(&c.isClosed) == 0 {
+		c.lastDisconnectedTime = time.Now()
 		err := c.Reconnect()
 		if err != nil {
 			panic(err)
diff --git a/stability-tests/rpc-idle-clients/run/run.sh b/stability-tests/rpc-idle-clients/run/run.sh
index 5c1399f68..b21793eed 100755
--- a/stability-tests/rpc-idle-clients/run/run.sh
+++ b/stability-tests/rpc-idle-clients/run/run.sh
@@ -1,7 +1,7 @@
 #!/bin/bash
 rm -rf /tmp/kaspad-temp
 
-NUM_CLIENTS=1000
+NUM_CLIENTS=128
 kaspad --devnet --appdir=/tmp/kaspad-temp --profile=6061 --rpcmaxwebsockets=$NUM_CLIENTS &
 KASPAD_PID=$!
 KASPAD_KILLED=0
diff --git a/testing/integration/rpc_test.go b/testing/integration/rpc_test.go
index e4bc9ec0b..c3aa1f4a6 100644
--- a/testing/integration/rpc_test.go
+++ b/testing/integration/rpc_test.go
@@ -1,6 +1,8 @@
 package integration
 
 import (
+	"github.com/kaspanet/kaspad/infrastructure/network/netadapter/server/grpcserver"
+
 	"testing"
 	"time"
 
 	"github.com/kaspanet/kaspad/infrastructure/network/rpcclient"
@@ -23,3 +25,59 @@ func newTestRPCClient(rpcAddress string) (*testRPCClient, error) {
 		RPCClient: rpcClient,
 	}, nil
 }
+
+func TestRPCMaxInboundConnections(t *testing.T) {
+	harness, teardown := setupHarness(t, &harnessParams{
+		p2pAddress:              p2pAddress1,
+		rpcAddress:              rpcAddress1,
+		miningAddress:           miningAddress1,
+		miningAddressPrivateKey: miningAddress1PrivateKey,
+	})
+	defer teardown()
+
+	// Close the default RPC client so that it won't interfere with the test
+	err := harness.rpcClient.Close()
+	if err != nil {
+		t.Fatalf("Failed to close the default harness RPCClient: %s", err)
+	}
+
+	// Connect `RPCMaxInboundConnections` clients. We expect this to succeed immediately
+	rpcClients := []*testRPCClient{}
+	doneChan := make(chan error)
+	go func() {
+		for i := 0; i < grpcserver.RPCMaxInboundConnections; i++ {
+			rpcClient, err := newTestRPCClient(harness.rpcAddress)
+			if err != nil {
+				doneChan <- err
+			}
+			rpcClients = append(rpcClients, rpcClient)
+		}
+		doneChan <- nil
+	}()
+	select {
+	case err = <-doneChan:
+		if err != nil {
+			t.Fatalf("newTestRPCClient: %s", err)
+		}
+	case <-time.After(time.Second):
+		t.Fatalf("Timeout for connecting %d RPC connections elapsed", grpcserver.RPCMaxInboundConnections)
+	}
+
+	// Try to connect another client. We expect this to fail
+	// We set a timeout to account for reconnection mechanisms
+	go func() {
+		rpcClient, err := newTestRPCClient(harness.rpcAddress)
+		if err != nil {
+			doneChan <- err
+		}
+		rpcClients = append(rpcClients, rpcClient)
+		doneChan <- nil
+	}()
+	select {
+	case err = <-doneChan:
+		if err == nil {
+			t.Fatalf("newTestRPCClient unexpectedly succeeded")
+		}
+	case <-time.After(time.Second * 15):
+	}
+}
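
Reviewer note: the server-side mechanism above is a mutex-guarded counter that is incremented before an inbound connection is accepted and decremented once the connection's stop channel closes. The following is a minimal, self-contained Go sketch of that pattern; the names (limiter, increment, decrement) are illustrative stand-ins, not the kaspad API.

package main

import (
	"fmt"
	"sync"
)

// limiter mirrors the pattern in grpc_server.go: a plain counter guarded
// by a mutex, checked before each inbound connection is accepted.
// As with p2pMaxInboundConnections, a max of 0 means "unlimited".
type limiter struct {
	max   int
	count int
	lock  sync.Mutex
}

// increment reserves a slot, failing once the limit is reached.
func (l *limiter) increment() error {
	l.lock.Lock()
	defer l.lock.Unlock()
	if l.max > 0 && l.count == l.max {
		return fmt.Errorf("limit of %d inbound connections has been exceeded", l.max)
	}
	l.count++
	return nil
}

// decrement releases a slot once a connection's stop channel closes.
func (l *limiter) decrement() {
	l.lock.Lock()
	defer l.lock.Unlock()
	l.count--
}

func main() {
	l := &limiter{max: 2}
	for i := 1; i <= 3; i++ {
		if err := l.increment(); err != nil {
			fmt.Println("connection", i, "rejected:", err) // third connection is rejected
			continue
		}
		fmt.Println("connection", i, "accepted")
	}
	l.decrement() // one client disconnects, freeing a slot
	fmt.Println("slot freed, retry succeeds:", l.increment() == nil)
}

A plain mutex is reasonable here because the counter is only touched on connect and disconnect, well off any hot path. One thing worth noting from the diff itself: a rejected connection never increments the counter, but a connection that increments and then fails in onConnectedHandler appears to return without reaching the decrement, which would hold its slot.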
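The rpcclient change pairs the server-side limit with a client-side throttle: handleClientDisconnected records lastDisconnectedTime, and Reconnect only dials once retryDelay has elapsed since then, so a client turned away by a full server does not hammer it on every loop iteration. Below is a small sketch of that throttle under assumed names (reconnector and dial are stand-ins for RPCClient and its connect method), with the delay shortened so the example runs quickly.

package main

import (
	"errors"
	"fmt"
	"time"
)

// reconnector mimics the patched RPCClient.Reconnect loop: after a
// disconnect, it waits out retryDelay before the first dial attempt.
type reconnector struct {
	lastDisconnectedTime time.Time // set on disconnect, as in handleClientDisconnected
	dial                 func() error
}

func (r *reconnector) reconnect() error {
	const retryDelay = 100 * time.Millisecond // 10 seconds in the real client
	for {
		if time.Since(r.lastDisconnectedTime) > retryDelay {
			err := r.dial()
			if err == nil {
				return nil
			}
			fmt.Println("could not reconnect:", err)
		}
		time.Sleep(retryDelay)
	}
}

func main() {
	attempts := 0
	r := &reconnector{
		lastDisconnectedTime: time.Now(),
		dial: func() error {
			attempts++ // simulate a server that frees a slot after two rejections
			if attempts < 3 {
				return errors.New("limit of inbound connections has been exceeded")
			}
			return nil
		},
	}
	_ = r.reconnect()
	fmt.Printf("connected after %d attempts\n", attempts)
}

This also explains the 15-second timeout in the second half of TestRPCMaxInboundConnections: the rejected extra client may keep retrying in the background, so the test waits out more than one retryDelay before declaring the rejection stable.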