diff --git a/monitor/backend.go b/monitor/backend.go index edf956d..218c7c2 100644 --- a/monitor/backend.go +++ b/monitor/backend.go @@ -81,7 +81,7 @@ func (mms *MqttMonitor) CleanupDB() { defer iter.Release() // Make sure to release the iterator at the end // Iterate over all elements in the database - for iter.Next() { + for iter.Next() && !mms.IsTerminated() { // Use iter.Key() and iter.Value() to access the key and value lastSeen, err := mms.getDataFromIter(iter) if err != nil { diff --git a/monitor/interface.go b/monitor/interface.go index 7e5d6e0..4e73171 100644 --- a/monitor/interface.go +++ b/monitor/interface.go @@ -1,6 +1,8 @@ package monitor import ( + "sync" + sdk "github.com/cosmos/cosmos-sdk/types" "github.com/planetmint/planetmint-go/config" "github.com/syndtr/goleveldb/leveldb" @@ -13,10 +15,20 @@ type MQTTMonitorClientI interface { Start() (err error) } -var MqttMonitorInstance MQTTMonitorClientI +var monitorMutex sync.Mutex +var mqttMonitorInstance MQTTMonitorClientI + +func SetMqttMonitorInstance(monitorInstance MQTTMonitorClientI) { + monitorMutex.Lock() + mqttMonitorInstance = monitorInstance + monitorMutex.Unlock() +} func LazyMqttMonitorLoader(homeDir string) { - if MqttMonitorInstance != nil { + monitorMutex.Lock() + tmpInstance := mqttMonitorInstance + monitorMutex.Unlock() + if tmpInstance != nil { return } if homeDir == "" { @@ -26,9 +38,37 @@ func LazyMqttMonitorLoader(homeDir string) { if err != nil { panic(err) } - MqttMonitorInstance = NewMqttMonitorService(aciveActorsDB, *config.GetConfig()) - err = MqttMonitorInstance.Start() + monitorMutex.Lock() + mqttMonitorInstance = NewMqttMonitorService(aciveActorsDB, *config.GetConfig()) + monitorMutex.Unlock() + err = mqttMonitorInstance.Start() if err != nil { panic(err) } + +} + +func SetContext(ctx sdk.Context) { + monitorMutex.Lock() + mqttMonitorInstance.SetContext(ctx) + monitorMutex.Unlock() +} + +func SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) { + monitorMutex.Lock() + challenger, challengee, err = mqttMonitorInstance.SelectPoPParticipantsOutOfActiveActors() + monitorMutex.Unlock() + return +} + +func Start() (err error) { + err = mqttMonitorInstance.Start() + return +} + +func AddParticipant(address string, lastSeenTS int64) (err error) { + monitorMutex.Lock() + err = mqttMonitorInstance.AddParticipant(address, lastSeenTS) + monitorMutex.Unlock() + return } diff --git a/monitor/mqtt_monitor.go b/monitor/mqtt_monitor.go index f408292..c3830b5 100644 --- a/monitor/mqtt_monitor.go +++ b/monitor/mqtt_monitor.go @@ -26,6 +26,21 @@ type MqttMonitor struct { numberOfElements int64 sdkContext *sdk.Context contextMutex sync.Mutex + isTerminated bool + terminationMutex sync.Mutex +} + +func (mms *MqttMonitor) Terminate() { + mms.terminationMutex.Lock() + mms.isTerminated = true + mms.terminationMutex.Unlock() +} + +func (mms *MqttMonitor) IsTerminated() (isTerminated bool) { + mms.terminationMutex.Lock() + isTerminated = mms.isTerminated + mms.terminationMutex.Unlock() + return } func LazyLoadMonitorMQTTClient() { @@ -115,6 +130,9 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str } func (mms *MqttMonitor) MqttMsgHandler(_ mqtt.Client, msg mqtt.Message) { + if mms.IsTerminated() { + return + } topicParts := strings.Split(msg.Topic(), "/") if len(topicParts) != 3 { return @@ -153,7 +171,7 @@ func (mms *MqttMonitor) MqttMsgHandler(_ mqtt.Client, msg mqtt.Message) { func (mms *MqttMonitor) MonitorActiveParticipants() { LazyLoadMonitorMQTTClient() - for { + for !mms.IsTerminated() { if !MonitorMQTTClient.IsConnected() { if token := MonitorMQTTClient.Connect(); token.Wait() && token.Error() != nil { mms.Log("[Monitor] error connecting to mqtt: " + token.Error().Error()) @@ -169,7 +187,7 @@ func (mms *MqttMonitor) MonitorActiveParticipants() { panic(token.Error()) } } - time.Sleep(30 * time.Second) + time.Sleep(5 * time.Second) } } diff --git a/monitor/mqtt_monitor_test.go b/monitor/mqtt_monitor_test.go index ff868ed..7033f4a 100644 --- a/monitor/mqtt_monitor_test.go +++ b/monitor/mqtt_monitor_test.go @@ -44,6 +44,7 @@ func TestGMonitorActiveParticipants(t *testing.T) { assert.NoError(t, err) assert.Contains(t, challenger, "plmnt") assert.Contains(t, challengee, "plmnt") + mqttMonitor.Terminate() } func TestCleanupRemoval(t *testing.T) { @@ -71,6 +72,7 @@ func TestCleanupRemoval(t *testing.T) { assert.NoError(t, err) assert.Equal(t, "", challenger) assert.Contains(t, "", challengee) + mqttMonitor.Terminate() } func TestCleanupPrecisionTest(t *testing.T) { @@ -99,4 +101,5 @@ func TestCleanupPrecisionTest(t *testing.T) { assert.NoError(t, err) assert.Contains(t, challenger, "plmnt") assert.Contains(t, challengee, "plmnt") + mqttMonitor.Terminate() } diff --git a/tests/e2e/dao/pop/selection_suite.go b/tests/e2e/dao/pop/selection_suite.go index 3a1b25a..60c061e 100644 --- a/tests/e2e/dao/pop/selection_suite.go +++ b/tests/e2e/dao/pop/selection_suite.go @@ -166,7 +166,7 @@ func (s *SelectionE2ETestSuite) TestPopSelectionNoActors() { func (s *SelectionE2ETestSuite) TestPopSelectionOneActors() { err := e2etestutil.AttestMachine(s.network, machines[0].name, machines[0].mnemonic, 0, s.feeDenom) s.Require().NoError(err) - err = monitor.MqttMonitorInstance.AddParticipant(machines[0].address, time.Now().Unix()) + err = monitor.AddParticipant(machines[0].address, time.Now().Unix()) s.Require().NoError(err) out := s.perpareLocalTest() @@ -178,7 +178,7 @@ func (s *SelectionE2ETestSuite) TestPopSelectionOneActors() { func (s *SelectionE2ETestSuite) TestPopSelectionTwoActors() { err := e2etestutil.AttestMachine(s.network, machines[1].name, machines[1].mnemonic, 1, s.feeDenom) s.Require().NoError(err) - err = monitor.MqttMonitorInstance.AddParticipant(machines[1].address, time.Now().Unix()) + err = monitor.AddParticipant(machines[1].address, time.Now().Unix()) s.Require().NoError(err) out := s.perpareLocalTest() diff --git a/testutil/keeper/dao.go b/testutil/keeper/dao.go index 1c40f20..9e32731 100644 --- a/testutil/keeper/dao.go +++ b/testutil/keeper/dao.go @@ -24,7 +24,7 @@ import ( ) func DaoKeeper(t testing.TB) (*keeper.Keeper, sdk.Context) { - monitor.MqttMonitorInstance = &monitormocks.MockMQTTMonitorClientI{} + monitor.SetMqttMonitorInstance(&monitormocks.MockMQTTMonitorClientI{}) storeKey := sdk.NewKVStoreKey(types.StoreKey) memStoreKey := storetypes.NewMemoryStoreKey(types.MemStoreKey) challengeStoreKey := storetypes.NewMemoryStoreKey(types.ChallengeKey) diff --git a/testutil/network/loader.go b/testutil/network/loader.go index ea5ac72..028aaaf 100644 --- a/testutil/network/loader.go +++ b/testutil/network/loader.go @@ -42,7 +42,7 @@ func Load(t *testing.T, configs ...Config) *Network { // use mock client for testing util.MQTTClient = &mocks.MockMQTTClient{} monitor.MonitorMQTTClient = &mocks.MockMQTTClient{} - monitor.MqttMonitorInstance = &monitormocks.MockMQTTMonitorClientI{} + monitor.SetMqttMonitorInstance(&monitormocks.MockMQTTMonitorClientI{}) elements.Client = &elementsmocks.MockClient{} util.RegisterAssetServiceHTTPClient = &mocks.MockClient{} diff --git a/x/dao/abci.go b/x/dao/abci.go index 0b1ff0a..49680f0 100644 --- a/x/dao/abci.go +++ b/x/dao/abci.go @@ -24,8 +24,8 @@ func BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock, k keeper.Keeper) hexProposerAddress := hex.EncodeToString(proposerAddress) if isPopHeight(ctx, k, currentBlockHeight) { // select PoP participants - monitor.MqttMonitorInstance.SetContext(ctx) - challenger, challengee, err := monitor.MqttMonitorInstance.SelectPoPParticipantsOutOfActiveActors() + monitor.SetContext(ctx) + challenger, challengee, err := monitor.SelectPoPParticipantsOutOfActiveActors() if err != nil { util.GetAppLogger().Error(ctx, "error during PoP Participant selection ", err) }