diff --git a/app/app.go b/app/app.go index b5dc61809..81b7fa0de 100644 --- a/app/app.go +++ b/app/app.go @@ -21,7 +21,6 @@ import ( "github.com/kaspanet/kaspad/infrastructure/network/connmanager" "github.com/kaspanet/kaspad/infrastructure/network/dnsseed" "github.com/kaspanet/kaspad/infrastructure/network/netadapter" - "github.com/kaspanet/kaspad/util" "github.com/kaspanet/kaspad/util/panics" ) @@ -223,9 +222,7 @@ func setupMempool(cfg *config.Config, dag *blockdag.BlockDAG, sigCache *txscript MinRelayTxFee: cfg.MinRelayTxFee, MaxTxVersion: 1, }, - CalcSequenceLockNoLock: func(tx *util.Tx, utxoSet blockdag.UTXOSet) (*blockdag.SequenceLock, error) { - return dag.CalcSequenceLockNoLock(tx, utxoSet) - }, + CalcTxSequenceLockFromReferencedUTXOEntries: dag.CalcTxSequenceLockFromReferencedUTXOEntries, SigCache: sigCache, DAG: dag, } diff --git a/domain/blockdag/scriptval.go b/domain/blockdag/scriptval.go index a0ccfee9d..2364c835c 100644 --- a/domain/blockdag/scriptval.go +++ b/domain/blockdag/scriptval.go @@ -16,9 +16,10 @@ import ( // txValidateItem holds a transaction along with which input to validate. type txValidateItem struct { - txInIndex int - txIn *appmessage.TxIn - tx *util.Tx + txInIndex int + txIn *appmessage.TxIn + tx *util.Tx + referencedUTXOEntries []*UTXOEntry } // txValidator provides a type which asynchronously validates transaction @@ -28,7 +29,6 @@ type txValidator struct { validateChan chan *txValidateItem quitChan chan struct{} resultChan chan error - utxoSet UTXOSet flags txscript.ScriptFlags sigCache *txscript.SigCache } @@ -52,19 +52,8 @@ out: for { select { case txVI := <-v.validateChan: - // Ensure the referenced input utxo is available. txIn := txVI.txIn - entry, ok := v.utxoSet.Get(txIn.PreviousOutpoint) - if !ok { - str := fmt.Sprintf("unable to find unspent "+ - "output %s referenced from "+ - "transaction %s input %d", - txIn.PreviousOutpoint, txVI.tx.ID(), - txVI.txInIndex) - err := ruleError(ErrMissingTxOut, str) - v.sendResult(err) - break out - } + entry := txVI.referencedUTXOEntries[txVI.txInIndex] // Create a new script engine for the script pair. sigScript := txIn.SignatureScript @@ -165,12 +154,11 @@ func (v *txValidator) Validate(items []*txValidateItem) error { // newTxValidator returns a new instance of txValidator to be used for // validating transaction scripts asynchronously. -func newTxValidator(utxoSet UTXOSet, flags txscript.ScriptFlags, sigCache *txscript.SigCache) *txValidator { +func newTxValidator(flags txscript.ScriptFlags, sigCache *txscript.SigCache) *txValidator { return &txValidator{ validateChan: make(chan *txValidateItem), quitChan: make(chan struct{}), resultChan: make(chan error), - utxoSet: utxoSet, sigCache: sigCache, flags: flags, } @@ -178,22 +166,23 @@ func newTxValidator(utxoSet UTXOSet, flags txscript.ScriptFlags, sigCache *txscr // ValidateTransactionScripts validates the scripts for the passed transaction // using multiple goroutines. -func ValidateTransactionScripts(tx *util.Tx, utxoSet UTXOSet, flags txscript.ScriptFlags, sigCache *txscript.SigCache) error { +func ValidateTransactionScripts(tx *util.Tx, referencedUTXOEntries []*UTXOEntry, flags txscript.ScriptFlags, sigCache *txscript.SigCache) error { // Collect all of the transaction inputs and required information for // validation. txIns := tx.MsgTx().TxIn txValItems := make([]*txValidateItem, 0, len(txIns)) for txInIdx, txIn := range txIns { txVI := &txValidateItem{ - txInIndex: txInIdx, - txIn: txIn, - tx: tx, + txInIndex: txInIdx, + txIn: txIn, + tx: tx, + referencedUTXOEntries: referencedUTXOEntries, } txValItems = append(txValItems, txVI) } // Validate all of the inputs. - validator := newTxValidator(utxoSet, flags, sigCache) + validator := newTxValidator(flags, sigCache) return validator.Validate(txValItems) } @@ -208,18 +197,24 @@ func checkBlockScripts(block *blockNode, utxoSet UTXOSet, transactions []*util.T } txValItems := make([]*txValidateItem, 0, numInputs) for _, tx := range transactions { + referencedUTXOEntries, err := getReferencedUTXOEntries(tx, utxoSet) + if err != nil { + return err + } + for txInIdx, txIn := range tx.MsgTx().TxIn { txVI := &txValidateItem{ - txInIndex: txInIdx, - txIn: txIn, - tx: tx, + txInIndex: txInIdx, + txIn: txIn, + tx: tx, + referencedUTXOEntries: referencedUTXOEntries, } txValItems = append(txValItems, txVI) } } // Validate all of the inputs. - validator := newTxValidator(utxoSet, scriptFlags, sigCache) + validator := newTxValidator(scriptFlags, sigCache) start := time.Now() if err := validator.Validate(txValItems); err != nil { return err diff --git a/domain/blockdag/sequence_lock.go b/domain/blockdag/sequence_lock.go index 88797f280..9eb59d612 100644 --- a/domain/blockdag/sequence_lock.go +++ b/domain/blockdag/sequence_lock.go @@ -1,7 +1,6 @@ package blockdag import ( - "fmt" "github.com/kaspanet/kaspad/app/appmessage" "github.com/kaspanet/kaspad/util" ) @@ -30,20 +29,41 @@ func (dag *BlockDAG) CalcSequenceLock(tx *util.Tx, utxoSet UTXOSet) (*SequenceLo dag.dagLock.RLock() defer dag.dagLock.RUnlock() - return dag.calcSequenceLock(dag.selectedTip(), utxoSet, tx) + return dag.calcTxSequenceLock(dag.selectedTip(), tx, utxoSet) } // CalcSequenceLockNoLock is lock free version of CalcSequenceLockWithLock // This function is unsafe for concurrent access. func (dag *BlockDAG) CalcSequenceLockNoLock(tx *util.Tx, utxoSet UTXOSet) (*SequenceLock, error) { - return dag.calcSequenceLock(dag.selectedTip(), utxoSet, tx) + return dag.calcTxSequenceLock(dag.selectedTip(), tx, utxoSet) } -// calcSequenceLock computes the relative lock-times for the passed +// calcTxSequenceLock computes the relative lock-times for the passed // transaction. See the exported version, CalcSequenceLock for further details. // // This function MUST be called with the DAG state lock held (for writes). -func (dag *BlockDAG) calcSequenceLock(node *blockNode, utxoSet UTXOSet, tx *util.Tx) (*SequenceLock, error) { +func (dag *BlockDAG) calcTxSequenceLock(node *blockNode, tx *util.Tx, utxoSet UTXOSet) (*SequenceLock, error) { + referencedUTXOEntries, err := getReferencedUTXOEntries(tx, utxoSet) + if err != nil { + return nil, err + } + + return dag.calcTxSequenceLockFromReferencedUTXOEntries(node, tx, referencedUTXOEntries) +} + +// CalcTxSequenceLockFromReferencedUTXOEntries computes the relative lock-times for the passed +// transaction, with the given referenced UTXO entries. See CalcSequenceLock for further details. +func (dag *BlockDAG) CalcTxSequenceLockFromReferencedUTXOEntries( + tx *util.Tx, referencedUTXOEntries []*UTXOEntry) (*SequenceLock, error) { + dag.dagLock.RLock() + defer dag.dagLock.RUnlock() + + return dag.calcTxSequenceLockFromReferencedUTXOEntries(dag.selectedTip(), tx, referencedUTXOEntries) +} + +func (dag *BlockDAG) calcTxSequenceLockFromReferencedUTXOEntries( + node *blockNode, tx *util.Tx, referencedUTXOEntries []*UTXOEntry) (*SequenceLock, error) { + // A value of -1 for each relative lock type represents a relative time // lock value that will allow a transaction to be included in a block // at any given height or time. @@ -56,22 +76,14 @@ func (dag *BlockDAG) calcSequenceLock(node *blockNode, utxoSet UTXOSet, tx *util return sequenceLock, nil } - mTx := tx.MsgTx() - for txInIndex, txIn := range mTx.TxIn { - entry, ok := utxoSet.Get(txIn.PreviousOutpoint) - if !ok { - str := fmt.Sprintf("output %s referenced from "+ - "transaction %s input %d either does not exist or "+ - "has already been spent", txIn.PreviousOutpoint, - tx.ID(), txInIndex) - return sequenceLock, ruleError(ErrMissingTxOut, str) - } + for i, txIn := range tx.MsgTx().TxIn { + utxoEntry := referencedUTXOEntries[i] // If the input blue score is set to the mempool blue score, then we // assume the transaction makes it into the next block when // evaluating its sequence blocks. - inputBlueScore := entry.BlockBlueScore() - if entry.IsUnaccepted() { + inputBlueScore := utxoEntry.BlockBlueScore() + if utxoEntry.IsUnaccepted() { inputBlueScore = dag.virtual.blueScore } diff --git a/domain/blockdag/validate.go b/domain/blockdag/validate.go index cd1fa2d99..163f4f158 100644 --- a/domain/blockdag/validate.go +++ b/domain/blockdag/validate.go @@ -349,17 +349,33 @@ func (dag *BlockDAG) checkProofOfWork(header *appmessage.BlockHeader, flags Beha // ValidateTxMass makes sure that the given transaction's mass does not exceed // the maximum allowed limit. Currently, it is equivalent to the block mass limit. // See CalcTxMass for further details. -func ValidateTxMass(tx *util.Tx, utxoSet UTXOSet) error { - txMass, err := CalcTxMassFromUTXOSet(tx, utxoSet) - if err != nil { - return err - } +func ValidateTxMass(tx *util.Tx, referencedUTXOEntries []*UTXOEntry) (uint64, error) { + txMass := calcTxMassFromReferencedUTXOEntries(tx, referencedUTXOEntries) if txMass > appmessage.MaxMassPerBlock { str := fmt.Sprintf("tx %s has mass %d, which is above the "+ "allowed limit of %d", tx.ID(), txMass, appmessage.MaxMassPerBlock) - return ruleError(ErrTxMassTooHigh, str) + return 0, ruleError(ErrTxMassTooHigh, str) } - return nil + return txMass, nil +} + +func calcTxMassFromReferencedUTXOEntries( + tx *util.Tx, referencedUTXOEntries []*UTXOEntry) uint64 { + + if tx.IsCoinBase() { + return calcCoinbaseTxMass(tx) + } + + previousScriptPubKeys := make([][]byte, 0, len(tx.MsgTx().TxIn)) + + for _, utxoEntry := range referencedUTXOEntries { + previousScriptPubKeys = append(previousScriptPubKeys, utxoEntry.ScriptPubKey()) + } + return CalcTxMass(tx, previousScriptPubKeys) +} + +func calcCoinbaseTxMass(tx *util.Tx) uint64 { + return CalcTxMass(tx, nil) } func validateBlockMass(pastUTXO UTXOSet, transactions []*util.Tx) error { @@ -868,7 +884,7 @@ func ensureNoDuplicateTx(utxoSet UTXOSet, transactions []*util.Tx) error { // // NOTE: The transaction MUST have already been sanity checked with the // CheckTransactionSanity function prior to calling this function. -func CheckTransactionInputsAndCalulateFee(tx *util.Tx, txBlueScore uint64, utxoSet UTXOSet, dagParams *dagconfig.Params, fastAdd bool) ( +func CheckTransactionInputsAndCalulateFee(tx *util.Tx, txBlueScore uint64, referencedUTXOEntries []*UTXOEntry, dagParams *dagconfig.Params, fastAdd bool) ( txFeeInSompi uint64, err error) { // Coinbase transactions have no standard inputs to validate. @@ -879,15 +895,7 @@ func CheckTransactionInputsAndCalulateFee(tx *util.Tx, txBlueScore uint64, utxoS txID := tx.ID() var totalSompiIn uint64 for txInIndex, txIn := range tx.MsgTx().TxIn { - // Ensure the referenced input transaction is available. - entry, ok := utxoSet.Get(txIn.PreviousOutpoint) - if !ok { - str := fmt.Sprintf("output %s referenced from "+ - "transaction %s input %d either does not exist or "+ - "has already been spent", txIn.PreviousOutpoint, - tx.ID(), txInIndex) - return 0, ruleError(ErrMissingTxOut, str) - } + entry := referencedUTXOEntries[txInIndex] if !fastAdd { if err = validateCoinbaseMaturity(dagParams, entry, txBlueScore, txIn); err != nil { @@ -1008,7 +1016,12 @@ func (dag *BlockDAG) checkConnectToPastUTXO(block *blockNode, pastUTXO UTXOSet, compactFeeFactory := newCompactFeeFactory() for _, tx := range transactions { - txFee, err := CheckTransactionInputsAndCalulateFee(tx, block.blueScore, pastUTXO, + referencedUTXOEntries, err := getReferencedUTXOEntries(tx, pastUTXO) + if err != nil { + return nil, err + } + + txFee, err := CheckTransactionInputsAndCalulateFee(tx, block.blueScore, referencedUTXOEntries, dag.Params, fastAdd) if err != nil { return nil, err @@ -1050,7 +1063,7 @@ func (dag *BlockDAG) checkConnectToPastUTXO(block *blockNode, pastUTXO UTXOSet, // A transaction can only be included within a block // once the sequence locks of *all* its inputs are // active. - sequenceLock, err := dag.calcSequenceLock(block, pastUTXO, tx) + sequenceLock, err := dag.calcTxSequenceLock(block, tx, pastUTXO) if err != nil { return nil, err } @@ -1075,6 +1088,27 @@ func (dag *BlockDAG) checkConnectToPastUTXO(block *blockNode, pastUTXO UTXOSet, return feeData, nil } +func getReferencedUTXOEntries(tx *util.Tx, utxoSet UTXOSet) ([]*UTXOEntry, error) { + + txIns := tx.MsgTx().TxIn + referencedUTXOEntries := make([]*UTXOEntry, 0, len(txIns)) + + for txInIndex, txIn := range txIns { + utxoEntry, ok := utxoSet.Get(txIn.PreviousOutpoint) + if !ok { + str := fmt.Sprintf("output %s referenced from "+ + "transaction %s input %d either does not exist or "+ + "has already been spent", txIn.PreviousOutpoint, + tx.ID(), txInIndex) + return nil, ruleError(ErrMissingTxOut, str) + } + + referencedUTXOEntries = append(referencedUTXOEntries, utxoEntry) + } + + return referencedUTXOEntries, nil +} + func (node *blockNode) validateUTXOCommitment(multiset *secp256k1.MultiSet) error { calculatedMultisetHash := daghash.Hash(*multiset.Finalize()) if !calculatedMultisetHash.IsEqual(node.utxoCommitment) { diff --git a/domain/mempool/mempool.go b/domain/mempool/mempool.go index 68a098bb8..f7d7d5a3f 100644 --- a/domain/mempool/mempool.go +++ b/domain/mempool/mempool.go @@ -48,16 +48,13 @@ type Config struct { // to policy. Policy Policy - // CalcSequenceLockNoLock defines the function to use in order to generate - // the current sequence lock for the given transaction using the passed - // utxo set. - CalcSequenceLockNoLock func(*util.Tx, blockdag.UTXOSet) (*blockdag.SequenceLock, error) - // SigCache defines a signature cache to use. SigCache *txscript.SigCache // DAG is the BlockDAG we want to use (mainly for UTXO checks) DAG *blockdag.BlockDAG + + CalcTxSequenceLockFromReferencedUTXOEntries func(tx *util.Tx, referencedUTXOEntries []*blockdag.UTXOEntry) (*blockdag.SequenceLock, error) } // Policy houses the policy (configuration parameters) which is used to @@ -92,7 +89,7 @@ type Policy struct { type TxDesc struct { mining.TxDesc - // depCount is not 0 for dependent transaction. Dependent transaction is + // depCount is not 0 for a chained transaction. A chained transaction is // one that is accepted to pool, but cannot be mined in next block because it // depends on outputs of accepted, but still not mined transaction depCount int @@ -113,22 +110,24 @@ type TxPool struct { // The following variables must only be used atomically. lastUpdated int64 // last time pool was updated - mtx sync.RWMutex - cfg Config - pool map[daghash.TxID]*TxDesc - depends map[daghash.TxID]*TxDesc - dependsByPrev map[appmessage.Outpoint]map[daghash.TxID]*TxDesc + mtx sync.RWMutex + cfg Config + + pool map[daghash.TxID]*TxDesc + + chainedTransactions map[daghash.TxID]*TxDesc + chainedTransactionByPreviousOutpoint map[appmessage.Outpoint]*TxDesc + orphans map[daghash.TxID]*orphanTx orphansByPrev map[appmessage.Outpoint]map[daghash.TxID]*util.Tx - outpoints map[appmessage.Outpoint]*util.Tx + + mempoolUTXOSet *mempoolUTXOSet // nextExpireScan is the time after which the orphan pool will be // scanned in order to evict orphans. This is NOT a hard deadline as // the scan will only run when an orphan is added to the pool as opposed // to on an unconditional timer. nextExpireScan mstime.Time - - mpUTXOSet blockdag.UTXOSet } // Ensure the TxPool type implements the mining.TxSource interface. @@ -341,7 +340,7 @@ func (mp *TxPool) IsTransactionInPool(hash *daghash.TxID) bool { // // This function MUST be called with the mempool lock held (for reads). func (mp *TxPool) isInDependPool(hash *daghash.TxID) bool { - if _, exists := mp.depends[*hash]; exists { + if _, exists := mp.chainedTransactions[*hash]; exists { return true } @@ -405,221 +404,132 @@ func (mp *TxPool) HaveTransaction(txID *daghash.TxID) bool { return haveTx } -// removeTransactions is the internal function which implements the public -// RemoveTransactions. See the comment for RemoveTransactions for more details. -// -// This method, in contrast to removeTransaction (singular), creates one utxoDiff -// and calls removeTransactionWithDiff on it for every transaction. This is an -// optimization to save us a good amount of allocations (specifically in -// UTXODiff.WithDiff) every time we accept a block. +// removeBlockTransactionsFromPool removes the transactions that are found in the block +// from the mempool, and move their chained mempool transactions (if any) to the main pool. // // This function MUST be called with the mempool lock held (for writes). -func (mp *TxPool) removeTransactions(txs []*util.Tx) error { - diff := blockdag.NewUTXODiff() - - for _, tx := range txs { +func (mp *TxPool) removeBlockTransactionsFromPool(block *util.Block) error { + for _, tx := range block.Transactions()[util.CoinbaseTransactionIndex+1:] { txID := tx.ID() if _, exists := mp.fetchTxDesc(txID); !exists { continue } - err := mp.removeTransactionWithDiff(tx, diff, false) + err := mp.cleanTransactionFromSets(tx) + if err != nil { + return err + } + + err = mp.updateBlockTransactionChainedTransactions(tx) if err != nil { return err } } - var err error - mp.mpUTXOSet, err = mp.mpUTXOSet.WithDiff(diff) - if err != nil { - return err - } atomic.StoreInt64(&mp.lastUpdated, mstime.Now().UnixMilliseconds()) return nil } -// removeTransaction is the internal function which implements the public -// RemoveTransaction. See the comment for RemoveTransaction for more details. +// removeTransactionAndItsChainedTransactions removes a transaction and all of its chained transaction from the mempool. // // This function MUST be called with the mempool lock held (for writes). -func (mp *TxPool) removeTransaction(tx *util.Tx, removeDependants bool, restoreInputs bool) error { +func (mp *TxPool) removeTransactionAndItsChainedTransactions(tx *util.Tx) error { txID := tx.ID() - if removeDependants { - // Remove any transactions which rely on this one. - for i := uint32(0); i < uint32(len(tx.MsgTx().TxOut)); i++ { - prevOut := appmessage.Outpoint{TxID: *txID, Index: i} - if txRedeemer, exists := mp.outpoints[prevOut]; exists { - err := mp.removeTransaction(txRedeemer, true, false) - if err != nil { - return err - } - } - } - } - - if _, exists := mp.fetchTxDesc(txID); !exists { - return nil - } - - diff := blockdag.NewUTXODiff() - err := mp.removeTransactionWithDiff(tx, diff, restoreInputs) - if err != nil { - return err - } - - mp.mpUTXOSet, err = mp.mpUTXOSet.WithDiff(diff) - if err != nil { - return err - } - atomic.StoreInt64(&mp.lastUpdated, mstime.Now().UnixMilliseconds()) - - return nil -} - -// removeTransactionWithDiff removes the transaction tx from the mempool while -// updating the UTXODiff diff with appropriate changes. diff is later meant to -// be withDiff'd against the mempool UTXOSet to update it. -// -// This method assumes that tx exists in the mempool. -func (mp *TxPool) removeTransactionWithDiff(tx *util.Tx, diff *blockdag.UTXODiff, restoreInputs bool) error { - txID := tx.ID() - - err := mp.removeTransactionUTXOEntriesFromDiff(tx, diff) - if err != nil { - return errors.Errorf("could not remove UTXOEntry from diff: %s", err) - } - - err = mp.markTransactionOutputsUnspent(tx, diff, restoreInputs) - if err != nil { - return errors.Errorf("could not mark transaction output as unspent: %s", err) - } - - txDesc, _ := mp.fetchTxDesc(txID) - if txDesc.depCount == 0 { - delete(mp.pool, *txID) - } else { - delete(mp.depends, *txID) - } - - mp.processRemovedTransactionDependencies(tx) - - return nil -} - -// removeTransactionUTXOEntriesFromDiff removes tx's UTXOEntries from the diff -func (mp *TxPool) removeTransactionUTXOEntriesFromDiff(tx *util.Tx, diff *blockdag.UTXODiff) error { - for idx := range tx.MsgTx().TxOut { - outpoint := *appmessage.NewOutpoint(tx.ID(), uint32(idx)) - entry, exists := mp.mpUTXOSet.Get(outpoint) - if exists { - err := diff.RemoveEntry(outpoint, entry) + // Remove any transactions which rely on this one. + for i := uint32(0); i < uint32(len(tx.MsgTx().TxOut)); i++ { + prevOut := appmessage.Outpoint{TxID: *txID, Index: i} + if txRedeemer, exists := mp.mempoolUTXOSet.poolTransactionBySpendingOutpoint(prevOut); exists { + err := mp.removeTransactionAndItsChainedTransactions(txRedeemer) if err != nil { return err } } } - return nil -} -// markTransactionOutputsUnspent updates the mempool so that tx's TXOs are unspent -// Iff restoreInputs is true then the inputs are restored back into the supplied diff -func (mp *TxPool) markTransactionOutputsUnspent(tx *util.Tx, diff *blockdag.UTXODiff, restoreInputs bool) error { - for _, txIn := range tx.MsgTx().TxIn { - if restoreInputs { - if prevTxDesc, exists := mp.pool[txIn.PreviousOutpoint.TxID]; exists { - prevOut := prevTxDesc.Tx.MsgTx().TxOut[txIn.PreviousOutpoint.Index] - entry := blockdag.NewUTXOEntry(prevOut, false, blockdag.UnacceptedBlueScore) - err := diff.AddEntry(txIn.PreviousOutpoint, entry) - if err != nil { - return err - } - } - if prevTxDesc, exists := mp.depends[txIn.PreviousOutpoint.TxID]; exists { - prevOut := prevTxDesc.Tx.MsgTx().TxOut[txIn.PreviousOutpoint.Index] - entry := blockdag.NewUTXOEntry(prevOut, false, blockdag.UnacceptedBlueScore) - err := diff.AddEntry(txIn.PreviousOutpoint, entry) - if err != nil { - return err - } - } - } - delete(mp.outpoints, txIn.PreviousOutpoint) + if _, exists := mp.chainedTransactions[*tx.ID()]; exists { + mp.removeChainTransaction(tx) } + + err := mp.cleanTransactionFromSets(tx) + if err != nil { + return err + } + + atomic.StoreInt64(&mp.lastUpdated, mstime.Now().UnixMilliseconds()) + return nil } -// processRemovedTransactionDependencies processes the dependencies of a -// transaction tx that was just now removed from the mempool -func (mp *TxPool) processRemovedTransactionDependencies(tx *util.Tx) { +// cleanTransactionFromSets removes the transaction from all mempool related transaction sets. +// It assumes that any chained transaction is already cleaned from the mempool. +// +// This function MUST be called with the mempool lock held (for writes). +func (mp *TxPool) cleanTransactionFromSets(tx *util.Tx) error { + err := mp.mempoolUTXOSet.removeTx(tx) + if err != nil { + return err + } + + txID := *tx.ID() + delete(mp.pool, txID) + delete(mp.chainedTransactions, txID) + + return nil +} + +// updateBlockTransactionChainedTransactions processes the dependencies of a +// transaction that was included in a block and was just now removed from the mempool. +// +// This function MUST be called with the mempool lock held (for writes). + +func (mp *TxPool) updateBlockTransactionChainedTransactions(tx *util.Tx) error { prevOut := appmessage.Outpoint{TxID: *tx.ID()} for txOutIdx := range tx.MsgTx().TxOut { // Skip to the next available output if there are none. prevOut.Index = uint32(txOutIdx) - depends, exists := mp.dependsByPrev[prevOut] + txDesc, exists := mp.chainedTransactionByPreviousOutpoint[prevOut] if !exists { continue } - // Move independent transactions into main pool - for _, txD := range depends { - txD.depCount-- - if txD.depCount == 0 { - // Transaction may be already removed by recursive calls, if removeRedeemers is true. - // So avoid moving it into main pool - if _, ok := mp.depends[*txD.Tx.ID()]; ok { - delete(mp.depends, *txD.Tx.ID()) - mp.pool[*txD.Tx.ID()] = txD - } + txDesc.depCount-- + // If the transaction is not chained anymore, move it into the main pool + if txDesc.depCount == 0 { + if _, ok := mp.chainedTransactions[*txDesc.Tx.ID()]; !ok { + return errors.Errorf("transactions %s is not found in the chained transaction map", txDesc.Tx.ID()) } + delete(mp.chainedTransactions, *txDesc.Tx.ID()) + mp.pool[*txDesc.Tx.ID()] = txDesc } - delete(mp.dependsByPrev, prevOut) + delete(mp.chainedTransactionByPreviousOutpoint, prevOut) + } + return nil +} + +// removeChainTransaction removes a chain transaction and all of its relation as a result of double spend. +// +// This function MUST be called with the mempool lock held (for writes). +func (mp *TxPool) removeChainTransaction(tx *util.Tx) { + delete(mp.chainedTransactions, *tx.ID()) + for _, txIn := range tx.MsgTx().TxIn { + delete(mp.chainedTransactionByPreviousOutpoint, txIn.PreviousOutpoint) } } -// RemoveTransaction removes the passed transaction from the mempool. When the -// removeDependants flag is set, any transactions that depend on the removed -// transaction (that is to say, redeem outputs from it) will also be removed -// recursively from the mempool, as they would otherwise become orphans. -// -// This function is safe for concurrent access. -func (mp *TxPool) RemoveTransaction(tx *util.Tx, removeDependants bool, restoreInputs bool) error { - // Protect concurrent access. - mp.mtx.Lock() - defer mp.mtx.Unlock() - return mp.removeTransaction(tx, removeDependants, restoreInputs) -} - -// RemoveTransactions removes the passed transactions from the mempool. -// -// This function is safe for concurrent access. -func (mp *TxPool) RemoveTransactions(txs []*util.Tx) error { - // Protect concurrent access. - mp.mtx.Lock() - defer mp.mtx.Unlock() - return mp.removeTransactions(txs) -} - -// RemoveDoubleSpends removes all transactions which spend outputs spent by the +// removeDoubleSpends removes all transactions which spend outputs spent by the // passed transaction from the memory pool. Removing those transactions then // leads to removing all transactions which rely on them, recursively. This is // necessary when a block is connected to the DAG because the block may // contain transactions which were previously unknown to the memory pool. // -// This function is safe for concurrent access. -func (mp *TxPool) RemoveDoubleSpends(tx *util.Tx) error { - // Protect concurrent access. - mp.mtx.Lock() - defer mp.mtx.Unlock() - return mp.removeDoubleSpends(tx) -} - +// This function MUST be called with the mempool lock held (for writes). func (mp *TxPool) removeDoubleSpends(tx *util.Tx) error { for _, txIn := range tx.MsgTx().TxIn { - if txRedeemer, ok := mp.outpoints[txIn.PreviousOutpoint]; ok { + if txRedeemer, ok := mp.mempoolUTXOSet.poolTransactionBySpendingOutpoint(txIn.PreviousOutpoint); ok { if !txRedeemer.ID().IsEqual(tx.ID()) { - err := mp.removeTransaction(txRedeemer, true, false) + err := mp.removeTransactionAndItsChainedTransactions(txRedeemer) if err != nil { return err } @@ -634,13 +544,9 @@ func (mp *TxPool) removeDoubleSpends(tx *util.Tx) error { // helper for maybeAcceptTransaction. // // This function MUST be called with the mempool lock held (for writes). -func (mp *TxPool) addTransaction(tx *util.Tx, fee uint64, parentsInPool []*appmessage.Outpoint) (*TxDesc, error) { +func (mp *TxPool) addTransaction(tx *util.Tx, mass uint64, fee uint64, parentsInPool []*appmessage.Outpoint) (*TxDesc, error) { // Add the transaction to the pool and mark the referenced outpoints // as spent by the pool. - mass, err := blockdag.CalcTxMassFromUTXOSet(tx, mp.mpUTXOSet) - if err != nil { - return nil, err - } txD := &TxDesc{ TxDesc: mining.TxDesc{ Tx: tx, @@ -654,23 +560,17 @@ func (mp *TxPool) addTransaction(tx *util.Tx, fee uint64, parentsInPool []*appme if len(parentsInPool) == 0 { mp.pool[*tx.ID()] = txD } else { - mp.depends[*tx.ID()] = txD + mp.chainedTransactions[*tx.ID()] = txD for _, previousOutpoint := range parentsInPool { - if _, exists := mp.dependsByPrev[*previousOutpoint]; !exists { - mp.dependsByPrev[*previousOutpoint] = make(map[daghash.TxID]*TxDesc) - } - mp.dependsByPrev[*previousOutpoint][*tx.ID()] = txD + mp.chainedTransactionByPreviousOutpoint[*previousOutpoint] = txD } } - for _, txIn := range tx.MsgTx().TxIn { - mp.outpoints[txIn.PreviousOutpoint] = tx - } - if isAccepted, err := mp.mpUTXOSet.AddTx(tx.MsgTx(), blockdag.UnacceptedBlueScore); err != nil { + err := mp.mempoolUTXOSet.addTx(tx) + if err != nil { return nil, err - } else if !isAccepted { - return nil, errors.Errorf("unexpectedly failed to add tx %s to the mempool utxo set", tx.ID()) } + atomic.StoreInt64(&mp.lastUpdated, mstime.Now().UnixMilliseconds()) return txD, nil @@ -684,7 +584,7 @@ func (mp *TxPool) addTransaction(tx *util.Tx, fee uint64, parentsInPool []*appme // This function MUST be called with the mempool lock held (for reads). func (mp *TxPool) checkPoolDoubleSpend(tx *util.Tx) error { for _, txIn := range tx.MsgTx().TxIn { - if txR, exists := mp.outpoints[txIn.PreviousOutpoint]; exists { + if txR, exists := mp.mempoolUTXOSet.poolTransactionBySpendingOutpoint(txIn.PreviousOutpoint); exists { str := fmt.Sprintf("output %s already spent by "+ "transaction %s in the memory pool", txIn.PreviousOutpoint, txR.ID()) @@ -695,22 +595,11 @@ func (mp *TxPool) checkPoolDoubleSpend(tx *util.Tx) error { return nil } -// CheckSpend checks whether the passed outpoint is already spent by a -// transaction in the mempool. If that's the case the spending transaction will -// be returned, if not nil will be returned. -func (mp *TxPool) CheckSpend(op appmessage.Outpoint) *util.Tx { - mp.mtx.RLock() - defer mp.mtx.RUnlock() - txR := mp.outpoints[op] - - return txR -} - // This function MUST be called with the mempool lock held (for reads). func (mp *TxPool) fetchTxDesc(txID *daghash.TxID) (*TxDesc, bool) { txDesc, exists := mp.pool[*txID] if !exists { - txDesc, exists = mp.depends[*txID] + txDesc, exists = mp.chainedTransactions[*txID] } return txDesc, exists } @@ -885,7 +774,7 @@ func (mp *TxPool) maybeAcceptTransaction(tx *util.Tx, rejectDupOrphans bool) ([] prevOut := appmessage.Outpoint{TxID: *txID} for txOutIdx := range tx.MsgTx().TxOut { prevOut.Index = uint32(txOutIdx) - _, ok := mp.mpUTXOSet.Get(prevOut) + _, _, ok := mp.mempoolUTXOSet.utxoEntryByOutpoint(prevOut) if ok { return nil, nil, txRuleError(RejectDuplicate, "transaction already exists") @@ -896,21 +785,7 @@ func (mp *TxPool) maybeAcceptTransaction(tx *util.Tx, rejectDupOrphans bool) ([] // don't exist or are already spent. Adding orphans to the orphan pool // is not handled by this function, and the caller should use // maybeAddOrphan if this behavior is desired. - var missingParents []*daghash.TxID - var parentsInPool []*appmessage.Outpoint - for _, txIn := range tx.MsgTx().TxIn { - if _, ok := mp.mpUTXOSet.Get(txIn.PreviousOutpoint); !ok { - // Must make a copy of the hash here since the iterator - // is replaced and taking its address directly would - // result in all of the entries pointing to the same - // memory location and thus all be the final hash. - txIDCopy := txIn.PreviousOutpoint.TxID - missingParents = append(missingParents, &txIDCopy) - } - if mp.isTransactionInPool(&txIn.PreviousOutpoint.TxID) { - parentsInPool = append(parentsInPool, &txIn.PreviousOutpoint) - } - } + spentUTXOEntries, parentsInPool, missingParents := mp.mempoolUTXOSet.transactionRelatedUTXOEntries(tx) if len(missingParents) > 0 { return missingParents, nil, nil } @@ -918,7 +793,7 @@ func (mp *TxPool) maybeAcceptTransaction(tx *util.Tx, rejectDupOrphans bool) ([] // Don't allow the transaction into the mempool unless its sequence // lock is active, meaning that it'll be allowed into the next block // with respect to its defined relative lock times. - sequenceLock, err := mp.cfg.CalcSequenceLockNoLock(tx, mp.mpUTXOSet) + sequenceLock, err := mp.cfg.CalcTxSequenceLockFromReferencedUTXOEntries(tx, spentUTXOEntries) if err != nil { var dagRuleErr blockdag.RuleError if ok := errors.As(err, &dagRuleErr); ok { @@ -934,7 +809,7 @@ func (mp *TxPool) maybeAcceptTransaction(tx *util.Tx, rejectDupOrphans bool) ([] // Don't allow transactions that exceed the maximum allowed // transaction mass. - err = blockdag.ValidateTxMass(tx, mp.mpUTXOSet) + mass, err := blockdag.ValidateTxMass(tx, spentUTXOEntries) if err != nil { var ruleError blockdag.RuleError if ok := errors.As(err, &ruleError); ok { @@ -948,7 +823,7 @@ func (mp *TxPool) maybeAcceptTransaction(tx *util.Tx, rejectDupOrphans bool) ([] // Also returns the fees associated with the transaction which will be // used later. txFee, err := blockdag.CheckTransactionInputsAndCalulateFee(tx, nextBlockBlueScore, - mp.mpUTXOSet, mp.cfg.DAG.Params, false) + spentUTXOEntries, mp.cfg.DAG.Params, false) if err != nil { var dagRuleErr blockdag.RuleError if ok := errors.As(err, &dagRuleErr); ok { @@ -960,7 +835,7 @@ func (mp *TxPool) maybeAcceptTransaction(tx *util.Tx, rejectDupOrphans bool) ([] // Don't allow transactions with non-standard inputs if the network // parameters forbid their acceptance. if !mp.cfg.Policy.AcceptNonStd { - err := checkInputsStandard(tx, mp.mpUTXOSet) + err := checkInputsStandard(tx, spentUTXOEntries) if err != nil { // Attempt to extract a reject code from the error so // it can be retained. When not possible, fall back to @@ -1008,7 +883,7 @@ func (mp *TxPool) maybeAcceptTransaction(tx *util.Tx, rejectDupOrphans bool) ([] // Verify crypto signatures for each input and reject the transaction if // any don't verify. - err = blockdag.ValidateTransactionScripts(tx, mp.mpUTXOSet, + err = blockdag.ValidateTransactionScripts(tx, spentUTXOEntries, txscript.StandardVerifyFlags, mp.cfg.SigCache) if err != nil { var dagRuleErr blockdag.RuleError @@ -1019,7 +894,7 @@ func (mp *TxPool) maybeAcceptTransaction(tx *util.Tx, rejectDupOrphans bool) ([] } // Add to transaction pool. - txD, err := mp.addTransaction(tx, txFee, parentsInPool) + txDesc, err := mp.addTransaction(tx, mass, txFee, parentsInPool) if err != nil { return nil, nil, err } @@ -1027,7 +902,7 @@ func (mp *TxPool) maybeAcceptTransaction(tx *util.Tx, rejectDupOrphans bool) ([] log.Debugf("Accepted transaction %s (pool size: %d)", txID, len(mp.pool)) - return nil, txD, nil + return nil, txDesc, nil } // processOrphans is the internal function which implements the public @@ -1124,8 +999,6 @@ func (mp *TxPool) processOrphans(acceptedTx *util.Tx) []*TxDesc { // // This function is safe for concurrent access. func (mp *TxPool) ProcessOrphans(acceptedTx *util.Tx) []*TxDesc { - mp.cfg.DAG.RLock() - defer mp.cfg.DAG.RUnlock() mp.mtx.Lock() defer mp.mtx.Unlock() acceptedTxns := mp.processOrphans(acceptedTx) @@ -1148,8 +1021,6 @@ func (mp *TxPool) ProcessTransaction(tx *util.Tx, allowOrphan bool) ([]*TxDesc, log.Tracef("Processing transaction %s", tx.ID()) // Protect concurrent access. - mp.cfg.DAG.RLock() - defer mp.cfg.DAG.RUnlock() mp.mtx.Lock() defer mp.mtx.Unlock() @@ -1210,14 +1081,14 @@ func (mp *TxPool) Count() int { return count } -// DepCount returns the number of dependent transactions in the main pool. It does not +// ChainedCount returns the number of chained transactions in the mempool. It does not // include the orphan pool. // // This function is safe for concurrent access. -func (mp *TxPool) DepCount() int { +func (mp *TxPool) ChainedCount() int { mp.mtx.RLock() defer mp.mtx.RUnlock() - return len(mp.depends) + return len(mp.chainedTransactions) } // TxIDs returns a slice of IDs for all of the transactions in the memory @@ -1287,13 +1158,9 @@ func (mp *TxPool) LastUpdated() mstime.Time { // transaction that is already in the DAG func (mp *TxPool) HandleNewBlock(block *util.Block) ([]*util.Tx, error) { // Protect concurrent access. - mp.cfg.DAG.RLock() - defer mp.cfg.DAG.RUnlock() mp.mtx.Lock() defer mp.mtx.Unlock() - oldUTXOSet := mp.mpUTXOSet - // Remove all of the transactions (except the coinbase) in the // connected block from the transaction pool. Secondly, remove any // transactions which are now double spends as a result of these @@ -1301,9 +1168,8 @@ func (mp *TxPool) HandleNewBlock(block *util.Block) ([]*util.Tx, error) { // no longer an orphan. Transactions which depend on a confirmed // transaction are NOT removed recursively because they are still // valid. - err := mp.removeTransactions(block.Transactions()[util.CoinbaseTransactionIndex+1:]) + err := mp.removeBlockTransactionsFromPool(block) if err != nil { - mp.mpUTXOSet = oldUTXOSet return nil, err } acceptedTxs := make([]*util.Tx, 0) @@ -1324,17 +1190,14 @@ func (mp *TxPool) HandleNewBlock(block *util.Block) ([]*util.Tx, error) { // New returns a new memory pool for validating and storing standalone // transactions until they are mined into a block. func New(cfg *Config) *TxPool { - virtualUTXO := cfg.DAG.UTXOSet() - mpUTXO := blockdag.NewDiffUTXOSet(virtualUTXO, blockdag.NewUTXODiff()) return &TxPool{ - cfg: *cfg, - pool: make(map[daghash.TxID]*TxDesc), - depends: make(map[daghash.TxID]*TxDesc), - dependsByPrev: make(map[appmessage.Outpoint]map[daghash.TxID]*TxDesc), - orphans: make(map[daghash.TxID]*orphanTx), - orphansByPrev: make(map[appmessage.Outpoint]map[daghash.TxID]*util.Tx), - nextExpireScan: mstime.Now().Add(orphanExpireScanInterval), - outpoints: make(map[appmessage.Outpoint]*util.Tx), - mpUTXOSet: mpUTXO, + cfg: *cfg, + pool: make(map[daghash.TxID]*TxDesc), + chainedTransactions: make(map[daghash.TxID]*TxDesc), + chainedTransactionByPreviousOutpoint: make(map[appmessage.Outpoint]*TxDesc), + orphans: make(map[daghash.TxID]*orphanTx), + orphansByPrev: make(map[appmessage.Outpoint]map[daghash.TxID]*util.Tx), + nextExpireScan: mstime.Now().Add(orphanExpireScanInterval), + mempoolUTXOSet: newMempoolUTXOSet(cfg.DAG), } } diff --git a/domain/mempool/mempool_test.go b/domain/mempool/mempool_test.go index ced02cd9d..966b56cab 100644 --- a/domain/mempool/mempool_test.go +++ b/domain/mempool/mempool_test.go @@ -23,7 +23,6 @@ import ( "github.com/kaspanet/kaspad/app/appmessage" "github.com/kaspanet/kaspad/domain/blockdag" "github.com/kaspanet/kaspad/domain/dagconfig" - "github.com/kaspanet/kaspad/domain/mining" "github.com/kaspanet/kaspad/domain/txscript" "github.com/kaspanet/kaspad/util" "github.com/kaspanet/kaspad/util/daghash" @@ -70,8 +69,7 @@ func (s *fakeDAG) SetMedianTimePast(mtp mstime.Time) { s.medianTimePast = mtp } -func calcSequenceLock(tx *util.Tx, - utxoSet blockdag.UTXOSet) (*blockdag.SequenceLock, error) { +func calcTxSequenceLockFromReferencedUTXOEntries(tx *util.Tx, referencedUTXOEntries []*blockdag.UTXOEntry) (*blockdag.SequenceLock, error) { return &blockdag.SequenceLock{ Milliseconds: -1, @@ -256,7 +254,8 @@ func (tc *testContext) mineTransactions(transactions []*util.Tx, numberOfBlocks if i == 0 { blockTxs = msgTxs } - block, err := mining.PrepareBlockForTest(tc.harness.txPool.cfg.DAG, tc.harness.txPool.cfg.DAG.TipHashes(), blockTxs, false) + block, err := blockdag.PrepareBlockForTest( + tc.harness.txPool.cfg.DAG, tc.harness.txPool.cfg.DAG.TipHashes(), blockTxs) if err != nil { tc.t.Fatalf("PrepareBlockForTest: %s", err) } @@ -339,8 +338,8 @@ func newPoolHarness(t *testing.T, dagParams *dagconfig.Params, numOutputs uint32 MinRelayTxFee: 1000, // 1 sompi per byte MaxTxVersion: 1, }, - CalcSequenceLockNoLock: calcSequenceLock, - SigCache: nil, + CalcTxSequenceLockFromReferencedUTXOEntries: calcTxSequenceLockFromReferencedUTXOEntries, + SigCache: nil, }), } @@ -646,10 +645,8 @@ func TestProcessTransaction(t *testing.T) { t.Fatalf("PayToAddrScript: unexpected error: %v", err) } p2shTx := util.NewTx(appmessage.NewNativeMsgTx(1, nil, []*appmessage.TxOut{{Value: 5000000000, ScriptPubKey: p2shScriptPubKey}})) - if isAccepted, err := harness.txPool.mpUTXOSet.AddTx(p2shTx.MsgTx(), currentBlueScore+1); err != nil { + if err := harness.txPool.mempoolUTXOSet.addTx(p2shTx); err != nil { t.Fatalf("AddTx unexpectedly failed. Error: %s", err) - } else if !isAccepted { - t.Fatalf("AddTx unexpectedly didn't add tx %s", p2shTx.ID()) } txIns := []*appmessage.TxIn{{ @@ -691,8 +688,7 @@ func TestProcessTransaction(t *testing.T) { } // Checks that transactions get rejected from mempool if sequence lock is not active - harness.txPool.cfg.CalcSequenceLockNoLock = func(tx *util.Tx, - view blockdag.UTXOSet) (*blockdag.SequenceLock, error) { + harness.txPool.cfg.CalcTxSequenceLockFromReferencedUTXOEntries = func(tx *util.Tx, referencedUTXOEntries []*blockdag.UTXOEntry) (*blockdag.SequenceLock, error) { return &blockdag.SequenceLock{ Milliseconds: math.MaxInt64, @@ -714,7 +710,7 @@ func TestProcessTransaction(t *testing.T) { if err.Error() != expectedErrStr { t.Errorf("Unexpected error message. Expected \"%s\" but got \"%s\"", expectedErrStr, err.Error()) } - harness.txPool.cfg.CalcSequenceLockNoLock = calcSequenceLock + harness.txPool.cfg.CalcTxSequenceLockFromReferencedUTXOEntries = calcTxSequenceLockFromReferencedUTXOEntries // Transaction should be rejected from mempool because it has low fee, and its priority is above mining.MinHighPriority tx, err = harness.createTx(spendableOuts[4], 0, 1000) @@ -796,7 +792,7 @@ func TestDoubleSpends(t *testing.T) { // Then we assume tx3 is already in the DAG, so we need to remove // transactions that spends the same outpoints from the mempool - harness.txPool.RemoveDoubleSpends(tx3) + harness.txPool.removeDoubleSpends(tx3) // Ensures that only the transaction that double spends the same // funds as tx3 is removed, and the other one remains unaffected testPoolMembership(tc, tx1, false, false, false) @@ -1132,10 +1128,10 @@ func TestRemoveTransaction(t *testing.T) { testPoolMembership(tc, chainedTxns[3], false, true, true) testPoolMembership(tc, chainedTxns[4], false, true, true) - // Checks that when removeRedeemers is true, all of the transaction that are dependent on it get removed - err = harness.txPool.RemoveTransaction(chainedTxns[1], true, true) + // Checks that all of the transaction that are dependent on it get removed + err = harness.txPool.removeTransactionAndItsChainedTransactions(chainedTxns[1]) if err != nil { - t.Fatalf("RemoveTransaction: %v", err) + t.Fatalf("removeTransactionAndItsChainedTransactions: %v", err) } testPoolMembership(tc, chainedTxns[1], false, false, false) testPoolMembership(tc, chainedTxns[2], false, false, false) @@ -1429,9 +1425,9 @@ func TestMultiInputOrphanDoubleSpend(t *testing.T) { testPoolMembership(tc, doubleSpendTx, false, false, false) } -// TestCheckSpend tests that CheckSpend returns the expected spends found in +// TestPoolTransactionBySpendingOutpoint tests that poolTransactionBySpendingOutpoint returns the expected spends found in // the mempool. -func TestCheckSpend(t *testing.T) { +func TestPoolTransactionBySpendingOutpoint(t *testing.T) { tc, outputs, teardownFunc, err := newPoolHarness(t, &dagconfig.SimnetParams, 1, "TestCheckSpend") if err != nil { t.Fatalf("unable to create test pool: %v", err) @@ -1442,8 +1438,8 @@ func TestCheckSpend(t *testing.T) { // The mempool is empty, so none of the spendable outputs should have a // spend there. for _, op := range outputs { - spend := harness.txPool.CheckSpend(op.outpoint) - if spend != nil { + spend, ok := harness.txPool.mempoolUTXOSet.poolTransactionBySpendingOutpoint(op.outpoint) + if ok { t.Fatalf("Unexpeced spend found in pool: %v", spend) } } @@ -1466,7 +1462,7 @@ func TestCheckSpend(t *testing.T) { // The first tx in the chain should be the spend of the spendable // output. op := outputs[0].outpoint - spend := harness.txPool.CheckSpend(op) + spend, _ := harness.txPool.mempoolUTXOSet.poolTransactionBySpendingOutpoint(op) if spend != chainedTxns[0] { t.Fatalf("expected %v to be spent by %v, instead "+ "got %v", op, chainedTxns[0], spend) @@ -1479,7 +1475,7 @@ func TestCheckSpend(t *testing.T) { Index: 0, } expSpend := chainedTxns[i+1] - spend = harness.txPool.CheckSpend(op) + spend, _ = harness.txPool.mempoolUTXOSet.poolTransactionBySpendingOutpoint(op) if spend != expSpend { t.Fatalf("expected %v to be spent by %v, instead "+ "got %v", op, expSpend, spend) @@ -1491,7 +1487,7 @@ func TestCheckSpend(t *testing.T) { TxID: *chainedTxns[txChainLength-1].ID(), Index: 0, } - spend = harness.txPool.CheckSpend(op) + spend, _ = harness.txPool.mempoolUTXOSet.poolTransactionBySpendingOutpoint(op) if spend != nil { t.Fatalf("Unexpeced spend found in pool: %v", spend) } @@ -1518,16 +1514,21 @@ func TestCount(t *testing.T) { if err != nil { t.Errorf("ProcessTransaction: unexpected error: %v", err) } - if harness.txPool.Count()+harness.txPool.DepCount() != i+1 { + if harness.txPool.Count()+harness.txPool.ChainedCount() != i+1 { t.Errorf("TestCount: txPool expected to have %v transactions but got %v", i+1, harness.txPool.Count()) } } - err = harness.txPool.RemoveTransaction(chainedTxns[0], false, false) + // Mimic a situation where the first transaction is found in a block + fakeBlock := appmessage.NewMsgBlock(&appmessage.BlockHeader{}) + fakeCoinbase := &appmessage.MsgTx{} + fakeBlock.AddTransaction(fakeCoinbase) + fakeBlock.AddTransaction(chainedTxns[0].MsgTx()) + err = harness.txPool.removeBlockTransactionsFromPool(util.NewBlock(fakeBlock)) if err != nil { t.Fatalf("harness.CreateTxChain: unexpected error: %v", err) } - if harness.txPool.Count()+harness.txPool.DepCount() != 2 { + if harness.txPool.Count()+harness.txPool.ChainedCount() != 2 { t.Errorf("TestCount: txPool expected to have 2 transactions but got %v", harness.txPool.Count()) } } @@ -1636,82 +1637,15 @@ func TestHandleNewBlock(t *testing.T) { if err != nil { t.Fatalf("unable to create transaction 1: %v", err) } - dummyBlock.Transactions = append(dummyBlock.Transactions, blockTx1.MsgTx(), blockTx2.MsgTx()) - - // Create block and add its transactions to UTXO set - block := util.NewBlock(&dummyBlock) - for i, tx := range block.Transactions() { - if isAccepted, err := harness.txPool.mpUTXOSet.AddTx(tx.MsgTx(), 1); err != nil { - t.Fatalf("Failed to add transaction (%v,%v) to UTXO set: %v", i, tx.ID(), err) - } else if !isAccepted { - t.Fatalf("AddTx unexpectedly didn't add tx %s", tx.ID()) - } - } + block := blockdag.PrepareAndProcessBlockForTest(t, harness.txPool.cfg.DAG, harness.txPool.cfg.DAG.TipHashes(), []*appmessage.MsgTx{blockTx1.MsgTx(), blockTx2.MsgTx()}) // Handle new block by pool - _, err = harness.txPool.HandleNewBlock(block) + _, err = harness.txPool.HandleNewBlock(util.NewBlock(block)) // ensure that orphan transaction moved to main pool testPoolMembership(tc, orphanTx, false, true, false) } -// dummyBlock defines a block on the block DAG. It is used to test block operations. -var dummyBlock = appmessage.MsgBlock{ - Header: appmessage.BlockHeader{ - Version: 1, - ParentHashes: []*daghash.Hash{ - { - 0x82, 0xdc, 0xbd, 0xe6, 0x88, 0x37, 0x74, 0x5b, - 0x78, 0x6b, 0x03, 0x1d, 0xa3, 0x48, 0x3c, 0x45, - 0x3f, 0xc3, 0x2e, 0xd4, 0x53, 0x5b, 0x6f, 0x26, - 0x26, 0xb0, 0x48, 0x4f, 0x09, 0x00, 0x00, 0x00, - }, // Mainnet genesis - { - 0xc1, 0x5b, 0x71, 0xfe, 0x20, 0x70, 0x0f, 0xd0, - 0x08, 0x49, 0x88, 0x1b, 0x32, 0xb5, 0xbd, 0x13, - 0x17, 0xbe, 0x75, 0xe7, 0x29, 0x46, 0xdd, 0x03, - 0x01, 0x92, 0x90, 0xf1, 0xca, 0x8a, 0x88, 0x11, - }}, // Simnet genesis - HashMerkleRoot: &daghash.Hash{ - 0x66, 0x57, 0xa9, 0x25, 0x2a, 0xac, 0xd5, 0xc0, - 0xb2, 0x94, 0x09, 0x96, 0xec, 0xff, 0x95, 0x22, - 0x28, 0xc3, 0x06, 0x7c, 0xc3, 0x8d, 0x48, 0x85, - 0xef, 0xb5, 0xa4, 0xac, 0x42, 0x47, 0xe9, 0xf3, - }, // f3e94742aca4b5ef85488dc37c06c3282295ffec960994b2c0d5ac2a25a95766 - Timestamp: mstime.UnixMilliseconds(1529483563000), // 2018-06-20 08:32:43 +0000 UTC - Bits: 0x1e00ffff, // 503382015 - Nonce: 0x000ae53f, // 714047 - }, - Transactions: []*appmessage.MsgTx{ - { - Version: 1, - TxIn: []*appmessage.TxIn{}, - TxOut: []*appmessage.TxOut{ - { - Value: 0x12a05f200, // 5000000000 - ScriptPubKey: []byte{ - 0xa9, 0x14, 0xda, 0x17, 0x45, 0xe9, 0xb5, 0x49, - 0xbd, 0x0b, 0xfa, 0x1a, 0x56, 0x99, 0x71, 0xc7, - 0x7e, 0xba, 0x30, 0xcd, 0x5a, 0x4b, 0x87, - }, - }, - }, - LockTime: 0, - SubnetworkID: *subnetworkid.SubnetworkIDCoinbase, - Payload: []byte{ - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, - }, - PayloadHash: &daghash.Hash{ - 0x14, 0x06, 0xe0, 0x58, 0x81, 0xe2, 0x99, 0x36, - 0x77, 0x66, 0xd3, 0x13, 0xe2, 0x6c, 0x05, 0x56, - 0x4e, 0xc9, 0x1b, 0xf7, 0x21, 0xd3, 0x17, 0x26, - 0xbd, 0x6e, 0x46, 0xe6, 0x06, 0x89, 0x53, 0x9a, - }, - }, - }, -} - func TestTransactionGas(t *testing.T) { params := dagconfig.SimnetParams params.BlockCoinbaseMaturity = 0 diff --git a/domain/mempool/mempool_utxoset.go b/domain/mempool/mempool_utxoset.go new file mode 100644 index 000000000..8824cd471 --- /dev/null +++ b/domain/mempool/mempool_utxoset.go @@ -0,0 +1,115 @@ +package mempool + +import ( + "github.com/kaspanet/kaspad/app/appmessage" + "github.com/kaspanet/kaspad/domain/blockdag" + "github.com/kaspanet/kaspad/util" + "github.com/kaspanet/kaspad/util/daghash" + "github.com/pkg/errors" +) + +func newMempoolUTXOSet(dag *blockdag.BlockDAG) *mempoolUTXOSet { + return &mempoolUTXOSet{ + transactionByPreviousOutpoint: make(map[appmessage.Outpoint]*util.Tx), + poolUnspentOutputs: make(map[appmessage.Outpoint]*blockdag.UTXOEntry), + dag: dag, + } +} + +type mempoolUTXOSet struct { + transactionByPreviousOutpoint map[appmessage.Outpoint]*util.Tx + poolUnspentOutputs map[appmessage.Outpoint]*blockdag.UTXOEntry + dag *blockdag.BlockDAG +} + +func (mpus *mempoolUTXOSet) utxoEntryByOutpoint(outpoint appmessage.Outpoint) (entry *blockdag.UTXOEntry, isInPool bool, exists bool) { + entry, exists = mpus.dag.GetUTXOEntry(outpoint) + if !exists { + entry, exists := mpus.poolUnspentOutputs[outpoint] + if !exists { + return nil, false, false + } + return entry, true, true + } + return entry, false, true +} + +// addTx adds a transaction to the mempool UTXO set. It assumes that it doesn't double spend another transaction +// in the mempool, and that its outputs doesn't exist in the mempool UTXO set, and returns error otherwise. +func (mpus *mempoolUTXOSet) addTx(tx *util.Tx) error { + msgTx := tx.MsgTx() + for _, txIn := range msgTx.TxIn { + if existingTx, exists := mpus.transactionByPreviousOutpoint[txIn.PreviousOutpoint]; exists { + return errors.Errorf("outpoint %s is spent by the mempool transaction %s", txIn.PreviousOutpoint, existingTx.ID()) + } + mpus.transactionByPreviousOutpoint[txIn.PreviousOutpoint] = tx + } + + for i, txOut := range msgTx.TxOut { + outpoint := appmessage.NewOutpoint(tx.ID(), uint32(i)) + if _, exists := mpus.poolUnspentOutputs[*outpoint]; exists { + return errors.Errorf("outpoint %s already exists", outpoint) + } + mpus.poolUnspentOutputs[*outpoint] = blockdag.NewUTXOEntry(txOut, false, blockdag.UnacceptedBlueScore) + } + return nil +} + +// removeTx removes a transaction from the mempool UTXO set. +// Note: it doesn't re-add its previous outputs to the mempool UTXO set. +func (mpus *mempoolUTXOSet) removeTx(tx *util.Tx) error { + msgTx := tx.MsgTx() + for _, txIn := range msgTx.TxIn { + if _, exists := mpus.transactionByPreviousOutpoint[txIn.PreviousOutpoint]; !exists { + return errors.Errorf("outpoint %s doesn't exist", txIn.PreviousOutpoint) + } + delete(mpus.transactionByPreviousOutpoint, txIn.PreviousOutpoint) + } + + for i := range msgTx.TxOut { + outpoint := appmessage.NewOutpoint(tx.ID(), uint32(i)) + if _, exists := mpus.poolUnspentOutputs[*outpoint]; !exists { + return errors.Errorf("outpoint %s doesn't exist", outpoint) + } + delete(mpus.poolUnspentOutputs, *outpoint) + } + + return nil +} + +func (mpus *mempoolUTXOSet) poolTransactionBySpendingOutpoint(outpoint appmessage.Outpoint) (*util.Tx, bool) { + tx, exists := mpus.transactionByPreviousOutpoint[outpoint] + return tx, exists +} + +func (mpus *mempoolUTXOSet) transactionRelatedUTXOEntries(tx *util.Tx) (spentUTXOEntries []*blockdag.UTXOEntry, parentsInPool []*appmessage.Outpoint, missingParents []*daghash.TxID) { + msgTx := tx.MsgTx() + spentUTXOEntries = make([]*blockdag.UTXOEntry, len(msgTx.TxIn)) + missingParents = make([]*daghash.TxID, 0) + parentsInPool = make([]*appmessage.Outpoint, 0) + + isOrphan := false + for i, txIn := range msgTx.TxIn { + entry, isInPool, exists := mpus.utxoEntryByOutpoint(txIn.PreviousOutpoint) + if !exists { + isOrphan = true + missingParents = append(missingParents, &txIn.PreviousOutpoint.TxID) + } + + if isOrphan { + continue + } + + if isInPool { + parentsInPool = append(parentsInPool, &txIn.PreviousOutpoint) + } + + spentUTXOEntries[i] = entry + } + + if isOrphan { + return nil, nil, missingParents + } + + return spentUTXOEntries, parentsInPool, nil +} diff --git a/domain/mempool/policy.go b/domain/mempool/policy.go index 710396c95..d9bb9c3b9 100644 --- a/domain/mempool/policy.go +++ b/domain/mempool/policy.go @@ -80,7 +80,7 @@ func calcMinRequiredTxRelayFee(serializedSize int64, minRelayTxFee util.Amount) // context of this function is one whose referenced public key script is of a // standard form and, for pay-to-script-hash, does not have more than // maxStandardP2SHSigOps signature operations. -func checkInputsStandard(tx *util.Tx, utxoSet blockdag.UTXOSet) error { +func checkInputsStandard(tx *util.Tx, referencedUTXOEntries []*blockdag.UTXOEntry) error { // NOTE: The reference implementation also does a coinbase check here, // but coinbases have already been rejected prior to calling this // function so no need to recheck. @@ -89,7 +89,7 @@ func checkInputsStandard(tx *util.Tx, utxoSet blockdag.UTXOSet) error { // It is safe to elide existence and index checks here since // they have already been checked prior to calling this // function. - entry, _ := utxoSet.Get(txIn.PreviousOutpoint) + entry := referencedUTXOEntries[i] originScriptPubKey := entry.ScriptPubKey() switch txscript.GetScriptClass(originScriptPubKey) { case txscript.ScriptHashTy: diff --git a/domain/mining/mining.go b/domain/mining/mining.go index 92de2a10e..44c1f5d84 100644 --- a/domain/mining/mining.go +++ b/domain/mining/mining.go @@ -176,10 +176,17 @@ func NewBlkTmplGenerator(policy *Policy, // | <= policy.BlockMinSize) | | // ----------------------------------- -- func (g *BlkTmplGenerator) NewBlockTemplate(payToAddress util.Address, extraNonce uint64) (*BlockTemplate, error) { + + mempoolTransactions := g.txSource.MiningDescs() + + // The lock is called only after MiningDescs() to avoid a potential deadlock: + // MiningDescs() requires the TxPool's read lock, and TxPool.ProcessTransaction + // requires the dag's read lock, so if NewBlockTemplate will call the lock before, it + // might cause a dead lock. g.dag.Lock() defer g.dag.Unlock() - txsForBlockTemplate, err := g.selectTxs(payToAddress, extraNonce) + txsForBlockTemplate, err := g.selectTxs(mempoolTransactions, payToAddress, extraNonce) if err != nil { return nil, errors.Errorf("failed to select transactions: %s", err) } diff --git a/domain/mining/txselection.go b/domain/mining/txselection.go index 2531c86da..343519826 100644 --- a/domain/mining/txselection.go +++ b/domain/mining/txselection.go @@ -65,9 +65,8 @@ type txsForBlockTemplate struct { // Once the sum of probabilities of marked transactions is greater than // rebalanceThreshold percent of the sum of probabilities of all transactions, // rebalance. -func (g *BlkTmplGenerator) selectTxs(payToAddress util.Address, extraNonce uint64) (*txsForBlockTemplate, error) { - // Fetch the source transactions. - sourceTxs := g.txSource.MiningDescs() +func (g *BlkTmplGenerator) selectTxs(mempoolTransactions []*TxDesc, payToAddress util.Address, + extraNonce uint64) (*txsForBlockTemplate, error) { // Create a new txsForBlockTemplate struct, onto which all selectedTxs // will be appended. @@ -78,7 +77,7 @@ func (g *BlkTmplGenerator) selectTxs(payToAddress util.Address, extraNonce uint6 // Collect candidateTxs while excluding txs that will certainly not // be selected. - candidateTxs := g.collectCandidatesTxs(sourceTxs) + candidateTxs := g.collectCandidatesTxs(mempoolTransactions) log.Debugf("Considering %d transactions for inclusion to new block", len(candidateTxs))