From e99b614e4a17ce396f42e02f4743eb4c50ad18ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrgen=20Eckel?= Date: Thu, 16 May 2024 17:06:19 +0200 Subject: [PATCH] added connection loss handler, extended logging, mqtt msg updates (#400) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * added a bit of logging * added update messages to mqtt (every 60 seconds, with a timestamp) * added isConnectionOpen to the interface and analyze this during execution * added connection loss handler and status check * put some global objects into an object to avoid inter-service testing issues * TestingLocker Mutex to RWMutex (increase performance) * terminationMutex became a sync.RWMutex * added logging to the mqtt mock client * made monitor Mutex a RWMutex * added Mutex protection to the numberOfElements variable * added another Waiting block to the machine attestation methods (CI tests) * had to adjust the test cases to the impact of that change. Signed-off-by: Jürgen Eckel --- monitor/backend.go | 9 +- monitor/interface.go | 19 ++--- monitor/mocks/mqtt_monitor.go | 4 + monitor/mqtt_monitor.go | 99 ++++++++++++++++------ tests/e2e/dao/gas/gas_consumption_suite.go | 1 + tests/e2e/dao/pop/selection_suite.go | 8 +- tests/e2e/machine/suite.go | 1 + testutil/e2e/e2e.go | 8 ++ util/logger.go | 6 +- util/mocks/mqtt.go | 24 ++++-- util/mqtt.go | 1 + 11 files changed, 125 insertions(+), 55 deletions(-) diff --git a/monitor/backend.go b/monitor/backend.go index 1471e70..ff76767 100644 --- a/monitor/backend.go +++ b/monitor/backend.go @@ -25,16 +25,13 @@ func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err er log.Println("[app] [Monitor] Error serializing ConversionRequest: " + err.Error()) return } - increaseCounter := false + // returns an error if the entry does not exist (we have to increase the counter in this case) _, err = mms.db.Get([]byte(address), nil) if err != nil { - increaseCounter = true + 
mms.setNumDBElements(mms.getNumDBElements() + 1) } mms.dbMutex.Lock() - if increaseCounter { - mms.numberOfElements++ - } err = mms.db.Put([]byte(address), lastSeenBytes, nil) mms.dbMutex.Unlock() if err != nil { @@ -47,9 +44,9 @@ func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err er } func (mms *MqttMonitor) deleteEntry(key []byte) (err error) { + mms.setNumDBElements(mms.getNumDBElements() - 1) mms.dbMutex.Lock() err = mms.db.Delete(key, nil) - mms.numberOfElements-- mms.dbMutex.Unlock() return } diff --git a/monitor/interface.go b/monitor/interface.go index 21879af..f967e6b 100644 --- a/monitor/interface.go +++ b/monitor/interface.go @@ -13,7 +13,7 @@ type MQTTMonitorClientI interface { Start() (err error) } -var monitorMutex sync.Mutex +var monitorMutex sync.RWMutex var mqttMonitorInstance MQTTMonitorClientI func SetMqttMonitorInstance(monitorInstance MQTTMonitorClientI) { @@ -23,9 +23,9 @@ func SetMqttMonitorInstance(monitorInstance MQTTMonitorClientI) { } func LazyMqttMonitorLoader(homeDir string) { - monitorMutex.Lock() + monitorMutex.RLock() tmpInstance := mqttMonitorInstance - monitorMutex.Unlock() + monitorMutex.RUnlock() if tmpInstance != nil { return } @@ -36,9 +36,8 @@ func LazyMqttMonitorLoader(homeDir string) { if err != nil { panic(err) } - monitorMutex.Lock() - mqttMonitorInstance = NewMqttMonitorService(aciveActorsDB, *config.GetConfig()) - monitorMutex.Unlock() + + SetMqttMonitorInstance(NewMqttMonitorService(aciveActorsDB, *config.GetConfig())) err = mqttMonitorInstance.Start() if err != nil { panic(err) @@ -46,9 +45,9 @@ func LazyMqttMonitorLoader(homeDir string) { } func SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) { - monitorMutex.Lock() + monitorMutex.RLock() challenger, challengee, err = mqttMonitorInstance.SelectPoPParticipantsOutOfActiveActors() - monitorMutex.Unlock() + monitorMutex.RUnlock() return } @@ -58,8 +57,8 @@ func Start() (err error) { } func 
AddParticipant(address string, lastSeenTS int64) (err error) { - monitorMutex.Lock() + monitorMutex.RLock() err = mqttMonitorInstance.AddParticipant(address, lastSeenTS) - monitorMutex.Unlock() + monitorMutex.RUnlock() return } diff --git a/monitor/mocks/mqtt_monitor.go b/monitor/mocks/mqtt_monitor.go index ea020a8..4553f00 100644 --- a/monitor/mocks/mqtt_monitor.go +++ b/monitor/mocks/mqtt_monitor.go @@ -1,5 +1,7 @@ package mocks +import "log" + // MockMQTTMonitorClientI is a mock of MQTTMonitorClientI interface. type MockMQTTMonitorClientI struct { myStringList []string @@ -7,6 +9,7 @@ type MockMQTTMonitorClientI struct { // AddParticipant mocks base method. func (m *MockMQTTMonitorClientI) AddParticipant(address string, _ int64) error { + log.Println("[app] [Monitor] [Mock] added participant: " + address) m.myStringList = append(m.myStringList, address) return nil @@ -20,6 +23,7 @@ func (m *MockMQTTMonitorClientI) SelectPoPParticipantsOutOfActiveActors() (strin challenger = m.myStringList[amount-2] challengee = m.myStringList[amount-1] } + log.Println("[app] [Monitor] [Mock] participants: " + challenger + ", " + challengee) return challenger, challengee, nil } diff --git a/monitor/mqtt_monitor.go b/monitor/mqtt_monitor.go index 096efd7..053fa78 100644 --- a/monitor/mqtt_monitor.go +++ b/monitor/mqtt_monitor.go @@ -18,20 +18,23 @@ import ( ) var MonitorMQTTClient util.MQTTClientI -var clientMutex sync.Mutex -var localMqttClient util.MQTTClientI type MqttMonitor struct { db *leveldb.DB dbMutex sync.Mutex // Mutex to synchronize write operations CleanupPeriodicityInMinutes time.Duration config config.Config + numberOfElementsMutex sync.RWMutex numberOfElements int64 sdkContext *sdk.Context contextMutex sync.Mutex isTerminated bool - terminationMutex sync.Mutex + terminationMutex sync.RWMutex maxRetries time.Duration + lostConnection bool + lostConnectionMutex sync.Mutex + clientMutex sync.Mutex + localMqttClient util.MQTTClientI } func (mms *MqttMonitor) 
Terminate() { @@ -41,12 +44,29 @@ func (mms *MqttMonitor) Terminate() { } func (mms *MqttMonitor) IsTerminated() (isTerminated bool) { - mms.terminationMutex.Lock() + mms.terminationMutex.RLock() isTerminated = mms.isTerminated - mms.terminationMutex.Unlock() + mms.terminationMutex.RUnlock() return } +func (mms *MqttMonitor) getNumDBElements() int64 { + mms.numberOfElementsMutex.RLock() + defer mms.numberOfElementsMutex.RUnlock() + return mms.numberOfElements +} + +func (mms *MqttMonitor) setNumDBElements(numElements int64) { + mms.numberOfElementsMutex.Lock() + defer mms.numberOfElementsMutex.Unlock() + mms.numberOfElements = numElements +} + +func getClientID() string { + conf := config.GetConfig() + return "monitor-" + conf.ValidatorAddress +} + func (mms *MqttMonitor) lazyLoadMonitorMQTTClient() util.MQTTClientI { if MonitorMQTTClient != nil { return MonitorMQTTClient @@ -59,10 +79,11 @@ func (mms *MqttMonitor) lazyLoadMonitorMQTTClient() util.MQTTClientI { uri = "ssl://" + hostPort } - opts := mqtt.NewClientOptions().AddBroker(uri).SetKeepAlive(60).SetCleanSession(true) - opts.SetClientID(conf.ValidatorAddress + "-monitor") + opts := mqtt.NewClientOptions().AddBroker(uri).SetKeepAlive(time.Second * 60).SetCleanSession(true) + opts.SetClientID(getClientID()) opts.SetUsername(conf.MqttUser) opts.SetPassword(conf.MqttPassword) + opts.SetConnectionLostHandler(mms.onConnectionLost) if conf.MqttTLS { tlsConfig := &tls.Config{} opts.SetTLSConfig(tlsConfig) @@ -99,7 +120,7 @@ func (mms *MqttMonitor) Start() (err error) { if err != nil { return } - mms.numberOfElements = amount + mms.setNumDBElements(amount) go mms.runPeriodicTasks() go mms.MonitorActiveParticipants() go mms.CleanupDB() @@ -108,17 +129,19 @@ func (mms *MqttMonitor) Start() (err error) { func (mms *MqttMonitor) getRandomNumbers() (challenger int, challengee int) { for challenger == challengee { // Generate random numbers - challenger = rand.Intn(int(mms.numberOfElements)) - challengee = 
rand.Intn(int(mms.numberOfElements)) + numElements := int(mms.getNumDBElements()) + challenger = rand.Intn(numElements) + challengee = rand.Intn(numElements) } return } func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) { - if mms.numberOfElements < 2 { + numElements := int(mms.getNumDBElements()) + if numElements < 2 { return } randomChallenger, randomChallengee := mms.getRandomNumbers() - log.Println("[app] [Monitor] number of elements: " + strconv.Itoa(int(mms.numberOfElements))) + log.Println("[app] [Monitor] number of elements: " + strconv.Itoa(numElements)) log.Println("[app] [Monitor] selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee)) iter := mms.db.NewIterator(nil, nil) defer iter.Release() @@ -183,41 +206,62 @@ func (mms *MqttMonitor) MqttMsgHandler(_ mqtt.Client, msg mqtt.Message) { } } +func (mms *MqttMonitor) onConnectionLost(_ mqtt.Client, err error) { + log.Println("[app] [Monitor] Connection lost: " + err.Error()) + // Handle connection loss here (e.g., reconnect attempts, logging) + if !mms.IsTerminated() { + mms.lostConnectionMutex.Lock() + mms.lostConnection = true + mms.lostConnectionMutex.Unlock() + } +} + func (mms *MqttMonitor) MonitorActiveParticipants() { - clientMutex.Lock() - if localMqttClient != nil { + mms.clientMutex.Lock() + if mms.localMqttClient != nil { log.Println("[app] [Monitor] client is still working") - clientMutex.Unlock() + mms.clientMutex.Unlock() return } - localMqttClient = mms.lazyLoadMonitorMQTTClient() - client := localMqttClient - clientMutex.Unlock() + mms.localMqttClient = mms.lazyLoadMonitorMQTTClient() + mqttClient := mms.localMqttClient + mms.clientMutex.Unlock() // Maximum reconnection attempts (adjust as needed) mms.SetMaxRetries() for !mms.IsTerminated() && mms.maxRetries > 0 { - if token := client.Connect(); token.Wait() && token.Error() != nil { + if token := mqttClient.Connect(); token.Wait() && 
token.Error() != nil { log.Println("[app] [Monitor] error connecting to mqtt: " + token.Error().Error()) mms.maxRetries-- time.Sleep(time.Second * 5) continue } + mms.lostConnectionMutex.Lock() + mms.lostConnection = false + mms.lostConnectionMutex.Unlock() + + log.Println("[app] [Monitor] established connection") var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler // Subscribe to a topic subscriptionTopic := "tele/#" - if token := client.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { + if token := mqttClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { log.Println("[app] [Monitor] error registering the mqtt subscription: " + token.Error().Error()) continue } + log.Println("[app] [Monitor] subscribed to tele/# channels") for !mms.IsTerminated() { - if !client.IsConnected() { + mms.lostConnectionMutex.Lock() + lostConnectionEvent := mms.lostConnection + mms.lostConnectionMutex.Unlock() + if !mqttClient.IsConnected() || !mqttClient.IsConnectionOpen() || lostConnectionEvent { log.Println("[app] [Monitor] retry establishing a connection") break // Exit inner loop on disconnect } + + SendUpdateMessage(mqttClient) mms.SetMaxRetries() time.Sleep(60 * time.Second) // Adjust sleep time based on your needs } @@ -227,9 +271,16 @@ func (mms *MqttMonitor) MonitorActiveParticipants() { log.Println("[app] [Monitor] Reached maximum reconnection attempts. Exiting. 
New client will be activated soon.") } - clientMutex.Lock() - localMqttClient = nil - clientMutex.Unlock() + mms.clientMutex.Lock() + mms.localMqttClient = nil + mms.clientMutex.Unlock() +} + +func SendUpdateMessage(mqttClient util.MQTTClientI) { + // Publish message + now := time.Now().Format("2006-01-02 15:04:05") // Adjust format as needed + token := mqttClient.Publish("tele/"+getClientID(), 1, false, now) + token.Wait() } func (mms *MqttMonitor) SetMaxRetries() { diff --git a/tests/e2e/dao/gas/gas_consumption_suite.go b/tests/e2e/dao/gas/gas_consumption_suite.go index 5ee6de7..059cb09 100644 --- a/tests/e2e/dao/gas/gas_consumption_suite.go +++ b/tests/e2e/dao/gas/gas_consumption_suite.go @@ -109,6 +109,7 @@ func (s *ConsumptionE2ETestSuite) TestValidatorConsumption() { out, err := lib.BroadcastTxWithFileLock(val.Address, msgs...) s.Require().NoError(err) + s.Require().NoError(s.network.WaitForNextBlock()) s.Require().NoError(s.network.WaitForNextBlock()) _, err = clitestutil.GetRawLogFromTxOut(val, out) diff --git a/tests/e2e/dao/pop/selection_suite.go b/tests/e2e/dao/pop/selection_suite.go index fb2898f..0b001fb 100644 --- a/tests/e2e/dao/pop/selection_suite.go +++ b/tests/e2e/dao/pop/selection_suite.go @@ -164,9 +164,9 @@ func (s *SelectionE2ETestSuite) TestPopSelectionNoActors() { } func (s *SelectionE2ETestSuite) TestPopSelectionOneActors() { - err := e2etestutil.AttestMachine(s.network, machines[0].name, machines[0].mnemonic, 0, s.feeDenom) + err := monitor.AddParticipant(machines[0].address, time.Now().Unix()) s.Require().NoError(err) - err = monitor.AddParticipant(machines[0].address, time.Now().Unix()) + err = e2etestutil.AttestMachine(s.network, machines[0].name, machines[0].mnemonic, 0, s.feeDenom) s.Require().NoError(err) out := s.perpareLocalTest() @@ -176,9 +176,9 @@ func (s *SelectionE2ETestSuite) TestPopSelectionOneActors() { } func (s *SelectionE2ETestSuite) TestPopSelectionTwoActors() { - err := e2etestutil.AttestMachine(s.network, 
machines[1].name, machines[1].mnemonic, 1, s.feeDenom) + err := monitor.AddParticipant(machines[1].address, time.Now().Unix()) s.Require().NoError(err) - err = monitor.AddParticipant(machines[1].address, time.Now().Unix()) + err = e2etestutil.AttestMachine(s.network, machines[1].name, machines[1].mnemonic, 1, s.feeDenom) s.Require().NoError(err) out := s.perpareLocalTest() diff --git a/tests/e2e/machine/suite.go b/tests/e2e/machine/suite.go index 029f8ba..2a1008b 100644 --- a/tests/e2e/machine/suite.go +++ b/tests/e2e/machine/suite.go @@ -68,6 +68,7 @@ func (s *E2ETestSuite) TestAttestMachine() { out, err := e2etestutil.BuildSignBroadcastTx(s.T(), val.Address, msg1) s.Require().NoError(err) + s.Require().NoError(s.network.WaitForNextBlock()) s.Require().NoError(s.network.WaitForNextBlock()) rawLog, err := clitestutil.GetRawLogFromTxOut(val, out) s.Require().NoError(err) diff --git a/testutil/e2e/e2e.go b/testutil/e2e/e2e.go index 92cb05a..91facf6 100644 --- a/testutil/e2e/e2e.go +++ b/testutil/e2e/e2e.go @@ -44,6 +44,10 @@ func FundAccount(network *network.Network, account *keyring.Record, tokenDenom s return err } + err = network.WaitForNextBlock() + if err != nil { + return err + } err = network.WaitForNextBlock() if err != nil { return err @@ -99,6 +103,10 @@ func AttestMachine(network *network.Network, name string, mnemonic string, num i return err } + err = network.WaitForNextBlock() + if err != nil { + return err + } err = network.WaitForNextBlock() if err != nil { return err diff --git a/util/logger.go b/util/logger.go index abf6b2c..1ad7fdf 100644 --- a/util/logger.go +++ b/util/logger.go @@ -16,7 +16,7 @@ var ( globalApplicationLoggerTag string appLogger *AppLogger initAppLogger sync.Once - syncTestingLog sync.Mutex + syncTestingLog sync.RWMutex ) func init() { @@ -48,13 +48,13 @@ func format(msg string, keyvals ...interface{}) string { } func (logger *AppLogger) testingLog(msg string, keyvals ...interface{}) { + syncTestingLog.RLock() + defer 
syncTestingLog.RUnlock() if logger.testingLogger == nil { return } msg = format(msg, keyvals...) - syncTestingLog.Lock() logger.testingLogger.Logf(msg) - syncTestingLog.Unlock() } func (logger *AppLogger) Info(ctx sdk.Context, msg string, keyvals ...interface{}) { diff --git a/util/mocks/mqtt.go b/util/mocks/mqtt.go index 0d6bdfd..be8e368 100644 --- a/util/mocks/mqtt.go +++ b/util/mocks/mqtt.go @@ -9,14 +9,15 @@ import ( // MockMQTTClient is the mock mqtt client type MockMQTTClient struct { - ConnectFunc func() mqtt.Token - DisconnectFunc func(quiesce uint) - PublishFunc func(topic string, qos byte, retained bool, payload interface{}) mqtt.Token - SubscribeFunc func(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token - UnsubscribeFunc func(topics ...string) mqtt.Token - IsConnectedFunc func() bool - connected bool - connectedMutex sync.Mutex + ConnectFunc func() mqtt.Token + DisconnectFunc func(quiesce uint) + PublishFunc func(topic string, qos byte, retained bool, payload interface{}) mqtt.Token + SubscribeFunc func(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token + UnsubscribeFunc func(topics ...string) mqtt.Token + IsConnectedFunc func() bool + IsConnectionOpenFunc func() bool + connected bool + connectedMutex sync.Mutex } // GetConnectFunc fetches the mock client's `Connect` func @@ -129,3 +130,10 @@ func (m *MockMQTTClient) IsConnected() bool { m.connectedMutex.Unlock() return connected } + +func (m *MockMQTTClient) IsConnectionOpen() bool { + m.connectedMutex.Lock() + connected := m.connected + m.connectedMutex.Unlock() + return connected +} diff --git a/util/mqtt.go b/util/mqtt.go index a288235..49f7305 100644 --- a/util/mqtt.go +++ b/util/mqtt.go @@ -23,6 +23,7 @@ type MQTTClientI interface { Subscribe(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token Unsubscribe(topics ...string) mqtt.Token IsConnected() bool + IsConnectionOpen() bool } var (