handle ids in client & manager, populate in notificationmanager

This commit is contained in:
D-Stacks 2022-06-16 09:06:42 +02:00
parent fe927bf46e
commit 1dc8bbf738
12 changed files with 51 additions and 19 deletions

View File

@ -92,7 +92,7 @@ func (m *Manager) notifyBlockAddedToDAG(block *externalapi.DomainBlock) error {
if err != nil {
return err
}
blockAddedNotification := appmessage.NewBlockAddedNotificationMessage(rpcBlock)
blockAddedNotification := appmessage.NewBlockAddedNotificationMessage(rpcBlock,"")
err = m.context.NotificationManager.NotifyBlockAdded(blockAddedNotification)
if err != nil {
return err
@ -141,7 +141,7 @@ func (m *Manager) notifyVirtualChange(virtualChangeSet *externalapi.VirtualChang
// NotifyNewBlockTemplate notifies the manager that a new
// block template is available for miners
func (m *Manager) NotifyNewBlockTemplate() error {
notification := appmessage.NewNewBlockTemplateNotificationMessage()
notification := appmessage.NewNewBlockTemplateNotificationMessage("")
return m.context.NotificationManager.NotifyNewBlockTemplate(notification)
}
@ -166,7 +166,7 @@ func (m *Manager) NotifyFinalityConflict(violatingBlockHash string) error {
onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyFinalityConflict")
defer onEnd()
notification := appmessage.NewFinalityConflictNotificationMessage(violatingBlockHash)
notification := appmessage.NewFinalityConflictNotificationMessage(violatingBlockHash, rpccontext.DefaultNotificationId)
return m.context.NotificationManager.NotifyFinalityConflict(notification)
}
@ -175,7 +175,7 @@ func (m *Manager) NotifyFinalityConflictResolved(finalityBlockHash string) error
onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyFinalityConflictResolved")
defer onEnd()
notification := appmessage.NewFinalityConflictResolvedNotificationMessage(finalityBlockHash)
notification := appmessage.NewFinalityConflictResolvedNotificationMessage(finalityBlockHash, rpccontext.DefaultNotificationId)
return m.context.NotificationManager.NotifyFinalityConflictResolved(notification)
}
@ -207,7 +207,7 @@ func (m *Manager) notifyVirtualSelectedParentBlueScoreChanged(virtualSelectedPar
onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyVirtualSelectedParentBlueScoreChanged")
defer onEnd()
notification := appmessage.NewVirtualSelectedParentBlueScoreChangedNotificationMessage(virtualSelectedParentBlueScore)
notification := appmessage.NewVirtualSelectedParentBlueScoreChangedNotificationMessage(virtualSelectedParentBlueScore, rpccontext.DefaultNotificationId)
return m.context.NotificationManager.NotifyVirtualSelectedParentBlueScoreChanged(notification)
}
@ -215,7 +215,7 @@ func (m *Manager) notifyVirtualDaaScoreChanged(virtualDAAScore uint64) error {
onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyVirtualDaaScoreChanged")
defer onEnd()
notification := appmessage.NewVirtualDaaScoreChangedNotificationMessage(virtualDAAScore)
notification := appmessage.NewVirtualDaaScoreChangedNotificationMessage(virtualDAAScore, rpccontext.DefaultNotificationId)
return m.context.NotificationManager.NotifyVirtualDaaScoreChanged(notification)
}
@ -236,7 +236,7 @@ func (m *Manager) notifyVirtualSelectedParentChainChanged(virtualChangeSet *exte
}
notification, err := m.context.ConvertVirtualSelectedParentChainChangesToChainChangedNotificationMessage(
virtualChangeSet.VirtualSelectedParentChainChanges, includeAcceptedTransactionIDs)
virtualChangeSet.VirtualSelectedParentChainChanges, includeAcceptedTransactionIDs, rpccontext.DefaultNotificationId)
if err != nil {
return err
}

View File

@ -9,7 +9,7 @@ import (
// ConvertVirtualSelectedParentChainChangesToChainChangedNotificationMessage converts
// VirtualSelectedParentChainChanges to VirtualSelectedParentChainChangedNotificationMessage
func (ctx *Context) ConvertVirtualSelectedParentChainChangesToChainChangedNotificationMessage(
selectedParentChainChanges *externalapi.SelectedChainPath, includeAcceptedTransactionIDs bool) (
selectedParentChainChanges *externalapi.SelectedChainPath, includeAcceptedTransactionIDs bool, id string) (
*appmessage.VirtualSelectedParentChainChangedNotificationMessage, error) {
removedChainBlockHashes := make([]string, len(selectedParentChainChanges.Removed))
@ -32,7 +32,7 @@ func (ctx *Context) ConvertVirtualSelectedParentChainChangesToChainChangedNotifi
}
return appmessage.NewVirtualSelectedParentChainChangedNotificationMessage(
removedChainBlockHashes, addedChainBlocks, acceptedTransactionIDs), nil
removedChainBlockHashes, addedChainBlocks, acceptedTransactionIDs, id), nil
}
func (ctx *Context) getAndConvertAcceptedTransactionIDs(selectedParentChainChanges *externalapi.SelectedChainPath) (

View File

@ -13,6 +13,8 @@ import (
"github.com/pkg/errors"
)
const DefaultNotificationId = "" //empty string corrosponds to defualt grpc string value, and hence id value when not supplied
// NotificationManager manages notifications for the RPC
type NotificationManager struct {
sync.RWMutex
@ -111,6 +113,9 @@ func (nm *NotificationManager) NotifyBlockAdded(notification *appmessage.BlockAd
for router, listener := range nm.listeners {
if listener.propagateBlockAddedNotifications {
notification.Id = listener.propagateBlockAddedNotificationsId
err := router.OutgoingRoute().MaybeEnqueue(notification)
if err != nil {
return err
@ -136,6 +141,8 @@ func (nm *NotificationManager) NotifyVirtualSelectedParentChainChanged(
if listener.propagateVirtualSelectedParentChainChangedNotifications {
var err error
notification.Id = listener.propagateVirtualSelectedParentChainChangedNotificationsId
if listener.includeAcceptedTransactionIDsInVirtualSelectedParentChainChangedNotifications {
err = router.OutgoingRoute().MaybeEnqueue(notification)
} else {
@ -169,6 +176,9 @@ func (nm *NotificationManager) NotifyFinalityConflict(notification *appmessage.F
for router, listener := range nm.listeners {
if listener.propagateFinalityConflictNotifications {
notification.Id = listener.propagateFinalityConflictNotificationsId
err := router.OutgoingRoute().Enqueue(notification)
if err != nil {
return err
@ -185,6 +195,9 @@ func (nm *NotificationManager) NotifyFinalityConflictResolved(notification *appm
for router, listener := range nm.listeners {
if listener.propagateFinalityConflictResolvedNotifications {
notification.Id = listener.propagateFinalityConflictResolvedNotificationsId
err := router.OutgoingRoute().Enqueue(notification)
if err != nil {
return err
@ -207,6 +220,8 @@ func (nm *NotificationManager) NotifyUTXOsChanged(utxoChanges *utxoindex.UTXOCha
return err
}
notification.Id = listener.propagateUTXOsChangedNotificationsId
// Don't send the notification if it's empty
if len(notification.Added) == 0 && len(notification.Removed) == 0 {
continue
@ -232,6 +247,9 @@ func (nm *NotificationManager) NotifyVirtualSelectedParentBlueScoreChanged(
for router, listener := range nm.listeners {
if listener.propagateVirtualSelectedParentBlueScoreChangedNotifications {
notification.Id = listener.propagateVirtualSelectedParentBlueScoreChangedNotificationsId
err := router.OutgoingRoute().MaybeEnqueue(notification)
if err != nil {
return err
@ -251,6 +269,9 @@ func (nm *NotificationManager) NotifyVirtualDaaScoreChanged(
for router, listener := range nm.listeners {
if listener.propagateVirtualDaaScoreChangedNotifications {
notification.Id = listener.propagateVirtualDaaScoreChangedNotificationsId
err := router.OutgoingRoute().MaybeEnqueue(notification)
if err != nil {
return err
@ -270,6 +291,9 @@ func (nm *NotificationManager) NotifyNewBlockTemplate(
for router, listener := range nm.listeners {
if listener.propagateNewBlockTemplateNotifications {
notification.Id = listener.propagateNewBlockTemplateNotificationsId
err := router.OutgoingRoute().Enqueue(notification)
if err != nil {
return err

View File

@ -27,7 +27,7 @@ func HandleGetVirtualSelectedParentChainFromBlock(context *rpccontext.Context, _
}
chainChangedNotification, err := context.ConvertVirtualSelectedParentChainChangesToChainChangedNotificationMessage(
virtualSelectedParentChain, getVirtualSelectedParentChainFromBlockRequest.IncludeAcceptedTransactionIDs)
virtualSelectedParentChain, getVirtualSelectedParentChainFromBlockRequest.IncludeAcceptedTransactionIDs, rpccontext.DefaultNotificationId)
if err != nil {
return nil, err
}

View File

@ -4,12 +4,13 @@ import (
"github.com/kaspanet/kaspad/app/appmessage"
routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"github.com/pkg/errors"
"github.com/kaspanet/kaspad/app/rpc/rpccontext"
)
// RegisterForBlockAddedNotifications sends an RPC request respective to the function's name and returns the RPC server's response.
// Additionally, it starts listening for the appropriate notification using the given handler function
func (c *RPCClient) RegisterForBlockAddedNotifications(onBlockAdded func(notification *appmessage.BlockAddedNotificationMessage)) error {
err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyBlockAddedRequestMessage())
err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyBlockAddedRequestMessage(rpccontext.DefaultNotificationId))
if err != nil {
return err
}

View File

@ -4,6 +4,7 @@ import (
"github.com/kaspanet/kaspad/app/appmessage"
routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"github.com/pkg/errors"
"github.com/kaspanet/kaspad/app/rpc/rpccontext"
)
// RegisterForVirtualSelectedParentChainChangedNotifications sends an RPC request respective to the function's name and returns the RPC server's response.
@ -12,7 +13,7 @@ func (c *RPCClient) RegisterForVirtualSelectedParentChainChangedNotifications(in
onChainChanged func(notification *appmessage.VirtualSelectedParentChainChangedNotificationMessage)) error {
err := c.rpcRouter.outgoingRoute().Enqueue(
appmessage.NewNotifyVirtualSelectedParentChainChangedRequestMessage(includeAcceptedTransactionIDs))
appmessage.NewNotifyVirtualSelectedParentChainChangedRequestMessage(includeAcceptedTransactionIDs, rpccontext.DefaultNotificationId))
if err != nil {
return err
}

View File

@ -4,6 +4,7 @@ import (
"github.com/kaspanet/kaspad/app/appmessage"
routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"github.com/pkg/errors"
"github.com/kaspanet/kaspad/app/rpc/rpccontext"
)
// RegisterForFinalityConflictsNotifications sends an RPC request respective to the function's name and returns the RPC server's response.
@ -12,7 +13,7 @@ func (c *RPCClient) RegisterForFinalityConflictsNotifications(
onFinalityConflict func(notification *appmessage.FinalityConflictNotificationMessage),
onFinalityConflictResolved func(notification *appmessage.FinalityConflictResolvedNotificationMessage)) error {
err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyFinalityConflictsRequestMessage())
err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyFinalityConflictsRequestMessage(rpccontext.DefaultNotificationId))
if err != nil {
return err
}

View File

@ -4,12 +4,13 @@ import (
"github.com/kaspanet/kaspad/app/appmessage"
routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"github.com/pkg/errors"
"github.com/kaspanet/kaspad/app/rpc/rpccontext"
)
// RegisterForNewBlockTemplateNotifications sends an RPC request respective to the function's name and returns the RPC server's response.
// Additionally, it starts listening for the appropriate notification using the given handler function
func (c *RPCClient) RegisterForNewBlockTemplateNotifications(onNewBlockTemplate func(notification *appmessage.NewBlockTemplateNotificationMessage)) error {
err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyNewBlockTemplateRequestMessage())
err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyNewBlockTemplateRequestMessage(rpccontext.DefaultNotificationId))
if err != nil {
return err
}

View File

@ -4,13 +4,14 @@ import (
"github.com/kaspanet/kaspad/app/appmessage"
routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"github.com/pkg/errors"
"github.com/kaspanet/kaspad/app/rpc/rpccontext"
)
// RegisterPruningPointUTXOSetNotifications sends an RPC request respective to the function's name and returns the RPC server's response.
// Additionally, it starts listening for the appropriate notification using the given handler function
func (c *RPCClient) RegisterPruningPointUTXOSetNotifications(onPruningPointUTXOSetNotifications func()) error {
err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyPruningPointUTXOSetOverrideRequestMessage())
err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyPruningPointUTXOSetOverrideRequestMessage(rpccontext.DefaultNotificationId))
if err != nil {
return err
}
@ -42,7 +43,7 @@ func (c *RPCClient) RegisterPruningPointUTXOSetNotifications(onPruningPointUTXOS
// Additionally, it stops listening for the appropriate notification using the given handler function
func (c *RPCClient) UnregisterPruningPointUTXOSetNotifications() error {
err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewStopNotifyingPruningPointUTXOSetOverrideRequestMessage())
err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewStopNotifyingPruningPointUTXOSetOverrideRequestMessage(rpccontext.DefaultNotificationId))
if err != nil {
return err
}

View File

@ -4,6 +4,7 @@ import (
"github.com/kaspanet/kaspad/app/appmessage"
routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"github.com/pkg/errors"
"github.com/kaspanet/kaspad/app/rpc/rpccontext"
)
// RegisterForUTXOsChangedNotifications sends an RPC request respective to the function's name and returns the RPC server's response.
@ -11,7 +12,7 @@ import (
func (c *RPCClient) RegisterForUTXOsChangedNotifications(addresses []string,
onUTXOsChanged func(notification *appmessage.UTXOsChangedNotificationMessage)) error {
err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyUTXOsChangedRequestMessage(addresses))
err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyUTXOsChangedRequestMessage(addresses, rpccontext.DefaultNotificationId))
if err != nil {
return err
}

View File

@ -4,6 +4,7 @@ import (
"github.com/kaspanet/kaspad/app/appmessage"
routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"github.com/pkg/errors"
"github.com/kaspanet/kaspad/app/rpc/rpccontext"
)
// RegisterForVirtualDaaScoreChangedNotifications sends an RPC request respective to the function's
@ -12,7 +13,7 @@ import (
func (c *RPCClient) RegisterForVirtualDaaScoreChangedNotifications(
onVirtualDaaScoreChanged func(notification *appmessage.VirtualDaaScoreChangedNotificationMessage)) error {
err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyVirtualDaaScoreChangedRequestMessage())
err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyVirtualDaaScoreChangedRequestMessage(rpccontext.DefaultNotificationId))
if err != nil {
return err
}

View File

@ -4,6 +4,7 @@ import (
"github.com/kaspanet/kaspad/app/appmessage"
routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"github.com/pkg/errors"
"github.com/kaspanet/kaspad/app/rpc/rpccontext"
)
// RegisterForVirtualSelectedParentBlueScoreChangedNotifications sends an RPC request respective to the function's
@ -12,7 +13,7 @@ import (
func (c *RPCClient) RegisterForVirtualSelectedParentBlueScoreChangedNotifications(
onVirtualSelectedParentBlueScoreChanged func(notification *appmessage.VirtualSelectedParentBlueScoreChangedNotificationMessage)) error {
err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyVirtualSelectedParentBlueScoreChangedRequestMessage())
err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewNotifyVirtualSelectedParentBlueScoreChangedRequestMessage(rpccontext.DefaultNotificationId))
if err != nil {
return err
}