Compare commits

...

6 Commits

Author SHA1 Message Date
msutton
a014c9b47e Merge branch 'grpc-connections' into bump-to-v0.12.2 2022-06-15 16:14:08 +03:00
msutton
2ffc08d433 Bump version to v0.12.2 2022-06-15 16:11:47 +03:00
Michael Sutton
1026b02c46
Merge branch 'dev' into grpc-connections 2022-06-15 16:05:22 +03:00
msutton
feb35165da Use the flag RPCMaxClients instead of the const RPCMaxInboundConnections 2022-06-15 14:45:21 +03:00
Michael Sutton
346efb90c5
Merge branch 'dev' into grpc-connections 2022-06-15 14:06:44 +03:00
biospb
8a3d30853c Fix RPC connections counting
* show incomming connections count
2022-04-17 02:16:45 +03:00
6 changed files with 18 additions and 19 deletions

View File

@ -39,7 +39,8 @@ const (
defaultBanThreshold = 100 defaultBanThreshold = 100
//DefaultConnectTimeout is the default connection timeout when dialing //DefaultConnectTimeout is the default connection timeout when dialing
DefaultConnectTimeout = time.Second * 30 DefaultConnectTimeout = time.Second * 30
defaultMaxRPCClients = 10 //DefaultMaxRPCClients is the default max number of RPC clients
DefaultMaxRPCClients = 128
defaultMaxRPCWebsockets = 25 defaultMaxRPCWebsockets = 25
defaultMaxRPCConcurrentReqs = 20 defaultMaxRPCConcurrentReqs = 20
defaultBlockMaxMass = 10_000_000 defaultBlockMaxMass = 10_000_000
@ -178,7 +179,7 @@ func defaultFlags() *Flags {
MaxInboundPeers: defaultMaxInboundPeers, MaxInboundPeers: defaultMaxInboundPeers,
BanDuration: defaultBanDuration, BanDuration: defaultBanDuration,
BanThreshold: defaultBanThreshold, BanThreshold: defaultBanThreshold,
RPCMaxClients: defaultMaxRPCClients, RPCMaxClients: DefaultMaxRPCClients,
RPCMaxWebsockets: defaultMaxRPCWebsockets, RPCMaxWebsockets: defaultMaxRPCWebsockets,
RPCMaxConcurrentReqs: defaultMaxRPCConcurrentReqs, RPCMaxConcurrentReqs: defaultMaxRPCConcurrentReqs,
AppDir: defaultDataDir, AppDir: defaultDataDir,

View File

@ -46,7 +46,7 @@ func NewNetAdapter(cfg *config.Config) (*NetAdapter, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
rpcServer, err := grpcserver.NewRPCServer(cfg.RPCListeners) rpcServer, err := grpcserver.NewRPCServer(cfg.RPCListeners, cfg.RPCMaxClients)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -94,10 +94,11 @@ func (s *gRPCServer) SetOnConnectedHandler(onConnectedHandler server.OnConnected
} }
func (s *gRPCServer) handleInboundConnection(ctx context.Context, stream grpcStream) error { func (s *gRPCServer) handleInboundConnection(ctx context.Context, stream grpcStream) error {
err := s.incrementInboundConnectionCountAndLimitIfRequired() connectionCount, err := s.incrementInboundConnectionCountAndLimitIfRequired()
if err != nil { if err != nil {
return err return err
} }
defer s.decrementInboundConnectionCount()
peerInfo, ok := peer.FromContext(ctx) peerInfo, ok := peer.FromContext(ctx)
if !ok { if !ok {
@ -115,23 +116,23 @@ func (s *gRPCServer) handleInboundConnection(ctx context.Context, stream grpcStr
return err return err
} }
log.Infof("%s Incoming connection from %s", s.name, peerInfo.Addr) log.Infof("%s Incoming connection from %s #%d", s.name, peerInfo.Addr, connectionCount)
<-connection.stopChan <-connection.stopChan
s.decrementInboundConnectionCount()
return nil return nil
} }
func (s *gRPCServer) incrementInboundConnectionCountAndLimitIfRequired() error { func (s *gRPCServer) incrementInboundConnectionCountAndLimitIfRequired() (int, error) {
s.inboundConnectionCountLock.Lock() s.inboundConnectionCountLock.Lock()
defer s.inboundConnectionCountLock.Unlock() defer s.inboundConnectionCountLock.Unlock()
if s.maxInboundConnections > 0 && s.inboundConnectionCount == s.maxInboundConnections { if s.maxInboundConnections > 0 && s.inboundConnectionCount == s.maxInboundConnections {
return errors.Errorf("limit of %d inbound connections has been exceeded", s.maxInboundConnections) log.Warnf("Limit of %d inbound connections has been exceeded", s.maxInboundConnections)
return s.inboundConnectionCount, errors.Errorf("limit of %d inbound connections has been exceeded", s.maxInboundConnections)
} }
s.inboundConnectionCount++ s.inboundConnectionCount++
return nil return s.inboundConnectionCount, nil
} }
func (s *gRPCServer) decrementInboundConnectionCount() { func (s *gRPCServer) decrementInboundConnectionCount() {

View File

@ -14,12 +14,9 @@ type rpcServer struct {
// RPCMaxMessageSize is the max message size for the RPC server to send and receive // RPCMaxMessageSize is the max message size for the RPC server to send and receive
const RPCMaxMessageSize = 1024 * 1024 * 1024 // 1 GB const RPCMaxMessageSize = 1024 * 1024 * 1024 // 1 GB
// RPCMaxInboundConnections is the max amount of inbound connections for the RPC server
const RPCMaxInboundConnections = 128
// NewRPCServer creates a new RPCServer // NewRPCServer creates a new RPCServer
func NewRPCServer(listeningAddresses []string) (server.Server, error) { func NewRPCServer(listeningAddresses []string, rpcMaxInboundConnections int) (server.Server, error) {
gRPCServer := newGRPCServer(listeningAddresses, RPCMaxMessageSize, RPCMaxInboundConnections, "RPC") gRPCServer := newGRPCServer(listeningAddresses, RPCMaxMessageSize, rpcMaxInboundConnections, "RPC")
rpcServer := &rpcServer{gRPCServer: *gRPCServer} rpcServer := &rpcServer{gRPCServer: *gRPCServer}
protowire.RegisterRPCServer(gRPCServer.server, rpcServer) protowire.RegisterRPCServer(gRPCServer.server, rpcServer)
return rpcServer, nil return rpcServer, nil

View File

@ -1,7 +1,7 @@
package integration package integration
import ( import (
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/server/grpcserver" "github.com/kaspanet/kaspad/infrastructure/config"
"testing" "testing"
"time" "time"
@ -45,7 +45,7 @@ func TestRPCMaxInboundConnections(t *testing.T) {
rpcClients := []*testRPCClient{} rpcClients := []*testRPCClient{}
doneChan := make(chan error) doneChan := make(chan error)
go func() { go func() {
for i := 0; i < grpcserver.RPCMaxInboundConnections; i++ { for i := 0; i < config.DefaultMaxRPCClients; i++ {
rpcClient, err := newTestRPCClient(harness.rpcAddress) rpcClient, err := newTestRPCClient(harness.rpcAddress)
if err != nil { if err != nil {
doneChan <- err doneChan <- err
@ -60,7 +60,7 @@ func TestRPCMaxInboundConnections(t *testing.T) {
t.Fatalf("newTestRPCClient: %s", err) t.Fatalf("newTestRPCClient: %s", err)
} }
case <-time.After(time.Second * 5): case <-time.After(time.Second * 5):
t.Fatalf("Timeout for connecting %d RPC connections elapsed", grpcserver.RPCMaxInboundConnections) t.Fatalf("Timeout for connecting %d RPC connections elapsed", config.DefaultMaxRPCClients)
} }
// Try to connect another client. We expect this to fail // Try to connect another client. We expect this to fail

View File

@ -11,7 +11,7 @@ const validCharacters = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrs
const ( const (
appMajor uint = 0 appMajor uint = 0
appMinor uint = 12 appMinor uint = 12
appPatch uint = 1 appPatch uint = 2
) )
// appBuild is defined as a variable so it can be overridden during the build // appBuild is defined as a variable so it can be overridden during the build