diff --git a/app/protocol/flowcontext/blocks.go b/app/protocol/flowcontext/blocks.go index 0e29ef478..f77e3156d 100644 --- a/app/protocol/flowcontext/blocks.go +++ b/app/protocol/flowcontext/blocks.go @@ -19,7 +19,10 @@ func (f *FlowContext) OnNewBlock(block *util.Block) error { return err } if f.onBlockAddedToDAGHandler != nil { - f.onBlockAddedToDAGHandler(block) + err := f.onBlockAddedToDAGHandler(block) + if err != nil { + return err + } } return f.broadcastTransactionsAfterBlockAdded(block, transactionsAcceptedToMempool) diff --git a/app/protocol/flowcontext/flow_context.go b/app/protocol/flowcontext/flow_context.go index bb56518c1..7e62c7f59 100644 --- a/app/protocol/flowcontext/flow_context.go +++ b/app/protocol/flowcontext/flow_context.go @@ -20,7 +20,7 @@ import ( // OnBlockAddedToDAGHandler is a handler function that's triggered // when a block is added to the DAG -type OnBlockAddedToDAGHandler func(block *util.Block) +type OnBlockAddedToDAGHandler func(block *util.Block) error // OnTransactionAddedToMempoolHandler is a handler function that's triggered // when a transaction is added to the mempool diff --git a/app/rpc/manager.go b/app/rpc/manager.go index f1201dda1..2270095fb 100644 --- a/app/rpc/manager.go +++ b/app/rpc/manager.go @@ -52,11 +52,11 @@ func NewManager( } // NotifyBlockAddedToDAG notifies the manager that a block has been added to the DAG -func (m *Manager) NotifyBlockAddedToDAG(block *util.Block) { +func (m *Manager) NotifyBlockAddedToDAG(block *util.Block) error { m.context.BlockTemplateState.NotifyBlockAdded(block) notification := appmessage.NewBlockAddedNotificationMessage(block.MsgBlock()) - m.context.NotificationManager.NotifyBlockAdded(notification) + return m.context.NotificationManager.NotifyBlockAdded(notification) } // NotifyChainChanged notifies the manager that the DAG's selected parent chain has changed @@ -70,22 +70,19 @@ func (m *Manager) NotifyChainChanged(removedChainBlockHashes []*daghash.Hash, ad removedChainBlockHashStrings[i] = removedChainBlockHash.String() } notification := appmessage.NewChainChangedNotificationMessage(removedChainBlockHashStrings, addedChainBlocks) - m.context.NotificationManager.NotifyChainChanged(notification) - return nil + return m.context.NotificationManager.NotifyChainChanged(notification) } // NotifyFinalityConflict notifies the manager that there's a finality conflict in the DAG func (m *Manager) NotifyFinalityConflict(violatingBlockHash string) error { notification := appmessage.NewFinalityConflictNotificationMessage(violatingBlockHash) - m.context.NotificationManager.NotifyFinalityConflict(notification) - return nil + return m.context.NotificationManager.NotifyFinalityConflict(notification) } // NotifyFinalityConflictResolved notifies the manager that a finality conflict in the DAG has been resolved func (m *Manager) NotifyFinalityConflictResolved(finalityBlockHash string) error { notification := appmessage.NewFinalityConflictResolvedNotificationMessage(finalityBlockHash) - m.context.NotificationManager.NotifyFinalityConflictResolved(notification) - return nil + return m.context.NotificationManager.NotifyFinalityConflictResolved(notification) } // NotifyTransactionAddedToMempool notifies the manager that a transaction has been added to the mempool diff --git a/app/rpc/rpc.go b/app/rpc/rpc.go index b38170641..f40d11a82 100644 --- a/app/rpc/rpc.go +++ b/app/rpc/rpc.go @@ -42,16 +42,12 @@ func (m *Manager) routerInitializer(router *router.Router, netConnection *netada if err != nil { panic(err) } - spawn("routerInitializer-handleIncomingMessages", func() { - err := m.handleIncomingMessages(router, incomingRoute) - m.handleError(err, netConnection) - }) + m.context.NotificationManager.AddListener(router) - notificationListener := m.context.NotificationManager.AddListener(router) - spawn("routerInitializer-handleOutgoingNotifications", func() { + spawn("routerInitializer-handleIncomingMessages", func() { defer m.context.NotificationManager.RemoveListener(router) - err := m.handleOutgoingNotifications(notificationListener) + err := m.handleIncomingMessages(router, incomingRoute) m.handleError(err, netConnection) }) } @@ -78,15 +74,6 @@ func (m *Manager) handleIncomingMessages(router *router.Router, incomingRoute *r } } -func (m *Manager) handleOutgoingNotifications(notificationListener *rpccontext.NotificationListener) error { - for { - err := notificationListener.ProcessNextNotification() - if err != nil { - return err - } - } -} - func (m *Manager) handleError(err error, netConnection *netadapter.NetConnection) { if errors.Is(err, router.ErrTimeout) { log.Warnf("Got timeout from %s. Disconnecting...", netConnection) diff --git a/app/rpc/rpccontext/notificationmanager.go b/app/rpc/rpccontext/notificationmanager.go index c7d7f0a6a..2a6ed51fb 100644 --- a/app/rpc/rpccontext/notificationmanager.go +++ b/app/rpc/rpccontext/notificationmanager.go @@ -2,7 +2,7 @@ package rpccontext import ( "github.com/kaspanet/kaspad/app/appmessage" - "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" + routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" "github.com/pkg/errors" "sync" ) @@ -10,68 +10,43 @@ import ( // NotificationManager manages notifications for the RPC type NotificationManager struct { sync.RWMutex - listeners map[*router.Router]*NotificationListener + listeners map[*routerpkg.Router]*NotificationListener } -// OnBlockAddedListener is a listener function for when a block is added to the DAG -type OnBlockAddedListener func(notification *appmessage.BlockAddedNotificationMessage) error - -// OnChainChangedListener is a listener function for when the DAG's selected parent chain changes -type OnChainChangedListener func(notification *appmessage.ChainChangedNotificationMessage) error - -// OnFinalityConflictListener is a listener function for when there's a finality conflict in the DAG -type OnFinalityConflictListener func(notification *appmessage.FinalityConflictNotificationMessage) error - -// OnFinalityConflictResolvedListener is a listener function for when a finality conflict in the DAG has been resolved -type OnFinalityConflictResolvedListener func(notification *appmessage.FinalityConflictResolvedNotificationMessage) error - // NotificationListener represents a registered RPC notification listener type NotificationListener struct { - onBlockAddedListener OnBlockAddedListener - onBlockAddedNotificationChan chan *appmessage.BlockAddedNotificationMessage - onChainChangedListener OnChainChangedListener - onChainChangedNotificationChan chan *appmessage.ChainChangedNotificationMessage - onFinalityConflictListener OnFinalityConflictListener - onFinalityConflictNotificationChan chan *appmessage.FinalityConflictNotificationMessage - onFinalityConflictResolvedListener OnFinalityConflictResolvedListener - onFinalityConflictResolvedNotificationChan chan *appmessage.FinalityConflictResolvedNotificationMessage - - closeChan chan struct{} + propagateBlockAddedNotifications bool + propagateChainChangedNotifications bool + propagateFinalityConflictNotifications bool + propagateFinalityConflictResolvedNotifications bool } // NewNotificationManager creates a new NotificationManager func NewNotificationManager() *NotificationManager { return &NotificationManager{ - listeners: make(map[*router.Router]*NotificationListener), + listeners: make(map[*routerpkg.Router]*NotificationListener), } } // AddListener registers a listener with the given router -func (nm *NotificationManager) AddListener(router *router.Router) *NotificationListener { +func (nm *NotificationManager) AddListener(router *routerpkg.Router) { nm.Lock() defer nm.Unlock() listener := newNotificationListener() nm.listeners[router] = listener - return listener } // RemoveListener unregisters the given router -func (nm *NotificationManager) RemoveListener(router *router.Router) { +func (nm *NotificationManager) RemoveListener(router *routerpkg.Router) { nm.Lock() defer nm.Unlock() - listener, ok := nm.listeners[router] - if !ok { - return - } - listener.close() - delete(nm.listeners, router) } // Listener retrieves the listener registered with the given router -func (nm *NotificationManager) Listener(router *router.Router) (*NotificationListener, error) { +func (nm *NotificationManager) Listener(router *routerpkg.Router) (*NotificationListener, error) { nm.RLock() defer nm.RUnlock() @@ -83,115 +58,98 @@ func (nm *NotificationManager) Listener(router *router.Router) (*NotificationLis } // NotifyBlockAdded notifies the notification manager that a block has been added to the DAG -func (nm *NotificationManager) NotifyBlockAdded(notification *appmessage.BlockAddedNotificationMessage) { +func (nm *NotificationManager) NotifyBlockAdded(notification *appmessage.BlockAddedNotificationMessage) error { nm.RLock() defer nm.RUnlock() - for _, listener := range nm.listeners { - if listener.onBlockAddedListener != nil { - select { - case listener.onBlockAddedNotificationChan <- notification: - case <-listener.closeChan: - continue + for router, listener := range nm.listeners { + if listener.propagateBlockAddedNotifications { + err := router.OutgoingRoute().Enqueue(notification) + if err != nil { + return err } } } + return nil } // NotifyChainChanged notifies the notification manager that the DAG's selected parent chain has changed -func (nm *NotificationManager) NotifyChainChanged(message *appmessage.ChainChangedNotificationMessage) { +func (nm *NotificationManager) NotifyChainChanged(notification *appmessage.ChainChangedNotificationMessage) error { nm.RLock() defer nm.RUnlock() - for _, listener := range nm.listeners { - if listener.onChainChangedListener != nil { - select { - case listener.onChainChangedNotificationChan <- message: - case <-listener.closeChan: - continue + for router, listener := range nm.listeners { + if listener.propagateChainChangedNotifications { + err := router.OutgoingRoute().Enqueue(notification) + if err != nil { + return err } } } + return nil } // NotifyFinalityConflict notifies the notification manager that there's a finality conflict in the DAG -func (nm *NotificationManager) NotifyFinalityConflict(message *appmessage.FinalityConflictNotificationMessage) { +func (nm *NotificationManager) NotifyFinalityConflict(notification *appmessage.FinalityConflictNotificationMessage) error { nm.RLock() defer nm.RUnlock() - for _, listener := range nm.listeners { - if listener.onFinalityConflictListener != nil { - select { - case listener.onFinalityConflictNotificationChan <- message: - case <-listener.closeChan: - continue + for router, listener := range nm.listeners { + if listener.propagateFinalityConflictNotifications { + err := router.OutgoingRoute().Enqueue(notification) + if err != nil { + return err } } } + return nil } // NotifyFinalityConflictResolved notifies the notification manager that a finality conflict in the DAG has been resolved -func (nm *NotificationManager) NotifyFinalityConflictResolved(message *appmessage.FinalityConflictResolvedNotificationMessage) { +func (nm *NotificationManager) NotifyFinalityConflictResolved(notification *appmessage.FinalityConflictResolvedNotificationMessage) error { nm.RLock() defer nm.RUnlock() - for _, listener := range nm.listeners { - if listener.onFinalityConflictResolvedListener != nil { - select { - case listener.onFinalityConflictResolvedNotificationChan <- message: - case <-listener.closeChan: - continue + for router, listener := range nm.listeners { + if listener.propagateFinalityConflictResolvedNotifications { + err := router.OutgoingRoute().Enqueue(notification) + if err != nil { + return err } } } + return nil } func newNotificationListener() *NotificationListener { return &NotificationListener{ - onBlockAddedNotificationChan: make(chan *appmessage.BlockAddedNotificationMessage), - onChainChangedNotificationChan: make(chan *appmessage.ChainChangedNotificationMessage), - onFinalityConflictNotificationChan: make(chan *appmessage.FinalityConflictNotificationMessage), - onFinalityConflictResolvedNotificationChan: make(chan *appmessage.FinalityConflictResolvedNotificationMessage), - closeChan: make(chan struct{}, 1), + propagateBlockAddedNotifications: false, + propagateChainChangedNotifications: false, + propagateFinalityConflictNotifications: false, + propagateFinalityConflictResolvedNotifications: false, } } -// SetOnBlockAddedListener sets the onBlockAddedListener handler for this listener -func (nl *NotificationListener) SetOnBlockAddedListener(onBlockAddedListener OnBlockAddedListener) { - nl.onBlockAddedListener = onBlockAddedListener +// PropagateBlockAddedNotifications instructs the listener to send block added notifications +// to the remote listener +func (nl *NotificationListener) PropagateBlockAddedNotifications() { + nl.propagateBlockAddedNotifications = true } -// SetOnChainChangedListener sets the onChainChangedListener handler for this listener -func (nl *NotificationListener) SetOnChainChangedListener(onChainChangedListener OnChainChangedListener) { - nl.onChainChangedListener = onChainChangedListener +// PropagateChainChangedNotifications instructs the listener to send chain changed notifications +// to the remote listener +func (nl *NotificationListener) PropagateChainChangedNotifications() { + nl.propagateChainChangedNotifications = true } -// SetOnFinalityConflictListener sets the onFinalityConflictListener handler for this listener -func (nl *NotificationListener) SetOnFinalityConflictListener(onFinalityConflictListener OnFinalityConflictListener) { - nl.onFinalityConflictListener = onFinalityConflictListener +// PropagateFinalityConflictNotifications instructs the listener to send finality conflict notifications +// to the remote listener +func (nl *NotificationListener) PropagateFinalityConflictNotifications() { + nl.propagateFinalityConflictNotifications = true } -// SetOnFinalityConflictResolvedListener sets the onFinalityConflictResolvedListener handler for this listener -func (nl *NotificationListener) SetOnFinalityConflictResolvedListener(onFinalityConflictResolvedListener OnFinalityConflictResolvedListener) { - nl.onFinalityConflictResolvedListener = onFinalityConflictResolvedListener -} - -// ProcessNextNotification waits until a notification arrives and processes it -func (nl *NotificationListener) ProcessNextNotification() error { - select { - case block := <-nl.onBlockAddedNotificationChan: - return nl.onBlockAddedListener(block) - case notification := <-nl.onChainChangedNotificationChan: - return nl.onChainChangedListener(notification) - case notification := <-nl.onFinalityConflictNotificationChan: - return nl.onFinalityConflictListener(notification) - case notification := <-nl.onFinalityConflictResolvedNotificationChan: - return nl.onFinalityConflictResolvedListener(notification) - case <-nl.closeChan: - return nil - } -} - -func (nl *NotificationListener) close() { - nl.closeChan <- struct{}{} +// PropagateFinalityConflictResolvedNotifications instructs the listener to send finality conflict resolved notifications +// to the remote listener +func (nl *NotificationListener) PropagateFinalityConflictResolvedNotifications() { + nl.propagateFinalityConflictResolvedNotifications = true } diff --git a/app/rpc/rpchandlers/notify_block_added.go b/app/rpc/rpchandlers/notify_block_added.go index b86ef03b8..2929aa662 100644 --- a/app/rpc/rpchandlers/notify_block_added.go +++ b/app/rpc/rpchandlers/notify_block_added.go @@ -12,9 +12,7 @@ func HandleNotifyBlockAdded(context *rpccontext.Context, router *router.Router, if err != nil { return nil, err } - listener.SetOnBlockAddedListener(func(notification *appmessage.BlockAddedNotificationMessage) error { - return router.OutgoingRoute().Enqueue(notification) - }) + listener.PropagateBlockAddedNotifications() response := appmessage.NewNotifyBlockAddedResponseMessage() return response, nil diff --git a/app/rpc/rpchandlers/notify_chain_changed.go b/app/rpc/rpchandlers/notify_chain_changed.go index 872c9dced..a7f18c0b0 100644 --- a/app/rpc/rpchandlers/notify_chain_changed.go +++ b/app/rpc/rpchandlers/notify_chain_changed.go @@ -18,9 +18,7 @@ func HandleNotifyChainChanged(context *rpccontext.Context, router *router.Router if err != nil { return nil, err } - listener.SetOnChainChangedListener(func(message *appmessage.ChainChangedNotificationMessage) error { - return router.OutgoingRoute().Enqueue(message) - }) + listener.PropagateChainChangedNotifications() response := appmessage.NewNotifyChainChangedResponseMessage() return response, nil diff --git a/app/rpc/rpchandlers/notify_finality_conflicts.go b/app/rpc/rpchandlers/notify_finality_conflicts.go index 23de33779..7864797f6 100644 --- a/app/rpc/rpchandlers/notify_finality_conflicts.go +++ b/app/rpc/rpchandlers/notify_finality_conflicts.go @@ -12,12 +12,8 @@ func HandleNotifyFinalityConflicts(context *rpccontext.Context, router *router.R if err != nil { return nil, err } - listener.SetOnFinalityConflictListener(func(notification *appmessage.FinalityConflictNotificationMessage) error { - return router.OutgoingRoute().Enqueue(notification) - }) - listener.SetOnFinalityConflictResolvedListener(func(notification *appmessage.FinalityConflictResolvedNotificationMessage) error { - return router.OutgoingRoute().Enqueue(notification) - }) + listener.PropagateFinalityConflictNotifications() + listener.PropagateFinalityConflictResolvedNotifications() response := appmessage.NewNotifyFinalityConflictsResponseMessage() return response, nil