diff --git a/infrastructure/config/config.go b/infrastructure/config/config.go index c8c8fd024..bba91219b 100644 --- a/infrastructure/config/config.go +++ b/infrastructure/config/config.go @@ -38,8 +38,9 @@ const ( defaultBanDuration = time.Hour * 24 defaultBanThreshold = 100 //DefaultConnectTimeout is the default connection timeout when dialing - DefaultConnectTimeout = time.Second * 30 - defaultMaxRPCClients = 10 + DefaultConnectTimeout = time.Second * 30 + //DefaultMaxRPCClients is the default max number of RPC clients + DefaultMaxRPCClients = 128 defaultMaxRPCWebsockets = 25 defaultMaxRPCConcurrentReqs = 20 defaultBlockMaxMass = 10_000_000 @@ -178,7 +179,7 @@ func defaultFlags() *Flags { MaxInboundPeers: defaultMaxInboundPeers, BanDuration: defaultBanDuration, BanThreshold: defaultBanThreshold, - RPCMaxClients: defaultMaxRPCClients, + RPCMaxClients: DefaultMaxRPCClients, RPCMaxWebsockets: defaultMaxRPCWebsockets, RPCMaxConcurrentReqs: defaultMaxRPCConcurrentReqs, AppDir: defaultDataDir, diff --git a/infrastructure/network/netadapter/netadapter.go b/infrastructure/network/netadapter/netadapter.go index 33fe00df8..0f50c8deb 100644 --- a/infrastructure/network/netadapter/netadapter.go +++ b/infrastructure/network/netadapter/netadapter.go @@ -46,7 +46,7 @@ func NewNetAdapter(cfg *config.Config) (*NetAdapter, error) { if err != nil { return nil, err } - rpcServer, err := grpcserver.NewRPCServer(cfg.RPCListeners) + rpcServer, err := grpcserver.NewRPCServer(cfg.RPCListeners, cfg.RPCMaxClients) if err != nil { return nil, err } diff --git a/infrastructure/network/netadapter/server/grpcserver/grpc_server.go b/infrastructure/network/netadapter/server/grpcserver/grpc_server.go index c18a47357..cb5f9d25c 100644 --- a/infrastructure/network/netadapter/server/grpcserver/grpc_server.go +++ b/infrastructure/network/netadapter/server/grpcserver/grpc_server.go @@ -94,10 +94,11 @@ func (s *gRPCServer) SetOnConnectedHandler(onConnectedHandler server.OnConnected } func (s *gRPCServer) handleInboundConnection(ctx context.Context, stream grpcStream) error { - err := s.incrementInboundConnectionCountAndLimitIfRequired() + connectionCount, err := s.incrementInboundConnectionCountAndLimitIfRequired() if err != nil { return err } + defer s.decrementInboundConnectionCount() peerInfo, ok := peer.FromContext(ctx) if !ok { @@ -115,23 +116,23 @@ func (s *gRPCServer) handleInboundConnection(ctx context.Context, stream grpcStr return err } - log.Infof("%s Incoming connection from %s", s.name, peerInfo.Addr) + log.Infof("%s Incoming connection from %s #%d", s.name, peerInfo.Addr, connectionCount) <-connection.stopChan - s.decrementInboundConnectionCount() return nil } -func (s *gRPCServer) incrementInboundConnectionCountAndLimitIfRequired() error { +func (s *gRPCServer) incrementInboundConnectionCountAndLimitIfRequired() (int, error) { s.inboundConnectionCountLock.Lock() defer s.inboundConnectionCountLock.Unlock() if s.maxInboundConnections > 0 && s.inboundConnectionCount == s.maxInboundConnections { - return errors.Errorf("limit of %d inbound connections has been exceeded", s.maxInboundConnections) + log.Warnf("Limit of %d inbound connections has been exceeded", s.maxInboundConnections) + return s.inboundConnectionCount, errors.Errorf("limit of %d inbound connections has been exceeded", s.maxInboundConnections) } s.inboundConnectionCount++ - return nil + return s.inboundConnectionCount, nil } func (s *gRPCServer) decrementInboundConnectionCount() { diff --git a/infrastructure/network/netadapter/server/grpcserver/rpcserver.go b/infrastructure/network/netadapter/server/grpcserver/rpcserver.go index 516463956..c80a36c2d 100644 --- a/infrastructure/network/netadapter/server/grpcserver/rpcserver.go +++ b/infrastructure/network/netadapter/server/grpcserver/rpcserver.go @@ -14,12 +14,9 @@ type rpcServer struct { // RPCMaxMessageSize is the max message size for the RPC server to send and receive const RPCMaxMessageSize = 1024 * 1024 * 1024 // 1 GB -// RPCMaxInboundConnections is the max amount of inbound connections for the RPC server -const RPCMaxInboundConnections = 128 - // NewRPCServer creates a new RPCServer -func NewRPCServer(listeningAddresses []string) (server.Server, error) { - gRPCServer := newGRPCServer(listeningAddresses, RPCMaxMessageSize, RPCMaxInboundConnections, "RPC") +func NewRPCServer(listeningAddresses []string, rpcMaxInboundConnections int) (server.Server, error) { + gRPCServer := newGRPCServer(listeningAddresses, RPCMaxMessageSize, rpcMaxInboundConnections, "RPC") rpcServer := &rpcServer{gRPCServer: *gRPCServer} protowire.RegisterRPCServer(gRPCServer.server, rpcServer) return rpcServer, nil diff --git a/testing/integration/rpc_test.go b/testing/integration/rpc_test.go index b0febc0a2..44fb1e40d 100644 --- a/testing/integration/rpc_test.go +++ b/testing/integration/rpc_test.go @@ -1,7 +1,7 @@ package integration import ( - "github.com/kaspanet/kaspad/infrastructure/network/netadapter/server/grpcserver" + "github.com/kaspanet/kaspad/infrastructure/config" "testing" "time" @@ -45,7 +45,7 @@ func TestRPCMaxInboundConnections(t *testing.T) { rpcClients := []*testRPCClient{} doneChan := make(chan error) go func() { - for i := 0; i < grpcserver.RPCMaxInboundConnections; i++ { + for i := 0; i < config.DefaultMaxRPCClients; i++ { rpcClient, err := newTestRPCClient(harness.rpcAddress) if err != nil { doneChan <- err @@ -60,7 +60,7 @@ func TestRPCMaxInboundConnections(t *testing.T) { t.Fatalf("newTestRPCClient: %s", err) } case <-time.After(time.Second * 5): - t.Fatalf("Timeout for connecting %d RPC connections elapsed", grpcserver.RPCMaxInboundConnections) + t.Fatalf("Timeout for connecting %d RPC connections elapsed", config.DefaultMaxRPCClients) } // Try to connect another client. We expect this to fail