planetmint-go/monitor/mqtt_monitor.go
Jürgen Eckel b45c381b3b
TLS support for planetmint (#381)
* added TSL support for mqtt
* added configuration value mqtt-tls (bool) to support tls and non-tls connections (testing)


Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>
2024-04-24 14:59:17 +02:00

216 lines
5.4 KiB
Go

package monitor
import (
"crypto/tls"
"math/rand"
"net"
"strconv"
"strings"
"sync"
"time"
sdk "github.com/cosmos/cosmos-sdk/types"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/planetmint/planetmint-go/config"
"github.com/planetmint/planetmint-go/util"
"github.com/syndtr/goleveldb/leveldb"
)
var MonitorMQTTClient util.MQTTClientI
type MqttMonitor struct {
db *leveldb.DB
dbMutex sync.Mutex // Mutex to synchronize write operations
ticker *time.Ticker
CleanupPeriodicityInMinutes time.Duration
config config.Config
numberOfElements int64
sdkContext *sdk.Context
contextMutex sync.Mutex
isTerminated bool
terminationMutex sync.Mutex
}
func (mms *MqttMonitor) Terminate() {
mms.terminationMutex.Lock()
mms.isTerminated = true
mms.terminationMutex.Unlock()
}
func (mms *MqttMonitor) IsTerminated() (isTerminated bool) {
mms.terminationMutex.Lock()
isTerminated = mms.isTerminated
mms.terminationMutex.Unlock()
return
}
func LazyLoadMonitorMQTTClient() {
if MonitorMQTTClient != nil {
return
}
conf := config.GetConfig()
hostPort := net.JoinHostPort(conf.MqttDomain, strconv.FormatInt(int64(conf.MqttPort), 10))
uri := "tcp://" + hostPort
if conf.MqttTLS {
uri = "ssl://" + hostPort
}
opts := mqtt.NewClientOptions().AddBroker(uri)
opts.SetClientID(conf.ValidatorAddress + "-monitor")
opts.SetUsername(conf.MqttUser)
opts.SetPassword(conf.MqttPassword)
if conf.MqttTLS {
tlsConfig := &tls.Config{}
opts.SetTLSConfig(tlsConfig)
}
MonitorMQTTClient = mqtt.NewClient(opts)
}
func NewMqttMonitorService(db *leveldb.DB, config config.Config) *MqttMonitor {
service := &MqttMonitor{db: db, config: config, numberOfElements: 0, CleanupPeriodicityInMinutes: 10}
return service
}
func (mms *MqttMonitor) registerPeriodicTasks() {
mms.ticker = time.NewTicker(mms.CleanupPeriodicityInMinutes * time.Minute)
go func() {
for range mms.ticker.C { // Loop over the ticker channel
go mms.CleanupDB()
}
}()
}
func (mms *MqttMonitor) Start() (err error) {
amount, err := mms.getAmountOfElements()
if err != nil {
return
}
mms.numberOfElements = amount
mms.registerPeriodicTasks()
go mms.MonitorActiveParticipants()
return
}
func (mms *MqttMonitor) getRandomNumbers() (challenger int, challengee int) {
for challenger == challengee {
// Generate random numbers
challenger = rand.Intn(int(mms.numberOfElements))
challengee = rand.Intn(int(mms.numberOfElements))
}
return
}
func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) {
if mms.numberOfElements < 2 {
return
}
randomChallenger, randomChallengee := mms.getRandomNumbers()
mms.Log("[Monitor] number of elements: " + strconv.Itoa(int(mms.numberOfElements)))
mms.Log("[Monitor] selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee))
iter := mms.db.NewIterator(nil, nil)
defer iter.Release()
count := 0
found := 0
var lastSeen LastSeenEvent
for iter.Next() {
mms.Log("[Monitor] count: " + strconv.Itoa(count))
if count == randomChallenger {
lastSeen, err = mms.getDataFromIter(iter)
if err != nil {
return
}
challenger = lastSeen.Address
found++
} else if count == randomChallengee {
lastSeen, err = mms.getDataFromIter(iter)
if err != nil {
return
}
challengee = lastSeen.Address
found++
}
count++
if found == 2 {
break
}
}
return
}
func (mms *MqttMonitor) MqttMsgHandler(_ mqtt.Client, msg mqtt.Message) {
if mms.IsTerminated() {
return
}
topicParts := strings.Split(msg.Topic(), "/")
if len(topicParts) != 3 {
return
}
if topicParts[0] != "tele" {
return
}
if topicParts[2] != "STATE" {
return
}
address := topicParts[1]
valid, err := util.IsValidAddress(address)
if err != nil || !valid {
return
}
payload, err := util.ToJSON(msg.Payload())
if err != nil {
return
}
timeString, ok := payload["Time"].(string)
if !ok {
return
}
unixTime, err := util.String2UnixTime(timeString)
if err != nil {
return
}
err = mms.AddParticipant(address, unixTime)
if err != nil {
mms.Log("[Monitor] error adding active actor to DB: " + address + " " + err.Error())
} else {
mms.Log("[Monitor] added active actor to DB: " + address)
}
}
func (mms *MqttMonitor) MonitorActiveParticipants() {
LazyLoadMonitorMQTTClient()
for !mms.IsTerminated() {
if !MonitorMQTTClient.IsConnected() {
if token := MonitorMQTTClient.Connect(); token.Wait() && token.Error() != nil {
mms.Log("[Monitor] error connecting to mqtt: " + token.Error().Error())
panic(token.Error())
}
var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler
// Subscribe to a topic
subscriptionTopic := "tele/#"
if token := MonitorMQTTClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil {
mms.Log("[Monitor] error registering the mqtt subscription: " + token.Error().Error())
panic(token.Error())
}
}
time.Sleep(5 * time.Second)
}
}
func (mms *MqttMonitor) Log(msg string) {
mms.contextMutex.Lock()
if mms.sdkContext != nil {
util.GetAppLogger().Info(*mms.sdkContext, msg)
}
mms.contextMutex.Unlock()
}
func (mms *MqttMonitor) SetContext(ctx sdk.Context) {
mms.contextMutex.Lock()
mms.sdkContext = &ctx
mms.contextMutex.Unlock()
}