From ca04c049abca322f3bd41b7a746a4ab9f14b0972 Mon Sep 17 00:00:00 2001 From: stasatdaglabs <39559713+stasatdaglabs@users.noreply.github.com> Date: Sun, 24 Jan 2021 14:48:11 +0200 Subject: [PATCH] When the pruning point moves, update its UTXO set outside of a database transaction (#1444) * Remove pruningPointUTXOSetStaging and implement UpdatePruningPointUTXOSet. * Implement StageStartSavingNewPruningPointUTXOSet, HadStartedSavingNewPruningPointUTXOSet, and FinishSavingNewPruningPointUTXOSet. * Fix a bad return. * Implement UpdatePruningPointUTXOSetIfRequired. * Call UpdatePruningPointUTXOSetIfRequired on consensus creation. * Rename savingNewPruningPointUTXOSetKey to updatingPruningPointUTXOSet. * Add a log. * Add calls to runtime.GC() at its start and end. * Rename a variable. * Replace calls to runtime.GC to calls to LogMemoryStats. * Wrap the contents of LogMemoryStats in a log closure. --- .../pruningstore/pruningstore.go | 102 +++++++++++------- domain/consensus/factory.go | 4 + .../interface_datastructures_pruningstore.go | 7 +- .../interface_processes_pruningmanager.go | 1 + .../blockprocessor/validateandinsertblock.go | 5 + .../pruningmanager/pruningmanager.go | 64 +++++++++-- infrastructure/logger/utils.go | 12 +++ 7 files changed, 144 insertions(+), 51 deletions(-) diff --git a/domain/consensus/datastructures/pruningstore/pruningstore.go b/domain/consensus/datastructures/pruningstore/pruningstore.go index aa0e1367f..6cb29cbc7 100644 --- a/domain/consensus/datastructures/pruningstore/pruningstore.go +++ b/domain/consensus/datastructures/pruningstore/pruningstore.go @@ -11,6 +11,7 @@ import ( var pruningBlockHashKey = database.MakeBucket(nil).Key([]byte("pruning-block-hash")) var candidatePruningPointHashKey = database.MakeBucket(nil).Key([]byte("candidate-pruning-point-hash")) var pruningPointUTXOSetBucket = database.MakeBucket([]byte("pruning-point-utxo-set")) +var updatingPruningPointUTXOSetKey = database.MakeBucket(nil).Key([]byte("updating-pruning-point-utxo-set")) // pruningStore represents a store for the current pruning state type pruningStore struct { @@ -19,7 +20,7 @@ type pruningStore struct { pruningPointCandidateStaging *externalapi.DomainHash pruningPointCandidateCache *externalapi.DomainHash - pruningPointUTXOSetStaging model.ReadOnlyUTXOSetIterator + startUpdatingPruningPointUTXOSetStaging bool } // New instantiates a new PruningStore @@ -70,17 +71,13 @@ func (ps *pruningStore) StagePruningPoint(pruningPointBlockHash *externalapi.Dom ps.pruningPointStaging = pruningPointBlockHash } -func (ps *pruningStore) StagePruningPointUTXOSet(pruningPointUTXOSetIterator model.ReadOnlyUTXOSetIterator) { - ps.pruningPointUTXOSetStaging = pruningPointUTXOSetIterator -} - func (ps *pruningStore) IsStaged() bool { - return ps.pruningPointStaging != nil || ps.pruningPointUTXOSetStaging != nil + return ps.pruningPointStaging != nil || ps.startUpdatingPruningPointUTXOSetStaging } func (ps *pruningStore) Discard() { ps.pruningPointStaging = nil - ps.pruningPointUTXOSetStaging = nil + ps.startUpdatingPruningPointUTXOSetStaging = false } func (ps *pruningStore) Commit(dbTx model.DBTransaction) error { @@ -108,49 +105,60 @@ func (ps *pruningStore) Commit(dbTx model.DBTransaction) error { ps.pruningPointCandidateCache = ps.pruningPointCandidateStaging } - if ps.pruningPointUTXOSetStaging != nil { - // Delete all the old UTXOs from the database - deleteCursor, err := dbTx.Cursor(pruningPointUTXOSetBucket) + if ps.startUpdatingPruningPointUTXOSetStaging { + err := dbTx.Put(updatingPruningPointUTXOSetKey, []byte{0}) if err != nil { return err } - for ok := deleteCursor.First(); ok; ok = deleteCursor.Next() { - key, err := deleteCursor.Key() - if err != nil { - return err - } - err = dbTx.Delete(key) - if err != nil { - return err - } - } - - // Insert all the new UTXOs into the database - for ok := ps.pruningPointUTXOSetStaging.First(); ok; ok = ps.pruningPointUTXOSetStaging.Next() { - outpoint, entry, err := ps.pruningPointUTXOSetStaging.Get() - if err != nil { - return err - } - serializedOutpoint, err := serializeOutpoint(outpoint) - if err != nil { - return err - } - key := pruningPointUTXOSetBucket.Key(serializedOutpoint) - serializedUTXOEntry, err := serializeUTXOEntry(entry) - if err != nil { - return err - } - err = dbTx.Put(key, serializedUTXOEntry) - if err != nil { - return err - } - } } ps.Discard() return nil } +func (ps *pruningStore) UpdatePruningPointUTXOSet(dbContext model.DBWriter, + utxoSetIterator model.ReadOnlyUTXOSetIterator) error { + + // Delete all the old UTXOs from the database + deleteCursor, err := dbContext.Cursor(pruningPointUTXOSetBucket) + if err != nil { + return err + } + for ok := deleteCursor.First(); ok; ok = deleteCursor.Next() { + key, err := deleteCursor.Key() + if err != nil { + return err + } + err = dbContext.Delete(key) + if err != nil { + return err + } + } + + // Insert all the new UTXOs into the database + for ok := utxoSetIterator.First(); ok; ok = utxoSetIterator.Next() { + outpoint, entry, err := utxoSetIterator.Get() + if err != nil { + return err + } + serializedOutpoint, err := serializeOutpoint(outpoint) + if err != nil { + return err + } + key := pruningPointUTXOSetBucket.Key(serializedOutpoint) + serializedUTXOEntry, err := serializeUTXOEntry(entry) + if err != nil { + return err + } + err = dbContext.Put(key, serializedUTXOEntry) + if err != nil { + return err + } + } + + return nil +} + // PruningPoint gets the current pruning point func (ps *pruningStore) PruningPoint(dbContext model.DBReader) (*externalapi.DomainHash, error) { if ps.pruningPointStaging != nil { @@ -241,3 +249,15 @@ func (ps *pruningStore) PruningPointUTXOs(dbContext model.DBReader, } return outpointAndUTXOEntryPairs, nil } + +func (ps *pruningStore) StageStartUpdatingPruningPointUTXOSet() { + ps.startUpdatingPruningPointUTXOSetStaging = true +} + +func (ps *pruningStore) HadStartedUpdatingPruningPointUTXOSet(dbContext model.DBWriter) (bool, error) { + return dbContext.Has(updatingPruningPointUTXOSetKey) +} + +func (ps *pruningStore) FinishUpdatingPruningPointUTXOSet(dbContext model.DBWriter) error { + return dbContext.Delete(updatingPruningPointUTXOSetKey) +} diff --git a/domain/consensus/factory.go b/domain/consensus/factory.go index 993464989..96a72c928 100644 --- a/domain/consensus/factory.go +++ b/domain/consensus/factory.go @@ -365,6 +365,10 @@ func (f *factory) NewConsensus(dagParams *dagconfig.Params, db infrastructuredat if err != nil { return nil, err } + err = pruningManager.UpdatePruningPointUTXOSetIfRequired() + if err != nil { + return nil, err + } return c, nil } diff --git a/domain/consensus/model/interface_datastructures_pruningstore.go b/domain/consensus/model/interface_datastructures_pruningstore.go index e8149e975..6fce5098d 100644 --- a/domain/consensus/model/interface_datastructures_pruningstore.go +++ b/domain/consensus/model/interface_datastructures_pruningstore.go @@ -6,13 +6,18 @@ import "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" type PruningStore interface { Store StagePruningPoint(pruningPointBlockHash *externalapi.DomainHash) - StagePruningPointUTXOSet(pruningPointUTXOSetIterator ReadOnlyUTXOSetIterator) StagePruningPointCandidate(candidate *externalapi.DomainHash) IsStaged() bool PruningPointCandidate(dbContext DBReader) (*externalapi.DomainHash, error) HasPruningPointCandidate(dbContext DBReader) (bool, error) PruningPoint(dbContext DBReader) (*externalapi.DomainHash, error) HasPruningPoint(dbContext DBReader) (bool, error) + + StageStartUpdatingPruningPointUTXOSet() + HadStartedUpdatingPruningPointUTXOSet(dbContext DBWriter) (bool, error) + FinishUpdatingPruningPointUTXOSet(dbContext DBWriter) error + UpdatePruningPointUTXOSet(dbContext DBWriter, utxoSetIterator ReadOnlyUTXOSetIterator) error + ClearImportedPruningPointUTXOs(dbContext DBWriter) error AppendImportedPruningPointUTXOs(dbTx DBTransaction, outpointAndUTXOEntryPairs []*externalapi.OutpointAndUTXOEntryPair) error ImportedPruningPointUTXOIterator(dbContext DBReader) (ReadOnlyUTXOSetIterator, error) diff --git a/domain/consensus/model/interface_processes_pruningmanager.go b/domain/consensus/model/interface_processes_pruningmanager.go index effefcbfe..75d907f82 100644 --- a/domain/consensus/model/interface_processes_pruningmanager.go +++ b/domain/consensus/model/interface_processes_pruningmanager.go @@ -8,4 +8,5 @@ type PruningManager interface { IsValidPruningPoint(blockHash *externalapi.DomainHash) (bool, error) ClearImportedPruningPointData() error AppendImportedPruningPointUTXOs(outpointAndUTXOEntryPairs []*externalapi.OutpointAndUTXOEntryPair) error + UpdatePruningPointUTXOSetIfRequired() error } diff --git a/domain/consensus/processes/blockprocessor/validateandinsertblock.go b/domain/consensus/processes/blockprocessor/validateandinsertblock.go index 7b232e43b..528826ee6 100644 --- a/domain/consensus/processes/blockprocessor/validateandinsertblock.go +++ b/domain/consensus/processes/blockprocessor/validateandinsertblock.go @@ -117,6 +117,11 @@ func (bp *blockProcessor) validateAndInsertBlock(block *externalapi.DomainBlock, return nil, err } + err = bp.pruningManager.UpdatePruningPointUTXOSetIfRequired() + if err != nil { + return nil, err + } + log.Debug(logger.NewLogClosure(func() string { hashrate := difficulty.GetHashrateString(difficulty.CompactToBig(block.Header.Bits()), bp.targetTimePerBlock) return fmt.Sprintf("Block %s validated and inserted, network hashrate: %s", blockHash, hashrate) diff --git a/domain/consensus/processes/pruningmanager/pruningmanager.go b/domain/consensus/processes/pruningmanager/pruningmanager.go index 6479a2810..ab789b682 100644 --- a/domain/consensus/processes/pruningmanager/pruningmanager.go +++ b/domain/consensus/processes/pruningmanager/pruningmanager.go @@ -311,20 +311,15 @@ func (pm *pruningManager) savePruningPoint(pruningPointHash *externalapi.DomainH onEnd := logger.LogAndMeasureExecutionTime(log, "pruningManager.savePruningPoint") defer onEnd() - utxoSetIterator, err := pm.consensusStateManager.RestorePastUTXOSetIterator(pruningPointHash) - if err != nil { - return err - } - // TODO: This is an assert that takes ~30 seconds to run // It must be removed or optimized before launching testnet - err = pm.validateUTXOSetFitsCommitment(utxoSetIterator, pruningPointHash) + err := pm.validateUTXOSetFitsCommitment(pruningPointHash) if err != nil { return err } pm.pruningStore.StagePruningPoint(pruningPointHash) - pm.pruningStore.StagePruningPointUTXOSet(utxoSetIterator) + pm.pruningStore.StageStartUpdatingPruningPointUTXOSet() return nil } @@ -417,8 +412,11 @@ func (pm *pruningManager) pruningPointCandidate() (*externalapi.DomainHash, erro // validateUTXOSetFitsCommitment makes sure that the calculated UTXOSet of the new pruning point fits the commitment. // This is a sanity test, to make sure that kaspad doesn't store, and subsequently sends syncing peers the wrong UTXOSet. -func (pm *pruningManager) validateUTXOSetFitsCommitment(utxoSetIterator model.ReadOnlyUTXOSetIterator, - pruningPointHash *externalapi.DomainHash) error { +func (pm *pruningManager) validateUTXOSetFitsCommitment(pruningPointHash *externalapi.DomainHash) error { + utxoSetIterator, err := pm.consensusStateManager.RestorePastUTXOSetIterator(pruningPointHash) + if err != nil { + return err + } utxoSetMultiset := multiset.New() for ok := utxoSetIterator.First(); ok; ok = utxoSetIterator.Next() { @@ -500,3 +498,51 @@ func (pm *pruningManager) AppendImportedPruningPointUTXOs( return dbTx.Commit() } + +func (pm *pruningManager) UpdatePruningPointUTXOSetIfRequired() error { + hadStartedUpdatingPruningPointUTXOSet, err := pm.pruningStore.HadStartedUpdatingPruningPointUTXOSet(pm.databaseContext) + if err != nil { + return err + } + if !hadStartedUpdatingPruningPointUTXOSet { + return nil + } + + log.Debugf("Pruning point UTXO set update is required") + err = pm.updatePruningPointUTXOSet() + if err != nil { + return err + } + log.Debugf("Pruning point UTXO set updated") + + return nil +} + +func (pm *pruningManager) updatePruningPointUTXOSet() error { + onEnd := logger.LogAndMeasureExecutionTime(log, "updatePruningPointUTXOSet") + defer onEnd() + + logger.LogMemoryStats(log, "updatePruningPointUTXOSet start") + defer logger.LogMemoryStats(log, "updatePruningPointUTXOSet end") + + log.Debugf("Getting the pruning point") + pruningPoint, err := pm.pruningStore.PruningPoint(pm.databaseContext) + if err != nil { + return err + } + + log.Debugf("Restoring the pruning point UTXO set") + utxoSetIterator, err := pm.consensusStateManager.RestorePastUTXOSetIterator(pruningPoint) + if err != nil { + return err + } + + log.Debugf("Updating the pruning point UTXO set") + err = pm.pruningStore.UpdatePruningPointUTXOSet(pm.databaseContext, utxoSetIterator) + if err != nil { + return err + } + + log.Debugf("Finishing updating the pruning point UTXO set") + return pm.pruningStore.FinishUpdatingPruningPointUTXOSet(pm.databaseContext) +} diff --git a/infrastructure/logger/utils.go b/infrastructure/logger/utils.go index c383a00a8..1ba315d55 100644 --- a/infrastructure/logger/utils.go +++ b/infrastructure/logger/utils.go @@ -1,6 +1,8 @@ package logger import ( + "fmt" + "runtime" "time" ) @@ -15,3 +17,13 @@ func LogAndMeasureExecutionTime(log *Logger, functionName string) (onEnd func()) log.Debugf("%s end. Took: %s", functionName, time.Since(start)) } } + +// LogMemoryStats logs memory stats for `functionName` +func LogMemoryStats(log *Logger, functionName string) { + log.Debug(NewLogClosure(func() string { + stats := runtime.MemStats{} + runtime.ReadMemStats(&stats) + return fmt.Sprintf("%s: used memory: %d bytes, total: %d bytes", functionName, + stats.Alloc, stats.HeapIdle-stats.HeapReleased+stats.HeapInuse) + })) +}