Improve RPC command issuance (#206)

* removed error handling from util/issue_command.go methods
* integrated the complete error handling with context into util/issue_command.go: buildSignBroadcastTx
* setting the chainID of lib/tx dynamically
* removed obsolete timeout
* removed obsolete TODO message
* improved logging information and clarity of information
* improved readability and debugability of BroadcastTxWithFileLock
* fixed getClientContext bug for chains different from the testnet by removing a hard-coded value
* renamed setConfig to setRPCConfig

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>
This commit is contained in:
Jürgen Eckel 2023-11-29 08:10:34 +01:00 committed by Julian Strobl
parent d5339f331c
commit 6ad235e847
6 changed files with 84 additions and 80 deletions

View File

@ -91,7 +91,7 @@ func getClientContext(address sdk.AccAddress) (clientCtx client.Context, err err
clientCtx = client.Context{
AccountRetriever: authtypes.AccountRetriever{},
BroadcastMode: "sync",
ChainID: "planetmint-testnet-1",
ChainID: GetConfig().ChainID,
Client: wsClient,
Codec: codec,
From: address.String(),
@ -172,9 +172,45 @@ func broadcastTx(clientCtx client.Context, txf tx.Factory, msgs ...sdk.Msg) (bro
broadcastTxResponseJSON = output.String()
return
}
func getSequenceFromFile(seqFile *os.File, filename string) (sequence uint64, err error) {
var sequenceString string
lineCount := int64(0)
scanner := bufio.NewScanner(seqFile)
for scanner.Scan() {
sequenceString = scanner.Text()
lineCount++
}
err = scanner.Err()
if err != nil {
return
}
if lineCount == 0 {
err = errors.New("Sequence file empty " + filename + ": no lines")
return
} else if lineCount != 1 {
err = errors.New("Malformed " + filename + ": wrong number of lines")
return
}
sequence, err = strconv.ParseUint(sequenceString, 10, 64)
if err != nil {
return
}
return
}
func getSequenceFromChain(clientCtx client.Context) (sequence uint64, err error) {
// Get sequence number from chain.
account, err := clientCtx.AccountRetriever.GetAccount(clientCtx, clientCtx.FromAddress)
if err != nil {
return
}
sequence = account.GetSequence()
return
}
// BroadcastTxWithFileLock broadcasts a transaction via gRPC and synchronises requests via a file lock.
func BroadcastTxWithFileLock(address sdk.AccAddress, msgs ...sdk.Msg) (broadcastTxResponseJSON string, err error) {
// open and lock file, if it exists
usr, err := user.Current()
if err != nil {
return
@ -208,64 +244,41 @@ func BroadcastTxWithFileLock(address sdk.AccAddress, msgs ...sdk.Msg) (broadcast
}
}()
var sequenceString string
lineCount := int64(0)
scanner := bufio.NewScanner(file)
for scanner.Scan() {
sequenceString = scanner.Text()
lineCount++
}
err = scanner.Err()
if err != nil {
return
}
// Get sequence number from chain.
// get basic chain information
clientCtx, txf, err := getClientContextAndTxFactory(address)
if err != nil {
return
}
account, err := clientCtx.AccountRetriever.GetAccount(clientCtx, clientCtx.FromAddress)
if err != nil {
sequenceFromFile, errFile := getSequenceFromFile(file, filename)
sequenceFromChain, errChain := getSequenceFromChain(clientCtx)
var sequence uint64
if errFile != nil && errChain != nil {
err = errors.New("unable to determine sequence number")
return
}
sequence := account.GetSequence()
if lineCount == 0 {
// File does not exist yet.
sequenceString = strconv.FormatUint(sequence, 10)
} else if lineCount != 1 {
err = errors.New("Malformed " + filename + ": wrong number of lines")
return
}
sequenceCount, err := strconv.ParseUint(sequenceString, 10, 64)
if err != nil {
return
}
// Sequence number on chain is bigger than in text file.
// Someone manually sent a transaction from our account?
if sequence > sequenceCount {
sequenceCount = sequence
sequence = sequenceFromChain
if sequenceFromFile > sequenceFromChain {
sequence = sequenceFromFile
}
// Set new sequence number
txf = txf.WithSequence(sequenceCount)
txf = txf.WithSequence(sequence)
broadcastTxResponseJSON, err = broadcastTx(clientCtx, txf, msgs...)
if err != nil {
return
}
// Increase counter for next round.
sequenceCount++
sequence++
_, err = file.Seek(0, io.SeekStart)
if err != nil {
return
}
_, err = file.WriteString(strconv.FormatUint(sequenceCount, 10) + "\n")
_, err = file.WriteString(strconv.FormatUint(sequence, 10) + "\n")
return
}

View File

@ -11,70 +11,73 @@ import (
machinetypes "github.com/planetmint/planetmint-go/x/machine/types"
)
func buildSignBroadcastTx(goCtx context.Context, sendingValidatorAddress string, msg sdk.Msg) (err error) {
func setRPCConfig(goCtx context.Context) {
rpcConf := lib.GetConfig()
ctx := sdk.UnwrapSDKContext(goCtx)
rpcConf.SetChainID(ctx.ChainID())
}
func buildSignBroadcastTx(goCtx context.Context, loggingContext string, sendingValidatorAddress string, msg sdk.Msg) {
go func() {
setRPCConfig(goCtx)
ctx := sdk.UnwrapSDKContext(goCtx)
addr := sdk.MustAccAddressFromBech32(sendingValidatorAddress)
txJSON, err := lib.BuildUnsignedTx(addr, msg)
if err != nil {
GetAppLogger().Error(ctx, "build unsigned tx failed: "+err.Error())
GetAppLogger().Error(ctx, loggingContext+" build unsigned tx failed: "+err.Error())
return
}
GetAppLogger().Info(ctx, "broadcast tx: "+txJSON)
GetAppLogger().Info(ctx, loggingContext+" unsigned tx: "+txJSON)
_, err = lib.BroadcastTxWithFileLock(addr, msg)
if err != nil {
GetAppLogger().Error(ctx, "broadcast tx failed: "+err.Error())
GetAppLogger().Error(ctx, loggingContext+" broadcast tx failed: "+err.Error())
return
}
GetAppLogger().Info(ctx, loggingContext+" broadcast tx succeeded")
}()
return
}
func InitRDDLReissuanceProcess(goCtx context.Context, proposerAddress string, txUnsigned string, blockHeight int64) (err error) {
func InitRDDLReissuanceProcess(goCtx context.Context, proposerAddress string, txUnsigned string, blockHeight int64) {
ctx := sdk.UnwrapSDKContext(goCtx)
// get_last_PoPBlockHeight() // TODO: to be read form the upcoming PoP-store
sendingValidatorAddress := config.GetConfig().ValidatorAddress
GetAppLogger().Info(ctx, "create Reissuance Proposal")
GetAppLogger().Info(ctx, "create re-issuance proposal")
msg := daotypes.NewMsgReissueRDDLProposal(sendingValidatorAddress, proposerAddress, txUnsigned, blockHeight)
err = buildSignBroadcastTx(goCtx, sendingValidatorAddress, msg)
return
buildSignBroadcastTx(goCtx, "initializing RDDL re-issuance", sendingValidatorAddress, msg)
}
func SendRDDLReissuanceResult(goCtx context.Context, proposerAddress string, txID string, blockHeight int64) (err error) {
func SendRDDLReissuanceResult(goCtx context.Context, proposerAddress string, txID string, blockHeight int64) {
ctx := sdk.UnwrapSDKContext(goCtx)
sendingValidatorAddress := config.GetConfig().ValidatorAddress
GetAppLogger().Info(ctx, "create Reissuance Result")
GetAppLogger().Info(ctx, "create re-issuance result")
msg := daotypes.NewMsgReissueRDDLResult(sendingValidatorAddress, proposerAddress, txID, blockHeight)
err = buildSignBroadcastTx(goCtx, sendingValidatorAddress, msg)
return
buildSignBroadcastTx(goCtx, "sending the re-issuance result", sendingValidatorAddress, msg)
}
func SendRDDLDistributionRequest(goCtx context.Context, distribution daotypes.DistributionOrder) (err error) {
func SendRDDLDistributionRequest(goCtx context.Context, distribution daotypes.DistributionOrder) {
ctx := sdk.UnwrapSDKContext(goCtx)
sendingValidatorAddress := config.GetConfig().ValidatorAddress
GetAppLogger().Info(ctx, "create Distribution Request")
msg := daotypes.NewMsgDistributionRequest(sendingValidatorAddress, &distribution)
err = buildSignBroadcastTx(goCtx, sendingValidatorAddress, msg)
return
buildSignBroadcastTx(goCtx, "sending the distribution request", sendingValidatorAddress, msg)
}
func SendRDDLDistributionResult(goCtx context.Context, lastPoP string, daoTxID string, invTxID string, popTxID string) (err error) {
func SendRDDLDistributionResult(goCtx context.Context, lastPoP string, daoTxID string, invTxID string, popTxID string) {
ctx := sdk.UnwrapSDKContext(goCtx)
sendingValidatorAddress := config.GetConfig().ValidatorAddress
GetAppLogger().Info(ctx, "create Distribution Result")
iLastPoP, err := strconv.ParseInt(lastPoP, 10, 64)
if err != nil {
ctx.Logger().Error("Distribution Result: preparation failed ", err.Error())
return
}
msg := daotypes.NewMsgDistributionResult(sendingValidatorAddress, iLastPoP, daoTxID, invTxID, popTxID)
err = buildSignBroadcastTx(goCtx, sendingValidatorAddress, msg)
return
buildSignBroadcastTx(goCtx, "send distribution result", sendingValidatorAddress, msg)
}
func SendLiquidAssetRegistration(goCtx context.Context, notarizedAsset machinetypes.LiquidAsset) (err error) {
func SendLiquidAssetRegistration(goCtx context.Context, notarizedAsset machinetypes.LiquidAsset) {
ctx := sdk.UnwrapSDKContext(goCtx)
sendingValidatorAddress := config.GetConfig().ValidatorAddress
GetAppLogger().Info(ctx, "create Liquid Asset Registration")
msg := machinetypes.NewMsgNotarizeLiquidAsset(sendingValidatorAddress, &notarizedAsset)
err = buildSignBroadcastTx(goCtx, sendingValidatorAddress, msg)
return
buildSignBroadcastTx(goCtx, "Liquid Asset Registration:", sendingValidatorAddress, msg)
}

View File

@ -21,15 +21,10 @@ func BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock, k keeper.Keeper)
}
blockHeight := req.Header.GetHeight()
if isPoPHeight(req.Header.GetHeight()) && util.IsValidatorBlockProposer(ctx, proposerAddress) {
// TODO: implement PoP trigger
util.GetAppLogger().Info(ctx, "TODO: implement PoP trigger")
hexProposerAddress := hex.EncodeToString(proposerAddress)
conf := config.GetConfig()
txUnsigned := keeper.GetReissuanceCommand(conf.ReissuanceAsset, blockHeight)
err := util.InitRDDLReissuanceProcess(ctx, hexProposerAddress, txUnsigned, blockHeight)
if err != nil {
util.GetAppLogger().Error(ctx, "error while initializing RDDL issuance", err)
}
util.InitRDDLReissuanceProcess(ctx, hexProposerAddress, txUnsigned, blockHeight)
}
if isDistributionHeight(blockHeight) {
// initialize the distribution message
@ -37,10 +32,7 @@ func BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock, k keeper.Keeper)
if err != nil {
util.GetAppLogger().Error(ctx, "error while computing the RDDL distribution ", err)
}
err = util.SendRDDLDistributionRequest(ctx, distribution)
if err != nil {
util.GetAppLogger().Error(ctx, "sending the distribution request failed")
}
util.SendRDDLDistributionRequest(ctx, distribution)
}
}

View File

@ -32,10 +32,7 @@ func (k msgServer) DistributionRequest(goCtx context.Context, msg *types.MsgDist
msg.Distribution.PopTxID = popTx
msg.Distribution.DaoTxID = daoTx
lastPopString := strconv.FormatInt(msg.Distribution.LastPop, 10)
err = util.SendRDDLDistributionResult(goCtx, lastPopString, daoTx, investorTx, popTx)
if err != nil {
ctx.Logger().Error("Distribution Request: could not send distribution result ", err.Error())
}
util.SendRDDLDistributionResult(goCtx, lastPopString, daoTx, investorTx, popTx)
}
k.StoreDistributionOrder(ctx, *msg.GetDistribution())

View File

@ -14,12 +14,11 @@ func (k msgServer) ReissueRDDLProposal(goCtx context.Context, msg *types.MsgReis
if validResult && msg.Proposer == validatorIdentity {
util.GetAppLogger().Info(ctx, "REISSUE: Asset")
txID, err := util.ReissueAsset(msg.Tx)
if err == nil {
// 3. notarize result by notarizing the liquid tx-id
_ = util.SendRDDLReissuanceResult(goCtx, msg.GetProposer(), txID, msg.GetBlockHeight())
} else {
util.GetAppLogger().Error(ctx, "REISSUE: Asset reissuance failure: "+err.Error())
if err != nil {
util.GetAppLogger().Error(ctx, "REISSUE: Asset reissuance failed: "+err.Error())
}
// 3. notarize result by notarizing the liquid tx-id
util.SendRDDLReissuanceResult(goCtx, msg.GetProposer(), txID, msg.GetBlockHeight())
}
var reissuance types.Reissuance

View File

@ -135,7 +135,7 @@ func (k msgServer) issueMachineNFT(goCtx context.Context, machine *types.Machine
notarizedAsset.MachineID = machine.GetMachineId()
notarizedAsset.MachineAddress = machine.Address
err = util.SendLiquidAssetRegistration(goCtx, notarizedAsset)
util.SendLiquidAssetRegistration(goCtx, notarizedAsset)
return err
}