mirror of
https://github.com/kaspanet/kaspad.git
synced 2025-03-30 15:08:33 +00:00
RPC SubmitTransaction: Dequeue old responses from previous requests (#2262)
This commit is contained in:
parent
d2453f8e7b
commit
6b38bf7069
@ -28,7 +28,8 @@ func HandleSubmitTransaction(context *rpccontext.Context, _ *router.Router, requ
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("Rejected transaction %s: %s", transactionID, err)
|
log.Debugf("Rejected transaction %s: %s", transactionID, err)
|
||||||
errorMessage := &appmessage.SubmitTransactionResponseMessage{}
|
// Return the ID also in the case of error, so that clients can match the response to the correct transaction submit request
|
||||||
|
errorMessage := appmessage.NewSubmitTransactionResponseMessage(transactionID.String())
|
||||||
errorMessage.Error = appmessage.RPCErrorf("Rejected transaction %s: %s", transactionID, err)
|
errorMessage.Error = appmessage.RPCErrorf("Rejected transaction %s: %s", transactionID, err)
|
||||||
return errorMessage, nil
|
return errorMessage, nil
|
||||||
}
|
}
|
||||||
|
@ -2,14 +2,16 @@ package server
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/kaspanet/kaspad/app/appmessage"
|
"github.com/kaspanet/kaspad/app/appmessage"
|
||||||
"github.com/kaspanet/kaspad/cmd/kaspawallet/daemon/pb"
|
"github.com/kaspanet/kaspad/cmd/kaspawallet/daemon/pb"
|
||||||
"github.com/kaspanet/kaspad/cmd/kaspawallet/libkaspawallet"
|
"github.com/kaspanet/kaspad/cmd/kaspawallet/libkaspawallet"
|
||||||
"github.com/kaspanet/kaspad/cmd/kaspawallet/libkaspawallet/serialization"
|
"github.com/kaspanet/kaspad/cmd/kaspawallet/libkaspawallet/serialization"
|
||||||
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
|
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
|
||||||
|
"github.com/kaspanet/kaspad/domain/consensus/utils/consensushashing"
|
||||||
"github.com/kaspanet/kaspad/infrastructure/network/rpcclient"
|
"github.com/kaspanet/kaspad/infrastructure/network/rpcclient"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *server) Broadcast(_ context.Context, request *pb.BroadcastRequest) (*pb.BroadcastResponse, error) {
|
func (s *server) Broadcast(_ context.Context, request *pb.BroadcastRequest) (*pb.BroadcastResponse, error) {
|
||||||
@ -59,7 +61,7 @@ func (s *server) broadcast(transactions [][]byte, isDomain bool) ([]string, erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
func sendTransaction(client *rpcclient.RPCClient, tx *externalapi.DomainTransaction) (string, error) {
|
func sendTransaction(client *rpcclient.RPCClient, tx *externalapi.DomainTransaction) (string, error) {
|
||||||
submitTransactionResponse, err := client.SubmitTransaction(appmessage.DomainTransactionToRPCTransaction(tx), false)
|
submitTransactionResponse, err := client.SubmitTransaction(appmessage.DomainTransactionToRPCTransaction(tx), consensushashing.TransactionID(tx).String(), false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", errors.Wrapf(err, "error submitting transaction")
|
return "", errors.Wrapf(err, "error submitting transaction")
|
||||||
}
|
}
|
||||||
|
@ -1,23 +1,44 @@
|
|||||||
package rpcclient
|
package rpcclient
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/kaspanet/kaspad/app/appmessage"
|
"github.com/kaspanet/kaspad/app/appmessage"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SubmitTransaction sends an RPC request respective to the function's name and returns the RPC server's response
|
// SubmitTransaction sends an RPC request respective to the function's name and returns the RPC server's response
|
||||||
func (c *RPCClient) SubmitTransaction(transaction *appmessage.RPCTransaction, allowOrphan bool) (*appmessage.SubmitTransactionResponseMessage, error) {
|
func (c *RPCClient) SubmitTransaction(transaction *appmessage.RPCTransaction, transactionID string, allowOrphan bool) (*appmessage.SubmitTransactionResponseMessage, error) {
|
||||||
err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewSubmitTransactionRequestMessage(transaction, allowOrphan))
|
err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewSubmitTransactionRequestMessage(transaction, allowOrphan))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
response, err := c.route(appmessage.CmdSubmitTransactionResponseMessage).DequeueWithTimeout(c.timeout)
|
for {
|
||||||
if err != nil {
|
response, err := c.route(appmessage.CmdSubmitTransactionResponseMessage).DequeueWithTimeout(c.timeout)
|
||||||
return nil, err
|
if err != nil {
|
||||||
}
|
return nil, err
|
||||||
submitTransactionResponse := response.(*appmessage.SubmitTransactionResponseMessage)
|
}
|
||||||
if submitTransactionResponse.Error != nil {
|
submitTransactionResponse := response.(*appmessage.SubmitTransactionResponseMessage)
|
||||||
return nil, c.convertRPCError(submitTransactionResponse.Error)
|
// Match the response to the expected ID. If they are different it means we got an old response which we
|
||||||
}
|
// previously timed-out on, so we log and continue waiting for the correct current response.
|
||||||
|
if submitTransactionResponse.TransactionID != transactionID {
|
||||||
|
if submitTransactionResponse.Error != nil {
|
||||||
|
// A non-updated Kaspad might return an empty ID in the case of error, so in
|
||||||
|
// such a case we fallback to checking if the error contains the expected ID
|
||||||
|
if submitTransactionResponse.TransactionID != "" || !strings.Contains(submitTransactionResponse.Error.Message, transactionID) {
|
||||||
|
log.Warnf("SubmitTransaction: received an error response for previous request: %s", submitTransactionResponse.Error)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
return submitTransactionResponse, nil
|
} else {
|
||||||
|
log.Warnf("SubmitTransaction: received a successful response for previous request with ID %s",
|
||||||
|
submitTransactionResponse.TransactionID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if submitTransactionResponse.Error != nil {
|
||||||
|
return nil, c.convertRPCError(submitTransactionResponse.Error)
|
||||||
|
}
|
||||||
|
|
||||||
|
return submitTransactionResponse, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -86,7 +86,7 @@ func submitAnAmountOfTransactionsToTheMempool(t *testing.T, rpcClient *rpcclient
|
|||||||
|
|
||||||
for i, transaction := range transactions {
|
for i, transaction := range transactions {
|
||||||
rpcTransaction := appmessage.DomainTransactionToRPCTransaction(transaction)
|
rpcTransaction := appmessage.DomainTransactionToRPCTransaction(transaction)
|
||||||
_, err := rpcClient.SubmitTransaction(rpcTransaction, false)
|
_, err := rpcClient.SubmitTransaction(rpcTransaction, consensushashing.TransactionID(transaction).String(), false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if ignoreOrphanRejects && strings.Contains(err.Error(), "orphan") {
|
if ignoreOrphanRejects && strings.Contains(err.Error(), "orphan") {
|
||||||
continue
|
continue
|
||||||
|
@ -53,7 +53,7 @@ func TestTxRelay(t *testing.T) {
|
|||||||
msgTx := generateTx(t, secondBlock.Transactions[transactionhelper.CoinbaseTransactionIndex], payer, payee)
|
msgTx := generateTx(t, secondBlock.Transactions[transactionhelper.CoinbaseTransactionIndex], payer, payee)
|
||||||
domainTransaction := appmessage.MsgTxToDomainTransaction(msgTx)
|
domainTransaction := appmessage.MsgTxToDomainTransaction(msgTx)
|
||||||
rpcTransaction := appmessage.DomainTransactionToRPCTransaction(domainTransaction)
|
rpcTransaction := appmessage.DomainTransactionToRPCTransaction(domainTransaction)
|
||||||
response, err := payer.rpcClient.SubmitTransaction(rpcTransaction, false)
|
response, err := payer.rpcClient.SubmitTransaction(rpcTransaction, consensushashing.TransactionID(domainTransaction).String(), false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error submitting transaction: %+v", err)
|
t.Fatalf("Error submitting transaction: %+v", err)
|
||||||
}
|
}
|
||||||
|
@ -88,8 +88,8 @@ func TestUTXOIndex(t *testing.T) {
|
|||||||
// Submit a few transactions that spends some UTXOs
|
// Submit a few transactions that spends some UTXOs
|
||||||
const transactionAmountToSpend = 5
|
const transactionAmountToSpend = 5
|
||||||
for i := 0; i < transactionAmountToSpend; i++ {
|
for i := 0; i < transactionAmountToSpend; i++ {
|
||||||
rpcTransaction := buildTransactionForUTXOIndexTest(t, notificationEntries[i])
|
rpcTransaction, transactionID := buildTransactionForUTXOIndexTest(t, notificationEntries[i])
|
||||||
_, err = kaspad.rpcClient.SubmitTransaction(rpcTransaction, false)
|
_, err = kaspad.rpcClient.SubmitTransaction(rpcTransaction, transactionID, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error submitting transaction: %s", err)
|
t.Fatalf("Error submitting transaction: %s", err)
|
||||||
}
|
}
|
||||||
@ -171,7 +171,7 @@ func TestUTXOIndex(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func buildTransactionForUTXOIndexTest(t *testing.T, entry *appmessage.UTXOsByAddressesEntry) *appmessage.RPCTransaction {
|
func buildTransactionForUTXOIndexTest(t *testing.T, entry *appmessage.UTXOsByAddressesEntry) (*appmessage.RPCTransaction, string) {
|
||||||
transactionIDBytes, err := hex.DecodeString(entry.Outpoint.TransactionID)
|
transactionIDBytes, err := hex.DecodeString(entry.Outpoint.TransactionID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error decoding transaction ID: %s", err)
|
t.Fatalf("Error decoding transaction ID: %s", err)
|
||||||
@ -224,5 +224,5 @@ func buildTransactionForUTXOIndexTest(t *testing.T, entry *appmessage.UTXOsByAdd
|
|||||||
msgTx.TxIn[0].SignatureScript = signatureScript
|
msgTx.TxIn[0].SignatureScript = signatureScript
|
||||||
|
|
||||||
domainTransaction := appmessage.MsgTxToDomainTransaction(msgTx)
|
domainTransaction := appmessage.MsgTxToDomainTransaction(msgTx)
|
||||||
return appmessage.DomainTransactionToRPCTransaction(domainTransaction)
|
return appmessage.DomainTransactionToRPCTransaction(domainTransaction), consensushashing.TransactionID(domainTransaction).String()
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user