diff --git a/app/rpc/manager.go b/app/rpc/manager.go index 137fd6555..e786460f3 100644 --- a/app/rpc/manager.go +++ b/app/rpc/manager.go @@ -92,7 +92,7 @@ func (m *Manager) notifyBlockAddedToDAG(block *externalapi.DomainBlock) error { if err != nil { return err } - blockAddedNotification := appmessage.NewBlockAddedNotificationMessage(rpcBlock) + blockAddedNotification := appmessage.NewBlockAddedNotificationMessage(rpcBlock,"") err = m.context.NotificationManager.NotifyBlockAdded(blockAddedNotification) if err != nil { return err @@ -141,7 +141,7 @@ func (m *Manager) notifyVirtualChange(virtualChangeSet *externalapi.VirtualChang // NotifyNewBlockTemplate notifies the manager that a new // block template is available for miners func (m *Manager) NotifyNewBlockTemplate() error { - notification := appmessage.NewNewBlockTemplateNotificationMessage() + notification := appmessage.NewNewBlockTemplateNotificationMessage("") return m.context.NotificationManager.NotifyNewBlockTemplate(notification) } @@ -166,7 +166,7 @@ func (m *Manager) NotifyFinalityConflict(violatingBlockHash string) error { onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyFinalityConflict") defer onEnd() - notification := appmessage.NewFinalityConflictNotificationMessage(violatingBlockHash) + notification := appmessage.NewFinalityConflictNotificationMessage(violatingBlockHash, rpccontext.DefaultNotificationId) return m.context.NotificationManager.NotifyFinalityConflict(notification) } @@ -175,7 +175,7 @@ func (m *Manager) NotifyFinalityConflictResolved(finalityBlockHash string) error onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyFinalityConflictResolved") defer onEnd() - notification := appmessage.NewFinalityConflictResolvedNotificationMessage(finalityBlockHash) + notification := appmessage.NewFinalityConflictResolvedNotificationMessage(finalityBlockHash, rpccontext.DefaultNotificationId) return m.context.NotificationManager.NotifyFinalityConflictResolved(notification) } @@ -207,7 +207,7 @@ func (m *Manager) notifyVirtualSelectedParentBlueScoreChanged(virtualSelectedPar onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyVirtualSelectedParentBlueScoreChanged") defer onEnd() - notification := appmessage.NewVirtualSelectedParentBlueScoreChangedNotificationMessage(virtualSelectedParentBlueScore) + notification := appmessage.NewVirtualSelectedParentBlueScoreChangedNotificationMessage(virtualSelectedParentBlueScore, rpccontext.DefaultNotificationId) return m.context.NotificationManager.NotifyVirtualSelectedParentBlueScoreChanged(notification) } @@ -215,7 +215,7 @@ func (m *Manager) notifyVirtualDaaScoreChanged(virtualDAAScore uint64) error { onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyVirtualDaaScoreChanged") defer onEnd() - notification := appmessage.NewVirtualDaaScoreChangedNotificationMessage(virtualDAAScore) + notification := appmessage.NewVirtualDaaScoreChangedNotificationMessage(virtualDAAScore, rpccontext.DefaultNotificationId) return m.context.NotificationManager.NotifyVirtualDaaScoreChanged(notification) } @@ -236,7 +236,7 @@ func (m *Manager) notifyVirtualSelectedParentChainChanged(virtualChangeSet *exte } notification, err := m.context.ConvertVirtualSelectedParentChainChangesToChainChangedNotificationMessage( - virtualChangeSet.VirtualSelectedParentChainChanges, includeAcceptedTransactionIDs) + virtualChangeSet.VirtualSelectedParentChainChanges, includeAcceptedTransactionIDs, rpccontext.DefaultNotificationId) if err != nil { return err } diff --git a/app/rpc/rpccontext/chain_changed.go b/app/rpc/rpccontext/chain_changed.go index 101eb7d43..1e764f7e5 100644 --- a/app/rpc/rpccontext/chain_changed.go +++ b/app/rpc/rpccontext/chain_changed.go @@ -9,7 +9,7 @@ import ( // ConvertVirtualSelectedParentChainChangesToChainChangedNotificationMessage converts // VirtualSelectedParentChainChanges to VirtualSelectedParentChainChangedNotificationMessage func (ctx *Context) ConvertVirtualSelectedParentChainChangesToChainChangedNotificationMessage( - selectedParentChainChanges *externalapi.SelectedChainPath, includeAcceptedTransactionIDs bool) ( + selectedParentChainChanges *externalapi.SelectedChainPath, includeAcceptedTransactionIDs bool, id string) ( *appmessage.VirtualSelectedParentChainChangedNotificationMessage, error) { removedChainBlockHashes := make([]string, len(selectedParentChainChanges.Removed)) @@ -32,7 +32,7 @@ func (ctx *Context) ConvertVirtualSelectedParentChainChangesToChainChangedNotifi } return appmessage.NewVirtualSelectedParentChainChangedNotificationMessage( - removedChainBlockHashes, addedChainBlocks, acceptedTransactionIDs), nil + removedChainBlockHashes, addedChainBlocks, acceptedTransactionIDs, id), nil } func (ctx *Context) getAndConvertAcceptedTransactionIDs(selectedParentChainChanges *externalapi.SelectedChainPath) ( diff --git a/app/rpc/rpccontext/notificationmanager.go b/app/rpc/rpccontext/notificationmanager.go index c26ed40b9..967deeaa9 100644 --- a/app/rpc/rpccontext/notificationmanager.go +++ b/app/rpc/rpccontext/notificationmanager.go @@ -13,6 +13,8 @@ import ( "github.com/pkg/errors" ) +const DefaultNotificationId = "" //empty string corrosponds to defualt grpc string value, and hence id value when not supplied + // NotificationManager manages notifications for the RPC type NotificationManager struct { sync.RWMutex @@ -111,6 +113,9 @@ func (nm *NotificationManager) NotifyBlockAdded(notification *appmessage.BlockAd for router, listener := range nm.listeners { if listener.propagateBlockAddedNotifications { + + notification.Id = listener.propagateBlockAddedNotificationsId + err := router.OutgoingRoute().MaybeEnqueue(notification) if err != nil { return err @@ -136,6 +141,8 @@ func (nm *NotificationManager) NotifyVirtualSelectedParentChainChanged( if listener.propagateVirtualSelectedParentChainChangedNotifications { var err error + notification.Id = listener.propagateVirtualSelectedParentChainChangedNotificationsId + if listener.includeAcceptedTransactionIDsInVirtualSelectedParentChainChangedNotifications { err = router.OutgoingRoute().MaybeEnqueue(notification) } else { @@ -169,6 +176,9 @@ func (nm *NotificationManager) NotifyFinalityConflict(notification *appmessage.F for router, listener := range nm.listeners { if listener.propagateFinalityConflictNotifications { + + notification.Id = listener.propagateFinalityConflictNotificationsId + err := router.OutgoingRoute().Enqueue(notification) if err != nil { return err @@ -185,6 +195,9 @@ func (nm *NotificationManager) NotifyFinalityConflictResolved(notification *appm for router, listener := range nm.listeners { if listener.propagateFinalityConflictResolvedNotifications { + + notification.Id = listener.propagateFinalityConflictResolvedNotificationsId + err := router.OutgoingRoute().Enqueue(notification) if err != nil { return err @@ -206,6 +219,8 @@ func (nm *NotificationManager) NotifyUTXOsChanged(utxoChanges *utxoindex.UTXOCha if err != nil { return err } + + notification.Id = listener.propagateUTXOsChangedNotificationsId // Don't send the notification if it's empty if len(notification.Added) == 0 && len(notification.Removed) == 0 { @@ -232,6 +247,9 @@ func (nm *NotificationManager) NotifyVirtualSelectedParentBlueScoreChanged( for router, listener := range nm.listeners { if listener.propagateVirtualSelectedParentBlueScoreChangedNotifications { + + notification.Id = listener.propagateVirtualSelectedParentBlueScoreChangedNotificationsId + err := router.OutgoingRoute().MaybeEnqueue(notification) if err != nil { return err @@ -251,6 +269,9 @@ func (nm *NotificationManager) NotifyVirtualDaaScoreChanged( for router, listener := range nm.listeners { if listener.propagateVirtualDaaScoreChangedNotifications { + + notification.Id = listener.propagateVirtualDaaScoreChangedNotificationsId + err := router.OutgoingRoute().MaybeEnqueue(notification) if err != nil { return err @@ -270,6 +291,9 @@ func (nm *NotificationManager) NotifyNewBlockTemplate( for router, listener := range nm.listeners { if listener.propagateNewBlockTemplateNotifications { + + notification.Id = listener.propagateNewBlockTemplateNotificationsId + err := router.OutgoingRoute().Enqueue(notification) if err != nil { return err diff --git a/app/rpc/rpchandlers/get_virtual_selected_parent_chain_from_block.go b/app/rpc/rpchandlers/get_virtual_selected_parent_chain_from_block.go index aec4445c0..28e31422f 100644 --- a/app/rpc/rpchandlers/get_virtual_selected_parent_chain_from_block.go +++ b/app/rpc/rpchandlers/get_virtual_selected_parent_chain_from_block.go @@ -27,7 +27,7 @@ func HandleGetVirtualSelectedParentChainFromBlock(context *rpccontext.Context, _ } chainChangedNotification, err := context.ConvertVirtualSelectedParentChainChangesToChainChangedNotificationMessage( - virtualSelectedParentChain, getVirtualSelectedParentChainFromBlockRequest.IncludeAcceptedTransactionIDs) + virtualSelectedParentChain, getVirtualSelectedParentChainFromBlockRequest.IncludeAcceptedTransactionIDs, rpccontext.DefaultNotificationId) if err != nil { return nil, err } diff --git a/infrastructure/network/rpcclient/rpc_on_block_added.go b/infrastructure/network/rpcclient/rpc_on_block_added.go index e82c9b248..512dd62d0 100644 --- a/infrastructure/network/rpcclient/rpc_on_block_added.go +++ b/infrastructure/network/rpcclient/rpc_on_block_added.go @@ -4,12 +4,13 @@ import ( "github.com/kaspanet/kaspad/app/appmessage" routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" "github.com/pkg/errors" + "github.com/kaspanet/kaspad/app/rpc/rpccontext" ) // RegisterForBlockAddedNotifications sends an RPC request respective to the function's name and returns the RPC server's response. // Additionally, it starts listening for the appropriate notification using the given handler function func (c *RPCClient) RegisterForBlockAddedNotifications(onBlockAdded func(notification *appmessage.BlockAddedNotificationMessage)) error { - err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyBlockAddedRequestMessage()) + err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyBlockAddedRequestMessage(rpccontext.DefaultNotificationId)) if err != nil { return err } diff --git a/infrastructure/network/rpcclient/rpc_on_chain_changed.go b/infrastructure/network/rpcclient/rpc_on_chain_changed.go index de7a5d613..fd45146e6 100644 --- a/infrastructure/network/rpcclient/rpc_on_chain_changed.go +++ b/infrastructure/network/rpcclient/rpc_on_chain_changed.go @@ -4,6 +4,7 @@ import ( "github.com/kaspanet/kaspad/app/appmessage" routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" "github.com/pkg/errors" + "github.com/kaspanet/kaspad/app/rpc/rpccontext" ) // RegisterForVirtualSelectedParentChainChangedNotifications sends an RPC request respective to the function's name and returns the RPC server's response. @@ -12,7 +13,7 @@ func (c *RPCClient) RegisterForVirtualSelectedParentChainChangedNotifications(in onChainChanged func(notification *appmessage.VirtualSelectedParentChainChangedNotificationMessage)) error { err := c.rpcRouter.outgoingRoute().Enqueue( - appmessage.NewNotifyVirtualSelectedParentChainChangedRequestMessage(includeAcceptedTransactionIDs)) + appmessage.NewNotifyVirtualSelectedParentChainChangedRequestMessage(includeAcceptedTransactionIDs, rpccontext.DefaultNotificationId)) if err != nil { return err } diff --git a/infrastructure/network/rpcclient/rpc_on_finality_conflicts.go b/infrastructure/network/rpcclient/rpc_on_finality_conflicts.go index 8b824089b..19c13e4a1 100644 --- a/infrastructure/network/rpcclient/rpc_on_finality_conflicts.go +++ b/infrastructure/network/rpcclient/rpc_on_finality_conflicts.go @@ -4,6 +4,7 @@ import ( "github.com/kaspanet/kaspad/app/appmessage" routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" "github.com/pkg/errors" + "github.com/kaspanet/kaspad/app/rpc/rpccontext" ) // RegisterForFinalityConflictsNotifications sends an RPC request respective to the function's name and returns the RPC server's response. @@ -12,7 +13,7 @@ func (c *RPCClient) RegisterForFinalityConflictsNotifications( onFinalityConflict func(notification *appmessage.FinalityConflictNotificationMessage), onFinalityConflictResolved func(notification *appmessage.FinalityConflictResolvedNotificationMessage)) error { - err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyFinalityConflictsRequestMessage()) + err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyFinalityConflictsRequestMessage(rpccontext.DefaultNotificationId)) if err != nil { return err } diff --git a/infrastructure/network/rpcclient/rpc_on_new_block_template.go b/infrastructure/network/rpcclient/rpc_on_new_block_template.go index 88e596020..57f7e9126 100644 --- a/infrastructure/network/rpcclient/rpc_on_new_block_template.go +++ b/infrastructure/network/rpcclient/rpc_on_new_block_template.go @@ -4,12 +4,13 @@ import ( "github.com/kaspanet/kaspad/app/appmessage" routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" "github.com/pkg/errors" + "github.com/kaspanet/kaspad/app/rpc/rpccontext" ) // RegisterForNewBlockTemplateNotifications sends an RPC request respective to the function's name and returns the RPC server's response. // Additionally, it starts listening for the appropriate notification using the given handler function func (c *RPCClient) RegisterForNewBlockTemplateNotifications(onNewBlockTemplate func(notification *appmessage.NewBlockTemplateNotificationMessage)) error { - err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyNewBlockTemplateRequestMessage()) + err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyNewBlockTemplateRequestMessage(rpccontext.DefaultNotificationId)) if err != nil { return err } diff --git a/infrastructure/network/rpcclient/rpc_on_pruning_point_utxo_set_override.go b/infrastructure/network/rpcclient/rpc_on_pruning_point_utxo_set_override.go index 96c775eba..aca774f18 100644 --- a/infrastructure/network/rpcclient/rpc_on_pruning_point_utxo_set_override.go +++ b/infrastructure/network/rpcclient/rpc_on_pruning_point_utxo_set_override.go @@ -4,13 +4,14 @@ import ( "github.com/kaspanet/kaspad/app/appmessage" routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" "github.com/pkg/errors" + "github.com/kaspanet/kaspad/app/rpc/rpccontext" ) // RegisterPruningPointUTXOSetNotifications sends an RPC request respective to the function's name and returns the RPC server's response. // Additionally, it starts listening for the appropriate notification using the given handler function func (c *RPCClient) RegisterPruningPointUTXOSetNotifications(onPruningPointUTXOSetNotifications func()) error { - err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyPruningPointUTXOSetOverrideRequestMessage()) + err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyPruningPointUTXOSetOverrideRequestMessage(rpccontext.DefaultNotificationId)) if err != nil { return err } @@ -42,7 +43,7 @@ func (c *RPCClient) RegisterPruningPointUTXOSetNotifications(onPruningPointUTXOS // Additionally, it stops listening for the appropriate notification using the given handler function func (c *RPCClient) UnregisterPruningPointUTXOSetNotifications() error { - err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewStopNotifyingPruningPointUTXOSetOverrideRequestMessage()) + err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewStopNotifyingPruningPointUTXOSetOverrideRequestMessage(rpccontext.DefaultNotificationId)) if err != nil { return err } diff --git a/infrastructure/network/rpcclient/rpc_on_utxos_changed.go b/infrastructure/network/rpcclient/rpc_on_utxos_changed.go index 49916c66b..313563b55 100644 --- a/infrastructure/network/rpcclient/rpc_on_utxos_changed.go +++ b/infrastructure/network/rpcclient/rpc_on_utxos_changed.go @@ -4,6 +4,7 @@ import ( "github.com/kaspanet/kaspad/app/appmessage" routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" "github.com/pkg/errors" + "github.com/kaspanet/kaspad/app/rpc/rpccontext" ) // RegisterForUTXOsChangedNotifications sends an RPC request respective to the function's name and returns the RPC server's response. @@ -11,7 +12,7 @@ import ( func (c *RPCClient) RegisterForUTXOsChangedNotifications(addresses []string, onUTXOsChanged func(notification *appmessage.UTXOsChangedNotificationMessage)) error { - err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyUTXOsChangedRequestMessage(addresses)) + err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyUTXOsChangedRequestMessage(addresses, rpccontext.DefaultNotificationId)) if err != nil { return err } diff --git a/infrastructure/network/rpcclient/rpc_on_virtual_daa_score_changed.go b/infrastructure/network/rpcclient/rpc_on_virtual_daa_score_changed.go index 8b6d044d0..8023146a0 100644 --- a/infrastructure/network/rpcclient/rpc_on_virtual_daa_score_changed.go +++ b/infrastructure/network/rpcclient/rpc_on_virtual_daa_score_changed.go @@ -4,6 +4,7 @@ import ( "github.com/kaspanet/kaspad/app/appmessage" routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" "github.com/pkg/errors" + "github.com/kaspanet/kaspad/app/rpc/rpccontext" ) // RegisterForVirtualDaaScoreChangedNotifications sends an RPC request respective to the function's @@ -12,7 +13,7 @@ import ( func (c *RPCClient) RegisterForVirtualDaaScoreChangedNotifications( onVirtualDaaScoreChanged func(notification *appmessage.VirtualDaaScoreChangedNotificationMessage)) error { - err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyVirtualDaaScoreChangedRequestMessage()) + err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyVirtualDaaScoreChangedRequestMessage(rpccontext.DefaultNotificationId)) if err != nil { return err } diff --git a/infrastructure/network/rpcclient/rpc_on_virtual_selected_parent_blue_score_changed.go b/infrastructure/network/rpcclient/rpc_on_virtual_selected_parent_blue_score_changed.go index 92d1c78fe..e5c4dad71 100644 --- a/infrastructure/network/rpcclient/rpc_on_virtual_selected_parent_blue_score_changed.go +++ b/infrastructure/network/rpcclient/rpc_on_virtual_selected_parent_blue_score_changed.go @@ -4,6 +4,7 @@ import ( "github.com/kaspanet/kaspad/app/appmessage" routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" "github.com/pkg/errors" + "github.com/kaspanet/kaspad/app/rpc/rpccontext" ) // RegisterForVirtualSelectedParentBlueScoreChangedNotifications sends an RPC request respective to the function's @@ -12,7 +13,7 @@ import ( func (c *RPCClient) RegisterForVirtualSelectedParentBlueScoreChangedNotifications( onVirtualSelectedParentBlueScoreChanged func(notification *appmessage.VirtualSelectedParentBlueScoreChangedNotificationMessage)) error { - err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyVirtualSelectedParentBlueScoreChangedRequestMessage()) + err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyVirtualSelectedParentBlueScoreChangedRequestMessage(rpccontext.DefaultNotificationId)) if err != nil { return err }