Eckelj/mqtt monitoring (#359)

* added a MqttMonitor module with levelDB and periodic cleanup
* initialized in the app
* passed to dao keeper
* added conversion methods (string2unixtime, byte ToJSON)
* removed obsolete keeper code
* maded  RDDLToken.Factor public
* added explicit mqtt client to the monitor module
* restart mqtt connection in mqttmonitor on connection loss
* adjusted mqttmock structure to be compatible
* added some linter exclusions to let the monitor tool pass
* created a MockMqttMonitor interface and mock object
* used this to pass tests
* made the MockMqttMonitor a global object so that it can be easily mocked
* removed MockMqttMonitor from the app/keeper initialization
* adjusted test cases to register "active machines" to the mqttmonitor
* added mutex in mocks to protect against data races
* defined mocks for the dao tests
* clear separation between interface and mqtt-Monitor

* added another waiting block to ensure the tx went through (multi-threading issue, race condition) during tests this failed sometimes
* added memstorage to test instead of a file based DB


Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>
This commit is contained in:
Jürgen Eckel 2024-04-08 10:49:00 +02:00 committed by GitHub
parent 0ec6fba4ec
commit 779b1edd48
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 653 additions and 70 deletions

View File

@ -133,3 +133,9 @@ issues:
- path: docs/docs\.go
linters:
- revive
- path: monitor/mqtt_monitor_test\.go
linters:
- paralleltest
- path: monitor/.*\.go
linters:
- durationcheck

2
go.mod
View File

@ -29,6 +29,7 @@ require (
github.com/spf13/cobra v1.6.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d
google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529
google.golang.org/grpc v1.56.3
gopkg.in/yaml.v2 v2.4.0
@ -155,7 +156,6 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/viper v1.15.0 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect
github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c // indirect
github.com/tendermint/go-amino v0.16.0 // indirect
github.com/tidwall/btree v1.6.0 // indirect

105
monitor/backend.go Normal file
View File

@ -0,0 +1,105 @@
package monitor
import (
"encoding/json"
"log"
"time"
"github.com/syndtr/goleveldb/leveldb/iterator"
)
type LastSeenEvent struct {
Address string `binding:"required" json:"address"`
Timestamp int64 `binding:"required" json:"timestamp"`
}
func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err error) {
// store receive address - planetmint address pair
var lastSeen LastSeenEvent
lastSeen.Address = address
lastSeen.Timestamp = lastSeenTS
lastSeenBytes, err := json.Marshal(lastSeen)
if err != nil {
mms.Log("[Monitor] Error serializing ConversionRequest: " + err.Error())
return
}
increaseCounter := false
// returns an error if the entry does not exist (we have to increase the counter in this case)
_, err = mms.db.Get([]byte(address), nil)
if err != nil {
increaseCounter = true
}
mms.dbMutex.Lock()
if increaseCounter {
mms.numberOfElements++
}
err = mms.db.Put([]byte(address), lastSeenBytes, nil)
mms.dbMutex.Unlock()
if err != nil {
log.Println("[Monitor] storing addresses in DB: " + err.Error())
return
}
return
}
func (mms *MqttMonitor) deleteEntry(key []byte) (err error) {
mms.dbMutex.Lock()
err = mms.db.Delete(key, nil)
mms.numberOfElements--
mms.dbMutex.Unlock()
return
}
func (mms *MqttMonitor) getAmountOfElements() (amount int64, err error) {
iter := mms.db.NewIterator(nil, nil)
defer iter.Release()
for iter.Next() {
amount++
}
// Check for any errors encountered during iteration
if err := iter.Error(); err != nil {
log.Println("[Monitor] " + err.Error())
}
return
}
func (mms *MqttMonitor) getDataFromIter(iter iterator.Iterator) (lastSeen LastSeenEvent, err error) {
key := iter.Key()
value := iter.Value()
err = json.Unmarshal(value, &lastSeen)
if err != nil {
mms.Log("[Monitor] Failed to unmarshal entry: " + string(key) + " - " + err.Error())
}
return
}
func (mms *MqttMonitor) CleanupDB() {
// Create an iterator for the database
iter := mms.db.NewIterator(nil, nil)
defer iter.Release() // Make sure to release the iterator at the end
// Iterate over all elements in the database
for iter.Next() && !mms.IsTerminated() {
// Use iter.Key() and iter.Value() to access the key and value
lastSeen, err := mms.getDataFromIter(iter)
if err != nil {
mms.Log("[Monitor] Failed to unmarshal entry: " + string(iter.Key()) + " - " + err.Error())
continue
}
timeThreshold := time.Now().Add(-1 * mms.CleanupPeriodicityInMinutes * time.Minute).Unix()
if lastSeen.Timestamp <= timeThreshold {
// If the entry is older than 12 hours, delete it
err := mms.deleteEntry(iter.Key())
if err != nil {
mms.Log("[Monitor] Failed to delete entry: " + err.Error())
}
}
}
// Check for any errors encountered during iteration
if err := iter.Error(); err != nil {
mms.Log(err.Error())
}
}

73
monitor/interface.go Normal file
View File

@ -0,0 +1,73 @@
package monitor
import (
"sync"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/planetmint/planetmint-go/config"
"github.com/syndtr/goleveldb/leveldb"
)
type MQTTMonitorClientI interface {
AddParticipant(address string, lastSeenTS int64) (err error)
SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error)
SetContext(ctx sdk.Context)
Start() (err error)
}
var monitorMutex sync.Mutex
var mqttMonitorInstance MQTTMonitorClientI
func SetMqttMonitorInstance(monitorInstance MQTTMonitorClientI) {
monitorMutex.Lock()
mqttMonitorInstance = monitorInstance
monitorMutex.Unlock()
}
func LazyMqttMonitorLoader(homeDir string) {
monitorMutex.Lock()
tmpInstance := mqttMonitorInstance
monitorMutex.Unlock()
if tmpInstance != nil {
return
}
if homeDir == "" {
homeDir = "./"
}
aciveActorsDB, err := leveldb.OpenFile(homeDir+"activeActors.db", nil)
if err != nil {
panic(err)
}
monitorMutex.Lock()
mqttMonitorInstance = NewMqttMonitorService(aciveActorsDB, *config.GetConfig())
monitorMutex.Unlock()
err = mqttMonitorInstance.Start()
if err != nil {
panic(err)
}
}
func SetContext(ctx sdk.Context) {
monitorMutex.Lock()
mqttMonitorInstance.SetContext(ctx)
monitorMutex.Unlock()
}
func SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) {
monitorMutex.Lock()
challenger, challengee, err = mqttMonitorInstance.SelectPoPParticipantsOutOfActiveActors()
monitorMutex.Unlock()
return
}
func Start() (err error) {
err = mqttMonitorInstance.Start()
return
}
func AddParticipant(address string, lastSeenTS int64) (err error) {
monitorMutex.Lock()
err = mqttMonitorInstance.AddParticipant(address, lastSeenTS)
monitorMutex.Unlock()
return
}

