added connection loss handler, extended logging, mqtt msg updates (#400)

* added a bit of logging
* added update messages to mqtt (every second with a timestamp)
* added isConnectionOpen to the interface and analysis this during execution
* added connection loss handler and status check
* put some global objects into an objects to avoid inter service testing issues
* TestingLocker Mutex to RWMutex (increase performance)
* termintationMutex became a sync.RWMutex
* added logging to the mqtt mock client
* made monitor Mutex a RWMutex
* added Mutex protection to the numberOfElements varialbe
* added another Waiting block to the machine attestation methods (CI tests)
* had to adjust the test cases to the impact of that change.

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>
This commit is contained in:
Jürgen Eckel 2024-05-16 17:06:19 +02:00 committed by GitHub
parent 0c05081604
commit e99b614e4a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 125 additions and 55 deletions

View File

@ -25,16 +25,13 @@ func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err er
log.Println("[app] [Monitor] Error serializing ConversionRequest: " + err.Error()) log.Println("[app] [Monitor] Error serializing ConversionRequest: " + err.Error())
return return
} }
increaseCounter := false
// returns an error if the entry does not exist (we have to increase the counter in this case) // returns an error if the entry does not exist (we have to increase the counter in this case)
_, err = mms.db.Get([]byte(address), nil) _, err = mms.db.Get([]byte(address), nil)
if err != nil { if err != nil {
increaseCounter = true mms.setNumDBElements(mms.getNumDBElements() + 1)
} }
mms.dbMutex.Lock() mms.dbMutex.Lock()
if increaseCounter {
mms.numberOfElements++
}
err = mms.db.Put([]byte(address), lastSeenBytes, nil) err = mms.db.Put([]byte(address), lastSeenBytes, nil)
mms.dbMutex.Unlock() mms.dbMutex.Unlock()
if err != nil { if err != nil {
@ -47,9 +44,9 @@ func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err er
} }
func (mms *MqttMonitor) deleteEntry(key []byte) (err error) { func (mms *MqttMonitor) deleteEntry(key []byte) (err error) {
mms.setNumDBElements(mms.getNumDBElements() - 1)
mms.dbMutex.Lock() mms.dbMutex.Lock()
err = mms.db.Delete(key, nil) err = mms.db.Delete(key, nil)
mms.numberOfElements--
mms.dbMutex.Unlock() mms.dbMutex.Unlock()
return return
} }

View File

