refactor(mqtt): dry

Signed-off-by: Julian Strobl <jmastr@mailbox.org>
This commit is contained in:
Julian Strobl 2024-11-15 10:36:19 +01:00
parent 8d2ecf6e28
commit 1bda49e2cc
No known key found for this signature in database
GPG Key ID: E0A8F9AD733499A7
3 changed files with 37 additions and 37 deletions

View File

@ -21,7 +21,7 @@ func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err er
lastSeenBytes, err := json.Marshal(lastSeen) lastSeenBytes, err := json.Marshal(lastSeen)
if err != nil { if err != nil {
Log("[app] [Monitor] Error serializing ConversionRequest: " + err.Error()) Log("Error serializing ConversionRequest: " + err.Error())
return return
} }
@ -34,9 +34,9 @@ func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err er
err = mms.db.Put([]byte(address), lastSeenBytes, nil) err = mms.db.Put([]byte(address), lastSeenBytes, nil)
mms.dbMutex.Unlock() mms.dbMutex.Unlock()
if err != nil { if err != nil {
Log("[app] [Monitor] error storing addresses in DB: " + err.Error()) Log("error storing addresses in DB: " + err.Error())
} else { } else {
Log("[app] [Monitor] stored address in DB: " + address) Log("stored address in DB: " + address)
} }
return return
@ -60,9 +60,9 @@ func (mms *MqttMonitor) getAmountOfElements() (amount int64, err error) {
// Check for any errors encountered during iteration // Check for any errors encountered during iteration
if err := iter.Error(); err != nil { if err := iter.Error(); err != nil {
Log("[app] [Monitor] " + err.Error()) Log("" + err.Error())
} else { } else {
Log("[app] [Monitor] elements: " + strconv.FormatInt(amount, 10)) Log("elements: " + strconv.FormatInt(amount, 10))
} }
return return
@ -72,14 +72,14 @@ func (mms *MqttMonitor) getDataFromIter(iter iterator.Iterator) (lastSeen LastSe
value := iter.Value() value := iter.Value()
err = json.Unmarshal(value, &lastSeen) err = json.Unmarshal(value, &lastSeen)
if err != nil { if err != nil {
Log("[app] [Monitor] Failed to unmarshal entry: " + string(key) + " - " + err.Error()) Log("Failed to unmarshal entry: " + string(key) + " - " + err.Error())
} }
return return
} }
func (mms *MqttMonitor) CleanupDB() { func (mms *MqttMonitor) CleanupDB() {
// Create an iterator for the database // Create an iterator for the database
Log("[app] [Monitor] Starting clean-up process") Log("Starting clean-up process")
iter := mms.db.NewIterator(nil, nil) iter := mms.db.NewIterator(nil, nil)
defer iter.Release() // Make sure to release the iterator at the end defer iter.Release() // Make sure to release the iterator at the end
@ -88,7 +88,7 @@ func (mms *MqttMonitor) CleanupDB() {
// Use iter.Key() and iter.Value() to access the key and value // Use iter.Key() and iter.Value() to access the key and value
lastSeen, err := mms.getDataFromIter(iter) lastSeen, err := mms.getDataFromIter(iter)
if err != nil { if err != nil {
Log("[app] [Monitor] Failed to unmarshal entry: " + string(iter.Key()) + " - " + err.Error()) Log("Failed to unmarshal entry: " + string(iter.Key()) + " - " + err.Error())
continue continue
} }
timeThreshold := time.Now().Add(-1 * mms.CleanupPeriodicityInMinutes * time.Minute).Unix() timeThreshold := time.Now().Add(-1 * mms.CleanupPeriodicityInMinutes * time.Minute).Unix()
@ -96,15 +96,15 @@ func (mms *MqttMonitor) CleanupDB() {
// If the entry is older than 12 hours, delete it // If the entry is older than 12 hours, delete it
err := mms.deleteEntry(iter.Key()) err := mms.deleteEntry(iter.Key())
if err != nil { if err != nil {
Log("[app] [Monitor] Failed to delete entry: " + err.Error()) Log("Failed to delete entry: " + err.Error())
} else { } else {
Log("[app] [Monitor] Delete entry: " + string(iter.Key())) Log("Delete entry: " + string(iter.Key()))
} }
} }
} }
// Check for any errors encountered during iteration // Check for any errors encountered during iteration
if err := iter.Error(); err != nil { if err := iter.Error(); err != nil {
Log("[app] [Monitor] error during cleanup : " + err.Error()) Log("error during cleanup : " + err.Error())
} }
} }

View File

