Eckelj/mqtt monitoring fix (#398)

* set keepalive ping
* made mutex more granular
* avoid using app.Logger to have a consistent log from the beginning
* made lazyLoading local to the module
* fixed ticker bug so that the tasks are executed in an endless loop
* fixed timezone bug in AddParticipant: only the validator's local time is used, not the timestamp from the MQTT messages, as these might come from a different time zone (this resulted in delayed deletion from the activeActors DB)
* improved connection management for the monitor
* added a watchdog task to restart the monitor in case of connection loss (every 2 min)
* removed obsolete SetContext function/interface
* removed obsolete time conversion method

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>
This commit is contained in:
Jürgen Eckel 2024-05-15 13:23:06 +02:00 committed by GitHub
parent ccf491d658
commit 1f2b1702ea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 102 additions and 123 deletions

View File

@ -3,6 +3,7 @@ package monitor
import ( import (
"encoding/json" "encoding/json"
"log" "log"
"strconv"
"time" "time"
"github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/iterator"
@ -21,7 +22,7 @@ func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err er
lastSeenBytes, err := json.Marshal(lastSeen) lastSeenBytes, err := json.Marshal(lastSeen)
if err != nil { if err != nil {
mms.Log("[Monitor] Error serializing ConversionRequest: " + err.Error()) log.Println("[app] [Monitor] Error serializing ConversionRequest: " + err.Error())
return return
} }
increaseCounter := false increaseCounter := false
@ -37,9 +38,11 @@ func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err er
err = mms.db.Put([]byte(address), lastSeenBytes, nil) err = mms.db.Put([]byte(address), lastSeenBytes, nil)
mms.dbMutex.Unlock() mms.dbMutex.Unlock()
if err != nil { if err != nil {
log.Println("[Monitor] storing addresses in DB: " + err.Error()) log.Println("[app] [Monitor] error storing addresses in DB: " + err.Error())
return } else {
log.Println("[app] [Monitor] stored address in DB: " + address)
} }
return return
} }
@ -61,8 +64,11 @@ func (mms *MqttMonitor) getAmountOfElements() (amount int64, err error) {
// Check for any errors encountered during iteration // Check for any errors encountered during iteration
if err := iter.Error(); err != nil { if err := iter.Error(); err != nil {
log.Println("[Monitor] " + err.Error()) log.Println("[app] [Monitor] " + err.Error())
} else {
log.Println("[app] [Monitor] elements: " + strconv.FormatInt(amount, 10))
} }
return return
} }
func (mms *MqttMonitor) getDataFromIter(iter iterator.Iterator) (lastSeen LastSeenEvent, err error) { func (mms *MqttMonitor) getDataFromIter(iter iterator.Iterator) (lastSeen LastSeenEvent, err error) {
@ -70,13 +76,14 @@ func (mms *MqttMonitor) getDataFromIter(iter iterator.Iterator) (lastSeen LastSe
value := iter.Value() value := iter.Value()
err = json.Unmarshal(value, &lastSeen) err = json.Unmarshal(value, &lastSeen)
if err != nil { if err != nil {
mms.Log("[Monitor] Failed to unmarshal entry: " + string(key) + " - " + err.Error()) log.Println("[app] [Monitor] Failed to unmarshal entry: " + string(key) + " - " + err.Error())
} }
return return
} }
func (mms *MqttMonitor) CleanupDB() { func (mms *MqttMonitor) CleanupDB() {
// Create an iterator for the database // Create an iterator for the database
log.Println("[app] [Monitor] Starting clean-up process")
iter := mms.db.NewIterator(nil, nil) iter := mms.db.NewIterator(nil, nil)
defer iter.Release() // Make sure to release the iterator at the end defer iter.Release() // Make sure to release the iterator at the end
@ -85,7 +92,7 @@ func (mms *MqttMonitor) CleanupDB() {
// Use iter.Key() and iter.Value() to access the key and value // Use iter.Key() and iter.Value() to access the key and value
lastSeen, err := mms.getDataFromIter(iter) lastSeen, err := mms.getDataFromIter(iter)
if err != nil { if err != nil {
mms.Log("[Monitor] Failed to unmarshal entry: " + string(iter.Key()) + " - " + err.Error()) log.Println("[app] [Monitor] Failed to unmarshal entry: " + string(iter.Key()) + " - " + err.Error())
continue continue
} }
timeThreshold := time.Now().Add(-1 * mms.CleanupPeriodicityInMinutes * time.Minute).Unix() timeThreshold := time.Now().Add(-1 * mms.CleanupPeriodicityInMinutes * time.Minute).Unix()
@ -93,13 +100,15 @@ func (mms *MqttMonitor) CleanupDB() {
// If the entry is older than 12 hours, delete it // If the entry is older than 12 hours, delete it
err := mms.deleteEntry(iter.Key()) err := mms.deleteEntry(iter.Key())
if err != nil { if err != nil {
mms.Log("[Monitor] Failed to delete entry: " + err.Error()) log.Println("[app] [Monitor] Failed to delete entry: " + err.Error())
} else {
log.Println("[app] [Monitor] Delete entry: " + string(iter.Key()))
} }
} }
} }
// Check for any errors encountered during iteration // Check for any errors encountered during iteration
if err := iter.Error(); err != nil { if err := iter.Error(); err != nil {
mms.Log(err.Error()) log.Println("[app] [Monitor] error during cleanup : " + err.Error())
} }
} }

