diff --git a/monitor/backend.go b/monitor/backend.go index 218c7c2..a118ccd 100644 --- a/monitor/backend.go +++ b/monitor/backend.go @@ -3,6 +3,7 @@ package monitor import ( "encoding/json" "log" + "strconv" "time" "github.com/syndtr/goleveldb/leveldb/iterator" @@ -21,7 +22,7 @@ func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err er lastSeenBytes, err := json.Marshal(lastSeen) if err != nil { - mms.Log("[Monitor] Error serializing ConversionRequest: " + err.Error()) + log.Println("[app] [Monitor] Error serializing ConversionRequest: " + err.Error()) return } increaseCounter := false @@ -37,9 +38,11 @@ func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err er err = mms.db.Put([]byte(address), lastSeenBytes, nil) mms.dbMutex.Unlock() if err != nil { - log.Println("[Monitor] storing addresses in DB: " + err.Error()) - return + log.Println("[app] [Monitor] error storing addresses in DB: " + err.Error()) + } else { + log.Println("[app] [Monitor] stored address in DB: " + address) } + return } @@ -61,8 +64,11 @@ func (mms *MqttMonitor) getAmountOfElements() (amount int64, err error) { // Check for any errors encountered during iteration if err := iter.Error(); err != nil { - log.Println("[Monitor] " + err.Error()) + log.Println("[app] [Monitor] " + err.Error()) + } else { + log.Println("[app] [Monitor] elements: " + strconv.FormatInt(amount, 10)) } + return } func (mms *MqttMonitor) getDataFromIter(iter iterator.Iterator) (lastSeen LastSeenEvent, err error) { @@ -70,13 +76,14 @@ func (mms *MqttMonitor) getDataFromIter(iter iterator.Iterator) (lastSeen LastSe value := iter.Value() err = json.Unmarshal(value, &lastSeen) if err != nil { - mms.Log("[Monitor] Failed to unmarshal entry: " + string(key) + " - " + err.Error()) + log.Println("[app] [Monitor] Failed to unmarshal entry: " + string(key) + " - " + err.Error()) } return } func (mms *MqttMonitor) CleanupDB() { // Create an iterator for the database + log.Println("[app] [Monitor] Starting clean-up process") iter := mms.db.NewIterator(nil, nil) defer iter.Release() // Make sure to release the iterator at the end @@ -85,21 +92,25 @@ func (mms *MqttMonitor) CleanupDB() { // Use iter.Key() and iter.Value() to access the key and value lastSeen, err := mms.getDataFromIter(iter) if err != nil { - mms.Log("[Monitor] Failed to unmarshal entry: " + string(iter.Key()) + " - " + err.Error()) + log.Println("[app] [Monitor] Failed to unmarshal entry: " + string(iter.Key()) + " - " + err.Error()) continue } timeThreshold := time.Now().Add(-1 * mms.CleanupPeriodicityInMinutes * time.Minute).Unix() + log.Println("[app] [Monitor] threshold " + strconv.FormatInt(timeThreshold, 10)) + log.Println("[app] [Monitor] timestamp " + strconv.FormatInt(lastSeen.Timestamp, 10)) if lastSeen.Timestamp <= timeThreshold { // If the entry is older than 12 hours, delete it err := mms.deleteEntry(iter.Key()) if err != nil { - mms.Log("[Monitor] Failed to delete entry: " + err.Error()) + log.Println("[app] [Monitor] Failed to delete entry: " + err.Error()) + } else { + log.Println("[app] [Monitor] Delete entry: " + string(iter.Key())) } } } // Check for any errors encountered during iteration if err := iter.Error(); err != nil { - mms.Log(err.Error()) + log.Println("[app] [Monitor] error during cleanup : " + err.Error()) } } diff --git a/monitor/mqtt_monitor.go b/monitor/mqtt_monitor.go index 4864c13..1bb9ee2 100644 --- a/monitor/mqtt_monitor.go +++ b/monitor/mqtt_monitor.go @@ -2,6 +2,7 @@ package monitor import ( "crypto/tls" + "log" "math/rand" "net" "strconv" @@ -17,6 +18,8 @@ import ( ) var MonitorMQTTClient util.MQTTClientI +var clientMutex sync.Mutex +var localMqttClient util.MQTTClientI = nil type MqttMonitor struct { db *leveldb.DB @@ -29,6 +32,7 @@ type MqttMonitor struct { contextMutex sync.Mutex isTerminated bool terminationMutex sync.Mutex + maxRetries time.Duration } func (mms *MqttMonitor) Terminate() { @@ -44,9 +48,9 @@ func (mms *MqttMonitor) IsTerminated() (isTerminated bool) { return } -func LazyLoadMonitorMQTTClient() { +func (mms *MqttMonitor) lazyLoadMonitorMQTTClient() util.MQTTClientI { if MonitorMQTTClient != nil { - return + return MonitorMQTTClient } conf := config.GetConfig() @@ -56,7 +60,7 @@ func LazyLoadMonitorMQTTClient() { uri = "ssl://" + hostPort } - opts := mqtt.NewClientOptions().AddBroker(uri).SetKeepAlive(60) + opts := mqtt.NewClientOptions().AddBroker(uri).SetKeepAlive(60).SetCleanSession(true) opts.SetClientID(conf.ValidatorAddress + "-monitor") opts.SetUsername(conf.MqttUser) opts.SetPassword(conf.MqttPassword) @@ -65,7 +69,9 @@ func LazyLoadMonitorMQTTClient() { opts.SetTLSConfig(tlsConfig) } - MonitorMQTTClient = mqtt.NewClient(opts) + log.Println("[app] [Monitor] create new client") + client := mqtt.NewClient(opts) + return client } func NewMqttMonitorService(db *leveldb.DB, config config.Config) *MqttMonitor { @@ -73,13 +79,21 @@ func NewMqttMonitorService(db *leveldb.DB, config config.Config) *MqttMonitor { return service } -func (mms *MqttMonitor) registerPeriodicTasks() { - mms.ticker = time.NewTicker(mms.CleanupPeriodicityInMinutes * time.Minute) - go func() { - for range mms.ticker.C { // Loop over the ticker channel +func (mms *MqttMonitor) runPeriodicTasks() { + tickerRestablishConnection := time.NewTicker(2 * time.Minute) + tickerCleanup := time.NewTicker(5 * time.Minute) + defer tickerRestablishConnection.Stop() + defer tickerCleanup.Stop() + + for { + select { + case <-tickerRestablishConnection.C: + go mms.MonitorActiveParticipants() + case <-tickerCleanup.C: go mms.CleanupDB() } - }() + } + } func (mms *MqttMonitor) Start() (err error) { @@ -88,8 +102,9 @@ func (mms *MqttMonitor) Start() (err error) { return } mms.numberOfElements = amount - mms.registerPeriodicTasks() + go mms.runPeriodicTasks() go mms.MonitorActiveParticipants() + go mms.CleanupDB() return } func (mms *MqttMonitor) getRandomNumbers() (challenger int, challengee int) { @@ -105,18 +120,19 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str return } randomChallenger, randomChallengee := mms.getRandomNumbers() - mms.Log("[Monitor] number of elements: " + strconv.Itoa(int(mms.numberOfElements))) - mms.Log("[Monitor] selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee)) + log.Println("[app] [Monitor] number of elements: " + strconv.Itoa(int(mms.numberOfElements))) + log.Println("[app] [Monitor] selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee)) iter := mms.db.NewIterator(nil, nil) defer iter.Release() count := 0 found := 0 var lastSeen LastSeenEvent for iter.Next() { - mms.Log("[Monitor] count: " + strconv.Itoa(count)) + log.Println("[app] [Monitor] count: " + strconv.Itoa(count)) if count == randomChallenger { lastSeen, err = mms.getDataFromIter(iter) if err != nil { + log.Println("[app] [Monitor] could not get Data from ID" + strconv.Itoa(randomChallenger)) return } challenger = lastSeen.Address @@ -124,6 +140,7 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str } else if count == randomChallengee { lastSeen, err = mms.getDataFromIter(iter) if err != nil { + log.Println("[app] [Monitor] could not get Data from ID" + strconv.Itoa(randomChallengee)) return } challengee = lastSeen.Address @@ -135,6 +152,7 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str break } } + log.Println("[app] [Monitor] challenger, challengee: " + challenger + " " + challengee) return } @@ -157,47 +175,66 @@ func (mms *MqttMonitor) MqttMsgHandler(_ mqtt.Client, msg mqtt.Message) { if err != nil || !valid { return } - payload, err := util.ToJSON(msg.Payload()) - if err != nil { - return - } - timeString, ok := payload["Time"].(string) - if !ok { - return - } - unixTime, err := util.String2UnixTime(timeString) - if err != nil { - return - } + unixTime := time.Now().Unix() err = mms.AddParticipant(address, unixTime) + if err != nil { - mms.Log("[Monitor] error adding active actor to DB: " + address + " " + err.Error()) + log.Println("[app] [Monitor] error adding active actor to DB: " + address + " " + err.Error()) } else { - mms.Log("[Monitor] added active actor to DB: " + address) + log.Println("[app] [Monitor] added active actor to DB: " + address) } } func (mms *MqttMonitor) MonitorActiveParticipants() { - LazyLoadMonitorMQTTClient() - for !mms.IsTerminated() { - if !MonitorMQTTClient.IsConnected() { - if token := MonitorMQTTClient.Connect(); token.Wait() && token.Error() != nil { - mms.Log("[Monitor] error connecting to mqtt: " + token.Error().Error()) - panic(token.Error()) - } - - var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler - - // Subscribe to a topic - subscriptionTopic := "tele/#" - if token := MonitorMQTTClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { - mms.Log("[Monitor] error registering the mqtt subscription: " + token.Error().Error()) - panic(token.Error()) - } - } - time.Sleep(5 * time.Second) + clientMutex.Lock() + if localMqttClient != nil { + log.Println("[app] [Monitor] client is still working") + return } + localMqttClient = mms.lazyLoadMonitorMQTTClient() + client := localMqttClient + clientMutex.Unlock() + + // Maximum reconnection attempts (adjust as needed) + mms.SetMaxRetries() + for !mms.IsTerminated() && mms.maxRetries > 0 { + if token := client.Connect(); token.Wait() && token.Error() != nil { + log.Println("[app] [Monitor] error connecting to mqtt: " + token.Error().Error()) + mms.maxRetries-- + time.Sleep(time.Second * 5) + continue + } + + var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler + + // Subscribe to a topic + subscriptionTopic := "tele/#" + if token := client.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { + log.Println("[app] [Monitor] error registering the mqtt subscription: " + token.Error().Error()) + continue + } + + for !mms.IsTerminated() { + if !client.IsConnected() { + log.Println("[app] [Monitor] retry establishing a connection") + break // Exit inner loop on disconnect + } + time.Sleep(60 * time.Second) // Adjust sleep time based on your needs + } + } + + if mms.maxRetries == 0 { + log.Println("[app] [Monitor] Reached maximum reconnection attempts. Exiting. New client will be activated soon.") + } + + clientMutex.Lock() + localMqttClient = nil + clientMutex.Unlock() +} + +func (mms *MqttMonitor) SetMaxRetries() { + mms.maxRetries = 5 } func (mms *MqttMonitor) Log(msg string) { diff --git a/monitor/mqtt_monitor_test.go b/monitor/mqtt_monitor_test.go index c36c1ed..f842669 100644 --- a/monitor/mqtt_monitor_test.go +++ b/monitor/mqtt_monitor_test.go @@ -23,7 +23,6 @@ const ( ) func TestGMonitorActiveParticipants(t *testing.T) { - monitor.LazyLoadMonitorMQTTClient() cfg := config.GetConfig() db, err := leveldb.Open(storage.NewMemStorage(), nil) assert.NoError(t, err) @@ -49,8 +48,6 @@ func TestGMonitorActiveParticipants(t *testing.T) { } func TestCleanupRemoval(t *testing.T) { - monitor.LazyLoadMonitorMQTTClient() - cfg := config.GetConfig() db, err := leveldb.Open(storage.NewMemStorage(), nil) assert.NoError(t, err) @@ -77,8 +74,6 @@ func TestCleanupRemoval(t *testing.T) { } func TestCleanupPrecisionTest(t *testing.T) { - monitor.LazyLoadMonitorMQTTClient() - cfg := config.GetConfig() db, err := leveldb.Open(storage.NewMemStorage(), nil) assert.NoError(t, err) diff --git a/util/time.go b/util/time.go index e718b38..e8ebd21 100644 --- a/util/time.go +++ b/util/time.go @@ -1,6 +1,8 @@ package util import ( + "log" + "strconv" "time" ) @@ -8,7 +10,7 @@ func String2UnixTime(timeInput string) (int64, error) { // Layout specifying the format of the input string // Note: Go uses a specific reference time (Mon Jan 2 15:04:05 MST 2006) to define format layouts layout := "2006-01-02T15:04:05" - + log.Println("[app] [Monitor] [time] add time string: " + timeInput) // Parse the string into a time.Time struct in local time zone parsedTime, err := time.Parse(layout, timeInput) if err != nil { @@ -18,6 +20,6 @@ func String2UnixTime(timeInput string) (int64, error) { // Convert to UTC if not already utcTime := parsedTime.UTC() unixTime := utcTime.Unix() - + log.Println("[app] [Monitor] [time] unix time: " + strconv.FormatInt(unixTime, 10)) return unixTime, nil }