diff --git a/monitor/mqtt_monitor.go b/monitor/mqtt_monitor.go index 1576170..7f3b2ea 100644 --- a/monitor/mqtt_monitor.go +++ b/monitor/mqtt_monitor.go @@ -2,6 +2,7 @@ package monitor import ( "math/rand" + "net" "strconv" "strings" "sync" @@ -14,6 +15,8 @@ import ( "github.com/syndtr/goleveldb/leveldb" ) +var MonitorMQTTClient util.MQTTClientI + type MqttMonitor struct { db *leveldb.DB dbMutex sync.Mutex // Mutex to synchronize write operations @@ -25,6 +28,22 @@ type MqttMonitor struct { contextMutex sync.Mutex } +func LazyLoadMonitorMQTTClient() { + if MonitorMQTTClient != nil { + return + } + + conf := config.GetConfig() + hostPort := net.JoinHostPort(conf.MqttDomain, strconv.FormatInt(int64(conf.MqttPort), 10)) + uri := "tcp://" + hostPort + + opts := mqtt.NewClientOptions().AddBroker(uri) + opts.SetClientID(conf.ValidatorAddress + "-monitor") + opts.SetUsername(conf.MqttUser) + opts.SetPassword(conf.MqttPassword) + MonitorMQTTClient = mqtt.NewClient(opts) +} + func NewMqttMonitorService(db *leveldb.DB, config config.Config) *MqttMonitor { service := &MqttMonitor{db: db, config: config, numberOfElements: 0, CleanupPeriodicityInMinutes: 10} return service @@ -133,8 +152,8 @@ func (mms *MqttMonitor) MqttMsgHandler(_ mqtt.Client, msg mqtt.Message) { } func (mms *MqttMonitor) MonitorActiveParticipants() { - util.LazyLoadMQTTClient() - if token := util.MQTTClient.Connect(); token.Wait() && token.Error() != nil { + LazyLoadMonitorMQTTClient() + if token := MonitorMQTTClient.Connect(); token.Wait() && token.Error() != nil { mms.Log("[Monitor] error connecting to mqtt: " + token.Error().Error()) panic(token.Error()) } @@ -143,7 +162,7 @@ func (mms *MqttMonitor) MonitorActiveParticipants() { // Subscribe to a topic subscriptionTopic := "tele/#" - if token := util.MQTTClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { + if token := MonitorMQTTClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { mms.Log("[Monitor] error registering the mqtt subscription: " + token.Error().Error()) panic(token.Error()) } diff --git a/monitor/mqtt_monitor_test.go b/monitor/mqtt_monitor_test.go index 5c5c9e1..6162705 100644 --- a/monitor/mqtt_monitor_test.go +++ b/monitor/mqtt_monitor_test.go @@ -6,7 +6,6 @@ import ( "github.com/planetmint/planetmint-go/config" "github.com/planetmint/planetmint-go/monitor" - "github.com/planetmint/planetmint-go/util" "github.com/planetmint/planetmint-go/util/mocks" "github.com/stretchr/testify/assert" "github.com/syndtr/goleveldb/leveldb" @@ -14,7 +13,7 @@ import ( func init() { // Use MQTT mock client - util.MQTTClient = &mocks.MockMQTTClient{} + monitor.MonitorMQTTClient = &mocks.MockMQTTClient{} } const ( @@ -23,7 +22,7 @@ const ( ) func TestGMonitorActiveParticipants(t *testing.T) { - util.LazyLoadMQTTClient() + monitor.LazyLoadMonitorMQTTClient() cfg := config.GetConfig() db, err := leveldb.OpenFile("./activeActors.db", nil) assert.NoError(t, err) @@ -46,7 +45,7 @@ func TestGMonitorActiveParticipants(t *testing.T) { } func TestCleanupRemoval(t *testing.T) { - util.LazyLoadMQTTClient() + monitor.LazyLoadMonitorMQTTClient() cfg := config.GetConfig() db, err := leveldb.OpenFile("./activeActors.db", nil) @@ -71,7 +70,7 @@ func TestCleanupRemoval(t *testing.T) { } func TestCleanupPrecisionTest(t *testing.T) { - util.LazyLoadMQTTClient() + monitor.LazyLoadMonitorMQTTClient() cfg := config.GetConfig() db, err := leveldb.OpenFile("./activeActors.db", nil) diff --git a/util/mqtt.go b/util/mqtt.go index 620bfb1..3334c9a 100644 --- a/util/mqtt.go +++ b/util/mqtt.go @@ -26,9 +26,7 @@ type MQTTClientI interface { var ( MQTTClient MQTTClientI mqttMachineByAddressAvailabilityMapping map[string]bool - mqttAcitveMachineMapping map[string]int64 rwMu sync.RWMutex - rwActiveMachineMu sync.RWMutex ) const ( @@ -53,7 +51,6 @@ func LazyLoadMQTTClient() { func init() { mqttMachineByAddressAvailabilityMapping = make(map[string]bool) - mqttAcitveMachineMapping = make(map[string]int64) } func SendMqttPopInitMessagesToServer(ctx sdk.Context, challenge types.Challenge) {