From 94ddf0aab06d069b69ae4d41af008bd3385fd710 Mon Sep 17 00:00:00 2001 From: Mike Zak Date: Sun, 6 Jun 2021 12:11:33 +0300 Subject: [PATCH] Added model and stubs for all main methods --- .../handle_relayed_transactions.go | 8 +- app/rpc/rpchandlers/submit_transaction.go | 4 +- domain/miningmanager/factory.go | 2 +- domain/miningmanager/mempool/mempool.go | 1042 +--------------- .../miningmanager/mempool/mempool_utxo_set.go | 17 + domain/miningmanager/mempool/model.go | 18 + domain/miningmanager/mempool/orphan_pool.go | 35 + .../mempool/transactions_pool.go | 26 + .../{mempool => mempool_old}/README.md | 0 .../{mempool => mempool_old}/doc.go | 8 +- .../{mempool => mempool_old}/error.go | 2 +- domain/miningmanager/mempool_old/log.go | 11 + domain/miningmanager/mempool_old/mempool.go | 1045 +++++++++++++++++ .../mempool_utxoset.go | 2 +- .../{mempool => mempool_old}/policy.go | 3 +- .../{mempool => mempool_old}/policy_test.go | 2 +- domain/miningmanager/miningmanager_test.go | 7 +- 17 files changed, 1193 insertions(+), 1039 deletions(-) create mode 100644 domain/miningmanager/mempool/mempool_utxo_set.go create mode 100644 domain/miningmanager/mempool/model.go create mode 100644 domain/miningmanager/mempool/orphan_pool.go create mode 100644 domain/miningmanager/mempool/transactions_pool.go rename domain/miningmanager/{mempool => mempool_old}/README.md (100%) rename domain/miningmanager/{mempool => mempool_old}/doc.go (93%) rename domain/miningmanager/{mempool => mempool_old}/error.go (99%) create mode 100644 domain/miningmanager/mempool_old/log.go create mode 100644 domain/miningmanager/mempool_old/mempool.go rename domain/miningmanager/{mempool => mempool_old}/mempool_utxoset.go (99%) rename domain/miningmanager/{mempool => mempool_old}/policy.go (99%) rename domain/miningmanager/{mempool => mempool_old}/policy_test.go (99%) diff --git a/app/protocol/flows/transactionrelay/handle_relayed_transactions.go b/app/protocol/flows/transactionrelay/handle_relayed_transactions.go index 4558978c1..3cbd48938 100644 --- a/app/protocol/flows/transactionrelay/handle_relayed_transactions.go +++ b/app/protocol/flows/transactionrelay/handle_relayed_transactions.go @@ -7,7 +7,7 @@ import ( "github.com/kaspanet/kaspad/domain" "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" "github.com/kaspanet/kaspad/domain/consensus/utils/consensushashing" - "github.com/kaspanet/kaspad/domain/miningmanager/mempool" + "github.com/kaspanet/kaspad/domain/miningmanager/mempool_old" "github.com/kaspanet/kaspad/infrastructure/network/netadapter" "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" "github.com/pkg/errors" @@ -175,14 +175,14 @@ func (flow *handleRelayedTransactionsFlow) receiveTransactions(requestedTransact err = flow.Domain().MiningManager().ValidateAndInsertTransaction(tx, true) if err != nil { - ruleErr := &mempool.RuleError{} + ruleErr := &mempool_old.RuleError{} if !errors.As(err, ruleErr) { return errors.Wrapf(err, "failed to process transaction %s", txID) } shouldBan := true - if txRuleErr := (&mempool.TxRuleError{}); errors.As(ruleErr.Err, txRuleErr) { - if txRuleErr.RejectCode != mempool.RejectInvalid { + if txRuleErr := (&mempool_old.TxRuleError{}); errors.As(ruleErr.Err, txRuleErr) { + if txRuleErr.RejectCode != mempool_old.RejectInvalid { shouldBan = false } } diff --git a/app/rpc/rpchandlers/submit_transaction.go b/app/rpc/rpchandlers/submit_transaction.go index e7174c60e..9566dc651 100644 --- a/app/rpc/rpchandlers/submit_transaction.go +++ b/app/rpc/rpchandlers/submit_transaction.go @@ -4,7 +4,7 @@ import ( "github.com/kaspanet/kaspad/app/appmessage" "github.com/kaspanet/kaspad/app/rpc/rpccontext" "github.com/kaspanet/kaspad/domain/consensus/utils/consensushashing" - "github.com/kaspanet/kaspad/domain/miningmanager/mempool" + "github.com/kaspanet/kaspad/domain/miningmanager/mempool_old" "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" "github.com/pkg/errors" ) @@ -23,7 +23,7 @@ func HandleSubmitTransaction(context *rpccontext.Context, _ *router.Router, requ transactionID := consensushashing.TransactionID(domainTransaction) err = context.ProtocolManager.AddTransaction(domainTransaction) if err != nil { - if !errors.As(err, &mempool.RuleError{}) { + if !errors.As(err, &mempool_old.RuleError{}) { return nil, err } diff --git a/domain/miningmanager/factory.go b/domain/miningmanager/factory.go index a578c9bba..1d852bfe5 100644 --- a/domain/miningmanager/factory.go +++ b/domain/miningmanager/factory.go @@ -4,7 +4,7 @@ import ( "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" "github.com/kaspanet/kaspad/domain/dagconfig" "github.com/kaspanet/kaspad/domain/miningmanager/blocktemplatebuilder" - mempoolpkg "github.com/kaspanet/kaspad/domain/miningmanager/mempool" + mempoolpkg "github.com/kaspanet/kaspad/domain/miningmanager/mempool_old" ) // Factory instantiates new mining managers diff --git a/domain/miningmanager/mempool/mempool.go b/domain/miningmanager/mempool/mempool.go index a73dbf321..b79ef7635 100644 --- a/domain/miningmanager/mempool/mempool.go +++ b/domain/miningmanager/mempool/mempool.go @@ -1,1044 +1,44 @@ -// Copyright (c) 2013-2016 The btcsuite developers -// Use of this source code is governed by an ISC -// license that can be found in the LICENSE file. - package mempool -import ( - "container/list" - "fmt" - "github.com/kaspanet/kaspad/domain/dagconfig" - "sort" - "sync" - "time" +import "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" - "github.com/kaspanet/kaspad/infrastructure/logger" - - "github.com/kaspanet/kaspad/domain/consensus/utils/constants" - - "github.com/kaspanet/kaspad/domain/consensus/utils/transactionhelper" - - consensusexternalapi "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" - "github.com/kaspanet/kaspad/domain/consensus/ruleerrors" - "github.com/kaspanet/kaspad/domain/consensus/utils/consensushashing" - "github.com/kaspanet/kaspad/domain/consensus/utils/estimatedsize" - miningmanagermodel "github.com/kaspanet/kaspad/domain/miningmanager/model" - "github.com/kaspanet/kaspad/util" - "github.com/kaspanet/kaspad/util/mstime" - "github.com/pkg/errors" -) - -const ( - // orphanTTL is the maximum amount of time an orphan is allowed to - // stay in the orphan pool before it expires and is evicted during the - // next scan. - orphanTTL = time.Minute * 15 - - // orphanExpireScanInterval is the minimum amount of time in between - // scans of the orphan pool to evict expired transactions. - orphanExpireScanInterval = time.Minute * 5 -) - -// policy houses the policy (configuration parameters) which is used to -// control the mempool. -type policy struct { - // MaxTxVersion is the transaction version that the mempool should - // accept. All transactions above this version are rejected as - // non-standard. - MaxTxVersion uint16 - - // AcceptNonStd defines whether to accept non-standard transactions. If - // true, non-standard transactions will be accepted into the mempool. - // Otherwise, all non-standard transactions will be rejected. - AcceptNonStd bool - - // MaxOrphanTxs is the maximum number of orphan transactions - // that can be queued. - MaxOrphanTxs int - - // MaxOrphanTxSize is the maximum size allowed for orphan transactions. - // This helps prevent memory exhaustion attacks from sending a lot of - // of big orphans. - MaxOrphanTxSize int - - // MinRelayTxFee defines the minimum transaction fee in KAS/kB to be - // considered a non-zero fee. - MinRelayTxFee util.Amount -} - -// mempool is used as a source of transactions that need to be mined into blocks -// and relayed to other peers. It is safe for concurrent access from multiple -// peers. type mempool struct { - pool map[consensusexternalapi.DomainTransactionID]*txDescriptor - - chainedTransactions map[consensusexternalapi.DomainTransactionID]*txDescriptor - chainedTransactionByPreviousOutpoint map[consensusexternalapi.DomainOutpoint]*txDescriptor - - orphans map[consensusexternalapi.DomainTransactionID]*orphanTx - orphansByPrev map[consensusexternalapi.DomainOutpoint]map[consensusexternalapi.DomainTransactionID]*consensusexternalapi.DomainTransaction - - mempoolUTXOSet *mempoolUTXOSet - consensus consensusexternalapi.Consensus - - orderedTransactionsByFeeRate []*consensusexternalapi.DomainTransaction - - // nextExpireScan is the time after which the orphan pool will be - // scanned in order to evict orphans. This is NOT a hard deadline as - // the scan will only run when an orphan is added to the pool as opposed - // to on an unconditional timer. - nextExpireScan mstime.Time - - mtx sync.RWMutex - policy policy - dagParams *dagconfig.Params } -// New returns a new memory pool for validating and storing standalone -// transactions until they are mined into a block. -func New(consensus consensusexternalapi.Consensus, dagParams *dagconfig.Params) miningmanagermodel.Mempool { - policy := policy{ - MaxTxVersion: constants.MaxTransactionVersion, - AcceptNonStd: dagParams.RelayNonStdTxs, - MaxOrphanTxs: 5, - MaxOrphanTxSize: 100000, - MinRelayTxFee: 1000, // 1 sompi per byte - } - return &mempool{ - mtx: sync.RWMutex{}, - policy: policy, - pool: make(map[consensusexternalapi.DomainTransactionID]*txDescriptor), - chainedTransactions: make(map[consensusexternalapi.DomainTransactionID]*txDescriptor), - chainedTransactionByPreviousOutpoint: make(map[consensusexternalapi.DomainOutpoint]*txDescriptor), - orphans: make(map[consensusexternalapi.DomainTransactionID]*orphanTx), - orphansByPrev: make(map[consensusexternalapi.DomainOutpoint]map[consensusexternalapi.DomainTransactionID]*consensusexternalapi.DomainTransaction), - mempoolUTXOSet: newMempoolUTXOSet(), - consensus: consensus, - nextExpireScan: mstime.Now().Add(orphanExpireScanInterval), - dagParams: dagParams, - } +func (mp *mempool) ValidateAndInsertTransaction(transaction *externalapi.DomainTransaction, isHighPriority bool) ( + acceptedTransactions []*externalapi.DomainTransaction, err error) { + + panic("mempool.ValidateAndInsertTransaction not implemented") // TODO (Mike) } -func (mp *mempool) GetTransaction( - transactionID *consensusexternalapi.DomainTransactionID) (*consensusexternalapi.DomainTransaction, bool) { - mp.mtx.RLock() - defer mp.mtx.RUnlock() +func (mp *mempool) HandleNewBlockTransactions(transactions []*externalapi.DomainTransaction) ( + acceptedOrphans []*externalapi.DomainTransaction, err error) { - txDesc, exists := mp.fetchTxDesc(transactionID) - if !exists { - return nil, false - } - - return txDesc.DomainTransaction, true + panic("mempool.HandleNewBlockTransactions not implemented") // TODO (Mike) } -func (mp *mempool) AllTransactions() []*consensusexternalapi.DomainTransaction { - mp.mtx.RLock() - defer mp.mtx.RUnlock() - - transactions := make([]*consensusexternalapi.DomainTransaction, 0, len(mp.pool)+len(mp.chainedTransactions)) - for _, txDesc := range mp.pool { - transactions = append(transactions, txDesc.DomainTransaction) - } - - for _, txDesc := range mp.chainedTransactions { - transactions = append(transactions, txDesc.DomainTransaction) - } - - return transactions +func (mp *mempool) RemoveTransaction(transactionID *externalapi.DomainTransactionID) error { + panic("mempool.RemoveTransaction not implemented") // TODO (Mike) } -func (mp *mempool) TransactionCount() int { - mp.mtx.RLock() - defer mp.mtx.RUnlock() - - return len(mp.pool) + len(mp.chainedTransactions) +func (mp *mempool) BlockCandidateTransactions() ([]*externalapi.DomainTransaction, error) { + panic("mempool.BlockCandidateTransactions not implemented") // TODO (Mike) } -// txDescriptor is a descriptor containing a transaction in the mempool along with -// additional metadata. -type txDescriptor struct { - *consensusexternalapi.DomainTransaction - - // depCount is not 0 for a chained transaction. A chained transaction is - // one that is accepted to pool, but cannot be mined in next block because it - // depends on outputs of accepted, but still not mined transaction - depCount int +func (mp *mempool) RevalidateHighPriorityTransactions() (validTransactions []*externalapi.DomainTransaction, err error) { + panic("mempool.RevalidateHighPriorityTransactions not implemented") // TODO (Mike) } -// orphanTx is normal transaction that references an ancestor transaction -// that is not yet available. It also contains additional information related -// to it such as an expiration time to help prevent caching the orphan forever. -type orphanTx struct { - tx *consensusexternalapi.DomainTransaction - expiration mstime.Time +func (mp *mempool) validateTransactionInIsolation(transaction *externalapi.DomainTransaction) error { + panic("mempool.validateTransactionInIsolation not implemented") // TODO (Mike) } -// removeOrphan removes the passed orphan transaction from the orphan pool and -// previous orphan index. -// -// This function MUST be called with the mempool lock held (for writes). -func (mp *mempool) removeOrphan(tx *consensusexternalapi.DomainTransaction, removeRedeemers bool) { - // Nothing to do if passed tx is not an orphan. - txID := consensushashing.TransactionID(tx) - otx, exists := mp.orphans[*txID] - if !exists { - return - } - - // Remove the reference from the previous orphan index. - for _, txIn := range otx.tx.Inputs { - orphans, exists := mp.orphansByPrev[txIn.PreviousOutpoint] - if exists { - delete(orphans, *txID) - - // Remove the map entry altogether if there are no - // longer any orphans which depend on it. - if len(orphans) == 0 { - delete(mp.orphansByPrev, txIn.PreviousOutpoint) - } - } - } - - // Remove any orphans that redeem outputs from this one if requested. - if removeRedeemers { - prevOut := consensusexternalapi.DomainOutpoint{TransactionID: *txID} - for txOutIdx := range tx.Outputs { - prevOut.Index = uint32(txOutIdx) - for _, orphan := range mp.orphansByPrev[prevOut] { - mp.removeOrphan(orphan, true) - } - } - } - - // Remove the transaction from the orphan pool. - delete(mp.orphans, *txID) +func (mp *mempool) validateTransactionInContext(transaction *externalapi.DomainTransaction) error { + panic("mempool.validateTransactionInContext not implemented") // TODO (Mike) } -// limitNumOrphans limits the number of orphan transactions by evicting a random -// orphan if adding a new one would cause it to overflow the max allowed. -// -// This function MUST be called with the mempool lock held (for writes). -func (mp *mempool) limitNumOrphans() error { - // Scan through the orphan pool and remove any expired orphans when it's - // time. This is done for efficiency so the scan only happens - // periodically instead of on every orphan added to the pool. - if now := mstime.Now(); now.After(mp.nextExpireScan) { - origNumOrphans := len(mp.orphans) - for _, otx := range mp.orphans { - if now.After(otx.expiration) { - // Remove redeemers too because the missing - // parents are very unlikely to ever materialize - // since the orphan has already been around more - // than long enough for them to be delivered. - mp.removeOrphan(otx.tx, true) - } - } +func (mp *mempool) fillInputsAndGetMissingParents(transaction *externalapi.DomainTransaction) ( + parents []*mempoolTransaction, missingParents []externalapi.DomainTransactionID, err error) { - // Set next expiration scan to occur after the scan interval. - mp.nextExpireScan = now.Add(orphanExpireScanInterval) - - numOrphans := len(mp.orphans) - if numExpired := origNumOrphans - numOrphans; numExpired > 0 { - log.Debugf("Expired %d orphans (remaining: %d)", numExpired, numOrphans) - } - } - - // Nothing to do if adding another orphan will not cause the pool to - // exceed the limit. - if len(mp.orphans)+1 <= mp.policy.MaxOrphanTxs { - return nil - } - - // Remove a random entry from the map. For most compilers, Go's - // range statement iterates starting at a random item although - // that is not 100% guaranteed by the spec. The iteration order - // is not important here because an adversary would have to be - // able to pull off preimage attacks on the hashing function in - // order to target eviction of specific entries anyways. - for _, otx := range mp.orphans { - // Don't remove redeemers in the case of a random eviction since - // it is quite possible it might be needed again shortly. - mp.removeOrphan(otx.tx, false) - break - } - - return nil -} - -// addOrphan adds an orphan transaction to the orphan pool. -// -// This function MUST be called with the mempool lock held (for writes). -func (mp *mempool) addOrphan(tx *consensusexternalapi.DomainTransaction) { - // Nothing to do if no orphans are allowed. - if mp.policy.MaxOrphanTxs <= 0 { - return - } - - // Limit the number orphan transactions to prevent memory exhaustion. - // This will periodically remove any expired orphans and evict a random - // orphan if space is still needed. - mp.limitNumOrphans() - txID := consensushashing.TransactionID(tx) - mp.orphans[*txID] = &orphanTx{ - tx: tx, - expiration: mstime.Now().Add(orphanTTL), - } - for _, txIn := range tx.Inputs { - if _, exists := mp.orphansByPrev[txIn.PreviousOutpoint]; !exists { - mp.orphansByPrev[txIn.PreviousOutpoint] = - make(map[consensusexternalapi.DomainTransactionID]*consensusexternalapi.DomainTransaction) - } - mp.orphansByPrev[txIn.PreviousOutpoint][*txID] = tx - } - - log.Debugf("Stored orphan transaction %s (total: %d)", consensushashing.TransactionID(tx), - len(mp.orphans)) -} - -// maybeAddOrphan potentially adds an orphan to the orphan pool. -// -// This function MUST be called with the mempool lock held (for writes). -func (mp *mempool) maybeAddOrphan(tx *consensusexternalapi.DomainTransaction) error { - // Ignore orphan transactions that are too large. This helps avoid - // a memory exhaustion attack based on sending a lot of really large - // orphans. In the case there is a valid transaction larger than this, - // it will ultimtely be rebroadcast after the parent transactions - // have been mined or otherwise received. - // - // Note that the number of orphan transactions in the orphan pool is - // also limited, so this equates to a maximum memory used of - // mp.policy.MaxOrphanTxSize * mp.policy.MaxOrphanTxs (which is ~5MB - // using the default values at the time this comment was written). - serializedLen := estimatedsize.TransactionEstimatedSerializedSize(tx) - if serializedLen > uint64(mp.policy.MaxOrphanTxSize) { - str := fmt.Sprintf("orphan transaction size of %d bytes is "+ - "larger than max allowed size of %d bytes", - serializedLen, mp.policy.MaxOrphanTxSize) - return txRuleError(RejectNonstandard, str) - } - - // Add the orphan if the none of the above disqualified it. - mp.addOrphan(tx) - - return nil -} - -// removeOrphanDoubleSpends removes all orphans which spend outputs spent by the -// passed transaction from the orphan pool. Removing those orphans then leads -// to removing all orphans which rely on them, recursively. This is necessary -// when a transaction is added to the main pool because it may spend outputs -// that orphans also spend. -// -// This function MUST be called with the mempool lock held (for writes). -func (mp *mempool) removeOrphanDoubleSpends(tx *consensusexternalapi.DomainTransaction) { - for _, txIn := range tx.Inputs { - for _, orphan := range mp.orphansByPrev[txIn.PreviousOutpoint] { - mp.removeOrphan(orphan, true) - } - } -} - -// isTransactionInPool returns whether or not the passed transaction already -// exists in the main pool. -// -// This function MUST be called with the mempool lock held (for reads). -func (mp *mempool) isTransactionInPool(txID *consensusexternalapi.DomainTransactionID) bool { - if _, exists := mp.pool[*txID]; exists { - return true - } - return mp.isInDependPool(txID) -} - -// isInDependPool returns whether or not the passed transaction already -// exists in the depend pool. -// -// This function MUST be called with the mempool lock held (for reads). -func (mp *mempool) isInDependPool(hash *consensusexternalapi.DomainTransactionID) bool { - if _, exists := mp.chainedTransactions[*hash]; exists { - return true - } - - return false -} - -// isOrphanInPool returns whether or not the passed transaction already exists -// in the orphan pool. -// -// This function MUST be called with the mempool lock held (for reads). -func (mp *mempool) isOrphanInPool(txID *consensusexternalapi.DomainTransactionID) bool { - if _, exists := mp.orphans[*txID]; exists { - return true - } - - return false -} - -// haveTransaction returns whether or not the passed transaction already exists -// in the main pool or in the orphan pool. -// -// This function MUST be called with the mempool lock held (for reads). -func (mp *mempool) haveTransaction(txID *consensusexternalapi.DomainTransactionID) bool { - return mp.isTransactionInPool(txID) || mp.isOrphanInPool(txID) -} - -// removeTransactionsFromPool removes given transactions from the mempool, and move their chained mempool -// transactions (if any) to the main pool. -// -// This function MUST be called with the mempool lock held (for writes). -func (mp *mempool) removeTransactionsFromPool(txs []*consensusexternalapi.DomainTransaction) error { - for _, tx := range txs[transactionhelper.CoinbaseTransactionIndex+1:] { - txID := consensushashing.TransactionID(tx) - - // We use the mempool transaction, because it has populated fee and mass - mempoolTx, exists := mp.fetchTxDesc(txID) - if !exists { - continue - } - - err := mp.cleanTransactionFromSets(mempoolTx.DomainTransaction) - if err != nil { - return err - } - - mp.updateBlockTransactionChainedTransactions(mempoolTx.DomainTransaction) - } - return nil -} - -type transactionAndOutpoint struct { - transaction *consensusexternalapi.DomainTransaction - outpoint *consensusexternalapi.DomainOutpoint -} - -// removeTransactionAndItsChainedTransactions removes a transaction and all of its chained transaction from the mempool. -// This function MUST be called with the mempool lock held (for writes). -func (mp *mempool) removeTransactionAndItsChainedTransactions(transaction *consensusexternalapi.DomainTransaction) error { - transactionAndOutpointQueue := make([]*transactionAndOutpoint, 0) - transactionAndOutpointQueue = appendTransactionToTransactionAndOutpointQueue(transactionAndOutpointQueue, transaction) - // Remove any transactions which rely on this one. - for len(transactionAndOutpointQueue) > 0 { - txAndOutpoint := transactionAndOutpointQueue[0] - transactionAndOutpointQueue = transactionAndOutpointQueue[1:] - if txRedeemer, exists := mp.mempoolUTXOSet.poolTransactionBySpendingOutpoint(*txAndOutpoint.outpoint); exists { - transactionAndOutpointQueue = appendTransactionToTransactionAndOutpointQueue(transactionAndOutpointQueue, txRedeemer) - } - - transactionID := txAndOutpoint.outpoint.TransactionID - if _, exists := mp.chainedTransactions[transactionID]; exists { - mp.removeChainTransaction(txAndOutpoint.transaction) - } - err := mp.cleanTransactionFromSets(txAndOutpoint.transaction) - if err != nil { - return err - } - } - return nil -} - -func appendTransactionToTransactionAndOutpointQueue(queue []*transactionAndOutpoint, - transaction *consensusexternalapi.DomainTransaction) []*transactionAndOutpoint { - - transactionID := consensushashing.TransactionID(transaction) - queueWithAddedTransactionAndOutpoint := queue - for i := uint32(0); i < uint32(len(transaction.Outputs)); i++ { - previousOutpoint := consensusexternalapi.DomainOutpoint{TransactionID: *transactionID, Index: i} - queueWithAddedTransactionAndOutpoint = append(queueWithAddedTransactionAndOutpoint, - &transactionAndOutpoint{transaction: transaction, outpoint: &previousOutpoint}) - } - return queueWithAddedTransactionAndOutpoint -} - -// cleanTransactionFromSets removes the transaction from all mempool related transaction sets. -// It assumes that any chained transaction is already cleaned from the mempool. -// -// This function MUST be called with the mempool lock held (for writes). -func (mp *mempool) cleanTransactionFromSets(tx *consensusexternalapi.DomainTransaction) error { - err := mp.mempoolUTXOSet.removeTx(tx) - if err != nil { - return err - } - - txID := consensushashing.TransactionID(tx) - delete(mp.pool, *txID) - delete(mp.chainedTransactions, *txID) - - return mp.removeTransactionFromOrderedTransactionsByFeeRate(tx) -} - -// updateBlockTransactionChainedTransactions processes the dependencies of a -// transaction that was included in a block and was just now removed from the mempool. -// -// This function MUST be called with the mempool lock held (for writes). - -func (mp *mempool) updateBlockTransactionChainedTransactions(tx *consensusexternalapi.DomainTransaction) { - prevOut := consensusexternalapi.DomainOutpoint{TransactionID: *consensushashing.TransactionID(tx)} - for txOutIdx := range tx.Outputs { - // Skip to the next available output if there are none. - prevOut.Index = uint32(txOutIdx) - txDesc, exists := mp.chainedTransactionByPreviousOutpoint[prevOut] - if !exists { - continue - } - - txDesc.depCount-- - // If the transaction is not chained anymore, move it into the main pool - if txDesc.depCount == 0 { - // Transaction may be already removed by recursive calls, if removeRedeemers is true. - // So avoid moving it into main pool - txDescID := consensushashing.TransactionID(txDesc.DomainTransaction) - if _, ok := mp.chainedTransactions[*txDescID]; ok { - delete(mp.chainedTransactions, *txDescID) - mp.pool[*txDescID] = txDesc - } - } - delete(mp.chainedTransactionByPreviousOutpoint, prevOut) - } -} - -// removeChainTransaction removes a chain transaction and all of its relation as a result of double spend. -// -// This function MUST be called with the mempool lock held (for writes). -func (mp *mempool) removeChainTransaction(tx *consensusexternalapi.DomainTransaction) { - delete(mp.chainedTransactions, *consensushashing.TransactionID(tx)) - for _, txIn := range tx.Inputs { - delete(mp.chainedTransactionByPreviousOutpoint, txIn.PreviousOutpoint) - } -} - -// removeDoubleSpends removes all transactions which spend outputs spent by the -// passed transaction from the memory pool. Removing those transactions then -// leads to removing all transactions which rely on them, recursively. This is -// necessary when a block is connected to the DAG because the block may -// contain transactions which were previously unknown to the memory pool. -// -// This function MUST be called with the mempool lock held (for writes). -func (mp *mempool) removeDoubleSpends(tx *consensusexternalapi.DomainTransaction) error { - txID := consensushashing.TransactionID(tx) - for _, txIn := range tx.Inputs { - if txRedeemer, ok := mp.mempoolUTXOSet.poolTransactionBySpendingOutpoint(txIn.PreviousOutpoint); ok { - if !consensushashing.TransactionID(txRedeemer).Equal(txID) { - err := mp.removeTransactionAndItsChainedTransactions(txRedeemer) - if err != nil { - return err - } - } - } - } - return nil -} - -// addTransaction adds the passed transaction to the memory pool. It should -// not be called directly as it doesn't perform any validation. This is a -// helper for maybeAcceptTransaction. -// -// This function MUST be called with the mempool lock held (for writes). -func (mp *mempool) addTransaction(tx *consensusexternalapi.DomainTransaction, parentsInPool []consensusexternalapi.DomainOutpoint) (*txDescriptor, error) { - // Add the transaction to the pool and mark the referenced outpoints - // as spent by the pool. - txDescriptor := &txDescriptor{ - DomainTransaction: tx, - depCount: len(parentsInPool), - } - txID := *consensushashing.TransactionID(tx) - - if len(parentsInPool) == 0 { - mp.pool[txID] = txDescriptor - } else { - mp.chainedTransactions[txID] = txDescriptor - for _, previousOutpoint := range parentsInPool { - mp.chainedTransactionByPreviousOutpoint[previousOutpoint] = txDescriptor - } - } - - err := mp.mempoolUTXOSet.addTx(tx) - if err != nil { - return nil, err - } - - err = mp.addTransactionToOrderedTransactionsByFeeRate(tx) - if err != nil { - return nil, err - } - - return txDescriptor, nil -} - -func (mp *mempool) findTxIndexInOrderedTransactionsByFeeRate(tx *consensusexternalapi.DomainTransaction) (int, error) { - if tx.Fee == 0 || tx.Mass == 0 { - return 0, errors.Errorf("findTxIndexInOrderedTransactionsByFeeRate expects a transaction with " + - "populated fee and mass") - } - txID := consensushashing.TransactionID(tx) - txFeeRate := float64(tx.Fee) / float64(tx.Mass) - - return sort.Search(len(mp.orderedTransactionsByFeeRate), func(i int) bool { - elementFeeRate := float64(mp.orderedTransactionsByFeeRate[i].Fee) / float64(mp.orderedTransactionsByFeeRate[i].Mass) - if elementFeeRate > txFeeRate { - return true - } - - if elementFeeRate == txFeeRate && txID.LessOrEqual(consensushashing.TransactionID(mp.orderedTransactionsByFeeRate[i])) { - return true - } - - return false - }), nil -} - -func (mp *mempool) addTransactionToOrderedTransactionsByFeeRate(tx *consensusexternalapi.DomainTransaction) error { - index, err := mp.findTxIndexInOrderedTransactionsByFeeRate(tx) - if err != nil { - return err - } - - mp.orderedTransactionsByFeeRate = append(mp.orderedTransactionsByFeeRate[:index], - append([]*consensusexternalapi.DomainTransaction{tx}, mp.orderedTransactionsByFeeRate[index:]...)...) - - return nil -} - -func (mp *mempool) removeTransactionFromOrderedTransactionsByFeeRate(tx *consensusexternalapi.DomainTransaction) error { - index, err := mp.findTxIndexInOrderedTransactionsByFeeRate(tx) - if err != nil { - return err - } - - txID := consensushashing.TransactionID(tx) - if !consensushashing.TransactionID(mp.orderedTransactionsByFeeRate[index]).Equal(txID) { - return errors.Errorf("Couldn't find %s in mp.orderedTransactionsByFeeRate", txID) - } - - mp.orderedTransactionsByFeeRate = append(mp.orderedTransactionsByFeeRate[:index], mp.orderedTransactionsByFeeRate[index+1:]...) - return nil -} - -func (mp *mempool) enforceTransactionLimit() error { - const limit = 1_000_000 - if len(mp.pool)+len(mp.chainedTransactions) > limit { - // mp.orderedTransactionsByFeeRate[0] is the least profitable transaction - txToRemove := mp.orderedTransactionsByFeeRate[0] - log.Debugf("Mempool size is over the limit of %d transactions. Removing %s", - limit, - consensushashing.TransactionID(txToRemove), - ) - return mp.removeTransactionAndItsChainedTransactions(txToRemove) - } - return nil -} - -// checkPoolDoubleSpend checks whether or not the passed transaction is -// attempting to spend coins already spent by other transactions in the pool. -// Note it does not check for double spends against transactions already in the -// DAG. -// -// This function MUST be called with the mempool lock held (for reads). -func (mp *mempool) checkPoolDoubleSpend(tx *consensusexternalapi.DomainTransaction) error { - for _, txIn := range tx.Inputs { - if txR, exists := mp.mempoolUTXOSet.poolTransactionBySpendingOutpoint(txIn.PreviousOutpoint); exists { - str := fmt.Sprintf("output %s already spent by "+ - "transaction %s in the memory pool", - txIn.PreviousOutpoint, consensushashing.TransactionID(txR)) - return txRuleError(RejectDuplicate, str) - } - } - - return nil -} - -// This function MUST be called with the mempool lock held (for reads). -// This only fetches from the main transaction pool and does not include -// orphans. -// returns false in the second return parameter if transaction was not found -func (mp *mempool) fetchTxDesc(txID *consensusexternalapi.DomainTransactionID) (*txDescriptor, bool) { - txDesc, exists := mp.pool[*txID] - if !exists { - txDesc, exists = mp.chainedTransactions[*txID] - } - return txDesc, exists -} - -// maybeAcceptTransaction is the main workhorse for handling insertion of new -// free-standing transactions into a memory pool. It includes functionality -// such as rejecting duplicate transactions, ensuring transactions follow all -// rules, detecting orphan transactions, and insertion into the memory pool. -// -// If the transaction is an orphan (missing parent transactions), the -// transaction is NOT added to the orphan pool, but each unknown referenced -// parent is returned. Use ProcessTransaction instead if new orphans should -// be added to the orphan pool. -// -// This function MUST be called with the mempool lock held (for writes). -func (mp *mempool) maybeAcceptTransaction(tx *consensusexternalapi.DomainTransaction, rejectDupOrphans bool) ( - []*consensusexternalapi.DomainOutpoint, *txDescriptor, error) { - - txID := consensushashing.TransactionID(tx) - - // Don't accept the transaction if it already exists in the pool. This - // applies to orphan transactions as well when the reject duplicate - // orphans flag is set. This check is intended to be a quick check to - // weed out duplicates. - if mp.isTransactionInPool(txID) || (rejectDupOrphans && - mp.isOrphanInPool(txID)) { - - str := fmt.Sprintf("already have transaction %s", txID) - return nil, nil, txRuleError(RejectDuplicate, str) - } - - // Don't allow non-standard transactions if the network parameters - // forbid their acceptance. - if !mp.policy.AcceptNonStd { - err := checkTransactionStandard(tx, &mp.policy) - if err != nil { - // Attempt to extract a reject code from the error so - // it can be retained. When not possible, fall back to - // a non standard error. - rejectCode, found := extractRejectCode(err) - if !found { - rejectCode = RejectNonstandard - } - str := fmt.Sprintf("transaction %s is not standard: %s", - txID, err) - return nil, nil, txRuleError(rejectCode, str) - } - } - - // The transaction may not use any of the same outputs as other - // transactions already in the pool as that would ultimately result in a - // double spend. This check is intended to be quick and therefore only - // detects double spends within the transaction pool itself. The - // transaction could still be double spending coins from the DAG - // at this point. There is a more in-depth check that happens later - // after fetching the referenced transaction inputs from the DAG - // which examines the actual spend data and prevents double spends. - err := mp.checkPoolDoubleSpend(tx) - if err != nil { - return nil, nil, err - } - - // Don't allow the transaction if it exists in the DAG and is - // not already fully spent. - if mp.mempoolUTXOSet.checkExists(tx) { - return nil, nil, txRuleError(RejectDuplicate, "transaction already exists") - } - - // Transaction is an orphan if any of the referenced transaction outputs - // don't exist or are already spent. Adding orphans to the orphan pool - // is not handled by this function, and the caller should use - // maybeAddOrphan if this behavior is desired. - parentsInPool := mp.mempoolUTXOSet.populateUTXOEntries(tx) - - // This will populate the missing UTXOEntries. - err = mp.consensus.ValidateTransactionAndPopulateWithConsensusData(tx) - missingOutpoints := ruleerrors.ErrMissingTxOut{} - if err != nil { - if errors.As(err, &missingOutpoints) { - return missingOutpoints.MissingOutpoints, nil, nil - } - if errors.Is(err, ruleerrors.ErrImmatureSpend) { - return nil, nil, txRuleError(RejectImmatureSpend, "one of the transaction inputs spends an immature UTXO") - } - if errors.As(err, &ruleerrors.RuleError{}) { - return nil, nil, newRuleError(err) - } - return nil, nil, err - } - - if tx.Mass > mp.dagParams.MaxMassAcceptedByBlock { - return nil, nil, newRuleError(errors.Errorf("The transaction mass is %d which is "+ - "higher than the maxmimum of %d", tx.Mass, mp.dagParams.MaxMassAcceptedByBlock)) - } - - // Don't allow transactions with non-standard inputs if the network - // parameters forbid their acceptance. - if !mp.policy.AcceptNonStd { - err := checkInputsStandard(tx) - if err != nil { - // Attempt to extract a reject code from the error so - // it can be retained. When not possible, fall back to - // a non standard error. - rejectCode, found := extractRejectCode(err) - if !found { - rejectCode = RejectNonstandard - } - str := fmt.Sprintf("transaction %s has a non-standard "+ - "input: %s", txID, err) - return nil, nil, txRuleError(rejectCode, str) - } - } - - // Don't allow transactions with fees too low to get into a mined block - serializedSize := int64(estimatedsize.TransactionEstimatedSerializedSize(tx)) - minFee := uint64(calcMinRequiredTxRelayFee(serializedSize, - mp.policy.MinRelayTxFee)) - if tx.Fee < minFee { - str := fmt.Sprintf("transaction %s has %d fees which is under "+ - "the required amount of %d", txID, tx.Fee, - minFee) - return nil, nil, txRuleError(RejectInsufficientFee, str) - } - // Add to transaction pool. - txDesc, err := mp.addTransaction(tx, parentsInPool) - if err != nil { - return nil, nil, err - } - - log.Debugf("Accepted transaction %s (pool size: %d)", txID, - len(mp.pool)) - - err = mp.enforceTransactionLimit() - if err != nil { - return nil, nil, err - } - - return nil, txDesc, nil -} - -// processOrphans determines if there are any orphans which depend on the passed -// transaction hash (it is possible that they are no longer orphans) and -// potentially accepts them to the memory pool. It repeats the process for the -// newly accepted transactions (to detect further orphans which may no longer be -// orphans) until there are no more. -// -// It returns a slice of transactions added to the mempool. A nil slice means -// no transactions were moved from the orphan pool to the mempool. -// -// This function MUST be called with the mempool lock held (for writes). -func (mp *mempool) processOrphans(acceptedTx *consensusexternalapi.DomainTransaction) ([]*txDescriptor, error) { - var acceptedTxns []*txDescriptor - - // Start with processing at least the passed transaction. - processList := list.New() - processList.PushBack(acceptedTx) - for processList.Len() > 0 { - // Pop the transaction to process from the front of the list. - firstElement := processList.Remove(processList.Front()) - processItem := firstElement.(*consensusexternalapi.DomainTransaction) - - prevOut := consensusexternalapi.DomainOutpoint{TransactionID: *consensushashing.TransactionID(processItem)} - for txOutIdx := range processItem.Outputs { - // Look up all orphans that redeem the output that is - // now available. This will typically only be one, but - // it could be multiple if the orphan pool contains - // double spends. While it may seem odd that the orphan - // pool would allow this since there can only possibly - // ultimately be a single redeemer, it's important to - // track it this way to prevent malicious actors from - // being able to purposely constructing orphans that - // would otherwise make outputs unspendable. - // - // Skip to the next available output if there are none. - prevOut.Index = uint32(txOutIdx) - orphans, exists := mp.orphansByPrev[prevOut] - if !exists { - continue - } - - // Potentially accept an orphan into the tx pool. - for _, tx := range orphans { - missing, txD, err := mp.maybeAcceptTransaction( - tx, false) - if err != nil { - if !errors.As(err, &RuleError{}) { - return nil, err - } - - log.Warnf("Invalid orphan transaction: %s", err) - // The orphan is now invalid, so there - // is no way any other orphans which - // redeem any of its outputs can be - // accepted. Remove them. - mp.removeOrphan(tx, true) - break - } - - // Transaction is still an orphan. Try the next - // orphan which redeems this output. - if len(missing) > 0 { - continue - } - - // Transaction was accepted into the main pool. - // - // Add it to the list of accepted transactions - // that are no longer orphans, remove it from - // the orphan pool, and add it to the list of - // transactions to process so any orphans that - // depend on it are handled too. - acceptedTxns = append(acceptedTxns, txD) - mp.removeOrphan(tx, false) - processList.PushBack(tx) - - // Only one transaction for this outpoint can be - // accepted, so the rest are now double spends - // and are removed later. - break - } - } - } - - // Recursively remove any orphans that also redeem any outputs redeemed - // by the accepted transactions since those are now definitive double - // spends. - mp.removeOrphanDoubleSpends(acceptedTx) - for _, txDescriptor := range acceptedTxns { - mp.removeOrphanDoubleSpends(txDescriptor.DomainTransaction) - } - - return acceptedTxns, nil -} - -// ProcessTransaction is the main workhorse for handling insertion of new -// free-standing transactions into the memory pool. It includes functionality -// such as rejecting duplicate transactions, ensuring transactions follow all -// rules, orphan transaction handling, and insertion into the memory pool. -// -// It returns a slice of transactions added to the mempool. When the -// error is nil, the list will include the passed transaction itself along -// with any additional orphan transaactions that were added as a result of -// the passed one being accepted. -// -// This function is safe for concurrent access. -func (mp *mempool) ValidateAndInsertTransaction(tx *consensusexternalapi.DomainTransaction, allowOrphan bool) error { - log.Tracef("Processing transaction %s", consensushashing.TransactionID(tx)) - - // Protect concurrent access. - mp.mtx.Lock() - defer mp.mtx.Unlock() - - // Potentially accept the transaction to the memory pool. - missingParents, txD, err := mp.maybeAcceptTransaction(tx, true) - if err != nil { - return err - } - - if len(missingParents) == 0 { - // Accept any orphan transactions that depend on this - // transaction (they may no longer be orphans if all inputs - // are now available) and repeat for those accepted - // transactions until there are no more. - newTxs, err := mp.processOrphans(tx) - if err != nil { - return err - } - - acceptedTxs := make([]*txDescriptor, len(newTxs)+1) - - // Add the parent transaction first so remote nodes - // do not add orphans. - acceptedTxs[0] = txD - copy(acceptedTxs[1:], newTxs) - - return nil - } - - // The transaction is an orphan (has inputs missing). Reject - // it if the flag to allow orphans is not set. - if !allowOrphan { - // Only use the first missing parent transaction in - // the error message. - // - // NOTE: RejectDuplicate is really not an accurate - // reject code here, but it matches the reference - // implementation and there isn't a better choice due - // to the limited number of reject codes. Missing - // inputs is assumed to mean they are already spent - // which is not really always the case. - str := fmt.Sprintf("orphan transaction %s references "+ - "outputs of unknown or fully-spent "+ - "transaction %s", consensushashing.TransactionID(tx), missingParents[0]) - return txRuleError(RejectDuplicate, str) - } - - // Potentially add the orphan transaction to the orphan pool. - return mp.maybeAddOrphan(tx) -} - -// Count returns the number of transactions in the main pool. It does not -// include the orphan pool. -// -// This function is safe for concurrent access. -func (mp *mempool) Count() int { - mp.mtx.RLock() - defer mp.mtx.RUnlock() - count := len(mp.pool) - - return count -} - -// ChainedCount returns the number of chained transactions in the mempool. It does not -// include the orphan pool. -// -// This function is safe for concurrent access. -func (mp *mempool) ChainedCount() int { - mp.mtx.RLock() - defer mp.mtx.RUnlock() - return len(mp.chainedTransactions) -} - -// BlockCandidateTransactions returns a slice of all the candidate transactions for the next block -// This is safe for concurrent use -func (mp *mempool) BlockCandidateTransactions() []*consensusexternalapi.DomainTransaction { - mp.mtx.RLock() - defer mp.mtx.RUnlock() - - onEnd := logger.LogAndMeasureExecutionTime(log, "BlockCandidateTransactions") - defer onEnd() - - descs := make([]*consensusexternalapi.DomainTransaction, len(mp.pool)) - i := 0 - for _, desc := range mp.pool { - descs[i] = desc.DomainTransaction.Clone() // Clone the transaction to prevent data races. A shallow-copy might do as well - i++ - } - - return descs -} - -// HandleNewBlockTransactions removes all the transactions in the new block -// from the mempool and the orphan pool, and it also removes -// from the mempool transactions that double spend a -// transaction that is already in the DAG -func (mp *mempool) HandleNewBlockTransactions(txs []*consensusexternalapi.DomainTransaction) ([]*consensusexternalapi.DomainTransaction, error) { - // Protect concurrent access. - mp.mtx.Lock() - defer mp.mtx.Unlock() - - // Remove all of the transactions (except the coinbase) in the - // connected block from the transaction pool. Secondly, remove any - // transactions which are now double spends as a result of these - // new transactions. Finally, remove any transaction that is - // no longer an orphan. Transactions which depend on a confirmed - // transaction are NOT removed recursively because they are still - // valid. - err := mp.removeTransactionsFromPool(txs) - if err != nil { - return nil, errors.Wrapf(err, "Failed removing txs from pool") - } - acceptedTxs := make([]*consensusexternalapi.DomainTransaction, 0) - for _, tx := range txs[transactionhelper.CoinbaseTransactionIndex+1:] { - err := mp.removeDoubleSpends(tx) - if err != nil { - return nil, errors.Wrapf(err, "Failed removing tx from mempool: %s", consensushashing.TransactionID(tx)) - } - mp.removeOrphan(tx, false) - acceptedOrphans, err := mp.processOrphans(tx) - if err != nil { - return nil, err - } - - for _, acceptedOrphan := range acceptedOrphans { - acceptedTxs = append(acceptedTxs, acceptedOrphan.DomainTransaction) - } - } - - return acceptedTxs, nil -} - -func (mp *mempool) RemoveTransactions(txs []*consensusexternalapi.DomainTransaction) error { - // Protect concurrent access. - mp.mtx.Lock() - defer mp.mtx.Unlock() - - return mp.removeTransactionsFromPool(txs) + panic("mempool.fillInputsAndGetMissingParents not implemented") // TODO (Mike) } diff --git a/domain/miningmanager/mempool/mempool_utxo_set.go b/domain/miningmanager/mempool/mempool_utxo_set.go new file mode 100644 index 000000000..16851a43f --- /dev/null +++ b/domain/miningmanager/mempool/mempool_utxo_set.go @@ -0,0 +1,17 @@ +package mempool + +import "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" + +type outpointToUTXOEntry map[externalapi.DomainOutpoint]externalapi.UTXOEntry + +type mempoolUTXOSet struct { + poolUnspentOutputs outpointToUTXOEntry +} + +func (mpus *mempoolUTXOSet) getParentsInPool(transaction *mempoolTransaction) ([]*mempoolTransaction, error) { + panic("mempoolUTXOSet.getParentsInPool not implemented") // TODO (Mike) +} + +func (mpus *mempoolUTXOSet) addTransaction(transaction *mempoolTransaction) error { + panic("mempoolUTXOSet.addTransaction not implemented") // TODO (Mike) +} diff --git a/domain/miningmanager/mempool/model.go b/domain/miningmanager/mempool/model.go new file mode 100644 index 000000000..2f3b10a7f --- /dev/null +++ b/domain/miningmanager/mempool/model.go @@ -0,0 +1,18 @@ +package mempool + +import "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" + +type idToTransaction map[externalapi.DomainTransactionID]*mempoolTransaction + +type mempoolTransaction struct { + transaction externalapi.DomainTransaction + parentsInPool idToTransaction + isHighPriority bool + addAtDAAScore uint64 +} + +type orphanTransaction struct { + transaction externalapi.DomainTransaction + isHighPriority bool + addedAtDAAScore uint64 +} diff --git a/domain/miningmanager/mempool/orphan_pool.go b/domain/miningmanager/mempool/orphan_pool.go new file mode 100644 index 000000000..5b81bd06c --- /dev/null +++ b/domain/miningmanager/mempool/orphan_pool.go @@ -0,0 +1,35 @@ +package mempool + +import "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" + +type orphanByPreviousOutpoint map[externalapi.DomainOutpoint]idToTransaction + +type orphansPool struct { + allOrphans idToTransaction + orphanByPreviousOutpoint orphanByPreviousOutpoint + previousExpireScan uint64 +} + +func (op *orphansPool) maybeAddOrphan(transaction *externalapi.DomainTransaction, + missingParents []*externalapi.DomainTransactionID, isHighPriority bool) error { + + panic("orphansPool.maybeAddOrphan not implemented") // TODO (Mike) +} + +func (op *orphansPool) processOrphansAfterAcceptedTransaction(acceptedTransaction *mempoolTransaction) ( + acceptedOrphans []*mempoolTransaction, err error) { + + panic("orphansPool.processOrphansAfterAcceptedTransaction not implemented") // TODO (Mike) +} + +func (op *orphansPool) unorphanTransaction(orphanTransactionID *externalapi.DomainTransactionID) (mempoolTransaction, error) { + panic("orphansPool.unorphanTransaction not implemented") // TODO (Mike) +} + +func (op *orphansPool) removeOrphan(orphanTransactionID *externalapi.DomainTransactionID) error { + panic("orphansPool.removeOrphan not implemented") // TODO (Mike) +} + +func (op *orphansPool) expireOrphanTransactions() error { + panic("orphansPool.expireOrphanTransactions not implemented") // TODO (Mike) +} diff --git a/domain/miningmanager/mempool/transactions_pool.go b/domain/miningmanager/mempool/transactions_pool.go new file mode 100644 index 000000000..a7dcf816b --- /dev/null +++ b/domain/miningmanager/mempool/transactions_pool.go @@ -0,0 +1,26 @@ +package mempool + +import "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" + +type outpointToTransaction map[externalapi.DomainOutpoint]*mempoolTransaction + +type transactionsByFeeHeap []*mempoolTransaction + +type transactionsPool struct { + allTransactions idToTransaction + highPriorityTransactions idToTransaction + chainedTransactionsByPreviousOutpoint outpointToTransaction + transactionsByFeeRate transactionsByFeeHeap +} + +func (tp *transactionsPool) addTransaction(transaction *externalapi.DomainTransaction, parentsInPool []*mempoolTransaction) error { + panic("transactionsPool.addTransaction not implemented") // TODO (Mike) +} + +func (tp *transactionsPool) addMempoolTransaction(transaction mempoolTransaction) error { + panic("transactionsPool.addMempoolTransaction not implemented") // TODO (Mike) +} + +func (tp *transactionsPool) expireOldTransactions() error { + panic("transactionsPool.expireOldTransactions not implemented") // TODO (Mike) +} diff --git a/domain/miningmanager/mempool/README.md b/domain/miningmanager/mempool_old/README.md similarity index 100% rename from domain/miningmanager/mempool/README.md rename to domain/miningmanager/mempool_old/README.md diff --git a/domain/miningmanager/mempool/doc.go b/domain/miningmanager/mempool_old/doc.go similarity index 93% rename from domain/miningmanager/mempool/doc.go rename to domain/miningmanager/mempool_old/doc.go index a5b38ab16..66f56b2ef 100644 --- a/domain/miningmanager/mempool/doc.go +++ b/domain/miningmanager/mempool_old/doc.go @@ -59,10 +59,10 @@ be an exhaustive list. Errors Errors returned by this package are either the raw errors provided by underlying -calls or of type mempool.RuleError. Since there are two classes of rules +calls or of type mempool_old.RuleError. Since there are two classes of rules (mempool acceptance rules and blockDAG (consensus) acceptance rules), the -mempool.RuleError type contains a single Err field which will, in turn, either -be a mempool.TxRuleError or a ruleerrors.RuleError. The first indicates a +mempool_old.RuleError type contains a single Err field which will, in turn, either +be a mempool_old.TxRuleError or a ruleerrors.RuleError. The first indicates a violation of mempool acceptance rules while the latter indicates a violation of consensus acceptance rules. This allows the caller to easily differentiate between unexpected errors, such as database errors, versus errors due to rule @@ -70,4 +70,4 @@ violations through type assertions. In addition, callers can programmatically determine the specific rule violation by type asserting the Err field to one of the aforementioned types and examining their underlying ErrorCode field. */ -package mempool +package mempool_old diff --git a/domain/miningmanager/mempool/error.go b/domain/miningmanager/mempool_old/error.go similarity index 99% rename from domain/miningmanager/mempool/error.go rename to domain/miningmanager/mempool_old/error.go index bc51bb3a0..3ba5eec43 100644 --- a/domain/miningmanager/mempool/error.go +++ b/domain/miningmanager/mempool_old/error.go @@ -2,7 +2,7 @@ // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. -package mempool +package mempool_old import ( "fmt" diff --git a/domain/miningmanager/mempool_old/log.go b/domain/miningmanager/mempool_old/log.go new file mode 100644 index 000000000..0e4e9a43f --- /dev/null +++ b/domain/miningmanager/mempool_old/log.go @@ -0,0 +1,11 @@ +// Copyright (c) 2013-2016 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package mempool_old + +import ( + "github.com/kaspanet/kaspad/infrastructure/logger" +) + +var log = logger.RegisterSubSystem("TXMP") diff --git a/domain/miningmanager/mempool_old/mempool.go b/domain/miningmanager/mempool_old/mempool.go new file mode 100644 index 000000000..5343a41f9 --- /dev/null +++ b/domain/miningmanager/mempool_old/mempool.go @@ -0,0 +1,1045 @@ +// Copyright (c) 2013-2016 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package mempool_old + +import ( + "container/list" + "fmt" + "sort" + "sync" + "time" + + "github.com/kaspanet/kaspad/domain/dagconfig" + + "github.com/kaspanet/kaspad/infrastructure/logger" + + "github.com/kaspanet/kaspad/domain/consensus/utils/constants" + + "github.com/kaspanet/kaspad/domain/consensus/utils/transactionhelper" + + consensusexternalapi "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" + "github.com/kaspanet/kaspad/domain/consensus/ruleerrors" + "github.com/kaspanet/kaspad/domain/consensus/utils/consensushashing" + "github.com/kaspanet/kaspad/domain/consensus/utils/estimatedsize" + miningmanagermodel "github.com/kaspanet/kaspad/domain/miningmanager/model" + "github.com/kaspanet/kaspad/util" + "github.com/kaspanet/kaspad/util/mstime" + "github.com/pkg/errors" +) + +const ( + // orphanTTL is the maximum amount of time an orphan is allowed to + // stay in the orphan pool before it expires and is evicted during the + // next scan. + orphanTTL = time.Minute * 15 + + // orphanExpireScanInterval is the minimum amount of time in between + // scans of the orphan pool to evict expired transactions. + orphanExpireScanInterval = time.Minute * 5 +) + +// policy houses the policy (configuration parameters) which is used to +// control the mempool. +type policy struct { + // MaxTxVersion is the transaction version that the mempool should + // accept. All transactions above this version are rejected as + // non-standard. + MaxTxVersion uint16 + + // AcceptNonStd defines whether to accept non-standard transactions. If + // true, non-standard transactions will be accepted into the mempool. + // Otherwise, all non-standard transactions will be rejected. + AcceptNonStd bool + + // MaxOrphanTxs is the maximum number of orphan transactions + // that can be queued. + MaxOrphanTxs int + + // MaxOrphanTxSize is the maximum size allowed for orphan transactions. + // This helps prevent memory exhaustion attacks from sending a lot of + // of big orphans. + MaxOrphanTxSize int + + // MinRelayTxFee defines the minimum transaction fee in KAS/kB to be + // considered a non-zero fee. + MinRelayTxFee util.Amount +} + +// mempool is used as a source of transactions that need to be mined into blocks +// and relayed to other peers. It is safe for concurrent access from multiple +// peers. +type mempool struct { + pool map[consensusexternalapi.DomainTransactionID]*txDescriptor + + chainedTransactions map[consensusexternalapi.DomainTransactionID]*txDescriptor + chainedTransactionByPreviousOutpoint map[consensusexternalapi.DomainOutpoint]*txDescriptor + + orphans map[consensusexternalapi.DomainTransactionID]*orphanTx + orphansByPrev map[consensusexternalapi.DomainOutpoint]map[consensusexternalapi.DomainTransactionID]*consensusexternalapi.DomainTransaction + + mempoolUTXOSet *mempoolUTXOSet + consensus consensusexternalapi.Consensus + + orderedTransactionsByFeeRate []*consensusexternalapi.DomainTransaction + + // nextExpireScan is the time after which the orphan pool will be + // scanned in order to evict orphans. This is NOT a hard deadline as + // the scan will only run when an orphan is added to the pool as opposed + // to on an unconditional timer. + nextExpireScan mstime.Time + + mtx sync.RWMutex + policy policy + dagParams *dagconfig.Params +} + +// New returns a new memory pool for validating and storing standalone +// transactions until they are mined into a block. +func New(consensus consensusexternalapi.Consensus, dagParams *dagconfig.Params) miningmanagermodel.Mempool { + policy := policy{ + MaxTxVersion: constants.MaxTransactionVersion, + AcceptNonStd: dagParams.RelayNonStdTxs, + MaxOrphanTxs: 5, + MaxOrphanTxSize: 100000, + MinRelayTxFee: 1000, // 1 sompi per byte + } + return &mempool{ + mtx: sync.RWMutex{}, + policy: policy, + pool: make(map[consensusexternalapi.DomainTransactionID]*txDescriptor), + chainedTransactions: make(map[consensusexternalapi.DomainTransactionID]*txDescriptor), + chainedTransactionByPreviousOutpoint: make(map[consensusexternalapi.DomainOutpoint]*txDescriptor), + orphans: make(map[consensusexternalapi.DomainTransactionID]*orphanTx), + orphansByPrev: make(map[consensusexternalapi.DomainOutpoint]map[consensusexternalapi.DomainTransactionID]*consensusexternalapi.DomainTransaction), + mempoolUTXOSet: newMempoolUTXOSet(), + consensus: consensus, + nextExpireScan: mstime.Now().Add(orphanExpireScanInterval), + dagParams: dagParams, + } +} + +func (mp *mempool) GetTransaction( + transactionID *consensusexternalapi.DomainTransactionID) (*consensusexternalapi.DomainTransaction, bool) { + mp.mtx.RLock() + defer mp.mtx.RUnlock() + + txDesc, exists := mp.fetchTxDesc(transactionID) + if !exists { + return nil, false + } + + return txDesc.DomainTransaction, true +} + +func (mp *mempool) AllTransactions() []*consensusexternalapi.DomainTransaction { + mp.mtx.RLock() + defer mp.mtx.RUnlock() + + transactions := make([]*consensusexternalapi.DomainTransaction, 0, len(mp.pool)+len(mp.chainedTransactions)) + for _, txDesc := range mp.pool { + transactions = append(transactions, txDesc.DomainTransaction) + } + + for _, txDesc := range mp.chainedTransactions { + transactions = append(transactions, txDesc.DomainTransaction) + } + + return transactions +} + +func (mp *mempool) TransactionCount() int { + mp.mtx.RLock() + defer mp.mtx.RUnlock() + + return len(mp.pool) + len(mp.chainedTransactions) +} + +// txDescriptor is a descriptor containing a transaction in the mempool along with +// additional metadata. +type txDescriptor struct { + *consensusexternalapi.DomainTransaction + + // depCount is not 0 for a chained transaction. A chained transaction is + // one that is accepted to pool, but cannot be mined in next block because it + // depends on outputs of accepted, but still not mined transaction + depCount int +} + +// orphanTx is normal transaction that references an ancestor transaction +// that is not yet available. It also contains additional information related +// to it such as an expiration time to help prevent caching the orphan forever. +type orphanTx struct { + tx *consensusexternalapi.DomainTransaction + expiration mstime.Time +} + +// removeOrphan removes the passed orphan transaction from the orphan pool and +// previous orphan index. +// +// This function MUST be called with the mempool lock held (for writes). +func (mp *mempool) removeOrphan(tx *consensusexternalapi.DomainTransaction, removeRedeemers bool) { + // Nothing to do if passed tx is not an orphan. + txID := consensushashing.TransactionID(tx) + otx, exists := mp.orphans[*txID] + if !exists { + return + } + + // Remove the reference from the previous orphan index. + for _, txIn := range otx.tx.Inputs { + orphans, exists := mp.orphansByPrev[txIn.PreviousOutpoint] + if exists { + delete(orphans, *txID) + + // Remove the map entry altogether if there are no + // longer any orphans which depend on it. + if len(orphans) == 0 { + delete(mp.orphansByPrev, txIn.PreviousOutpoint) + } + } + } + + // Remove any orphans that redeem outputs from this one if requested. + if removeRedeemers { + prevOut := consensusexternalapi.DomainOutpoint{TransactionID: *txID} + for txOutIdx := range tx.Outputs { + prevOut.Index = uint32(txOutIdx) + for _, orphan := range mp.orphansByPrev[prevOut] { + mp.removeOrphan(orphan, true) + } + } + } + + // Remove the transaction from the orphan pool. + delete(mp.orphans, *txID) +} + +// limitNumOrphans limits the number of orphan transactions by evicting a random +// orphan if adding a new one would cause it to overflow the max allowed. +// +// This function MUST be called with the mempool lock held (for writes). +func (mp *mempool) limitNumOrphans() error { + // Scan through the orphan pool and remove any expired orphans when it's + // time. This is done for efficiency so the scan only happens + // periodically instead of on every orphan added to the pool. + if now := mstime.Now(); now.After(mp.nextExpireScan) { + origNumOrphans := len(mp.orphans) + for _, otx := range mp.orphans { + if now.After(otx.expiration) { + // Remove redeemers too because the missing + // parents are very unlikely to ever materialize + // since the orphan has already been around more + // than long enough for them to be delivered. + mp.removeOrphan(otx.tx, true) + } + } + + // Set next expiration scan to occur after the scan interval. + mp.nextExpireScan = now.Add(orphanExpireScanInterval) + + numOrphans := len(mp.orphans) + if numExpired := origNumOrphans - numOrphans; numExpired > 0 { + log.Debugf("Expired %d orphans (remaining: %d)", numExpired, numOrphans) + } + } + + // Nothing to do if adding another orphan will not cause the pool to + // exceed the limit. + if len(mp.orphans)+1 <= mp.policy.MaxOrphanTxs { + return nil + } + + // Remove a random entry from the map. For most compilers, Go's + // range statement iterates starting at a random item although + // that is not 100% guaranteed by the spec. The iteration order + // is not important here because an adversary would have to be + // able to pull off preimage attacks on the hashing function in + // order to target eviction of specific entries anyways. + for _, otx := range mp.orphans { + // Don't remove redeemers in the case of a random eviction since + // it is quite possible it might be needed again shortly. + mp.removeOrphan(otx.tx, false) + break + } + + return nil +} + +// addOrphan adds an orphan transaction to the orphan pool. +// +// This function MUST be called with the mempool lock held (for writes). +func (mp *mempool) addOrphan(tx *consensusexternalapi.DomainTransaction) { + // Nothing to do if no orphans are allowed. + if mp.policy.MaxOrphanTxs <= 0 { + return + } + + // Limit the number orphan transactions to prevent memory exhaustion. + // This will periodically remove any expired orphans and evict a random + // orphan if space is still needed. + mp.limitNumOrphans() + txID := consensushashing.TransactionID(tx) + mp.orphans[*txID] = &orphanTx{ + tx: tx, + expiration: mstime.Now().Add(orphanTTL), + } + for _, txIn := range tx.Inputs { + if _, exists := mp.orphansByPrev[txIn.PreviousOutpoint]; !exists { + mp.orphansByPrev[txIn.PreviousOutpoint] = + make(map[consensusexternalapi.DomainTransactionID]*consensusexternalapi.DomainTransaction) + } + mp.orphansByPrev[txIn.PreviousOutpoint][*txID] = tx + } + + log.Debugf("Stored orphan transaction %s (total: %d)", consensushashing.TransactionID(tx), + len(mp.orphans)) +} + +// maybeAddOrphan potentially adds an orphan to the orphan pool. +// +// This function MUST be called with the mempool lock held (for writes). +func (mp *mempool) maybeAddOrphan(tx *consensusexternalapi.DomainTransaction) error { + // Ignore orphan transactions that are too large. This helps avoid + // a memory exhaustion attack based on sending a lot of really large + // orphans. In the case there is a valid transaction larger than this, + // it will ultimtely be rebroadcast after the parent transactions + // have been mined or otherwise received. + // + // Note that the number of orphan transactions in the orphan pool is + // also limited, so this equates to a maximum memory used of + // mp.policy.MaxOrphanTxSize * mp.policy.MaxOrphanTxs (which is ~5MB + // using the default values at the time this comment was written). + serializedLen := estimatedsize.TransactionEstimatedSerializedSize(tx) + if serializedLen > uint64(mp.policy.MaxOrphanTxSize) { + str := fmt.Sprintf("orphan transaction size of %d bytes is "+ + "larger than max allowed size of %d bytes", + serializedLen, mp.policy.MaxOrphanTxSize) + return txRuleError(RejectNonstandard, str) + } + + // Add the orphan if the none of the above disqualified it. + mp.addOrphan(tx) + + return nil +} + +// removeOrphanDoubleSpends removes all orphans which spend outputs spent by the +// passed transaction from the orphan pool. Removing those orphans then leads +// to removing all orphans which rely on them, recursively. This is necessary +// when a transaction is added to the main pool because it may spend outputs +// that orphans also spend. +// +// This function MUST be called with the mempool lock held (for writes). +func (mp *mempool) removeOrphanDoubleSpends(tx *consensusexternalapi.DomainTransaction) { + for _, txIn := range tx.Inputs { + for _, orphan := range mp.orphansByPrev[txIn.PreviousOutpoint] { + mp.removeOrphan(orphan, true) + } + } +} + +// isTransactionInPool returns whether or not the passed transaction already +// exists in the main pool. +// +// This function MUST be called with the mempool lock held (for reads). +func (mp *mempool) isTransactionInPool(txID *consensusexternalapi.DomainTransactionID) bool { + if _, exists := mp.pool[*txID]; exists { + return true + } + return mp.isInDependPool(txID) +} + +// isInDependPool returns whether or not the passed transaction already +// exists in the depend pool. +// +// This function MUST be called with the mempool lock held (for reads). +func (mp *mempool) isInDependPool(hash *consensusexternalapi.DomainTransactionID) bool { + if _, exists := mp.chainedTransactions[*hash]; exists { + return true + } + + return false +} + +// isOrphanInPool returns whether or not the passed transaction already exists +// in the orphan pool. +// +// This function MUST be called with the mempool lock held (for reads). +func (mp *mempool) isOrphanInPool(txID *consensusexternalapi.DomainTransactionID) bool { + if _, exists := mp.orphans[*txID]; exists { + return true + } + + return false +} + +// haveTransaction returns whether or not the passed transaction already exists +// in the main pool or in the orphan pool. +// +// This function MUST be called with the mempool lock held (for reads). +func (mp *mempool) haveTransaction(txID *consensusexternalapi.DomainTransactionID) bool { + return mp.isTransactionInPool(txID) || mp.isOrphanInPool(txID) +} + +// removeTransactionsFromPool removes given transactions from the mempool, and move their chained mempool +// transactions (if any) to the main pool. +// +// This function MUST be called with the mempool lock held (for writes). +func (mp *mempool) removeTransactionsFromPool(txs []*consensusexternalapi.DomainTransaction) error { + for _, tx := range txs[transactionhelper.CoinbaseTransactionIndex+1:] { + txID := consensushashing.TransactionID(tx) + + // We use the mempool transaction, because it has populated fee and mass + mempoolTx, exists := mp.fetchTxDesc(txID) + if !exists { + continue + } + + err := mp.cleanTransactionFromSets(mempoolTx.DomainTransaction) + if err != nil { + return err + } + + mp.updateBlockTransactionChainedTransactions(mempoolTx.DomainTransaction) + } + return nil +} + +type transactionAndOutpoint struct { + transaction *consensusexternalapi.DomainTransaction + outpoint *consensusexternalapi.DomainOutpoint +} + +// removeTransactionAndItsChainedTransactions removes a transaction and all of its chained transaction from the mempool. +// This function MUST be called with the mempool lock held (for writes). +func (mp *mempool) removeTransactionAndItsChainedTransactions(transaction *consensusexternalapi.DomainTransaction) error { + transactionAndOutpointQueue := make([]*transactionAndOutpoint, 0) + transactionAndOutpointQueue = appendTransactionToTransactionAndOutpointQueue(transactionAndOutpointQueue, transaction) + // Remove any transactions which rely on this one. + for len(transactionAndOutpointQueue) > 0 { + txAndOutpoint := transactionAndOutpointQueue[0] + transactionAndOutpointQueue = transactionAndOutpointQueue[1:] + if txRedeemer, exists := mp.mempoolUTXOSet.poolTransactionBySpendingOutpoint(*txAndOutpoint.outpoint); exists { + transactionAndOutpointQueue = appendTransactionToTransactionAndOutpointQueue(transactionAndOutpointQueue, txRedeemer) + } + + transactionID := txAndOutpoint.outpoint.TransactionID + if _, exists := mp.chainedTransactions[transactionID]; exists { + mp.removeChainTransaction(txAndOutpoint.transaction) + } + err := mp.cleanTransactionFromSets(txAndOutpoint.transaction) + if err != nil { + return err + } + } + return nil +} + +func appendTransactionToTransactionAndOutpointQueue(queue []*transactionAndOutpoint, + transaction *consensusexternalapi.DomainTransaction) []*transactionAndOutpoint { + + transactionID := consensushashing.TransactionID(transaction) + queueWithAddedTransactionAndOutpoint := queue + for i := uint32(0); i < uint32(len(transaction.Outputs)); i++ { + previousOutpoint := consensusexternalapi.DomainOutpoint{TransactionID: *transactionID, Index: i} + queueWithAddedTransactionAndOutpoint = append(queueWithAddedTransactionAndOutpoint, + &transactionAndOutpoint{transaction: transaction, outpoint: &previousOutpoint}) + } + return queueWithAddedTransactionAndOutpoint +} + +// cleanTransactionFromSets removes the transaction from all mempool related transaction sets. +// It assumes that any chained transaction is already cleaned from the mempool. +// +// This function MUST be called with the mempool lock held (for writes). +func (mp *mempool) cleanTransactionFromSets(tx *consensusexternalapi.DomainTransaction) error { + err := mp.mempoolUTXOSet.removeTx(tx) + if err != nil { + return err + } + + txID := consensushashing.TransactionID(tx) + delete(mp.pool, *txID) + delete(mp.chainedTransactions, *txID) + + return mp.removeTransactionFromOrderedTransactionsByFeeRate(tx) +} + +// updateBlockTransactionChainedTransactions processes the dependencies of a +// transaction that was included in a block and was just now removed from the mempool. +// +// This function MUST be called with the mempool lock held (for writes). + +func (mp *mempool) updateBlockTransactionChainedTransactions(tx *consensusexternalapi.DomainTransaction) { + prevOut := consensusexternalapi.DomainOutpoint{TransactionID: *consensushashing.TransactionID(tx)} + for txOutIdx := range tx.Outputs { + // Skip to the next available output if there are none. + prevOut.Index = uint32(txOutIdx) + txDesc, exists := mp.chainedTransactionByPreviousOutpoint[prevOut] + if !exists { + continue + } + + txDesc.depCount-- + // If the transaction is not chained anymore, move it into the main pool + if txDesc.depCount == 0 { + // Transaction may be already removed by recursive calls, if removeRedeemers is true. + // So avoid moving it into main pool + txDescID := consensushashing.TransactionID(txDesc.DomainTransaction) + if _, ok := mp.chainedTransactions[*txDescID]; ok { + delete(mp.chainedTransactions, *txDescID) + mp.pool[*txDescID] = txDesc + } + } + delete(mp.chainedTransactionByPreviousOutpoint, prevOut) + } +} + +// removeChainTransaction removes a chain transaction and all of its relation as a result of double spend. +// +// This function MUST be called with the mempool lock held (for writes). +func (mp *mempool) removeChainTransaction(tx *consensusexternalapi.DomainTransaction) { + delete(mp.chainedTransactions, *consensushashing.TransactionID(tx)) + for _, txIn := range tx.Inputs { + delete(mp.chainedTransactionByPreviousOutpoint, txIn.PreviousOutpoint) + } +} + +// removeDoubleSpends removes all transactions which spend outputs spent by the +// passed transaction from the memory pool. Removing those transactions then +// leads to removing all transactions which rely on them, recursively. This is +// necessary when a block is connected to the DAG because the block may +// contain transactions which were previously unknown to the memory pool. +// +// This function MUST be called with the mempool lock held (for writes). +func (mp *mempool) removeDoubleSpends(tx *consensusexternalapi.DomainTransaction) error { + txID := consensushashing.TransactionID(tx) + for _, txIn := range tx.Inputs { + if txRedeemer, ok := mp.mempoolUTXOSet.poolTransactionBySpendingOutpoint(txIn.PreviousOutpoint); ok { + if !consensushashing.TransactionID(txRedeemer).Equal(txID) { + err := mp.removeTransactionAndItsChainedTransactions(txRedeemer) + if err != nil { + return err + } + } + } + } + return nil +} + +// addTransaction adds the passed transaction to the memory pool. It should +// not be called directly as it doesn't perform any validation. This is a +// helper for maybeAcceptTransaction. +// +// This function MUST be called with the mempool lock held (for writes). +func (mp *mempool) addTransaction(tx *consensusexternalapi.DomainTransaction, parentsInPool []consensusexternalapi.DomainOutpoint) (*txDescriptor, error) { + // Add the transaction to the pool and mark the referenced outpoints + // as spent by the pool. + txDescriptor := &txDescriptor{ + DomainTransaction: tx, + depCount: len(parentsInPool), + } + txID := *consensushashing.TransactionID(tx) + + if len(parentsInPool) == 0 { + mp.pool[txID] = txDescriptor + } else { + mp.chainedTransactions[txID] = txDescriptor + for _, previousOutpoint := range parentsInPool { + mp.chainedTransactionByPreviousOutpoint[previousOutpoint] = txDescriptor + } + } + + err := mp.mempoolUTXOSet.addTx(tx) + if err != nil { + return nil, err + } + + err = mp.addTransactionToOrderedTransactionsByFeeRate(tx) + if err != nil { + return nil, err + } + + return txDescriptor, nil +} + +func (mp *mempool) findTxIndexInOrderedTransactionsByFeeRate(tx *consensusexternalapi.DomainTransaction) (int, error) { + if tx.Fee == 0 || tx.Mass == 0 { + return 0, errors.Errorf("findTxIndexInOrderedTransactionsByFeeRate expects a transaction with " + + "populated fee and mass") + } + txID := consensushashing.TransactionID(tx) + txFeeRate := float64(tx.Fee) / float64(tx.Mass) + + return sort.Search(len(mp.orderedTransactionsByFeeRate), func(i int) bool { + elementFeeRate := float64(mp.orderedTransactionsByFeeRate[i].Fee) / float64(mp.orderedTransactionsByFeeRate[i].Mass) + if elementFeeRate > txFeeRate { + return true + } + + if elementFeeRate == txFeeRate && txID.LessOrEqual(consensushashing.TransactionID(mp.orderedTransactionsByFeeRate[i])) { + return true + } + + return false + }), nil +} + +func (mp *mempool) addTransactionToOrderedTransactionsByFeeRate(tx *consensusexternalapi.DomainTransaction) error { + index, err := mp.findTxIndexInOrderedTransactionsByFeeRate(tx) + if err != nil { + return err + } + + mp.orderedTransactionsByFeeRate = append(mp.orderedTransactionsByFeeRate[:index], + append([]*consensusexternalapi.DomainTransaction{tx}, mp.orderedTransactionsByFeeRate[index:]...)...) + + return nil +} + +func (mp *mempool) removeTransactionFromOrderedTransactionsByFeeRate(tx *consensusexternalapi.DomainTransaction) error { + index, err := mp.findTxIndexInOrderedTransactionsByFeeRate(tx) + if err != nil { + return err + } + + txID := consensushashing.TransactionID(tx) + if !consensushashing.TransactionID(mp.orderedTransactionsByFeeRate[index]).Equal(txID) { + return errors.Errorf("Couldn't find %s in mp.orderedTransactionsByFeeRate", txID) + } + + mp.orderedTransactionsByFeeRate = append(mp.orderedTransactionsByFeeRate[:index], mp.orderedTransactionsByFeeRate[index+1:]...) + return nil +} + +func (mp *mempool) enforceTransactionLimit() error { + const limit = 1_000_000 + if len(mp.pool)+len(mp.chainedTransactions) > limit { + // mp.orderedTransactionsByFeeRate[0] is the least profitable transaction + txToRemove := mp.orderedTransactionsByFeeRate[0] + log.Debugf("Mempool size is over the limit of %d transactions. Removing %s", + limit, + consensushashing.TransactionID(txToRemove), + ) + return mp.removeTransactionAndItsChainedTransactions(txToRemove) + } + return nil +} + +// checkPoolDoubleSpend checks whether or not the passed transaction is +// attempting to spend coins already spent by other transactions in the pool. +// Note it does not check for double spends against transactions already in the +// DAG. +// +// This function MUST be called with the mempool lock held (for reads). +func (mp *mempool) checkPoolDoubleSpend(tx *consensusexternalapi.DomainTransaction) error { + for _, txIn := range tx.Inputs { + if txR, exists := mp.mempoolUTXOSet.poolTransactionBySpendingOutpoint(txIn.PreviousOutpoint); exists { + str := fmt.Sprintf("output %s already spent by "+ + "transaction %s in the memory pool", + txIn.PreviousOutpoint, consensushashing.TransactionID(txR)) + return txRuleError(RejectDuplicate, str) + } + } + + return nil +} + +// This function MUST be called with the mempool lock held (for reads). +// This only fetches from the main transaction pool and does not include +// orphans. +// returns false in the second return parameter if transaction was not found +func (mp *mempool) fetchTxDesc(txID *consensusexternalapi.DomainTransactionID) (*txDescriptor, bool) { + txDesc, exists := mp.pool[*txID] + if !exists { + txDesc, exists = mp.chainedTransactions[*txID] + } + return txDesc, exists +} + +// maybeAcceptTransaction is the main workhorse for handling insertion of new +// free-standing transactions into a memory pool. It includes functionality +// such as rejecting duplicate transactions, ensuring transactions follow all +// rules, detecting orphan transactions, and insertion into the memory pool. +// +// If the transaction is an orphan (missing parent transactions), the +// transaction is NOT added to the orphan pool, but each unknown referenced +// parent is returned. Use ProcessTransaction instead if new orphans should +// be added to the orphan pool. +// +// This function MUST be called with the mempool lock held (for writes). +func (mp *mempool) maybeAcceptTransaction(tx *consensusexternalapi.DomainTransaction, rejectDupOrphans bool) ( + []*consensusexternalapi.DomainOutpoint, *txDescriptor, error) { + + txID := consensushashing.TransactionID(tx) + + // Don't accept the transaction if it already exists in the pool. This + // applies to orphan transactions as well when the reject duplicate + // orphans flag is set. This check is intended to be a quick check to + // weed out duplicates. + if mp.isTransactionInPool(txID) || (rejectDupOrphans && + mp.isOrphanInPool(txID)) { + + str := fmt.Sprintf("already have transaction %s", txID) + return nil, nil, txRuleError(RejectDuplicate, str) + } + + // Don't allow non-standard transactions if the network parameters + // forbid their acceptance. + if !mp.policy.AcceptNonStd { + err := checkTransactionStandard(tx, &mp.policy) + if err != nil { + // Attempt to extract a reject code from the error so + // it can be retained. When not possible, fall back to + // a non standard error. + rejectCode, found := extractRejectCode(err) + if !found { + rejectCode = RejectNonstandard + } + str := fmt.Sprintf("transaction %s is not standard: %s", + txID, err) + return nil, nil, txRuleError(rejectCode, str) + } + } + + // The transaction may not use any of the same outputs as other + // transactions already in the pool as that would ultimately result in a + // double spend. This check is intended to be quick and therefore only + // detects double spends within the transaction pool itself. The + // transaction could still be double spending coins from the DAG + // at this point. There is a more in-depth check that happens later + // after fetching the referenced transaction inputs from the DAG + // which examines the actual spend data and prevents double spends. + err := mp.checkPoolDoubleSpend(tx) + if err != nil { + return nil, nil, err + } + + // Don't allow the transaction if it exists in the DAG and is + // not already fully spent. + if mp.mempoolUTXOSet.checkExists(tx) { + return nil, nil, txRuleError(RejectDuplicate, "transaction already exists") + } + + // Transaction is an orphan if any of the referenced transaction outputs + // don't exist or are already spent. Adding orphans to the orphan pool + // is not handled by this function, and the caller should use + // maybeAddOrphan if this behavior is desired. + parentsInPool := mp.mempoolUTXOSet.populateUTXOEntries(tx) + + // This will populate the missing UTXOEntries. + err = mp.consensus.ValidateTransactionAndPopulateWithConsensusData(tx) + missingOutpoints := ruleerrors.ErrMissingTxOut{} + if err != nil { + if errors.As(err, &missingOutpoints) { + return missingOutpoints.MissingOutpoints, nil, nil + } + if errors.Is(err, ruleerrors.ErrImmatureSpend) { + return nil, nil, txRuleError(RejectImmatureSpend, "one of the transaction inputs spends an immature UTXO") + } + if errors.As(err, &ruleerrors.RuleError{}) { + return nil, nil, newRuleError(err) + } + return nil, nil, err + } + + if tx.Mass > mp.dagParams.MaxMassAcceptedByBlock { + return nil, nil, newRuleError(errors.Errorf("The transaction mass is %d which is "+ + "higher than the maxmimum of %d", tx.Mass, mp.dagParams.MaxMassAcceptedByBlock)) + } + + // Don't allow transactions with non-standard inputs if the network + // parameters forbid their acceptance. + if !mp.policy.AcceptNonStd { + err := checkInputsStandard(tx) + if err != nil { + // Attempt to extract a reject code from the error so + // it can be retained. When not possible, fall back to + // a non standard error. + rejectCode, found := extractRejectCode(err) + if !found { + rejectCode = RejectNonstandard + } + str := fmt.Sprintf("transaction %s has a non-standard "+ + "input: %s", txID, err) + return nil, nil, txRuleError(rejectCode, str) + } + } + + // Don't allow transactions with fees too low to get into a mined block + serializedSize := int64(estimatedsize.TransactionEstimatedSerializedSize(tx)) + minFee := uint64(calcMinRequiredTxRelayFee(serializedSize, + mp.policy.MinRelayTxFee)) + if tx.Fee < minFee { + str := fmt.Sprintf("transaction %s has %d fees which is under "+ + "the required amount of %d", txID, tx.Fee, + minFee) + return nil, nil, txRuleError(RejectInsufficientFee, str) + } + // Add to transaction pool. + txDesc, err := mp.addTransaction(tx, parentsInPool) + if err != nil { + return nil, nil, err + } + + log.Debugf("Accepted transaction %s (pool size: %d)", txID, + len(mp.pool)) + + err = mp.enforceTransactionLimit() + if err != nil { + return nil, nil, err + } + + return nil, txDesc, nil +} + +// processOrphans determines if there are any orphans which depend on the passed +// transaction hash (it is possible that they are no longer orphans) and +// potentially accepts them to the memory pool. It repeats the process for the +// newly accepted transactions (to detect further orphans which may no longer be +// orphans) until there are no more. +// +// It returns a slice of transactions added to the mempool. A nil slice means +// no transactions were moved from the orphan pool to the mempool. +// +// This function MUST be called with the mempool lock held (for writes). +func (mp *mempool) processOrphans(acceptedTx *consensusexternalapi.DomainTransaction) ([]*txDescriptor, error) { + var acceptedTxns []*txDescriptor + + // Start with processing at least the passed transaction. + processList := list.New() + processList.PushBack(acceptedTx) + for processList.Len() > 0 { + // Pop the transaction to process from the front of the list. + firstElement := processList.Remove(processList.Front()) + processItem := firstElement.(*consensusexternalapi.DomainTransaction) + + prevOut := consensusexternalapi.DomainOutpoint{TransactionID: *consensushashing.TransactionID(processItem)} + for txOutIdx := range processItem.Outputs { + // Look up all orphans that redeem the output that is + // now available. This will typically only be one, but + // it could be multiple if the orphan pool contains + // double spends. While it may seem odd that the orphan + // pool would allow this since there can only possibly + // ultimately be a single redeemer, it's important to + // track it this way to prevent malicious actors from + // being able to purposely constructing orphans that + // would otherwise make outputs unspendable. + // + // Skip to the next available output if there are none. + prevOut.Index = uint32(txOutIdx) + orphans, exists := mp.orphansByPrev[prevOut] + if !exists { + continue + } + + // Potentially accept an orphan into the tx pool. + for _, tx := range orphans { + missing, txD, err := mp.maybeAcceptTransaction( + tx, false) + if err != nil { + if !errors.As(err, &RuleError{}) { + return nil, err + } + + log.Warnf("Invalid orphan transaction: %s", err) + // The orphan is now invalid, so there + // is no way any other orphans which + // redeem any of its outputs can be + // accepted. Remove them. + mp.removeOrphan(tx, true) + break + } + + // Transaction is still an orphan. Try the next + // orphan which redeems this output. + if len(missing) > 0 { + continue + } + + // Transaction was accepted into the main pool. + // + // Add it to the list of accepted transactions + // that are no longer orphans, remove it from + // the orphan pool, and add it to the list of + // transactions to process so any orphans that + // depend on it are handled too. + acceptedTxns = append(acceptedTxns, txD) + mp.removeOrphan(tx, false) + processList.PushBack(tx) + + // Only one transaction for this outpoint can be + // accepted, so the rest are now double spends + // and are removed later. + break + } + } + } + + // Recursively remove any orphans that also redeem any outputs redeemed + // by the accepted transactions since those are now definitive double + // spends. + mp.removeOrphanDoubleSpends(acceptedTx) + for _, txDescriptor := range acceptedTxns { + mp.removeOrphanDoubleSpends(txDescriptor.DomainTransaction) + } + + return acceptedTxns, nil +} + +// ProcessTransaction is the main workhorse for handling insertion of new +// free-standing transactions into the memory pool. It includes functionality +// such as rejecting duplicate transactions, ensuring transactions follow all +// rules, orphan transaction handling, and insertion into the memory pool. +// +// It returns a slice of transactions added to the mempool. When the +// error is nil, the list will include the passed transaction itself along +// with any additional orphan transaactions that were added as a result of +// the passed one being accepted. +// +// This function is safe for concurrent access. +func (mp *mempool) ValidateAndInsertTransaction(tx *consensusexternalapi.DomainTransaction, allowOrphan bool) error { + log.Tracef("Processing transaction %s", consensushashing.TransactionID(tx)) + + // Protect concurrent access. + mp.mtx.Lock() + defer mp.mtx.Unlock() + + // Potentially accept the transaction to the memory pool. + missingParents, txD, err := mp.maybeAcceptTransaction(tx, true) + if err != nil { + return err + } + + if len(missingParents) == 0 { + // Accept any orphan transactions that depend on this + // transaction (they may no longer be orphans if all inputs + // are now available) and repeat for those accepted + // transactions until there are no more. + newTxs, err := mp.processOrphans(tx) + if err != nil { + return err + } + + acceptedTxs := make([]*txDescriptor, len(newTxs)+1) + + // Add the parent transaction first so remote nodes + // do not add orphans. + acceptedTxs[0] = txD + copy(acceptedTxs[1:], newTxs) + + return nil + } + + // The transaction is an orphan (has inputs missing). Reject + // it if the flag to allow orphans is not set. + if !allowOrphan { + // Only use the first missing parent transaction in + // the error message. + // + // NOTE: RejectDuplicate is really not an accurate + // reject code here, but it matches the reference + // implementation and there isn't a better choice due + // to the limited number of reject codes. Missing + // inputs is assumed to mean they are already spent + // which is not really always the case. + str := fmt.Sprintf("orphan transaction %s references "+ + "outputs of unknown or fully-spent "+ + "transaction %s", consensushashing.TransactionID(tx), missingParents[0]) + return txRuleError(RejectDuplicate, str) + } + + // Potentially add the orphan transaction to the orphan pool. + return mp.maybeAddOrphan(tx) +} + +// Count returns the number of transactions in the main pool. It does not +// include the orphan pool. +// +// This function is safe for concurrent access. +func (mp *mempool) Count() int { + mp.mtx.RLock() + defer mp.mtx.RUnlock() + count := len(mp.pool) + + return count +} + +// ChainedCount returns the number of chained transactions in the mempool. It does not +// include the orphan pool. +// +// This function is safe for concurrent access. +func (mp *mempool) ChainedCount() int { + mp.mtx.RLock() + defer mp.mtx.RUnlock() + return len(mp.chainedTransactions) +} + +// BlockCandidateTransactions returns a slice of all the candidate transactions for the next block +// This is safe for concurrent use +func (mp *mempool) BlockCandidateTransactions() []*consensusexternalapi.DomainTransaction { + mp.mtx.RLock() + defer mp.mtx.RUnlock() + + onEnd := logger.LogAndMeasureExecutionTime(log, "BlockCandidateTransactions") + defer onEnd() + + descs := make([]*consensusexternalapi.DomainTransaction, len(mp.pool)) + i := 0 + for _, desc := range mp.pool { + descs[i] = desc.DomainTransaction.Clone() // Clone the transaction to prevent data races. A shallow-copy might do as well + i++ + } + + return descs +} + +// HandleNewBlockTransactions removes all the transactions in the new block +// from the mempool and the orphan pool, and it also removes +// from the mempool transactions that double spend a +// transaction that is already in the DAG +func (mp *mempool) HandleNewBlockTransactions(txs []*consensusexternalapi.DomainTransaction) ([]*consensusexternalapi.DomainTransaction, error) { + // Protect concurrent access. + mp.mtx.Lock() + defer mp.mtx.Unlock() + + // Remove all of the transactions (except the coinbase) in the + // connected block from the transaction pool. Secondly, remove any + // transactions which are now double spends as a result of these + // new transactions. Finally, remove any transaction that is + // no longer an orphan. Transactions which depend on a confirmed + // transaction are NOT removed recursively because they are still + // valid. + err := mp.removeTransactionsFromPool(txs) + if err != nil { + return nil, errors.Wrapf(err, "Failed removing txs from pool") + } + acceptedTxs := make([]*consensusexternalapi.DomainTransaction, 0) + for _, tx := range txs[transactionhelper.CoinbaseTransactionIndex+1:] { + err := mp.removeDoubleSpends(tx) + if err != nil { + return nil, errors.Wrapf(err, "Failed removing tx from mempool: %s", consensushashing.TransactionID(tx)) + } + mp.removeOrphan(tx, false) + acceptedOrphans, err := mp.processOrphans(tx) + if err != nil { + return nil, err + } + + for _, acceptedOrphan := range acceptedOrphans { + acceptedTxs = append(acceptedTxs, acceptedOrphan.DomainTransaction) + } + } + + return acceptedTxs, nil +} + +func (mp *mempool) RemoveTransactions(txs []*consensusexternalapi.DomainTransaction) error { + // Protect concurrent access. + mp.mtx.Lock() + defer mp.mtx.Unlock() + + return mp.removeTransactionsFromPool(txs) +} diff --git a/domain/miningmanager/mempool/mempool_utxoset.go b/domain/miningmanager/mempool_old/mempool_utxoset.go similarity index 99% rename from domain/miningmanager/mempool/mempool_utxoset.go rename to domain/miningmanager/mempool_old/mempool_utxoset.go index 60d90015a..77cdd745c 100644 --- a/domain/miningmanager/mempool/mempool_utxoset.go +++ b/domain/miningmanager/mempool_old/mempool_utxoset.go @@ -1,4 +1,4 @@ -package mempool +package mempool_old import ( "math" diff --git a/domain/miningmanager/mempool/policy.go b/domain/miningmanager/mempool_old/policy.go similarity index 99% rename from domain/miningmanager/mempool/policy.go rename to domain/miningmanager/mempool_old/policy.go index c2bb310d3..4b52057cc 100644 --- a/domain/miningmanager/mempool/policy.go +++ b/domain/miningmanager/mempool_old/policy.go @@ -2,10 +2,11 @@ // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. -package mempool +package mempool_old import ( "fmt" + "github.com/kaspanet/kaspad/domain/consensus/utils/constants" consensusexternalapi "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" diff --git a/domain/miningmanager/mempool/policy_test.go b/domain/miningmanager/mempool_old/policy_test.go similarity index 99% rename from domain/miningmanager/mempool/policy_test.go rename to domain/miningmanager/mempool_old/policy_test.go index 156e43604..7c3fcc3da 100644 --- a/domain/miningmanager/mempool/policy_test.go +++ b/domain/miningmanager/mempool_old/policy_test.go @@ -2,7 +2,7 @@ // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. -package mempool +package mempool_old import ( "bytes" diff --git a/domain/miningmanager/miningmanager_test.go b/domain/miningmanager/miningmanager_test.go index 4cde5d3d1..798e1b2c5 100644 --- a/domain/miningmanager/miningmanager_test.go +++ b/domain/miningmanager/miningmanager_test.go @@ -1,10 +1,11 @@ package miningmanager_test import ( - "github.com/kaspanet/kaspad/domain/miningmanager/mempool" "strings" "testing" + "github.com/kaspanet/kaspad/domain/miningmanager/mempool_old" + "github.com/kaspanet/kaspad/domain/consensus" "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" "github.com/kaspanet/kaspad/domain/consensus/model/testapi" @@ -84,8 +85,8 @@ func TestImmatureSpend(t *testing.T) { miningManager := miningFactory.NewMiningManager(tc, &consensusConfig.Params) tx := createTransactionWithUTXOEntry(t, 0) err = miningManager.ValidateAndInsertTransaction(tx, false) - txRuleError := &mempool.TxRuleError{} - if !errors.As(err, txRuleError) || txRuleError.RejectCode != mempool.RejectImmatureSpend { + txRuleError := &mempool_old.TxRuleError{} + if !errors.As(err, txRuleError) || txRuleError.RejectCode != mempool_old.RejectImmatureSpend { t.Fatalf("Unexpected error %+v", err) } transactionsFromMempool := miningManager.AllTransactions()