store accepting + including block + index

This commit is contained in:
D-Stacks 2022-10-14 00:47:14 +02:00
parent 7336b2d7eb
commit d401caced5
5 changed files with 288 additions and 174 deletions

View File

@ -1,43 +1,34 @@
package txindex package txindex
import ( import (
"encoding/hex"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi" "github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
) )
// TXAcceptanceChange is the set of changes made to the TX index after // TXAcceptanceChange is the set of changes made to the TX index after
// a successful update // a successful update
type TXAcceptanceChange struct { type TXAcceptanceChange struct {
Added map[externalapi.DomainTransactionID]*externalapi.DomainHash Added map[externalapi.DomainTransactionID]*TxData
Removed map[externalapi.DomainTransactionID]*externalapi.DomainHash Removed map[externalapi.DomainTransactionID]*TxData
} }
//TxData holds tx data stored in the TXIndex database
type TxData struct {
IncludingBlockHash *externalapi.DomainHash
AcceptingBlockHash *externalapi.DomainHash
IncludingIndex uint32
}
//TxIDsToTxIndexData is a map of TxIDs to corrospnding TxIndexData
type TxIDsToTxIndexData map[externalapi.DomainTransactionID]*TxData
//TxIDsToBlockHashes is a map of TxIDs to corrospnding blockHashes //TxIDsToBlockHashes is a map of TxIDs to corrospnding blockHashes
type TxIDsToBlockHashes map[*externalapi.DomainTransactionID]*externalapi.DomainHash type TxIDsToBlockHashes map[externalapi.DomainTransactionID]*externalapi.DomainHash
//TxIDsToBlocks is a map of TxIDs to corrospnding blocks //TxIDsToBlocks is a map of TxIDs to corrospnding blocks
type TxIDsToBlocks map[*externalapi.DomainTransactionID]*externalapi.DomainBlock type TxIDsToBlocks map[externalapi.DomainTransactionID]*externalapi.DomainBlock
//TxIDsToConfirmations is a map of TxIDs to corrospnding Confirmations //TxIDsToConfirmations is a map of TxIDs to corrospnding Confirmations
type TxIDsToConfirmations map[*externalapi.DomainTransactionID]int64 type TxIDsToConfirmations map[externalapi.DomainTransactionID]int64
// ConvertDomainHashToString converts the given DomainHash to a string //TxIDsToBlueScores is a map of TxIDs to corrospnding Confirmations
func ConvertDomainHashToString(blockHash *externalapi.DomainHash) string { type TxIDsToBlueScores map[externalapi.DomainTransactionID]uint64
return hex.EncodeToString(blockHash.ByteSlice())
}
// ConvertStringToDomainHash converts the given string to a domainHash
func ConvertStringToDomainHash(stringDomainHash string) (*externalapi.DomainHash, error) {
return externalapi.NewDomainHashFromString(stringDomainHash)
}
// ConvertTXIDToString converts the given DomainHash to a string
func ConvertTXIDToString(txID *externalapi.DomainTransactionID) string {
return hex.EncodeToString(txID.ByteSlice())
}
// ConvertStringTXID converts the given string to a domainHash
func ConvertStringTXID(stringDomainTransactionID string) (*externalapi.DomainTransactionID, error) {
return externalapi.NewDomainTransactionIDFromString(stringDomainTransactionID)
}

View File

