[NOD-427] Add selected tip mqtt notification for the API server (#489)

* [NOD-427] Send notifications to `dag/selected-tip`

* [NOD-442] Add selected tip notification

* [NOD-427] Add comment to PublishSelectedTipNotification

* [NOD-427] Remove redundant argument from errors.Wrapf

* [NOD-427] Add handleBlockAddedMsg function

* [NOD-427] Return errors instead of panicking

* [NOD-427] Fix findHashOfBluestBlock to use []string instead of dbmodels.Block

* [NOD-427] Add constants

* [NOD-427] use path.Join instead of topic+address

* [NOD-427] Remove redundant select

* [NOD-427] Change break to return
This commit is contained in:
Ori Newman 2019-11-26 14:44:27 +02:00 committed by Svarog
parent b1f59914d2
commit 532e57b61c
5 changed files with 127 additions and 82 deletions

View File

@ -1,14 +1,20 @@
package mqtt
import (
"errors"
"encoding/json"
"github.com/daglabs/btcd/apiserver/config"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/pkg/errors"
)
// client is an instance of the MQTT client, in case we have an active connection
var client mqtt.Client
const (
qualityOfService = 2
quiesceMilliseconds = 250
)
// GetClient returns an instance of the MQTT client, in case we have an active connection
func GetClient() (mqtt.Client, error) {
if client == nil {
@ -49,6 +55,20 @@ func Close() {
if client == nil {
return
}
client.Disconnect(250)
client.Disconnect(quiesceMilliseconds)
client = nil
}
func publish(topic string, data interface{}) error {
payload, err := json.Marshal(data)
if err != nil {
return err
}
token := client.Publish(topic, qualityOfService, false, payload)
token.Wait()
if token.Error() != nil {
return errors.WithStack(token.Error())
}
return nil
}

View File

@ -0,0 +1,19 @@
package mqtt
import (
"github.com/daglabs/btcd/apiserver/controllers"
)
const selectedTipTopic = "dag/selected-tip"
// PublishSelectedTipNotification publishes notification for a new selected tip
func PublishSelectedTipNotification(selectedTipHash string) error {
if !isConnected() {
return nil
}
block, err := controllers.GetBlockByHashHandler(selectedTipHash)
if err != nil {
return err
}
return publish(selectedTipTopic, block)
}

View File

@ -1,13 +1,13 @@
package mqtt
import (
"encoding/json"
"github.com/daglabs/btcd/apiserver/apimodels"
"github.com/daglabs/btcd/apiserver/controllers"
"github.com/daglabs/btcd/apiserver/database"
"github.com/daglabs/btcd/btcjson"
"github.com/daglabs/btcd/rpcclient"
"github.com/jinzhu/gorm"
"path"
)
// PublishTransactionsNotifications publishes notification for each transaction of the given block
@ -27,7 +27,7 @@ func PublishTransactionsNotifications(db *gorm.DB, rawTransactions []btcjson.TxR
}
for _, transaction := range transactions {
err = publishTransactionNotifications(transaction, "transactions/")
err = publishTransactionNotifications(transaction, "transactions")
if err != nil {
return err
}
@ -59,18 +59,7 @@ func uniqueAddressesForTransaction(transaction *apimodels.TransactionResponse) [
}
func publishTransactionNotificationForAddress(transaction *apimodels.TransactionResponse, address string, topic string) error {
payload, err := json.Marshal(transaction)
if err != nil {
return err
}
token := client.Publish(topic+address, 2, false, payload)
token.Wait()
if token.Error() != nil {
return token.Error()
}
return nil
return publish(path.Join(topic, address), transaction)
}
// PublishAcceptedTransactionsNotifications publishes notification for each accepted transaction of the given chain-block
@ -93,7 +82,7 @@ func PublishAcceptedTransactionsNotifications(addedChainBlocks []*rpcclient.Chai
}
for _, transaction := range transactions {
err = publishTransactionNotifications(transaction, "transactions/accepted/")
err = publishTransactionNotifications(transaction, "transactions/accepted")
if err != nil {
return err
}

View File

@ -60,8 +60,7 @@ func startSync(doneChan chan struct{}) error {
log.Infof("Finished syncing past data")
// Keep the node and the API server in sync
sync(client, doneChan)
return nil
return sync(client, doneChan)
}
// fetchInitialData downloads all data that's currently missing from
@ -79,20 +78,25 @@ func fetchInitialData(client *jsonrpc.Client) error {
}
// sync keeps the API server in sync with the node via notifications
func sync(client *jsonrpc.Client, doneChan chan struct{}) {
loop:
func sync(client *jsonrpc.Client, doneChan chan struct{}) error {
// Handle client notifications until we're told to stop
for {
select {
case blockAdded := <-client.OnBlockAdded:
enqueueBlockAddedMsg(blockAdded)
processBlockAddedMsgs(client)
err := processBlockAddedMsgs(client)
if err != nil {
return err
}
case chainChanged := <-client.OnChainChanged:
enqueueChainChangedMsg(chainChanged)
processChainChangedMsgs()
err := processChainChangedMsgs()
if err != nil {
return err
}
case <-doneChan:
log.Infof("startSync stopped")
break loop
return nil
}
}
}
@ -131,10 +135,7 @@ func syncBlocks(client *jsonrpc.Client) error {
// chain starting with the bluest chain-block, and then updates the
// database accordingly.
func syncSelectedParentChain(client *jsonrpc.Client) error {
// Start syncing from the bluest chain-block hash. We use blue
// score to simulate the "last" block we have because blue-block
// order is the order that the node uses in the various JSONRPC
// calls.
// Start syncing from the selected tip hash
startHash, err := findHashOfBluestBlock(true)
if err != nil {
return err
@ -168,12 +169,14 @@ func findHashOfBluestBlock(mustBeChainBlock bool) (*string, error) {
return nil, err
}
var block dbmodels.Block
dbQuery := db.Order("blue_score DESC")
var blockHashes []string
dbQuery := db.Model(&dbmodels.Block{}).
Order("blue_score DESC").
Limit(1)
if mustBeChainBlock {
dbQuery = dbQuery.Where(&dbmodels.Block{IsChainBlock: true})
}
dbResult := dbQuery.First(&block)
dbResult := dbQuery.Pluck("block_hash", &blockHashes)
dbErrors := dbResult.GetErrors()
if httpserverutils.HasDBError(dbErrors) {
return nil, httpserverutils.NewErrorFromDBErrors("failed to find hash of bluest block: ", dbErrors)
@ -181,7 +184,7 @@ func findHashOfBluestBlock(mustBeChainBlock bool) (*string, error) {
if httpserverutils.IsDBRecordNotFoundError(dbErrors) {
return nil, nil
}
return &block.BlockHash, nil
return &blockHashes[0], nil
}
// fetchBlock downloads the serialized block and raw block data of
@ -930,12 +933,12 @@ func enqueueBlockAddedMsg(blockAdded *jsonrpc.BlockAddedMsg) {
// processBlockAddedMsgs processes all pending onBlockAdded messages.
// Messages that cannot yet be processed are re-enqueued.
func processBlockAddedMsgs(client *jsonrpc.Client) {
func processBlockAddedMsgs(client *jsonrpc.Client) error {
var unprocessedBlockAddedMsgs []*jsonrpc.BlockAddedMsg
for _, blockAdded := range pendingBlockAddedMsgs {
missingHashes, err := missingParentHashes(blockAdded)
if err != nil {
panic(errors.Errorf("Could not resolve missing parents: %s", err))
return errors.Errorf("Could not resolve missing parents: %s", err)
}
for _, missingHash := range missingHashes {
err := handleMissingParent(client, missingHash)
@ -949,6 +952,13 @@ func processBlockAddedMsgs(client *jsonrpc.Client) {
continue
}
handleBlockAddedMsg(client, blockAdded)
}
pendingBlockAddedMsgs = unprocessedBlockAddedMsgs
return nil
}
func handleBlockAddedMsg(client *jsonrpc.Client, blockAdded *jsonrpc.BlockAddedMsg) {
hash := blockAdded.Header.BlockHash()
log.Debugf("Getting block %s from the RPC server", hash)
rawBlock, verboseBlock, err := fetchBlock(client, hash)
@ -962,8 +972,6 @@ func processBlockAddedMsgs(client *jsonrpc.Client) {
return
}
log.Infof("Added block %s", hash)
}
pendingBlockAddedMsgs = unprocessedBlockAddedMsgs
}
func missingParentHashes(blockAdded *jsonrpc.BlockAddedMsg) ([]string, error) {
@ -1045,35 +1053,43 @@ func enqueueChainChangedMsg(chainChanged *jsonrpc.ChainChangedMsg) {
// processChainChangedMsgs processes all pending onChainChanged messages.
// Messages that cannot yet be processed are re-enqueued.
func processChainChangedMsgs() {
func processChainChangedMsgs() error {
var unprocessedChainChangedMessages []*jsonrpc.ChainChangedMsg
for _, chainChanged := range pendingChainChangedMsgs {
canHandle, err := canHandleChainChangedMsg(chainChanged)
if err != nil {
panic(errors.Errorf("Could not resolve if can handle ChainChangedMsg: %s", err))
return errors.Wrap(err, "Could not resolve if can handle ChainChangedMsg")
}
if !canHandle {
unprocessedChainChangedMessages = append(unprocessedChainChangedMessages, chainChanged)
continue
}
err = handleChainChangedMsg(chainChanged)
if err != nil {
return err
}
}
pendingChainChangedMsgs = unprocessedChainChangedMessages
return nil
}
func handleChainChangedMsg(chainChanged *jsonrpc.ChainChangedMsg) error {
// Convert the data in chainChanged to something we can feed into
// updateSelectedParentChain
removedHashes, addedBlocks := convertChainChangedMsg(chainChanged)
err = updateSelectedParentChain(removedHashes, addedBlocks)
err := updateSelectedParentChain(removedHashes, addedBlocks)
if err != nil {
panic(errors.Errorf("Could not update selected parent chain: %s", err))
return errors.Wrap(err, "Could not update selected parent chain")
}
log.Infof("Chain changed: removed %d blocks and added %d block",
len(removedHashes), len(addedBlocks))
err = mqtt.PublishAcceptedTransactionsNotifications(chainChanged.AddedChainBlocks)
if err != nil {
panic(errors.Errorf("Error while publishing accepted transactions notifications %s", err))
return errors.Wrap(err, "Error while publishing accepted transactions notifications")
}
}
pendingChainChangedMsgs = unprocessedChainChangedMessages
return mqtt.PublishSelectedTipNotification(addedBlocks[len(addedBlocks)-1].Hash)
}
// canHandleChainChangedMsg checks whether we have all the necessary data

View File

@ -9,6 +9,7 @@ import (
"bytes"
"encoding/hex"
"encoding/json"
"github.com/pkg/errors"
"github.com/daglabs/btcd/btcjson"
"github.com/daglabs/btcd/util/daghash"
@ -31,7 +32,7 @@ func (r FutureGetSelectedTipHashResult) Receive() (*daghash.Hash, error) {
var txHashStr string
err = json.Unmarshal(res, &txHashStr)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
return daghash.NewHashFromStr(txHashStr)
}
@ -68,13 +69,13 @@ func (r FutureGetBlockResult) Receive() (*wire.MsgBlock, error) {
var blockHex string
err = json.Unmarshal(res, &blockHex)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
// Decode the serialized block hex to raw bytes.
serializedBlock, err := hex.DecodeString(blockHex)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
// Deserialize the block and return it.
@ -123,7 +124,7 @@ func (r FutureGetBlocksResult) Receive() (*btcjson.GetBlocksResult, error) {
var result btcjson.GetBlocksResult
if err := json.Unmarshal(res, &result); err != nil {
return nil, err
return nil, errors.Wrapf(err, "%s", string(res))
}
return &result, nil
}
@ -160,7 +161,7 @@ func (r FutureGetBlockVerboseResult) Receive() (*btcjson.GetBlockVerboseResult,
var blockResult btcjson.GetBlockVerboseResult
err = json.Unmarshal(res, &blockResult)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
return &blockResult, nil
}
@ -265,7 +266,7 @@ func (r FutureGetChainFromBlockResult) Receive() (*btcjson.GetChainFromBlockResu
var result btcjson.GetChainFromBlockResult
if err := json.Unmarshal(res, &result); err != nil {
return nil, err
return nil, errors.WithStack(err)
}
return &result, nil
}
@ -339,7 +340,7 @@ func (r FutureGetBlockDAGInfoResult) Receive() (*btcjson.GetBlockDAGInfoResult,
var dagInfo btcjson.GetBlockDAGInfoResult
if err := json.Unmarshal(res, &dagInfo); err != nil {
return nil, err
return nil, errors.WithStack(err)
}
return &dagInfo, nil
}
@ -377,7 +378,7 @@ func (r FutureGetBlockHashResult) Receive() (*daghash.Hash, error) {
var txHashStr string
err = json.Unmarshal(res, &txHashStr)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
return daghash.NewHashFromStr(txHashStr)
}
@ -398,12 +399,12 @@ func (r FutureGetBlockHeaderResult) Receive() (*wire.BlockHeader, error) {
var bhHex string
err = json.Unmarshal(res, &bhHex)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
serializedBH, err := hex.DecodeString(bhHex)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
// Deserialize the blockheader and return it.
@ -455,7 +456,7 @@ func (r FutureGetBlockHeaderVerboseResult) Receive() (*btcjson.GetBlockHeaderVer
var bh btcjson.GetBlockHeaderVerboseResult
err = json.Unmarshal(res, &bh)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
return &bh, nil
@ -501,7 +502,7 @@ func (r FutureGetMempoolEntryResult) Receive() (*btcjson.GetMempoolEntryResult,
var mempoolEntryResult btcjson.GetMempoolEntryResult
err = json.Unmarshal(res, &mempoolEntryResult)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
return &mempoolEntryResult, nil
@ -539,7 +540,7 @@ func (r FutureGetRawMempoolResult) Receive() ([]*daghash.Hash, error) {
var txHashStrs []string
err = json.Unmarshal(res, &txHashStrs)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
// Create a slice of ShaHash arrays from the string slice.
@ -591,7 +592,7 @@ func (r FutureGetRawMempoolVerboseResult) Receive() (map[string]btcjson.GetRawMe
var mempoolItems map[string]btcjson.GetRawMempoolVerboseResult
err = json.Unmarshal(res, &mempoolItems)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
return mempoolItems, nil
}
@ -631,7 +632,7 @@ func (r FutureGetSubnetworkResult) Receive() (*btcjson.GetSubnetworkResult, erro
var getSubnetworkResult *btcjson.GetSubnetworkResult
err = json.Unmarshal(res, &getSubnetworkResult)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
return getSubnetworkResult, nil
@ -674,7 +675,7 @@ func (r FutureGetTxOutResult) Receive() (*btcjson.GetTxOutResult, error) {
var txOutInfo *btcjson.GetTxOutResult
err = json.Unmarshal(res, &txOutInfo)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
return txOutInfo, nil
@ -722,7 +723,7 @@ func (r FutureRescanBlocksResult) Receive() ([]btcjson.RescannedBlock, error) {
var rescanBlocksResult []btcjson.RescannedBlock
err = json.Unmarshal(res, &rescanBlocksResult)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
return rescanBlocksResult, nil
@ -804,13 +805,13 @@ func (r FutureGetCFilterResult) Receive() (*wire.MsgCFilter, error) {
var filterHex string
err = json.Unmarshal(res, &filterHex)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
// Decode the serialized cf hex to raw bytes.
serializedFilter, err := hex.DecodeString(filterHex)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
// Assign the filter bytes to the correct field of the wire message.
@ -859,7 +860,7 @@ func (r FutureGetCFilterHeaderResult) Receive() (*wire.MsgCFHeaders, error) {
var headerHex string
err = json.Unmarshal(res, &headerHex)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
// Assign the decoded header into a hash