diff --git a/cmd/kaspaminer/client.go b/cmd/kaspaminer/client.go index 4bc157d2e..370518886 100644 --- a/cmd/kaspaminer/client.go +++ b/cmd/kaspaminer/client.go @@ -5,72 +5,32 @@ import ( "github.com/kaspanet/kaspad/infrastructure/logger" "github.com/kaspanet/kaspad/infrastructure/network/rpcclient" "github.com/pkg/errors" - "sync" - "sync/atomic" "time" ) const minerTimeout = 10 * time.Second type minerClient struct { - isReconnecting uint32 - clientLock sync.RWMutex - rpcClient *rpcclient.RPCClient + *rpcclient.RPCClient cfg *configFlags blockAddedNotificationChan chan struct{} } -func (mc *minerClient) safeRPCClient() *rpcclient.RPCClient { - mc.clientLock.RLock() - defer mc.clientLock.RUnlock() - return mc.rpcClient -} - -func (mc *minerClient) reconnect() { - swapped := atomic.CompareAndSwapUint32(&mc.isReconnecting, 0, 1) - if !swapped { - return - } - - defer atomic.StoreUint32(&mc.isReconnecting, 0) - - mc.clientLock.Lock() - defer mc.clientLock.Unlock() - - retryDuration := time.Second - const maxRetryDuration = time.Minute - log.Infof("Reconnecting RPC connection") - for { - err := mc.connect() - if err == nil { - return - } - - if retryDuration < time.Minute { - retryDuration *= 2 - } else { - retryDuration = maxRetryDuration - } - - log.Errorf("Got error '%s' while reconnecting. Trying again in %s", err, retryDuration) - time.Sleep(retryDuration) - } -} - func (mc *minerClient) connect() error { rpcAddress, err := mc.cfg.NetParams().NormalizeRPCServerAddress(mc.cfg.RPCServer) if err != nil { return err } - mc.rpcClient, err = rpcclient.NewRPCClient(rpcAddress) + rpcClient, err := rpcclient.NewRPCClient(rpcAddress) if err != nil { return err } - mc.rpcClient.SetTimeout(minerTimeout) - mc.rpcClient.SetLogger(backendLog, logger.LevelTrace) + mc.RPCClient = rpcClient + mc.SetTimeout(minerTimeout) + mc.SetLogger(backendLog, logger.LevelTrace) - err = mc.rpcClient.RegisterForBlockAddedNotifications(func(_ *appmessage.BlockAddedNotificationMessage) { + err = mc.RegisterForBlockAddedNotifications(func(_ *appmessage.BlockAddedNotificationMessage) { select { case mc.blockAddedNotificationChan <- struct{}{}: default: diff --git a/cmd/kaspaminer/main.go b/cmd/kaspaminer/main.go index c7fb0cf9f..00300ef10 100644 --- a/cmd/kaspaminer/main.go +++ b/cmd/kaspaminer/main.go @@ -40,7 +40,7 @@ func main() { if err != nil { panic(errors.Wrap(err, "error connecting to the RPC server")) } - defer client.safeRPCClient().Disconnect() + defer client.Disconnect() miningAddr, err := util.DecodeAddress(cfg.MiningAddr, cfg.ActiveNetParams.Prefix) if err != nil { diff --git a/cmd/kaspaminer/mineloop.go b/cmd/kaspaminer/mineloop.go index a15e80c1b..6ece17892 100644 --- a/cmd/kaspaminer/mineloop.go +++ b/cmd/kaspaminer/mineloop.go @@ -114,13 +114,17 @@ func logHashRate() { func handleFoundBlock(client *minerClient, block *externalapi.DomainBlock) error { blockHash := consensushashing.BlockHash(block) - log.Infof("Submitting block %s to %s", blockHash, client.safeRPCClient().Address()) + log.Infof("Submitting block %s to %s", blockHash, client.Address()) - rejectReason, err := client.safeRPCClient().SubmitBlock(block) + rejectReason, err := client.SubmitBlock(block) if err != nil { if nativeerrors.Is(err, router.ErrTimeout) { - log.Warnf("Got timeout while submitting block %s to %s: %s", blockHash, client.safeRPCClient().Address(), err) - client.reconnect() + log.Warnf("Got timeout while submitting block %s to %s: %s", blockHash, client.Address(), err) + return client.Reconnect() + } + if nativeerrors.Is(err, router.ErrRouteClosed) { + log.Debugf("Got route is closed while requesting block template from %s. "+ + "The client is most likely reconnecting", client.Address()) return nil } if rejectReason == appmessage.RejectReasonIsInIBD { @@ -129,7 +133,7 @@ func handleFoundBlock(client *minerClient, block *externalapi.DomainBlock) error time.Sleep(waitTime) return nil } - return errors.Wrapf(err, "Error submitting block %s to %s", blockHash, client.safeRPCClient().Address()) + return errors.Wrapf(err, "Error submitting block %s to %s", blockHash, client.Address()) } return nil } @@ -188,19 +192,27 @@ func getBlockForMining(mineWhenNotSynced bool) *externalapi.DomainBlock { func templatesLoop(client *minerClient, miningAddr util.Address, errChan chan error) { getBlockTemplate := func() { - template, err := client.safeRPCClient().GetBlockTemplate(miningAddr.String()) + template, err := client.GetBlockTemplate(miningAddr.String()) if nativeerrors.Is(err, router.ErrTimeout) { - log.Warnf("Got timeout while requesting block template from %s: %s", client.safeRPCClient().Address(), err) - client.reconnect() + log.Warnf("Got timeout while requesting block template from %s: %s", client.Address(), err) + reconnectErr := client.Reconnect() + if reconnectErr != nil { + errChan <- reconnectErr + } + return + } + if nativeerrors.Is(err, router.ErrRouteClosed) { + log.Debugf("Got route is closed while requesting block template from %s. "+ + "The client is most likely reconnecting", client.Address()) return } if err != nil { - errChan <- errors.Wrapf(err, "Error getting block template from %s", client.safeRPCClient().Address()) + errChan <- errors.Wrapf(err, "Error getting block template from %s", client.Address()) return } err = templatemanager.Set(template) if err != nil { - errChan <- errors.Wrapf(err, "Error setting block template from %s", client.safeRPCClient().Address()) + errChan <- errors.Wrapf(err, "Error setting block template from %s", client.Address()) return } } diff --git a/infrastructure/network/rpcclient/grpcclient/grpcclient.go b/infrastructure/network/rpcclient/grpcclient/grpcclient.go index 1ded402ee..e15731aa3 100644 --- a/infrastructure/network/rpcclient/grpcclient/grpcclient.go +++ b/infrastructure/network/rpcclient/grpcclient/grpcclient.go @@ -16,10 +16,14 @@ import ( // OnErrorHandler defines a handler function for when errors occur type OnErrorHandler func(err error) +// OnDisconnectedHandler defines a handler function for when the client disconnected +type OnDisconnectedHandler func() + // GRPCClient is a gRPC-based RPC client type GRPCClient struct { - stream protowire.RPC_MessageStreamClient - onErrorHandler OnErrorHandler + stream protowire.RPC_MessageStreamClient + onErrorHandler OnErrorHandler + onDisconnectedHandler OnDisconnectedHandler } // Connect connects to the RPC server with the given address @@ -52,6 +56,11 @@ func (c *GRPCClient) SetOnErrorHandler(onErrorHandler OnErrorHandler) { c.onErrorHandler = onErrorHandler } +// SetOnDisconnectedHandler sets the client's onDisconnectedHandler +func (c *GRPCClient) SetOnDisconnectedHandler(onDisconnectedHandler OnDisconnectedHandler) { + c.onDisconnectedHandler = onDisconnectedHandler +} + // AttachRouter attaches the given router to the client and starts // sending/receiving messages via it func (c *GRPCClient) AttachRouter(router *router.Router) { @@ -103,6 +112,9 @@ func (c *GRPCClient) receive() (appmessage.Message, error) { func (c *GRPCClient) handleError(err error) { if errors.Is(err, io.EOF) { + if c.onDisconnectedHandler != nil { + c.onDisconnectedHandler() + } return } if errors.Is(err, router.ErrRouteClosed) { diff --git a/infrastructure/network/rpcclient/rpcclient.go b/infrastructure/network/rpcclient/rpcclient.go index 3911a8751..8c614017f 100644 --- a/infrastructure/network/rpcclient/rpcclient.go +++ b/infrastructure/network/rpcclient/rpcclient.go @@ -7,6 +7,7 @@ import ( "github.com/kaspanet/kaspad/infrastructure/network/rpcclient/grpcclient" "github.com/kaspanet/kaspad/util/panics" "github.com/pkg/errors" + "sync/atomic" "time" ) @@ -16,32 +17,112 @@ const defaultTimeout = 30 * time.Second type RPCClient struct { *grpcclient.GRPCClient - rpcAddress string - rpcRouter *rpcRouter + rpcAddress string + rpcRouter *rpcRouter + isConnected uint32 + isClosed uint32 + isReconnecting uint32 timeout time.Duration } // NewRPCClient creates a new RPC client func NewRPCClient(rpcAddress string) (*RPCClient, error) { - rpcClient, err := grpcclient.Connect(rpcAddress) - if err != nil { - return nil, errors.Wrapf(err, "error connecting to address %s", rpcAddress) + rpcClient := &RPCClient{ + rpcAddress: rpcAddress, + timeout: defaultTimeout, } + err := rpcClient.connect() + if err != nil { + return nil, err + } + return rpcClient, nil +} + +func (c *RPCClient) connect() error { + rpcClient, err := grpcclient.Connect(c.rpcAddress) + if err != nil { + return errors.Wrapf(err, "error connecting to address %s", c.rpcAddress) + } + rpcClient.SetOnDisconnectedHandler(c.handleClientDisconnected) + rpcClient.SetOnErrorHandler(c.handleClientError) rpcRouter, err := buildRPCRouter() if err != nil { - return nil, errors.Wrapf(err, "error creating the RPC router") + return errors.Wrapf(err, "error creating the RPC router") } + + atomic.StoreUint32(&c.isConnected, 1) rpcClient.AttachRouter(rpcRouter.router) - log.Infof("Connected to server %s", rpcAddress) + c.GRPCClient = rpcClient + c.rpcRouter = rpcRouter - return &RPCClient{ - GRPCClient: rpcClient, - rpcAddress: rpcAddress, - rpcRouter: rpcRouter, - timeout: defaultTimeout, - }, nil + log.Infof("Connected to %s", c.rpcAddress) + return nil +} + +func (c *RPCClient) disconnect() error { + c.rpcRouter.router.Close() + err := c.GRPCClient.Disconnect() + if err != nil { + return err + } + log.Infof("Disconnected from %s", c.rpcAddress) + return nil +} + +// Reconnect forces the client to attempt to reconnect to the address +// this client initially was connected to +func (c *RPCClient) Reconnect() error { + if atomic.LoadUint32(&c.isClosed) == 1 { + return errors.Errorf("Cannot reconnect from a closed client") + } + + // Protect against multiple threads attempting to reconnect at the same time + swapped := atomic.CompareAndSwapUint32(&c.isReconnecting, 0, 1) + if !swapped { + // Already reconnecting + return nil + } + defer atomic.StoreUint32(&c.isReconnecting, 0) + + log.Warnf("Attempting to reconnect to %s", c.rpcAddress) + + // Disconnect if we're connected + if atomic.LoadUint32(&c.isConnected) == 1 { + err := c.disconnect() + if err != nil { + return err + } + } + + // Attempt to connect until we succeed + for { + err := c.connect() + if err == nil { + return nil + } + log.Warnf("Could not automatically reconnect to %s: %s", c.rpcAddress, err) + + const retryDelay = 10 * time.Second + log.Warnf("Retrying in %s", retryDelay) + time.Sleep(retryDelay) + } +} + +func (c *RPCClient) handleClientDisconnected() { + atomic.StoreUint32(&c.isConnected, 0) + if atomic.LoadUint32(&c.isClosed) == 0 { + err := c.Reconnect() + if err != nil { + panic(err) + } + } +} + +func (c *RPCClient) handleClientError(err error) { + log.Warnf("Received error from client: %s", err) + c.handleClientDisconnected() } // SetTimeout sets the timeout by which to wait for RPC responses @@ -50,8 +131,13 @@ func (c *RPCClient) SetTimeout(timeout time.Duration) { } // Close closes the RPC client -func (c *RPCClient) Close() { +func (c *RPCClient) Close() error { + swapped := atomic.CompareAndSwapUint32(&c.isClosed, 0, 1) + if !swapped { + return errors.Errorf("Cannot close a client that had already been closed") + } c.rpcRouter.router.Close() + return nil } // Address returns the address the RPC client connected to