From 9aa0b63b4d547584919f5bbff5e5fa26b69a94b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrgen=20Eckel?= Date: Thu, 16 May 2024 16:53:19 +0200 Subject: [PATCH] * made monitor Mutex a RWMutex * added Mutex protection to the numberOfElements varialbe MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jürgen Eckel --- monitor/backend.go | 9 +++------ monitor/interface.go | 19 +++++++++---------- monitor/mqtt_monitor.go | 25 ++++++++++++++++++++----- 3 files changed, 32 insertions(+), 21 deletions(-) diff --git a/monitor/backend.go b/monitor/backend.go index 1471e70..ff76767 100644 --- a/monitor/backend.go +++ b/monitor/backend.go @@ -25,16 +25,13 @@ func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err er log.Println("[app] [Monitor] Error serializing ConversionRequest: " + err.Error()) return } - increaseCounter := false + // returns an error if the entry does not exist (we have to increase the counter in this case) _, err = mms.db.Get([]byte(address), nil) if err != nil { - increaseCounter = true + mms.setNumDBElements(mms.getNumDBElements() + 1) } mms.dbMutex.Lock() - if increaseCounter { - mms.numberOfElements++ - } err = mms.db.Put([]byte(address), lastSeenBytes, nil) mms.dbMutex.Unlock() if err != nil { @@ -47,9 +44,9 @@ func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err er } func (mms *MqttMonitor) deleteEntry(key []byte) (err error) { + mms.setNumDBElements(mms.getNumDBElements() - 1) mms.dbMutex.Lock() err = mms.db.Delete(key, nil) - mms.numberOfElements-- mms.dbMutex.Unlock() return } diff --git a/monitor/interface.go b/monitor/interface.go index 21879af..f967e6b 100644 --- a/monitor/interface.go +++ b/monitor/interface.go @@ -13,7 +13,7 @@ type MQTTMonitorClientI interface { Start() (err error) } -var monitorMutex sync.Mutex +var monitorMutex sync.RWMutex var mqttMonitorInstance MQTTMonitorClientI func SetMqttMonitorInstance(monitorInstance MQTTMonitorClientI) { @@ -23,9 +23,9 @@ func SetMqttMonitorInstance(monitorInstance MQTTMonitorClientI) { } func LazyMqttMonitorLoader(homeDir string) { - monitorMutex.Lock() + monitorMutex.RLock() tmpInstance := mqttMonitorInstance - monitorMutex.Unlock() + monitorMutex.RUnlock() if tmpInstance != nil { return } @@ -36,9 +36,8 @@ func LazyMqttMonitorLoader(homeDir string) { if err != nil { panic(err) } - monitorMutex.Lock() - mqttMonitorInstance = NewMqttMonitorService(aciveActorsDB, *config.GetConfig()) - monitorMutex.Unlock() + + SetMqttMonitorInstance(NewMqttMonitorService(aciveActorsDB, *config.GetConfig())) err = mqttMonitorInstance.Start() if err != nil { panic(err) @@ -46,9 +45,9 @@ func LazyMqttMonitorLoader(homeDir string) { } func SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) { - monitorMutex.Lock() + monitorMutex.RLock() challenger, challengee, err = mqttMonitorInstance.SelectPoPParticipantsOutOfActiveActors() - monitorMutex.Unlock() + monitorMutex.RUnlock() return } @@ -58,8 +57,8 @@ func Start() (err error) { } func AddParticipant(address string, lastSeenTS int64) (err error) { - monitorMutex.Lock() + monitorMutex.RLock() err = mqttMonitorInstance.AddParticipant(address, lastSeenTS) - monitorMutex.Unlock() + monitorMutex.RUnlock() return } diff --git a/monitor/mqtt_monitor.go b/monitor/mqtt_monitor.go index 8cd71b6..053fa78 100644 --- a/monitor/mqtt_monitor.go +++ b/monitor/mqtt_monitor.go @@ -24,6 +24,7 @@ type MqttMonitor struct { dbMutex sync.Mutex // Mutex to synchronize write operations CleanupPeriodicityInMinutes time.Duration config config.Config + numberOfElementsMutex sync.RWMutex numberOfElements int64 sdkContext *sdk.Context contextMutex sync.Mutex @@ -49,6 +50,18 @@ func (mms *MqttMonitor) IsTerminated() (isTerminated bool) { return } +func (mms *MqttMonitor) getNumDBElements() int64 { + mms.numberOfElementsMutex.RLock() + defer mms.numberOfElementsMutex.RUnlock() + return mms.numberOfElements +} + +func (mms *MqttMonitor) setNumDBElements(numElements int64) { + mms.numberOfElementsMutex.Lock() + defer mms.numberOfElementsMutex.Unlock() + mms.numberOfElements = numElements +} + func getClientID() string { conf := config.GetConfig() return "monitor-" + conf.ValidatorAddress @@ -107,7 +120,7 @@ func (mms *MqttMonitor) Start() (err error) { if err != nil { return } - mms.numberOfElements = amount + mms.setNumDBElements(amount) go mms.runPeriodicTasks() go mms.MonitorActiveParticipants() go mms.CleanupDB() @@ -116,17 +129,19 @@ func (mms *MqttMonitor) Start() (err error) { func (mms *MqttMonitor) getRandomNumbers() (challenger int, challengee int) { for challenger == challengee { // Generate random numbers - challenger = rand.Intn(int(mms.numberOfElements)) - challengee = rand.Intn(int(mms.numberOfElements)) + numElements := int(mms.getNumDBElements()) + challenger = rand.Intn(numElements) + challengee = rand.Intn(numElements) } return } func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) { - if mms.numberOfElements < 2 { + numElements := int(mms.getNumDBElements()) + if numElements < 2 { return } randomChallenger, randomChallengee := mms.getRandomNumbers() - log.Println("[app] [Monitor] number of elements: " + strconv.Itoa(int(mms.numberOfElements))) + log.Println("[app] [Monitor] number of elements: " + strconv.Itoa(numElements)) log.Println("[app] [Monitor] selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee)) iter := mms.db.NewIterator(nil, nil) defer iter.Release()