diff --git a/app/protocol/flows/v3/blockrelay/ibd.go b/app/protocol/flows/v3/blockrelay/ibd.go index 9f76d82e3..098eff5bc 100644 --- a/app/protocol/flows/v3/blockrelay/ibd.go +++ b/app/protocol/flows/v3/blockrelay/ibd.go @@ -97,7 +97,7 @@ func (flow *handleIBDFlow) runIBDIfNotRunning(block *externalapi.DomainBlock) er if shouldDownloadHeadersProof { log.Infof("Starting IBD with headers proof") - err := flow.ibdWithHeadersProof(highHash) + err := flow.ibdWithHeadersProof(highHash, block.Header.DAAScore()) if err != nil { return err } @@ -115,7 +115,7 @@ func (flow *handleIBDFlow) runIBDIfNotRunning(block *externalapi.DomainBlock) er } } - err = flow.syncPruningPointFutureHeaders(flow.Domain().Consensus(), highestSharedBlockHash, highHash) + err = flow.syncPruningPointFutureHeaders(flow.Domain().Consensus(), highestSharedBlockHash, highHash, block.Header.DAAScore()) if err != nil { return err } @@ -266,7 +266,7 @@ func (flow *handleIBDFlow) fetchHighestHash( } func (flow *handleIBDFlow) syncPruningPointFutureHeaders(consensus externalapi.Consensus, highestSharedBlockHash *externalapi.DomainHash, - highHash *externalapi.DomainHash) error { + highHash *externalapi.DomainHash, highBlockDAAScore uint64) error { log.Infof("Downloading headers from %s", flow.peer) @@ -275,6 +275,12 @@ func (flow *handleIBDFlow) syncPruningPointFutureHeaders(consensus externalapi.C return err } + highestSharedBlockHeader, err := consensus.GetBlockHeader(highestSharedBlockHash) + if err != nil { + return err + } + progressReporter := newIBDProgressReporter(highestSharedBlockHeader.DAAScore(), highBlockDAAScore, "block headers") + // Keep a short queue of BlockHeadersMessages so that there's // never a moment when the node is not validating and inserting // headers @@ -323,6 +329,9 @@ func (flow *handleIBDFlow) syncPruningPointFutureHeaders(consensus externalapi.C return err } } + + lastReceivedHeader := ibdBlocksMessage.BlockHeaders[len(ibdBlocksMessage.BlockHeaders)-1] + progressReporter.reportProgress(len(ibdBlocksMessage.BlockHeaders), lastReceivedHeader.DAAScore) case err := <-errChan: return err } @@ -491,6 +500,17 @@ func (flow *handleIBDFlow) syncMissingBlockBodies(highHash *externalapi.DomainHa return nil } + lowBlockHeader, err := flow.Domain().Consensus().GetBlockHeader(hashes[0]) + if err != nil { + return err + } + highBlockHeader, err := flow.Domain().Consensus().GetBlockHeader(hashes[len(hashes)-1]) + if err != nil { + return err + } + progressReporter := newIBDProgressReporter(lowBlockHeader.DAAScore(), highBlockHeader.DAAScore(), "blocks") + highestProcessedDAAScore := lowBlockHeader.DAAScore() + for offset := 0; offset < len(hashes); offset += ibdBatchSize { var hashesToRequest []*externalapi.DomainHash if offset+ibdBatchSize < len(hashes) { @@ -539,7 +559,11 @@ func (flow *handleIBDFlow) syncMissingBlockBodies(highHash *externalapi.DomainHa if err != nil { return err } + + highestProcessedDAAScore = block.Header.DAAScore() } + + progressReporter.reportProgress(len(hashesToRequest), highestProcessedDAAScore) } return flow.resolveVirtual() diff --git a/app/protocol/flows/v3/blockrelay/ibd_progress_reporter.go b/app/protocol/flows/v3/blockrelay/ibd_progress_reporter.go new file mode 100644 index 000000000..acb8d8209 --- /dev/null +++ b/app/protocol/flows/v3/blockrelay/ibd_progress_reporter.go @@ -0,0 +1,32 @@ +package blockrelay + +type ibdProgressReporter struct { + lowDAAScore uint64 + highDAAScore uint64 + objectName string + totalDAAScoreDifference uint64 + lastReportedProgressPercent int + processed int +} + +func newIBDProgressReporter(lowDAAScore uint64, highDAAScore uint64, objectName string) *ibdProgressReporter { + return &ibdProgressReporter{ + lowDAAScore: lowDAAScore, + highDAAScore: highDAAScore, + objectName: objectName, + totalDAAScoreDifference: highDAAScore - lowDAAScore, + lastReportedProgressPercent: 0, + processed: 0, + } +} + +func (ipr *ibdProgressReporter) reportProgress(processedDelta int, highestProcessedDAAScore uint64) { + ipr.processed += processedDelta + + relativeDAAScore := highestProcessedDAAScore - ipr.lowDAAScore + progressPercent := int((float64(relativeDAAScore) / float64(ipr.totalDAAScoreDifference)) * 100) + if progressPercent > ipr.lastReportedProgressPercent { + log.Infof("IBD: Processed %d %s (%d%%)", ipr.processed, ipr.objectName, progressPercent) + ipr.lastReportedProgressPercent = progressPercent + } +} diff --git a/app/protocol/flows/v3/blockrelay/ibd_with_headers_proof.go b/app/protocol/flows/v3/blockrelay/ibd_with_headers_proof.go index e0b0d300f..7cb2d7597 100644 --- a/app/protocol/flows/v3/blockrelay/ibd_with_headers_proof.go +++ b/app/protocol/flows/v3/blockrelay/ibd_with_headers_proof.go @@ -11,13 +11,13 @@ import ( "github.com/pkg/errors" ) -func (flow *handleIBDFlow) ibdWithHeadersProof(highHash *externalapi.DomainHash) error { +func (flow *handleIBDFlow) ibdWithHeadersProof(highHash *externalapi.DomainHash, highBlockDAAScore uint64) error { err := flow.Domain().InitStagingConsensus() if err != nil { return err } - err = flow.downloadHeadersAndPruningUTXOSet(highHash) + err = flow.downloadHeadersAndPruningUTXOSet(highHash, highBlockDAAScore) if err != nil { if !flow.IsRecoverableError(err) { return err @@ -113,7 +113,7 @@ func (flow *handleIBDFlow) syncAndValidatePruningPointProof() (*externalapi.Doma return consensushashing.HeaderHash(pruningPointProof.Headers[0][len(pruningPointProof.Headers[0])-1]), nil } -func (flow *handleIBDFlow) downloadHeadersAndPruningUTXOSet(highHash *externalapi.DomainHash) error { +func (flow *handleIBDFlow) downloadHeadersAndPruningUTXOSet(highHash *externalapi.DomainHash, highBlockDAAScore uint64) error { proofPruningPoint, err := flow.syncAndValidatePruningPointProof() if err != nil { return err @@ -130,7 +130,7 @@ func (flow *handleIBDFlow) downloadHeadersAndPruningUTXOSet(highHash *externalap return protocolerrors.Errorf(true, "the genesis pruning point violates finality") } - err = flow.syncPruningPointFutureHeaders(flow.Domain().StagingConsensus(), proofPruningPoint, highHash) + err = flow.syncPruningPointFutureHeaders(flow.Domain().StagingConsensus(), proofPruningPoint, highHash, highBlockDAAScore) if err != nil { return err } diff --git a/app/protocol/flows/v4/blockrelay/ibd.go b/app/protocol/flows/v4/blockrelay/ibd.go index 9f76d82e3..098eff5bc 100644 --- a/app/protocol/flows/v4/blockrelay/ibd.go +++ b/app/protocol/flows/v4/blockrelay/ibd.go @@ -97,7 +97,7 @@ func (flow *handleIBDFlow) runIBDIfNotRunning(block *externalapi.DomainBlock) er if shouldDownloadHeadersProof { log.Infof("Starting IBD with headers proof") - err := flow.ibdWithHeadersProof(highHash) + err := flow.ibdWithHeadersProof(highHash, block.Header.DAAScore()) if err != nil { return err } @@ -115,7 +115,7 @@ func (flow *handleIBDFlow) runIBDIfNotRunning(block *externalapi.DomainBlock) er } } - err = flow.syncPruningPointFutureHeaders(flow.Domain().Consensus(), highestSharedBlockHash, highHash) + err = flow.syncPruningPointFutureHeaders(flow.Domain().Consensus(), highestSharedBlockHash, highHash, block.Header.DAAScore()) if err != nil { return err } @@ -266,7 +266,7 @@ func (flow *handleIBDFlow) fetchHighestHash( } func (flow *handleIBDFlow) syncPruningPointFutureHeaders(consensus externalapi.Consensus, highestSharedBlockHash *externalapi.DomainHash, - highHash *externalapi.DomainHash) error { + highHash *externalapi.DomainHash, highBlockDAAScore uint64) error { log.Infof("Downloading headers from %s", flow.peer) @@ -275,6 +275,12 @@ func (flow *handleIBDFlow) syncPruningPointFutureHeaders(consensus externalapi.C return err } + highestSharedBlockHeader, err := consensus.GetBlockHeader(highestSharedBlockHash) + if err != nil { + return err + } + progressReporter := newIBDProgressReporter(highestSharedBlockHeader.DAAScore(), highBlockDAAScore, "block headers") + // Keep a short queue of BlockHeadersMessages so that there's // never a moment when the node is not validating and inserting // headers @@ -323,6 +329,9 @@ func (flow *handleIBDFlow) syncPruningPointFutureHeaders(consensus externalapi.C return err } } + + lastReceivedHeader := ibdBlocksMessage.BlockHeaders[len(ibdBlocksMessage.BlockHeaders)-1] + progressReporter.reportProgress(len(ibdBlocksMessage.BlockHeaders), lastReceivedHeader.DAAScore) case err := <-errChan: return err } @@ -491,6 +500,17 @@ func (flow *handleIBDFlow) syncMissingBlockBodies(highHash *externalapi.DomainHa return nil } + lowBlockHeader, err := flow.Domain().Consensus().GetBlockHeader(hashes[0]) + if err != nil { + return err + } + highBlockHeader, err := flow.Domain().Consensus().GetBlockHeader(hashes[len(hashes)-1]) + if err != nil { + return err + } + progressReporter := newIBDProgressReporter(lowBlockHeader.DAAScore(), highBlockHeader.DAAScore(), "blocks") + highestProcessedDAAScore := lowBlockHeader.DAAScore() + for offset := 0; offset < len(hashes); offset += ibdBatchSize { var hashesToRequest []*externalapi.DomainHash if offset+ibdBatchSize < len(hashes) { @@ -539,7 +559,11 @@ func (flow *handleIBDFlow) syncMissingBlockBodies(highHash *externalapi.DomainHa if err != nil { return err } + + highestProcessedDAAScore = block.Header.DAAScore() } + + progressReporter.reportProgress(len(hashesToRequest), highestProcessedDAAScore) } return flow.resolveVirtual() diff --git a/app/protocol/flows/v4/blockrelay/ibd_progress_reporter.go b/app/protocol/flows/v4/blockrelay/ibd_progress_reporter.go new file mode 100644 index 000000000..acb8d8209 --- /dev/null +++ b/app/protocol/flows/v4/blockrelay/ibd_progress_reporter.go @@ -0,0 +1,32 @@ +package blockrelay + +type ibdProgressReporter struct { + lowDAAScore uint64 + highDAAScore uint64 + objectName string + totalDAAScoreDifference uint64 + lastReportedProgressPercent int + processed int +} + +func newIBDProgressReporter(lowDAAScore uint64, highDAAScore uint64, objectName string) *ibdProgressReporter { + return &ibdProgressReporter{ + lowDAAScore: lowDAAScore, + highDAAScore: highDAAScore, + objectName: objectName, + totalDAAScoreDifference: highDAAScore - lowDAAScore, + lastReportedProgressPercent: 0, + processed: 0, + } +} + +func (ipr *ibdProgressReporter) reportProgress(processedDelta int, highestProcessedDAAScore uint64) { + ipr.processed += processedDelta + + relativeDAAScore := highestProcessedDAAScore - ipr.lowDAAScore + progressPercent := int((float64(relativeDAAScore) / float64(ipr.totalDAAScoreDifference)) * 100) + if progressPercent > ipr.lastReportedProgressPercent { + log.Infof("IBD: Processed %d %s (%d%%)", ipr.processed, ipr.objectName, progressPercent) + ipr.lastReportedProgressPercent = progressPercent + } +} diff --git a/app/protocol/flows/v4/blockrelay/ibd_with_headers_proof.go b/app/protocol/flows/v4/blockrelay/ibd_with_headers_proof.go index e2f42a47a..85a766356 100644 --- a/app/protocol/flows/v4/blockrelay/ibd_with_headers_proof.go +++ b/app/protocol/flows/v4/blockrelay/ibd_with_headers_proof.go @@ -11,13 +11,13 @@ import ( "github.com/pkg/errors" ) -func (flow *handleIBDFlow) ibdWithHeadersProof(highHash *externalapi.DomainHash) error { +func (flow *handleIBDFlow) ibdWithHeadersProof(highHash *externalapi.DomainHash, highBlockDAAScore uint64) error { err := flow.Domain().InitStagingConsensus() if err != nil { return err } - err = flow.downloadHeadersAndPruningUTXOSet(highHash) + err = flow.downloadHeadersAndPruningUTXOSet(highHash, highBlockDAAScore) if err != nil { if !flow.IsRecoverableError(err) { return err @@ -113,7 +113,7 @@ func (flow *handleIBDFlow) syncAndValidatePruningPointProof() (*externalapi.Doma return consensushashing.HeaderHash(pruningPointProof.Headers[0][len(pruningPointProof.Headers[0])-1]), nil } -func (flow *handleIBDFlow) downloadHeadersAndPruningUTXOSet(highHash *externalapi.DomainHash) error { +func (flow *handleIBDFlow) downloadHeadersAndPruningUTXOSet(highHash *externalapi.DomainHash, highBlockDAAScore uint64) error { proofPruningPoint, err := flow.syncAndValidatePruningPointProof() if err != nil { return err @@ -130,7 +130,7 @@ func (flow *handleIBDFlow) downloadHeadersAndPruningUTXOSet(highHash *externalap return protocolerrors.Errorf(true, "the genesis pruning point violates finality") } - err = flow.syncPruningPointFutureHeaders(flow.Domain().StagingConsensus(), proofPruningPoint, highHash) + err = flow.syncPruningPointFutureHeaders(flow.Domain().StagingConsensus(), proofPruningPoint, highHash, highBlockDAAScore) if err != nil { return err }