Add recoverability for UTXO index (#1342)

* Add recoverability for UTXO index

* Add comment

* Rename UTXOOutpointPair->OutpointUTXOPair

* Get rid of the db transaction on resetStore and collect all keys before deleting

* Use VirtualSelectedParent instead of selected tip

* Fix error
This commit is contained in:
Ori Newman
2021-01-04 14:15:51 +02:00
committed by GitHub
parent acef311fb4
commit 778375c4af
6 changed files with 210 additions and 11 deletions

View File

@@ -96,7 +96,10 @@ func NewComponentManager(cfg *config.Config, db infrastructuredatabase.Database,
var utxoIndex *utxoindex.UTXOIndex
if cfg.UTXOIndex {
utxoIndex = utxoindex.New(domain.Consensus(), db)
utxoIndex, err = utxoindex.New(domain.Consensus(), db, cfg.ActiveNetParams.GenesisHash)
if err != nil {
return nil, err
}
log.Infof("UTXO index started")
}

View File

@@ -372,3 +372,25 @@ func (s *consensus) GetHeadersSelectedTip() (*externalapi.DomainHash, error) {
return s.headersSelectedTipStore.HeadersSelectedTip(s.databaseContext)
}
func (s *consensus) GetVirtualUTXOSet() ([]*externalapi.OutpointUTXOPair, error) {
s.lock.Lock()
defer s.lock.Unlock()
iterator, err := s.consensusStateStore.VirtualUTXOSetIterator(s.databaseContext)
if err != nil {
return nil, err
}
pairs := make([]*externalapi.OutpointUTXOPair, 0)
for iterator.Next() {
outpoint, entry, err := iterator.Get()
if err != nil {
return nil, err
}
pairs = append(pairs, &externalapi.OutpointUTXOPair{Outpoint: outpoint, Entry: entry})
}
return pairs, nil
}

View File

@@ -26,4 +26,6 @@ type Consensus interface {
GetVirtualSelectedParentChainFromBlock(blockHash *DomainHash) (*SelectedParentChainChanges, error)
IsInSelectedParentChainOf(blockHashA *DomainHash, blockHashB *DomainHash) (bool, error)
GetHeadersSelectedTip() (*DomainHash, error)
GetVirtualUTXOSet() ([]*OutpointUTXOPair, error)
}

View File

@@ -0,0 +1,7 @@
package externalapi
// OutpointUTXOPair is a pair of outpoint and UTXO entry
type OutpointUTXOPair struct {
Outpoint *DomainOutpoint
Entry UTXOEntry
}

View File

@@ -10,11 +10,13 @@ import (
)
var utxoIndexBucket = database.MakeBucket([]byte("utxo-index"))
var utxoIndexLastVirtualSelectedParentKey = database.MakeBucket().Key([]byte("utxo-index-last-virtual-selected-parent"))
type utxoIndexStore struct {
database database.Database
toAdd map[ScriptPublicKeyString]UTXOOutpointEntryPairs
toRemove map[ScriptPublicKeyString]UTXOOutpoints
database database.Database
toAdd map[ScriptPublicKeyString]UTXOOutpointEntryPairs
toRemove map[ScriptPublicKeyString]UTXOOutpoints
virtualSelectedParent *externalapi.DomainHash
}
func newUTXOIndexStore(database database.Database) *utxoIndexStore {
@@ -96,6 +98,7 @@ func (uis *utxoIndexStore) remove(scriptPublicKey []byte, outpoint *externalapi.
func (uis *utxoIndexStore) discard() {
uis.toAdd = make(map[ScriptPublicKeyString]UTXOOutpointEntryPairs)
uis.toRemove = make(map[ScriptPublicKeyString]UTXOOutpoints)
uis.virtualSelectedParent = nil
}
func (uis *utxoIndexStore) commit() error {
@@ -142,6 +145,11 @@ func (uis *utxoIndexStore) commit() error {
}
}
err = dbTransaction.Put(utxoIndexLastVirtualSelectedParentKey, uis.virtualSelectedParent.ByteSlice())
if err != nil {
return err
}
err = dbTransaction.Commit()
if err != nil {
return err
@@ -222,7 +230,7 @@ func (uis *utxoIndexStore) stagedData() (
}
func (uis *utxoIndexStore) getUTXOOutpointEntryPairs(scriptPublicKey []byte) (UTXOOutpointEntryPairs, error) {
if len(uis.toAdd) > 0 || len(uis.toRemove) > 0 {
if uis.isAnythingStaged() {
return nil, errors.Errorf("cannot get utxo outpoint entry pairs while staging isn't empty")
}
@@ -253,3 +261,94 @@ func (uis *utxoIndexStore) getUTXOOutpointEntryPairs(scriptPublicKey []byte) (UT
}
return utxoOutpointEntryPairs, nil
}
func (uis *utxoIndexStore) getLastVirtualSelectedParent() (*externalapi.DomainHash, bool, error) {
if uis.isAnythingStaged() {
return nil, false, errors.Errorf("cannot get last virtual selected parent while staging isn't empty")
}
hasLastVirtualSelectedParent, err := uis.database.Has(utxoIndexLastVirtualSelectedParentKey)
if err != nil {
return nil, false, err
}
if !hasLastVirtualSelectedParent {
return nil, false, nil
}
lastVirtualSelectedParentBytes, err := uis.database.Get(utxoIndexLastVirtualSelectedParentKey)
if err != nil {
return nil, false, err
}
lastVirtualSelectedParent, err := externalapi.NewDomainHashFromByteSlice(lastVirtualSelectedParentBytes)
if err != nil {
return nil, false, err
}
return lastVirtualSelectedParent, true, nil
}
func (uis *utxoIndexStore) isAnythingStaged() bool {
return len(uis.toAdd) > 0 || len(uis.toRemove) > 0 || uis.virtualSelectedParent != nil
}
func (uis *utxoIndexStore) replaceUTXOSet(utxoSet []*externalapi.OutpointUTXOPair,
virtualSelectedParent *externalapi.DomainHash) error {
onEnd := logger.LogAndMeasureExecutionTime(log, "utxoIndexStore.replaceUTXOSet")
defer onEnd()
if uis.isAnythingStaged() {
return errors.Errorf("cannot replace utxo set while something is staged")
}
err := uis.resetStore()
if err != nil {
return err
}
uis.virtualSelectedParent = virtualSelectedParent
for _, pair := range utxoSet {
err := uis.add(pair.Entry.ScriptPublicKey(), pair.Outpoint, pair.Entry)
if err != nil {
return err
}
}
return uis.commit()
}
func (uis *utxoIndexStore) resetStore() error {
onEnd := logger.LogAndMeasureExecutionTime(log, "utxoIndexStore.resetStore")
defer onEnd()
cursor, err := uis.database.Cursor(utxoIndexBucket)
if err != nil {
return err
}
keysToDelete := make([]*database.Key, 0)
for cursor.Next() {
key, err := cursor.Key()
if err != nil {
return err
}
keysToDelete = append(keysToDelete, key)
}
for _, key := range keysToDelete {
err = uis.database.Delete(key)
if err != nil {
return err
}
}
err = uis.database.Delete(utxoIndexLastVirtualSelectedParentKey)
if err != nil {
return err
}
return nil
}

View File

@@ -7,25 +7,80 @@ import (
"github.com/kaspanet/kaspad/domain/consensus/utils/utxo"
"github.com/kaspanet/kaspad/infrastructure/db/database"
"github.com/kaspanet/kaspad/infrastructure/logger"
"github.com/pkg/errors"
"sync"
)
// UTXOIndex maintains an index between transaction scriptPublicKeys
// and UTXOs
type UTXOIndex struct {
consensus externalapi.Consensus
store *utxoIndexStore
consensus externalapi.Consensus
store *utxoIndexStore
genesisHash *externalapi.DomainHash
mutex sync.Mutex
}
// New creates a new UTXO index
func New(consensus externalapi.Consensus, database database.Database) *UTXOIndex {
func New(consensus externalapi.Consensus, database database.Database, genesisHash *externalapi.DomainHash) (*UTXOIndex, error) {
store := newUTXOIndexStore(database)
return &UTXOIndex{
consensus: consensus,
store: store,
utxoIndex := &UTXOIndex{
consensus: consensus,
store: store,
genesisHash: genesisHash,
}
isSynced, err := utxoIndex.isSynced()
if err != nil {
return nil, err
}
if !isSynced {
err := utxoIndex.recover()
if err != nil {
return nil, err
}
}
return utxoIndex, nil
}
func (ui *UTXOIndex) recover() error {
onEnd := logger.LogAndMeasureExecutionTime(log, "UTXOIndex.recover")
defer onEnd()
// Since the RPC and P2P should be down while initializing the
// UTXO index, we can assume that the virtual selected parent
// won't be changed while fetching the virtual selected parent.
virtualSelectedParent, err := ui.consensus.GetVirtualSelectedParent()
if err != nil {
return err
}
virtualUTXOSet, err := ui.consensus.GetVirtualUTXOSet()
if err != nil {
return err
}
return ui.store.replaceUTXOSet(virtualUTXOSet, virtualSelectedParent)
}
func (ui *UTXOIndex) isSynced() (bool, error) {
virtualSelectedParent, err := ui.consensus.GetVirtualSelectedParent()
if err != nil {
return false, err
}
lastVirtualSelectedParent, hasLastVirtualSelectedParent, err := ui.store.getLastVirtualSelectedParent()
if err != nil {
return false, err
}
if !hasLastVirtualSelectedParent {
return virtualSelectedParent.Equal(ui.genesisHash), nil
}
return virtualSelectedParent.Equal(lastVirtualSelectedParent), nil
}
// Update updates the UTXO index with the given DAG selected parent chain changes
@@ -37,6 +92,17 @@ func (ui *UTXOIndex) Update(chainChanges *externalapi.SelectedParentChainChanges
defer ui.mutex.Unlock()
log.Tracef("Updating UTXO index with chainChanges: %+v", chainChanges)
if len(chainChanges.Added) == 0 {
if len(chainChanges.Removed) != 0 {
return nil, errors.Errorf("len(chainChanges.Added) is 0 while len(chainChanges.Removed) is %d", len(chainChanges.Removed))
}
return nil, nil
}
virtualSelectedParent := chainChanges.Added[len(chainChanges.Added)-1]
ui.store.virtualSelectedParent = virtualSelectedParent
for _, removedBlockHash := range chainChanges.Removed {
err := ui.removeBlock(removedBlockHash)
if err != nil {