Dev 223 fix txindex (#100)

* [DEV-201] In handleGetBlockDAGInfo calculate difficulty by the tip with the lowest bits

* [DEV-202] Move VirtualBlock.GetUTXOEntry to BlockDAG

* [DEV-203] Move VirtualBlock.SelectedTip() to BlockDAG

* [DEV-203] Move VirtualBlock.SelectedTip() to BlockDAG

* [DEV-204] Unexport VirtualBlock() and add CalcMedianTime method for DAG

* [DEV-204] add explanation about difficulty in CurrentBits() comment

* [DEV-204] unexport VirtualBlock type

* [DEV-223] make applyUTXOChanges return pastUTXOResults

* [DEV-223] add bluestxdata for current block as well

* [DEV-223] re-design tx index

* [DEV-223] edit txindex comments

* [DEV-223] rename BluesTxData -> AcceptedTxData, and return from applyUTXOChanges only transactions that got accepted

* [DEV-223] add unit test for txindex

* [DEV-223] fix comments and unite blueTransaction and AcceptedTxData to one type

* [DEV-223] use bucket cursor for dbFetchFirstTxRegion

* [DEV-223] use the same cursor instance for dbFetchFirstTxRegion

* [DEV-223] write in dbFetchFirstTxRegion's comment that it returns the first block region

* [DEV-223] rename type BlueBlockTransaction to TxWithBlockHash

* [DEV-223] add named returned value for applyUTXOChanges
This commit is contained in:
Ori Newman 2018-10-21 16:01:22 +03:00 committed by GitHub
parent d5787954ee
commit 5e41d83015
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 372 additions and 183 deletions

View File

@ -503,7 +503,7 @@ func (dag *BlockDAG) connectBlock(node *blockNode, block *util.Block) error {
}
// Add the node to the virtual and update the UTXO set of the DAG.
utxoDiff, err := dag.applyUTXOChanges(node, block)
utxoDiff, acceptedTxsData, err := dag.applyUTXOChanges(node, block)
if err != nil {
return err
}
@ -540,7 +540,7 @@ func (dag *BlockDAG) connectBlock(node *blockNode, block *util.Block) error {
// optional indexes with the block being connected so they can
// update themselves accordingly.
if dag.indexManager != nil {
err := dag.indexManager.ConnectBlock(dbTx, block, dag)
err := dag.indexManager.ConnectBlock(dbTx, block, dag, acceptedTxsData)
if err != nil {
return err
}
@ -570,7 +570,7 @@ func (dag *BlockDAG) connectBlock(node *blockNode, block *util.Block) error {
// 5. Updates each of the tips' utxoDiff.
//
// This function MUST be called with the chain state lock held (for writes).
func (dag *BlockDAG) applyUTXOChanges(node *blockNode, block *util.Block) (*utxoDiff, error) {
func (dag *BlockDAG) applyUTXOChanges(node *blockNode, block *util.Block) (utxoDiff *utxoDiff, acceptedTxData []*TxWithBlockHash, err error) {
// Prepare provisionalNodes for all the relevant nodes to avoid modifying the original nodes.
// We avoid modifying the original nodes in this function because it could potentially
// fail if the block is not valid, thus bringing all the affected nodes (and the virtual)
@ -581,14 +581,14 @@ func (dag *BlockDAG) applyUTXOChanges(node *blockNode, block *util.Block) (*utxo
// Clone the virtual block so that we don't modify the existing one.
virtualClone := dag.virtual.clone()
newBlockUTXO, err := newNodeProvisional.verifyAndBuildUTXO(virtualClone, dag.db)
newBlockUTXO, acceptedTxData, err := newNodeProvisional.verifyAndBuildUTXO(virtualClone, dag.db)
if err != nil {
return nil, fmt.Errorf("error verifying UTXO for %v: %s", node, err)
return nil, nil, fmt.Errorf("error verifying UTXO for %v: %s", node, err)
}
err = newNodeProvisional.updateParents(virtualClone, newBlockUTXO)
if err != nil {
return nil, fmt.Errorf("failed updating parents of %v: %s", node, err)
return nil, nil, fmt.Errorf("failed updating parents of %v: %s", node, err)
}
// Update the virtual block's children (the DAG tips) to include the new block.
@ -596,20 +596,20 @@ func (dag *BlockDAG) applyUTXOChanges(node *blockNode, block *util.Block) (*utxo
// Build a UTXO set for the new virtual block and update the DAG tips' diffs.
virtualNodeProvisional := provisionalSet.newProvisionalNode(&virtualClone.blockNode, true, nil)
newVirtualUTXO, err := virtualNodeProvisional.pastUTXO(virtualClone, dag.db)
newVirtualUTXO, _, err := virtualNodeProvisional.pastUTXO(virtualClone, dag.db)
if err != nil {
return nil, fmt.Errorf("could not restore past UTXO for virtual %v: %s", virtualClone, err)
return nil, nil, fmt.Errorf("could not restore past UTXO for virtual %v: %s", virtualClone, err)
}
// Apply new utxoDiffs to all the tips
err = updateTipsUTXO(virtualNodeProvisional.parents, virtualClone, newVirtualUTXO)
if err != nil {
return nil, fmt.Errorf("failed updating the tips' UTXO: %s", err)
return nil, nil, fmt.Errorf("failed updating the tips' UTXO: %s", err)
}
// It is now safe to meld the UTXO set to base.
diffSet := newVirtualUTXO.(*DiffUTXOSet)
utxoDiff := diffSet.UTXODiff
utxoDiff = diffSet.UTXODiff
dag.updateVirtualUTXO(diffSet)
// It is now safe to commit all the provisionalNodes
@ -626,7 +626,7 @@ func (dag *BlockDAG) applyUTXOChanges(node *blockNode, block *util.Block) (*utxo
// It is now safe to apply the new virtual block
dag.virtual = virtualClone
return utxoDiff, nil
return utxoDiff, acceptedTxData, nil
}
func (dag *BlockDAG) updateVirtualUTXO(newVirtualUTXODiffSet *DiffUTXOSet) {
@ -704,46 +704,60 @@ func (pns provisionalNodeSet) newProvisionalNode(node *blockNode, withRelatives
}
// verifyAndBuildUTXO verifies all transactions in the given block (in provisionalNode format) and builds its UTXO
func (p *provisionalNode) verifyAndBuildUTXO(virtual *virtualBlock, db database.DB) (UTXOSet, error) {
pastUTXO, err := p.pastUTXO(virtual, db)
func (p *provisionalNode) verifyAndBuildUTXO(virtual *virtualBlock, db database.DB) (utxoSet UTXOSet, acceptedTxData []*TxWithBlockHash, err error) {
pastUTXO, pastUTXOaccpetedTxData, err := p.pastUTXO(virtual, db)
if err != nil {
return nil, err
return nil, nil, err
}
diff := NewUTXODiff()
acceptedTxData = make([]*TxWithBlockHash, 0, len(pastUTXOaccpetedTxData)+len(p.transactions))
if len(pastUTXOaccpetedTxData) != 0 {
acceptedTxData = append(acceptedTxData, pastUTXOaccpetedTxData...)
}
for _, tx := range p.transactions {
txDiff, err := pastUTXO.diffFromTx(tx.MsgTx(), p.original)
if err != nil {
return nil, err
return nil, nil, err
}
diff, err = diff.WithDiff(txDiff)
if err != nil {
return nil, err
return nil, nil, err
}
acceptedTxData = append(acceptedTxData, &TxWithBlockHash{
Tx: tx,
InBlock: &p.original.hash,
})
}
utxo, err := pastUTXO.WithDiff(diff)
if err != nil {
return nil, err
return nil, nil, err
}
return utxo, nil
return utxo, acceptedTxData, nil
}
// TxWithBlockHash is a type that holds data about in which block a transaction was found
type TxWithBlockHash struct {
Tx *util.Tx
InBlock *daghash.Hash
}
// pastUTXO returns the UTXO of a given block's (in provisionalNode format) past
func (p *provisionalNode) pastUTXO(virtual *virtualBlock, db database.DB) (UTXOSet, error) {
pastUTXO, err := p.selectedParent.restoreUTXO(virtual)
func (p *provisionalNode) pastUTXO(virtual *virtualBlock, db database.DB) (pastUTXO UTXOSet, acceptedTxData []*TxWithBlockHash, err error) {
pastUTXO, err = p.selectedParent.restoreUTXO(virtual)
if err != nil {
return nil, err
return nil, nil, err
}
// Fetch from the database all the transactions for this block's blue set (besides the selected parent)
var blueBlockTransactions []*util.Tx
var blueBlockTransactions []*TxWithBlockHash
transactionCount := 0
err = db.View(func(tx database.Tx) error {
// Precalculate the amount of transactions in this block's blue set, besides the selected parent.
// This is to avoid an attack in which an attacker fabricates a block that will deliberately cause
// a lot of copying, causing a high cost to the whole network.
transactionCount := 0
blueBlocks := make([]*util.Block, 0, len(p.original.blues)-1)
for i := len(p.original.blues) - 1; i >= 0; i-- {
blueBlockNode := p.original.blues[i]
@ -760,24 +774,31 @@ func (p *provisionalNode) pastUTXO(virtual *virtualBlock, db database.DB) (UTXOS
blueBlocks = append(blueBlocks, blueBlock)
}
blueBlockTransactions = make([]*util.Tx, 0, transactionCount)
blueBlockTransactions = make([]*TxWithBlockHash, 0, transactionCount)
for _, blueBlock := range blueBlocks {
blueBlockTransactions = append(blueBlockTransactions, blueBlock.Transactions()...)
for _, tx := range blueBlock.Transactions() {
blueBlockTransactions = append(blueBlockTransactions, &TxWithBlockHash{Tx: tx, InBlock: blueBlock.Hash()})
}
}
return nil
})
if err != nil {
return nil, err
return nil, nil, err
}
acceptedTxData = make([]*TxWithBlockHash, 0, transactionCount)
// Add all transactions to the pastUTXO
// Purposefully ignore failures - these are just unaccepted transactions
for _, tx := range blueBlockTransactions {
_ = pastUTXO.AddTx(tx.MsgTx(), p.original.height)
isAccepted := pastUTXO.AddTx(tx.Tx.MsgTx(), p.original.height)
if isAccepted {
acceptedTxData = append(acceptedTxData, tx)
}
}
return pastUTXO, nil
return pastUTXO, acceptedTxData, nil
}
// restoreUTXO restores the UTXO of a given block (in provisionalNode format) from its diff
@ -1366,11 +1387,7 @@ type IndexManager interface {
// ConnectBlock is invoked when a new block has been connected to the
// DAG.
ConnectBlock(database.Tx, *util.Block, *BlockDAG) error
// DisconnectBlock is invoked when a block has been disconnected from
// the DAG.
DisconnectBlock(database.Tx, *util.Block, *BlockDAG) error
ConnectBlock(database.Tx, *util.Block, *BlockDAG, []*TxWithBlockHash) error
}
// Config is a descriptor which specifies the blockchain instance configuration.

View File

@ -693,7 +693,7 @@ func (idx *AddrIndex) indexBlock(data writeIndexData, block *util.Block, dag *bl
// the transactions in the block involve.
//
// This is part of the Indexer interface.
func (idx *AddrIndex) ConnectBlock(dbTx database.Tx, block *util.Block, dag *blockdag.BlockDAG) error {
func (idx *AddrIndex) ConnectBlock(dbTx database.Tx, block *util.Block, dag *blockdag.BlockDAG, _ []*blockdag.TxWithBlockHash) error {
// The offset and length of the transactions within the serialized
// block.
txLocs, err := block.TxLoc()

View File

@ -203,7 +203,7 @@ func storeFilter(dbTx database.Tx, block *util.Block, f *gcs.Filter,
// connected to the main chain. This indexer adds a hash-to-cf mapping for
// every passed block. This is part of the Indexer interface.
func (idx *CfIndex) ConnectBlock(dbTx database.Tx, block *util.Block,
_ *blockdag.BlockDAG) error {
_ *blockdag.BlockDAG, _ []*blockdag.TxWithBlockHash) error {
f, err := builder.BuildBasicFilter(block.MsgBlock())
if err != nil {

View File

@ -52,11 +52,7 @@ type Indexer interface {
// ConnectBlock is invoked when the index manager is notified that a new
// block has been connected to the DAG.
ConnectBlock(dbTx database.Tx, block *util.Block, dag *blockdag.BlockDAG) error
// DisconnectBlock is invoked when the index manager is notified that a
// block has been disconnected from the DAG.
DisconnectBlock(dbTx database.Tx, block *util.Block, dag *blockdag.BlockDAG) error
ConnectBlock(dbTx database.Tx, block *util.Block, dag *blockdag.BlockDAG, _ []*blockdag.TxWithBlockHash) error
}
// AssertError identifies an error that indicates an internal code consistency

View File

@ -181,7 +181,7 @@ func indexNeedsInputs(index Indexer) bool {
// loads it from the database.
func dbFetchTx(dbTx database.Tx, hash *daghash.Hash) (*wire.MsgTx, error) {
// Look up the location of the transaction.
blockRegion, err := dbFetchTxIndexEntry(dbTx, hash)
blockRegion, err := dbFetchFirstTxRegion(dbTx, hash)
if err != nil {
return nil, err
}
@ -210,31 +210,12 @@ func dbFetchTx(dbTx database.Tx, hash *daghash.Hash) (*wire.MsgTx, error) {
// checks, and invokes each indexer.
//
// This is part of the blockchain.IndexManager interface.
func (m *Manager) ConnectBlock(dbTx database.Tx, block *util.Block, dag *blockdag.BlockDAG) error {
func (m *Manager) ConnectBlock(dbTx database.Tx, block *util.Block, dag *blockdag.BlockDAG, acceptedTxsData []*blockdag.TxWithBlockHash) error {
// Call each of the currently active optional indexes with the block
// being connected so they can update accordingly.
for _, index := range m.enabledIndexes {
// Notify the indexer with the connected block so it can index it.
if err := index.ConnectBlock(dbTx, block, dag); err != nil {
return err
}
}
return nil
}
// DisconnectBlock must be invoked when a block is being disconnected from the
// end of the main chain. It keeps track of the state of each index it is
// managing, performs some sanity checks, and invokes each indexer to remove
// the index entries associated with the block.
//
// This is part of the blockchain.IndexManager interface.
func (m *Manager) DisconnectBlock(dbTx database.Tx, block *util.Block, dag *blockdag.BlockDAG) error {
// Call each of the currently active optional indexes with the block
// being disconnected so they can update accordingly.
for _, index := range m.enabledIndexes {
// Notify the indexer with the disconnected block so it can remove all
// of the appropriate entries.
if err := index.DisconnectBlock(dbTx, block, dag); err != nil {
if err := index.ConnectBlock(dbTx, block, dag, acceptedTxsData); err != nil {
return err
}
}

View File

@ -21,9 +21,13 @@ const (
)
var (
// txIndexKey is the key of the transaction index and the db bucket used
// to house it.
txIndexKey = []byte("txbyhashidx")
includingBlocksIndexKey = []byte("includingblocksidx")
includingBlocksIndexKeyEntrySize = 8 // 4 bytes for offset + 4 bytes for transaction length
acceptingBlocksIndexKey = []byte("acceptingblocksidx")
acceptingBlocksIndexKeyEntrySize = 4 // 4 bytes for including block ID
// idByHashIndexBucketName is the name of the db bucket used to house
// the block id -> block hash index.
@ -39,18 +43,21 @@ var (
)
// -----------------------------------------------------------------------------
// The transaction index consists of an entry for every transaction in the main
// chain. In order to significantly optimize the space requirements a separate
// The transaction index consists of an entry for every transaction in the DAG.
// In order to significantly optimize the space requirements a separate
// index which provides an internal mapping between each block that has been
// indexed and a unique ID for use within the hash to location mappings. The ID
// is simply a sequentially incremented uint32. This is useful because it is
// only 4 bytes versus 32 bytes hashes and thus saves a ton of space in the
// index.
//
// There are three buckets used in total. The first bucket maps the hash of
// each transaction to the specific block location. The second bucket maps the
// hash of each block to the unique ID and the third maps that ID back to the
// block hash.
// There are four buckets used in total. The first bucket maps the hash of
// each transaction to its location in each block it's included in. The second bucket
// contains all of the blocks that from their viewpoint the transaction has been
// accepted (i.e. the transaction is found in their blue set without double spends),
// and their blue block (or themselves) that included the transaction. The third
// bucket maps the hash of each block to the unique ID and the fourth maps
// that ID back to the block hash.
//
// NOTE: Although it is technically possible for multiple transactions to have
// the same hash as long as the previous transaction with the same hash is fully
@ -60,6 +67,27 @@ var (
// one must be fully spent and so the most likely transaction a caller would
// want for a given hash is the most recent one anyways.
//
// The including blocks index contains a sub bucket for each transaction hash (32 byte each), that its serialized format is:
//
// <block id> = <start offset><tx length>
//
// Field Type Size
// block id uint32 4 bytes
// start offset uint32 4 bytes
// tx length uint32 4 bytes
// -----
// Total: 12 bytes
//
// The accepting blocks index contains a sub bucket for each transaction hash (32 byte each), that its serialized format is:
//
// <accepting block id> = <including block id>
//
// Field Type Size
// accepting block id uint32 4 bytes
// including block id uint32 4 bytes
// -----
// Total: 8 bytes
//
// The serialized format for keys and values in the block hash to ID bucket is:
// <hash> = <ID>
//
@ -78,17 +106,6 @@ var (
// -----
// Total: 36 bytes
//
// The serialized format for the keys and values in the tx index bucket is:
//
// <txhash> = <block id><start offset><tx length>
//
// Field Type Size
// txhash daghash.Hash 32 bytes
// block id uint32 4 bytes
// start offset uint32 4 bytes
// tx length uint32 4 bytes
// -----
// Total: 44 bytes
// -----------------------------------------------------------------------------
// dbPutBlockIDIndexEntry uses an existing database transaction to update or add
@ -111,25 +128,6 @@ func dbPutBlockIDIndexEntry(dbTx database.Tx, hash *daghash.Hash, id uint32) err
return idIndex.Put(serializedID[:], hash[:])
}
// dbRemoveBlockIDIndexEntry uses an existing database transaction remove index
// entries from the hash to id and id to hash mappings for the provided hash.
func dbRemoveBlockIDIndexEntry(dbTx database.Tx, hash *daghash.Hash) error {
// Remove the block hash to ID mapping.
meta := dbTx.Metadata()
hashIndex := meta.Bucket(idByHashIndexBucketName)
serializedID := hashIndex.Get(hash[:])
if serializedID == nil {
return nil
}
if err := hashIndex.Delete(hash[:]); err != nil {
return err
}
// Remove the block ID to hash mapping.
idIndex := meta.Bucket(hashByIDIndexBucketName)
return idIndex.Delete(serializedID)
}
// dbFetchBlockIDByHash uses an existing database transaction to retrieve the
// block id for the provided hash from the index.
func dbFetchBlockIDByHash(dbTx database.Tx, hash *daghash.Hash) (uint32, error) {
@ -164,38 +162,68 @@ func dbFetchBlockHashByID(dbTx database.Tx, id uint32) (*daghash.Hash, error) {
return dbFetchBlockHashBySerializedID(dbTx, serializedID[:])
}
// putTxIndexEntry serializes the provided values according to the format
// described about for a transaction index entry. The target byte slice must
// be at least large enough to handle the number of bytes defined by the
// txEntrySize constant or it will panic.
func putTxIndexEntry(target []byte, blockID uint32, txLoc wire.TxLoc) {
byteOrder.PutUint32(target, blockID)
byteOrder.PutUint32(target[4:], uint32(txLoc.TxStart))
byteOrder.PutUint32(target[8:], uint32(txLoc.TxLen))
func putIncludingBlocksEntry(target []byte, txLoc wire.TxLoc) {
byteOrder.PutUint32(target, uint32(txLoc.TxStart))
byteOrder.PutUint32(target[4:], uint32(txLoc.TxLen))
}
// dbPutTxIndexEntry uses an existing database transaction to update the
// transaction index given the provided serialized data that is expected to have
// been serialized putTxIndexEntry.
func dbPutTxIndexEntry(dbTx database.Tx, txHash *daghash.Hash, serializedData []byte) error {
txIndex := dbTx.Metadata().Bucket(txIndexKey)
return txIndex.Put(txHash[:], serializedData)
func putAcceptingBlocksEntry(target []byte, includingBlockID uint32) {
byteOrder.PutUint32(target, includingBlockID)
}
// dbFetchTxIndexEntry uses an existing database transaction to fetch the block
func dbPutIncludingBlocksEntry(dbTx database.Tx, txHash *daghash.Hash, blockID uint32, serializedData []byte) error {
bucket, err := dbTx.Metadata().Bucket(includingBlocksIndexKey).CreateBucketIfNotExists(txHash[:])
if err != nil {
return err
}
blockIDBytes := make([]byte, 4)
byteOrder.PutUint32(blockIDBytes, uint32(blockID))
return bucket.Put(blockIDBytes, serializedData)
}
func dbPutAcceptingBlocksEntry(dbTx database.Tx, txHash *daghash.Hash, blockID uint32, serializedData []byte) error {
bucket, err := dbTx.Metadata().Bucket(acceptingBlocksIndexKey).CreateBucketIfNotExists(txHash[:])
if err != nil {
return err
}
blockIDBytes := make([]byte, 4)
byteOrder.PutUint32(blockIDBytes, uint32(blockID))
return bucket.Put(blockIDBytes, serializedData)
}
// dbFetchFirstTxRegion uses an existing database transaction to fetch the block
// region for the provided transaction hash from the transaction index. When
// there is no entry for the provided hash, nil will be returned for the both
// the region and the error.
func dbFetchTxIndexEntry(dbTx database.Tx, txHash *daghash.Hash) (*database.BlockRegion, error) {
//
// P.S Because the transaction can be found in multiple blocks, this function arbitarily
// returns the first block region that is stored in the txindex.
func dbFetchFirstTxRegion(dbTx database.Tx, txHash *daghash.Hash) (*database.BlockRegion, error) {
// Load the record from the database and return now if it doesn't exist.
txIndex := dbTx.Metadata().Bucket(txIndexKey)
serializedData := txIndex.Get(txHash[:])
txBucket := dbTx.Metadata().Bucket(includingBlocksIndexKey).Bucket(txHash[:])
if txBucket == nil {
return nil, database.Error{
ErrorCode: database.ErrCorruption,
Description: fmt.Sprintf("No block region"+
"was found for %s", txHash),
}
}
cursor := txBucket.Cursor()
if ok := cursor.First(); !ok {
return nil, database.Error{
ErrorCode: database.ErrCorruption,
Description: fmt.Sprintf("No block region"+
"was found for %s", txHash),
}
}
blockIDBytes := cursor.Key()
serializedData := cursor.Value()
if len(serializedData) == 0 {
return nil, nil
}
// Ensure the serialized data has enough bytes to properly deserialize.
if len(serializedData) < 12 {
if len(serializedData) < includingBlocksIndexKeyEntrySize {
return nil, database.Error{
ErrorCode: database.ErrCorruption,
Description: fmt.Sprintf("corrupt transaction index "+
@ -204,7 +232,7 @@ func dbFetchTxIndexEntry(dbTx database.Tx, txHash *daghash.Hash) (*database.Bloc
}
// Load the block hash associated with the block ID.
hash, err := dbFetchBlockHashBySerializedID(dbTx, serializedData[0:4])
hash, err := dbFetchBlockHashBySerializedID(dbTx, blockIDBytes)
if err != nil {
return nil, database.Error{
ErrorCode: database.ErrCorruption,
@ -216,15 +244,15 @@ func dbFetchTxIndexEntry(dbTx database.Tx, txHash *daghash.Hash) (*database.Bloc
// Deserialize the final entry.
region := database.BlockRegion{Hash: &daghash.Hash{}}
copy(region.Hash[:], hash[:])
region.Offset = byteOrder.Uint32(serializedData[4:8])
region.Len = byteOrder.Uint32(serializedData[8:12])
region.Offset = byteOrder.Uint32(serializedData[:4])
region.Len = byteOrder.Uint32(serializedData[4:])
return &region, nil
}
// dbAddTxIndexEntries uses an existing database transaction to add a
// transaction index entry for every transaction in the passed block.
func dbAddTxIndexEntries(dbTx database.Tx, block *util.Block, blockID uint32) error {
func dbAddTxIndexEntries(dbTx database.Tx, block *util.Block, blockID uint32, acceptedTxData []*blockdag.TxWithBlockHash) error {
// The offset and length of the transactions within the serialized
// block.
txLocs, err := block.TxLoc()
@ -237,43 +265,46 @@ func dbAddTxIndexEntries(dbTx database.Tx, block *util.Block, blockID uint32) er
// serialize them directly into the slice. Then, pass the appropriate
// subslice to the database to be written. This approach significantly
// cuts down on the number of required allocations.
offset := 0
serializedValues := make([]byte, len(block.Transactions())*txEntrySize)
includingBlocksOffset := 0
serializedIncludingBlocksValues := make([]byte, len(block.Transactions())*includingBlocksIndexKeyEntrySize)
for i, tx := range block.Transactions() {
putTxIndexEntry(serializedValues[offset:], blockID, txLocs[i])
endOffset := offset + txEntrySize
err := dbPutTxIndexEntry(dbTx, tx.Hash(),
serializedValues[offset:endOffset:endOffset])
putIncludingBlocksEntry(serializedIncludingBlocksValues[includingBlocksOffset:], txLocs[i])
endOffset := includingBlocksOffset + includingBlocksIndexKeyEntrySize
err := dbPutIncludingBlocksEntry(dbTx, tx.Hash(), blockID,
serializedIncludingBlocksValues[includingBlocksOffset:endOffset:endOffset])
if err != nil {
return err
}
offset += txEntrySize
includingBlocksOffset += includingBlocksIndexKeyEntrySize
}
return nil
}
blockHashToID := make(map[daghash.Hash]uint32)
blockHashToID[*block.Hash()] = blockID
// dbRemoveTxIndexEntry uses an existing database transaction to remove the most
// recent transaction index entry for the given hash.
func dbRemoveTxIndexEntry(dbTx database.Tx, txHash *daghash.Hash) error {
txIndex := dbTx.Metadata().Bucket(txIndexKey)
serializedData := txIndex.Get(txHash[:])
if len(serializedData) == 0 {
return fmt.Errorf("can't remove non-existent transaction %s "+
"from the transaction index", txHash)
}
acceptingBlocksOffset := 0
return txIndex.Delete(txHash[:])
}
serializedAcceptingBlocksValues := make([]byte, len(acceptedTxData)*acceptingBlocksIndexKeyEntrySize)
for _, tx := range acceptedTxData {
var includingBlockID uint32
var err error
var ok bool
// dbRemoveTxIndexEntries uses an existing database transaction to remove the
// latest transaction entry for every transaction in the passed block.
func dbRemoveTxIndexEntries(dbTx database.Tx, block *util.Block) error {
for _, tx := range block.Transactions() {
err := dbRemoveTxIndexEntry(dbTx, tx.Hash())
if includingBlockID, ok = blockHashToID[*tx.InBlock]; !ok {
includingBlockID, err = dbFetchBlockIDByHash(dbTx, tx.InBlock)
if err != nil {
return err
}
blockHashToID[*tx.InBlock] = includingBlockID
}
putAcceptingBlocksEntry(serializedAcceptingBlocksValues[acceptingBlocksOffset:], includingBlockID)
endOffset := acceptingBlocksOffset + acceptingBlocksIndexKeyEntrySize
err = dbPutAcceptingBlocksEntry(dbTx, tx.Tx.Hash(), blockID,
serializedAcceptingBlocksValues[acceptingBlocksOffset:endOffset:endOffset])
if err != nil {
return err
}
acceptingBlocksOffset += acceptingBlocksIndexKeyEntrySize
}
return nil
@ -356,7 +387,7 @@ func (idx *TxIndex) Init() error {
//
// This is part of the Indexer interface.
func (idx *TxIndex) Key() []byte {
return txIndexKey
return includingBlocksIndexKey
}
// Name returns the human-readable name of the index.
@ -379,20 +410,24 @@ func (idx *TxIndex) Create(dbTx database.Tx) error {
if _, err := meta.CreateBucket(hashByIDIndexBucketName); err != nil {
return err
}
_, err := meta.CreateBucket(txIndexKey)
if _, err := meta.CreateBucket(includingBlocksIndexKey); err != nil {
return err
}
_, err := meta.CreateBucket(acceptingBlocksIndexKey)
return err
}
// ConnectBlock is invoked by the index manager when a new block has been
// connected to the main chain. This indexer adds a hash-to-transaction mapping
// connected to the DAG. This indexer adds a hash-to-transaction mapping
// for every transaction in the passed block.
//
// This is part of the Indexer interface.
func (idx *TxIndex) ConnectBlock(dbTx database.Tx, block *util.Block, _ *blockdag.BlockDAG) error {
func (idx *TxIndex) ConnectBlock(dbTx database.Tx, block *util.Block, _ *blockdag.BlockDAG, acceptedTxsData []*blockdag.TxWithBlockHash) error {
// Increment the internal block ID to use for the block being connected
// and add all of the transactions in the block to the index.
newBlockID := idx.curBlockID + 1
if err := dbAddTxIndexEntries(dbTx, block, newBlockID); err != nil {
if err := dbAddTxIndexEntries(dbTx, block, newBlockID, acceptedTxsData); err != nil {
return err
}
@ -406,37 +441,17 @@ func (idx *TxIndex) ConnectBlock(dbTx database.Tx, block *util.Block, _ *blockda
return nil
}
// DisconnectBlock is invoked by the index manager when a block has been
// disconnected from the main chain. This indexer removes the
// hash-to-transaction mapping for every transaction in the block.
//
// This is part of the Indexer interface.
func (idx *TxIndex) DisconnectBlock(dbTx database.Tx, block *util.Block, _ *blockdag.BlockDAG) error {
// Remove all of the transactions in the block from the index.
if err := dbRemoveTxIndexEntries(dbTx, block); err != nil {
return err
}
// Remove the block ID index entry for the block being disconnected and
// decrement the current internal block ID to account for it.
if err := dbRemoveBlockIDIndexEntry(dbTx, block.Hash()); err != nil {
return err
}
idx.curBlockID--
return nil
}
// TxBlockRegion returns the block region for the provided transaction hash
// TxFirstBlockRegion returns the block region for the provided transaction hash
// from the transaction index. The block region can in turn be used to load the
// raw transaction bytes. When there is no entry for the provided hash, nil
// will be returned for the both the entry and the error.
//
// This function is safe for concurrent access.
func (idx *TxIndex) TxBlockRegion(hash *daghash.Hash) (*database.BlockRegion, error) {
func (idx *TxIndex) TxFirstBlockRegion(hash *daghash.Hash) (*database.BlockRegion, error) {
var region *database.BlockRegion
err := idx.db.View(func(dbTx database.Tx) error {
var err error
region, err = dbFetchTxIndexEntry(dbTx, hash)
region, err = dbFetchFirstTxRegion(dbTx, hash)
return err
})
return region, err
@ -475,5 +490,10 @@ func DropTxIndex(db database.DB, interrupt <-chan struct{}) error {
return err
}
return dropIndex(db, txIndexKey, txIndexName, interrupt)
err = dropIndex(db, includingBlocksIndexKey, addrIndexName, interrupt)
if err != nil {
return err
}
return dropIndex(db, acceptingBlocksIndexKey, txIndexName, interrupt)
}

View File

@ -0,0 +1,175 @@
package indexers
import (
"bytes"
"fmt"
"io/ioutil"
"os"
"reflect"
"testing"
"github.com/daglabs/btcd/blockdag"
"github.com/daglabs/btcd/dagconfig/daghash"
"github.com/daglabs/btcd/database"
"github.com/daglabs/btcd/util"
"github.com/daglabs/btcd/wire"
)
func tempDb() (database.DB, func(), error) {
dbPath, err := ioutil.TempDir("", "ffldb")
if err != nil {
return nil, nil, err
}
db, err := database.Create("ffldb", dbPath, wire.MainNet)
if err != nil {
return nil, nil, fmt.Errorf("error creating db: %v", err)
}
teardown := func() {
db.Close()
os.RemoveAll(dbPath)
}
return db, teardown, nil
}
func TestTxIndexConnectBlock(t *testing.T) {
db, teardown, err := tempDb()
if teardown != nil {
defer teardown()
}
if err != nil {
t.Fatalf("TestTxIndexConnectBlock: %v", err)
}
err = db.Update(func(dbTx database.Tx) error {
txIndex := NewTxIndex(db)
err := txIndex.Create(dbTx)
if err != nil {
t.Fatalf("TestTxIndexConnectBlock: Couldn't create txIndex: %v", err)
}
msgBlock1 := wire.NewMsgBlock(wire.NewBlockHeader(1,
[]daghash.Hash{{1}}, &daghash.Hash{}, 1, 1))
dummyPrevOutHash, err := daghash.NewHashFromStr("01")
if err != nil {
t.Fatalf("TestTxIndexConnectBlock: NewShaHashFromStr: unexpected error: %v", err)
}
dummyPrevOut1 := wire.OutPoint{Hash: *dummyPrevOutHash, Index: 0}
dummySigScript := bytes.Repeat([]byte{0x00}, 65)
dummyTxOut := &wire.TxOut{
Value: 5000000000,
PkScript: bytes.Repeat([]byte{0x00}, 65),
}
tx1 := wire.NewMsgTx(wire.TxVersion)
tx1.AddTxIn(wire.NewTxIn(&dummyPrevOut1, dummySigScript))
tx1.AddTxOut(dummyTxOut)
msgBlock1.AddTransaction(tx1)
block1 := util.NewBlock(msgBlock1)
err = txIndex.ConnectBlock(dbTx, block1, &blockdag.BlockDAG{}, []*blockdag.TxWithBlockHash{
{
Tx: util.NewTx(tx1),
InBlock: block1.Hash(),
},
})
if err != nil {
t.Fatalf("TestTxIndexConnectBlock: Couldn't connect block 1 to txindex")
}
tx1Hash := tx1.TxHash()
block1IDBytes := make([]byte, 4)
byteOrder.PutUint32(block1IDBytes, uint32(1))
tx1IncludingBucket := dbTx.Metadata().Bucket(includingBlocksIndexKey).Bucket(tx1Hash[:])
if tx1IncludingBucket == nil {
t.Fatalf("TestTxIndexConnectBlock: No including blocks bucket was found for tx1")
}
block1Tx1includingBlocksIndexEntry := tx1IncludingBucket.Get(block1IDBytes)
if len(block1Tx1includingBlocksIndexEntry) == 0 {
t.Fatalf("TestTxIndexConnectBlock: there was no entry for block1 in tx1's including blocks bucket")
}
tx1Offset := byteOrder.Uint32(block1Tx1includingBlocksIndexEntry[:4])
tx1Len := byteOrder.Uint32(block1Tx1includingBlocksIndexEntry[4:])
block1Bytes, err := block1.Bytes()
if err != nil {
t.Fatalf("TestTxIndexConnectBlock: Couldn't serialize block 1 to bytes")
}
tx1InBlock1 := block1Bytes[tx1Offset : tx1Offset+tx1Len]
wTx1 := bytes.NewBuffer(make([]byte, 0, tx1.SerializeSize()))
tx1.BtcEncode(wTx1, 0)
tx1Bytes := wTx1.Bytes()
if !reflect.DeepEqual(tx1Bytes, tx1InBlock1) {
t.Errorf("TestTxIndexConnectBlock: the block region that was in the bucket doesn't match tx1")
}
tx1AcceptingBlocksBucket := dbTx.Metadata().Bucket(acceptingBlocksIndexKey).Bucket(tx1Hash[:])
if tx1AcceptingBlocksBucket == nil {
t.Fatalf("TestTxIndexConnectBlock: No accepting blocks bucket was found for tx1")
}
block1Tx1AcceptingEntry := tx1AcceptingBlocksBucket.Get(block1IDBytes)
tx1IncludingBlockID := byteOrder.Uint32(block1Tx1AcceptingEntry)
if tx1IncludingBlockID != 1 {
t.Fatalf("TestTxIndexConnectBlock: tx1 should've been included in block 1, but got %v", tx1IncludingBlockID)
}
msgBlock2 := wire.NewMsgBlock(wire.NewBlockHeader(1,
[]daghash.Hash{{2}}, &daghash.Hash{}, 1, 1))
dummyPrevOut2 := wire.OutPoint{Hash: *dummyPrevOutHash, Index: 1}
tx2 := wire.NewMsgTx(wire.TxVersion)
tx2.AddTxIn(wire.NewTxIn(&dummyPrevOut2, dummySigScript))
tx2.AddTxOut(dummyTxOut)
msgBlock2.AddTransaction(tx2)
block2 := util.NewBlock(msgBlock2)
err = txIndex.ConnectBlock(dbTx, block2, &blockdag.BlockDAG{}, []*blockdag.TxWithBlockHash{
{
Tx: util.NewTx(tx1),
InBlock: block1.Hash(),
},
{
Tx: util.NewTx(tx2),
InBlock: block2.Hash(),
},
})
if err != nil {
t.Fatalf("TestTxIndexConnectBlock: Couldn't connect block 2 to txindex")
}
tx2Hash := tx2.TxHash()
block2IDBytes := make([]byte, 4)
byteOrder.PutUint32(block2IDBytes, uint32(2))
tx2IncludingBlocksBucket := dbTx.Metadata().Bucket(includingBlocksIndexKey).Bucket(tx2Hash[:])
if tx2IncludingBlocksBucket == nil {
t.Fatalf("TestTxIndexConnectBlock: No including blocks bucket was found for tx2")
}
block2Tx2includingBlocksIndexEntry := tx2IncludingBlocksBucket.Get(block2IDBytes)
if len(block2Tx2includingBlocksIndexEntry) == 0 {
t.Fatalf("TestTxIndexConnectBlock: there was no entry for block2 in tx2's including blocks bucket")
}
tx2AcceptingBlocksBucket := dbTx.Metadata().Bucket(acceptingBlocksIndexKey).Bucket(tx2Hash[:])
if tx2AcceptingBlocksBucket == nil {
t.Fatalf("TestTxIndexConnectBlock: No accepting blocks bucket was found for tx2")
}
block2Tx2AcceptingEntry := tx2AcceptingBlocksBucket.Get(block2IDBytes)
tx2IncludingBlockID := byteOrder.Uint32(block2Tx2AcceptingEntry)
if tx2IncludingBlockID != 2 {
t.Fatalf("TestTxIndexConnectBlock: tx2 should've been included in block 2, but got %v", tx1IncludingBlockID)
}
block2Tx1AcceptingEntry := tx1AcceptingBlocksBucket.Get(block2IDBytes)
tx1Block2IncludingBlockID := byteOrder.Uint32(block2Tx1AcceptingEntry)
if tx1Block2IncludingBlockID != 1 {
t.Fatalf("TestTxIndexConnectBlock: tx2 should've been included in block 1, but got %v", tx1Block2IncludingBlockID)
}
return nil
})
if err != nil {
t.Fatalf("TestTxIndexConnectBlock: %v", err)
}
}

View File

@ -878,7 +878,7 @@ func TestApplyUTXOChanges(t *testing.T) {
initBlockNode(&node1, blockHeader, setFromSlice(dag.genesis), dagconfig.MainNetParams.K)
//Checks that dag.applyUTXOChanges fails because we don't allow a transaction to spend another transaction from the same block
_, err = dag.applyUTXOChanges(&node1, block1)
_, _, err = dag.applyUTXOChanges(&node1, block1)
if err == nil {
t.Errorf("applyUTXOChanges expected an error\n")
}
@ -904,8 +904,8 @@ func TestApplyUTXOChanges(t *testing.T) {
var node2 blockNode
initBlockNode(&node2, blockHeader, setFromSlice(dag.genesis), dagconfig.MainNetParams.K)
//Checks that dag.applyUTXOChanges doesn't fail because we all of its transaction are dependant on transactions from previous blocks
_, err = dag.applyUTXOChanges(&node2, block2)
//Checks that dag.applyUTXOChanges doesn't fail because all of its transaction are dependant on transactions from previous blocks
_, _, err = dag.applyUTXOChanges(&node2, block2)
if err != nil {
t.Errorf("applyUTXOChanges: %v", err)
}

View File

@ -2443,7 +2443,7 @@ func handleGetRawTransaction(s *Server, cmd interface{}, closeChan <-chan struct
}
// Look up the location of the transaction.
blockRegion, err := s.cfg.TxIndex.TxBlockRegion(txHash)
blockRegion, err := s.cfg.TxIndex.TxFirstBlockRegion(txHash)
if err != nil {
context := "Failed to retrieve transaction location"
return nil, internalRPCError(err.Error(), context)
@ -2724,7 +2724,7 @@ func fetchInputTxos(s *Server, tx *wire.MsgTx) (map[wire.OutPoint]wire.TxOut, er
}
// Look up the location of the transaction.
blockRegion, err := s.cfg.TxIndex.TxBlockRegion(&origin.Hash)
blockRegion, err := s.cfg.TxIndex.TxFirstBlockRegion(&origin.Hash)
if err != nil {
context := "Failed to retrieve transaction location"
return nil, internalRPCError(err.Error(), context)