mirror of
https://github.com/planetmint/planetmint-go.git
synced 2025-11-24 06:25:47 +00:00
added conenction loss handler and status check
Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>
This commit is contained in:
parent
281e3d16a5
commit
b09d568fa7
@ -32,6 +32,8 @@ type MqttMonitor struct {
|
|||||||
isTerminated bool
|
isTerminated bool
|
||||||
terminationMutex sync.Mutex
|
terminationMutex sync.Mutex
|
||||||
maxRetries time.Duration
|
maxRetries time.Duration
|
||||||
|
lostConnection bool
|
||||||
|
lostConnectionMutex sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mms *MqttMonitor) Terminate() {
|
func (mms *MqttMonitor) Terminate() {
|
||||||
@ -64,10 +66,11 @@ func (mms *MqttMonitor) lazyLoadMonitorMQTTClient() util.MQTTClientI {
|
|||||||
uri = "ssl://" + hostPort
|
uri = "ssl://" + hostPort
|
||||||
}
|
}
|
||||||
|
|
||||||
opts := mqtt.NewClientOptions().AddBroker(uri).SetKeepAlive(60).SetCleanSession(true)
|
opts := mqtt.NewClientOptions().AddBroker(uri).SetKeepAlive(time.Second * 60).SetCleanSession(true)
|
||||||
opts.SetClientID(getClientID())
|
opts.SetClientID(getClientID())
|
||||||
opts.SetUsername(conf.MqttUser)
|
opts.SetUsername(conf.MqttUser)
|
||||||
opts.SetPassword(conf.MqttPassword)
|
opts.SetPassword(conf.MqttPassword)
|
||||||
|
opts.SetConnectionLostHandler(mms.onConnectionLost)
|
||||||
if conf.MqttTLS {
|
if conf.MqttTLS {
|
||||||
tlsConfig := &tls.Config{}
|
tlsConfig := &tls.Config{}
|
||||||
opts.SetTLSConfig(tlsConfig)
|
opts.SetTLSConfig(tlsConfig)
|
||||||
@ -188,6 +191,14 @@ func (mms *MqttMonitor) MqttMsgHandler(_ mqtt.Client, msg mqtt.Message) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mms *MqttMonitor) onConnectionLost(_ mqtt.Client, err error) {
|
||||||
|
log.Println("[app] [Monitor] Connection lost: " + err.Error())
|
||||||
|
// Handle connection loss here (e.g., reconnect attempts, logging)
|
||||||
|
mms.lostConnectionMutex.Lock()
|
||||||
|
mms.lostConnection = true
|
||||||
|
mms.lostConnectionMutex.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
func (mms *MqttMonitor) MonitorActiveParticipants() {
|
func (mms *MqttMonitor) MonitorActiveParticipants() {
|
||||||
clientMutex.Lock()
|
clientMutex.Lock()
|
||||||
if localMqttClient != nil {
|
if localMqttClient != nil {
|
||||||
@ -208,6 +219,10 @@ func (mms *MqttMonitor) MonitorActiveParticipants() {
|
|||||||
time.Sleep(time.Second * 5)
|
time.Sleep(time.Second * 5)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
mms.lostConnectionMutex.Lock()
|
||||||
|
mms.lostConnection = false
|
||||||
|
mms.lostConnectionMutex.Unlock()
|
||||||
|
|
||||||
log.Println("[app] [Monitor] established connection")
|
log.Println("[app] [Monitor] established connection")
|
||||||
|
|
||||||
var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler
|
var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler
|
||||||
@ -221,7 +236,10 @@ func (mms *MqttMonitor) MonitorActiveParticipants() {
|
|||||||
log.Println("[app] [Monitor] subscribed to tele/# channels")
|
log.Println("[app] [Monitor] subscribed to tele/# channels")
|
||||||
|
|
||||||
for !mms.IsTerminated() {
|
for !mms.IsTerminated() {
|
||||||
if !mqttClient.IsConnected() || !mqttClient.IsConnectionOpen() {
|
mms.lostConnectionMutex.Lock()
|
||||||
|
lostConnectionEvent := mms.lostConnection
|
||||||
|
mms.lostConnectionMutex.Unlock()
|
||||||
|
if !mqttClient.IsConnected() || !mqttClient.IsConnectionOpen() || lostConnectionEvent {
|
||||||
log.Println("[app] [Monitor] retry establishing a connection")
|
log.Println("[app] [Monitor] retry establishing a connection")
|
||||||
break // Exit inner loop on disconnect
|
break // Exit inner loop on disconnect
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user