@ -2,9 +2,10 @@ package txindex
import ( import (
"encoding/binary" "encoding/binary"
"io"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi" "github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"github.com/pkg/errors" "github.com/pkg/errors"
"io"
) )
func serializeHashes(hashes []*externalapi.DomainHash) []byte { func serializeHashes(hashes []*externalapi.DomainHash) []byte {
@ -40,3 +41,33 @@ func deserializeHashes(serializedHashes []byte) ([]*externalapi.DomainHash, erro
return hashes, nil return hashes, nil
} }
func deserializeTxIndexData(serializedTxIndexData []byte) (*TxData, error) {
var err error
deserializedTxIndexData := &TxData{}
deserializedTxIndexData.IncludingBlockHash, err = externalapi.NewDomainHashFromByteSlice(serializedTxIndexData[:32])
if err != nil {
return nil, err
}
deserializedTxIndexData.AcceptingBlockHash, err = externalapi.NewDomainHashFromByteSlice(serializedTxIndexData[32:64])
if err != nil {
return nil, err
}
deserializedTxIndexData.IncludingIndex = binary.BigEndian.Uint32(serializedTxIndexData[64:68])
return deserializedTxIndexData, nil
}
func serializeTxIndexData(blockTxIndexData *TxData) []byte {
indexBytes := make([]byte, 4)
binary.BigEndian.PutUint32(indexBytes, blockTxIndexData.IncludingIndex)
serializedTxIndexData := append(
append(
blockTxIndexData.IncludingBlockHash.ByteSlice(),
blockTxIndexData.AcceptingBlockHash.ByteSlice()...,
),
indexBytes...,
)
return serializedTxIndexData
}

View File

@ -2,43 +2,42 @@ package txindex
import ( import (
"encoding/binary" "encoding/binary"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"github.com/pkg/errors"
"io"
"math/rand" "math/rand"
"testing" "testing"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
) )
func Test_serializeHashes(t *testing.T) { func Test_serializeTxIndexData(t *testing.T) {
r := rand.New(rand.NewSource(0)) r := rand.New(rand.NewSource(0))
for length := 0; length < 32; length++ { serializedtxIndex := make([]byte, 68) // 32 bytes including block hash 32 bytes accepting blockhash and 4 bytes uint32
hashes := make([]*externalapi.DomainHash, length) r.Read(serializedtxIndex[:])
for i := range hashes { includingBlockHash, err := externalapi.NewDomainHashFromByteSlice(serializedtxIndex[:32])
var hashBytes [32]byte if err != nil {
r.Read(hashBytes[:]) t.Fatalf(err.Error())
hashes[i] = externalapi.NewDomainHashFromByteArray(&hashBytes)
}
result, err := deserializeHashes(serializeHashes(hashes))
if err != nil {
t.Fatalf("Failed deserializing hashes: %v", err)
}
if !externalapi.HashesEqual(hashes, result) {
t.Fatalf("Expected \n %s \n==\n %s\n", hashes, result)
}
} }
} acceptingBlockHash, err := externalapi.NewDomainHashFromByteSlice(serializedtxIndex[32:64])
if err != nil {
t.Fatalf(err.Error())
}
includingIndex := binary.BigEndian.Uint32(serializedtxIndex[64:68])
func Test_deserializeHashesFailure(t *testing.T) { testdeserializedtxIndex := &TxData{
hashes := []*externalapi.DomainHash{ IncludingBlockHash: includingBlockHash,
externalapi.NewDomainHashFromByteArray(&[externalapi.DomainHashSize]byte{1}), AcceptingBlockHash: acceptingBlockHash,
externalapi.NewDomainHashFromByteArray(&[externalapi.DomainHashSize]byte{2}), IncludingIndex: includingIndex,
externalapi.NewDomainHashFromByteArray(&[externalapi.DomainHashSize]byte{3}),
} }
serialized := serializeHashes(hashes)
binary.LittleEndian.PutUint64(serialized[:8], uint64(len(hashes)+1)) result, err := deserializeTxIndexData(serializeTxIndexData(testdeserializedtxIndex))
_, err := deserializeHashes(serialized) if err != nil {
if !errors.Is(err, io.ErrUnexpectedEOF) { t.Fatalf("Failed deserializing txIndexData: %v", err)
t.Fatalf("Expected error to be EOF, instead got: %v", err) }
if !testdeserializedtxIndex.IncludingBlockHash.Equal(result.IncludingBlockHash) {
t.Fatalf("Expected including block hash: \n %s \n Got: \n %s\n", testdeserializedtxIndex.IncludingBlockHash.String(), result.IncludingBlockHash.String())
} else if !testdeserializedtxIndex.AcceptingBlockHash.Equal(result.AcceptingBlockHash) {
t.Fatalf("Expected accepting block hash \n %s \n Got: \n %s\n", testdeserializedtxIndex.AcceptingBlockHash.String(), result.AcceptingBlockHash.String())
} else if testdeserializedtxIndex.IncludingIndex != result.IncludingIndex {
t.Fatalf("Expected including index \n %d \n Got: \n %d\n", testdeserializedtxIndex.IncludingIndex, result.IncludingIndex)
} }
} }

View File

@ -13,8 +13,8 @@ var pruningPointKey = database.MakeBucket([]byte("")).Key([]byte("tx-index-prunn
type txIndexStore struct { type txIndexStore struct {
database database.Database database database.Database
toAdd map[externalapi.DomainTransactionID]*externalapi.DomainHash toAdd map[externalapi.DomainTransactionID]*TxData
toRemove map[externalapi.DomainTransactionID]*externalapi.DomainHash toRemove map[externalapi.DomainTransactionID]*TxData
virtualParents []*externalapi.DomainHash virtualParents []*externalapi.DomainHash
pruningPoint *externalapi.DomainHash pruningPoint *externalapi.DomainHash
} }
@ -22,8 +22,8 @@ type txIndexStore struct {
func newTXIndexStore(database database.Database) *txIndexStore { func newTXIndexStore(database database.Database) *txIndexStore {
return &txIndexStore{ return &txIndexStore{
database: database, database: database,
toAdd: make(map[externalapi.DomainTransactionID]*externalapi.DomainHash), toAdd: make(map[externalapi.DomainTransactionID]*TxData),
toRemove: make(map[externalapi.DomainTransactionID]*externalapi.DomainHash), toRemove: make(map[externalapi.DomainTransactionID]*TxData),
virtualParents: nil, virtualParents: nil,
pruningPoint: nil, pruningPoint: nil,
} }
@ -60,22 +60,32 @@ func (tis *txIndexStore) deleteAll() error {
return nil return nil
} }
func (tis *txIndexStore) add(txID externalapi.DomainTransactionID, blockHash *externalapi.DomainHash) { func (tis *txIndexStore) add(txID externalapi.DomainTransactionID, includingIndex uint32,
log.Tracef("Adding %s Txs from blockHash %s", txID.String(), ConvertDomainHashToString(blockHash)) includingBlockHash *externalapi.DomainHash, acceptingBlockHash *externalapi.DomainHash) {
log.Tracef("Adding %s Txs from blockHash %s", txID.String(), includingBlockHash.String())
delete(tis.toRemove, txID) //adding takes precedence delete(tis.toRemove, txID) //adding takes precedence
tis.toAdd[txID] = blockHash tis.toAdd[txID] = &TxData{
IncludingBlockHash: includingBlockHash,
IncludingIndex: includingIndex,
AcceptingBlockHash: acceptingBlockHash,
}
} }
func (tis *txIndexStore) remove(txID externalapi.DomainTransactionID, blockHash *externalapi.DomainHash) { func (tis *txIndexStore) remove(txID externalapi.DomainTransactionID, includingIndex uint32,
log.Tracef("Removing %s Txs from blockHash %s", txID.String(), ConvertDomainHashToString(blockHash)) includingBlockHash *externalapi.DomainHash, acceptingBlockHash *externalapi.DomainHash) {
log.Tracef("Removing %s Txs from blockHash %s", txID.String(), includingBlockHash.String())
if _, found := tis.toAdd[txID]; !found { //adding takes precedence if _, found := tis.toAdd[txID]; !found { //adding takes precedence
tis.toRemove[txID] = blockHash tis.toRemove[txID] = &TxData{
IncludingBlockHash: includingBlockHash,
IncludingIndex: includingIndex,
AcceptingBlockHash: acceptingBlockHash,
}
} }
} }
func (tis *txIndexStore) discardAllButPruningPoint() { func (tis *txIndexStore) discardAllButPruningPoint() {
tis.toAdd = make(map[externalapi.DomainTransactionID]*externalapi.DomainHash) tis.toAdd = make(map[externalapi.DomainTransactionID]*TxData)
tis.toRemove = make(map[externalapi.DomainTransactionID]*externalapi.DomainHash) tis.toRemove = make(map[externalapi.DomainTransactionID]*TxData)
tis.virtualParents = nil tis.virtualParents = nil
} }
@ -90,10 +100,10 @@ func (tis *txIndexStore) commit() error {
defer dbTransaction.RollbackUnlessClosed() defer dbTransaction.RollbackUnlessClosed()
for toAddTxID, blockHash := range tis.toAdd { for toAddTxID, txData := range tis.toAdd {
delete(tis.toRemove, toAddTxID) //safeguard delete(tis.toRemove, toAddTxID) //safeguard
key := tis.convertTxIDToKey(txAcceptedIndexBucket, toAddTxID) key := tis.convertTxIDToKey(txAcceptedIndexBucket, toAddTxID)
dbTransaction.Put(key, blockHash.ByteSlice()) dbTransaction.Put(key, serializeTxIndexData(txData))
if err != nil { if err != nil {
return err return err
} }
@ -138,10 +148,10 @@ func (tis *txIndexStore) updateAndCommitPruningPointWithoutTransaction(pruningPo
} }
func (tis *txIndexStore) commitTxIDsWithoutTransaction() error { func (tis *txIndexStore) commitTxIDsWithoutTransaction() error {
for txID, blockHash := range tis.toAdd { for txID, txData := range tis.toAdd {
delete(tis.toRemove, txID) //adding takes precedence delete(tis.toRemove, txID) //adding takes precedence
key := tis.convertTxIDToKey(txAcceptedIndexBucket, txID) key := tis.convertTxIDToKey(txAcceptedIndexBucket, txID)
err := tis.database.Put(key, blockHash.ByteSlice()) err := tis.database.Put(key, serializeTxIndexData(txData))
if err != nil { if err != nil {
return err return err
} }
@ -189,18 +199,18 @@ func (tis *txIndexStore) convertTxIDToKey(bucket *database.Bucket, txID external
} }
func (tis *txIndexStore) stagedData() ( func (tis *txIndexStore) stagedData() (
toAdd map[externalapi.DomainTransactionID]*externalapi.DomainHash, toAdd map[externalapi.DomainTransactionID]*TxData,
toRemove map[externalapi.DomainTransactionID]*externalapi.DomainHash, toRemove map[externalapi.DomainTransactionID]*TxData,
virtualParents []*externalapi.DomainHash, virtualParents []*externalapi.DomainHash,
pruningPoint *externalapi.DomainHash) { pruningPoint *externalapi.DomainHash) {
toAddClone := make(map[externalapi.DomainTransactionID]*externalapi.DomainHash) toAddClone := make(map[externalapi.DomainTransactionID]*TxData)
toRemoveClone := make(map[externalapi.DomainTransactionID]*externalapi.DomainHash) toRemoveClone := make(map[externalapi.DomainTransactionID]*TxData)
for txID, blockHash := range tis.toAdd { for txID, txData := range tis.toAdd {
toAddClone[txID] = blockHash toAddClone[txID] = txData
} }
for txID, blockHash := range tis.toRemove { for txID, txData := range tis.toRemove {
toRemoveClone[txID] = blockHash toRemoveClone[txID] = txData
} }
return toAddClone, toRemoveClone, tis.virtualParents, tis.pruningPoint return toAddClone, toRemoveClone, tis.virtualParents, tis.pruningPoint
} }
@ -209,14 +219,14 @@ func (tis *txIndexStore) isAnythingStaged() bool {
return len(tis.toAdd) > 0 || len(tis.toRemove) > 0 return len(tis.toAdd) > 0 || len(tis.toRemove) > 0
} }
func (tis *txIndexStore) getTxAcceptingBlockHash(txID *externalapi.DomainTransactionID) (blockHash *externalapi.DomainHash, found bool, err error) { func (tis *txIndexStore) getTxData(txID *externalapi.DomainTransactionID) (txData *TxData, found bool, err error) {
if tis.isAnythingStaged() { if tis.isAnythingStaged() {
return nil, false, errors.Errorf("cannot get TX accepting Block hash while staging isn't empty") return nil, false, errors.Errorf("cannot get TX accepting Block hash while staging isn't empty")
} }
key := tis.convertTxIDToKey(txAcceptedIndexBucket, *txID) key := tis.convertTxIDToKey(txAcceptedIndexBucket, *txID)
serializedAcceptingBlockHash, err := tis.database.Get(key) serializedTxData, err := tis.database.Get(key)
if err != nil { if err != nil {
if database.IsNotFoundError(err) { if database.IsNotFoundError(err) {
return nil, false, nil return nil, false, nil
@ -224,39 +234,43 @@ func (tis *txIndexStore) getTxAcceptingBlockHash(txID *externalapi.DomainTransac
return nil, false, err return nil, false, err
} }
acceptingBlockHash, err := externalapi.NewDomainHashFromByteSlice(serializedAcceptingBlockHash) deserializedTxData, err := deserializeTxIndexData(serializedTxData)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
return acceptingBlockHash, true, nil return deserializedTxData, true, nil
} }
func (tis *txIndexStore) getTxAcceptingBlockHashes(txIDs []*externalapi.DomainTransactionID) (acceptingBlockHashes TxIDsToBlockHashes, found bool, err error) { func (tis *txIndexStore) getTxsData(txIDs []*externalapi.DomainTransactionID) (
txsData TxIDsToTxIndexData, notFoundTxIDs []*externalapi.DomainTransactionID, err error) {
if tis.isAnythingStaged() { if tis.isAnythingStaged() {
return nil, false, errors.Errorf("cannot get TX accepting Block hash while staging isn't empty") return nil, nil, errors.Errorf("cannot get TX accepting Block hash while staging isn't empty")
} }
keys := make([]*database.Key, len(txIDs)) keys := make([]*database.Key, len(txIDs))
acceptingBlockHashes = make(TxIDsToBlockHashes) txsData = make(TxIDsToTxIndexData)
notFoundTxIDs = make([]*externalapi.DomainTransactionID, 0)
for i, key := range keys { for i, key := range keys {
key = tis.convertTxIDToKey(txAcceptedIndexBucket, *txIDs[i]) key = tis.convertTxIDToKey(txAcceptedIndexBucket, *txIDs[i])
serializedAcceptingBlockHash, err := tis.database.Get(key) serializedTxData, err := tis.database.Get(key)
if err != nil { if err != nil {
if database.IsNotFoundError(err) { if database.IsNotFoundError(err) {
continue //ignore not found errors we expect this to happen frequently with queries notFoundTxIDs = append(notFoundTxIDs, txIDs[i])
} else {
return nil, nil, err
} }
return nil, false, err
} }
acceptingBlockHash, err := externalapi.NewDomainHashFromByteSlice(serializedAcceptingBlockHash) deserializedTxData, err := deserializeTxIndexData(serializedTxData)
if err != nil { if err != nil {
return nil, false, err return nil, nil, err
} }
acceptingBlockHashes[txIDs[i]] = acceptingBlockHash
txsData[*txIDs[i]] = deserializedTxData
} }
return acceptingBlockHashes, true, nil return txsData, notFoundTxIDs, nil
} }

View File

@ -187,15 +187,21 @@ func (ti *TXIndex) addTXIDs(selectedParentChainChanges *externalapi.SelectedChai
if err != nil { if err != nil {
return err return err
} }
for i := range chainBlocksChunk { for i, acceptingBlock := range chainBlocksChunk {
chainBlockAcceptanceData := chainBlocksAcceptanceData[i] chainBlockAcceptanceData := chainBlocksAcceptanceData[i]
for _, blockAcceptanceData := range chainBlockAcceptanceData { for _, blockAcceptanceData := range chainBlockAcceptanceData {
for _, transactionAcceptanceData := range blockAcceptanceData.TransactionAcceptanceData { for j, transactionAcceptanceData := range blockAcceptanceData.TransactionAcceptanceData {
log.Tracef("TX index Adding: %d transactions", len(blockAcceptanceData.TransactionAcceptanceData)) log.Warnf("TX index Adding: %d transactions", len(blockAcceptanceData.TransactionAcceptanceData))
if transactionAcceptanceData.IsAccepted { if transactionAcceptanceData.IsAccepted {
if err != nil {
return err
}
transactionID := consensushashing.TransactionID(transactionAcceptanceData.Transaction)
ti.store.add( ti.store.add(
*consensushashing.TransactionID(transactionAcceptanceData.Transaction), *transactionID,
blockAcceptanceData.BlockHash, uint32(j), //index of including block where transaction is found
blockAcceptanceData.BlockHash, //this is the including block
acceptingBlock, //this is the accepting block
) )
} }
} }
@ -222,15 +228,17 @@ func (ti *TXIndex) removeTXIDs(selectedParentChainChanges *externalapi.SelectedC
if err != nil { if err != nil {
return err return err
} }
for i := range chainBlocksChunk { for i, acceptingBlockHash := range chainBlocksChunk {
chainBlockAcceptanceData := chainBlocksAcceptanceData[i] chainBlockAcceptanceData := chainBlocksAcceptanceData[i]
for _, blockAcceptanceData := range chainBlockAcceptanceData { for _, blockAcceptanceData := range chainBlockAcceptanceData {
log.Tracef("TX index Removing: %d transactions", len(blockAcceptanceData.TransactionAcceptanceData)) log.Tracef("TX index Removing: %d transactions", len(blockAcceptanceData.TransactionAcceptanceData))
for _, transactionAcceptanceData := range blockAcceptanceData.TransactionAcceptanceData { for j, transactionAcceptanceData := range blockAcceptanceData.TransactionAcceptanceData {
if transactionAcceptanceData.IsAccepted { if transactionAcceptanceData.IsAccepted {
ti.store.remove( ti.store.remove(
*consensushashing.TransactionID(transactionAcceptanceData.Transaction), *consensushashing.TransactionID(transactionAcceptanceData.Transaction),
uint32(j),
blockAcceptanceData.BlockHash, blockAcceptanceData.BlockHash,
acceptingBlockHash,
) )
} }
} }
@ -242,14 +250,15 @@ func (ti *TXIndex) removeTXIDs(selectedParentChainChanges *externalapi.SelectedC
} }
// TXAcceptingBlockHash returns the accepting block hash for for the given txID // TXAcceptingBlockHash returns the accepting block hash for for the given txID
func (ti *TXIndex) TXAcceptingBlockHash(txID *externalapi.DomainTransactionID) (acceptingBlockHash *externalapi.DomainHash, found bool, err error) { func (ti *TXIndex) TXAcceptingBlockHash(txID *externalapi.DomainTransactionID) (
acceptingBlockHash *externalapi.DomainHash, found bool, err error) {
onEnd := logger.LogAndMeasureExecutionTime(log, "TXIndex.TXAcceptingBlockHash") onEnd := logger.LogAndMeasureExecutionTime(log, "TXIndex.TXAcceptingBlockHash")
defer onEnd() defer onEnd()
ti.mutex.Lock() ti.mutex.Lock()
defer ti.mutex.Unlock() defer ti.mutex.Unlock()
acceptingBlockHash, found, err = ti.store.getTxAcceptingBlockHash(txID) txData, found, err := ti.store.getTxData(txID)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
@ -257,26 +266,29 @@ func (ti *TXIndex) TXAcceptingBlockHash(txID *externalapi.DomainTransactionID) (
return nil, false, nil return nil, false, nil
} }
return acceptingBlockHash, found, nil return txData.AcceptingBlockHash, found, nil
} }
// TXAcceptingBlockHashes returns the accepting block hashes for for the given txIDs // TXAcceptingBlockHashes returns the accepting block hashes for for the given txIDs
func (ti *TXIndex) TXAcceptingBlockHashes(txIDs []*externalapi.DomainTransactionID) (acceptingBlockHashes TxIDsToBlockHashes, found bool, err error) { func (ti *TXIndex) TXAcceptingBlockHashes(txIDs []*externalapi.DomainTransactionID) (
onEnd := logger.LogAndMeasureExecutionTime(log, "TXIndex.TXAcceptingBlockHash") txIDsToAcceptingBlockHashes TxIDsToBlockHashes, missingTxIds []*externalapi.DomainTransactionID, err error) {
onEnd := logger.LogAndMeasureExecutionTime(log, "TXIndex.TXAcceptingBlockHashes")
defer onEnd() defer onEnd()
ti.mutex.Lock() ti.mutex.Lock()
defer ti.mutex.Unlock() defer ti.mutex.Unlock()
acceptingBlockHashes, found, err = ti.store.getTxAcceptingBlockHashes(txIDs) txIDsToTxIndexData, missingTxIds, err := ti.store.getTxsData(txIDs)
if err != nil { if err != nil {
return nil, false, err return nil, nil, err
}
if !found {
return nil, false, nil
} }
return acceptingBlockHashes, found, nil txIDsToAcceptingBlockHashes = make(TxIDsToBlockHashes)
for txID, txIndexData := range txIDsToTxIndexData {
txIDsToAcceptingBlockHashes[txID] = txIndexData.AcceptingBlockHash
}
return txIDsToAcceptingBlockHashes, missingTxIds, nil
} }
// TXAcceptingBlock returns the accepting block for for the given txID // TXAcceptingBlock returns the accepting block for for the given txID
@ -288,13 +300,16 @@ func (ti *TXIndex) TXAcceptingBlock(txID *externalapi.DomainTransactionID) (
ti.mutex.Lock() ti.mutex.Lock()
defer ti.mutex.Unlock() defer ti.mutex.Unlock()
acceptingBlockHash, found, err := ti.store.getTxAcceptingBlockHash(txID) txIndexData, found, err := ti.store.getTxData(txID)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
acceptingBlock, err := ti.domain.Consensus().GetBlock(acceptingBlockHash) acceptingBlock, err := ti.domain.Consensus().GetBlockEvenIfHeaderOnly(txIndexData.AcceptingBlockHash)
if err != nil { if err != nil {
if database.IsNotFoundError(err) {
return nil, false, fmt.Errorf("accepting block %s missing for txID %s ", txIndexData.AcceptingBlockHash.String(), txID.String())
}
return nil, false, err return nil, false, err
} }
return acceptingBlock, true, nil return acceptingBlock, true, nil
@ -302,113 +317,104 @@ func (ti *TXIndex) TXAcceptingBlock(txID *externalapi.DomainTransactionID) (
// TXAcceptingBlocks returns the accepting blocks for for the given txIDs // TXAcceptingBlocks returns the accepting blocks for for the given txIDs
func (ti *TXIndex) TXAcceptingBlocks(txIDs []*externalapi.DomainTransactionID) ( func (ti *TXIndex) TXAcceptingBlocks(txIDs []*externalapi.DomainTransactionID) (
acceptingBlocks TxIDsToBlocks, found bool, err error) { txIDsToAcceptingBlocks TxIDsToBlocks, notFound []*externalapi.DomainTransactionID, err error) {
onEnd := logger.LogAndMeasureExecutionTime(log, "TXIndex.TXAcceptingBlock") onEnd := logger.LogAndMeasureExecutionTime(log, "TXIndex.TXAcceptingBlocks")
defer onEnd() defer onEnd()
ti.mutex.Lock() ti.mutex.Lock()
defer ti.mutex.Unlock() defer ti.mutex.Unlock()
acceptingBlockHashTxIDPairs, found, err := ti.store.getTxAcceptingBlockHashes(txIDs) txIDsToTxIndexData, notFound, err := ti.store.getTxsData(txIDs)
if err != nil { if err != nil {
return nil, false, err return nil, nil, err
} }
acceptingBlocks = make(TxIDsToBlocks) txIDsToAcceptingBlocks = make(TxIDsToBlocks)
i := 0
for txID, blockHash := range acceptingBlockHashTxIDPairs { for txID, txIndexData := range txIDsToTxIndexData {
acceptingBlocks[txID], err = ti.domain.Consensus().GetBlock(blockHash) txIDsToAcceptingBlocks[txID], err = ti.domain.Consensus().GetBlockEvenIfHeaderOnly(txIndexData.AcceptingBlockHash)
if err != nil { if err != nil {
if database.IsNotFoundError(err) { if database.IsNotFoundError(err) {
continue // ignore return nil, nil, fmt.Errorf("accepting block %s missing for txID %s ", txIndexData.IncludingBlockHash.String(), txID.String())
} else {
return nil, false, err
} }
return nil, notFound, err
} }
i++
} }
return acceptingBlocks, true, nil return txIDsToAcceptingBlocks, notFound, nil
} }
// GetTX returns the domain transaction for for the given txID // GetTX returns the domain transaction for for the given txID
func (ti *TXIndex) GetTX(txID *externalapi.DomainTransactionID) ( func (ti *TXIndex) GetTX(txID *externalapi.DomainTransactionID) (
block *externalapi.DomainTransaction, found bool, err error) { tx *externalapi.DomainTransaction, found bool, err error) {
onEnd := logger.LogAndMeasureExecutionTime(log, "TXIndex.GetTX") onEnd := logger.LogAndMeasureExecutionTime(log, "TXIndex.GetTX")
defer onEnd() defer onEnd()
ti.mutex.Lock() ti.mutex.Lock()
defer ti.mutex.Unlock() defer ti.mutex.Unlock()
acceptingBlockHash, found, err := ti.store.getTxAcceptingBlockHash(txID) txIndexData, found, err := ti.store.getTxData(txID)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
acceptingBlock, err := ti.domain.Consensus().GetBlock(acceptingBlockHash) acceptingBlock, err := ti.domain.Consensus().GetBlock(txIndexData.AcceptingBlockHash)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
for i := range acceptingBlock.Transactions { return acceptingBlock.Transactions[txIndexData.IncludingIndex], true, nil
if consensushashing.TransactionID(acceptingBlock.Transactions[i]).Equal(txID) {
return acceptingBlock.Transactions[i].Clone(), true, nil
}
}
return nil, false, fmt.Errorf("Could not find transaction with ID %s in Txindex database", txID.String())
} }
// GetTXs returns the domain transaction for for the given txIDs // GetTXs returns the domain transaction for for the given txIDs
func (ti *TXIndex) GetTXs(txIDs []*externalapi.DomainTransactionID) ( func (ti *TXIndex) GetTXs(txIDs []*externalapi.DomainTransactionID) (
txs []*externalapi.DomainTransaction, found bool, err error) { txs []*externalapi.DomainTransaction, notFound []*externalapi.DomainTransactionID, err error) {
onEnd := logger.LogAndMeasureExecutionTime(log, "TXIndex.GetTXs") onEnd := logger.LogAndMeasureExecutionTime(log, "TXIndex.GetTXs")
defer onEnd() defer onEnd()
ti.mutex.Lock() ti.mutex.Lock()
defer ti.mutex.Unlock() defer ti.mutex.Unlock()
acceptingBlockHashes, found, err := ti.store.getTxAcceptingBlockHashes(txIDs) txIDsToTxIndexData, notFound, err := ti.store.getTxsData(txIDs)
if err != nil { if err != nil {
return nil, false, err return nil, nil, err
} }
txs = make([]*externalapi.DomainTransaction, 0) txs = make([]*externalapi.DomainTransaction, len(txIDsToTxIndexData))
for txID, acceptingBlockHash := range acceptingBlockHashes { i := 0
acceptingBlock, err := ti.domain.Consensus().GetBlock(acceptingBlockHash)
for txID, txIndexData := range txIDsToTxIndexData {
includingBlock, err := ti.domain.Consensus().GetBlockEvenIfHeaderOnly(txIndexData.IncludingBlockHash)
if err != nil { if err != nil {
if database.IsNotFoundError(err) { if database.IsNotFoundError(err) {
continue // ignore return nil, nil, fmt.Errorf("including block %s missing for txID %s ", txIndexData.IncludingBlockHash.String(), txID.String())
} else {
return nil, false, err
}
}
for _, tx := range acceptingBlock.Transactions {
if consensushashing.TransactionID(tx).Equal(txID) {
txs = append(txs, tx)
} }
return nil, nil, err
} }
txs[i] = includingBlock.Transactions[txIndexData.IncludingIndex]
i++
} }
return txs, true, nil return txs, notFound, nil
} }
// GetTXConfirmations returns the tx confirmations for for the given txID // GetTXConfirmations returns the tx confirmations for for the given txID
func (ti *TXIndex) GetTXConfirmations(txID *externalapi.DomainTransactionID) ( func (ti *TXIndex) GetTXConfirmations(txID *externalapi.DomainTransactionID) (
Confirmations int64, found bool, err error) { confirmations int64, found bool, err error) {
onEnd := logger.LogAndMeasureExecutionTime(log, "TXIndex.GetTXConfirmations") onEnd := logger.LogAndMeasureExecutionTime(log, "TXIndex.GetTXConfirmations")
defer onEnd() defer onEnd()
ti.mutex.Lock() ti.mutex.Lock()
defer ti.mutex.Unlock() defer ti.mutex.Unlock()
acceptingBlockHash, found, err := ti.store.getTxAcceptingBlockHash(txID) txdata, found, err := ti.store.getTxData(txID)
if err != nil { if err != nil {
return 0, false, err return 0, false, err
} }
acceptingBlockHeader, err := ti.domain.Consensus().GetBlockHeader(acceptingBlockHash) acceptingBlockHeader, err := ti.domain.Consensus().GetBlockHeader(txdata.AcceptingBlockHash)
if err != nil { if err != nil {
return -1, false, err return -1, false, err
} }
@ -423,7 +429,7 @@ func (ti *TXIndex) GetTXConfirmations(txID *externalapi.DomainTransactionID) (
// GetTXsConfirmations returns the tx confirmations for for the given txIDs // GetTXsConfirmations returns the tx confirmations for for the given txIDs
func (ti *TXIndex) GetTXsConfirmations(txIDs []*externalapi.DomainTransactionID) ( func (ti *TXIndex) GetTXsConfirmations(txIDs []*externalapi.DomainTransactionID) (
Confirmations TxIDsToConfirmations, found bool, err error) { txIDsToConfirmations TxIDsToConfirmations, notFound []*externalapi.DomainTransactionID, err error) {
onEnd := logger.LogAndMeasureExecutionTime(log, "TXIndex.GetTXsConfirmations") onEnd := logger.LogAndMeasureExecutionTime(log, "TXIndex.GetTXsConfirmations")
defer onEnd() defer onEnd()
@ -432,51 +438,124 @@ func (ti *TXIndex) GetTXsConfirmations(txIDs []*externalapi.DomainTransactionID)
virtualBlock, err := ti.domain.Consensus().GetVirtualInfo() virtualBlock, err := ti.domain.Consensus().GetVirtualInfo()
if err != nil { if err != nil {
return nil, false, err return nil, nil, err
} }
acceptingBlockHashes, _, err := ti.store.getTxAcceptingBlockHashes(txIDs) txIDsToTxIndexData, _, err := ti.store.getTxsData(txIDs)
if err != nil { if err != nil {
return nil, false, err return nil, nil, err
} }
Confirmations = make(TxIDsToConfirmations) txIDsToConfirmations = make(TxIDsToConfirmations)
for txID, acceptingBlockHash := range acceptingBlockHashes { for txID, txIndexData := range txIDsToTxIndexData {
acceptingBlockHeader, err := ti.domain.Consensus().GetBlockHeader(acceptingBlockHash) acceptingBlockHeader, err := ti.domain.Consensus().GetBlockHeader(txIndexData.AcceptingBlockHash)
if err != nil { if err != nil {
return nil, false, err if database.IsNotFoundError(err) {
return nil, nil, fmt.Errorf("including block %s missing for txID %s ", txIndexData.IncludingBlockHash.String(), txID.String())
}
return nil, nil, err
} }
Confirmations[txID] = int64(virtualBlock.BlueScore - acceptingBlockHeader.BlueScore()) txIDsToConfirmations[txID] = int64(virtualBlock.BlueScore - acceptingBlockHeader.BlueScore())
} }
return Confirmations, true, nil return txIDsToConfirmations, notFound, nil
} }
// TXIncludingBlockHash returns the including block hash for the given txID // TXIncludingBlockHash returns the including block hash for the given txID
func (ti *TXIndex) TXIncludingBlockHash(txID *externalapi.DomainTransactionID) (includingBlockHash *externalapi.DomainHash, found bool, err error) { func (ti *TXIndex) TXIncludingBlockHash(txID *externalapi.DomainTransactionID) (includingBlockHash *externalapi.DomainHash, found bool, err error) {
onEnd := logger.LogAndMeasureExecutionTime(log, "TXIndex.TXAcceptingBlock") onEnd := logger.LogAndMeasureExecutionTime(log, "TXIndex.TXIncludingBlockHash")
defer onEnd() defer onEnd()
ti.mutex.Lock() ti.mutex.Lock()
defer ti.mutex.Unlock() defer ti.mutex.Unlock()
acceptingBlockHash, found, err := ti.store.getTxAcceptingBlockHash(txID) txIndexData, found, err := ti.store.getTxData(txID)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
acceptanceData, err := ti.domain.Consensus().GetBlockAcceptanceData(acceptingBlockHash) return txIndexData.IncludingBlockHash, true, nil
}
// TXIncludingBlockHashes returns the including block hashes for for the given txI
func (ti *TXIndex) TXIncludingBlockHashes(txIDs []*externalapi.DomainTransactionID) (
txIDsToIncludinglockHashes TxIDsToBlockHashes, missingTxIds []*externalapi.DomainTransactionID, err error) {
onEnd := logger.LogAndMeasureExecutionTime(log, "TXIndex.TXIncludingBlockHashes")
defer onEnd()
ti.mutex.Lock()
defer ti.mutex.Unlock()
txIDsToTxIndexData, notFound, err := ti.store.getTxsData(txIDs)
if err != nil { if err != nil {
return nil, false, err return nil, nil, err
} }
for _, blockAcceptanceData := range acceptanceData { txIDsToIncludinglockHashes = make(TxIDsToBlockHashes)
for _, transactionAcceptanceData := range blockAcceptanceData.TransactionAcceptanceData {
if consensushashing.TransactionID(transactionAcceptanceData.Transaction).Equal(txID) { for txID, txIndexData := range txIDsToTxIndexData {
return blockAcceptanceData.BlockHash, true, nil txIDsToIncludinglockHashes[txID] = txIndexData.IncludingBlockHash
}
return txIDsToIncludinglockHashes, notFound, nil
}
// TXIncludingBlocks returns the including block hashes for for the given txIDs
func (ti *TXIndex) TXIncludingBlocks(txIDs []*externalapi.DomainTransactionID) (
txIDsToIncludingBlocks TxIDsToBlocks, notFound []*externalapi.DomainTransactionID, err error) {
onEnd := logger.LogAndMeasureExecutionTime(log, "TXIndex.TXIncludingBlocks")
defer onEnd()
ti.mutex.Lock()
defer ti.mutex.Unlock()
txIDsToTxIndexData, notFound, err := ti.store.getTxsData(txIDs)
if err != nil {
return nil, nil, err
}
txIDsToIncludingBlocks = make(TxIDsToBlocks)
for txID, txIndexData := range txIDsToTxIndexData {
txIDsToIncludingBlocks[txID], err = ti.domain.Consensus().GetBlockEvenIfHeaderOnly(txIndexData.IncludingBlockHash)
if err != nil {
if database.IsNotFoundError(err) {
return nil, nil, fmt.Errorf("including block %s missing for txID %s ", txIndexData.IncludingBlockHash.String(), txID.String())
} }
return nil, nil, err
} }
} }
return nil, false, fmt.Errorf("Could not find including blockHash for transaction with ID %s in Txindex database", txID.String()) return txIDsToIncludingBlocks, notFound, nil
}
// GetTXsBlueScores returns the tx's accepting bluescore for for the given txID
// Note: this is a optimization function to store and dynamically calc. tx confirmations with access to to virtual bluescore
// such as in the case of rpc confirmation notification listeners
func (ti *TXIndex) GetTXsBlueScores(txIDs []*externalapi.DomainTransactionID) (
txIDsToBlueScores TxIDsToBlueScores, notFound []*externalapi.DomainTransactionID, err error) {
onEnd := logger.LogAndMeasureExecutionTime(log, "TXIndex.GetTXsBlueScores")
defer onEnd()
ti.mutex.Lock()
defer ti.mutex.Unlock()
txIDsToTxIndexData, notFound, err := ti.store.getTxsData(txIDs)
if err != nil {
return nil, nil, err
}
txIDsToBlueScores = make(TxIDsToBlueScores)
for txID, txIndexData := range txIDsToTxIndexData {
acceptingBlockHeader, err := ti.domain.Consensus().GetBlockHeader(txIndexData.AcceptingBlockHash)
if err != nil {
if database.IsNotFoundError(err) {
return nil, nil, fmt.Errorf("Accepting block %s missing for txID %s ", txIndexData.AcceptingBlockHash.String(), txID.String())
}
return nil, nil, err
}
txIDsToBlueScores[txID] = acceptingBlockHeader.BlueScore()
}
return txIDsToBlueScores, notFound, nil
} }