mirror of
https://github.com/kaspanet/kaspad.git
synced 2025-10-14 00:59:33 +00:00
Add missing locks to notification listener modifications (#2124)
This commit is contained in:
parent
266ec6c270
commit
3f80638c86
@ -223,18 +223,9 @@ func (m *Manager) notifyVirtualSelectedParentChainChanged(virtualChangeSet *exte
|
|||||||
onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyVirtualSelectedParentChainChanged")
|
onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyVirtualSelectedParentChainChanged")
|
||||||
defer onEnd()
|
defer onEnd()
|
||||||
|
|
||||||
listenersThatPropagateSelectedParentChanged :=
|
hasListeners, includeAcceptedTransactionIDs := m.context.NotificationManager.HasListenersThatPropagateVirtualSelectedParentChainChanged()
|
||||||
m.context.NotificationManager.AllListenersThatPropagateVirtualSelectedParentChainChanged()
|
|
||||||
if len(listenersThatPropagateSelectedParentChanged) > 0 {
|
|
||||||
// Generating acceptedTransactionIDs is a heavy operation, so we check if it's needed by any listener.
|
|
||||||
includeAcceptedTransactionIDs := false
|
|
||||||
for _, listener := range listenersThatPropagateSelectedParentChanged {
|
|
||||||
if listener.IncludeAcceptedTransactionIDsInVirtualSelectedParentChainChangedNotifications() {
|
|
||||||
includeAcceptedTransactionIDs = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
if hasListeners {
|
||||||
notification, err := m.context.ConvertVirtualSelectedParentChainChangesToChainChangedNotificationMessage(
|
notification, err := m.context.ConvertVirtualSelectedParentChainChangesToChainChangedNotificationMessage(
|
||||||
virtualChangeSet.VirtualSelectedParentChainChanges, includeAcceptedTransactionIDs)
|
virtualChangeSet.VirtualSelectedParentChainChanges, includeAcceptedTransactionIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -142,16 +142,29 @@ func (nm *NotificationManager) NotifyVirtualSelectedParentChainChanged(
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// AllListenersThatPropagateVirtualSelectedParentChainChanged returns true if there's any listener that is
|
// HasListenersThatPropagateVirtualSelectedParentChainChanged returns whether there's any listener that is
|
||||||
// subscribed to VirtualSelectedParentChainChanged notifications.
|
// subscribed to VirtualSelectedParentChainChanged notifications as well as checks if any such listener requested
|
||||||
func (nm *NotificationManager) AllListenersThatPropagateVirtualSelectedParentChainChanged() []*NotificationListener {
|
// to include AcceptedTransactionIDs.
|
||||||
var listenersThatPropagate []*NotificationListener
|
func (nm *NotificationManager) HasListenersThatPropagateVirtualSelectedParentChainChanged() (hasListeners, hasListenersThatRequireAcceptedTransactionIDs bool) {
|
||||||
|
|
||||||
|
nm.RLock()
|
||||||
|
defer nm.RUnlock()
|
||||||
|
|
||||||
|
hasListeners = false
|
||||||
|
hasListenersThatRequireAcceptedTransactionIDs = false
|
||||||
|
|
||||||
for _, listener := range nm.listeners {
|
for _, listener := range nm.listeners {
|
||||||
if listener.propagateVirtualSelectedParentChainChangedNotifications {
|
if listener.propagateVirtualSelectedParentChainChangedNotifications {
|
||||||
listenersThatPropagate = append(listenersThatPropagate, listener)
|
hasListeners = true
|
||||||
|
// Generating acceptedTransactionIDs is a heavy operation, so we check if it's needed by any listener.
|
||||||
|
if listener.includeAcceptedTransactionIDsInVirtualSelectedParentChainChangedNotifications {
|
||||||
|
hasListenersThatRequireAcceptedTransactionIDs = true
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return listenersThatPropagate
|
}
|
||||||
|
|
||||||
|
return hasListeners, hasListenersThatRequireAcceptedTransactionIDs
|
||||||
}
|
}
|
||||||
|
|
||||||
// NotifyFinalityConflict notifies the notification manager that there's a finality conflict in the DAG
|
// NotifyFinalityConflict notifies the notification manager that there's a finality conflict in the DAG
|
||||||
@ -338,7 +351,11 @@ func (nl *NotificationListener) PropagateFinalityConflictResolvedNotifications()
|
|||||||
// to the remote listener for the given addresses. Subsequent calls instruct the listener to
|
// to the remote listener for the given addresses. Subsequent calls instruct the listener to
|
||||||
// send UTXOs changed notifications for those addresses along with the old ones. Duplicate addresses
|
// send UTXOs changed notifications for those addresses along with the old ones. Duplicate addresses
|
||||||
// are ignored.
|
// are ignored.
|
||||||
func (nl *NotificationListener) PropagateUTXOsChangedNotifications(addresses []*UTXOsChangedNotificationAddress) {
|
func (nm *NotificationManager) PropagateUTXOsChangedNotifications(nl *NotificationListener, addresses []*UTXOsChangedNotificationAddress) {
|
||||||
|
// Apply a write-lock since the internal listener address map is modified
|
||||||
|
nm.Lock()
|
||||||
|
defer nm.Unlock()
|
||||||
|
|
||||||
if !nl.propagateUTXOsChangedNotifications {
|
if !nl.propagateUTXOsChangedNotifications {
|
||||||
nl.propagateUTXOsChangedNotifications = true
|
nl.propagateUTXOsChangedNotifications = true
|
||||||
nl.propagateUTXOsChangedNotificationAddresses =
|
nl.propagateUTXOsChangedNotificationAddresses =
|
||||||
@ -353,7 +370,11 @@ func (nl *NotificationListener) PropagateUTXOsChangedNotifications(addresses []*
|
|||||||
// StopPropagatingUTXOsChangedNotifications instructs the listener to stop sending UTXOs
|
// StopPropagatingUTXOsChangedNotifications instructs the listener to stop sending UTXOs
|
||||||
// changed notifications to the remote listener for the given addresses. Addresses for which
|
// changed notifications to the remote listener for the given addresses. Addresses for which
|
||||||
// notifications are not currently sent are ignored.
|
// notifications are not currently sent are ignored.
|
||||||
func (nl *NotificationListener) StopPropagatingUTXOsChangedNotifications(addresses []*UTXOsChangedNotificationAddress) {
|
func (nm *NotificationManager) StopPropagatingUTXOsChangedNotifications(nl *NotificationListener, addresses []*UTXOsChangedNotificationAddress) {
|
||||||
|
// Apply a write-lock since the internal listener address map is modified
|
||||||
|
nm.Lock()
|
||||||
|
defer nm.Unlock()
|
||||||
|
|
||||||
if !nl.propagateUTXOsChangedNotifications {
|
if !nl.propagateUTXOsChangedNotifications {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -26,7 +26,7 @@ func HandleNotifyUTXOsChanged(context *rpccontext.Context, router *router.Router
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
listener.PropagateUTXOsChangedNotifications(addresses)
|
context.NotificationManager.PropagateUTXOsChangedNotifications(listener, addresses)
|
||||||
|
|
||||||
response := appmessage.NewNotifyUTXOsChangedResponseMessage()
|
response := appmessage.NewNotifyUTXOsChangedResponseMessage()
|
||||||
return response, nil
|
return response, nil
|
||||||
|
@ -26,7 +26,7 @@ func HandleStopNotifyingUTXOsChanged(context *rpccontext.Context, router *router
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
listener.StopPropagatingUTXOsChangedNotifications(addresses)
|
context.NotificationManager.StopPropagatingUTXOsChangedNotifications(listener, addresses)
|
||||||
|
|
||||||
response := appmessage.NewStopNotifyingUTXOsChangedResponseMessage()
|
response := appmessage.NewStopNotifyingUTXOsChangedResponseMessage()
|
||||||
return response, nil
|
return response, nil
|
||||||
|
Loading…
x
Reference in New Issue
Block a user