[NOD-292] When writing block to database - also create record in block index (#376)

* [NOD-292] In accept.go, made dbStoreBlock and flushToDB occur within the same transaction.

* [NOD-292] Implemented processing blocks that were not validated on BTCD start.

* [NOD-292] Fixed processing logic on init. Added a test for it.

* [NOD-292] Fixed some comments.

* [NOD-292] Made unlocks deferred in a couple of places.

* [NOD-292] Made unprocessed block reprocess via ProcessBlock rather than maybeAcceptBlock.

* [NOD-292] Fixed grammar in comment. Added an explanation to TestAcceptingInInit.

* [NOD-292] Split flushToDB into two versions.

* [NOD-292] Fixed a bad assignment.

* [NOD-292] Fixed bad spacing.
This commit is contained in:
stasatdaglabs 2019-08-28 14:52:57 +03:00 committed by Svarog
parent 0ae06cd277
commit b07a118431
5 changed files with 155 additions and 29 deletions

View File

@ -35,6 +35,11 @@ func (dag *BlockDAG) maybeAcceptBlock(block *util.Block, flags BehaviorFlags) er
return err
}
// Create a new block node for the block and add it to the node index.
newNode := newBlockNode(&block.MsgBlock().Header, parents, dag.dagParams.K)
newNode.status = statusDataStored
dag.index.AddNode(newNode)
// Insert the block into the database if it's not already there. Even
// though it is possible the block will ultimately fail to connect, it
// has already passed all proof-of-work and validity tests which means
@ -45,17 +50,16 @@ func (dag *BlockDAG) maybeAcceptBlock(block *util.Block, flags BehaviorFlags) er
// such as making blocks that never become part of the DAG or
// blocks that fail to connect available for further analysis.
err = dag.db.Update(func(dbTx database.Tx) error {
return dbStoreBlock(dbTx, block)
err := dbStoreBlock(dbTx, block)
if err != nil {
return err
}
return dag.index.flushToDBWithTx(dbTx)
})
if err != nil {
return err
}
// Create a new block node for the block and add it to the node index.
blockHeader := &block.MsgBlock().Header
newNode := newBlockNode(blockHeader, parents, dag.dagParams.K)
newNode.status = statusDataStored
// Make sure that all the block's transactions are finalized
fastAdd := flags&BFFastAdd == BFFastAdd
bluestParent := parents.bluest()
@ -65,12 +69,6 @@ func (dag *BlockDAG) maybeAcceptBlock(block *util.Block, flags BehaviorFlags) er
}
}
dag.index.AddNode(newNode)
err = dag.index.flushToDB()
if err != nil {
return err
}
block.SetChainHeight(newNode.chainHeight)
// Connect the passed block to the DAG. This also handles validation of the

View File

