From 3f80638c86d334379b43ffc69bbfad6ba6d0fe09 Mon Sep 17 00:00:00 2001 From: Michael Sutton Date: Sun, 21 Aug 2022 01:26:03 +0300 Subject: [PATCH] Add missing locks to notification listener modifications (#2124) --- app/rpc/manager.go | 13 +------ app/rpc/rpccontext/notificationmanager.go | 37 +++++++++++++++---- app/rpc/rpchandlers/notify_utxos_changed.go | 2 +- .../stop_notifying_utxos_changed.go | 2 +- 4 files changed, 33 insertions(+), 21 deletions(-) diff --git a/app/rpc/manager.go b/app/rpc/manager.go index 137fd6555..c9479666b 100644 --- a/app/rpc/manager.go +++ b/app/rpc/manager.go @@ -223,18 +223,9 @@ func (m *Manager) notifyVirtualSelectedParentChainChanged(virtualChangeSet *exte onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyVirtualSelectedParentChainChanged") defer onEnd() - listenersThatPropagateSelectedParentChanged := - m.context.NotificationManager.AllListenersThatPropagateVirtualSelectedParentChainChanged() - if len(listenersThatPropagateSelectedParentChanged) > 0 { - // Generating acceptedTransactionIDs is a heavy operation, so we check if it's needed by any listener. - includeAcceptedTransactionIDs := false - for _, listener := range listenersThatPropagateSelectedParentChanged { - if listener.IncludeAcceptedTransactionIDsInVirtualSelectedParentChainChangedNotifications() { - includeAcceptedTransactionIDs = true - break - } - } + hasListeners, includeAcceptedTransactionIDs := m.context.NotificationManager.HasListenersThatPropagateVirtualSelectedParentChainChanged() + if hasListeners { notification, err := m.context.ConvertVirtualSelectedParentChainChangesToChainChangedNotificationMessage( virtualChangeSet.VirtualSelectedParentChainChanges, includeAcceptedTransactionIDs) if err != nil { diff --git a/app/rpc/rpccontext/notificationmanager.go b/app/rpc/rpccontext/notificationmanager.go index 0ba132f17..bcd2db4d7 100644 --- a/app/rpc/rpccontext/notificationmanager.go +++ b/app/rpc/rpccontext/notificationmanager.go @@ -142,16 +142,29 @@ func (nm *NotificationManager) NotifyVirtualSelectedParentChainChanged( return nil } -// AllListenersThatPropagateVirtualSelectedParentChainChanged returns true if there's any listener that is -// subscribed to VirtualSelectedParentChainChanged notifications. -func (nm *NotificationManager) AllListenersThatPropagateVirtualSelectedParentChainChanged() []*NotificationListener { - var listenersThatPropagate []*NotificationListener +// HasListenersThatPropagateVirtualSelectedParentChainChanged returns whether there's any listener that is +// subscribed to VirtualSelectedParentChainChanged notifications as well as checks if any such listener requested +// to include AcceptedTransactionIDs. +func (nm *NotificationManager) HasListenersThatPropagateVirtualSelectedParentChainChanged() (hasListeners, hasListenersThatRequireAcceptedTransactionIDs bool) { + + nm.RLock() + defer nm.RUnlock() + + hasListeners = false + hasListenersThatRequireAcceptedTransactionIDs = false + for _, listener := range nm.listeners { if listener.propagateVirtualSelectedParentChainChangedNotifications { - listenersThatPropagate = append(listenersThatPropagate, listener) + hasListeners = true + // Generating acceptedTransactionIDs is a heavy operation, so we check if it's needed by any listener. + if listener.includeAcceptedTransactionIDsInVirtualSelectedParentChainChangedNotifications { + hasListenersThatRequireAcceptedTransactionIDs = true + break + } } } - return listenersThatPropagate + + return hasListeners, hasListenersThatRequireAcceptedTransactionIDs } // NotifyFinalityConflict notifies the notification manager that there's a finality conflict in the DAG @@ -338,7 +351,11 @@ func (nl *NotificationListener) PropagateFinalityConflictResolvedNotifications() // to the remote listener for the given addresses. Subsequent calls instruct the listener to // send UTXOs changed notifications for those addresses along with the old ones. Duplicate addresses // are ignored. -func (nl *NotificationListener) PropagateUTXOsChangedNotifications(addresses []*UTXOsChangedNotificationAddress) { +func (nm *NotificationManager) PropagateUTXOsChangedNotifications(nl *NotificationListener, addresses []*UTXOsChangedNotificationAddress) { + // Apply a write-lock since the internal listener address map is modified + nm.Lock() + defer nm.Unlock() + if !nl.propagateUTXOsChangedNotifications { nl.propagateUTXOsChangedNotifications = true nl.propagateUTXOsChangedNotificationAddresses = @@ -353,7 +370,11 @@ func (nl *NotificationListener) PropagateUTXOsChangedNotifications(addresses []* // StopPropagatingUTXOsChangedNotifications instructs the listener to stop sending UTXOs // changed notifications to the remote listener for the given addresses. Addresses for which // notifications are not currently sent are ignored. -func (nl *NotificationListener) StopPropagatingUTXOsChangedNotifications(addresses []*UTXOsChangedNotificationAddress) { +func (nm *NotificationManager) StopPropagatingUTXOsChangedNotifications(nl *NotificationListener, addresses []*UTXOsChangedNotificationAddress) { + // Apply a write-lock since the internal listener address map is modified + nm.Lock() + defer nm.Unlock() + if !nl.propagateUTXOsChangedNotifications { return } diff --git a/app/rpc/rpchandlers/notify_utxos_changed.go b/app/rpc/rpchandlers/notify_utxos_changed.go index 41ffe0dd3..e43f10204 100644 --- a/app/rpc/rpchandlers/notify_utxos_changed.go +++ b/app/rpc/rpchandlers/notify_utxos_changed.go @@ -26,7 +26,7 @@ func HandleNotifyUTXOsChanged(context *rpccontext.Context, router *router.Router if err != nil { return nil, err } - listener.PropagateUTXOsChangedNotifications(addresses) + context.NotificationManager.PropagateUTXOsChangedNotifications(listener, addresses) response := appmessage.NewNotifyUTXOsChangedResponseMessage() return response, nil diff --git a/app/rpc/rpchandlers/stop_notifying_utxos_changed.go b/app/rpc/rpchandlers/stop_notifying_utxos_changed.go index 0da89a9bf..2ef376397 100644 --- a/app/rpc/rpchandlers/stop_notifying_utxos_changed.go +++ b/app/rpc/rpchandlers/stop_notifying_utxos_changed.go @@ -26,7 +26,7 @@ func HandleStopNotifyingUTXOsChanged(context *rpccontext.Context, router *router if err != nil { return nil, err } - listener.StopPropagatingUTXOsChangedNotifications(addresses) + context.NotificationManager.StopPropagatingUTXOsChangedNotifications(listener, addresses) response := appmessage.NewStopNotifyingUTXOsChangedResponseMessage() return response, nil