planetmint-go/x/dao/abci.go
Jürgen Eckel 779b1edd48
Eckelj/mqtt monitoring (#359)
* added a MqttMonitor module with levelDB and periodic cleanup
* initialized in the app
* passed to dao keeper
* added conversion methods (string2unixtime, byte ToJSON)
* removed obsolete keeper code
* made RDDLToken.Factor public
* added explicit mqtt client to the monitor module
* restart mqtt connection in mqttmonitor on connection loss
* adjusted mqttmock structure to be compatible
* added some linter exclusions to let the monitor tool pass
* created a MockMqttMonitor interface and mock object
* used this to pass tests
* made the MockMqttMonitor a global object so that it can be easily mocked
* removed MockMqttMonitor from the app/keeper initialization
* adjusted test cases to register "active machines" to the mqttmonitor
* added mutex in mocks to protect against data races
* defined mocks for the dao tests
* clear separation between interface and mqtt-Monitor

* added another waiting block to ensure the tx went through; without it the tests sometimes failed because of a race condition (multi-threading issue)
* added memstorage to test instead of a file based DB


Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>
2024-04-08 10:49:00 +02:00

83 lines
2.8 KiB
Go

package dao
import (
"encoding/hex"
"github.com/planetmint/planetmint-go/monitor"
"github.com/planetmint/planetmint-go/util"
"github.com/planetmint/planetmint-go/x/dao/keeper"
abci "github.com/cometbft/cometbft/abci/types"
sdk "github.com/cosmos/cosmos-sdk/types"
)
// BeginBlocker performs the DAO module's height-triggered actions at the
// beginning of a block.
//
// Nothing happens unless util.IsValidatorBlockProposer reports that this node
// is the proposer of the block; filtering on the proposer ensures the actions
// below are taken only once per block. For the proposer, the current height
// is checked against three independent schedules:
//
//   - PoP height: PoP participants are selected and util.SendInitPoP is called.
//   - Reissuance height: the next reissuance object is created and
//     util.SendInitReissuance is called.
//   - Distribution height: the distribution for the reissued tokens is
//     computed and util.SendDistributionRequest is called.
//
// The function has no return value; failures are reported through the
// application logger. A failed reissuance or distribution computation
// suppresses the corresponding send.
func BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock, k keeper.Keeper) {
	proposerAddress := req.Header.GetProposerAddress()

	// Check if node is block proposer
	// take the following actions only once, that's why we filter for the Block Proposer
	if !util.IsValidatorBlockProposer(ctx, proposerAddress, k.RootDir) {
		return
	}

	currentBlockHeight := req.Header.GetHeight()
	hexProposerAddress := hex.EncodeToString(proposerAddress)

	if isPopHeight(ctx, k, currentBlockHeight) {
		// select PoP participants
		monitor.SetContext(ctx)
		challenger, challengee, err := monitor.SelectPoPParticipantsOutOfActiveActors()
		if err != nil {
			util.GetAppLogger().Error(ctx, "error during PoP Participant selection ", err)
		}
		// A PoP needs both parties: if the selection failed or returned only
		// one of them, clear both so that no half-populated pair is sent.
		if err != nil || challenger == "" || challengee == "" {
			challenger = ""
			challengee = ""
		}

		// Init PoP - independent from challenger and challengee
		// The keeper will send the MQTT initializing message to challenger && challengee
		util.SendInitPoP(ctx, hexProposerAddress, challenger, challengee, currentBlockHeight)
	}

	if isReissuanceHeight(ctx, k, currentBlockHeight) {
		reissuance, err := k.CreateNextReissuanceObject(ctx, currentBlockHeight)
		if err != nil {
			util.GetAppLogger().Error(ctx, "error while computing the RDDL reissuance ", err)
		} else {
			util.SendInitReissuance(ctx, hexProposerAddress, reissuance.GetCommand(), currentBlockHeight,
				reissuance.GetFirstIncludedPop(), reissuance.GetLastIncludedPop())
		}
	}

	if isDistributionHeight(ctx, k, currentBlockHeight) {
		distribution, err := k.GetDistributionForReissuedTokens(ctx, currentBlockHeight)
		if err != nil {
			// The result accompanying a non-nil error is not meaningful, so
			// it must not be stamped with the proposer and sent. This mirrors
			// the handling of the reissuance above.
			util.GetAppLogger().Error(ctx, "error while computing the RDDL distribution ", err)
		} else {
			distribution.Proposer = hexProposerAddress
			util.SendDistributionRequest(ctx, distribution)
		}
	}
}
// isPopHeight reports whether height is an exact multiple of the module's
// PopEpochs parameter.
func isPopHeight(ctx sdk.Context, k keeper.Keeper, height int64) bool {
	popEpochs := k.GetParams(ctx).PopEpochs
	remainder := height % popEpochs
	return remainder == 0
}
// isReissuanceHeight reports whether height is an exact multiple of the
// module's ReissuanceEpochs parameter.
func isReissuanceHeight(ctx sdk.Context, k keeper.Keeper, height int64) bool {
	reissuanceEpochs := k.GetParams(ctx).ReissuanceEpochs
	// e.g. 483840 % 17280 = 0
	remainder := height % reissuanceEpochs
	return remainder == 0
}
// isDistributionHeight reports whether height lies exactly DistributionOffset
// blocks past a multiple of ReissuanceEpochs.
//
// Heights up to and including ReissuanceEpochs are never distribution
// heights, even when the remainder matches the offset.
// NOTE(review): presumably because no reissuance has completed that early —
// confirm against the reissuance schedule.
func isDistributionHeight(ctx sdk.Context, k keeper.Keeper, height int64) bool {
	// Read the parameters once so that both comparisons below work on the
	// same values and the lookup is not repeated.
	params := k.GetParams(ctx)

	// e.g. 360 % 17280 = 360
	if height <= params.ReissuanceEpochs {
		return false
	}
	// e.g. 484200 % 17280 = 360
	return height%params.ReissuanceEpochs == params.DistributionOffset
}
// EndBlocker is the DAO module's end-of-block hook. It ignores all of its
// arguments and does nothing.
func EndBlocker(_ sdk.Context, _ abci.RequestEndBlock, _ keeper.Keeper) {
// EndBlocker is currently not implemented and used by planetmint
}