View File

@ -3,7 +3,6 @@ package monitor
import ( import (
"sync" "sync"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/planetmint/planetmint-go/config" "github.com/planetmint/planetmint-go/config"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
) )
@ -11,7 +10,6 @@ import (
type MQTTMonitorClientI interface { type MQTTMonitorClientI interface {
AddParticipant(address string, lastSeenTS int64) (err error) AddParticipant(address string, lastSeenTS int64) (err error)
SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error)
SetContext(ctx sdk.Context)
Start() (err error) Start() (err error)
} }
@ -47,12 +45,6 @@ func LazyMqttMonitorLoader(homeDir string) {
} }
} }
// SetContext forwards ctx to the package-level mqttMonitorInstance while
// holding monitorMutex.
// NOTE(review): the unlock is not deferred, so a panic inside
// mqttMonitorInstance.SetContext would leave monitorMutex locked.
func SetContext(ctx sdk.Context) {
monitorMutex.Lock()
mqttMonitorInstance.SetContext(ctx)
monitorMutex.Unlock()
}
func SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) { func SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) {
monitorMutex.Lock() monitorMutex.Lock()
challenger, challengee, err = mqttMonitorInstance.SelectPoPParticipantsOutOfActiveActors() challenger, challengee, err = mqttMonitorInstance.SelectPoPParticipantsOutOfActiveActors()

View File

@ -1,9 +1,5 @@
package mocks package mocks
import (
types "github.com/cosmos/cosmos-sdk/types"
)
// MockMQTTMonitorClientI is a mock of MQTTMonitorClientI interface. // MockMQTTMonitorClientI is a mock of MQTTMonitorClientI interface.
type MockMQTTMonitorClientI struct { type MockMQTTMonitorClientI struct {
myStringList []string myStringList []string
@ -27,10 +23,6 @@ func (m *MockMQTTMonitorClientI) SelectPoPParticipantsOutOfActiveActors() (strin
return challenger, challengee, nil return challenger, challengee, nil
} }
// SetContext is the mock's SetContext method; it ignores its argument and
// does nothing.
func (m *MockMQTTMonitorClientI) SetContext(_ types.Context) {
}
// Start mocks base method. // Start mocks base method.
func (m *MockMQTTMonitorClientI) Start() error { func (m *MockMQTTMonitorClientI) Start() error {
return nil return nil

View File

@ -2,6 +2,7 @@ package monitor
import ( import (
"crypto/tls" "crypto/tls"
"log"
"math/rand" "math/rand"
"net" "net"
"strconv" "strconv"
@ -17,11 +18,12 @@ import (
) )
var MonitorMQTTClient util.MQTTClientI var MonitorMQTTClient util.MQTTClientI
var clientMutex sync.Mutex
var localMqttClient util.MQTTClientI
type MqttMonitor struct { type MqttMonitor struct {
db *leveldb.DB db *leveldb.DB
dbMutex sync.Mutex // Mutex to synchronize write operations dbMutex sync.Mutex // Mutex to synchronize write operations
ticker *time.Ticker
CleanupPeriodicityInMinutes time.Duration CleanupPeriodicityInMinutes time.Duration
config config.Config config config.Config
numberOfElements int64 numberOfElements int64
@ -29,6 +31,7 @@ type MqttMonitor struct {
contextMutex sync.Mutex contextMutex sync.Mutex
isTerminated bool isTerminated bool
terminationMutex sync.Mutex terminationMutex sync.Mutex
maxRetries time.Duration
} }
func (mms *MqttMonitor) Terminate() { func (mms *MqttMonitor) Terminate() {
@ -44,9 +47,9 @@ func (mms *MqttMonitor) IsTerminated() (isTerminated bool) {
return return
} }
func LazyLoadMonitorMQTTClient() { func (mms *MqttMonitor) lazyLoadMonitorMQTTClient() util.MQTTClientI {
if MonitorMQTTClient != nil { if MonitorMQTTClient != nil {
return return MonitorMQTTClient
} }
conf := config.GetConfig() conf := config.GetConfig()
@ -56,7 +59,7 @@ func LazyLoadMonitorMQTTClient() {
uri = "ssl://" + hostPort uri = "ssl://" + hostPort
} }
opts := mqtt.NewClientOptions().AddBroker(uri) opts := mqtt.NewClientOptions().AddBroker(uri).SetKeepAlive(60).SetCleanSession(true)
opts.SetClientID(conf.ValidatorAddress + "-monitor") opts.SetClientID(conf.ValidatorAddress + "-monitor")
opts.SetUsername(conf.MqttUser) opts.SetUsername(conf.MqttUser)
opts.SetPassword(conf.MqttPassword) opts.SetPassword(conf.MqttPassword)
@ -65,7 +68,9 @@ func LazyLoadMonitorMQTTClient() {
opts.SetTLSConfig(tlsConfig) opts.SetTLSConfig(tlsConfig)
} }
MonitorMQTTClient = mqtt.NewClient(opts) log.Println("[app] [Monitor] create new client")
client := mqtt.NewClient(opts)
return client
} }
func NewMqttMonitorService(db *leveldb.DB, config config.Config) *MqttMonitor { func NewMqttMonitorService(db *leveldb.DB, config config.Config) *MqttMonitor {
@ -73,13 +78,20 @@ func NewMqttMonitorService(db *leveldb.DB, config config.Config) *MqttMonitor {
return service return service
} }
func (mms *MqttMonitor) registerPeriodicTasks() { func (mms *MqttMonitor) runPeriodicTasks() {
mms.ticker = time.NewTicker(mms.CleanupPeriodicityInMinutes * time.Minute) tickerRestablishConnection := time.NewTicker(2 * time.Minute)
go func() { tickerCleanup := time.NewTicker(5 * time.Minute)
for range mms.ticker.C { // Loop over the ticker channel defer tickerRestablishConnection.Stop()
defer tickerCleanup.Stop()
for {
select {
case <-tickerRestablishConnection.C:
go mms.MonitorActiveParticipants()
case <-tickerCleanup.C:
go mms.CleanupDB() go mms.CleanupDB()
} }
}() }
} }
func (mms *MqttMonitor) Start() (err error) { func (mms *MqttMonitor) Start() (err error) {
@ -88,8 +100,9 @@ func (mms *MqttMonitor) Start() (err error) {
return return
} }
mms.numberOfElements = amount mms.numberOfElements = amount
mms.registerPeriodicTasks() go mms.runPeriodicTasks()
go mms.MonitorActiveParticipants() go mms.MonitorActiveParticipants()
go mms.CleanupDB()
return return
} }
func (mms *MqttMonitor) getRandomNumbers() (challenger int, challengee int) { func (mms *MqttMonitor) getRandomNumbers() (challenger int, challengee int) {
@ -105,18 +118,18 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str
return return
} }
randomChallenger, randomChallengee := mms.getRandomNumbers() randomChallenger, randomChallengee := mms.getRandomNumbers()
mms.Log("[Monitor] number of elements: " + strconv.Itoa(int(mms.numberOfElements))) log.Println("[app] [Monitor] number of elements: " + strconv.Itoa(int(mms.numberOfElements)))
mms.Log("[Monitor] selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee)) log.Println("[app] [Monitor] selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee))
iter := mms.db.NewIterator(nil, nil) iter := mms.db.NewIterator(nil, nil)
defer iter.Release() defer iter.Release()
count := 0 count := 0
found := 0 found := 0
var lastSeen LastSeenEvent var lastSeen LastSeenEvent
for iter.Next() { for iter.Next() {
mms.Log("[Monitor] count: " + strconv.Itoa(count))
if count == randomChallenger { if count == randomChallenger {
lastSeen, err = mms.getDataFromIter(iter) lastSeen, err = mms.getDataFromIter(iter)
if err != nil { if err != nil {
log.Println("[app] [Monitor] could not get Data from ID" + strconv.Itoa(randomChallenger))
return return
} }
challenger = lastSeen.Address challenger = lastSeen.Address
@ -124,6 +137,7 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str
} else if count == randomChallengee { } else if count == randomChallengee {
lastSeen, err = mms.getDataFromIter(iter) lastSeen, err = mms.getDataFromIter(iter)
if err != nil { if err != nil {
log.Println("[app] [Monitor] could not get Data from ID" + strconv.Itoa(randomChallengee))
return return
} }
challengee = lastSeen.Address challengee = lastSeen.Address
@ -135,6 +149,7 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str
break break
} }
} }
log.Println("[app] [Monitor] challenger, challengee: " + challenger + " " + challengee)
return return
} }
@ -157,59 +172,74 @@ func (mms *MqttMonitor) MqttMsgHandler(_ mqtt.Client, msg mqtt.Message) {
if err != nil || !valid { if err != nil || !valid {
return return
} }
payload, err := util.ToJSON(msg.Payload())
if err != nil {
return
}
timeString, ok := payload["Time"].(string) unixTime := time.Now().Unix()
if !ok {
return
}
unixTime, err := util.String2UnixTime(timeString)
if err != nil {
return
}
err = mms.AddParticipant(address, unixTime) err = mms.AddParticipant(address, unixTime)
if err != nil { if err != nil {
mms.Log("[Monitor] error adding active actor to DB: " + address + " " + err.Error()) log.Println("[app] [Monitor] error adding active actor to DB: " + address + " " + err.Error())
} else { } else {
mms.Log("[Monitor] added active actor to DB: " + address) log.Println("[app] [Monitor] added active actor to DB: " + address)
} }
} }
func (mms *MqttMonitor) MonitorActiveParticipants() { func (mms *MqttMonitor) MonitorActiveParticipants() {
LazyLoadMonitorMQTTClient() clientMutex.Lock()
for !mms.IsTerminated() { if localMqttClient != nil {
if !MonitorMQTTClient.IsConnected() { log.Println("[app] [Monitor] client is still working")
if token := MonitorMQTTClient.Connect(); token.Wait() && token.Error() != nil { return
mms.Log("[Monitor] error connecting to mqtt: " + token.Error().Error()) }
panic(token.Error()) localMqttClient = mms.lazyLoadMonitorMQTTClient()
client := localMqttClient
clientMutex.Unlock()
// Maximum reconnection attempts (adjust as needed)
mms.SetMaxRetries()
for !mms.IsTerminated() && mms.maxRetries > 0 {
if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Println("[app] [Monitor] error connecting to mqtt: " + token.Error().Error())
mms.maxRetries--
time.Sleep(time.Second * 5)
continue
} }
var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler
// Subscribe to a topic // Subscribe to a topic
subscriptionTopic := "tele/#" subscriptionTopic := "tele/#"
if token := MonitorMQTTClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { if token := client.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil {
mms.Log("[Monitor] error registering the mqtt subscription: " + token.Error().Error()) log.Println("[app] [Monitor] error registering the mqtt subscription: " + token.Error().Error())
panic(token.Error()) continue
}
for !mms.IsTerminated() {
if !client.IsConnected() {
log.Println("[app] [Monitor] retry establishing a connection")
break // Exit inner loop on disconnect
}
mms.SetMaxRetries()
time.Sleep(60 * time.Second) // Adjust sleep time based on your needs
} }
} }
time.Sleep(5 * time.Second)
if mms.maxRetries == 0 {
log.Println("[app] [Monitor] Reached maximum reconnection attempts. Exiting. New client will be activated soon.")
} }
clientMutex.Lock()
localMqttClient = nil
clientMutex.Unlock()
}
func (mms *MqttMonitor) SetMaxRetries() {
mms.maxRetries = 5
} }
func (mms *MqttMonitor) Log(msg string) { func (mms *MqttMonitor) Log(msg string) {
mms.contextMutex.Lock() mms.contextMutex.Lock()
if mms.sdkContext != nil { localContext := mms.sdkContext
util.GetAppLogger().Info(*mms.sdkContext, msg)
}
mms.contextMutex.Unlock() mms.contextMutex.Unlock()
if localContext != nil {
util.GetAppLogger().Info(*localContext, msg)
} }
func (mms *MqttMonitor) SetContext(ctx sdk.Context) {
mms.contextMutex.Lock()
mms.sdkContext = &ctx
mms.contextMutex.Unlock()
} }

