mirror of
https://github.com/kaspanet/kaspad.git
synced 2026-02-24 12:23:22 +00:00
Add recoverability for UTXO index (#1342)
* Add recoverability for UTXO index * Add comment * Rename UTXOOutpointPair->OutpointUTXOPair * Get rid of the db transaction on resetStore and collect all keys before deleting * Use VirtualSelectedParent instead of selected tip * Fix error
This commit is contained in:
@@ -96,7 +96,10 @@ func NewComponentManager(cfg *config.Config, db infrastructuredatabase.Database,
|
||||
|
||||
var utxoIndex *utxoindex.UTXOIndex
|
||||
if cfg.UTXOIndex {
|
||||
utxoIndex = utxoindex.New(domain.Consensus(), db)
|
||||
utxoIndex, err = utxoindex.New(domain.Consensus(), db, cfg.ActiveNetParams.GenesisHash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
log.Infof("UTXO index started")
|
||||
}
|
||||
|
||||
|
||||
@@ -372,3 +372,25 @@ func (s *consensus) GetHeadersSelectedTip() (*externalapi.DomainHash, error) {
|
||||
|
||||
return s.headersSelectedTipStore.HeadersSelectedTip(s.databaseContext)
|
||||
}
|
||||
|
||||
func (s *consensus) GetVirtualUTXOSet() ([]*externalapi.OutpointUTXOPair, error) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
iterator, err := s.consensusStateStore.VirtualUTXOSetIterator(s.databaseContext)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pairs := make([]*externalapi.OutpointUTXOPair, 0)
|
||||
for iterator.Next() {
|
||||
outpoint, entry, err := iterator.Get()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pairs = append(pairs, &externalapi.OutpointUTXOPair{Outpoint: outpoint, Entry: entry})
|
||||
}
|
||||
|
||||
return pairs, nil
|
||||
}
|
||||
|
||||
@@ -26,4 +26,6 @@ type Consensus interface {
|
||||
GetVirtualSelectedParentChainFromBlock(blockHash *DomainHash) (*SelectedParentChainChanges, error)
|
||||
IsInSelectedParentChainOf(blockHashA *DomainHash, blockHashB *DomainHash) (bool, error)
|
||||
GetHeadersSelectedTip() (*DomainHash, error)
|
||||
|
||||
GetVirtualUTXOSet() ([]*OutpointUTXOPair, error)
|
||||
}
|
||||
|
||||
7
domain/consensus/model/externalapi/utxo.go
Normal file
7
domain/consensus/model/externalapi/utxo.go
Normal file
@@ -0,0 +1,7 @@
|
||||
package externalapi
|
||||
|
||||
// OutpointUTXOPair is a pair of outpoint and UTXO entry
|
||||
type OutpointUTXOPair struct {
|
||||
Outpoint *DomainOutpoint
|
||||
Entry UTXOEntry
|
||||
}
|
||||
@@ -10,11 +10,13 @@ import (
|
||||
)
|
||||
|
||||
var utxoIndexBucket = database.MakeBucket([]byte("utxo-index"))
|
||||
var utxoIndexLastVirtualSelectedParentKey = database.MakeBucket().Key([]byte("utxo-index-last-virtual-selected-parent"))
|
||||
|
||||
type utxoIndexStore struct {
|
||||
database database.Database
|
||||
toAdd map[ScriptPublicKeyString]UTXOOutpointEntryPairs
|
||||
toRemove map[ScriptPublicKeyString]UTXOOutpoints
|
||||
database database.Database
|
||||
toAdd map[ScriptPublicKeyString]UTXOOutpointEntryPairs
|
||||
toRemove map[ScriptPublicKeyString]UTXOOutpoints
|
||||
virtualSelectedParent *externalapi.DomainHash
|
||||
}
|
||||
|
||||
func newUTXOIndexStore(database database.Database) *utxoIndexStore {
|
||||
@@ -96,6 +98,7 @@ func (uis *utxoIndexStore) remove(scriptPublicKey []byte, outpoint *externalapi.
|
||||
func (uis *utxoIndexStore) discard() {
|
||||
uis.toAdd = make(map[ScriptPublicKeyString]UTXOOutpointEntryPairs)
|
||||
uis.toRemove = make(map[ScriptPublicKeyString]UTXOOutpoints)
|
||||
uis.virtualSelectedParent = nil
|
||||
}
|
||||
|
||||
func (uis *utxoIndexStore) commit() error {
|
||||
@@ -142,6 +145,11 @@ func (uis *utxoIndexStore) commit() error {
|
||||
}
|
||||
}
|
||||
|
||||
err = dbTransaction.Put(utxoIndexLastVirtualSelectedParentKey, uis.virtualSelectedParent.ByteSlice())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = dbTransaction.Commit()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -222,7 +230,7 @@ func (uis *utxoIndexStore) stagedData() (
|
||||
}
|
||||
|
||||
func (uis *utxoIndexStore) getUTXOOutpointEntryPairs(scriptPublicKey []byte) (UTXOOutpointEntryPairs, error) {
|
||||
if len(uis.toAdd) > 0 || len(uis.toRemove) > 0 {
|
||||
if uis.isAnythingStaged() {
|
||||
return nil, errors.Errorf("cannot get utxo outpoint entry pairs while staging isn't empty")
|
||||
}
|
||||
|
||||
@@ -253,3 +261,94 @@ func (uis *utxoIndexStore) getUTXOOutpointEntryPairs(scriptPublicKey []byte) (UT
|
||||
}
|
||||
return utxoOutpointEntryPairs, nil
|
||||
}
|
||||
|
||||
func (uis *utxoIndexStore) getLastVirtualSelectedParent() (*externalapi.DomainHash, bool, error) {
|
||||
if uis.isAnythingStaged() {
|
||||
return nil, false, errors.Errorf("cannot get last virtual selected parent while staging isn't empty")
|
||||
}
|
||||
|
||||
hasLastVirtualSelectedParent, err := uis.database.Has(utxoIndexLastVirtualSelectedParentKey)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
if !hasLastVirtualSelectedParent {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
lastVirtualSelectedParentBytes, err := uis.database.Get(utxoIndexLastVirtualSelectedParentKey)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
lastVirtualSelectedParent, err := externalapi.NewDomainHashFromByteSlice(lastVirtualSelectedParentBytes)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
return lastVirtualSelectedParent, true, nil
|
||||
}
|
||||
|
||||
func (uis *utxoIndexStore) isAnythingStaged() bool {
|
||||
return len(uis.toAdd) > 0 || len(uis.toRemove) > 0 || uis.virtualSelectedParent != nil
|
||||
}
|
||||
|
||||
func (uis *utxoIndexStore) replaceUTXOSet(utxoSet []*externalapi.OutpointUTXOPair,
|
||||
virtualSelectedParent *externalapi.DomainHash) error {
|
||||
|
||||
onEnd := logger.LogAndMeasureExecutionTime(log, "utxoIndexStore.replaceUTXOSet")
|
||||
defer onEnd()
|
||||
|
||||
if uis.isAnythingStaged() {
|
||||
return errors.Errorf("cannot replace utxo set while something is staged")
|
||||
}
|
||||
|
||||
err := uis.resetStore()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
uis.virtualSelectedParent = virtualSelectedParent
|
||||
for _, pair := range utxoSet {
|
||||
err := uis.add(pair.Entry.ScriptPublicKey(), pair.Outpoint, pair.Entry)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return uis.commit()
|
||||
}
|
||||
|
||||
func (uis *utxoIndexStore) resetStore() error {
|
||||
onEnd := logger.LogAndMeasureExecutionTime(log, "utxoIndexStore.resetStore")
|
||||
defer onEnd()
|
||||
|
||||
cursor, err := uis.database.Cursor(utxoIndexBucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
keysToDelete := make([]*database.Key, 0)
|
||||
for cursor.Next() {
|
||||
key, err := cursor.Key()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
keysToDelete = append(keysToDelete, key)
|
||||
}
|
||||
|
||||
for _, key := range keysToDelete {
|
||||
err = uis.database.Delete(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
err = uis.database.Delete(utxoIndexLastVirtualSelectedParentKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -7,25 +7,80 @@ import (
|
||||
"github.com/kaspanet/kaspad/domain/consensus/utils/utxo"
|
||||
"github.com/kaspanet/kaspad/infrastructure/db/database"
|
||||
"github.com/kaspanet/kaspad/infrastructure/logger"
|
||||
"github.com/pkg/errors"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// UTXOIndex maintains an index between transaction scriptPublicKeys
|
||||
// and UTXOs
|
||||
type UTXOIndex struct {
|
||||
consensus externalapi.Consensus
|
||||
store *utxoIndexStore
|
||||
consensus externalapi.Consensus
|
||||
store *utxoIndexStore
|
||||
genesisHash *externalapi.DomainHash
|
||||
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
// New creates a new UTXO index
|
||||
func New(consensus externalapi.Consensus, database database.Database) *UTXOIndex {
|
||||
func New(consensus externalapi.Consensus, database database.Database, genesisHash *externalapi.DomainHash) (*UTXOIndex, error) {
|
||||
store := newUTXOIndexStore(database)
|
||||
return &UTXOIndex{
|
||||
consensus: consensus,
|
||||
store: store,
|
||||
utxoIndex := &UTXOIndex{
|
||||
consensus: consensus,
|
||||
store: store,
|
||||
genesisHash: genesisHash,
|
||||
}
|
||||
|
||||
isSynced, err := utxoIndex.isSynced()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !isSynced {
|
||||
err := utxoIndex.recover()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return utxoIndex, nil
|
||||
}
|
||||
|
||||
func (ui *UTXOIndex) recover() error {
|
||||
onEnd := logger.LogAndMeasureExecutionTime(log, "UTXOIndex.recover")
|
||||
defer onEnd()
|
||||
|
||||
// Since the RPC and P2P should be down while initializing the
|
||||
// UTXO index, we can assume that the virtual selected parent
|
||||
// won't be changed while fetching the virtual selected parent.
|
||||
virtualSelectedParent, err := ui.consensus.GetVirtualSelectedParent()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
virtualUTXOSet, err := ui.consensus.GetVirtualUTXOSet()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ui.store.replaceUTXOSet(virtualUTXOSet, virtualSelectedParent)
|
||||
}
|
||||
|
||||
func (ui *UTXOIndex) isSynced() (bool, error) {
|
||||
virtualSelectedParent, err := ui.consensus.GetVirtualSelectedParent()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
lastVirtualSelectedParent, hasLastVirtualSelectedParent, err := ui.store.getLastVirtualSelectedParent()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if !hasLastVirtualSelectedParent {
|
||||
return virtualSelectedParent.Equal(ui.genesisHash), nil
|
||||
}
|
||||
|
||||
return virtualSelectedParent.Equal(lastVirtualSelectedParent), nil
|
||||
}
|
||||
|
||||
// Update updates the UTXO index with the given DAG selected parent chain changes
|
||||
@@ -37,6 +92,17 @@ func (ui *UTXOIndex) Update(chainChanges *externalapi.SelectedParentChainChanges
|
||||
defer ui.mutex.Unlock()
|
||||
|
||||
log.Tracef("Updating UTXO index with chainChanges: %+v", chainChanges)
|
||||
if len(chainChanges.Added) == 0 {
|
||||
if len(chainChanges.Removed) != 0 {
|
||||
return nil, errors.Errorf("len(chainChanges.Added) is 0 while len(chainChanges.Removed) is %d", len(chainChanges.Removed))
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
virtualSelectedParent := chainChanges.Added[len(chainChanges.Added)-1]
|
||||
ui.store.virtualSelectedParent = virtualSelectedParent
|
||||
|
||||
for _, removedBlockHash := range chainChanges.Removed {
|
||||
err := ui.removeBlock(removedBlockHash)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user