From b09d568fa76d73ea0b5a1b95f83aedad5f30ff61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrgen=20Eckel?= Date: Thu, 16 May 2024 12:48:43 +0200 Subject: [PATCH] added conenction loss handler and status check MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jürgen Eckel --- monitor/mqtt_monitor.go | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/monitor/mqtt_monitor.go b/monitor/mqtt_monitor.go index 4132d95..6079e80 100644 --- a/monitor/mqtt_monitor.go +++ b/monitor/mqtt_monitor.go @@ -32,6 +32,8 @@ type MqttMonitor struct { isTerminated bool terminationMutex sync.Mutex maxRetries time.Duration + lostConnection bool + lostConnectionMutex sync.Mutex } func (mms *MqttMonitor) Terminate() { @@ -64,10 +66,11 @@ func (mms *MqttMonitor) lazyLoadMonitorMQTTClient() util.MQTTClientI { uri = "ssl://" + hostPort } - opts := mqtt.NewClientOptions().AddBroker(uri).SetKeepAlive(60).SetCleanSession(true) + opts := mqtt.NewClientOptions().AddBroker(uri).SetKeepAlive(time.Second * 60).SetCleanSession(true) opts.SetClientID(getClientID()) opts.SetUsername(conf.MqttUser) opts.SetPassword(conf.MqttPassword) + opts.SetConnectionLostHandler(mms.onConnectionLost) if conf.MqttTLS { tlsConfig := &tls.Config{} opts.SetTLSConfig(tlsConfig) @@ -188,6 +191,14 @@ func (mms *MqttMonitor) MqttMsgHandler(_ mqtt.Client, msg mqtt.Message) { } } +func (mms *MqttMonitor) onConnectionLost(_ mqtt.Client, err error) { + log.Println("[app] [Monitor] Connection lost: " + err.Error()) + // Handle connection loss here (e.g., reconnect attempts, logging) + mms.lostConnectionMutex.Lock() + mms.lostConnection = true + mms.lostConnectionMutex.Unlock() +} + func (mms *MqttMonitor) MonitorActiveParticipants() { clientMutex.Lock() if localMqttClient != nil { @@ -208,6 +219,10 @@ func (mms *MqttMonitor) MonitorActiveParticipants() { time.Sleep(time.Second * 5) continue } + mms.lostConnectionMutex.Lock() + mms.lostConnection = false + mms.lostConnectionMutex.Unlock() + log.Println("[app] [Monitor] established connection") var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler @@ -221,7 +236,10 @@ func (mms *MqttMonitor) MonitorActiveParticipants() { log.Println("[app] [Monitor] subscribed to tele/# channels") for !mms.IsTerminated() { - if !mqttClient.IsConnected() || !mqttClient.IsConnectionOpen() { + mms.lostConnectionMutex.Lock() + lostConnectionEvent := mms.lostConnection + mms.lostConnectionMutex.Unlock() + if !mqttClient.IsConnected() || !mqttClient.IsConnectionOpen() || lostConnectionEvent { log.Println("[app] [Monitor] retry establishing a connection") break // Exit inner loop on disconnect }