[NOD-420] Process delayed blocks (#529)

* [NOD-420] Delay blocks with valid timestamp (non-delayed) that point to a delayed block.

* [NOD-420] Mark block as requested when setting as delayed.

* [NOD-420] Merge master; Use dag.timeSource.AdjustedTime() instead of time.Now;

* [NOD-420] Return nil when not expecting an error

* [NOD-420] Initialise delyaed blocks mapping

* [NOD-420] Trigger delayed blocks processing every time we process a block.

* [NOD-420] Hold the read lock in processDelayedBlocks

* [NOD-420] Add delayed blocks heap sorted by their process time so we could process them in order.

* [NOD-420] Update debug log

* [NOD-420] Fix process blocks loop

* [NOD-420] Add comment

* [NOD-420] Log error message

* [NOD-420] Implement peek method for delayed block heap. extract delayed block processing to another  function.

* [NOD-420] Trigger process delayed blocks only in process block

* [NOD-420] Move delayed block addition to process block

* [NOD-420] Use process block to make sure we fully process the delayed block and deal with orphans.

* [NOD-420] Unexport functions when not needed; Return isDelayed boolean from ProcessBlock instead of the delay duration

* [NOd-420] Remove redundant delayedBlocksLock

* [NOD-420] Resolve merge conflict; Return delay 0 instead of boolean

* [NOD-420] Do not treat delayed block as orphan

* [NOD-420] Make sure block is not processed if we have already sa delayed.

* [NOD-420] Process delayed block if parent is delayed to make sure it would not be treated as orphan.

* [NOD-420] Rename variable

* [NOD-420] Rename function. Move maxDelayOfParents to process.go

* [NOD-420] Fix typo

* [NOD-420] Handle errors from processDelayedBlocks properly

* [NOD-420] Return default values if err != nil from dag.addDelayedBlock

* [NOD-420] Return default values if err != nil from dag.addDelayedBlock in another place

Co-authored-by: Svarog <feanorr@gmail.com>
This commit is contained in:
Dan Aharoni 2020-01-08 15:28:52 +02:00 committed by Ori Newman
parent ac9aa74a75
commit ea6f7a28c2
19 changed files with 298 additions and 112 deletions

View File

@ -57,11 +57,11 @@ func TestMaybeAcceptBlockErrors(t *testing.T) {
// Add a valid block and mark it as invalid
block1 := blocks[1]
isOrphan, delay, err := dag.ProcessBlock(block1, BFNone)
isOrphan, isDelayed, err := dag.ProcessBlock(block1, BFNone)
if err != nil {
t.Fatalf("TestMaybeAcceptBlockErrors: Valid block unexpectedly returned an error: %s", err)
}
if delay != 0 {
if isDelayed {
t.Fatalf("TestMaybeAcceptBlockErrors: block 1 is too far in the future")
}
if isOrphan {

View File

@ -6,12 +6,13 @@ package blockdag
import (
"fmt"
"github.com/pkg/errors"
"math"
"sort"
"sync"
"time"
"github.com/pkg/errors"
"github.com/kaspanet/kaspad/util/subnetworkid"
"github.com/kaspanet/kaspad/dagconfig"
@ -36,6 +37,12 @@ type orphanBlock struct {
expiration time.Time
}
// delayedBlock represents a block which has a delayed timestamp and will be processed at processTime
type delayedBlock struct {
block *util.Block
processTime time.Time
}
// chainUpdates represents the updates made to the selected parent chain after
// a block had been added to the DAG.
type chainUpdates struct {
@ -99,6 +106,13 @@ type BlockDAG struct {
prevOrphans map[daghash.Hash][]*orphanBlock
newestOrphan *orphanBlock
// delayedBlocks is a list of all delayed blocks. We are maintaining this
// list for the case where a new block with a valid timestamp points to a delayed block.
// In that case we will delay the processing of the child block so it would be processed
// after its parent.
delayedBlocks map[daghash.Hash]*delayedBlock
delayedBlocksQueue delayedBlocksHeap
// The following caches are used to efficiently keep track of the
// current deployment threshold state of each rule change deployment.
//
@ -144,8 +158,7 @@ type BlockDAG struct {
//
// This function is safe for concurrent access.
func (dag *BlockDAG) HaveBlock(hash *daghash.Hash) bool {
exists := dag.BlockExists(hash)
return exists || dag.IsKnownOrphan(hash)
return dag.BlockExists(hash) || dag.IsKnownOrphan(hash) || dag.isKnownDelayedBlock(hash)
}
// HaveBlocks returns whether or not the DAG instances has all blocks represented
@ -1817,6 +1830,55 @@ func (dag *BlockDAG) SubnetworkID() *subnetworkid.SubnetworkID {
return dag.subnetworkID
}
func (dag *BlockDAG) addDelayedBlock(block *util.Block, delay time.Duration) error {
processTime := dag.timeSource.AdjustedTime().Add(delay)
log.Debugf("Adding block to delayed blocks queue (block hash: %s, process time: %s)", block.Hash().String(), processTime)
delayedBlock := &delayedBlock{
block: block,
processTime: processTime,
}
dag.delayedBlocks[*block.Hash()] = delayedBlock
dag.delayedBlocksQueue.Push(delayedBlock)
return dag.processDelayedBlocks()
}
// processDelayedBlocks loops over all delayed blocks and processes blocks which are due.
// This method is invoked after processing a block (ProcessBlock method).
func (dag *BlockDAG) processDelayedBlocks() error {
// Check if the delayed block with the earliest process time should be processed
for dag.delayedBlocksQueue.Len() > 0 {
earliestDelayedBlockProcessTime := dag.peekDelayedBlock().processTime
if earliestDelayedBlockProcessTime.After(dag.timeSource.AdjustedTime()) {
break
}
delayedBlock := dag.popDelayedBlock()
_, _, err := dag.processBlockNoLock(delayedBlock.block, BFAfterDelay)
if err != nil {
log.Errorf("Error while processing delayed block (block %s)", delayedBlock.block.Hash().String())
// Rule errors should not be propagated as they refer only to the delayed block,
// while this function runs in the context of another block
if _, ok := err.(RuleError); !ok {
return err
}
}
log.Debugf("Processed delayed block (block %s)", delayedBlock.block.Hash().String())
}
return nil
}
// popDelayedBlock removes the topmost (delayed block with earliest process time) of the queue and returns it.
func (dag *BlockDAG) popDelayedBlock() *delayedBlock {
delayedBlock := dag.delayedBlocksQueue.pop()
delete(dag.delayedBlocks, *delayedBlock.block.Hash())
return delayedBlock
}
func (dag *BlockDAG) peekDelayedBlock() *delayedBlock {
return dag.delayedBlocksQueue.peek()
}
// IndexManager provides a generic interface that is called when blocks are
// connected to the DAG for the purpose of supporting optional indexes.
type IndexManager interface {
@ -1915,6 +1977,8 @@ func New(config *Config) (*BlockDAG, error) {
virtual: newVirtualBlock(nil, params.K),
orphans: make(map[daghash.Hash]*orphanBlock),
prevOrphans: make(map[daghash.Hash][]*orphanBlock),
delayedBlocks: make(map[daghash.Hash]*delayedBlock),
delayedBlocksQueue: newDelayedBlocksHeap(),
warningCaches: newThresholdCaches(vbNumBits),
deploymentCaches: newThresholdCaches(dagconfig.DefinedDeployments),
blockCount: 0,
@ -1953,13 +2017,11 @@ func New(config *Config) (*BlockDAG, error) {
if genesis == nil {
genesisBlock := util.NewBlock(dag.dagParams.GenesisBlock)
var isOrphan bool
var delay time.Duration
isOrphan, delay, err = dag.ProcessBlock(genesisBlock, BFNone)
isOrphan, isDelayed, err := dag.ProcessBlock(genesisBlock, BFNone)
if err != nil {
return nil, err
}
if delay != 0 {
if isDelayed {
return nil, errors.New("Genesis block shouldn't be in the future")
}
if isOrphan {
@ -1983,3 +2045,8 @@ func New(config *Config) (*BlockDAG, error) {
return &dag, nil
}
func (dag *BlockDAG) isKnownDelayedBlock(hash *daghash.Hash) bool {
_, exists := dag.delayedBlocks[*hash]
return exists
}

View File

@ -59,11 +59,11 @@ func TestBlockCount(t *testing.T) {
dag.TestSetCoinbaseMaturity(0)
for i := 1; i < len(blocks); i++ {
isOrphan, delay, err := dag.ProcessBlock(blocks[i], BFNone)
isOrphan, isDelayed, err := dag.ProcessBlock(blocks[i], BFNone)
if err != nil {
t.Fatalf("ProcessBlock fail on block %v: %v\n", i, err)
}
if delay != 0 {
if isDelayed {
t.Fatalf("ProcessBlock: block %d "+
"is too far in the future", i)
}
@ -112,11 +112,11 @@ func TestHaveBlock(t *testing.T) {
dag.TestSetCoinbaseMaturity(0)
for i := 1; i < len(blocks); i++ {
isOrphan, delay, err := dag.ProcessBlock(blocks[i], BFNone)
isOrphan, isDelayed, err := dag.ProcessBlock(blocks[i], BFNone)
if err != nil {
t.Fatalf("ProcessBlock fail on block %v: %v\n", i, err)
}
if delay != 0 {
if isDelayed {
t.Fatalf("ProcessBlock: block %d "+
"is too far in the future", i)
}
@ -138,13 +138,13 @@ func TestHaveBlock(t *testing.T) {
}
blocks = append(blocks, blockTmp...)
}
isOrphan, delay, err := dag.ProcessBlock(blocks[6], BFNone)
isOrphan, isDelayed, err := dag.ProcessBlock(blocks[6], BFNone)
// Block 3C should fail to connect since its parents are related. (It points to 1 and 2, and 1 is the parent of 2)
if err == nil {
t.Fatalf("ProcessBlock for block 3C has no error when expected to have an error\n")
}
if delay != 0 {
if isDelayed {
t.Fatalf("ProcessBlock: block 3C " +
"is too far in the future")
}
@ -165,7 +165,7 @@ func TestHaveBlock(t *testing.T) {
}
blocks = append(blocks, blockTmp...)
}
isOrphan, delay, err = dag.ProcessBlock(blocks[7], BFNone)
isOrphan, isDelayed, err = dag.ProcessBlock(blocks[7], BFNone)
// Block 3D should fail to connect since it has a transaction with the same input twice
if err == nil {
@ -178,7 +178,7 @@ func TestHaveBlock(t *testing.T) {
if !ok || rErr.ErrorCode != ErrDuplicateTxInputs {
t.Fatalf("ProcessBlock for block 3D expected error code %s but got %s\n", ErrDuplicateTxInputs, rErr.ErrorCode)
}
if delay != 0 {
if isDelayed {
t.Fatalf("ProcessBlock: block 3D " +
"is too far in the future")
}
@ -188,13 +188,13 @@ func TestHaveBlock(t *testing.T) {
}
// Insert an orphan block.
isOrphan, delay, err = dag.ProcessBlock(util.NewBlock(&Block100000), BFNoPoWCheck)
isOrphan, isDelayed, err = dag.ProcessBlock(util.NewBlock(&Block100000), BFNoPoWCheck)
if err != nil {
t.Fatalf("Unable to process block 100000: %v", err)
}
if delay != 0 {
t.Fatalf("ProcessBlock incorrectly returned that block 100000 "+
"has a %s delay", delay)
if isDelayed {
t.Fatalf("ProcessBlock incorrectly returned that block 100000 " +
"has a delay")
}
if !isOrphan {
t.Fatalf("ProcessBlock indicated block is an not orphan when " +
@ -826,12 +826,10 @@ func testErrorThroughPatching(t *testing.T, expectedErrorMessage string, targetF
guard := monkey.Patch(targetFunction, replacementFunction)
defer guard.Unpatch()
err = nil
for i := 1; i < len(blocks); i++ {
var isOrphan bool
var delay time.Duration
isOrphan, delay, err = dag.ProcessBlock(blocks[i], BFNone)
if delay != 0 {
var isOrphan, isDelayed bool
isOrphan, isDelayed, err = dag.ProcessBlock(blocks[i], BFNone)
if isDelayed {
t.Fatalf("ProcessBlock: block %d "+
"is too far in the future", i)
}
@ -1326,12 +1324,12 @@ func TestDAGIndexFailedStatus(t *testing.T) {
)
invalidMsgBlock.AddTransaction(invalidCbTx)
invalidBlock := util.NewBlock(invalidMsgBlock)
isOrphan, delay, err := dag.ProcessBlock(invalidBlock, BFNoPoWCheck)
isOrphan, isDelayed, err := dag.ProcessBlock(invalidBlock, BFNoPoWCheck)
if _, ok := err.(RuleError); !ok {
t.Fatalf("ProcessBlock: expected a rule error but got %s instead", err)
}
if delay != 0 {
if isDelayed {
t.Fatalf("ProcessBlock: invalidBlock " +
"is too far in the future")
}
@ -1356,11 +1354,11 @@ func TestDAGIndexFailedStatus(t *testing.T) {
invalidMsgBlockChild.AddTransaction(invalidCbTx)
invalidBlockChild := util.NewBlock(invalidMsgBlockChild)
isOrphan, delay, err = dag.ProcessBlock(invalidBlockChild, BFNoPoWCheck)
isOrphan, isDelayed, err = dag.ProcessBlock(invalidBlockChild, BFNoPoWCheck)
if rErr, ok := err.(RuleError); !ok || rErr.ErrorCode != ErrInvalidAncestorBlock {
t.Fatalf("ProcessBlock: expected a rule error but got %s instead", err)
}
if delay != 0 {
if isDelayed {
t.Fatalf("ProcessBlock: invalidBlockChild " +
"is too far in the future")
}
@ -1384,11 +1382,11 @@ func TestDAGIndexFailedStatus(t *testing.T) {
invalidMsgBlockGrandChild.AddTransaction(invalidCbTx)
invalidBlockGrandChild := util.NewBlock(invalidMsgBlockGrandChild)
isOrphan, delay, err = dag.ProcessBlock(invalidBlockGrandChild, BFNoPoWCheck)
isOrphan, isDelayed, err = dag.ProcessBlock(invalidBlockGrandChild, BFNoPoWCheck)
if rErr, ok := err.(RuleError); !ok || rErr.ErrorCode != ErrInvalidAncestorBlock {
t.Fatalf("ProcessBlock: expected a rule error but got %s instead", err)
}
if delay != 0 {
if isDelayed {
t.Fatalf("ProcessBlock: invalidBlockGrandChild " +
"is too far in the future")
}

View File

@ -587,7 +587,7 @@ func (dag *BlockDAG) initDAGState() error {
// Attempt to accept the block.
block, err := dbFetchBlockByNode(dbTx, node)
isOrphan, delay, err := dag.ProcessBlock(block, BFWasStored)
isOrphan, isDelayed, err := dag.ProcessBlock(block, BFWasStored)
if err != nil {
log.Warnf("Block %s, which was not previously processed, "+
"failed to be accepted to the DAG: %s", node.hash, err)
@ -601,7 +601,7 @@ func (dag *BlockDAG) initDAGState() error {
"previously processed, turned out to be an orphan, which is "+
"impossible.", node.hash))
}
if delay != 0 {
if isDelayed {
return AssertError(fmt.Sprintf("Block %s, which was not "+
"previously processed, turned out to be delayed, which is "+
"impossible.", node.hash))

View File

@ -0,0 +1,73 @@
package blockdag
import (
"container/heap"
)
type baseDelayedBlocksHeap []*delayedBlock
func (h baseDelayedBlocksHeap) Len() int {
return len(h)
}
func (h baseDelayedBlocksHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}
func (h *baseDelayedBlocksHeap) Push(x interface{}) {
*h = append(*h, x.(*delayedBlock))
}
func (h *baseDelayedBlocksHeap) Pop() interface{} {
oldHeap := *h
oldLength := len(oldHeap)
popped := oldHeap[oldLength-1]
*h = oldHeap[0 : oldLength-1]
return popped
}
func (h baseDelayedBlocksHeap) peek() interface{} {
if h.Len() > 0 {
return h[h.Len()-1]
}
return nil
}
func (h baseDelayedBlocksHeap) Less(i, j int) bool {
return h[j].processTime.After(h[i].processTime)
}
type delayedBlocksHeap struct {
baseDelayedBlocksHeap *baseDelayedBlocksHeap
impl heap.Interface
}
// newDelayedBlocksHeap initializes and returns a new delayedBlocksHeap
func newDelayedBlocksHeap() delayedBlocksHeap {
baseHeap := &baseDelayedBlocksHeap{}
h := delayedBlocksHeap{impl: baseHeap, baseDelayedBlocksHeap: baseHeap}
heap.Init(h.impl)
return h
}
// pop removes the block with lowest height from this heap and returns it
func (dbh delayedBlocksHeap) pop() *delayedBlock {
return heap.Pop(dbh.impl).(*delayedBlock)
}
// Push pushes the block onto the heap
func (dbh delayedBlocksHeap) Push(block *delayedBlock) {
heap.Push(dbh.impl, block)
}
// Len returns the length of this heap
func (dbh delayedBlocksHeap) Len() int {
return dbh.impl.Len()
}
// peek returns the topmost element in the queue without poping it
func (dbh delayedBlocksHeap) peek() *delayedBlock {
if dbh.baseDelayedBlocksHeap.peek() == nil {
return nil
}
return dbh.baseDelayedBlocksHeap.peek().(*delayedBlock)
}

View File

@ -54,11 +54,11 @@ func TestFinality(t *testing.T) {
}
block := util.NewBlock(msgBlock)
isOrphan, delay, err := dag.ProcessBlock(block, blockdag.BFNoPoWCheck)
isOrphan, isDelayed, err := dag.ProcessBlock(block, blockdag.BFNoPoWCheck)
if err != nil {
return nil, err
}
if delay != 0 {
if isDelayed {
return nil, errors.Errorf("ProcessBlock: block " +
"is too far in the future")
}
@ -216,11 +216,11 @@ func TestChainedTransactions(t *testing.T) {
if err != nil {
t.Fatalf("PrepareBlockForTest: %v", err)
}
isOrphan, delay, err := dag.ProcessBlock(util.NewBlock(block1), blockdag.BFNoPoWCheck)
isOrphan, isDelayed, err := dag.ProcessBlock(util.NewBlock(block1), blockdag.BFNoPoWCheck)
if err != nil {
t.Fatalf("ProcessBlock: %v", err)
}
if delay != 0 {
if isDelayed {
t.Fatalf("ProcessBlock: block1 " +
"is too far in the future")
}
@ -266,7 +266,7 @@ func TestChainedTransactions(t *testing.T) {
}
//Checks that dag.ProcessBlock fails because we don't allow a transaction to spend another transaction from the same block
isOrphan, delay, err = dag.ProcessBlock(util.NewBlock(block2), blockdag.BFNoPoWCheck)
isOrphan, isDelayed, err = dag.ProcessBlock(util.NewBlock(block2), blockdag.BFNoPoWCheck)
if err == nil {
t.Errorf("ProcessBlock expected an error")
} else if rErr, ok := err.(blockdag.RuleError); ok {
@ -276,7 +276,7 @@ func TestChainedTransactions(t *testing.T) {
} else {
t.Errorf("ProcessBlock expected a blockdag.RuleError but got %v", err)
}
if delay != 0 {
if isDelayed {
t.Fatalf("ProcessBlock: block2 " +
"is too far in the future")
}
@ -301,11 +301,11 @@ func TestChainedTransactions(t *testing.T) {
}
//Checks that dag.ProcessBlock doesn't fail because all of its transaction are dependant on transactions from previous blocks
isOrphan, delay, err = dag.ProcessBlock(util.NewBlock(block3), blockdag.BFNoPoWCheck)
isOrphan, isDelayed, err = dag.ProcessBlock(util.NewBlock(block3), blockdag.BFNoPoWCheck)
if err != nil {
t.Errorf("ProcessBlock: %v", err)
}
if delay != 0 {
if isDelayed {
t.Fatalf("ProcessBlock: block3 " +
"is too far in the future")
}
@ -359,11 +359,11 @@ func TestOrderInDiffFromAcceptanceData(t *testing.T) {
// Add the block to the DAG
newBlock := util.NewBlock(msgBlock)
isOrphan, delay, err := dag.ProcessBlock(newBlock, blockdag.BFNoPoWCheck)
isOrphan, isDelayed, err := dag.ProcessBlock(newBlock, blockdag.BFNoPoWCheck)
if err != nil {
t.Errorf("TestOrderInDiffFromAcceptanceData: %s", err)
}
if delay != 0 {
if isDelayed {
t.Fatalf("TestOrderInDiffFromAcceptanceData: block is too far in the future")
}
if isOrphan {
@ -412,11 +412,11 @@ func TestGasLimit(t *testing.T) {
if err != nil {
t.Fatalf("PrepareBlockForTest: %v", err)
}
isOrphan, delay, err := dag.ProcessBlock(util.NewBlock(fundsBlock), blockdag.BFNoPoWCheck)
isOrphan, isDelayed, err := dag.ProcessBlock(util.NewBlock(fundsBlock), blockdag.BFNoPoWCheck)
if err != nil {
t.Fatalf("ProcessBlock: %v", err)
}
if delay != 0 {
if isDelayed {
t.Fatalf("ProcessBlock: the funds block " +
"is too far in the future")
}
@ -464,7 +464,7 @@ func TestGasLimit(t *testing.T) {
if err != nil {
t.Fatalf("PrepareBlockForTest: %v", err)
}
isOrphan, delay, err := dag.ProcessBlock(util.NewBlock(overLimitBlock), blockdag.BFNoPoWCheck)
isOrphan, isDelayed, err := dag.ProcessBlock(util.NewBlock(overLimitBlock), blockdag.BFNoPoWCheck)
if err == nil {
t.Fatalf("ProcessBlock expected to have an error in block that exceeds gas limit")
}
@ -474,7 +474,7 @@ func TestGasLimit(t *testing.T) {
} else if rErr.ErrorCode != blockdag.ErrInvalidGas {
t.Fatalf("ProcessBlock expected error code %s but got %s", blockdag.ErrInvalidGas, rErr.ErrorCode)
}
if delay != 0 {
if isDelayed {
t.Fatalf("ProcessBlock: overLimitBlock " +
"is too far in the future")
}
@ -499,7 +499,7 @@ func TestGasLimit(t *testing.T) {
if err != nil {
t.Fatalf("PrepareBlockForTest: %v", err)
}
isOrphan, delay, err = dag.ProcessBlock(util.NewBlock(overflowGasBlock), blockdag.BFNoPoWCheck)
isOrphan, isDelayed, err = dag.ProcessBlock(util.NewBlock(overflowGasBlock), blockdag.BFNoPoWCheck)
if err == nil {
t.Fatalf("ProcessBlock expected to have an error")
}
@ -532,7 +532,7 @@ func TestGasLimit(t *testing.T) {
}
// Here we check that we can't process a block with a transaction from a non-existent subnetwork
isOrphan, delay, err = dag.ProcessBlock(util.NewBlock(nonExistentSubnetworkBlock), blockdag.BFNoPoWCheck)
isOrphan, isDelayed, err = dag.ProcessBlock(util.NewBlock(nonExistentSubnetworkBlock), blockdag.BFNoPoWCheck)
expectedErrStr := fmt.Sprintf("Error getting gas limit for subnetworkID '%s': subnetwork '%s' not found",
nonExistentSubnetwork, nonExistentSubnetwork)
if err.Error() != expectedErrStr {
@ -544,11 +544,11 @@ func TestGasLimit(t *testing.T) {
if err != nil {
t.Fatalf("PrepareBlockForTest: %v", err)
}
isOrphan, delay, err = dag.ProcessBlock(util.NewBlock(validBlock), blockdag.BFNoPoWCheck)
isOrphan, isDelayed, err = dag.ProcessBlock(util.NewBlock(validBlock), blockdag.BFNoPoWCheck)
if err != nil {
t.Fatalf("ProcessBlock: %v", err)
}
if delay != 0 {
if isDelayed {
t.Fatalf("ProcessBlock: overLimitBlock " +
"is too far in the future")
}

View File

@ -116,11 +116,11 @@ func TestAcceptanceIndexRecover(t *testing.T) {
}
for i := 1; i < len(blocks)-2; i++ {
isOrphan, delay, err := db1DAG.ProcessBlock(blocks[i], blockdag.BFNone)
isOrphan, isDelayed, err := db1DAG.ProcessBlock(blocks[i], blockdag.BFNone)
if err != nil {
t.Fatalf("ProcessBlock fail on block %v: %v\n", i, err)
}
if delay != 0 {
if isDelayed {
t.Fatalf("ProcessBlock: block %d "+
"is too far in the future", i)
}
@ -147,11 +147,11 @@ func TestAcceptanceIndexRecover(t *testing.T) {
}
for i := len(blocks) - 2; i < len(blocks); i++ {
isOrphan, delay, err := db1DAG.ProcessBlock(blocks[i], blockdag.BFNone)
isOrphan, isDelayed, err := db1DAG.ProcessBlock(blocks[i], blockdag.BFNone)
if err != nil {
t.Fatalf("ProcessBlock fail on block %v: %v\n", i, err)
}
if delay != 0 {
if isDelayed {
t.Fatalf("ProcessBlock: block %d "+
"is too far in the future", i)
}
@ -185,11 +185,11 @@ func TestAcceptanceIndexRecover(t *testing.T) {
}
for i := len(blocks) - 2; i < len(blocks); i++ {
isOrphan, delay, err := db2DAG.ProcessBlock(blocks[i], blockdag.BFNone)
isOrphan, isDelayed, err := db2DAG.ProcessBlock(blocks[i], blockdag.BFNone)
if err != nil {
t.Fatalf("ProcessBlock fail on block %v: %v\n", i, err)
}
if delay != 0 {
if isDelayed {
t.Fatalf("ProcessBlock: block %d "+
"is too far in the future", i)
}

View File

@ -63,11 +63,11 @@ func TestTxIndexConnectBlock(t *testing.T) {
}
utilBlock := util.NewBlock(block)
blocks[*block.BlockHash()] = utilBlock
isOrphan, delay, err := dag.ProcessBlock(utilBlock, blockdag.BFNoPoWCheck)
isOrphan, isDelayed, err := dag.ProcessBlock(utilBlock, blockdag.BFNoPoWCheck)
if err != nil {
t.Fatalf("TestTxIndexConnectBlock: dag.ProcessBlock got unexpected error for block %v: %v", blockName, err)
}
if delay != 0 {
if isDelayed {
t.Fatalf("TestTxIndexConnectBlock: block %s "+
"is too far in the future", blockName)
}

View File

@ -41,11 +41,11 @@ func TestNotifications(t *testing.T) {
dag.Subscribe(callback)
}
isOrphan, delay, err := dag.ProcessBlock(blocks[1], BFNone)
isOrphan, isDelayed, err := dag.ProcessBlock(blocks[1], BFNone)
if err != nil {
t.Fatalf("ProcessBlock fail on block 1: %v\n", err)
}
if delay != 0 {
if isDelayed {
t.Fatalf("ProcessBlock: block 1 " +
"is too far in the future")
}

View File

@ -134,11 +134,14 @@ func (dag *BlockDAG) processOrphans(hash *daghash.Hash, flags BehaviorFlags) err
// whether or not the block is an orphan.
//
// This function is safe for concurrent access.
func (dag *BlockDAG) ProcessBlock(block *util.Block, flags BehaviorFlags) (isOrphan bool, delay time.Duration, err error) {
func (dag *BlockDAG) ProcessBlock(block *util.Block, flags BehaviorFlags) (isOrphan bool, isDelayed bool, err error) {
dag.dagLock.Lock()
defer dag.dagLock.Unlock()
return dag.processBlockNoLock(block, flags)
}
isDelayedBlock := flags&BFAfterDelay == BFAfterDelay
func (dag *BlockDAG) processBlockNoLock(block *util.Block, flags BehaviorFlags) (isOrphan bool, isDelayed bool, err error) {
isAfterDelay := flags&BFAfterDelay == BFAfterDelay
wasBlockStored := flags&BFWasStored == BFWasStored
blockHash := block.Hash()
@ -147,36 +150,57 @@ func (dag *BlockDAG) ProcessBlock(block *util.Block, flags BehaviorFlags) (isOrp
// The block must not already exist in the DAG.
if dag.BlockExists(blockHash) && !wasBlockStored {
str := fmt.Sprintf("already have block %s", blockHash)
return false, 0, ruleError(ErrDuplicateBlock, str)
return false, false, ruleError(ErrDuplicateBlock, str)
}
// The block must not already exist as an orphan.
if _, exists := dag.orphans[*blockHash]; exists {
str := fmt.Sprintf("already have block (orphan) %s", blockHash)
return false, 0, ruleError(ErrDuplicateBlock, str)
return false, false, ruleError(ErrDuplicateBlock, str)
}
if !isDelayedBlock {
if dag.isKnownDelayedBlock(blockHash) {
str := fmt.Sprintf("already have block (delayed) %s", blockHash)
return false, false, ruleError(ErrDuplicateBlock, str)
}
if !isAfterDelay {
// Perform preliminary sanity checks on the block and its transactions.
delay, err := dag.checkBlockSanity(block, flags)
if err != nil {
return false, 0, err
return false, false, err
}
if delay != 0 {
return false, delay, err
err = dag.addDelayedBlock(block, delay)
if err != nil {
return false, false, err
}
return false, true, nil
}
}
var missingParents []*daghash.Hash
for _, parentHash := range block.MsgBlock().Header.ParentHashes {
if !dag.BlockExists(parentHash) {
missingParents = append(missingParents, parentHash)
}
}
// Handle the case of a block with a valid timestamp(non-delayed) which points to a delayed block.
delay, isParentDelayed := dag.maxDelayOfParents(missingParents)
if isParentDelayed {
// Add Nanosecond to ensure that parent process time will be after its child.
delay += time.Nanosecond
err := dag.addDelayedBlock(block, delay)
if err != nil {
return false, false, err
}
return false, true, err
}
// Handle orphan blocks.
allParentsExist := true
for _, parentHash := range block.MsgBlock().Header.ParentHashes {
if !dag.BlockExists(parentHash) {
allParentsExist = false
}
}
if !allParentsExist {
if len(missingParents) > 0 {
// Some orphans during netsync are a normal part of the process, since the anticone
// of the chain-split is never explicitly requested.
// Therefore, if we are during netsync - don't report orphans to default logs.
@ -191,14 +215,14 @@ func (dag *BlockDAG) ProcessBlock(block *util.Block, flags BehaviorFlags) (isOrp
}
dag.addOrphanBlock(block)
return true, 0, nil
return true, false, nil
}
// The block has passed all context independent checks and appears sane
// enough to potentially accept it into the block DAG.
err = dag.maybeAcceptBlock(block, flags)
if err != nil {
return false, 0, err
return false, false, err
}
// Accept any orphan blocks that depend on this block (they are
@ -206,10 +230,33 @@ func (dag *BlockDAG) ProcessBlock(block *util.Block, flags BehaviorFlags) (isOrp
// there are no more.
err = dag.processOrphans(blockHash, flags)
if err != nil {
return false, 0, err
return false, false, err
}
if !isAfterDelay {
err = dag.processDelayedBlocks()
if err != nil {
return false, false, err
}
}
log.Debugf("Accepted block %s", blockHash)
return false, 0, nil
return false, false, nil
}
// maxDelayOfParents returns the maximum delay of the given block hashes.
// Note that delay could be 0, but isDelayed will return true. This is the case where the parent process time is due.
func (dag *BlockDAG) maxDelayOfParents(parentHashes []*daghash.Hash) (delay time.Duration, isDelayed bool) {
for _, parentHash := range parentHashes {
if delayedParent, exists := dag.delayedBlocks[*parentHash]; exists {
isDelayed = true
parentDelay := delayedParent.processTime.Sub(dag.timeSource.AdjustedTime())
if parentDelay > delay {
delay = parentDelay
}
}
}
return delay, isDelayed
}

View File

@ -29,11 +29,11 @@ func TestProcessBlock(t *testing.T) {
})
defer guard.Unpatch()
isOrphan, delay, err := dag.ProcessBlock(util.NewBlock(&Block100000), BFNoPoWCheck)
isOrphan, isDelayed, err := dag.ProcessBlock(util.NewBlock(&Block100000), BFNoPoWCheck)
if err != nil {
t.Errorf("ProcessBlock: %s", err)
}
if delay != 0 {
if isDelayed {
t.Errorf("ProcessBlock: block is too far in the future")
}
if !isOrphan {
@ -47,11 +47,11 @@ func TestProcessBlock(t *testing.T) {
// Change nonce to change block hash
Block100000Copy.Header.Nonce++
called = false
isOrphan, delay, err = dag.ProcessBlock(util.NewBlock(&Block100000Copy), BFAfterDelay|BFNoPoWCheck)
isOrphan, isDelayed, err = dag.ProcessBlock(util.NewBlock(&Block100000Copy), BFAfterDelay|BFNoPoWCheck)
if err != nil {
t.Errorf("ProcessBlock: %s", err)
}
if delay != 0 {
if isDelayed {
t.Errorf("ProcessBlock: block is too far in the future")
}
if !isOrphan {
@ -61,7 +61,7 @@ func TestProcessBlock(t *testing.T) {
t.Errorf("ProcessBlock: Didn't expected checkBlockSanity to be called")
}
isOrphan, delay, err = dag.ProcessBlock(util.NewBlock(dagconfig.SimNetParams.GenesisBlock), BFNone)
isOrphan, isDelayed, err = dag.ProcessBlock(util.NewBlock(dagconfig.SimNetParams.GenesisBlock), BFNone)
expectedErrMsg := fmt.Sprintf("already have block %s", dagconfig.SimNetParams.GenesisHash)
if err == nil || err.Error() != expectedErrMsg {
t.Errorf("ProcessBlock: Expected error \"%s\" but got \"%s\"", expectedErrMsg, err)
@ -97,11 +97,11 @@ func TestProcessOrphans(t *testing.T) {
childBlock.MsgBlock().Header.UTXOCommitment = &daghash.ZeroHash
// Process the child block so that it gets added to the orphan pool
isOrphan, delay, err := dag.ProcessBlock(childBlock, BFNoPoWCheck)
isOrphan, isDelayed, err := dag.ProcessBlock(childBlock, BFNoPoWCheck)
if err != nil {
t.Fatalf("TestProcessOrphans: child block unexpectedly returned an error: %s", err)
}
if delay != 0 {
if isDelayed {
t.Fatalf("TestProcessOrphans: child block is too far in the future")
}
if !isOrphan {
@ -109,11 +109,11 @@ func TestProcessOrphans(t *testing.T) {
}
// Process the parent block. Note that this will attempt to unorphan the child block
isOrphan, delay, err = dag.ProcessBlock(parentBlock, BFNone)
isOrphan, isDelayed, err = dag.ProcessBlock(parentBlock, BFNone)
if err != nil {
t.Fatalf("TestProcessOrphans: parent block unexpectedly returned an error: %s", err)
}
if delay != 0 {
if isDelayed {
t.Fatalf("TestProcessOrphans: parent block is too far in the future")
}
if isOrphan {

View File

@ -6,11 +6,12 @@ package blockdag
import (
"fmt"
"github.com/pkg/errors"
"math"
"sort"
"time"
"github.com/pkg/errors"
"github.com/kaspanet/kaspad/dagconfig"
"github.com/kaspanet/kaspad/txscript"
"github.com/kaspanet/kaspad/util"
@ -432,6 +433,8 @@ func (dag *BlockDAG) checkBlockHeaderSanity(header *wire.BlockHeader, flags Beha
// Ensure the block time is not too far in the future. If it's too far, return
// the duration of time that should be waited before the block becomes valid.
// This check needs to be last as it does not return an error but rather marks the
// header as delayed (and valid).
maxTimestamp := dag.timeSource.AdjustedTime().Add(time.Second *
time.Duration(int64(dag.TimestampDeviationTolerance)*dag.targetTimePerBlock))
if header.Timestamp.After(maxTimestamp) {

View File

@ -101,12 +101,12 @@ func TestCheckConnectBlockTemplate(t *testing.T) {
}
for i := 1; i <= 3; i++ {
_, delay, err := dag.ProcessBlock(blocks[i], BFNone)
_, isDelayed, err := dag.ProcessBlock(blocks[i], BFNone)
if err != nil {
t.Fatalf("CheckConnectBlockTemplate: Received unexpected error "+
"processing block %d: %v", i, err)
}
if delay != 0 {
if isDelayed {
t.Fatalf("CheckConnectBlockTemplate: block %d is too far in the future", i)
}
}

View File

@ -116,12 +116,12 @@ func (bi *blockImporter) processBlock(serializedBlock []byte) (bool, error) {
}
// Ensure the blocks follows all of the DAG rules.
isOrphan, delay, err := bi.dag.ProcessBlock(block,
isOrphan, isDelayed, err := bi.dag.ProcessBlock(block,
blockdag.BFFastAdd)
if err != nil {
return false, err
}
if delay != 0 {
if isDelayed {
return false, errors.Errorf("import file contains a block that is too far in the future")
}
if isOrphan {

View File

@ -262,11 +262,11 @@ func (tc *testContext) mineTransactions(transactions []*util.Tx, numberOfBlocks
tc.t.Fatalf("PrepareBlockForTest: %s", err)
}
utilBlock := util.NewBlock(block)
isOrphan, delay, err := tc.harness.txPool.cfg.DAG.ProcessBlock(utilBlock, blockdag.BFNoPoWCheck)
isOrphan, isDelayed, err := tc.harness.txPool.cfg.DAG.ProcessBlock(utilBlock, blockdag.BFNoPoWCheck)
if err != nil {
tc.t.Fatalf("ProcessBlock: %s", err)
}
if delay != 0 {
if isDelayed {
tc.t.Fatalf("ProcessBlock: block %s "+
"is too far in the future", block.BlockHash())
}

View File

@ -58,12 +58,12 @@ func TestNewBlockTemplate(t *testing.T) {
t.Fatalf("NewBlockTemplate: %v", err)
}
isOrphan, delay, err := dag.ProcessBlock(util.NewBlock(template1.Block), blockdag.BFNoPoWCheck)
isOrphan, isDelayed, err := dag.ProcessBlock(util.NewBlock(template1.Block), blockdag.BFNoPoWCheck)
if err != nil {
t.Fatalf("ProcessBlock: %v", err)
}
if delay != 0 {
if isDelayed {
t.Fatalf("ProcessBlock: template1 " +
"is too far in the future")
}
@ -79,11 +79,11 @@ func TestNewBlockTemplate(t *testing.T) {
if err != nil {
t.Fatalf("NewBlockTemplate: %v", err)
}
isOrphan, delay, err = dag.ProcessBlock(util.NewBlock(template.Block), blockdag.BFNoPoWCheck)
isOrphan, isDelayed, err = dag.ProcessBlock(util.NewBlock(template.Block), blockdag.BFNoPoWCheck)
if err != nil {
t.Fatalf("ProcessBlock: %v", err)
}
if delay != 0 {
if isDelayed {
t.Fatalf("ProcessBlock: template " +
"is too far in the future")
}

View File

@ -55,11 +55,11 @@ func TestSelectTxs(t *testing.T) {
if err != nil {
t.Fatalf("NewBlockTemplate: %v", err)
}
isOrphan, delay, err := dag.ProcessBlock(util.NewBlock(template.Block), blockdag.BFNoPoWCheck)
isOrphan, isDelayed, err := dag.ProcessBlock(util.NewBlock(template.Block), blockdag.BFNoPoWCheck)
if err != nil {
t.Fatalf("ProcessBlock: %v", err)
}
if delay != 0 {
if isDelayed {
t.Fatalf("ProcessBlock: template " +
"is too far in the future")
}

View File

@ -439,7 +439,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
}
// Process the block to include validation, orphan handling, etc.
isOrphan, delay, err := sm.dag.ProcessBlock(bmsg.block, behaviorFlags)
isOrphan, isDelayed, err := sm.dag.ProcessBlock(bmsg.block, behaviorFlags)
// Remove block from request maps. Either DAG knows about it and
// so we shouldn't have any more instances of trying to fetch it, or
@ -473,10 +473,8 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
return
}
if delay != 0 {
spawn(func() {
sm.QueueBlock(bmsg.block, bmsg.peer, true, make(chan struct{}))
}, sm.handlePanic)
if isDelayed {
return
}
// Request the parents for the orphan block from the peer that sent it.
@ -869,7 +867,7 @@ out:
msg.reply <- peerID
case processBlockMsg:
isOrphan, delay, err := sm.dag.ProcessBlock(
isOrphan, isDelayed, err := sm.dag.ProcessBlock(
msg.block, msg.flags)
if err != nil {
msg.reply <- processBlockResponse{
@ -877,7 +875,7 @@ out:
err: err,
}
}
if delay != 0 {
if isDelayed {
msg.reply <- processBlockResponse{
isOrphan: false,
err: errors.New("Cannot process blocks from RPC beyond the allowed time offset"),

View File

@ -27,12 +27,12 @@ func RegisterSubnetworkForTest(dag *blockdag.BlockDAG, params *dagconfig.Params,
}
addBlockToDAG := func(block *util.Block) error {
isOrphan, delay, err := dag.ProcessBlock(block, blockdag.BFNoPoWCheck)
isOrphan, isDelayed, err := dag.ProcessBlock(block, blockdag.BFNoPoWCheck)
if err != nil {
return err
}
if delay != 0 {
if isDelayed {
return errors.Errorf("ProcessBlock: block is is too far in the future")
}