mirror of
https://github.com/kaspanet/kaspad.git
synced 2025-03-30 15:08:33 +00:00
Fix RPC connections counting (#2026)
* Fix RPC connections counting * show incomming connections count * Use the flag RPCMaxClients instead of the const RPCMaxInboundConnections * Add grpc server name to log message Co-authored-by: Michael Sutton <mikisiton2@gmail.com>
This commit is contained in:
parent
c87e541570
commit
847aafc91f
@ -38,8 +38,9 @@ const (
|
||||
defaultBanDuration = time.Hour * 24
|
||||
defaultBanThreshold = 100
|
||||
//DefaultConnectTimeout is the default connection timeout when dialing
|
||||
DefaultConnectTimeout = time.Second * 30
|
||||
defaultMaxRPCClients = 10
|
||||
DefaultConnectTimeout = time.Second * 30
|
||||
//DefaultMaxRPCClients is the default max number of RPC clients
|
||||
DefaultMaxRPCClients = 128
|
||||
defaultMaxRPCWebsockets = 25
|
||||
defaultMaxRPCConcurrentReqs = 20
|
||||
defaultBlockMaxMass = 10_000_000
|
||||
@ -178,7 +179,7 @@ func defaultFlags() *Flags {
|
||||
MaxInboundPeers: defaultMaxInboundPeers,
|
||||
BanDuration: defaultBanDuration,
|
||||
BanThreshold: defaultBanThreshold,
|
||||
RPCMaxClients: defaultMaxRPCClients,
|
||||
RPCMaxClients: DefaultMaxRPCClients,
|
||||
RPCMaxWebsockets: defaultMaxRPCWebsockets,
|
||||
RPCMaxConcurrentReqs: defaultMaxRPCConcurrentReqs,
|
||||
AppDir: defaultDataDir,
|
||||
|
@ -46,7 +46,7 @@ func NewNetAdapter(cfg *config.Config) (*NetAdapter, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rpcServer, err := grpcserver.NewRPCServer(cfg.RPCListeners)
|
||||
rpcServer, err := grpcserver.NewRPCServer(cfg.RPCListeners, cfg.RPCMaxClients)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -94,10 +94,11 @@ func (s *gRPCServer) SetOnConnectedHandler(onConnectedHandler server.OnConnected
|
||||
}
|
||||
|
||||
func (s *gRPCServer) handleInboundConnection(ctx context.Context, stream grpcStream) error {
|
||||
err := s.incrementInboundConnectionCountAndLimitIfRequired()
|
||||
connectionCount, err := s.incrementInboundConnectionCountAndLimitIfRequired()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer s.decrementInboundConnectionCount()
|
||||
|
||||
peerInfo, ok := peer.FromContext(ctx)
|
||||
if !ok {
|
||||
@ -115,23 +116,23 @@ func (s *gRPCServer) handleInboundConnection(ctx context.Context, stream grpcStr
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("%s Incoming connection from %s", s.name, peerInfo.Addr)
|
||||
log.Infof("%s Incoming connection from %s #%d", s.name, peerInfo.Addr, connectionCount)
|
||||
|
||||
<-connection.stopChan
|
||||
s.decrementInboundConnectionCount()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *gRPCServer) incrementInboundConnectionCountAndLimitIfRequired() error {
|
||||
func (s *gRPCServer) incrementInboundConnectionCountAndLimitIfRequired() (int, error) {
|
||||
s.inboundConnectionCountLock.Lock()
|
||||
defer s.inboundConnectionCountLock.Unlock()
|
||||
|
||||
if s.maxInboundConnections > 0 && s.inboundConnectionCount == s.maxInboundConnections {
|
||||
return errors.Errorf("limit of %d inbound connections has been exceeded", s.maxInboundConnections)
|
||||
log.Warnf("Limit of %d %s inbound connections has been exceeded", s.maxInboundConnections, s.name)
|
||||
return s.inboundConnectionCount, errors.Errorf("limit of %d %s inbound connections has been exceeded", s.maxInboundConnections, s.name)
|
||||
}
|
||||
|
||||
s.inboundConnectionCount++
|
||||
return nil
|
||||
return s.inboundConnectionCount, nil
|
||||
}
|
||||
|
||||
func (s *gRPCServer) decrementInboundConnectionCount() {
|
||||
|
@ -14,12 +14,9 @@ type rpcServer struct {
|
||||
// RPCMaxMessageSize is the max message size for the RPC server to send and receive
|
||||
const RPCMaxMessageSize = 1024 * 1024 * 1024 // 1 GB
|
||||
|
||||
// RPCMaxInboundConnections is the max amount of inbound connections for the RPC server
|
||||
const RPCMaxInboundConnections = 128
|
||||
|
||||
// NewRPCServer creates a new RPCServer
|
||||
func NewRPCServer(listeningAddresses []string) (server.Server, error) {
|
||||
gRPCServer := newGRPCServer(listeningAddresses, RPCMaxMessageSize, RPCMaxInboundConnections, "RPC")
|
||||
func NewRPCServer(listeningAddresses []string, rpcMaxInboundConnections int) (server.Server, error) {
|
||||
gRPCServer := newGRPCServer(listeningAddresses, RPCMaxMessageSize, rpcMaxInboundConnections, "RPC")
|
||||
rpcServer := &rpcServer{gRPCServer: *gRPCServer}
|
||||
protowire.RegisterRPCServer(gRPCServer.server, rpcServer)
|
||||
return rpcServer, nil
|
||||
|
@ -1,7 +1,7 @@
|
||||
package integration
|
||||
|
||||
import (
|
||||
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/server/grpcserver"
|
||||
"github.com/kaspanet/kaspad/infrastructure/config"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -45,7 +45,7 @@ func TestRPCMaxInboundConnections(t *testing.T) {
|
||||
rpcClients := []*testRPCClient{}
|
||||
doneChan := make(chan error)
|
||||
go func() {
|
||||
for i := 0; i < grpcserver.RPCMaxInboundConnections; i++ {
|
||||
for i := 0; i < config.DefaultMaxRPCClients; i++ {
|
||||
rpcClient, err := newTestRPCClient(harness.rpcAddress)
|
||||
if err != nil {
|
||||
doneChan <- err
|
||||
@ -60,7 +60,7 @@ func TestRPCMaxInboundConnections(t *testing.T) {
|
||||
t.Fatalf("newTestRPCClient: %s", err)
|
||||
}
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("Timeout for connecting %d RPC connections elapsed", grpcserver.RPCMaxInboundConnections)
|
||||
t.Fatalf("Timeout for connecting %d RPC connections elapsed", config.DefaultMaxRPCClients)
|
||||
}
|
||||
|
||||
// Try to connect another client. We expect this to fail
|
||||
|
Loading…
x
Reference in New Issue
Block a user