View File

@ -0,0 +1,37 @@
package mocks
import (
types "github.com/cosmos/cosmos-sdk/types"
)
// MockMQTTMonitorClientI is a mock of MQTTMonitorClientI interface.
type MockMQTTMonitorClientI struct {
myStringList []string
}
// AddParticipant mocks base method.
func (m *MockMQTTMonitorClientI) AddParticipant(address string, _ int64) error {
m.myStringList = append(m.myStringList, address)
return nil
}
// SelectPoPParticipantsOutOfActiveActors mocks base method.
func (m *MockMQTTMonitorClientI) SelectPoPParticipantsOutOfActiveActors() (string, string, error) {
var challenger, challengee string
amount := len(m.myStringList)
if amount >= 2 {
challenger = m.myStringList[amount-2]
challengee = m.myStringList[amount-1]
}
return challenger, challengee, nil
}
// SetContext mocks base method.
func (m *MockMQTTMonitorClientI) SetContext(_ types.Context) {
}
// Start mocks base method.
func (m *MockMQTTMonitorClientI) Start() error {
return nil
}

206
monitor/mqtt_monitor.go Normal file
View File

@ -0,0 +1,206 @@
package monitor
import (
"math/rand"
"net"
"strconv"
"strings"
"sync"
"time"
sdk "github.com/cosmos/cosmos-sdk/types"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/planetmint/planetmint-go/config"
"github.com/planetmint/planetmint-go/util"
"github.com/syndtr/goleveldb/leveldb"
)
var MonitorMQTTClient util.MQTTClientI
type MqttMonitor struct {
db *leveldb.DB
dbMutex sync.Mutex // Mutex to synchronize write operations
ticker *time.Ticker
CleanupPeriodicityInMinutes time.Duration
config config.Config
numberOfElements int64
sdkContext *sdk.Context
contextMutex sync.Mutex
isTerminated bool
terminationMutex sync.Mutex
}
func (mms *MqttMonitor) Terminate() {
mms.terminationMutex.Lock()
mms.isTerminated = true
mms.terminationMutex.Unlock()
}
func (mms *MqttMonitor) IsTerminated() (isTerminated bool) {
mms.terminationMutex.Lock()
isTerminated = mms.isTerminated
mms.terminationMutex.Unlock()
return
}
func LazyLoadMonitorMQTTClient() {
if MonitorMQTTClient != nil {
return
}
conf := config.GetConfig()
hostPort := net.JoinHostPort(conf.MqttDomain, strconv.FormatInt(int64(conf.MqttPort), 10))
uri := "tcp://" + hostPort
opts := mqtt.NewClientOptions().AddBroker(uri)
opts.SetClientID(conf.ValidatorAddress + "-monitor")
opts.SetUsername(conf.MqttUser)
opts.SetPassword(conf.MqttPassword)
MonitorMQTTClient = mqtt.NewClient(opts)
}
func NewMqttMonitorService(db *leveldb.DB, config config.Config) *MqttMonitor {
service := &MqttMonitor{db: db, config: config, numberOfElements: 0, CleanupPeriodicityInMinutes: 10}
return service
}
func (mms *MqttMonitor) registerPeriodicTasks() {
mms.ticker = time.NewTicker(mms.CleanupPeriodicityInMinutes * time.Minute)
go func() {
for range mms.ticker.C { // Loop over the ticker channel
go mms.CleanupDB()
}
}()
}
func (mms *MqttMonitor) Start() (err error) {
amount, err := mms.getAmountOfElements()
if err != nil {
return
}
mms.numberOfElements = amount
mms.registerPeriodicTasks()
go mms.MonitorActiveParticipants()
return
}
func (mms *MqttMonitor) getRandomNumbers() (challenger int, challengee int) {
for challenger == challengee {
// Generate random numbers
challenger = rand.Intn(int(mms.numberOfElements))
challengee = rand.Intn(int(mms.numberOfElements))
}
return
}
func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) {
if mms.numberOfElements < 2 {
return
}
randomChallenger, randomChallengee := mms.getRandomNumbers()
mms.Log("[Monitor] number of elements: " + strconv.Itoa(int(mms.numberOfElements)))
mms.Log("[Monitor] selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee))
iter := mms.db.NewIterator(nil, nil)
defer iter.Release()
count := 0
found := 0
var lastSeen LastSeenEvent
for iter.Next() {
mms.Log("[Monitor] count: " + strconv.Itoa(count))
if count == randomChallenger {
lastSeen, err = mms.getDataFromIter(iter)
if err != nil {
return
}
challenger = lastSeen.Address
found++
} else if count == randomChallengee {
lastSeen, err = mms.getDataFromIter(iter)
if err != nil {
return
}
challengee = lastSeen.Address
found++
}
count++
if found == 2 {
break
}
}
return
}
func (mms *MqttMonitor) MqttMsgHandler(_ mqtt.Client, msg mqtt.Message) {
if mms.IsTerminated() {
return
}
topicParts := strings.Split(msg.Topic(), "/")
if len(topicParts) != 3 {
return
}
if topicParts[0] != "tele" {
return
}
if topicParts[2] != "STATE" {
return
}
address := topicParts[1]
valid, err := util.IsValidAddress(address)
if err != nil || !valid {
return
}
payload, err := util.ToJSON(msg.Payload())
if err != nil {
return
}
timeString, ok := payload["Time"].(string)
if !ok {
return
}
unixTime, err := util.String2UnixTime(timeString)
if err != nil {
return
}
err = mms.AddParticipant(address, unixTime)
if err != nil {
mms.Log("[Monitor] error adding active actor to DB: " + address + " " + err.Error())
} else {
mms.Log("[Monitor] added active actor to DB: " + address)
}
}
func (mms *MqttMonitor) MonitorActiveParticipants() {
LazyLoadMonitorMQTTClient()
for !mms.IsTerminated() {
if !MonitorMQTTClient.IsConnected() {
if token := MonitorMQTTClient.Connect(); token.Wait() && token.Error() != nil {
mms.Log("[Monitor] error connecting to mqtt: " + token.Error().Error())
panic(token.Error())
}
var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler
// Subscribe to a topic
subscriptionTopic := "tele/#"
if token := MonitorMQTTClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil {
mms.Log("[Monitor] error registering the mqtt subscription: " + token.Error().Error())
panic(token.Error())
}
}
time.Sleep(5 * time.Second)
}
}
func (mms *MqttMonitor) Log(msg string) {
mms.contextMutex.Lock()
if mms.sdkContext != nil {
util.GetAppLogger().Info(*mms.sdkContext, msg)
}
mms.contextMutex.Unlock()
}
func (mms *MqttMonitor) SetContext(ctx sdk.Context) {
mms.contextMutex.Lock()
mms.sdkContext = &ctx
mms.contextMutex.Unlock()
}

View File

@ -0,0 +1,106 @@
package monitor_test
import (
"testing"
"time"
"github.com/planetmint/planetmint-go/config"
"github.com/planetmint/planetmint-go/monitor"
"github.com/planetmint/planetmint-go/util/mocks"
"github.com/stretchr/testify/assert"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/storage"
)
func init() {
// Use MQTT mock client
monitor.MonitorMQTTClient = &mocks.MockMQTTClient{}
}
const (
challengerInput = "plmnt1fx3x6u8k5q8kjl7pamsuwjtut8nkks8dk92dek"
challengeeInput = "plmnt1fsaljz3xqf6vchkjxfzfrd30cdp3j4vqh298pr"
)
func TestGMonitorActiveParticipants(t *testing.T) {
monitor.LazyLoadMonitorMQTTClient()
cfg := config.GetConfig()
db, err := leveldb.Open(storage.NewMemStorage(), nil)
assert.NoError(t, err)
defer db.Close()
mqttMonitor := monitor.NewMqttMonitorService(db, *cfg)
err = mqttMonitor.Start()
assert.NoError(t, err)
currentTime := time.Now()
unixTime := currentTime.Unix()
err = mqttMonitor.AddParticipant(challengerInput, unixTime)
assert.NoError(t, err)
err = mqttMonitor.AddParticipant(challengeeInput, unixTime)
assert.NoError(t, err)
mqttMonitor.CleanupDB()
challenger, challengee, err := mqttMonitor.SelectPoPParticipantsOutOfActiveActors()
assert.NoError(t, err)
assert.Contains(t, challenger, "plmnt")
assert.Contains(t, challengee, "plmnt")
mqttMonitor.Terminate()
}
func TestCleanupRemoval(t *testing.T) {
monitor.LazyLoadMonitorMQTTClient()
cfg := config.GetConfig()
db, err := leveldb.Open(storage.NewMemStorage(), nil)
assert.NoError(t, err)
defer db.Close()
mqttMonitor := monitor.NewMqttMonitorService(db, *cfg)
err = mqttMonitor.Start()
assert.NoError(t, err)
currentTime := time.Now()
CleanupPeriodicityAgo := currentTime.Add(-1 * mqttMonitor.CleanupPeriodicityInMinutes * time.Minute)
unixTimeNow := currentTime.Unix()
err = mqttMonitor.AddParticipant(challengerInput, unixTimeNow)
assert.NoError(t, err)
err = mqttMonitor.AddParticipant(challengeeInput, CleanupPeriodicityAgo.Unix()-1)
assert.NoError(t, err)
mqttMonitor.CleanupDB()
challenger, challengee, err := mqttMonitor.SelectPoPParticipantsOutOfActiveActors()
assert.NoError(t, err)
assert.Equal(t, "", challenger)
assert.Contains(t, "", challengee)
mqttMonitor.Terminate()
}
func TestCleanupPrecisionTest(t *testing.T) {
monitor.LazyLoadMonitorMQTTClient()
cfg := config.GetConfig()
db, err := leveldb.Open(storage.NewMemStorage(), nil)
assert.NoError(t, err)
defer db.Close()
mqttMonitor := monitor.NewMqttMonitorService(db, *cfg)
err = mqttMonitor.Start()
assert.NoError(t, err)
currentTime := time.Now()
CleanupThresholdAgo := currentTime.Add(-1 * mqttMonitor.CleanupPeriodicityInMinutes * time.Minute)
aboveThreshold := CleanupThresholdAgo.Unix() + 10
unixTimeNow := currentTime.Unix()
err = mqttMonitor.AddParticipant(challengerInput, unixTimeNow)
assert.NoError(t, err)
err = mqttMonitor.AddParticipant(challengeeInput, aboveThreshold)
assert.NoError(t, err)
mqttMonitor.CleanupDB()
challenger, challengee, err := mqttMonitor.SelectPoPParticipantsOutOfActiveActors()
assert.NoError(t, err)
assert.Contains(t, challenger, "plmnt")
assert.Contains(t, challengee, "plmnt")
mqttMonitor.Terminate()
}

View File

@ -132,6 +132,7 @@ func (s *ConsumptionE2ETestSuite) TestNonValidatorConsumptionOverflow() {
out, err := lib.BroadcastTxWithFileLock(addr, msgs...)
s.Require().NoError(err)
s.Require().NoError(s.network.WaitForNextBlock())
s.Require().NoError(s.network.WaitForNextBlock())
_, err = clitestutil.GetRawLogFromTxOut(val, out)

View File

@ -7,11 +7,13 @@ import (
"math"
"os"
"strconv"
"time"
"github.com/cosmos/cosmos-sdk/crypto/keyring"
sdk "github.com/cosmos/cosmos-sdk/types"
bank "github.com/cosmos/cosmos-sdk/x/bank/client/cli"
"github.com/planetmint/planetmint-go/lib"
"github.com/planetmint/planetmint-go/monitor"
"github.com/planetmint/planetmint-go/testutil"
clitestutil "github.com/planetmint/planetmint-go/testutil/cli"
e2etestutil "github.com/planetmint/planetmint-go/testutil/e2e"
@ -164,6 +166,8 @@ func (s *SelectionE2ETestSuite) TestPopSelectionNoActors() {
func (s *SelectionE2ETestSuite) TestPopSelectionOneActors() {
err := e2etestutil.AttestMachine(s.network, machines[0].name, machines[0].mnemonic, 0, s.feeDenom)
s.Require().NoError(err)
err = monitor.AddParticipant(machines[0].address, time.Now().Unix())
s.Require().NoError(err)
out := s.perpareLocalTest()
@ -174,6 +178,8 @@ func (s *SelectionE2ETestSuite) TestPopSelectionOneActors() {
func (s *SelectionE2ETestSuite) TestPopSelectionTwoActors() {
err := e2etestutil.AttestMachine(s.network, machines[1].name, machines[1].mnemonic, 1, s.feeDenom)
s.Require().NoError(err)
err = monitor.AddParticipant(machines[1].address, time.Now().Unix())
s.Require().NoError(err)
out := s.perpareLocalTest()

View File

@ -15,14 +15,16 @@ import (
govtypes "github.com/cosmos/cosmos-sdk/x/gov/types"
typesparams "github.com/cosmos/cosmos-sdk/x/params/types"
"github.com/golang/mock/gomock"
"github.com/planetmint/planetmint-go/monitor"
monitormocks "github.com/planetmint/planetmint-go/monitor/mocks"
"github.com/planetmint/planetmint-go/x/dao/keeper"
daotestutil "github.com/planetmint/planetmint-go/x/dao/testutil"
"github.com/planetmint/planetmint-go/x/dao/types"
"github.com/stretchr/testify/require"
daotestutil "github.com/planetmint/planetmint-go/x/dao/testutil"
)
func DaoKeeper(t testing.TB) (*keeper.Keeper, sdk.Context) {
monitor.SetMqttMonitorInstance(&monitormocks.MockMQTTMonitorClientI{})
storeKey := sdk.NewKVStoreKey(types.StoreKey)
memStoreKey := storetypes.NewMemoryStoreKey(types.MemStoreKey)
challengeStoreKey := storetypes.NewMemoryStoreKey(types.ChallengeKey)

View File

@ -15,6 +15,8 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
"github.com/planetmint/planetmint-go/app"
"github.com/planetmint/planetmint-go/monitor"
monitormocks "github.com/planetmint/planetmint-go/monitor/mocks"
"github.com/planetmint/planetmint-go/testutil/sample"
"github.com/planetmint/planetmint-go/util"
"github.com/planetmint/planetmint-go/util/mocks"
@ -39,6 +41,8 @@ func Load(t *testing.T, configs ...Config) *Network {
// use mock client for testing
util.MQTTClient = &mocks.MockMQTTClient{}
monitor.MonitorMQTTClient = &mocks.MockMQTTClient{}
monitor.SetMqttMonitorInstance(&monitormocks.MockMQTTMonitorClientI{})
elements.Client = &elementsmocks.MockClient{}
util.RegisterAssetServiceHTTPClient = &mocks.MockClient{}

View File

@ -57,3 +57,17 @@ func IsValidatorBlockProposer(ctx sdk.Context, proposerAddress []byte, rootDir s
result = hexProposerAddress == validatorIdentity
return
}
func IsValidAddress(address string) (valid bool, err error) {
// Attempt to decode the address
_, err = sdk.AccAddressFromBech32(address)
if err != nil {
return
}
if !strings.Contains(address, "plmnt") {
valid = false
return
}
valid = true
return
}

View File

@ -14,6 +14,9 @@ type MockMQTTClient struct {
PublishFunc func(topic string, qos byte, retained bool, payload interface{}) mqtt.Token
SubscribeFunc func(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token
UnsubscribeFunc func(topics ...string) mqtt.Token
IsConnectedFunc func() bool
connected bool
connectedMutex sync.Mutex
}
// GetConnectFunc fetches the mock client's `Connect` func
@ -96,6 +99,9 @@ func GetUnsubscribeFunc(_ ...string) mqtt.Token {
// Connect is the mock client's `Disconnect` func
func (m *MockMQTTClient) Connect() mqtt.Token {
m.connectedMutex.Lock()
m.connected = true
m.connectedMutex.Unlock()
return GetConnectFunc()
}
@ -116,3 +122,10 @@ func (m *MockMQTTClient) Subscribe(topic string, qos byte, callback mqtt.Message
func (m *MockMQTTClient) Unsubscribe(topics ...string) mqtt.Token {
return GetUnsubscribeFunc(topics...)
}
func (m *MockMQTTClient) IsConnected() bool {
m.connectedMutex.Lock()
connected := m.connected
m.connectedMutex.Unlock()
return connected
}

View File

@ -1,6 +1,7 @@
package util
import (
"encoding/json"
"net"
"strconv"
"strings"
@ -20,6 +21,7 @@ type MQTTClientI interface {
Publish(topic string, qos byte, retained bool, payload interface{}) mqtt.Token
Subscribe(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token
Unsubscribe(topics ...string) mqtt.Token
IsConnected() bool
}
var (
@ -32,7 +34,7 @@ const (
MqttCmdPrefix = "cmnd/"
)
func lazyLoadMQTTClient() {
func LazyLoadMQTTClient() {
if MQTTClient != nil {
return
}
@ -66,7 +68,7 @@ func SendMqttPopInitMessagesToServer(ctx sdk.Context, challenge types.Challenge)
}
func sendMqttPopInitMessages(challenge types.Challenge) (err error) {
lazyLoadMQTTClient()
LazyLoadMQTTClient()
if token := MQTTClient.Connect(); token.Wait() && token.Error() != nil {
err = token.Error()
return
@ -92,7 +94,7 @@ func sendMqttPopInitMessages(challenge types.Challenge) (err error) {
}
func GetMqttStatusOfParticipant(address string, responseTimeoutInMs int64) (isAvailable bool, err error) {
lazyLoadMQTTClient()
LazyLoadMQTTClient()
if token := MQTTClient.Connect(); token.Wait() && token.Error() != nil {
err = token.Error()
return
@ -142,3 +144,12 @@ func GetMqttStatusOfParticipant(address string, responseTimeoutInMs int64) (isAv
MQTTClient.Disconnect(1000)
return
}
func ToJSON(payload []byte) (map[string]interface{}, error) {
jsonString := string(payload)
var result map[string]interface{}
// Unmarshal the JSON string into the map
err := json.Unmarshal([]byte(jsonString), &result)
return result, err
}

View File

@ -31,3 +31,12 @@ func TestGetMqttStatusOfParticipantMocked(t *testing.T) {
assert.NoError(t, err)
assert.True(t, isAvailable)
}
func TestToJSON(t *testing.T) {
t.Parallel()
payload := []byte(`{"Time":"2024-03-26T11:50:42","Uptime":"0T00:50:19","UptimeSec":3019,"Heap":97,"SleepMode":"Dynamic","Sleep":10,"LoadAvg":99,"MqttCount":2,"Berry":{"HeapUsed":27,"Objects":491},"POWER1":"ON","POWER2":"ON","Dimmer":17,"Color":"00182C","HSBColor":"207,100,17","Channel":[0,9,17],"Scheme":0,"Width":1,"Fade":"OFF","Speed":1,"LedTable":"ON","Wifi":{"AP":1,"SSId":"UPC5729E56","BSSId":"C2:14:7E:6F:BC:C5","Channel":11,"Mode":"11n","RSSI":96,"Signal":-52,"LinkCount":1,"Downtime":"0T00:00:10"}}`)
result, err := ToJSON(payload)
assert.NoError(t, err)
assert.Equal(t, 21, len(result))
}

View File

@ -5,15 +5,15 @@ import (
)
const (
factor = 100000000.0
Factor = 100000000.0
)
func RDDLToken2Uint(amount float64) uint64 {
return uint64(amount * factor)
return uint64(amount * Factor)
}
func RDDLToken2Float(amount uint64) float64 {
return float64(amount) / factor
return float64(amount) / Factor
}
func RDDLTokenStringToFloat(amount string) (amountFloat float64, err error) {

23
util/time.go Normal file
View File

@ -0,0 +1,23 @@
package util
import (
"time"
)
func String2UnixTime(timeInput string) (int64, error) {
// Layout specifying the format of the input string
// Note: Go uses a specific reference time (Mon Jan 2 15:04:05 MST 2006) to define format layouts
layout := "2006-01-02T15:04:05"
// Parse the string into a time.Time struct in local time zone
parsedTime, err := time.Parse(layout, timeInput)
if err != nil {
return 0, err
}
// Convert to UTC if not already
utcTime := parsedTime.UTC()
unixTime := utcTime.Unix()
return unixTime, nil
}

15
util/time_test.go Normal file
View File

@ -0,0 +1,15 @@
package util
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestString2UnixTime(t *testing.T) {
t.Parallel()
input := "2024-03-26T11:10:41"
unixTime, err := String2UnixTime(input)
assert.NoError(t, err)
assert.Equal(t, int64(1711451441), unixTime)
}

View File

@ -3,6 +3,7 @@ package dao
import (
"encoding/hex"
"github.com/planetmint/planetmint-go/monitor"
"github.com/planetmint/planetmint-go/util"
"github.com/planetmint/planetmint-go/x/dao/keeper"
@ -23,7 +24,15 @@ func BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock, k keeper.Keeper)
hexProposerAddress := hex.EncodeToString(proposerAddress)
if isPopHeight(ctx, k, currentBlockHeight) {
// select PoP participants
challenger, challengee := k.SelectPopParticipants(ctx)
monitor.SetContext(ctx)
challenger, challengee, err := monitor.SelectPoPParticipantsOutOfActiveActors()
if err != nil {
util.GetAppLogger().Error(ctx, "error during PoP Participant selection ", err)
}
if err != nil || challenger == "" || challengee == "" {
challenger = ""
challengee = ""
}
// Init PoP - independent from challenger and challengee
// The keeper will send the MQTT initializing message to challenger && challengee

View File

@ -1,16 +1,13 @@
package keeper
import (
db "github.com/cometbft/cometbft-db"
"github.com/cometbft/cometbft/libs/log"
"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/store/prefix"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
paramtypes "github.com/cosmos/cosmos-sdk/x/params/types"
"github.com/planetmint/planetmint-go/util"
"github.com/planetmint/planetmint-go/monitor"
"github.com/planetmint/planetmint-go/x/dao/types"
)
@ -53,7 +50,7 @@ func NewKeeper(
if !ps.HasKeyTable() {
ps = ps.WithKeyTable(types.ParamKeyTable())
}
monitor.LazyMqttMonitorLoader(rootDir)
return &Keeper{
cdc: cdc,
storeKey: storeKey,
@ -75,57 +72,3 @@ func NewKeeper(
func (k Keeper) Logger(ctx sdk.Context) log.Logger {
return ctx.Logger().With("module", "x/"+types.ModuleName)
}
func (k Keeper) SelectPopParticipants(ctx sdk.Context) (challenger string, challengee string) {
var startAccountNumber uint64
lastPopHeight := ctx.BlockHeight() - k.GetParams(ctx).PopEpochs
lastPop, found := k.LookupChallenge(ctx, lastPopHeight)
if lastPopHeight > 0 && found && lastPop.Challengee != "" {
lastAccountAddr := sdk.MustAccAddressFromBech32(lastPop.Challengee)
lastAccount := k.accountKeeper.GetAccount(ctx, lastAccountAddr)
startAccountNumber = lastAccount.GetAccountNumber() + 1
}
var participants []sdk.AccAddress
k.iterateAccountsForMachines(ctx, startAccountNumber, &participants, true)
if len(participants) != 2 {
k.iterateAccountsForMachines(ctx, startAccountNumber, &participants, false)
}
// Not enough participants
if len(participants) != 2 {
return
}
challenger = participants[0].String()
challengee = participants[1].String()
return
}
func (k Keeper) iterateAccountsForMachines(ctx sdk.Context, start uint64, participants *[]sdk.AccAddress, iterateFromStart bool) {
store := ctx.KVStore(k.accountKeeperKey)
accountStore := prefix.NewStore(store, authtypes.AccountNumberStoreKeyPrefix)
var iterator db.Iterator
if iterateFromStart {
iterator = accountStore.Iterator(sdk.Uint64ToBigEndian(start), nil)
} else {
iterator = accountStore.Iterator(nil, sdk.Uint64ToBigEndian(start))
}
defer iterator.Close()
for ; iterator.Valid(); iterator.Next() {
participant := sdk.AccAddress(iterator.Value())
_, found := k.machineKeeper.GetMachineIndexByAddress(ctx, participant.String())
if found {
available, err := util.GetMqttStatusOfParticipant(participant.String(), k.GetParams(ctx).MqttResponseTimeout)
if err == nil && available {
*participants = append(*participants, participant)
}
}
if len(*participants) == 2 {
return
}
}
}