diff --git a/app/appmessage/message.go b/app/appmessage/message.go index e5a261574..0de20227c 100644 --- a/app/appmessage/message.go +++ b/app/appmessage/message.go @@ -312,6 +312,10 @@ var RPCMessageCommandToString = map[MessageCommand]string{ CmdGetFeeEstimateResponseMessage: "GetFeeEstimateResponse", CmdSubmitTransactionReplacementRequestMessage: "SubmitTransactionReplacementRequest", CmdSubmitTransactionReplacementResponseMessage: "SubmitTransactionReplacementResponse", + CmdGetPruningWindowRootsRequestMessage: "GetPruningWindowRootsRequestMessage", + CmdGetPruningWindowRootsResponseMessage: "GetPruningWindowRootsResponseMessage", + CmdAddArchivalBlocksRequestMessage: "AddArchivalBlocksRequestMessage", + CmdAddArchivalBlocksResponseMessage: "AddArchivalBlocksResponseMessage", } // Message is an interface that describes a kaspa message. A type that diff --git a/cmd/archiveexport/main.go b/cmd/archiveexport/main.go index 300270d35..a2617a09f 100644 --- a/cmd/archiveexport/main.go +++ b/cmd/archiveexport/main.go @@ -9,17 +9,17 @@ import ( "github.com/kaspanet/kaspad/domain/consensus" "github.com/kaspanet/kaspad/domain/consensus/model" "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" + "github.com/kaspanet/kaspad/domain/consensus/utils/consensushashing" "github.com/kaspanet/kaspad/infrastructure/config" "github.com/kaspanet/kaspad/infrastructure/db/database" "github.com/kaspanet/kaspad/infrastructure/network/rpcclient" - "github.com/kaspanet/kaspad/util/panics" "github.com/kaspanet/kaspad/util/profiling" "github.com/kaspanet/kaspad/version" "github.com/pkg/errors" ) func main() { - defer panics.HandlePanic(log, "MAIN", nil) + // defer panics.HandlePanic(log, "MAIN", nil) cfg, err := parseConfig() if err != nil { @@ -43,10 +43,11 @@ func main() { func mainImpl(cfg *configFlags) error { dataDir := filepath.Join(config.DefaultAppDir) - dbPath := filepath.Join(dataDir, "db") + dbPath := filepath.Join(dataDir, "kaspa-mainnet/datadir2") consensusConfig := &consensus.Config{Params: *cfg.NetParams()} factory := consensus.NewFactory() factory.SetTestDataDir(dbPath) + factory.AutoSetActivePrefix(true) tc, tearDownFunc, err := factory.NewTestConsensus(consensusConfig, "archiveexport") if err != nil { return err @@ -72,16 +73,17 @@ func mainImpl(cfg *configFlags) error { return err } + for _, root := range rootsResp.Roots { + log.Infof("Got root %s", root.Root) + } + for _, root := range rootsResp.Roots { rootHash, err := externalapi.NewDomainHashFromString(root.Root) if err != nil { return err } - rootHeader, err := tc.BlockHeaderStore().BlockHeader(tc.DatabaseContext(), model.NewStagingArea(), rootHash) - if database.IsNotFoundError(err) { - continue - } + log.Infof("Adding past of %s", rootHash) if err != nil { return err @@ -93,10 +95,7 @@ func mainImpl(cfg *configFlags) error { // TODO: Since GD data is not always available, we should extract the blue work from the header and use that for topological traversal heap := tc.DAGTraversalManager().NewDownHeap(model.NewStagingArea()) - for _, parent := range rootHeader.DirectParents() { - heap.Push(parent) - blockToChild[*parent] = *rootHash - } + heap.Push(rootHash) visited := make(map[externalapi.DomainHash]struct{}) chunk := make([]*appmessage.ArchivalBlock, 0, 1000) @@ -127,13 +126,21 @@ func mainImpl(cfg *configFlags) error { return err } - chunk = append(chunk, &appmessage.ArchivalBlock{ + archivalBlock := &appmessage.ArchivalBlock{ Block: appmessage.DomainBlockToRPCBlock(block), - Child: blockToChild[*hash].String(), - }) + } + if child, ok := blockToChild[*hash]; ok { + archivalBlock.Child = child.String() + } + + chunk = append(chunk, archivalBlock) + + if len(chunk) == 1 { + log.Infof("Added %s to chunk", consensushashing.BlockHash(block)) + } if len(chunk) == cap(chunk) { - _, err := rpcClient.AddArchivalBlocks(chunk) + err := sendChunk(rpcClient, chunk) if err != nil { return err } @@ -146,6 +153,39 @@ func mainImpl(cfg *configFlags) error { blockToChild[*parent] = *hash } } + + if len(chunk) > 0 { + sendChunk(rpcClient, chunk) + } + } + + return nil +} + +func sendChunk(rpcClient *rpcclient.RPCClient, chunk []*appmessage.ArchivalBlock) error { + log.Infof("Sending chunk") + _, err := rpcClient.AddArchivalBlocks(chunk) + if err != nil { + return err + } + log.Infof("Sent chunk") + + // Checking existence of first block for sanity + block := chunk[0] + domainBlock, err := appmessage.RPCBlockToDomainBlock(block.Block) + if err != nil { + return err + } + + blockHash := consensushashing.BlockHash(domainBlock) + log.Infof("Checking block %s", blockHash) + resp, err := rpcClient.GetBlock(blockHash.String(), true) + if err != nil { + return err + } + + if len(resp.Block.Transactions) == 0 { + return errors.Errorf("Block %s has no transactions on the server", blockHash) } return nil diff --git a/domain/consensus/factory.go b/domain/consensus/factory.go index 8d1efef47..6207d0663 100644 --- a/domain/consensus/factory.go +++ b/domain/consensus/factory.go @@ -12,6 +12,7 @@ import ( "github.com/kaspanet/kaspad/domain/consensus/processes/blockparentbuilder" parentssanager "github.com/kaspanet/kaspad/domain/consensus/processes/parentsmanager" "github.com/kaspanet/kaspad/domain/consensus/processes/pruningproofmanager" + "github.com/kaspanet/kaspad/domain/prefixmanager" "github.com/kaspanet/kaspad/util/staging" "github.com/pkg/errors" @@ -89,6 +90,8 @@ type Factory interface { SetTestPreAllocateCache(preallocateCaches bool) SetTestPastMedianTimeManager(medianTimeConstructor PastMedianTimeManagerConstructor) SetTestDifficultyManager(difficultyConstructor DifficultyManagerConstructor) + + AutoSetActivePrefix(value bool) } type factory struct { @@ -98,6 +101,7 @@ type factory struct { difficultyConstructor DifficultyManagerConstructor cacheSizeMiB *int preallocateCaches *bool + autoCheckActivePrefix bool } // NewFactory creates a new Consensus factory @@ -106,6 +110,7 @@ func NewFactory() Factory { ghostdagConstructor: ghostdagmanager.New, pastMedianTimeConsructor: pastmediantimemanager.New, difficultyConstructor: difficultymanager.New, + autoCheckActivePrefix: false, } } @@ -588,8 +593,19 @@ func (f *factory) NewTestConsensus(config *Config, testName string) ( return nil, nil, err } - testConsensusDBPrefix := &prefix.Prefix{} - consensusAsInterface, shouldMigrate, err := f.NewConsensus(config, db, testConsensusDBPrefix, nil) + prefix := &prefix.Prefix{} + if f.autoCheckActivePrefix { + activePrefix, exists, err := prefixmanager.ActivePrefix(db) + if err != nil { + return nil, nil, err + } + + if exists { + prefix = activePrefix + } + } + + consensusAsInterface, shouldMigrate, err := f.NewConsensus(config, db, prefix, nil) if err != nil { return nil, nil, err } @@ -648,6 +664,11 @@ func (f *factory) SetTestPreAllocateCache(preallocateCaches bool) { f.preallocateCaches = &preallocateCaches } +// AutoSetActivePrefix implements Factory. +func (f *factory) AutoSetActivePrefix(value bool) { + f.autoCheckActivePrefix = value +} + func dagStores(config *Config, prefixBucket model.DBBucket, pruningWindowSizePlusFinalityDepthForCache, pruningWindowSizeForCaches int, diff --git a/infrastructure/network/netadapter/server/grpcserver/protowire/rpc_add_archival_blocks.go b/infrastructure/network/netadapter/server/grpcserver/protowire/rpc_add_archival_blocks.go index 6b721eecf..487c0dfd7 100644 --- a/infrastructure/network/netadapter/server/grpcserver/protowire/rpc_add_archival_blocks.go +++ b/infrastructure/network/netadapter/server/grpcserver/protowire/rpc_add_archival_blocks.go @@ -46,7 +46,7 @@ func (x *KaspadMessage_AddArchivalBlocksRequest) fromAppMessage(message *appmess } x.AddArchivalBlocksRequest = &AddArchivalBlocksRequestMessage{ - Blocks: make([]*ArchivalBlock, len(message.Blocks)), + Blocks: blocks, } return nil } @@ -81,7 +81,7 @@ func (x *AddArchivalBlocksResponseMessage) toAppMessage() (appmessage.Message, e return nil, err } - return &appmessage.GetPruningWindowRootsResponseMessage{ + return &appmessage.AddArchivalBlocksResponseMessage{ Error: rpcErr, }, nil } diff --git a/infrastructure/network/rpcclient/rpc_add_archival_blocks.go b/infrastructure/network/rpcclient/rpc_add_archival_blocks.go index c8bd656b6..4e684498d 100644 --- a/infrastructure/network/rpcclient/rpc_add_archival_blocks.go +++ b/infrastructure/network/rpcclient/rpc_add_archival_blocks.go @@ -7,7 +7,7 @@ func (c *RPCClient) AddArchivalBlocks(blocks []*appmessage.ArchivalBlock) (*appm if err != nil { return nil, err } - response, err := c.route(appmessage.CmdAddArchivalBlocksRequestMessage).DequeueWithTimeout(c.timeout) + response, err := c.route(appmessage.CmdAddArchivalBlocksResponseMessage).DequeueWithTimeout(c.timeout) if err != nil { return nil, err } diff --git a/infrastructure/network/rpcclient/rpc_get_pruning_window_roots.go b/infrastructure/network/rpcclient/rpc_get_pruning_window_roots.go index 088e71399..578762791 100644 --- a/infrastructure/network/rpcclient/rpc_get_pruning_window_roots.go +++ b/infrastructure/network/rpcclient/rpc_get_pruning_window_roots.go @@ -3,11 +3,11 @@ package rpcclient import "github.com/kaspanet/kaspad/app/appmessage" func (c *RPCClient) GetPruningWindowRoots() (*appmessage.GetPruningWindowRootsResponseMessage, error) { - err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewGetPeerAddressesRequestMessage()) + err := c.rpcRouter.outgoingRoute().Enqueue(&appmessage.GetPruningWindowRootsRequestMessage{}) if err != nil { return nil, err } - response, err := c.route(appmessage.CmdGetPruningWindowRootsRequestMessage).DequeueWithTimeout(c.timeout) + response, err := c.route(appmessage.CmdGetPruningWindowRootsResponseMessage).DequeueWithTimeout(c.timeout) if err != nil { return nil, err }