diff --git a/monitor/backend.go b/monitor/backend.go index 218c7c2..1471e70 100644 --- a/monitor/backend.go +++ b/monitor/backend.go @@ -3,6 +3,7 @@ package monitor import ( "encoding/json" "log" + "strconv" "time" "github.com/syndtr/goleveldb/leveldb/iterator" @@ -21,7 +22,7 @@ func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err er lastSeenBytes, err := json.Marshal(lastSeen) if err != nil { - mms.Log("[Monitor] Error serializing ConversionRequest: " + err.Error()) + log.Println("[app] [Monitor] Error serializing ConversionRequest: " + err.Error()) return } increaseCounter := false @@ -37,9 +38,11 @@ func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err er err = mms.db.Put([]byte(address), lastSeenBytes, nil) mms.dbMutex.Unlock() if err != nil { - log.Println("[Monitor] storing addresses in DB: " + err.Error()) - return + log.Println("[app] [Monitor] error storing addresses in DB: " + err.Error()) + } else { + log.Println("[app] [Monitor] stored address in DB: " + address) } + return } @@ -61,8 +64,11 @@ func (mms *MqttMonitor) getAmountOfElements() (amount int64, err error) { // Check for any errors encountered during iteration if err := iter.Error(); err != nil { - log.Println("[Monitor] " + err.Error()) + log.Println("[app] [Monitor] " + err.Error()) + } else { + log.Println("[app] [Monitor] elements: " + strconv.FormatInt(amount, 10)) } + return } func (mms *MqttMonitor) getDataFromIter(iter iterator.Iterator) (lastSeen LastSeenEvent, err error) { @@ -70,13 +76,14 @@ func (mms *MqttMonitor) getDataFromIter(iter iterator.Iterator) (lastSeen LastSe value := iter.Value() err = json.Unmarshal(value, &lastSeen) if err != nil { - mms.Log("[Monitor] Failed to unmarshal entry: " + string(key) + " - " + err.Error()) + log.Println("[app] [Monitor] Failed to unmarshal entry: " + string(key) + " - " + err.Error()) } return } func (mms *MqttMonitor) CleanupDB() { // Create an iterator for the database + log.Println("[app] [Monitor] Starting clean-up process") iter := mms.db.NewIterator(nil, nil) defer iter.Release() // Make sure to release the iterator at the end @@ -85,7 +92,7 @@ func (mms *MqttMonitor) CleanupDB() { // Use iter.Key() and iter.Value() to access the key and value lastSeen, err := mms.getDataFromIter(iter) if err != nil { - mms.Log("[Monitor] Failed to unmarshal entry: " + string(iter.Key()) + " - " + err.Error()) + log.Println("[app] [Monitor] Failed to unmarshal entry: " + string(iter.Key()) + " - " + err.Error()) continue } timeThreshold := time.Now().Add(-1 * mms.CleanupPeriodicityInMinutes * time.Minute).Unix() @@ -93,13 +100,15 @@ func (mms *MqttMonitor) CleanupDB() { // If the entry is older than 12 hours, delete it err := mms.deleteEntry(iter.Key()) if err != nil { - mms.Log("[Monitor] Failed to delete entry: " + err.Error()) + log.Println("[app] [Monitor] Failed to delete entry: " + err.Error()) + } else { + log.Println("[app] [Monitor] Delete entry: " + string(iter.Key())) } } } // Check for any errors encountered during iteration if err := iter.Error(); err != nil { - mms.Log(err.Error()) + log.Println("[app] [Monitor] error during cleanup : " + err.Error()) } } diff --git a/monitor/interface.go b/monitor/interface.go index 7789fab..21879af 100644 --- a/monitor/interface.go +++ b/monitor/interface.go @@ -3,7 +3,6 @@ package monitor import ( "sync" - sdk "github.com/cosmos/cosmos-sdk/types" "github.com/planetmint/planetmint-go/config" "github.com/syndtr/goleveldb/leveldb" ) @@ -11,7 +10,6 @@ import ( type MQTTMonitorClientI interface { AddParticipant(address string, lastSeenTS int64) (err error) SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) - SetContext(ctx sdk.Context) Start() (err error) } @@ -47,12 +45,6 @@ func LazyMqttMonitorLoader(homeDir string) { } } -func SetContext(ctx sdk.Context) { - monitorMutex.Lock() - mqttMonitorInstance.SetContext(ctx) - monitorMutex.Unlock() -} - func SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) { monitorMutex.Lock() challenger, challengee, err = mqttMonitorInstance.SelectPoPParticipantsOutOfActiveActors() diff --git a/monitor/mocks/mqtt_monitor.go b/monitor/mocks/mqtt_monitor.go index a08e814..ea020a8 100644 --- a/monitor/mocks/mqtt_monitor.go +++ b/monitor/mocks/mqtt_monitor.go @@ -1,9 +1,5 @@ package mocks -import ( - types "github.com/cosmos/cosmos-sdk/types" -) - // MockMQTTMonitorClientI is a mock of MQTTMonitorClientI interface. type MockMQTTMonitorClientI struct { myStringList []string @@ -27,10 +23,6 @@ func (m *MockMQTTMonitorClientI) SelectPoPParticipantsOutOfActiveActors() (strin return challenger, challengee, nil } -// SetContext mocks base method. -func (m *MockMQTTMonitorClientI) SetContext(_ types.Context) { -} - // Start mocks base method. func (m *MockMQTTMonitorClientI) Start() error { return nil diff --git a/monitor/mqtt_monitor.go b/monitor/mqtt_monitor.go index 6029788..d4c4bb7 100644 --- a/monitor/mqtt_monitor.go +++ b/monitor/mqtt_monitor.go @@ -2,6 +2,7 @@ package monitor import ( "crypto/tls" + "log" "math/rand" "net" "strconv" @@ -17,11 +18,12 @@ import ( ) var MonitorMQTTClient util.MQTTClientI +var clientMutex sync.Mutex +var localMqttClient util.MQTTClientI type MqttMonitor struct { db *leveldb.DB dbMutex sync.Mutex // Mutex to synchronize write operations - ticker *time.Ticker CleanupPeriodicityInMinutes time.Duration config config.Config numberOfElements int64 @@ -29,6 +31,7 @@ type MqttMonitor struct { contextMutex sync.Mutex isTerminated bool terminationMutex sync.Mutex + maxRetries time.Duration } func (mms *MqttMonitor) Terminate() { @@ -44,9 +47,9 @@ func (mms *MqttMonitor) IsTerminated() (isTerminated bool) { return } -func LazyLoadMonitorMQTTClient() { +func (mms *MqttMonitor) lazyLoadMonitorMQTTClient() util.MQTTClientI { if MonitorMQTTClient != nil { - return + return MonitorMQTTClient } conf := config.GetConfig() @@ -56,7 +59,7 @@ func LazyLoadMonitorMQTTClient() { uri = "ssl://" + hostPort } - opts := mqtt.NewClientOptions().AddBroker(uri) + opts := mqtt.NewClientOptions().AddBroker(uri).SetKeepAlive(60).SetCleanSession(true) opts.SetClientID(conf.ValidatorAddress + "-monitor") opts.SetUsername(conf.MqttUser) opts.SetPassword(conf.MqttPassword) @@ -65,7 +68,9 @@ func LazyLoadMonitorMQTTClient() { opts.SetTLSConfig(tlsConfig) } - MonitorMQTTClient = mqtt.NewClient(opts) + log.Println("[app] [Monitor] create new client") + client := mqtt.NewClient(opts) + return client } func NewMqttMonitorService(db *leveldb.DB, config config.Config) *MqttMonitor { @@ -73,13 +78,20 @@ func NewMqttMonitorService(db *leveldb.DB, config config.Config) *MqttMonitor { return service } -func (mms *MqttMonitor) registerPeriodicTasks() { - mms.ticker = time.NewTicker(mms.CleanupPeriodicityInMinutes * time.Minute) - go func() { - for range mms.ticker.C { // Loop over the ticker channel +func (mms *MqttMonitor) runPeriodicTasks() { + tickerRestablishConnection := time.NewTicker(2 * time.Minute) + tickerCleanup := time.NewTicker(5 * time.Minute) + defer tickerRestablishConnection.Stop() + defer tickerCleanup.Stop() + + for { + select { + case <-tickerRestablishConnection.C: + go mms.MonitorActiveParticipants() + case <-tickerCleanup.C: go mms.CleanupDB() } - }() + } } func (mms *MqttMonitor) Start() (err error) { @@ -88,8 +100,9 @@ func (mms *MqttMonitor) Start() (err error) { return } mms.numberOfElements = amount - mms.registerPeriodicTasks() + go mms.runPeriodicTasks() go mms.MonitorActiveParticipants() + go mms.CleanupDB() return } func (mms *MqttMonitor) getRandomNumbers() (challenger int, challengee int) { @@ -105,18 +118,18 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str return } randomChallenger, randomChallengee := mms.getRandomNumbers() - mms.Log("[Monitor] number of elements: " + strconv.Itoa(int(mms.numberOfElements))) - mms.Log("[Monitor] selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee)) + log.Println("[app] [Monitor] number of elements: " + strconv.Itoa(int(mms.numberOfElements))) + log.Println("[app] [Monitor] selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee)) iter := mms.db.NewIterator(nil, nil) defer iter.Release() count := 0 found := 0 var lastSeen LastSeenEvent for iter.Next() { - mms.Log("[Monitor] count: " + strconv.Itoa(count)) if count == randomChallenger { lastSeen, err = mms.getDataFromIter(iter) if err != nil { + log.Println("[app] [Monitor] could not get Data from ID" + strconv.Itoa(randomChallenger)) return } challenger = lastSeen.Address @@ -124,6 +137,7 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str } else if count == randomChallengee { lastSeen, err = mms.getDataFromIter(iter) if err != nil { + log.Println("[app] [Monitor] could not get Data from ID" + strconv.Itoa(randomChallengee)) return } challengee = lastSeen.Address @@ -135,6 +149,7 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str break } } + log.Println("[app] [Monitor] challenger, challengee: " + challenger + " " + challengee) return } @@ -157,59 +172,74 @@ func (mms *MqttMonitor) MqttMsgHandler(_ mqtt.Client, msg mqtt.Message) { if err != nil || !valid { return } - payload, err := util.ToJSON(msg.Payload()) - if err != nil { - return - } - timeString, ok := payload["Time"].(string) - if !ok { - return - } - unixTime, err := util.String2UnixTime(timeString) - if err != nil { - return - } + unixTime := time.Now().Unix() err = mms.AddParticipant(address, unixTime) + if err != nil { - mms.Log("[Monitor] error adding active actor to DB: " + address + " " + err.Error()) + log.Println("[app] [Monitor] error adding active actor to DB: " + address + " " + err.Error()) } else { - mms.Log("[Monitor] added active actor to DB: " + address) + log.Println("[app] [Monitor] added active actor to DB: " + address) } } func (mms *MqttMonitor) MonitorActiveParticipants() { - LazyLoadMonitorMQTTClient() - for !mms.IsTerminated() { - if !MonitorMQTTClient.IsConnected() { - if token := MonitorMQTTClient.Connect(); token.Wait() && token.Error() != nil { - mms.Log("[Monitor] error connecting to mqtt: " + token.Error().Error()) - panic(token.Error()) - } - - var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler - - // Subscribe to a topic - subscriptionTopic := "tele/#" - if token := MonitorMQTTClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { - mms.Log("[Monitor] error registering the mqtt subscription: " + token.Error().Error()) - panic(token.Error()) - } - } - time.Sleep(5 * time.Second) + clientMutex.Lock() + if localMqttClient != nil { + log.Println("[app] [Monitor] client is still working") + return } + localMqttClient = mms.lazyLoadMonitorMQTTClient() + client := localMqttClient + clientMutex.Unlock() + + // Maximum reconnection attempts (adjust as needed) + mms.SetMaxRetries() + for !mms.IsTerminated() && mms.maxRetries > 0 { + if token := client.Connect(); token.Wait() && token.Error() != nil { + log.Println("[app] [Monitor] error connecting to mqtt: " + token.Error().Error()) + mms.maxRetries-- + time.Sleep(time.Second * 5) + continue + } + + var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler + + // Subscribe to a topic + subscriptionTopic := "tele/#" + if token := client.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { + log.Println("[app] [Monitor] error registering the mqtt subscription: " + token.Error().Error()) + continue + } + + for !mms.IsTerminated() { + if !client.IsConnected() { + log.Println("[app] [Monitor] retry establishing a connection") + break // Exit inner loop on disconnect + } + mms.SetMaxRetries() + time.Sleep(60 * time.Second) // Adjust sleep time based on your needs + } + } + + if mms.maxRetries == 0 { + log.Println("[app] [Monitor] Reached maximum reconnection attempts. Exiting. New client will be activated soon.") + } + + clientMutex.Lock() + localMqttClient = nil + clientMutex.Unlock() +} + +func (mms *MqttMonitor) SetMaxRetries() { + mms.maxRetries = 5 } func (mms *MqttMonitor) Log(msg string) { mms.contextMutex.Lock() - if mms.sdkContext != nil { - util.GetAppLogger().Info(*mms.sdkContext, msg) + localContext := mms.sdkContext + mms.contextMutex.Unlock() + if localContext != nil { + util.GetAppLogger().Info(*localContext, msg) } - mms.contextMutex.Unlock() -} - -func (mms *MqttMonitor) SetContext(ctx sdk.Context) { - mms.contextMutex.Lock() - mms.sdkContext = &ctx - mms.contextMutex.Unlock() } diff --git a/monitor/mqtt_monitor_test.go b/monitor/mqtt_monitor_test.go index c36c1ed..f842669 100644 --- a/monitor/mqtt_monitor_test.go +++ b/monitor/mqtt_monitor_test.go @@ -23,7 +23,6 @@ const ( ) func TestGMonitorActiveParticipants(t *testing.T) { - monitor.LazyLoadMonitorMQTTClient() cfg := config.GetConfig() db, err := leveldb.Open(storage.NewMemStorage(), nil) assert.NoError(t, err) @@ -49,8 +48,6 @@ func TestGMonitorActiveParticipants(t *testing.T) { } func TestCleanupRemoval(t *testing.T) { - monitor.LazyLoadMonitorMQTTClient() - cfg := config.GetConfig() db, err := leveldb.Open(storage.NewMemStorage(), nil) assert.NoError(t, err) @@ -77,8 +74,6 @@ func TestCleanupRemoval(t *testing.T) { } func TestCleanupPrecisionTest(t *testing.T) { - monitor.LazyLoadMonitorMQTTClient() - cfg := config.GetConfig() db, err := leveldb.Open(storage.NewMemStorage(), nil) assert.NoError(t, err) diff --git a/util/time.go b/util/time.go deleted file mode 100644 index e718b38..0000000 --- a/util/time.go +++ /dev/null @@ -1,23 +0,0 @@ -package util - -import ( - "time" -) - -func String2UnixTime(timeInput string) (int64, error) { - // Layout specifying the format of the input string - // Note: Go uses a specific reference time (Mon Jan 2 15:04:05 MST 2006) to define format layouts - layout := "2006-01-02T15:04:05" - - // Parse the string into a time.Time struct in local time zone - parsedTime, err := time.Parse(layout, timeInput) - if err != nil { - return 0, err - } - - // Convert to UTC if not already - utcTime := parsedTime.UTC() - unixTime := utcTime.Unix() - - return unixTime, nil -} diff --git a/util/time_test.go b/util/time_test.go deleted file mode 100644 index d56c517..0000000 --- a/util/time_test.go +++ /dev/null @@ -1,15 +0,0 @@ -package util - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestString2UnixTime(t *testing.T) { - t.Parallel() - input := "2024-03-26T11:10:41" - unixTime, err := String2UnixTime(input) - assert.NoError(t, err) - assert.Equal(t, int64(1711451441), unixTime) -} diff --git a/x/dao/abci.go b/x/dao/abci.go index 49680f0..0f562de 100644 --- a/x/dao/abci.go +++ b/x/dao/abci.go @@ -24,7 +24,6 @@ func BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock, k keeper.Keeper) hexProposerAddress := hex.EncodeToString(proposerAddress) if isPopHeight(ctx, k, currentBlockHeight) { // select PoP participants - monitor.SetContext(ctx) challenger, challengee, err := monitor.SelectPoPParticipantsOutOfActiveActors() if err != nil { util.GetAppLogger().Error(ctx, "error during PoP Participant selection ", err)