diff --git a/cmd/kaspaminer/client.go b/cmd/kaspaminer/client.go index 5cdc36bcd..4bc157d2e 100644 --- a/cmd/kaspaminer/client.go +++ b/cmd/kaspaminer/client.go @@ -5,42 +5,95 @@ import ( "github.com/kaspanet/kaspad/infrastructure/logger" "github.com/kaspanet/kaspad/infrastructure/network/rpcclient" "github.com/pkg/errors" + "sync" + "sync/atomic" "time" ) const minerTimeout = 10 * time.Second type minerClient struct { - *rpcclient.RPCClient + isReconnecting uint32 + clientLock sync.RWMutex + rpcClient *rpcclient.RPCClient + cfg *configFlags blockAddedNotificationChan chan struct{} } -func newMinerClient(cfg *configFlags) (*minerClient, error) { - rpcAddress, err := cfg.NetParams().NormalizeRPCServerAddress(cfg.RPCServer) - if err != nil { - return nil, err - } - rpcClient, err := rpcclient.NewRPCClient(rpcAddress) - if err != nil { - return nil, err - } - rpcClient.SetTimeout(minerTimeout) - rpcClient.SetLogger(backendLog, logger.LevelTrace) +func (mc *minerClient) safeRPCClient() *rpcclient.RPCClient { + mc.clientLock.RLock() + defer mc.clientLock.RUnlock() + return mc.rpcClient +} - minerClient := &minerClient{ - RPCClient: rpcClient, - blockAddedNotificationChan: make(chan struct{}), +func (mc *minerClient) reconnect() { + swapped := atomic.CompareAndSwapUint32(&mc.isReconnecting, 0, 1) + if !swapped { + return } - err = rpcClient.RegisterForBlockAddedNotifications(func(_ *appmessage.BlockAddedNotificationMessage) { + defer atomic.StoreUint32(&mc.isReconnecting, 0) + + mc.clientLock.Lock() + defer mc.clientLock.Unlock() + + retryDuration := time.Second + const maxRetryDuration = time.Minute + log.Infof("Reconnecting RPC connection") + for { + err := mc.connect() + if err == nil { + return + } + + if retryDuration < time.Minute { + retryDuration *= 2 + } else { + retryDuration = maxRetryDuration + } + + log.Errorf("Got error '%s' while reconnecting. Trying again in %s", err, retryDuration) + time.Sleep(retryDuration) + } +} + +func (mc *minerClient) connect() error { + rpcAddress, err := mc.cfg.NetParams().NormalizeRPCServerAddress(mc.cfg.RPCServer) + if err != nil { + return err + } + mc.rpcClient, err = rpcclient.NewRPCClient(rpcAddress) + if err != nil { + return err + } + mc.rpcClient.SetTimeout(minerTimeout) + mc.rpcClient.SetLogger(backendLog, logger.LevelTrace) + + err = mc.rpcClient.RegisterForBlockAddedNotifications(func(_ *appmessage.BlockAddedNotificationMessage) { select { - case minerClient.blockAddedNotificationChan <- struct{}{}: + case mc.blockAddedNotificationChan <- struct{}{}: default: } }) if err != nil { - return nil, errors.Wrapf(err, "error requesting block-added notifications") + return errors.Wrapf(err, "error requesting block-added notifications") + } + + log.Infof("Connected to %s", rpcAddress) + + return nil +} + +func newMinerClient(cfg *configFlags) (*minerClient, error) { + minerClient := &minerClient{ + cfg: cfg, + blockAddedNotificationChan: make(chan struct{}), + } + + err := minerClient.connect() + if err != nil { + return nil, err } return minerClient, nil diff --git a/cmd/kaspaminer/main.go b/cmd/kaspaminer/main.go index 593451eee..1027ca244 100644 --- a/cmd/kaspaminer/main.go +++ b/cmd/kaspaminer/main.go @@ -39,7 +39,7 @@ func main() { if err != nil { panic(errors.Wrap(err, "error connecting to the RPC server")) } - defer client.Disconnect() + defer client.safeRPCClient().Disconnect() miningAddr, err := util.DecodeAddress(cfg.MiningAddr, cfg.ActiveNetParams.Prefix) if err != nil { diff --git a/cmd/kaspaminer/mineloop.go b/cmd/kaspaminer/mineloop.go index f449f4481..9dca1cd98 100644 --- a/cmd/kaspaminer/mineloop.go +++ b/cmd/kaspaminer/mineloop.go @@ -113,12 +113,13 @@ func logHashRate() { func handleFoundBlock(client *minerClient, block *externalapi.DomainBlock) error { blockHash := consensushashing.BlockHash(block) - log.Infof("Submitting block %s to %s", blockHash, client.Address()) + log.Infof("Submitting block %s to %s", blockHash, client.safeRPCClient().Address()) - rejectReason, err := client.SubmitBlock(block) + rejectReason, err := client.safeRPCClient().SubmitBlock(block) if err != nil { if nativeerrors.Is(err, router.ErrTimeout) { - log.Warnf("Got timeout while submitting block %s to %s: %s", blockHash, client.Address(), err) + log.Warnf("Got timeout while submitting block %s to %s: %s", blockHash, client.safeRPCClient().Address(), err) + client.reconnect() return nil } if rejectReason == appmessage.RejectReasonIsInIBD { @@ -127,7 +128,7 @@ func handleFoundBlock(client *minerClient, block *externalapi.DomainBlock) error time.Sleep(waitTime) return nil } - return errors.Errorf("Error submitting block %s to %s: %s", blockHash, client.Address(), err) + return errors.Wrapf(err, "Error submitting block %s to %s", blockHash, client.safeRPCClient().Address()) } return nil } @@ -186,13 +187,14 @@ func getBlockForMining(mineWhenNotSynced bool) *externalapi.DomainBlock { func templatesLoop(client *minerClient, miningAddr util.Address, errChan chan error) { getBlockTemplate := func() { - template, err := client.GetBlockTemplate(miningAddr.String()) + template, err := client.safeRPCClient().GetBlockTemplate(miningAddr.String()) if nativeerrors.Is(err, router.ErrTimeout) { - log.Warnf("Got timeout while requesting block template from %s: %s", client.Address(), err) + log.Warnf("Got timeout while requesting block template from %s: %s", client.safeRPCClient().Address(), err) + client.reconnect() return } if err != nil { - errChan <- errors.Errorf("Error getting block template from %s: %s", client.Address(), err) + errChan <- errors.Wrapf(err, "Error getting block template from %s", client.safeRPCClient().Address()) return } templatemanager.Set(template)