diff --git a/config/config.go b/config/config.go index 62805c3..a552850 100644 --- a/config/config.go +++ b/config/config.go @@ -39,6 +39,7 @@ mqtt-domain = "{{ .PlmntConfig.MqttDomain }}" mqtt-port = {{ .PlmntConfig.MqttPort }} mqtt-user = "{{ .PlmntConfig.MqttUser }}" mqtt-password = "{{ .PlmntConfig.MqttPassword }}" +mqtt-response-timeout = {{ .PlmntConfig.MqttResponseTimeout }} ` // Config defines Planetmint's top level configuration @@ -70,6 +71,7 @@ type Config struct { MqttPort int `json:"mqtt-port" mapstructure:"mqtt-port"` MqttUser string `json:"mqtt-user" mapstructure:"mqtt-user"` MqttPassword string `json:"mqtt-password" mapstructure:"mqtt-password"` + MqttResponseTimeout int `json:"mqtt-response-timeout" mapstructure:"mqtt-response-timeout"` } // cosmos-sdk wide global singleton @@ -110,11 +112,12 @@ func DefaultConfig() *Config { // issuing new tokens. This configuration parameter specifies the number of epochs (each epoch is 5 // seconds) that need to pass before reissuance can occur. In this case, `ReissuanceEpochs` is set // to 17280, which means that reissuance can occur after 1 day (12*60*24) of epochs. - ReissuanceEpochs: 17280, - MqttDomain: "testnet-mqtt.rddl.io", - MqttPort: 1885, - MqttUser: "user", - MqttPassword: "password", + ReissuanceEpochs: 17280, + MqttDomain: "testnet-mqtt.rddl.io", + MqttPort: 1885, + MqttUser: "user", + MqttPassword: "password", + MqttResponseTimeout: 2000, // the value is defined in milliseconds } } diff --git a/tests/e2e/dao/pop_participant_selection_suite.go b/tests/e2e/dao/pop_participant_selection_suite.go index a6d1cc0..7099369 100644 --- a/tests/e2e/dao/pop_participant_selection_suite.go +++ b/tests/e2e/dao/pop_participant_selection_suite.go @@ -45,6 +45,7 @@ func (s *PopSelectionE2ETestSuite) SetupSuite() { s.T().Log("setting up e2e test suite") conf := config.GetConfig() conf.FeeDenom = sample.FeeDenom + conf.MqttResponseTimeout = 200 s.network = network.New(s.T(), s.cfg) diff --git a/util/issue_commands.go b/util/issue_commands.go index cfc782c..50219f0 100644 --- a/util/issue_commands.go +++ b/util/issue_commands.go @@ -87,6 +87,6 @@ func SendLiquidAssetRegistration(goCtx context.Context, notarizedAsset machinety func SendInitPoP(goCtx context.Context, proposer string, challenger string, challengee string, blockHeight int64) { sendingValidatorAddress := config.GetConfig().ValidatorAddress msg := daotypes.NewMsgInitPop(sendingValidatorAddress, proposer, challenger, challengee, blockHeight) - loggingContext := "init PoP" + loggingContext := "PoP" buildSignBroadcastTx(goCtx, loggingContext, sendingValidatorAddress, msg) } diff --git a/util/mocks/mqtt.go b/util/mocks/mqtt.go index bcf4560..a4ae06e 100644 --- a/util/mocks/mqtt.go +++ b/util/mocks/mqtt.go @@ -1,14 +1,19 @@ package mocks import ( + "sync" + "time" + mqtt "github.com/eclipse/paho.mqtt.golang" ) // MockMQTTClient is the mock mqtt client type MockMQTTClient struct { - ConnectFunc func() mqtt.Token - DisconnectFunc func(quiesce uint) - PublishFunc func(topic string, qos byte, retained bool, payload interface{}) mqtt.Token + ConnectFunc func() mqtt.Token + DisconnectFunc func(quiesce uint) + PublishFunc func(topic string, qos byte, retained bool, payload interface{}) mqtt.Token + SubscribeFunc func(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token + UnsubscribeFunc func(topics ...string) mqtt.Token } // GetConnectFunc fetches the mock client's `Connect` func @@ -27,6 +32,67 @@ func GetPublishFunc(_ string, _ byte, _ bool, _ interface{}) mqtt.Token { return &token } +type message struct { + duplicate bool + qos byte + retained bool + topic string + messageID uint16 + payload []byte + ack func() + once sync.Once +} + +func (m *message) Duplicate() bool { + return m.duplicate +} + +func (m *message) Qos() byte { + return m.qos +} + +func (m *message) Retained() bool { + return m.retained +} + +func (m *message) Topic() string { + return m.topic +} + +func (m *message) MessageID() uint16 { + return m.messageID +} + +func (m *message) Payload() []byte { + return m.payload +} + +func (m *message) Ack() { + m.once.Do(m.ack) +} + +func SendCallbackResponse(topic string, callback mqtt.MessageHandler) { + time.Sleep(100 * time.Millisecond) + var opt mqtt.ClientOptions + client := mqtt.NewClient(&opt) + var msg message + msg.topic = topic + var msgInter mqtt.Message = &msg + + callback(client, msgInter) +} + +func GetSubscribeFunc(topic string, _ byte, callback mqtt.MessageHandler) mqtt.Token { + go SendCallbackResponse(topic, callback) + token := mqtt.DummyToken{} + return &token +} + +func GetUnsubscribeFunc(_ ...string) mqtt.Token { + token := mqtt.DummyToken{} + return &token +} + // Connect is the mock client's `Disconnect` func func (m *MockMQTTClient) Connect() mqtt.Token { return GetConnectFunc() @@ -41,3 +107,11 @@ func (m *MockMQTTClient) Disconnect(quiesce uint) { func (m *MockMQTTClient) Publish(topic string, qos byte, retained bool, payload interface{}) mqtt.Token { return GetPublishFunc(topic, qos, retained, payload) } + +func (m *MockMQTTClient) Subscribe(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token { + return GetSubscribeFunc(topic, qos, callback) +} + +func (m *MockMQTTClient) Unsubscribe(topics ...string) mqtt.Token { + return GetUnsubscribeFunc(topics...) +} diff --git a/util/mqtt.go b/util/mqtt.go index efc9dcf..40606e4 100644 --- a/util/mqtt.go +++ b/util/mqtt.go @@ -4,6 +4,9 @@ import ( "fmt" "net" "strconv" + "strings" + "sync" + "time" sdk "github.com/cosmos/cosmos-sdk/types" mqtt "github.com/eclipse/paho.mqtt.golang" @@ -16,12 +19,18 @@ type MQTTClientI interface { Connect() mqtt.Token Disconnect(quiesce uint) Publish(topic string, qos byte, retained bool, payload interface{}) mqtt.Token + Subscribe(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token + Unsubscribe(topics ...string) mqtt.Token } var ( MQTTClient MQTTClientI ) +const ( + MqttCmdPrefix = "cmnd/" +) + func init() { conf := config.GetConfig() hostPort := net.JoinHostPort(conf.MqttDomain, strconv.FormatInt(int64(conf.MqttPort), 10)) @@ -34,8 +43,8 @@ func init() { MQTTClient = mqtt.NewClient(opts) } -func SendMqttMessagesToServer(ctx sdk.Context, challenge types.Challenge) { - err := sendMqttMessages(challenge) +func SendMqttPopInitMessagesToServer(ctx sdk.Context, challenge types.Challenge) { + err := sendMqttPopInitMessages(challenge) if err != nil { GetAppLogger().Error(ctx, "MQTT error: "+err.Error()) return @@ -43,20 +52,21 @@ func SendMqttMessagesToServer(ctx sdk.Context, challenge types.Challenge) { GetAppLogger().Info(ctx, "MQTT message successfully sent: "+challenge.String()) } -func sendMqttMessages(challenge types.Challenge) (err error) { +func sendMqttPopInitMessages(challenge types.Challenge) (err error) { if token := MQTTClient.Connect(); token.Wait() && token.Error() != nil { err = token.Error() return } + blockHeight := strconv.FormatInt(challenge.GetHeight(), 10) - token := MQTTClient.Publish("cmnd/"+challenge.GetChallengee()+"/PoPInit", 0, false, blockHeight) + token := MQTTClient.Publish(MqttCmdPrefix+challenge.GetChallengee()+"/PoPInit", 0, false, blockHeight) token.Wait() err = token.Error() if err != nil { return } - token = MQTTClient.Publish("cmnd/"+challenge.GetChallenger()+"/PoPInit", 0, false, blockHeight) + token = MQTTClient.Publish(MqttCmdPrefix+challenge.GetChallenger()+"/PoPInit", 0, false, blockHeight) token.Wait() err = token.Error() if err != nil { @@ -66,3 +76,61 @@ func sendMqttMessages(challenge types.Challenge) (err error) { MQTTClient.Disconnect(1000) return } + +var mqttMachineByAddressAvailabilityMapping map[string]bool +var rwMu sync.RWMutex + +func init() { + mqttMachineByAddressAvailabilityMapping = make(map[string]bool) +} + +func GetMqttStatusOfParticipant(address string) (isAvailable bool, err error) { + if token := MQTTClient.Connect(); token.Wait() && token.Error() != nil { + err = token.Error() + return + } + rwMu.RLock() // Lock for reading + _, ok := mqttMachineByAddressAvailabilityMapping[address] + rwMu.RUnlock() // Unlock after reading + if ok { + return + } + var messageHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { + topicParts := strings.Split(msg.Topic(), "/") + if len(topicParts) == 3 && topicParts[1] == address { + rwMu.Lock() // Lock for writing + mqttMachineByAddressAvailabilityMapping[address] = true + rwMu.Unlock() // Unlock after writing + } + } + subscriptionTopic := "stat/" + address + "/STATUS" + publishingTopic := MqttCmdPrefix + address + "/STATUS" + // Subscribe to a topic + if token := MQTTClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { + err = token.Error() + return + } + + rwMu.Lock() // Lock for writing + mqttMachineByAddressAvailabilityMapping[address] = false + rwMu.Unlock() // Unlock after writing + + if token := MQTTClient.Publish(publishingTopic, 0, false, ""); token.Wait() && token.Error() != nil { + err = token.Error() + } else { + duration := int64(config.GetConfig().MqttResponseTimeout) + time.Sleep(time.Millisecond * time.Duration(duration)) + } + + // Unsubscribe and disconnect + if token := MQTTClient.Unsubscribe(subscriptionTopic); token.Wait() && token.Error() != nil { + err = token.Error() + } + + rwMu.Lock() // Lock for writing + isAvailable = mqttMachineByAddressAvailabilityMapping[address] + delete(mqttMachineByAddressAvailabilityMapping, address) + rwMu.Unlock() // Unlock after writing + MQTTClient.Disconnect(1000) + return +} diff --git a/util/mqtt_test.go b/util/mqtt_test.go index cbe6044..23e1fd6 100644 --- a/util/mqtt_test.go +++ b/util/mqtt_test.go @@ -8,15 +8,26 @@ import ( "github.com/stretchr/testify/assert" ) -func TestSendMqttMessages(t *testing.T) { +func init() { + // Use MQTT mock client + MQTTClient = &mocks.MockMQTTClient{} +} + +func TestSendMqttPopInitMessages(t *testing.T) { t.Parallel() var challenge types.Challenge challenge.Initiator = "" challenge.Challengee = "plmnt15gdanx0nm2lwsx30a6wft7429p32dhzaq37c06" challenge.Challenger = "plmnt1683t0us0r85840nsepx6jrk2kjxw7zrcnkf0rp" challenge.Height = 58 - // Use MQTT mock client - MQTTClient = &mocks.MockMQTTClient{} - err := sendMqttMessages(challenge) + err := sendMqttPopInitMessages(challenge) assert.NoError(t, err) } + +func TestGetMqttStatusOfParticipantMocked(t *testing.T) { + t.Parallel() + participant := "plmnt15gdanx0nm2lwsx30a6wft7429p32dhzaq37c06" + isAvailable, err := GetMqttStatusOfParticipant(participant) + assert.NoError(t, err) + assert.True(t, isAvailable) +} diff --git a/x/dao/abci.go b/x/dao/abci.go index 7e0f574..b15c342 100644 --- a/x/dao/abci.go +++ b/x/dao/abci.go @@ -22,14 +22,14 @@ func BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock, k keeper.Keeper) currentBlockHeight := req.Header.GetHeight() hexProposerAddress := hex.EncodeToString(proposerAddress) - if isPopHeight(req.Header.GetHeight()) { + if isPopHeight(currentBlockHeight) { // select PoP participants challenger, challengee := k.SelectPopParticipants(ctx) if challenger != "" && challengee != "" { // Issue PoP + // The keeper will send MQTT the initializing message to challenger && challengee util.SendInitPoP(ctx, hexProposerAddress, challenger, challengee, currentBlockHeight) - // TODO send MQTT message to challenger && challengee } } diff --git a/x/dao/keeper/keeper.go b/x/dao/keeper/keeper.go index 43daef8..a6dc7a4 100644 --- a/x/dao/keeper/keeper.go +++ b/x/dao/keeper/keeper.go @@ -196,7 +196,10 @@ func (k Keeper) iterateAccountsForMachines(ctx sdk.Context, start uint64, partic participant := sdk.AccAddress(iterator.Value()) _, found := k.machineKeeper.GetMachineIndexByAddress(ctx, participant.String()) if found { - *participants = append(*participants, participant) + available, err := util.GetMqttStatusOfParticipant(participant.String()) + if err == nil && available { + *participants = append(*participants, participant) + } } if len(*participants) == 2 { diff --git a/x/dao/keeper/msg_server_init_pop.go b/x/dao/keeper/msg_server_init_pop.go index 2c4e603..afa8da6 100644 --- a/x/dao/keeper/msg_server_init_pop.go +++ b/x/dao/keeper/msg_server_init_pop.go @@ -19,7 +19,10 @@ func (k msgServer) InitPop(goCtx context.Context, msg *types.MsgInitPop) (*types k.StoreChallenge(ctx, challenge) - go util.SendMqttMessagesToServer(ctx, challenge) + validatorIdentity, validResult := util.GetValidatorCometBFTIdentity(ctx) + if validResult && msg.Initiator == validatorIdentity { + go util.SendMqttPopInitMessagesToServer(ctx, challenge) + } return &types.MsgInitPopResponse{}, nil } diff --git a/x/dao/keeper/msg_server_report_pop_result.go b/x/dao/keeper/msg_server_report_pop_result.go index a337ce7..e0da7c7 100644 --- a/x/dao/keeper/msg_server_report_pop_result.go +++ b/x/dao/keeper/msg_server_report_pop_result.go @@ -19,6 +19,12 @@ func (k msgServer) ReportPopResult(goCtx context.Context, msg *types.MsgReportPo return nil, errorsmod.Wrapf(types.ErrInvalidChallenge, err.Error()) } + if msg.Challenge.GetSuccess() { + util.GetAppLogger().Info(ctx, "PoP at height %v was successful", msg.Challenge.GetHeight()) + } else { + util.GetAppLogger().Info(ctx, "PoP at height %v was unsuccessful", msg.Challenge.GetHeight()) + } + // TODO: develop a more resilient pattern: if the distribution does not work, // the challenge shouldn't be discarded. it's most likely not the fault of the PoP participants. err = k.issuePoPRewards(ctx, *msg.Challenge)