From 61adbc0a5d5613fdf0e0c61e20ed50c65f52901f Mon Sep 17 00:00:00 2001 From: Julian Strobl Date: Fri, 15 Nov 2024 11:27:17 +0100 Subject: [PATCH] feat(mqtt): start monitor after api server (#472) * refactor(mqtt): load end of app.new * feat(mqtt): switch to proper logging instance * refactor(mqtt): dry * refactor(mqtt): do not start log message with uppercase closes #435 Signed-off-by: Julian Strobl --- app/app.go | 9 ++++++ monitor/backend.go | 23 +++++++-------- monitor/interface.go | 24 ++++++++++++---- monitor/mqtt_monitor.go | 63 ++++++++++++++++------------------------- x/dao/keeper/keeper.go | 2 -- 5 files changed, 64 insertions(+), 57 deletions(-) diff --git a/app/app.go b/app/app.go index a596fa4..6310cf6 100644 --- a/app/app.go +++ b/app/app.go @@ -109,6 +109,7 @@ import ( ibctm "github.com/cosmos/ibc-go/v7/modules/light-clients/07-tendermint" "github.com/spf13/cast" + "github.com/planetmint/planetmint-go/monitor" machinemodule "github.com/planetmint/planetmint-go/x/machine" machinemodulekeeper "github.com/planetmint/planetmint-go/x/machine/keeper" machinemoduletypes "github.com/planetmint/planetmint-go/x/machine/types" @@ -806,6 +807,8 @@ func New( app.ScopedTransferKeeper = scopedTransferKeeper // this line is used by starport scaffolding # stargate/app/beforeInitReturn + monitor.LazyMqttMonitorLoader(logger, homePath) + return app } @@ -952,6 +955,12 @@ func (app *App) RegisterTendermintService(clientCtx client.Context) { // RegisterNodeService implements the Application.RegisterNodeService method. func (app *App) RegisterNodeService(clientCtx client.Context) { nodeservice.RegisterNodeService(clientCtx, app.GRPCQueryRouter()) + // HACK: start mqtt monitor as late as possible (hint: look in vendor directory for startup order) + mqttMonitorInstance := monitor.GetMqttMonitorInstance() + err := mqttMonitorInstance.Start() + if err != nil { + panic(err) + } } // initParamsKeeper init params keeper and its subspaces diff --git a/monitor/backend.go b/monitor/backend.go index ff76767..633f0aa 100644 --- a/monitor/backend.go +++ b/monitor/backend.go @@ -2,7 +2,6 @@ package monitor import ( "encoding/json" - "log" "strconv" "time" @@ -22,7 +21,7 @@ func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err er lastSeenBytes, err := json.Marshal(lastSeen) if err != nil { - log.Println("[app] [Monitor] Error serializing ConversionRequest: " + err.Error()) + Log("error serializing ConversionRequest: " + err.Error()) return } @@ -35,9 +34,9 @@ func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err er err = mms.db.Put([]byte(address), lastSeenBytes, nil) mms.dbMutex.Unlock() if err != nil { - log.Println("[app] [Monitor] error storing addresses in DB: " + err.Error()) + Log("error storing addresses in DB: " + err.Error()) } else { - log.Println("[app] [Monitor] stored address in DB: " + address) + Log("stored address in DB: " + address) } return @@ -61,9 +60,9 @@ func (mms *MqttMonitor) getAmountOfElements() (amount int64, err error) { // Check for any errors encountered during iteration if err := iter.Error(); err != nil { - log.Println("[app] [Monitor] " + err.Error()) + Log("" + err.Error()) } else { - log.Println("[app] [Monitor] elements: " + strconv.FormatInt(amount, 10)) + Log("elements: " + strconv.FormatInt(amount, 10)) } return @@ -73,14 +72,14 @@ func (mms *MqttMonitor) getDataFromIter(iter iterator.Iterator) (lastSeen LastSe value := iter.Value() err = json.Unmarshal(value, &lastSeen) if err != nil { - log.Println("[app] [Monitor] Failed to unmarshal entry: " + string(key) + " - " + err.Error()) + Log("failed to unmarshal entry: " + string(key) + " - " + err.Error()) } return } func (mms *MqttMonitor) CleanupDB() { // Create an iterator for the database - log.Println("[app] [Monitor] Starting clean-up process") + Log("starting clean-up process") iter := mms.db.NewIterator(nil, nil) defer iter.Release() // Make sure to release the iterator at the end @@ -89,7 +88,7 @@ func (mms *MqttMonitor) CleanupDB() { // Use iter.Key() and iter.Value() to access the key and value lastSeen, err := mms.getDataFromIter(iter) if err != nil { - log.Println("[app] [Monitor] Failed to unmarshal entry: " + string(iter.Key()) + " - " + err.Error()) + Log("failed to unmarshal entry: " + string(iter.Key()) + " - " + err.Error()) continue } timeThreshold := time.Now().Add(-1 * mms.CleanupPeriodicityInMinutes * time.Minute).Unix() @@ -97,15 +96,15 @@ func (mms *MqttMonitor) CleanupDB() { // If the entry is older than 12 hours, delete it err := mms.deleteEntry(iter.Key()) if err != nil { - log.Println("[app] [Monitor] Failed to delete entry: " + err.Error()) + Log("failed to delete entry: " + err.Error()) } else { - log.Println("[app] [Monitor] Delete entry: " + string(iter.Key())) + Log("delete entry: " + string(iter.Key())) } } } // Check for any errors encountered during iteration if err := iter.Error(); err != nil { - log.Println("[app] [Monitor] error during cleanup : " + err.Error()) + Log("error during cleanup : " + err.Error()) } } diff --git a/monitor/interface.go b/monitor/interface.go index fa9482e..e26b0a7 100644 --- a/monitor/interface.go +++ b/monitor/interface.go @@ -3,6 +3,7 @@ package monitor import ( "sync" + "github.com/cometbft/cometbft/libs/log" "github.com/planetmint/planetmint-go/config" "github.com/syndtr/goleveldb/leveldb" ) @@ -15,6 +16,7 @@ type MQTTMonitorClientI interface { } var monitorMutex sync.RWMutex +var mqttLogger log.Logger var mqttMonitorInstance MQTTMonitorClientI func SetMqttMonitorInstance(monitorInstance MQTTMonitorClientI) { @@ -23,13 +25,22 @@ func SetMqttMonitorInstance(monitorInstance MQTTMonitorClientI) { monitorMutex.Unlock() } -func LazyMqttMonitorLoader(homeDir string) { +func GetMqttMonitorInstance() (monitorInstance MQTTMonitorClientI) { + monitorMutex.Lock() + defer monitorMutex.Unlock() + return mqttMonitorInstance +} + +func LazyMqttMonitorLoader(logger log.Logger, homeDir string) { monitorMutex.RLock() tmpInstance := mqttMonitorInstance monitorMutex.RUnlock() if tmpInstance != nil { return } + if logger != nil { + mqttLogger = logger + } if homeDir == "" { homeDir = "./" } @@ -39,10 +50,6 @@ func LazyMqttMonitorLoader(homeDir string) { } SetMqttMonitorInstance(NewMqttMonitorService(aciveActorsDB, *config.GetConfig())) - err = mqttMonitorInstance.Start() - if err != nil { - panic(err) - } } func SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) { @@ -69,3 +76,10 @@ func GetActiveActorCount() (count uint64) { defer monitorMutex.RUnlock() return mqttMonitorInstance.GetActiveActorCount() } + +func Log(msg string) { + if mqttLogger == nil { + return + } + mqttLogger.Info("[app] [monitor] " + msg) +} diff --git a/monitor/mqtt_monitor.go b/monitor/mqtt_monitor.go index a4550c4..7ed6962 100644 --- a/monitor/mqtt_monitor.go +++ b/monitor/mqtt_monitor.go @@ -5,7 +5,6 @@ import ( "crypto/tls" "encoding/json" "io" - "log" "math/rand" "net" "net/http" @@ -14,7 +13,6 @@ import ( "sync" "time" - sdk "github.com/cosmos/cosmos-sdk/types" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/planetmint/planetmint-go/config" "github.com/planetmint/planetmint-go/util" @@ -30,8 +28,6 @@ type MqttMonitor struct { config config.Config numberOfElementsMutex sync.RWMutex numberOfElements int64 - sdkContext *sdk.Context - contextMutex sync.Mutex isTerminated bool terminationMutex sync.RWMutex maxRetries time.Duration @@ -93,7 +89,7 @@ func (mms *MqttMonitor) lazyLoadMonitorMQTTClient() util.MQTTClientI { opts.SetTLSConfig(tlsConfig) } - log.Println("[app] [Monitor] create new client") + Log("create new client") client := mqtt.NewClient(opts) return client } @@ -145,8 +141,8 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str return } randomChallenger, randomChallengee := mms.getRandomNumbers() - log.Println("[app] [Monitor] number of elements: " + strconv.Itoa(numElements)) - log.Println("[app] [Monitor] selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee)) + Log("number of elements: " + strconv.Itoa(numElements)) + Log("selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee)) iter := mms.db.NewIterator(nil, nil) defer iter.Release() count := 0 @@ -156,7 +152,7 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str if count == randomChallenger { lastSeen, err = mms.getDataFromIter(iter) if err != nil { - log.Println("[app] [Monitor] could not get Data from ID" + strconv.Itoa(randomChallenger)) + Log("could not get Data from ID" + strconv.Itoa(randomChallenger)) return } challenger = lastSeen.Address @@ -164,7 +160,7 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str } else if count == randomChallengee { lastSeen, err = mms.getDataFromIter(iter) if err != nil { - log.Println("[app] [Monitor] could not get Data from ID" + strconv.Itoa(randomChallengee)) + Log("could not get Data from ID" + strconv.Itoa(randomChallengee)) return } challengee = lastSeen.Address @@ -176,7 +172,7 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str break } } - log.Println("[app] [Monitor] challenger, challengee: " + challenger + " " + challengee) + Log("challenger, challengee: " + challenger + " " + challengee) return } @@ -214,9 +210,9 @@ func (mms *MqttMonitor) MqttMsgHandler(_ mqtt.Client, msg mqtt.Message) { err = mms.AddParticipant(address, unixTime) if err != nil { - log.Println("[app] [Monitor] error adding active actor to DB: " + address + " " + err.Error()) + Log("error adding active actor to DB: " + address + " " + err.Error()) } else { - log.Println("[app] [Monitor] added active actor to DB: " + address) + Log("added active actor to DB: " + address) } } @@ -230,7 +226,7 @@ func IsLegitMachineAddress(address string) (active bool, err error) { ctx := context.Background() req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { - log.Println("[app] [Monitor] cannot send machine query request " + err.Error()) + Log("cannot send machine query request " + err.Error()) return } @@ -240,7 +236,7 @@ func IsLegitMachineAddress(address string) (active bool, err error) { // Send the request resp, err := client.Do(req) if err != nil { - log.Println("[app] [Monitor] cannot connect to server: " + err.Error()) + Log("cannot connect to server: " + err.Error()) return } @@ -250,13 +246,13 @@ func IsLegitMachineAddress(address string) (active bool, err error) { // Read the response body body, err := io.ReadAll(resp.Body) if err != nil { - log.Println("[app] [Monitor] cannot read response: " + err.Error()) + Log("cannot read response: " + err.Error()) return } // Check the status code if resp.StatusCode != http.StatusOK { - log.Print("[app] [Monitor] unexpected status code: " + string(body)) + Log("unexpected status code: " + string(body)) return } @@ -264,29 +260,29 @@ func IsLegitMachineAddress(address string) (active bool, err error) { var data map[string]interface{} err = json.Unmarshal(body, &data) if err != nil { - log.Println("[app] [Monitor] cannot unmarshal response " + err.Error()) + Log("cannot unmarshal response " + err.Error()) return } // Check if the "info" key exists machineValue, ok := data["machine"] if !ok { - log.Println("[app] [Monitor] response does not contain the required machine") + Log("response does not contain the required machine") return } machineMap, ok := machineValue.(map[string]interface{}) if !ok { - log.Println("[app] [Monitor] cannot convert machine map") + Log("cannot convert machine map") return } addressMap, ok := machineMap["address"] if !ok { - log.Println("[app] [Monitor] response does not contain the required name") + Log("response does not contain the required name") return } value, ok := addressMap.(string) if !ok || value != address { - log.Println("[app] [Monitor] return machine is not the required one") + Log("return machine is not the required one") return } @@ -296,7 +292,7 @@ func IsLegitMachineAddress(address string) (active bool, err error) { } func (mms *MqttMonitor) onConnectionLost(_ mqtt.Client, err error) { - log.Println("[app] [Monitor] Connection lost: " + err.Error()) + Log("connection lost: " + err.Error()) // Handle connection loss here (e.g., reconnect attempts, logging) if !mms.IsTerminated() { mms.lostConnectionMutex.Lock() @@ -308,7 +304,7 @@ func (mms *MqttMonitor) onConnectionLost(_ mqtt.Client, err error) { func (mms *MqttMonitor) MonitorActiveParticipants() { mms.clientMutex.Lock() if mms.localMqttClient != nil { - log.Println("[app] [Monitor] client is still working") + Log("client is still working") mms.clientMutex.Unlock() return } @@ -320,7 +316,7 @@ func (mms *MqttMonitor) MonitorActiveParticipants() { mms.SetMaxRetries() for !mms.IsTerminated() && mms.maxRetries > 0 { if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { - log.Println("[app] [Monitor] error connecting to mqtt: " + token.Error().Error()) + Log("error connecting to mqtt: " + token.Error().Error()) mms.maxRetries-- time.Sleep(time.Second * 5) continue @@ -329,24 +325,24 @@ func (mms *MqttMonitor) MonitorActiveParticipants() { mms.lostConnection = false mms.lostConnectionMutex.Unlock() - log.Println("[app] [Monitor] established connection") + Log("established connection") var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler // Subscribe to a topic subscriptionTopic := "tele/#" if token := mqttClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { - log.Println("[app] [Monitor] error registering the mqtt subscription: " + token.Error().Error()) + Log("error registering the mqtt subscription: " + token.Error().Error()) continue } - log.Println("[app] [Monitor] subscribed to tele/# channels") + Log("subscribed to tele/# channels") for !mms.IsTerminated() { mms.lostConnectionMutex.Lock() lostConnectionEvent := mms.lostConnection mms.lostConnectionMutex.Unlock() if !mqttClient.IsConnected() || !mqttClient.IsConnectionOpen() || lostConnectionEvent { - log.Println("[app] [Monitor] retry establishing a connection") + Log("retry establishing a connection") break // Exit inner loop on disconnect } @@ -357,7 +353,7 @@ func (mms *MqttMonitor) MonitorActiveParticipants() { } if mms.maxRetries == 0 { - log.Println("[app] [Monitor] Reached maximum reconnection attempts. Exiting. New client will be activated soon.") + Log("reached maximum reconnection attempts. Exiting. New client will be activated soon.") } mms.clientMutex.Lock() @@ -375,12 +371,3 @@ func SendUpdateMessage(mqttClient util.MQTTClientI) { func (mms *MqttMonitor) SetMaxRetries() { mms.maxRetries = 5 } - -func (mms *MqttMonitor) Log(msg string) { - mms.contextMutex.Lock() - localContext := mms.sdkContext - mms.contextMutex.Unlock() - if localContext != nil { - util.GetAppLogger().Info(*localContext, msg) - } -} diff --git a/x/dao/keeper/keeper.go b/x/dao/keeper/keeper.go index c9d9414..72fd34d 100644 --- a/x/dao/keeper/keeper.go +++ b/x/dao/keeper/keeper.go @@ -7,7 +7,6 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" paramtypes "github.com/cosmos/cosmos-sdk/x/params/types" - "github.com/planetmint/planetmint-go/monitor" "github.com/planetmint/planetmint-go/x/dao/types" ) @@ -50,7 +49,6 @@ func NewKeeper( if !ps.HasKeyTable() { ps = ps.WithKeyTable(types.ParamKeyTable()) } - monitor.LazyMqttMonitorLoader(rootDir) return &Keeper{ cdc: cdc, storeKey: storeKey,