diff --git a/monitor/mqtt_monitor.go b/monitor/mqtt_monitor.go index 7d89c67..0d2a9b7 100644 --- a/monitor/mqtt_monitor.go +++ b/monitor/mqtt_monitor.go @@ -18,8 +18,6 @@ import ( ) var MonitorMQTTClient util.MQTTClientI -var clientMutex sync.Mutex -var localMqttClient util.MQTTClientI type MqttMonitor struct { db *leveldb.DB @@ -34,6 +32,8 @@ type MqttMonitor struct { maxRetries time.Duration lostConnection bool lostConnectionMutex sync.Mutex + clientMutex sync.Mutex + localMqttClient util.MQTTClientI } func (mms *MqttMonitor) Terminate() { @@ -202,15 +202,15 @@ func (mms *MqttMonitor) onConnectionLost(_ mqtt.Client, err error) { } func (mms *MqttMonitor) MonitorActiveParticipants() { - clientMutex.Lock() - if localMqttClient != nil { + mms.clientMutex.Lock() + if mms.localMqttClient != nil { log.Println("[app] [Monitor] client is still working") - clientMutex.Unlock() + mms.clientMutex.Unlock() return } - localMqttClient = mms.lazyLoadMonitorMQTTClient() - mqttClient := localMqttClient - clientMutex.Unlock() + mms.localMqttClient = mms.lazyLoadMonitorMQTTClient() + mqttClient := mms.localMqttClient + mms.clientMutex.Unlock() // Maximum reconnection attempts (adjust as needed) mms.SetMaxRetries() @@ -256,9 +256,9 @@ func (mms *MqttMonitor) MonitorActiveParticipants() { log.Println("[app] [Monitor] Reached maximum reconnection attempts. Exiting. New client will be activated soon.") } - clientMutex.Lock() - localMqttClient = nil - clientMutex.Unlock() + mms.clientMutex.Lock() + mms.localMqttClient = nil + mms.clientMutex.Unlock() } func SendUpdateMessage(mqttClient util.MQTTClientI) {