From 3f959be0a3333f6187859e81e5d120a9849d3b4d Mon Sep 17 00:00:00 2001 From: Mike Zak Date: Mon, 23 May 2022 17:11:35 +0300 Subject: [PATCH] Move OnBlockAdded event to the channel that was only used by virtualChanged events --- app/component_manager.go | 12 +++--- app/protocol/flowcontext/blocks.go | 8 ---- app/protocol/flowcontext/flow_context.go | 10 ----- app/protocol/manager.go | 8 +--- app/rpc/manager.go | 33 +++++++++++------ app/rpc/rpchandlers/get_blocks_test.go | 2 +- domain/consensus/consensus.go | 34 ++++++++++++----- domain/consensus/factory.go | 13 ++++--- domain/consensus/factory_test.go | 6 +-- ...irtualchangeset.go => consensus_events.go} | 16 +++++++- domain/domain.go | 37 ++++++++++--------- 11 files changed, 100 insertions(+), 79 deletions(-) rename domain/consensus/model/externalapi/{virtualchangeset.go => consensus_events.go} (50%) diff --git a/app/component_manager.go b/app/component_manager.go index e1b171cb7..33b9a2c09 100644 --- a/app/component_manager.go +++ b/app/component_manager.go @@ -2,9 +2,10 @@ package app import ( "fmt" - "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" "sync/atomic" + "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" + "github.com/kaspanet/kaspad/domain/miningmanager/mempool" "github.com/kaspanet/kaspad/app/protocol" @@ -68,7 +69,7 @@ func (a *ComponentManager) Stop() { } a.protocolManager.Close() - close(a.protocolManager.Context().Domain().VirtualChangeChannel()) + close(a.protocolManager.Context().Domain().ConsensusEventsChannel()) return } @@ -120,7 +121,7 @@ func NewComponentManager(cfg *config.Config, db infrastructuredatabase.Database, if err != nil { return nil, err } - rpcManager := setupRPC(cfg, domain, netAdapter, protocolManager, connectionManager, addressManager, utxoIndex, domain.VirtualChangeChannel(), interrupt) + rpcManager := setupRPC(cfg, domain, netAdapter, protocolManager, connectionManager, addressManager, utxoIndex, domain.ConsensusEventsChannel(), interrupt) return &ComponentManager{ cfg: cfg, @@ -141,7 +142,7 @@ func setupRPC( connectionManager *connmanager.ConnectionManager, addressManager *addressmanager.AddressManager, utxoIndex *utxoindex.UTXOIndex, - virtualChangeChan chan *externalapi.VirtualChangeSet, + consensusEventsChan chan externalapi.ConsensusEvent, shutDownChan chan<- struct{}, ) *rpc.Manager { @@ -153,11 +154,10 @@ func setupRPC( connectionManager, addressManager, utxoIndex, - virtualChangeChan, + consensusEventsChan, shutDownChan, ) protocolManager.SetOnNewBlockTemplateHandler(rpcManager.NotifyNewBlockTemplate) - protocolManager.SetOnBlockAddedToDAGHandler(rpcManager.NotifyBlockAddedToDAG) protocolManager.SetOnPruningPointUTXOSetOverrideHandler(rpcManager.NotifyPruningPointUTXOSetOverride) return rpcManager diff --git a/app/protocol/flowcontext/blocks.go b/app/protocol/flowcontext/blocks.go index dcc5a37f6..68bca6cb5 100644 --- a/app/protocol/flowcontext/blocks.go +++ b/app/protocol/flowcontext/blocks.go @@ -40,14 +40,6 @@ func (f *FlowContext) OnNewBlock(block *externalapi.DomainBlock) error { return err } allAcceptedTransactions = append(allAcceptedTransactions, acceptedTransactions...) - - if f.onBlockAddedToDAGHandler != nil { - log.Debugf("OnNewBlock: calling f.onBlockAddedToDAGHandler for block %s", hash) - err := f.onBlockAddedToDAGHandler(newBlock) - if err != nil { - return err - } - } } return f.broadcastTransactionsAfterBlockAdded(newBlocks, allAcceptedTransactions) diff --git a/app/protocol/flowcontext/flow_context.go b/app/protocol/flowcontext/flow_context.go index f32476d52..7ca8bf001 100644 --- a/app/protocol/flowcontext/flow_context.go +++ b/app/protocol/flowcontext/flow_context.go @@ -18,10 +18,6 @@ import ( "github.com/kaspanet/kaspad/infrastructure/network/netadapter/id" ) -// OnBlockAddedToDAGHandler is a handler function that's triggered -// when a block is added to the DAG -type OnBlockAddedToDAGHandler func(block *externalapi.DomainBlock) error - // OnNewBlockTemplateHandler is a handler function that's triggered when a new block template is available type OnNewBlockTemplateHandler func() error @@ -44,7 +40,6 @@ type FlowContext struct { timeStarted int64 - onBlockAddedToDAGHandler OnBlockAddedToDAGHandler onNewBlockTemplateHandler OnNewBlockTemplateHandler onPruningPointUTXOSetOverrideHandler OnPruningPointUTXOSetOverrideHandler onTransactionAddedToMempoolHandler OnTransactionAddedToMempoolHandler @@ -107,11 +102,6 @@ func (f *FlowContext) IsNearlySynced() (bool, error) { return f.Domain().Consensus().IsNearlySynced() } -// SetOnBlockAddedToDAGHandler sets the onBlockAddedToDAG handler -func (f *FlowContext) SetOnBlockAddedToDAGHandler(onBlockAddedToDAGHandler OnBlockAddedToDAGHandler) { - f.onBlockAddedToDAGHandler = onBlockAddedToDAGHandler -} - // SetOnNewBlockTemplateHandler sets the onNewBlockTemplateHandler handler func (f *FlowContext) SetOnNewBlockTemplateHandler(onNewBlockTemplateHandler OnNewBlockTemplateHandler) { f.onNewBlockTemplateHandler = onNewBlockTemplateHandler diff --git a/app/protocol/manager.go b/app/protocol/manager.go index c6c3046aa..2ecae023a 100644 --- a/app/protocol/manager.go +++ b/app/protocol/manager.go @@ -2,10 +2,11 @@ package protocol import ( "fmt" - "github.com/kaspanet/kaspad/app/protocol/common" "sync" "sync/atomic" + "github.com/kaspanet/kaspad/app/protocol/common" + "github.com/pkg/errors" "github.com/kaspanet/kaspad/domain" @@ -90,11 +91,6 @@ func (m *Manager) runFlows(flows []*common.Flow, peer *peerpkg.Peer, errChan <-c return <-errChan } -// SetOnBlockAddedToDAGHandler sets the onBlockAddedToDAG handler -func (m *Manager) SetOnBlockAddedToDAGHandler(onBlockAddedToDAGHandler flowcontext.OnBlockAddedToDAGHandler) { - m.context.SetOnBlockAddedToDAGHandler(onBlockAddedToDAGHandler) -} - // SetOnNewBlockTemplateHandler sets the onNewBlockTemplate handler func (m *Manager) SetOnNewBlockTemplateHandler(onNewBlockTemplateHandler flowcontext.OnNewBlockTemplateHandler) { m.context.SetOnNewBlockTemplateHandler(onNewBlockTemplateHandler) diff --git a/app/rpc/manager.go b/app/rpc/manager.go index e6cc97b4c..137fd6555 100644 --- a/app/rpc/manager.go +++ b/app/rpc/manager.go @@ -12,6 +12,7 @@ import ( "github.com/kaspanet/kaspad/infrastructure/network/addressmanager" "github.com/kaspanet/kaspad/infrastructure/network/connmanager" "github.com/kaspanet/kaspad/infrastructure/network/netadapter" + "github.com/pkg/errors" ) // Manager is an RPC manager @@ -28,7 +29,7 @@ func NewManager( connectionManager *connmanager.ConnectionManager, addressManager *addressmanager.AddressManager, utxoIndex *utxoindex.UTXOIndex, - virtualChangeChan chan *externalapi.VirtualChangeSet, + consensusEventsChan chan externalapi.ConsensusEvent, shutDownChan chan<- struct{}) *Manager { manager := Manager{ @@ -45,29 +46,39 @@ func NewManager( } netAdapter.SetRPCRouterInitializer(manager.routerInitializer) - manager.initVirtualChangeHandler(virtualChangeChan) + manager.initConsensusEventsHandler(consensusEventsChan) return &manager } -func (m *Manager) initVirtualChangeHandler(virtualChangeChan chan *externalapi.VirtualChangeSet) { - spawn("virtualChangeHandler", func() { +func (m *Manager) initConsensusEventsHandler(consensusEventsChan chan externalapi.ConsensusEvent) { + spawn("consensusEventsHandler", func() { for { - virtualChangeSet, ok := <-virtualChangeChan + consensusEvent, ok := <-consensusEventsChan if !ok { return } - err := m.notifyVirtualChange(virtualChangeSet) - if err != nil { - panic(err) + switch event := consensusEvent.(type) { + case *externalapi.VirtualChangeSet: + err := m.notifyVirtualChange(event) + if err != nil { + panic(err) + } + case *externalapi.BlockAdded: + err := m.notifyBlockAddedToDAG(event.Block) + if err != nil { + panic(err) + } + default: + panic(errors.Errorf("Got event of unsupported type %T", consensusEvent)) } } }) } -// NotifyBlockAddedToDAG notifies the manager that a block has been added to the DAG -func (m *Manager) NotifyBlockAddedToDAG(block *externalapi.DomainBlock) error { - onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyBlockAddedToDAG") +// notifyBlockAddedToDAG notifies the manager that a block has been added to the DAG +func (m *Manager) notifyBlockAddedToDAG(block *externalapi.DomainBlock) error { + onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.notifyBlockAddedToDAG") defer onEnd() // Before converting the block and populating it, we check if any listeners are interested. diff --git a/app/rpc/rpchandlers/get_blocks_test.go b/app/rpc/rpchandlers/get_blocks_test.go index 6f4bf5809..38642845f 100644 --- a/app/rpc/rpchandlers/get_blocks_test.go +++ b/app/rpc/rpchandlers/get_blocks_test.go @@ -23,7 +23,7 @@ type fakeDomain struct { testapi.TestConsensus } -func (d fakeDomain) VirtualChangeChannel() chan *externalapi.VirtualChangeSet { +func (d fakeDomain) ConsensusEventsChannel() chan externalapi.ConsensusEvent { panic("implement me") } diff --git a/domain/consensus/consensus.go b/domain/consensus/consensus.go index 9cfa10988..fb4fbba0d 100644 --- a/domain/consensus/consensus.go +++ b/domain/consensus/consensus.go @@ -1,10 +1,11 @@ package consensus import ( - "github.com/kaspanet/kaspad/util/mstime" "math/big" "sync" + "github.com/kaspanet/kaspad/util/mstime" + "github.com/kaspanet/kaspad/domain/consensus/database" "github.com/kaspanet/kaspad/domain/consensus/model" "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" @@ -59,7 +60,7 @@ type consensus struct { daaBlocksStore model.DAABlocksStore blocksWithTrustedDataDAAWindowStore model.BlocksWithTrustedDataDAAWindowStore - virtualChangeChan chan *externalapi.VirtualChangeSet + consensusEventsChan chan externalapi.ConsensusEvent } func (s *consensus) ValidateAndInsertBlockWithTrustedData(block *externalapi.BlockWithTrustedData, validateUTXO bool) (*externalapi.VirtualChangeSet, error) { @@ -197,7 +198,12 @@ func (s *consensus) ValidateAndInsertBlock(block *externalapi.DomainBlock, shoul return nil, err } - err = s.onVirtualChange(virtualChangeSet, shouldValidateAgainstUTXO) + err = s.sendBlockAddedEvent(block) + if err != nil { + return nil, err + } + + err = s.sendVirtualChangedEvent(virtualChangeSet, shouldValidateAgainstUTXO) if err != nil { return nil, err } @@ -205,13 +211,23 @@ func (s *consensus) ValidateAndInsertBlock(block *externalapi.DomainBlock, shoul return virtualChangeSet, nil } -func (s *consensus) onVirtualChange(virtualChangeSet *externalapi.VirtualChangeSet, wasVirtualUpdated bool) error { - if !wasVirtualUpdated || s.virtualChangeChan == nil { +func (s *consensus) sendBlockAddedEvent(block *externalapi.DomainBlock) error { + if s.consensusEventsChan != nil { + if len(s.consensusEventsChan) == cap(s.consensusEventsChan) { + return errors.Errorf("consensusEventsChan is full") + } + s.consensusEventsChan <- &externalapi.BlockAdded{Block: block} + } + return nil +} + +func (s *consensus) sendVirtualChangedEvent(virtualChangeSet *externalapi.VirtualChangeSet, wasVirtualUpdated bool) error { + if !wasVirtualUpdated || s.consensusEventsChan == nil { return nil } - if len(s.virtualChangeChan) == cap(s.virtualChangeChan) { - return errors.Errorf("virtualChangeChan is full") + if len(s.consensusEventsChan) == cap(s.consensusEventsChan) { + return errors.Errorf("consensusEventsChan is full") } stagingArea := model.NewStagingArea() @@ -234,7 +250,7 @@ func (s *consensus) onVirtualChange(virtualChangeSet *externalapi.VirtualChangeS virtualChangeSet.VirtualSelectedParentBlueScore = virtualSelectedParentGHOSTDAGData.BlueScore() virtualChangeSet.VirtualDAAScore = virtualDAAScore - s.virtualChangeChan <- virtualChangeSet + s.consensusEventsChan <- virtualChangeSet return nil } @@ -837,7 +853,7 @@ func (s *consensus) ResolveVirtual() (*externalapi.VirtualChangeSet, bool, error return nil, false, err } - err = s.onVirtualChange(virtualChangeSet, true) + err = s.sendVirtualChangedEvent(virtualChangeSet, true) if err != nil { return nil, false, err } diff --git a/domain/consensus/factory.go b/domain/consensus/factory.go index a91f539ad..f5167fc7e 100644 --- a/domain/consensus/factory.go +++ b/domain/consensus/factory.go @@ -1,6 +1,10 @@ package consensus import ( + "io/ioutil" + "os" + "sync" + "github.com/kaspanet/kaspad/domain/consensus/datastructures/blockwindowheapslicestore" "github.com/kaspanet/kaspad/domain/consensus/datastructures/daawindowstore" "github.com/kaspanet/kaspad/domain/consensus/datastructures/mergedepthrootstore" @@ -10,9 +14,6 @@ import ( "github.com/kaspanet/kaspad/domain/consensus/processes/pruningproofmanager" "github.com/kaspanet/kaspad/util/staging" "github.com/pkg/errors" - "io/ioutil" - "os" - "sync" "github.com/kaspanet/kaspad/domain/prefixmanager/prefix" "github.com/kaspanet/kaspad/util/txmass" @@ -77,7 +78,7 @@ type Config struct { // Factory instantiates new Consensuses type Factory interface { NewConsensus(config *Config, db infrastructuredatabase.Database, dbPrefix *prefix.Prefix, - virtualChangeChan chan *externalapi.VirtualChangeSet) ( + consensusEventsChan chan externalapi.ConsensusEvent) ( externalapi.Consensus, bool, error) NewTestConsensus(config *Config, testName string) ( tc testapi.TestConsensus, teardown func(keepDataDir bool), err error) @@ -110,7 +111,7 @@ func NewFactory() Factory { // NewConsensus instantiates a new Consensus func (f *factory) NewConsensus(config *Config, db infrastructuredatabase.Database, dbPrefix *prefix.Prefix, - virtualChangeChan chan *externalapi.VirtualChangeSet) ( + consensusEventsChan chan externalapi.ConsensusEvent) ( consensusInstance externalapi.Consensus, shouldMigrate bool, err error) { dbManager := consensusdatabase.New(db) @@ -513,7 +514,7 @@ func (f *factory) NewConsensus(config *Config, db infrastructuredatabase.Databas daaBlocksStore: daaBlocksStore, blocksWithTrustedDataDAAWindowStore: daaWindowStore, - virtualChangeChan: virtualChangeChan, + consensusEventsChan: consensusEventsChan, } if isOldReachabilityInitialized { diff --git a/domain/consensus/factory_test.go b/domain/consensus/factory_test.go index cbf661cba..b5bca1394 100644 --- a/domain/consensus/factory_test.go +++ b/domain/consensus/factory_test.go @@ -1,11 +1,11 @@ package consensus import ( - "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" - "github.com/kaspanet/kaspad/domain/prefixmanager/prefix" "io/ioutil" "testing" + "github.com/kaspanet/kaspad/domain/prefixmanager/prefix" + "github.com/kaspanet/kaspad/domain/dagconfig" "github.com/kaspanet/kaspad/infrastructure/db/database/ldb" ) @@ -25,7 +25,7 @@ func TestNewConsensus(t *testing.T) { t.Fatalf("error in NewLevelDB: %s", err) } - _, shouldMigrate, err := f.NewConsensus(config, db, &prefix.Prefix{}, make(chan *externalapi.VirtualChangeSet)) + _, shouldMigrate, err := f.NewConsensus(config, db, &prefix.Prefix{}, nil) if err != nil { t.Fatalf("error in NewConsensus: %+v", err) } diff --git a/domain/consensus/model/externalapi/virtualchangeset.go b/domain/consensus/model/externalapi/consensus_events.go similarity index 50% rename from domain/consensus/model/externalapi/virtualchangeset.go rename to domain/consensus/model/externalapi/consensus_events.go index 356e938f4..54c015849 100644 --- a/domain/consensus/model/externalapi/virtualchangeset.go +++ b/domain/consensus/model/externalapi/consensus_events.go @@ -1,6 +1,18 @@ package externalapi -// VirtualChangeSet is auxiliary data returned from ValidateAndInsertBlock and ResolveVirtual +// ConsensusEvent is an interface type that is implemented by all events raised by consensus +type ConsensusEvent interface { + isConsensusEvent() +} + +// BlockAdded is an event raised by consensus when a block was added to the dag +type BlockAdded struct { + Block *DomainBlock +} + +func (*BlockAdded) isConsensusEvent() {} + +// VirtualChangeSet is an event raised by consensus when virtual changes type VirtualChangeSet struct { VirtualSelectedParentChainChanges *SelectedChainPath VirtualUTXODiff UTXODiff @@ -9,6 +21,8 @@ type VirtualChangeSet struct { VirtualDAAScore uint64 } +func (*VirtualChangeSet) isConsensusEvent() {} + // SelectedChainPath is a path the of the selected chains between two blocks. type SelectedChainPath struct { Added []*DomainHash diff --git a/domain/domain.go b/domain/domain.go index 99579a301..3e02530d4 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1,11 +1,12 @@ package domain import ( - "github.com/kaspanet/kaspad/domain/consensusreference" "sync" "sync/atomic" "unsafe" + "github.com/kaspanet/kaspad/domain/consensusreference" + "github.com/kaspanet/kaspad/domain/consensus" "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" "github.com/kaspanet/kaspad/domain/miningmanager" @@ -24,21 +25,21 @@ type Domain interface { InitStagingConsensusWithoutGenesis() error CommitStagingConsensus() error DeleteStagingConsensus() error - VirtualChangeChannel() chan *externalapi.VirtualChangeSet + ConsensusEventsChannel() chan externalapi.ConsensusEvent } type domain struct { - miningManager miningmanager.MiningManager - consensus *externalapi.Consensus - stagingConsensus *externalapi.Consensus - stagingConsensusLock sync.RWMutex - consensusConfig *consensus.Config - db infrastructuredatabase.Database - virtualChangeChan chan *externalapi.VirtualChangeSet + miningManager miningmanager.MiningManager + consensus *externalapi.Consensus + stagingConsensus *externalapi.Consensus + stagingConsensusLock sync.RWMutex + consensusConfig *consensus.Config + db infrastructuredatabase.Database + consensusEventsChannel chan externalapi.ConsensusEvent } -func (d *domain) VirtualChangeChannel() chan *externalapi.VirtualChangeSet { - return d.virtualChangeChan +func (d *domain) ConsensusEventsChannel() chan externalapi.ConsensusEvent { + return d.consensusEventsChannel } func (d *domain) Consensus() externalapi.Consensus { @@ -92,7 +93,7 @@ func (d *domain) initStagingConsensus(cfg *consensus.Config) error { consensusFactory := consensus.NewFactory() - consensusInstance, shouldMigrate, err := consensusFactory.NewConsensus(cfg, d.db, inactivePrefix, d.virtualChangeChan) + consensusInstance, shouldMigrate, err := consensusFactory.NewConsensus(cfg, d.db, inactivePrefix, d.consensusEventsChannel) if err != nil { return err } @@ -196,18 +197,18 @@ func New(consensusConfig *consensus.Config, mempoolConfig *mempool.Config, db in } } - virtualChangeChan := make(chan *externalapi.VirtualChangeSet, 100e3) + consensusEventsChan := make(chan externalapi.ConsensusEvent, 100e3) consensusFactory := consensus.NewFactory() - consensusInstance, shouldMigrate, err := consensusFactory.NewConsensus(consensusConfig, db, activePrefix, virtualChangeChan) + consensusInstance, shouldMigrate, err := consensusFactory.NewConsensus(consensusConfig, db, activePrefix, consensusEventsChan) if err != nil { return nil, err } domainInstance := &domain{ - consensus: &consensusInstance, - consensusConfig: consensusConfig, - db: db, - virtualChangeChan: virtualChangeChan, + consensus: &consensusInstance, + consensusConfig: consensusConfig, + db: db, + consensusEventsChannel: consensusEventsChan, } if shouldMigrate {