Jürgen Eckel 779b1edd48
Eckelj/mqtt monitoring (#359)
* added a MqttMonitor module with levelDB and periodic cleanup
* initialized in the app
* passed to dao keeper
* added conversion methods (string2unixtime, byte ToJSON)
* removed obsolete keeper code
* maded  RDDLToken.Factor public
* added explicit mqtt client to the monitor module
* restart mqtt connection in mqttmonitor on connection loss
* adjusted mqttmock structure to be compatible
* added some linter exclusions to let the monitor tool pass
* created a MockMqttMonitor interface and mock object
* used this to pass tests
* made the MockMqttMonitor a global object so that it can be easily mocked
* removed MockMqttMonitor from the app/keeper initialization
* adjusted test cases to register "active machines" to the mqttmonitor
* added mutex in mocks to protect against data races
* defined mocks for the dao tests
* clear separation between interface and mqtt-Monitor

* added another waiting block to ensure the tx went through (multi-threading issue, race condition) during tests this failed sometimes
* added memstorage to test instead of a file based DB


Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>
2024-04-08 10:49:00 +02:00

156 lines
4.3 KiB
Go

package util
import (
"encoding/json"
"net"
"strconv"
"strings"
"sync"
"time"
sdk "github.com/cosmos/cosmos-sdk/types"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/planetmint/planetmint-go/config"
"github.com/planetmint/planetmint-go/x/dao/types"
)
// MQTTClientI interface
type MQTTClientI interface {
Connect() mqtt.Token
Disconnect(quiesce uint)
Publish(topic string, qos byte, retained bool, payload interface{}) mqtt.Token
Subscribe(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token
Unsubscribe(topics ...string) mqtt.Token
IsConnected() bool
}
var (
MQTTClient MQTTClientI
mqttMachineByAddressAvailabilityMapping map[string]bool
rwMu sync.RWMutex
)
const (
MqttCmdPrefix = "cmnd/"
)
func LazyLoadMQTTClient() {
if MQTTClient != nil {
return
}
conf := config.GetConfig()
hostPort := net.JoinHostPort(conf.MqttDomain, strconv.FormatInt(int64(conf.MqttPort), 10))
uri := "tcp://" + hostPort
opts := mqtt.NewClientOptions().AddBroker(uri)
opts.SetClientID(conf.ValidatorAddress)
opts.SetUsername(conf.MqttUser)
opts.SetPassword(conf.MqttPassword)
MQTTClient = mqtt.NewClient(opts)
}
func init() {
mqttMachineByAddressAvailabilityMapping = make(map[string]bool)
}
func SendMqttPopInitMessagesToServer(ctx sdk.Context, challenge types.Challenge) {
// PoP can only be executed if at least two actors are available.
if challenge.Challenger == "" || challenge.Challengee == "" {
return
}
err := sendMqttPopInitMessages(challenge)
if err != nil {
GetAppLogger().Error(ctx, "MQTT error: "+err.Error())
return
}
GetAppLogger().Info(ctx, "MQTT message successfully sent: "+challenge.String())
}
func sendMqttPopInitMessages(challenge types.Challenge) (err error) {
LazyLoadMQTTClient()
if token := MQTTClient.Connect(); token.Wait() && token.Error() != nil {
err = token.Error()
return
}
blockHeight := strconv.FormatInt(challenge.GetHeight(), 10)
token := MQTTClient.Publish(MqttCmdPrefix+challenge.GetChallengee()+"/PoPInit", 0, false, blockHeight)
token.Wait()
err = token.Error()
if err != nil {
return
}
token = MQTTClient.Publish(MqttCmdPrefix+challenge.GetChallenger()+"/PoPInit", 0, false, blockHeight)
token.Wait()
err = token.Error()
if err != nil {
return
}
MQTTClient.Disconnect(1000)
return
}
func GetMqttStatusOfParticipant(address string, responseTimeoutInMs int64) (isAvailable bool, err error) {
LazyLoadMQTTClient()
if token := MQTTClient.Connect(); token.Wait() && token.Error() != nil {
err = token.Error()
return
}
rwMu.RLock() // Lock for reading
_, ok := mqttMachineByAddressAvailabilityMapping[address]
rwMu.RUnlock() // Unlock after reading
if ok {
return
}
var messageHandler mqtt.MessageHandler = func(_ mqtt.Client, msg mqtt.Message) {
topicParts := strings.Split(msg.Topic(), "/")
if len(topicParts) == 3 && topicParts[1] == address {
rwMu.Lock() // Lock for writing
mqttMachineByAddressAvailabilityMapping[address] = true
rwMu.Unlock() // Unlock after writing
}
}
subscriptionTopic := "stat/" + address + "/STATUS"
publishingTopic := MqttCmdPrefix + address + "/STATUS"
// Subscribe to a topic
if token := MQTTClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil {
err = token.Error()
return
}
rwMu.Lock() // Lock for writing
mqttMachineByAddressAvailabilityMapping[address] = false
rwMu.Unlock() // Unlock after writing
if token := MQTTClient.Publish(publishingTopic, 0, false, ""); token.Wait() && token.Error() != nil {
err = token.Error()
} else {
duration := responseTimeoutInMs
time.Sleep(time.Millisecond * time.Duration(duration))
}
// Unsubscribe and disconnect
if token := MQTTClient.Unsubscribe(subscriptionTopic); token.Wait() && token.Error() != nil {
err = token.Error()
}
rwMu.Lock() // Lock for writing
isAvailable = mqttMachineByAddressAvailabilityMapping[address]
delete(mqttMachineByAddressAvailabilityMapping, address)
rwMu.Unlock() // Unlock after writing
MQTTClient.Disconnect(1000)
return
}
func ToJSON(payload []byte) (map[string]interface{}, error) {
jsonString := string(payload)
var result map[string]interface{}
// Unmarshal the JSON string into the map
err := json.Unmarshal([]byte(jsonString), &result)
return result, err
}