mirror of
https://github.com/kaspanet/kaspad.git
synced 2025-11-26 15:35:55 +00:00
Move OnBlockAdded event to the channel that was only used by virtualChanged events
This commit is contained in:
parent
4dd7113dc5
commit
3f959be0a3
@ -2,9 +2,10 @@ package app
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
|
||||
|
||||
"github.com/kaspanet/kaspad/domain/miningmanager/mempool"
|
||||
|
||||
"github.com/kaspanet/kaspad/app/protocol"
|
||||
@ -68,7 +69,7 @@ func (a *ComponentManager) Stop() {
|
||||
}
|
||||
|
||||
a.protocolManager.Close()
|
||||
close(a.protocolManager.Context().Domain().VirtualChangeChannel())
|
||||
close(a.protocolManager.Context().Domain().ConsensusEventsChannel())
|
||||
|
||||
return
|
||||
}
|
||||
@ -120,7 +121,7 @@ func NewComponentManager(cfg *config.Config, db infrastructuredatabase.Database,
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rpcManager := setupRPC(cfg, domain, netAdapter, protocolManager, connectionManager, addressManager, utxoIndex, domain.VirtualChangeChannel(), interrupt)
|
||||
rpcManager := setupRPC(cfg, domain, netAdapter, protocolManager, connectionManager, addressManager, utxoIndex, domain.ConsensusEventsChannel(), interrupt)
|
||||
|
||||
return &ComponentManager{
|
||||
cfg: cfg,
|
||||
@ -141,7 +142,7 @@ func setupRPC(
|
||||
connectionManager *connmanager.ConnectionManager,
|
||||
addressManager *addressmanager.AddressManager,
|
||||
utxoIndex *utxoindex.UTXOIndex,
|
||||
virtualChangeChan chan *externalapi.VirtualChangeSet,
|
||||
consensusEventsChan chan externalapi.ConsensusEvent,
|
||||
shutDownChan chan<- struct{},
|
||||
) *rpc.Manager {
|
||||
|
||||
@ -153,11 +154,10 @@ func setupRPC(
|
||||
connectionManager,
|
||||
addressManager,
|
||||
utxoIndex,
|
||||
virtualChangeChan,
|
||||
consensusEventsChan,
|
||||
shutDownChan,
|
||||
)
|
||||
protocolManager.SetOnNewBlockTemplateHandler(rpcManager.NotifyNewBlockTemplate)
|
||||
protocolManager.SetOnBlockAddedToDAGHandler(rpcManager.NotifyBlockAddedToDAG)
|
||||
protocolManager.SetOnPruningPointUTXOSetOverrideHandler(rpcManager.NotifyPruningPointUTXOSetOverride)
|
||||
|
||||
return rpcManager
|
||||
|
||||
@ -40,14 +40,6 @@ func (f *FlowContext) OnNewBlock(block *externalapi.DomainBlock) error {
|
||||
return err
|
||||
}
|
||||
allAcceptedTransactions = append(allAcceptedTransactions, acceptedTransactions...)
|
||||
|
||||
if f.onBlockAddedToDAGHandler != nil {
|
||||
log.Debugf("OnNewBlock: calling f.onBlockAddedToDAGHandler for block %s", hash)
|
||||
err := f.onBlockAddedToDAGHandler(newBlock)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return f.broadcastTransactionsAfterBlockAdded(newBlocks, allAcceptedTransactions)
|
||||
|
||||
@ -18,10 +18,6 @@ import (
|
||||
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/id"
|
||||
)
|
||||
|
||||
// OnBlockAddedToDAGHandler is a handler function that's triggered
|
||||
// when a block is added to the DAG
|
||||
type OnBlockAddedToDAGHandler func(block *externalapi.DomainBlock) error
|
||||
|
||||
// OnNewBlockTemplateHandler is a handler function that's triggered when a new block template is available
|
||||
type OnNewBlockTemplateHandler func() error
|
||||
|
||||
@ -44,7 +40,6 @@ type FlowContext struct {
|
||||
|
||||
timeStarted int64
|
||||
|
||||
onBlockAddedToDAGHandler OnBlockAddedToDAGHandler
|
||||
onNewBlockTemplateHandler OnNewBlockTemplateHandler
|
||||
onPruningPointUTXOSetOverrideHandler OnPruningPointUTXOSetOverrideHandler
|
||||
onTransactionAddedToMempoolHandler OnTransactionAddedToMempoolHandler
|
||||
@ -107,11 +102,6 @@ func (f *FlowContext) IsNearlySynced() (bool, error) {
|
||||
return f.Domain().Consensus().IsNearlySynced()
|
||||
}
|
||||
|
||||
// SetOnBlockAddedToDAGHandler sets the onBlockAddedToDAG handler
|
||||
func (f *FlowContext) SetOnBlockAddedToDAGHandler(onBlockAddedToDAGHandler OnBlockAddedToDAGHandler) {
|
||||
f.onBlockAddedToDAGHandler = onBlockAddedToDAGHandler
|
||||
}
|
||||
|
||||
// SetOnNewBlockTemplateHandler sets the onNewBlockTemplateHandler handler
|
||||
func (f *FlowContext) SetOnNewBlockTemplateHandler(onNewBlockTemplateHandler OnNewBlockTemplateHandler) {
|
||||
f.onNewBlockTemplateHandler = onNewBlockTemplateHandler
|
||||
|
||||
@ -2,10 +2,11 @@ package protocol
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/kaspanet/kaspad/app/protocol/common"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/kaspanet/kaspad/app/protocol/common"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/kaspanet/kaspad/domain"
|
||||
@ -90,11 +91,6 @@ func (m *Manager) runFlows(flows []*common.Flow, peer *peerpkg.Peer, errChan <-c
|
||||
return <-errChan
|
||||
}
|
||||
|
||||
// SetOnBlockAddedToDAGHandler sets the onBlockAddedToDAG handler
|
||||
func (m *Manager) SetOnBlockAddedToDAGHandler(onBlockAddedToDAGHandler flowcontext.OnBlockAddedToDAGHandler) {
|
||||
m.context.SetOnBlockAddedToDAGHandler(onBlockAddedToDAGHandler)
|
||||
}
|
||||
|
||||
// SetOnNewBlockTemplateHandler sets the onNewBlockTemplate handler
|
||||
func (m *Manager) SetOnNewBlockTemplateHandler(onNewBlockTemplateHandler flowcontext.OnNewBlockTemplateHandler) {
|
||||
m.context.SetOnNewBlockTemplateHandler(onNewBlockTemplateHandler)
|
||||
|
||||
@ -12,6 +12,7 @@ import (
|
||||
"github.com/kaspanet/kaspad/infrastructure/network/addressmanager"
|
||||
"github.com/kaspanet/kaspad/infrastructure/network/connmanager"
|
||||
"github.com/kaspanet/kaspad/infrastructure/network/netadapter"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// Manager is an RPC manager
|
||||
@ -28,7 +29,7 @@ func NewManager(
|
||||
connectionManager *connmanager.ConnectionManager,
|
||||
addressManager *addressmanager.AddressManager,
|
||||
utxoIndex *utxoindex.UTXOIndex,
|
||||
virtualChangeChan chan *externalapi.VirtualChangeSet,
|
||||
consensusEventsChan chan externalapi.ConsensusEvent,
|
||||
shutDownChan chan<- struct{}) *Manager {
|
||||
|
||||
manager := Manager{
|
||||
@ -45,29 +46,39 @@ func NewManager(
|
||||
}
|
||||
netAdapter.SetRPCRouterInitializer(manager.routerInitializer)
|
||||
|
||||
manager.initVirtualChangeHandler(virtualChangeChan)
|
||||
manager.initConsensusEventsHandler(consensusEventsChan)
|
||||
|
||||
return &manager
|
||||
}
|
||||
|
||||
func (m *Manager) initVirtualChangeHandler(virtualChangeChan chan *externalapi.VirtualChangeSet) {
|
||||
spawn("virtualChangeHandler", func() {
|
||||
func (m *Manager) initConsensusEventsHandler(consensusEventsChan chan externalapi.ConsensusEvent) {
|
||||
spawn("consensusEventsHandler", func() {
|
||||
for {
|
||||
virtualChangeSet, ok := <-virtualChangeChan
|
||||
consensusEvent, ok := <-consensusEventsChan
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
err := m.notifyVirtualChange(virtualChangeSet)
|
||||
switch event := consensusEvent.(type) {
|
||||
case *externalapi.VirtualChangeSet:
|
||||
err := m.notifyVirtualChange(event)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
case *externalapi.BlockAdded:
|
||||
err := m.notifyBlockAddedToDAG(event.Block)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
default:
|
||||
panic(errors.Errorf("Got event of unsupported type %T", consensusEvent))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// NotifyBlockAddedToDAG notifies the manager that a block has been added to the DAG
|
||||
func (m *Manager) NotifyBlockAddedToDAG(block *externalapi.DomainBlock) error {
|
||||
onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyBlockAddedToDAG")
|
||||
// notifyBlockAddedToDAG notifies the manager that a block has been added to the DAG
|
||||
func (m *Manager) notifyBlockAddedToDAG(block *externalapi.DomainBlock) error {
|
||||
onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.notifyBlockAddedToDAG")
|
||||
defer onEnd()
|
||||
|
||||
// Before converting the block and populating it, we check if any listeners are interested.
|
||||
|
||||
@ -23,7 +23,7 @@ type fakeDomain struct {
|
||||
testapi.TestConsensus
|
||||
}
|
||||
|
||||
func (d fakeDomain) VirtualChangeChannel() chan *externalapi.VirtualChangeSet {
|
||||
func (d fakeDomain) ConsensusEventsChannel() chan externalapi.ConsensusEvent {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
|
||||
@ -1,10 +1,11 @@
|
||||
package consensus
|
||||
|
||||
import (
|
||||
"github.com/kaspanet/kaspad/util/mstime"
|
||||
"math/big"
|
||||
"sync"
|
||||
|
||||
"github.com/kaspanet/kaspad/util/mstime"
|
||||
|
||||
"github.com/kaspanet/kaspad/domain/consensus/database"
|
||||
"github.com/kaspanet/kaspad/domain/consensus/model"
|
||||
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
|
||||
@ -59,7 +60,7 @@ type consensus struct {
|
||||
daaBlocksStore model.DAABlocksStore
|
||||
blocksWithTrustedDataDAAWindowStore model.BlocksWithTrustedDataDAAWindowStore
|
||||
|
||||
virtualChangeChan chan *externalapi.VirtualChangeSet
|
||||
consensusEventsChan chan externalapi.ConsensusEvent
|
||||
}
|
||||
|
||||
func (s *consensus) ValidateAndInsertBlockWithTrustedData(block *externalapi.BlockWithTrustedData, validateUTXO bool) (*externalapi.VirtualChangeSet, error) {
|
||||
@ -197,7 +198,12 @@ func (s *consensus) ValidateAndInsertBlock(block *externalapi.DomainBlock, shoul
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = s.onVirtualChange(virtualChangeSet, shouldValidateAgainstUTXO)
|
||||
err = s.sendBlockAddedEvent(block)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = s.sendVirtualChangedEvent(virtualChangeSet, shouldValidateAgainstUTXO)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -205,13 +211,23 @@ func (s *consensus) ValidateAndInsertBlock(block *externalapi.DomainBlock, shoul
|
||||
return virtualChangeSet, nil
|
||||
}
|
||||
|
||||
func (s *consensus) onVirtualChange(virtualChangeSet *externalapi.VirtualChangeSet, wasVirtualUpdated bool) error {
|
||||
if !wasVirtualUpdated || s.virtualChangeChan == nil {
|
||||
func (s *consensus) sendBlockAddedEvent(block *externalapi.DomainBlock) error {
|
||||
if s.consensusEventsChan != nil {
|
||||
if len(s.consensusEventsChan) == cap(s.consensusEventsChan) {
|
||||
return errors.Errorf("consensusEventsChan is full")
|
||||
}
|
||||
s.consensusEventsChan <- &externalapi.BlockAdded{Block: block}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(s.virtualChangeChan) == cap(s.virtualChangeChan) {
|
||||
return errors.Errorf("virtualChangeChan is full")
|
||||
func (s *consensus) sendVirtualChangedEvent(virtualChangeSet *externalapi.VirtualChangeSet, wasVirtualUpdated bool) error {
|
||||
if !wasVirtualUpdated || s.consensusEventsChan == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(s.consensusEventsChan) == cap(s.consensusEventsChan) {
|
||||
return errors.Errorf("consensusEventsChan is full")
|
||||
}
|
||||
|
||||
stagingArea := model.NewStagingArea()
|
||||
@ -234,7 +250,7 @@ func (s *consensus) onVirtualChange(virtualChangeSet *externalapi.VirtualChangeS
|
||||
virtualChangeSet.VirtualSelectedParentBlueScore = virtualSelectedParentGHOSTDAGData.BlueScore()
|
||||
virtualChangeSet.VirtualDAAScore = virtualDAAScore
|
||||
|
||||
s.virtualChangeChan <- virtualChangeSet
|
||||
s.consensusEventsChan <- virtualChangeSet
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -837,7 +853,7 @@ func (s *consensus) ResolveVirtual() (*externalapi.VirtualChangeSet, bool, error
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
err = s.onVirtualChange(virtualChangeSet, true)
|
||||
err = s.sendVirtualChangedEvent(virtualChangeSet, true)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
@ -1,6 +1,10 @@
|
||||
package consensus
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/kaspanet/kaspad/domain/consensus/datastructures/blockwindowheapslicestore"
|
||||
"github.com/kaspanet/kaspad/domain/consensus/datastructures/daawindowstore"
|
||||
"github.com/kaspanet/kaspad/domain/consensus/datastructures/mergedepthrootstore"
|
||||
@ -10,9 +14,6 @@ import (
|
||||
"github.com/kaspanet/kaspad/domain/consensus/processes/pruningproofmanager"
|
||||
"github.com/kaspanet/kaspad/util/staging"
|
||||
"github.com/pkg/errors"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/kaspanet/kaspad/domain/prefixmanager/prefix"
|
||||
"github.com/kaspanet/kaspad/util/txmass"
|
||||
@ -77,7 +78,7 @@ type Config struct {
|
||||
// Factory instantiates new Consensuses
|
||||
type Factory interface {
|
||||
NewConsensus(config *Config, db infrastructuredatabase.Database, dbPrefix *prefix.Prefix,
|
||||
virtualChangeChan chan *externalapi.VirtualChangeSet) (
|
||||
consensusEventsChan chan externalapi.ConsensusEvent) (
|
||||
externalapi.Consensus, bool, error)
|
||||
NewTestConsensus(config *Config, testName string) (
|
||||
tc testapi.TestConsensus, teardown func(keepDataDir bool), err error)
|
||||
@ -110,7 +111,7 @@ func NewFactory() Factory {
|
||||
|
||||
// NewConsensus instantiates a new Consensus
|
||||
func (f *factory) NewConsensus(config *Config, db infrastructuredatabase.Database, dbPrefix *prefix.Prefix,
|
||||
virtualChangeChan chan *externalapi.VirtualChangeSet) (
|
||||
consensusEventsChan chan externalapi.ConsensusEvent) (
|
||||
consensusInstance externalapi.Consensus, shouldMigrate bool, err error) {
|
||||
|
||||
dbManager := consensusdatabase.New(db)
|
||||
@ -513,7 +514,7 @@ func (f *factory) NewConsensus(config *Config, db infrastructuredatabase.Databas
|
||||
daaBlocksStore: daaBlocksStore,
|
||||
blocksWithTrustedDataDAAWindowStore: daaWindowStore,
|
||||
|
||||
virtualChangeChan: virtualChangeChan,
|
||||
consensusEventsChan: consensusEventsChan,
|
||||
}
|
||||
|
||||
if isOldReachabilityInitialized {
|
||||
|
||||
@ -1,11 +1,11 @@
|
||||
package consensus
|
||||
|
||||
import (
|
||||
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
|
||||
"github.com/kaspanet/kaspad/domain/prefixmanager/prefix"
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
|
||||
"github.com/kaspanet/kaspad/domain/prefixmanager/prefix"
|
||||
|
||||
"github.com/kaspanet/kaspad/domain/dagconfig"
|
||||
"github.com/kaspanet/kaspad/infrastructure/db/database/ldb"
|
||||
)
|
||||
@ -25,7 +25,7 @@ func TestNewConsensus(t *testing.T) {
|
||||
t.Fatalf("error in NewLevelDB: %s", err)
|
||||
}
|
||||
|
||||
_, shouldMigrate, err := f.NewConsensus(config, db, &prefix.Prefix{}, make(chan *externalapi.VirtualChangeSet))
|
||||
_, shouldMigrate, err := f.NewConsensus(config, db, &prefix.Prefix{}, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("error in NewConsensus: %+v", err)
|
||||
}
|
||||
|
||||
@ -1,6 +1,18 @@
|
||||
package externalapi
|
||||
|
||||
// VirtualChangeSet is auxiliary data returned from ValidateAndInsertBlock and ResolveVirtual
|
||||
// ConsensusEvent is an interface type that is implemented by all events raised by consensus
|
||||
type ConsensusEvent interface {
|
||||
isConsensusEvent()
|
||||
}
|
||||
|
||||
// BlockAdded is an event raised by consensus when a block was added to the dag
|
||||
type BlockAdded struct {
|
||||
Block *DomainBlock
|
||||
}
|
||||
|
||||
func (*BlockAdded) isConsensusEvent() {}
|
||||
|
||||
// VirtualChangeSet is an event raised by consensus when virtual changes
|
||||
type VirtualChangeSet struct {
|
||||
VirtualSelectedParentChainChanges *SelectedChainPath
|
||||
VirtualUTXODiff UTXODiff
|
||||
@ -9,6 +21,8 @@ type VirtualChangeSet struct {
|
||||
VirtualDAAScore uint64
|
||||
}
|
||||
|
||||
func (*VirtualChangeSet) isConsensusEvent() {}
|
||||
|
||||
// SelectedChainPath is a path the of the selected chains between two blocks.
|
||||
type SelectedChainPath struct {
|
||||
Added []*DomainHash
|
||||
@ -1,11 +1,12 @@
|
||||
package domain
|
||||
|
||||
import (
|
||||
"github.com/kaspanet/kaspad/domain/consensusreference"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"unsafe"
|
||||
|
||||
"github.com/kaspanet/kaspad/domain/consensusreference"
|
||||
|
||||
"github.com/kaspanet/kaspad/domain/consensus"
|
||||
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
|
||||
"github.com/kaspanet/kaspad/domain/miningmanager"
|
||||
@ -24,7 +25,7 @@ type Domain interface {
|
||||
InitStagingConsensusWithoutGenesis() error
|
||||
CommitStagingConsensus() error
|
||||
DeleteStagingConsensus() error
|
||||
VirtualChangeChannel() chan *externalapi.VirtualChangeSet
|
||||
ConsensusEventsChannel() chan externalapi.ConsensusEvent
|
||||
}
|
||||
|
||||
type domain struct {
|
||||
@ -34,11 +35,11 @@ type domain struct {
|
||||
stagingConsensusLock sync.RWMutex
|
||||
consensusConfig *consensus.Config
|
||||
db infrastructuredatabase.Database
|
||||
virtualChangeChan chan *externalapi.VirtualChangeSet
|
||||
consensusEventsChannel chan externalapi.ConsensusEvent
|
||||
}
|
||||
|
||||
func (d *domain) VirtualChangeChannel() chan *externalapi.VirtualChangeSet {
|
||||
return d.virtualChangeChan
|
||||
func (d *domain) ConsensusEventsChannel() chan externalapi.ConsensusEvent {
|
||||
return d.consensusEventsChannel
|
||||
}
|
||||
|
||||
func (d *domain) Consensus() externalapi.Consensus {
|
||||
@ -92,7 +93,7 @@ func (d *domain) initStagingConsensus(cfg *consensus.Config) error {
|
||||
|
||||
consensusFactory := consensus.NewFactory()
|
||||
|
||||
consensusInstance, shouldMigrate, err := consensusFactory.NewConsensus(cfg, d.db, inactivePrefix, d.virtualChangeChan)
|
||||
consensusInstance, shouldMigrate, err := consensusFactory.NewConsensus(cfg, d.db, inactivePrefix, d.consensusEventsChannel)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -196,9 +197,9 @@ func New(consensusConfig *consensus.Config, mempoolConfig *mempool.Config, db in
|
||||
}
|
||||
}
|
||||
|
||||
virtualChangeChan := make(chan *externalapi.VirtualChangeSet, 100e3)
|
||||
consensusEventsChan := make(chan externalapi.ConsensusEvent, 100e3)
|
||||
consensusFactory := consensus.NewFactory()
|
||||
consensusInstance, shouldMigrate, err := consensusFactory.NewConsensus(consensusConfig, db, activePrefix, virtualChangeChan)
|
||||
consensusInstance, shouldMigrate, err := consensusFactory.NewConsensus(consensusConfig, db, activePrefix, consensusEventsChan)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -207,7 +208,7 @@ func New(consensusConfig *consensus.Config, mempoolConfig *mempool.Config, db in
|
||||
consensus: &consensusInstance,
|
||||
consensusConfig: consensusConfig,
|
||||
db: db,
|
||||
virtualChangeChan: virtualChangeChan,
|
||||
consensusEventsChannel: consensusEventsChan,
|
||||
}
|
||||
|
||||
if shouldMigrate {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user