* added explicit mqtt client to the monitor module

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>
This commit is contained in:
Jürgen Eckel 2024-04-04 14:16:06 +02:00
parent 75f1444c81
commit d317a50ce8
No known key found for this signature in database
3 changed files with 26 additions and 11 deletions

View File

@ -2,6 +2,7 @@ package monitor
import ( import (
"math/rand" "math/rand"
"net"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -14,6 +15,8 @@ import (
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
) )
var MonitorMQTTClient util.MQTTClientI
type MqttMonitor struct { type MqttMonitor struct {
db *leveldb.DB db *leveldb.DB
dbMutex sync.Mutex // Mutex to synchronize write operations dbMutex sync.Mutex // Mutex to synchronize write operations
@ -25,6 +28,22 @@ type MqttMonitor struct {
contextMutex sync.Mutex contextMutex sync.Mutex
} }
func LazyLoadMonitorMQTTClient() {
if MonitorMQTTClient != nil {
return
}
conf := config.GetConfig()
hostPort := net.JoinHostPort(conf.MqttDomain, strconv.FormatInt(int64(conf.MqttPort), 10))
uri := "tcp://" + hostPort
opts := mqtt.NewClientOptions().AddBroker(uri)
opts.SetClientID(conf.ValidatorAddress + "-monitor")
opts.SetUsername(conf.MqttUser)
opts.SetPassword(conf.MqttPassword)
MonitorMQTTClient = mqtt.NewClient(opts)
}
func NewMqttMonitorService(db *leveldb.DB, config config.Config) *MqttMonitor { func NewMqttMonitorService(db *leveldb.DB, config config.Config) *MqttMonitor {
service := &MqttMonitor{db: db, config: config, numberOfElements: 0, CleanupPeriodicityInMinutes: 10} service := &MqttMonitor{db: db, config: config, numberOfElements: 0, CleanupPeriodicityInMinutes: 10}
return service return service
@ -133,8 +152,8 @@ func (mms *MqttMonitor) MqttMsgHandler(_ mqtt.Client, msg mqtt.Message) {
} }
func (mms *MqttMonitor) MonitorActiveParticipants() { func (mms *MqttMonitor) MonitorActiveParticipants() {
util.LazyLoadMQTTClient() LazyLoadMonitorMQTTClient()
if token := util.MQTTClient.Connect(); token.Wait() && token.Error() != nil { if token := MonitorMQTTClient.Connect(); token.Wait() && token.Error() != nil {
mms.Log("[Monitor] error connecting to mqtt: " + token.Error().Error()) mms.Log("[Monitor] error connecting to mqtt: " + token.Error().Error())
panic(token.Error()) panic(token.Error())
} }
@ -143,7 +162,7 @@ func (mms *MqttMonitor) MonitorActiveParticipants() {
// Subscribe to a topic // Subscribe to a topic
subscriptionTopic := "tele/#" subscriptionTopic := "tele/#"
if token := util.MQTTClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { if token := MonitorMQTTClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil {
mms.Log("[Monitor] error registering the mqtt subscription: " + token.Error().Error()) mms.Log("[Monitor] error registering the mqtt subscription: " + token.Error().Error())
panic(token.Error()) panic(token.Error())
} }

View File

@ -6,7 +6,6 @@ import (
"github.com/planetmint/planetmint-go/config" "github.com/planetmint/planetmint-go/config"
"github.com/planetmint/planetmint-go/monitor" "github.com/planetmint/planetmint-go/monitor"
"github.com/planetmint/planetmint-go/util"
"github.com/planetmint/planetmint-go/util/mocks" "github.com/planetmint/planetmint-go/util/mocks"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
@ -14,7 +13,7 @@ import (
func init() { func init() {
// Use MQTT mock client // Use MQTT mock client
util.MQTTClient = &mocks.MockMQTTClient{} monitor.MonitorMQTTClient = &mocks.MockMQTTClient{}
} }
const ( const (
@ -23,7 +22,7 @@ const (
) )
func TestGMonitorActiveParticipants(t *testing.T) { func TestGMonitorActiveParticipants(t *testing.T) {
util.LazyLoadMQTTClient() monitor.LazyLoadMonitorMQTTClient()
cfg := config.GetConfig() cfg := config.GetConfig()
db, err := leveldb.OpenFile("./activeActors.db", nil) db, err := leveldb.OpenFile("./activeActors.db", nil)
assert.NoError(t, err) assert.NoError(t, err)
@ -46,7 +45,7 @@ func TestGMonitorActiveParticipants(t *testing.T) {
} }
func TestCleanupRemoval(t *testing.T) { func TestCleanupRemoval(t *testing.T) {
util.LazyLoadMQTTClient() monitor.LazyLoadMonitorMQTTClient()
cfg := config.GetConfig() cfg := config.GetConfig()
db, err := leveldb.OpenFile("./activeActors.db", nil) db, err := leveldb.OpenFile("./activeActors.db", nil)
@ -71,7 +70,7 @@ func TestCleanupRemoval(t *testing.T) {
} }
func TestCleanupPrecisionTest(t *testing.T) { func TestCleanupPrecisionTest(t *testing.T) {
util.LazyLoadMQTTClient() monitor.LazyLoadMonitorMQTTClient()
cfg := config.GetConfig() cfg := config.GetConfig()
db, err := leveldb.OpenFile("./activeActors.db", nil) db, err := leveldb.OpenFile("./activeActors.db", nil)

View File

@ -26,9 +26,7 @@ type MQTTClientI interface {
var ( var (
MQTTClient MQTTClientI MQTTClient MQTTClientI
mqttMachineByAddressAvailabilityMapping map[string]bool mqttMachineByAddressAvailabilityMapping map[string]bool
mqttAcitveMachineMapping map[string]int64
rwMu sync.RWMutex rwMu sync.RWMutex
rwActiveMachineMu sync.RWMutex
) )
const ( const (
@ -53,7 +51,6 @@ func LazyLoadMQTTClient() {
func init() { func init() {
mqttMachineByAddressAvailabilityMapping = make(map[string]bool) mqttMachineByAddressAvailabilityMapping = make(map[string]bool)
mqttAcitveMachineMapping = make(map[string]int64)
} }
func SendMqttPopInitMessagesToServer(ctx sdk.Context, challenge types.Challenge) { func SendMqttPopInitMessagesToServer(ctx sdk.Context, challenge types.Challenge) {