[NOD-1404] Remove most of the notification manager to fix a deadlock (#936)

* [NOD-1404] Remove most of the notification manager to fix a deadlock.

* [NOD-1404] Rename a couple of fields.

* [NOD-1404] Fix merge errors.

* [NOD-1404] Remove most of the notification manager to fix a deadlock (#935)

* [NOD-1404] Remove most of the notification manager to fix a deadlock.

* [NOD-1404] Rename a couple of fields.
This commit is contained in:
stasatdaglabs 2020-09-23 14:00:05 +03:00 committed by GitHub
parent 34a1b30006
commit fed34273a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 75 additions and 138 deletions

View File

@ -19,7 +19,10 @@ func (f *FlowContext) OnNewBlock(block *util.Block) error {
return err return err
} }
if f.onBlockAddedToDAGHandler != nil { if f.onBlockAddedToDAGHandler != nil {
f.onBlockAddedToDAGHandler(block) err := f.onBlockAddedToDAGHandler(block)
if err != nil {
return err
}
} }
return f.broadcastTransactionsAfterBlockAdded(block, transactionsAcceptedToMempool) return f.broadcastTransactionsAfterBlockAdded(block, transactionsAcceptedToMempool)

View File

@ -20,7 +20,7 @@ import (
// OnBlockAddedToDAGHandler is a handler function that's triggered // OnBlockAddedToDAGHandler is a handler function that's triggered
// when a block is added to the DAG // when a block is added to the DAG
type OnBlockAddedToDAGHandler func(block *util.Block) type OnBlockAddedToDAGHandler func(block *util.Block) error
// OnTransactionAddedToMempoolHandler is a handler function that's triggered // OnTransactionAddedToMempoolHandler is a handler function that's triggered
// when a transaction is added to the mempool // when a transaction is added to the mempool

View File

@ -52,11 +52,11 @@ func NewManager(
} }
// NotifyBlockAddedToDAG notifies the manager that a block has been added to the DAG // NotifyBlockAddedToDAG notifies the manager that a block has been added to the DAG
func (m *Manager) NotifyBlockAddedToDAG(block *util.Block) { func (m *Manager) NotifyBlockAddedToDAG(block *util.Block) error {
m.context.BlockTemplateState.NotifyBlockAdded(block) m.context.BlockTemplateState.NotifyBlockAdded(block)
notification := appmessage.NewBlockAddedNotificationMessage(block.MsgBlock()) notification := appmessage.NewBlockAddedNotificationMessage(block.MsgBlock())
m.context.NotificationManager.NotifyBlockAdded(notification) return m.context.NotificationManager.NotifyBlockAdded(notification)
} }
// NotifyChainChanged notifies the manager that the DAG's selected parent chain has changed // NotifyChainChanged notifies the manager that the DAG's selected parent chain has changed
@ -70,22 +70,19 @@ func (m *Manager) NotifyChainChanged(removedChainBlockHashes []*daghash.Hash, ad
removedChainBlockHashStrings[i] = removedChainBlockHash.String() removedChainBlockHashStrings[i] = removedChainBlockHash.String()
} }
notification := appmessage.NewChainChangedNotificationMessage(removedChainBlockHashStrings, addedChainBlocks) notification := appmessage.NewChainChangedNotificationMessage(removedChainBlockHashStrings, addedChainBlocks)
m.context.NotificationManager.NotifyChainChanged(notification) return m.context.NotificationManager.NotifyChainChanged(notification)
return nil
} }
// NotifyFinalityConflict notifies the manager that there's a finality conflict in the DAG // NotifyFinalityConflict notifies the manager that there's a finality conflict in the DAG
func (m *Manager) NotifyFinalityConflict(violatingBlockHash string) error { func (m *Manager) NotifyFinalityConflict(violatingBlockHash string) error {
notification := appmessage.NewFinalityConflictNotificationMessage(violatingBlockHash) notification := appmessage.NewFinalityConflictNotificationMessage(violatingBlockHash)
m.context.NotificationManager.NotifyFinalityConflict(notification) return m.context.NotificationManager.NotifyFinalityConflict(notification)
return nil
} }
// NotifyFinalityConflictResolved notifies the manager that a finality conflict in the DAG has been resolved // NotifyFinalityConflictResolved notifies the manager that a finality conflict in the DAG has been resolved
func (m *Manager) NotifyFinalityConflictResolved(finalityBlockHash string) error { func (m *Manager) NotifyFinalityConflictResolved(finalityBlockHash string) error {
notification := appmessage.NewFinalityConflictResolvedNotificationMessage(finalityBlockHash) notification := appmessage.NewFinalityConflictResolvedNotificationMessage(finalityBlockHash)
m.context.NotificationManager.NotifyFinalityConflictResolved(notification) return m.context.NotificationManager.NotifyFinalityConflictResolved(notification)
return nil
} }
// NotifyTransactionAddedToMempool notifies the manager that a transaction has been added to the mempool // NotifyTransactionAddedToMempool notifies the manager that a transaction has been added to the mempool

