* made monitor Mutex a RWMutex

* added Mutex protection to the numberOfElements varialbe

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>
This commit is contained in:
Jürgen Eckel 2024-05-16 16:53:19 +02:00
parent d403eebc9a
commit 9aa0b63b4d
No known key found for this signature in database
3 changed files with 32 additions and 21 deletions

View File

@ -25,16 +25,13 @@ func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err er
log.Println("[app] [Monitor] Error serializing ConversionRequest: " + err.Error()) log.Println("[app] [Monitor] Error serializing ConversionRequest: " + err.Error())
return return
} }
increaseCounter := false
// returns an error if the entry does not exist (we have to increase the counter in this case) // returns an error if the entry does not exist (we have to increase the counter in this case)
_, err = mms.db.Get([]byte(address), nil) _, err = mms.db.Get([]byte(address), nil)
if err != nil { if err != nil {
increaseCounter = true mms.setNumDBElements(mms.getNumDBElements() + 1)
} }
mms.dbMutex.Lock() mms.dbMutex.Lock()
if increaseCounter {
mms.numberOfElements++
}
err = mms.db.Put([]byte(address), lastSeenBytes, nil) err = mms.db.Put([]byte(address), lastSeenBytes, nil)
mms.dbMutex.Unlock() mms.dbMutex.Unlock()
if err != nil { if err != nil {
@ -47,9 +44,9 @@ func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err er
} }
func (mms *MqttMonitor) deleteEntry(key []byte) (err error) { func (mms *MqttMonitor) deleteEntry(key []byte) (err error) {
mms.setNumDBElements(mms.getNumDBElements() - 1)
mms.dbMutex.Lock() mms.dbMutex.Lock()
err = mms.db.Delete(key, nil) err = mms.db.Delete(key, nil)
mms.numberOfElements--
mms.dbMutex.Unlock() mms.dbMutex.Unlock()
return return
} }

View File

@ -13,7 +13,7 @@ type MQTTMonitorClientI interface {
Start() (err error) Start() (err error)
} }
var monitorMutex sync.Mutex var monitorMutex sync.RWMutex
var mqttMonitorInstance MQTTMonitorClientI var mqttMonitorInstance MQTTMonitorClientI
func SetMqttMonitorInstance(monitorInstance MQTTMonitorClientI) { func SetMqttMonitorInstance(monitorInstance MQTTMonitorClientI) {
@ -23,9 +23,9 @@ func SetMqttMonitorInstance(monitorInstance MQTTMonitorClientI) {
} }
func LazyMqttMonitorLoader(homeDir string) { func LazyMqttMonitorLoader(homeDir string) {
monitorMutex.Lock() monitorMutex.RLock()
tmpInstance := mqttMonitorInstance tmpInstance := mqttMonitorInstance
monitorMutex.Unlock() monitorMutex.RUnlock()
if tmpInstance != nil { if tmpInstance != nil {
return return
} }
@ -36,9 +36,8 @@ func LazyMqttMonitorLoader(homeDir string) {
if err != nil { if err != nil {
panic(err) panic(err)
} }
monitorMutex.Lock()
mqttMonitorInstance = NewMqttMonitorService(aciveActorsDB, *config.GetConfig()) SetMqttMonitorInstance(NewMqttMonitorService(aciveActorsDB, *config.GetConfig()))
monitorMutex.Unlock()
err = mqttMonitorInstance.Start() err = mqttMonitorInstance.Start()
if err != nil { if err != nil {
panic(err) panic(err)
@ -46,9 +45,9 @@ func LazyMqttMonitorLoader(homeDir string) {
} }
func SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) { func SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) {
monitorMutex.Lock() monitorMutex.RLock()
challenger, challengee, err = mqttMonitorInstance.SelectPoPParticipantsOutOfActiveActors() challenger, challengee, err = mqttMonitorInstance.SelectPoPParticipantsOutOfActiveActors()
monitorMutex.Unlock() monitorMutex.RUnlock()
return return
} }
@ -58,8 +57,8 @@ func Start() (err error) {
} }
func AddParticipant(address string, lastSeenTS int64) (err error) { func AddParticipant(address string, lastSeenTS int64) (err error) {
monitorMutex.Lock() monitorMutex.RLock()
err = mqttMonitorInstance.AddParticipant(address, lastSeenTS) err = mqttMonitorInstance.AddParticipant(address, lastSeenTS)
monitorMutex.Unlock() monitorMutex.RUnlock()
return return
} }

View File

@ -24,6 +24,7 @@ type MqttMonitor struct {
dbMutex sync.Mutex // Mutex to synchronize write operations dbMutex sync.Mutex // Mutex to synchronize write operations
CleanupPeriodicityInMinutes time.Duration CleanupPeriodicityInMinutes time.Duration
config config.Config config config.Config
numberOfElementsMutex sync.RWMutex
numberOfElements int64 numberOfElements int64
sdkContext *sdk.Context sdkContext *sdk.Context
contextMutex sync.Mutex contextMutex sync.Mutex
@ -49,6 +50,18 @@ func (mms *MqttMonitor) IsTerminated() (isTerminated bool) {
return return
} }
func (mms *MqttMonitor) getNumDBElements() int64 {
mms.numberOfElementsMutex.RLock()
defer mms.numberOfElementsMutex.RUnlock()
return mms.numberOfElements
}
func (mms *MqttMonitor) setNumDBElements(numElements int64) {
mms.numberOfElementsMutex.Lock()
defer mms.numberOfElementsMutex.Unlock()
mms.numberOfElements = numElements
}
func getClientID() string { func getClientID() string {
conf := config.GetConfig() conf := config.GetConfig()
return "monitor-" + conf.ValidatorAddress return "monitor-" + conf.ValidatorAddress
@ -107,7 +120,7 @@ func (mms *MqttMonitor) Start() (err error) {
if err != nil { if err != nil {
return return
} }
mms.numberOfElements = amount mms.setNumDBElements(amount)
go mms.runPeriodicTasks() go mms.runPeriodicTasks()
go mms.MonitorActiveParticipants() go mms.MonitorActiveParticipants()
go mms.CleanupDB() go mms.CleanupDB()
@ -116,17 +129,19 @@ func (mms *MqttMonitor) Start() (err error) {
func (mms *MqttMonitor) getRandomNumbers() (challenger int, challengee int) { func (mms *MqttMonitor) getRandomNumbers() (challenger int, challengee int) {
for challenger == challengee { for challenger == challengee {
// Generate random numbers // Generate random numbers
challenger = rand.Intn(int(mms.numberOfElements)) numElements := int(mms.getNumDBElements())
challengee = rand.Intn(int(mms.numberOfElements)) challenger = rand.Intn(numElements)
challengee = rand.Intn(numElements)
} }
return return
} }
func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) { func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) {
if mms.numberOfElements < 2 { numElements := int(mms.getNumDBElements())
if numElements < 2 {
return return
} }
randomChallenger, randomChallengee := mms.getRandomNumbers() randomChallenger, randomChallengee := mms.getRandomNumbers()
log.Println("[app] [Monitor] number of elements: " + strconv.Itoa(int(mms.numberOfElements))) log.Println("[app] [Monitor] number of elements: " + strconv.Itoa(numElements))
log.Println("[app] [Monitor] selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee)) log.Println("[app] [Monitor] selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee))
iter := mms.db.NewIterator(nil, nil) iter := mms.db.NewIterator(nil, nil)
defer iter.Release() defer iter.Release()