mirror of
https://github.com/planetmint/planetmint-go.git
synced 2025-11-27 07:48:29 +00:00
* restart mqtt connection in mqttmonitor on connection loss
* adjusted mqttmock structure to be compatible Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>
This commit is contained in:
parent
d317a50ce8
commit
270793eb7c
@ -153,6 +153,8 @@ func (mms *MqttMonitor) MqttMsgHandler(_ mqtt.Client, msg mqtt.Message) {
|
|||||||
|
|
||||||
func (mms *MqttMonitor) MonitorActiveParticipants() {
|
func (mms *MqttMonitor) MonitorActiveParticipants() {
|
||||||
LazyLoadMonitorMQTTClient()
|
LazyLoadMonitorMQTTClient()
|
||||||
|
for {
|
||||||
|
if !MonitorMQTTClient.IsConnected() {
|
||||||
if token := MonitorMQTTClient.Connect(); token.Wait() && token.Error() != nil {
|
if token := MonitorMQTTClient.Connect(); token.Wait() && token.Error() != nil {
|
||||||
mms.Log("[Monitor] error connecting to mqtt: " + token.Error().Error())
|
mms.Log("[Monitor] error connecting to mqtt: " + token.Error().Error())
|
||||||
panic(token.Error())
|
panic(token.Error())
|
||||||
@ -167,6 +169,9 @@ func (mms *MqttMonitor) MonitorActiveParticipants() {
|
|||||||
panic(token.Error())
|
panic(token.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
time.Sleep(30 * time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (mms *MqttMonitor) Log(msg string) {
|
func (mms *MqttMonitor) Log(msg string) {
|
||||||
mms.contextMutex.Lock()
|
mms.contextMutex.Lock()
|
||||||
|
|||||||
@ -14,6 +14,8 @@ type MockMQTTClient struct {
|
|||||||
PublishFunc func(topic string, qos byte, retained bool, payload interface{}) mqtt.Token
|
PublishFunc func(topic string, qos byte, retained bool, payload interface{}) mqtt.Token
|
||||||
SubscribeFunc func(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token
|
SubscribeFunc func(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token
|
||||||
UnsubscribeFunc func(topics ...string) mqtt.Token
|
UnsubscribeFunc func(topics ...string) mqtt.Token
|
||||||
|
IsConnectedFunc func() bool
|
||||||
|
connected bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetConnectFunc fetches the mock client's `Connect` func
|
// GetConnectFunc fetches the mock client's `Connect` func
|
||||||
@ -96,6 +98,7 @@ func GetUnsubscribeFunc(_ ...string) mqtt.Token {
|
|||||||
|
|
||||||
// Connect is the mock client's `Disconnect` func
|
// Connect is the mock client's `Disconnect` func
|
||||||
func (m *MockMQTTClient) Connect() mqtt.Token {
|
func (m *MockMQTTClient) Connect() mqtt.Token {
|
||||||
|
m.connected = true
|
||||||
return GetConnectFunc()
|
return GetConnectFunc()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -116,3 +119,7 @@ func (m *MockMQTTClient) Subscribe(topic string, qos byte, callback mqtt.Message
|
|||||||
func (m *MockMQTTClient) Unsubscribe(topics ...string) mqtt.Token {
|
func (m *MockMQTTClient) Unsubscribe(topics ...string) mqtt.Token {
|
||||||
return GetUnsubscribeFunc(topics...)
|
return GetUnsubscribeFunc(topics...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MockMQTTClient) IsConnected() bool {
|
||||||
|
return m.connected
|
||||||
|
}
|
||||||
|
|||||||
@ -21,6 +21,7 @@ type MQTTClientI interface {
|
|||||||
Publish(topic string, qos byte, retained bool, payload interface{}) mqtt.Token
|
Publish(topic string, qos byte, retained bool, payload interface{}) mqtt.Token
|
||||||
Subscribe(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token
|
Subscribe(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token
|
||||||
Unsubscribe(topics ...string) mqtt.Token
|
Unsubscribe(topics ...string) mqtt.Token
|
||||||
|
IsConnected() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user