mirror of
https://github.com/kaspanet/kaspad.git
synced 2025-10-14 00:59:33 +00:00
[NOD-426] Publish unaccepted transaction notifications (#490)
* [NOD-382] Add notification for accepted transactions * [NOD-382] Remove print statement * [NOD-426] Publish notifications for unaccepted transactions * [NOD-426] Load DB in controller * [NOD-426] Remove function name from error message * [NOD-426] Add input addresses for transactions notifications * [NOD-426] Remove function name from error message * [NOD-426] Change method name to accepted transactions * [NOD-426] Remove newlines * [NOD-426] Use join instead of separate query * [NOD-426] Remove new line
This commit is contained in:
parent
0c9e55a358
commit
08a4b0dbf6
@ -80,3 +80,29 @@ func GetBlocksHandler(order string, skip uint64, limit uint64) (interface{}, err
|
|||||||
}
|
}
|
||||||
return blockResponses, nil
|
return blockResponses, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetAcceptedTransactionIDsByBlockHashHandler returns an array of transaction IDs for a given block hash
|
||||||
|
func GetAcceptedTransactionIDsByBlockHashHandler(blockHash *daghash.Hash) ([]string, error) {
|
||||||
|
db, err := database.DB()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var transactions []dbmodels.Transaction
|
||||||
|
dbResult := db.
|
||||||
|
Joins("LEFT JOIN `blocks` ON `blocks`.`id` = `transactions`.`accepting_block_id`").
|
||||||
|
Where("`blocks`.`block_hash` = ?", blockHash).
|
||||||
|
Find(&transactions)
|
||||||
|
|
||||||
|
dbErrors := dbResult.GetErrors()
|
||||||
|
if httpserverutils.HasDBError(dbErrors) {
|
||||||
|
return nil, httpserverutils.NewErrorFromDBErrors("Failed to find transactions: ", dbErrors)
|
||||||
|
}
|
||||||
|
|
||||||
|
result := make([]string, len(transactions))
|
||||||
|
for _, transaction := range transactions {
|
||||||
|
result = append(result, transaction.TransactionID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
@ -288,7 +288,12 @@ func PostTransaction(requestBody []byte) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetTransactionsByIDsHandler finds transactions by the given transactionIds.
|
// GetTransactionsByIDsHandler finds transactions by the given transactionIds.
|
||||||
func GetTransactionsByIDsHandler(db *gorm.DB, transactionIds []string) ([]*apimodels.TransactionResponse, error) {
|
func GetTransactionsByIDsHandler(transactionIds []string) ([]*apimodels.TransactionResponse, error) {
|
||||||
|
db, err := database.DB()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
var txs []*dbmodels.Transaction
|
var txs []*dbmodels.Transaction
|
||||||
query := joinTxInputsTxOutputsAndAddresses(db).
|
query := joinTxInputsTxOutputsAndAddresses(db).
|
||||||
Where("`transactions`.`transaction_id` IN (?)", transactionIds)
|
Where("`transactions`.`transaction_id` IN (?)", transactionIds)
|
||||||
|
@ -3,15 +3,14 @@ package mqtt
|
|||||||
import (
|
import (
|
||||||
"github.com/daglabs/btcd/apiserver/apimodels"
|
"github.com/daglabs/btcd/apiserver/apimodels"
|
||||||
"github.com/daglabs/btcd/apiserver/controllers"
|
"github.com/daglabs/btcd/apiserver/controllers"
|
||||||
"github.com/daglabs/btcd/apiserver/database"
|
|
||||||
"github.com/daglabs/btcd/btcjson"
|
"github.com/daglabs/btcd/btcjson"
|
||||||
"github.com/daglabs/btcd/rpcclient"
|
"github.com/daglabs/btcd/rpcclient"
|
||||||
"github.com/jinzhu/gorm"
|
"github.com/daglabs/btcd/util/daghash"
|
||||||
"path"
|
"path"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PublishTransactionsNotifications publishes notification for each transaction of the given block
|
// PublishTransactionsNotifications publishes notification for each transaction of the given block
|
||||||
func PublishTransactionsNotifications(db *gorm.DB, rawTransactions []btcjson.TxRawResult) error {
|
func PublishTransactionsNotifications(rawTransactions []btcjson.TxRawResult) error {
|
||||||
if !isConnected() {
|
if !isConnected() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -21,7 +20,7 @@ func PublishTransactionsNotifications(db *gorm.DB, rawTransactions []btcjson.TxR
|
|||||||
transactionIDs[i] = tx.TxID
|
transactionIDs[i] = tx.TxID
|
||||||
}
|
}
|
||||||
|
|
||||||
transactions, err := controllers.GetTransactionsByIDsHandler(db, transactionIDs)
|
transactions, err := controllers.GetTransactionsByIDsHandler(transactionIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -55,6 +54,12 @@ func uniqueAddressesForTransaction(transaction *apimodels.TransactionResponse) [
|
|||||||
addressesMap[output.Address] = struct{}{}
|
addressesMap[output.Address] = struct{}{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for _, input := range transaction.Inputs {
|
||||||
|
if _, exists := addressesMap[input.Address]; !exists {
|
||||||
|
addresses = append(addresses, input.Address)
|
||||||
|
addressesMap[input.Address] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
return addresses
|
return addresses
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -64,11 +69,6 @@ func publishTransactionNotificationForAddress(transaction *apimodels.Transaction
|
|||||||
|
|
||||||
// PublishAcceptedTransactionsNotifications publishes notification for each accepted transaction of the given chain-block
|
// PublishAcceptedTransactionsNotifications publishes notification for each accepted transaction of the given chain-block
|
||||||
func PublishAcceptedTransactionsNotifications(addedChainBlocks []*rpcclient.ChainBlock) error {
|
func PublishAcceptedTransactionsNotifications(addedChainBlocks []*rpcclient.ChainBlock) error {
|
||||||
db, err := database.DB()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, addedChainBlock := range addedChainBlocks {
|
for _, addedChainBlock := range addedChainBlocks {
|
||||||
for _, acceptedBlock := range addedChainBlock.AcceptedBlocks {
|
for _, acceptedBlock := range addedChainBlock.AcceptedBlocks {
|
||||||
transactionIDs := make([]string, len(acceptedBlock.AcceptedTxIDs))
|
transactionIDs := make([]string, len(acceptedBlock.AcceptedTxIDs))
|
||||||
@ -76,7 +76,7 @@ func PublishAcceptedTransactionsNotifications(addedChainBlocks []*rpcclient.Chai
|
|||||||
transactionIDs[i] = acceptedTxID.String()
|
transactionIDs[i] = acceptedTxID.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
transactions, err := controllers.GetTransactionsByIDsHandler(db, transactionIDs)
|
transactions, err := controllers.GetTransactionsByIDsHandler(transactionIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -92,3 +92,26 @@ func PublishAcceptedTransactionsNotifications(addedChainBlocks []*rpcclient.Chai
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PublishUnacceptedTransactionsNotifications publishes notification for each unaccepted transaction of the given chain-block
|
||||||
|
func PublishUnacceptedTransactionsNotifications(removedChainHashes []*daghash.Hash) error {
|
||||||
|
for _, removedHash := range removedChainHashes {
|
||||||
|
transactionIDs, err := controllers.GetAcceptedTransactionIDsByBlockHashHandler(removedHash)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
transactions, err := controllers.GetTransactionsByIDsHandler(transactionIDs)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, transaction := range transactions {
|
||||||
|
err = publishTransactionNotifications(transaction, "transactions/unaccepted")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -302,7 +302,7 @@ func addBlock(client *jsonrpc.Client, rawBlock string, verboseBlock btcjson.GetB
|
|||||||
return httpserverutils.NewErrorFromDBErrors("failed to update block: ", dbErrors)
|
return httpserverutils.NewErrorFromDBErrors("failed to update block: ", dbErrors)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = mqtt.PublishTransactionsNotifications(dbTx, verboseBlock.RawTx)
|
err = mqtt.PublishTransactionsNotifications(verboseBlock.RawTx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -1064,11 +1064,16 @@ func processChainChangedMsgs() error {
|
|||||||
unprocessedChainChangedMessages = append(unprocessedChainChangedMessages, chainChanged)
|
unprocessedChainChangedMessages = append(unprocessedChainChangedMessages, chainChanged)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = mqtt.PublishUnacceptedTransactionsNotifications(chainChanged.RemovedChainBlockHashes)
|
||||||
|
if err != nil {
|
||||||
|
panic(errors.Errorf("Error while publishing unaccepted transactions notifications %s", err))
|
||||||
|
}
|
||||||
|
|
||||||
err = handleChainChangedMsg(chainChanged)
|
err = handleChainChangedMsg(chainChanged)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
pendingChainChangedMsgs = unprocessedChainChangedMessages
|
pendingChainChangedMsgs = unprocessedChainChangedMessages
|
||||||
return nil
|
return nil
|
||||||
|
Loading…
x
Reference in New Issue
Block a user