diff --git a/monitor/mqtt_monitor.go b/monitor/mqtt_monitor.go index 096efd7..478a337 100644 --- a/monitor/mqtt_monitor.go +++ b/monitor/mqtt_monitor.go @@ -47,6 +47,11 @@ func (mms *MqttMonitor) IsTerminated() (isTerminated bool) { return } +func getClientID() string { + conf := config.GetConfig() + return "-monitor" + conf.ValidatorAddress +} + func (mms *MqttMonitor) lazyLoadMonitorMQTTClient() util.MQTTClientI { if MonitorMQTTClient != nil { return MonitorMQTTClient @@ -60,7 +65,7 @@ func (mms *MqttMonitor) lazyLoadMonitorMQTTClient() util.MQTTClientI { } opts := mqtt.NewClientOptions().AddBroker(uri).SetKeepAlive(60).SetCleanSession(true) - opts.SetClientID(conf.ValidatorAddress + "-monitor") + opts.SetClientID(getClientID()) opts.SetUsername(conf.MqttUser) opts.SetPassword(conf.MqttPassword) if conf.MqttTLS { @@ -191,33 +196,37 @@ func (mms *MqttMonitor) MonitorActiveParticipants() { return } localMqttClient = mms.lazyLoadMonitorMQTTClient() - client := localMqttClient + mqttClient := localMqttClient clientMutex.Unlock() // Maximum reconnection attempts (adjust as needed) mms.SetMaxRetries() for !mms.IsTerminated() && mms.maxRetries > 0 { - if token := client.Connect(); token.Wait() && token.Error() != nil { + if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { log.Println("[app] [Monitor] error connecting to mqtt: " + token.Error().Error()) mms.maxRetries-- time.Sleep(time.Second * 5) continue } + log.Println("[app] [Monitor] established connection") var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler // Subscribe to a topic subscriptionTopic := "tele/#" - if token := client.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { + if token := mqttClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { log.Println("[app] [Monitor] error registering the mqtt subscription: " + token.Error().Error()) continue } + log.Println("[app] [Monitor] subscribed to tele/# channels") for !mms.IsTerminated() { - if !client.IsConnected() { + if !mqttClient.IsConnected() { log.Println("[app] [Monitor] retry establishing a connection") break // Exit inner loop on disconnect } + + SendUpdateMessage(mqttClient) mms.SetMaxRetries() time.Sleep(60 * time.Second) // Adjust sleep time based on your needs } @@ -232,6 +241,13 @@ func (mms *MqttMonitor) MonitorActiveParticipants() { clientMutex.Unlock() } +func SendUpdateMessage(mqttClient util.MQTTClientI) { + // Publish message + now := time.Now().Format("2006-01-02 15:04:05") // Adjust format as needed + token := mqttClient.Publish("tele/"+getClientID(), 1, false, now) + token.Wait() +} + func (mms *MqttMonitor) SetMaxRetries() { mms.maxRetries = 5 }