diff --git a/apiserver/apimodels/response_types.go b/apiserver/apimodels/response_types.go index db81d0f43..c67031211 100644 --- a/apiserver/apimodels/response_types.go +++ b/apiserver/apimodels/response_types.go @@ -4,8 +4,8 @@ package apimodels type TransactionResponse struct { TransactionHash string `json:"transactionHash"` TransactionID string `json:"transactionId"` - AcceptingBlockHash string `json:"acceptingBlockHash,omitempty"` - AcceptingBlockBlueScore uint64 `json:"acceptingBlockBlueScore,omitempty"` + AcceptingBlockHash *string `json:"acceptingBlockHash,omitempty"` + AcceptingBlockBlueScore *uint64 `json:"acceptingBlockBlueScore,omitempty"` SubnetworkID string `json:"subnetworkId"` LockTime uint64 `json:"lockTime"` Gas uint64 `json:"gas,omitempty"` diff --git a/apiserver/controllers/common.go b/apiserver/controllers/common.go index a379b4828..4b19f68de 100644 --- a/apiserver/controllers/common.go +++ b/apiserver/controllers/common.go @@ -9,18 +9,20 @@ import ( func convertTxDBModelToTxResponse(tx *dbmodels.Transaction) *apimodels.TransactionResponse { txRes := &apimodels.TransactionResponse{ - TransactionHash: tx.TransactionHash, - TransactionID: tx.TransactionID, - AcceptingBlockHash: tx.AcceptingBlock.BlockHash, - AcceptingBlockBlueScore: tx.AcceptingBlock.BlueScore, - SubnetworkID: tx.Subnetwork.SubnetworkID, - LockTime: tx.LockTime, - Gas: tx.Gas, - PayloadHash: tx.PayloadHash, - Payload: hex.EncodeToString(tx.Payload), - Inputs: make([]*apimodels.TransactionInputResponse, len(tx.TransactionInputs)), - Outputs: make([]*apimodels.TransactionOutputResponse, len(tx.TransactionOutputs)), - Mass: tx.Mass, + TransactionHash: tx.TransactionHash, + TransactionID: tx.TransactionID, + SubnetworkID: tx.Subnetwork.SubnetworkID, + LockTime: tx.LockTime, + Gas: tx.Gas, + PayloadHash: tx.PayloadHash, + Payload: hex.EncodeToString(tx.Payload), + Inputs: make([]*apimodels.TransactionInputResponse, len(tx.TransactionInputs)), + Outputs: make([]*apimodels.TransactionOutputResponse, len(tx.TransactionOutputs)), + Mass: tx.Mass, + } + if tx.AcceptingBlock != nil { + txRes.AcceptingBlockHash = &tx.AcceptingBlock.BlockHash + txRes.AcceptingBlockBlueScore = &tx.AcceptingBlock.BlueScore } for i, txOut := range tx.TransactionOutputs { txRes.Outputs[i] = &apimodels.TransactionOutputResponse{ diff --git a/apiserver/controllers/transaction.go b/apiserver/controllers/transaction.go index f9bd6d4d1..0ff775f82 100644 --- a/apiserver/controllers/transaction.go +++ b/apiserver/controllers/transaction.go @@ -89,12 +89,7 @@ func GetTransactionsByAddressHandler(address string, skip uint64, limit uint64) } txs := []*dbmodels.Transaction{} - query := db. - Joins("LEFT JOIN `transaction_outputs` ON `transaction_outputs`.`transaction_id` = `transactions`.`id`"). - Joins("LEFT JOIN `addresses` AS `out_addresses` ON `out_addresses`.`id` = `transaction_outputs`.`address_id`"). - Joins("LEFT JOIN `transaction_inputs` ON `transaction_inputs`.`transaction_id` = `transactions`.`id`"). - Joins("LEFT JOIN `transaction_outputs` AS `inputs_outs` ON `inputs_outs`.`id` = `transaction_inputs`.`previous_transaction_output_id`"). - Joins("LEFT JOIN `addresses` AS `in_addresses` ON `in_addresses`.`id` = `inputs_outs`.`address_id`"). + query := joinTxInputsTxOutputsAndAddresses(db). Where("`out_addresses`.`address` = ?", address). Or("`in_addresses`.`address` = ?", address). Limit(limit). @@ -224,6 +219,15 @@ func GetUTXOsByAddressHandler(address string) (interface{}, error) { return UTXOsResponses, nil } +func joinTxInputsTxOutputsAndAddresses(query *gorm.DB) *gorm.DB { + return query. + Joins("LEFT JOIN `transaction_outputs` ON `transaction_outputs`.`transaction_id` = `transactions`.`id`"). + Joins("LEFT JOIN `addresses` AS `out_addresses` ON `out_addresses`.`id` = `transaction_outputs`.`address_id`"). + Joins("LEFT JOIN `transaction_inputs` ON `transaction_inputs`.`transaction_id` = `transactions`.`id`"). + Joins("LEFT JOIN `transaction_outputs` AS `inputs_outs` ON `inputs_outs`.`id` = `transaction_inputs`.`previous_transaction_output_id`"). + Joins("LEFT JOIN `addresses` AS `in_addresses` ON `in_addresses`.`id` = `inputs_outs`.`address_id`") +} + func addTxPreloadedFields(query *gorm.DB) *gorm.DB { return query.Preload("AcceptingBlock"). Preload("Subnetwork"). @@ -274,3 +278,22 @@ func PostTransaction(requestBody []byte) error { return nil } + +// GetTransactionsByIdsHandler finds transactions by the given transactionIds. +func GetTransactionsByIdsHandler(db *gorm.DB, transactionIds []string) ([]*apimodels.TransactionResponse, error) { + var txs []*dbmodels.Transaction + query := joinTxInputsTxOutputsAndAddresses(db). + Where("`transactions`.`transaction_id` IN (?)", transactionIds) + + dbResult := addTxPreloadedFields(query).Find(&txs) + dbErrors := dbResult.GetErrors() + if httpserverutils.HasDBError(dbErrors) { + return nil, httpserverutils.NewErrorFromDBErrors("Some errors were encountered when loading transactions from the database:", dbErrors) + } + + txResponses := make([]*apimodels.TransactionResponse, len(txs)) + for i, tx := range txs { + txResponses[i] = convertTxDBModelToTxResponse(tx) + } + return txResponses, nil +} diff --git a/apiserver/mqtt/mqtt.go b/apiserver/mqtt/mqtt.go index 80ec4a649..c85cfe1c6 100644 --- a/apiserver/mqtt/mqtt.go +++ b/apiserver/mqtt/mqtt.go @@ -17,6 +17,10 @@ func GetClient() (mqtt.Client, error) { return client, nil } +func isConnected() bool { + return client != nil +} + // Connect initiates a connection to the MQTT server, if defined func Connect() error { cfg := config.ActiveConfig() diff --git a/apiserver/mqtt/transactions.go b/apiserver/mqtt/transactions.go new file mode 100644 index 000000000..3daa15b25 --- /dev/null +++ b/apiserver/mqtt/transactions.go @@ -0,0 +1,77 @@ +package mqtt + +import ( + "encoding/json" + "fmt" + "github.com/daglabs/btcd/apiserver/apimodels" + "github.com/daglabs/btcd/apiserver/controllers" + "github.com/daglabs/btcd/btcjson" + "github.com/jinzhu/gorm" +) + +// PublishTransactionsNotifications publishes notification for each transaction of the given block +func PublishTransactionsNotifications(db *gorm.DB, rawTransactions []btcjson.TxRawResult) error { + if !isConnected() { + return nil + } + + transactionIds := make([]string, len(rawTransactions)) + for i, tx := range rawTransactions { + transactionIds[i] = tx.TxID + } + + transactions, err := controllers.GetTransactionsByIdsHandler(db, transactionIds) + if err != nil { + return err + } + + for _, transaction := range transactions { + err = publishTransactionNotifications(transaction) + if err != nil { + return err + } + } + return nil +} + +func publishTransactionNotifications(transaction *apimodels.TransactionResponse) error { + addresses := uniqueAddressesForTransaction(transaction) + for _, address := range addresses { + err := publishTransactionNotificationForAddress(transaction, address) + if err != nil { + return err + } + } + return nil +} + +func uniqueAddressesForTransaction(transaction *apimodels.TransactionResponse) []string { + addressesMap := make(map[string]struct{}) + addresses := []string{} + for _, output := range transaction.Outputs { + if _, exists := addressesMap[output.Address]; !exists { + addresses = append(addresses, output.Address) + addressesMap[output.Address] = struct{}{} + } + } + return addresses +} + +func publishTransactionNotificationForAddress(transaction *apimodels.TransactionResponse, address string) error { + payload, err := json.Marshal(transaction) + if err != nil { + return err + } + + token := client.Publish(transactionsTopic(address), 2, false, payload) + token.Wait() + if token.Error() != nil { + return token.Error() + } + + return nil +} + +func transactionsTopic(address string) string { + return fmt.Sprintf("transactions/%s", address) +} diff --git a/apiserver/sync.go b/apiserver/sync.go index 3c98736a7..28a8b4acc 100644 --- a/apiserver/sync.go +++ b/apiserver/sync.go @@ -3,6 +3,7 @@ package main import ( "bytes" "encoding/hex" + "github.com/daglabs/btcd/apiserver/mqtt" "strconv" "strings" "time" @@ -298,6 +299,11 @@ func addBlock(client *jsonrpc.Client, rawBlock string, verboseBlock btcjson.GetB return httpserverutils.NewErrorFromDBErrors("failed to update block: ", dbErrors) } + err = mqtt.PublishTransactionsNotifications(dbTx, verboseBlock.RawTx) + if err != nil { + return err + } + dbTx.Commit() // If the block was previously missing, remove it from