include removal logic in txindex

This commit is contained in:
D-Stacks 2022-06-24 11:48:13 +02:00
parent 463db63e22
commit e059e880c7
2 changed files with 93 additions and 40 deletions

View File

@ -14,6 +14,7 @@ var pruningPointKey = database.MakeBucket([]byte("")).Key([]byte("tx-index-prunn
type txIndexStore struct {
database database.Database
toAdd map[externalapi.DomainTransactionID]*externalapi.DomainHash
toRemove map[externalapi.DomainTransactionID]*externalapi.DomainHash
virtualParents []*externalapi.DomainHash
pruningPoint *externalapi.DomainHash
}
@ -60,71 +61,67 @@ func (tis *txIndexStore) deleteAll() error {
func (tis *txIndexStore) add(txID externalapi.DomainTransactionID, blockHash *externalapi.DomainHash) {
log.Tracef("Adding %s Txs from blockHash %s", txID.String(), blockHash.String())
delete(tis.toRemove, txID) //adding takes precedence
tis.toAdd[txID] = blockHash
}
func (tis *txIndexStore) remove(txID externalapi.DomainTransactionID, blockHash *externalapi.DomainHash) {
log.Tracef("Removing %s Txs from blockHash %s", txID.String(), blockHash.String())
if _, found := tis.toAdd[txID]; !found { //adding takes precedence
tis.toRemove[txID] = blockHash
}
}
func (tis *txIndexStore) discard() {
tis.toAdd = make(map[externalapi.DomainTransactionID]*externalapi.DomainHash)
tis.virtualParents = nil
tis.pruningPoint = nil
}
func (tis *txIndexStore) commitAndReturnRemoved() (
removed map[externalapi.DomainTransactionID]*externalapi.DomainHash,
err error) {
func (tis *txIndexStore) commit() error {
onEnd := logger.LogAndMeasureExecutionTime(log, "txIndexStore.commit")
defer onEnd()
dbTransaction, err := tis.database.Begin()
if err != nil {
return nil, err
return err
}
defer dbTransaction.RollbackUnlessClosed()
removed = make(map[externalapi.DomainTransactionID]*externalapi.DomainHash)
for txID, blockHash := range tis.toAdd {
key := tis.convertTxIDToKey(txAcceptedIndexBucket, txID)
found, err := dbTransaction.Has(key)
for toRemoveTxID := range tis.toRemove { //safer to remove first
key := tis.convertTxIDToKey(txAcceptedIndexBucket, toRemoveTxID)
err := dbTransaction.Delete(key)
if err != nil {
return nil, err
}
if found {
serializedRemovedBlockHash, err := dbTransaction.Get(key)
if err != nil {
return nil, err
}
removedBlockHash, err := externalapi.NewDomainHashFromByteSlice(serializedRemovedBlockHash)
if err != nil {
return nil, err
}
removed[txID] = removedBlockHash
return err
}
}
for toAddTxID, blockHash := range tis.toAdd {
key := tis.convertTxIDToKey(txAcceptedIndexBucket, toAddTxID)
dbTransaction.Put(key, blockHash.ByteSlice())
if err != nil {
return nil, err
return err
}
}
err = dbTransaction.Put(virtualParentsKey, serializeHashes(tis.virtualParents))
if err != nil {
return nil, err
return err
}
err = dbTransaction.Put(pruningPointKey, tis.pruningPoint.ByteSlice())
if err != nil {
return nil, err
return err
}
err = dbTransaction.Commit()
if err != nil {
return nil, err
return err
}
tis.discard()
return removed, nil
return nil
}
func (tis *txIndexStore) updateAndCommitVirtualParentsWithoutTransaction(virtualParents []*externalapi.DomainHash) error {
@ -142,6 +139,14 @@ func (tis *txIndexStore) updateVirtualParents(virtualParents []*externalapi.Doma
}
func (tis *txIndexStore) CommitWithoutTransaction() error {
for txID := range tis.toRemove { //safer to remove first
key := tis.convertTxIDToKey(txAcceptedIndexBucket, txID)
err := tis.database.Delete(key)
if err != nil {
return err
}
}
for txID, blockHash := range tis.toAdd {
key := tis.convertTxIDToKey(txAcceptedIndexBucket, txID)
err := tis.database.Put(key, blockHash.ByteSlice())
@ -185,18 +190,23 @@ func (tis *txIndexStore) convertTxIDToKey(bucket *database.Bucket, txID external
func (tis *txIndexStore) stagedData() (
toAdd map[externalapi.DomainTransactionID]*externalapi.DomainHash,
toRemove map[externalapi.DomainTransactionID]*externalapi.DomainHash,
virtualParents []*externalapi.DomainHash,
pruningPoint *externalapi.DomainHash) {
toAddClone := make(map[externalapi.DomainTransactionID]*externalapi.DomainHash)
toRemoveClone := make(map[externalapi.DomainTransactionID]*externalapi.DomainHash)
for txID, blockHash := range tis.toAdd {
toAddClone[txID] = blockHash
}
return toAddClone, tis.virtualParents, tis.pruningPoint
for txID, blockHash := range tis.toRemove {
toRemoveClone[txID] = blockHash
}
return toAddClone, toRemoveClone, tis.virtualParents, tis.pruningPoint
}
func (tis *txIndexStore) isAnythingStaged() bool {
return len(tis.toAdd) > 0
return len(tis.toAdd) > 0 || len(tis.toRemove) > 0
}
func (tis *txIndexStore) getTxAcceptingBlockHash(txID *externalapi.DomainTransactionID) (blockHash *externalapi.DomainHash, found bool, err error) {

View File

@ -70,7 +70,15 @@ func (ti *TXIndex) Reset() error {
return err
}
ti.removeTXIDs(selectedParentChainChanges, 1000)
if err != nil {
return err
}
ti.addTXIDs(selectedParentChainChanges, 1000)
if err != nil {
return err
}
err = ti.store.CommitWithoutTransaction()
if err != nil {
@ -124,25 +132,29 @@ func (ti *TXIndex) Update(virtualChangeSet *externalapi.VirtualChangeSet) (*TXAc
log.Tracef("Updating TX index with VirtualSelectedParentChainChanges: %+v", virtualChangeSet.VirtualSelectedParentChainChanges)
err := ti.addTXIDs(virtualChangeSet.VirtualSelectedParentChainChanges, 1000)
err := ti.removeTXIDs(virtualChangeSet.VirtualSelectedParentChainChanges, 1000)
if err != nil {
return nil, err
}
err = ti.addTXIDs(virtualChangeSet.VirtualSelectedParentChainChanges, 1000)
if err != nil {
return nil, err
}
ti.store.updateVirtualParents(virtualChangeSet.VirtualParents)
added, _, _ := ti.store.stagedData()
added, removed, _, _ := ti.store.stagedData()
txIndexChanges := &TXAcceptanceChange{
Added: added,
Removed: removed,
}
removed, err := ti.store.commitAndReturnRemoved()
err = ti.store.commit()
if err != nil {
return nil, err
}
txIndexChanges.Removed = removed
log.Tracef("TX index updated with the TXAcceptanceChange: %+v", txIndexChanges)
return txIndexChanges, nil
}
@ -178,6 +190,37 @@ func (ti *TXIndex) addTXIDs(selectedParentChainChanges *externalapi.SelectedChai
return nil
}
func (ti *TXIndex) removeTXIDs(selectedParentChainChanges *externalapi.SelectedChainPath, chunkSize int) error {
position := 0
for position < len(selectedParentChainChanges.Removed) {
var chainBlocksChunk []*externalapi.DomainHash
if position+chunkSize > len(selectedParentChainChanges.Removed) {
chainBlocksChunk = selectedParentChainChanges.Removed[position:]
} else {
chainBlocksChunk = selectedParentChainChanges.Removed[position : position+chunkSize]
}
// We use chunks in order to avoid blocking consensus for too long
// note: this might not be needed here, but unsure how kaspad handles pruning / when reset might be called.
chainBlocksAcceptanceData, err := ti.domain.Consensus().GetBlocksAcceptanceData(chainBlocksChunk)
if err != nil {
return err
}
for i, removedChainBlock := range chainBlocksChunk {
chainBlockAcceptanceData := chainBlocksAcceptanceData[i]
for _, blockAcceptanceData := range chainBlockAcceptanceData {
for _, transactionAcceptanceData := range blockAcceptanceData.TransactionAcceptanceData {
if transactionAcceptanceData.IsAccepted {
ti.store.remove(*transactionAcceptanceData.Transaction.ID, removedChainBlock)
}
}
}
}
position += chunkSize
}
return nil
}
// TXAcceptingBlockHash returns the accepting block hash for for the given txID
func (ti *TXIndex) TXAcceptingBlockHash(txID *externalapi.DomainTransactionID) (blockHash *externalapi.DomainHash, found bool, err error) {
onEnd := logger.LogAndMeasureExecutionTime(log, "TXIndex.TXAcceptingBlockHash")
@ -199,4 +242,4 @@ func (ti *TXIndex) TXAcceptingBlockHash(txID *externalapi.DomainTransactionID) (
//TO DO: Get Including Block from AcceptingBlock
//TO DO: Get Confirmations
//TO DO: Get Number of confirmations