* avoid using app.Logger to have a consistent log from the beginning

* made lazyLoading local to the module
* fixed ticker bug so that the tasks are executed in an endless loop
* fixed timezone bug of Add Participant. only validator local time is used not the timestamp from the mqtt messages as these might come from a different time zone (resulted in delayed deletion from the activeActors DB)
* improved connection management for the monitor
* added a watchdog task to restart the monitor in case of connection loss (every 2 min)

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>
This commit is contained in:
Jürgen Eckel 2024-05-15 11:12:59 +02:00
parent c3bbf76dde
commit f68d0df98b
No known key found for this signature in database
4 changed files with 105 additions and 60 deletions

View File

@ -3,6 +3,7 @@ package monitor
import (
"encoding/json"
"log"
"strconv"
"time"
"github.com/syndtr/goleveldb/leveldb/iterator"
@ -21,7 +22,7 @@ func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err er
lastSeenBytes, err := json.Marshal(lastSeen)
if err != nil {
mms.Log("[Monitor] Error serializing ConversionRequest: " + err.Error())
log.Println("[app] [Monitor] Error serializing ConversionRequest: " + err.Error())
return
}
increaseCounter := false
@ -37,9 +38,11 @@ func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err er
err = mms.db.Put([]byte(address), lastSeenBytes, nil)
mms.dbMutex.Unlock()
if err != nil {
log.Println("[Monitor] storing addresses in DB: " + err.Error())
return
log.Println("[app] [Monitor] error storing addresses in DB: " + err.Error())
} else {
log.Println("[app] [Monitor] stored address in DB: " + address)
}
return
}
@ -61,8 +64,11 @@ func (mms *MqttMonitor) getAmountOfElements() (amount int64, err error) {
// Check for any errors encountered during iteration
if err := iter.Error(); err != nil {
log.Println("[Monitor] " + err.Error())
log.Println("[app] [Monitor] " + err.Error())
} else {
log.Println("[app] [Monitor] elements: " + strconv.FormatInt(amount, 10))
}
return
}
func (mms *MqttMonitor) getDataFromIter(iter iterator.Iterator) (lastSeen LastSeenEvent, err error) {
@ -70,13 +76,14 @@ func (mms *MqttMonitor) getDataFromIter(iter iterator.Iterator) (lastSeen LastSe
value := iter.Value()
err = json.Unmarshal(value, &lastSeen)
if err != nil {
mms.Log("[Monitor] Failed to unmarshal entry: " + string(key) + " - " + err.Error())
log.Println("[app] [Monitor] Failed to unmarshal entry: " + string(key) + " - " + err.Error())
}
return
}
func (mms *MqttMonitor) CleanupDB() {
// Create an iterator for the database
log.Println("[app] [Monitor] Starting clean-up process")
iter := mms.db.NewIterator(nil, nil)
defer iter.Release() // Make sure to release the iterator at the end
@ -85,21 +92,25 @@ func (mms *MqttMonitor) CleanupDB() {
// Use iter.Key() and iter.Value() to access the key and value
lastSeen, err := mms.getDataFromIter(iter)
if err != nil {
mms.Log("[Monitor] Failed to unmarshal entry: " + string(iter.Key()) + " - " + err.Error())
log.Println("[app] [Monitor] Failed to unmarshal entry: " + string(iter.Key()) + " - " + err.Error())
continue
}
timeThreshold := time.Now().Add(-1 * mms.CleanupPeriodicityInMinutes * time.Minute).Unix()
log.Println("[app] [Monitor] threshold " + strconv.FormatInt(timeThreshold, 10))
log.Println("[app] [Monitor] timestamp " + strconv.FormatInt(lastSeen.Timestamp, 10))
if lastSeen.Timestamp <= timeThreshold {
// If the entry is older than 12 hours, delete it
err := mms.deleteEntry(iter.Key())
if err != nil {
mms.Log("[Monitor] Failed to delete entry: " + err.Error())
log.Println("[app] [Monitor] Failed to delete entry: " + err.Error())
} else {
log.Println("[app] [Monitor] Delete entry: " + string(iter.Key()))
}
}
}
// Check for any errors encountered during iteration
if err := iter.Error(); err != nil {
mms.Log(err.Error())
log.Println("[app] [Monitor] error during cleanup : " + err.Error())
}
}

View File

@ -2,6 +2,7 @@ package monitor
import (
"crypto/tls"
"log"
"math/rand"
"net"
"strconv"
@ -17,6 +18,8 @@ import (
)
var MonitorMQTTClient util.MQTTClientI
var clientMutex sync.Mutex
var localMqttClient util.MQTTClientI = nil
type MqttMonitor struct {
db *leveldb.DB
@ -29,6 +32,7 @@ type MqttMonitor struct {
contextMutex sync.Mutex
isTerminated bool
terminationMutex sync.Mutex
maxRetries time.Duration
}
func (mms *MqttMonitor) Terminate() {
@ -44,9 +48,9 @@ func (mms *MqttMonitor) IsTerminated() (isTerminated bool) {
return
}
func LazyLoadMonitorMQTTClient() {
func (mms *MqttMonitor) lazyLoadMonitorMQTTClient() util.MQTTClientI {
if MonitorMQTTClient != nil {
return
return MonitorMQTTClient
}
conf := config.GetConfig()
@ -56,7 +60,7 @@ func LazyLoadMonitorMQTTClient() {
uri = "ssl://" + hostPort
}
opts := mqtt.NewClientOptions().AddBroker(uri).SetKeepAlive(60)
opts := mqtt.NewClientOptions().AddBroker(uri).SetKeepAlive(60).SetCleanSession(true)
opts.SetClientID(conf.ValidatorAddress + "-monitor")
opts.SetUsername(conf.MqttUser)
opts.SetPassword(conf.MqttPassword)
@ -65,7 +69,9 @@ func LazyLoadMonitorMQTTClient() {
opts.SetTLSConfig(tlsConfig)
}
MonitorMQTTClient = mqtt.NewClient(opts)
log.Println("[app] [Monitor] create new client")
client := mqtt.NewClient(opts)
return client
}
func NewMqttMonitorService(db *leveldb.DB, config config.Config) *MqttMonitor {
@ -73,13 +79,21 @@ func NewMqttMonitorService(db *leveldb.DB, config config.Config) *MqttMonitor {
return service
}
func (mms *MqttMonitor) registerPeriodicTasks() {
mms.ticker = time.NewTicker(mms.CleanupPeriodicityInMinutes * time.Minute)
go func() {
for range mms.ticker.C { // Loop over the ticker channel
func (mms *MqttMonitor) runPeriodicTasks() {
tickerRestablishConnection := time.NewTicker(2 * time.Minute)
tickerCleanup := time.NewTicker(5 * time.Minute)
defer tickerRestablishConnection.Stop()
defer tickerCleanup.Stop()
for {
select {
case <-tickerRestablishConnection.C:
go mms.MonitorActiveParticipants()
case <-tickerCleanup.C:
go mms.CleanupDB()
}
}()
}
}
func (mms *MqttMonitor) Start() (err error) {
@ -88,8 +102,9 @@ func (mms *MqttMonitor) Start() (err error) {
return
}
mms.numberOfElements = amount
mms.registerPeriodicTasks()
go mms.runPeriodicTasks()
go mms.MonitorActiveParticipants()
go mms.CleanupDB()
return
}
func (mms *MqttMonitor) getRandomNumbers() (challenger int, challengee int) {
@ -105,18 +120,19 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str
return
}
randomChallenger, randomChallengee := mms.getRandomNumbers()
mms.Log("[Monitor] number of elements: " + strconv.Itoa(int(mms.numberOfElements)))
mms.Log("[Monitor] selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee))
log.Println("[app] [Monitor] number of elements: " + strconv.Itoa(int(mms.numberOfElements)))
log.Println("[app] [Monitor] selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee))
iter := mms.db.NewIterator(nil, nil)
defer iter.Release()
count := 0
found := 0
var lastSeen LastSeenEvent
for iter.Next() {
mms.Log("[Monitor] count: " + strconv.Itoa(count))
log.Println("[app] [Monitor] count: " + strconv.Itoa(count))
if count == randomChallenger {
lastSeen, err = mms.getDataFromIter(iter)
if err != nil {
log.Println("[app] [Monitor] could not get Data from ID" + strconv.Itoa(randomChallenger))
return
}
challenger = lastSeen.Address
@ -124,6 +140,7 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str
} else if count == randomChallengee {
lastSeen, err = mms.getDataFromIter(iter)
if err != nil {
log.Println("[app] [Monitor] could not get Data from ID" + strconv.Itoa(randomChallengee))
return
}
challengee = lastSeen.Address
@ -135,6 +152,7 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str
break
}
}
log.Println("[app] [Monitor] challenger, challengee: " + challenger + " " + challengee)
return
}
@ -157,47 +175,66 @@ func (mms *MqttMonitor) MqttMsgHandler(_ mqtt.Client, msg mqtt.Message) {
if err != nil || !valid {
return
}
payload, err := util.ToJSON(msg.Payload())
if err != nil {
return
}
timeString, ok := payload["Time"].(string)
if !ok {
return
}
unixTime, err := util.String2UnixTime(timeString)
if err != nil {
return
}
unixTime := time.Now().Unix()
err = mms.AddParticipant(address, unixTime)
if err != nil {
mms.Log("[Monitor] error adding active actor to DB: " + address + " " + err.Error())
log.Println("[app] [Monitor] error adding active actor to DB: " + address + " " + err.Error())
} else {
mms.Log("[Monitor] added active actor to DB: " + address)
log.Println("[app] [Monitor] added active actor to DB: " + address)
}
}
func (mms *MqttMonitor) MonitorActiveParticipants() {
LazyLoadMonitorMQTTClient()
for !mms.IsTerminated() {
if !MonitorMQTTClient.IsConnected() {
if token := MonitorMQTTClient.Connect(); token.Wait() && token.Error() != nil {
mms.Log("[Monitor] error connecting to mqtt: " + token.Error().Error())
panic(token.Error())
clientMutex.Lock()
if localMqttClient != nil {
log.Println("[app] [Monitor] client is still working")
return
}
localMqttClient = mms.lazyLoadMonitorMQTTClient()
client := localMqttClient
clientMutex.Unlock()
// Maximum reconnection attempts (adjust as needed)
mms.SetMaxRetries()
for !mms.IsTerminated() && mms.maxRetries > 0 {
if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Println("[app] [Monitor] error connecting to mqtt: " + token.Error().Error())
mms.maxRetries--
time.Sleep(time.Second * 5)
continue
}
var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler
// Subscribe to a topic
subscriptionTopic := "tele/#"
if token := MonitorMQTTClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil {
mms.Log("[Monitor] error registering the mqtt subscription: " + token.Error().Error())
panic(token.Error())
if token := client.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil {
log.Println("[app] [Monitor] error registering the mqtt subscription: " + token.Error().Error())
continue
}
for !mms.IsTerminated() {
if !client.IsConnected() {
log.Println("[app] [Monitor] retry establishing a connection")
break // Exit inner loop on disconnect
}
time.Sleep(60 * time.Second) // Adjust sleep time based on your needs
}
}
time.Sleep(5 * time.Second)
if mms.maxRetries == 0 {
log.Println("[app] [Monitor] Reached maximum reconnection attempts. Exiting. New client will be activated soon.")
}
clientMutex.Lock()
localMqttClient = nil
clientMutex.Unlock()
}
func (mms *MqttMonitor) SetMaxRetries() {
mms.maxRetries = 5
}
func (mms *MqttMonitor) Log(msg string) {

View File

@ -23,7 +23,6 @@ const (
)
func TestGMonitorActiveParticipants(t *testing.T) {
monitor.LazyLoadMonitorMQTTClient()
cfg := config.GetConfig()
db, err := leveldb.Open(storage.NewMemStorage(), nil)
assert.NoError(t, err)
@ -49,8 +48,6 @@ func TestGMonitorActiveParticipants(t *testing.T) {
}
func TestCleanupRemoval(t *testing.T) {
monitor.LazyLoadMonitorMQTTClient()
cfg := config.GetConfig()
db, err := leveldb.Open(storage.NewMemStorage(), nil)
assert.NoError(t, err)
@ -77,8 +74,6 @@ func TestCleanupRemoval(t *testing.T) {
}
func TestCleanupPrecisionTest(t *testing.T) {
monitor.LazyLoadMonitorMQTTClient()
cfg := config.GetConfig()
db, err := leveldb.Open(storage.NewMemStorage(), nil)
assert.NoError(t, err)

View File

@ -1,6 +1,8 @@
package util
import (
"log"
"strconv"
"time"
)
@ -8,7 +10,7 @@ func String2UnixTime(timeInput string) (int64, error) {
// Layout specifying the format of the input string
// Note: Go uses a specific reference time (Mon Jan 2 15:04:05 MST 2006) to define format layouts
layout := "2006-01-02T15:04:05"
log.Println("[app] [Monitor] [time] add time string: " + timeInput)
// Parse the string into a time.Time struct in local time zone
parsedTime, err := time.Parse(layout, timeInput)
if err != nil {
@ -18,6 +20,6 @@ func String2UnixTime(timeInput string) (int64, error) {
// Convert to UTC if not already
utcTime := parsedTime.UTC()
unixTime := utcTime.Unix()
log.Println("[app] [Monitor] [time] unix time: " + strconv.FormatInt(unixTime, 10))
return unixTime, nil
}