From b07a1184317e1def37fb967a5441415721d85274 Mon Sep 17 00:00:00 2001 From: stasatdaglabs <39559713+stasatdaglabs@users.noreply.github.com> Date: Wed, 28 Aug 2019 14:52:57 +0300 Subject: [PATCH] [NOD-292] When writing block to database - also create record in block index (#376) * [NOD-292] In accept.go, made dbStoreBlock and flushToDB occur within the same transaction. * [NOD-292] Implemented processing blocks that were not validated on BTCD start. * [NOD-292] Fixed processing logic on init. Added a test for it. * [NOD-292] Fixed some comments. * [NOD-292] Made unlocks deferred in a couple of places. * [NOD-292] Made unprocessed block reprocess via ProcessBlock rather than maybeAcceptBlock. * [NOD-292] Fixed grammar in comment. Added an explanation to TestAcceptingInInit. * [NOD-292] Split flushToDB into two versions. * [NOD-292] Fixed a bad assignment. * [NOD-292] Fixed bad spacing. --- blockdag/accept.go | 22 ++++++------ blockdag/blockindex.go | 31 +++++++++-------- blockdag/dag_test.go | 79 +++++++++++++++++++++++++++++++++++++++++- blockdag/dagio.go | 45 ++++++++++++++++++++++++ blockdag/process.go | 7 +++- 5 files changed, 155 insertions(+), 29 deletions(-) diff --git a/blockdag/accept.go b/blockdag/accept.go index 20c9920ac..a53fbdf86 100644 --- a/blockdag/accept.go +++ b/blockdag/accept.go @@ -35,6 +35,11 @@ func (dag *BlockDAG) maybeAcceptBlock(block *util.Block, flags BehaviorFlags) er return err } + // Create a new block node for the block and add it to the node index. + newNode := newBlockNode(&block.MsgBlock().Header, parents, dag.dagParams.K) + newNode.status = statusDataStored + dag.index.AddNode(newNode) + // Insert the block into the database if it's not already there. Even // though it is possible the block will ultimately fail to connect, it // has already passed all proof-of-work and validity tests which means @@ -45,17 +50,16 @@ func (dag *BlockDAG) maybeAcceptBlock(block *util.Block, flags BehaviorFlags) er // such as making blocks that never become part of the DAG or // blocks that fail to connect available for further analysis. err = dag.db.Update(func(dbTx database.Tx) error { - return dbStoreBlock(dbTx, block) + err := dbStoreBlock(dbTx, block) + if err != nil { + return err + } + return dag.index.flushToDBWithTx(dbTx) }) if err != nil { return err } - // Create a new block node for the block and add it to the node index. - blockHeader := &block.MsgBlock().Header - newNode := newBlockNode(blockHeader, parents, dag.dagParams.K) - newNode.status = statusDataStored - // Make sure that all the block's transactions are finalized fastAdd := flags&BFFastAdd == BFFastAdd bluestParent := parents.bluest() @@ -65,12 +69,6 @@ func (dag *BlockDAG) maybeAcceptBlock(block *util.Block, flags BehaviorFlags) er } } - dag.index.AddNode(newNode) - err = dag.index.flushToDB() - if err != nil { - return err - } - block.SetChainHeight(newNode.chainHeight) // Connect the passed block to the DAG. This also handles validation of the diff --git a/blockdag/blockindex.go b/blockdag/blockindex.go index 23229493c..d39289911 100644 --- a/blockdag/blockindex.go +++ b/blockdag/blockindex.go @@ -117,27 +117,28 @@ func (bi *blockIndex) UnsetStatusFlags(node *blockNode, flags blockStatus) { // flushToDB writes all dirty block nodes to the database. If all writes // succeed, this clears the dirty set. func (bi *blockIndex) flushToDB() error { + return bi.db.Update(func(dbTx database.Tx) error { + return bi.flushToDBWithTx(dbTx) + }) +} + +// flushToDBWithTx writes all dirty block nodes to the database. If all +// writes succeed, this clears the dirty set. +func (bi *blockIndex) flushToDBWithTx(dbTx database.Tx) error { bi.Lock() + defer bi.Unlock() if len(bi.dirty) == 0 { - bi.Unlock() return nil } - err := bi.db.Update(func(dbTx database.Tx) error { - for node := range bi.dirty { - err := dbStoreBlockNode(dbTx, node) - if err != nil { - return err - } + for node := range bi.dirty { + err := dbStoreBlockNode(dbTx, node) + if err != nil { + return err } - return nil - }) + } // If write was successful, clear the dirty set. - if err == nil { - bi.dirty = make(map[*blockNode]struct{}) - } - - bi.Unlock() - return err + bi.dirty = make(map[*blockNode]struct{}) + return nil } diff --git a/blockdag/dag_test.go b/blockdag/dag_test.go index e23823d20..6367827b0 100644 --- a/blockdag/dag_test.go +++ b/blockdag/dag_test.go @@ -895,11 +895,88 @@ func TestNew(t *testing.T) { } } +// TestAcceptingInInit makes sure that blocks that were stored but not +// yet fully processed do get correctly processed on DAG init. This may +// occur when the node shuts down improperly while a block is being +// validated. +func TestAcceptingInInit(t *testing.T) { + // Create the root directory for test databases. + if !fileExists(testDbRoot) { + if err := os.MkdirAll(testDbRoot, 0700); err != nil { + t.Fatalf("unable to create test db "+ + "root: %s", err) + } + } + + // Create a test database + dbPath := filepath.Join(testDbRoot, "TestAcceptingInInit") + _ = os.RemoveAll(dbPath) + db, err := database.Create(testDbType, dbPath, blockDataNet) + if err != nil { + t.Fatalf("error creating db: %s", err) + } + defer func() { + db.Close() + os.RemoveAll(dbPath) + os.RemoveAll(testDbRoot) + }() + + // Create a DAG to add the test block into + config := &Config{ + DAGParams: &dagconfig.SimNetParams, + DB: db, + TimeSource: NewMedianTime(), + SigCache: txscript.NewSigCache(1000), + } + dag, err := New(config) + if err != nil { + t.Fatalf("failed to create dag instance: %s", err) + } + + // Load the test block + blocks, err := loadBlocks("blk_0_to_4.dat") + if err != nil { + t.Fatalf("Error loading file: %v\n", err) + } + genesisBlock := blocks[0] + testBlock := blocks[1] + + // Create a test blockNode with an unvalidated status + genesisNode := dag.index.LookupNode(genesisBlock.Hash()) + testNode := newBlockNode(&testBlock.MsgBlock().Header, setFromSlice(genesisNode), dag.dagParams.K) + testNode.status = statusDataStored + + // Manually add the test block to the database + err = db.Update(func(dbTx database.Tx) error { + err := dbStoreBlock(dbTx, testBlock) + if err != nil { + return err + } + return dbStoreBlockNode(dbTx, testNode) + }) + if err != nil { + t.Fatalf("Failed to update block index: %s", err) + } + + // Create a new DAG. We expect this DAG to process the + // test node + dag, err = New(config) + if err != nil { + t.Fatalf("failed to create dag instance: %s", err) + } + + // Make sure that the test node's status is valid + testNode = dag.index.LookupNode(testBlock.Hash()) + if testNode.status&statusValid == 0 { + t.Fatalf("testNode is unexpectedly invalid") + } +} + func TestConfirmations(t *testing.T) { // Create a new database and DAG instance to run tests against. params := dagconfig.SimNetParams params.K = 1 - dag, teardownFunc, err := DAGSetup("TestBlockCount", Config{ + dag, teardownFunc, err := DAGSetup("TestConfirmations", Config{ DAGParams: ¶ms, }) if err != nil { diff --git a/blockdag/dagio.go b/blockdag/dagio.go index 3f6b7caf5..3eda2ef33 100644 --- a/blockdag/dagio.go +++ b/blockdag/dagio.go @@ -453,6 +453,7 @@ func (dag *BlockDAG) initDAGState() error { var i int32 var lastNode *blockNode + var unprocessedBlockNodes []*blockNode cursor := blockIndexBucket.Cursor() for ok := cursor.First(); ok; ok = cursor.Next() { node, err := dag.deserializeBlockNode(cursor.Value()) @@ -460,6 +461,13 @@ func (dag *BlockDAG) initDAGState() error { return err } + // Check to see if this node had been stored in the the block DB + // but not yet accepted. If so, add it to a slice to be processed later. + if node.status == statusDataStored { + unprocessedBlockNodes = append(unprocessedBlockNodes, node) + continue + } + if lastNode == nil { if !node.hash.IsEqual(dag.dagParams.GenesisHash) { return AssertError(fmt.Sprintf("initDAGState: Expected "+ @@ -559,6 +567,43 @@ func (dag *BlockDAG) initDAGState() error { dag.lastFinalityPoint = dag.index.LookupNode(state.LastFinalityPoint) dag.finalizeNodesBelowFinalityPoint(false) + // Go over any unprocessed blockNodes and process them now. + for _, node := range unprocessedBlockNodes { + // Check to see if the block exists in the block DB. If it + // doesn't, the database has certainly been corrupted. + blockExists, err := dbTx.HasBlock(node.hash) + if err != nil { + return AssertError(fmt.Sprintf("initDAGState: HasBlock "+ + "for block %s failed: %s", node.hash, err)) + } + if !blockExists { + return AssertError(fmt.Sprintf("initDAGState: block %s "+ + "exists in block index but not in block db", node.hash)) + } + + // Attempt to accept the block. + block, err := dbFetchBlockByNode(dbTx, node) + isOrphan, delay, err := dag.ProcessBlock(block, BFWasStored) + if err != nil { + log.Warnf("Block %s, which was not previously processed, "+ + "failed to be accepted to the DAG: %s", node.hash, err) + continue + } + + // If the block is an orphan or is delayed then it couldn't have + // possibly been written to the block index in the first place. + if isOrphan { + return AssertError(fmt.Sprintf("Block %s, which was not "+ + "previously processed, turned out to be an orphan, which is "+ + "impossible.", node.hash)) + } + if delay != 0 { + return AssertError(fmt.Sprintf("Block %s, which was not "+ + "previously processed, turned out to be delayed, which is "+ + "impossible.", node.hash)) + } + } + return nil }) } diff --git a/blockdag/process.go b/blockdag/process.go index 1e7a24bfc..44adc229e 100644 --- a/blockdag/process.go +++ b/blockdag/process.go @@ -40,6 +40,10 @@ const ( // netsync process BFIsSync + // BFWasStored is set to indicate that the block was previously stored + // in the block index but was never fully processed + BFWasStored + // BFNone is a convenience value to specifically indicate no flags. BFNone BehaviorFlags = 0 ) @@ -134,12 +138,13 @@ func (dag *BlockDAG) ProcessBlock(block *util.Block, flags BehaviorFlags) (isOrp defer dag.dagLock.Unlock() isDelayedBlock := flags&BFAfterDelay == BFAfterDelay + wasBlockStored := flags&BFWasStored == BFWasStored blockHash := block.Hash() log.Tracef("Processing block %s", blockHash) // The block must not already exist in the DAG. - if dag.BlockExists(blockHash) { + if dag.BlockExists(blockHash) && !wasBlockStored { str := fmt.Sprintf("already have block %s", blockHash) return false, 0, ruleError(ErrDuplicateBlock, str) }