When the pruning point moves, update its UTXO set outside of a database transaction (#1444)

* Remove pruningPointUTXOSetStaging and implement UpdatePruningPointUTXOSet.

* Implement StageStartSavingNewPruningPointUTXOSet, HadStartedSavingNewPruningPointUTXOSet, and FinishSavingNewPruningPointUTXOSet.

* Fix a bad return.

* Implement UpdatePruningPointUTXOSetIfRequired.

* Call UpdatePruningPointUTXOSetIfRequired on consensus creation.

* Rename savingNewPruningPointUTXOSetKey to updatingPruningPointUTXOSet.

* Add a log.

* Add calls to runtime.GC() at its start and end.

* Rename a variable.

* Replace calls to runtime.GC to calls to LogMemoryStats.

* Wrap the contents of LogMemoryStats in a log closure.
This commit is contained in:
stasatdaglabs 2021-01-24 14:48:11 +02:00 committed by GitHub
parent 9a17198e7d
commit ca04c049ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 144 additions and 51 deletions

View File

@ -11,6 +11,7 @@ import (
var pruningBlockHashKey = database.MakeBucket(nil).Key([]byte("pruning-block-hash"))
var candidatePruningPointHashKey = database.MakeBucket(nil).Key([]byte("candidate-pruning-point-hash"))
var pruningPointUTXOSetBucket = database.MakeBucket([]byte("pruning-point-utxo-set"))
var updatingPruningPointUTXOSetKey = database.MakeBucket(nil).Key([]byte("updating-pruning-point-utxo-set"))
// pruningStore represents a store for the current pruning state
type pruningStore struct {
@ -19,7 +20,7 @@ type pruningStore struct {
pruningPointCandidateStaging *externalapi.DomainHash
pruningPointCandidateCache *externalapi.DomainHash
pruningPointUTXOSetStaging model.ReadOnlyUTXOSetIterator
startUpdatingPruningPointUTXOSetStaging bool
}
// New instantiates a new PruningStore
@ -70,17 +71,13 @@ func (ps *pruningStore) StagePruningPoint(pruningPointBlockHash *externalapi.Dom
ps.pruningPointStaging = pruningPointBlockHash
}
func (ps *pruningStore) StagePruningPointUTXOSet(pruningPointUTXOSetIterator model.ReadOnlyUTXOSetIterator) {
ps.pruningPointUTXOSetStaging = pruningPointUTXOSetIterator
}
func (ps *pruningStore) IsStaged() bool {
return ps.pruningPointStaging != nil || ps.pruningPointUTXOSetStaging != nil
return ps.pruningPointStaging != nil || ps.startUpdatingPruningPointUTXOSetStaging
}
func (ps *pruningStore) Discard() {
ps.pruningPointStaging = nil
ps.pruningPointUTXOSetStaging = nil
ps.startUpdatingPruningPointUTXOSetStaging = false
}
func (ps *pruningStore) Commit(dbTx model.DBTransaction) error {
@ -108,49 +105,60 @@ func (ps *pruningStore) Commit(dbTx model.DBTransaction) error {
ps.pruningPointCandidateCache = ps.pruningPointCandidateStaging
}
if ps.pruningPointUTXOSetStaging != nil {
// Delete all the old UTXOs from the database
deleteCursor, err := dbTx.Cursor(pruningPointUTXOSetBucket)
if ps.startUpdatingPruningPointUTXOSetStaging {
err := dbTx.Put(updatingPruningPointUTXOSetKey, []byte{0})
if err != nil {
return err
}
for ok := deleteCursor.First(); ok; ok = deleteCursor.Next() {
key, err := deleteCursor.Key()
if err != nil {
return err
}
err = dbTx.Delete(key)
if err != nil {
return err
}
}
// Insert all the new UTXOs into the database
for ok := ps.pruningPointUTXOSetStaging.First(); ok; ok = ps.pruningPointUTXOSetStaging.Next() {
outpoint, entry, err := ps.pruningPointUTXOSetStaging.Get()
if err != nil {
return err
}
serializedOutpoint, err := serializeOutpoint(outpoint)
if err != nil {
return err
}
key := pruningPointUTXOSetBucket.Key(serializedOutpoint)
serializedUTXOEntry, err := serializeUTXOEntry(entry)
if err != nil {
return err
}
err = dbTx.Put(key, serializedUTXOEntry)
if err != nil {
return err
}
}
}
ps.Discard()
return nil
}
func (ps *pruningStore) UpdatePruningPointUTXOSet(dbContext model.DBWriter,
utxoSetIterator model.ReadOnlyUTXOSetIterator) error {
// Delete all the old UTXOs from the database
deleteCursor, err := dbContext.Cursor(pruningPointUTXOSetBucket)
if err != nil {
return err
}
for ok := deleteCursor.First(); ok; ok = deleteCursor.Next() {
key, err := deleteCursor.Key()
if err != nil {
return err
}
err = dbContext.Delete(key)
if err != nil {
return err
}
}
// Insert all the new UTXOs into the database
for ok := utxoSetIterator.First(); ok; ok = utxoSetIterator.Next() {
outpoint, entry, err := utxoSetIterator.Get()
if err != nil {
return err
}
serializedOutpoint, err := serializeOutpoint(outpoint)
if err != nil {
return err
}
key := pruningPointUTXOSetBucket.Key(serializedOutpoint)
serializedUTXOEntry, err := serializeUTXOEntry(entry)
if err != nil {
return err
}
err = dbContext.Put(key, serializedUTXOEntry)
if err != nil {
return err
}
}
return nil
}
// PruningPoint gets the current pruning point
func (ps *pruningStore) PruningPoint(dbContext model.DBReader) (*externalapi.DomainHash, error) {
if ps.pruningPointStaging != nil {
@ -241,3 +249,15 @@ func (ps *pruningStore) PruningPointUTXOs(dbContext model.DBReader,
}
return outpointAndUTXOEntryPairs, nil
}
func (ps *pruningStore) StageStartUpdatingPruningPointUTXOSet() {
ps.startUpdatingPruningPointUTXOSetStaging = true
}
func (ps *pruningStore) HadStartedUpdatingPruningPointUTXOSet(dbContext model.DBWriter) (bool, error) {
return dbContext.Has(updatingPruningPointUTXOSetKey)
}
func (ps *pruningStore) FinishUpdatingPruningPointUTXOSet(dbContext model.DBWriter) error {
return dbContext.Delete(updatingPruningPointUTXOSetKey)
}

View File

@ -365,6 +365,10 @@ func (f *factory) NewConsensus(dagParams *dagconfig.Params, db infrastructuredat
if err != nil {
return nil, err
}
err = pruningManager.UpdatePruningPointUTXOSetIfRequired()
if err != nil {
return nil, err
}
return c, nil
}

View File

@ -6,13 +6,18 @@ import "github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
type PruningStore interface {
Store
StagePruningPoint(pruningPointBlockHash *externalapi.DomainHash)
StagePruningPointUTXOSet(pruningPointUTXOSetIterator ReadOnlyUTXOSetIterator)
StagePruningPointCandidate(candidate *externalapi.DomainHash)
IsStaged() bool
PruningPointCandidate(dbContext DBReader) (*externalapi.DomainHash, error)
HasPruningPointCandidate(dbContext DBReader) (bool, error)
PruningPoint(dbContext DBReader) (*externalapi.DomainHash, error)
HasPruningPoint(dbContext DBReader) (bool, error)
StageStartUpdatingPruningPointUTXOSet()
HadStartedUpdatingPruningPointUTXOSet(dbContext DBWriter) (bool, error)
FinishUpdatingPruningPointUTXOSet(dbContext DBWriter) error
UpdatePruningPointUTXOSet(dbContext DBWriter, utxoSetIterator ReadOnlyUTXOSetIterator) error
ClearImportedPruningPointUTXOs(dbContext DBWriter) error
AppendImportedPruningPointUTXOs(dbTx DBTransaction, outpointAndUTXOEntryPairs []*externalapi.OutpointAndUTXOEntryPair) error
ImportedPruningPointUTXOIterator(dbContext DBReader) (ReadOnlyUTXOSetIterator, error)

View File

@ -8,4 +8,5 @@ type PruningManager interface {
IsValidPruningPoint(blockHash *externalapi.DomainHash) (bool, error)
ClearImportedPruningPointData() error
AppendImportedPruningPointUTXOs(outpointAndUTXOEntryPairs []*externalapi.OutpointAndUTXOEntryPair) error
UpdatePruningPointUTXOSetIfRequired() error
}

View File

@ -117,6 +117,11 @@ func (bp *blockProcessor) validateAndInsertBlock(block *externalapi.DomainBlock,
return nil, err
}
err = bp.pruningManager.UpdatePruningPointUTXOSetIfRequired()
if err != nil {
return nil, err
}
log.Debug(logger.NewLogClosure(func() string {
hashrate := difficulty.GetHashrateString(difficulty.CompactToBig(block.Header.Bits()), bp.targetTimePerBlock)
return fmt.Sprintf("Block %s validated and inserted, network hashrate: %s", blockHash, hashrate)

View File

@ -311,20 +311,15 @@ func (pm *pruningManager) savePruningPoint(pruningPointHash *externalapi.DomainH
onEnd := logger.LogAndMeasureExecutionTime(log, "pruningManager.savePruningPoint")
defer onEnd()
utxoSetIterator, err := pm.consensusStateManager.RestorePastUTXOSetIterator(pruningPointHash)
if err != nil {
return err
}
// TODO: This is an assert that takes ~30 seconds to run
// It must be removed or optimized before launching testnet
err = pm.validateUTXOSetFitsCommitment(utxoSetIterator, pruningPointHash)
err := pm.validateUTXOSetFitsCommitment(pruningPointHash)
if err != nil {
return err
}
pm.pruningStore.StagePruningPoint(pruningPointHash)
pm.pruningStore.StagePruningPointUTXOSet(utxoSetIterator)
pm.pruningStore.StageStartUpdatingPruningPointUTXOSet()
return nil
}
@ -417,8 +412,11 @@ func (pm *pruningManager) pruningPointCandidate() (*externalapi.DomainHash, erro
// validateUTXOSetFitsCommitment makes sure that the calculated UTXOSet of the new pruning point fits the commitment.
// This is a sanity test, to make sure that kaspad doesn't store, and subsequently sends syncing peers the wrong UTXOSet.
func (pm *pruningManager) validateUTXOSetFitsCommitment(utxoSetIterator model.ReadOnlyUTXOSetIterator,
pruningPointHash *externalapi.DomainHash) error {
func (pm *pruningManager) validateUTXOSetFitsCommitment(pruningPointHash *externalapi.DomainHash) error {
utxoSetIterator, err := pm.consensusStateManager.RestorePastUTXOSetIterator(pruningPointHash)
if err != nil {
return err
}
utxoSetMultiset := multiset.New()
for ok := utxoSetIterator.First(); ok; ok = utxoSetIterator.Next() {
@ -500,3 +498,51 @@ func (pm *pruningManager) AppendImportedPruningPointUTXOs(
return dbTx.Commit()
}
func (pm *pruningManager) UpdatePruningPointUTXOSetIfRequired() error {
hadStartedUpdatingPruningPointUTXOSet, err := pm.pruningStore.HadStartedUpdatingPruningPointUTXOSet(pm.databaseContext)
if err != nil {
return err
}
if !hadStartedUpdatingPruningPointUTXOSet {
return nil
}
log.Debugf("Pruning point UTXO set update is required")
err = pm.updatePruningPointUTXOSet()
if err != nil {
return err
}
log.Debugf("Pruning point UTXO set updated")
return nil
}
func (pm *pruningManager) updatePruningPointUTXOSet() error {
onEnd := logger.LogAndMeasureExecutionTime(log, "updatePruningPointUTXOSet")
defer onEnd()
logger.LogMemoryStats(log, "updatePruningPointUTXOSet start")
defer logger.LogMemoryStats(log, "updatePruningPointUTXOSet end")
log.Debugf("Getting the pruning point")
pruningPoint, err := pm.pruningStore.PruningPoint(pm.databaseContext)
if err != nil {
return err
}
log.Debugf("Restoring the pruning point UTXO set")
utxoSetIterator, err := pm.consensusStateManager.RestorePastUTXOSetIterator(pruningPoint)
if err != nil {
return err
}
log.Debugf("Updating the pruning point UTXO set")
err = pm.pruningStore.UpdatePruningPointUTXOSet(pm.databaseContext, utxoSetIterator)
if err != nil {
return err
}
log.Debugf("Finishing updating the pruning point UTXO set")
return pm.pruningStore.FinishUpdatingPruningPointUTXOSet(pm.databaseContext)
}

View File

@ -1,6 +1,8 @@
package logger
import (
"fmt"
"runtime"
"time"
)
@ -15,3 +17,13 @@ func LogAndMeasureExecutionTime(log *Logger, functionName string) (onEnd func())
log.Debugf("%s end. Took: %s", functionName, time.Since(start))
}
}
// LogMemoryStats logs memory stats for `functionName`
func LogMemoryStats(log *Logger, functionName string) {
log.Debug(NewLogClosure(func() string {
stats := runtime.MemStats{}
runtime.ReadMemStats(&stats)
return fmt.Sprintf("%s: used memory: %d bytes, total: %d bytes", functionName,
stats.Alloc, stats.HeapIdle-stats.HeapReleased+stats.HeapInuse)
}))
}