diff --git a/.golangci.yaml b/.golangci.yaml index ba0e7d4..3dcd13f 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -132,4 +132,10 @@ issues: - revive - path: docs/docs\.go linters: - - revive \ No newline at end of file + - revive + - path: monitor/mqtt_monitor_test\.go + linters: + - paralleltest + - path: monitor/.*\.go + linters: + - durationcheck \ No newline at end of file diff --git a/go.mod b/go.mod index 6e56952..3fb5f8f 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/spf13/cobra v1.6.1 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.4 + github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529 google.golang.org/grpc v1.56.3 gopkg.in/yaml.v2 v2.4.0 @@ -155,7 +156,6 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/viper v1.15.0 // indirect github.com/subosito/gotenv v1.4.2 // indirect - github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c // indirect github.com/tendermint/go-amino v0.16.0 // indirect github.com/tidwall/btree v1.6.0 // indirect diff --git a/monitor/backend.go b/monitor/backend.go new file mode 100644 index 0000000..218c7c2 --- /dev/null +++ b/monitor/backend.go @@ -0,0 +1,105 @@ +package monitor + +import ( + "encoding/json" + "log" + "time" + + "github.com/syndtr/goleveldb/leveldb/iterator" +) + +type LastSeenEvent struct { + Address string `binding:"required" json:"address"` + Timestamp int64 `binding:"required" json:"timestamp"` +} + +func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err error) { + // store receive address - planetmint address pair + var lastSeen LastSeenEvent + lastSeen.Address = address + lastSeen.Timestamp = lastSeenTS + + lastSeenBytes, err := json.Marshal(lastSeen) + if err != nil { + mms.Log("[Monitor] Error serializing ConversionRequest: " + err.Error()) + return + } + increaseCounter := false + // returns an error if the entry does not exist (we have to increase the counter in this case) + _, err = mms.db.Get([]byte(address), nil) + if err != nil { + increaseCounter = true + } + mms.dbMutex.Lock() + if increaseCounter { + mms.numberOfElements++ + } + err = mms.db.Put([]byte(address), lastSeenBytes, nil) + mms.dbMutex.Unlock() + if err != nil { + log.Println("[Monitor] storing addresses in DB: " + err.Error()) + return + } + return +} + +func (mms *MqttMonitor) deleteEntry(key []byte) (err error) { + mms.dbMutex.Lock() + err = mms.db.Delete(key, nil) + mms.numberOfElements-- + mms.dbMutex.Unlock() + return +} + +func (mms *MqttMonitor) getAmountOfElements() (amount int64, err error) { + iter := mms.db.NewIterator(nil, nil) + defer iter.Release() + + for iter.Next() { + amount++ + } + + // Check for any errors encountered during iteration + if err := iter.Error(); err != nil { + log.Println("[Monitor] " + err.Error()) + } + return +} +func (mms *MqttMonitor) getDataFromIter(iter iterator.Iterator) (lastSeen LastSeenEvent, err error) { + key := iter.Key() + value := iter.Value() + err = json.Unmarshal(value, &lastSeen) + if err != nil { + mms.Log("[Monitor] Failed to unmarshal entry: " + string(key) + " - " + err.Error()) + } + return +} + +func (mms *MqttMonitor) CleanupDB() { + // Create an iterator for the database + iter := mms.db.NewIterator(nil, nil) + defer iter.Release() // Make sure to release the iterator at the end + + // Iterate over all elements in the database + for iter.Next() && !mms.IsTerminated() { + // Use iter.Key() and iter.Value() to access the key and value + lastSeen, err := mms.getDataFromIter(iter) + if err != nil { + mms.Log("[Monitor] Failed to unmarshal entry: " + string(iter.Key()) + " - " + err.Error()) + continue + } + timeThreshold := time.Now().Add(-1 * mms.CleanupPeriodicityInMinutes * time.Minute).Unix() + if lastSeen.Timestamp <= timeThreshold { + // If the entry is older than 12 hours, delete it + err := mms.deleteEntry(iter.Key()) + if err != nil { + mms.Log("[Monitor] Failed to delete entry: " + err.Error()) + } + } + } + + // Check for any errors encountered during iteration + if err := iter.Error(); err != nil { + mms.Log(err.Error()) + } +} diff --git a/monitor/interface.go b/monitor/interface.go new file mode 100644 index 0000000..7789fab --- /dev/null +++ b/monitor/interface.go @@ -0,0 +1,73 @@ +package monitor + +import ( + "sync" + + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/planetmint/planetmint-go/config" + "github.com/syndtr/goleveldb/leveldb" +) + +type MQTTMonitorClientI interface { + AddParticipant(address string, lastSeenTS int64) (err error) + SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) + SetContext(ctx sdk.Context) + Start() (err error) +} + +var monitorMutex sync.Mutex +var mqttMonitorInstance MQTTMonitorClientI + +func SetMqttMonitorInstance(monitorInstance MQTTMonitorClientI) { + monitorMutex.Lock() + mqttMonitorInstance = monitorInstance + monitorMutex.Unlock() +} + +func LazyMqttMonitorLoader(homeDir string) { + monitorMutex.Lock() + tmpInstance := mqttMonitorInstance + monitorMutex.Unlock() + if tmpInstance != nil { + return + } + if homeDir == "" { + homeDir = "./" + } + aciveActorsDB, err := leveldb.OpenFile(homeDir+"activeActors.db", nil) + if err != nil { + panic(err) + } + monitorMutex.Lock() + mqttMonitorInstance = NewMqttMonitorService(aciveActorsDB, *config.GetConfig()) + monitorMutex.Unlock() + err = mqttMonitorInstance.Start() + if err != nil { + panic(err) + } +} + +func SetContext(ctx sdk.Context) { + monitorMutex.Lock() + mqttMonitorInstance.SetContext(ctx) + monitorMutex.Unlock() +} + +func SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) { + monitorMutex.Lock() + challenger, challengee, err = mqttMonitorInstance.SelectPoPParticipantsOutOfActiveActors() + monitorMutex.Unlock() + return +} + +func Start() (err error) { + err = mqttMonitorInstance.Start() + return +} + +func AddParticipant(address string, lastSeenTS int64) (err error) { + monitorMutex.Lock() + err = mqttMonitorInstance.AddParticipant(address, lastSeenTS) + monitorMutex.Unlock() + return +} diff --git a/monitor/mocks/mqtt_monitor.go b/monitor/mocks/mqtt_monitor.go new file mode 100644 index 0000000..a08e814 --- /dev/null +++ b/monitor/mocks/mqtt_monitor.go @@ -0,0 +1,37 @@ +package mocks + +import ( + types "github.com/cosmos/cosmos-sdk/types" +) + +// MockMQTTMonitorClientI is a mock of MQTTMonitorClientI interface. +type MockMQTTMonitorClientI struct { + myStringList []string +} + +// AddParticipant mocks base method. +func (m *MockMQTTMonitorClientI) AddParticipant(address string, _ int64) error { + m.myStringList = append(m.myStringList, address) + + return nil +} + +// SelectPoPParticipantsOutOfActiveActors mocks base method. +func (m *MockMQTTMonitorClientI) SelectPoPParticipantsOutOfActiveActors() (string, string, error) { + var challenger, challengee string + amount := len(m.myStringList) + if amount >= 2 { + challenger = m.myStringList[amount-2] + challengee = m.myStringList[amount-1] + } + return challenger, challengee, nil +} + +// SetContext mocks base method. +func (m *MockMQTTMonitorClientI) SetContext(_ types.Context) { +} + +// Start mocks base method. +func (m *MockMQTTMonitorClientI) Start() error { + return nil +} diff --git a/monitor/mqtt_monitor.go b/monitor/mqtt_monitor.go new file mode 100644 index 0000000..c3830b5 --- /dev/null +++ b/monitor/mqtt_monitor.go @@ -0,0 +1,206 @@ +package monitor + +import ( + "math/rand" + "net" + "strconv" + "strings" + "sync" + "time" + + sdk "github.com/cosmos/cosmos-sdk/types" + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/planetmint/planetmint-go/config" + "github.com/planetmint/planetmint-go/util" + "github.com/syndtr/goleveldb/leveldb" +) + +var MonitorMQTTClient util.MQTTClientI + +type MqttMonitor struct { + db *leveldb.DB + dbMutex sync.Mutex // Mutex to synchronize write operations + ticker *time.Ticker + CleanupPeriodicityInMinutes time.Duration + config config.Config + numberOfElements int64 + sdkContext *sdk.Context + contextMutex sync.Mutex + isTerminated bool + terminationMutex sync.Mutex +} + +func (mms *MqttMonitor) Terminate() { + mms.terminationMutex.Lock() + mms.isTerminated = true + mms.terminationMutex.Unlock() +} + +func (mms *MqttMonitor) IsTerminated() (isTerminated bool) { + mms.terminationMutex.Lock() + isTerminated = mms.isTerminated + mms.terminationMutex.Unlock() + return +} + +func LazyLoadMonitorMQTTClient() { + if MonitorMQTTClient != nil { + return + } + + conf := config.GetConfig() + hostPort := net.JoinHostPort(conf.MqttDomain, strconv.FormatInt(int64(conf.MqttPort), 10)) + uri := "tcp://" + hostPort + + opts := mqtt.NewClientOptions().AddBroker(uri) + opts.SetClientID(conf.ValidatorAddress + "-monitor") + opts.SetUsername(conf.MqttUser) + opts.SetPassword(conf.MqttPassword) + MonitorMQTTClient = mqtt.NewClient(opts) +} + +func NewMqttMonitorService(db *leveldb.DB, config config.Config) *MqttMonitor { + service := &MqttMonitor{db: db, config: config, numberOfElements: 0, CleanupPeriodicityInMinutes: 10} + return service +} + +func (mms *MqttMonitor) registerPeriodicTasks() { + mms.ticker = time.NewTicker(mms.CleanupPeriodicityInMinutes * time.Minute) + go func() { + for range mms.ticker.C { // Loop over the ticker channel + go mms.CleanupDB() + } + }() +} + +func (mms *MqttMonitor) Start() (err error) { + amount, err := mms.getAmountOfElements() + if err != nil { + return + } + mms.numberOfElements = amount + mms.registerPeriodicTasks() + go mms.MonitorActiveParticipants() + return +} +func (mms *MqttMonitor) getRandomNumbers() (challenger int, challengee int) { + for challenger == challengee { + // Generate random numbers + challenger = rand.Intn(int(mms.numberOfElements)) + challengee = rand.Intn(int(mms.numberOfElements)) + } + return +} +func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) { + if mms.numberOfElements < 2 { + return + } + randomChallenger, randomChallengee := mms.getRandomNumbers() + mms.Log("[Monitor] number of elements: " + strconv.Itoa(int(mms.numberOfElements))) + mms.Log("[Monitor] selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee)) + iter := mms.db.NewIterator(nil, nil) + defer iter.Release() + count := 0 + found := 0 + var lastSeen LastSeenEvent + for iter.Next() { + mms.Log("[Monitor] count: " + strconv.Itoa(count)) + if count == randomChallenger { + lastSeen, err = mms.getDataFromIter(iter) + if err != nil { + return + } + challenger = lastSeen.Address + found++ + } else if count == randomChallengee { + lastSeen, err = mms.getDataFromIter(iter) + if err != nil { + return + } + challengee = lastSeen.Address + found++ + } + + count++ + if found == 2 { + break + } + } + return +} + +func (mms *MqttMonitor) MqttMsgHandler(_ mqtt.Client, msg mqtt.Message) { + if mms.IsTerminated() { + return + } + topicParts := strings.Split(msg.Topic(), "/") + if len(topicParts) != 3 { + return + } + if topicParts[0] != "tele" { + return + } + if topicParts[2] != "STATE" { + return + } + address := topicParts[1] + valid, err := util.IsValidAddress(address) + if err != nil || !valid { + return + } + payload, err := util.ToJSON(msg.Payload()) + if err != nil { + return + } + + timeString, ok := payload["Time"].(string) + if !ok { + return + } + unixTime, err := util.String2UnixTime(timeString) + if err != nil { + return + } + err = mms.AddParticipant(address, unixTime) + if err != nil { + mms.Log("[Monitor] error adding active actor to DB: " + address + " " + err.Error()) + } else { + mms.Log("[Monitor] added active actor to DB: " + address) + } +} + +func (mms *MqttMonitor) MonitorActiveParticipants() { + LazyLoadMonitorMQTTClient() + for !mms.IsTerminated() { + if !MonitorMQTTClient.IsConnected() { + if token := MonitorMQTTClient.Connect(); token.Wait() && token.Error() != nil { + mms.Log("[Monitor] error connecting to mqtt: " + token.Error().Error()) + panic(token.Error()) + } + + var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler + + // Subscribe to a topic + subscriptionTopic := "tele/#" + if token := MonitorMQTTClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { + mms.Log("[Monitor] error registering the mqtt subscription: " + token.Error().Error()) + panic(token.Error()) + } + } + time.Sleep(5 * time.Second) + } +} + +func (mms *MqttMonitor) Log(msg string) { + mms.contextMutex.Lock() + if mms.sdkContext != nil { + util.GetAppLogger().Info(*mms.sdkContext, msg) + } + mms.contextMutex.Unlock() +} + +func (mms *MqttMonitor) SetContext(ctx sdk.Context) { + mms.contextMutex.Lock() + mms.sdkContext = &ctx + mms.contextMutex.Unlock() +} diff --git a/monitor/mqtt_monitor_test.go b/monitor/mqtt_monitor_test.go new file mode 100644 index 0000000..c36c1ed --- /dev/null +++ b/monitor/mqtt_monitor_test.go @@ -0,0 +1,106 @@ +package monitor_test + +import ( + "testing" + "time" + + "github.com/planetmint/planetmint-go/config" + "github.com/planetmint/planetmint-go/monitor" + "github.com/planetmint/planetmint-go/util/mocks" + "github.com/stretchr/testify/assert" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/storage" +) + +func init() { + // Use MQTT mock client + monitor.MonitorMQTTClient = &mocks.MockMQTTClient{} +} + +const ( + challengerInput = "plmnt1fx3x6u8k5q8kjl7pamsuwjtut8nkks8dk92dek" + challengeeInput = "plmnt1fsaljz3xqf6vchkjxfzfrd30cdp3j4vqh298pr" +) + +func TestGMonitorActiveParticipants(t *testing.T) { + monitor.LazyLoadMonitorMQTTClient() + cfg := config.GetConfig() + db, err := leveldb.Open(storage.NewMemStorage(), nil) + assert.NoError(t, err) + defer db.Close() + + mqttMonitor := monitor.NewMqttMonitorService(db, *cfg) + err = mqttMonitor.Start() + assert.NoError(t, err) + + currentTime := time.Now() + unixTime := currentTime.Unix() + err = mqttMonitor.AddParticipant(challengerInput, unixTime) + assert.NoError(t, err) + err = mqttMonitor.AddParticipant(challengeeInput, unixTime) + assert.NoError(t, err) + mqttMonitor.CleanupDB() + + challenger, challengee, err := mqttMonitor.SelectPoPParticipantsOutOfActiveActors() + assert.NoError(t, err) + assert.Contains(t, challenger, "plmnt") + assert.Contains(t, challengee, "plmnt") + mqttMonitor.Terminate() +} + +func TestCleanupRemoval(t *testing.T) { + monitor.LazyLoadMonitorMQTTClient() + + cfg := config.GetConfig() + db, err := leveldb.Open(storage.NewMemStorage(), nil) + assert.NoError(t, err) + defer db.Close() + + mqttMonitor := monitor.NewMqttMonitorService(db, *cfg) + err = mqttMonitor.Start() + assert.NoError(t, err) + + currentTime := time.Now() + CleanupPeriodicityAgo := currentTime.Add(-1 * mqttMonitor.CleanupPeriodicityInMinutes * time.Minute) + unixTimeNow := currentTime.Unix() + err = mqttMonitor.AddParticipant(challengerInput, unixTimeNow) + assert.NoError(t, err) + err = mqttMonitor.AddParticipant(challengeeInput, CleanupPeriodicityAgo.Unix()-1) + assert.NoError(t, err) + mqttMonitor.CleanupDB() + + challenger, challengee, err := mqttMonitor.SelectPoPParticipantsOutOfActiveActors() + assert.NoError(t, err) + assert.Equal(t, "", challenger) + assert.Contains(t, "", challengee) + mqttMonitor.Terminate() +} + +func TestCleanupPrecisionTest(t *testing.T) { + monitor.LazyLoadMonitorMQTTClient() + + cfg := config.GetConfig() + db, err := leveldb.Open(storage.NewMemStorage(), nil) + assert.NoError(t, err) + defer db.Close() + + mqttMonitor := monitor.NewMqttMonitorService(db, *cfg) + err = mqttMonitor.Start() + assert.NoError(t, err) + + currentTime := time.Now() + CleanupThresholdAgo := currentTime.Add(-1 * mqttMonitor.CleanupPeriodicityInMinutes * time.Minute) + aboveThreshold := CleanupThresholdAgo.Unix() + 10 + unixTimeNow := currentTime.Unix() + err = mqttMonitor.AddParticipant(challengerInput, unixTimeNow) + assert.NoError(t, err) + err = mqttMonitor.AddParticipant(challengeeInput, aboveThreshold) + assert.NoError(t, err) + mqttMonitor.CleanupDB() + + challenger, challengee, err := mqttMonitor.SelectPoPParticipantsOutOfActiveActors() + assert.NoError(t, err) + assert.Contains(t, challenger, "plmnt") + assert.Contains(t, challengee, "plmnt") + mqttMonitor.Terminate() +} diff --git a/tests/e2e/dao/gas/gas_consumption_suite.go b/tests/e2e/dao/gas/gas_consumption_suite.go index f167d51..5ee6de7 100644 --- a/tests/e2e/dao/gas/gas_consumption_suite.go +++ b/tests/e2e/dao/gas/gas_consumption_suite.go @@ -132,6 +132,7 @@ func (s *ConsumptionE2ETestSuite) TestNonValidatorConsumptionOverflow() { out, err := lib.BroadcastTxWithFileLock(addr, msgs...) s.Require().NoError(err) + s.Require().NoError(s.network.WaitForNextBlock()) s.Require().NoError(s.network.WaitForNextBlock()) _, err = clitestutil.GetRawLogFromTxOut(val, out) diff --git a/tests/e2e/dao/pop/selection_suite.go b/tests/e2e/dao/pop/selection_suite.go index dbe0619..60c061e 100644 --- a/tests/e2e/dao/pop/selection_suite.go +++ b/tests/e2e/dao/pop/selection_suite.go @@ -7,11 +7,13 @@ import ( "math" "os" "strconv" + "time" "github.com/cosmos/cosmos-sdk/crypto/keyring" sdk "github.com/cosmos/cosmos-sdk/types" bank "github.com/cosmos/cosmos-sdk/x/bank/client/cli" "github.com/planetmint/planetmint-go/lib" + "github.com/planetmint/planetmint-go/monitor" "github.com/planetmint/planetmint-go/testutil" clitestutil "github.com/planetmint/planetmint-go/testutil/cli" e2etestutil "github.com/planetmint/planetmint-go/testutil/e2e" @@ -164,6 +166,8 @@ func (s *SelectionE2ETestSuite) TestPopSelectionNoActors() { func (s *SelectionE2ETestSuite) TestPopSelectionOneActors() { err := e2etestutil.AttestMachine(s.network, machines[0].name, machines[0].mnemonic, 0, s.feeDenom) s.Require().NoError(err) + err = monitor.AddParticipant(machines[0].address, time.Now().Unix()) + s.Require().NoError(err) out := s.perpareLocalTest() @@ -174,6 +178,8 @@ func (s *SelectionE2ETestSuite) TestPopSelectionOneActors() { func (s *SelectionE2ETestSuite) TestPopSelectionTwoActors() { err := e2etestutil.AttestMachine(s.network, machines[1].name, machines[1].mnemonic, 1, s.feeDenom) s.Require().NoError(err) + err = monitor.AddParticipant(machines[1].address, time.Now().Unix()) + s.Require().NoError(err) out := s.perpareLocalTest() diff --git a/testutil/keeper/dao.go b/testutil/keeper/dao.go index 8fc16a2..9e32731 100644 --- a/testutil/keeper/dao.go +++ b/testutil/keeper/dao.go @@ -15,14 +15,16 @@ import ( govtypes "github.com/cosmos/cosmos-sdk/x/gov/types" typesparams "github.com/cosmos/cosmos-sdk/x/params/types" "github.com/golang/mock/gomock" + "github.com/planetmint/planetmint-go/monitor" + monitormocks "github.com/planetmint/planetmint-go/monitor/mocks" "github.com/planetmint/planetmint-go/x/dao/keeper" + daotestutil "github.com/planetmint/planetmint-go/x/dao/testutil" "github.com/planetmint/planetmint-go/x/dao/types" "github.com/stretchr/testify/require" - - daotestutil "github.com/planetmint/planetmint-go/x/dao/testutil" ) func DaoKeeper(t testing.TB) (*keeper.Keeper, sdk.Context) { + monitor.SetMqttMonitorInstance(&monitormocks.MockMQTTMonitorClientI{}) storeKey := sdk.NewKVStoreKey(types.StoreKey) memStoreKey := storetypes.NewMemoryStoreKey(types.MemStoreKey) challengeStoreKey := storetypes.NewMemoryStoreKey(types.ChallengeKey) diff --git a/testutil/network/loader.go b/testutil/network/loader.go index e27e0fd..028aaaf 100644 --- a/testutil/network/loader.go +++ b/testutil/network/loader.go @@ -15,6 +15,8 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" "github.com/planetmint/planetmint-go/app" + "github.com/planetmint/planetmint-go/monitor" + monitormocks "github.com/planetmint/planetmint-go/monitor/mocks" "github.com/planetmint/planetmint-go/testutil/sample" "github.com/planetmint/planetmint-go/util" "github.com/planetmint/planetmint-go/util/mocks" @@ -39,6 +41,8 @@ func Load(t *testing.T, configs ...Config) *Network { // use mock client for testing util.MQTTClient = &mocks.MockMQTTClient{} + monitor.MonitorMQTTClient = &mocks.MockMQTTClient{} + monitor.SetMqttMonitorInstance(&monitormocks.MockMQTTMonitorClientI{}) elements.Client = &elementsmocks.MockClient{} util.RegisterAssetServiceHTTPClient = &mocks.MockClient{} diff --git a/util/determine_block_proposer.go b/util/determine_block_proposer.go index a6db7a2..2c36834 100644 --- a/util/determine_block_proposer.go +++ b/util/determine_block_proposer.go @@ -57,3 +57,17 @@ func IsValidatorBlockProposer(ctx sdk.Context, proposerAddress []byte, rootDir s result = hexProposerAddress == validatorIdentity return } + +func IsValidAddress(address string) (valid bool, err error) { + // Attempt to decode the address + _, err = sdk.AccAddressFromBech32(address) + if err != nil { + return + } + if !strings.Contains(address, "plmnt") { + valid = false + return + } + valid = true + return +} diff --git a/util/mocks/mqtt.go b/util/mocks/mqtt.go index e6fcba3..0d6bdfd 100644 --- a/util/mocks/mqtt.go +++ b/util/mocks/mqtt.go @@ -14,6 +14,9 @@ type MockMQTTClient struct { PublishFunc func(topic string, qos byte, retained bool, payload interface{}) mqtt.Token SubscribeFunc func(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token UnsubscribeFunc func(topics ...string) mqtt.Token + IsConnectedFunc func() bool + connected bool + connectedMutex sync.Mutex } // GetConnectFunc fetches the mock client's `Connect` func @@ -96,6 +99,9 @@ func GetUnsubscribeFunc(_ ...string) mqtt.Token { // Connect is the mock client's `Disconnect` func func (m *MockMQTTClient) Connect() mqtt.Token { + m.connectedMutex.Lock() + m.connected = true + m.connectedMutex.Unlock() return GetConnectFunc() } @@ -116,3 +122,10 @@ func (m *MockMQTTClient) Subscribe(topic string, qos byte, callback mqtt.Message func (m *MockMQTTClient) Unsubscribe(topics ...string) mqtt.Token { return GetUnsubscribeFunc(topics...) } + +func (m *MockMQTTClient) IsConnected() bool { + m.connectedMutex.Lock() + connected := m.connected + m.connectedMutex.Unlock() + return connected +} diff --git a/util/mqtt.go b/util/mqtt.go index 3de4bc5..111968b 100644 --- a/util/mqtt.go +++ b/util/mqtt.go @@ -1,6 +1,7 @@ package util import ( + "encoding/json" "net" "strconv" "strings" @@ -20,6 +21,7 @@ type MQTTClientI interface { Publish(topic string, qos byte, retained bool, payload interface{}) mqtt.Token Subscribe(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token Unsubscribe(topics ...string) mqtt.Token + IsConnected() bool } var ( @@ -32,7 +34,7 @@ const ( MqttCmdPrefix = "cmnd/" ) -func lazyLoadMQTTClient() { +func LazyLoadMQTTClient() { if MQTTClient != nil { return } @@ -66,7 +68,7 @@ func SendMqttPopInitMessagesToServer(ctx sdk.Context, challenge types.Challenge) } func sendMqttPopInitMessages(challenge types.Challenge) (err error) { - lazyLoadMQTTClient() + LazyLoadMQTTClient() if token := MQTTClient.Connect(); token.Wait() && token.Error() != nil { err = token.Error() return @@ -92,7 +94,7 @@ func sendMqttPopInitMessages(challenge types.Challenge) (err error) { } func GetMqttStatusOfParticipant(address string, responseTimeoutInMs int64) (isAvailable bool, err error) { - lazyLoadMQTTClient() + LazyLoadMQTTClient() if token := MQTTClient.Connect(); token.Wait() && token.Error() != nil { err = token.Error() return @@ -142,3 +144,12 @@ func GetMqttStatusOfParticipant(address string, responseTimeoutInMs int64) (isAv MQTTClient.Disconnect(1000) return } + +func ToJSON(payload []byte) (map[string]interface{}, error) { + jsonString := string(payload) + + var result map[string]interface{} + // Unmarshal the JSON string into the map + err := json.Unmarshal([]byte(jsonString), &result) + return result, err +} diff --git a/util/mqtt_test.go b/util/mqtt_test.go index e210c17..922b6c5 100644 --- a/util/mqtt_test.go +++ b/util/mqtt_test.go @@ -31,3 +31,12 @@ func TestGetMqttStatusOfParticipantMocked(t *testing.T) { assert.NoError(t, err) assert.True(t, isAvailable) } + +func TestToJSON(t *testing.T) { + t.Parallel() + payload := []byte(`{"Time":"2024-03-26T11:50:42","Uptime":"0T00:50:19","UptimeSec":3019,"Heap":97,"SleepMode":"Dynamic","Sleep":10,"LoadAvg":99,"MqttCount":2,"Berry":{"HeapUsed":27,"Objects":491},"POWER1":"ON","POWER2":"ON","Dimmer":17,"Color":"00182C","HSBColor":"207,100,17","Channel":[0,9,17],"Scheme":0,"Width":1,"Fade":"OFF","Speed":1,"LedTable":"ON","Wifi":{"AP":1,"SSId":"UPC5729E56","BSSId":"C2:14:7E:6F:BC:C5","Channel":11,"Mode":"11n","RSSI":96,"Signal":-52,"LinkCount":1,"Downtime":"0T00:00:10"}}`) + + result, err := ToJSON(payload) + assert.NoError(t, err) + assert.Equal(t, 21, len(result)) +} diff --git a/util/rddl_token.go b/util/rddl_token.go index 6eb47b2..63b121b 100644 --- a/util/rddl_token.go +++ b/util/rddl_token.go @@ -5,15 +5,15 @@ import ( ) const ( - factor = 100000000.0 + Factor = 100000000.0 ) func RDDLToken2Uint(amount float64) uint64 { - return uint64(amount * factor) + return uint64(amount * Factor) } func RDDLToken2Float(amount uint64) float64 { - return float64(amount) / factor + return float64(amount) / Factor } func RDDLTokenStringToFloat(amount string) (amountFloat float64, err error) { diff --git a/util/time.go b/util/time.go new file mode 100644 index 0000000..e718b38 --- /dev/null +++ b/util/time.go @@ -0,0 +1,23 @@ +package util + +import ( + "time" +) + +func String2UnixTime(timeInput string) (int64, error) { + // Layout specifying the format of the input string + // Note: Go uses a specific reference time (Mon Jan 2 15:04:05 MST 2006) to define format layouts + layout := "2006-01-02T15:04:05" + + // Parse the string into a time.Time struct in local time zone + parsedTime, err := time.Parse(layout, timeInput) + if err != nil { + return 0, err + } + + // Convert to UTC if not already + utcTime := parsedTime.UTC() + unixTime := utcTime.Unix() + + return unixTime, nil +} diff --git a/util/time_test.go b/util/time_test.go new file mode 100644 index 0000000..d56c517 --- /dev/null +++ b/util/time_test.go @@ -0,0 +1,15 @@ +package util + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestString2UnixTime(t *testing.T) { + t.Parallel() + input := "2024-03-26T11:10:41" + unixTime, err := String2UnixTime(input) + assert.NoError(t, err) + assert.Equal(t, int64(1711451441), unixTime) +} diff --git a/x/dao/abci.go b/x/dao/abci.go index f412597..49680f0 100644 --- a/x/dao/abci.go +++ b/x/dao/abci.go @@ -3,6 +3,7 @@ package dao import ( "encoding/hex" + "github.com/planetmint/planetmint-go/monitor" "github.com/planetmint/planetmint-go/util" "github.com/planetmint/planetmint-go/x/dao/keeper" @@ -23,7 +24,15 @@ func BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock, k keeper.Keeper) hexProposerAddress := hex.EncodeToString(proposerAddress) if isPopHeight(ctx, k, currentBlockHeight) { // select PoP participants - challenger, challengee := k.SelectPopParticipants(ctx) + monitor.SetContext(ctx) + challenger, challengee, err := monitor.SelectPoPParticipantsOutOfActiveActors() + if err != nil { + util.GetAppLogger().Error(ctx, "error during PoP Participant selection ", err) + } + if err != nil || challenger == "" || challengee == "" { + challenger = "" + challengee = "" + } // Init PoP - independent from challenger and challengee // The keeper will send the MQTT initializing message to challenger && challengee diff --git a/x/dao/keeper/keeper.go b/x/dao/keeper/keeper.go index de10ec9..c9d9414 100644 --- a/x/dao/keeper/keeper.go +++ b/x/dao/keeper/keeper.go @@ -1,16 +1,13 @@ package keeper import ( - db "github.com/cometbft/cometbft-db" "github.com/cometbft/cometbft/libs/log" "github.com/cosmos/cosmos-sdk/codec" - "github.com/cosmos/cosmos-sdk/store/prefix" storetypes "github.com/cosmos/cosmos-sdk/store/types" sdk "github.com/cosmos/cosmos-sdk/types" - authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" paramtypes "github.com/cosmos/cosmos-sdk/x/params/types" - "github.com/planetmint/planetmint-go/util" + "github.com/planetmint/planetmint-go/monitor" "github.com/planetmint/planetmint-go/x/dao/types" ) @@ -53,7 +50,7 @@ func NewKeeper( if !ps.HasKeyTable() { ps = ps.WithKeyTable(types.ParamKeyTable()) } - + monitor.LazyMqttMonitorLoader(rootDir) return &Keeper{ cdc: cdc, storeKey: storeKey, @@ -75,57 +72,3 @@ func NewKeeper( func (k Keeper) Logger(ctx sdk.Context) log.Logger { return ctx.Logger().With("module", "x/"+types.ModuleName) } - -func (k Keeper) SelectPopParticipants(ctx sdk.Context) (challenger string, challengee string) { - var startAccountNumber uint64 - lastPopHeight := ctx.BlockHeight() - k.GetParams(ctx).PopEpochs - lastPop, found := k.LookupChallenge(ctx, lastPopHeight) - if lastPopHeight > 0 && found && lastPop.Challengee != "" { - lastAccountAddr := sdk.MustAccAddressFromBech32(lastPop.Challengee) - lastAccount := k.accountKeeper.GetAccount(ctx, lastAccountAddr) - startAccountNumber = lastAccount.GetAccountNumber() + 1 - } - - var participants []sdk.AccAddress - k.iterateAccountsForMachines(ctx, startAccountNumber, &participants, true) - if len(participants) != 2 { - k.iterateAccountsForMachines(ctx, startAccountNumber, &participants, false) - } - - // Not enough participants - if len(participants) != 2 { - return - } - - challenger = participants[0].String() - challengee = participants[1].String() - - return -} - -func (k Keeper) iterateAccountsForMachines(ctx sdk.Context, start uint64, participants *[]sdk.AccAddress, iterateFromStart bool) { - store := ctx.KVStore(k.accountKeeperKey) - accountStore := prefix.NewStore(store, authtypes.AccountNumberStoreKeyPrefix) - var iterator db.Iterator - if iterateFromStart { - iterator = accountStore.Iterator(sdk.Uint64ToBigEndian(start), nil) - } else { - iterator = accountStore.Iterator(nil, sdk.Uint64ToBigEndian(start)) - } - defer iterator.Close() - - for ; iterator.Valid(); iterator.Next() { - participant := sdk.AccAddress(iterator.Value()) - _, found := k.machineKeeper.GetMachineIndexByAddress(ctx, participant.String()) - if found { - available, err := util.GetMqttStatusOfParticipant(participant.String(), k.GetParams(ctx).MqttResponseTimeout) - if err == nil && available { - *participants = append(*participants, participant) - } - } - - if len(*participants) == 2 { - return - } - } -}