From 3a33c0ffa81319ec0abe372ab05175914b1d9e17 Mon Sep 17 00:00:00 2001 From: D-Stacks Date: Mon, 20 Jun 2022 16:32:26 +0200 Subject: [PATCH] starting checkpoint --- .../database/serialization/dbobjects.pb.go | 102 +++++++-- .../database/serialization/dbobjects.proto | 5 + .../database/serialization/tx_block_data.go | 37 ++++ domain/txindex.go/log.go | 11 + domain/txindex.go/model.go | 27 +++ domain/txindex.go/serialization.go | 72 +++++++ domain/txindex.go/serialization_test.go | 44 ++++ domain/txindex.go/store.go | 203 ++++++++++++++++++ domain/txindex.go/txindex.go | 200 +++++++++++++++++ 9 files changed, 689 insertions(+), 12 deletions(-) create mode 100644 domain/consensus/database/serialization/tx_block_data.go create mode 100644 domain/txindex.go/log.go create mode 100644 domain/txindex.go/model.go create mode 100644 domain/txindex.go/serialization.go create mode 100644 domain/txindex.go/serialization_test.go create mode 100644 domain/txindex.go/store.go create mode 100644 domain/txindex.go/txindex.go diff --git a/domain/consensus/database/serialization/dbobjects.pb.go b/domain/consensus/database/serialization/dbobjects.pb.go index f237621c3..943ce895c 100644 --- a/domain/consensus/database/serialization/dbobjects.pb.go +++ b/domain/consensus/database/serialization/dbobjects.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.0 +// protoc-gen-go v1.26.0 // protoc v3.17.2 // source: dbobjects.proto @@ -1743,6 +1743,61 @@ func (x *DbBlockGHOSTDAGDataHashPair) GetGhostdagData() *DbBlockGhostdagData { return nil } +type DbTxBlockData struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + AcceptingBlock *DbHash `protobuf:"bytes,1,opt,name=acceptingBlock,proto3" json:"acceptingBlock,omitempty"` + MergeBlock *DbHash `protobuf:"bytes,2,opt,name=mergeBlock,proto3" json:"mergeBlock,omitempty"` +} + +func (x *DbTxBlockData) Reset() { + *x = DbTxBlockData{} + if protoimpl.UnsafeEnabled { + mi := &file_dbobjects_proto_msgTypes[29] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DbTxBlockData) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DbTxBlockData) ProtoMessage() {} + +func (x *DbTxBlockData) ProtoReflect() protoreflect.Message { + mi := &file_dbobjects_proto_msgTypes[29] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DbTxBlockData.ProtoReflect.Descriptor instead. +func (*DbTxBlockData) Descriptor() ([]byte, []int) { + return file_dbobjects_proto_rawDescGZIP(), []int{29} +} + +func (x *DbTxBlockData) GetAcceptingBlock() *DbHash { + if x != nil { + return x.AcceptingBlock + } + return nil +} + +func (x *DbTxBlockData) GetMergeBlock() *DbHash { + if x != nil { + return x.MergeBlock + } + return nil +} + var File_dbobjects_proto protoreflect.FileDescriptor var file_dbobjects_proto_rawDesc = []byte{ @@ -2000,10 +2055,18 @@ var file_dbobjects_proto_rawDesc = []byte{ 0x2e, 0x73, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x69, 0x7a, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x44, 0x62, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x47, 0x68, 0x6f, 0x73, 0x74, 0x64, 0x61, 0x67, 0x44, 0x61, 0x74, 0x61, 0x52, 0x0c, 0x47, 0x68, 0x6f, 0x73, 0x74, 0x64, 0x61, 0x67, 0x44, 0x61, 0x74, 0x61, - 0x42, 0x2a, 0x5a, 0x28, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6b, - 0x61, 0x73, 0x70, 0x61, 0x6e, 0x65, 0x74, 0x2f, 0x6b, 0x61, 0x73, 0x70, 0x61, 0x64, 0x2f, 0x73, - 0x65, 0x72, 0x69, 0x61, 0x6c, 0x69, 0x7a, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x22, 0x85, 0x01, 0x0a, 0x0d, 0x44, 0x62, 0x54, 0x78, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x44, 0x61, + 0x74, 0x61, 0x12, 0x3d, 0x0a, 0x0e, 0x61, 0x63, 0x63, 0x65, 0x70, 0x74, 0x69, 0x6e, 0x67, 0x42, + 0x6c, 0x6f, 0x63, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x73, 0x65, 0x72, + 0x69, 0x61, 0x6c, 0x69, 0x7a, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x44, 0x62, 0x48, 0x61, 0x73, + 0x68, 0x52, 0x0e, 0x61, 0x63, 0x63, 0x65, 0x70, 0x74, 0x69, 0x6e, 0x67, 0x42, 0x6c, 0x6f, 0x63, + 0x6b, 0x12, 0x35, 0x0a, 0x0a, 0x6d, 0x65, 0x72, 0x67, 0x65, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x73, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x69, 0x7a, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x44, 0x62, 0x48, 0x61, 0x73, 0x68, 0x52, 0x0a, 0x6d, 0x65, + 0x72, 0x67, 0x65, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x42, 0x2a, 0x5a, 0x28, 0x67, 0x69, 0x74, 0x68, + 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6b, 0x61, 0x73, 0x70, 0x61, 0x6e, 0x65, 0x74, 0x2f, + 0x6b, 0x61, 0x73, 0x70, 0x61, 0x64, 0x2f, 0x73, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x69, 0x7a, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -2018,7 +2081,7 @@ func file_dbobjects_proto_rawDescGZIP() []byte { return file_dbobjects_proto_rawDescData } -var file_dbobjects_proto_msgTypes = make([]protoimpl.MessageInfo, 29) +var file_dbobjects_proto_msgTypes = make([]protoimpl.MessageInfo, 30) var file_dbobjects_proto_goTypes = []interface{}{ (*DbBlock)(nil), // 0: serialization.DbBlock (*DbBlockHeader)(nil), // 1: serialization.DbBlockHeader @@ -2049,6 +2112,7 @@ var file_dbobjects_proto_goTypes = []interface{}{ (*DbBlockCount)(nil), // 26: serialization.DbBlockCount (*DbBlockHeaderCount)(nil), // 27: serialization.DbBlockHeaderCount (*DbBlockGHOSTDAGDataHashPair)(nil), // 28: serialization.DbBlockGHOSTDAGDataHashPair + (*DbTxBlockData)(nil), // 29: serialization.DbTxBlockData } var file_dbobjects_proto_depIdxs = []int32{ 1, // 0: serialization.DbBlock.header:type_name -> serialization.DbBlockHeader @@ -2090,11 +2154,13 @@ var file_dbobjects_proto_depIdxs = []int32{ 3, // 36: serialization.DbTips.tips:type_name -> serialization.DbHash 3, // 37: serialization.DbBlockGHOSTDAGDataHashPair.hash:type_name -> serialization.DbHash 15, // 38: serialization.DbBlockGHOSTDAGDataHashPair.GhostdagData:type_name -> serialization.DbBlockGhostdagData - 39, // [39:39] is the sub-list for method output_type - 39, // [39:39] is the sub-list for method input_type - 39, // [39:39] is the sub-list for extension type_name - 39, // [39:39] is the sub-list for extension extendee - 0, // [0:39] is the sub-list for field type_name + 3, // 39: serialization.DbTxBlockData.acceptingBlock:type_name -> serialization.DbHash + 3, // 40: serialization.DbTxBlockData.mergeBlock:type_name -> serialization.DbHash + 41, // [41:41] is the sub-list for method output_type + 41, // [41:41] is the sub-list for method input_type + 41, // [41:41] is the sub-list for extension type_name + 41, // [41:41] is the sub-list for extension extendee + 0, // [0:41] is the sub-list for field type_name } func init() { file_dbobjects_proto_init() } @@ -2451,6 +2517,18 @@ func file_dbobjects_proto_init() { return nil } } + file_dbobjects_proto_msgTypes[29].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*DbTxBlockData); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -2458,7 +2536,7 @@ func file_dbobjects_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_dbobjects_proto_rawDesc, NumEnums: 0, - NumMessages: 29, + NumMessages: 30, NumExtensions: 0, NumServices: 0, }, diff --git a/domain/consensus/database/serialization/dbobjects.proto b/domain/consensus/database/serialization/dbobjects.proto index 1248ad33d..98d3586fc 100644 --- a/domain/consensus/database/serialization/dbobjects.proto +++ b/domain/consensus/database/serialization/dbobjects.proto @@ -163,3 +163,8 @@ message DbBlockGHOSTDAGDataHashPair { DbHash hash = 1; DbBlockGhostdagData GhostdagData = 2; } + +message DbTxBlockData { + DbHash acceptingBlock = 1; + DbHash mergeBlock = 2; +} \ No newline at end of file diff --git a/domain/consensus/database/serialization/tx_block_data.go b/domain/consensus/database/serialization/tx_block_data.go new file mode 100644 index 000000000..d120c5392 --- /dev/null +++ b/domain/consensus/database/serialization/tx_block_data.go @@ -0,0 +1,37 @@ +package serialization + +import ( + "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" +) + +// DbHashToDomainHash converts a DbHash to a DomainHash +func DbTToDomainHash(dbHash *DbHash) (*externalapi.DomainHash, error) { + return externalapi.NewDomainHashFromByteSlice(dbHash.Hash) +} + +// DomainHashToDbHash converts a DomainHash to a DbHash +func DomainHashToDbHash(domainHash *externalapi.DomainHash) *DbHash { + return &DbHash{Hash: domainHash.ByteSlice()} +} + +// DomainHashesToDbHashes converts a slice of DomainHash to a slice of DbHash +func DomainHashesToDbHashes(domainHashes []*externalapi.DomainHash) []*DbHash { + dbHashes := make([]*DbHash, len(domainHashes)) + for i, domainHash := range domainHashes { + dbHashes[i] = DomainHashToDbHash(domainHash) + } + return dbHashes +} + +// DbHashesToDomainHashes converts a slice of DbHash to a slice of DomainHash +func DbHashesToDomainHashes(dbHashes []*DbHash) ([]*externalapi.DomainHash, error) { + domainHashes := make([]*externalapi.DomainHash, len(dbHashes)) + for i, domainHash := range dbHashes { + var err error + domainHashes[i], err = DbHashToDomainHash(domainHash) + if err != nil { + return nil, err + } + } + return domainHashes, nil +} diff --git a/domain/txindex.go/log.go b/domain/txindex.go/log.go new file mode 100644 index 000000000..898b14a7b --- /dev/null +++ b/domain/txindex.go/log.go @@ -0,0 +1,11 @@ +// Copyright (c) 2016 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package txindex + +import ( + "github.com/kaspanet/kaspad/infrastructure/logger" +) + +var log = logger.RegisterSubSystem("TXIN") diff --git a/domain/txindex.go/model.go b/domain/txindex.go/model.go new file mode 100644 index 000000000..9638647c6 --- /dev/null +++ b/domain/txindex.go/model.go @@ -0,0 +1,27 @@ +package txindex + +import ( + "encoding/binary" + "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" +) + +// TxBlockData represents +type TxBlockData struct { + acceptingBlockHash *externalapi.DomainHash + mergeBlockHash *externalapi.DomainHash +} + +// TxAcceptingChanges is the set of changes made to the tx index after +// a successful update +type TxAcceptingChanges struct { + toAddAccepting map[externalapi.DomainHash][]*externalapi.DomainHash + toRemoveAccepting map[externalapi.DomainHash][]*externalapi.DomainHash +} + +// TxAcceptingChanges is the set of changes made to the tx index after +// a successful update +type TxMergingChanges struct { + toAddMerge map[externalapi.DomainHash][]*externalapi.DomainHash + toRemoveMeroge map[externalapi.DomainHash][]*externalapi.DomainHash +} + diff --git a/domain/txindex.go/serialization.go b/domain/txindex.go/serialization.go new file mode 100644 index 000000000..2ec144ea1 --- /dev/null +++ b/domain/txindex.go/serialization.go @@ -0,0 +1,72 @@ +package txindex + +import ( + "encoding/binary" + "github.com/kaspanet/kaspad/domain/consensus/database/serialization" + "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" + "github.com/pkg/errors" + "google.golang.org/protobuf/proto" + "io" +) + +func serializeOutpoint(outpoint *externalapi.DomainOutpoint) ([]byte, error) { + dbOutpoint := serialization.DomainOutpointToDbOutpoint(outpoint) + return proto.Marshal(dbOutpoint) +} + +func deserializeOutpoint(serializedOutpoint []byte) (*externalapi.DomainOutpoint, error) { + var dbOutpoint serialization.DbOutpoint + err := proto.Unmarshal(serializedOutpoint, &dbOutpoint) + if err != nil { + return nil, err + } + return serialization.DbOutpointToDomainOutpoint(&dbOutpoint) +} + +func serializeUTXOEntry(utxoEntry externalapi.UTXOEntry) ([]byte, error) { + dbUTXOEntry := serialization.UTXOEntryToDBUTXOEntry(utxoEntry) + return proto.Marshal(dbUTXOEntry) +} + +func deserializeUTXOEntry(serializedUTXOEntry []byte) (externalapi.UTXOEntry, error) { + var dbUTXOEntry serialization.DbUtxoEntry + err := proto.Unmarshal(serializedUTXOEntry, &dbUTXOEntry) + if err != nil { + return nil, err + } + return serialization.DBUTXOEntryToUTXOEntry(&dbUTXOEntry) +} + +const hashesLengthSize = 8 + +func serializeHashes(hashes []*externalapi.DomainHash) []byte { + serializedHashes := make([]byte, hashesLengthSize+externalapi.DomainHashSize*len(hashes)) + binary.LittleEndian.PutUint64(serializedHashes[:hashesLengthSize], uint64(len(hashes))) + for i, hash := range hashes { + start := hashesLengthSize + externalapi.DomainHashSize*i + end := start + externalapi.DomainHashSize + copy(serializedHashes[start:end], hash.ByteSlice()) + } + return serializedHashes +} + +func deserializeHashes(serializedHashes []byte) ([]*externalapi.DomainHash, error) { + length := binary.LittleEndian.Uint64(serializedHashes[:hashesLengthSize]) + hashes := make([]*externalapi.DomainHash, length) + for i := uint64(0); i < length; i++ { + start := hashesLengthSize + externalapi.DomainHashSize*i + end := start + externalapi.DomainHashSize + + if end > uint64(len(serializedHashes)) { + return nil, errors.Wrapf(io.ErrUnexpectedEOF, "unexpected EOF while deserializing hashes") + } + + var err error + hashes[i], err = externalapi.NewDomainHashFromByteSlice(serializedHashes[start:end]) + if err != nil { + return nil, err + } + } + + return hashes, nil +} diff --git a/domain/txindex.go/serialization_test.go b/domain/txindex.go/serialization_test.go new file mode 100644 index 000000000..34ff59dc3 --- /dev/null +++ b/domain/txindex.go/serialization_test.go @@ -0,0 +1,44 @@ +package txindex + +import ( + "encoding/binary" + "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" + "github.com/pkg/errors" + "io" + "math/rand" + "testing" +) + +func Test_serializeHashes(t *testing.T) { + r := rand.New(rand.NewSource(0)) + + for length := 0; length < 32; length++ { + hashes := make([]*externalapi.DomainHash, length) + for i := range hashes { + var hashBytes [32]byte + r.Read(hashBytes[:]) + hashes[i] = externalapi.NewDomainHashFromByteArray(&hashBytes) + } + result, err := deserializeHashes(serializeHashes(hashes)) + if err != nil { + t.Fatalf("Failed deserializing hashes: %v", err) + } + if !externalapi.HashesEqual(hashes, result) { + t.Fatalf("Expected \n %s \n==\n %s\n", hashes, result) + } + } +} + +func Test_deserializeHashesFailure(t *testing.T) { + hashes := []*externalapi.DomainHash{ + externalapi.NewDomainHashFromByteArray(&[externalapi.DomainHashSize]byte{1}), + externalapi.NewDomainHashFromByteArray(&[externalapi.DomainHashSize]byte{2}), + externalapi.NewDomainHashFromByteArray(&[externalapi.DomainHashSize]byte{3}), + } + serialized := serializeHashes(hashes) + binary.LittleEndian.PutUint64(serialized[:8], uint64(len(hashes)+1)) + _, err := deserializeHashes(serialized) + if !errors.Is(err, io.ErrUnexpectedEOF) { + t.Fatalf("Expected error to be EOF, instead got: %v", err) + } +} diff --git a/domain/txindex.go/store.go b/domain/txindex.go/store.go new file mode 100644 index 000000000..47e25af7b --- /dev/null +++ b/domain/txindex.go/store.go @@ -0,0 +1,203 @@ +package txindex + +import ( + "encoding/binary" + + "github.com/kaspanet/kaspad/domain/consensus/database/binaryserialization" + "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" + "github.com/kaspanet/kaspad/infrastructure/db/database" + "github.com/kaspanet/kaspad/infrastructure/logger" + "github.com/pkg/errors" +) + +var txMergeIndexBucket = database.MakeBucket([]byte("tx-merge-index")) +var txAcceptedIndexBuced = database.MakeBucket([]byte("tx-accepted-index")) +var virtualParentsKey = database.MakeBucket([]byte("")).Key([]byte("tx-index-virtual-parents")) +var ghostDagBlocksKey = database.MakeBucket([]byte("")).Key([]byte("tx-index-ghostdagblocks")) + +type txIndexStore struct { + database database.Database + toAddMerge map[externalapi.DomainHash][]*externalapi.DomainTransactionID + toRemoveMerge map[externalapi.DomainHash][]*externalapi.DomainTransactionID + toAddAccepted map[externalapi.DomainHash][]*externalapi.DomainTransactionID + toRemoveAccepted map[externalapi.DomainHash][]*externalapi.DomainTransactionID + virtualParents []*externalapi.DomainHash + ghostdagBlocks []*externalapi.DomainHash +} + +func newTXIndexStore(database database.Database) *txIndexStore { + return &txIndexStore{ + database: database, + toAddMerge: make(map[externalapi.DomainHash][]*externalapi.DomainTransactionID), + toRemoveMerge: make(map[externalapi.DomainHash][]*externalapi.DomainTransactionID), + toAddAccepted: make(map[externalapi.DomainHash][]*externalapi.DomainTransactionID), + toRemoveAccepted: make(map[externalapi.DomainHash][]*externalapi.DomainTransactionID), + virtualParents: nil, + ghostdagBlocks: nil, + } +} + +func (tis *txIndexStore) addMerged(txIDs []*externalapi.DomainTransactionID, mergingBlockHash *externalapi.DomainHash) { + log.Tracef("Adding %d Txs from mergingBlockHash %s", len(txIDs), mergingBlockHash.String()) + if _, found := tis.toRemoveMerge[*mergingBlockHash]; found { + delete(tis.toRemoveMerge, *mergingBlockHash) + } + tis.toAddMerge[*mergingBlockHash] = txIDs +} + +func (tis *txIndexStore) removeMerged(txIDs []*externalapi.DomainTransactionID, mergingBlockHash *externalapi.DomainHash) { + log.Tracef("Removing %d Txs from mergingBlockHash %s", len(txIDs), mergingBlockHash.String()) + if _, found := tis.toAddMerge[*mergingBlockHash]; found { + delete(tis.toAddMerge, *mergingBlockHash) + } + tis.toRemoveMerge[*mergingBlockHash] = txIDs +} + +func (tis *txIndexStore) addAccepted(txIDs []*externalapi.DomainTransactionID, acceptingBlockHash *externalapi.DomainHash) { + log.Tracef("Adding %d Txs from acceptingBlockHash %s", len(txIDs), acceptingBlockHash.String()) + if _, found := tis.toRemoveAccepted[*acceptingBlockHash]; found { + delete(tis.toRemoveAccepted, *acceptingBlockHash) + } + tis.toAddAccepted[*acceptingBlockHash] = txIDs +} + +func (tis *txIndexStore) removeAccepted(txIDs []*externalapi.DomainTransactionID, acceptingBlockHash *externalapi.DomainHash) { + log.Tracef("Removing %d Txs from acceptingBlockHash %s", len(txIDs), acceptingBlockHash.String()) + if _, found := tis.toAddAccepted[*acceptingBlockHash]; found { + delete(tis.toAddAccepted, *acceptingBlockHash) + } + tis.toRemoveMerge[*acceptingBlockHash] = txIDs +} + +func (tis *txIndexStore) discardMerged() { + tis.toAddMerge = make(map[externalapi.DomainHash][]*externalapi.DomainTransactionID) + tis.toRemoveMerge = make(map[externalapi.DomainHash][]*externalapi.DomainTransactionID) + tis.virtualParents = nil +} + +func (tis *txIndexStore) discardAccepted() { + tis.toAddAccepted = make(map[externalapi.DomainHash][]*externalapi.DomainTransactionID) + tis.toRemoveAccepted = make(map[externalapi.DomainHash][]*externalapi.DomainTransactionID) + tis.ghostdagBlocks = nil +} + +func (tis *txIndexStore) discardAll() { + tis.discardAccepted() + tis.discardMerged() +} + +func (tis *txIndexStore) removeAll() error { + tis.removeAccepted() + tis.removeAll() + return nil +} + + +func (tis *txIndexStore) commitAll() error { + tis.commitAccepted() + tis.commitMerged() + return nil +} + +func (tis *txIndexStore) commitMerged() error { + if tis.isAnythingMergingStaged() { + return errors.Errorf("cannot commit merging TxIds while merge staging isn't empty") + } + return nil +} + +func (tis *txIndexStore) commitAccepted() error { + return nil +} + + +func (tis *txIndexStore) convertTxIDToKey(bucket *database.Bucket, txID *externalapi.DomainTransactionID) *database.Key { + return bucket.Key(txID.ByteSlice()) +} + +func (tis *txIndexStore) updateVirtualParents(virtualParents []*externalapi.DomainHash) { + tis.virtualParents = virtualParents +} + +func (tis *txIndexStore) updateGhostDagBlocks(ghostdagBlocks []*externalapi.DomainHash) { + tis.ghostdagBlocks = ghostdagBlocks +} + + +func (tis *txIndexStore) convertKeyToTxID(key *database.Key) (*externalapi.DomainTransactionID, error) { + serializedTxID := key.Suffix() + return externalapi.NewDomainTransactionIDFromByteSlice(serializedTxID) +} + +func (tis *txIndexStore) stagedAcceptingData() error { + return nil +} + +func (tis *txIndexStore) stagedMergingData() error { + return nil +} + +func (tis *txIndexStore) stagedData() error { + return nil +} + +func (tis *txIndexStore) isAnythingStaged() bool { + return tis.isAnythingAcceptingStaged() || tis.isAnythingMergingStaged() +} + +func (tis *txIndexStore) isAnythingAcceptingStaged() bool { + return len(tis.toAddAccepted) > 0 || len(tis.toRemoveAccepted) > 0 +} + +func (tis *txIndexStore) isAnythingMergingStaged() bool { + return len(tis.toAddMerge) > 0 || len(tis.ToRemoveMerge) > 0 +} + +func (tis *txIndexStore) getTxAcceptingBlockHash(scriptPublicKey *externalapi.ScriptPublicKey) (externalapi.DomainHash, error) { + if tis.isAnythingAcceptingStaged() { + return nil, errors.Errorf("cannot get utxo outpoint entry pairs while staging isn't empty") + } + return nil, nil +} + +func (tis *txIndexStore) getTxMergeBlockHash(scriptPublicKey *externalapi.ScriptPublicKey) (externalapi.DomainHash, error) { + if tis.isAnythingMergingMergingStaged() { + return nil, errors.Errorf("cannot get utxo outpoint entry pairs while staging isn't empty") + } + return nil, nil +} + +func (tis *txIndexStore) getTxBlockHashes(scriptPublicKey *externalapi.ScriptPublicKey) (externalapi.DomainHash, error) { + if tis.isAnythingStaged() { + return nil, errors.Errorf("cannot get utxo outpoint entry pairs while staging isn't empty") + } + return nil, nil +} + +func (tis *txIndexStore) deleteAccepptingData() error { + return nil +} + +func (tis *txIndexStore) deleteMergingData() error { + return nil +} + +func (tis *txIndexStore) deleteAll() error { + tis.deleteAccepptingData() + tis.deleteMergingData() + return nil +} + +func (tis *txIndexStore) resetAcceptingData() error { + return nil +} + +func (tis *txIndexStore) resetMergingData() error { + return nil +} + +func (tis *txIndexStore) resetAll() error { + tis.resetAcceptingData() + tis.resetMergingData() + return nil +} diff --git a/domain/txindex.go/txindex.go b/domain/txindex.go/txindex.go new file mode 100644 index 000000000..e58a12175 --- /dev/null +++ b/domain/txindex.go/txindex.go @@ -0,0 +1,200 @@ +package txindex + +import ( + "github.com/kaspanet/kaspad/domain" + "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" + "github.com/kaspanet/kaspad/infrastructure/db/database" + "github.com/kaspanet/kaspad/infrastructure/logger" + "sync" +) + +// TXIndex maintains an index between transaction IDs and accepting block hashes +type TXIndex struct { + domain domain.Domain + store *txIndexStore + + mutex sync.Mutex +} + +// New creates a new UTXO index. +// +// NOTE: While this is called no new blocks can be added to the consensus. +func New(domain domain.Domain, database database.Database) (*TXIndex, error) { + utxoIndex := &TXIndex{ + domain: domain, + store: newTXIndexStore(database), + } + isSynced, err := utxoIndex.isSynced() + if err != nil { + return nil, err + } + + ///Has check is for migration to circulating supply, can be removed eventually. + hasCirculatingSupplyKey, err := utxoIndex.store.database.Has(circulatingSupplyKey) + if err != nil { + return nil, err + } + + if !isSynced || !hasCirculatingSupplyKey { + + err := utxoIndex.Reset() + if err != nil { + return nil, err + } + } + + return utxoIndex, nil +} + +// Reset deletes the whole UTXO index and resyncs it from consensus. +func (ui *UTXOIndex) Reset() error { + ui.mutex.Lock() + defer ui.mutex.Unlock() + + err := ui.store.deleteAll() + if err != nil { + return err + } + + virtualInfo, err := ui.domain.Consensus().GetVirtualInfo() + if err != nil { + return err + } + + err = ui.store.initializeCirculatingSompiSupply() //At this point the database is empty, so the sole purpose of this call is to initialize the circulating supply key + if err != nil { + return err + } + + var fromOutpoint *externalapi.DomainOutpoint + for { + const step = 1000 + virtualUTXOs, err := ui.domain.Consensus().GetVirtualUTXOs(virtualInfo.ParentHashes, fromOutpoint, step) + if err != nil { + return err + } + + err = ui.store.addAndCommitOutpointsWithoutTransaction(virtualUTXOs) + if err != nil { + return err + } + + if len(virtualUTXOs) < step { + break + } + + fromOutpoint = virtualUTXOs[len(virtualUTXOs)-1].Outpoint + } + + // This has to be done last to mark that the reset went smoothly and no reset has to be called next time. + return ui.store.updateAndCommitVirtualParentsWithoutTransaction(virtualInfo.ParentHashes) +} + +func (ui *UTXOIndex) isSynced() (bool, error) { + utxoIndexVirtualParents, err := ui.store.getVirtualParents() + if err != nil { + if database.IsNotFoundError(err) { + return false, nil + } + return false, err + } + + virtualInfo, err := ui.domain.Consensus().GetVirtualInfo() + if err != nil { + return false, err + } + + return externalapi.HashesEqual(virtualInfo.ParentHashes, utxoIndexVirtualParents), nil +} + +// Update updates the UTXO index with the given DAG selected parent chain changes +func (ui *UTXOIndex) Update(virtualChangeSet *externalapi.VirtualChangeSet) (*UTXOChanges, error) { + onEnd := logger.LogAndMeasureExecutionTime(log, "UTXOIndex.Update") + defer onEnd() + + ui.mutex.Lock() + defer ui.mutex.Unlock() + + log.Tracef("Updating UTXO index with VirtualUTXODiff: %+v", virtualChangeSet.VirtualUTXODiff) + err := ui.removeUTXOs(virtualChangeSet.VirtualUTXODiff.ToRemove()) + if err != nil { + return nil, err + } + + err = ui.addUTXOs(virtualChangeSet.VirtualUTXODiff.ToAdd()) + if err != nil { + return nil, err + } + + ui.store.updateVirtualParents(virtualChangeSet.VirtualParents) + + added, removed, _ := ui.store.stagedData() + utxoIndexChanges := &UTXOChanges{ + Added: added, + Removed: removed, + } + + err = ui.store.commit() + if err != nil { + return nil, err + } + + log.Tracef("UTXO index updated with the UTXOChanged: %+v", utxoIndexChanges) + return utxoIndexChanges, nil +} + +func (ui *UTXOIndex) addUTXOs(toAdd externalapi.UTXOCollection) error { + iterator := toAdd.Iterator() + defer iterator.Close() + for ok := iterator.First(); ok; ok = iterator.Next() { + outpoint, entry, err := iterator.Get() + if err != nil { + return err + } + + log.Tracef("Adding outpoint %s to UTXO index", outpoint) + err = ui.store.add(entry.ScriptPublicKey(), outpoint, entry) + if err != nil { + return err + } + } + return nil +} + +func (ui *UTXOIndex) removeUTXOs(toRemove externalapi.UTXOCollection) error { + iterator := toRemove.Iterator() + defer iterator.Close() + for ok := iterator.First(); ok; ok = iterator.Next() { + outpoint, entry, err := iterator.Get() + if err != nil { + return err + } + + log.Tracef("Removing outpoint %s from UTXO index", outpoint) + err = ui.store.remove(entry.ScriptPublicKey(), outpoint, entry) + if err != nil { + return err + } + } + return nil +} + +// UTXOs returns all the UTXOs for the given scriptPublicKey +func (ui *UTXOIndex) UTXOs(scriptPublicKey *externalapi.ScriptPublicKey) (UTXOOutpointEntryPairs, error) { + onEnd := logger.LogAndMeasureExecutionTime(log, "UTXOIndex.UTXOs") + defer onEnd() + + ui.mutex.Lock() + defer ui.mutex.Unlock() + + return ui.store.getUTXOOutpointEntryPairs(scriptPublicKey) +} + +// GetCirculatingSompiSupply returns the current circulating supply of sompis in the network +func (ui *UTXOIndex) GetCirculatingSompiSupply() (uint64, error) { + + ui.mutex.Lock() + defer ui.mutex.Unlock() + + return ui.store.getCirculatingSompiSupply() +}