View File

@ -42,16 +42,12 @@ func (m *Manager) routerInitializer(router *router.Router, netConnection *netada
if err != nil { if err != nil {
panic(err) panic(err)
} }
spawn("routerInitializer-handleIncomingMessages", func() { m.context.NotificationManager.AddListener(router)
err := m.handleIncomingMessages(router, incomingRoute)
m.handleError(err, netConnection)
})
notificationListener := m.context.NotificationManager.AddListener(router) spawn("routerInitializer-handleIncomingMessages", func() {
spawn("routerInitializer-handleOutgoingNotifications", func() {
defer m.context.NotificationManager.RemoveListener(router) defer m.context.NotificationManager.RemoveListener(router)
err := m.handleOutgoingNotifications(notificationListener) err := m.handleIncomingMessages(router, incomingRoute)
m.handleError(err, netConnection) m.handleError(err, netConnection)
}) })
} }
@ -78,15 +74,6 @@ func (m *Manager) handleIncomingMessages(router *router.Router, incomingRoute *r
} }
} }
func (m *Manager) handleOutgoingNotifications(notificationListener *rpccontext.NotificationListener) error {
for {
err := notificationListener.ProcessNextNotification()
if err != nil {
return err
}
}
}
func (m *Manager) handleError(err error, netConnection *netadapter.NetConnection) { func (m *Manager) handleError(err error, netConnection *netadapter.NetConnection) {
if errors.Is(err, router.ErrTimeout) { if errors.Is(err, router.ErrTimeout) {
log.Warnf("Got timeout from %s. Disconnecting...", netConnection) log.Warnf("Got timeout from %s. Disconnecting...", netConnection)

View File

@ -2,7 +2,7 @@ package rpccontext
import ( import (
"github.com/kaspanet/kaspad/app/appmessage" "github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"github.com/pkg/errors" "github.com/pkg/errors"
"sync" "sync"
) )
@ -10,68 +10,43 @@ import (
// NotificationManager manages notifications for the RPC // NotificationManager manages notifications for the RPC
type NotificationManager struct { type NotificationManager struct {
sync.RWMutex sync.RWMutex
listeners map[*router.Router]*NotificationListener listeners map[*routerpkg.Router]*NotificationListener
} }
// OnBlockAddedListener is a listener function for when a block is added to the DAG
type OnBlockAddedListener func(notification *appmessage.BlockAddedNotificationMessage) error
// OnChainChangedListener is a listener function for when the DAG's selected parent chain changes
type OnChainChangedListener func(notification *appmessage.ChainChangedNotificationMessage) error
// OnFinalityConflictListener is a listener function for when there's a finality conflict in the DAG
type OnFinalityConflictListener func(notification *appmessage.FinalityConflictNotificationMessage) error
// OnFinalityConflictResolvedListener is a listener function for when a finality conflict in the DAG has been resolved
type OnFinalityConflictResolvedListener func(notification *appmessage.FinalityConflictResolvedNotificationMessage) error
// NotificationListener represents a registered RPC notification listener // NotificationListener represents a registered RPC notification listener
type NotificationListener struct { type NotificationListener struct {
onBlockAddedListener OnBlockAddedListener propagateBlockAddedNotifications bool
onBlockAddedNotificationChan chan *appmessage.BlockAddedNotificationMessage propagateChainChangedNotifications bool
onChainChangedListener OnChainChangedListener propagateFinalityConflictNotifications bool
onChainChangedNotificationChan chan *appmessage.ChainChangedNotificationMessage propagateFinalityConflictResolvedNotifications bool
onFinalityConflictListener OnFinalityConflictListener
onFinalityConflictNotificationChan chan *appmessage.FinalityConflictNotificationMessage
onFinalityConflictResolvedListener OnFinalityConflictResolvedListener
onFinalityConflictResolvedNotificationChan chan *appmessage.FinalityConflictResolvedNotificationMessage
closeChan chan struct{}
} }
// NewNotificationManager creates a new NotificationManager // NewNotificationManager creates a new NotificationManager
func NewNotificationManager() *NotificationManager { func NewNotificationManager() *NotificationManager {
return &NotificationManager{ return &NotificationManager{
listeners: make(map[*router.Router]*NotificationListener), listeners: make(map[*routerpkg.Router]*NotificationListener),
} }
} }
// AddListener registers a listener with the given router // AddListener registers a listener with the given router
func (nm *NotificationManager) AddListener(router *router.Router) *NotificationListener { func (nm *NotificationManager) AddListener(router *routerpkg.Router) {
nm.Lock() nm.Lock()
defer nm.Unlock() defer nm.Unlock()
listener := newNotificationListener() listener := newNotificationListener()
nm.listeners[router] = listener nm.listeners[router] = listener
return listener
} }
// RemoveListener unregisters the given router // RemoveListener unregisters the given router
func (nm *NotificationManager) RemoveListener(router *router.Router) { func (nm *NotificationManager) RemoveListener(router *routerpkg.Router) {
nm.Lock() nm.Lock()
defer nm.Unlock() defer nm.Unlock()
listener, ok := nm.listeners[router]
if !ok {
return
}
listener.close()
delete(nm.listeners, router) delete(nm.listeners, router)
} }
// Listener retrieves the listener registered with the given router // Listener retrieves the listener registered with the given router
func (nm *NotificationManager) Listener(router *router.Router) (*NotificationListener, error) { func (nm *NotificationManager) Listener(router *routerpkg.Router) (*NotificationListener, error) {
nm.RLock() nm.RLock()
defer nm.RUnlock() defer nm.RUnlock()
@ -83,115 +58,98 @@ func (nm *NotificationManager) Listener(router *router.Router) (*NotificationLis
} }
// NotifyBlockAdded notifies the notification manager that a block has been added to the DAG // NotifyBlockAdded notifies the notification manager that a block has been added to the DAG
func (nm *NotificationManager) NotifyBlockAdded(notification *appmessage.BlockAddedNotificationMessage) { func (nm *NotificationManager) NotifyBlockAdded(notification *appmessage.BlockAddedNotificationMessage) error {
nm.RLock() nm.RLock()
defer nm.RUnlock() defer nm.RUnlock()
for _, listener := range nm.listeners { for router, listener := range nm.listeners {
if listener.onBlockAddedListener != nil { if listener.propagateBlockAddedNotifications {
select { err := router.OutgoingRoute().Enqueue(notification)
case listener.onBlockAddedNotificationChan <- notification: if err != nil {
case <-listener.closeChan: return err
continue
} }
} }
} }
return nil
} }
// NotifyChainChanged notifies the notification manager that the DAG's selected parent chain has changed // NotifyChainChanged notifies the notification manager that the DAG's selected parent chain has changed
func (nm *NotificationManager) NotifyChainChanged(message *appmessage.ChainChangedNotificationMessage) { func (nm *NotificationManager) NotifyChainChanged(notification *appmessage.ChainChangedNotificationMessage) error {
nm.RLock() nm.RLock()
defer nm.RUnlock() defer nm.RUnlock()
for _, listener := range nm.listeners { for router, listener := range nm.listeners {
if listener.onChainChangedListener != nil { if listener.propagateChainChangedNotifications {
select { err := router.OutgoingRoute().Enqueue(notification)
case listener.onChainChangedNotificationChan <- message: if err != nil {
case <-listener.closeChan: return err
continue
} }
} }
} }
return nil
} }
// NotifyFinalityConflict notifies the notification manager that there's a finality conflict in the DAG // NotifyFinalityConflict notifies the notification manager that there's a finality conflict in the DAG
func (nm *NotificationManager) NotifyFinalityConflict(message *appmessage.FinalityConflictNotificationMessage) { func (nm *NotificationManager) NotifyFinalityConflict(notification *appmessage.FinalityConflictNotificationMessage) error {
nm.RLock() nm.RLock()
defer nm.RUnlock() defer nm.RUnlock()
for _, listener := range nm.listeners { for router, listener := range nm.listeners {
if listener.onFinalityConflictListener != nil { if listener.propagateFinalityConflictNotifications {
select { err := router.OutgoingRoute().Enqueue(notification)
case listener.onFinalityConflictNotificationChan <- message: if err != nil {
case <-listener.closeChan: return err
continue
} }
} }
} }
return nil
} }
// NotifyFinalityConflictResolved notifies the notification manager that a finality conflict in the DAG has been resolved // NotifyFinalityConflictResolved notifies the notification manager that a finality conflict in the DAG has been resolved
func (nm *NotificationManager) NotifyFinalityConflictResolved(message *appmessage.FinalityConflictResolvedNotificationMessage) { func (nm *NotificationManager) NotifyFinalityConflictResolved(notification *appmessage.FinalityConflictResolvedNotificationMessage) error {
nm.RLock() nm.RLock()
defer nm.RUnlock() defer nm.RUnlock()
for _, listener := range nm.listeners { for router, listener := range nm.listeners {
if listener.onFinalityConflictResolvedListener != nil { if listener.propagateFinalityConflictResolvedNotifications {
select { err := router.OutgoingRoute().Enqueue(notification)
case listener.onFinalityConflictResolvedNotificationChan <- message: if err != nil {
case <-listener.closeChan: return err
continue
} }
} }
} }
return nil
} }
func newNotificationListener() *NotificationListener { func newNotificationListener() *NotificationListener {
return &NotificationListener{ return &NotificationListener{
onBlockAddedNotificationChan: make(chan *appmessage.BlockAddedNotificationMessage), propagateBlockAddedNotifications: false,
onChainChangedNotificationChan: make(chan *appmessage.ChainChangedNotificationMessage), propagateChainChangedNotifications: false,
onFinalityConflictNotificationChan: make(chan *appmessage.FinalityConflictNotificationMessage), propagateFinalityConflictNotifications: false,
onFinalityConflictResolvedNotificationChan: make(chan *appmessage.FinalityConflictResolvedNotificationMessage), propagateFinalityConflictResolvedNotifications: false,
closeChan: make(chan struct{}, 1),
} }
} }
// SetOnBlockAddedListener sets the onBlockAddedListener handler for this listener // PropagateBlockAddedNotifications instructs the listener to send block added notifications
func (nl *NotificationListener) SetOnBlockAddedListener(onBlockAddedListener OnBlockAddedListener) { // to the remote listener
nl.onBlockAddedListener = onBlockAddedListener func (nl *NotificationListener) PropagateBlockAddedNotifications() {
nl.propagateBlockAddedNotifications = true
} }
// SetOnChainChangedListener sets the onChainChangedListener handler for this listener // PropagateChainChangedNotifications instructs the listener to send chain changed notifications
func (nl *NotificationListener) SetOnChainChangedListener(onChainChangedListener OnChainChangedListener) { // to the remote listener
nl.onChainChangedListener = onChainChangedListener func (nl *NotificationListener) PropagateChainChangedNotifications() {
nl.propagateChainChangedNotifications = true
} }
// SetOnFinalityConflictListener sets the onFinalityConflictListener handler for this listener // PropagateFinalityConflictNotifications instructs the listener to send finality conflict notifications
func (nl *NotificationListener) SetOnFinalityConflictListener(onFinalityConflictListener OnFinalityConflictListener) { // to the remote listener
nl.onFinalityConflictListener = onFinalityConflictListener func (nl *NotificationListener) PropagateFinalityConflictNotifications() {
nl.propagateFinalityConflictNotifications = true
} }
// SetOnFinalityConflictResolvedListener sets the onFinalityConflictResolvedListener handler for this listener // PropagateFinalityConflictResolvedNotifications instructs the listener to send finality conflict resolved notifications
func (nl *NotificationListener) SetOnFinalityConflictResolvedListener(onFinalityConflictResolvedListener OnFinalityConflictResolvedListener) { // to the remote listener
nl.onFinalityConflictResolvedListener = onFinalityConflictResolvedListener func (nl *NotificationListener) PropagateFinalityConflictResolvedNotifications() {
} nl.propagateFinalityConflictResolvedNotifications = true
// ProcessNextNotification waits until a notification arrives and processes it
func (nl *NotificationListener) ProcessNextNotification() error {
select {
case block := <-nl.onBlockAddedNotificationChan:
return nl.onBlockAddedListener(block)
case notification := <-nl.onChainChangedNotificationChan:
return nl.onChainChangedListener(notification)
case notification := <-nl.onFinalityConflictNotificationChan:
return nl.onFinalityConflictListener(notification)
case notification := <-nl.onFinalityConflictResolvedNotificationChan:
return nl.onFinalityConflictResolvedListener(notification)
case <-nl.closeChan:
return nil
}
}
func (nl *NotificationListener) close() {
nl.closeChan <- struct{}{}
} }

View File

@ -12,9 +12,7 @@ func HandleNotifyBlockAdded(context *rpccontext.Context, router *router.Router,
if err != nil { if err != nil {
return nil, err return nil, err
} }
listener.SetOnBlockAddedListener(func(notification *appmessage.BlockAddedNotificationMessage) error { listener.PropagateBlockAddedNotifications()
return router.OutgoingRoute().Enqueue(notification)
})
response := appmessage.NewNotifyBlockAddedResponseMessage() response := appmessage.NewNotifyBlockAddedResponseMessage()
return response, nil return response, nil

View File

@ -18,9 +18,7 @@ func HandleNotifyChainChanged(context *rpccontext.Context, router *router.Router
if err != nil { if err != nil {
return nil, err return nil, err
} }
listener.SetOnChainChangedListener(func(message *appmessage.ChainChangedNotificationMessage) error { listener.PropagateChainChangedNotifications()
return router.OutgoingRoute().Enqueue(message)
})
response := appmessage.NewNotifyChainChangedResponseMessage() response := appmessage.NewNotifyChainChangedResponseMessage()
return response, nil return response, nil

View File

@ -12,12 +12,8 @@ func HandleNotifyFinalityConflicts(context *rpccontext.Context, router *router.R
if err != nil { if err != nil {
return nil, err return nil, err
} }
listener.SetOnFinalityConflictListener(func(notification *appmessage.FinalityConflictNotificationMessage) error { listener.PropagateFinalityConflictNotifications()
return router.OutgoingRoute().Enqueue(notification) listener.PropagateFinalityConflictResolvedNotifications()
})
listener.SetOnFinalityConflictResolvedListener(func(notification *appmessage.FinalityConflictResolvedNotificationMessage) error {
return router.OutgoingRoute().Enqueue(notification)
})
response := appmessage.NewNotifyFinalityConflictsResponseMessage() response := appmessage.NewNotifyFinalityConflictsResponseMessage()
return response, nil return response, nil