Add RPC reconnection to the miner (#1552)

* Add RPC reconnection to the miner

* Fix wrapf

* Change logs
This commit is contained in:
Ori Newman 2021-02-24 10:25:13 +02:00 committed by GitHub
parent fb6c9c8f21
commit 581a12db96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 81 additions and 26 deletions

View File

@ -5,42 +5,95 @@ import (
"github.com/kaspanet/kaspad/infrastructure/logger"
"github.com/kaspanet/kaspad/infrastructure/network/rpcclient"
"github.com/pkg/errors"
"sync"
"sync/atomic"
"time"
)
const minerTimeout = 10 * time.Second
type minerClient struct {
*rpcclient.RPCClient
isReconnecting uint32
clientLock sync.RWMutex
rpcClient *rpcclient.RPCClient
cfg *configFlags
blockAddedNotificationChan chan struct{}
}
func newMinerClient(cfg *configFlags) (*minerClient, error) {
rpcAddress, err := cfg.NetParams().NormalizeRPCServerAddress(cfg.RPCServer)
if err != nil {
return nil, err
}
rpcClient, err := rpcclient.NewRPCClient(rpcAddress)
if err != nil {
return nil, err
}
rpcClient.SetTimeout(minerTimeout)
rpcClient.SetLogger(backendLog, logger.LevelTrace)
minerClient := &minerClient{
RPCClient: rpcClient,
blockAddedNotificationChan: make(chan struct{}),
func (mc *minerClient) safeRPCClient() *rpcclient.RPCClient {
mc.clientLock.RLock()
defer mc.clientLock.RUnlock()
return mc.rpcClient
}
err = rpcClient.RegisterForBlockAddedNotifications(func(_ *appmessage.BlockAddedNotificationMessage) {
func (mc *minerClient) reconnect() {
swapped := atomic.CompareAndSwapUint32(&mc.isReconnecting, 0, 1)
if !swapped {
return
}
defer atomic.StoreUint32(&mc.isReconnecting, 0)
mc.clientLock.Lock()
defer mc.clientLock.Unlock()
retryDuration := time.Second
const maxRetryDuration = time.Minute
log.Infof("Reconnecting RPC connection")
for {
err := mc.connect()
if err == nil {
return
}
if retryDuration < time.Minute {
retryDuration *= 2
} else {
retryDuration = maxRetryDuration
}
log.Errorf("Got error '%s' while reconnecting. Trying again in %s", err, retryDuration)
time.Sleep(retryDuration)
}
}
func (mc *minerClient) connect() error {
rpcAddress, err := mc.cfg.NetParams().NormalizeRPCServerAddress(mc.cfg.RPCServer)
if err != nil {
return err
}
mc.rpcClient, err = rpcclient.NewRPCClient(rpcAddress)
if err != nil {
return err
}
mc.rpcClient.SetTimeout(minerTimeout)
mc.rpcClient.SetLogger(backendLog, logger.LevelTrace)
err = mc.rpcClient.RegisterForBlockAddedNotifications(func(_ *appmessage.BlockAddedNotificationMessage) {
select {
case minerClient.blockAddedNotificationChan <- struct{}{}:
case mc.blockAddedNotificationChan <- struct{}{}:
default:
}
})
if err != nil {
return nil, errors.Wrapf(err, "error requesting block-added notifications")
return errors.Wrapf(err, "error requesting block-added notifications")
}
log.Infof("Connected to %s", rpcAddress)
return nil
}
func newMinerClient(cfg *configFlags) (*minerClient, error) {
minerClient := &minerClient{
cfg: cfg,
blockAddedNotificationChan: make(chan struct{}),
}
err := minerClient.connect()
if err != nil {
return nil, err
}
return minerClient, nil

View File

@ -39,7 +39,7 @@ func main() {
if err != nil {
panic(errors.Wrap(err, "error connecting to the RPC server"))
}
defer client.Disconnect()
defer client.safeRPCClient().Disconnect()
miningAddr, err := util.DecodeAddress(cfg.MiningAddr, cfg.ActiveNetParams.Prefix)
if err != nil {

View File

@ -113,12 +113,13 @@ func logHashRate() {
func handleFoundBlock(client *minerClient, block *externalapi.DomainBlock) error {
blockHash := consensushashing.BlockHash(block)
log.Infof("Submitting block %s to %s", blockHash, client.Address())
log.Infof("Submitting block %s to %s", blockHash, client.safeRPCClient().Address())
rejectReason, err := client.SubmitBlock(block)
rejectReason, err := client.safeRPCClient().SubmitBlock(block)
if err != nil {
if nativeerrors.Is(err, router.ErrTimeout) {
log.Warnf("Got timeout while submitting block %s to %s: %s", blockHash, client.Address(), err)
log.Warnf("Got timeout while submitting block %s to %s: %s", blockHash, client.safeRPCClient().Address(), err)
client.reconnect()
return nil
}
if rejectReason == appmessage.RejectReasonIsInIBD {
@ -127,7 +128,7 @@ func handleFoundBlock(client *minerClient, block *externalapi.DomainBlock) error
time.Sleep(waitTime)
return nil
}
return errors.Errorf("Error submitting block %s to %s: %s", blockHash, client.Address(), err)
return errors.Wrapf(err, "Error submitting block %s to %s", blockHash, client.safeRPCClient().Address())
}
return nil
}
@ -186,13 +187,14 @@ func getBlockForMining(mineWhenNotSynced bool) *externalapi.DomainBlock {
func templatesLoop(client *minerClient, miningAddr util.Address, errChan chan error) {
getBlockTemplate := func() {
template, err := client.GetBlockTemplate(miningAddr.String())
template, err := client.safeRPCClient().GetBlockTemplate(miningAddr.String())
if nativeerrors.Is(err, router.ErrTimeout) {
log.Warnf("Got timeout while requesting block template from %s: %s", client.Address(), err)
log.Warnf("Got timeout while requesting block template from %s: %s", client.safeRPCClient().Address(), err)
client.reconnect()
return
}
if err != nil {
errChan <- errors.Errorf("Error getting block template from %s: %s", client.Address(), err)
errChan <- errors.Wrapf(err, "Error getting block template from %s", client.safeRPCClient().Address())
return
}
templatemanager.Set(template)