diff --git a/blockdag/dag.go b/blockdag/dag.go index 88660aae9..999a3208a 100644 --- a/blockdag/dag.go +++ b/blockdag/dag.go @@ -503,7 +503,7 @@ func (dag *BlockDAG) connectBlock(node *blockNode, block *util.Block) error { } // Add the node to the virtual and update the UTXO set of the DAG. - utxoDiff, err := dag.applyUTXOChanges(node, block) + utxoDiff, acceptedTxsData, err := dag.applyUTXOChanges(node, block) if err != nil { return err } @@ -540,7 +540,7 @@ func (dag *BlockDAG) connectBlock(node *blockNode, block *util.Block) error { // optional indexes with the block being connected so they can // update themselves accordingly. if dag.indexManager != nil { - err := dag.indexManager.ConnectBlock(dbTx, block, dag) + err := dag.indexManager.ConnectBlock(dbTx, block, dag, acceptedTxsData) if err != nil { return err } @@ -570,7 +570,7 @@ func (dag *BlockDAG) connectBlock(node *blockNode, block *util.Block) error { // 5. Updates each of the tips' utxoDiff. // // This function MUST be called with the chain state lock held (for writes). -func (dag *BlockDAG) applyUTXOChanges(node *blockNode, block *util.Block) (*utxoDiff, error) { +func (dag *BlockDAG) applyUTXOChanges(node *blockNode, block *util.Block) (utxoDiff *utxoDiff, acceptedTxData []*TxWithBlockHash, err error) { // Prepare provisionalNodes for all the relevant nodes to avoid modifying the original nodes. // We avoid modifying the original nodes in this function because it could potentially // fail if the block is not valid, thus bringing all the affected nodes (and the virtual) @@ -581,14 +581,14 @@ func (dag *BlockDAG) applyUTXOChanges(node *blockNode, block *util.Block) (*utxo // Clone the virtual block so that we don't modify the existing one. virtualClone := dag.virtual.clone() - newBlockUTXO, err := newNodeProvisional.verifyAndBuildUTXO(virtualClone, dag.db) + newBlockUTXO, acceptedTxData, err := newNodeProvisional.verifyAndBuildUTXO(virtualClone, dag.db) if err != nil { - return nil, fmt.Errorf("error verifying UTXO for %v: %s", node, err) + return nil, nil, fmt.Errorf("error verifying UTXO for %v: %s", node, err) } err = newNodeProvisional.updateParents(virtualClone, newBlockUTXO) if err != nil { - return nil, fmt.Errorf("failed updating parents of %v: %s", node, err) + return nil, nil, fmt.Errorf("failed updating parents of %v: %s", node, err) } // Update the virtual block's children (the DAG tips) to include the new block. @@ -596,20 +596,20 @@ func (dag *BlockDAG) applyUTXOChanges(node *blockNode, block *util.Block) (*utxo // Build a UTXO set for the new virtual block and update the DAG tips' diffs. virtualNodeProvisional := provisionalSet.newProvisionalNode(&virtualClone.blockNode, true, nil) - newVirtualUTXO, err := virtualNodeProvisional.pastUTXO(virtualClone, dag.db) + newVirtualUTXO, _, err := virtualNodeProvisional.pastUTXO(virtualClone, dag.db) if err != nil { - return nil, fmt.Errorf("could not restore past UTXO for virtual %v: %s", virtualClone, err) + return nil, nil, fmt.Errorf("could not restore past UTXO for virtual %v: %s", virtualClone, err) } // Apply new utxoDiffs to all the tips err = updateTipsUTXO(virtualNodeProvisional.parents, virtualClone, newVirtualUTXO) if err != nil { - return nil, fmt.Errorf("failed updating the tips' UTXO: %s", err) + return nil, nil, fmt.Errorf("failed updating the tips' UTXO: %s", err) } // It is now safe to meld the UTXO set to base. diffSet := newVirtualUTXO.(*DiffUTXOSet) - utxoDiff := diffSet.UTXODiff + utxoDiff = diffSet.UTXODiff dag.updateVirtualUTXO(diffSet) // It is now safe to commit all the provisionalNodes @@ -626,7 +626,7 @@ func (dag *BlockDAG) applyUTXOChanges(node *blockNode, block *util.Block) (*utxo // It is now safe to apply the new virtual block dag.virtual = virtualClone - return utxoDiff, nil + return utxoDiff, acceptedTxData, nil } func (dag *BlockDAG) updateVirtualUTXO(newVirtualUTXODiffSet *DiffUTXOSet) { @@ -704,46 +704,60 @@ func (pns provisionalNodeSet) newProvisionalNode(node *blockNode, withRelatives } // verifyAndBuildUTXO verifies all transactions in the given block (in provisionalNode format) and builds its UTXO -func (p *provisionalNode) verifyAndBuildUTXO(virtual *virtualBlock, db database.DB) (UTXOSet, error) { - pastUTXO, err := p.pastUTXO(virtual, db) +func (p *provisionalNode) verifyAndBuildUTXO(virtual *virtualBlock, db database.DB) (utxoSet UTXOSet, acceptedTxData []*TxWithBlockHash, err error) { + pastUTXO, pastUTXOaccpetedTxData, err := p.pastUTXO(virtual, db) if err != nil { - return nil, err + return nil, nil, err } diff := NewUTXODiff() + acceptedTxData = make([]*TxWithBlockHash, 0, len(pastUTXOaccpetedTxData)+len(p.transactions)) + if len(pastUTXOaccpetedTxData) != 0 { + acceptedTxData = append(acceptedTxData, pastUTXOaccpetedTxData...) + } for _, tx := range p.transactions { txDiff, err := pastUTXO.diffFromTx(tx.MsgTx(), p.original) if err != nil { - return nil, err + return nil, nil, err } diff, err = diff.WithDiff(txDiff) if err != nil { - return nil, err + return nil, nil, err } + acceptedTxData = append(acceptedTxData, &TxWithBlockHash{ + Tx: tx, + InBlock: &p.original.hash, + }) } utxo, err := pastUTXO.WithDiff(diff) if err != nil { - return nil, err + return nil, nil, err } - return utxo, nil + return utxo, acceptedTxData, nil +} + +// TxWithBlockHash is a type that holds data about in which block a transaction was found +type TxWithBlockHash struct { + Tx *util.Tx + InBlock *daghash.Hash } // pastUTXO returns the UTXO of a given block's (in provisionalNode format) past -func (p *provisionalNode) pastUTXO(virtual *virtualBlock, db database.DB) (UTXOSet, error) { - pastUTXO, err := p.selectedParent.restoreUTXO(virtual) +func (p *provisionalNode) pastUTXO(virtual *virtualBlock, db database.DB) (pastUTXO UTXOSet, acceptedTxData []*TxWithBlockHash, err error) { + pastUTXO, err = p.selectedParent.restoreUTXO(virtual) if err != nil { - return nil, err + return nil, nil, err } // Fetch from the database all the transactions for this block's blue set (besides the selected parent) - var blueBlockTransactions []*util.Tx + var blueBlockTransactions []*TxWithBlockHash + transactionCount := 0 err = db.View(func(tx database.Tx) error { // Precalculate the amount of transactions in this block's blue set, besides the selected parent. // This is to avoid an attack in which an attacker fabricates a block that will deliberately cause // a lot of copying, causing a high cost to the whole network. - transactionCount := 0 blueBlocks := make([]*util.Block, 0, len(p.original.blues)-1) for i := len(p.original.blues) - 1; i >= 0; i-- { blueBlockNode := p.original.blues[i] @@ -760,24 +774,31 @@ func (p *provisionalNode) pastUTXO(virtual *virtualBlock, db database.DB) (UTXOS blueBlocks = append(blueBlocks, blueBlock) } - blueBlockTransactions = make([]*util.Tx, 0, transactionCount) + blueBlockTransactions = make([]*TxWithBlockHash, 0, transactionCount) for _, blueBlock := range blueBlocks { - blueBlockTransactions = append(blueBlockTransactions, blueBlock.Transactions()...) + for _, tx := range blueBlock.Transactions() { + blueBlockTransactions = append(blueBlockTransactions, &TxWithBlockHash{Tx: tx, InBlock: blueBlock.Hash()}) + } } return nil }) if err != nil { - return nil, err + return nil, nil, err } + acceptedTxData = make([]*TxWithBlockHash, 0, transactionCount) + // Add all transactions to the pastUTXO // Purposefully ignore failures - these are just unaccepted transactions for _, tx := range blueBlockTransactions { - _ = pastUTXO.AddTx(tx.MsgTx(), p.original.height) + isAccepted := pastUTXO.AddTx(tx.Tx.MsgTx(), p.original.height) + if isAccepted { + acceptedTxData = append(acceptedTxData, tx) + } } - return pastUTXO, nil + return pastUTXO, acceptedTxData, nil } // restoreUTXO restores the UTXO of a given block (in provisionalNode format) from its diff @@ -1366,11 +1387,7 @@ type IndexManager interface { // ConnectBlock is invoked when a new block has been connected to the // DAG. - ConnectBlock(database.Tx, *util.Block, *BlockDAG) error - - // DisconnectBlock is invoked when a block has been disconnected from - // the DAG. - DisconnectBlock(database.Tx, *util.Block, *BlockDAG) error + ConnectBlock(database.Tx, *util.Block, *BlockDAG, []*TxWithBlockHash) error } // Config is a descriptor which specifies the blockchain instance configuration. diff --git a/blockdag/indexers/addrindex.go b/blockdag/indexers/addrindex.go index c85f5ba97..be926f056 100644 --- a/blockdag/indexers/addrindex.go +++ b/blockdag/indexers/addrindex.go @@ -693,7 +693,7 @@ func (idx *AddrIndex) indexBlock(data writeIndexData, block *util.Block, dag *bl // the transactions in the block involve. // // This is part of the Indexer interface. -func (idx *AddrIndex) ConnectBlock(dbTx database.Tx, block *util.Block, dag *blockdag.BlockDAG) error { +func (idx *AddrIndex) ConnectBlock(dbTx database.Tx, block *util.Block, dag *blockdag.BlockDAG, _ []*blockdag.TxWithBlockHash) error { // The offset and length of the transactions within the serialized // block. txLocs, err := block.TxLoc() diff --git a/blockdag/indexers/cfindex.go b/blockdag/indexers/cfindex.go index b5d982d02..e273533ed 100644 --- a/blockdag/indexers/cfindex.go +++ b/blockdag/indexers/cfindex.go @@ -203,7 +203,7 @@ func storeFilter(dbTx database.Tx, block *util.Block, f *gcs.Filter, // connected to the main chain. This indexer adds a hash-to-cf mapping for // every passed block. This is part of the Indexer interface. func (idx *CfIndex) ConnectBlock(dbTx database.Tx, block *util.Block, - _ *blockdag.BlockDAG) error { + _ *blockdag.BlockDAG, _ []*blockdag.TxWithBlockHash) error { f, err := builder.BuildBasicFilter(block.MsgBlock()) if err != nil { diff --git a/blockdag/indexers/common.go b/blockdag/indexers/common.go index a21f8f28b..eee1f5469 100644 --- a/blockdag/indexers/common.go +++ b/blockdag/indexers/common.go @@ -52,11 +52,7 @@ type Indexer interface { // ConnectBlock is invoked when the index manager is notified that a new // block has been connected to the DAG. - ConnectBlock(dbTx database.Tx, block *util.Block, dag *blockdag.BlockDAG) error - - // DisconnectBlock is invoked when the index manager is notified that a - // block has been disconnected from the DAG. - DisconnectBlock(dbTx database.Tx, block *util.Block, dag *blockdag.BlockDAG) error + ConnectBlock(dbTx database.Tx, block *util.Block, dag *blockdag.BlockDAG, _ []*blockdag.TxWithBlockHash) error } // AssertError identifies an error that indicates an internal code consistency diff --git a/blockdag/indexers/manager.go b/blockdag/indexers/manager.go index d17ccb8b7..c4b3663c4 100644 --- a/blockdag/indexers/manager.go +++ b/blockdag/indexers/manager.go @@ -181,7 +181,7 @@ func indexNeedsInputs(index Indexer) bool { // loads it from the database. func dbFetchTx(dbTx database.Tx, hash *daghash.Hash) (*wire.MsgTx, error) { // Look up the location of the transaction. - blockRegion, err := dbFetchTxIndexEntry(dbTx, hash) + blockRegion, err := dbFetchFirstTxRegion(dbTx, hash) if err != nil { return nil, err } @@ -210,31 +210,12 @@ func dbFetchTx(dbTx database.Tx, hash *daghash.Hash) (*wire.MsgTx, error) { // checks, and invokes each indexer. // // This is part of the blockchain.IndexManager interface. -func (m *Manager) ConnectBlock(dbTx database.Tx, block *util.Block, dag *blockdag.BlockDAG) error { +func (m *Manager) ConnectBlock(dbTx database.Tx, block *util.Block, dag *blockdag.BlockDAG, acceptedTxsData []*blockdag.TxWithBlockHash) error { // Call each of the currently active optional indexes with the block // being connected so they can update accordingly. for _, index := range m.enabledIndexes { // Notify the indexer with the connected block so it can index it. - if err := index.ConnectBlock(dbTx, block, dag); err != nil { - return err - } - } - return nil -} - -// DisconnectBlock must be invoked when a block is being disconnected from the -// end of the main chain. It keeps track of the state of each index it is -// managing, performs some sanity checks, and invokes each indexer to remove -// the index entries associated with the block. -// -// This is part of the blockchain.IndexManager interface. -func (m *Manager) DisconnectBlock(dbTx database.Tx, block *util.Block, dag *blockdag.BlockDAG) error { - // Call each of the currently active optional indexes with the block - // being disconnected so they can update accordingly. - for _, index := range m.enabledIndexes { - // Notify the indexer with the disconnected block so it can remove all - // of the appropriate entries. - if err := index.DisconnectBlock(dbTx, block, dag); err != nil { + if err := index.ConnectBlock(dbTx, block, dag, acceptedTxsData); err != nil { return err } } diff --git a/blockdag/indexers/txindex.go b/blockdag/indexers/txindex.go index 844cb619b..ff19bfb4c 100644 --- a/blockdag/indexers/txindex.go +++ b/blockdag/indexers/txindex.go @@ -21,9 +21,13 @@ const ( ) var ( - // txIndexKey is the key of the transaction index and the db bucket used - // to house it. - txIndexKey = []byte("txbyhashidx") + includingBlocksIndexKey = []byte("includingblocksidx") + + includingBlocksIndexKeyEntrySize = 8 // 4 bytes for offset + 4 bytes for transaction length + + acceptingBlocksIndexKey = []byte("acceptingblocksidx") + + acceptingBlocksIndexKeyEntrySize = 4 // 4 bytes for including block ID // idByHashIndexBucketName is the name of the db bucket used to house // the block id -> block hash index. @@ -39,18 +43,21 @@ var ( ) // ----------------------------------------------------------------------------- -// The transaction index consists of an entry for every transaction in the main -// chain. In order to significantly optimize the space requirements a separate +// The transaction index consists of an entry for every transaction in the DAG. +// In order to significantly optimize the space requirements a separate // index which provides an internal mapping between each block that has been // indexed and a unique ID for use within the hash to location mappings. The ID // is simply a sequentially incremented uint32. This is useful because it is // only 4 bytes versus 32 bytes hashes and thus saves a ton of space in the // index. // -// There are three buckets used in total. The first bucket maps the hash of -// each transaction to the specific block location. The second bucket maps the -// hash of each block to the unique ID and the third maps that ID back to the -// block hash. +// There are four buckets used in total. The first bucket maps the hash of +// each transaction to its location in each block it's included in. The second bucket +// contains all of the blocks that from their viewpoint the transaction has been +// accepted (i.e. the transaction is found in their blue set without double spends), +// and their blue block (or themselves) that included the transaction. The third +// bucket maps the hash of each block to the unique ID and the fourth maps +// that ID back to the block hash. // // NOTE: Although it is technically possible for multiple transactions to have // the same hash as long as the previous transaction with the same hash is fully @@ -60,6 +67,27 @@ var ( // one must be fully spent and so the most likely transaction a caller would // want for a given hash is the most recent one anyways. // +// The including blocks index contains a sub bucket for each transaction hash (32 byte each), that its serialized format is: +// +// = +// +// Field Type Size +// block id uint32 4 bytes +// start offset uint32 4 bytes +// tx length uint32 4 bytes +// ----- +// Total: 12 bytes +// +// The accepting blocks index contains a sub bucket for each transaction hash (32 byte each), that its serialized format is: +// +// = +// +// Field Type Size +// accepting block id uint32 4 bytes +// including block id uint32 4 bytes +// ----- +// Total: 8 bytes +// // The serialized format for keys and values in the block hash to ID bucket is: // = // @@ -78,17 +106,6 @@ var ( // ----- // Total: 36 bytes // -// The serialized format for the keys and values in the tx index bucket is: -// -// = -// -// Field Type Size -// txhash daghash.Hash 32 bytes -// block id uint32 4 bytes -// start offset uint32 4 bytes -// tx length uint32 4 bytes -// ----- -// Total: 44 bytes // ----------------------------------------------------------------------------- // dbPutBlockIDIndexEntry uses an existing database transaction to update or add @@ -111,25 +128,6 @@ func dbPutBlockIDIndexEntry(dbTx database.Tx, hash *daghash.Hash, id uint32) err return idIndex.Put(serializedID[:], hash[:]) } -// dbRemoveBlockIDIndexEntry uses an existing database transaction remove index -// entries from the hash to id and id to hash mappings for the provided hash. -func dbRemoveBlockIDIndexEntry(dbTx database.Tx, hash *daghash.Hash) error { - // Remove the block hash to ID mapping. - meta := dbTx.Metadata() - hashIndex := meta.Bucket(idByHashIndexBucketName) - serializedID := hashIndex.Get(hash[:]) - if serializedID == nil { - return nil - } - if err := hashIndex.Delete(hash[:]); err != nil { - return err - } - - // Remove the block ID to hash mapping. - idIndex := meta.Bucket(hashByIDIndexBucketName) - return idIndex.Delete(serializedID) -} - // dbFetchBlockIDByHash uses an existing database transaction to retrieve the // block id for the provided hash from the index. func dbFetchBlockIDByHash(dbTx database.Tx, hash *daghash.Hash) (uint32, error) { @@ -164,38 +162,68 @@ func dbFetchBlockHashByID(dbTx database.Tx, id uint32) (*daghash.Hash, error) { return dbFetchBlockHashBySerializedID(dbTx, serializedID[:]) } -// putTxIndexEntry serializes the provided values according to the format -// described about for a transaction index entry. The target byte slice must -// be at least large enough to handle the number of bytes defined by the -// txEntrySize constant or it will panic. -func putTxIndexEntry(target []byte, blockID uint32, txLoc wire.TxLoc) { - byteOrder.PutUint32(target, blockID) - byteOrder.PutUint32(target[4:], uint32(txLoc.TxStart)) - byteOrder.PutUint32(target[8:], uint32(txLoc.TxLen)) +func putIncludingBlocksEntry(target []byte, txLoc wire.TxLoc) { + byteOrder.PutUint32(target, uint32(txLoc.TxStart)) + byteOrder.PutUint32(target[4:], uint32(txLoc.TxLen)) } -// dbPutTxIndexEntry uses an existing database transaction to update the -// transaction index given the provided serialized data that is expected to have -// been serialized putTxIndexEntry. -func dbPutTxIndexEntry(dbTx database.Tx, txHash *daghash.Hash, serializedData []byte) error { - txIndex := dbTx.Metadata().Bucket(txIndexKey) - return txIndex.Put(txHash[:], serializedData) +func putAcceptingBlocksEntry(target []byte, includingBlockID uint32) { + byteOrder.PutUint32(target, includingBlockID) } -// dbFetchTxIndexEntry uses an existing database transaction to fetch the block +func dbPutIncludingBlocksEntry(dbTx database.Tx, txHash *daghash.Hash, blockID uint32, serializedData []byte) error { + bucket, err := dbTx.Metadata().Bucket(includingBlocksIndexKey).CreateBucketIfNotExists(txHash[:]) + if err != nil { + return err + } + blockIDBytes := make([]byte, 4) + byteOrder.PutUint32(blockIDBytes, uint32(blockID)) + return bucket.Put(blockIDBytes, serializedData) +} + +func dbPutAcceptingBlocksEntry(dbTx database.Tx, txHash *daghash.Hash, blockID uint32, serializedData []byte) error { + bucket, err := dbTx.Metadata().Bucket(acceptingBlocksIndexKey).CreateBucketIfNotExists(txHash[:]) + if err != nil { + return err + } + blockIDBytes := make([]byte, 4) + byteOrder.PutUint32(blockIDBytes, uint32(blockID)) + return bucket.Put(blockIDBytes, serializedData) +} + +// dbFetchFirstTxRegion uses an existing database transaction to fetch the block // region for the provided transaction hash from the transaction index. When // there is no entry for the provided hash, nil will be returned for the both // the region and the error. -func dbFetchTxIndexEntry(dbTx database.Tx, txHash *daghash.Hash) (*database.BlockRegion, error) { +// +// P.S Because the transaction can be found in multiple blocks, this function arbitarily +// returns the first block region that is stored in the txindex. +func dbFetchFirstTxRegion(dbTx database.Tx, txHash *daghash.Hash) (*database.BlockRegion, error) { // Load the record from the database and return now if it doesn't exist. - txIndex := dbTx.Metadata().Bucket(txIndexKey) - serializedData := txIndex.Get(txHash[:]) + txBucket := dbTx.Metadata().Bucket(includingBlocksIndexKey).Bucket(txHash[:]) + if txBucket == nil { + return nil, database.Error{ + ErrorCode: database.ErrCorruption, + Description: fmt.Sprintf("No block region"+ + "was found for %s", txHash), + } + } + cursor := txBucket.Cursor() + if ok := cursor.First(); !ok { + return nil, database.Error{ + ErrorCode: database.ErrCorruption, + Description: fmt.Sprintf("No block region"+ + "was found for %s", txHash), + } + } + blockIDBytes := cursor.Key() + serializedData := cursor.Value() if len(serializedData) == 0 { return nil, nil } // Ensure the serialized data has enough bytes to properly deserialize. - if len(serializedData) < 12 { + if len(serializedData) < includingBlocksIndexKeyEntrySize { return nil, database.Error{ ErrorCode: database.ErrCorruption, Description: fmt.Sprintf("corrupt transaction index "+ @@ -204,7 +232,7 @@ func dbFetchTxIndexEntry(dbTx database.Tx, txHash *daghash.Hash) (*database.Bloc } // Load the block hash associated with the block ID. - hash, err := dbFetchBlockHashBySerializedID(dbTx, serializedData[0:4]) + hash, err := dbFetchBlockHashBySerializedID(dbTx, blockIDBytes) if err != nil { return nil, database.Error{ ErrorCode: database.ErrCorruption, @@ -216,15 +244,15 @@ func dbFetchTxIndexEntry(dbTx database.Tx, txHash *daghash.Hash) (*database.Bloc // Deserialize the final entry. region := database.BlockRegion{Hash: &daghash.Hash{}} copy(region.Hash[:], hash[:]) - region.Offset = byteOrder.Uint32(serializedData[4:8]) - region.Len = byteOrder.Uint32(serializedData[8:12]) + region.Offset = byteOrder.Uint32(serializedData[:4]) + region.Len = byteOrder.Uint32(serializedData[4:]) return ®ion, nil } // dbAddTxIndexEntries uses an existing database transaction to add a // transaction index entry for every transaction in the passed block. -func dbAddTxIndexEntries(dbTx database.Tx, block *util.Block, blockID uint32) error { +func dbAddTxIndexEntries(dbTx database.Tx, block *util.Block, blockID uint32, acceptedTxData []*blockdag.TxWithBlockHash) error { // The offset and length of the transactions within the serialized // block. txLocs, err := block.TxLoc() @@ -237,43 +265,46 @@ func dbAddTxIndexEntries(dbTx database.Tx, block *util.Block, blockID uint32) er // serialize them directly into the slice. Then, pass the appropriate // subslice to the database to be written. This approach significantly // cuts down on the number of required allocations. - offset := 0 - serializedValues := make([]byte, len(block.Transactions())*txEntrySize) + includingBlocksOffset := 0 + serializedIncludingBlocksValues := make([]byte, len(block.Transactions())*includingBlocksIndexKeyEntrySize) for i, tx := range block.Transactions() { - putTxIndexEntry(serializedValues[offset:], blockID, txLocs[i]) - endOffset := offset + txEntrySize - err := dbPutTxIndexEntry(dbTx, tx.Hash(), - serializedValues[offset:endOffset:endOffset]) + putIncludingBlocksEntry(serializedIncludingBlocksValues[includingBlocksOffset:], txLocs[i]) + endOffset := includingBlocksOffset + includingBlocksIndexKeyEntrySize + err := dbPutIncludingBlocksEntry(dbTx, tx.Hash(), blockID, + serializedIncludingBlocksValues[includingBlocksOffset:endOffset:endOffset]) if err != nil { return err } - offset += txEntrySize + includingBlocksOffset += includingBlocksIndexKeyEntrySize } - return nil -} + blockHashToID := make(map[daghash.Hash]uint32) + blockHashToID[*block.Hash()] = blockID -// dbRemoveTxIndexEntry uses an existing database transaction to remove the most -// recent transaction index entry for the given hash. -func dbRemoveTxIndexEntry(dbTx database.Tx, txHash *daghash.Hash) error { - txIndex := dbTx.Metadata().Bucket(txIndexKey) - serializedData := txIndex.Get(txHash[:]) - if len(serializedData) == 0 { - return fmt.Errorf("can't remove non-existent transaction %s "+ - "from the transaction index", txHash) - } + acceptingBlocksOffset := 0 - return txIndex.Delete(txHash[:]) -} + serializedAcceptingBlocksValues := make([]byte, len(acceptedTxData)*acceptingBlocksIndexKeyEntrySize) + for _, tx := range acceptedTxData { + var includingBlockID uint32 + var err error + var ok bool -// dbRemoveTxIndexEntries uses an existing database transaction to remove the -// latest transaction entry for every transaction in the passed block. -func dbRemoveTxIndexEntries(dbTx database.Tx, block *util.Block) error { - for _, tx := range block.Transactions() { - err := dbRemoveTxIndexEntry(dbTx, tx.Hash()) + if includingBlockID, ok = blockHashToID[*tx.InBlock]; !ok { + includingBlockID, err = dbFetchBlockIDByHash(dbTx, tx.InBlock) + if err != nil { + return err + } + blockHashToID[*tx.InBlock] = includingBlockID + } + + putAcceptingBlocksEntry(serializedAcceptingBlocksValues[acceptingBlocksOffset:], includingBlockID) + endOffset := acceptingBlocksOffset + acceptingBlocksIndexKeyEntrySize + err = dbPutAcceptingBlocksEntry(dbTx, tx.Tx.Hash(), blockID, + serializedAcceptingBlocksValues[acceptingBlocksOffset:endOffset:endOffset]) if err != nil { return err } + acceptingBlocksOffset += acceptingBlocksIndexKeyEntrySize } return nil @@ -356,7 +387,7 @@ func (idx *TxIndex) Init() error { // // This is part of the Indexer interface. func (idx *TxIndex) Key() []byte { - return txIndexKey + return includingBlocksIndexKey } // Name returns the human-readable name of the index. @@ -379,20 +410,24 @@ func (idx *TxIndex) Create(dbTx database.Tx) error { if _, err := meta.CreateBucket(hashByIDIndexBucketName); err != nil { return err } - _, err := meta.CreateBucket(txIndexKey) + if _, err := meta.CreateBucket(includingBlocksIndexKey); err != nil { + return err + } + _, err := meta.CreateBucket(acceptingBlocksIndexKey) return err + } // ConnectBlock is invoked by the index manager when a new block has been -// connected to the main chain. This indexer adds a hash-to-transaction mapping +// connected to the DAG. This indexer adds a hash-to-transaction mapping // for every transaction in the passed block. // // This is part of the Indexer interface. -func (idx *TxIndex) ConnectBlock(dbTx database.Tx, block *util.Block, _ *blockdag.BlockDAG) error { +func (idx *TxIndex) ConnectBlock(dbTx database.Tx, block *util.Block, _ *blockdag.BlockDAG, acceptedTxsData []*blockdag.TxWithBlockHash) error { // Increment the internal block ID to use for the block being connected // and add all of the transactions in the block to the index. newBlockID := idx.curBlockID + 1 - if err := dbAddTxIndexEntries(dbTx, block, newBlockID); err != nil { + if err := dbAddTxIndexEntries(dbTx, block, newBlockID, acceptedTxsData); err != nil { return err } @@ -406,37 +441,17 @@ func (idx *TxIndex) ConnectBlock(dbTx database.Tx, block *util.Block, _ *blockda return nil } -// DisconnectBlock is invoked by the index manager when a block has been -// disconnected from the main chain. This indexer removes the -// hash-to-transaction mapping for every transaction in the block. -// -// This is part of the Indexer interface. -func (idx *TxIndex) DisconnectBlock(dbTx database.Tx, block *util.Block, _ *blockdag.BlockDAG) error { - // Remove all of the transactions in the block from the index. - if err := dbRemoveTxIndexEntries(dbTx, block); err != nil { - return err - } - - // Remove the block ID index entry for the block being disconnected and - // decrement the current internal block ID to account for it. - if err := dbRemoveBlockIDIndexEntry(dbTx, block.Hash()); err != nil { - return err - } - idx.curBlockID-- - return nil -} - -// TxBlockRegion returns the block region for the provided transaction hash +// TxFirstBlockRegion returns the block region for the provided transaction hash // from the transaction index. The block region can in turn be used to load the // raw transaction bytes. When there is no entry for the provided hash, nil // will be returned for the both the entry and the error. // // This function is safe for concurrent access. -func (idx *TxIndex) TxBlockRegion(hash *daghash.Hash) (*database.BlockRegion, error) { +func (idx *TxIndex) TxFirstBlockRegion(hash *daghash.Hash) (*database.BlockRegion, error) { var region *database.BlockRegion err := idx.db.View(func(dbTx database.Tx) error { var err error - region, err = dbFetchTxIndexEntry(dbTx, hash) + region, err = dbFetchFirstTxRegion(dbTx, hash) return err }) return region, err @@ -475,5 +490,10 @@ func DropTxIndex(db database.DB, interrupt <-chan struct{}) error { return err } - return dropIndex(db, txIndexKey, txIndexName, interrupt) + err = dropIndex(db, includingBlocksIndexKey, addrIndexName, interrupt) + if err != nil { + return err + } + + return dropIndex(db, acceptingBlocksIndexKey, txIndexName, interrupt) } diff --git a/blockdag/indexers/txindex_test.go b/blockdag/indexers/txindex_test.go new file mode 100644 index 000000000..9109839ec --- /dev/null +++ b/blockdag/indexers/txindex_test.go @@ -0,0 +1,175 @@ +package indexers + +import ( + "bytes" + "fmt" + "io/ioutil" + "os" + "reflect" + "testing" + + "github.com/daglabs/btcd/blockdag" + "github.com/daglabs/btcd/dagconfig/daghash" + "github.com/daglabs/btcd/database" + "github.com/daglabs/btcd/util" + "github.com/daglabs/btcd/wire" +) + +func tempDb() (database.DB, func(), error) { + dbPath, err := ioutil.TempDir("", "ffldb") + if err != nil { + return nil, nil, err + } + db, err := database.Create("ffldb", dbPath, wire.MainNet) + if err != nil { + return nil, nil, fmt.Errorf("error creating db: %v", err) + } + teardown := func() { + db.Close() + os.RemoveAll(dbPath) + } + return db, teardown, nil +} + +func TestTxIndexConnectBlock(t *testing.T) { + db, teardown, err := tempDb() + if teardown != nil { + defer teardown() + } + if err != nil { + t.Fatalf("TestTxIndexConnectBlock: %v", err) + } + err = db.Update(func(dbTx database.Tx) error { + txIndex := NewTxIndex(db) + err := txIndex.Create(dbTx) + if err != nil { + t.Fatalf("TestTxIndexConnectBlock: Couldn't create txIndex: %v", err) + } + msgBlock1 := wire.NewMsgBlock(wire.NewBlockHeader(1, + []daghash.Hash{{1}}, &daghash.Hash{}, 1, 1)) + + dummyPrevOutHash, err := daghash.NewHashFromStr("01") + if err != nil { + t.Fatalf("TestTxIndexConnectBlock: NewShaHashFromStr: unexpected error: %v", err) + } + dummyPrevOut1 := wire.OutPoint{Hash: *dummyPrevOutHash, Index: 0} + dummySigScript := bytes.Repeat([]byte{0x00}, 65) + dummyTxOut := &wire.TxOut{ + Value: 5000000000, + PkScript: bytes.Repeat([]byte{0x00}, 65), + } + + tx1 := wire.NewMsgTx(wire.TxVersion) + tx1.AddTxIn(wire.NewTxIn(&dummyPrevOut1, dummySigScript)) + tx1.AddTxOut(dummyTxOut) + msgBlock1.AddTransaction(tx1) + block1 := util.NewBlock(msgBlock1) + err = txIndex.ConnectBlock(dbTx, block1, &blockdag.BlockDAG{}, []*blockdag.TxWithBlockHash{ + { + Tx: util.NewTx(tx1), + InBlock: block1.Hash(), + }, + }) + if err != nil { + t.Fatalf("TestTxIndexConnectBlock: Couldn't connect block 1 to txindex") + } + + tx1Hash := tx1.TxHash() + block1IDBytes := make([]byte, 4) + byteOrder.PutUint32(block1IDBytes, uint32(1)) + tx1IncludingBucket := dbTx.Metadata().Bucket(includingBlocksIndexKey).Bucket(tx1Hash[:]) + if tx1IncludingBucket == nil { + t.Fatalf("TestTxIndexConnectBlock: No including blocks bucket was found for tx1") + } + block1Tx1includingBlocksIndexEntry := tx1IncludingBucket.Get(block1IDBytes) + if len(block1Tx1includingBlocksIndexEntry) == 0 { + t.Fatalf("TestTxIndexConnectBlock: there was no entry for block1 in tx1's including blocks bucket") + } + + tx1Offset := byteOrder.Uint32(block1Tx1includingBlocksIndexEntry[:4]) + tx1Len := byteOrder.Uint32(block1Tx1includingBlocksIndexEntry[4:]) + + block1Bytes, err := block1.Bytes() + if err != nil { + t.Fatalf("TestTxIndexConnectBlock: Couldn't serialize block 1 to bytes") + } + tx1InBlock1 := block1Bytes[tx1Offset : tx1Offset+tx1Len] + + wTx1 := bytes.NewBuffer(make([]byte, 0, tx1.SerializeSize())) + tx1.BtcEncode(wTx1, 0) + tx1Bytes := wTx1.Bytes() + + if !reflect.DeepEqual(tx1Bytes, tx1InBlock1) { + t.Errorf("TestTxIndexConnectBlock: the block region that was in the bucket doesn't match tx1") + } + + tx1AcceptingBlocksBucket := dbTx.Metadata().Bucket(acceptingBlocksIndexKey).Bucket(tx1Hash[:]) + if tx1AcceptingBlocksBucket == nil { + t.Fatalf("TestTxIndexConnectBlock: No accepting blocks bucket was found for tx1") + } + + block1Tx1AcceptingEntry := tx1AcceptingBlocksBucket.Get(block1IDBytes) + tx1IncludingBlockID := byteOrder.Uint32(block1Tx1AcceptingEntry) + if tx1IncludingBlockID != 1 { + t.Fatalf("TestTxIndexConnectBlock: tx1 should've been included in block 1, but got %v", tx1IncludingBlockID) + } + + msgBlock2 := wire.NewMsgBlock(wire.NewBlockHeader(1, + []daghash.Hash{{2}}, &daghash.Hash{}, 1, 1)) + dummyPrevOut2 := wire.OutPoint{Hash: *dummyPrevOutHash, Index: 1} + tx2 := wire.NewMsgTx(wire.TxVersion) + tx2.AddTxIn(wire.NewTxIn(&dummyPrevOut2, dummySigScript)) + tx2.AddTxOut(dummyTxOut) + msgBlock2.AddTransaction(tx2) + block2 := util.NewBlock(msgBlock2) + err = txIndex.ConnectBlock(dbTx, block2, &blockdag.BlockDAG{}, []*blockdag.TxWithBlockHash{ + { + Tx: util.NewTx(tx1), + InBlock: block1.Hash(), + }, + { + Tx: util.NewTx(tx2), + InBlock: block2.Hash(), + }, + }) + if err != nil { + t.Fatalf("TestTxIndexConnectBlock: Couldn't connect block 2 to txindex") + } + + tx2Hash := tx2.TxHash() + block2IDBytes := make([]byte, 4) + byteOrder.PutUint32(block2IDBytes, uint32(2)) + + tx2IncludingBlocksBucket := dbTx.Metadata().Bucket(includingBlocksIndexKey).Bucket(tx2Hash[:]) + if tx2IncludingBlocksBucket == nil { + t.Fatalf("TestTxIndexConnectBlock: No including blocks bucket was found for tx2") + } + + block2Tx2includingBlocksIndexEntry := tx2IncludingBlocksBucket.Get(block2IDBytes) + if len(block2Tx2includingBlocksIndexEntry) == 0 { + t.Fatalf("TestTxIndexConnectBlock: there was no entry for block2 in tx2's including blocks bucket") + } + + tx2AcceptingBlocksBucket := dbTx.Metadata().Bucket(acceptingBlocksIndexKey).Bucket(tx2Hash[:]) + if tx2AcceptingBlocksBucket == nil { + t.Fatalf("TestTxIndexConnectBlock: No accepting blocks bucket was found for tx2") + } + + block2Tx2AcceptingEntry := tx2AcceptingBlocksBucket.Get(block2IDBytes) + tx2IncludingBlockID := byteOrder.Uint32(block2Tx2AcceptingEntry) + if tx2IncludingBlockID != 2 { + t.Fatalf("TestTxIndexConnectBlock: tx2 should've been included in block 2, but got %v", tx1IncludingBlockID) + } + + block2Tx1AcceptingEntry := tx1AcceptingBlocksBucket.Get(block2IDBytes) + tx1Block2IncludingBlockID := byteOrder.Uint32(block2Tx1AcceptingEntry) + if tx1Block2IncludingBlockID != 1 { + t.Fatalf("TestTxIndexConnectBlock: tx2 should've been included in block 1, but got %v", tx1Block2IncludingBlockID) + } + + return nil + }) + if err != nil { + t.Fatalf("TestTxIndexConnectBlock: %v", err) + } +} diff --git a/blockdag/utxoset_test.go b/blockdag/utxoset_test.go index fb442762e..1894b4775 100644 --- a/blockdag/utxoset_test.go +++ b/blockdag/utxoset_test.go @@ -878,7 +878,7 @@ func TestApplyUTXOChanges(t *testing.T) { initBlockNode(&node1, blockHeader, setFromSlice(dag.genesis), dagconfig.MainNetParams.K) //Checks that dag.applyUTXOChanges fails because we don't allow a transaction to spend another transaction from the same block - _, err = dag.applyUTXOChanges(&node1, block1) + _, _, err = dag.applyUTXOChanges(&node1, block1) if err == nil { t.Errorf("applyUTXOChanges expected an error\n") } @@ -904,8 +904,8 @@ func TestApplyUTXOChanges(t *testing.T) { var node2 blockNode initBlockNode(&node2, blockHeader, setFromSlice(dag.genesis), dagconfig.MainNetParams.K) - //Checks that dag.applyUTXOChanges doesn't fail because we all of its transaction are dependant on transactions from previous blocks - _, err = dag.applyUTXOChanges(&node2, block2) + //Checks that dag.applyUTXOChanges doesn't fail because all of its transaction are dependant on transactions from previous blocks + _, _, err = dag.applyUTXOChanges(&node2, block2) if err != nil { t.Errorf("applyUTXOChanges: %v", err) } diff --git a/server/rpc/rpcserver.go b/server/rpc/rpcserver.go index fd8752b9e..8c16f1a03 100644 --- a/server/rpc/rpcserver.go +++ b/server/rpc/rpcserver.go @@ -2443,7 +2443,7 @@ func handleGetRawTransaction(s *Server, cmd interface{}, closeChan <-chan struct } // Look up the location of the transaction. - blockRegion, err := s.cfg.TxIndex.TxBlockRegion(txHash) + blockRegion, err := s.cfg.TxIndex.TxFirstBlockRegion(txHash) if err != nil { context := "Failed to retrieve transaction location" return nil, internalRPCError(err.Error(), context) @@ -2724,7 +2724,7 @@ func fetchInputTxos(s *Server, tx *wire.MsgTx) (map[wire.OutPoint]wire.TxOut, er } // Look up the location of the transaction. - blockRegion, err := s.cfg.TxIndex.TxBlockRegion(&origin.Hash) + blockRegion, err := s.cfg.TxIndex.TxFirstBlockRegion(&origin.Hash) if err != nil { context := "Failed to retrieve transaction location" return nil, internalRPCError(err.Error(), context)