[NOD-1538] Implement a simple orphan pool (#1093)

* [NOD-1538] Implement a simple orphan pool.

* [NOD-1538] Connect the orphan pool to the appropriate flows.

* [NOD-1538] Make UnorphanBlocks actually unorphan blocks.

* [NOD-1538] Fix logs.

* [NOD-1538] Make unorphaned blocks call LogBlock.

* [NOD-1538] Fix a log and some bad names.

* [NOD-1538] Don't return an error from LogBlock.

* [NOD-1538] Pass a pointer to hash in findChildOrphansOfBlock.

* [NOD-1538] Extract addChildOrphansToProcessQueue to a separate function.
This commit is contained in:
stasatdaglabs 2020-11-17 14:44:34 +02:00 committed by Svarog
parent 213be67c47
commit 9d5d1b02dc
5 changed files with 132 additions and 9 deletions

View File

@ -22,7 +22,7 @@ var (
// LogBlock logs a new block blue score as an information message
// to show progress to the user. In order to prevent spam, it limits logging to
// one message every 10 seconds with duration and totals included.
func LogBlock(block *externalapi.DomainBlock) error {
func LogBlock(block *externalapi.DomainBlock) {
mtx.Lock()
defer mtx.Unlock()
@ -32,7 +32,7 @@ func LogBlock(block *externalapi.DomainBlock) error {
now := mstime.Now()
duration := now.Sub(lastBlockLogTime)
if duration < time.Second*10 {
return nil
return
}
// Truncate the duration to 10s of milliseconds.
@ -55,5 +55,4 @@ func LogBlock(block *externalapi.DomainBlock) error {
receivedLogBlocks = 0
receivedLogTx = 0
lastBlockLogTime = now
return nil
}

View File

@ -15,17 +15,22 @@ import (
// relays newly unorphaned transactions and possibly rebroadcast
// manually added transactions when not in IBD.
func (f *FlowContext) OnNewBlock(block *externalapi.DomainBlock) error {
err := blocklogger.LogBlock(block)
unorphanedBlocks, err := f.UnorphanBlocks(block)
if err != nil {
return err
}
f.Domain().MiningManager().HandleNewBlockTransactions(block.Transactions)
newBlocks := append([]*externalapi.DomainBlock{block}, unorphanedBlocks...)
for _, newBlock := range newBlocks {
blocklogger.LogBlock(block)
if f.onBlockAddedToDAGHandler != nil {
err := f.onBlockAddedToDAGHandler(block)
if err != nil {
return err
f.Domain().MiningManager().HandleNewBlockTransactions(newBlock.Transactions)
if f.onBlockAddedToDAGHandler != nil {
err := f.onBlockAddedToDAGHandler(newBlock)
if err != nil {
return err
}
}
}

View File

@ -51,6 +51,9 @@ type FlowContext struct {
peers map[id.ID]*peerpkg.Peer
peersMutex sync.RWMutex
orphans map[externalapi.DomainHash]*externalapi.DomainBlock
orphansMutex sync.Mutex
}
// New returns a new instance of FlowContext.
@ -67,6 +70,7 @@ func New(cfg *config.Config, domain domain.Domain, addressManager *addressmanage
sharedRequestedBlocks: blockrelay.NewSharedRequestedBlocks(),
peers: make(map[id.ID]*peerpkg.Peer),
transactionsToRebroadcast: make(map[externalapi.DomainTransactionID]*externalapi.DomainTransaction),
orphans: make(map[externalapi.DomainHash]*externalapi.DomainBlock),
}
}

View File

@ -0,0 +1,111 @@
package flowcontext
import (
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"github.com/kaspanet/kaspad/domain/consensus/utils/consensusserialization"
"github.com/pkg/errors"
)
func (f *FlowContext) AddOrphan(orphanBlock *externalapi.DomainBlock) {
f.orphansMutex.Lock()
defer f.orphansMutex.Unlock()
orphanHash := consensusserialization.BlockHash(orphanBlock)
f.orphans[*orphanHash] = orphanBlock
}
func (f *FlowContext) UnorphanBlocks(rootBlock *externalapi.DomainBlock) ([]*externalapi.DomainBlock, error) {
f.orphansMutex.Lock()
defer f.orphansMutex.Unlock()
// Find all the children of rootBlock among the orphans
// and add them to the process queue
rootBlockHash := consensusserialization.BlockHash(rootBlock)
processQueue := f.addChildOrphansToProcessQueue(rootBlockHash, []externalapi.DomainHash{})
var unorphanedBlocks []*externalapi.DomainBlock
for len(processQueue) > 0 {
var orphanHash externalapi.DomainHash
orphanHash, processQueue = processQueue[0], processQueue[1:]
orphanBlock := f.orphans[orphanHash]
log.Tracef("Considering to unorphan block %s with parents %s",
orphanHash, orphanBlock.Header.ParentHashes)
canBeUnorphaned := true
for _, orphanBlockParentHash := range orphanBlock.Header.ParentHashes {
orphanBlockParentInfo, err := f.domain.Consensus().GetBlockInfo(orphanBlockParentHash)
if err != nil {
return nil, err
}
if !orphanBlockParentInfo.Exists {
log.Tracef("Cannot unorphan block %s. It's missing at "+
"least the following parent: %s", orphanHash, orphanBlockParentHash)
canBeUnorphaned = false
break
}
}
if canBeUnorphaned {
err := f.unorphanBlock(orphanHash)
if err != nil {
return nil, err
}
unorphanedBlocks = append(unorphanedBlocks, orphanBlock)
processQueue = f.addChildOrphansToProcessQueue(&orphanHash, processQueue)
}
}
return unorphanedBlocks, nil
}
// addChildOrphansToProcessQueue finds all child orphans of `blockHash`
// and adds them to the given `processQueue` if they don't already exist
// inside of it
// Note that this method does not modify the given `processQueue`
func (f *FlowContext) addChildOrphansToProcessQueue(blockHash *externalapi.DomainHash,
processQueue []externalapi.DomainHash) []externalapi.DomainHash {
blockChildren := f.findChildOrphansOfBlock(blockHash)
for _, blockChild := range blockChildren {
exists := false
for _, queueOrphan := range processQueue {
if queueOrphan == blockChild {
exists = true
break
}
}
if !exists {
processQueue = append(processQueue, blockChild)
}
}
return processQueue
}
func (f *FlowContext) findChildOrphansOfBlock(blockHash *externalapi.DomainHash) []externalapi.DomainHash {
var childOrphans []externalapi.DomainHash
for orphanHash, orphanBlock := range f.orphans {
for _, orphanBlockParentHash := range orphanBlock.Header.ParentHashes {
if *orphanBlockParentHash == *blockHash {
childOrphans = append(childOrphans, orphanHash)
break
}
}
}
return childOrphans
}
func (f *FlowContext) unorphanBlock(orphanHash externalapi.DomainHash) error {
orphanBlock, ok := f.orphans[orphanHash]
if !ok {
return errors.Errorf("attempted to unorphan a non-orphan block %s", orphanHash)
}
err := f.domain.Consensus().ValidateAndInsertBlock(orphanBlock)
if err != nil {
return err
}
delete(f.orphans, orphanHash)
log.Debugf("Unorphaned block %s", orphanHash)
return nil
}

View File

@ -25,6 +25,7 @@ type RelayInvsContext interface {
StartIBDIfRequired() error
IsInIBD() bool
Broadcast(message appmessage.Message) error
AddOrphan(orphanBlock *externalapi.DomainBlock)
}
type handleRelayInvsFlow struct {
@ -229,6 +230,9 @@ func (flow *handleRelayInvsFlow) processAndRelayBlock(requestQueue *hashesQueueS
return nil
}
// Add the orphan to the orphan pool
flow.AddOrphan(block)
// Request the parents for the orphan block from the peer that sent it.
for _, missingAncestor := range missingParentsError.MissingParentHashes {
requestQueue.enqueueIfNotExists(missingAncestor)