Verifying the availability of PoP participants (#274)

* feat: verify the availability of PoP participants

* simplified and improved logging
* added MQTT-based availability check for PoP participants
* extended MQTT mocking
* Only the block proposer will send out the MQTT messages to the pop participants
* Added a configuration value for the MQTT response timeout
* removed parallel execution of one test case
* added r/w locking to the MQTT response processing
* set MQTT timeout unit to ms

Signed-off-by: Julian Strobl <jmastr@mailbox.org>
This commit is contained in:
Jürgen Eckel 2024-01-12 10:02:09 +01:00 committed by GitHub
parent 986685d8db
commit 3ca10dfff1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 191 additions and 22 deletions

View File

@ -39,6 +39,7 @@ mqtt-domain = "{{ .PlmntConfig.MqttDomain }}"
mqtt-port = {{ .PlmntConfig.MqttPort }}
mqtt-user = "{{ .PlmntConfig.MqttUser }}"
mqtt-password = "{{ .PlmntConfig.MqttPassword }}"
mqtt-response-timeout = {{ .PlmntConfig.MqttResponseTimeout }}
`
// Config defines Planetmint's top level configuration
@ -70,6 +71,7 @@ type Config struct {
MqttPort int `json:"mqtt-port" mapstructure:"mqtt-port"`
MqttUser string `json:"mqtt-user" mapstructure:"mqtt-user"`
MqttPassword string `json:"mqtt-password" mapstructure:"mqtt-password"`
MqttResponseTimeout int `json:"mqtt-response-timeout" mapstructure:"mqtt-response-timeout"`
}
// cosmos-sdk wide global singleton
@ -110,11 +112,12 @@ func DefaultConfig() *Config {
// issuing new tokens. This configuration parameter specifies the number of epochs (each epoch is 5
// seconds) that need to pass before reissuance can occur. In this case, `ReissuanceEpochs` is set
// to 17280, which means that reissuance can occur after 1 day (12*60*24) of epochs.
ReissuanceEpochs: 17280,
MqttDomain: "testnet-mqtt.rddl.io",
MqttPort: 1885,
MqttUser: "user",
MqttPassword: "password",
ReissuanceEpochs: 17280,
MqttDomain: "testnet-mqtt.rddl.io",
MqttPort: 1885,
MqttUser: "user",
MqttPassword: "password",
MqttResponseTimeout: 2000, // the value is defined in milliseconds
}
}

View File

@ -45,6 +45,7 @@ func (s *PopSelectionE2ETestSuite) SetupSuite() {
s.T().Log("setting up e2e test suite")
conf := config.GetConfig()
conf.FeeDenom = sample.FeeDenom
conf.MqttResponseTimeout = 200
s.network = network.New(s.T(), s.cfg)

View File

@ -87,6 +87,6 @@ func SendLiquidAssetRegistration(goCtx context.Context, notarizedAsset machinety
func SendInitPoP(goCtx context.Context, proposer string, challenger string, challengee string, blockHeight int64) {
sendingValidatorAddress := config.GetConfig().ValidatorAddress
msg := daotypes.NewMsgInitPop(sendingValidatorAddress, proposer, challenger, challengee, blockHeight)
loggingContext := "init PoP"
loggingContext := "PoP"
buildSignBroadcastTx(goCtx, loggingContext, sendingValidatorAddress, msg)
}

View File

@ -1,14 +1,19 @@
package mocks
import (
"sync"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// MockMQTTClient is the mock mqtt client
type MockMQTTClient struct {
ConnectFunc func() mqtt.Token
DisconnectFunc func(quiesce uint)
PublishFunc func(topic string, qos byte, retained bool, payload interface{}) mqtt.Token
ConnectFunc func() mqtt.Token
DisconnectFunc func(quiesce uint)
PublishFunc func(topic string, qos byte, retained bool, payload interface{}) mqtt.Token
SubscribeFunc func(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token
UnsubscribeFunc func(topics ...string) mqtt.Token
}
// GetConnectFunc fetches the mock client's `Connect` func
@ -27,6 +32,67 @@ func GetPublishFunc(_ string, _ byte, _ bool, _ interface{}) mqtt.Token {
return &token
}
type message struct {
duplicate bool
qos byte
retained bool
topic string
messageID uint16
payload []byte
ack func()
once sync.Once
}
func (m *message) Duplicate() bool {
return m.duplicate
}
func (m *message) Qos() byte {
return m.qos
}
func (m *message) Retained() bool {
return m.retained
}
func (m *message) Topic() string {
return m.topic
}
func (m *message) MessageID() uint16 {
return m.messageID
}
func (m *message) Payload() []byte {
return m.payload
}
func (m *message) Ack() {
m.once.Do(m.ack)
}
func SendCallbackResponse(topic string, callback mqtt.MessageHandler) {
time.Sleep(100 * time.Millisecond)
var opt mqtt.ClientOptions
client := mqtt.NewClient(&opt)
var msg message
msg.topic = topic
var msgInter mqtt.Message = &msg
callback(client, msgInter)
}
func GetSubscribeFunc(topic string, _ byte, callback mqtt.MessageHandler) mqtt.Token {
go SendCallbackResponse(topic, callback)
token := mqtt.DummyToken{}
return &token
}
func GetUnsubscribeFunc(_ ...string) mqtt.Token {
token := mqtt.DummyToken{}
return &token
}
// Connect is the mock client's `Disconnect` func
func (m *MockMQTTClient) Connect() mqtt.Token {
return GetConnectFunc()
@ -41,3 +107,11 @@ func (m *MockMQTTClient) Disconnect(quiesce uint) {
func (m *MockMQTTClient) Publish(topic string, qos byte, retained bool, payload interface{}) mqtt.Token {
return GetPublishFunc(topic, qos, retained, payload)
}
func (m *MockMQTTClient) Subscribe(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token {
return GetSubscribeFunc(topic, qos, callback)
}
func (m *MockMQTTClient) Unsubscribe(topics ...string) mqtt.Token {
return GetUnsubscribeFunc(topics...)
}

View File

@ -4,6 +4,9 @@ import (
"fmt"
"net"
"strconv"
"strings"
"sync"
"time"
sdk "github.com/cosmos/cosmos-sdk/types"
mqtt "github.com/eclipse/paho.mqtt.golang"
@ -16,12 +19,18 @@ type MQTTClientI interface {
Connect() mqtt.Token
Disconnect(quiesce uint)
Publish(topic string, qos byte, retained bool, payload interface{}) mqtt.Token
Subscribe(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token
Unsubscribe(topics ...string) mqtt.Token
}
var (
MQTTClient MQTTClientI
)
const (
MqttCmdPrefix = "cmnd/"
)
func init() {
conf := config.GetConfig()
hostPort := net.JoinHostPort(conf.MqttDomain, strconv.FormatInt(int64(conf.MqttPort), 10))
@ -34,8 +43,8 @@ func init() {
MQTTClient = mqtt.NewClient(opts)
}
func SendMqttMessagesToServer(ctx sdk.Context, challenge types.Challenge) {
err := sendMqttMessages(challenge)
func SendMqttPopInitMessagesToServer(ctx sdk.Context, challenge types.Challenge) {
err := sendMqttPopInitMessages(challenge)
if err != nil {
GetAppLogger().Error(ctx, "MQTT error: "+err.Error())
return
@ -43,20 +52,21 @@ func SendMqttMessagesToServer(ctx sdk.Context, challenge types.Challenge) {
GetAppLogger().Info(ctx, "MQTT message successfully sent: "+challenge.String())
}
func sendMqttMessages(challenge types.Challenge) (err error) {
func sendMqttPopInitMessages(challenge types.Challenge) (err error) {
if token := MQTTClient.Connect(); token.Wait() && token.Error() != nil {
err = token.Error()
return
}
blockHeight := strconv.FormatInt(challenge.GetHeight(), 10)
token := MQTTClient.Publish("cmnd/"+challenge.GetChallengee()+"/PoPInit", 0, false, blockHeight)
token := MQTTClient.Publish(MqttCmdPrefix+challenge.GetChallengee()+"/PoPInit", 0, false, blockHeight)
token.Wait()
err = token.Error()
if err != nil {
return
}
token = MQTTClient.Publish("cmnd/"+challenge.GetChallenger()+"/PoPInit", 0, false, blockHeight)
token = MQTTClient.Publish(MqttCmdPrefix+challenge.GetChallenger()+"/PoPInit", 0, false, blockHeight)
token.Wait()
err = token.Error()
if err != nil {
@ -66,3 +76,61 @@ func sendMqttMessages(challenge types.Challenge) (err error) {
MQTTClient.Disconnect(1000)
return
}
var mqttMachineByAddressAvailabilityMapping map[string]bool
var rwMu sync.RWMutex
func init() {
mqttMachineByAddressAvailabilityMapping = make(map[string]bool)
}
func GetMqttStatusOfParticipant(address string) (isAvailable bool, err error) {
if token := MQTTClient.Connect(); token.Wait() && token.Error() != nil {
err = token.Error()
return
}
rwMu.RLock() // Lock for reading
_, ok := mqttMachineByAddressAvailabilityMapping[address]
rwMu.RUnlock() // Unlock after reading
if ok {
return
}
var messageHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
topicParts := strings.Split(msg.Topic(), "/")
if len(topicParts) == 3 && topicParts[1] == address {
rwMu.Lock() // Lock for writing
mqttMachineByAddressAvailabilityMapping[address] = true
rwMu.Unlock() // Unlock after writing
}
}
subscriptionTopic := "stat/" + address + "/STATUS"
publishingTopic := MqttCmdPrefix + address + "/STATUS"
// Subscribe to a topic
if token := MQTTClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil {
err = token.Error()
return
}
rwMu.Lock() // Lock for writing
mqttMachineByAddressAvailabilityMapping[address] = false
rwMu.Unlock() // Unlock after writing
if token := MQTTClient.Publish(publishingTopic, 0, false, ""); token.Wait() && token.Error() != nil {
err = token.Error()
} else {
duration := int64(config.GetConfig().MqttResponseTimeout)
time.Sleep(time.Millisecond * time.Duration(duration))
}
// Unsubscribe and disconnect
if token := MQTTClient.Unsubscribe(subscriptionTopic); token.Wait() && token.Error() != nil {
err = token.Error()
}
rwMu.Lock() // Lock for writing
isAvailable = mqttMachineByAddressAvailabilityMapping[address]
delete(mqttMachineByAddressAvailabilityMapping, address)
rwMu.Unlock() // Unlock after writing
MQTTClient.Disconnect(1000)
return
}

View File

@ -8,15 +8,26 @@ import (
"github.com/stretchr/testify/assert"
)
func TestSendMqttMessages(t *testing.T) {
func init() {
// Use MQTT mock client
MQTTClient = &mocks.MockMQTTClient{}
}
func TestSendMqttPopInitMessages(t *testing.T) {
t.Parallel()
var challenge types.Challenge
challenge.Initiator = ""
challenge.Challengee = "plmnt15gdanx0nm2lwsx30a6wft7429p32dhzaq37c06"
challenge.Challenger = "plmnt1683t0us0r85840nsepx6jrk2kjxw7zrcnkf0rp"
challenge.Height = 58
// Use MQTT mock client
MQTTClient = &mocks.MockMQTTClient{}
err := sendMqttMessages(challenge)
err := sendMqttPopInitMessages(challenge)
assert.NoError(t, err)
}
func TestGetMqttStatusOfParticipantMocked(t *testing.T) {
t.Parallel()
participant := "plmnt15gdanx0nm2lwsx30a6wft7429p32dhzaq37c06"
isAvailable, err := GetMqttStatusOfParticipant(participant)
assert.NoError(t, err)
assert.True(t, isAvailable)
}

View File

@ -22,14 +22,14 @@ func BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock, k keeper.Keeper)
currentBlockHeight := req.Header.GetHeight()
hexProposerAddress := hex.EncodeToString(proposerAddress)
if isPopHeight(req.Header.GetHeight()) {
if isPopHeight(currentBlockHeight) {
// select PoP participants
challenger, challengee := k.SelectPopParticipants(ctx)
if challenger != "" && challengee != "" {
// Issue PoP
// The keeper will send MQTT the initializing message to challenger && challengee
util.SendInitPoP(ctx, hexProposerAddress, challenger, challengee, currentBlockHeight)
// TODO send MQTT message to challenger && challengee
}
}

View File

@ -196,7 +196,10 @@ func (k Keeper) iterateAccountsForMachines(ctx sdk.Context, start uint64, partic
participant := sdk.AccAddress(iterator.Value())
_, found := k.machineKeeper.GetMachineIndexByAddress(ctx, participant.String())
if found {
*participants = append(*participants, participant)
available, err := util.GetMqttStatusOfParticipant(participant.String())
if err == nil && available {
*participants = append(*participants, participant)
}
}
if len(*participants) == 2 {

View File

@ -19,7 +19,10 @@ func (k msgServer) InitPop(goCtx context.Context, msg *types.MsgInitPop) (*types
k.StoreChallenge(ctx, challenge)
go util.SendMqttMessagesToServer(ctx, challenge)
validatorIdentity, validResult := util.GetValidatorCometBFTIdentity(ctx)
if validResult && msg.Initiator == validatorIdentity {
go util.SendMqttPopInitMessagesToServer(ctx, challenge)
}
return &types.MsgInitPopResponse{}, nil
}

View File

@ -19,6 +19,12 @@ func (k msgServer) ReportPopResult(goCtx context.Context, msg *types.MsgReportPo
return nil, errorsmod.Wrapf(types.ErrInvalidChallenge, err.Error())
}
if msg.Challenge.GetSuccess() {
util.GetAppLogger().Info(ctx, "PoP at height %v was successful", msg.Challenge.GetHeight())
} else {
util.GetAppLogger().Info(ctx, "PoP at height %v was unsuccessful", msg.Challenge.GetHeight())
}
// TODO: develop a more resilient pattern: if the distribution does not work,
// the challenge shouldn't be discarded. it's most likely not the fault of the PoP participants.
err = k.issuePoPRewards(ctx, *msg.Challenge)