diff --git a/blockdag/accept_test.go b/blockdag/accept_test.go index 9d4014f8d..0c7696968 100644 --- a/blockdag/accept_test.go +++ b/blockdag/accept_test.go @@ -57,11 +57,11 @@ func TestMaybeAcceptBlockErrors(t *testing.T) { // Add a valid block and mark it as invalid block1 := blocks[1] - isOrphan, delay, err := dag.ProcessBlock(block1, BFNone) + isOrphan, isDelayed, err := dag.ProcessBlock(block1, BFNone) if err != nil { t.Fatalf("TestMaybeAcceptBlockErrors: Valid block unexpectedly returned an error: %s", err) } - if delay != 0 { + if isDelayed { t.Fatalf("TestMaybeAcceptBlockErrors: block 1 is too far in the future") } if isOrphan { diff --git a/blockdag/dag.go b/blockdag/dag.go index d8702f01b..f77ec3ffe 100644 --- a/blockdag/dag.go +++ b/blockdag/dag.go @@ -6,12 +6,13 @@ package blockdag import ( "fmt" - "github.com/pkg/errors" "math" "sort" "sync" "time" + "github.com/pkg/errors" + "github.com/kaspanet/kaspad/util/subnetworkid" "github.com/kaspanet/kaspad/dagconfig" @@ -36,6 +37,12 @@ type orphanBlock struct { expiration time.Time } +// delayedBlock represents a block which has a delayed timestamp and will be processed at processTime +type delayedBlock struct { + block *util.Block + processTime time.Time +} + // chainUpdates represents the updates made to the selected parent chain after // a block had been added to the DAG. type chainUpdates struct { @@ -99,6 +106,13 @@ type BlockDAG struct { prevOrphans map[daghash.Hash][]*orphanBlock newestOrphan *orphanBlock + // delayedBlocks is a list of all delayed blocks. We are maintaining this + // list for the case where a new block with a valid timestamp points to a delayed block. + // In that case we will delay the processing of the child block so it would be processed + // after its parent. + delayedBlocks map[daghash.Hash]*delayedBlock + delayedBlocksQueue delayedBlocksHeap + // The following caches are used to efficiently keep track of the // current deployment threshold state of each rule change deployment. // @@ -144,8 +158,7 @@ type BlockDAG struct { // // This function is safe for concurrent access. func (dag *BlockDAG) HaveBlock(hash *daghash.Hash) bool { - exists := dag.BlockExists(hash) - return exists || dag.IsKnownOrphan(hash) + return dag.BlockExists(hash) || dag.IsKnownOrphan(hash) || dag.isKnownDelayedBlock(hash) } // HaveBlocks returns whether or not the DAG instances has all blocks represented @@ -1817,6 +1830,55 @@ func (dag *BlockDAG) SubnetworkID() *subnetworkid.SubnetworkID { return dag.subnetworkID } +func (dag *BlockDAG) addDelayedBlock(block *util.Block, delay time.Duration) error { + processTime := dag.timeSource.AdjustedTime().Add(delay) + log.Debugf("Adding block to delayed blocks queue (block hash: %s, process time: %s)", block.Hash().String(), processTime) + delayedBlock := &delayedBlock{ + block: block, + processTime: processTime, + } + + dag.delayedBlocks[*block.Hash()] = delayedBlock + dag.delayedBlocksQueue.Push(delayedBlock) + return dag.processDelayedBlocks() +} + +// processDelayedBlocks loops over all delayed blocks and processes blocks which are due. +// This method is invoked after processing a block (ProcessBlock method). +func (dag *BlockDAG) processDelayedBlocks() error { + // Check if the delayed block with the earliest process time should be processed + for dag.delayedBlocksQueue.Len() > 0 { + earliestDelayedBlockProcessTime := dag.peekDelayedBlock().processTime + if earliestDelayedBlockProcessTime.After(dag.timeSource.AdjustedTime()) { + break + } + delayedBlock := dag.popDelayedBlock() + _, _, err := dag.processBlockNoLock(delayedBlock.block, BFAfterDelay) + if err != nil { + log.Errorf("Error while processing delayed block (block %s)", delayedBlock.block.Hash().String()) + // Rule errors should not be propagated as they refer only to the delayed block, + // while this function runs in the context of another block + if _, ok := err.(RuleError); !ok { + return err + } + } + log.Debugf("Processed delayed block (block %s)", delayedBlock.block.Hash().String()) + } + + return nil +} + +// popDelayedBlock removes the topmost (delayed block with earliest process time) of the queue and returns it. +func (dag *BlockDAG) popDelayedBlock() *delayedBlock { + delayedBlock := dag.delayedBlocksQueue.pop() + delete(dag.delayedBlocks, *delayedBlock.block.Hash()) + return delayedBlock +} + +func (dag *BlockDAG) peekDelayedBlock() *delayedBlock { + return dag.delayedBlocksQueue.peek() +} + // IndexManager provides a generic interface that is called when blocks are // connected to the DAG for the purpose of supporting optional indexes. type IndexManager interface { @@ -1915,6 +1977,8 @@ func New(config *Config) (*BlockDAG, error) { virtual: newVirtualBlock(nil, params.K), orphans: make(map[daghash.Hash]*orphanBlock), prevOrphans: make(map[daghash.Hash][]*orphanBlock), + delayedBlocks: make(map[daghash.Hash]*delayedBlock), + delayedBlocksQueue: newDelayedBlocksHeap(), warningCaches: newThresholdCaches(vbNumBits), deploymentCaches: newThresholdCaches(dagconfig.DefinedDeployments), blockCount: 0, @@ -1953,13 +2017,11 @@ func New(config *Config) (*BlockDAG, error) { if genesis == nil { genesisBlock := util.NewBlock(dag.dagParams.GenesisBlock) - var isOrphan bool - var delay time.Duration - isOrphan, delay, err = dag.ProcessBlock(genesisBlock, BFNone) + isOrphan, isDelayed, err := dag.ProcessBlock(genesisBlock, BFNone) if err != nil { return nil, err } - if delay != 0 { + if isDelayed { return nil, errors.New("Genesis block shouldn't be in the future") } if isOrphan { @@ -1983,3 +2045,8 @@ func New(config *Config) (*BlockDAG, error) { return &dag, nil } + +func (dag *BlockDAG) isKnownDelayedBlock(hash *daghash.Hash) bool { + _, exists := dag.delayedBlocks[*hash] + return exists +} diff --git a/blockdag/dag_test.go b/blockdag/dag_test.go index 76580949c..fc726e1c5 100644 --- a/blockdag/dag_test.go +++ b/blockdag/dag_test.go @@ -59,11 +59,11 @@ func TestBlockCount(t *testing.T) { dag.TestSetCoinbaseMaturity(0) for i := 1; i < len(blocks); i++ { - isOrphan, delay, err := dag.ProcessBlock(blocks[i], BFNone) + isOrphan, isDelayed, err := dag.ProcessBlock(blocks[i], BFNone) if err != nil { t.Fatalf("ProcessBlock fail on block %v: %v\n", i, err) } - if delay != 0 { + if isDelayed { t.Fatalf("ProcessBlock: block %d "+ "is too far in the future", i) } @@ -112,11 +112,11 @@ func TestHaveBlock(t *testing.T) { dag.TestSetCoinbaseMaturity(0) for i := 1; i < len(blocks); i++ { - isOrphan, delay, err := dag.ProcessBlock(blocks[i], BFNone) + isOrphan, isDelayed, err := dag.ProcessBlock(blocks[i], BFNone) if err != nil { t.Fatalf("ProcessBlock fail on block %v: %v\n", i, err) } - if delay != 0 { + if isDelayed { t.Fatalf("ProcessBlock: block %d "+ "is too far in the future", i) } @@ -138,13 +138,13 @@ func TestHaveBlock(t *testing.T) { } blocks = append(blocks, blockTmp...) } - isOrphan, delay, err := dag.ProcessBlock(blocks[6], BFNone) + isOrphan, isDelayed, err := dag.ProcessBlock(blocks[6], BFNone) // Block 3C should fail to connect since its parents are related. (It points to 1 and 2, and 1 is the parent of 2) if err == nil { t.Fatalf("ProcessBlock for block 3C has no error when expected to have an error\n") } - if delay != 0 { + if isDelayed { t.Fatalf("ProcessBlock: block 3C " + "is too far in the future") } @@ -165,7 +165,7 @@ func TestHaveBlock(t *testing.T) { } blocks = append(blocks, blockTmp...) } - isOrphan, delay, err = dag.ProcessBlock(blocks[7], BFNone) + isOrphan, isDelayed, err = dag.ProcessBlock(blocks[7], BFNone) // Block 3D should fail to connect since it has a transaction with the same input twice if err == nil { @@ -178,7 +178,7 @@ func TestHaveBlock(t *testing.T) { if !ok || rErr.ErrorCode != ErrDuplicateTxInputs { t.Fatalf("ProcessBlock for block 3D expected error code %s but got %s\n", ErrDuplicateTxInputs, rErr.ErrorCode) } - if delay != 0 { + if isDelayed { t.Fatalf("ProcessBlock: block 3D " + "is too far in the future") } @@ -188,13 +188,13 @@ func TestHaveBlock(t *testing.T) { } // Insert an orphan block. - isOrphan, delay, err = dag.ProcessBlock(util.NewBlock(&Block100000), BFNoPoWCheck) + isOrphan, isDelayed, err = dag.ProcessBlock(util.NewBlock(&Block100000), BFNoPoWCheck) if err != nil { t.Fatalf("Unable to process block 100000: %v", err) } - if delay != 0 { - t.Fatalf("ProcessBlock incorrectly returned that block 100000 "+ - "has a %s delay", delay) + if isDelayed { + t.Fatalf("ProcessBlock incorrectly returned that block 100000 " + + "has a delay") } if !isOrphan { t.Fatalf("ProcessBlock indicated block is an not orphan when " + @@ -826,12 +826,10 @@ func testErrorThroughPatching(t *testing.T, expectedErrorMessage string, targetF guard := monkey.Patch(targetFunction, replacementFunction) defer guard.Unpatch() - err = nil for i := 1; i < len(blocks); i++ { - var isOrphan bool - var delay time.Duration - isOrphan, delay, err = dag.ProcessBlock(blocks[i], BFNone) - if delay != 0 { + var isOrphan, isDelayed bool + isOrphan, isDelayed, err = dag.ProcessBlock(blocks[i], BFNone) + if isDelayed { t.Fatalf("ProcessBlock: block %d "+ "is too far in the future", i) } @@ -1326,12 +1324,12 @@ func TestDAGIndexFailedStatus(t *testing.T) { ) invalidMsgBlock.AddTransaction(invalidCbTx) invalidBlock := util.NewBlock(invalidMsgBlock) - isOrphan, delay, err := dag.ProcessBlock(invalidBlock, BFNoPoWCheck) + isOrphan, isDelayed, err := dag.ProcessBlock(invalidBlock, BFNoPoWCheck) if _, ok := err.(RuleError); !ok { t.Fatalf("ProcessBlock: expected a rule error but got %s instead", err) } - if delay != 0 { + if isDelayed { t.Fatalf("ProcessBlock: invalidBlock " + "is too far in the future") } @@ -1356,11 +1354,11 @@ func TestDAGIndexFailedStatus(t *testing.T) { invalidMsgBlockChild.AddTransaction(invalidCbTx) invalidBlockChild := util.NewBlock(invalidMsgBlockChild) - isOrphan, delay, err = dag.ProcessBlock(invalidBlockChild, BFNoPoWCheck) + isOrphan, isDelayed, err = dag.ProcessBlock(invalidBlockChild, BFNoPoWCheck) if rErr, ok := err.(RuleError); !ok || rErr.ErrorCode != ErrInvalidAncestorBlock { t.Fatalf("ProcessBlock: expected a rule error but got %s instead", err) } - if delay != 0 { + if isDelayed { t.Fatalf("ProcessBlock: invalidBlockChild " + "is too far in the future") } @@ -1384,11 +1382,11 @@ func TestDAGIndexFailedStatus(t *testing.T) { invalidMsgBlockGrandChild.AddTransaction(invalidCbTx) invalidBlockGrandChild := util.NewBlock(invalidMsgBlockGrandChild) - isOrphan, delay, err = dag.ProcessBlock(invalidBlockGrandChild, BFNoPoWCheck) + isOrphan, isDelayed, err = dag.ProcessBlock(invalidBlockGrandChild, BFNoPoWCheck) if rErr, ok := err.(RuleError); !ok || rErr.ErrorCode != ErrInvalidAncestorBlock { t.Fatalf("ProcessBlock: expected a rule error but got %s instead", err) } - if delay != 0 { + if isDelayed { t.Fatalf("ProcessBlock: invalidBlockGrandChild " + "is too far in the future") } diff --git a/blockdag/dagio.go b/blockdag/dagio.go index a3e29c4ef..05baaf58d 100644 --- a/blockdag/dagio.go +++ b/blockdag/dagio.go @@ -587,7 +587,7 @@ func (dag *BlockDAG) initDAGState() error { // Attempt to accept the block. block, err := dbFetchBlockByNode(dbTx, node) - isOrphan, delay, err := dag.ProcessBlock(block, BFWasStored) + isOrphan, isDelayed, err := dag.ProcessBlock(block, BFWasStored) if err != nil { log.Warnf("Block %s, which was not previously processed, "+ "failed to be accepted to the DAG: %s", node.hash, err) @@ -601,7 +601,7 @@ func (dag *BlockDAG) initDAGState() error { "previously processed, turned out to be an orphan, which is "+ "impossible.", node.hash)) } - if delay != 0 { + if isDelayed { return AssertError(fmt.Sprintf("Block %s, which was not "+ "previously processed, turned out to be delayed, which is "+ "impossible.", node.hash)) diff --git a/blockdag/delayedblockheap.go b/blockdag/delayedblockheap.go new file mode 100644 index 000000000..337dc608e --- /dev/null +++ b/blockdag/delayedblockheap.go @@ -0,0 +1,73 @@ +package blockdag + +import ( + "container/heap" +) + +type baseDelayedBlocksHeap []*delayedBlock + +func (h baseDelayedBlocksHeap) Len() int { + return len(h) +} +func (h baseDelayedBlocksHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h *baseDelayedBlocksHeap) Push(x interface{}) { + *h = append(*h, x.(*delayedBlock)) +} + +func (h *baseDelayedBlocksHeap) Pop() interface{} { + oldHeap := *h + oldLength := len(oldHeap) + popped := oldHeap[oldLength-1] + *h = oldHeap[0 : oldLength-1] + return popped +} + +func (h baseDelayedBlocksHeap) peek() interface{} { + if h.Len() > 0 { + return h[h.Len()-1] + } + return nil +} + +func (h baseDelayedBlocksHeap) Less(i, j int) bool { + return h[j].processTime.After(h[i].processTime) +} + +type delayedBlocksHeap struct { + baseDelayedBlocksHeap *baseDelayedBlocksHeap + impl heap.Interface +} + +// newDelayedBlocksHeap initializes and returns a new delayedBlocksHeap +func newDelayedBlocksHeap() delayedBlocksHeap { + baseHeap := &baseDelayedBlocksHeap{} + h := delayedBlocksHeap{impl: baseHeap, baseDelayedBlocksHeap: baseHeap} + heap.Init(h.impl) + return h +} + +// pop removes the block with lowest height from this heap and returns it +func (dbh delayedBlocksHeap) pop() *delayedBlock { + return heap.Pop(dbh.impl).(*delayedBlock) +} + +// Push pushes the block onto the heap +func (dbh delayedBlocksHeap) Push(block *delayedBlock) { + heap.Push(dbh.impl, block) +} + +// Len returns the length of this heap +func (dbh delayedBlocksHeap) Len() int { + return dbh.impl.Len() +} + +// peek returns the topmost element in the queue without poping it +func (dbh delayedBlocksHeap) peek() *delayedBlock { + if dbh.baseDelayedBlocksHeap.peek() == nil { + return nil + } + return dbh.baseDelayedBlocksHeap.peek().(*delayedBlock) +} diff --git a/blockdag/external_dag_test.go b/blockdag/external_dag_test.go index debe32b15..8839b060a 100644 --- a/blockdag/external_dag_test.go +++ b/blockdag/external_dag_test.go @@ -54,11 +54,11 @@ func TestFinality(t *testing.T) { } block := util.NewBlock(msgBlock) - isOrphan, delay, err := dag.ProcessBlock(block, blockdag.BFNoPoWCheck) + isOrphan, isDelayed, err := dag.ProcessBlock(block, blockdag.BFNoPoWCheck) if err != nil { return nil, err } - if delay != 0 { + if isDelayed { return nil, errors.Errorf("ProcessBlock: block " + "is too far in the future") } @@ -216,11 +216,11 @@ func TestChainedTransactions(t *testing.T) { if err != nil { t.Fatalf("PrepareBlockForTest: %v", err) } - isOrphan, delay, err := dag.ProcessBlock(util.NewBlock(block1), blockdag.BFNoPoWCheck) + isOrphan, isDelayed, err := dag.ProcessBlock(util.NewBlock(block1), blockdag.BFNoPoWCheck) if err != nil { t.Fatalf("ProcessBlock: %v", err) } - if delay != 0 { + if isDelayed { t.Fatalf("ProcessBlock: block1 " + "is too far in the future") } @@ -266,7 +266,7 @@ func TestChainedTransactions(t *testing.T) { } //Checks that dag.ProcessBlock fails because we don't allow a transaction to spend another transaction from the same block - isOrphan, delay, err = dag.ProcessBlock(util.NewBlock(block2), blockdag.BFNoPoWCheck) + isOrphan, isDelayed, err = dag.ProcessBlock(util.NewBlock(block2), blockdag.BFNoPoWCheck) if err == nil { t.Errorf("ProcessBlock expected an error") } else if rErr, ok := err.(blockdag.RuleError); ok { @@ -276,7 +276,7 @@ func TestChainedTransactions(t *testing.T) { } else { t.Errorf("ProcessBlock expected a blockdag.RuleError but got %v", err) } - if delay != 0 { + if isDelayed { t.Fatalf("ProcessBlock: block2 " + "is too far in the future") } @@ -301,11 +301,11 @@ func TestChainedTransactions(t *testing.T) { } //Checks that dag.ProcessBlock doesn't fail because all of its transaction are dependant on transactions from previous blocks - isOrphan, delay, err = dag.ProcessBlock(util.NewBlock(block3), blockdag.BFNoPoWCheck) + isOrphan, isDelayed, err = dag.ProcessBlock(util.NewBlock(block3), blockdag.BFNoPoWCheck) if err != nil { t.Errorf("ProcessBlock: %v", err) } - if delay != 0 { + if isDelayed { t.Fatalf("ProcessBlock: block3 " + "is too far in the future") } @@ -359,11 +359,11 @@ func TestOrderInDiffFromAcceptanceData(t *testing.T) { // Add the block to the DAG newBlock := util.NewBlock(msgBlock) - isOrphan, delay, err := dag.ProcessBlock(newBlock, blockdag.BFNoPoWCheck) + isOrphan, isDelayed, err := dag.ProcessBlock(newBlock, blockdag.BFNoPoWCheck) if err != nil { t.Errorf("TestOrderInDiffFromAcceptanceData: %s", err) } - if delay != 0 { + if isDelayed { t.Fatalf("TestOrderInDiffFromAcceptanceData: block is too far in the future") } if isOrphan { @@ -412,11 +412,11 @@ func TestGasLimit(t *testing.T) { if err != nil { t.Fatalf("PrepareBlockForTest: %v", err) } - isOrphan, delay, err := dag.ProcessBlock(util.NewBlock(fundsBlock), blockdag.BFNoPoWCheck) + isOrphan, isDelayed, err := dag.ProcessBlock(util.NewBlock(fundsBlock), blockdag.BFNoPoWCheck) if err != nil { t.Fatalf("ProcessBlock: %v", err) } - if delay != 0 { + if isDelayed { t.Fatalf("ProcessBlock: the funds block " + "is too far in the future") } @@ -464,7 +464,7 @@ func TestGasLimit(t *testing.T) { if err != nil { t.Fatalf("PrepareBlockForTest: %v", err) } - isOrphan, delay, err := dag.ProcessBlock(util.NewBlock(overLimitBlock), blockdag.BFNoPoWCheck) + isOrphan, isDelayed, err := dag.ProcessBlock(util.NewBlock(overLimitBlock), blockdag.BFNoPoWCheck) if err == nil { t.Fatalf("ProcessBlock expected to have an error in block that exceeds gas limit") } @@ -474,7 +474,7 @@ func TestGasLimit(t *testing.T) { } else if rErr.ErrorCode != blockdag.ErrInvalidGas { t.Fatalf("ProcessBlock expected error code %s but got %s", blockdag.ErrInvalidGas, rErr.ErrorCode) } - if delay != 0 { + if isDelayed { t.Fatalf("ProcessBlock: overLimitBlock " + "is too far in the future") } @@ -499,7 +499,7 @@ func TestGasLimit(t *testing.T) { if err != nil { t.Fatalf("PrepareBlockForTest: %v", err) } - isOrphan, delay, err = dag.ProcessBlock(util.NewBlock(overflowGasBlock), blockdag.BFNoPoWCheck) + isOrphan, isDelayed, err = dag.ProcessBlock(util.NewBlock(overflowGasBlock), blockdag.BFNoPoWCheck) if err == nil { t.Fatalf("ProcessBlock expected to have an error") } @@ -532,7 +532,7 @@ func TestGasLimit(t *testing.T) { } // Here we check that we can't process a block with a transaction from a non-existent subnetwork - isOrphan, delay, err = dag.ProcessBlock(util.NewBlock(nonExistentSubnetworkBlock), blockdag.BFNoPoWCheck) + isOrphan, isDelayed, err = dag.ProcessBlock(util.NewBlock(nonExistentSubnetworkBlock), blockdag.BFNoPoWCheck) expectedErrStr := fmt.Sprintf("Error getting gas limit for subnetworkID '%s': subnetwork '%s' not found", nonExistentSubnetwork, nonExistentSubnetwork) if err.Error() != expectedErrStr { @@ -544,11 +544,11 @@ func TestGasLimit(t *testing.T) { if err != nil { t.Fatalf("PrepareBlockForTest: %v", err) } - isOrphan, delay, err = dag.ProcessBlock(util.NewBlock(validBlock), blockdag.BFNoPoWCheck) + isOrphan, isDelayed, err = dag.ProcessBlock(util.NewBlock(validBlock), blockdag.BFNoPoWCheck) if err != nil { t.Fatalf("ProcessBlock: %v", err) } - if delay != 0 { + if isDelayed { t.Fatalf("ProcessBlock: overLimitBlock " + "is too far in the future") } diff --git a/blockdag/indexers/acceptanceindex_test.go b/blockdag/indexers/acceptanceindex_test.go index b92b5d00c..0d355a364 100644 --- a/blockdag/indexers/acceptanceindex_test.go +++ b/blockdag/indexers/acceptanceindex_test.go @@ -116,11 +116,11 @@ func TestAcceptanceIndexRecover(t *testing.T) { } for i := 1; i < len(blocks)-2; i++ { - isOrphan, delay, err := db1DAG.ProcessBlock(blocks[i], blockdag.BFNone) + isOrphan, isDelayed, err := db1DAG.ProcessBlock(blocks[i], blockdag.BFNone) if err != nil { t.Fatalf("ProcessBlock fail on block %v: %v\n", i, err) } - if delay != 0 { + if isDelayed { t.Fatalf("ProcessBlock: block %d "+ "is too far in the future", i) } @@ -147,11 +147,11 @@ func TestAcceptanceIndexRecover(t *testing.T) { } for i := len(blocks) - 2; i < len(blocks); i++ { - isOrphan, delay, err := db1DAG.ProcessBlock(blocks[i], blockdag.BFNone) + isOrphan, isDelayed, err := db1DAG.ProcessBlock(blocks[i], blockdag.BFNone) if err != nil { t.Fatalf("ProcessBlock fail on block %v: %v\n", i, err) } - if delay != 0 { + if isDelayed { t.Fatalf("ProcessBlock: block %d "+ "is too far in the future", i) } @@ -185,11 +185,11 @@ func TestAcceptanceIndexRecover(t *testing.T) { } for i := len(blocks) - 2; i < len(blocks); i++ { - isOrphan, delay, err := db2DAG.ProcessBlock(blocks[i], blockdag.BFNone) + isOrphan, isDelayed, err := db2DAG.ProcessBlock(blocks[i], blockdag.BFNone) if err != nil { t.Fatalf("ProcessBlock fail on block %v: %v\n", i, err) } - if delay != 0 { + if isDelayed { t.Fatalf("ProcessBlock: block %d "+ "is too far in the future", i) } diff --git a/blockdag/indexers/txindex_test.go b/blockdag/indexers/txindex_test.go index 35b75577c..72a0a60eb 100644 --- a/blockdag/indexers/txindex_test.go +++ b/blockdag/indexers/txindex_test.go @@ -63,11 +63,11 @@ func TestTxIndexConnectBlock(t *testing.T) { } utilBlock := util.NewBlock(block) blocks[*block.BlockHash()] = utilBlock - isOrphan, delay, err := dag.ProcessBlock(utilBlock, blockdag.BFNoPoWCheck) + isOrphan, isDelayed, err := dag.ProcessBlock(utilBlock, blockdag.BFNoPoWCheck) if err != nil { t.Fatalf("TestTxIndexConnectBlock: dag.ProcessBlock got unexpected error for block %v: %v", blockName, err) } - if delay != 0 { + if isDelayed { t.Fatalf("TestTxIndexConnectBlock: block %s "+ "is too far in the future", blockName) } diff --git a/blockdag/notifications_test.go b/blockdag/notifications_test.go index 3bf839f11..e1df2a832 100644 --- a/blockdag/notifications_test.go +++ b/blockdag/notifications_test.go @@ -41,11 +41,11 @@ func TestNotifications(t *testing.T) { dag.Subscribe(callback) } - isOrphan, delay, err := dag.ProcessBlock(blocks[1], BFNone) + isOrphan, isDelayed, err := dag.ProcessBlock(blocks[1], BFNone) if err != nil { t.Fatalf("ProcessBlock fail on block 1: %v\n", err) } - if delay != 0 { + if isDelayed { t.Fatalf("ProcessBlock: block 1 " + "is too far in the future") } diff --git a/blockdag/process.go b/blockdag/process.go index 6b39acb75..f4afa0ccc 100644 --- a/blockdag/process.go +++ b/blockdag/process.go @@ -134,11 +134,14 @@ func (dag *BlockDAG) processOrphans(hash *daghash.Hash, flags BehaviorFlags) err // whether or not the block is an orphan. // // This function is safe for concurrent access. -func (dag *BlockDAG) ProcessBlock(block *util.Block, flags BehaviorFlags) (isOrphan bool, delay time.Duration, err error) { +func (dag *BlockDAG) ProcessBlock(block *util.Block, flags BehaviorFlags) (isOrphan bool, isDelayed bool, err error) { dag.dagLock.Lock() defer dag.dagLock.Unlock() + return dag.processBlockNoLock(block, flags) +} - isDelayedBlock := flags&BFAfterDelay == BFAfterDelay +func (dag *BlockDAG) processBlockNoLock(block *util.Block, flags BehaviorFlags) (isOrphan bool, isDelayed bool, err error) { + isAfterDelay := flags&BFAfterDelay == BFAfterDelay wasBlockStored := flags&BFWasStored == BFWasStored blockHash := block.Hash() @@ -147,36 +150,57 @@ func (dag *BlockDAG) ProcessBlock(block *util.Block, flags BehaviorFlags) (isOrp // The block must not already exist in the DAG. if dag.BlockExists(blockHash) && !wasBlockStored { str := fmt.Sprintf("already have block %s", blockHash) - return false, 0, ruleError(ErrDuplicateBlock, str) + return false, false, ruleError(ErrDuplicateBlock, str) } // The block must not already exist as an orphan. if _, exists := dag.orphans[*blockHash]; exists { str := fmt.Sprintf("already have block (orphan) %s", blockHash) - return false, 0, ruleError(ErrDuplicateBlock, str) + return false, false, ruleError(ErrDuplicateBlock, str) } - if !isDelayedBlock { + if dag.isKnownDelayedBlock(blockHash) { + str := fmt.Sprintf("already have block (delayed) %s", blockHash) + return false, false, ruleError(ErrDuplicateBlock, str) + } + + if !isAfterDelay { // Perform preliminary sanity checks on the block and its transactions. delay, err := dag.checkBlockSanity(block, flags) if err != nil { - return false, 0, err + return false, false, err } if delay != 0 { - return false, delay, err + err = dag.addDelayedBlock(block, delay) + if err != nil { + return false, false, err + } + return false, true, nil } } + var missingParents []*daghash.Hash + for _, parentHash := range block.MsgBlock().Header.ParentHashes { + if !dag.BlockExists(parentHash) { + missingParents = append(missingParents, parentHash) + } + } + + // Handle the case of a block with a valid timestamp(non-delayed) which points to a delayed block. + delay, isParentDelayed := dag.maxDelayOfParents(missingParents) + if isParentDelayed { + // Add Nanosecond to ensure that parent process time will be after its child. + delay += time.Nanosecond + err := dag.addDelayedBlock(block, delay) + if err != nil { + return false, false, err + } + return false, true, err + } + // Handle orphan blocks. - allParentsExist := true - for _, parentHash := range block.MsgBlock().Header.ParentHashes { - if !dag.BlockExists(parentHash) { - allParentsExist = false - } - } - - if !allParentsExist { + if len(missingParents) > 0 { // Some orphans during netsync are a normal part of the process, since the anticone // of the chain-split is never explicitly requested. // Therefore, if we are during netsync - don't report orphans to default logs. @@ -191,14 +215,14 @@ func (dag *BlockDAG) ProcessBlock(block *util.Block, flags BehaviorFlags) (isOrp } dag.addOrphanBlock(block) - return true, 0, nil + return true, false, nil } // The block has passed all context independent checks and appears sane // enough to potentially accept it into the block DAG. err = dag.maybeAcceptBlock(block, flags) if err != nil { - return false, 0, err + return false, false, err } // Accept any orphan blocks that depend on this block (they are @@ -206,10 +230,33 @@ func (dag *BlockDAG) ProcessBlock(block *util.Block, flags BehaviorFlags) (isOrp // there are no more. err = dag.processOrphans(blockHash, flags) if err != nil { - return false, 0, err + return false, false, err + } + + if !isAfterDelay { + err = dag.processDelayedBlocks() + if err != nil { + return false, false, err + } } log.Debugf("Accepted block %s", blockHash) - return false, 0, nil + return false, false, nil +} + +// maxDelayOfParents returns the maximum delay of the given block hashes. +// Note that delay could be 0, but isDelayed will return true. This is the case where the parent process time is due. +func (dag *BlockDAG) maxDelayOfParents(parentHashes []*daghash.Hash) (delay time.Duration, isDelayed bool) { + for _, parentHash := range parentHashes { + if delayedParent, exists := dag.delayedBlocks[*parentHash]; exists { + isDelayed = true + parentDelay := delayedParent.processTime.Sub(dag.timeSource.AdjustedTime()) + if parentDelay > delay { + delay = parentDelay + } + } + } + + return delay, isDelayed } diff --git a/blockdag/process_test.go b/blockdag/process_test.go index b611860e8..7b3d107e9 100644 --- a/blockdag/process_test.go +++ b/blockdag/process_test.go @@ -29,11 +29,11 @@ func TestProcessBlock(t *testing.T) { }) defer guard.Unpatch() - isOrphan, delay, err := dag.ProcessBlock(util.NewBlock(&Block100000), BFNoPoWCheck) + isOrphan, isDelayed, err := dag.ProcessBlock(util.NewBlock(&Block100000), BFNoPoWCheck) if err != nil { t.Errorf("ProcessBlock: %s", err) } - if delay != 0 { + if isDelayed { t.Errorf("ProcessBlock: block is too far in the future") } if !isOrphan { @@ -47,11 +47,11 @@ func TestProcessBlock(t *testing.T) { // Change nonce to change block hash Block100000Copy.Header.Nonce++ called = false - isOrphan, delay, err = dag.ProcessBlock(util.NewBlock(&Block100000Copy), BFAfterDelay|BFNoPoWCheck) + isOrphan, isDelayed, err = dag.ProcessBlock(util.NewBlock(&Block100000Copy), BFAfterDelay|BFNoPoWCheck) if err != nil { t.Errorf("ProcessBlock: %s", err) } - if delay != 0 { + if isDelayed { t.Errorf("ProcessBlock: block is too far in the future") } if !isOrphan { @@ -61,7 +61,7 @@ func TestProcessBlock(t *testing.T) { t.Errorf("ProcessBlock: Didn't expected checkBlockSanity to be called") } - isOrphan, delay, err = dag.ProcessBlock(util.NewBlock(dagconfig.SimNetParams.GenesisBlock), BFNone) + isOrphan, isDelayed, err = dag.ProcessBlock(util.NewBlock(dagconfig.SimNetParams.GenesisBlock), BFNone) expectedErrMsg := fmt.Sprintf("already have block %s", dagconfig.SimNetParams.GenesisHash) if err == nil || err.Error() != expectedErrMsg { t.Errorf("ProcessBlock: Expected error \"%s\" but got \"%s\"", expectedErrMsg, err) @@ -97,11 +97,11 @@ func TestProcessOrphans(t *testing.T) { childBlock.MsgBlock().Header.UTXOCommitment = &daghash.ZeroHash // Process the child block so that it gets added to the orphan pool - isOrphan, delay, err := dag.ProcessBlock(childBlock, BFNoPoWCheck) + isOrphan, isDelayed, err := dag.ProcessBlock(childBlock, BFNoPoWCheck) if err != nil { t.Fatalf("TestProcessOrphans: child block unexpectedly returned an error: %s", err) } - if delay != 0 { + if isDelayed { t.Fatalf("TestProcessOrphans: child block is too far in the future") } if !isOrphan { @@ -109,11 +109,11 @@ func TestProcessOrphans(t *testing.T) { } // Process the parent block. Note that this will attempt to unorphan the child block - isOrphan, delay, err = dag.ProcessBlock(parentBlock, BFNone) + isOrphan, isDelayed, err = dag.ProcessBlock(parentBlock, BFNone) if err != nil { t.Fatalf("TestProcessOrphans: parent block unexpectedly returned an error: %s", err) } - if delay != 0 { + if isDelayed { t.Fatalf("TestProcessOrphans: parent block is too far in the future") } if isOrphan { diff --git a/blockdag/validate.go b/blockdag/validate.go index eff1c3406..e597ef629 100644 --- a/blockdag/validate.go +++ b/blockdag/validate.go @@ -6,11 +6,12 @@ package blockdag import ( "fmt" - "github.com/pkg/errors" "math" "sort" "time" + "github.com/pkg/errors" + "github.com/kaspanet/kaspad/dagconfig" "github.com/kaspanet/kaspad/txscript" "github.com/kaspanet/kaspad/util" @@ -432,6 +433,8 @@ func (dag *BlockDAG) checkBlockHeaderSanity(header *wire.BlockHeader, flags Beha // Ensure the block time is not too far in the future. If it's too far, return // the duration of time that should be waited before the block becomes valid. + // This check needs to be last as it does not return an error but rather marks the + // header as delayed (and valid). maxTimestamp := dag.timeSource.AdjustedTime().Add(time.Second * time.Duration(int64(dag.TimestampDeviationTolerance)*dag.targetTimePerBlock)) if header.Timestamp.After(maxTimestamp) { diff --git a/blockdag/validate_test.go b/blockdag/validate_test.go index 5b6dbdbfe..25601ad2d 100644 --- a/blockdag/validate_test.go +++ b/blockdag/validate_test.go @@ -101,12 +101,12 @@ func TestCheckConnectBlockTemplate(t *testing.T) { } for i := 1; i <= 3; i++ { - _, delay, err := dag.ProcessBlock(blocks[i], BFNone) + _, isDelayed, err := dag.ProcessBlock(blocks[i], BFNone) if err != nil { t.Fatalf("CheckConnectBlockTemplate: Received unexpected error "+ "processing block %d: %v", i, err) } - if delay != 0 { + if isDelayed { t.Fatalf("CheckConnectBlockTemplate: block %d is too far in the future", i) } } diff --git a/cmd/addblock/import.go b/cmd/addblock/import.go index 92a430f77..82fe2584a 100644 --- a/cmd/addblock/import.go +++ b/cmd/addblock/import.go @@ -116,12 +116,12 @@ func (bi *blockImporter) processBlock(serializedBlock []byte) (bool, error) { } // Ensure the blocks follows all of the DAG rules. - isOrphan, delay, err := bi.dag.ProcessBlock(block, + isOrphan, isDelayed, err := bi.dag.ProcessBlock(block, blockdag.BFFastAdd) if err != nil { return false, err } - if delay != 0 { + if isDelayed { return false, errors.Errorf("import file contains a block that is too far in the future") } if isOrphan { diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index 8097a6e5c..cf30d1a44 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -262,11 +262,11 @@ func (tc *testContext) mineTransactions(transactions []*util.Tx, numberOfBlocks tc.t.Fatalf("PrepareBlockForTest: %s", err) } utilBlock := util.NewBlock(block) - isOrphan, delay, err := tc.harness.txPool.cfg.DAG.ProcessBlock(utilBlock, blockdag.BFNoPoWCheck) + isOrphan, isDelayed, err := tc.harness.txPool.cfg.DAG.ProcessBlock(utilBlock, blockdag.BFNoPoWCheck) if err != nil { tc.t.Fatalf("ProcessBlock: %s", err) } - if delay != 0 { + if isDelayed { tc.t.Fatalf("ProcessBlock: block %s "+ "is too far in the future", block.BlockHash()) } diff --git a/mining/mining_test.go b/mining/mining_test.go index 9f5a724d3..306413283 100644 --- a/mining/mining_test.go +++ b/mining/mining_test.go @@ -58,12 +58,12 @@ func TestNewBlockTemplate(t *testing.T) { t.Fatalf("NewBlockTemplate: %v", err) } - isOrphan, delay, err := dag.ProcessBlock(util.NewBlock(template1.Block), blockdag.BFNoPoWCheck) + isOrphan, isDelayed, err := dag.ProcessBlock(util.NewBlock(template1.Block), blockdag.BFNoPoWCheck) if err != nil { t.Fatalf("ProcessBlock: %v", err) } - if delay != 0 { + if isDelayed { t.Fatalf("ProcessBlock: template1 " + "is too far in the future") } @@ -79,11 +79,11 @@ func TestNewBlockTemplate(t *testing.T) { if err != nil { t.Fatalf("NewBlockTemplate: %v", err) } - isOrphan, delay, err = dag.ProcessBlock(util.NewBlock(template.Block), blockdag.BFNoPoWCheck) + isOrphan, isDelayed, err = dag.ProcessBlock(util.NewBlock(template.Block), blockdag.BFNoPoWCheck) if err != nil { t.Fatalf("ProcessBlock: %v", err) } - if delay != 0 { + if isDelayed { t.Fatalf("ProcessBlock: template " + "is too far in the future") } diff --git a/mining/txselection_test.go b/mining/txselection_test.go index c2e42fbfc..2431ebdbe 100644 --- a/mining/txselection_test.go +++ b/mining/txselection_test.go @@ -55,11 +55,11 @@ func TestSelectTxs(t *testing.T) { if err != nil { t.Fatalf("NewBlockTemplate: %v", err) } - isOrphan, delay, err := dag.ProcessBlock(util.NewBlock(template.Block), blockdag.BFNoPoWCheck) + isOrphan, isDelayed, err := dag.ProcessBlock(util.NewBlock(template.Block), blockdag.BFNoPoWCheck) if err != nil { t.Fatalf("ProcessBlock: %v", err) } - if delay != 0 { + if isDelayed { t.Fatalf("ProcessBlock: template " + "is too far in the future") } diff --git a/netsync/manager.go b/netsync/manager.go index 4bcf21e24..f97360a46 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -439,7 +439,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { } // Process the block to include validation, orphan handling, etc. - isOrphan, delay, err := sm.dag.ProcessBlock(bmsg.block, behaviorFlags) + isOrphan, isDelayed, err := sm.dag.ProcessBlock(bmsg.block, behaviorFlags) // Remove block from request maps. Either DAG knows about it and // so we shouldn't have any more instances of trying to fetch it, or @@ -473,10 +473,8 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { return } - if delay != 0 { - spawn(func() { - sm.QueueBlock(bmsg.block, bmsg.peer, true, make(chan struct{})) - }, sm.handlePanic) + if isDelayed { + return } // Request the parents for the orphan block from the peer that sent it. @@ -869,7 +867,7 @@ out: msg.reply <- peerID case processBlockMsg: - isOrphan, delay, err := sm.dag.ProcessBlock( + isOrphan, isDelayed, err := sm.dag.ProcessBlock( msg.block, msg.flags) if err != nil { msg.reply <- processBlockResponse{ @@ -877,7 +875,7 @@ out: err: err, } } - if delay != 0 { + if isDelayed { msg.reply <- processBlockResponse{ isOrphan: false, err: errors.New("Cannot process blocks from RPC beyond the allowed time offset"), diff --git a/util/testtools/testtools.go b/util/testtools/testtools.go index fb2f810cb..43c77f242 100644 --- a/util/testtools/testtools.go +++ b/util/testtools/testtools.go @@ -27,12 +27,12 @@ func RegisterSubnetworkForTest(dag *blockdag.BlockDAG, params *dagconfig.Params, } addBlockToDAG := func(block *util.Block) error { - isOrphan, delay, err := dag.ProcessBlock(block, blockdag.BFNoPoWCheck) + isOrphan, isDelayed, err := dag.ProcessBlock(block, blockdag.BFNoPoWCheck) if err != nil { return err } - if delay != 0 { + if isDelayed { return errors.Errorf("ProcessBlock: block is is too far in the future") }