Some changes

This commit is contained in:
Ori Newman 2025-03-16 18:14:05 +02:00
parent c0f4301099
commit bc77fe1b4d
6 changed files with 87 additions and 22 deletions

View File

@ -312,6 +312,10 @@ var RPCMessageCommandToString = map[MessageCommand]string{
CmdGetFeeEstimateResponseMessage: "GetFeeEstimateResponse",
CmdSubmitTransactionReplacementRequestMessage: "SubmitTransactionReplacementRequest",
CmdSubmitTransactionReplacementResponseMessage: "SubmitTransactionReplacementResponse",
CmdGetPruningWindowRootsRequestMessage: "GetPruningWindowRootsRequestMessage",
CmdGetPruningWindowRootsResponseMessage: "GetPruningWindowRootsResponseMessage",
CmdAddArchivalBlocksRequestMessage: "AddArchivalBlocksRequestMessage",
CmdAddArchivalBlocksResponseMessage: "AddArchivalBlocksResponseMessage",
}
// Message is an interface that describes a kaspa message. A type that

View File

@ -9,17 +9,17 @@ import (
"github.com/kaspanet/kaspad/domain/consensus"
"github.com/kaspanet/kaspad/domain/consensus/model"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"github.com/kaspanet/kaspad/domain/consensus/utils/consensushashing"
"github.com/kaspanet/kaspad/infrastructure/config"
"github.com/kaspanet/kaspad/infrastructure/db/database"
"github.com/kaspanet/kaspad/infrastructure/network/rpcclient"
"github.com/kaspanet/kaspad/util/panics"
"github.com/kaspanet/kaspad/util/profiling"
"github.com/kaspanet/kaspad/version"
"github.com/pkg/errors"
)
func main() {
defer panics.HandlePanic(log, "MAIN", nil)
// defer panics.HandlePanic(log, "MAIN", nil)
cfg, err := parseConfig()
if err != nil {
@ -43,10 +43,11 @@ func main() {
func mainImpl(cfg *configFlags) error {
dataDir := filepath.Join(config.DefaultAppDir)
dbPath := filepath.Join(dataDir, "db")
dbPath := filepath.Join(dataDir, "kaspa-mainnet/datadir2")
consensusConfig := &consensus.Config{Params: *cfg.NetParams()}
factory := consensus.NewFactory()
factory.SetTestDataDir(dbPath)
factory.AutoSetActivePrefix(true)
tc, tearDownFunc, err := factory.NewTestConsensus(consensusConfig, "archiveexport")
if err != nil {
return err
@ -72,16 +73,17 @@ func mainImpl(cfg *configFlags) error {
return err
}
for _, root := range rootsResp.Roots {
log.Infof("Got root %s", root.Root)
}
for _, root := range rootsResp.Roots {
rootHash, err := externalapi.NewDomainHashFromString(root.Root)
if err != nil {
return err
}
rootHeader, err := tc.BlockHeaderStore().BlockHeader(tc.DatabaseContext(), model.NewStagingArea(), rootHash)
if database.IsNotFoundError(err) {
continue
}
log.Infof("Adding past of %s", rootHash)
if err != nil {
return err
@ -93,10 +95,7 @@ func mainImpl(cfg *configFlags) error {
// TODO: Since GD data is not always available, we should extract the blue work from the header and use that for topological traversal
heap := tc.DAGTraversalManager().NewDownHeap(model.NewStagingArea())
for _, parent := range rootHeader.DirectParents() {
heap.Push(parent)
blockToChild[*parent] = *rootHash
}
heap.Push(rootHash)
visited := make(map[externalapi.DomainHash]struct{})
chunk := make([]*appmessage.ArchivalBlock, 0, 1000)
@ -127,13 +126,21 @@ func mainImpl(cfg *configFlags) error {
return err
}
chunk = append(chunk, &appmessage.ArchivalBlock{
archivalBlock := &appmessage.ArchivalBlock{
Block: appmessage.DomainBlockToRPCBlock(block),
Child: blockToChild[*hash].String(),
})
}
if child, ok := blockToChild[*hash]; ok {
archivalBlock.Child = child.String()
}
chunk = append(chunk, archivalBlock)
if len(chunk) == 1 {
log.Infof("Added %s to chunk", consensushashing.BlockHash(block))
}
if len(chunk) == cap(chunk) {
_, err := rpcClient.AddArchivalBlocks(chunk)
err := sendChunk(rpcClient, chunk)
if err != nil {
return err
}
@ -146,6 +153,39 @@ func mainImpl(cfg *configFlags) error {
blockToChild[*parent] = *hash
}
}
if len(chunk) > 0 {
sendChunk(rpcClient, chunk)
}
}
return nil
}
func sendChunk(rpcClient *rpcclient.RPCClient, chunk []*appmessage.ArchivalBlock) error {
log.Infof("Sending chunk")
_, err := rpcClient.AddArchivalBlocks(chunk)
if err != nil {
return err
}
log.Infof("Sent chunk")
// Checking existence of first block for sanity
block := chunk[0]
domainBlock, err := appmessage.RPCBlockToDomainBlock(block.Block)
if err != nil {
return err
}
blockHash := consensushashing.BlockHash(domainBlock)
log.Infof("Checking block %s", blockHash)
resp, err := rpcClient.GetBlock(blockHash.String(), true)
if err != nil {
return err
}
if len(resp.Block.Transactions) == 0 {
return errors.Errorf("Block %s has no transactions on the server", blockHash)
}
return nil

View File

@ -12,6 +12,7 @@ import (
"github.com/kaspanet/kaspad/domain/consensus/processes/blockparentbuilder"
parentssanager "github.com/kaspanet/kaspad/domain/consensus/processes/parentsmanager"
"github.com/kaspanet/kaspad/domain/consensus/processes/pruningproofmanager"
"github.com/kaspanet/kaspad/domain/prefixmanager"
"github.com/kaspanet/kaspad/util/staging"
"github.com/pkg/errors"
@ -89,6 +90,8 @@ type Factory interface {
SetTestPreAllocateCache(preallocateCaches bool)
SetTestPastMedianTimeManager(medianTimeConstructor PastMedianTimeManagerConstructor)
SetTestDifficultyManager(difficultyConstructor DifficultyManagerConstructor)
AutoSetActivePrefix(value bool)
}
type factory struct {
@ -98,6 +101,7 @@ type factory struct {
difficultyConstructor DifficultyManagerConstructor
cacheSizeMiB *int
preallocateCaches *bool
autoCheckActivePrefix bool
}
// NewFactory creates a new Consensus factory
@ -106,6 +110,7 @@ func NewFactory() Factory {
ghostdagConstructor: ghostdagmanager.New,
pastMedianTimeConsructor: pastmediantimemanager.New,
difficultyConstructor: difficultymanager.New,
autoCheckActivePrefix: false,
}
}
@ -588,8 +593,19 @@ func (f *factory) NewTestConsensus(config *Config, testName string) (
return nil, nil, err
}
testConsensusDBPrefix := &prefix.Prefix{}
consensusAsInterface, shouldMigrate, err := f.NewConsensus(config, db, testConsensusDBPrefix, nil)
prefix := &prefix.Prefix{}
if f.autoCheckActivePrefix {
activePrefix, exists, err := prefixmanager.ActivePrefix(db)
if err != nil {
return nil, nil, err
}
if exists {
prefix = activePrefix
}
}
consensusAsInterface, shouldMigrate, err := f.NewConsensus(config, db, prefix, nil)
if err != nil {
return nil, nil, err
}
@ -648,6 +664,11 @@ func (f *factory) SetTestPreAllocateCache(preallocateCaches bool) {
f.preallocateCaches = &preallocateCaches
}
// AutoSetActivePrefix implements Factory.
func (f *factory) AutoSetActivePrefix(value bool) {
f.autoCheckActivePrefix = value
}
func dagStores(config *Config,
prefixBucket model.DBBucket,
pruningWindowSizePlusFinalityDepthForCache, pruningWindowSizeForCaches int,

View File

@ -46,7 +46,7 @@ func (x *KaspadMessage_AddArchivalBlocksRequest) fromAppMessage(message *appmess
}
x.AddArchivalBlocksRequest = &AddArchivalBlocksRequestMessage{
Blocks: make([]*ArchivalBlock, len(message.Blocks)),
Blocks: blocks,
}
return nil
}
@ -81,7 +81,7 @@ func (x *AddArchivalBlocksResponseMessage) toAppMessage() (appmessage.Message, e
return nil, err
}
return &appmessage.GetPruningWindowRootsResponseMessage{
return &appmessage.AddArchivalBlocksResponseMessage{
Error: rpcErr,
}, nil
}

View File

@ -7,7 +7,7 @@ func (c *RPCClient) AddArchivalBlocks(blocks []*appmessage.ArchivalBlock) (*appm
if err != nil {
return nil, err
}
response, err := c.route(appmessage.CmdAddArchivalBlocksRequestMessage).DequeueWithTimeout(c.timeout)
response, err := c.route(appmessage.CmdAddArchivalBlocksResponseMessage).DequeueWithTimeout(c.timeout)
if err != nil {
return nil, err
}

View File

@ -3,11 +3,11 @@ package rpcclient
import "github.com/kaspanet/kaspad/app/appmessage"
func (c *RPCClient) GetPruningWindowRoots() (*appmessage.GetPruningWindowRootsResponseMessage, error) {
err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewGetPeerAddressesRequestMessage())
err := c.rpcRouter.outgoingRoute().Enqueue(&appmessage.GetPruningWindowRootsRequestMessage{})
if err != nil {
return nil, err
}
response, err := c.route(appmessage.CmdGetPruningWindowRootsRequestMessage).DequeueWithTimeout(c.timeout)
response, err := c.route(appmessage.CmdGetPruningWindowRootsResponseMessage).DequeueWithTimeout(c.timeout)
if err != nil {
return nil, err
}