diff --git a/monitor/mqtt_monitor.go b/monitor/mqtt_monitor.go index 7f3b2ea..f408292 100644 --- a/monitor/mqtt_monitor.go +++ b/monitor/mqtt_monitor.go @@ -153,18 +153,23 @@ func (mms *MqttMonitor) MqttMsgHandler(_ mqtt.Client, msg mqtt.Message) { func (mms *MqttMonitor) MonitorActiveParticipants() { LazyLoadMonitorMQTTClient() - if token := MonitorMQTTClient.Connect(); token.Wait() && token.Error() != nil { - mms.Log("[Monitor] error connecting to mqtt: " + token.Error().Error()) - panic(token.Error()) - } + for { + if !MonitorMQTTClient.IsConnected() { + if token := MonitorMQTTClient.Connect(); token.Wait() && token.Error() != nil { + mms.Log("[Monitor] error connecting to mqtt: " + token.Error().Error()) + panic(token.Error()) + } - var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler + var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler - // Subscribe to a topic - subscriptionTopic := "tele/#" - if token := MonitorMQTTClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { - mms.Log("[Monitor] error registering the mqtt subscription: " + token.Error().Error()) - panic(token.Error()) + // Subscribe to a topic + subscriptionTopic := "tele/#" + if token := MonitorMQTTClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { + mms.Log("[Monitor] error registering the mqtt subscription: " + token.Error().Error()) + panic(token.Error()) + } + } + time.Sleep(30 * time.Second) } } diff --git a/util/mocks/mqtt.go b/util/mocks/mqtt.go index e6fcba3..0ad5728 100644 --- a/util/mocks/mqtt.go +++ b/util/mocks/mqtt.go @@ -14,6 +14,8 @@ type MockMQTTClient struct { PublishFunc func(topic string, qos byte, retained bool, payload interface{}) mqtt.Token SubscribeFunc func(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token UnsubscribeFunc func(topics ...string) mqtt.Token + IsConnectedFunc func() bool + connected bool } // GetConnectFunc fetches the mock client's `Connect` func @@ -96,6 +98,7 @@ func GetUnsubscribeFunc(_ ...string) mqtt.Token { // Connect is the mock client's `Disconnect` func func (m *MockMQTTClient) Connect() mqtt.Token { + m.connected = true return GetConnectFunc() } @@ -116,3 +119,7 @@ func (m *MockMQTTClient) Subscribe(topic string, qos byte, callback mqtt.Message func (m *MockMQTTClient) Unsubscribe(topics ...string) mqtt.Token { return GetUnsubscribeFunc(topics...) } + +func (m *MockMQTTClient) IsConnected() bool { + return m.connected +} diff --git a/util/mqtt.go b/util/mqtt.go index 3334c9a..111968b 100644 --- a/util/mqtt.go +++ b/util/mqtt.go @@ -21,6 +21,7 @@ type MQTTClientI interface { Publish(topic string, qos byte, retained bool, payload interface{}) mqtt.Token Subscribe(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token Unsubscribe(topics ...string) mqtt.Token + IsConnected() bool } var (