Fixes an RPC client goroutine leak by properly closing the underlying connection

This commit is contained in:
msutton 2022-08-09 13:37:27 +03:00
parent d2dc236f07
commit aff0ac0d7d
2 changed files with 11 additions and 2 deletions

View File

@ -22,6 +22,7 @@ type OnDisconnectedHandler func()
// GRPCClient is a gRPC-based RPC client // GRPCClient is a gRPC-based RPC client
type GRPCClient struct { type GRPCClient struct {
stream protowire.RPC_MessageStreamClient stream protowire.RPC_MessageStreamClient
connection *grpc.ClientConn
onErrorHandler OnErrorHandler onErrorHandler OnErrorHandler
onDisconnectedHandler OnDisconnectedHandler onDisconnectedHandler OnDisconnectedHandler
} }
@ -43,7 +44,12 @@ func Connect(address string) (*GRPCClient, error) {
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "error getting client stream for %s", address) return nil, errors.Wrapf(err, "error getting client stream for %s", address)
} }
return &GRPCClient{stream: stream}, nil return &GRPCClient{stream: stream, connection: gRPCConnection}, nil
}
// Close closes the underlying grpc connection
func (c *GRPCClient) Close() error {
return c.connection.Close()
} }
// Disconnect disconnects from the RPC server // Disconnect disconnects from the RPC server

View File

@ -143,6 +143,9 @@ func (c *RPCClient) handleClientDisconnected() {
} }
func (c *RPCClient) handleClientError(err error) { func (c *RPCClient) handleClientError(err error) {
if atomic.LoadUint32(&c.isClosed) == 1 {
return
}
log.Warnf("Received error from client: %s", err) log.Warnf("Received error from client: %s", err)
c.handleClientDisconnected() c.handleClientDisconnected()
} }
@ -159,7 +162,7 @@ func (c *RPCClient) Close() error {
return errors.Errorf("Cannot close a client that had already been closed") return errors.Errorf("Cannot close a client that had already been closed")
} }
c.rpcRouter.router.Close() c.rpcRouter.router.Close()
return nil return c.GRPCClient.Close()
} }
// Address returns the address the RPC client connected to // Address returns the address the RPC client connected to