@ -79,5 +79,5 @@ func Log(msg string) {
if mqttLogger == nil { if mqttLogger == nil {
return return
} }
mqttLogger.Info(msg) mqttLogger.Info("[app] [Monitor] " + msg)
} }

View File

@ -89,7 +89,7 @@ func (mms *MqttMonitor) lazyLoadMonitorMQTTClient() util.MQTTClientI {
opts.SetTLSConfig(tlsConfig) opts.SetTLSConfig(tlsConfig)
} }
Log("[app] [Monitor] create new client") Log("create new client")
client := mqtt.NewClient(opts) client := mqtt.NewClient(opts)
return client return client
} }
@ -141,8 +141,8 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str
return return
} }
randomChallenger, randomChallengee := mms.getRandomNumbers() randomChallenger, randomChallengee := mms.getRandomNumbers()
Log("[app] [Monitor] number of elements: " + strconv.Itoa(numElements)) Log("number of elements: " + strconv.Itoa(numElements))
Log("[app] [Monitor] selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee)) Log("selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee))
iter := mms.db.NewIterator(nil, nil) iter := mms.db.NewIterator(nil, nil)
defer iter.Release() defer iter.Release()
count := 0 count := 0
@ -152,7 +152,7 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str
if count == randomChallenger { if count == randomChallenger {
lastSeen, err = mms.getDataFromIter(iter) lastSeen, err = mms.getDataFromIter(iter)
if err != nil { if err != nil {
Log("[app] [Monitor] could not get Data from ID" + strconv.Itoa(randomChallenger)) Log("could not get Data from ID" + strconv.Itoa(randomChallenger))
return return
} }
challenger = lastSeen.Address challenger = lastSeen.Address
@ -160,7 +160,7 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str
} else if count == randomChallengee { } else if count == randomChallengee {
lastSeen, err = mms.getDataFromIter(iter) lastSeen, err = mms.getDataFromIter(iter)
if err != nil { if err != nil {
Log("[app] [Monitor] could not get Data from ID" + strconv.Itoa(randomChallengee)) Log("could not get Data from ID" + strconv.Itoa(randomChallengee))
return return
} }
challengee = lastSeen.Address challengee = lastSeen.Address
@ -172,7 +172,7 @@ func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger str
break break
} }
} }
Log("[app] [Monitor] challenger, challengee: " + challenger + " " + challengee) Log("challenger, challengee: " + challenger + " " + challengee)
return return
} }
@ -210,9 +210,9 @@ func (mms *MqttMonitor) MqttMsgHandler(_ mqtt.Client, msg mqtt.Message) {
err = mms.AddParticipant(address, unixTime) err = mms.AddParticipant(address, unixTime)
if err != nil { if err != nil {
Log("[app] [Monitor] error adding active actor to DB: " + address + " " + err.Error()) Log("error adding active actor to DB: " + address + " " + err.Error())
} else { } else {
Log("[app] [Monitor] added active actor to DB: " + address) Log("added active actor to DB: " + address)
} }
} }
@ -226,7 +226,7 @@ func IsLegitMachineAddress(address string) (active bool, err error) {
ctx := context.Background() ctx := context.Background()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil { if err != nil {
Log("[app] [Monitor] cannot send machine query request " + err.Error()) Log("cannot send machine query request " + err.Error())
return return
} }
@ -236,7 +236,7 @@ func IsLegitMachineAddress(address string) (active bool, err error) {
// Send the request // Send the request
resp, err := client.Do(req) resp, err := client.Do(req)
if err != nil { if err != nil {
Log("[app] [Monitor] cannot connect to server: " + err.Error()) Log("cannot connect to server: " + err.Error())
return return
} }
@ -246,13 +246,13 @@ func IsLegitMachineAddress(address string) (active bool, err error) {
// Read the response body // Read the response body
body, err := io.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
Log("[app] [Monitor] cannot read response: " + err.Error()) Log("cannot read response: " + err.Error())
return return
} }
// Check the status code // Check the status code
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
Log("[app] [Monitor] unexpected status code: " + string(body)) Log("unexpected status code: " + string(body))
return return
} }
@ -260,29 +260,29 @@ func IsLegitMachineAddress(address string) (active bool, err error) {
var data map[string]interface{} var data map[string]interface{}
err = json.Unmarshal(body, &data) err = json.Unmarshal(body, &data)
if err != nil { if err != nil {
Log("[app] [Monitor] cannot unmarshal response " + err.Error()) Log("cannot unmarshal response " + err.Error())
return return
} }
// Check if the "info" key exists // Check if the "info" key exists
machineValue, ok := data["machine"] machineValue, ok := data["machine"]
if !ok { if !ok {
Log("[app] [Monitor] response does not contain the required machine") Log("response does not contain the required machine")
return return
} }
machineMap, ok := machineValue.(map[string]interface{}) machineMap, ok := machineValue.(map[string]interface{})
if !ok { if !ok {
Log("[app] [Monitor] cannot convert machine map") Log("cannot convert machine map")
return return
} }
addressMap, ok := machineMap["address"] addressMap, ok := machineMap["address"]
if !ok { if !ok {
Log("[app] [Monitor] response does not contain the required name") Log("response does not contain the required name")
return return
} }
value, ok := addressMap.(string) value, ok := addressMap.(string)
if !ok || value != address { if !ok || value != address {
Log("[app] [Monitor] return machine is not the required one") Log("return machine is not the required one")
return return
} }
@ -292,7 +292,7 @@ func IsLegitMachineAddress(address string) (active bool, err error) {
} }
func (mms *MqttMonitor) onConnectionLost(_ mqtt.Client, err error) { func (mms *MqttMonitor) onConnectionLost(_ mqtt.Client, err error) {
Log("[app] [Monitor] Connection lost: " + err.Error()) Log("Connection lost: " + err.Error())
// Handle connection loss here (e.g., reconnect attempts, logging) // Handle connection loss here (e.g., reconnect attempts, logging)
if !mms.IsTerminated() { if !mms.IsTerminated() {
mms.lostConnectionMutex.Lock() mms.lostConnectionMutex.Lock()
@ -304,7 +304,7 @@ func (mms *MqttMonitor) onConnectionLost(_ mqtt.Client, err error) {
func (mms *MqttMonitor) MonitorActiveParticipants() { func (mms *MqttMonitor) MonitorActiveParticipants() {
mms.clientMutex.Lock() mms.clientMutex.Lock()
if mms.localMqttClient != nil { if mms.localMqttClient != nil {
Log("[app] [Monitor] client is still working") Log("client is still working")
mms.clientMutex.Unlock() mms.clientMutex.Unlock()
return return
} }
@ -316,7 +316,7 @@ func (mms *MqttMonitor) MonitorActiveParticipants() {
mms.SetMaxRetries() mms.SetMaxRetries()
for !mms.IsTerminated() && mms.maxRetries > 0 { for !mms.IsTerminated() && mms.maxRetries > 0 {
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
Log("[app] [Monitor] error connecting to mqtt: " + token.Error().Error()) Log("error connecting to mqtt: " + token.Error().Error())
mms.maxRetries-- mms.maxRetries--
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
continue continue
@ -325,24 +325,24 @@ func (mms *MqttMonitor) MonitorActiveParticipants() {
mms.lostConnection = false mms.lostConnection = false
mms.lostConnectionMutex.Unlock() mms.lostConnectionMutex.Unlock()
Log("[app] [Monitor] established connection") Log("established connection")
var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler
// Subscribe to a topic // Subscribe to a topic
subscriptionTopic := "tele/#" subscriptionTopic := "tele/#"
if token := mqttClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { if token := mqttClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil {
Log("[app] [Monitor] error registering the mqtt subscription: " + token.Error().Error()) Log("error registering the mqtt subscription: " + token.Error().Error())
continue continue
} }
Log("[app] [Monitor] subscribed to tele/# channels") Log("subscribed to tele/# channels")
for !mms.IsTerminated() { for !mms.IsTerminated() {
mms.lostConnectionMutex.Lock() mms.lostConnectionMutex.Lock()
lostConnectionEvent := mms.lostConnection lostConnectionEvent := mms.lostConnection
mms.lostConnectionMutex.Unlock() mms.lostConnectionMutex.Unlock()
if !mqttClient.IsConnected() || !mqttClient.IsConnectionOpen() || lostConnectionEvent { if !mqttClient.IsConnected() || !mqttClient.IsConnectionOpen() || lostConnectionEvent {
Log("[app] [Monitor] retry establishing a connection") Log("retry establishing a connection")
break // Exit inner loop on disconnect break // Exit inner loop on disconnect
} }
@ -353,7 +353,7 @@ func (mms *MqttMonitor) MonitorActiveParticipants() {
} }
if mms.maxRetries == 0 { if mms.maxRetries == 0 {
Log("[app] [Monitor] Reached maximum reconnection attempts. Exiting. New client will be activated soon.") Log("Reached maximum reconnection attempts. Exiting. New client will be activated soon.")
} }
mms.clientMutex.Lock() mms.clientMutex.Lock()