Implement reconnection logic within the RPC client (#1643)

* Add a reconnect mechanism to RPCClient.

* Fix Reconnect().

* Connect the internal reconnection logic to the miner reconnection logic.

* Rename shouldReconnect to isClosed.

* Move safe reconnection logic from the miner to rpcclient.

* Remove sleep from HandleSubmitBlock.

* Properly handle client errors and only disconnect if we're already connected.

* Make go lint happy.

Co-authored-by: Ori Newman <orinewman1@gmail.com>
This commit is contained in:
stasatdaglabs 2021-04-05 13:57:28 +03:00 committed by GitHub
parent 86ba80a091
commit 7ad8ce521c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 143 additions and 73 deletions

View File

@ -5,72 +5,32 @@ import (
"github.com/kaspanet/kaspad/infrastructure/logger" "github.com/kaspanet/kaspad/infrastructure/logger"
"github.com/kaspanet/kaspad/infrastructure/network/rpcclient" "github.com/kaspanet/kaspad/infrastructure/network/rpcclient"
"github.com/pkg/errors" "github.com/pkg/errors"
"sync"
"sync/atomic"
"time" "time"
) )
const minerTimeout = 10 * time.Second const minerTimeout = 10 * time.Second
type minerClient struct { type minerClient struct {
isReconnecting uint32 *rpcclient.RPCClient
clientLock sync.RWMutex
rpcClient *rpcclient.RPCClient
cfg *configFlags cfg *configFlags
blockAddedNotificationChan chan struct{} blockAddedNotificationChan chan struct{}
} }
func (mc *minerClient) safeRPCClient() *rpcclient.RPCClient {
mc.clientLock.RLock()
defer mc.clientLock.RUnlock()
return mc.rpcClient
}
func (mc *minerClient) reconnect() {
swapped := atomic.CompareAndSwapUint32(&mc.isReconnecting, 0, 1)
if !swapped {
return
}
defer atomic.StoreUint32(&mc.isReconnecting, 0)
mc.clientLock.Lock()
defer mc.clientLock.Unlock()
retryDuration := time.Second
const maxRetryDuration = time.Minute
log.Infof("Reconnecting RPC connection")
for {
err := mc.connect()
if err == nil {
return
}
if retryDuration < time.Minute {
retryDuration *= 2
} else {
retryDuration = maxRetryDuration
}
log.Errorf("Got error '%s' while reconnecting. Trying again in %s", err, retryDuration)
time.Sleep(retryDuration)
}
}
func (mc *minerClient) connect() error { func (mc *minerClient) connect() error {
rpcAddress, err := mc.cfg.NetParams().NormalizeRPCServerAddress(mc.cfg.RPCServer) rpcAddress, err := mc.cfg.NetParams().NormalizeRPCServerAddress(mc.cfg.RPCServer)
if err != nil { if err != nil {
return err return err
} }
mc.rpcClient, err = rpcclient.NewRPCClient(rpcAddress) rpcClient, err := rpcclient.NewRPCClient(rpcAddress)
if err != nil { if err != nil {
return err return err
} }
mc.rpcClient.SetTimeout(minerTimeout) mc.RPCClient = rpcClient
mc.rpcClient.SetLogger(backendLog, logger.LevelTrace) mc.SetTimeout(minerTimeout)
mc.SetLogger(backendLog, logger.LevelTrace)
err = mc.rpcClient.RegisterForBlockAddedNotifications(func(_ *appmessage.BlockAddedNotificationMessage) { err = mc.RegisterForBlockAddedNotifications(func(_ *appmessage.BlockAddedNotificationMessage) {
select { select {
case mc.blockAddedNotificationChan <- struct{}{}: case mc.blockAddedNotificationChan <- struct{}{}:
default: default:

View File

@ -40,7 +40,7 @@ func main() {
if err != nil { if err != nil {
panic(errors.Wrap(err, "error connecting to the RPC server")) panic(errors.Wrap(err, "error connecting to the RPC server"))
} }
defer client.safeRPCClient().Disconnect() defer client.Disconnect()
miningAddr, err := util.DecodeAddress(cfg.MiningAddr, cfg.ActiveNetParams.Prefix) miningAddr, err := util.DecodeAddress(cfg.MiningAddr, cfg.ActiveNetParams.Prefix)
if err != nil { if err != nil {

View File

@ -114,13 +114,17 @@ func logHashRate() {
func handleFoundBlock(client *minerClient, block *externalapi.DomainBlock) error { func handleFoundBlock(client *minerClient, block *externalapi.DomainBlock) error {
blockHash := consensushashing.BlockHash(block) blockHash := consensushashing.BlockHash(block)
log.Infof("Submitting block %s to %s", blockHash, client.safeRPCClient().Address()) log.Infof("Submitting block %s to %s", blockHash, client.Address())
rejectReason, err := client.safeRPCClient().SubmitBlock(block) rejectReason, err := client.SubmitBlock(block)
if err != nil { if err != nil {
if nativeerrors.Is(err, router.ErrTimeout) { if nativeerrors.Is(err, router.ErrTimeout) {
log.Warnf("Got timeout while submitting block %s to %s: %s", blockHash, client.safeRPCClient().Address(), err) log.Warnf("Got timeout while submitting block %s to %s: %s", blockHash, client.Address(), err)
client.reconnect() return client.Reconnect()
}
if nativeerrors.Is(err, router.ErrRouteClosed) {
log.Debugf("Got route is closed while requesting block template from %s. "+
"The client is most likely reconnecting", client.Address())
return nil return nil
} }
if rejectReason == appmessage.RejectReasonIsInIBD { if rejectReason == appmessage.RejectReasonIsInIBD {
@ -129,7 +133,7 @@ func handleFoundBlock(client *minerClient, block *externalapi.DomainBlock) error
time.Sleep(waitTime) time.Sleep(waitTime)
return nil return nil
} }
return errors.Wrapf(err, "Error submitting block %s to %s", blockHash, client.safeRPCClient().Address()) return errors.Wrapf(err, "Error submitting block %s to %s", blockHash, client.Address())
} }
return nil return nil
} }
@ -188,19 +192,27 @@ func getBlockForMining(mineWhenNotSynced bool) *externalapi.DomainBlock {
func templatesLoop(client *minerClient, miningAddr util.Address, errChan chan error) { func templatesLoop(client *minerClient, miningAddr util.Address, errChan chan error) {
getBlockTemplate := func() { getBlockTemplate := func() {
template, err := client.safeRPCClient().GetBlockTemplate(miningAddr.String()) template, err := client.GetBlockTemplate(miningAddr.String())
if nativeerrors.Is(err, router.ErrTimeout) { if nativeerrors.Is(err, router.ErrTimeout) {
log.Warnf("Got timeout while requesting block template from %s: %s", client.safeRPCClient().Address(), err) log.Warnf("Got timeout while requesting block template from %s: %s", client.Address(), err)
client.reconnect() reconnectErr := client.Reconnect()
if reconnectErr != nil {
errChan <- reconnectErr
}
return
}
if nativeerrors.Is(err, router.ErrRouteClosed) {
log.Debugf("Got route is closed while requesting block template from %s. "+
"The client is most likely reconnecting", client.Address())
return return
} }
if err != nil { if err != nil {
errChan <- errors.Wrapf(err, "Error getting block template from %s", client.safeRPCClient().Address()) errChan <- errors.Wrapf(err, "Error getting block template from %s", client.Address())
return return
} }
err = templatemanager.Set(template) err = templatemanager.Set(template)
if err != nil { if err != nil {
errChan <- errors.Wrapf(err, "Error setting block template from %s", client.safeRPCClient().Address()) errChan <- errors.Wrapf(err, "Error setting block template from %s", client.Address())
return return
} }
} }

View File

@ -16,10 +16,14 @@ import (
// OnErrorHandler defines a handler function for when errors occur // OnErrorHandler defines a handler function for when errors occur
type OnErrorHandler func(err error) type OnErrorHandler func(err error)
// OnDisconnectedHandler defines a handler function for when the client disconnected
type OnDisconnectedHandler func()
// GRPCClient is a gRPC-based RPC client // GRPCClient is a gRPC-based RPC client
type GRPCClient struct { type GRPCClient struct {
stream protowire.RPC_MessageStreamClient stream protowire.RPC_MessageStreamClient
onErrorHandler OnErrorHandler onErrorHandler OnErrorHandler
onDisconnectedHandler OnDisconnectedHandler
} }
// Connect connects to the RPC server with the given address // Connect connects to the RPC server with the given address
@ -52,6 +56,11 @@ func (c *GRPCClient) SetOnErrorHandler(onErrorHandler OnErrorHandler) {
c.onErrorHandler = onErrorHandler c.onErrorHandler = onErrorHandler
} }
// SetOnDisconnectedHandler sets the client's onDisconnectedHandler
func (c *GRPCClient) SetOnDisconnectedHandler(onDisconnectedHandler OnDisconnectedHandler) {
c.onDisconnectedHandler = onDisconnectedHandler
}
// AttachRouter attaches the given router to the client and starts // AttachRouter attaches the given router to the client and starts
// sending/receiving messages via it // sending/receiving messages via it
func (c *GRPCClient) AttachRouter(router *router.Router) { func (c *GRPCClient) AttachRouter(router *router.Router) {
@ -103,6 +112,9 @@ func (c *GRPCClient) receive() (appmessage.Message, error) {
func (c *GRPCClient) handleError(err error) { func (c *GRPCClient) handleError(err error) {
if errors.Is(err, io.EOF) { if errors.Is(err, io.EOF) {
if c.onDisconnectedHandler != nil {
c.onDisconnectedHandler()
}
return return
} }
if errors.Is(err, router.ErrRouteClosed) { if errors.Is(err, router.ErrRouteClosed) {

View File

@ -7,6 +7,7 @@ import (
"github.com/kaspanet/kaspad/infrastructure/network/rpcclient/grpcclient" "github.com/kaspanet/kaspad/infrastructure/network/rpcclient/grpcclient"
"github.com/kaspanet/kaspad/util/panics" "github.com/kaspanet/kaspad/util/panics"
"github.com/pkg/errors" "github.com/pkg/errors"
"sync/atomic"
"time" "time"
) )
@ -16,32 +17,112 @@ const defaultTimeout = 30 * time.Second
type RPCClient struct { type RPCClient struct {
*grpcclient.GRPCClient *grpcclient.GRPCClient
rpcAddress string rpcAddress string
rpcRouter *rpcRouter rpcRouter *rpcRouter
isConnected uint32
isClosed uint32
isReconnecting uint32
timeout time.Duration timeout time.Duration
} }
// NewRPCClient creates a new RPC client // NewRPCClient creates a new RPC client
func NewRPCClient(rpcAddress string) (*RPCClient, error) { func NewRPCClient(rpcAddress string) (*RPCClient, error) {
rpcClient, err := grpcclient.Connect(rpcAddress) rpcClient := &RPCClient{
if err != nil { rpcAddress: rpcAddress,
return nil, errors.Wrapf(err, "error connecting to address %s", rpcAddress) timeout: defaultTimeout,
} }
err := rpcClient.connect()
if err != nil {
return nil, err
}
return rpcClient, nil
}
func (c *RPCClient) connect() error {
rpcClient, err := grpcclient.Connect(c.rpcAddress)
if err != nil {
return errors.Wrapf(err, "error connecting to address %s", c.rpcAddress)
}
rpcClient.SetOnDisconnectedHandler(c.handleClientDisconnected)
rpcClient.SetOnErrorHandler(c.handleClientError)
rpcRouter, err := buildRPCRouter() rpcRouter, err := buildRPCRouter()
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "error creating the RPC router") return errors.Wrapf(err, "error creating the RPC router")
} }
atomic.StoreUint32(&c.isConnected, 1)
rpcClient.AttachRouter(rpcRouter.router) rpcClient.AttachRouter(rpcRouter.router)
log.Infof("Connected to server %s", rpcAddress) c.GRPCClient = rpcClient
c.rpcRouter = rpcRouter
return &RPCClient{ log.Infof("Connected to %s", c.rpcAddress)
GRPCClient: rpcClient, return nil
rpcAddress: rpcAddress, }
rpcRouter: rpcRouter,
timeout: defaultTimeout, func (c *RPCClient) disconnect() error {
}, nil c.rpcRouter.router.Close()
err := c.GRPCClient.Disconnect()
if err != nil {
return err
}
log.Infof("Disconnected from %s", c.rpcAddress)
return nil
}
// Reconnect forces the client to attempt to reconnect to the address
// this client initially was connected to
func (c *RPCClient) Reconnect() error {
if atomic.LoadUint32(&c.isClosed) == 1 {
return errors.Errorf("Cannot reconnect from a closed client")
}
// Protect against multiple threads attempting to reconnect at the same time
swapped := atomic.CompareAndSwapUint32(&c.isReconnecting, 0, 1)
if !swapped {
// Already reconnecting
return nil
}
defer atomic.StoreUint32(&c.isReconnecting, 0)
log.Warnf("Attempting to reconnect to %s", c.rpcAddress)
// Disconnect if we're connected
if atomic.LoadUint32(&c.isConnected) == 1 {
err := c.disconnect()
if err != nil {
return err
}
}
// Attempt to connect until we succeed
for {
err := c.connect()
if err == nil {
return nil
}
log.Warnf("Could not automatically reconnect to %s: %s", c.rpcAddress, err)
const retryDelay = 10 * time.Second
log.Warnf("Retrying in %s", retryDelay)
time.Sleep(retryDelay)
}
}
func (c *RPCClient) handleClientDisconnected() {
atomic.StoreUint32(&c.isConnected, 0)
if atomic.LoadUint32(&c.isClosed) == 0 {
err := c.Reconnect()
if err != nil {
panic(err)
}
}
}
func (c *RPCClient) handleClientError(err error) {
log.Warnf("Received error from client: %s", err)
c.handleClientDisconnected()
} }
// SetTimeout sets the timeout by which to wait for RPC responses // SetTimeout sets the timeout by which to wait for RPC responses
@ -50,8 +131,13 @@ func (c *RPCClient) SetTimeout(timeout time.Duration) {
} }
// Close closes the RPC client // Close closes the RPC client
func (c *RPCClient) Close() { func (c *RPCClient) Close() error {
swapped := atomic.CompareAndSwapUint32(&c.isClosed, 0, 1)
if !swapped {
return errors.Errorf("Cannot close a client that had already been closed")
}
c.rpcRouter.router.Close() c.rpcRouter.router.Close()
return nil
} }
// Address returns the address the RPC client connected to // Address returns the address the RPC client connected to