From b48432f5414bdc9dfdb2866f68cf3a2c940b22fb Mon Sep 17 00:00:00 2001 From: Julian Strobl Date: Mon, 18 Nov 2024 10:14:11 +0100 Subject: [PATCH] refactor: reduce cognitive complexity (#475) Signed-off-by: Julian Strobl --- monitor/mqtt_monitor.go | 114 ++++++++----- x/machine/keeper/msg_server_attest_machine.go | 151 ++++++++++++------ 2 files changed, 179 insertions(+), 86 deletions(-) diff --git a/monitor/mqtt_monitor.go b/monitor/mqtt_monitor.go index 7ed6962..3234b81 100644 --- a/monitor/mqtt_monitor.go +++ b/monitor/mqtt_monitor.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "encoding/json" + "errors" "io" "math/rand" "net" @@ -302,63 +303,102 @@ func (mms *MqttMonitor) onConnectionLost(_ mqtt.Client, err error) { } func (mms *MqttMonitor) MonitorActiveParticipants() { - mms.clientMutex.Lock() - if mms.localMqttClient != nil { - Log("client is still working") - mms.clientMutex.Unlock() + mqttClient, err := mms.initializeClient() + if err != nil { + Log(err.Error()) return } - mms.localMqttClient = mms.lazyLoadMonitorMQTTClient() - mqttClient := mms.localMqttClient - mms.clientMutex.Unlock() // Maximum reconnection attempts (adjust as needed) mms.SetMaxRetries() + for !mms.IsTerminated() && mms.maxRetries > 0 { - if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { - Log("error connecting to mqtt: " + token.Error().Error()) - mms.maxRetries-- - time.Sleep(time.Second * 5) + if !mms.connectClient(mqttClient) { continue } - mms.lostConnectionMutex.Lock() - mms.lostConnection = false - mms.lostConnectionMutex.Unlock() - Log("established connection") - - var messageHandler mqtt.MessageHandler = mms.MqttMsgHandler - - // Subscribe to a topic - subscriptionTopic := "tele/#" - if token := mqttClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { - Log("error registering the mqtt subscription: " + token.Error().Error()) + if !mms.subscribeToTopic(mqttClient) { continue } - Log("subscribed to tele/# channels") - for !mms.IsTerminated() { - mms.lostConnectionMutex.Lock() - lostConnectionEvent := mms.lostConnection - mms.lostConnectionMutex.Unlock() - if !mqttClient.IsConnected() || !mqttClient.IsConnectionOpen() || lostConnectionEvent { - Log("retry establishing a connection") - break // Exit inner loop on disconnect - } - - SendUpdateMessage(mqttClient) - mms.SetMaxRetries() - time.Sleep(60 * time.Second) // Adjust sleep time based on your needs - } + mms.monitorConnection(mqttClient) } + mms.handleConnectionTermination() +} + +func (mms *MqttMonitor) initializeClient() (mqttClient util.MQTTClientI, err error) { + mms.clientMutex.Lock() + defer mms.clientMutex.Unlock() + + if mms.localMqttClient != nil { + return nil, errors.New("client is still working") + } + + mms.localMqttClient = mms.lazyLoadMonitorMQTTClient() + return mms.localMqttClient, nil +} + +func (mms *MqttMonitor) connectClient(mqttClient util.MQTTClientI) bool { + if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { + Log("error connecting to mqtt: " + token.Error().Error()) + mms.maxRetries-- + time.Sleep(time.Second * 5) + return false + } + + mms.setConnectionStatus(false) + Log("established connection") + return true +} + +func (mms *MqttMonitor) subscribeToTopic(mqttClient util.MQTTClientI) bool { + messageHandler := mqtt.MessageHandler(mms.MqttMsgHandler) + subscriptionTopic := "tele/#" + + if token := mqttClient.Subscribe(subscriptionTopic, 0, messageHandler); token.Wait() && token.Error() != nil { + Log("error registering the mqtt subscription: " + token.Error().Error()) + return false + } + + Log("subscribed to tele/# channels") + return true +} + +func (mms *MqttMonitor) monitorConnection(mqttClient util.MQTTClientI) { + for !mms.IsTerminated() { + if mms.isConnectionLost(mqttClient) { + Log("retry establishing a connection") + break + } + + SendUpdateMessage(mqttClient) + mms.SetMaxRetries() + time.Sleep(60 * time.Second) + } +} + +func (mms *MqttMonitor) isConnectionLost(mqttClient util.MQTTClientI) bool { + mms.lostConnectionMutex.Lock() + defer mms.lostConnectionMutex.Unlock() + + return !mqttClient.IsConnected() || !mqttClient.IsConnectionOpen() || mms.lostConnection +} + +func (mms *MqttMonitor) setConnectionStatus(lost bool) { + mms.lostConnectionMutex.Lock() + defer mms.lostConnectionMutex.Unlock() + mms.lostConnection = lost +} + +func (mms *MqttMonitor) handleConnectionTermination() { if mms.maxRetries == 0 { Log("reached maximum reconnection attempts. Exiting. New client will be activated soon.") } mms.clientMutex.Lock() + defer mms.clientMutex.Unlock() mms.localMqttClient = nil - mms.clientMutex.Unlock() } func SendUpdateMessage(mqttClient util.MQTTClientI) { diff --git a/x/machine/keeper/msg_server_attest_machine.go b/x/machine/keeper/msg_server_attest_machine.go index e0de4df..f8d1a5b 100644 --- a/x/machine/keeper/msg_server_attest_machine.go +++ b/x/machine/keeper/msg_server_attest_machine.go @@ -2,7 +2,6 @@ package keeper import ( "context" - "errors" "fmt" config "github.com/planetmint/planetmint-go/config" @@ -18,63 +17,117 @@ import ( ) func (k msgServer) AttestMachine(goCtx context.Context, msg *types.MsgAttestMachine) (*types.MsgAttestMachineResponse, error) { + if err := k.validateMachineAttestation(msg.Machine); err != nil { + return nil, err + } + + if err := k.processMachineAttestation(goCtx, msg.Machine); err != nil { + return nil, err + } + + return &types.MsgAttestMachineResponse{}, nil +} + +func (k msgServer) validateMachineAttestation(machine *types.Machine) error { + // Validate machine signature + if err := k.validateMachineSignature(machine); err != nil { + return err + } + + // Validate issuer keys + if err := k.validateIssuerKeys(machine); err != nil { + return err + } + + // Validate machine type + if machine.GetType() == 0 { // 0 == RDDL_MACHINE_UNDEFINED + return types.ErrMachineTypeUndefined + } + + return nil +} + +func (k msgServer) validateMachineSignature(machine *types.Machine) error { + isValidSecp256r1, errR1 := signature.ValidateSECP256R1Signature( + machine.MachineId, + machine.MachineIdSignature, + machine.MachineId, + ) + + if errR1 == nil && isValidSecp256r1 { + return nil + } + + isValidSecp256k1, errK1 := signature.ValidateSignature( + machine.MachineId, + machine.MachineIdSignature, + machine.MachineId, + ) + + if errK1 == nil && isValidSecp256k1 { + return nil + } + + return fmt.Errorf("invalid machine signature: %s, %s", errR1.Error(), errK1.Error()) +} + +func (k msgServer) validateIssuerKeys(machine *types.Machine) error { + if !validateExtendedPublicKey(machine.IssuerPlanetmint, config.PlmntNetParams) { + return errorsmod.Wrap(types.ErrInvalidKey, "planetmint") + } + + if !validateExtendedPublicKey(machine.IssuerLiquid, config.LiquidNetParams) { + return errorsmod.Wrap(types.ErrInvalidKey, "liquid") + } + + return nil +} + +func (k msgServer) processMachineAttestation(goCtx context.Context, machine *types.Machine) error { ctx := sdk.UnwrapSDKContext(goCtx) - - // the ante handler verifies that the MachineID exists. Additional result checks got moved to the ante-handler - // and removed from here due to inconsistency or checking the same thing over and over again. - ta, _, _ := k.GetTrustAnchor(ctx, msg.Machine.MachineId) - - isValidSecp256r1, errR1 := signature.ValidateSECP256R1Signature(msg.Machine.MachineId, msg.Machine.MachineIdSignature, msg.Machine.MachineId) - if errR1 != nil || !isValidSecp256r1 { - isValidSecp256k1, errK1 := signature.ValidateSignature(msg.Machine.MachineId, msg.Machine.MachineIdSignature, msg.Machine.MachineId) - if errK1 != nil || !isValidSecp256k1 { - errStr := "" - if errR1 != nil { - errStr = errR1.Error() - } - aggreatedErrorMessage := "Invalid machine signature: " + errStr + ", " + errK1.Error() - return nil, errors.New(aggreatedErrorMessage) - } - } - - isValidIssuerPlanetmint := validateExtendedPublicKey(msg.Machine.IssuerPlanetmint, config.PlmntNetParams) - if !isValidIssuerPlanetmint { - return nil, errorsmod.Wrap(types.ErrInvalidKey, "planetmint") - } - isValidIssuerLiquid := validateExtendedPublicKey(msg.Machine.IssuerLiquid, config.LiquidNetParams) - if !isValidIssuerLiquid { - return nil, errorsmod.Wrap(types.ErrInvalidKey, "liquid") - } - - if msg.Machine.GetType() == 0 { // 0 == RDDL_MACHINE_UNDEFINED - return nil, types.ErrMachineTypeUndefined - } params := k.GetParams(ctx) - if util.IsValidatorBlockProposer(ctx, k.rootDir) { - util.GetAppLogger().Info(ctx, "Issuing Machine NFT: "+msg.Machine.String()) - scheme := params.AssetRegistryScheme - domain := params.AssetRegistryDomain - path := params.AssetRegistryPath - localErr := util.IssueMachineNFT(goCtx, msg.Machine, scheme, domain, path) - if localErr != nil { - util.GetAppLogger().Error(ctx, "Machine NFT issuance failed : "+localErr.Error()) - } else { - util.GetAppLogger().Info(ctx, "Machine NFT issuance successful: "+msg.Machine.String()) - } - k.sendInitialFundingTokensToMachine(goCtx, msg.GetMachine().GetAddress(), params) + // Process NFT issuance if validator is block proposer + if util.IsValidatorBlockProposer(ctx, k.rootDir) { + if err := k.handleNFTIssuance(goCtx, machine, params); err != nil { + return err + } + k.sendInitialFundingTokensToMachine(goCtx, machine.GetAddress(), params) } else { util.GetAppLogger().Info(ctx, "Not block proposer: skipping Machine NFT issuance") } - k.StoreMachine(ctx, *msg.Machine) - k.StoreMachineIndex(ctx, *msg.Machine) - err := k.StoreTrustAnchor(ctx, ta, true) - if err != nil { - return nil, err + // Store machine data + k.StoreMachine(ctx, *machine) + k.StoreMachineIndex(ctx, *machine) + + // Store trust anchor + ta, _, _ := k.GetTrustAnchor(ctx, machine.MachineId) + if err := k.StoreTrustAnchor(ctx, ta, true); err != nil { + return err } - return &types.MsgAttestMachineResponse{}, err + return nil +} + +func (k msgServer) handleNFTIssuance(goCtx context.Context, machine *types.Machine, params types.Params) error { + ctx := sdk.UnwrapSDKContext(goCtx) + logger := util.GetAppLogger() + logger.Info(ctx, "Issuing Machine NFT: "+machine.String()) + + err := util.IssueMachineNFT(goCtx, machine, + params.AssetRegistryScheme, + params.AssetRegistryDomain, + params.AssetRegistryPath, + ) + + if err != nil { + logger.Error(ctx, "Machine NFT issuance failed: "+err.Error()) + return err + } + + logger.Info(ctx, "Machine NFT issuance successful: "+machine.String()) + return nil } func (k msgServer) sendInitialFundingTokensToMachine(goCtx context.Context, machineAddressString string, keeperParams types.Params) {