diff --git a/app/rpc/manager.go b/app/rpc/manager.go index 44e2c7fca..382820bc3 100644 --- a/app/rpc/manager.go +++ b/app/rpc/manager.go @@ -223,12 +223,12 @@ func (m *Manager) notifyTXsChanged(virtualChangeSet *externalapi.VirtualChangeSe onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyTXsChanged") defer onEnd() - _, err := m.context.TXIndex.Update(virtualChangeSet) + txIndexChanges, err := m.context.TXIndex.Update(virtualChangeSet) if err != nil { return err } - return nil + return m.context.NotifyTXAcceptanceChange(txIndexChanges) } func (m *Manager) notifyPruningPointUTXOSetOverride() error { diff --git a/app/rpc/rpccontext/notificationmanager.go b/app/rpc/rpccontext/notificationmanager.go index bcd2db4d7..cc7965a3d 100644 --- a/app/rpc/rpccontext/notificationmanager.go +++ b/app/rpc/rpccontext/notificationmanager.go @@ -4,6 +4,7 @@ import ( "sync" "github.com/kaspanet/kaspad/domain/dagconfig" + "github.com/kaspanet/kaspad/domain/txindex" "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" "github.com/kaspanet/kaspad/domain/consensus/utils/txscript" @@ -42,7 +43,9 @@ type NotificationListener struct { propagatePruningPointUTXOSetOverrideNotifications bool propagateNewBlockTemplateNotifications bool - propagateUTXOsChangedNotificationAddresses map[utxoindex.ScriptPublicKeyString]*UTXOsChangedNotificationAddress + propagateUTXOsChangedNotificationAddresses map[utxoindex.ScriptPublicKeyString]*UTXOsChangedNotificationAddress + propagateAddressesTxsNotifications TXsConfirmationChangedNotificationHolder + propagateTxsConfirmationChhangedNotifications TXsConfirmationChangedNotificationHolder includeAcceptedTransactionIDsInVirtualSelectedParentChainChangedNotifications bool } @@ -199,6 +202,29 @@ func (nm *NotificationManager) NotifyFinalityConflictResolved(notification *appm return nil } +// NotifyUTXOsChanged notifies the notification manager that UTXOs have been changed +func (nm *NotificationManager) NotifyTXAcceptanceChange(txChanges *txindex.TXAcceptanceChange) error { + nm.RLock() + defer nm.RUnlock() + + for router, listener := range nm.listeners { + if listener.propagateTXsConfirmationChangedNotifications { + // Filter utxoChanges and create a notification + notification, err := listener.convertUTXOChangesToUTXOsChangedNotification(utxoChanges) + if err != nil { + return err + } + + // Enqueue the notification + err = router.OutgoingRoute().MaybeEnqueue(notification) + if err != nil { + return err + } + } + } + return nil +} + // NotifyUTXOsChanged notifies the notification manager that UTXOs have been changed func (nm *NotificationManager) NotifyUTXOsChanged(utxoChanges *utxoindex.UTXOChanges) error { nm.RLock() @@ -367,6 +393,26 @@ func (nm *NotificationManager) PropagateUTXOsChangedNotifications(nl *Notificati } } +// PropagateUTXOsChangedNotifications instructs the listener to send UTXOs changed notifications +// to the remote listener for the given addresses. Subsequent calls instruct the listener to +// send UTXOs changed notifications for those addresses along with the old ones. Duplicate addresses +// are ignored. +func (nm *NotificationManager) PropagateTXsConfirmationChangedNotifications(nl *NotificationListener, addresses []*UTXOsChangedNotificationAddress) { + // Apply a write-lock since the internal listener address map is modified + nm.Lock() + defer nm.Unlock() + + if !nl.propagateUTXOsChangedNotifications { + nl.propagateUTXOsChangedNotifications = true + nl.propagateUTXOsChangedNotificationAddresses = + make(map[utxoindex.ScriptPublicKeyString]*UTXOsChangedNotificationAddress, len(addresses)) + } + + for _, address := range addresses { + nl.propagateUTXOsChangedNotificationAddresses[address.ScriptPublicKeyString] = address + } +} + // StopPropagatingUTXOsChangedNotifications instructs the listener to stop sending UTXOs // changed notifications to the remote listener for the given addresses. Addresses for which // notifications are not currently sent are ignored. diff --git a/app/rpc/rpccontext/tx_changed.go b/app/rpc/rpccontext/tx_changed.go new file mode 100644 index 000000000..576603fc8 --- /dev/null +++ b/app/rpc/rpccontext/tx_changed.go @@ -0,0 +1,85 @@ +package rpccontext + +import ( + "github.com/kaspanet/kaspad/app/appmessage" + "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" + "github.com/kaspanet/kaspad/domain/txindex" +) + +// TXsConfirmationChanged represents information for the TXsConfirmationChanged listener. +// This type is meant to be used in TXsChanged notifications +type TXsConfirmationChangedNotificationState struct { + RequiredConfirmations uint32 + IncludePending bool + RegisteredTxsToBlueScore txindex.TxIDsToBlueScores + UnregesiteredTxsBlueScore map[externalapi.DomainTransactionID]uint64 //this is bluescore when txid was either a) inserted into listener, or b) removed from listener +} + +func (ctx *Context) NewTXsConfirmationChangedNotificationState(txIds []*externalapi.DomainTransactionID, requiredConfirmations uint32, + includePending bool) (*TXsConfirmationChangedNotificationState, error) { + registeredTxsToBlueScore, NotFound, err := ctx.TXIndex.GetTXsBlueScores(txIds) + if err != nil { + return nil, err + } + virtualInfo, err := ctx.Domain.Consensus().GetVirtualInfo() + if err != nil { + return nil, err + } + + unregesiteredTxsBlueScore := make(txindex.TxIDsToBlueScores, len(NotFound)) + for _, txID := range NotFound { + unregesiteredTxsBlueScore[*txID] = virtualInfo.BlueScore + } + return &TXsConfirmationChangedNotificationState{ + RequiredConfirmations: requiredConfirmations, + IncludePending: includePending, + RegisteredTxsToBlueScore: registeredTxsToBlueScore, + UnregesiteredTxsBlueScore: unregesiteredTxsBlueScore, + }, nil +} + +func (tcc *TXsConfirmationChangedNotificationState) updateStateAndExtractConfirmations(txAcceptanceChange *txindex.TXAcceptanceChange) ( + pending []*appmessage.TxIDConfirmationsPair, confirmed []*appmessage.TxIDConfirmationsPair, unconfirmed []*appmessage.TxIDConfirmationsPair) { + + pending = make([]*appmessage.TxIDConfirmationsPair, 0) + confirmed = make([]*appmessage.TxIDConfirmationsPair, 0) + unconfirmed = make([]*appmessage.TxIDConfirmationsPair, 0) + + for txID := range txAcceptanceChange.Removed { + _, found := tcc.RegisteredTxsToBlueScore[txID] + if found { + delete(tcc.RegisteredTxsToBlueScore, txID) + tcc.UnregesiteredTxsBlueScore[txID] = txAcceptanceChange.BlueScore + } + } + for txID := range txAcceptanceChange.Added { + _, found := tcc.UnregesiteredTxsBlueScore[txID] + if !found { + delete(tcc.UnregesiteredTxsBlueScore, txID) + tcc.RegisteredTxsToBlueScore[txID] = txAcceptanceChange.BlueScore + } + } + + for txID, txBluescore := range tcc.RegisteredTxsToBlueScore { + confirmations := txAcceptanceChange.BlueScore - txBluescore + if confirmations >= tcc.RequiredConfirmations { + confirmed = append(confirmed, &appmessage.TxIDConfirmationsPair{TxID: txID, Confirmations: confirmations}) + } else if tcc.IncludePending { + pending = append(pending, &appmessage.TxIDConfirmationsPair{TxID: txID, Confirmations: confirmations}) + } + } + + for txID, txBluescore := range tcc.UnregesiteredTxsBlueScore { + unconfirmations := txAcceptanceChange.BlueScore - txBluescore + if unconfirmations >= tcc.RequiredConfirmations { + unconfirmed = append(unconfirmed, &appmessage.TxIDConfirmationsPair{TxID: txID, Confirmations: unconfirmations}) + delete(tcc.UnregesiteredTxsBlueScore, txID) + } + } + + if tcc.IncludePending { + return pending, confirmed, unconfirmed + } + + return nil, confirmed, unconfirmed +} \ No newline at end of file diff --git a/app/rpc/rpchandlers/modify_notifying_addresses_txs.go b/app/rpc/rpchandlers/modify_notifying_addresses_txs.go new file mode 100644 index 000000000..e3d306e91 --- /dev/null +++ b/app/rpc/rpchandlers/modify_notifying_addresses_txs.go @@ -0,0 +1,49 @@ +package appmessage + +// ModifyNotifyingAddressesTxsRequestMessage is an appmessage corresponding to +// its respective RPC message +type ModifyNotifyingAddressesTxsRequestMessage struct { + baseMessage + AddAddresses []string + RemoveAddresses []string + RequiredConfirmations uint32 + IncludePending bool + IncludeSending bool + IncludeReceiving bool +} + +// Command returns the protocol command string for the message +func (msg *ModifyNotifyingAddressesTxsRequestMessage) Command() MessageCommand { + return CmdModifyNotifyingAddressesTxsRequestMessage +} + +// NewModifyNotifyingAddressesTxsRequestMessage returns a instance of the message +func NewModifyNotifyingAddressesTxsRequestMessage(addAddresses []string, removeAddresses []string, + requiredConfirmations uint32, includePending bool, includeSending bool, + includeReceiving bool) *ModifyNotifyingAddressesTxsRequestMessage { + return &ModifyNotifyingAddressesTxsRequestMessage{ + AddAddresses: addAddresses, + RemoveAddresses: removeAddresses, + RequiredConfirmations: requiredConfirmations, + IncludePending: includePending, + IncludeSending: includeSending, + IncludeReceiving: includeReceiving, + } +} + +// ModifyNotifyingAddressesTxsResponseMessage is an appmessage corresponding to +// its respective RPC message +type ModifyNotifyingAddressesTxsResponseMessage struct { + baseMessage + Error *RPCError +} + +// Command returns the protocol command string for the message +func (msg *ModifyNotifyingAddressesTxsResponseMessage) Command() MessageCommand { + return CmdModifyNotifyingAddressesTxsResponseMessage +} + +// NewModifyNotifyingAddressesTxsResponseMessage returns a instance of the message +func NewModifyNotifyingAddressesTxsResponseMessage() *NotifyAddressesTxsResponseMessage { + return &NotifyAddressesTxsResponseMessage{} +} diff --git a/app/rpc/rpchandlers/modify_notifying_txs_confirmation_changed.go b/app/rpc/rpchandlers/modify_notifying_txs_confirmation_changed.go new file mode 100644 index 000000000..910af4db2 --- /dev/null +++ b/app/rpc/rpchandlers/modify_notifying_txs_confirmation_changed.go @@ -0,0 +1,44 @@ +package appmessage + +// ModifyNotifyingTxsConfirmationChangedRequestMessage is an appmessage corresponding to +// its respective RPC message +type ModifyNotifyingTxsConfirmationChangedRequestMessage struct { + baseMessage + AddTxIDs []string + RemoveTxIDs []string + RequiredConfirmations uint32 + IncludePending bool +} + +// Command returns the protocol command string for the message +func (msg *ModifyNotifyingTxsConfirmationChangedRequestMessage) Command() MessageCommand { + return CmdModifyNotifyingTxsConfirmationChangedRequestMessage +} + +// NewModifyNotifyingTxsConfirmationChangedRequestMessage returns a instance of the message +func NewModifyNotifyingTxsConfirmationChangedRequestMessage(addTxIDs []string, removeTxIDs []string, + requiredConfirmations uint32, includePending bool) *ModifyNotifyingTxsConfirmationChangedRequestMessage { + return &ModifyNotifyingTxsConfirmationChangedRequestMessage{ + AddTxIDs: addTxIDs, + RemoveTxIDs: removeTxIDs, + RequiredConfirmations: requiredConfirmations, + IncludePending: includePending, + } +} + +// ModifyNotifyingTxsConfirmationChangedResponseMessage is an appmessage corresponding to +// its respective RPC message +type ModifyNotifyingTxsConfirmationChangedResponseMessage struct { + baseMessage + Error *RPCError +} + +// Command returns the protocol command string for the message +func (msg *ModifyNotifyingTxsConfirmationChangedResponseMessage) Command() MessageCommand { + return CmdModifyNotifyingTxsConfirmationChangedResponseMessage +} + +// NewModifyNotifyingTxsChangedResponseMessage returns a instance of the message +func NewModifyNotifyingTxsChangedResponseMessage() *NotifyTxsConfirmationChangedResponseMessage { + return &NotifyTxsConfirmationChangedResponseMessage{} +} diff --git a/app/rpc/rpchandlers/notify_addresses_txs.go b/app/rpc/rpchandlers/notify_addresses_txs.go new file mode 100644 index 000000000..a23daba36 --- /dev/null +++ b/app/rpc/rpchandlers/notify_addresses_txs.go @@ -0,0 +1,87 @@ +package appmessage + +// NotifyAddressesTxsRequestMessage is an appmessage corresponding to +// its respective RPC message +type NotifyAddressesTxsRequestMessage struct { + baseMessage + Addresses []string + RequiredConfirmations uint32 + IncludePending bool + IncludeSending bool + IncludeReceiving bool +} + +// Command returns the protocol command string for the message +func (msg *NotifyAddressesTxsRequestMessage) Command() MessageCommand { + return CmdNotifyAddressesTxsRequestMessage +} + +// NewNotifyAddressesTxsRequestMessage returns a instance of the message +func NewNotifyAddressesTxsRequestMessage(addresses []string, requiredConfirmations uint32, + includePending bool, includeSending bool, includeReceiving bool) *NotifyAddressesTxsRequestMessage { + return &NotifyAddressesTxsRequestMessage{ + Addresses: addresses, + RequiredConfirmations: requiredConfirmations, + IncludePending: includePending, + IncludeSending: includeSending, + IncludeReceiving: includeReceiving, + } +} + +// NotifyAddressesTxsResponseMessage is an appmessage corresponding to +// its respective RPC message +type NotifyAddressesTxsResponseMessage struct { + baseMessage + Error *RPCError +} + +// Command returns the protocol command string for the message +func (msg *NotifyAddressesTxsResponseMessage) Command() MessageCommand { + return CmdNotifyAddressesTxsResponseMessage +} + +// NewNotifyAddressesTxsResponseMessage returns a instance of the message +func NewNotifyAddressesTxsResponseMessage() *NotifyAddressesTxsResponseMessage { + return &NotifyAddressesTxsResponseMessage{} +} + +// AddressesTxsNotificationMessage is an appmessage corresponding to +// its respective RPC message +type AddressesTxsNotificationMessage struct { + baseMessage + RequiredConfirmations uint32 + Pending *TxEntriesByAddresses + Confirmed *TxEntriesByAddresses + Unconfirmed *TxEntriesByAddresses +} + +// Command returns the protocol command string for the message +func (msg *AddressesTxsNotificationMessage) Command() MessageCommand { + return CmdAddressesTxsNotificationMessage +} + +// NewAddressesTxsNotificationMessage returns a instance of the message +func NewAddressesTxsNotificationMessage(requiredConfirmations uint32, pending *TxEntriesByAddresses, + confirmed *TxEntriesByAddresses, unconfirmed *TxEntriesByAddresses) *AddressesTxsNotificationMessage { + return &AddressesTxsNotificationMessage{ + RequiredConfirmations: requiredConfirmations, + Pending: pending, + Confirmed: confirmed, + Unconfirmed: unconfirmed, + } +} + +// TxEntriesByAddresses is an appmessage corresponding to +// its respective RPC message +type TxEntriesByAddresses struct { + Sent []*TxEntryByAddress + Received []*TxEntryByAddress +} + +// TxEntryByAddress is an appmessage corresponding to +// its respective RPC message +type TxEntryByAddress struct { + Address string + TxID string + Confirmations uint32 +} diff --git a/app/rpc/rpchandlers/notify_txs_confirmation_changed.go b/app/rpc/rpchandlers/notify_txs_confirmation_changed.go new file mode 100644 index 000000000..63d28bcb5 --- /dev/null +++ b/app/rpc/rpchandlers/notify_txs_confirmation_changed.go @@ -0,0 +1,68 @@ +package appmessage + +// NotifyTxsConfirmationChangedRequestMessage is an appmessage corresponding to +// its respective RPC message +type NotifyTxsConfirmationChangedRequestMessage struct { + baseMessage + TxIDs []string + RequiredConfirmations uint32 + IncludePending bool +} + +// Command returns the protocol command string for the message +func (msg *NotifyTxsConfirmationChangedRequestMessage) Command() MessageCommand { + return CmdNotifyTxsConfirmationChangedRequestMessage +} + +// NewNotifyTxsConfirmationChangedRequestMessage returns a instance of the message +func NewNotifyTxsConfirmationChangedRequestMessage(TxIDs []string, requiredConfirmations uint32, + includePending bool) *NotifyTxsConfirmationChangedRequestMessage { + return &NotifyTxsConfirmationChangedRequestMessage{ + TxIDs: TxIDs, + RequiredConfirmations: requiredConfirmations, + IncludePending: includePending, + } +} + +// NotifyTxsConfirmationChangedResponseMessage is an appmessage corresponding to +// its respective RPC message +type NotifyTxsConfirmationChangedResponseMessage struct { + baseMessage + Error *RPCError +} + +// Command returns the protocol command string for the message +func (msg *NotifyTxsConfirmationChangedResponseMessage) Command() MessageCommand { + return CmdNotifyTxsConfirmationChangedResponseMessage +} + +// NewNotifyTxsChangedResponseMessage returns a instance of the message +func NewNotifyTxsChangedResponseMessage() *NotifyTxsConfirmationChangedResponseMessage { + return &NotifyTxsConfirmationChangedResponseMessage{} +} + +// TxsConfirmationChangedNotificationMessage is an appmessage corresponding to +// its respective RPC message +type TxsConfirmationChangedNotificationMessage struct { + baseMessage + RequiredConfirmations uint32 + Pending []*TxIDConfirmationsPair + Confirmed []*TxIDConfirmationsPair + UnconfirmedTxIds []string +} + +// Command returns the protocol command string for the message +func (msg *TxsConfirmationChangedNotificationMessage) Command() MessageCommand { + return CmdTxsConfirmationChangedNotificationMessage +} + +// NewTxsChangedNotificationMessage returns a instance of the message +func NewTxsChangedNotificationMessage(requiredConfirmations uint32, pending []*TxIDConfirmationsPair, + confirmed []*TxIDConfirmationsPair, unconfirmedTxIds []string) *TxsConfirmationChangedNotificationMessage { + return &TxsConfirmationChangedNotificationMessage{ + RequiredConfirmations: requiredConfirmations, + Pending: pending, + Confirmed: confirmed, + UnconfirmedTxIds: unconfirmedTxIds, + } +} diff --git a/domain/txindex/txindex.go b/domain/txindex/txindex.go index dba05af1c..862b9c078 100644 --- a/domain/txindex/txindex.go +++ b/domain/txindex/txindex.go @@ -187,7 +187,12 @@ func (ti *TXIndex) addTXIDs(selectedParentChainChanges *externalapi.SelectedChai if err != nil { return err } - for i, acceptingBlock := range chainBlocksChunk { + for i, acceptingBlockHash := range chainBlocksChunk { + acceptingBlockHeader, err := ti.domain.Consensus().GetBlockHeader(acceptingBlockHash) + if err != nil { + return err + } + acceptingBlueScore := acceptingBlockHeader.BlueScore() chainBlockAcceptanceData := chainBlocksAcceptanceData[i] for _, blockAcceptanceData := range chainBlockAcceptanceData { for j, transactionAcceptanceData := range blockAcceptanceData.TransactionAcceptanceData { @@ -199,9 +204,9 @@ func (ti *TXIndex) addTXIDs(selectedParentChainChanges *externalapi.SelectedChai transactionID := consensushashing.TransactionID(transactionAcceptanceData.Transaction) ti.store.add( *transactionID, - uint32(j), //index of including block where transaction is found - blockAcceptanceData.BlockHash, //this is the including block - acceptingBlock, //this is the accepting block + uint32(j), // index of including block where transaction is found + blockAcceptanceData.BlockHash, // this is the including block + acceptingBlockHash, // this is the accepting block ) } }