[NOD-189] Optimize UTXOCollection operations (#307)

* [NOD-189] Made UTXODiff WithDiff and DiffFrom allocate collections with appropriate sizes.
In mempool HandleNewBlock, Replaced removeTransaction loop with removeTransactions.

* [NOD-189] Removed code duplication between removeTransaction and removeTransactions.

* [NOD-189] Fixed a merge error.

* [NOD-189] Fixed another merge error.

* [NOD-189] Renamed removeRedeemers to removeDependants.

* [NOD-189] Removed superfluous check inside removeTransactionWithDiff.

* [NOD-189] Added a comment to removeTransactions detailing what it optimizes.

* [NOD-189] Added documentation to removeTransactionWithDiff and split it into smaller methods.
This commit is contained in:
stasatdaglabs 2019-05-29 11:46:55 +03:00 committed by Svarog
parent 7069d173c6
commit 4e6edd4ffd
2 changed files with 182 additions and 92 deletions

View File

@ -173,7 +173,11 @@ func NewUTXODiff() *UTXODiff {
// 2. This diff contains a UTXO in toRemove, and the other diff does not contain it
// diffFrom results in the UTXO being added to toAdd
func (d *UTXODiff) diffFrom(other *UTXODiff) (*UTXODiff, error) {
result := NewUTXODiff()
result := UTXODiff{
toAdd: make(utxoCollection, len(d.toRemove)+len(other.toAdd)),
toRemove: make(utxoCollection, len(d.toAdd)+len(other.toRemove)),
diffMultiset: btcec.NewMultiset(btcec.S256()),
}
// Note that the following cases are not accounted for, as they are impossible
// as long as the base utxoSet is the same:
@ -223,7 +227,7 @@ func (d *UTXODiff) diffFrom(other *UTXODiff) (*UTXODiff, error) {
// Create a new diffMultiset as the subtraction of the two diffs.
result.diffMultiset = other.diffMultiset.Subtract(d.diffMultiset)
return result, nil
return &result, nil
}
// WithDiff applies provided diff to this diff, creating a new utxoDiff, that would be the result if
@ -253,7 +257,11 @@ func (d *UTXODiff) diffFrom(other *UTXODiff) (*UTXODiff, error) {
// 2. This diff contains a UTXO in toRemove, and the other diff does not contain it
// WithDiff results in the UTXO being added to toRemove
func (d *UTXODiff) WithDiff(diff *UTXODiff) (*UTXODiff, error) {
result := NewUTXODiff()
result := UTXODiff{
toAdd: make(utxoCollection, len(d.toAdd)+len(diff.toAdd)),
toRemove: make(utxoCollection, len(d.toRemove)+len(diff.toRemove)),
diffMultiset: btcec.NewMultiset(btcec.S256()),
}
// All transactions in d.toAdd:
// If they are not in diff.toRemove - should be added in result.toAdd
@ -300,7 +308,7 @@ func (d *UTXODiff) WithDiff(diff *UTXODiff) (*UTXODiff, error) {
// Apply diff.diffMultiset to d.diffMultiset
result.diffMultiset = d.diffMultiset.Union(diff.diffMultiset)
return result, nil
return &result, nil
}
// clone returns a clone of this utxoDiff

View File

@ -483,13 +483,48 @@ func (mp *TxPool) HaveTransaction(hash *daghash.TxID) bool {
return haveTx
}
// removeTransactions is the internal function which implements the public
// RemoveTransactions. See the comment for RemoveTransactions for more details.
//
// This method, in contrast to removeTransaction (singular), creates one utxoDiff
// and calls removeTransactionWithDiff on it for every transaction. This is an
// optimization to save us a good amount of allocations (specifically in
// UTXODiff.WithDiff) every time we accept a block.
//
// This function MUST be called with the mempool lock held (for writes).
func (mp *TxPool) removeTransactions(txs []*util.Tx) error {
diff := blockdag.NewUTXODiff()
for _, tx := range txs {
txID := tx.ID()
if _, exists := mp.fetchTransaction(txID); !exists {
continue
}
err := mp.removeTransactionWithDiff(tx, diff, false)
if err != nil {
return err
}
}
var err error
mp.mpUTXOSet, err = mp.mpUTXOSet.WithDiff(diff)
if err != nil {
return err
}
atomic.StoreInt64(&mp.lastUpdated, time.Now().Unix())
return nil
}
// removeTransaction is the internal function which implements the public
// RemoveTransaction. See the comment for RemoveTransaction for more details.
//
// This function MUST be called with the mempool lock held (for writes).
func (mp *TxPool) removeTransaction(tx *util.Tx, removeRedeemers bool, restoreInputs bool) error {
func (mp *TxPool) removeTransaction(tx *util.Tx, removeDependants bool, restoreInputs bool) error {
txID := tx.ID()
if removeRedeemers {
if removeDependants {
// Remove any transactions which rely on this one.
for i := uint32(0); i < uint32(len(tx.MsgTx().TxOut)); i++ {
prevOut := wire.OutPoint{TxID: *txID, Index: i}
@ -502,104 +537,152 @@ func (mp *TxPool) removeTransaction(tx *util.Tx, removeRedeemers bool, restoreIn
}
}
msgTx := tx.MsgTx()
if _, exists := mp.fetchTransaction(txID); !exists {
return nil
}
// Remove the transaction if needed.
if txDesc, exists := mp.fetchTransaction(txID); exists {
// Remove unconfirmed address index entries associated with the
// transaction if enabled.
if mp.cfg.AddrIndex != nil {
mp.cfg.AddrIndex.RemoveUnconfirmedTx(txID)
diff := blockdag.NewUTXODiff()
err := mp.removeTransactionWithDiff(tx, diff, restoreInputs)
if err != nil {
return err
}
mp.mpUTXOSet, err = mp.mpUTXOSet.WithDiff(diff)
if err != nil {
return err
}
atomic.StoreInt64(&mp.lastUpdated, time.Now().Unix())
return nil
}
// removeTransactionWithDiff removes the transaction tx from the mempool while
// updating the UTXODiff diff with appropriate changes. diff is later meant to
// be withDiff'd against the mempool UTXOSet to update it.
//
// This method assumes that tx exists in the mempool.
func (mp *TxPool) removeTransactionWithDiff(tx *util.Tx, diff *blockdag.UTXODiff, restoreInputs bool) error {
txID := tx.ID()
// Remove unconfirmed address index entries associated with the
// transaction if enabled.
if mp.cfg.AddrIndex != nil {
mp.cfg.AddrIndex.RemoveUnconfirmedTx(txID)
}
err := mp.removeTransactionUTXOEntriesFromDiff(tx, diff)
if err != nil {
return fmt.Errorf("could not remove UTXOEntry from diff: %s", err)
}
err = mp.markTransactionOutputsUnspent(tx, diff, restoreInputs)
if err != nil {
return fmt.Errorf("could not mark transaction output as unspent: %s", err)
}
txDesc, _ := mp.fetchTransaction(txID)
if txDesc.depCount == 0 {
delete(mp.pool, *txID)
} else {
delete(mp.depends, *txID)
}
mp.processRemovedTransactionDependencies(tx)
return nil
}
// removeTransactionUTXOEntriesFromDiff removes tx's UTXOEntries from the diff
func (mp *TxPool) removeTransactionUTXOEntriesFromDiff(tx *util.Tx, diff *blockdag.UTXODiff) error {
for idx := range tx.MsgTx().TxOut {
outPoint := *wire.NewOutPoint(tx.ID(), uint32(idx))
entry, exists := mp.mpUTXOSet.Get(outPoint)
if exists {
err := diff.RemoveEntry(outPoint, entry)
if err != nil {
return err
}
}
}
return nil
}
diff := blockdag.NewUTXODiff()
for idx := range msgTx.TxOut {
outPoint := *wire.NewOutPoint(txID, uint32(idx))
entry, exists := mp.mpUTXOSet.Get(outPoint)
if exists {
err := diff.RemoveEntry(outPoint, entry)
// markTransactionOutputsUnspent updates the mempool so that tx's TXOs are unspent
// Iff restoreInputs is true then the inputs are restored back into the supplied diff
func (mp *TxPool) markTransactionOutputsUnspent(tx *util.Tx, diff *blockdag.UTXODiff, restoreInputs bool) error {
for _, txIn := range tx.MsgTx().TxIn {
if restoreInputs {
if prevTxDesc, exists := mp.pool[txIn.PreviousOutPoint.TxID]; exists {
prevOut := prevTxDesc.Tx.MsgTx().TxOut[txIn.PreviousOutPoint.Index]
entry := blockdag.NewUTXOEntry(prevOut, false, blockdag.UnminedChainHeight)
err := diff.AddEntry(txIn.PreviousOutPoint, entry)
if err != nil {
return err
}
}
if prevTxDesc, exists := mp.depends[txIn.PreviousOutPoint.TxID]; exists {
prevOut := prevTxDesc.Tx.MsgTx().TxOut[txIn.PreviousOutPoint.Index]
entry := blockdag.NewUTXOEntry(prevOut, false, blockdag.UnminedChainHeight)
err := diff.AddEntry(txIn.PreviousOutPoint, entry)
if err != nil {
return err
}
}
}
// Mark the referenced outpoints as unspent by the pool.
for _, txIn := range msgTx.TxIn {
if restoreInputs {
if prevTxDesc, exists := mp.pool[txIn.PreviousOutPoint.TxID]; exists {
prevOut := prevTxDesc.Tx.MsgTx().TxOut[txIn.PreviousOutPoint.Index]
entry := blockdag.NewUTXOEntry(prevOut, false, blockdag.UnminedChainHeight)
err := diff.AddEntry(txIn.PreviousOutPoint, entry)
if err != nil {
return err
}
}
if prevTxDesc, exists := mp.depends[txIn.PreviousOutPoint.TxID]; exists {
prevOut := prevTxDesc.Tx.MsgTx().TxOut[txIn.PreviousOutPoint.Index]
entry := blockdag.NewUTXOEntry(prevOut, false, blockdag.UnminedChainHeight)
err := diff.AddEntry(txIn.PreviousOutPoint, entry)
if err != nil {
return err
}
}
}
delete(mp.outpoints, txIn.PreviousOutPoint)
}
if txDesc.depCount == 0 {
delete(mp.pool, *txID)
} else {
delete(mp.depends, *txID)
}
// Process dependent transactions
prevOut := wire.OutPoint{TxID: *txID}
for txOutIdx := range msgTx.TxOut {
// Skip to the next available output if there are none.
prevOut.Index = uint32(txOutIdx)
depends, exists := mp.dependsByPrev[prevOut]
if !exists {
continue
}
// Move independent transactions into main pool
for _, txD := range depends {
txD.depCount--
if txD.depCount == 0 {
// Transaction may be already removed by recursive calls, if removeRedeemers is true.
// So avoid moving it into main pool
if _, ok := mp.depends[*txD.Tx.ID()]; ok {
delete(mp.depends, *txD.Tx.ID())
mp.pool[*txD.Tx.ID()] = txD
}
}
}
delete(mp.dependsByPrev, prevOut)
}
var err error
mp.mpUTXOSet, err = mp.mpUTXOSet.WithDiff(diff)
if err != nil {
return err
}
atomic.StoreInt64(&mp.lastUpdated, time.Now().Unix())
delete(mp.outpoints, txIn.PreviousOutPoint)
}
return nil
}
// processRemovedTransactionDependencies processes the dependencies of a
// transaction tx that was just now removed from the mempool
func (mp *TxPool) processRemovedTransactionDependencies(tx *util.Tx) {
prevOut := wire.OutPoint{TxID: *tx.ID()}
for txOutIdx := range tx.MsgTx().TxOut {
// Skip to the next available output if there are none.
prevOut.Index = uint32(txOutIdx)
depends, exists := mp.dependsByPrev[prevOut]
if !exists {
continue
}
// Move independent transactions into main pool
for _, txD := range depends {
txD.depCount--
if txD.depCount == 0 {
// Transaction may be already removed by recursive calls, if removeRedeemers is true.
// So avoid moving it into main pool
if _, ok := mp.depends[*txD.Tx.ID()]; ok {
delete(mp.depends, *txD.Tx.ID())
mp.pool[*txD.Tx.ID()] = txD
}
}
}
delete(mp.dependsByPrev, prevOut)
}
}
// RemoveTransaction removes the passed transaction from the mempool. When the
// removeRedeemers flag is set, any transactions that redeem outputs from the
// removed transaction will also be removed recursively from the mempool, as
// they would otherwise become orphans.
// removeDependants flag is set, any transactions that depend on the removed
// transaction (that is to say, redeem outputs from it) will also be removed
// recursively from the mempool, as they would otherwise become orphans.
//
// This function is safe for concurrent access.
func (mp *TxPool) RemoveTransaction(tx *util.Tx, removeRedeemers bool, restoreInputs bool) error {
func (mp *TxPool) RemoveTransaction(tx *util.Tx, removeDependants bool, restoreInputs bool) error {
// Protect concurrent access.
mp.mtx.Lock()
defer mp.mtx.Unlock()
return mp.removeTransaction(tx, removeRedeemers, restoreInputs)
return mp.removeTransaction(tx, removeDependants, restoreInputs)
}
// RemoveTransactions removes the passed transactions from the mempool.
//
// This function is safe for concurrent access.
func (mp *TxPool) RemoveTransactions(txs []*util.Tx) error {
// Protect concurrent access.
mp.mtx.Lock()
defer mp.mtx.Unlock()
return mp.removeTransactions(txs)
}
// RemoveDoubleSpends removes all transactions which spend outputs spent by the
@ -1344,7 +1427,6 @@ func (mp *TxPool) LastUpdated() time.Time {
// from the mempool transactions that double spend a
// transaction that is already in the DAG
func (mp *TxPool) HandleNewBlock(block *util.Block, txChan chan NewBlockMsg) error {
oldUTXOSet := mp.mpUTXOSet
// Remove all of the transactions (except the coinbase) in the
@ -1354,12 +1436,12 @@ func (mp *TxPool) HandleNewBlock(block *util.Block, txChan chan NewBlockMsg) err
// no longer an orphan. Transactions which depend on a confirmed
// transaction are NOT removed recursively because they are still
// valid.
err := mp.RemoveTransactions(block.Transactions()[util.FeeTransactionIndex:])
if err != nil {
mp.mpUTXOSet = oldUTXOSet
return err
}
for _, tx := range block.Transactions()[util.FeeTransactionIndex:] {
err := mp.RemoveTransaction(tx, false, false)
if err != nil {
mp.mpUTXOSet = oldUTXOSet
return err
}
mp.RemoveDoubleSpends(tx)
mp.RemoveOrphan(tx)
acceptedTxs := mp.ProcessOrphans(tx)