start work on listening logic

This commit is contained in:
D-Stacks 2022-10-16 17:12:09 +02:00
parent ee09eb568a
commit cdcdfc4269
8 changed files with 391 additions and 7 deletions

View File

@ -223,12 +223,12 @@ func (m *Manager) notifyTXsChanged(virtualChangeSet *externalapi.VirtualChangeSe
onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyTXsChanged")
defer onEnd()
_, err := m.context.TXIndex.Update(virtualChangeSet)
txIndexChanges, err := m.context.TXIndex.Update(virtualChangeSet)
if err != nil {
return err
}
return nil
return m.context.NotifyTXAcceptanceChange(txIndexChanges)
}
func (m *Manager) notifyPruningPointUTXOSetOverride() error {

View File

@ -4,6 +4,7 @@ import (
"sync"
"github.com/kaspanet/kaspad/domain/dagconfig"
"github.com/kaspanet/kaspad/domain/txindex"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"github.com/kaspanet/kaspad/domain/consensus/utils/txscript"
@ -42,7 +43,9 @@ type NotificationListener struct {
propagatePruningPointUTXOSetOverrideNotifications bool
propagateNewBlockTemplateNotifications bool
propagateUTXOsChangedNotificationAddresses map[utxoindex.ScriptPublicKeyString]*UTXOsChangedNotificationAddress
propagateUTXOsChangedNotificationAddresses map[utxoindex.ScriptPublicKeyString]*UTXOsChangedNotificationAddress
propagateAddressesTxsNotifications TXsConfirmationChangedNotificationHolder
propagateTxsConfirmationChhangedNotifications TXsConfirmationChangedNotificationHolder
includeAcceptedTransactionIDsInVirtualSelectedParentChainChangedNotifications bool
}
@ -199,6 +202,29 @@ func (nm *NotificationManager) NotifyFinalityConflictResolved(notification *appm
return nil
}
// NotifyUTXOsChanged notifies the notification manager that UTXOs have been changed
func (nm *NotificationManager) NotifyTXAcceptanceChange(txChanges *txindex.TXAcceptanceChange) error {
nm.RLock()
defer nm.RUnlock()
for router, listener := range nm.listeners {
if listener.propagateTXsConfirmationChangedNotifications {
// Filter utxoChanges and create a notification
notification, err := listener.convertUTXOChangesToUTXOsChangedNotification(utxoChanges)
if err != nil {
return err
}
// Enqueue the notification
err = router.OutgoingRoute().MaybeEnqueue(notification)
if err != nil {
return err
}
}
}
return nil
}
// NotifyUTXOsChanged notifies the notification manager that UTXOs have been changed
func (nm *NotificationManager) NotifyUTXOsChanged(utxoChanges *utxoindex.UTXOChanges) error {
nm.RLock()
@ -367,6 +393,26 @@ func (nm *NotificationManager) PropagateUTXOsChangedNotifications(nl *Notificati
}
}
// PropagateUTXOsChangedNotifications instructs the listener to send UTXOs changed notifications
// to the remote listener for the given addresses. Subsequent calls instruct the listener to
// send UTXOs changed notifications for those addresses along with the old ones. Duplicate addresses
// are ignored.
func (nm *NotificationManager) PropagateTXsConfirmationChangedNotifications(nl *NotificationListener, addresses []*UTXOsChangedNotificationAddress) {
// Apply a write-lock since the internal listener address map is modified
nm.Lock()
defer nm.Unlock()
if !nl.propagateUTXOsChangedNotifications {
nl.propagateUTXOsChangedNotifications = true
nl.propagateUTXOsChangedNotificationAddresses =
make(map[utxoindex.ScriptPublicKeyString]*UTXOsChangedNotificationAddress, len(addresses))
}
for _, address := range addresses {
nl.propagateUTXOsChangedNotificationAddresses[address.ScriptPublicKeyString] = address
}
}
// StopPropagatingUTXOsChangedNotifications instructs the listener to stop sending UTXOs
// changed notifications to the remote listener for the given addresses. Addresses for which
// notifications are not currently sent are ignored.

View File

@ -0,0 +1,85 @@
package rpccontext
import (
"github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"github.com/kaspanet/kaspad/domain/txindex"
)
// TXsConfirmationChanged represents information for the TXsConfirmationChanged listener.
// This type is meant to be used in TXsChanged notifications
type TXsConfirmationChangedNotificationState struct {
RequiredConfirmations uint32
IncludePending bool
RegisteredTxsToBlueScore txindex.TxIDsToBlueScores
UnregesiteredTxsBlueScore map[externalapi.DomainTransactionID]uint64 //this is bluescore when txid was either a) inserted into listener, or b) removed from listener
}
func (ctx *Context) NewTXsConfirmationChangedNotificationState(txIds []*externalapi.DomainTransactionID, requiredConfirmations uint32,
includePending bool) (*TXsConfirmationChangedNotificationState, error) {
registeredTxsToBlueScore, NotFound, err := ctx.TXIndex.GetTXsBlueScores(txIds)
if err != nil {
return nil, err
}
virtualInfo, err := ctx.Domain.Consensus().GetVirtualInfo()
if err != nil {
return nil, err
}
unregesiteredTxsBlueScore := make(txindex.TxIDsToBlueScores, len(NotFound))
for _, txID := range NotFound {
unregesiteredTxsBlueScore[*txID] = virtualInfo.BlueScore
}
return &TXsConfirmationChangedNotificationState{
RequiredConfirmations: requiredConfirmations,
IncludePending: includePending,
RegisteredTxsToBlueScore: registeredTxsToBlueScore,
UnregesiteredTxsBlueScore: unregesiteredTxsBlueScore,
}, nil
}
func (tcc *TXsConfirmationChangedNotificationState) updateStateAndExtractConfirmations(txAcceptanceChange *txindex.TXAcceptanceChange) (
pending []*appmessage.TxIDConfirmationsPair, confirmed []*appmessage.TxIDConfirmationsPair, unconfirmed []*appmessage.TxIDConfirmationsPair) {
pending = make([]*appmessage.TxIDConfirmationsPair, 0)
confirmed = make([]*appmessage.TxIDConfirmationsPair, 0)
unconfirmed = make([]*appmessage.TxIDConfirmationsPair, 0)
for txID := range txAcceptanceChange.Removed {
_, found := tcc.RegisteredTxsToBlueScore[txID]
if found {
delete(tcc.RegisteredTxsToBlueScore, txID)
tcc.UnregesiteredTxsBlueScore[txID] = txAcceptanceChange.BlueScore
}
}
for txID := range txAcceptanceChange.Added {
_, found := tcc.UnregesiteredTxsBlueScore[txID]
if !found {
delete(tcc.UnregesiteredTxsBlueScore, txID)
tcc.RegisteredTxsToBlueScore[txID] = txAcceptanceChange.BlueScore
}
}
for txID, txBluescore := range tcc.RegisteredTxsToBlueScore {
confirmations := txAcceptanceChange.BlueScore - txBluescore
if confirmations >= tcc.RequiredConfirmations {
confirmed = append(confirmed, &appmessage.TxIDConfirmationsPair{TxID: txID, Confirmations: confirmations})
} else if tcc.IncludePending {
pending = append(pending, &appmessage.TxIDConfirmationsPair{TxID: txID, Confirmations: confirmations})
}
}
for txID, txBluescore := range tcc.UnregesiteredTxsBlueScore {
unconfirmations := txAcceptanceChange.BlueScore - txBluescore
if unconfirmations >= tcc.RequiredConfirmations {
unconfirmed = append(unconfirmed, &appmessage.TxIDConfirmationsPair{TxID: txID, Confirmations: unconfirmations})
delete(tcc.UnregesiteredTxsBlueScore, txID)
}
}
if tcc.IncludePending {
return pending, confirmed, unconfirmed
}
return nil, confirmed, unconfirmed
}

View File

@ -0,0 +1,49 @@
package appmessage
// ModifyNotifyingAddressesTxsRequestMessage is an appmessage corresponding to
// its respective RPC message
type ModifyNotifyingAddressesTxsRequestMessage struct {
baseMessage
AddAddresses []string
RemoveAddresses []string
RequiredConfirmations uint32
IncludePending bool
IncludeSending bool
IncludeReceiving bool
}
// Command returns the protocol command string for the message
func (msg *ModifyNotifyingAddressesTxsRequestMessage) Command() MessageCommand {
return CmdModifyNotifyingAddressesTxsRequestMessage
}
// NewModifyNotifyingAddressesTxsRequestMessage returns a instance of the message
func NewModifyNotifyingAddressesTxsRequestMessage(addAddresses []string, removeAddresses []string,
requiredConfirmations uint32, includePending bool, includeSending bool,
includeReceiving bool) *ModifyNotifyingAddressesTxsRequestMessage {
return &ModifyNotifyingAddressesTxsRequestMessage{
AddAddresses: addAddresses,
RemoveAddresses: removeAddresses,
RequiredConfirmations: requiredConfirmations,
IncludePending: includePending,
IncludeSending: includeSending,
IncludeReceiving: includeReceiving,
}
}
// ModifyNotifyingAddressesTxsResponseMessage is an appmessage corresponding to
// its respective RPC message
type ModifyNotifyingAddressesTxsResponseMessage struct {
baseMessage
Error *RPCError
}
// Command returns the protocol command string for the message
func (msg *ModifyNotifyingAddressesTxsResponseMessage) Command() MessageCommand {
return CmdModifyNotifyingAddressesTxsResponseMessage
}
// NewModifyNotifyingAddressesTxsResponseMessage returns a instance of the message
func NewModifyNotifyingAddressesTxsResponseMessage() *NotifyAddressesTxsResponseMessage {
return &NotifyAddressesTxsResponseMessage{}
}

View File

@ -0,0 +1,44 @@
package appmessage
// ModifyNotifyingTxsConfirmationChangedRequestMessage is an appmessage corresponding to
// its respective RPC message
type ModifyNotifyingTxsConfirmationChangedRequestMessage struct {
baseMessage
AddTxIDs []string
RemoveTxIDs []string
RequiredConfirmations uint32
IncludePending bool
}
// Command returns the protocol command string for the message
func (msg *ModifyNotifyingTxsConfirmationChangedRequestMessage) Command() MessageCommand {
return CmdModifyNotifyingTxsConfirmationChangedRequestMessage
}
// NewModifyNotifyingTxsConfirmationChangedRequestMessage returns a instance of the message
func NewModifyNotifyingTxsConfirmationChangedRequestMessage(addTxIDs []string, removeTxIDs []string,
requiredConfirmations uint32, includePending bool) *ModifyNotifyingTxsConfirmationChangedRequestMessage {
return &ModifyNotifyingTxsConfirmationChangedRequestMessage{
AddTxIDs: addTxIDs,
RemoveTxIDs: removeTxIDs,
RequiredConfirmations: requiredConfirmations,
IncludePending: includePending,
}
}
// ModifyNotifyingTxsConfirmationChangedResponseMessage is an appmessage corresponding to
// its respective RPC message
type ModifyNotifyingTxsConfirmationChangedResponseMessage struct {
baseMessage
Error *RPCError
}
// Command returns the protocol command string for the message
func (msg *ModifyNotifyingTxsConfirmationChangedResponseMessage) Command() MessageCommand {
return CmdModifyNotifyingTxsConfirmationChangedResponseMessage
}
// NewModifyNotifyingTxsChangedResponseMessage returns a instance of the message
func NewModifyNotifyingTxsChangedResponseMessage() *NotifyTxsConfirmationChangedResponseMessage {
return &NotifyTxsConfirmationChangedResponseMessage{}
}

View File

@ -0,0 +1,87 @@
package appmessage
// NotifyAddressesTxsRequestMessage is an appmessage corresponding to
// its respective RPC message
type NotifyAddressesTxsRequestMessage struct {
baseMessage
Addresses []string
RequiredConfirmations uint32
IncludePending bool
IncludeSending bool
IncludeReceiving bool
}
// Command returns the protocol command string for the message
func (msg *NotifyAddressesTxsRequestMessage) Command() MessageCommand {
return CmdNotifyAddressesTxsRequestMessage
}
// NewNotifyAddressesTxsRequestMessage returns a instance of the message
func NewNotifyAddressesTxsRequestMessage(addresses []string, requiredConfirmations uint32,
includePending bool, includeSending bool, includeReceiving bool) *NotifyAddressesTxsRequestMessage {
return &NotifyAddressesTxsRequestMessage{
Addresses: addresses,
RequiredConfirmations: requiredConfirmations,
IncludePending: includePending,
IncludeSending: includeSending,
IncludeReceiving: includeReceiving,
}
}
// NotifyAddressesTxsResponseMessage is an appmessage corresponding to
// its respective RPC message
type NotifyAddressesTxsResponseMessage struct {
baseMessage
Error *RPCError
}
// Command returns the protocol command string for the message
func (msg *NotifyAddressesTxsResponseMessage) Command() MessageCommand {
return CmdNotifyAddressesTxsResponseMessage
}
// NewNotifyAddressesTxsResponseMessage returns a instance of the message
func NewNotifyAddressesTxsResponseMessage() *NotifyAddressesTxsResponseMessage {
return &NotifyAddressesTxsResponseMessage{}
}
// AddressesTxsNotificationMessage is an appmessage corresponding to
// its respective RPC message
type AddressesTxsNotificationMessage struct {
baseMessage
RequiredConfirmations uint32
Pending *TxEntriesByAddresses
Confirmed *TxEntriesByAddresses
Unconfirmed *TxEntriesByAddresses
}
// Command returns the protocol command string for the message
func (msg *AddressesTxsNotificationMessage) Command() MessageCommand {
return CmdAddressesTxsNotificationMessage
}
// NewAddressesTxsNotificationMessage returns a instance of the message
func NewAddressesTxsNotificationMessage(requiredConfirmations uint32, pending *TxEntriesByAddresses,
confirmed *TxEntriesByAddresses, unconfirmed *TxEntriesByAddresses) *AddressesTxsNotificationMessage {
return &AddressesTxsNotificationMessage{
RequiredConfirmations: requiredConfirmations,
Pending: pending,
Confirmed: confirmed,
Unconfirmed: unconfirmed,
}
}
// TxEntriesByAddresses is an appmessage corresponding to
// its respective RPC message
type TxEntriesByAddresses struct {
Sent []*TxEntryByAddress
Received []*TxEntryByAddress
}
// TxEntryByAddress is an appmessage corresponding to
// its respective RPC message
type TxEntryByAddress struct {
Address string
TxID string
Confirmations uint32
}

View File

@ -0,0 +1,68 @@
package appmessage
// NotifyTxsConfirmationChangedRequestMessage is an appmessage corresponding to
// its respective RPC message
type NotifyTxsConfirmationChangedRequestMessage struct {
baseMessage
TxIDs []string
RequiredConfirmations uint32
IncludePending bool
}
// Command returns the protocol command string for the message
func (msg *NotifyTxsConfirmationChangedRequestMessage) Command() MessageCommand {
return CmdNotifyTxsConfirmationChangedRequestMessage
}
// NewNotifyTxsConfirmationChangedRequestMessage returns a instance of the message
func NewNotifyTxsConfirmationChangedRequestMessage(TxIDs []string, requiredConfirmations uint32,
includePending bool) *NotifyTxsConfirmationChangedRequestMessage {
return &NotifyTxsConfirmationChangedRequestMessage{
TxIDs: TxIDs,
RequiredConfirmations: requiredConfirmations,
IncludePending: includePending,
}
}
// NotifyTxsConfirmationChangedResponseMessage is an appmessage corresponding to
// its respective RPC message
type NotifyTxsConfirmationChangedResponseMessage struct {
baseMessage
Error *RPCError
}
// Command returns the protocol command string for the message
func (msg *NotifyTxsConfirmationChangedResponseMessage) Command() MessageCommand {
return CmdNotifyTxsConfirmationChangedResponseMessage
}
// NewNotifyTxsChangedResponseMessage returns a instance of the message
func NewNotifyTxsChangedResponseMessage() *NotifyTxsConfirmationChangedResponseMessage {
return &NotifyTxsConfirmationChangedResponseMessage{}
}
// TxsConfirmationChangedNotificationMessage is an appmessage corresponding to
// its respective RPC message
type TxsConfirmationChangedNotificationMessage struct {
baseMessage
RequiredConfirmations uint32
Pending []*TxIDConfirmationsPair
Confirmed []*TxIDConfirmationsPair
UnconfirmedTxIds []string
}
// Command returns the protocol command string for the message
func (msg *TxsConfirmationChangedNotificationMessage) Command() MessageCommand {
return CmdTxsConfirmationChangedNotificationMessage
}
// NewTxsChangedNotificationMessage returns a instance of the message
func NewTxsChangedNotificationMessage(requiredConfirmations uint32, pending []*TxIDConfirmationsPair,
confirmed []*TxIDConfirmationsPair, unconfirmedTxIds []string) *TxsConfirmationChangedNotificationMessage {
return &TxsConfirmationChangedNotificationMessage{
RequiredConfirmations: requiredConfirmations,
Pending: pending,
Confirmed: confirmed,
UnconfirmedTxIds: unconfirmedTxIds,
}
}

View File

@ -187,7 +187,12 @@ func (ti *TXIndex) addTXIDs(selectedParentChainChanges *externalapi.SelectedChai
if err != nil {
return err
}
for i, acceptingBlock := range chainBlocksChunk {
for i, acceptingBlockHash := range chainBlocksChunk {
acceptingBlockHeader, err := ti.domain.Consensus().GetBlockHeader(acceptingBlockHash)
if err != nil {
return err
}
acceptingBlueScore := acceptingBlockHeader.BlueScore()
chainBlockAcceptanceData := chainBlocksAcceptanceData[i]
for _, blockAcceptanceData := range chainBlockAcceptanceData {
for j, transactionAcceptanceData := range blockAcceptanceData.TransactionAcceptanceData {
@ -199,9 +204,9 @@ func (ti *TXIndex) addTXIDs(selectedParentChainChanges *externalapi.SelectedChai
transactionID := consensushashing.TransactionID(transactionAcceptanceData.Transaction)
ti.store.add(
*transactionID,
uint32(j), //index of including block where transaction is found
blockAcceptanceData.BlockHash, //this is the including block
acceptingBlock, //this is the accepting block
uint32(j), // index of including block where transaction is found
blockAcceptanceData.BlockHash, // this is the including block
acceptingBlockHash, // this is the accepting block
)
}
}