[NOD-381] Send transaction notifications to MQTT (#483)

* [NOD-381] Publish transaction messages to MQTT

* [NOD-381] Remove redundant variable

* [NOD-381] Send payload as string

* [NOD-381] Add Error handling

* [NOD-381] Respond with TransactionResponse

* [NOD-381] Use transactionResponse for notifications

* [NOD-381] Move code to appropriate places

* [NOD-381] Pass raw block instead of txId

* [NOD-381] Add comments to public functions

* [NOD-381] Remove print statement

* [NOD-381] Pass transaction instead of block; Use pointers so default will be nil;

* [NOD-381] Use pointers so value could be nil

* [NOD-381] Change variable name

* [NOD-381] Set QoS to 2

* [NOD-381] Move isConnected to MQTT, so client won't have to worry about it; General code refactors;
This commit is contained in:
Dan Aharoni 2019-11-24 10:53:09 +02:00 committed by stasatdaglabs
parent af39e96e3e
commit 28ee6a8026
6 changed files with 132 additions and 20 deletions

View File

@ -4,8 +4,8 @@ package apimodels
type TransactionResponse struct {
TransactionHash string `json:"transactionHash"`
TransactionID string `json:"transactionId"`
AcceptingBlockHash string `json:"acceptingBlockHash,omitempty"`
AcceptingBlockBlueScore uint64 `json:"acceptingBlockBlueScore,omitempty"`
AcceptingBlockHash *string `json:"acceptingBlockHash,omitempty"`
AcceptingBlockBlueScore *uint64 `json:"acceptingBlockBlueScore,omitempty"`
SubnetworkID string `json:"subnetworkId"`
LockTime uint64 `json:"lockTime"`
Gas uint64 `json:"gas,omitempty"`

View File

@ -9,18 +9,20 @@ import (
func convertTxDBModelToTxResponse(tx *dbmodels.Transaction) *apimodels.TransactionResponse {
txRes := &apimodels.TransactionResponse{
TransactionHash: tx.TransactionHash,
TransactionID: tx.TransactionID,
AcceptingBlockHash: tx.AcceptingBlock.BlockHash,
AcceptingBlockBlueScore: tx.AcceptingBlock.BlueScore,
SubnetworkID: tx.Subnetwork.SubnetworkID,
LockTime: tx.LockTime,
Gas: tx.Gas,
PayloadHash: tx.PayloadHash,
Payload: hex.EncodeToString(tx.Payload),
Inputs: make([]*apimodels.TransactionInputResponse, len(tx.TransactionInputs)),
Outputs: make([]*apimodels.TransactionOutputResponse, len(tx.TransactionOutputs)),
Mass: tx.Mass,
TransactionHash: tx.TransactionHash,
TransactionID: tx.TransactionID,
SubnetworkID: tx.Subnetwork.SubnetworkID,
LockTime: tx.LockTime,
Gas: tx.Gas,
PayloadHash: tx.PayloadHash,
Payload: hex.EncodeToString(tx.Payload),
Inputs: make([]*apimodels.TransactionInputResponse, len(tx.TransactionInputs)),
Outputs: make([]*apimodels.TransactionOutputResponse, len(tx.TransactionOutputs)),
Mass: tx.Mass,
}
if tx.AcceptingBlock != nil {
txRes.AcceptingBlockHash = &tx.AcceptingBlock.BlockHash
txRes.AcceptingBlockBlueScore = &tx.AcceptingBlock.BlueScore
}
for i, txOut := range tx.TransactionOutputs {
txRes.Outputs[i] = &apimodels.TransactionOutputResponse{

View File

@ -89,12 +89,7 @@ func GetTransactionsByAddressHandler(address string, skip uint64, limit uint64)
}
txs := []*dbmodels.Transaction{}
query := db.
Joins("LEFT JOIN `transaction_outputs` ON `transaction_outputs`.`transaction_id` = `transactions`.`id`").
Joins("LEFT JOIN `addresses` AS `out_addresses` ON `out_addresses`.`id` = `transaction_outputs`.`address_id`").
Joins("LEFT JOIN `transaction_inputs` ON `transaction_inputs`.`transaction_id` = `transactions`.`id`").
Joins("LEFT JOIN `transaction_outputs` AS `inputs_outs` ON `inputs_outs`.`id` = `transaction_inputs`.`previous_transaction_output_id`").
Joins("LEFT JOIN `addresses` AS `in_addresses` ON `in_addresses`.`id` = `inputs_outs`.`address_id`").
query := joinTxInputsTxOutputsAndAddresses(db).
Where("`out_addresses`.`address` = ?", address).
Or("`in_addresses`.`address` = ?", address).
Limit(limit).
@ -224,6 +219,15 @@ func GetUTXOsByAddressHandler(address string) (interface{}, error) {
return UTXOsResponses, nil
}
func joinTxInputsTxOutputsAndAddresses(query *gorm.DB) *gorm.DB {
return query.
Joins("LEFT JOIN `transaction_outputs` ON `transaction_outputs`.`transaction_id` = `transactions`.`id`").
Joins("LEFT JOIN `addresses` AS `out_addresses` ON `out_addresses`.`id` = `transaction_outputs`.`address_id`").
Joins("LEFT JOIN `transaction_inputs` ON `transaction_inputs`.`transaction_id` = `transactions`.`id`").
Joins("LEFT JOIN `transaction_outputs` AS `inputs_outs` ON `inputs_outs`.`id` = `transaction_inputs`.`previous_transaction_output_id`").
Joins("LEFT JOIN `addresses` AS `in_addresses` ON `in_addresses`.`id` = `inputs_outs`.`address_id`")
}
func addTxPreloadedFields(query *gorm.DB) *gorm.DB {
return query.Preload("AcceptingBlock").
Preload("Subnetwork").
@ -274,3 +278,22 @@ func PostTransaction(requestBody []byte) error {
return nil
}
// GetTransactionsByIdsHandler finds transactions by the given transactionIds.
func GetTransactionsByIdsHandler(db *gorm.DB, transactionIds []string) ([]*apimodels.TransactionResponse, error) {
var txs []*dbmodels.Transaction
query := joinTxInputsTxOutputsAndAddresses(db).
Where("`transactions`.`transaction_id` IN (?)", transactionIds)
dbResult := addTxPreloadedFields(query).Find(&txs)
dbErrors := dbResult.GetErrors()
if httpserverutils.HasDBError(dbErrors) {
return nil, httpserverutils.NewErrorFromDBErrors("Some errors were encountered when loading transactions from the database:", dbErrors)
}
txResponses := make([]*apimodels.TransactionResponse, len(txs))
for i, tx := range txs {
txResponses[i] = convertTxDBModelToTxResponse(tx)
}
return txResponses, nil
}

View File

@ -17,6 +17,10 @@ func GetClient() (mqtt.Client, error) {
return client, nil
}
func isConnected() bool {
return client != nil
}
// Connect initiates a connection to the MQTT server, if defined
func Connect() error {
cfg := config.ActiveConfig()

View File

@ -0,0 +1,77 @@
package mqtt
import (
"encoding/json"
"fmt"
"github.com/daglabs/btcd/apiserver/apimodels"
"github.com/daglabs/btcd/apiserver/controllers"
"github.com/daglabs/btcd/btcjson"
"github.com/jinzhu/gorm"
)
// PublishTransactionsNotifications publishes notification for each transaction of the given block
func PublishTransactionsNotifications(db *gorm.DB, rawTransactions []btcjson.TxRawResult) error {
if !isConnected() {
return nil
}
transactionIds := make([]string, len(rawTransactions))
for i, tx := range rawTransactions {
transactionIds[i] = tx.TxID
}
transactions, err := controllers.GetTransactionsByIdsHandler(db, transactionIds)
if err != nil {
return err
}
for _, transaction := range transactions {
err = publishTransactionNotifications(transaction)
if err != nil {
return err
}
}
return nil
}
func publishTransactionNotifications(transaction *apimodels.TransactionResponse) error {
addresses := uniqueAddressesForTransaction(transaction)
for _, address := range addresses {
err := publishTransactionNotificationForAddress(transaction, address)
if err != nil {
return err
}
}
return nil
}
func uniqueAddressesForTransaction(transaction *apimodels.TransactionResponse) []string {
addressesMap := make(map[string]struct{})
addresses := []string{}
for _, output := range transaction.Outputs {
if _, exists := addressesMap[output.Address]; !exists {
addresses = append(addresses, output.Address)
addressesMap[output.Address] = struct{}{}
}
}
return addresses
}
func publishTransactionNotificationForAddress(transaction *apimodels.TransactionResponse, address string) error {
payload, err := json.Marshal(transaction)
if err != nil {
return err
}
token := client.Publish(transactionsTopic(address), 2, false, payload)
token.Wait()
if token.Error() != nil {
return token.Error()
}
return nil
}
func transactionsTopic(address string) string {
return fmt.Sprintf("transactions/%s", address)
}

View File

@ -3,6 +3,7 @@ package main
import (
"bytes"
"encoding/hex"
"github.com/daglabs/btcd/apiserver/mqtt"
"strconv"
"strings"
"time"
@ -298,6 +299,11 @@ func addBlock(client *jsonrpc.Client, rawBlock string, verboseBlock btcjson.GetB
return httpserverutils.NewErrorFromDBErrors("failed to update block: ", dbErrors)
}
err = mqtt.PublishTransactionsNotifications(dbTx, verboseBlock.RawTx)
if err != nil {
return err
}
dbTx.Commit()
// If the block was previously missing, remove it from