clear seperation between interface and mqtt-Monitor

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>
This commit is contained in:
Jürgen Eckel 2024-04-05 15:28:23 +02:00
parent 4303c3069d
commit ec0a090dda
No known key found for this signature in database
8 changed files with 74 additions and 13 deletions

View File

@ -81,7 +81,7 @@ func (mms *MqttMonitor) CleanupDB() {
defer iter.Release() // Make sure to release the iterator at the end
// Iterate over all elements in the database
for iter.Next() {
for iter.Next() && !mms.IsTerminated() {
// Use iter.Key() and iter.Value() to access the key and value
lastSeen, err := mms.getDataFromIter(iter)
if err != nil {

View File

@ -1,6 +1,8 @@
package monitor
import (
"sync"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/planetmint/planetmint-go/config"
"github.com/syndtr/goleveldb/leveldb"
@ -13,10 +15,20 @@ type MQTTMonitorClientI interface {
Start() (err error)
}
var MqttMonitorInstance MQTTMonitorClientI
var monitorMutex sync.Mutex
var mqttMonitorInstance MQTTMonitorClientI
func SetMqttMonitorInstance(monitorInstance MQTTMonitorClientI) {
monitorMutex.Lock()
mqttMonitorInstance = monitorInstance
monitorMutex.Unlock()
}
func LazyMqttMonitorLoader(homeDir string) {
if MqttMonitorInstance != nil {
monitorMutex.Lock()
tmpInstance := mqttMonitorInstance
monitorMutex.Unlock()
if tmpInstance != nil {
return
}
if homeDir == "" {
@ -26,9 +38,37 @@ func LazyMqttMonitorLoader(homeDir string) {
if err != nil {
panic(err)
}
MqttMonitorInstance = NewMqttMonitorService(aciveActorsDB, *config.GetConfig())
err = MqttMonitorInstance.Start()
monitorMutex.Lock()
mqttMonitorInstance = NewMqttMonitorService(aciveActorsDB, *config.GetConfig())
monitorMutex.Unlock()
err = mqttMonitorInstance.Start()
if err != nil {
panic(err)
}
}
func SetContext(ctx sdk.Context) {
monitorMutex.Lock()
mqttMonitorInstance.SetContext(ctx)
monitorMutex.Unlock()
}
func SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) {
monitorMutex.Lock()
challenger, challengee, err = mqttMonitorInstance.SelectPoPParticipantsOutOfActiveActors()
monitorMutex.Unlock()
return
}
func Start() (err error) {
err = mqttMonitorInstance.Start()
return
}
func AddParticipant(address string, lastSeenTS int64) (err error) {
monitorMutex.Lock()
err = mqttMonitorInstance.AddParticipant(address, lastSeenTS)
monitorMutex.Unlock()
return
}

View File

@ -26,6 +26,21 @@ type MqttMonitor struct {
numberOfElements int64
sdkContext *sdk.Context
contextMutex sync.Mutex
isTerminated bool
terminationMutex sync.Mutex
}
func (mms *MqttMonitor) Terminate() {
mms.terminationMutex.Lock()
mms.isTerminated = true
mms.terminationMutex.Unlock()
}
func (mms *MqttMonitor) IsTerminated() (isTerminated bool) {
mms.terminationMutex.Lock()
isTerminated = mms.isTerminated
mms.terminationMutex.Unlock()
return
}
func LazyLoadMonitorMQTTClient() {
@ -115,6 +130,9 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str
}
func (mms *MqttMonitor) MqttMsgHandler(_ mqtt.Client, msg mqtt.Message) {
if mms.IsTerminated() {
return
}
topicParts := strings.Split(msg.Topic(), "/")
if len(topicParts) != 3 {
return
@ -153,7 +171,7 @@ func (mms *MqttMonitor) MqttMsgHandler(_ mqtt.Client, msg mqtt.Message) {
func (mms *MqttMonitor) MonitorActiveParticipants() {
LazyLoadMonitorMQTTClient()
for {
for !mms.IsTerminated() {
if !MonitorMQTTClient.IsConnected() {
if token := MonitorMQTTClient.Connect(); token.Wait() && token.Error() != nil {
mms.Log("[Monitor] error connecting to mqtt: " + token.Error().Error())
@ -169,7 +187,7 @@ func (mms *MqttMonitor) MonitorActiveParticipants() {
panic(token.Error())
}
}
time.Sleep(30 * time.Second)
time.Sleep(5 * time.Second)
}
}

View File

@ -44,6 +44,7 @@ func TestGMonitorActiveParticipants(t *testing.T) {
assert.NoError(t, err)
assert.Contains(t, challenger, "plmnt")
assert.Contains(t, challengee, "plmnt")
mqttMonitor.Terminate()
}
func TestCleanupRemoval(t *testing.T) {
@ -71,6 +72,7 @@ func TestCleanupRemoval(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, "", challenger)
assert.Contains(t, "", challengee)
mqttMonitor.Terminate()
}
func TestCleanupPrecisionTest(t *testing.T) {
@ -99,4 +101,5 @@ func TestCleanupPrecisionTest(t *testing.T) {
assert.NoError(t, err)
assert.Contains(t, challenger, "plmnt")
assert.Contains(t, challengee, "plmnt")
mqttMonitor.Terminate()
}

View File

@ -166,7 +166,7 @@ func (s *SelectionE2ETestSuite) TestPopSelectionNoActors() {
func (s *SelectionE2ETestSuite) TestPopSelectionOneActors() {
err := e2etestutil.AttestMachine(s.network, machines[0].name, machines[0].mnemonic, 0, s.feeDenom)
s.Require().NoError(err)
err = monitor.MqttMonitorInstance.AddParticipant(machines[0].address, time.Now().Unix())
err = monitor.AddParticipant(machines[0].address, time.Now().Unix())
s.Require().NoError(err)
out := s.perpareLocalTest()
@ -178,7 +178,7 @@ func (s *SelectionE2ETestSuite) TestPopSelectionOneActors() {
func (s *SelectionE2ETestSuite) TestPopSelectionTwoActors() {
err := e2etestutil.AttestMachine(s.network, machines[1].name, machines[1].mnemonic, 1, s.feeDenom)
s.Require().NoError(err)
err = monitor.MqttMonitorInstance.AddParticipant(machines[1].address, time.Now().Unix())
err = monitor.AddParticipant(machines[1].address, time.Now().Unix())
s.Require().NoError(err)
out := s.perpareLocalTest()

View File

@ -24,7 +24,7 @@ import (
)
func DaoKeeper(t testing.TB) (*keeper.Keeper, sdk.Context) {
monitor.MqttMonitorInstance = &monitormocks.MockMQTTMonitorClientI{}
monitor.SetMqttMonitorInstance(&monitormocks.MockMQTTMonitorClientI{})
storeKey := sdk.NewKVStoreKey(types.StoreKey)
memStoreKey := storetypes.NewMemoryStoreKey(types.MemStoreKey)
challengeStoreKey := storetypes.NewMemoryStoreKey(types.ChallengeKey)

View File

@ -42,7 +42,7 @@ func Load(t *testing.T, configs ...Config) *Network {
// use mock client for testing
util.MQTTClient = &mocks.MockMQTTClient{}
monitor.MonitorMQTTClient = &mocks.MockMQTTClient{}
monitor.MqttMonitorInstance = &monitormocks.MockMQTTMonitorClientI{}
monitor.SetMqttMonitorInstance(&monitormocks.MockMQTTMonitorClientI{})
elements.Client = &elementsmocks.MockClient{}
util.RegisterAssetServiceHTTPClient = &mocks.MockClient{}

View File

@ -24,8 +24,8 @@ func BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock, k keeper.Keeper)
hexProposerAddress := hex.EncodeToString(proposerAddress)
if isPopHeight(ctx, k, currentBlockHeight) {
// select PoP participants
monitor.MqttMonitorInstance.SetContext(ctx)
challenger, challengee, err := monitor.MqttMonitorInstance.SelectPoPParticipantsOutOfActiveActors()
monitor.SetContext(ctx)
challenger, challengee, err := monitor.SelectPoPParticipantsOutOfActiveActors()
if err != nil {
util.GetAppLogger().Error(ctx, "error during PoP Participant selection ", err)
}