* added a bit of logging

* added update messages to mqtt (every second with a timestamp)

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>
This commit is contained in:
Jürgen Eckel 2024-05-16 12:20:49 +02:00
parent 0c05081604
commit 2223abc2d4
No known key found for this signature in database

View File

@ -47,6 +47,11 @@ func (mms *MqttMonitor) IsTerminated() (isTerminated bool) {
return return
} }
func getClientID() string {
conf := config.GetConfig()
return "-monitor" + conf.ValidatorAddress
}
func (mms *MqttMonitor) lazyLoadMonitorMQTTClient() util.MQTTClientI { func (mms *MqttMonitor) lazyLoadMonitorMQTTClient() util.MQTTClientI {
if MonitorMQTTClient != nil { if MonitorMQTTClient != nil {
return MonitorMQTTClient return MonitorMQTTClient
@ -60,7 +65,7 @@ func (mms *MqttMonitor) lazyLoadMonitorMQTTClient() util.MQTTClientI {
} }
opts := mqtt.NewClientOptions().AddBroker(uri).SetKeepAlive(60).SetCleanSession(true) opts := mqtt.NewClientOptions().AddBroker(uri).SetKeepAlive(60).SetCleanSession(true)
opts.SetClientID(conf.ValidatorAddress + "-monitor") opts.SetClientID(getClientID())
opts.SetUsername(conf.MqttUser) opts.SetUsername(conf.MqttUser)
opts.SetPassword(conf.MqttPassword) opts.SetPassword(conf.MqttPassword)
if conf.MqttTLS { if conf.MqttTLS {
@ -191,33 +196,37 @@ func (mms *MqttMonitor) MonitorActiveParticipants() {
return return
} }
localMqttClient = mms.lazyLoadMonitorMQTTClient() localMqttClient = mms.lazyLoadMonitorMQTTClient()
client := localMqttClient mqttClient := localMqttClient
clientMutex.Unlock() clientMutex.Unlock()
// Maximum reconnection attempts (adjust as needed) // Maximum reconnection attempts (adjust as needed)
mms.SetMaxRetries() mms.SetMaxRetries()
for !mms.IsTerminated() && mms.maxRetries > 0 { for !mms.IsTerminated() && mms.maxRetries > 0 {
if token := client.Connect(); token.Wait() && token.Error() != nil { if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
log.Println("[app] [Monitor] error connecting to mqtt: " + token.Error().Error()) log.Println("[app] [Monitor] error connecting to mqtt: " + token.Error().Error())
mms.maxRetries-- mms.maxRetries--
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
continue continue
} }
log.Println("[app] [Monitor] established connection")
var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler
// Subscribe to a topic // Subscribe to a topic
subscriptionTopic := "tele/#" subscriptionTopic := "tele/#"
if token := client.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { if token := mqttClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil {
log.Println("[app] [Monitor] error registering the mqtt subscription: " + token.Error().Error()) log.Println("[app] [Monitor] error registering the mqtt subscription: " + token.Error().Error())
continue continue
} }
log.Println("[app] [Monitor] subscribed to tele/# channels")
for !mms.IsTerminated() { for !mms.IsTerminated() {
if !client.IsConnected() { if !mqttClient.IsConnected() {
log.Println("[app] [Monitor] retry establishing a connection") log.Println("[app] [Monitor] retry establishing a connection")
break // Exit inner loop on disconnect break // Exit inner loop on disconnect
} }
SendUpdateMessage(mqttClient)
mms.SetMaxRetries() mms.SetMaxRetries()
time.Sleep(60 * time.Second) // Adjust sleep time based on your needs time.Sleep(60 * time.Second) // Adjust sleep time based on your needs
} }
@ -232,6 +241,13 @@ func (mms *MqttMonitor) MonitorActiveParticipants() {
clientMutex.Unlock() clientMutex.Unlock()
} }
func SendUpdateMessage(mqttClient util.MQTTClientI) {
// Publish message
now := time.Now().Format("2006-01-02 15:04:05") // Adjust format as needed
token := mqttClient.Publish("tele/"+getClientID(), 1, false, now)
token.Wait()
}
func (mms *MqttMonitor) SetMaxRetries() { func (mms *MqttMonitor) SetMaxRetries() {
mms.maxRetries = 5 mms.maxRetries = 5
} }