From 08a4b0dbf6131ce08f4237ebf7d40583e6ca8085 Mon Sep 17 00:00:00 2001 From: Dan Aharoni Date: Tue, 26 Nov 2019 16:59:16 +0200 Subject: [PATCH] [NOD-426] Publish unaccepted transaction notifications (#490) * [NOD-382] Add notification for accepted transactions * [NOD-382] Remove print statement * [NOD-426] Publish notifications for unaccepted transactions * [NOD-426] Load DB in controller * [NOD-426] Remove function name from error message * [NOD-426] Add input addresses for transactions notifications * [NOD-426] Remove function name from error message * [NOD-426] Change method name to accepted transactions * [NOD-426] Remove newlines * [NOD-426] Use join instead of separate query * [NOD-426] Remove new line --- apiserver/controllers/block.go | 26 +++++++++++++++++ apiserver/controllers/transaction.go | 7 ++++- apiserver/mqtt/transactions.go | 43 +++++++++++++++++++++------- apiserver/sync.go | 9 ++++-- 4 files changed, 72 insertions(+), 13 deletions(-) diff --git a/apiserver/controllers/block.go b/apiserver/controllers/block.go index a5282db7e..6d4b148ea 100644 --- a/apiserver/controllers/block.go +++ b/apiserver/controllers/block.go @@ -80,3 +80,29 @@ func GetBlocksHandler(order string, skip uint64, limit uint64) (interface{}, err } return blockResponses, nil } + +// GetAcceptedTransactionIDsByBlockHashHandler returns an array of transaction IDs for a given block hash +func GetAcceptedTransactionIDsByBlockHashHandler(blockHash *daghash.Hash) ([]string, error) { + db, err := database.DB() + if err != nil { + return nil, err + } + + var transactions []dbmodels.Transaction + dbResult := db. + Joins("LEFT JOIN `blocks` ON `blocks`.`id` = `transactions`.`accepting_block_id`"). + Where("`blocks`.`block_hash` = ?", blockHash). + Find(&transactions) + + dbErrors := dbResult.GetErrors() + if httpserverutils.HasDBError(dbErrors) { + return nil, httpserverutils.NewErrorFromDBErrors("Failed to find transactions: ", dbErrors) + } + + result := make([]string, len(transactions)) + for _, transaction := range transactions { + result = append(result, transaction.TransactionID) + } + + return result, nil +} diff --git a/apiserver/controllers/transaction.go b/apiserver/controllers/transaction.go index e4b9f2c8b..73b07c0bc 100644 --- a/apiserver/controllers/transaction.go +++ b/apiserver/controllers/transaction.go @@ -288,7 +288,12 @@ func PostTransaction(requestBody []byte) error { } // GetTransactionsByIDsHandler finds transactions by the given transactionIds. -func GetTransactionsByIDsHandler(db *gorm.DB, transactionIds []string) ([]*apimodels.TransactionResponse, error) { +func GetTransactionsByIDsHandler(transactionIds []string) ([]*apimodels.TransactionResponse, error) { + db, err := database.DB() + if err != nil { + return nil, err + } + var txs []*dbmodels.Transaction query := joinTxInputsTxOutputsAndAddresses(db). Where("`transactions`.`transaction_id` IN (?)", transactionIds) diff --git a/apiserver/mqtt/transactions.go b/apiserver/mqtt/transactions.go index 46a0df113..e17ee117d 100644 --- a/apiserver/mqtt/transactions.go +++ b/apiserver/mqtt/transactions.go @@ -3,15 +3,14 @@ package mqtt import ( "github.com/daglabs/btcd/apiserver/apimodels" "github.com/daglabs/btcd/apiserver/controllers" - "github.com/daglabs/btcd/apiserver/database" "github.com/daglabs/btcd/btcjson" "github.com/daglabs/btcd/rpcclient" - "github.com/jinzhu/gorm" + "github.com/daglabs/btcd/util/daghash" "path" ) // PublishTransactionsNotifications publishes notification for each transaction of the given block -func PublishTransactionsNotifications(db *gorm.DB, rawTransactions []btcjson.TxRawResult) error { +func PublishTransactionsNotifications(rawTransactions []btcjson.TxRawResult) error { if !isConnected() { return nil } @@ -21,7 +20,7 @@ func PublishTransactionsNotifications(db *gorm.DB, rawTransactions []btcjson.TxR transactionIDs[i] = tx.TxID } - transactions, err := controllers.GetTransactionsByIDsHandler(db, transactionIDs) + transactions, err := controllers.GetTransactionsByIDsHandler(transactionIDs) if err != nil { return err } @@ -55,6 +54,12 @@ func uniqueAddressesForTransaction(transaction *apimodels.TransactionResponse) [ addressesMap[output.Address] = struct{}{} } } + for _, input := range transaction.Inputs { + if _, exists := addressesMap[input.Address]; !exists { + addresses = append(addresses, input.Address) + addressesMap[input.Address] = struct{}{} + } + } return addresses } @@ -64,11 +69,6 @@ func publishTransactionNotificationForAddress(transaction *apimodels.Transaction // PublishAcceptedTransactionsNotifications publishes notification for each accepted transaction of the given chain-block func PublishAcceptedTransactionsNotifications(addedChainBlocks []*rpcclient.ChainBlock) error { - db, err := database.DB() - if err != nil { - return err - } - for _, addedChainBlock := range addedChainBlocks { for _, acceptedBlock := range addedChainBlock.AcceptedBlocks { transactionIDs := make([]string, len(acceptedBlock.AcceptedTxIDs)) @@ -76,7 +76,7 @@ func PublishAcceptedTransactionsNotifications(addedChainBlocks []*rpcclient.Chai transactionIDs[i] = acceptedTxID.String() } - transactions, err := controllers.GetTransactionsByIDsHandler(db, transactionIDs) + transactions, err := controllers.GetTransactionsByIDsHandler(transactionIDs) if err != nil { return err } @@ -92,3 +92,26 @@ func PublishAcceptedTransactionsNotifications(addedChainBlocks []*rpcclient.Chai } return nil } + +// PublishUnacceptedTransactionsNotifications publishes notification for each unaccepted transaction of the given chain-block +func PublishUnacceptedTransactionsNotifications(removedChainHashes []*daghash.Hash) error { + for _, removedHash := range removedChainHashes { + transactionIDs, err := controllers.GetAcceptedTransactionIDsByBlockHashHandler(removedHash) + if err != nil { + return err + } + + transactions, err := controllers.GetTransactionsByIDsHandler(transactionIDs) + if err != nil { + return err + } + + for _, transaction := range transactions { + err = publishTransactionNotifications(transaction, "transactions/unaccepted") + if err != nil { + return err + } + } + } + return nil +} diff --git a/apiserver/sync.go b/apiserver/sync.go index 69f0b8905..2d961bf9c 100644 --- a/apiserver/sync.go +++ b/apiserver/sync.go @@ -302,7 +302,7 @@ func addBlock(client *jsonrpc.Client, rawBlock string, verboseBlock btcjson.GetB return httpserverutils.NewErrorFromDBErrors("failed to update block: ", dbErrors) } - err = mqtt.PublishTransactionsNotifications(dbTx, verboseBlock.RawTx) + err = mqtt.PublishTransactionsNotifications(verboseBlock.RawTx) if err != nil { return err } @@ -1064,11 +1064,16 @@ func processChainChangedMsgs() error { unprocessedChainChangedMessages = append(unprocessedChainChangedMessages, chainChanged) continue } + + err = mqtt.PublishUnacceptedTransactionsNotifications(chainChanged.RemovedChainBlockHashes) + if err != nil { + panic(errors.Errorf("Error while publishing unaccepted transactions notifications %s", err)) + } + err = handleChainChangedMsg(chainChanged) if err != nil { return err } - } pendingChainChangedMsgs = unprocessedChainChangedMessages return nil