mirror of
https://github.com/planetmint/planetmint-go.git
synced 2025-07-01 10:22:30 +00:00
feat(mqtt): start monitor after api server (#472)
* refactor(mqtt): load end of app.new * feat(mqtt): switch to proper logging instance * refactor(mqtt): dry * refactor(mqtt): do not start log message with uppercase closes #435 Signed-off-by: Julian Strobl <jmastr@mailbox.org>
This commit is contained in:
parent
6ddc07356e
commit
61adbc0a5d
@ -109,6 +109,7 @@ import (
|
|||||||
ibctm "github.com/cosmos/ibc-go/v7/modules/light-clients/07-tendermint"
|
ibctm "github.com/cosmos/ibc-go/v7/modules/light-clients/07-tendermint"
|
||||||
"github.com/spf13/cast"
|
"github.com/spf13/cast"
|
||||||
|
|
||||||
|
"github.com/planetmint/planetmint-go/monitor"
|
||||||
machinemodule "github.com/planetmint/planetmint-go/x/machine"
|
machinemodule "github.com/planetmint/planetmint-go/x/machine"
|
||||||
machinemodulekeeper "github.com/planetmint/planetmint-go/x/machine/keeper"
|
machinemodulekeeper "github.com/planetmint/planetmint-go/x/machine/keeper"
|
||||||
machinemoduletypes "github.com/planetmint/planetmint-go/x/machine/types"
|
machinemoduletypes "github.com/planetmint/planetmint-go/x/machine/types"
|
||||||
@ -806,6 +807,8 @@ func New(
|
|||||||
app.ScopedTransferKeeper = scopedTransferKeeper
|
app.ScopedTransferKeeper = scopedTransferKeeper
|
||||||
// this line is used by starport scaffolding # stargate/app/beforeInitReturn
|
// this line is used by starport scaffolding # stargate/app/beforeInitReturn
|
||||||
|
|
||||||
|
monitor.LazyMqttMonitorLoader(logger, homePath)
|
||||||
|
|
||||||
return app
|
return app
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -952,6 +955,12 @@ func (app *App) RegisterTendermintService(clientCtx client.Context) {
|
|||||||
// RegisterNodeService implements the Application.RegisterNodeService method.
|
// RegisterNodeService implements the Application.RegisterNodeService method.
|
||||||
func (app *App) RegisterNodeService(clientCtx client.Context) {
|
func (app *App) RegisterNodeService(clientCtx client.Context) {
|
||||||
nodeservice.RegisterNodeService(clientCtx, app.GRPCQueryRouter())
|
nodeservice.RegisterNodeService(clientCtx, app.GRPCQueryRouter())
|
||||||
|
// HACK: start mqtt monitor as late as possible (hint: look in vendor directory for startup order)
|
||||||
|
mqttMonitorInstance := monitor.GetMqttMonitorInstance()
|
||||||
|
err := mqttMonitorInstance.Start()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// initParamsKeeper init params keeper and its subspaces
|
// initParamsKeeper init params keeper and its subspaces
|
||||||
|
@ -2,7 +2,6 @@ package monitor
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"log"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -22,7 +21,7 @@ func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err er
|
|||||||
|
|
||||||
lastSeenBytes, err := json.Marshal(lastSeen)
|
lastSeenBytes, err := json.Marshal(lastSeen)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("[app] [Monitor] Error serializing ConversionRequest: " + err.Error())
|
Log("error serializing ConversionRequest: " + err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -35,9 +34,9 @@ func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err er
|
|||||||
err = mms.db.Put([]byte(address), lastSeenBytes, nil)
|
err = mms.db.Put([]byte(address), lastSeenBytes, nil)
|
||||||
mms.dbMutex.Unlock()
|
mms.dbMutex.Unlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("[app] [Monitor] error storing addresses in DB: " + err.Error())
|
Log("error storing addresses in DB: " + err.Error())
|
||||||
} else {
|
} else {
|
||||||
log.Println("[app] [Monitor] stored address in DB: " + address)
|
Log("stored address in DB: " + address)
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
@ -61,9 +60,9 @@ func (mms *MqttMonitor) getAmountOfElements() (amount int64, err error) {
|
|||||||
|
|
||||||
// Check for any errors encountered during iteration
|
// Check for any errors encountered during iteration
|
||||||
if err := iter.Error(); err != nil {
|
if err := iter.Error(); err != nil {
|
||||||
log.Println("[app] [Monitor] " + err.Error())
|
Log("" + err.Error())
|
||||||
} else {
|
} else {
|
||||||
log.Println("[app] [Monitor] elements: " + strconv.FormatInt(amount, 10))
|
Log("elements: " + strconv.FormatInt(amount, 10))
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
@ -73,14 +72,14 @@ func (mms *MqttMonitor) getDataFromIter(iter iterator.Iterator) (lastSeen LastSe
|
|||||||
value := iter.Value()
|
value := iter.Value()
|
||||||
err = json.Unmarshal(value, &lastSeen)
|
err = json.Unmarshal(value, &lastSeen)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("[app] [Monitor] Failed to unmarshal entry: " + string(key) + " - " + err.Error())
|
Log("failed to unmarshal entry: " + string(key) + " - " + err.Error())
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mms *MqttMonitor) CleanupDB() {
|
func (mms *MqttMonitor) CleanupDB() {
|
||||||
// Create an iterator for the database
|
// Create an iterator for the database
|
||||||
log.Println("[app] [Monitor] Starting clean-up process")
|
Log("starting clean-up process")
|
||||||
iter := mms.db.NewIterator(nil, nil)
|
iter := mms.db.NewIterator(nil, nil)
|
||||||
defer iter.Release() // Make sure to release the iterator at the end
|
defer iter.Release() // Make sure to release the iterator at the end
|
||||||
|
|
||||||
@ -89,7 +88,7 @@ func (mms *MqttMonitor) CleanupDB() {
|
|||||||
// Use iter.Key() and iter.Value() to access the key and value
|
// Use iter.Key() and iter.Value() to access the key and value
|
||||||
lastSeen, err := mms.getDataFromIter(iter)
|
lastSeen, err := mms.getDataFromIter(iter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("[app] [Monitor] Failed to unmarshal entry: " + string(iter.Key()) + " - " + err.Error())
|
Log("failed to unmarshal entry: " + string(iter.Key()) + " - " + err.Error())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
timeThreshold := time.Now().Add(-1 * mms.CleanupPeriodicityInMinutes * time.Minute).Unix()
|
timeThreshold := time.Now().Add(-1 * mms.CleanupPeriodicityInMinutes * time.Minute).Unix()
|
||||||
@ -97,15 +96,15 @@ func (mms *MqttMonitor) CleanupDB() {
|
|||||||
// If the entry is older than 12 hours, delete it
|
// If the entry is older than 12 hours, delete it
|
||||||
err := mms.deleteEntry(iter.Key())
|
err := mms.deleteEntry(iter.Key())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("[app] [Monitor] Failed to delete entry: " + err.Error())
|
Log("failed to delete entry: " + err.Error())
|
||||||
} else {
|
} else {
|
||||||
log.Println("[app] [Monitor] Delete entry: " + string(iter.Key()))
|
Log("delete entry: " + string(iter.Key()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for any errors encountered during iteration
|
// Check for any errors encountered during iteration
|
||||||
if err := iter.Error(); err != nil {
|
if err := iter.Error(); err != nil {
|
||||||
log.Println("[app] [Monitor] error during cleanup : " + err.Error())
|
Log("error during cleanup : " + err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,7 @@ package monitor
|
|||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/cometbft/cometbft/libs/log"
|
||||||
"github.com/planetmint/planetmint-go/config"
|
"github.com/planetmint/planetmint-go/config"
|
||||||
"github.com/syndtr/goleveldb/leveldb"
|
"github.com/syndtr/goleveldb/leveldb"
|
||||||
)
|
)
|
||||||
@ -15,6 +16,7 @@ type MQTTMonitorClientI interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var monitorMutex sync.RWMutex
|
var monitorMutex sync.RWMutex
|
||||||
|
var mqttLogger log.Logger
|
||||||
var mqttMonitorInstance MQTTMonitorClientI
|
var mqttMonitorInstance MQTTMonitorClientI
|
||||||
|
|
||||||
func SetMqttMonitorInstance(monitorInstance MQTTMonitorClientI) {
|
func SetMqttMonitorInstance(monitorInstance MQTTMonitorClientI) {
|
||||||
@ -23,13 +25,22 @@ func SetMqttMonitorInstance(monitorInstance MQTTMonitorClientI) {
|
|||||||
monitorMutex.Unlock()
|
monitorMutex.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func LazyMqttMonitorLoader(homeDir string) {
|
func GetMqttMonitorInstance() (monitorInstance MQTTMonitorClientI) {
|
||||||
|
monitorMutex.Lock()
|
||||||
|
defer monitorMutex.Unlock()
|
||||||
|
return mqttMonitorInstance
|
||||||
|
}
|
||||||
|
|
||||||
|
func LazyMqttMonitorLoader(logger log.Logger, homeDir string) {
|
||||||
monitorMutex.RLock()
|
monitorMutex.RLock()
|
||||||
tmpInstance := mqttMonitorInstance
|
tmpInstance := mqttMonitorInstance
|
||||||
monitorMutex.RUnlock()
|
monitorMutex.RUnlock()
|
||||||
if tmpInstance != nil {
|
if tmpInstance != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if logger != nil {
|
||||||
|
mqttLogger = logger
|
||||||
|
}
|
||||||
if homeDir == "" {
|
if homeDir == "" {
|
||||||
homeDir = "./"
|
homeDir = "./"
|
||||||
}
|
}
|
||||||
@ -39,10 +50,6 @@ func LazyMqttMonitorLoader(homeDir string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
SetMqttMonitorInstance(NewMqttMonitorService(aciveActorsDB, *config.GetConfig()))
|
SetMqttMonitorInstance(NewMqttMonitorService(aciveActorsDB, *config.GetConfig()))
|
||||||
err = mqttMonitorInstance.Start()
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) {
|
func SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) {
|
||||||
@ -69,3 +76,10 @@ func GetActiveActorCount() (count uint64) {
|
|||||||
defer monitorMutex.RUnlock()
|
defer monitorMutex.RUnlock()
|
||||||
return mqttMonitorInstance.GetActiveActorCount()
|
return mqttMonitorInstance.GetActiveActorCount()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Log(msg string) {
|
||||||
|
if mqttLogger == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
mqttLogger.Info("[app] [monitor] " + msg)
|
||||||
|
}
|
||||||
|
@ -5,7 +5,6 @@ import (
|
|||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
@ -14,7 +13,6 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
sdk "github.com/cosmos/cosmos-sdk/types"
|
|
||||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||||
"github.com/planetmint/planetmint-go/config"
|
"github.com/planetmint/planetmint-go/config"
|
||||||
"github.com/planetmint/planetmint-go/util"
|
"github.com/planetmint/planetmint-go/util"
|
||||||
@ -30,8 +28,6 @@ type MqttMonitor struct {
|
|||||||
config config.Config
|
config config.Config
|
||||||
numberOfElementsMutex sync.RWMutex
|
numberOfElementsMutex sync.RWMutex
|
||||||
numberOfElements int64
|
numberOfElements int64
|
||||||
sdkContext *sdk.Context
|
|
||||||
contextMutex sync.Mutex
|
|
||||||
isTerminated bool
|
isTerminated bool
|
||||||
terminationMutex sync.RWMutex
|
terminationMutex sync.RWMutex
|
||||||
maxRetries time.Duration
|
maxRetries time.Duration
|
||||||
@ -93,7 +89,7 @@ func (mms *MqttMonitor) lazyLoadMonitorMQTTClient() util.MQTTClientI {
|
|||||||
opts.SetTLSConfig(tlsConfig)
|
opts.SetTLSConfig(tlsConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Println("[app] [Monitor] create new client")
|
Log("create new client")
|
||||||
client := mqtt.NewClient(opts)
|
client := mqtt.NewClient(opts)
|
||||||
return client
|
return client
|
||||||
}
|
}
|
||||||
@ -145,8 +141,8 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
randomChallenger, randomChallengee := mms.getRandomNumbers()
|
randomChallenger, randomChallengee := mms.getRandomNumbers()
|
||||||
log.Println("[app] [Monitor] number of elements: " + strconv.Itoa(numElements))
|
Log("number of elements: " + strconv.Itoa(numElements))
|
||||||
log.Println("[app] [Monitor] selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee))
|
Log("selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee))
|
||||||
iter := mms.db.NewIterator(nil, nil)
|
iter := mms.db.NewIterator(nil, nil)
|
||||||
defer iter.Release()
|
defer iter.Release()
|
||||||
count := 0
|
count := 0
|
||||||
@ -156,7 +152,7 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str
|
|||||||
if count == randomChallenger {
|
if count == randomChallenger {
|
||||||
lastSeen, err = mms.getDataFromIter(iter)
|
lastSeen, err = mms.getDataFromIter(iter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("[app] [Monitor] could not get Data from ID" + strconv.Itoa(randomChallenger))
|
Log("could not get Data from ID" + strconv.Itoa(randomChallenger))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
challenger = lastSeen.Address
|
challenger = lastSeen.Address
|
||||||
@ -164,7 +160,7 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str
|
|||||||
} else if count == randomChallengee {
|
} else if count == randomChallengee {
|
||||||
lastSeen, err = mms.getDataFromIter(iter)
|
lastSeen, err = mms.getDataFromIter(iter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("[app] [Monitor] could not get Data from ID" + strconv.Itoa(randomChallengee))
|
Log("could not get Data from ID" + strconv.Itoa(randomChallengee))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
challengee = lastSeen.Address
|
challengee = lastSeen.Address
|
||||||
@ -176,7 +172,7 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Println("[app] [Monitor] challenger, challengee: " + challenger + " " + challengee)
|
Log("challenger, challengee: " + challenger + " " + challengee)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -214,9 +210,9 @@ func (mms *MqttMonitor) MqttMsgHandler(_ mqtt.Client, msg mqtt.Message) {
|
|||||||
err = mms.AddParticipant(address, unixTime)
|
err = mms.AddParticipant(address, unixTime)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("[app] [Monitor] error adding active actor to DB: " + address + " " + err.Error())
|
Log("error adding active actor to DB: " + address + " " + err.Error())
|
||||||
} else {
|
} else {
|
||||||
log.Println("[app] [Monitor] added active actor to DB: " + address)
|
Log("added active actor to DB: " + address)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -230,7 +226,7 @@ func IsLegitMachineAddress(address string) (active bool, err error) {
|
|||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("[app] [Monitor] cannot send machine query request " + err.Error())
|
Log("cannot send machine query request " + err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -240,7 +236,7 @@ func IsLegitMachineAddress(address string) (active bool, err error) {
|
|||||||
// Send the request
|
// Send the request
|
||||||
resp, err := client.Do(req)
|
resp, err := client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("[app] [Monitor] cannot connect to server: " + err.Error())
|
Log("cannot connect to server: " + err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -250,13 +246,13 @@ func IsLegitMachineAddress(address string) (active bool, err error) {
|
|||||||
// Read the response body
|
// Read the response body
|
||||||
body, err := io.ReadAll(resp.Body)
|
body, err := io.ReadAll(resp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("[app] [Monitor] cannot read response: " + err.Error())
|
Log("cannot read response: " + err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check the status code
|
// Check the status code
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
log.Print("[app] [Monitor] unexpected status code: " + string(body))
|
Log("unexpected status code: " + string(body))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -264,29 +260,29 @@ func IsLegitMachineAddress(address string) (active bool, err error) {
|
|||||||
var data map[string]interface{}
|
var data map[string]interface{}
|
||||||
err = json.Unmarshal(body, &data)
|
err = json.Unmarshal(body, &data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("[app] [Monitor] cannot unmarshal response " + err.Error())
|
Log("cannot unmarshal response " + err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if the "info" key exists
|
// Check if the "info" key exists
|
||||||
machineValue, ok := data["machine"]
|
machineValue, ok := data["machine"]
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Println("[app] [Monitor] response does not contain the required machine")
|
Log("response does not contain the required machine")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
machineMap, ok := machineValue.(map[string]interface{})
|
machineMap, ok := machineValue.(map[string]interface{})
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Println("[app] [Monitor] cannot convert machine map")
|
Log("cannot convert machine map")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
addressMap, ok := machineMap["address"]
|
addressMap, ok := machineMap["address"]
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Println("[app] [Monitor] response does not contain the required name")
|
Log("response does not contain the required name")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
value, ok := addressMap.(string)
|
value, ok := addressMap.(string)
|
||||||
if !ok || value != address {
|
if !ok || value != address {
|
||||||
log.Println("[app] [Monitor] return machine is not the required one")
|
Log("return machine is not the required one")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -296,7 +292,7 @@ func IsLegitMachineAddress(address string) (active bool, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mms *MqttMonitor) onConnectionLost(_ mqtt.Client, err error) {
|
func (mms *MqttMonitor) onConnectionLost(_ mqtt.Client, err error) {
|
||||||
log.Println("[app] [Monitor] Connection lost: " + err.Error())
|
Log("connection lost: " + err.Error())
|
||||||
// Handle connection loss here (e.g., reconnect attempts, logging)
|
// Handle connection loss here (e.g., reconnect attempts, logging)
|
||||||
if !mms.IsTerminated() {
|
if !mms.IsTerminated() {
|
||||||
mms.lostConnectionMutex.Lock()
|
mms.lostConnectionMutex.Lock()
|
||||||
@ -308,7 +304,7 @@ func (mms *MqttMonitor) onConnectionLost(_ mqtt.Client, err error) {
|
|||||||
func (mms *MqttMonitor) MonitorActiveParticipants() {
|
func (mms *MqttMonitor) MonitorActiveParticipants() {
|
||||||
mms.clientMutex.Lock()
|
mms.clientMutex.Lock()
|
||||||
if mms.localMqttClient != nil {
|
if mms.localMqttClient != nil {
|
||||||
log.Println("[app] [Monitor] client is still working")
|
Log("client is still working")
|
||||||
mms.clientMutex.Unlock()
|
mms.clientMutex.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -320,7 +316,7 @@ func (mms *MqttMonitor) MonitorActiveParticipants() {
|
|||||||
mms.SetMaxRetries()
|
mms.SetMaxRetries()
|
||||||
for !mms.IsTerminated() && mms.maxRetries > 0 {
|
for !mms.IsTerminated() && mms.maxRetries > 0 {
|
||||||
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
|
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
|
||||||
log.Println("[app] [Monitor] error connecting to mqtt: " + token.Error().Error())
|
Log("error connecting to mqtt: " + token.Error().Error())
|
||||||
mms.maxRetries--
|
mms.maxRetries--
|
||||||
time.Sleep(time.Second * 5)
|
time.Sleep(time.Second * 5)
|
||||||
continue
|
continue
|
||||||
@ -329,24 +325,24 @@ func (mms *MqttMonitor) MonitorActiveParticipants() {
|
|||||||
mms.lostConnection = false
|
mms.lostConnection = false
|
||||||
mms.lostConnectionMutex.Unlock()
|
mms.lostConnectionMutex.Unlock()
|
||||||
|
|
||||||
log.Println("[app] [Monitor] established connection")
|
Log("established connection")
|
||||||
|
|
||||||
var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler
|
var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler
|
||||||
|
|
||||||
// Subscribe to a topic
|
// Subscribe to a topic
|
||||||
subscriptionTopic := "tele/#"
|
subscriptionTopic := "tele/#"
|
||||||
if token := mqttClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil {
|
if token := mqttClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil {
|
||||||
log.Println("[app] [Monitor] error registering the mqtt subscription: " + token.Error().Error())
|
Log("error registering the mqtt subscription: " + token.Error().Error())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
log.Println("[app] [Monitor] subscribed to tele/# channels")
|
Log("subscribed to tele/# channels")
|
||||||
|
|
||||||
for !mms.IsTerminated() {
|
for !mms.IsTerminated() {
|
||||||
mms.lostConnectionMutex.Lock()
|
mms.lostConnectionMutex.Lock()
|
||||||
lostConnectionEvent := mms.lostConnection
|
lostConnectionEvent := mms.lostConnection
|
||||||
mms.lostConnectionMutex.Unlock()
|
mms.lostConnectionMutex.Unlock()
|
||||||
if !mqttClient.IsConnected() || !mqttClient.IsConnectionOpen() || lostConnectionEvent {
|
if !mqttClient.IsConnected() || !mqttClient.IsConnectionOpen() || lostConnectionEvent {
|
||||||
log.Println("[app] [Monitor] retry establishing a connection")
|
Log("retry establishing a connection")
|
||||||
break // Exit inner loop on disconnect
|
break // Exit inner loop on disconnect
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -357,7 +353,7 @@ func (mms *MqttMonitor) MonitorActiveParticipants() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if mms.maxRetries == 0 {
|
if mms.maxRetries == 0 {
|
||||||
log.Println("[app] [Monitor] Reached maximum reconnection attempts. Exiting. New client will be activated soon.")
|
Log("reached maximum reconnection attempts. Exiting. New client will be activated soon.")
|
||||||
}
|
}
|
||||||
|
|
||||||
mms.clientMutex.Lock()
|
mms.clientMutex.Lock()
|
||||||
@ -375,12 +371,3 @@ func SendUpdateMessage(mqttClient util.MQTTClientI) {
|
|||||||
func (mms *MqttMonitor) SetMaxRetries() {
|
func (mms *MqttMonitor) SetMaxRetries() {
|
||||||
mms.maxRetries = 5
|
mms.maxRetries = 5
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mms *MqttMonitor) Log(msg string) {
|
|
||||||
mms.contextMutex.Lock()
|
|
||||||
localContext := mms.sdkContext
|
|
||||||
mms.contextMutex.Unlock()
|
|
||||||
if localContext != nil {
|
|
||||||
util.GetAppLogger().Info(*localContext, msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -7,7 +7,6 @@ import (
|
|||||||
sdk "github.com/cosmos/cosmos-sdk/types"
|
sdk "github.com/cosmos/cosmos-sdk/types"
|
||||||
paramtypes "github.com/cosmos/cosmos-sdk/x/params/types"
|
paramtypes "github.com/cosmos/cosmos-sdk/x/params/types"
|
||||||
|
|
||||||
"github.com/planetmint/planetmint-go/monitor"
|
|
||||||
"github.com/planetmint/planetmint-go/x/dao/types"
|
"github.com/planetmint/planetmint-go/x/dao/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -50,7 +49,6 @@ func NewKeeper(
|
|||||||
if !ps.HasKeyTable() {
|
if !ps.HasKeyTable() {
|
||||||
ps = ps.WithKeyTable(types.ParamKeyTable())
|
ps = ps.WithKeyTable(types.ParamKeyTable())
|
||||||
}
|
}
|
||||||
monitor.LazyMqttMonitorLoader(rootDir)
|
|
||||||
return &Keeper{
|
return &Keeper{
|
||||||
cdc: cdc,
|
cdc: cdc,
|
||||||
storeKey: storeKey,
|
storeKey: storeKey,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user