mirror of
https://github.com/planetmint/planetmint-go.git
synced 2025-03-30 15:08:28 +00:00
refactor: reduce cognitive complexity (#475)
Signed-off-by: Julian Strobl <jmastr@mailbox.org>
This commit is contained in:
parent
c507a3490e
commit
b48432f541
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
"math/rand"
|
||||
"net"
|
||||
@ -302,63 +303,102 @@ func (mms *MqttMonitor) onConnectionLost(_ mqtt.Client, err error) {
|
||||
}
|
||||
|
||||
func (mms *MqttMonitor) MonitorActiveParticipants() {
|
||||
mms.clientMutex.Lock()
|
||||
if mms.localMqttClient != nil {
|
||||
Log("client is still working")
|
||||
mms.clientMutex.Unlock()
|
||||
mqttClient, err := mms.initializeClient()
|
||||
if err != nil {
|
||||
Log(err.Error())
|
||||
return
|
||||
}
|
||||
mms.localMqttClient = mms.lazyLoadMonitorMQTTClient()
|
||||
mqttClient := mms.localMqttClient
|
||||
mms.clientMutex.Unlock()
|
||||
|
||||
// Maximum reconnection attempts (adjust as needed)
|
||||
mms.SetMaxRetries()
|
||||
|
||||
for !mms.IsTerminated() && mms.maxRetries > 0 {
|
||||
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
|
||||
Log("error connecting to mqtt: " + token.Error().Error())
|
||||
mms.maxRetries--
|
||||
time.Sleep(time.Second * 5)
|
||||
if !mms.connectClient(mqttClient) {
|
||||
continue
|
||||
}
|
||||
mms.lostConnectionMutex.Lock()
|
||||
mms.lostConnection = false
|
||||
mms.lostConnectionMutex.Unlock()
|
||||
|
||||
Log("established connection")
|
||||
|
||||
var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler
|
||||
|
||||
// Subscribe to a topic
|
||||
subscriptionTopic := "tele/#"
|
||||
if token := mqttClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil {
|
||||
Log("error registering the mqtt subscription: " + token.Error().Error())
|
||||
if !mms.subscribeToTopic(mqttClient) {
|
||||
continue
|
||||
}
|
||||
Log("subscribed to tele/# channels")
|
||||
|
||||
for !mms.IsTerminated() {
|
||||
mms.lostConnectionMutex.Lock()
|
||||
lostConnectionEvent := mms.lostConnection
|
||||
mms.lostConnectionMutex.Unlock()
|
||||
if !mqttClient.IsConnected() || !mqttClient.IsConnectionOpen() || lostConnectionEvent {
|
||||
Log("retry establishing a connection")
|
||||
break // Exit inner loop on disconnect
|
||||
}
|
||||
|
||||
SendUpdateMessage(mqttClient)
|
||||
mms.SetMaxRetries()
|
||||
time.Sleep(60 * time.Second) // Adjust sleep time based on your needs
|
||||
}
|
||||
mms.monitorConnection(mqttClient)
|
||||
}
|
||||
|
||||
mms.handleConnectionTermination()
|
||||
}
|
||||
|
||||
func (mms *MqttMonitor) initializeClient() (mqttClient util.MQTTClientI, err error) {
|
||||
mms.clientMutex.Lock()
|
||||
defer mms.clientMutex.Unlock()
|
||||
|
||||
if mms.localMqttClient != nil {
|
||||
return nil, errors.New("client is still working")
|
||||
}
|
||||
|
||||
mms.localMqttClient = mms.lazyLoadMonitorMQTTClient()
|
||||
return mms.localMqttClient, nil
|
||||
}
|
||||
|
||||
func (mms *MqttMonitor) connectClient(mqttClient util.MQTTClientI) bool {
|
||||
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
|
||||
Log("error connecting to mqtt: " + token.Error().Error())
|
||||
mms.maxRetries--
|
||||
time.Sleep(time.Second * 5)
|
||||
return false
|
||||
}
|
||||
|
||||
mms.setConnectionStatus(false)
|
||||
Log("established connection")
|
||||
return true
|
||||
}
|
||||
|
||||
func (mms *MqttMonitor) subscribeToTopic(mqttClient util.MQTTClientI) bool {
|
||||
messageHandler := mqtt.MessageHandler(mms.MqttMsgHandler)
|
||||
subscriptionTopic := "tele/#"
|
||||
|
||||
if token := mqttClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil {
|
||||
Log("error registering the mqtt subscription: " + token.Error().Error())
|
||||
return false
|
||||
}
|
||||
|
||||
Log("subscribed to tele/# channels")
|
||||
return true
|
||||
}
|
||||
|
||||
func (mms *MqttMonitor) monitorConnection(mqttClient util.MQTTClientI) {
|
||||
for !mms.IsTerminated() {
|
||||
if mms.isConnectionLost(mqttClient) {
|
||||
Log("retry establishing a connection")
|
||||
break
|
||||
}
|
||||
|
||||
SendUpdateMessage(mqttClient)
|
||||
mms.SetMaxRetries()
|
||||
time.Sleep(60 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func (mms *MqttMonitor) isConnectionLost(mqttClient util.MQTTClientI) bool {
|
||||
mms.lostConnectionMutex.Lock()
|
||||
defer mms.lostConnectionMutex.Unlock()
|
||||
|
||||
return !mqttClient.IsConnected() || !mqttClient.IsConnectionOpen() || mms.lostConnection
|
||||
}
|
||||
|
||||
func (mms *MqttMonitor) setConnectionStatus(lost bool) {
|
||||
mms.lostConnectionMutex.Lock()
|
||||
defer mms.lostConnectionMutex.Unlock()
|
||||
mms.lostConnection = lost
|
||||
}
|
||||
|
||||
func (mms *MqttMonitor) handleConnectionTermination() {
|
||||
if mms.maxRetries == 0 {
|
||||
Log("reached maximum reconnection attempts. Exiting. New client will be activated soon.")
|
||||
}
|
||||
|
||||
mms.clientMutex.Lock()
|
||||
defer mms.clientMutex.Unlock()
|
||||
mms.localMqttClient = nil
|
||||
mms.clientMutex.Unlock()
|
||||
}
|
||||
|
||||
func SendUpdateMessage(mqttClient util.MQTTClientI) {
|
||||
|
@ -2,7 +2,6 @@ package keeper
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
config "github.com/planetmint/planetmint-go/config"
|
||||
@ -18,63 +17,117 @@ import (
|
||||
)
|
||||
|
||||
func (k msgServer) AttestMachine(goCtx context.Context, msg *types.MsgAttestMachine) (*types.MsgAttestMachineResponse, error) {
|
||||
if err := k.validateMachineAttestation(msg.Machine); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := k.processMachineAttestation(goCtx, msg.Machine); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &types.MsgAttestMachineResponse{}, nil
|
||||
}
|
||||
|
||||
func (k msgServer) validateMachineAttestation(machine *types.Machine) error {
|
||||
// Validate machine signature
|
||||
if err := k.validateMachineSignature(machine); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Validate issuer keys
|
||||
if err := k.validateIssuerKeys(machine); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Validate machine type
|
||||
if machine.GetType() == 0 { // 0 == RDDL_MACHINE_UNDEFINED
|
||||
return types.ErrMachineTypeUndefined
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k msgServer) validateMachineSignature(machine *types.Machine) error {
|
||||
isValidSecp256r1, errR1 := signature.ValidateSECP256R1Signature(
|
||||
machine.MachineId,
|
||||
machine.MachineIdSignature,
|
||||
machine.MachineId,
|
||||
)
|
||||
|
||||
if errR1 == nil && isValidSecp256r1 {
|
||||
return nil
|
||||
}
|
||||
|
||||
isValidSecp256k1, errK1 := signature.ValidateSignature(
|
||||
machine.MachineId,
|
||||
machine.MachineIdSignature,
|
||||
machine.MachineId,
|
||||
)
|
||||
|
||||
if errK1 == nil && isValidSecp256k1 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("invalid machine signature: %s, %s", errR1.Error(), errK1.Error())
|
||||
}
|
||||
|
||||
func (k msgServer) validateIssuerKeys(machine *types.Machine) error {
|
||||
if !validateExtendedPublicKey(machine.IssuerPlanetmint, config.PlmntNetParams) {
|
||||
return errorsmod.Wrap(types.ErrInvalidKey, "planetmint")
|
||||
}
|
||||
|
||||
if !validateExtendedPublicKey(machine.IssuerLiquid, config.LiquidNetParams) {
|
||||
return errorsmod.Wrap(types.ErrInvalidKey, "liquid")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k msgServer) processMachineAttestation(goCtx context.Context, machine *types.Machine) error {
|
||||
ctx := sdk.UnwrapSDKContext(goCtx)
|
||||
|
||||
// the ante handler verifies that the MachineID exists. Additional result checks got moved to the ante-handler
|
||||
// and removed from here due to inconsistency or checking the same thing over and over again.
|
||||
ta, _, _ := k.GetTrustAnchor(ctx, msg.Machine.MachineId)
|
||||
|
||||
isValidSecp256r1, errR1 := signature.ValidateSECP256R1Signature(msg.Machine.MachineId, msg.Machine.MachineIdSignature, msg.Machine.MachineId)
|
||||
if errR1 != nil || !isValidSecp256r1 {
|
||||
isValidSecp256k1, errK1 := signature.ValidateSignature(msg.Machine.MachineId, msg.Machine.MachineIdSignature, msg.Machine.MachineId)
|
||||
if errK1 != nil || !isValidSecp256k1 {
|
||||
errStr := ""
|
||||
if errR1 != nil {
|
||||
errStr = errR1.Error()
|
||||
}
|
||||
aggreatedErrorMessage := "Invalid machine signature: " + errStr + ", " + errK1.Error()
|
||||
return nil, errors.New(aggreatedErrorMessage)
|
||||
}
|
||||
}
|
||||
|
||||
isValidIssuerPlanetmint := validateExtendedPublicKey(msg.Machine.IssuerPlanetmint, config.PlmntNetParams)
|
||||
if !isValidIssuerPlanetmint {
|
||||
return nil, errorsmod.Wrap(types.ErrInvalidKey, "planetmint")
|
||||
}
|
||||
isValidIssuerLiquid := validateExtendedPublicKey(msg.Machine.IssuerLiquid, config.LiquidNetParams)
|
||||
if !isValidIssuerLiquid {
|
||||
return nil, errorsmod.Wrap(types.ErrInvalidKey, "liquid")
|
||||
}
|
||||
|
||||
if msg.Machine.GetType() == 0 { // 0 == RDDL_MACHINE_UNDEFINED
|
||||
return nil, types.ErrMachineTypeUndefined
|
||||
}
|
||||
params := k.GetParams(ctx)
|
||||
if util.IsValidatorBlockProposer(ctx, k.rootDir) {
|
||||
util.GetAppLogger().Info(ctx, "Issuing Machine NFT: "+msg.Machine.String())
|
||||
scheme := params.AssetRegistryScheme
|
||||
domain := params.AssetRegistryDomain
|
||||
path := params.AssetRegistryPath
|
||||
localErr := util.IssueMachineNFT(goCtx, msg.Machine, scheme, domain, path)
|
||||
if localErr != nil {
|
||||
util.GetAppLogger().Error(ctx, "Machine NFT issuance failed : "+localErr.Error())
|
||||
} else {
|
||||
util.GetAppLogger().Info(ctx, "Machine NFT issuance successful: "+msg.Machine.String())
|
||||
}
|
||||
|
||||
k.sendInitialFundingTokensToMachine(goCtx, msg.GetMachine().GetAddress(), params)
|
||||
// Process NFT issuance if validator is block proposer
|
||||
if util.IsValidatorBlockProposer(ctx, k.rootDir) {
|
||||
if err := k.handleNFTIssuance(goCtx, machine, params); err != nil {
|
||||
return err
|
||||
}
|
||||
k.sendInitialFundingTokensToMachine(goCtx, machine.GetAddress(), params)
|
||||
} else {
|
||||
util.GetAppLogger().Info(ctx, "Not block proposer: skipping Machine NFT issuance")
|
||||
}
|
||||
|
||||
k.StoreMachine(ctx, *msg.Machine)
|
||||
k.StoreMachineIndex(ctx, *msg.Machine)
|
||||
err := k.StoreTrustAnchor(ctx, ta, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// Store machine data
|
||||
k.StoreMachine(ctx, *machine)
|
||||
k.StoreMachineIndex(ctx, *machine)
|
||||
|
||||
// Store trust anchor
|
||||
ta, _, _ := k.GetTrustAnchor(ctx, machine.MachineId)
|
||||
if err := k.StoreTrustAnchor(ctx, ta, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return &types.MsgAttestMachineResponse{}, err
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k msgServer) handleNFTIssuance(goCtx context.Context, machine *types.Machine, params types.Params) error {
|
||||
ctx := sdk.UnwrapSDKContext(goCtx)
|
||||
logger := util.GetAppLogger()
|
||||
logger.Info(ctx, "Issuing Machine NFT: "+machine.String())
|
||||
|
||||
err := util.IssueMachineNFT(goCtx, machine,
|
||||
params.AssetRegistryScheme,
|
||||
params.AssetRegistryDomain,
|
||||
params.AssetRegistryPath,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
logger.Error(ctx, "Machine NFT issuance failed: "+err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Info(ctx, "Machine NFT issuance successful: "+machine.String())
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k msgServer) sendInitialFundingTokensToMachine(goCtx context.Context, machineAddressString string, keeperParams types.Params) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user