* added a MqttMonitor modul with levelDB and periodic cleanup

* initialized in the app
* passed to dao keeper
* added conversion methods (string2unixtime, byte ToJSON)
* removed obsolete keeper code
* maded  RDDLToken.Factor public

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>
This commit is contained in:
Jürgen Eckel 2024-04-04 13:57:34 +02:00
parent 0ec6fba4ec
commit 75f1444c81
No known key found for this signature in database
13 changed files with 484 additions and 67 deletions

View File

@ -108,7 +108,9 @@ import (
solomachine "github.com/cosmos/ibc-go/v7/modules/light-clients/06-solomachine"
ibctm "github.com/cosmos/ibc-go/v7/modules/light-clients/07-tendermint"
"github.com/spf13/cast"
"github.com/syndtr/goleveldb/leveldb"
"github.com/planetmint/planetmint-go/monitor"
machinemodule "github.com/planetmint/planetmint-go/x/machine"
machinemodulekeeper "github.com/planetmint/planetmint-go/x/machine/keeper"
machinemoduletypes "github.com/planetmint/planetmint-go/x/machine/types"
@ -124,6 +126,7 @@ import (
// this line is used by starport scaffolding # stargate/app/moduleImport
pmante "github.com/planetmint/planetmint-go/app/ante"
plmntconfig "github.com/planetmint/planetmint-go/config"
"github.com/planetmint/planetmint-go/docs"
appparams "github.com/planetmint/planetmint-go/lib/params"
)
@ -269,7 +272,8 @@ type App struct {
// this line is used by starport scaffolding # stargate/app/keeperDeclaration
// mm is the module manager
mm *module.Manager
mm *module.Manager
mqttMonitor *monitor.MqttMonitor
// sm is the simulation manager
sm *module.SimulationManager
@ -333,6 +337,15 @@ func New(
memKeys: memKeys,
}
aciveActorsDB, err := leveldb.OpenFile(homePath+"/activeActors.db", nil)
if err != nil {
panic(err)
}
app.mqttMonitor = monitor.NewMqttMonitorService(aciveActorsDB, *plmntconfig.GetConfig())
err = app.mqttMonitor.Start()
if err != nil {
panic(err)
}
app.ParamsKeeper = initParamsKeeper(
appCodec,
cdc,
@ -578,6 +591,7 @@ func New(
app.MachineKeeper,
authtypes.NewModuleAddress(govtypes.ModuleName).String(),
homePath,
app.mqttMonitor,
)
daoModule := daomodule.NewAppModule(appCodec, app.DaoKeeper, app.AccountKeeper, app.BankKeeper)

105
monitor/backend.go Normal file
View File

@ -0,0 +1,105 @@
package monitor
import (
"encoding/json"
"log"
"time"
"github.com/syndtr/goleveldb/leveldb/iterator"
)
type LastSeenEvent struct {
Address string `binding:"required" json:"address"`
Timestamp int64 `binding:"required" json:"timestamp"`
}
func (mms *MqttMonitor) AddParticipant(address string, lastSeenTS int64) (err error) {
// store receive address - planetmint address pair
var lastSeen LastSeenEvent
lastSeen.Address = address
lastSeen.Timestamp = lastSeenTS
lastSeenBytes, err := json.Marshal(lastSeen)
if err != nil {
mms.Log("[Monitor] Error serializing ConversionRequest: " + err.Error())
return
}
increaseCounter := false
// returns an error if the entry does not exist (we have to increase the counter in this case)
_, err = mms.db.Get([]byte(address), nil)
if err != nil {
increaseCounter = true
}
mms.dbMutex.Lock()
if increaseCounter {
mms.numberOfElements++
}
err = mms.db.Put([]byte(address), lastSeenBytes, nil)
mms.dbMutex.Unlock()
if err != nil {
log.Println("[Monitor] storing addresses in DB: " + err.Error())
return
}
return
}
func (mms *MqttMonitor) deleteEntry(key []byte) (err error) {
mms.dbMutex.Lock()
err = mms.db.Delete(key, nil)
mms.numberOfElements--
mms.dbMutex.Unlock()
return
}
func (mms *MqttMonitor) getAmountOfElements() (amount int64, err error) {
iter := mms.db.NewIterator(nil, nil)
defer iter.Release()
for iter.Next() {
amount++
}
// Check for any errors encountered during iteration
if err := iter.Error(); err != nil {
log.Println("[Monitor] " + err.Error())
}
return
}
func (mms *MqttMonitor) getDataFromIter(iter iterator.Iterator) (lastSeen LastSeenEvent, err error) {
key := iter.Key()
value := iter.Value()
err = json.Unmarshal(value, &lastSeen)
if err != nil {
mms.Log("[Monitor] Failed to unmarshal entry: " + string(key) + " - " + err.Error())
}
return
}
func (mms *MqttMonitor) CleanupDB() {
// Create an iterator for the database
iter := mms.db.NewIterator(nil, nil)
defer iter.Release() // Make sure to release the iterator at the end
// Iterate over all elements in the database
for iter.Next() {
// Use iter.Key() and iter.Value() to access the key and value
lastSeen, err := mms.getDataFromIter(iter)
if err != nil {
mms.Log("[Monitor] Failed to unmarshal entry: " + string(iter.Key()) + " - " + err.Error())
continue
}
timeThreshold := time.Now().Add(-1 * mms.CleanupPeriodicityInMinutes * time.Minute).Unix()
if lastSeen.Timestamp <= timeThreshold {
// If the entry is older than 12 hours, delete it
err := mms.deleteEntry(iter.Key())
if err != nil {
mms.Log("[Monitor] Failed to delete entry: " + err.Error())
}
}
}
// Check for any errors encountered during iteration
if err := iter.Error(); err != nil {
mms.Log(err.Error())
}
}

164
monitor/mqtt_monitor.go Normal file
View File

@ -0,0 +1,164 @@
package monitor
import (
"math/rand"
"strconv"
"strings"
"sync"
"time"
sdk "github.com/cosmos/cosmos-sdk/types"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/planetmint/planetmint-go/config"
"github.com/planetmint/planetmint-go/util"
"github.com/syndtr/goleveldb/leveldb"
)
type MqttMonitor struct {
db *leveldb.DB
dbMutex sync.Mutex // Mutex to synchronize write operations
ticker *time.Ticker
CleanupPeriodicityInMinutes time.Duration
config config.Config
numberOfElements int64
sdkContext *sdk.Context
contextMutex sync.Mutex
}
func NewMqttMonitorService(db *leveldb.DB, config config.Config) *MqttMonitor {
service := &MqttMonitor{db: db, config: config, numberOfElements: 0, CleanupPeriodicityInMinutes: 10}
return service
}
func (mms *MqttMonitor) registerPeriodicTasks() {
mms.ticker = time.NewTicker(mms.CleanupPeriodicityInMinutes * time.Minute)
go func() {
for range mms.ticker.C { // Loop over the ticker channel
go mms.CleanupDB()
}
}()
}
func (mms *MqttMonitor) Start() (err error) {
amount, err := mms.getAmountOfElements()
if err != nil {
return
}
mms.numberOfElements = amount
mms.registerPeriodicTasks()
go mms.MonitorActiveParticipants()
return
}
func (mms *MqttMonitor) getRandomNumbers() (challenger int, challengee int) {
for challenger == challengee {
// Generate random numbers
challenger = rand.Intn(int(mms.numberOfElements))
challengee = rand.Intn(int(mms.numberOfElements))
}
return
}
func (mms *MqttMonitor) SelectPoPParticipantsOutOfActiveActors() (challenger string, challengee string, err error) {
if mms.numberOfElements < 2 {
return
}
randomChallenger, randomChallengee := mms.getRandomNumbers()
mms.Log("[Monitor] number of elements: " + strconv.Itoa(int(mms.numberOfElements)))
mms.Log("[Monitor] selected IDs: " + strconv.Itoa(randomChallenger) + " " + strconv.Itoa(randomChallengee))
iter := mms.db.NewIterator(nil, nil)
defer iter.Release()
count := 0
found := 0
var lastSeen LastSeenEvent
for iter.Next() {
mms.Log("[Monitor] count: " + strconv.Itoa(count))
if count == randomChallenger {
lastSeen, err = mms.getDataFromIter(iter)
if err != nil {
return
}
challenger = lastSeen.Address
found++
} else if count == randomChallengee {
lastSeen, err = mms.getDataFromIter(iter)
if err != nil {
return
}
challengee = lastSeen.Address
found++
}
count++
if found == 2 {
break
}
}
return
}
func (mms *MqttMonitor) MqttMsgHandler(_ mqtt.Client, msg mqtt.Message) {
topicParts := strings.Split(msg.Topic(), "/")
if len(topicParts) != 3 {
return
}
if topicParts[0] != "tele" {
return
}
if topicParts[2] != "STATE" {
return
}
address := topicParts[1]
valid, err := util.IsValidAddress(address)
if err != nil || !valid {
return
}
payload, err := util.ToJSON(msg.Payload())
if err != nil {
return
}
timeString, ok := payload["Time"].(string)
if !ok {
return
}
unixTime, err := util.String2UnixTime(timeString)
if err != nil {
return
}
err = mms.AddParticipant(address, unixTime)
if err != nil {
mms.Log("[Monitor] error adding active actor to DB: " + address + " " + err.Error())
} else {
mms.Log("[Monitor] added active actor to DB: " + address)
}
}
func (mms *MqttMonitor) MonitorActiveParticipants() {
util.LazyLoadMQTTClient()
if token := util.MQTTClient.Connect(); token.Wait() && token.Error() != nil {
mms.Log("[Monitor] error connecting to mqtt: " + token.Error().Error())
panic(token.Error())
}
var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler
// Subscribe to a topic
subscriptionTopic := "tele/#"
if token := util.MQTTClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil {
mms.Log("[Monitor] error registering the mqtt subscription: " + token.Error().Error())
panic(token.Error())
}
}
func (mms *MqttMonitor) Log(msg string) {
mms.contextMutex.Lock()
if mms.sdkContext != nil {
util.GetAppLogger().Info(*mms.sdkContext, msg)
}
mms.contextMutex.Unlock()
}
func (mms *MqttMonitor) SetContext(ctx sdk.Context) {
mms.contextMutex.Lock()
mms.sdkContext = &ctx
mms.contextMutex.Unlock()
}

View File

@ -0,0 +1,97 @@
package monitor_test
import (
"testing"
"time"
"github.com/planetmint/planetmint-go/config"
"github.com/planetmint/planetmint-go/monitor"
"github.com/planetmint/planetmint-go/util"
"github.com/planetmint/planetmint-go/util/mocks"
"github.com/stretchr/testify/assert"
"github.com/syndtr/goleveldb/leveldb"
)
func init() {
// Use MQTT mock client
util.MQTTClient = &mocks.MockMQTTClient{}
}
const (
challengerInput = "plmnt1fx3x6u8k5q8kjl7pamsuwjtut8nkks8dk92dek"
challengeeInput = "plmnt1fsaljz3xqf6vchkjxfzfrd30cdp3j4vqh298pr"
)
func TestGMonitorActiveParticipants(t *testing.T) {
util.LazyLoadMQTTClient()
cfg := config.GetConfig()
db, err := leveldb.OpenFile("./activeActors.db", nil)
assert.NoError(t, err)
mqttMonitor := monitor.NewMqttMonitorService(db, *cfg)
err = mqttMonitor.Start()
assert.NoError(t, err)
currentTime := time.Now()
unixTime := currentTime.Unix()
err = mqttMonitor.AddParticipant(challengerInput, unixTime)
assert.NoError(t, err)
err = mqttMonitor.AddParticipant(challengeeInput, unixTime)
assert.NoError(t, err)
mqttMonitor.CleanupDB()
challenger, challengee, err := mqttMonitor.SelectPoPParticipantsOutOfActiveActors()
assert.NoError(t, err)
assert.Contains(t, challenger, "plmnt")
assert.Contains(t, challengee, "plmnt")
}
func TestCleanupRemoval(t *testing.T) {
util.LazyLoadMQTTClient()
cfg := config.GetConfig()
db, err := leveldb.OpenFile("./activeActors.db", nil)
assert.NoError(t, err)
mqttMonitor := monitor.NewMqttMonitorService(db, *cfg)
err = mqttMonitor.Start()
assert.NoError(t, err)
currentTime := time.Now()
CleanupPeriodicityAgo := currentTime.Add(-1 * mqttMonitor.CleanupPeriodicityInMinutes * time.Minute)
unixTimeNow := currentTime.Unix()
err = mqttMonitor.AddParticipant(challengerInput, unixTimeNow)
assert.NoError(t, err)
err = mqttMonitor.AddParticipant(challengeeInput, CleanupPeriodicityAgo.Unix()-1)
assert.NoError(t, err)
mqttMonitor.CleanupDB()
challenger, challengee, err := mqttMonitor.SelectPoPParticipantsOutOfActiveActors()
assert.NoError(t, err)
assert.Equal(t, "", challenger)
assert.Contains(t, "", challengee)
}
func TestCleanupPrecisionTest(t *testing.T) {
util.LazyLoadMQTTClient()
cfg := config.GetConfig()
db, err := leveldb.OpenFile("./activeActors.db", nil)
assert.NoError(t, err)
mqttMonitor := monitor.NewMqttMonitorService(db, *cfg)
err = mqttMonitor.Start()
assert.NoError(t, err)
currentTime := time.Now()
CleanupThresholdAgo := currentTime.Add(-1 * mqttMonitor.CleanupPeriodicityInMinutes * time.Minute)
aboveThreshold := CleanupThresholdAgo.Unix() + 10
unixTimeNow := currentTime.Unix()
err = mqttMonitor.AddParticipant(challengerInput, unixTimeNow)
assert.NoError(t, err)
err = mqttMonitor.AddParticipant(challengeeInput, aboveThreshold)
assert.NoError(t, err)
mqttMonitor.CleanupDB()
challenger, challengee, err := mqttMonitor.SelectPoPParticipantsOutOfActiveActors()
assert.NoError(t, err)
assert.Contains(t, challenger, "plmnt")
assert.Contains(t, challengee, "plmnt")
}

View File

@ -15,9 +15,12 @@ import (
govtypes "github.com/cosmos/cosmos-sdk/x/gov/types"
typesparams "github.com/cosmos/cosmos-sdk/x/params/types"
"github.com/golang/mock/gomock"
"github.com/planetmint/planetmint-go/config"
"github.com/planetmint/planetmint-go/monitor"
"github.com/planetmint/planetmint-go/x/dao/keeper"
"github.com/planetmint/planetmint-go/x/dao/types"
"github.com/stretchr/testify/require"
"github.com/syndtr/goleveldb/leveldb"
daotestutil "github.com/planetmint/planetmint-go/x/dao/testutil"
)
@ -53,6 +56,11 @@ func DaoKeeper(t testing.TB) (*keeper.Keeper, sdk.Context) {
ctrl := gomock.NewController(t)
bk := daotestutil.NewMockBankKeeper(ctrl)
aciveActorsDB, err := leveldb.OpenFile("./activeActors.db", nil)
if err != nil {
panic(err)
}
mqttMonitor := monitor.NewMqttMonitorService(aciveActorsDB, *config.GetConfig())
bk.EXPECT().MintCoins(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
bk.EXPECT().BurnCoins(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
@ -74,10 +82,11 @@ func DaoKeeper(t testing.TB) (*keeper.Keeper, sdk.Context) {
nil,
authtypes.NewModuleAddress(govtypes.ModuleName).String(),
"",
mqttMonitor,
)
// Initialize params
err := k.SetParams(ctx, types.DefaultParams())
err = k.SetParams(ctx, types.DefaultParams())
if err != nil {
panic(err)
}

View File

@ -57,3 +57,17 @@ func IsValidatorBlockProposer(ctx sdk.Context, proposerAddress []byte, rootDir s
result = hexProposerAddress == validatorIdentity
return
}
func IsValidAddress(address string) (valid bool, err error) {
// Attempt to decode the address
_, err = sdk.AccAddressFromBech32(address)
if err != nil {
return
}
if !strings.Contains(address, "plmnt") {
valid = false
return
}
valid = true
return
}

View File

@ -1,6 +1,7 @@
package util
import (
"encoding/json"
"net"
"strconv"
"strings"
@ -25,14 +26,16 @@ type MQTTClientI interface {
var (
MQTTClient MQTTClientI
mqttMachineByAddressAvailabilityMapping map[string]bool
mqttAcitveMachineMapping map[string]int64
rwMu sync.RWMutex
rwActiveMachineMu sync.RWMutex
)
const (
MqttCmdPrefix = "cmnd/"
)
func lazyLoadMQTTClient() {
func LazyLoadMQTTClient() {
if MQTTClient != nil {
return
}
@ -50,6 +53,7 @@ func lazyLoadMQTTClient() {
func init() {
mqttMachineByAddressAvailabilityMapping = make(map[string]bool)
mqttAcitveMachineMapping = make(map[string]int64)
}
func SendMqttPopInitMessagesToServer(ctx sdk.Context, challenge types.Challenge) {
@ -66,7 +70,7 @@ func SendMqttPopInitMessagesToServer(ctx sdk.Context, challenge types.Challenge)
}
func sendMqttPopInitMessages(challenge types.Challenge) (err error) {
lazyLoadMQTTClient()
LazyLoadMQTTClient()
if token := MQTTClient.Connect(); token.Wait() && token.Error() != nil {
err = token.Error()
return
@ -92,7 +96,7 @@ func sendMqttPopInitMessages(challenge types.Challenge) (err error) {
}
func GetMqttStatusOfParticipant(address string, responseTimeoutInMs int64) (isAvailable bool, err error) {
lazyLoadMQTTClient()
LazyLoadMQTTClient()
if token := MQTTClient.Connect(); token.Wait() && token.Error() != nil {
err = token.Error()
return
@ -142,3 +146,12 @@ func GetMqttStatusOfParticipant(address string, responseTimeoutInMs int64) (isAv
MQTTClient.Disconnect(1000)
return
}
func ToJSON(payload []byte) (map[string]interface{}, error) {
jsonString := string(payload)
var result map[string]interface{}
// Unmarshal the JSON string into the map
err := json.Unmarshal([]byte(jsonString), &result)
return result, err
}

View File

@ -31,3 +31,12 @@ func TestGetMqttStatusOfParticipantMocked(t *testing.T) {
assert.NoError(t, err)
assert.True(t, isAvailable)
}
func TestToJSON(t *testing.T) {
t.Parallel()
payload := []byte(`{"Time":"2024-03-26T11:50:42","Uptime":"0T00:50:19","UptimeSec":3019,"Heap":97,"SleepMode":"Dynamic","Sleep":10,"LoadAvg":99,"MqttCount":2,"Berry":{"HeapUsed":27,"Objects":491},"POWER1":"ON","POWER2":"ON","Dimmer":17,"Color":"00182C","HSBColor":"207,100,17","Channel":[0,9,17],"Scheme":0,"Width":1,"Fade":"OFF","Speed":1,"LedTable":"ON","Wifi":{"AP":1,"SSId":"UPC5729E56","BSSId":"C2:14:7E:6F:BC:C5","Channel":11,"Mode":"11n","RSSI":96,"Signal":-52,"LinkCount":1,"Downtime":"0T00:00:10"}}`)
result, err := ToJSON(payload)
assert.NoError(t, err)
assert.Equal(t, 21, len(result))
}

View File

@ -5,15 +5,15 @@ import (
)
const (
factor = 100000000.0
Factor = 100000000.0
)
func RDDLToken2Uint(amount float64) uint64 {
return uint64(amount * factor)
return uint64(amount * Factor)
}
func RDDLToken2Float(amount uint64) float64 {
return float64(amount) / factor
return float64(amount) / Factor
}
func RDDLTokenStringToFloat(amount string) (amountFloat float64, err error) {

23
util/time.go Normal file
View File

@ -0,0 +1,23 @@
package util
import (
"time"
)
func String2UnixTime(timeInput string) (int64, error) {
// Layout specifying the format of the input string
// Note: Go uses a specific reference time (Mon Jan 2 15:04:05 MST 2006) to define format layouts
layout := "2006-01-02T15:04:05"
// Parse the string into a time.Time struct in local time zone
parsedTime, err := time.Parse(layout, timeInput)
if err != nil {
return 0, err
}
// Convert to UTC if not already
utcTime := parsedTime.UTC()
unixTime := utcTime.Unix()
return unixTime, nil
}

15
util/time_test.go Normal file
View File

@ -0,0 +1,15 @@
package util
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestString2UnixTime(t *testing.T) {
t.Parallel()
input := "2024-03-26T11:10:41"
unixTime, err := String2UnixTime(input)
assert.NoError(t, err)
assert.Equal(t, int64(1711451441), unixTime)
}

View File

@ -23,7 +23,15 @@ func BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock, k keeper.Keeper)
hexProposerAddress := hex.EncodeToString(proposerAddress)
if isPopHeight(ctx, k, currentBlockHeight) {
// select PoP participants
challenger, challengee := k.SelectPopParticipants(ctx)
k.MqttMonitor.SetContext(ctx)
challenger, challengee, err := k.MqttMonitor.SelectPoPParticipantsOutOfActiveActors()
if err != nil {
util.GetAppLogger().Error(ctx, "error during PoP Participant selection ", err)
}
if err != nil || challenger == "" || challengee == "" {
challenger = ""
challengee = ""
}
// Init PoP - independent from challenger and challengee
// The keeper will send the MQTT initializing message to challenger && challengee

View File

@ -1,16 +1,13 @@
package keeper
import (
db "github.com/cometbft/cometbft-db"
"github.com/cometbft/cometbft/libs/log"
"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/store/prefix"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
paramtypes "github.com/cosmos/cosmos-sdk/x/params/types"
"github.com/planetmint/planetmint-go/util"
"github.com/planetmint/planetmint-go/monitor"
"github.com/planetmint/planetmint-go/x/dao/types"
)
@ -30,6 +27,7 @@ type (
machineKeeper types.MachineKeeper
authority string
RootDir string
MqttMonitor *monitor.MqttMonitor
}
)
@ -48,6 +46,7 @@ func NewKeeper(
machineKeeper types.MachineKeeper,
authority string,
rootDir string,
mqttMonitor *monitor.MqttMonitor,
) *Keeper {
// set KeyTable if it has not already been set
if !ps.HasKeyTable() {
@ -69,63 +68,10 @@ func NewKeeper(
machineKeeper: machineKeeper,
authority: authority,
RootDir: rootDir,
MqttMonitor: mqttMonitor,
}
}
func (k Keeper) Logger(ctx sdk.Context) log.Logger {
return ctx.Logger().With("module", "x/"+types.ModuleName)
}
func (k Keeper) SelectPopParticipants(ctx sdk.Context) (challenger string, challengee string) {
var startAccountNumber uint64
lastPopHeight := ctx.BlockHeight() - k.GetParams(ctx).PopEpochs
lastPop, found := k.LookupChallenge(ctx, lastPopHeight)
if lastPopHeight > 0 && found && lastPop.Challengee != "" {
lastAccountAddr := sdk.MustAccAddressFromBech32(lastPop.Challengee)
lastAccount := k.accountKeeper.GetAccount(ctx, lastAccountAddr)
startAccountNumber = lastAccount.GetAccountNumber() + 1
}
var participants []sdk.AccAddress
k.iterateAccountsForMachines(ctx, startAccountNumber, &participants, true)
if len(participants) != 2 {
k.iterateAccountsForMachines(ctx, startAccountNumber, &participants, false)
}
// Not enough participants
if len(participants) != 2 {
return
}
challenger = participants[0].String()
challengee = participants[1].String()
return
}
func (k Keeper) iterateAccountsForMachines(ctx sdk.Context, start uint64, participants *[]sdk.AccAddress, iterateFromStart bool) {
store := ctx.KVStore(k.accountKeeperKey)
accountStore := prefix.NewStore(store, authtypes.AccountNumberStoreKeyPrefix)
var iterator db.Iterator
if iterateFromStart {
iterator = accountStore.Iterator(sdk.Uint64ToBigEndian(start), nil)
} else {
iterator = accountStore.Iterator(nil, sdk.Uint64ToBigEndian(start))
}
defer iterator.Close()
for ; iterator.Valid(); iterator.Next() {
participant := sdk.AccAddress(iterator.Value())
_, found := k.machineKeeper.GetMachineIndexByAddress(ctx, participant.String())
if found {
available, err := util.GetMqttStatusOfParticipant(participant.String(), k.GetParams(ctx).MqttResponseTimeout)
if err == nil && available {
*participants = append(*participants, participant)
}
}
if len(*participants) == 2 {
return
}
}
}