Fix RPC client memory/goroutine leak (#2122)

* Showcase the RPC client memory leak

* Fixes an RPC client goroutine leak by properly closing the underlying connection
This commit is contained in:
Michael Sutton 2022-08-09 16:30:24 +03:00 committed by GitHub
parent 715cb3b1ac
commit 9ee409afaa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 43 additions and 2 deletions

View File

@ -22,6 +22,7 @@ type OnDisconnectedHandler func()
// GRPCClient is a gRPC-based RPC client
type GRPCClient struct {
stream protowire.RPC_MessageStreamClient
connection *grpc.ClientConn
onErrorHandler OnErrorHandler
onDisconnectedHandler OnDisconnectedHandler
}
@ -43,7 +44,12 @@ func Connect(address string) (*GRPCClient, error) {
if err != nil {
return nil, errors.Wrapf(err, "error getting client stream for %s", address)
}
return &GRPCClient{stream: stream}, nil
return &GRPCClient{stream: stream, connection: gRPCConnection}, nil
}
// Close closes the underlying grpc connection
func (c *GRPCClient) Close() error {
return c.connection.Close()
}
// Disconnect disconnects from the RPC server

View File

@ -143,6 +143,9 @@ func (c *RPCClient) handleClientDisconnected() {
}
func (c *RPCClient) handleClientError(err error) {
if atomic.LoadUint32(&c.isClosed) == 1 {
return
}
log.Warnf("Received error from client: %s", err)
c.handleClientDisconnected()
}
@ -159,7 +162,7 @@ func (c *RPCClient) Close() error {
return errors.Errorf("Cannot close a client that had already been closed")
}
c.rpcRouter.router.Close()
return nil
return c.GRPCClient.Close()
}
// Address returns the address the RPC client connected to

View File

@ -2,6 +2,7 @@ package integration
import (
"github.com/kaspanet/kaspad/infrastructure/config"
"runtime"
"testing"
"time"
@ -26,6 +27,37 @@ func newTestRPCClient(rpcAddress string) (*testRPCClient, error) {
}, nil
}
func connectAndClose(rpcAddress string) error {
client, err := rpcclient.NewRPCClient(rpcAddress)
if err != nil {
return err
}
defer client.Close()
return nil
}
func TestRPCClientGoroutineLeak(t *testing.T) {
_, teardown := setupHarness(t, &harnessParams{
p2pAddress: p2pAddress1,
rpcAddress: rpcAddress1,
miningAddress: miningAddress1,
miningAddressPrivateKey: miningAddress1PrivateKey,
})
defer teardown()
numGoroutinesBefore := runtime.NumGoroutine()
for i := 1; i < 100; i++ {
err := connectAndClose(rpcAddress1)
if err != nil {
t.Fatalf("Failed to set up an RPC client: %s", err)
}
time.Sleep(10 * time.Millisecond)
if runtime.NumGoroutine() > numGoroutinesBefore+10 {
t.Fatalf("Number of goroutines is increasing for each RPC client open (%d -> %d), which indicates a memory leak",
numGoroutinesBefore, runtime.NumGoroutine())
}
}
}
func TestRPCMaxInboundConnections(t *testing.T) {
harness, teardown := setupHarness(t, &harnessParams{
p2pAddress: p2pAddress1,