From 75f1444c81e45b093163384d4411b186f11734ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrgen=20Eckel?= Date: Thu, 4 Apr 2024 13:57:34 +0200 Subject: [PATCH] * added a MqttMonitor modul with levelDB and periodic cleanup * initialized in the app * passed to dao keeper * added conversion methods (string2unixtime, byte ToJSON) * removed obsolete keeper code * maded RDDLToken.Factor public MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jürgen Eckel --- app/app.go | 16 ++- monitor/backend.go | 105 ++++++++++++++++++++ monitor/mqtt_monitor.go | 164 +++++++++++++++++++++++++++++++ monitor/mqtt_monitor_test.go | 97 ++++++++++++++++++ testutil/keeper/dao.go | 11 ++- util/determine_block_proposer.go | 14 +++ util/mqtt.go | 19 +++- util/mqtt_test.go | 9 ++ util/rddl_token.go | 6 +- util/time.go | 23 +++++ util/time_test.go | 15 +++ x/dao/abci.go | 10 +- x/dao/keeper/keeper.go | 62 +----------- 13 files changed, 484 insertions(+), 67 deletions(-) create mode 100644 monitor/backend.go create mode 100644 monitor/mqtt_monitor.go create mode 100644 monitor/mqtt_monitor_test.go create mode 100644 util/time.go create mode 100644 util/time_test.go diff --git a/app/app.go b/app/app.go index 04aae52..b449304 100644 --- a/app/app.go +++ b/app/app.go @@ -108,7 +108,9 @@ import ( solomachine "github.com/cosmos/ibc-go/v7/modules/light-clients/06-solomachine" ibctm "github.com/cosmos/ibc-go/v7/modules/light-clients/07-tendermint" "github.com/spf13/cast" + "github.com/syndtr/goleveldb/leveldb" + "github.com/planetmint/planetmint-go/monitor" machinemodule "github.com/planetmint/planetmint-go/x/machine" machinemodulekeeper "github.com/planetmint/planetmint-go/x/machine/keeper" machinemoduletypes "github.com/planetmint/planetmint-go/x/machine/types" @@ -124,6 +126,7 @@ import ( // this line is used by starport scaffolding # stargate/app/moduleImport pmante "github.com/planetmint/planetmint-go/app/ante" + plmntconfig "github.com/planetmint/planetmint-go/config" "github.com/planetmint/planetmint-go/docs" appparams "github.com/planetmint/planetmint-go/lib/params" ) @@ -269,7 +272,8 @@ type App struct { // this line is used by starport scaffolding # stargate/app/keeperDeclaration // mm is the module manager - mm *module.Manager + mm *module.Manager + mqttMonitor *monitor.MqttMonitor // sm is the simulation manager sm *module.SimulationManager @@ -333,6 +337,15 @@ func New( memKeys: memKeys, } + aciveActorsDB, err := leveldb.OpenFile(homePath+"/activeActors.db", nil) + if err != nil { + panic(err) + } + app.mqttMonitor = monitor.NewMqttMonitorService(aciveActorsDB, *plmntconfig.GetConfig()) + err = app.mqttMonitor.Start() + if err != nil { + panic(err) + } app.ParamsKeeper = initParamsKeeper( appCodec, cdc, @@ -578,6 +591,7 @@ func New( app.MachineKeeper, authtypes.NewModuleAddress(govtypes.ModuleName).String(), homePath, + app.mqttMonitor, ) daoModule := daomodule.NewAppModule(appCodec, app.DaoKeeper, app.AccountKeeper, app.BankKeeper) diff --git a/monitor/backend.go b/monitor/backend.go new file mode 100644 index 0000000..edf956d --- /dev/null +++ b/monitor/backend.go @@ -0,0 +1,105 @@ +package monitor + +import ( + "encoding/json" + "log" + "time" + + "github.com/syndtr/goleveldb/leveldb/iterator" +) + +type LastSeenEvent struct { + Address string `binding:"required" json:"address"` + Timestamp int64 `binding:"required" json:"timestamp"` +} + +func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err error) { + // store receive address - planetmint address pair + var lastSeen LastSeenEvent + lastSeen.Address = address + lastSeen.Timestamp = lastSeenTS + + lastSeenBytes, err := json.Marshal(lastSeen) + if err != nil { + mms.Log("[Monitor] Error serializing ConversionRequest: " + err.Error()) + return + } + increaseCounter := false + // returns an error if the entry does not exist (we have to increase the counter in this case) + _, err = mms.db.Get([]byte(address), nil) + if err != nil { + increaseCounter = true + } + mms.dbMutex.Lock() + if increaseCounter { + mms.numberOfElements++ + } + err = mms.db.Put([]byte(address), lastSeenBytes, nil) + mms.dbMutex.Unlock() + if err != nil { + log.Println("[Monitor] storing addresses in DB: " + err.Error()) + return + } + return +} + +func (mms *MqttMonitor) deleteEntry(key []byte) (err error) { + mms.dbMutex.Lock() + err = mms.db.Delete(key, nil) + mms.numberOfElements-- + mms.dbMutex.Unlock() + return +} + +func (mms *MqttMonitor) getAmountOfElements() (amount int64, err error) { + iter := mms.db.NewIterator(nil, nil) + defer iter.Release() + + for iter.Next() { + amount++ + } + + // Check for any errors encountered during iteration + if err := iter.Error(); err != nil { + log.Println("[Monitor] " + err.Error()) + } + return +} +func (mms *MqttMonitor) getDataFromIter(iter iterator.Iterator) (lastSeen LastSeenEvent, err error) { + key := iter.Key() + value := iter.Value() + err = json.Unmarshal(value, &lastSeen) + if err != nil { + mms.Log("[Monitor] Failed to unmarshal entry: " + string(key) + " - " + err.Error()) + } + return +} + +func (mms *MqttMonitor) CleanupDB() { + // Create an iterator for the database + iter := mms.db.NewIterator(nil, nil) + defer iter.Release() // Make sure to release the iterator at the end + + // Iterate over all elements in the database + for iter.Next() { + // Use iter.Key() and iter.Value() to access the key and value + lastSeen, err := mms.getDataFromIter(iter) + if err != nil { + mms.Log("[Monitor] Failed to unmarshal entry: " + string(iter.Key()) + " - " + err.Error()) + continue + } + timeThreshold := time.Now().Add(-1 * mms.CleanupPeriodicityInMinutes * time.Minute).Unix() + if lastSeen.Timestamp <= timeThreshold { + // If the entry is older than 12 hours, delete it + err := mms.deleteEntry(iter.Key()) + if err != nil { + mms.Log("[Monitor] Failed to delete entry: " + err.Error()) + } + } + } + + // Check for any errors encountered during iteration + if err := iter.Error(); err != nil { + mms.Log(err.Error()) + } +} diff --git a/monitor/mqtt_monitor.go b/monitor/mqtt_monitor.go new file mode 100644 index 0000000..1576170 --- /dev/null +++ b/monitor/mqtt_monitor.go @@ -0,0 +1,164 @@ +package monitor + +import ( + "math/rand" + "strconv" + "strings" + "sync" + "time" + + sdk "github.com/cosmos/cosmos-sdk/types" + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/planetmint/planetmint-go/config" + "github.com/planetmint/planetmint-go/util" + "github.com/syndtr/goleveldb/leveldb" +) + +type MqttMonitor struct { + db *leveldb.DB + dbMutex sync.Mutex // Mutex to synchronize write operations + ticker *time.Ticker + CleanupPeriodicityInMinutes time.Duration + config config.Config + numberOfElements int64 + sdkContext *sdk.Context + contextMutex sync.Mutex +} + +func NewMqttMonitorService(db *leveldb.DB, config config.Config) *MqttMonitor { + service := &MqttMonitor{db: db, config: config, numberOfElements: 0, CleanupPeriodicityInMinutes: 10} + return service +} + +func (mms *MqttMonitor) registerPeriodicTasks() { + mms.ticker = time.NewTicker(mms.CleanupPeriodicityInMinutes * time.Minute) + go func() { + for range mms.ticker.C { // Loop over the ticker channel + go mms.CleanupDB() + } + }() +} + +func (mms *MqttMonitor) Start() (err error) { + amount, err := mms.getAmountOfElements() + if err != nil { + return + } + mms.numberOfElements = amount + mms.registerPeriodicTasks() + go mms.MonitorActiveParticipants() + return +} +func (mms *MqttMonitor) getRandomNumbers() (challenger int, challengee int) { + for challenger == challengee { + // Generate random numbers + challenger = rand.Intn(int(mms.numberOfElements)) + challengee = rand.Intn(int(mms.numberOfElements)) + } + return +} +func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) { + if mms.numberOfElements < 2 { + return + } + randomChallenger, randomChallengee := mms.getRandomNumbers() + mms.Log("[Monitor] number of elements: " + strconv.Itoa(int(mms.numberOfElements))) + mms.Log("[Monitor] selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee)) + iter := mms.db.NewIterator(nil, nil) + defer iter.Release() + count := 0 + found := 0 + var lastSeen LastSeenEvent + for iter.Next() { + mms.Log("[Monitor] count: " + strconv.Itoa(count)) + if count == randomChallenger { + lastSeen, err = mms.getDataFromIter(iter) + if err != nil { + return + } + challenger = lastSeen.Address + found++ + } else if count == randomChallengee { + lastSeen, err = mms.getDataFromIter(iter) + if err != nil { + return + } + challengee = lastSeen.Address + found++ + } + + count++ + if found == 2 { + break + } + } + return +} + +func (mms *MqttMonitor) MqttMsgHandler(_ mqtt.Client, msg mqtt.Message) { + topicParts := strings.Split(msg.Topic(), "/") + if len(topicParts) != 3 { + return + } + if topicParts[0] != "tele" { + return + } + if topicParts[2] != "STATE" { + return + } + address := topicParts[1] + valid, err := util.IsValidAddress(address) + if err != nil || !valid { + return + } + payload, err := util.ToJSON(msg.Payload()) + if err != nil { + return + } + + timeString, ok := payload["Time"].(string) + if !ok { + return + } + unixTime, err := util.String2UnixTime(timeString) + if err != nil { + return + } + err = mms.AddParticipant(address, unixTime) + if err != nil { + mms.Log("[Monitor] error adding active actor to DB: " + address + " " + err.Error()) + } else { + mms.Log("[Monitor] added active actor to DB: " + address) + } +} + +func (mms *MqttMonitor) MonitorActiveParticipants() { + util.LazyLoadMQTTClient() + if token := util.MQTTClient.Connect(); token.Wait() && token.Error() != nil { + mms.Log("[Monitor] error connecting to mqtt: " + token.Error().Error()) + panic(token.Error()) + } + + var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler + + // Subscribe to a topic + subscriptionTopic := "tele/#" + if token := util.MQTTClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { + mms.Log("[Monitor] error registering the mqtt subscription: " + token.Error().Error()) + panic(token.Error()) + } +} + +func (mms *MqttMonitor) Log(msg string) { + mms.contextMutex.Lock() + if mms.sdkContext != nil { + util.GetAppLogger().Info(*mms.sdkContext, msg) + } + mms.contextMutex.Unlock() +} + +func (mms *MqttMonitor) SetContext(ctx sdk.Context) { + mms.contextMutex.Lock() + mms.sdkContext = &ctx + mms.contextMutex.Unlock() +} diff --git a/monitor/mqtt_monitor_test.go b/monitor/mqtt_monitor_test.go new file mode 100644 index 0000000..5c5c9e1 --- /dev/null +++ b/monitor/mqtt_monitor_test.go @@ -0,0 +1,97 @@ +package monitor_test + +import ( + "testing" + "time" + + "github.com/planetmint/planetmint-go/config" + "github.com/planetmint/planetmint-go/monitor" + "github.com/planetmint/planetmint-go/util" + "github.com/planetmint/planetmint-go/util/mocks" + "github.com/stretchr/testify/assert" + "github.com/syndtr/goleveldb/leveldb" +) + +func init() { + // Use MQTT mock client + util.MQTTClient = &mocks.MockMQTTClient{} +} + +const ( + challengerInput = "plmnt1fx3x6u8k5q8kjl7pamsuwjtut8nkks8dk92dek" + challengeeInput = "plmnt1fsaljz3xqf6vchkjxfzfrd30cdp3j4vqh298pr" +) + +func TestGMonitorActiveParticipants(t *testing.T) { + util.LazyLoadMQTTClient() + cfg := config.GetConfig() + db, err := leveldb.OpenFile("./activeActors.db", nil) + assert.NoError(t, err) + mqttMonitor := monitor.NewMqttMonitorService(db, *cfg) + err = mqttMonitor.Start() + assert.NoError(t, err) + + currentTime := time.Now() + unixTime := currentTime.Unix() + err = mqttMonitor.AddParticipant(challengerInput, unixTime) + assert.NoError(t, err) + err = mqttMonitor.AddParticipant(challengeeInput, unixTime) + assert.NoError(t, err) + mqttMonitor.CleanupDB() + + challenger, challengee, err := mqttMonitor.SelectPoPParticipantsOutOfActiveActors() + assert.NoError(t, err) + assert.Contains(t, challenger, "plmnt") + assert.Contains(t, challengee, "plmnt") +} + +func TestCleanupRemoval(t *testing.T) { + util.LazyLoadMQTTClient() + + cfg := config.GetConfig() + db, err := leveldb.OpenFile("./activeActors.db", nil) + assert.NoError(t, err) + mqttMonitor := monitor.NewMqttMonitorService(db, *cfg) + err = mqttMonitor.Start() + assert.NoError(t, err) + + currentTime := time.Now() + CleanupPeriodicityAgo := currentTime.Add(-1 * mqttMonitor.CleanupPeriodicityInMinutes * time.Minute) + unixTimeNow := currentTime.Unix() + err = mqttMonitor.AddParticipant(challengerInput, unixTimeNow) + assert.NoError(t, err) + err = mqttMonitor.AddParticipant(challengeeInput, CleanupPeriodicityAgo.Unix()-1) + assert.NoError(t, err) + mqttMonitor.CleanupDB() + + challenger, challengee, err := mqttMonitor.SelectPoPParticipantsOutOfActiveActors() + assert.NoError(t, err) + assert.Equal(t, "", challenger) + assert.Contains(t, "", challengee) +} + +func TestCleanupPrecisionTest(t *testing.T) { + util.LazyLoadMQTTClient() + + cfg := config.GetConfig() + db, err := leveldb.OpenFile("./activeActors.db", nil) + assert.NoError(t, err) + mqttMonitor := monitor.NewMqttMonitorService(db, *cfg) + err = mqttMonitor.Start() + assert.NoError(t, err) + + currentTime := time.Now() + CleanupThresholdAgo := currentTime.Add(-1 * mqttMonitor.CleanupPeriodicityInMinutes * time.Minute) + aboveThreshold := CleanupThresholdAgo.Unix() + 10 + unixTimeNow := currentTime.Unix() + err = mqttMonitor.AddParticipant(challengerInput, unixTimeNow) + assert.NoError(t, err) + err = mqttMonitor.AddParticipant(challengeeInput, aboveThreshold) + assert.NoError(t, err) + mqttMonitor.CleanupDB() + + challenger, challengee, err := mqttMonitor.SelectPoPParticipantsOutOfActiveActors() + assert.NoError(t, err) + assert.Contains(t, challenger, "plmnt") + assert.Contains(t, challengee, "plmnt") +} diff --git a/testutil/keeper/dao.go b/testutil/keeper/dao.go index 8fc16a2..6d1d742 100644 --- a/testutil/keeper/dao.go +++ b/testutil/keeper/dao.go @@ -15,9 +15,12 @@ import ( govtypes "github.com/cosmos/cosmos-sdk/x/gov/types" typesparams "github.com/cosmos/cosmos-sdk/x/params/types" "github.com/golang/mock/gomock" + "github.com/planetmint/planetmint-go/config" + "github.com/planetmint/planetmint-go/monitor" "github.com/planetmint/planetmint-go/x/dao/keeper" "github.com/planetmint/planetmint-go/x/dao/types" "github.com/stretchr/testify/require" + "github.com/syndtr/goleveldb/leveldb" daotestutil "github.com/planetmint/planetmint-go/x/dao/testutil" ) @@ -53,6 +56,11 @@ func DaoKeeper(t testing.TB) (*keeper.Keeper, sdk.Context) { ctrl := gomock.NewController(t) bk := daotestutil.NewMockBankKeeper(ctrl) + aciveActorsDB, err := leveldb.OpenFile("./activeActors.db", nil) + if err != nil { + panic(err) + } + mqttMonitor := monitor.NewMqttMonitorService(aciveActorsDB, *config.GetConfig()) bk.EXPECT().MintCoins(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() bk.EXPECT().BurnCoins(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() @@ -74,10 +82,11 @@ func DaoKeeper(t testing.TB) (*keeper.Keeper, sdk.Context) { nil, authtypes.NewModuleAddress(govtypes.ModuleName).String(), "", + mqttMonitor, ) // Initialize params - err := k.SetParams(ctx, types.DefaultParams()) + err = k.SetParams(ctx, types.DefaultParams()) if err != nil { panic(err) } diff --git a/util/determine_block_proposer.go b/util/determine_block_proposer.go index a6db7a2..2c36834 100644 --- a/util/determine_block_proposer.go +++ b/util/determine_block_proposer.go @@ -57,3 +57,17 @@ func IsValidatorBlockProposer(ctx sdk.Context, proposerAddress []byte, rootDir s result = hexProposerAddress == validatorIdentity return } + +func IsValidAddress(address string) (valid bool, err error) { + // Attempt to decode the address + _, err = sdk.AccAddressFromBech32(address) + if err != nil { + return + } + if !strings.Contains(address, "plmnt") { + valid = false + return + } + valid = true + return +} diff --git a/util/mqtt.go b/util/mqtt.go index 3de4bc5..620bfb1 100644 --- a/util/mqtt.go +++ b/util/mqtt.go @@ -1,6 +1,7 @@ package util import ( + "encoding/json" "net" "strconv" "strings" @@ -25,14 +26,16 @@ type MQTTClientI interface { var ( MQTTClient MQTTClientI mqttMachineByAddressAvailabilityMapping map[string]bool + mqttAcitveMachineMapping map[string]int64 rwMu sync.RWMutex + rwActiveMachineMu sync.RWMutex ) const ( MqttCmdPrefix = "cmnd/" ) -func lazyLoadMQTTClient() { +func LazyLoadMQTTClient() { if MQTTClient != nil { return } @@ -50,6 +53,7 @@ func lazyLoadMQTTClient() { func init() { mqttMachineByAddressAvailabilityMapping = make(map[string]bool) + mqttAcitveMachineMapping = make(map[string]int64) } func SendMqttPopInitMessagesToServer(ctx sdk.Context, challenge types.Challenge) { @@ -66,7 +70,7 @@ func SendMqttPopInitMessagesToServer(ctx sdk.Context, challenge types.Challenge) } func sendMqttPopInitMessages(challenge types.Challenge) (err error) { - lazyLoadMQTTClient() + LazyLoadMQTTClient() if token := MQTTClient.Connect(); token.Wait() && token.Error() != nil { err = token.Error() return @@ -92,7 +96,7 @@ func sendMqttPopInitMessages(challenge types.Challenge) (err error) { } func GetMqttStatusOfParticipant(address string, responseTimeoutInMs int64) (isAvailable bool, err error) { - lazyLoadMQTTClient() + LazyLoadMQTTClient() if token := MQTTClient.Connect(); token.Wait() && token.Error() != nil { err = token.Error() return @@ -142,3 +146,12 @@ func GetMqttStatusOfParticipant(address string, responseTimeoutInMs int64) (isAv MQTTClient.Disconnect(1000) return } + +func ToJSON(payload []byte) (map[string]interface{}, error) { + jsonString := string(payload) + + var result map[string]interface{} + // Unmarshal the JSON string into the map + err := json.Unmarshal([]byte(jsonString), &result) + return result, err +} diff --git a/util/mqtt_test.go b/util/mqtt_test.go index e210c17..922b6c5 100644 --- a/util/mqtt_test.go +++ b/util/mqtt_test.go @@ -31,3 +31,12 @@ func TestGetMqttStatusOfParticipantMocked(t *testing.T) { assert.NoError(t, err) assert.True(t, isAvailable) } + +func TestToJSON(t *testing.T) { + t.Parallel() + payload := []byte(`{"Time":"2024-03-26T11:50:42","Uptime":"0T00:50:19","UptimeSec":3019,"Heap":97,"SleepMode":"Dynamic","Sleep":10,"LoadAvg":99,"MqttCount":2,"Berry":{"HeapUsed":27,"Objects":491},"POWER1":"ON","POWER2":"ON","Dimmer":17,"Color":"00182C","HSBColor":"207,100,17","Channel":[0,9,17],"Scheme":0,"Width":1,"Fade":"OFF","Speed":1,"LedTable":"ON","Wifi":{"AP":1,"SSId":"UPC5729E56","BSSId":"C2:14:7E:6F:BC:C5","Channel":11,"Mode":"11n","RSSI":96,"Signal":-52,"LinkCount":1,"Downtime":"0T00:00:10"}}`) + + result, err := ToJSON(payload) + assert.NoError(t, err) + assert.Equal(t, 21, len(result)) +} diff --git a/util/rddl_token.go b/util/rddl_token.go index 6eb47b2..63b121b 100644 --- a/util/rddl_token.go +++ b/util/rddl_token.go @@ -5,15 +5,15 @@ import ( ) const ( - factor = 100000000.0 + Factor = 100000000.0 ) func RDDLToken2Uint(amount float64) uint64 { - return uint64(amount * factor) + return uint64(amount * Factor) } func RDDLToken2Float(amount uint64) float64 { - return float64(amount) / factor + return float64(amount) / Factor } func RDDLTokenStringToFloat(amount string) (amountFloat float64, err error) { diff --git a/util/time.go b/util/time.go new file mode 100644 index 0000000..e718b38 --- /dev/null +++ b/util/time.go @@ -0,0 +1,23 @@ +package util + +import ( + "time" +) + +func String2UnixTime(timeInput string) (int64, error) { + // Layout specifying the format of the input string + // Note: Go uses a specific reference time (Mon Jan 2 15:04:05 MST 2006) to define format layouts + layout := "2006-01-02T15:04:05" + + // Parse the string into a time.Time struct in local time zone + parsedTime, err := time.Parse(layout, timeInput) + if err != nil { + return 0, err + } + + // Convert to UTC if not already + utcTime := parsedTime.UTC() + unixTime := utcTime.Unix() + + return unixTime, nil +} diff --git a/util/time_test.go b/util/time_test.go new file mode 100644 index 0000000..d56c517 --- /dev/null +++ b/util/time_test.go @@ -0,0 +1,15 @@ +package util + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestString2UnixTime(t *testing.T) { + t.Parallel() + input := "2024-03-26T11:10:41" + unixTime, err := String2UnixTime(input) + assert.NoError(t, err) + assert.Equal(t, int64(1711451441), unixTime) +} diff --git a/x/dao/abci.go b/x/dao/abci.go index f412597..e3d8de9 100644 --- a/x/dao/abci.go +++ b/x/dao/abci.go @@ -23,7 +23,15 @@ func BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock, k keeper.Keeper) hexProposerAddress := hex.EncodeToString(proposerAddress) if isPopHeight(ctx, k, currentBlockHeight) { // select PoP participants - challenger, challengee := k.SelectPopParticipants(ctx) + k.MqttMonitor.SetContext(ctx) + challenger, challengee, err := k.MqttMonitor.SelectPoPParticipantsOutOfActiveActors() + if err != nil { + util.GetAppLogger().Error(ctx, "error during PoP Participant selection ", err) + } + if err != nil || challenger == "" || challengee == "" { + challenger = "" + challengee = "" + } // Init PoP - independent from challenger and challengee // The keeper will send the MQTT initializing message to challenger && challengee diff --git a/x/dao/keeper/keeper.go b/x/dao/keeper/keeper.go index de10ec9..5379529 100644 --- a/x/dao/keeper/keeper.go +++ b/x/dao/keeper/keeper.go @@ -1,16 +1,13 @@ package keeper import ( - db "github.com/cometbft/cometbft-db" "github.com/cometbft/cometbft/libs/log" "github.com/cosmos/cosmos-sdk/codec" - "github.com/cosmos/cosmos-sdk/store/prefix" storetypes "github.com/cosmos/cosmos-sdk/store/types" sdk "github.com/cosmos/cosmos-sdk/types" - authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" paramtypes "github.com/cosmos/cosmos-sdk/x/params/types" - "github.com/planetmint/planetmint-go/util" + "github.com/planetmint/planetmint-go/monitor" "github.com/planetmint/planetmint-go/x/dao/types" ) @@ -30,6 +27,7 @@ type ( machineKeeper types.MachineKeeper authority string RootDir string + MqttMonitor *monitor.MqttMonitor } ) @@ -48,6 +46,7 @@ func NewKeeper( machineKeeper types.MachineKeeper, authority string, rootDir string, + mqttMonitor *monitor.MqttMonitor, ) *Keeper { // set KeyTable if it has not already been set if !ps.HasKeyTable() { @@ -69,63 +68,10 @@ func NewKeeper( machineKeeper: machineKeeper, authority: authority, RootDir: rootDir, + MqttMonitor: mqttMonitor, } } func (k Keeper) Logger(ctx sdk.Context) log.Logger { return ctx.Logger().With("module", "x/"+types.ModuleName) } - -func (k Keeper) SelectPopParticipants(ctx sdk.Context) (challenger string, challengee string) { - var startAccountNumber uint64 - lastPopHeight := ctx.BlockHeight() - k.GetParams(ctx).PopEpochs - lastPop, found := k.LookupChallenge(ctx, lastPopHeight) - if lastPopHeight > 0 && found && lastPop.Challengee != "" { - lastAccountAddr := sdk.MustAccAddressFromBech32(lastPop.Challengee) - lastAccount := k.accountKeeper.GetAccount(ctx, lastAccountAddr) - startAccountNumber = lastAccount.GetAccountNumber() + 1 - } - - var participants []sdk.AccAddress - k.iterateAccountsForMachines(ctx, startAccountNumber, &participants, true) - if len(participants) != 2 { - k.iterateAccountsForMachines(ctx, startAccountNumber, &participants, false) - } - - // Not enough participants - if len(participants) != 2 { - return - } - - challenger = participants[0].String() - challengee = participants[1].String() - - return -} - -func (k Keeper) iterateAccountsForMachines(ctx sdk.Context, start uint64, participants *[]sdk.AccAddress, iterateFromStart bool) { - store := ctx.KVStore(k.accountKeeperKey) - accountStore := prefix.NewStore(store, authtypes.AccountNumberStoreKeyPrefix) - var iterator db.Iterator - if iterateFromStart { - iterator = accountStore.Iterator(sdk.Uint64ToBigEndian(start), nil) - } else { - iterator = accountStore.Iterator(nil, sdk.Uint64ToBigEndian(start)) - } - defer iterator.Close() - - for ; iterator.Valid(); iterator.Next() { - participant := sdk.AccAddress(iterator.Value()) - _, found := k.machineKeeper.GetMachineIndexByAddress(ctx, participant.String()) - if found { - available, err := util.GetMqttStatusOfParticipant(participant.String(), k.GetParams(ctx).MqttResponseTimeout) - if err == nil && available { - *participants = append(*participants, participant) - } - } - - if len(*participants) == 2 { - return - } - } -}