@ -117,27 +117,28 @@ func (bi *blockIndex) UnsetStatusFlags(node *blockNode, flags blockStatus) {
// flushToDB writes all dirty block nodes to the database. If all writes
// succeed, this clears the dirty set.
func (bi *blockIndex) flushToDB() error {
return bi.db.Update(func(dbTx database.Tx) error {
return bi.flushToDBWithTx(dbTx)
})
}
// flushToDBWithTx writes all dirty block nodes to the database. If all
// writes succeed, this clears the dirty set.
func (bi *blockIndex) flushToDBWithTx(dbTx database.Tx) error {
bi.Lock()
defer bi.Unlock()
if len(bi.dirty) == 0 {
bi.Unlock()
return nil
}
err := bi.db.Update(func(dbTx database.Tx) error {
for node := range bi.dirty {
err := dbStoreBlockNode(dbTx, node)
if err != nil {
return err
}
for node := range bi.dirty {
err := dbStoreBlockNode(dbTx, node)
if err != nil {
return err
}
return nil
})
}
// If write was successful, clear the dirty set.
if err == nil {
bi.dirty = make(map[*blockNode]struct{})
}
bi.Unlock()
return err
bi.dirty = make(map[*blockNode]struct{})
return nil
}

View File

@ -895,11 +895,88 @@ func TestNew(t *testing.T) {
}
}
// TestAcceptingInInit makes sure that blocks that were stored but not
// yet fully processed do get correctly processed on DAG init. This may
// occur when the node shuts down improperly while a block is being
// validated.
func TestAcceptingInInit(t *testing.T) {
// Create the root directory for test databases.
if !fileExists(testDbRoot) {
if err := os.MkdirAll(testDbRoot, 0700); err != nil {
t.Fatalf("unable to create test db "+
"root: %s", err)
}
}
// Create a test database
dbPath := filepath.Join(testDbRoot, "TestAcceptingInInit")
_ = os.RemoveAll(dbPath)
db, err := database.Create(testDbType, dbPath, blockDataNet)
if err != nil {
t.Fatalf("error creating db: %s", err)
}
defer func() {
db.Close()
os.RemoveAll(dbPath)
os.RemoveAll(testDbRoot)
}()
// Create a DAG to add the test block into
config := &Config{
DAGParams: &dagconfig.SimNetParams,
DB: db,
TimeSource: NewMedianTime(),
SigCache: txscript.NewSigCache(1000),
}
dag, err := New(config)
if err != nil {
t.Fatalf("failed to create dag instance: %s", err)
}
// Load the test block
blocks, err := loadBlocks("blk_0_to_4.dat")
if err != nil {
t.Fatalf("Error loading file: %v\n", err)
}
genesisBlock := blocks[0]
testBlock := blocks[1]
// Create a test blockNode with an unvalidated status
genesisNode := dag.index.LookupNode(genesisBlock.Hash())
testNode := newBlockNode(&testBlock.MsgBlock().Header, setFromSlice(genesisNode), dag.dagParams.K)
testNode.status = statusDataStored
// Manually add the test block to the database
err = db.Update(func(dbTx database.Tx) error {
err := dbStoreBlock(dbTx, testBlock)
if err != nil {
return err
}
return dbStoreBlockNode(dbTx, testNode)
})
if err != nil {
t.Fatalf("Failed to update block index: %s", err)
}
// Create a new DAG. We expect this DAG to process the
// test node
dag, err = New(config)
if err != nil {
t.Fatalf("failed to create dag instance: %s", err)
}
// Make sure that the test node's status is valid
testNode = dag.index.LookupNode(testBlock.Hash())
if testNode.status&statusValid == 0 {
t.Fatalf("testNode is unexpectedly invalid")
}
}
func TestConfirmations(t *testing.T) {
// Create a new database and DAG instance to run tests against.
params := dagconfig.SimNetParams
params.K = 1
dag, teardownFunc, err := DAGSetup("TestBlockCount", Config{
dag, teardownFunc, err := DAGSetup("TestConfirmations", Config{
DAGParams: &params,
})
if err != nil {

View File

@ -453,6 +453,7 @@ func (dag *BlockDAG) initDAGState() error {
var i int32
var lastNode *blockNode
var unprocessedBlockNodes []*blockNode
cursor := blockIndexBucket.Cursor()
for ok := cursor.First(); ok; ok = cursor.Next() {
node, err := dag.deserializeBlockNode(cursor.Value())
@ -460,6 +461,13 @@ func (dag *BlockDAG) initDAGState() error {
return err
}
// Check to see if this node had been stored in the the block DB
// but not yet accepted. If so, add it to a slice to be processed later.
if node.status == statusDataStored {
unprocessedBlockNodes = append(unprocessedBlockNodes, node)
continue
}
if lastNode == nil {
if !node.hash.IsEqual(dag.dagParams.GenesisHash) {
return AssertError(fmt.Sprintf("initDAGState: Expected "+
@ -559,6 +567,43 @@ func (dag *BlockDAG) initDAGState() error {
dag.lastFinalityPoint = dag.index.LookupNode(state.LastFinalityPoint)
dag.finalizeNodesBelowFinalityPoint(false)
// Go over any unprocessed blockNodes and process them now.
for _, node := range unprocessedBlockNodes {
// Check to see if the block exists in the block DB. If it
// doesn't, the database has certainly been corrupted.
blockExists, err := dbTx.HasBlock(node.hash)
if err != nil {
return AssertError(fmt.Sprintf("initDAGState: HasBlock "+
"for block %s failed: %s", node.hash, err))
}
if !blockExists {
return AssertError(fmt.Sprintf("initDAGState: block %s "+
"exists in block index but not in block db", node.hash))
}
// Attempt to accept the block.
block, err := dbFetchBlockByNode(dbTx, node)
isOrphan, delay, err := dag.ProcessBlock(block, BFWasStored)
if err != nil {
log.Warnf("Block %s, which was not previously processed, "+
"failed to be accepted to the DAG: %s", node.hash, err)
continue
}
// If the block is an orphan or is delayed then it couldn't have
// possibly been written to the block index in the first place.
if isOrphan {
return AssertError(fmt.Sprintf("Block %s, which was not "+
"previously processed, turned out to be an orphan, which is "+
"impossible.", node.hash))
}
if delay != 0 {
return AssertError(fmt.Sprintf("Block %s, which was not "+
"previously processed, turned out to be delayed, which is "+
"impossible.", node.hash))
}
}
return nil
})
}

View File

@ -40,6 +40,10 @@ const (
// netsync process
BFIsSync
// BFWasStored is set to indicate that the block was previously stored
// in the block index but was never fully processed
BFWasStored
// BFNone is a convenience value to specifically indicate no flags.
BFNone BehaviorFlags = 0
)
@ -134,12 +138,13 @@ func (dag *BlockDAG) ProcessBlock(block *util.Block, flags BehaviorFlags) (isOrp
defer dag.dagLock.Unlock()
isDelayedBlock := flags&BFAfterDelay == BFAfterDelay
wasBlockStored := flags&BFWasStored == BFWasStored
blockHash := block.Hash()
log.Tracef("Processing block %s", blockHash)
// The block must not already exist in the DAG.
if dag.BlockExists(blockHash) {
if dag.BlockExists(blockHash) && !wasBlockStored {
str := fmt.Sprintf("already have block %s", blockHash)
return false, 0, ruleError(ErrDuplicateBlock, str)
}