mirror of
https://github.com/kaspanet/kaspad.git
synced 2025-03-30 15:08:33 +00:00
Various P2P V5 IBD fixes (#1976)
* The first message is expected to contain headers and not a "done" message (+ comment and error text fixes)
* Dequeue without a timeout during pruning point anticone batch processing
* Add a verification step for catching possible new IBD errors
* Fetch missing bodies for both the syncer selected tip's past and the relay block's past
* Make sure the syncer is behaving correctly to avoid index-out-of-range errors
* Make sure the progress reporter does not exceed 100%
* There are no orphan roots, so there is no need to queue the empty list
* Add a log to report a UTXO fetch failure together with the error message
* Duplicate blocks should not appear as a warning
* Fix a typo
This commit is contained in:
parent
1e56a22b32
commit
4e44dd8510
@ -285,7 +285,10 @@ func (flow *handleRelayInvsFlow) processBlock(block *externalapi.DomainBlock) ([
|
||||
if errors.As(err, missingParentsError) {
|
||||
return missingParentsError.MissingParentHashes, nil, nil
|
||||
}
|
||||
log.Warnf("Rejected block %s from %s: %s", blockHash, flow.peer, err)
|
||||
// A duplicate block should not appear to the user as a warning and is already reported in the calling function
|
||||
if !errors.Is(err, ruleerrors.ErrDuplicateBlock) {
|
||||
log.Warnf("Rejected block %s from %s: %s", blockHash, flow.peer, err)
|
||||
}
|
||||
return nil, nil, protocolerrors.Wrapf(true, err, "got invalid block %s from relay", blockHash)
|
||||
}
|
||||
return nil, virtualChangeSet, nil
|
||||
@ -396,6 +399,10 @@ func (flow *handleRelayInvsFlow) AddOrphanRootsToQueue(orphan *externalapi.Domai
|
||||
"probably happened because it was randomly evicted immediately after it was added.", orphan)
|
||||
}
|
||||
|
||||
if len(orphanRoots) == 0 {
|
||||
// In some rare cases we get here when there are no orphan roots already
|
||||
return nil
|
||||
}
|
||||
log.Infof("Block %s has %d missing ancestors. Adding them to the invs queue...", orphan, len(orphanRoots))
|
||||
|
||||
invMessages := make([]*appmessage.MsgInvRelayBlock, len(orphanRoots))
|
||||
|
@ -344,6 +344,7 @@ func (flow *handleIBDFlow) syncPruningPointUTXOSet(consensus externalapi.Consens
|
||||
log.Info("Fetching the pruning point UTXO set")
|
||||
isSuccessful, err := flow.fetchMissingUTXOSet(consensus, pruningPoint)
|
||||
if err != nil {
|
||||
log.Infof("An error occurred while fetching the pruning point UTXO set. Stopping IBD. (%s)", err)
|
||||
return false, err
|
||||
}
|
||||
|
||||
|
@ -2,7 +2,6 @@ package blockrelay
|
||||
|
||||
import (
|
||||
"github.com/kaspanet/kaspad/app/appmessage"
|
||||
"github.com/kaspanet/kaspad/app/protocol/common"
|
||||
peerpkg "github.com/kaspanet/kaspad/app/protocol/peer"
|
||||
"github.com/kaspanet/kaspad/app/protocol/protocolerrors"
|
||||
"github.com/kaspanet/kaspad/domain"
|
||||
@ -131,7 +130,9 @@ func HandlePruningPointAndItsAnticoneRequests(context PruningPointAndItsAnticone
|
||||
}
|
||||
|
||||
if (i+1)%ibdBatchSize == 0 {
|
||||
message, err := incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
|
||||
// No timeout here, as we don't care if the syncee takes its time computing,
|
||||
// since it only blocks this dedicated flow
|
||||
message, err := incomingRoute.Dequeue()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -285,7 +285,10 @@ func (flow *handleRelayInvsFlow) processBlock(block *externalapi.DomainBlock) ([
|
||||
if errors.As(err, missingParentsError) {
|
||||
return missingParentsError.MissingParentHashes, nil, nil
|
||||
}
|
||||
log.Warnf("Rejected block %s from %s: %s", blockHash, flow.peer, err)
|
||||
// A duplicate block should not appear to the user as a warning and is already reported in the calling function
|
||||
if !errors.Is(err, ruleerrors.ErrDuplicateBlock) {
|
||||
log.Warnf("Rejected block %s from %s: %s", blockHash, flow.peer, err)
|
||||
}
|
||||
return nil, nil, protocolerrors.Wrapf(true, err, "got invalid block %s from relay", blockHash)
|
||||
}
|
||||
return nil, virtualChangeSet, nil
|
||||
@ -396,6 +399,10 @@ func (flow *handleRelayInvsFlow) AddOrphanRootsToQueue(orphan *externalapi.Domai
|
||||
"probably happened because it was randomly evicted immediately after it was added.", orphan)
|
||||
}
|
||||
|
||||
if len(orphanRoots) == 0 {
|
||||
// In some rare cases we get here when there are no orphan roots already
|
||||
return nil
|
||||
}
|
||||
log.Infof("Block %s has %d missing ancestors. Adding them to the invs queue...", orphan, len(orphanRoots))
|
||||
|
||||
invMessages := make([]*appmessage.MsgInvRelayBlock, len(orphanRoots))
|
||||
|
@ -46,6 +46,15 @@ func (flow *handleRequestHeadersFlow) start() error {
|
||||
}
|
||||
log.Debugf("Recieved requestHeaders with lowHash: %s, highHash: %s", lowHash, highHash)
|
||||
|
||||
isLowSelectedAncestorOfHigh, err := flow.Domain().Consensus().IsInSelectedParentChainOf(lowHash, highHash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !isLowSelectedAncestorOfHigh {
|
||||
return protocolerrors.Errorf(true, "Expected %s to be on the selected chain of %s",
|
||||
lowHash, highHash)
|
||||
}
|
||||
|
||||
for !lowHash.Equal(highHash) {
|
||||
log.Debugf("Getting block headers between %s and %s to %s", lowHash, highHash, flow.peer)
|
||||
|
||||
|
@ -194,7 +194,6 @@ func (flow *handleIBDFlow) runIBDIfNotRunning(block *externalapi.DomainBlock) er
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: need DAA score of syncerHeaderSelectedTipHash
|
||||
err = flow.syncPruningPointFutureHeaders(
|
||||
flow.Domain().Consensus(),
|
||||
syncerHeaderSelectedTipHash, highestKnownSyncerChainHash, relayBlockHash, block.Header.DAAScore())
|
||||
@ -203,10 +202,25 @@ func (flow *handleIBDFlow) runIBDIfNotRunning(block *externalapi.DomainBlock) er
|
||||
}
|
||||
}
|
||||
|
||||
err = flow.syncMissingBlockBodies(relayBlockHash)
|
||||
// We start by syncing missing bodies over the syncer selected chain
|
||||
err = flow.syncMissingBlockBodies(syncerHeaderSelectedTipHash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
relayBlockInfo, err := flow.Domain().Consensus().GetBlockInfo(relayBlockHash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Relay block might be in the anticone of syncer selected tip, thus
|
||||
// check his chain for missing bodies as well.
|
||||
// Note: this operation can be slightly optimized to avoid the full chain search since relay block
|
||||
// is in syncer virtual mergeset which has bounded size.
|
||||
if relayBlockInfo.BlockStatus == externalapi.StatusHeaderOnly {
|
||||
err = flow.syncMissingBlockBodies(relayBlockHash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
log.Debugf("Finished syncing blocks up to %s", relayBlockHash)
|
||||
isFinishedSuccessfully = true
|
||||
@ -253,7 +267,7 @@ func (flow *handleIBDFlow) getSyncerChainBlockLocator(
|
||||
|
||||
func (flow *handleIBDFlow) syncPruningPointFutureHeaders(consensus externalapi.Consensus,
|
||||
syncerHeaderSelectedTipHash, highestKnownSyncerChainHash, relayBlockHash *externalapi.DomainHash,
|
||||
highBlockDAAScore uint64) error {
|
||||
highBlockDAAScoreHint uint64) error {
|
||||
|
||||
log.Infof("Downloading headers from %s", flow.peer)
|
||||
|
||||
@ -266,7 +280,7 @@ func (flow *handleIBDFlow) syncPruningPointFutureHeaders(consensus externalapi.C
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
progressReporter := newIBDProgressReporter(highestSharedBlockHeader.DAAScore(), highBlockDAAScore, "block headers")
|
||||
progressReporter := newIBDProgressReporter(highestSharedBlockHeader.DAAScore(), highBlockDAAScoreHint, "block headers")
|
||||
|
||||
// Keep a short queue of BlockHeadersMessages so that there's
|
||||
// never a moment when the node is not validating and inserting
|
||||
@ -284,6 +298,11 @@ func (flow *handleIBDFlow) syncPruningPointFutureHeaders(consensus externalapi.C
|
||||
close(blockHeadersMessageChan)
|
||||
return
|
||||
}
|
||||
if len(blockHeadersMessage.BlockHeaders) == 0 {
|
||||
// The syncer should have sent a done message if the search completed, and not an empty list
|
||||
errChan <- protocolerrors.Errorf(true, "Received an empty headers message from peer %s", flow.peer)
|
||||
return
|
||||
}
|
||||
|
||||
blockHeadersMessageChan <- blockHeadersMessage
|
||||
|
||||
@ -306,22 +325,31 @@ func (flow *handleIBDFlow) syncPruningPointFutureHeaders(consensus externalapi.C
|
||||
return err
|
||||
}
|
||||
if !relayBlockInfo.Exists {
|
||||
// Send a special header request for the past diff. This is expected to be a small,
|
||||
// as it is bounded to the size of virtual's mergeset
|
||||
// Send a special header request for the selected tip anticone. This is expected to
|
||||
// be a small set, as it is bounded to the size of virtual's mergeset.
|
||||
err = flow.sendRequestAnticone(syncerHeaderSelectedTipHash, relayBlockHash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pastDiffHeadersMessage, pastDiffDone, err := flow.receiveHeaders()
|
||||
anticoneHeadersMessage, anticoneDone, err := flow.receiveHeaders()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !pastDiffDone {
|
||||
if anticoneDone {
|
||||
return protocolerrors.Errorf(true,
|
||||
"Expected only one past diff header chunk for past(%s) setminus past(%s)",
|
||||
syncerHeaderSelectedTipHash, relayBlockHash)
|
||||
"Expected one anticone header chunk for past(%s) cap anticone(%s) but got zero",
|
||||
relayBlockHash, syncerHeaderSelectedTipHash)
|
||||
}
|
||||
for _, header := range pastDiffHeadersMessage.BlockHeaders {
|
||||
_, anticoneDone, err = flow.receiveHeaders()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !anticoneDone {
|
||||
return protocolerrors.Errorf(true,
|
||||
"Expected only one anticone header chunk for past(%s) cap anticone(%s)",
|
||||
relayBlockHash, syncerHeaderSelectedTipHash)
|
||||
}
|
||||
for _, header := range anticoneHeadersMessage.BlockHeaders {
|
||||
err = flow.processHeader(consensus, header)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -336,7 +364,7 @@ func (flow *handleIBDFlow) syncPruningPointFutureHeaders(consensus externalapi.C
|
||||
}
|
||||
if !relayBlockInfo.Exists {
|
||||
return protocolerrors.Errorf(true, "did not receive "+
|
||||
"highHash block %s from peer %s during block download", relayBlockHash, flow.peer)
|
||||
"relayBlockHash block %s from peer %s during block download", relayBlockHash, flow.peer)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -23,6 +23,11 @@ func newIBDProgressReporter(lowDAAScore uint64, highDAAScore uint64, objectName
|
||||
func (ipr *ibdProgressReporter) reportProgress(processedDelta int, highestProcessedDAAScore uint64) {
|
||||
ipr.processed += processedDelta
|
||||
|
||||
// Avoid exploding numbers in the percentage report, since the original `highDAAScore` might have been only a hint
|
||||
if highestProcessedDAAScore > ipr.highDAAScore {
|
||||
ipr.highDAAScore = highestProcessedDAAScore + 1 // + 1 for keeping it at 99%
|
||||
ipr.totalDAAScoreDifference = ipr.highDAAScore - ipr.lowDAAScore
|
||||
}
|
||||
relativeDAAScore := highestProcessedDAAScore - ipr.lowDAAScore
|
||||
progressPercent := int((float64(relativeDAAScore) / float64(ipr.totalDAAScoreDifference)) * 100)
|
||||
if progressPercent > ipr.lastReportedProgressPercent {
|
||||
|
@ -378,6 +378,7 @@ func (flow *handleIBDFlow) syncPruningPointUTXOSet(consensus externalapi.Consens
|
||||
log.Info("Fetching the pruning point UTXO set")
|
||||
isSuccessful, err := flow.fetchMissingUTXOSet(consensus, pruningPoint)
|
||||
if err != nil {
|
||||
log.Infof("An error occurred while fetching the pruning point UTXO set. Stopping IBD. (%s)", err)
|
||||
return false, err
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user