Close iterators (#1542)

* Add Close() function to all the iterators

* Add defer iterator.Close() whenever we open an iterator

* Add isClosed to all iterators and panic/return error if used after closing

Co-authored-by: Svarog <feanorr@gmail.com>
This commit is contained in:
Elichai Turkel 2021-03-01 11:15:59 +02:00 committed by GitHub
parent 63b1d2a05a
commit 6a12428504
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 222 additions and 8 deletions

View File

@ -3,25 +3,40 @@ package database
import (
"github.com/kaspanet/kaspad/domain/consensus/model"
"github.com/kaspanet/kaspad/infrastructure/db/database"
"github.com/pkg/errors"
)
type dbCursor struct {
cursor database.Cursor
cursor database.Cursor
isClosed bool
}
func (d dbCursor) Next() bool {
if d.isClosed {
panic("Tried using a closed DBCursor")
}
return d.cursor.Next()
}
func (d dbCursor) First() bool {
if d.isClosed {
panic("Tried using a closed DBCursor")
}
return d.cursor.First()
}
func (d dbCursor) Seek(key model.DBKey) error {
if d.isClosed {
return errors.New("Tried using a closed DBCursor")
}
return d.cursor.Seek(dbKeyToDatabaseKey(key))
}
func (d dbCursor) Key() (model.DBKey, error) {
if d.isClosed {
return nil, errors.New("Tried using a closed DBCursor")
}
key, err := d.cursor.Key()
if err != nil {
return nil, err
@ -31,11 +46,23 @@ func (d dbCursor) Key() (model.DBKey, error) {
}
func (d dbCursor) Value() ([]byte, error) {
if d.isClosed {
return nil, errors.New("Tried using a closed DBCursor")
}
return d.cursor.Value()
}
func (d dbCursor) Close() error {
return d.cursor.Close()
if d.isClosed {
return errors.New("Tried using a closed DBCursor")
}
d.isClosed = true
err := d.cursor.Close()
if err != nil {
return err
}
d.cursor = nil
return nil
}
func newDBCursor(cursor database.Cursor) model.DBCursor {

View File

@ -9,6 +9,7 @@ func utxoCollectionToDBUTXOCollection(utxoCollection externalapi.UTXOCollection)
items := make([]*DbUtxoCollectionItem, utxoCollection.Len())
i := 0
utxoIterator := utxoCollection.Iterator()
defer utxoIterator.Close()
for ok := utxoIterator.First(); ok; ok = utxoIterator.Next() {
outpoint, entry, err := utxoIterator.Get()
if err != nil {

View File

@ -7,6 +7,7 @@ import (
"github.com/kaspanet/kaspad/domain/consensus/model"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"github.com/kaspanet/kaspad/domain/consensus/utils/lrucache"
"github.com/pkg/errors"
)
var bucket = database.MakeBucket([]byte("blocks"))
@ -214,18 +215,28 @@ func (bs *blockStore) serializeBlockCount(count uint64) ([]byte, error) {
}
type allBlockHashesIterator struct {
cursor model.DBCursor
cursor model.DBCursor
isClosed bool
}
func (a allBlockHashesIterator) First() bool {
if a.isClosed {
panic("Tried using a closed AllBlockHashesIterator")
}
return a.cursor.First()
}
func (a allBlockHashesIterator) Next() bool {
if a.isClosed {
panic("Tried using a closed AllBlockHashesIterator")
}
return a.cursor.Next()
}
func (a allBlockHashesIterator) Get() (*externalapi.DomainHash, error) {
if a.isClosed {
return nil, errors.New("Tried using a closed AllBlockHashesIterator")
}
key, err := a.cursor.Key()
if err != nil {
return nil, err
@ -235,6 +246,19 @@ func (a allBlockHashesIterator) Get() (*externalapi.DomainHash, error) {
return externalapi.NewDomainHashFromByteSlice(blockHashBytes)
}
func (a allBlockHashesIterator) Close() error {
if a.isClosed {
return errors.New("Tried using a closed AllBlockHashesIterator")
}
a.isClosed = true
err := a.cursor.Close()
if err != nil {
return err
}
a.cursor = nil
return nil
}
func (bs *blockStore) AllBlockHashesIterator(dbContext model.DBReader) (model.BlockIterator, error) {
cursor, err := dbContext.Cursor(bucket)
if err != nil {

View File

@ -37,6 +37,7 @@ func (css *consensusStateStore) commitVirtualUTXODiff(dbTx model.DBTransaction)
}
toRemoveIterator := css.virtualUTXODiffStaging.ToRemove().Iterator()
defer toRemoveIterator.Close()
for ok := toRemoveIterator.First(); ok; ok = toRemoveIterator.Next() {
toRemoveOutpoint, _, err := toRemoveIterator.Get()
if err != nil {
@ -56,6 +57,7 @@ func (css *consensusStateStore) commitVirtualUTXODiff(dbTx model.DBTransaction)
}
toAddIterator := css.virtualUTXODiffStaging.ToAdd().Iterator()
defer toAddIterator.Close()
for ok := toAddIterator.First(); ok; ok = toAddIterator.Next() {
toAddOutpoint, toAddEntry, err := toAddIterator.Get()
if err != nil {
@ -156,6 +158,7 @@ func (css *consensusStateStore) VirtualUTXOs(dbContext model.DBReader,
if err != nil {
return nil, err
}
defer cursor.Close()
if fromOutpoint != nil {
serializedFromOutpoint, err := serializeOutpoint(fromOutpoint)
@ -170,6 +173,7 @@ func (css *consensusStateStore) VirtualUTXOs(dbContext model.DBReader,
}
iterator := newCursorUTXOSetIterator(cursor)
defer iterator.Close()
outpointAndUTXOEntryPairs := make([]*externalapi.OutpointAndUTXOEntryPair, 0, limit)
for len(outpointAndUTXOEntryPairs) < limit && iterator.Next() {
@ -200,7 +204,8 @@ func (css *consensusStateStore) VirtualUTXOSetIterator(dbContext model.DBReader)
}
type utxoSetIterator struct {
cursor model.DBCursor
cursor model.DBCursor
isClosed bool
}
func newCursorUTXOSetIterator(cursor model.DBCursor) externalapi.ReadOnlyUTXOSetIterator {
@ -208,14 +213,23 @@ func newCursorUTXOSetIterator(cursor model.DBCursor) externalapi.ReadOnlyUTXOSet
}
func (u utxoSetIterator) First() bool {
if u.isClosed {
panic("Tried using a closed utxoSetIterator")
}
return u.cursor.First()
}
func (u utxoSetIterator) Next() bool {
if u.isClosed {
panic("Tried using a closed utxoSetIterator")
}
return u.cursor.Next()
}
func (u utxoSetIterator) Get() (outpoint *externalapi.DomainOutpoint, utxoEntry externalapi.UTXOEntry, err error) {
if u.isClosed {
return nil, nil, errors.New("Tried using a closed utxoSetIterator")
}
key, err := u.cursor.Key()
if err != nil {
panic(err)
@ -238,3 +252,16 @@ func (u utxoSetIterator) Get() (outpoint *externalapi.DomainOutpoint, utxoEntry
return outpoint, utxoEntry, nil
}
func (u utxoSetIterator) Close() error {
if u.isClosed {
return errors.New("Tried using a closed utxoSetIterator")
}
u.isClosed = true
err := u.cursor.Close()
if err != nil {
return err
}
u.cursor = nil
return nil
}

View File

@ -45,6 +45,7 @@ func (css *consensusStateStore) ImportPruningPointUTXOSetIntoVirtualUTXOSet(dbCo
if err != nil {
return err
}
defer deleteCursor.Close()
for ok := deleteCursor.First(); ok; ok = deleteCursor.Next() {
key, err := deleteCursor.Key()
if err != nil {

View File

@ -6,6 +6,7 @@ import (
"github.com/kaspanet/kaspad/domain/consensus/database/serialization"
"github.com/kaspanet/kaspad/domain/consensus/model"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"github.com/pkg/errors"
)
var importedPruningPointUTXOsBucket = database.MakeBucket([]byte("imported-pruning-point-utxos"))
@ -16,6 +17,7 @@ func (ps *pruningStore) ClearImportedPruningPointUTXOs(dbContext model.DBWriter)
if err != nil {
return err
}
defer cursor.Close()
for ok := cursor.First(); ok; ok = cursor.Next() {
key, err := cursor.Key()
@ -60,7 +62,8 @@ func (ps *pruningStore) ImportedPruningPointUTXOIterator(dbContext model.DBReade
}
type utxoSetIterator struct {
cursor model.DBCursor
cursor model.DBCursor
isClosed bool
}
func (ps *pruningStore) newCursorUTXOSetIterator(cursor model.DBCursor) externalapi.ReadOnlyUTXOSetIterator {
@ -68,14 +71,23 @@ func (ps *pruningStore) newCursorUTXOSetIterator(cursor model.DBCursor) external
}
func (u *utxoSetIterator) First() bool {
if u.isClosed {
panic("Tried using a closed utxoSetIterator")
}
return u.cursor.First()
}
func (u *utxoSetIterator) Next() bool {
if u.isClosed {
panic("Tried using a closed utxoSetIterator")
}
return u.cursor.Next()
}
func (u *utxoSetIterator) Get() (outpoint *externalapi.DomainOutpoint, utxoEntry externalapi.UTXOEntry, err error) {
if u.isClosed {
return nil, nil, errors.New("Tried using a closed utxoSetIterator")
}
key, err := u.cursor.Key()
if err != nil {
panic(err)
@ -99,6 +111,19 @@ func (u *utxoSetIterator) Get() (outpoint *externalapi.DomainOutpoint, utxoEntry
return outpoint, utxoEntry, nil
}
func (u *utxoSetIterator) Close() error {
if u.isClosed {
return errors.New("Tried using a closed utxoSetIterator")
}
u.isClosed = true
err := u.cursor.Close()
if err != nil {
return err
}
u.cursor = nil
return nil
}
func (ps *pruningStore) importedPruningPointUTXOKey(outpoint *externalapi.DomainOutpoint) (model.DBKey, error) {
serializedOutpoint, err := serializeOutpoint(outpoint)
if err != nil {
@ -175,6 +200,7 @@ func (ps *pruningStore) CommitImportedPruningPointUTXOSet(dbContext model.DBWrit
if err != nil {
return err
}
defer deleteCursor.Close()
for ok := deleteCursor.First(); ok; ok = deleteCursor.Next() {
key, err := deleteCursor.Key()
if err != nil {
@ -191,6 +217,7 @@ func (ps *pruningStore) CommitImportedPruningPointUTXOSet(dbContext model.DBWrit
if err != nil {
return err
}
defer insertCursor.Close()
for ok := insertCursor.First(); ok; ok = insertCursor.Next() {
importedPruningPointUTXOSetKey, err := insertCursor.Key()
if err != nil {

View File

@ -124,6 +124,7 @@ func (ps *pruningStore) UpdatePruningPointUTXOSet(dbContext model.DBWriter,
if err != nil {
return err
}
defer deleteCursor.Close()
for ok := deleteCursor.First(); ok; ok = deleteCursor.Next() {
key, err := deleteCursor.Key()
if err != nil {
@ -215,6 +216,7 @@ func (ps *pruningStore) PruningPointUTXOs(dbContext model.DBReader,
if err != nil {
return nil, err
}
defer cursor.Close()
if fromOutpoint != nil {
serializedFromOutpoint, err := serializeOutpoint(fromOutpoint)
@ -229,6 +231,7 @@ func (ps *pruningStore) PruningPointUTXOs(dbContext model.DBReader,
}
pruningPointUTXOIterator := ps.newCursorUTXOSetIterator(cursor)
defer pruningPointUTXOIterator.Close()
outpointAndUTXOEntryPairs := make([]*externalapi.OutpointAndUTXOEntryPair, 0, limit)
for len(outpointAndUTXOEntryPairs) < limit && pruningPointUTXOIterator.Next() {

View File

@ -7,4 +7,5 @@ type BlockIterator interface {
First() bool
Next() bool
Get() (*externalapi.DomainHash, error)
Close() error
}

View File

@ -6,4 +6,5 @@ type ReadOnlyUTXOSetIterator interface {
First() bool
Next() bool
Get() (outpoint *DomainOutpoint, utxoEntry UTXOEntry, err error)
Close() error
}

View File

@ -86,6 +86,7 @@ func checkBlockUTXOCommitment(t *testing.T, consensus testapi.TestConsensus, blo
if err != nil {
t.Fatalf("Error restoring past UTXO of block %s: %+v", blockName, err)
}
defer utxoSetIterator.Close()
// Build a Multiset
ms := multiset.New()

View File

@ -88,6 +88,7 @@ func (csm *consensusStateManager) importPruningPoint(newPruningPoint *externalap
if err != nil {
return err
}
defer importedPruningPointUTXOIterator.Close()
// Clone the pruningPoint block here because validateBlockTransactionsAgainstPastUTXO
// assumes that the block UTXOEntries are pre-filled during further validations
@ -176,6 +177,7 @@ func (csm *consensusStateManager) importVirtualUTXOSetAndPruningPointUTXOSet() e
if err != nil {
return err
}
defer pruningPointUTXOSetIterator.Close()
log.Debugf("Importing the virtual UTXO set")
err = csm.consensusStateStore.ImportPruningPointUTXOSetIntoVirtualUTXOSet(csm.databaseContext, pruningPointUTXOSetIterator)

View File

@ -14,14 +14,21 @@ type selectedChildIterator struct {
highHash, lowHash *externalapi.DomainHash
current *externalapi.DomainHash
err error
isClosed bool
}
func (s *selectedChildIterator) First() bool {
if s.isClosed {
panic("Tried using a closed SelectedChildIterator")
}
s.current = s.lowHash
return s.Next()
}
func (s *selectedChildIterator) Next() bool {
if s.isClosed {
panic("Tried using a closed SelectedChildIterator")
}
if s.err != nil {
return true
}
@ -50,9 +57,27 @@ func (s *selectedChildIterator) Next() bool {
}
func (s *selectedChildIterator) Get() (*externalapi.DomainHash, error) {
if s.isClosed {
return nil, errors.New("Tried using a closed SelectedChildIterator")
}
return s.current, s.err
}
func (s *selectedChildIterator) Close() error {
if s.isClosed {
return errors.New("Tried using a closed SelectedChildIterator")
}
s.isClosed = true
s.databaseContext = nil
s.dagTopologyManager = nil
s.reachabilityDataStore = nil
s.highHash = nil
s.lowHash = nil
s.current = nil
s.err = nil
return nil
}
// SelectedChildIterator returns a BlockIterator that iterates from lowHash (exclusive) to highHash (inclusive) over
// highHash's selected parent chain
func (dtm *dagTraversalManager) SelectedChildIterator(highHash, lowHash *externalapi.DomainHash) (model.BlockIterator, error) {

View File

@ -133,6 +133,7 @@ func (pm *pruningManager) UpdatePruningPointByVirtual() error {
if err != nil {
return err
}
defer iterator.Close()
// Finding the next pruning point candidate: look for the latest
// selected child of the current candidate that is in depth of at
@ -425,6 +426,7 @@ func (pm *pruningManager) validateUTXOSetFitsCommitment(pruningPointHash *extern
if err != nil {
return err
}
defer utxoSetIterator.Close()
utxoSetMultiset := multiset.New()
for ok := utxoSetIterator.First(); ok; ok = utxoSetIterator.Next() {
@ -544,6 +546,7 @@ func (pm *pruningManager) updatePruningPointUTXOSet() error {
if err != nil {
return err
}
defer utxoSetIterator.Close()
log.Debugf("Updating the pruning point UTXO set")
err = pm.pruningStore.UpdatePruningPointUTXOSet(pm.databaseContext, utxoSetIterator)
@ -563,6 +566,7 @@ func (pm *pruningManager) PruneAllBlocksBelow(pruningPointHash *externalapi.Doma
if err != nil {
return err
}
defer iterator.Close()
for ok := iterator.First(); ok; ok = iterator.Next() {
blockHash, err := iterator.Get()

View File

@ -59,6 +59,7 @@ func (sm *syncManager) antiPastHashesBetween(lowHash, highHash *externalapi.Doma
if err != nil {
return nil, err
}
defer iterator.Close()
for ok := iterator.First(); ok; ok = iterator.Next() {
current, err := iterator.Get()
if err != nil {
@ -145,6 +146,7 @@ func (sm *syncManager) findHighHashAccordingToMaxBlueScoreDifference(lowHash *ex
if err != nil {
return nil, err
}
defer iterator.Close()
for ok := iterator.First(); ok; ok = iterator.Next() {
highHashCandidate, err := iterator.Get()
if err != nil {
@ -191,6 +193,7 @@ func (sm *syncManager) missingBlockBodyHashes(highHash *externalapi.DomainHash)
if err != nil {
return nil, err
}
defer selectedChildIterator.Close()
lowHash := pruningPoint
foundHeaderOnlyBlock := false

View File

@ -26,6 +26,7 @@ func (tc *testConsensus) convertToDot() (string, error) {
if err != nil {
return "", err
}
defer blocksIterator.Close()
for ok := blocksIterator.First(); ok; ok = blocksIterator.Next() {
hash, err := blocksIterator.Get()

View File

@ -11,8 +11,9 @@ type utxoOutpointEntryPair struct {
}
type utxoCollectionIterator struct {
index int
pairs []utxoOutpointEntryPair
index int
pairs []utxoOutpointEntryPair
isClosed bool
}
func (uc utxoCollection) Iterator() externalapi.ReadOnlyUTXOSetIterator {
@ -29,21 +30,33 @@ func (uc utxoCollection) Iterator() externalapi.ReadOnlyUTXOSetIterator {
}
func (uci *utxoCollectionIterator) First() bool {
if uci.isClosed {
panic("Tried using a closed utxoCollectionIterator")
}
uci.index = 0
return len(uci.pairs) > 0
}
func (uci *utxoCollectionIterator) Next() bool {
if uci.isClosed {
panic("Tried using a closed utxoCollectionIterator")
}
uci.index++
return uci.index < len(uci.pairs)
}
func (uci *utxoCollectionIterator) Get() (outpoint *externalapi.DomainOutpoint, utxoEntry externalapi.UTXOEntry, err error) {
if uci.isClosed {
return nil, nil, errors.New("Tried using a closed utxoCollectionIterator")
}
pair := uci.pairs[uci.index]
return &pair.outpoint, pair.entry, nil
}
func (uci *utxoCollectionIterator) WithDiff(diff externalapi.UTXODiff) (externalapi.ReadOnlyUTXOSetIterator, error) {
if uci.isClosed {
return nil, errors.New("Tried using a closed utxoCollectionIterator")
}
d, ok := diff.(*immutableUTXODiff)
if !ok {
return nil, errors.New("diff is not of type *immutableUTXODiff")
@ -55,3 +68,12 @@ func (uci *utxoCollectionIterator) WithDiff(diff externalapi.UTXODiff) (external
toAddIterator: diff.ToAdd().Iterator(),
}, nil
}
func (uci *utxoCollectionIterator) Close() error {
if uci.isClosed {
return errors.New("Tried using a closed utxoCollectionIterator")
}
uci.isClosed = true
uci.pairs = nil
return nil
}

View File

@ -14,6 +14,7 @@ type readOnlyUTXOIteratorWithDiff struct {
currentErr error
toAddIterator externalapi.ReadOnlyUTXOSetIterator
isClosed bool
}
// IteratorWithDiff applies a UTXODiff to given utxo iterator
@ -40,9 +41,17 @@ func IteratorWithDiff(iterator externalapi.ReadOnlyUTXOSetIterator, diff externa
}
func (r *readOnlyUTXOIteratorWithDiff) First() bool {
if r.isClosed {
panic("Tried using a closed readOnlyUTXOIteratorWithDiff")
}
baseNotEmpty := r.baseIterator.First()
baseEmpty := !baseNotEmpty
err := r.toAddIterator.Close()
if err != nil {
r.currentErr = err
return true
}
r.toAddIterator = r.diff.ToAdd().Iterator()
toAddEmpty := r.diff.ToAdd().Len() == 0
@ -61,6 +70,9 @@ func (r *readOnlyUTXOIteratorWithDiff) First() bool {
}
func (r *readOnlyUTXOIteratorWithDiff) Next() bool {
if r.isClosed {
panic("Tried using a closed readOnlyUTXOIteratorWithDiff")
}
for r.baseIterator.Next() { // keep looping until we reach an outpoint/entry pair that is not in r.diff.toRemove
r.currentOutpoint, r.currentUTXOEntry, r.currentErr = r.baseIterator.Get()
if !r.diff.mutableUTXODiff.toRemove.containsWithBlueScore(r.currentOutpoint, r.currentUTXOEntry.BlockBlueScore()) {
@ -77,5 +89,30 @@ func (r *readOnlyUTXOIteratorWithDiff) Next() bool {
}
func (r *readOnlyUTXOIteratorWithDiff) Get() (outpoint *externalapi.DomainOutpoint, utxoEntry externalapi.UTXOEntry, err error) {
if r.isClosed {
return nil, nil, errors.New("Tried using a closed readOnlyUTXOIteratorWithDiff")
}
return r.currentOutpoint, r.currentUTXOEntry, r.currentErr
}
func (r *readOnlyUTXOIteratorWithDiff) Close() error {
if r.isClosed {
return errors.New("Tried using a closed readOnlyUTXOIteratorWithDiff")
}
r.isClosed = true
err := r.baseIterator.Close()
if err != nil {
return err
}
err = r.toAddIterator.Close()
if err != nil {
return err
}
r.baseIterator = nil
r.diff = nil
r.currentOutpoint = nil
r.currentUTXOEntry = nil
r.currentErr = nil
r.toAddIterator = nil
return nil
}

View File

@ -249,6 +249,7 @@ func (uis *utxoIndexStore) getUTXOOutpointEntryPairs(scriptPublicKey *externalap
if err != nil {
return nil, err
}
defer cursor.Close()
utxoOutpointEntryPairs := make(UTXOOutpointEntryPairs)
for cursor.Next() {
key, err := cursor.Key()
@ -297,6 +298,7 @@ func (uis *utxoIndexStore) deleteAll() error {
if err != nil {
return err
}
defer cursor.Close()
for cursor.Next() {
key, err := cursor.Key()
if err != nil {

View File

@ -131,6 +131,7 @@ func (ui *UTXOIndex) Update(blockInsertionResult *externalapi.BlockInsertionResu
func (ui *UTXOIndex) addUTXOs(toAdd externalapi.UTXOCollection) error {
iterator := toAdd.Iterator()
defer iterator.Close()
for ok := iterator.First(); ok; ok = iterator.Next() {
outpoint, entry, err := iterator.Get()
if err != nil {
@ -148,6 +149,7 @@ func (ui *UTXOIndex) addUTXOs(toAdd externalapi.UTXOCollection) error {
func (ui *UTXOIndex) removeUTXOs(toRemove externalapi.UTXOCollection) error {
iterator := toRemove.Iterator()
defer iterator.Close()
for ok := iterator.First(); ok; ok = iterator.Next() {
outpoint, entry, err := iterator.Get()
if err != nil {

View File

@ -106,7 +106,8 @@ func (c *LevelDBCursor) Close() error {
return errors.New("cannot close an already closed cursor")
}
c.isClosed = true
c.ldbIterator.Release()
c.ldbIterator = nil
c.bucket = nil
return nil
}

View File

@ -43,6 +43,7 @@ func (as *addressStore) restoreNotBannedAddresses() error {
if err != nil {
return err
}
defer cursor.Close()
for ok := cursor.First(); ok; ok = cursor.Next() {
databaseKey, err := cursor.Key()
if err != nil {
@ -66,6 +67,7 @@ func (as *addressStore) restoreBannedAddresses() error {
if err != nil {
return err
}
defer cursor.Close()
for ok := cursor.First(); ok; ok = cursor.Next() {
databaseKey, err := cursor.Key()
if err != nil {