@ -13,7 +13,7 @@ type MQTTMonitorClientI interface {
Start() (err error) Start() (err error)
} }
var monitorMutex sync.Mutex var monitorMutex sync.RWMutex
var mqttMonitorInstance MQTTMonitorClientI var mqttMonitorInstance MQTTMonitorClientI
func SetMqttMonitorInstance(monitorInstance MQTTMonitorClientI) { func SetMqttMonitorInstance(monitorInstance MQTTMonitorClientI) {
@ -23,9 +23,9 @@ func SetMqttMonitorInstance(monitorInstance MQTTMonitorClientI) {
} }
func LazyMqttMonitorLoader(homeDir string) { func LazyMqttMonitorLoader(homeDir string) {
monitorMutex.Lock() monitorMutex.RLock()
tmpInstance := mqttMonitorInstance tmpInstance := mqttMonitorInstance
monitorMutex.Unlock() monitorMutex.RUnlock()
if tmpInstance != nil { if tmpInstance != nil {
return return
} }
@ -36,9 +36,8 @@ func LazyMqttMonitorLoader(homeDir string) {
if err != nil { if err != nil {
panic(err) panic(err)
} }
monitorMutex.Lock()
mqttMonitorInstance = NewMqttMonitorService(aciveActorsDB, *config.GetConfig()) SetMqttMonitorInstance(NewMqttMonitorService(aciveActorsDB, *config.GetConfig()))
monitorMutex.Unlock()
err = mqttMonitorInstance.Start() err = mqttMonitorInstance.Start()
if err != nil { if err != nil {
panic(err) panic(err)
@ -46,9 +45,9 @@ func LazyMqttMonitorLoader(homeDir string) {
} }
func SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) { func SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) {
monitorMutex.Lock() monitorMutex.RLock()
challenger, challengee, err = mqttMonitorInstance.SelectPoPParticipantsOutOfActiveActors() challenger, challengee, err = mqttMonitorInstance.SelectPoPParticipantsOutOfActiveActors()
monitorMutex.Unlock() monitorMutex.RUnlock()
return return
} }
@ -58,8 +57,8 @@ func Start() (err error) {
} }
func AddParticipant(address string, lastSeenTS int64) (err error) { func AddParticipant(address string, lastSeenTS int64) (err error) {
monitorMutex.Lock() monitorMutex.RLock()
err = mqttMonitorInstance.AddParticipant(address, lastSeenTS) err = mqttMonitorInstance.AddParticipant(address, lastSeenTS)
monitorMutex.Unlock() monitorMutex.RUnlock()
return return
} }

View File

@ -1,5 +1,7 @@
package mocks package mocks
import "log"
// MockMQTTMonitorClientI is a mock of MQTTMonitorClientI interface. // MockMQTTMonitorClientI is a mock of MQTTMonitorClientI interface.
type MockMQTTMonitorClientI struct { type MockMQTTMonitorClientI struct {
myStringList []string myStringList []string
@ -7,6 +9,7 @@ type MockMQTTMonitorClientI struct {
// AddParticipant mocks base method. // AddParticipant mocks base method.
func (m *MockMQTTMonitorClientI) AddParticipant(address string, _ int64) error { func (m *MockMQTTMonitorClientI) AddParticipant(address string, _ int64) error {
log.Println("[app] [Monitor] [Mock] added participant: " + address)
m.myStringList = append(m.myStringList, address) m.myStringList = append(m.myStringList, address)
return nil return nil
@ -20,6 +23,7 @@ func (m *MockMQTTMonitorClientI) SelectPoPParticipantsOutOfActiveActors() (strin
challenger = m.myStringList[amount-2] challenger = m.myStringList[amount-2]
challengee = m.myStringList[amount-1] challengee = m.myStringList[amount-1]
} }
log.Println("[app] [Monitor] [Mock] participants: " + challenger + ", " + challengee)
return challenger, challengee, nil return challenger, challengee, nil
} }

View File

@ -18,20 +18,23 @@ import (
) )
var MonitorMQTTClient util.MQTTClientI var MonitorMQTTClient util.MQTTClientI
var clientMutex sync.Mutex
var localMqttClient util.MQTTClientI
type MqttMonitor struct { type MqttMonitor struct {
db *leveldb.DB db *leveldb.DB
dbMutex sync.Mutex // Mutex to synchronize write operations dbMutex sync.Mutex // Mutex to synchronize write operations
CleanupPeriodicityInMinutes time.Duration CleanupPeriodicityInMinutes time.Duration
config config.Config config config.Config
numberOfElementsMutex sync.RWMutex
numberOfElements int64 numberOfElements int64
sdkContext *sdk.Context sdkContext *sdk.Context
contextMutex sync.Mutex contextMutex sync.Mutex
isTerminated bool isTerminated bool
terminationMutex sync.Mutex terminationMutex sync.RWMutex
maxRetries time.Duration maxRetries time.Duration
lostConnection bool
lostConnectionMutex sync.Mutex
clientMutex sync.Mutex
localMqttClient util.MQTTClientI
} }
func (mms *MqttMonitor) Terminate() { func (mms *MqttMonitor) Terminate() {
@ -41,12 +44,29 @@ func (mms *MqttMonitor) Terminate() {
} }
func (mms *MqttMonitor) IsTerminated() (isTerminated bool) { func (mms *MqttMonitor) IsTerminated() (isTerminated bool) {
mms.terminationMutex.Lock() mms.terminationMutex.RLock()
isTerminated = mms.isTerminated isTerminated = mms.isTerminated
mms.terminationMutex.Unlock() mms.terminationMutex.RUnlock()
return return
} }
func (mms *MqttMonitor) getNumDBElements() int64 {
mms.numberOfElementsMutex.RLock()
defer mms.numberOfElementsMutex.RUnlock()
return mms.numberOfElements
}
func (mms *MqttMonitor) setNumDBElements(numElements int64) {
mms.numberOfElementsMutex.Lock()
defer mms.numberOfElementsMutex.Unlock()
mms.numberOfElements = numElements
}
func getClientID() string {
conf := config.GetConfig()
return "monitor-" + conf.ValidatorAddress
}
func (mms *MqttMonitor) lazyLoadMonitorMQTTClient() util.MQTTClientI { func (mms *MqttMonitor) lazyLoadMonitorMQTTClient() util.MQTTClientI {
if MonitorMQTTClient != nil { if MonitorMQTTClient != nil {
return MonitorMQTTClient return MonitorMQTTClient
@ -59,10 +79,11 @@ func (mms *MqttMonitor) lazyLoadMonitorMQTTClient() util.MQTTClientI {
uri = "ssl://" + hostPort uri = "ssl://" + hostPort
} }
opts := mqtt.NewClientOptions().AddBroker(uri).SetKeepAlive(60).SetCleanSession(true) opts := mqtt.NewClientOptions().AddBroker(uri).SetKeepAlive(time.Second * 60).SetCleanSession(true)
opts.SetClientID(conf.ValidatorAddress + "-monitor") opts.SetClientID(getClientID())
opts.SetUsername(conf.MqttUser) opts.SetUsername(conf.MqttUser)
opts.SetPassword(conf.MqttPassword) opts.SetPassword(conf.MqttPassword)
opts.SetConnectionLostHandler(mms.onConnectionLost)
if conf.MqttTLS { if conf.MqttTLS {
tlsConfig := &tls.Config{} tlsConfig := &tls.Config{}
opts.SetTLSConfig(tlsConfig) opts.SetTLSConfig(tlsConfig)
@ -99,7 +120,7 @@ func (mms *MqttMonitor) Start() (err error) {
if err != nil { if err != nil {
return return
} }
mms.numberOfElements = amount mms.setNumDBElements(amount)
go mms.runPeriodicTasks() go mms.runPeriodicTasks()
go mms.MonitorActiveParticipants() go mms.MonitorActiveParticipants()
go mms.CleanupDB() go mms.CleanupDB()
@ -108,17 +129,19 @@ func (mms *MqttMonitor) Start() (err error) {
func (mms *MqttMonitor) getRandomNumbers() (challenger int, challengee int) { func (mms *MqttMonitor) getRandomNumbers() (challenger int, challengee int) {
for challenger == challengee { for challenger == challengee {
// Generate random numbers // Generate random numbers
challenger = rand.Intn(int(mms.numberOfElements)) numElements := int(mms.getNumDBElements())
challengee = rand.Intn(int(mms.numberOfElements)) challenger = rand.Intn(numElements)
challengee = rand.Intn(numElements)
} }
return return
} }
func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) { func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) {
if mms.numberOfElements < 2 { numElements := int(mms.getNumDBElements())
if numElements < 2 {
return return
} }
randomChallenger, randomChallengee := mms.getRandomNumbers() randomChallenger, randomChallengee := mms.getRandomNumbers()
log.Println("[app] [Monitor] number of elements: " + strconv.Itoa(int(mms.numberOfElements))) log.Println("[app] [Monitor] number of elements: " + strconv.Itoa(numElements))
log.Println("[app] [Monitor] selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee)) log.Println("[app] [Monitor] selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee))
iter := mms.db.NewIterator(nil, nil) iter := mms.db.NewIterator(nil, nil)
defer iter.Release() defer iter.Release()
@ -183,41 +206,62 @@ func (mms *MqttMonitor) MqttMsgHandler(_ mqtt.Client, msg mqtt.Message) {
} }
} }
func (mms *MqttMonitor) onConnectionLost(_ mqtt.Client, err error) {
log.Println("[app] [Monitor] Connection lost: " + err.Error())
// Handle connection loss here (e.g., reconnect attempts, logging)
if !mms.IsTerminated() {
mms.lostConnectionMutex.Lock()
mms.lostConnection = true
mms.lostConnectionMutex.Unlock()
}
}
func (mms *MqttMonitor) MonitorActiveParticipants() { func (mms *MqttMonitor) MonitorActiveParticipants() {
clientMutex.Lock() mms.clientMutex.Lock()
if localMqttClient != nil { if mms.localMqttClient != nil {
log.Println("[app] [Monitor] client is still working") log.Println("[app] [Monitor] client is still working")
clientMutex.Unlock() mms.clientMutex.Unlock()
return return
} }
localMqttClient = mms.lazyLoadMonitorMQTTClient() mms.localMqttClient = mms.lazyLoadMonitorMQTTClient()
client := localMqttClient mqttClient := mms.localMqttClient
clientMutex.Unlock() mms.clientMutex.Unlock()
// Maximum reconnection attempts (adjust as needed) // Maximum reconnection attempts (adjust as needed)
mms.SetMaxRetries() mms.SetMaxRetries()
for !mms.IsTerminated() && mms.maxRetries > 0 { for !mms.IsTerminated() && mms.maxRetries > 0 {
if token := client.Connect(); token.Wait() && token.Error() != nil { if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
log.Println("[app] [Monitor] error connecting to mqtt: " + token.Error().Error()) log.Println("[app] [Monitor] error connecting to mqtt: " + token.Error().Error())
mms.maxRetries-- mms.maxRetries--
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
continue continue
} }
mms.lostConnectionMutex.Lock()
mms.lostConnection = false
mms.lostConnectionMutex.Unlock()
log.Println("[app] [Monitor] established connection")
var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler
// Subscribe to a topic // Subscribe to a topic
subscriptionTopic := "tele/#" subscriptionTopic := "tele/#"
if token := client.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { if token := mqttClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil {
log.Println("[app] [Monitor] error registering the mqtt subscription: " + token.Error().Error()) log.Println("[app] [Monitor] error registering the mqtt subscription: " + token.Error().Error())
continue continue
} }
log.Println("[app] [Monitor] subscribed to tele/# channels")
for !mms.IsTerminated() { for !mms.IsTerminated() {
if !client.IsConnected() { mms.lostConnectionMutex.Lock()
lostConnectionEvent := mms.lostConnection
mms.lostConnectionMutex.Unlock()
if !mqttClient.IsConnected() || !mqttClient.IsConnectionOpen() || lostConnectionEvent {
log.Println("[app] [Monitor] retry establishing a connection") log.Println("[app] [Monitor] retry establishing a connection")
break // Exit inner loop on disconnect break // Exit inner loop on disconnect
} }
SendUpdateMessage(mqttClient)
mms.SetMaxRetries() mms.SetMaxRetries()
time.Sleep(60 * time.Second) // Adjust sleep time based on your needs time.Sleep(60 * time.Second) // Adjust sleep time based on your needs
} }
@ -227,9 +271,16 @@ func (mms *MqttMonitor) MonitorActiveParticipants() {
log.Println("[app] [Monitor] Reached maximum reconnection attempts. Exiting. New client will be activated soon.") log.Println("[app] [Monitor] Reached maximum reconnection attempts. Exiting. New client will be activated soon.")
} }
clientMutex.Lock() mms.clientMutex.Lock()
localMqttClient = nil mms.localMqttClient = nil
clientMutex.Unlock() mms.clientMutex.Unlock()
}
func SendUpdateMessage(mqttClient util.MQTTClientI) {
// Publish message
now := time.Now().Format("2006-01-02 15:04:05") // Adjust format as needed
token := mqttClient.Publish("tele/"+getClientID(), 1, false, now)
token.Wait()
} }
func (mms *MqttMonitor) SetMaxRetries() { func (mms *MqttMonitor) SetMaxRetries() {

View File

@ -109,6 +109,7 @@ func (s *ConsumptionE2ETestSuite) TestValidatorConsumption() {
out, err := lib.BroadcastTxWithFileLock(val.Address, msgs...) out, err := lib.BroadcastTxWithFileLock(val.Address, msgs...)
s.Require().NoError(err) s.Require().NoError(err)
s.Require().NoError(s.network.WaitForNextBlock())
s.Require().NoError(s.network.WaitForNextBlock()) s.Require().NoError(s.network.WaitForNextBlock())
_, err = clitestutil.GetRawLogFromTxOut(val, out) _, err = clitestutil.GetRawLogFromTxOut(val, out)

View File

@ -164,9 +164,9 @@ func (s *SelectionE2ETestSuite) TestPopSelectionNoActors() {
} }
func (s *SelectionE2ETestSuite) TestPopSelectionOneActors() { func (s *SelectionE2ETestSuite) TestPopSelectionOneActors() {
err := e2etestutil.AttestMachine(s.network, machines[0].name, machines[0].mnemonic, 0, s.feeDenom) err := monitor.AddParticipant(machines[0].address, time.Now().Unix())
s.Require().NoError(err) s.Require().NoError(err)
err = monitor.AddParticipant(machines[0].address, time.Now().Unix()) err = e2etestutil.AttestMachine(s.network, machines[0].name, machines[0].mnemonic, 0, s.feeDenom)
s.Require().NoError(err) s.Require().NoError(err)
out := s.perpareLocalTest() out := s.perpareLocalTest()
@ -176,9 +176,9 @@ func (s *SelectionE2ETestSuite) TestPopSelectionOneActors() {
} }
func (s *SelectionE2ETestSuite) TestPopSelectionTwoActors() { func (s *SelectionE2ETestSuite) TestPopSelectionTwoActors() {
err := e2etestutil.AttestMachine(s.network, machines[1].name, machines[1].mnemonic, 1, s.feeDenom) err := monitor.AddParticipant(machines[1].address, time.Now().Unix())
s.Require().NoError(err) s.Require().NoError(err)
err = monitor.AddParticipant(machines[1].address, time.Now().Unix()) err = e2etestutil.AttestMachine(s.network, machines[1].name, machines[1].mnemonic, 1, s.feeDenom)
s.Require().NoError(err) s.Require().NoError(err)
out := s.perpareLocalTest() out := s.perpareLocalTest()

View File

@ -68,6 +68,7 @@ func (s *E2ETestSuite) TestAttestMachine() {
out, err := e2etestutil.BuildSignBroadcastTx(s.T(), val.Address, msg1) out, err := e2etestutil.BuildSignBroadcastTx(s.T(), val.Address, msg1)
s.Require().NoError(err) s.Require().NoError(err)
s.Require().NoError(s.network.WaitForNextBlock())
s.Require().NoError(s.network.WaitForNextBlock()) s.Require().NoError(s.network.WaitForNextBlock())
rawLog, err := clitestutil.GetRawLogFromTxOut(val, out) rawLog, err := clitestutil.GetRawLogFromTxOut(val, out)
s.Require().NoError(err) s.Require().NoError(err)

View File

@ -44,6 +44,10 @@ func FundAccount(network *network.Network, account *keyring.Record, tokenDenom s
return err return err
} }
err = network.WaitForNextBlock()
if err != nil {
return err
}
err = network.WaitForNextBlock() err = network.WaitForNextBlock()
if err != nil { if err != nil {
return err return err
@ -99,6 +103,10 @@ func AttestMachine(network *network.Network, name string, mnemonic string, num i
return err return err
} }
err = network.WaitForNextBlock()
if err != nil {
return err
}
err = network.WaitForNextBlock() err = network.WaitForNextBlock()
if err != nil { if err != nil {
return err return err

View File

@ -16,7 +16,7 @@ var (
globalApplicationLoggerTag string globalApplicationLoggerTag string
appLogger *AppLogger appLogger *AppLogger
initAppLogger sync.Once initAppLogger sync.Once
syncTestingLog sync.Mutex syncTestingLog sync.RWMutex
) )
func init() { func init() {
@ -48,13 +48,13 @@ func format(msg string, keyvals ...interface{}) string {
} }
func (logger *AppLogger) testingLog(msg string, keyvals ...interface{}) { func (logger *AppLogger) testingLog(msg string, keyvals ...interface{}) {
syncTestingLog.RLock()
defer syncTestingLog.RUnlock()
if logger.testingLogger == nil { if logger.testingLogger == nil {
return return
} }
msg = format(msg, keyvals...) msg = format(msg, keyvals...)
syncTestingLog.Lock()
logger.testingLogger.Logf(msg) logger.testingLogger.Logf(msg)
syncTestingLog.Unlock()
} }
func (logger *AppLogger) Info(ctx sdk.Context, msg string, keyvals ...interface{}) { func (logger *AppLogger) Info(ctx sdk.Context, msg string, keyvals ...interface{}) {

View File

@ -9,14 +9,15 @@ import (
// MockMQTTClient is the mock mqtt client // MockMQTTClient is the mock mqtt client
type MockMQTTClient struct { type MockMQTTClient struct {
ConnectFunc func() mqtt.Token ConnectFunc func() mqtt.Token
DisconnectFunc func(quiesce uint) DisconnectFunc func(quiesce uint)
PublishFunc func(topic string, qos byte, retained bool, payload interface{}) mqtt.Token PublishFunc func(topic string, qos byte, retained bool, payload interface{}) mqtt.Token
SubscribeFunc func(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token SubscribeFunc func(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token
UnsubscribeFunc func(topics ...string) mqtt.Token UnsubscribeFunc func(topics ...string) mqtt.Token
IsConnectedFunc func() bool IsConnectedFunc func() bool
connected bool IsConnectionOpenFunc func() bool
connectedMutex sync.Mutex connected bool
connectedMutex sync.Mutex
} }
// GetConnectFunc fetches the mock client's `Connect` func // GetConnectFunc fetches the mock client's `Connect` func
@ -129,3 +130,10 @@ func (m *MockMQTTClient) IsConnected() bool {
m.connectedMutex.Unlock() m.connectedMutex.Unlock()
return connected return connected
} }
func (m *MockMQTTClient) IsConnectionOpen() bool {
m.connectedMutex.Lock()
connected := m.connected
m.connectedMutex.Unlock()
return connected
}

View File

@ -23,6 +23,7 @@ type MQTTClientI interface {
Subscribe(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token Subscribe(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token
Unsubscribe(topics ...string) mqtt.Token Unsubscribe(topics ...string) mqtt.Token
IsConnected() bool IsConnected() bool
IsConnectionOpen() bool
} }
var ( var (