mirror of
https://github.com/kaspanet/kaspad.git
synced 2025-05-29 02:06:43 +00:00

* [NOD-1579] Rename AcceptedTxIDs to AcceptedTransactionIDs. * [NOD-1579] Add InsertBlockResult to ValidateAndInsertBlock results. * [NOD-1593] Rename InsertBlockResult to BlockInsertionResult. * [NOD-1593] Add SelectedParentChainChanges to AddBlockToVirtual's result. * [NOD-1593] Implement findSelectedParentChainChanges. * [NOD-1593] Implement TestFindSelectedParentChainChanges. * [NOD-1593] Fix a string. * [NOD-1593] Finish implementing TestFindSelectedParentChainChanges. * [NOD-1593] Fix merge errors. * [NOD-1597] Begin implementing UTXOIndex. * [NOD-1597] Connect UTXOIndex to RPC. * [NOD-1597] Connect Consensus to UTXOIndex. * [NOD-1597] Add AcceptanceData to BlockInfo. * [NOD-1597] Implement UTXOIndex.Update(). * [NOD-1597] Implement add(), remove(), and discard() in utxoIndexStore. * [NOD-1597] Add error cases to add() and remove(). * [NOD-1597] Add special cases to add() and remove(). * [NOD-1597] Implement commit. * [NOD-1597] Add a mutex around UTXOIndex.Update(). * [NOD-1597] Return changes to the UTXO from Update(). * [NOD-1597] Add NotifyUTXOsChangedRequestMessage and related structs. * [NOD-1597] Implement HandleNotifyUTXOsChanged. * [NOD-1597] Begin implementing TestUTXOIndex. * [NOD-1597] Implement RegisterForUTXOsChangedNotifications. * [NOD-1597] Fix bad transaction.ID usage. * [NOD-1597] Implement convertUTXOChangesToUTXOsChangedNotification. * [NOD-1597] Make UTXOsChangedNotificationMessage.Removed UTXOsByAddressesEntry instead of just RPCOutpoint so that the client can discern which address was the UTXO removed for. * [NOD-1597] Collect outpoints in TestUTXOIndex. * [NOD-1597] Rename RPC stuff. * [NOD-1597] Add messages for GetUTXOsByAddresses. * [NOD-1597] Implement HandleGetUTXOsByAddresses. * [NOD-1597] Implement GetUTXOsByAddresses. * [NOD-1597] Implement UTXOs(). * [NOD-1597] Implement getUTXOOutpointEntryPairs(). * [NOD-1597] Expand TestUTXOIndex. * [NOD-1597] Convert SubmitTransaction to use RPCTransaction instead of MsgTx. * [NOD-1597] Finish implementing TestUTXOIndex. * [NOD-1597] Add messages for GetVirtualSelectedParentBlueScore. * [NOD-1597] Implement HandleGetVirtualSelectedParentBlueScore and GetVirtualSelectedParentBlueScore. * [NOD-1597] Implement TestVirtualSelectedParentBlueScore. * [NOD-1597] Implement NotifyVirtualSelectedParentBlueScoreChanged. * [NOD-1597] Expand TestVirtualSelectedParentBlueScore. * [NOD-1597] Implement notifyVirtualSelectedParentBlueScoreChanged. * [NOD-1597] Make go lint happy. * [NOD-1593] Fix merge errors. * [NOD-1593] Rename findSelectedParentChainChanges to calculateSelectedParentChainChanges. * [NOD-1593] Expand TestCalculateSelectedParentChainChanges. * [NOD-1597] Add logs to utxoindex.go. * [NOD-1597] Add logs to utxoindex/store.go. * [NOD-1597] Add logs to RPCManager.NotifyXXX functions. * Implement notifySelectedParentChainChanged. * Implement TestSelectedParentChain. * Rename NotifyChainChanged to NotifyVirtualSelectedParentChainChanged. * Rename GetChainFromBlock to GetVirtualSelectedParentChainFromBlock. * Remove AcceptanceIndex from the config. * Implement HandleGetVirtualSelectedParentChainFromBlock. * Expand TestVirtualSelectedParentChain. * Fix merge errors. * Add a comment. * Move a comment.
254 lines
8.8 KiB
Go
254 lines
8.8 KiB
Go
package rpccontext
|
|
|
|
import (
|
|
"encoding/hex"
|
|
"github.com/kaspanet/kaspad/app/appmessage"
|
|
"github.com/kaspanet/kaspad/domain/utxoindex"
|
|
routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
|
|
"github.com/pkg/errors"
|
|
"sync"
|
|
)
|
|
|
|
// NotificationManager manages notifications for the RPC
|
|
type NotificationManager struct {
|
|
sync.RWMutex
|
|
listeners map[*routerpkg.Router]*NotificationListener
|
|
}
|
|
|
|
// UTXOsChangedNotificationAddress represents a kaspad address.
|
|
// This type is meant to be used in UTXOsChanged notifications
|
|
type UTXOsChangedNotificationAddress struct {
|
|
Address string
|
|
ScriptPublicKeyString utxoindex.ScriptPublicKeyString
|
|
}
|
|
|
|
// NotificationListener represents a registered RPC notification listener
|
|
type NotificationListener struct {
|
|
propagateBlockAddedNotifications bool
|
|
propagateVirtualSelectedParentChainChangedNotifications bool
|
|
propagateFinalityConflictNotifications bool
|
|
propagateFinalityConflictResolvedNotifications bool
|
|
propagateUTXOsChangedNotifications bool
|
|
propagateVirtualSelectedParentBlueScoreChangedNotifications bool
|
|
|
|
propagateUTXOsChangedNotificationAddresses []*UTXOsChangedNotificationAddress
|
|
}
|
|
|
|
// NewNotificationManager creates a new NotificationManager
|
|
func NewNotificationManager() *NotificationManager {
|
|
return &NotificationManager{
|
|
listeners: make(map[*routerpkg.Router]*NotificationListener),
|
|
}
|
|
}
|
|
|
|
// AddListener registers a listener with the given router
|
|
func (nm *NotificationManager) AddListener(router *routerpkg.Router) {
|
|
nm.Lock()
|
|
defer nm.Unlock()
|
|
|
|
listener := newNotificationListener()
|
|
nm.listeners[router] = listener
|
|
}
|
|
|
|
// RemoveListener unregisters the given router
|
|
func (nm *NotificationManager) RemoveListener(router *routerpkg.Router) {
|
|
nm.Lock()
|
|
defer nm.Unlock()
|
|
|
|
delete(nm.listeners, router)
|
|
}
|
|
|
|
// Listener retrieves the listener registered with the given router
|
|
func (nm *NotificationManager) Listener(router *routerpkg.Router) (*NotificationListener, error) {
|
|
nm.RLock()
|
|
defer nm.RUnlock()
|
|
|
|
listener, ok := nm.listeners[router]
|
|
if !ok {
|
|
return nil, errors.Errorf("listener not found")
|
|
}
|
|
return listener, nil
|
|
}
|
|
|
|
// NotifyBlockAdded notifies the notification manager that a block has been added to the DAG
|
|
func (nm *NotificationManager) NotifyBlockAdded(notification *appmessage.BlockAddedNotificationMessage) error {
|
|
nm.RLock()
|
|
defer nm.RUnlock()
|
|
|
|
for router, listener := range nm.listeners {
|
|
if listener.propagateBlockAddedNotifications {
|
|
err := router.OutgoingRoute().Enqueue(notification)
|
|
if errors.Is(err, routerpkg.ErrRouteClosed) {
|
|
log.Warnf("Couldn't send notification: %s", err)
|
|
} else if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// NotifyVirtualSelectedParentChainChanged notifies the notification manager that the DAG's selected parent chain has changed
|
|
func (nm *NotificationManager) NotifyVirtualSelectedParentChainChanged(notification *appmessage.VirtualSelectedParentChainChangedNotificationMessage) error {
|
|
nm.RLock()
|
|
defer nm.RUnlock()
|
|
|
|
for router, listener := range nm.listeners {
|
|
if listener.propagateVirtualSelectedParentChainChangedNotifications {
|
|
err := router.OutgoingRoute().Enqueue(notification)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// NotifyFinalityConflict notifies the notification manager that there's a finality conflict in the DAG
|
|
func (nm *NotificationManager) NotifyFinalityConflict(notification *appmessage.FinalityConflictNotificationMessage) error {
|
|
nm.RLock()
|
|
defer nm.RUnlock()
|
|
|
|
for router, listener := range nm.listeners {
|
|
if listener.propagateFinalityConflictNotifications {
|
|
err := router.OutgoingRoute().Enqueue(notification)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// NotifyFinalityConflictResolved notifies the notification manager that a finality conflict in the DAG has been resolved
|
|
func (nm *NotificationManager) NotifyFinalityConflictResolved(notification *appmessage.FinalityConflictResolvedNotificationMessage) error {
|
|
nm.RLock()
|
|
defer nm.RUnlock()
|
|
|
|
for router, listener := range nm.listeners {
|
|
if listener.propagateFinalityConflictResolvedNotifications {
|
|
err := router.OutgoingRoute().Enqueue(notification)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// NotifyUTXOsChanged notifies the notification manager that UTXOs have been changed
|
|
func (nm *NotificationManager) NotifyUTXOsChanged(utxoChanges *utxoindex.UTXOChanges) error {
|
|
nm.RLock()
|
|
defer nm.RUnlock()
|
|
|
|
for router, listener := range nm.listeners {
|
|
if listener.propagateUTXOsChangedNotifications {
|
|
// Filter utxoChanges and create a notification
|
|
notification := listener.convertUTXOChangesToUTXOsChangedNotification(utxoChanges)
|
|
|
|
// Don't send the notification if it's empty
|
|
if len(notification.Added) == 0 && len(notification.Removed) == 0 {
|
|
continue
|
|
}
|
|
|
|
// Enqueue the notification
|
|
err := router.OutgoingRoute().Enqueue(notification)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// NotifyVirtualSelectedParentBlueScoreChanged notifies the notification manager that the DAG's
|
|
// virtual selected parent blue score has changed
|
|
func (nm *NotificationManager) NotifyVirtualSelectedParentBlueScoreChanged(
|
|
notification *appmessage.VirtualSelectedParentBlueScoreChangedNotificationMessage) error {
|
|
|
|
nm.RLock()
|
|
defer nm.RUnlock()
|
|
|
|
for router, listener := range nm.listeners {
|
|
if listener.propagateVirtualSelectedParentBlueScoreChangedNotifications {
|
|
err := router.OutgoingRoute().Enqueue(notification)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func newNotificationListener() *NotificationListener {
|
|
return &NotificationListener{
|
|
propagateBlockAddedNotifications: false,
|
|
propagateVirtualSelectedParentChainChangedNotifications: false,
|
|
propagateFinalityConflictNotifications: false,
|
|
propagateFinalityConflictResolvedNotifications: false,
|
|
propagateUTXOsChangedNotifications: false,
|
|
propagateVirtualSelectedParentBlueScoreChangedNotifications: false,
|
|
}
|
|
}
|
|
|
|
// PropagateBlockAddedNotifications instructs the listener to send block added notifications
|
|
// to the remote listener
|
|
func (nl *NotificationListener) PropagateBlockAddedNotifications() {
|
|
nl.propagateBlockAddedNotifications = true
|
|
}
|
|
|
|
// PropagateVirtualSelectedParentChainChangedNotifications instructs the listener to send chain changed notifications
|
|
// to the remote listener
|
|
func (nl *NotificationListener) PropagateVirtualSelectedParentChainChangedNotifications() {
|
|
nl.propagateVirtualSelectedParentChainChangedNotifications = true
|
|
}
|
|
|
|
// PropagateFinalityConflictNotifications instructs the listener to send finality conflict notifications
|
|
// to the remote listener
|
|
func (nl *NotificationListener) PropagateFinalityConflictNotifications() {
|
|
nl.propagateFinalityConflictNotifications = true
|
|
}
|
|
|
|
// PropagateFinalityConflictResolvedNotifications instructs the listener to send finality conflict resolved notifications
|
|
// to the remote listener
|
|
func (nl *NotificationListener) PropagateFinalityConflictResolvedNotifications() {
|
|
nl.propagateFinalityConflictResolvedNotifications = true
|
|
}
|
|
|
|
// PropagateUTXOsChangedNotifications instructs the listener to send UTXOs changed notifications
|
|
// to the remote listener
|
|
func (nl *NotificationListener) PropagateUTXOsChangedNotifications(addresses []*UTXOsChangedNotificationAddress) {
|
|
nl.propagateUTXOsChangedNotifications = true
|
|
nl.propagateUTXOsChangedNotificationAddresses = addresses
|
|
}
|
|
|
|
func (nl *NotificationListener) convertUTXOChangesToUTXOsChangedNotification(
|
|
utxoChanges *utxoindex.UTXOChanges) *appmessage.UTXOsChangedNotificationMessage {
|
|
|
|
notification := &appmessage.UTXOsChangedNotificationMessage{}
|
|
for _, listenerAddress := range nl.propagateUTXOsChangedNotificationAddresses {
|
|
listenerScriptPublicKeyString := listenerAddress.ScriptPublicKeyString
|
|
if addedPairs, ok := utxoChanges.Added[listenerScriptPublicKeyString]; ok {
|
|
notification.Added = ConvertUTXOOutpointEntryPairsToUTXOsByAddressesEntries(listenerAddress.Address, addedPairs)
|
|
}
|
|
if removedOutpoints, ok := utxoChanges.Removed[listenerScriptPublicKeyString]; ok {
|
|
for outpoint := range removedOutpoints {
|
|
notification.Removed = append(notification.Removed, &appmessage.UTXOsByAddressesEntry{
|
|
Address: listenerAddress.Address,
|
|
Outpoint: &appmessage.RPCOutpoint{
|
|
TransactionID: hex.EncodeToString(outpoint.TransactionID[:]),
|
|
Index: outpoint.Index,
|
|
},
|
|
})
|
|
}
|
|
}
|
|
}
|
|
return notification
|
|
}
|
|
|
|
// PropagateVirtualSelectedParentBlueScoreChangedNotifications instructs the listener to send
|
|
// virtual selected parent blue score notifications to the remote listener
|
|
func (nl *NotificationListener) PropagateVirtualSelectedParentBlueScoreChangedNotifications() {
|
|
nl.propagateVirtualSelectedParentBlueScoreChangedNotifications = true
|
|
}
|