diff --git a/apiserver/mqtt/mqtt.go b/apiserver/mqtt/mqtt.go index c85cfe1c6..d69e43f64 100644 --- a/apiserver/mqtt/mqtt.go +++ b/apiserver/mqtt/mqtt.go @@ -1,14 +1,20 @@ package mqtt import ( - "errors" + "encoding/json" "github.com/daglabs/btcd/apiserver/config" mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/pkg/errors" ) // client is an instance of the MQTT client, in case we have an active connection var client mqtt.Client +const ( + qualityOfService = 2 + quiesceMilliseconds = 250 +) + // GetClient returns an instance of the MQTT client, in case we have an active connection func GetClient() (mqtt.Client, error) { if client == nil { @@ -49,6 +55,20 @@ func Close() { if client == nil { return } - client.Disconnect(250) + client.Disconnect(quiesceMilliseconds) client = nil } + +func publish(topic string, data interface{}) error { + payload, err := json.Marshal(data) + if err != nil { + return err + } + + token := client.Publish(topic, qualityOfService, false, payload) + token.Wait() + if token.Error() != nil { + return errors.WithStack(token.Error()) + } + return nil +} diff --git a/apiserver/mqtt/selected_tip.go b/apiserver/mqtt/selected_tip.go new file mode 100644 index 000000000..9b7c8a24e --- /dev/null +++ b/apiserver/mqtt/selected_tip.go @@ -0,0 +1,19 @@ +package mqtt + +import ( + "github.com/daglabs/btcd/apiserver/controllers" +) + +const selectedTipTopic = "dag/selected-tip" + +// PublishSelectedTipNotification publishes notification for a new selected tip +func PublishSelectedTipNotification(selectedTipHash string) error { + if !isConnected() { + return nil + } + block, err := controllers.GetBlockByHashHandler(selectedTipHash) + if err != nil { + return err + } + return publish(selectedTipTopic, block) +} diff --git a/apiserver/mqtt/transactions.go b/apiserver/mqtt/transactions.go index 9b2242ef8..46a0df113 100644 --- a/apiserver/mqtt/transactions.go +++ b/apiserver/mqtt/transactions.go @@ -1,13 +1,13 @@ package mqtt import ( - "encoding/json" "github.com/daglabs/btcd/apiserver/apimodels" "github.com/daglabs/btcd/apiserver/controllers" "github.com/daglabs/btcd/apiserver/database" "github.com/daglabs/btcd/btcjson" "github.com/daglabs/btcd/rpcclient" "github.com/jinzhu/gorm" + "path" ) // PublishTransactionsNotifications publishes notification for each transaction of the given block @@ -27,7 +27,7 @@ func PublishTransactionsNotifications(db *gorm.DB, rawTransactions []btcjson.TxR } for _, transaction := range transactions { - err = publishTransactionNotifications(transaction, "transactions/") + err = publishTransactionNotifications(transaction, "transactions") if err != nil { return err } @@ -59,18 +59,7 @@ func uniqueAddressesForTransaction(transaction *apimodels.TransactionResponse) [ } func publishTransactionNotificationForAddress(transaction *apimodels.TransactionResponse, address string, topic string) error { - payload, err := json.Marshal(transaction) - if err != nil { - return err - } - - token := client.Publish(topic+address, 2, false, payload) - token.Wait() - if token.Error() != nil { - return token.Error() - } - - return nil + return publish(path.Join(topic, address), transaction) } // PublishAcceptedTransactionsNotifications publishes notification for each accepted transaction of the given chain-block @@ -93,7 +82,7 @@ func PublishAcceptedTransactionsNotifications(addedChainBlocks []*rpcclient.Chai } for _, transaction := range transactions { - err = publishTransactionNotifications(transaction, "transactions/accepted/") + err = publishTransactionNotifications(transaction, "transactions/accepted") if err != nil { return err } diff --git a/apiserver/sync.go b/apiserver/sync.go index 5ec619578..df89f128a 100644 --- a/apiserver/sync.go +++ b/apiserver/sync.go @@ -60,8 +60,7 @@ func startSync(doneChan chan struct{}) error { log.Infof("Finished syncing past data") // Keep the node and the API server in sync - sync(client, doneChan) - return nil + return sync(client, doneChan) } // fetchInitialData downloads all data that's currently missing from @@ -79,20 +78,25 @@ func fetchInitialData(client *jsonrpc.Client) error { } // sync keeps the API server in sync with the node via notifications -func sync(client *jsonrpc.Client, doneChan chan struct{}) { -loop: +func sync(client *jsonrpc.Client, doneChan chan struct{}) error { // Handle client notifications until we're told to stop for { select { case blockAdded := <-client.OnBlockAdded: enqueueBlockAddedMsg(blockAdded) - processBlockAddedMsgs(client) + err := processBlockAddedMsgs(client) + if err != nil { + return err + } case chainChanged := <-client.OnChainChanged: enqueueChainChangedMsg(chainChanged) - processChainChangedMsgs() + err := processChainChangedMsgs() + if err != nil { + return err + } case <-doneChan: log.Infof("startSync stopped") - break loop + return nil } } } @@ -131,10 +135,7 @@ func syncBlocks(client *jsonrpc.Client) error { // chain starting with the bluest chain-block, and then updates the // database accordingly. func syncSelectedParentChain(client *jsonrpc.Client) error { - // Start syncing from the bluest chain-block hash. We use blue - // score to simulate the "last" block we have because blue-block - // order is the order that the node uses in the various JSONRPC - // calls. + // Start syncing from the selected tip hash startHash, err := findHashOfBluestBlock(true) if err != nil { return err @@ -168,12 +169,14 @@ func findHashOfBluestBlock(mustBeChainBlock bool) (*string, error) { return nil, err } - var block dbmodels.Block - dbQuery := db.Order("blue_score DESC") + var blockHashes []string + dbQuery := db.Model(&dbmodels.Block{}). + Order("blue_score DESC"). + Limit(1) if mustBeChainBlock { dbQuery = dbQuery.Where(&dbmodels.Block{IsChainBlock: true}) } - dbResult := dbQuery.First(&block) + dbResult := dbQuery.Pluck("block_hash", &blockHashes) dbErrors := dbResult.GetErrors() if httpserverutils.HasDBError(dbErrors) { return nil, httpserverutils.NewErrorFromDBErrors("failed to find hash of bluest block: ", dbErrors) @@ -181,7 +184,7 @@ func findHashOfBluestBlock(mustBeChainBlock bool) (*string, error) { if httpserverutils.IsDBRecordNotFoundError(dbErrors) { return nil, nil } - return &block.BlockHash, nil + return &blockHashes[0], nil } // fetchBlock downloads the serialized block and raw block data of @@ -930,12 +933,12 @@ func enqueueBlockAddedMsg(blockAdded *jsonrpc.BlockAddedMsg) { // processBlockAddedMsgs processes all pending onBlockAdded messages. // Messages that cannot yet be processed are re-enqueued. -func processBlockAddedMsgs(client *jsonrpc.Client) { +func processBlockAddedMsgs(client *jsonrpc.Client) error { var unprocessedBlockAddedMsgs []*jsonrpc.BlockAddedMsg for _, blockAdded := range pendingBlockAddedMsgs { missingHashes, err := missingParentHashes(blockAdded) if err != nil { - panic(errors.Errorf("Could not resolve missing parents: %s", err)) + return errors.Errorf("Could not resolve missing parents: %s", err) } for _, missingHash := range missingHashes { err := handleMissingParent(client, missingHash) @@ -949,21 +952,26 @@ func processBlockAddedMsgs(client *jsonrpc.Client) { continue } - hash := blockAdded.Header.BlockHash() - log.Debugf("Getting block %s from the RPC server", hash) - rawBlock, verboseBlock, err := fetchBlock(client, hash) - if err != nil { - log.Warnf("Could not fetch block %s: %s", hash, err) - return - } - err = addBlock(client, rawBlock, *verboseBlock) - if err != nil { - log.Errorf("Could not insert block %s: %s", hash, err) - return - } - log.Infof("Added block %s", hash) + handleBlockAddedMsg(client, blockAdded) } pendingBlockAddedMsgs = unprocessedBlockAddedMsgs + return nil +} + +func handleBlockAddedMsg(client *jsonrpc.Client, blockAdded *jsonrpc.BlockAddedMsg) { + hash := blockAdded.Header.BlockHash() + log.Debugf("Getting block %s from the RPC server", hash) + rawBlock, verboseBlock, err := fetchBlock(client, hash) + if err != nil { + log.Warnf("Could not fetch block %s: %s", hash, err) + return + } + err = addBlock(client, rawBlock, *verboseBlock) + if err != nil { + log.Errorf("Could not insert block %s: %s", hash, err) + return + } + log.Infof("Added block %s", hash) } func missingParentHashes(blockAdded *jsonrpc.BlockAddedMsg) ([]string, error) { @@ -1045,35 +1053,43 @@ func enqueueChainChangedMsg(chainChanged *jsonrpc.ChainChangedMsg) { // processChainChangedMsgs processes all pending onChainChanged messages. // Messages that cannot yet be processed are re-enqueued. -func processChainChangedMsgs() { +func processChainChangedMsgs() error { var unprocessedChainChangedMessages []*jsonrpc.ChainChangedMsg for _, chainChanged := range pendingChainChangedMsgs { canHandle, err := canHandleChainChangedMsg(chainChanged) if err != nil { - panic(errors.Errorf("Could not resolve if can handle ChainChangedMsg: %s", err)) + return errors.Wrap(err, "Could not resolve if can handle ChainChangedMsg") } if !canHandle { unprocessedChainChangedMessages = append(unprocessedChainChangedMessages, chainChanged) continue } - - // Convert the data in chainChanged to something we can feed into - // updateSelectedParentChain - removedHashes, addedBlocks := convertChainChangedMsg(chainChanged) - - err = updateSelectedParentChain(removedHashes, addedBlocks) + err = handleChainChangedMsg(chainChanged) if err != nil { - panic(errors.Errorf("Could not update selected parent chain: %s", err)) + return err } - log.Infof("Chain changed: removed %d blocks and added %d block", - len(removedHashes), len(addedBlocks)) - err = mqtt.PublishAcceptedTransactionsNotifications(chainChanged.AddedChainBlocks) - if err != nil { - panic(errors.Errorf("Error while publishing accepted transactions notifications %s", err)) - } } pendingChainChangedMsgs = unprocessedChainChangedMessages + return nil +} + +func handleChainChangedMsg(chainChanged *jsonrpc.ChainChangedMsg) error { + // Convert the data in chainChanged to something we can feed into + // updateSelectedParentChain + removedHashes, addedBlocks := convertChainChangedMsg(chainChanged) + + err := updateSelectedParentChain(removedHashes, addedBlocks) + if err != nil { + return errors.Wrap(err, "Could not update selected parent chain") + } + log.Infof("Chain changed: removed %d blocks and added %d block", + len(removedHashes), len(addedBlocks)) + err = mqtt.PublishAcceptedTransactionsNotifications(chainChanged.AddedChainBlocks) + if err != nil { + return errors.Wrap(err, "Error while publishing accepted transactions notifications") + } + return mqtt.PublishSelectedTipNotification(addedBlocks[len(addedBlocks)-1].Hash) } // canHandleChainChangedMsg checks whether we have all the necessary data diff --git a/rpcclient/dag.go b/rpcclient/dag.go index 831d14b01..63da50d71 100644 --- a/rpcclient/dag.go +++ b/rpcclient/dag.go @@ -9,6 +9,7 @@ import ( "bytes" "encoding/hex" "encoding/json" + "github.com/pkg/errors" "github.com/daglabs/btcd/btcjson" "github.com/daglabs/btcd/util/daghash" @@ -31,7 +32,7 @@ func (r FutureGetSelectedTipHashResult) Receive() (*daghash.Hash, error) { var txHashStr string err = json.Unmarshal(res, &txHashStr) if err != nil { - return nil, err + return nil, errors.WithStack(err) } return daghash.NewHashFromStr(txHashStr) } @@ -68,13 +69,13 @@ func (r FutureGetBlockResult) Receive() (*wire.MsgBlock, error) { var blockHex string err = json.Unmarshal(res, &blockHex) if err != nil { - return nil, err + return nil, errors.WithStack(err) } // Decode the serialized block hex to raw bytes. serializedBlock, err := hex.DecodeString(blockHex) if err != nil { - return nil, err + return nil, errors.WithStack(err) } // Deserialize the block and return it. @@ -123,7 +124,7 @@ func (r FutureGetBlocksResult) Receive() (*btcjson.GetBlocksResult, error) { var result btcjson.GetBlocksResult if err := json.Unmarshal(res, &result); err != nil { - return nil, err + return nil, errors.Wrapf(err, "%s", string(res)) } return &result, nil } @@ -160,7 +161,7 @@ func (r FutureGetBlockVerboseResult) Receive() (*btcjson.GetBlockVerboseResult, var blockResult btcjson.GetBlockVerboseResult err = json.Unmarshal(res, &blockResult) if err != nil { - return nil, err + return nil, errors.WithStack(err) } return &blockResult, nil } @@ -265,7 +266,7 @@ func (r FutureGetChainFromBlockResult) Receive() (*btcjson.GetChainFromBlockResu var result btcjson.GetChainFromBlockResult if err := json.Unmarshal(res, &result); err != nil { - return nil, err + return nil, errors.WithStack(err) } return &result, nil } @@ -339,7 +340,7 @@ func (r FutureGetBlockDAGInfoResult) Receive() (*btcjson.GetBlockDAGInfoResult, var dagInfo btcjson.GetBlockDAGInfoResult if err := json.Unmarshal(res, &dagInfo); err != nil { - return nil, err + return nil, errors.WithStack(err) } return &dagInfo, nil } @@ -377,7 +378,7 @@ func (r FutureGetBlockHashResult) Receive() (*daghash.Hash, error) { var txHashStr string err = json.Unmarshal(res, &txHashStr) if err != nil { - return nil, err + return nil, errors.WithStack(err) } return daghash.NewHashFromStr(txHashStr) } @@ -398,12 +399,12 @@ func (r FutureGetBlockHeaderResult) Receive() (*wire.BlockHeader, error) { var bhHex string err = json.Unmarshal(res, &bhHex) if err != nil { - return nil, err + return nil, errors.WithStack(err) } serializedBH, err := hex.DecodeString(bhHex) if err != nil { - return nil, err + return nil, errors.WithStack(err) } // Deserialize the blockheader and return it. @@ -455,7 +456,7 @@ func (r FutureGetBlockHeaderVerboseResult) Receive() (*btcjson.GetBlockHeaderVer var bh btcjson.GetBlockHeaderVerboseResult err = json.Unmarshal(res, &bh) if err != nil { - return nil, err + return nil, errors.WithStack(err) } return &bh, nil @@ -501,7 +502,7 @@ func (r FutureGetMempoolEntryResult) Receive() (*btcjson.GetMempoolEntryResult, var mempoolEntryResult btcjson.GetMempoolEntryResult err = json.Unmarshal(res, &mempoolEntryResult) if err != nil { - return nil, err + return nil, errors.WithStack(err) } return &mempoolEntryResult, nil @@ -539,7 +540,7 @@ func (r FutureGetRawMempoolResult) Receive() ([]*daghash.Hash, error) { var txHashStrs []string err = json.Unmarshal(res, &txHashStrs) if err != nil { - return nil, err + return nil, errors.WithStack(err) } // Create a slice of ShaHash arrays from the string slice. @@ -591,7 +592,7 @@ func (r FutureGetRawMempoolVerboseResult) Receive() (map[string]btcjson.GetRawMe var mempoolItems map[string]btcjson.GetRawMempoolVerboseResult err = json.Unmarshal(res, &mempoolItems) if err != nil { - return nil, err + return nil, errors.WithStack(err) } return mempoolItems, nil } @@ -631,7 +632,7 @@ func (r FutureGetSubnetworkResult) Receive() (*btcjson.GetSubnetworkResult, erro var getSubnetworkResult *btcjson.GetSubnetworkResult err = json.Unmarshal(res, &getSubnetworkResult) if err != nil { - return nil, err + return nil, errors.WithStack(err) } return getSubnetworkResult, nil @@ -674,7 +675,7 @@ func (r FutureGetTxOutResult) Receive() (*btcjson.GetTxOutResult, error) { var txOutInfo *btcjson.GetTxOutResult err = json.Unmarshal(res, &txOutInfo) if err != nil { - return nil, err + return nil, errors.WithStack(err) } return txOutInfo, nil @@ -722,7 +723,7 @@ func (r FutureRescanBlocksResult) Receive() ([]btcjson.RescannedBlock, error) { var rescanBlocksResult []btcjson.RescannedBlock err = json.Unmarshal(res, &rescanBlocksResult) if err != nil { - return nil, err + return nil, errors.WithStack(err) } return rescanBlocksResult, nil @@ -804,13 +805,13 @@ func (r FutureGetCFilterResult) Receive() (*wire.MsgCFilter, error) { var filterHex string err = json.Unmarshal(res, &filterHex) if err != nil { - return nil, err + return nil, errors.WithStack(err) } // Decode the serialized cf hex to raw bytes. serializedFilter, err := hex.DecodeString(filterHex) if err != nil { - return nil, err + return nil, errors.WithStack(err) } // Assign the filter bytes to the correct field of the wire message. @@ -859,7 +860,7 @@ func (r FutureGetCFilterHeaderResult) Receive() (*wire.MsgCFHeaders, error) { var headerHex string err = json.Unmarshal(res, &headerHex) if err != nil { - return nil, err + return nil, errors.WithStack(err) } // Assign the decoded header into a hash