From 2223abc2d42e566d5b7aed59525a7de2646b41bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrgen=20Eckel?= Date: Thu, 16 May 2024 12:20:49 +0200 Subject: [PATCH] * added a bit of logging * added update messages to mqtt (every second with a timestamp) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jürgen Eckel --- monitor/mqtt_monitor.go | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/monitor/mqtt_monitor.go b/monitor/mqtt_monitor.go index 096efd7..478a337 100644 --- a/monitor/mqtt_monitor.go +++ b/monitor/mqtt_monitor.go @@ -47,6 +47,11 @@ func (mms *MqttMonitor) IsTerminated() (isTerminated bool) { return } +func getClientID() string { + conf := config.GetConfig() + return "-monitor" + conf.ValidatorAddress +} + func (mms *MqttMonitor) lazyLoadMonitorMQTTClient() util.MQTTClientI { if MonitorMQTTClient != nil { return MonitorMQTTClient @@ -60,7 +65,7 @@ func (mms *MqttMonitor) lazyLoadMonitorMQTTClient() util.MQTTClientI { } opts := mqtt.NewClientOptions().AddBroker(uri).SetKeepAlive(60).SetCleanSession(true) - opts.SetClientID(conf.ValidatorAddress + "-monitor") + opts.SetClientID(getClientID()) opts.SetUsername(conf.MqttUser) opts.SetPassword(conf.MqttPassword) if conf.MqttTLS { @@ -191,33 +196,37 @@ func (mms *MqttMonitor) MonitorActiveParticipants() { return } localMqttClient = mms.lazyLoadMonitorMQTTClient() - client := localMqttClient + mqttClient := localMqttClient clientMutex.Unlock() // Maximum reconnection attempts (adjust as needed) mms.SetMaxRetries() for !mms.IsTerminated() && mms.maxRetries > 0 { - if token := client.Connect(); token.Wait() && token.Error() != nil { + if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { log.Println("[app] [Monitor] error connecting to mqtt: " + token.Error().Error()) mms.maxRetries-- time.Sleep(time.Second * 5) continue } + log.Println("[app] [Monitor] established connection") var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler // Subscribe to a topic subscriptionTopic := "tele/#" - if token := client.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { + if token := mqttClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { log.Println("[app] [Monitor] error registering the mqtt subscription: " + token.Error().Error()) continue } + log.Println("[app] [Monitor] subscribed to tele/# channels") for !mms.IsTerminated() { - if !client.IsConnected() { + if !mqttClient.IsConnected() { log.Println("[app] [Monitor] retry establishing a connection") break // Exit inner loop on disconnect } + + SendUpdateMessage(mqttClient) mms.SetMaxRetries() time.Sleep(60 * time.Second) // Adjust sleep time based on your needs } @@ -232,6 +241,13 @@ func (mms *MqttMonitor) MonitorActiveParticipants() { clientMutex.Unlock() } +func SendUpdateMessage(mqttClient util.MQTTClientI) { + // Publish message + now := time.Now().Format("2006-01-02 15:04:05") // Adjust format as needed + token := mqttClient.Publish("tele/"+getClientID(), 1, false, now) + token.Wait() +} + func (mms *MqttMonitor) SetMaxRetries() { mms.maxRetries = 5 }