View File

@ -23,7 +23,6 @@ const (
) )
func TestGMonitorActiveParticipants(t *testing.T) { func TestGMonitorActiveParticipants(t *testing.T) {
monitor.LazyLoadMonitorMQTTClient()
cfg := config.GetConfig() cfg := config.GetConfig()
db, err := leveldb.Open(storage.NewMemStorage(), nil) db, err := leveldb.Open(storage.NewMemStorage(), nil)
assert.NoError(t, err) assert.NoError(t, err)
@ -49,8 +48,6 @@ func TestGMonitorActiveParticipants(t *testing.T) {
} }
func TestCleanupRemoval(t *testing.T) { func TestCleanupRemoval(t *testing.T) {
monitor.LazyLoadMonitorMQTTClient()
cfg := config.GetConfig() cfg := config.GetConfig()
db, err := leveldb.Open(storage.NewMemStorage(), nil) db, err := leveldb.Open(storage.NewMemStorage(), nil)
assert.NoError(t, err) assert.NoError(t, err)
@ -77,8 +74,6 @@ func TestCleanupRemoval(t *testing.T) {
} }
func TestCleanupPrecisionTest(t *testing.T) { func TestCleanupPrecisionTest(t *testing.T) {
monitor.LazyLoadMonitorMQTTClient()
cfg := config.GetConfig() cfg := config.GetConfig()
db, err := leveldb.Open(storage.NewMemStorage(), nil) db, err := leveldb.Open(storage.NewMemStorage(), nil)
assert.NoError(t, err) assert.NoError(t, err)

View File

@ -1,23 +0,0 @@
package util
import (
"time"
)
// String2UnixTime converts a timestamp of the form "2006-01-02T15:04:05"
// into Unix time (seconds since the epoch).
//
// The layout carries no zone information, so time.Parse interprets the input
// as UTC regardless of the machine's local time zone. On a parse failure it
// returns 0 together with the error reported by time.Parse.
func String2UnixTime(timeInput string) (int64, error) {
	// Go layouts are written in terms of the reference time
	// Mon Jan 2 15:04:05 MST 2006.
	const layout = "2006-01-02T15:04:05"

	parsedTime, err := time.Parse(layout, timeInput)
	if err != nil {
		return 0, err
	}

	// Unix() does not depend on the Time's location, so no explicit UTC
	// conversion is needed before taking it.
	return parsedTime.Unix(), nil
}

View File

@ -1,15 +0,0 @@
package util
import (
"testing"
"github.com/stretchr/testify/assert"
)
// TestString2UnixTime checks that a zone-less timestamp string is converted
// to the expected Unix time in seconds without error.
func TestString2UnixTime(t *testing.T) {
	t.Parallel()

	const (
		timestamp = "2024-03-26T11:10:41"
		want      = int64(1711451441)
	)

	got, err := String2UnixTime(timestamp)
	assert.NoError(t, err)
	assert.Equal(t, want, got)
}

View File

@ -24,7 +24,6 @@ func BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock, k keeper.Keeper)
hexProposerAddress := hex.EncodeToString(proposerAddress) hexProposerAddress := hex.EncodeToString(proposerAddress)
if isPopHeight(ctx, k, currentBlockHeight) { if isPopHeight(ctx, k, currentBlockHeight) {
// select PoP participants // select PoP participants
monitor.SetContext(ctx)
challenger, challengee, err := monitor.SelectPoPParticipantsOutOfActiveActors() challenger, challengee, err := monitor.SelectPoPParticipantsOutOfActiveActors()
if err != nil { if err != nil {
util.GetAppLogger().Error(ctx, "error during PoP Participant selection ", err) util.GetAppLogger().Error(ctx, "error during PoP Participant selection ", err)