diff --git a/blockdag/accept_test.go b/blockdag/accept_test.go index 1207c8f40..271b2cd36 100644 --- a/blockdag/accept_test.go +++ b/blockdag/accept_test.go @@ -2,6 +2,7 @@ package blockdag import ( "errors" + "path/filepath" "strings" "testing" @@ -25,7 +26,7 @@ func TestMaybeAcceptBlockErrors(t *testing.T) { // Test rejecting the block if its parents are missing orphanBlockFile := "blk_3B.dat" - loadedBlocks, err := loadBlocks(orphanBlockFile) + loadedBlocks, err := LoadBlocks(filepath.Join("testdata/", orphanBlockFile)) if err != nil { t.Fatalf("TestMaybeAcceptBlockErrors: "+ "Error loading file '%s': %s\n", orphanBlockFile, err) @@ -48,7 +49,7 @@ func TestMaybeAcceptBlockErrors(t *testing.T) { // Test rejecting the block if its parents are invalid blocksFile := "blk_0_to_4.dat" - blocks, err := loadBlocks(blocksFile) + blocks, err := LoadBlocks(filepath.Join("testdata/", blocksFile)) if err != nil { t.Fatalf("TestMaybeAcceptBlockErrors: "+ "Error loading file '%s': %s\n", blocksFile, err) diff --git a/blockdag/blockidhash.go b/blockdag/blockidhash.go new file mode 100644 index 000000000..66483adca --- /dev/null +++ b/blockdag/blockidhash.go @@ -0,0 +1,136 @@ +package blockdag + +import ( + "fmt" + "github.com/daglabs/btcd/database" + "github.com/daglabs/btcd/util/daghash" +) + +var ( + // idByHashIndexBucketName is the name of the db bucket used to house + // the block hash -> block id index. + idByHashIndexBucketName = []byte("idbyhashidx") + + // hashByIDIndexBucketName is the name of the db bucket used to house + // the block id -> block hash index. + hashByIDIndexBucketName = []byte("hashbyididx") + + currentBlockIDKey = []byte("currentblockid") +) + +// ----------------------------------------------------------------------------- +// This is a mapping between block hashes and unique IDs. The ID +// is simply a sequentially incremented uint64 that is used instead of block hash +// for the indexers. This is useful because it is only 8 bytes versus 32 bytes +// hashes and thus saves a ton of space when a block is referenced in an index. +// It consists of three buckets: the first bucket maps the hash of each +// block to the unique ID and the second maps that ID back to the block hash. +// The third bucket contains the last received block ID, and is used +// when starting the node to check that the enabled indexes are up to date +// with the latest received block, and if not, initiate recovery process. +// +// The serialized format for keys and values in the block hash to ID bucket is: +// = +// +// Field Type Size +// hash daghash.Hash 32 bytes +// ID uint64 8 bytes +// ----- +// Total: 40 bytes +// +// The serialized format for keys and values in the ID to block hash bucket is: +// = +// +// Field Type Size +// ID uint64 8 bytes +// hash daghash.Hash 32 bytes +// ----- +// Total: 40 bytes +// +// ----------------------------------------------------------------------------- + +const blockIDSize = 8 // 8 bytes for block ID + +// DBFetchBlockIDByHash uses an existing database transaction to retrieve the +// block id for the provided hash from the index. +func DBFetchBlockIDByHash(dbTx database.Tx, hash *daghash.Hash) (uint64, error) { + hashIndex := dbTx.Metadata().Bucket(idByHashIndexBucketName) + serializedID := hashIndex.Get(hash[:]) + if serializedID == nil { + return 0, fmt.Errorf("no entry in the block ID index for block with hash %s", hash) + } + + return DeserializeBlockID(serializedID), nil +} + +// DBFetchBlockHashBySerializedID uses an existing database transaction to +// retrieve the hash for the provided serialized block id from the index. +func DBFetchBlockHashBySerializedID(dbTx database.Tx, serializedID []byte) (*daghash.Hash, error) { + idIndex := dbTx.Metadata().Bucket(hashByIDIndexBucketName) + hashBytes := idIndex.Get(serializedID) + if hashBytes == nil { + return nil, fmt.Errorf("no entry in the block ID index for block with id %d", byteOrder.Uint64(serializedID)) + } + + var hash daghash.Hash + copy(hash[:], hashBytes) + return &hash, nil +} + +// dbPutBlockIDIndexEntry uses an existing database transaction to update or add +// the index entries for the hash to id and id to hash mappings for the provided +// values. +func dbPutBlockIDIndexEntry(dbTx database.Tx, hash *daghash.Hash, serializedID []byte) error { + // Add the block hash to ID mapping to the index. + meta := dbTx.Metadata() + hashIndex := meta.Bucket(idByHashIndexBucketName) + if err := hashIndex.Put(hash[:], serializedID[:]); err != nil { + return err + } + + // Add the block ID to hash mapping to the index. + idIndex := meta.Bucket(hashByIDIndexBucketName) + return idIndex.Put(serializedID[:], hash[:]) +} + +// DBFetchCurrentBlockID returns the last known block ID. +func DBFetchCurrentBlockID(dbTx database.Tx) uint64 { + serializedID := dbTx.Metadata().Get(currentBlockIDKey) + if serializedID == nil { + return 0 + } + return DeserializeBlockID(serializedID) +} + +// DeserializeBlockID returns a deserialized block id +func DeserializeBlockID(serializedID []byte) uint64 { + return byteOrder.Uint64(serializedID) +} + +// SerializeBlockID returns a serialized block id +func SerializeBlockID(blockID uint64) []byte { + serializedBlockID := make([]byte, blockIDSize) + byteOrder.PutUint64(serializedBlockID, blockID) + return serializedBlockID +} + +// DBFetchBlockHashByID uses an existing database transaction to retrieve the +// hash for the provided block id from the index. +func DBFetchBlockHashByID(dbTx database.Tx, id uint64) (*daghash.Hash, error) { + return DBFetchBlockHashBySerializedID(dbTx, SerializeBlockID(id)) +} + +func createBlockID(dbTx database.Tx, blockHash *daghash.Hash) (uint64, error) { + currentBlockID := DBFetchCurrentBlockID(dbTx) + newBlockID := currentBlockID + 1 + serializedNewBlockID := SerializeBlockID(newBlockID) + err := dbTx.Metadata().Put(currentBlockIDKey, serializedNewBlockID) + if err != nil { + return 0, err + } + err = dbPutBlockIDIndexEntry(dbTx, blockHash, serializedNewBlockID) + if err != nil { + return 0, err + } + return newBlockID, nil +} diff --git a/blockdag/common_test.go b/blockdag/common_test.go index 8c8461950..d353173f9 100644 --- a/blockdag/common_test.go +++ b/blockdag/common_test.go @@ -23,66 +23,8 @@ import ( "github.com/daglabs/btcd/wire" ) -// loadBlocks reads files containing bitcoin block data (gzipped but otherwise -// in the format bitcoind writes) from disk and returns them as an array of -// util.Block. This is largely borrowed from the test code in btcdb. -func loadBlocks(filename string) (blocks []*util.Block, err error) { - filename = filepath.Join("testdata/", filename) - - var network = wire.MainNet - var dr io.Reader - var fi io.ReadCloser - - fi, err = os.Open(filename) - if err != nil { - return - } - - if strings.HasSuffix(filename, ".bz2") { - dr = bzip2.NewReader(fi) - } else { - dr = fi - } - defer fi.Close() - - var block *util.Block - - err = nil - for height := uint64(0); err == nil; height++ { - var rintbuf uint32 - err = binary.Read(dr, binary.LittleEndian, &rintbuf) - if err == io.EOF { - // hit end of file at expected offset: no warning - height-- - err = nil - break - } - if err != nil { - break - } - if rintbuf != uint32(network) { - break - } - err = binary.Read(dr, binary.LittleEndian, &rintbuf) - blocklen := rintbuf - - rbytes := make([]byte, blocklen) - - // read block - dr.Read(rbytes) - - block, err = util.NewBlockFromBytes(rbytes) - if err != nil { - return - } - blocks = append(blocks, block) - } - - return -} - func loadBlocksWithLog(t *testing.T, filename string) ([]*util.Block, error) { - blocks, err := loadBlocks(filename) + blocks, err := LoadBlocks(filename) if err == nil { t.Logf("Loaded %d blocks from file %s", len(blocks), filename) for i, b := range blocks { diff --git a/blockdag/dag.go b/blockdag/dag.go index 25771d1f5..55d8d30c5 100644 --- a/blockdag/dag.go +++ b/blockdag/dag.go @@ -633,11 +633,16 @@ func (dag *BlockDAG) saveChangesFromBlock(node *blockNode, block *util.Block, vi return err } + blockID, err := createBlockID(dbTx, block.Hash()) + if err != nil { + return err + } + // Allow the index manager to call each of the currently active // optional indexes with the block being connected so they can // update themselves accordingly. if dag.indexManager != nil { - err := dag.indexManager.ConnectBlock(dbTx, block, dag, txsAcceptanceData, virtualTxsAcceptanceData) + err := dag.indexManager.ConnectBlock(dbTx, block, blockID, dag, txsAcceptanceData, virtualTxsAcceptanceData) if err != nil { return err } @@ -840,6 +845,15 @@ func (dag *BlockDAG) TxsAcceptedByVirtual() (MultiBlockTxsAcceptanceData, error) return txsAcceptanceData, err } +// TxsAcceptedByBlockHash retrieves transactions accepted by the given block +// +// This function MUST be called with the DAG read-lock held +func (dag *BlockDAG) TxsAcceptedByBlockHash(blockHash *daghash.Hash) (MultiBlockTxsAcceptanceData, error) { + node := dag.index.LookupNode(blockHash) + _, txsAcceptanceData, err := dag.pastUTXO(node) + return txsAcceptanceData, err +} + // applyDAGChanges does the following: // 1. Connects each of the new block's parents to the block. // 2. Adds the new block to the DAG's tips. @@ -1762,7 +1776,7 @@ type IndexManager interface { // ConnectBlock is invoked when a new block has been connected to the // DAG. - ConnectBlock(database.Tx, *util.Block, *BlockDAG, MultiBlockTxsAcceptanceData, MultiBlockTxsAcceptanceData) error + ConnectBlock(dbTx database.Tx, block *util.Block, blockID uint64, dag *BlockDAG, acceptedTxsData MultiBlockTxsAcceptanceData, virtualTxsAcceptanceData MultiBlockTxsAcceptanceData) error } // Config is a descriptor which specifies the blockchain instance configuration. diff --git a/blockdag/dag_test.go b/blockdag/dag_test.go index 2e7aafc6b..16f566dfd 100644 --- a/blockdag/dag_test.go +++ b/blockdag/dag_test.go @@ -38,7 +38,7 @@ func TestBlockCount(t *testing.T) { var blocks []*util.Block for _, file := range testFiles { - blockTmp, err := loadBlocks(file) + blockTmp, err := LoadBlocks(filepath.Join("testdata/", file)) if err != nil { t.Fatalf("Error loading file: %v\n", err) } @@ -91,7 +91,7 @@ func TestHaveBlock(t *testing.T) { var blocks []*util.Block for _, file := range testFiles { - blockTmp, err := loadBlocks(file) + blockTmp, err := LoadBlocks(filepath.Join("testdata/", file)) if err != nil { t.Fatalf("Error loading file: %v\n", err) } @@ -132,7 +132,7 @@ func TestHaveBlock(t *testing.T) { } for _, file := range testFiles { - blockTmp, err := loadBlocks(file) + blockTmp, err := LoadBlocks(filepath.Join("testdata/", file)) if err != nil { t.Fatalf("Error loading file: %v\n", err) } @@ -159,7 +159,7 @@ func TestHaveBlock(t *testing.T) { } for _, file := range testFiles { - blockTmp, err := loadBlocks(file) + blockTmp, err := LoadBlocks(filepath.Join("testdata/", file)) if err != nil { t.Fatalf("Error loading file: %v\n", err) } @@ -803,7 +803,7 @@ func testErrorThroughPatching(t *testing.T, expectedErrorMessage string, targetF var blocks []*util.Block for _, file := range testFiles { - blockTmp, err := loadBlocks(file) + blockTmp, err := LoadBlocks(filepath.Join("testdata/", file)) if err != nil { t.Fatalf("Error loading file: %v\n", err) } @@ -855,7 +855,7 @@ func testErrorThroughPatching(t *testing.T, expectedErrorMessage string, targetF func TestNew(t *testing.T) { // Create the root directory for test databases. - if !fileExists(testDbRoot) { + if !FileExists(testDbRoot) { if err := os.MkdirAll(testDbRoot, 0700); err != nil { t.Fatalf("unable to create test db "+ "root: %s", err) @@ -901,7 +901,7 @@ func TestNew(t *testing.T) { // validated. func TestAcceptingInInit(t *testing.T) { // Create the root directory for test databases. - if !fileExists(testDbRoot) { + if !FileExists(testDbRoot) { if err := os.MkdirAll(testDbRoot, 0700); err != nil { t.Fatalf("unable to create test db "+ "root: %s", err) @@ -934,7 +934,7 @@ func TestAcceptingInInit(t *testing.T) { } // Load the test block - blocks, err := loadBlocks("blk_0_to_4.dat") + blocks, err := LoadBlocks("testdata/blk_0_to_4.dat") if err != nil { t.Fatalf("Error loading file: %v\n", err) } diff --git a/blockdag/dagio.go b/blockdag/dagio.go index e9048f5ab..d6e49419b 100644 --- a/blockdag/dagio.go +++ b/blockdag/dagio.go @@ -335,6 +335,13 @@ func (dag *BlockDAG) createDAGState() error { if err := dbPutLocalSubnetworkID(dbTx, dag.subnetworkID); err != nil { return err } + + if _, err := meta.CreateBucketIfNotExists(idByHashIndexBucketName); err != nil { + return err + } + if _, err := meta.CreateBucketIfNotExists(hashByIDIndexBucketName); err != nil { + return err + } return nil }) @@ -747,7 +754,7 @@ func dbStoreBlockNode(dbTx database.Tx, node *blockNode) error { // Write block header data to block index bucket. blockIndexBucket := dbTx.Metadata().Bucket(blockIndexBucketName) - key := blockIndexKey(node.hash, node.blueScore) + key := BlockIndexKey(node.hash, node.blueScore) return blockIndexBucket.Put(key, value) } @@ -764,19 +771,19 @@ func dbStoreBlock(dbTx database.Tx, block *util.Block) error { return dbTx.StoreBlock(block) } -// blockIndexKey generates the binary key for an entry in the block index +// BlockIndexKey generates the binary key for an entry in the block index // bucket. The key is composed of the block blue score encoded as a big-endian // 64-bit unsigned int followed by the 32 byte block hash. // The blue score component is important for iteration order. -func blockIndexKey(blockHash *daghash.Hash, blueScore uint64) []byte { +func BlockIndexKey(blockHash *daghash.Hash, blueScore uint64) []byte { indexKey := make([]byte, daghash.HashSize+8) binary.BigEndian.PutUint64(indexKey[0:8], blueScore) copy(indexKey[8:daghash.HashSize+8], blockHash[:]) return indexKey } -func blockHashFromBlockIndexKey(blockIndexKey []byte) (*daghash.Hash, error) { - return daghash.NewHash(blockIndexKey[8 : daghash.HashSize+8]) +func blockHashFromBlockIndexKey(BlockIndexKey []byte) (*daghash.Hash, error) { + return daghash.NewHash(BlockIndexKey[8 : daghash.HashSize+8]) } // BlockByHash returns the block from the DAG with the given hash. @@ -823,7 +830,7 @@ func (dag *BlockDAG) BlockHashesFrom(startHash *daghash.Hash, limit int) ([]*dag err = dag.index.db.View(func(dbTx database.Tx) error { blockIndexBucket := dbTx.Metadata().Bucket(blockIndexBucketName) - startKey := blockIndexKey(startHash, blueScore) + startKey := BlockIndexKey(startHash, blueScore) cursor := blockIndexBucket.Cursor() cursor.Seek(startKey) diff --git a/blockdag/indexers/acceptanceindex.go b/blockdag/indexers/acceptanceindex.go index fc5792689..15a892d81 100644 --- a/blockdag/indexers/acceptanceindex.go +++ b/blockdag/indexers/acceptanceindex.go @@ -5,7 +5,6 @@ import ( "encoding/gob" "fmt" "github.com/daglabs/btcd/blockdag" - "github.com/daglabs/btcd/dagconfig" "github.com/daglabs/btcd/database" "github.com/daglabs/btcd/util" "github.com/daglabs/btcd/util/daghash" @@ -27,16 +26,20 @@ var ( // it stores a mapping between a block's hash and the set of transactions that the // block accepts among its blue blocks. type AcceptanceIndex struct { - db database.DB + db database.DB + dag *blockdag.BlockDAG } +// Ensure the AcceptanceIndex type implements the Indexer interface. +var _ Indexer = (*AcceptanceIndex)(nil) + // NewAcceptanceIndex returns a new instance of an indexer that is used to create a // mapping between block hashes and their txAcceptanceData. // // It implements the Indexer interface which plugs into the IndexManager that in // turn is used by the blockdag package. This allows the index to be // seamlessly maintained along with the DAG. -func NewAcceptanceIndex(_ *dagconfig.Params) *AcceptanceIndex { +func NewAcceptanceIndex() *AcceptanceIndex { return &AcceptanceIndex{} } @@ -73,8 +76,9 @@ func (idx *AcceptanceIndex) Create(dbTx database.Tx) error { // Init initializes the hash-based acceptance index. // // This is part of the Indexer interface. -func (idx *AcceptanceIndex) Init(db database.DB, _ *blockdag.BlockDAG) error { +func (idx *AcceptanceIndex) Init(db database.DB, dag *blockdag.BlockDAG) error { idx.db = db + idx.dag = dag return nil } @@ -82,9 +86,9 @@ func (idx *AcceptanceIndex) Init(db database.DB, _ *blockdag.BlockDAG) error { // connected to the DAG. // // This is part of the Indexer interface. -func (idx *AcceptanceIndex) ConnectBlock(dbTx database.Tx, block *util.Block, _ *blockdag.BlockDAG, +func (idx *AcceptanceIndex) ConnectBlock(dbTx database.Tx, _ *util.Block, blockID uint64, _ *blockdag.BlockDAG, txsAcceptanceData blockdag.MultiBlockTxsAcceptanceData, _ blockdag.MultiBlockTxsAcceptanceData) error { - return idx.dbPutTxsAcceptanceData(dbTx, block.Hash(), txsAcceptanceData) + return dbPutTxsAcceptanceData(dbTx, blockID, txsAcceptanceData) } // TxsAcceptanceData returns the acceptance data of all the transactions that @@ -93,7 +97,7 @@ func (idx *AcceptanceIndex) TxsAcceptanceData(blockHash *daghash.Hash) (blockdag var txsAcceptanceData blockdag.MultiBlockTxsAcceptanceData err := idx.db.View(func(dbTx database.Tx) error { var err error - txsAcceptanceData, err = idx.dbFetchTxsAcceptanceData(dbTx, blockHash) + txsAcceptanceData, err = dbFetchTxsAcceptanceDataByHash(dbTx, blockHash) return err }) if err != nil { @@ -102,7 +106,29 @@ func (idx *AcceptanceIndex) TxsAcceptanceData(blockHash *daghash.Hash) (blockdag return txsAcceptanceData, nil } -func (idx *AcceptanceIndex) dbPutTxsAcceptanceData(dbTx database.Tx, hash *daghash.Hash, +// Recover is invoked when the indexer wasn't turned on for several blocks +// and the indexer needs to close the gaps. +// +// This is part of the Indexer interface. +func (idx *AcceptanceIndex) Recover(dbTx database.Tx, currentBlockID, lastKnownBlockID uint64) error { + for blockID := currentBlockID + 1; blockID <= lastKnownBlockID; blockID++ { + hash, err := blockdag.DBFetchBlockHashByID(dbTx, currentBlockID) + if err != nil { + return err + } + txAcceptanceData, err := idx.dag.TxsAcceptedByBlockHash(hash) + if err != nil { + return err + } + err = idx.ConnectBlock(dbTx, nil, blockID, nil, txAcceptanceData, nil) + if err != nil { + return err + } + } + return nil +} + +func dbPutTxsAcceptanceData(dbTx database.Tx, blockID uint64, txsAcceptanceData blockdag.MultiBlockTxsAcceptanceData) error { serializedTxsAcceptanceData, err := serializeMultiBlockTxsAcceptanceData(txsAcceptanceData) if err != nil { @@ -110,16 +136,27 @@ func (idx *AcceptanceIndex) dbPutTxsAcceptanceData(dbTx database.Tx, hash *dagha } bucket := dbTx.Metadata().Bucket(acceptanceIndexKey) - return bucket.Put(hash[:], serializedTxsAcceptanceData) + return bucket.Put(blockdag.SerializeBlockID(blockID), serializedTxsAcceptanceData) } -func (idx *AcceptanceIndex) dbFetchTxsAcceptanceData(dbTx database.Tx, +func dbFetchTxsAcceptanceDataByHash(dbTx database.Tx, hash *daghash.Hash) (blockdag.MultiBlockTxsAcceptanceData, error) { + blockID, err := blockdag.DBFetchBlockIDByHash(dbTx, hash) + if err != nil { + return nil, err + } + + return dbFetchTxsAcceptanceDataByID(dbTx, blockID) +} + +func dbFetchTxsAcceptanceDataByID(dbTx database.Tx, + blockID uint64) (blockdag.MultiBlockTxsAcceptanceData, error) { + serializedBlockID := blockdag.SerializeBlockID(blockID) bucket := dbTx.Metadata().Bucket(acceptanceIndexKey) - serializedTxsAcceptanceData := bucket.Get(hash[:]) + serializedTxsAcceptanceData := bucket.Get(serializedBlockID) if serializedTxsAcceptanceData == nil { - return nil, fmt.Errorf("no entry in the accpetance index for block with hash %s", hash) + return nil, fmt.Errorf("no entry in the accpetance index for block id %d", blockID) } return deserializeMultiBlockTxsAcceptanceData(serializedTxsAcceptanceData) diff --git a/blockdag/indexers/acceptanceindex_test.go b/blockdag/indexers/acceptanceindex_test.go index d8b2804cc..fedf82c5c 100644 --- a/blockdag/indexers/acceptanceindex_test.go +++ b/blockdag/indexers/acceptanceindex_test.go @@ -1,15 +1,23 @@ package indexers import ( + "fmt" "github.com/daglabs/btcd/blockdag" + "github.com/daglabs/btcd/dagconfig" + "github.com/daglabs/btcd/database" "github.com/daglabs/btcd/util" "github.com/daglabs/btcd/util/daghash" "github.com/daglabs/btcd/wire" + "io" + "io/ioutil" + "os" + "path/filepath" "reflect" + "syscall" "testing" ) -func TestSerializationAnDeserialization(t *testing.T) { +func TestAcceptanceIndexSerializationAndDeserialization(t *testing.T) { txsAcceptanceData := blockdag.MultiBlockTxsAcceptanceData{} // Create test data @@ -33,17 +41,299 @@ func TestSerializationAnDeserialization(t *testing.T) { // Serialize serializedTxsAcceptanceData, err := serializeMultiBlockTxsAcceptanceData(txsAcceptanceData) if err != nil { - t.Fatalf("TestSerializationAnDeserialization: serialization failed: %s", err) + t.Fatalf("TestAcceptanceIndexSerializationAndDeserialization: serialization failed: %s", err) } // Deserialize deserializedTxsAcceptanceData, err := deserializeMultiBlockTxsAcceptanceData(serializedTxsAcceptanceData) if err != nil { - t.Fatalf("TestSerializationAnDeserialization: deserialization failed: %s", err) + t.Fatalf("TestAcceptanceIndexSerializationAndDeserialization: deserialization failed: %s", err) } // Check that they're the same if !reflect.DeepEqual(txsAcceptanceData, deserializedTxsAcceptanceData) { - t.Fatalf("TestSerializationAnDeserialization: original data and deseralize data aren't equal") + t.Fatalf("TestAcceptanceIndexSerializationAndDeserialization: original data and deseralize data aren't equal") } } + +// TestAcceptanceIndexRecover tests the recoverability of the +// acceptance index. +// It does it by following these steps: +// * It creates a DAG with enabled acceptance index (let's call it dag1) and +// make it process some blocks. +// * It creates a copy of dag1 (let's call it dag2), and disables the acceptance +// index in it. +// * It processes two more blocks in both dag1 and dag2. +// * A copy of dag2 is created (let's call it dag3) with enabled +// acceptance index +// * It checks that the two missing blocks are added to dag3 acceptance index by +// comparing dag1's last block acceptance data and dag3's last block acceptance +// data. +func TestAcceptanceIndexRecover(t *testing.T) { + params := &dagconfig.SimNetParams + params.BlockCoinbaseMaturity = 0 + + testFiles := []string{ + "blk_0_to_4.dat", + "blk_3B.dat", + } + + var blocks []*util.Block + for _, file := range testFiles { + blockTmp, err := blockdag.LoadBlocks(filepath.Join("../testdata/", file)) + if err != nil { + t.Fatalf("Error loading file: %v\n", err) + } + blocks = append(blocks, blockTmp...) + } + + db1AcceptanceIndex := NewAcceptanceIndex() + db1IndexManager := NewManager([]Indexer{db1AcceptanceIndex}) + db1Path, err := ioutil.TempDir("", "TestAcceptanceIndexRecover1") + if err != nil { + t.Fatalf("Error creating temporary directory: %s", err) + } + defer os.RemoveAll(db1Path) + + db1, err := database.Create("ffldb", db1Path, params.Net) + if err != nil { + t.Fatalf("error creating db: %s", err) + } + + db1Config := blockdag.Config{ + IndexManager: db1IndexManager, + DAGParams: params, + DB: db1, + } + + db1DAG, teardown, err := blockdag.DAGSetup("", db1Config) + if err != nil { + t.Fatalf("TestAcceptanceIndexRecover: Failed to setup DAG instance: %v", err) + } + if teardown != nil { + defer teardown() + } + + for i := 1; i < len(blocks)-2; i++ { + isOrphan, delay, err := db1DAG.ProcessBlock(blocks[i], blockdag.BFNone) + if err != nil { + t.Fatalf("ProcessBlock fail on block %v: %v\n", i, err) + } + if delay != 0 { + t.Fatalf("ProcessBlock: block %d "+ + "is too far in the future", i) + } + if isOrphan { + t.Fatalf("ProcessBlock incorrectly returned block %v "+ + "is an orphan\n", i) + } + } + + err = db1.FlushCache() + if err != nil { + t.Fatalf("Error flushing database to disk: %s", err) + } + + db2Path, err := ioutil.TempDir("", "TestAcceptanceIndexRecover2") + if err != nil { + t.Fatalf("Error creating temporary directory: %s", err) + } + defer os.RemoveAll(db2Path) + + err = copyDirectory(db1Path, db2Path) + if err != nil { + t.Fatalf("copyDirectory: %s", err) + } + + for i := len(blocks) - 2; i < len(blocks); i++ { + isOrphan, delay, err := db1DAG.ProcessBlock(blocks[i], blockdag.BFNone) + if err != nil { + t.Fatalf("ProcessBlock fail on block %v: %v\n", i, err) + } + if delay != 0 { + t.Fatalf("ProcessBlock: block %d "+ + "is too far in the future", i) + } + if isOrphan { + t.Fatalf("ProcessBlock incorrectly returned block %v "+ + "is an orphan\n", i) + } + } + + db1LastBlockAcceptanceData, err := db1AcceptanceIndex.TxsAcceptanceData(blocks[len(blocks)-1].Hash()) + if err != nil { + t.Fatalf("Error fetching acceptance data: %s", err) + } + + db2, err := database.Open("ffldb", db2Path, params.Net) + if err != nil { + t.Fatalf("Error opening database: %s", err) + } + + db2Config := blockdag.Config{ + DAGParams: params, + DB: db2, + } + + db2DAG, teardown, err := blockdag.DAGSetup("", db2Config) + if err != nil { + t.Fatalf("TestAcceptanceIndexRecover: Failed to setup DAG instance: %v", err) + } + if teardown != nil { + defer teardown() + } + + for i := len(blocks) - 2; i < len(blocks); i++ { + isOrphan, delay, err := db2DAG.ProcessBlock(blocks[i], blockdag.BFNone) + if err != nil { + t.Fatalf("ProcessBlock fail on block %v: %v\n", i, err) + } + if delay != 0 { + t.Fatalf("ProcessBlock: block %d "+ + "is too far in the future", i) + } + if isOrphan { + t.Fatalf("ProcessBlock incorrectly returned block %v "+ + "is an orphan\n", i) + } + } + + err = db2.FlushCache() + if err != nil { + t.Fatalf("Error flushing database to disk: %s", err) + } + db3Path, err := ioutil.TempDir("", "TestAcceptanceIndexRecover3") + if err != nil { + t.Fatalf("Error creating temporary directory: %s", err) + } + defer os.RemoveAll(db3Path) + err = copyDirectory(db2Path, db3Path) + if err != nil { + t.Fatalf("copyDirectory: %s", err) + } + + db3, err := database.Open("ffldb", db3Path, params.Net) + if err != nil { + t.Fatalf("Error opening database: %s", err) + } + + db3AcceptanceIndex := NewAcceptanceIndex() + db3IndexManager := NewManager([]Indexer{db3AcceptanceIndex}) + db3Config := blockdag.Config{ + IndexManager: db3IndexManager, + DAGParams: params, + DB: db3, + } + + _, teardown, err = blockdag.DAGSetup("", db3Config) + if err != nil { + t.Fatalf("TestAcceptanceIndexRecover: Failed to setup DAG instance: %v", err) + } + if teardown != nil { + defer teardown() + } + + db3LastBlockAcceptanceData, err := db3AcceptanceIndex.TxsAcceptanceData(blocks[len(blocks)-1].Hash()) + if err != nil { + t.Fatalf("Error fetching acceptance data: %s", err) + } + if !reflect.DeepEqual(db1LastBlockAcceptanceData, db3LastBlockAcceptanceData) { + t.Fatalf("recovery failed") + } +} + +// This function is copied and modified from this stackoverflow answer: https://stackoverflow.com/a/56314145/2413761 +func copyDirectory(scrDir, dest string) error { + entries, err := ioutil.ReadDir(scrDir) + if err != nil { + return err + } + for _, entry := range entries { + sourcePath := filepath.Join(scrDir, entry.Name()) + destPath := filepath.Join(dest, entry.Name()) + + fileInfo, err := os.Stat(sourcePath) + if err != nil { + return err + } + + stat, ok := fileInfo.Sys().(*syscall.Stat_t) + if !ok { + return fmt.Errorf("failed to get raw syscall.Stat_t data for '%s'", sourcePath) + } + + switch fileInfo.Mode() & os.ModeType { + case os.ModeDir: + if err := createIfNotExists(destPath, 0755); err != nil { + return err + } + if err := copyDirectory(sourcePath, destPath); err != nil { + return err + } + case os.ModeSymlink: + if err := copySymLink(sourcePath, destPath); err != nil { + return err + } + default: + if err := copyFile(sourcePath, destPath); err != nil { + return err + } + } + + if err := os.Lchown(destPath, int(stat.Uid), int(stat.Gid)); err != nil { + return err + } + + isSymlink := entry.Mode()&os.ModeSymlink != 0 + if !isSymlink { + if err := os.Chmod(destPath, entry.Mode()); err != nil { + return err + } + } + } + return nil +} + +// This function is copied and modified from this stackoverflow answer: https://stackoverflow.com/a/56314145/2413761 +func copyFile(srcFile, dstFile string) error { + out, err := os.Create(dstFile) + defer out.Close() + if err != nil { + return err + } + + in, err := os.Open(srcFile) + defer in.Close() + if err != nil { + return err + } + + _, err = io.Copy(out, in) + if err != nil { + return err + } + + return nil +} + +// This function is copied and modified from this stackoverflow answer: https://stackoverflow.com/a/56314145/2413761 +func createIfNotExists(dir string, perm os.FileMode) error { + if blockdag.FileExists(dir) { + return nil + } + + if err := os.MkdirAll(dir, perm); err != nil { + return fmt.Errorf("failed to create directory: '%s', error: '%s'", dir, err.Error()) + } + + return nil +} + +// This function is copied and modified from this stackoverflow answer: https://stackoverflow.com/a/56314145/2413761 +func copySymLink(source, dest string) error { + link, err := os.Readlink(source) + if err != nil { + return err + } + return os.Symlink(link, dest) +} diff --git a/blockdag/indexers/addrindex.go b/blockdag/indexers/addrindex.go index 5a6768cf0..8ad2d0c76 100644 --- a/blockdag/indexers/addrindex.go +++ b/blockdag/indexers/addrindex.go @@ -685,7 +685,7 @@ func (idx *AddrIndex) indexBlock(data writeIndexData, block *util.Block, dag *bl // the transactions in the block involve. // // This is part of the Indexer interface. -func (idx *AddrIndex) ConnectBlock(dbTx database.Tx, block *util.Block, dag *blockdag.BlockDAG, +func (idx *AddrIndex) ConnectBlock(dbTx database.Tx, block *util.Block, blockID uint64, dag *blockdag.BlockDAG, _ blockdag.MultiBlockTxsAcceptanceData, _ blockdag.MultiBlockTxsAcceptanceData) error { // The offset and length of the transactions within the serialized @@ -695,12 +695,6 @@ func (idx *AddrIndex) ConnectBlock(dbTx database.Tx, block *util.Block, dag *blo return err } - // Get the internal block ID associated with the block. - blockID, err := dbFetchBlockIDByHash(dbTx, block.Hash()) - if err != nil { - return err - } - // Build all of the address to transaction mappings in a local map. addrsToTxns := make(writeIndexData) idx.indexBlock(addrsToTxns, block, dag) @@ -766,7 +760,7 @@ func (idx *AddrIndex) TxRegionsForAddress(dbTx database.Tx, addr util.Address, n // the database transaction. fetchBlockHash := func(id []byte) (*daghash.Hash, error) { // Deserialize and populate the result. - return dbFetchBlockHashBySerializedID(dbTx, id) + return blockdag.DBFetchBlockHashBySerializedID(dbTx, id) } var err error @@ -899,6 +893,15 @@ func (idx *AddrIndex) UnconfirmedTxnsForAddress(addr util.Address) []*util.Tx { return nil } +// Recover is invoked when the indexer wasn't turned on for several blocks +// and the indexer needs to close the gaps. +// +// This is part of the Indexer interface. +func (idx *AddrIndex) Recover(dbTx database.Tx, currentBlockID, lastKnownBlockID uint64) error { + return fmt.Errorf("addrindex was turned off for %d blocks and can't be recovered."+ + " To resume working drop the addrindex with --dropaddrindex", lastKnownBlockID-currentBlockID) +} + // NewAddrIndex returns a new instance of an indexer that is used to create a // mapping of all addresses in the blockchain to the respective transactions // that involve them. diff --git a/blockdag/indexers/cfindex.go b/blockdag/indexers/cfindex.go index 85dd1101d..181f82110 100644 --- a/blockdag/indexers/cfindex.go +++ b/blockdag/indexers/cfindex.go @@ -6,6 +6,7 @@ package indexers import ( "errors" + "fmt" "github.com/daglabs/btcd/blockdag" "github.com/daglabs/btcd/dagconfig" @@ -204,7 +205,7 @@ func storeFilter(dbTx database.Tx, block *util.Block, f *gcs.Filter, // ConnectBlock is invoked by the index manager when a new block has been // connected to the main chain. This indexer adds a hash-to-cf mapping for // every passed block. This is part of the Indexer interface. -func (idx *CfIndex) ConnectBlock(dbTx database.Tx, block *util.Block, +func (idx *CfIndex) ConnectBlock(dbTx database.Tx, block *util.Block, _ uint64, _ *blockdag.BlockDAG, _ blockdag.MultiBlockTxsAcceptanceData, _ blockdag.MultiBlockTxsAcceptanceData) error { f, err := builder.BuildBasicFilter(block.MsgBlock()) @@ -340,6 +341,15 @@ func (idx *CfIndex) FilterHashesByBlockHashes(blockHashes []*daghash.Hash, return idx.entriesByBlockHashes(cfHashKeys, filterType, blockHashes) } +// Recover is invoked when the indexer wasn't turned on for several blocks +// and the indexer needs to close the gaps. +// +// This is part of the Indexer interface. +func (idx *CfIndex) Recover(dbTx database.Tx, currentBlockID, lastKnownBlockID uint64) error { + return fmt.Errorf("cfindex was turned off for %d blocks and can't be recovered."+ + " To resume working drop the cfindex with --dropcfindex", lastKnownBlockID-currentBlockID) +} + // NewCfIndex returns a new instance of an indexer that is used to create a // mapping of the hashes of all blocks in the blockchain to their respective // committed filters. diff --git a/blockdag/indexers/common.go b/blockdag/indexers/common.go index 6bbae8cc4..b7ef0d11f 100644 --- a/blockdag/indexers/common.go +++ b/blockdag/indexers/common.go @@ -52,7 +52,16 @@ type Indexer interface { // ConnectBlock is invoked when the index manager is notified that a new // block has been connected to the DAG. - ConnectBlock(dbTx database.Tx, block *util.Block, dag *blockdag.BlockDAG, _ blockdag.MultiBlockTxsAcceptanceData, _ blockdag.MultiBlockTxsAcceptanceData) error + ConnectBlock(dbTx database.Tx, + block *util.Block, + blockID uint64, + dag *blockdag.BlockDAG, + acceptedTxsData blockdag.MultiBlockTxsAcceptanceData, + virtualTxsAcceptanceData blockdag.MultiBlockTxsAcceptanceData) error + + // Recover is invoked when the indexer wasn't turned on for several blocks + // and the indexer needs to close the gaps. + Recover(dbTx database.Tx, currentBlockID, lastKnownBlockID uint64) error } // AssertError identifies an error that indicates an internal code consistency diff --git a/blockdag/indexers/manager.go b/blockdag/indexers/manager.go index d511c34fb..29292f7a1 100644 --- a/blockdag/indexers/manager.go +++ b/blockdag/indexers/manager.go @@ -8,12 +8,15 @@ import ( "github.com/daglabs/btcd/blockdag" "github.com/daglabs/btcd/database" "github.com/daglabs/btcd/util" + "github.com/daglabs/btcd/util/daghash" ) var ( // indexTipsBucketName is the name of the db bucket used to house the // current tip of each index. indexTipsBucketName = []byte("idxtips") + + indexCurrentBlockIDBucketName = []byte("idxcurrentblockid") ) // Manager defines an index manager that manages multiple optional indexes and @@ -146,6 +149,9 @@ func (m *Manager) Init(db database.DB, blockDAG *blockdag.BlockDAG, interrupt <- if err != nil { return err } + if _, err := meta.CreateBucketIfNotExists(indexCurrentBlockIDBucketName); err != nil { + return err + } return m.maybeCreateIndexes(dbTx) }) @@ -160,7 +166,30 @@ func (m *Manager) Init(db database.DB, blockDAG *blockdag.BlockDAG, interrupt <- } } - return nil + return m.recoverIfNeeded() +} + +// recoverIfNeeded checks if the node worked for some time +// without one of the current enabled indexes, and if it's +// the case, recovers the missing blocks from the index. +func (m *Manager) recoverIfNeeded() error { + return m.db.Update(func(dbTx database.Tx) error { + lastKnownBlockID := blockdag.DBFetchCurrentBlockID(dbTx) + for _, indexer := range m.enabledIndexes { + serializedCurrentIdxBlockID := dbTx.Metadata().Bucket(indexCurrentBlockIDBucketName).Get(indexer.Key()) + currentIdxBlockID := uint64(0) + if serializedCurrentIdxBlockID != nil { + currentIdxBlockID = blockdag.DeserializeBlockID(serializedCurrentIdxBlockID) + } + if lastKnownBlockID > currentIdxBlockID { + err := indexer.Recover(dbTx, currentIdxBlockID, lastKnownBlockID) + if err != nil { + return err + } + } + } + return nil + }) } // ConnectBlock must be invoked when a block is extending the main chain. It @@ -168,13 +197,32 @@ func (m *Manager) Init(db database.DB, blockDAG *blockdag.BlockDAG, interrupt <- // checks, and invokes each indexer. // // This is part of the blockchain.IndexManager interface. -func (m *Manager) ConnectBlock(dbTx database.Tx, block *util.Block, dag *blockdag.BlockDAG, +func (m *Manager) ConnectBlock(dbTx database.Tx, block *util.Block, blockID uint64, dag *blockdag.BlockDAG, txsAcceptanceData blockdag.MultiBlockTxsAcceptanceData, virtualTxsAcceptanceData blockdag.MultiBlockTxsAcceptanceData) error { + // Call each of the currently active optional indexes with the block // being connected so they can update accordingly. for _, index := range m.enabledIndexes { // Notify the indexer with the connected block so it can index it. - if err := index.ConnectBlock(dbTx, block, dag, txsAcceptanceData, virtualTxsAcceptanceData); err != nil { + if err := index.ConnectBlock(dbTx, block, blockID, dag, txsAcceptanceData, virtualTxsAcceptanceData); err != nil { + return err + } + } + + // Add the new block ID index entry for the block being connected and + // update the current internal block ID accordingly. + err := m.updateIndexersWithCurrentBlockID(dbTx, block.Hash(), blockID) + if err != nil { + return err + } + return nil +} + +func (m *Manager) updateIndexersWithCurrentBlockID(dbTx database.Tx, blockHash *daghash.Hash, blockID uint64) error { + serializedBlockID := blockdag.SerializeBlockID(blockID) + for _, index := range m.enabledIndexes { + err := dbTx.Metadata().Bucket(indexCurrentBlockIDBucketName).Put(index.Key(), serializedBlockID) + if err != nil { return err } } @@ -317,13 +365,6 @@ func dropIndex(db database.DB, idxKey []byte, idxName string, interrupt <-chan s }) } - // Call extra index specific deinitialization for the transaction index. - if idxName == txIndexName { - if err := dropBlockIDIndex(db); err != nil { - return err - } - } - // Remove the index tip, index bucket, and in-progress drop flag now // that all index entries have been removed. err = db.Update(func(dbTx database.Tx) error { @@ -333,6 +374,10 @@ func dropIndex(db database.DB, idxKey []byte, idxName string, interrupt <-chan s return err } + if err := meta.Bucket(indexCurrentBlockIDBucketName).Delete(idxKey); err != nil { + return err + } + return indexesBucket.Delete(indexDropKey(idxKey)) }) if err != nil { diff --git a/blockdag/indexers/txindex.go b/blockdag/indexers/txindex.go index 50b9f1527..2b8d0c6df 100644 --- a/blockdag/indexers/txindex.go +++ b/blockdag/indexers/txindex.go @@ -18,22 +18,12 @@ const ( txIndexName = "transaction index" includingBlocksIndexKeyEntrySize = 8 // 4 bytes for offset + 4 bytes for transaction length - - blockIDSize = 8 // 8 bytes for block ID ) var ( includingBlocksIndexKey = []byte("includingblocksidx") acceptingBlocksIndexKey = []byte("acceptingblocksidx") - - // idByHashIndexBucketName is the name of the db bucket used to house - // the block id -> block hash index. - idByHashIndexBucketName = []byte("idbyhashidx") - - // hashByIDIndexBucketName is the name of the db bucket used to house - // the block hash -> block id index. - hashByIDIndexBucketName = []byte("hashbyididx") ) // txsAcceptedByVirtual is the in-memory index of txIDs that were accepted @@ -42,20 +32,12 @@ var txsAcceptedByVirtual map[daghash.TxID]bool // ----------------------------------------------------------------------------- // The transaction index consists of an entry for every transaction in the DAG. -// In order to significantly optimize the space requirements a separate -// index which provides an internal mapping between each block that has been -// indexed and a unique ID for use within the hash to location mappings. The ID -// is simply a sequentially incremented uint64. This is useful because it is -// only 8 bytes versus 32 bytes hashes and thus saves a ton of space in the -// index. // -// There are four buckets used in total. The first bucket maps the hash of +// There are two buckets used in total. The first bucket maps the hash of // each transaction to its location in each block it's included in. The second bucket // contains all of the blocks that from their viewpoint the transaction has been // accepted (i.e. the transaction is found in their blue set without double spends), -// and their blue block (or themselves) that included the transaction. The third -// bucket maps the hash of each block to the unique ID and the fourth maps -// that ID back to the block hash. +// and their blue block (or themselves) that included the transaction. // // NOTE: Although it is technically possible for multiple transactions to have // the same hash as long as the previous transaction with the same hash is fully @@ -86,80 +68,8 @@ var txsAcceptedByVirtual map[daghash.TxID]bool // ----- // Total: 16 bytes // -// The serialized format for keys and values in the block hash to ID bucket is: -// = -// -// Field Type Size -// hash daghash.Hash 32 bytes -// ID uint64 8 bytes -// ----- -// Total: 40 bytes -// -// The serialized format for keys and values in the ID to block hash bucket is: -// = -// -// Field Type Size -// ID uint64 8 bytes -// hash daghash.Hash 32 bytes -// ----- -// Total: 40 bytes -// // ----------------------------------------------------------------------------- -// dbPutBlockIDIndexEntry uses an existing database transaction to update or add -// the index entries for the hash to id and id to hash mappings for the provided -// values. -func dbPutBlockIDIndexEntry(dbTx database.Tx, hash *daghash.Hash, id uint64) error { - // Serialize the block ID for use in the index entries. - var serializedID [blockIDSize]byte - byteOrder.PutUint64(serializedID[:], id) - - // Add the block hash to ID mapping to the index. - meta := dbTx.Metadata() - hashIndex := meta.Bucket(idByHashIndexBucketName) - if err := hashIndex.Put(hash[:], serializedID[:]); err != nil { - return err - } - - // Add the block ID to hash mapping to the index. - idIndex := meta.Bucket(hashByIDIndexBucketName) - return idIndex.Put(serializedID[:], hash[:]) -} - -// dbFetchBlockIDByHash uses an existing database transaction to retrieve the -// block id for the provided hash from the index. -func dbFetchBlockIDByHash(dbTx database.Tx, hash *daghash.Hash) (uint64, error) { - hashIndex := dbTx.Metadata().Bucket(idByHashIndexBucketName) - serializedID := hashIndex.Get(hash[:]) - if serializedID == nil { - return 0, fmt.Errorf("no entry in the block ID index for block with hash %s", hash) - } - - return byteOrder.Uint64(serializedID), nil -} - -// dbFetchBlockHashBySerializedID uses an existing database transaction to -// retrieve the hash for the provided serialized block id from the index. -func dbFetchBlockHashBySerializedID(dbTx database.Tx, serializedID []byte) (*daghash.Hash, error) { - idIndex := dbTx.Metadata().Bucket(hashByIDIndexBucketName) - hashBytes := idIndex.Get(serializedID) - if hashBytes == nil { - return nil, fmt.Errorf("no entry in the block ID index for block with id %d", byteOrder.Uint64(serializedID)) - } - - var hash daghash.Hash - copy(hash[:], hashBytes) - return &hash, nil -} - -// dbFetchBlockHashByID uses an existing database transaction to retrieve the -// hash for the provided block id from the index. -func dbFetchBlockHashByID(dbTx database.Tx, id uint64) (*daghash.Hash, error) { - var serializedID [blockIDSize]byte - byteOrder.PutUint64(serializedID[:], id) - return dbFetchBlockHashBySerializedID(dbTx, serializedID[:]) -} - func putIncludingBlocksEntry(target []byte, txLoc wire.TxLoc) { byteOrder.PutUint32(target, uint32(txLoc.TxStart)) byteOrder.PutUint32(target[4:], uint32(txLoc.TxLen)) @@ -170,9 +80,7 @@ func dbPutIncludingBlocksEntry(dbTx database.Tx, txID *daghash.TxID, blockID uin if err != nil { return err } - blockIDBytes := make([]byte, blockIDSize) - byteOrder.PutUint64(blockIDBytes, blockID) - return bucket.Put(blockIDBytes, serializedData) + return bucket.Put(blockdag.SerializeBlockID(blockID), serializedData) } func dbPutAcceptingBlocksEntry(dbTx database.Tx, txID *daghash.TxID, blockID uint64, serializedData []byte) error { @@ -180,9 +88,7 @@ func dbPutAcceptingBlocksEntry(dbTx database.Tx, txID *daghash.TxID, blockID uin if err != nil { return err } - blockIDBytes := make([]byte, blockIDSize) - byteOrder.PutUint64(blockIDBytes, blockID) - return bucket.Put(blockIDBytes, serializedData) + return bucket.Put(blockdag.SerializeBlockID(blockID), serializedData) } // dbFetchFirstTxRegion uses an existing database transaction to fetch the block @@ -210,7 +116,7 @@ func dbFetchFirstTxRegion(dbTx database.Tx, txID *daghash.TxID) (*database.Block "was found for %s", txID), } } - blockIDBytes := cursor.Key() + serializedBlockID := cursor.Key() serializedData := cursor.Value() if len(serializedData) == 0 { return nil, nil @@ -226,7 +132,7 @@ func dbFetchFirstTxRegion(dbTx database.Tx, txID *daghash.TxID) (*database.Block } // Load the block hash associated with the block ID. - hash, err := dbFetchBlockHashBySerializedID(dbTx, blockIDBytes) + hash, err := blockdag.DBFetchBlockHashBySerializedID(dbTx, serializedBlockID) if err != nil { return nil, database.Error{ ErrorCode: database.ErrCorruption, @@ -277,17 +183,16 @@ func dbAddTxIndexEntries(dbTx database.Tx, block *util.Block, blockID uint64, tx if includingBlockHash.IsEqual(block.Hash()) { includingBlockID = blockID } else { - includingBlockID, err = dbFetchBlockIDByHash(dbTx, &includingBlockHash) + includingBlockID, err = blockdag.DBFetchBlockIDByHash(dbTx, &includingBlockHash) if err != nil { return err } } - includingBlockIDBytes := make([]byte, blockIDSize) - byteOrder.PutUint64(includingBlockIDBytes, includingBlockID) + serializedIncludingBlockID := blockdag.SerializeBlockID(includingBlockID) for _, txAcceptanceData := range blockTxsAcceptanceData { - err = dbPutAcceptingBlocksEntry(dbTx, txAcceptanceData.Tx.ID(), blockID, includingBlockIDBytes) + err = dbPutAcceptingBlocksEntry(dbTx, txAcceptanceData.Tx.ID(), blockID, serializedIncludingBlockID) if err != nil { return err } @@ -318,8 +223,7 @@ func updateTxsAcceptedByVirtual(virtualTxsAcceptanceData blockdag.MultiBlockTxsA // TxIndex implements a transaction by hash index. That is to say, it supports // querying all transactions by their hash. type TxIndex struct { - db database.DB - curBlockID uint64 + db database.DB } // Ensure the TxIndex type implements the Indexer interface. @@ -333,59 +237,6 @@ var _ Indexer = (*TxIndex)(nil) func (idx *TxIndex) Init(db database.DB, dag *blockdag.BlockDAG) error { idx.db = db - // Find the latest known block id field for the internal block id - // index and initialize it. This is done because it's a lot more - // efficient to do a single search at initialize time than it is to - // write another value to the database on every update. - err := idx.db.View(func(dbTx database.Tx) error { - // Scan forward in large gaps to find a block id that doesn't - // exist yet to serve as an upper bound for the binary search - // below. - var highestKnown, nextUnknown uint64 - testBlockID := uint64(1) - increment := uint64(100000) - for { - _, err := dbFetchBlockHashByID(dbTx, testBlockID) - if err != nil { - nextUnknown = testBlockID - break - } - - highestKnown = testBlockID - testBlockID += increment - } - log.Tracef("Forward scan (highest known %d, next unknown %d)", - highestKnown, nextUnknown) - - // No used block IDs due to new database. - if nextUnknown == 1 { - return nil - } - - // Use a binary search to find the final highest used block id. - // This will take at most ceil(log_2(increment)) attempts. - for { - testBlockID = (highestKnown + nextUnknown) / 2 - _, err := dbFetchBlockHashByID(dbTx, testBlockID) - if err != nil { - nextUnknown = testBlockID - } else { - highestKnown = testBlockID - } - log.Tracef("Binary scan (highest known %d, next "+ - "unknown %d)", highestKnown, nextUnknown) - if highestKnown+1 == nextUnknown { - break - } - } - - idx.curBlockID = highestKnown - return nil - }) - if err != nil { - return err - } - // Initialize the txsAcceptedByVirtual index virtualTxsAcceptanceData, err := dag.TxsAcceptedByVirtual() if err != nil { @@ -395,8 +246,6 @@ func (idx *TxIndex) Init(db database.DB, dag *blockdag.BlockDAG) error { if err != nil { return err } - - log.Debugf("Current internal block ID: %d", idx.curBlockID) return nil } @@ -421,12 +270,6 @@ func (idx *TxIndex) Name() string { // This is part of the Indexer interface. func (idx *TxIndex) Create(dbTx database.Tx) error { meta := dbTx.Metadata() - if _, err := meta.CreateBucket(idByHashIndexBucketName); err != nil { - return err - } - if _, err := meta.CreateBucket(hashByIDIndexBucketName); err != nil { - return err - } if _, err := meta.CreateBucket(includingBlocksIndexKey); err != nil { return err } @@ -440,15 +283,9 @@ func (idx *TxIndex) Create(dbTx database.Tx) error { // for every transaction in the passed block. // // This is part of the Indexer interface. -func (idx *TxIndex) ConnectBlock(dbTx database.Tx, block *util.Block, dag *blockdag.BlockDAG, +func (idx *TxIndex) ConnectBlock(dbTx database.Tx, block *util.Block, blockID uint64, dag *blockdag.BlockDAG, acceptedTxsData blockdag.MultiBlockTxsAcceptanceData, virtualTxsAcceptanceData blockdag.MultiBlockTxsAcceptanceData) error { - // Increment the internal block ID to use for the block being connected - // and add all of the transactions in the block to the index. - newBlockID := idx.curBlockID + 1 - if block.MsgBlock().Header.IsGenesis() { - newBlockID = 0 - } - if err := dbAddTxIndexEntries(dbTx, block, newBlockID, acceptedTxsData); err != nil { + if err := dbAddTxIndexEntries(dbTx, block, blockID, acceptedTxsData); err != nil { return err } @@ -456,14 +293,6 @@ func (idx *TxIndex) ConnectBlock(dbTx database.Tx, block *util.Block, dag *block if err != nil { return err } - - // Add the new block ID index entry for the block being connected and - // update the current internal block ID accordingly. - err = dbPutBlockIDIndexEntry(dbTx, block.Hash(), newBlockID) - if err != nil { - return err - } - idx.curBlockID = newBlockID return nil } @@ -507,9 +336,8 @@ func dbFetchTxBlocks(dbTx database.Tx, txHash *daghash.Hash) ([]*daghash.Hash, e "were found for %s", txHash), } } - err := bucket.ForEach(func(blockIDBytes, _ []byte) error { - blockID := byteOrder.Uint64(blockIDBytes) - blockHash, err := dbFetchBlockHashByID(dbTx, blockID) + err := bucket.ForEach(func(serializedBlockID, _ []byte) error { + blockHash, err := blockdag.DBFetchBlockHashBySerializedID(dbTx, serializedBlockID) if err != nil { return err } @@ -557,8 +385,7 @@ func dbFetchTxAcceptingBlock(dbTx database.Tx, txID *daghash.TxID, dag *blockdag } } for ; cursor.Key() != nil; cursor.Next() { - blockID := byteOrder.Uint64(cursor.Key()) - blockHash, err := dbFetchBlockHashByID(dbTx, blockID) + blockHash, err := blockdag.DBFetchBlockHashBySerializedID(dbTx, cursor.Key()) if err != nil { return nil, err } @@ -580,19 +407,6 @@ func NewTxIndex() *TxIndex { return &TxIndex{} } -// dropBlockIDIndex drops the internal block id index. -func dropBlockIDIndex(db database.DB) error { - return db.Update(func(dbTx database.Tx) error { - meta := dbTx.Metadata() - err := meta.DeleteBucket(idByHashIndexBucketName) - if err != nil { - return err - } - - return meta.DeleteBucket(hashByIDIndexBucketName) - }) -} - // DropTxIndex drops the transaction index from the provided database if it // exists. Since the address index relies on it, the address index will also be // dropped when it exists. @@ -609,3 +423,12 @@ func DropTxIndex(db database.DB, interrupt <-chan struct{}) error { return dropIndex(db, acceptingBlocksIndexKey, txIndexName, interrupt) } + +// Recover is invoked when the indexer wasn't turned on for several blocks +// and the indexer needs to close the gaps. +// +// This is part of the Indexer interface. +func (idx *TxIndex) Recover(dbTx database.Tx, currentBlockID, lastKnownBlockID uint64) error { + return fmt.Errorf("txindex was turned off for %d blocks and can't be recovered."+ + " To resume working drop the txindex with --droptxindex", lastKnownBlockID-currentBlockID) +} diff --git a/blockdag/notifications_test.go b/blockdag/notifications_test.go index c69162fc9..913be59af 100644 --- a/blockdag/notifications_test.go +++ b/blockdag/notifications_test.go @@ -5,6 +5,7 @@ package blockdag import ( + "path/filepath" "testing" "github.com/daglabs/btcd/dagconfig" @@ -12,7 +13,7 @@ import ( // TestNotifications ensures that notification callbacks are fired on events. func TestNotifications(t *testing.T) { - blocks, err := loadBlocks("blk_0_to_4.dat") + blocks, err := LoadBlocks(filepath.Join("testdata/blk_0_to_4.dat")) if err != nil { t.Fatalf("Error loading file: %v\n", err) } diff --git a/blockdag/scriptval_test.go b/blockdag/scriptval_test.go index c11f7ea25..fbd2186eb 100644 --- a/blockdag/scriptval_test.go +++ b/blockdag/scriptval_test.go @@ -6,6 +6,7 @@ package blockdag import ( "fmt" + "path/filepath" "runtime" "testing" @@ -20,7 +21,7 @@ func TestCheckBlockScripts(t *testing.T) { testBlockNum := 277647 blockDataFile := fmt.Sprintf("%d.dat", testBlockNum) - blocks, err := loadBlocks(blockDataFile) + blocks, err := LoadBlocks(filepath.Join("testdata/", blockDataFile)) if err != nil { t.Errorf("Error loading file: %v\n", err) return diff --git a/blockdag/test_utils.go b/blockdag/test_utils.go index 3ebedcdee..022681b5e 100644 --- a/blockdag/test_utils.go +++ b/blockdag/test_utils.go @@ -3,9 +3,14 @@ package blockdag // This file functions are not considered safe for regular use, and should be used for test purposes only. import ( + "compress/bzip2" + "encoding/binary" "fmt" + "github.com/daglabs/btcd/util" + "io" "os" "path/filepath" + "strings" "sync" "github.com/daglabs/btcd/util/subnetworkid" @@ -41,8 +46,8 @@ func isSupportedDbType(dbType string) bool { return false } -// filesExists returns whether or not the named file or directory exists. -func fileExists(name string) bool { +// FileExists returns whether or not the named file or directory exists. +func FileExists(name string) bool { if _, err := os.Stat(name); err != nil { if os.IsNotExist(err) { return false @@ -75,7 +80,7 @@ func DAGSetup(dbName string, config Config) (*BlockDAG, func(), error) { if config.DB == nil { // Create the root directory for test databases. - if !fileExists(testDbRoot) { + if !FileExists(testDbRoot) { if err := os.MkdirAll(testDbRoot, 0700); err != nil { err := fmt.Errorf("unable to create test db "+ "root: %s", err) @@ -100,6 +105,12 @@ func DAGSetup(dbName string, config Config) (*BlockDAG, func(), error) { os.RemoveAll(dbPath) os.RemoveAll(testDbRoot) } + } else { + teardown = func() { + spawnWaitGroup.Wait() + spawn = realSpawn + config.DB.Close() + } } config.TimeSource = NewMedianTime() @@ -193,3 +204,59 @@ func GetVirtualFromParentsForTest(dag *BlockDAG, parentHashes []*daghash.Hash) ( return VirtualForTest(virtual), nil } + +// LoadBlocks reads files containing bitcoin block data (gzipped but otherwise +// in the format bitcoind writes) from disk and returns them as an array of +// util.Block. This is largely borrowed from the test code in btcdb. +func LoadBlocks(filename string) (blocks []*util.Block, err error) { + var network = wire.MainNet + var dr io.Reader + var fi io.ReadCloser + + fi, err = os.Open(filename) + if err != nil { + return + } + + if strings.HasSuffix(filename, ".bz2") { + dr = bzip2.NewReader(fi) + } else { + dr = fi + } + defer fi.Close() + + var block *util.Block + + err = nil + for height := uint64(0); err == nil; height++ { + var rintbuf uint32 + err = binary.Read(dr, binary.LittleEndian, &rintbuf) + if err == io.EOF { + // hit end of file at expected offset: no warning + height-- + err = nil + break + } + if err != nil { + break + } + if rintbuf != uint32(network) { + break + } + err = binary.Read(dr, binary.LittleEndian, &rintbuf) + blocklen := rintbuf + + rbytes := make([]byte, blocklen) + + // read block + dr.Read(rbytes) + + block, err = util.NewBlockFromBytes(rbytes) + if err != nil { + return + } + blocks = append(blocks, block) + } + + return +} diff --git a/blockdag/validate_test.go b/blockdag/validate_test.go index 4af356161..758db62ff 100644 --- a/blockdag/validate_test.go +++ b/blockdag/validate_test.go @@ -8,6 +8,7 @@ import ( "bou.ke/monkey" "errors" "math" + "path/filepath" "testing" "time" @@ -92,7 +93,7 @@ func TestCheckConnectBlockTemplate(t *testing.T) { var blocks []*util.Block for _, file := range testFiles { - blockTmp, err := loadBlocks(file) + blockTmp, err := LoadBlocks(filepath.Join("testdata/", file)) if err != nil { t.Fatalf("Error loading file: %v\n", err) } diff --git a/database/ffldb/db.go b/database/ffldb/db.go index 97c55ed28..057128bf6 100644 --- a/database/ffldb/db.go +++ b/database/ffldb/db.go @@ -1995,7 +1995,6 @@ func (db *db) Close() error { } // FlushCache flushes the db cache to the disk. -// TODO: (Ori) This is a temporary function for dev use. It needs to be removed. func (db *db) FlushCache() error { return db.cache.flush() } diff --git a/database/interface.go b/database/interface.go index 4df744367..c37bc311b 100644 --- a/database/interface.go +++ b/database/interface.go @@ -465,6 +465,5 @@ type DB interface { Close() error // FlushCache flushes the db cache to the disk. - // TODO: (Ori) This is a temporary function for dev use. It needs to be removed. FlushCache() error } diff --git a/server/p2p/p2p.go b/server/p2p/p2p.go index dd4ce28a0..97411ae04 100644 --- a/server/p2p/p2p.go +++ b/server/p2p/p2p.go @@ -1638,7 +1638,7 @@ func NewServer(listenAddrs []string, db database.DB, dagParams *dagconfig.Params } if config.MainConfig().AcceptanceIndex { indxLog.Info("acceptance index is enabled") - s.AcceptanceIndex = indexers.NewAcceptanceIndex(dagParams) + s.AcceptanceIndex = indexers.NewAcceptanceIndex() indexes = append(indexes, s.AcceptanceIndex) } if config.MainConfig().EnableCFilters {