Compare commits

...

5 Commits

17 changed files with 152 additions and 123 deletions

View File

@ -2,9 +2,10 @@ package app
import ( import (
"fmt" "fmt"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"sync/atomic" "sync/atomic"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"github.com/kaspanet/kaspad/domain/miningmanager/mempool" "github.com/kaspanet/kaspad/domain/miningmanager/mempool"
"github.com/kaspanet/kaspad/app/protocol" "github.com/kaspanet/kaspad/app/protocol"
@ -68,7 +69,7 @@ func (a *ComponentManager) Stop() {
} }
a.protocolManager.Close() a.protocolManager.Close()
close(a.protocolManager.Context().Domain().VirtualChangeChannel()) close(a.protocolManager.Context().Domain().ConsensusEventsChannel())
return return
} }
@ -120,7 +121,7 @@ func NewComponentManager(cfg *config.Config, db infrastructuredatabase.Database,
if err != nil { if err != nil {
return nil, err return nil, err
} }
rpcManager := setupRPC(cfg, domain, netAdapter, protocolManager, connectionManager, addressManager, utxoIndex, domain.VirtualChangeChannel(), interrupt) rpcManager := setupRPC(cfg, domain, netAdapter, protocolManager, connectionManager, addressManager, utxoIndex, domain.ConsensusEventsChannel(), interrupt)
return &ComponentManager{ return &ComponentManager{
cfg: cfg, cfg: cfg,
@ -141,7 +142,7 @@ func setupRPC(
connectionManager *connmanager.ConnectionManager, connectionManager *connmanager.ConnectionManager,
addressManager *addressmanager.AddressManager, addressManager *addressmanager.AddressManager,
utxoIndex *utxoindex.UTXOIndex, utxoIndex *utxoindex.UTXOIndex,
virtualChangeChan chan *externalapi.VirtualChangeSet, consensusEventsChan chan externalapi.ConsensusEvent,
shutDownChan chan<- struct{}, shutDownChan chan<- struct{},
) *rpc.Manager { ) *rpc.Manager {
@ -153,11 +154,10 @@ func setupRPC(
connectionManager, connectionManager,
addressManager, addressManager,
utxoIndex, utxoIndex,
virtualChangeChan, consensusEventsChan,
shutDownChan, shutDownChan,
) )
protocolManager.SetOnNewBlockTemplateHandler(rpcManager.NotifyNewBlockTemplate) protocolManager.SetOnNewBlockTemplateHandler(rpcManager.NotifyNewBlockTemplate)
protocolManager.SetOnBlockAddedToDAGHandler(rpcManager.NotifyBlockAddedToDAG)
protocolManager.SetOnPruningPointUTXOSetOverrideHandler(rpcManager.NotifyPruningPointUTXOSetOverride) protocolManager.SetOnPruningPointUTXOSetOverrideHandler(rpcManager.NotifyPruningPointUTXOSetOverride)
return rpcManager return rpcManager

View File

@ -40,14 +40,6 @@ func (f *FlowContext) OnNewBlock(block *externalapi.DomainBlock) error {
return err return err
} }
allAcceptedTransactions = append(allAcceptedTransactions, acceptedTransactions...) allAcceptedTransactions = append(allAcceptedTransactions, acceptedTransactions...)
if f.onBlockAddedToDAGHandler != nil {
log.Debugf("OnNewBlock: calling f.onBlockAddedToDAGHandler for block %s", hash)
err := f.onBlockAddedToDAGHandler(newBlock)
if err != nil {
return err
}
}
} }
return f.broadcastTransactionsAfterBlockAdded(newBlocks, allAcceptedTransactions) return f.broadcastTransactionsAfterBlockAdded(newBlocks, allAcceptedTransactions)

View File

@ -18,10 +18,6 @@ import (
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/id" "github.com/kaspanet/kaspad/infrastructure/network/netadapter/id"
) )
// OnBlockAddedToDAGHandler is a handler function that's triggered
// when a block is added to the DAG
type OnBlockAddedToDAGHandler func(block *externalapi.DomainBlock) error
// OnNewBlockTemplateHandler is a handler function that's triggered when a new block template is available // OnNewBlockTemplateHandler is a handler function that's triggered when a new block template is available
type OnNewBlockTemplateHandler func() error type OnNewBlockTemplateHandler func() error
@ -44,7 +40,6 @@ type FlowContext struct {
timeStarted int64 timeStarted int64
onBlockAddedToDAGHandler OnBlockAddedToDAGHandler
onNewBlockTemplateHandler OnNewBlockTemplateHandler onNewBlockTemplateHandler OnNewBlockTemplateHandler
onPruningPointUTXOSetOverrideHandler OnPruningPointUTXOSetOverrideHandler onPruningPointUTXOSetOverrideHandler OnPruningPointUTXOSetOverrideHandler
onTransactionAddedToMempoolHandler OnTransactionAddedToMempoolHandler onTransactionAddedToMempoolHandler OnTransactionAddedToMempoolHandler
@ -107,11 +102,6 @@ func (f *FlowContext) IsNearlySynced() (bool, error) {
return f.Domain().Consensus().IsNearlySynced() return f.Domain().Consensus().IsNearlySynced()
} }
// SetOnBlockAddedToDAGHandler sets the onBlockAddedToDAG handler
func (f *FlowContext) SetOnBlockAddedToDAGHandler(onBlockAddedToDAGHandler OnBlockAddedToDAGHandler) {
f.onBlockAddedToDAGHandler = onBlockAddedToDAGHandler
}
// SetOnNewBlockTemplateHandler sets the onNewBlockTemplateHandler handler // SetOnNewBlockTemplateHandler sets the onNewBlockTemplateHandler handler
func (f *FlowContext) SetOnNewBlockTemplateHandler(onNewBlockTemplateHandler OnNewBlockTemplateHandler) { func (f *FlowContext) SetOnNewBlockTemplateHandler(onNewBlockTemplateHandler OnNewBlockTemplateHandler) {
f.onNewBlockTemplateHandler = onNewBlockTemplateHandler f.onNewBlockTemplateHandler = onNewBlockTemplateHandler

View File

@ -2,10 +2,11 @@ package protocol
import ( import (
"fmt" "fmt"
"github.com/kaspanet/kaspad/app/protocol/common"
"sync" "sync"
"sync/atomic" "sync/atomic"
"github.com/kaspanet/kaspad/app/protocol/common"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/kaspanet/kaspad/domain" "github.com/kaspanet/kaspad/domain"
@ -90,11 +91,6 @@ func (m *Manager) runFlows(flows []*common.Flow, peer *peerpkg.Peer, errChan <-c
return <-errChan return <-errChan
} }
// SetOnBlockAddedToDAGHandler sets the onBlockAddedToDAG handler
func (m *Manager) SetOnBlockAddedToDAGHandler(onBlockAddedToDAGHandler flowcontext.OnBlockAddedToDAGHandler) {
m.context.SetOnBlockAddedToDAGHandler(onBlockAddedToDAGHandler)
}
// SetOnNewBlockTemplateHandler sets the onNewBlockTemplate handler // SetOnNewBlockTemplateHandler sets the onNewBlockTemplate handler
func (m *Manager) SetOnNewBlockTemplateHandler(onNewBlockTemplateHandler flowcontext.OnNewBlockTemplateHandler) { func (m *Manager) SetOnNewBlockTemplateHandler(onNewBlockTemplateHandler flowcontext.OnNewBlockTemplateHandler) {
m.context.SetOnNewBlockTemplateHandler(onNewBlockTemplateHandler) m.context.SetOnNewBlockTemplateHandler(onNewBlockTemplateHandler)

View File

@ -12,6 +12,7 @@ import (
"github.com/kaspanet/kaspad/infrastructure/network/addressmanager" "github.com/kaspanet/kaspad/infrastructure/network/addressmanager"
"github.com/kaspanet/kaspad/infrastructure/network/connmanager" "github.com/kaspanet/kaspad/infrastructure/network/connmanager"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter" "github.com/kaspanet/kaspad/infrastructure/network/netadapter"
"github.com/pkg/errors"
) )
// Manager is an RPC manager // Manager is an RPC manager
@ -28,7 +29,7 @@ func NewManager(
connectionManager *connmanager.ConnectionManager, connectionManager *connmanager.ConnectionManager,
addressManager *addressmanager.AddressManager, addressManager *addressmanager.AddressManager,
utxoIndex *utxoindex.UTXOIndex, utxoIndex *utxoindex.UTXOIndex,
virtualChangeChan chan *externalapi.VirtualChangeSet, consensusEventsChan chan externalapi.ConsensusEvent,
shutDownChan chan<- struct{}) *Manager { shutDownChan chan<- struct{}) *Manager {
manager := Manager{ manager := Manager{
@ -45,29 +46,39 @@ func NewManager(
} }
netAdapter.SetRPCRouterInitializer(manager.routerInitializer) netAdapter.SetRPCRouterInitializer(manager.routerInitializer)
manager.initVirtualChangeHandler(virtualChangeChan) manager.initConsensusEventsHandler(consensusEventsChan)
return &manager return &manager
} }
func (m *Manager) initVirtualChangeHandler(virtualChangeChan chan *externalapi.VirtualChangeSet) { func (m *Manager) initConsensusEventsHandler(consensusEventsChan chan externalapi.ConsensusEvent) {
spawn("virtualChangeHandler", func() { spawn("consensusEventsHandler", func() {
for { for {
virtualChangeSet, ok := <-virtualChangeChan consensusEvent, ok := <-consensusEventsChan
if !ok { if !ok {
return return
} }
err := m.notifyVirtualChange(virtualChangeSet) switch event := consensusEvent.(type) {
if err != nil { case *externalapi.VirtualChangeSet:
panic(err) err := m.notifyVirtualChange(event)
if err != nil {
panic(err)
}
case *externalapi.BlockAdded:
err := m.notifyBlockAddedToDAG(event.Block)
if err != nil {
panic(err)
}
default:
panic(errors.Errorf("Got event of unsupported type %T", consensusEvent))
} }
} }
}) })
} }
// NotifyBlockAddedToDAG notifies the manager that a block has been added to the DAG // notifyBlockAddedToDAG notifies the manager that a block has been added to the DAG
func (m *Manager) NotifyBlockAddedToDAG(block *externalapi.DomainBlock) error { func (m *Manager) notifyBlockAddedToDAG(block *externalapi.DomainBlock) error {
onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyBlockAddedToDAG") onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.notifyBlockAddedToDAG")
defer onEnd() defer onEnd()
// Before converting the block and populating it, we check if any listeners are interested. // Before converting the block and populating it, we check if any listeners are interested.

View File

@ -102,10 +102,8 @@ func (nm *NotificationManager) NotifyBlockAdded(notification *appmessage.BlockAd
for router, listener := range nm.listeners { for router, listener := range nm.listeners {
if listener.propagateBlockAddedNotifications { if listener.propagateBlockAddedNotifications {
err := router.OutgoingRoute().Enqueue(notification) err := router.OutgoingRoute().MaybeEnqueue(notification)
if errors.Is(err, routerpkg.ErrRouteClosed) { if err != nil {
log.Warnf("Couldn't send notification: %s", err)
} else if err != nil {
return err return err
} }
} }

View File

@ -23,7 +23,7 @@ type fakeDomain struct {
testapi.TestConsensus testapi.TestConsensus
} }
func (d fakeDomain) VirtualChangeChannel() chan *externalapi.VirtualChangeSet { func (d fakeDomain) ConsensusEventsChannel() chan externalapi.ConsensusEvent {
panic("implement me") panic("implement me")
} }

View File

@ -1,10 +1,11 @@
package consensus package consensus
import ( import (
"github.com/kaspanet/kaspad/util/mstime"
"math/big" "math/big"
"sync" "sync"
"github.com/kaspanet/kaspad/util/mstime"
"github.com/kaspanet/kaspad/domain/consensus/database" "github.com/kaspanet/kaspad/domain/consensus/database"
"github.com/kaspanet/kaspad/domain/consensus/model" "github.com/kaspanet/kaspad/domain/consensus/model"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi" "github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
@ -59,14 +60,18 @@ type consensus struct {
daaBlocksStore model.DAABlocksStore daaBlocksStore model.DAABlocksStore
blocksWithTrustedDataDAAWindowStore model.BlocksWithTrustedDataDAAWindowStore blocksWithTrustedDataDAAWindowStore model.BlocksWithTrustedDataDAAWindowStore
virtualChangeChan chan *externalapi.VirtualChangeSet consensusEventsChan chan externalapi.ConsensusEvent
} }
func (s *consensus) ValidateAndInsertBlockWithTrustedData(block *externalapi.BlockWithTrustedData, validateUTXO bool) (*externalapi.VirtualChangeSet, error) { func (s *consensus) ValidateAndInsertBlockWithTrustedData(block *externalapi.BlockWithTrustedData, validateUTXO bool) (*externalapi.VirtualChangeSet, error) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
return s.blockProcessor.ValidateAndInsertBlockWithTrustedData(block, validateUTXO) virtualChangeSet, _, err := s.blockProcessor.ValidateAndInsertBlockWithTrustedData(block, validateUTXO)
if err != nil {
return nil, err
}
return virtualChangeSet, nil
} }
// Init initializes consensus // Init initializes consensus
@ -131,7 +136,7 @@ func (s *consensus) Init(skipAddingGenesis bool) error {
}, },
}, },
} }
_, err = s.blockProcessor.ValidateAndInsertBlockWithTrustedData(genesisWithTrustedData, true) _, _, err = s.blockProcessor.ValidateAndInsertBlockWithTrustedData(genesisWithTrustedData, true)
if err != nil { if err != nil {
return err return err
} }
@ -192,12 +197,17 @@ func (s *consensus) ValidateAndInsertBlock(block *externalapi.DomainBlock, shoul
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
virtualChangeSet, err := s.blockProcessor.ValidateAndInsertBlock(block, shouldValidateAgainstUTXO) virtualChangeSet, blockStatus, err := s.blockProcessor.ValidateAndInsertBlock(block, shouldValidateAgainstUTXO)
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = s.onVirtualChange(virtualChangeSet, shouldValidateAgainstUTXO) err = s.sendBlockAddedEvent(block, blockStatus)
if err != nil {
return nil, err
}
err = s.sendVirtualChangedEvent(virtualChangeSet, shouldValidateAgainstUTXO)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -205,13 +215,27 @@ func (s *consensus) ValidateAndInsertBlock(block *externalapi.DomainBlock, shoul
return virtualChangeSet, nil return virtualChangeSet, nil
} }
func (s *consensus) onVirtualChange(virtualChangeSet *externalapi.VirtualChangeSet, wasVirtualUpdated bool) error { func (s *consensus) sendBlockAddedEvent(block *externalapi.DomainBlock, blockStatus externalapi.BlockStatus) error {
if !wasVirtualUpdated || s.virtualChangeChan == nil { if s.consensusEventsChan != nil {
if blockStatus == externalapi.StatusHeaderOnly || blockStatus == externalapi.StatusInvalid {
return nil
}
if len(s.consensusEventsChan) == cap(s.consensusEventsChan) {
return errors.Errorf("consensusEventsChan is full")
}
s.consensusEventsChan <- &externalapi.BlockAdded{Block: block}
}
return nil
}
func (s *consensus) sendVirtualChangedEvent(virtualChangeSet *externalapi.VirtualChangeSet, wasVirtualUpdated bool) error {
if !wasVirtualUpdated || s.consensusEventsChan == nil {
return nil return nil
} }
if len(s.virtualChangeChan) == cap(s.virtualChangeChan) { if len(s.consensusEventsChan) == cap(s.consensusEventsChan) {
return errors.Errorf("virtualChangeChan is full") return errors.Errorf("consensusEventsChan is full")
} }
stagingArea := model.NewStagingArea() stagingArea := model.NewStagingArea()
@ -234,7 +258,7 @@ func (s *consensus) onVirtualChange(virtualChangeSet *externalapi.VirtualChangeS
virtualChangeSet.VirtualSelectedParentBlueScore = virtualSelectedParentGHOSTDAGData.BlueScore() virtualChangeSet.VirtualSelectedParentBlueScore = virtualSelectedParentGHOSTDAGData.BlueScore()
virtualChangeSet.VirtualDAAScore = virtualDAAScore virtualChangeSet.VirtualDAAScore = virtualDAAScore
s.virtualChangeChan <- virtualChangeSet s.consensusEventsChan <- virtualChangeSet
return nil return nil
} }
@ -837,7 +861,7 @@ func (s *consensus) ResolveVirtual() (*externalapi.VirtualChangeSet, bool, error
return nil, false, err return nil, false, err
} }
err = s.onVirtualChange(virtualChangeSet, true) err = s.sendVirtualChangedEvent(virtualChangeSet, true)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }

View File

@ -1,6 +1,10 @@
package consensus package consensus
import ( import (
"io/ioutil"
"os"
"sync"
"github.com/kaspanet/kaspad/domain/consensus/datastructures/blockwindowheapslicestore" "github.com/kaspanet/kaspad/domain/consensus/datastructures/blockwindowheapslicestore"
"github.com/kaspanet/kaspad/domain/consensus/datastructures/daawindowstore" "github.com/kaspanet/kaspad/domain/consensus/datastructures/daawindowstore"
"github.com/kaspanet/kaspad/domain/consensus/datastructures/mergedepthrootstore" "github.com/kaspanet/kaspad/domain/consensus/datastructures/mergedepthrootstore"
@ -10,9 +14,6 @@ import (
"github.com/kaspanet/kaspad/domain/consensus/processes/pruningproofmanager" "github.com/kaspanet/kaspad/domain/consensus/processes/pruningproofmanager"
"github.com/kaspanet/kaspad/util/staging" "github.com/kaspanet/kaspad/util/staging"
"github.com/pkg/errors" "github.com/pkg/errors"
"io/ioutil"
"os"
"sync"
"github.com/kaspanet/kaspad/domain/prefixmanager/prefix" "github.com/kaspanet/kaspad/domain/prefixmanager/prefix"
"github.com/kaspanet/kaspad/util/txmass" "github.com/kaspanet/kaspad/util/txmass"
@ -77,7 +78,7 @@ type Config struct {
// Factory instantiates new Consensuses // Factory instantiates new Consensuses
type Factory interface { type Factory interface {
NewConsensus(config *Config, db infrastructuredatabase.Database, dbPrefix *prefix.Prefix, NewConsensus(config *Config, db infrastructuredatabase.Database, dbPrefix *prefix.Prefix,
virtualChangeChan chan *externalapi.VirtualChangeSet) ( consensusEventsChan chan externalapi.ConsensusEvent) (
externalapi.Consensus, bool, error) externalapi.Consensus, bool, error)
NewTestConsensus(config *Config, testName string) ( NewTestConsensus(config *Config, testName string) (
tc testapi.TestConsensus, teardown func(keepDataDir bool), err error) tc testapi.TestConsensus, teardown func(keepDataDir bool), err error)
@ -110,7 +111,7 @@ func NewFactory() Factory {
// NewConsensus instantiates a new Consensus // NewConsensus instantiates a new Consensus
func (f *factory) NewConsensus(config *Config, db infrastructuredatabase.Database, dbPrefix *prefix.Prefix, func (f *factory) NewConsensus(config *Config, db infrastructuredatabase.Database, dbPrefix *prefix.Prefix,
virtualChangeChan chan *externalapi.VirtualChangeSet) ( consensusEventsChan chan externalapi.ConsensusEvent) (
consensusInstance externalapi.Consensus, shouldMigrate bool, err error) { consensusInstance externalapi.Consensus, shouldMigrate bool, err error) {
dbManager := consensusdatabase.New(db) dbManager := consensusdatabase.New(db)
@ -513,7 +514,7 @@ func (f *factory) NewConsensus(config *Config, db infrastructuredatabase.Databas
daaBlocksStore: daaBlocksStore, daaBlocksStore: daaBlocksStore,
blocksWithTrustedDataDAAWindowStore: daaWindowStore, blocksWithTrustedDataDAAWindowStore: daaWindowStore,
virtualChangeChan: virtualChangeChan, consensusEventsChan: consensusEventsChan,
} }
if isOldReachabilityInitialized { if isOldReachabilityInitialized {

View File

@ -1,11 +1,11 @@
package consensus package consensus
import ( import (
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"github.com/kaspanet/kaspad/domain/prefixmanager/prefix"
"io/ioutil" "io/ioutil"
"testing" "testing"
"github.com/kaspanet/kaspad/domain/prefixmanager/prefix"
"github.com/kaspanet/kaspad/domain/dagconfig" "github.com/kaspanet/kaspad/domain/dagconfig"
"github.com/kaspanet/kaspad/infrastructure/db/database/ldb" "github.com/kaspanet/kaspad/infrastructure/db/database/ldb"
) )
@ -25,7 +25,7 @@ func TestNewConsensus(t *testing.T) {
t.Fatalf("error in NewLevelDB: %s", err) t.Fatalf("error in NewLevelDB: %s", err)
} }
_, shouldMigrate, err := f.NewConsensus(config, db, &prefix.Prefix{}, make(chan *externalapi.VirtualChangeSet)) _, shouldMigrate, err := f.NewConsensus(config, db, &prefix.Prefix{}, nil)
if err != nil { if err != nil {
t.Fatalf("error in NewConsensus: %+v", err) t.Fatalf("error in NewConsensus: %+v", err)
} }

View File

@ -1,6 +1,18 @@
package externalapi package externalapi
// VirtualChangeSet is auxiliary data returned from ValidateAndInsertBlock and ResolveVirtual // ConsensusEvent is an interface type that is implemented by all events raised by consensus
type ConsensusEvent interface {
isConsensusEvent()
}
// BlockAdded is an event raised by consensus when a block was added to the dag
type BlockAdded struct {
Block *DomainBlock
}
func (*BlockAdded) isConsensusEvent() {}
// VirtualChangeSet is an event raised by consensus when virtual changes
type VirtualChangeSet struct { type VirtualChangeSet struct {
VirtualSelectedParentChainChanges *SelectedChainPath VirtualSelectedParentChainChanges *SelectedChainPath
VirtualUTXODiff UTXODiff VirtualUTXODiff UTXODiff
@ -9,6 +21,8 @@ type VirtualChangeSet struct {
VirtualDAAScore uint64 VirtualDAAScore uint64
} }
func (*VirtualChangeSet) isConsensusEvent() {}
// SelectedChainPath is a path the of the selected chains between two blocks. // SelectedChainPath is a path the of the selected chains between two blocks.
type SelectedChainPath struct { type SelectedChainPath struct {
Added []*DomainHash Added []*DomainHash

View File

@ -4,7 +4,7 @@ import "github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
// BlockProcessor is responsible for processing incoming blocks // BlockProcessor is responsible for processing incoming blocks
type BlockProcessor interface { type BlockProcessor interface {
ValidateAndInsertBlock(block *externalapi.DomainBlock, shouldValidateAgainstUTXO bool) (*externalapi.VirtualChangeSet, error) ValidateAndInsertBlock(block *externalapi.DomainBlock, shouldValidateAgainstUTXO bool) (*externalapi.VirtualChangeSet, externalapi.BlockStatus, error)
ValidateAndInsertImportedPruningPoint(newPruningPoint *externalapi.DomainHash) error ValidateAndInsertImportedPruningPoint(newPruningPoint *externalapi.DomainHash) error
ValidateAndInsertBlockWithTrustedData(block *externalapi.BlockWithTrustedData, validateUTXO bool) (*externalapi.VirtualChangeSet, error) ValidateAndInsertBlockWithTrustedData(block *externalapi.BlockWithTrustedData, validateUTXO bool) (*externalapi.VirtualChangeSet, externalapi.BlockStatus, error)
} }

View File

@ -144,7 +144,8 @@ func New(
// ValidateAndInsertBlock validates the given block and, if valid, applies it // ValidateAndInsertBlock validates the given block and, if valid, applies it
// to the current state // to the current state
func (bp *blockProcessor) ValidateAndInsertBlock(block *externalapi.DomainBlock, shouldValidateAgainstUTXO bool) (*externalapi.VirtualChangeSet, error) { func (bp *blockProcessor) ValidateAndInsertBlock(block *externalapi.DomainBlock,
shouldValidateAgainstUTXO bool) (*externalapi.VirtualChangeSet, externalapi.BlockStatus, error) {
onEnd := logger.LogAndMeasureExecutionTime(log, "ValidateAndInsertBlock") onEnd := logger.LogAndMeasureExecutionTime(log, "ValidateAndInsertBlock")
defer onEnd() defer onEnd()
@ -160,7 +161,8 @@ func (bp *blockProcessor) ValidateAndInsertImportedPruningPoint(newPruningPoint
return bp.validateAndInsertImportedPruningPoint(stagingArea, newPruningPoint) return bp.validateAndInsertImportedPruningPoint(stagingArea, newPruningPoint)
} }
func (bp *blockProcessor) ValidateAndInsertBlockWithTrustedData(block *externalapi.BlockWithTrustedData, shouldValidateAgainstUTXO bool) (*externalapi.VirtualChangeSet, error) { func (bp *blockProcessor) ValidateAndInsertBlockWithTrustedData(block *externalapi.BlockWithTrustedData,
shouldValidateAgainstUTXO bool) (*externalapi.VirtualChangeSet, externalapi.BlockStatus, error) {
onEnd := logger.LogAndMeasureExecutionTime(log, "ValidateAndInsertBlockWithTrustedData") onEnd := logger.LogAndMeasureExecutionTime(log, "ValidateAndInsertBlockWithTrustedData")
defer onEnd() defer onEnd()

View File

@ -19,18 +19,18 @@ import (
) )
func (bp *blockProcessor) setBlockStatusAfterBlockValidation( func (bp *blockProcessor) setBlockStatusAfterBlockValidation(
stagingArea *model.StagingArea, block *externalapi.DomainBlock, isPruningPoint bool) error { stagingArea *model.StagingArea, block *externalapi.DomainBlock, isPruningPoint bool) (externalapi.BlockStatus, error) {
blockHash := consensushashing.BlockHash(block) blockHash := consensushashing.BlockHash(block)
exists, err := bp.blockStatusStore.Exists(bp.databaseContext, stagingArea, blockHash) exists, err := bp.blockStatusStore.Exists(bp.databaseContext, stagingArea, blockHash)
if err != nil { if err != nil {
return err return externalapi.StatusInvalid, err
} }
if exists { if exists {
status, err := bp.blockStatusStore.Get(bp.databaseContext, stagingArea, blockHash) status, err := bp.blockStatusStore.Get(bp.databaseContext, stagingArea, blockHash)
if err != nil { if err != nil {
return err return externalapi.StatusInvalid, err
} }
if status == externalapi.StatusUTXOValid { if status == externalapi.StatusUTXOValid {
@ -39,12 +39,12 @@ func (bp *blockProcessor) setBlockStatusAfterBlockValidation(
// The only exception is the pruning point because its status is manually set before inserting // The only exception is the pruning point because its status is manually set before inserting
// the block. // the block.
if !isPruningPoint { if !isPruningPoint {
return errors.Errorf("block %s that is not the pruning point is not expected to be valid "+ return externalapi.StatusInvalid, errors.Errorf("block %s that is not the pruning point is not expected to be valid "+
"before adding to to the consensus state manager", blockHash) "before adding to to the consensus state manager", blockHash)
} }
log.Debugf("Block %s is the pruning point and has status %s, so leaving its status untouched", log.Debugf("Block %s is the pruning point and has status %s, so leaving its status untouched",
blockHash, status) blockHash, status)
return nil return status, nil
} }
} }
@ -53,13 +53,13 @@ func (bp *blockProcessor) setBlockStatusAfterBlockValidation(
log.Debugf("Block %s is a header-only block so setting its status as %s", log.Debugf("Block %s is a header-only block so setting its status as %s",
blockHash, externalapi.StatusHeaderOnly) blockHash, externalapi.StatusHeaderOnly)
bp.blockStatusStore.Stage(stagingArea, blockHash, externalapi.StatusHeaderOnly) bp.blockStatusStore.Stage(stagingArea, blockHash, externalapi.StatusHeaderOnly)
} else { return externalapi.StatusHeaderOnly, nil
log.Debugf("Block %s has body so setting its status as %s",
blockHash, externalapi.StatusUTXOPendingVerification)
bp.blockStatusStore.Stage(stagingArea, blockHash, externalapi.StatusUTXOPendingVerification)
} }
return nil log.Debugf("Block %s has body so setting its status as %s",
blockHash, externalapi.StatusUTXOPendingVerification)
bp.blockStatusStore.Stage(stagingArea, blockHash, externalapi.StatusUTXOPendingVerification)
return externalapi.StatusUTXOPendingVerification, nil
} }
func (bp *blockProcessor) updateVirtualAcceptanceDataAfterImportingPruningPoint(stagingArea *model.StagingArea) error { func (bp *blockProcessor) updateVirtualAcceptanceDataAfterImportingPruningPoint(stagingArea *model.StagingArea) error {
@ -78,29 +78,29 @@ func (bp *blockProcessor) updateVirtualAcceptanceDataAfterImportingPruningPoint(
} }
func (bp *blockProcessor) validateAndInsertBlock(stagingArea *model.StagingArea, block *externalapi.DomainBlock, func (bp *blockProcessor) validateAndInsertBlock(stagingArea *model.StagingArea, block *externalapi.DomainBlock,
isPruningPoint bool, shouldValidateAgainstUTXO bool, isBlockWithTrustedData bool) (*externalapi.VirtualChangeSet, error) { isPruningPoint bool, shouldValidateAgainstUTXO bool, isBlockWithTrustedData bool) (*externalapi.VirtualChangeSet, externalapi.BlockStatus, error) {
blockHash := consensushashing.HeaderHash(block.Header) blockHash := consensushashing.HeaderHash(block.Header)
err := bp.validateBlock(stagingArea, block, isBlockWithTrustedData) err := bp.validateBlock(stagingArea, block, isBlockWithTrustedData)
if err != nil { if err != nil {
return nil, err return nil, externalapi.StatusInvalid, err
} }
err = bp.setBlockStatusAfterBlockValidation(stagingArea, block, isPruningPoint) status, err := bp.setBlockStatusAfterBlockValidation(stagingArea, block, isPruningPoint)
if err != nil { if err != nil {
return nil, err return nil, externalapi.StatusInvalid, err
} }
var oldHeadersSelectedTip *externalapi.DomainHash var oldHeadersSelectedTip *externalapi.DomainHash
hasHeaderSelectedTip, err := bp.headersSelectedTipStore.Has(bp.databaseContext, stagingArea) hasHeaderSelectedTip, err := bp.headersSelectedTipStore.Has(bp.databaseContext, stagingArea)
if err != nil { if err != nil {
return nil, err return nil, externalapi.StatusInvalid, err
} }
if hasHeaderSelectedTip { if hasHeaderSelectedTip {
var err error var err error
oldHeadersSelectedTip, err = bp.headersSelectedTipStore.HeadersSelectedTip(bp.databaseContext, stagingArea) oldHeadersSelectedTip, err = bp.headersSelectedTipStore.HeadersSelectedTip(bp.databaseContext, stagingArea)
if err != nil { if err != nil {
return nil, err return nil, externalapi.StatusInvalid, err
} }
} }
@ -110,12 +110,12 @@ func (bp *blockProcessor) validateAndInsertBlock(stagingArea *model.StagingArea,
} else { } else {
pruningPoint, err := bp.pruningStore.PruningPoint(bp.databaseContext, stagingArea) pruningPoint, err := bp.pruningStore.PruningPoint(bp.databaseContext, stagingArea)
if err != nil { if err != nil {
return nil, err return nil, externalapi.StatusInvalid, err
} }
isInSelectedChainOfPruningPoint, err := bp.dagTopologyManager.IsInSelectedParentChainOf(stagingArea, pruningPoint, blockHash) isInSelectedChainOfPruningPoint, err := bp.dagTopologyManager.IsInSelectedParentChainOf(stagingArea, pruningPoint, blockHash)
if err != nil { if err != nil {
return nil, err return nil, externalapi.StatusInvalid, err
} }
// Don't set blocks in the anticone of the pruning point as header selected tip. // Don't set blocks in the anticone of the pruning point as header selected tip.
@ -126,7 +126,7 @@ func (bp *blockProcessor) validateAndInsertBlock(stagingArea *model.StagingArea,
// Don't set blocks in the anticone of the pruning point as header selected tip. // Don't set blocks in the anticone of the pruning point as header selected tip.
err = bp.headerTipsManager.AddHeaderTip(stagingArea, blockHash) err = bp.headerTipsManager.AddHeaderTip(stagingArea, blockHash)
if err != nil { if err != nil {
return nil, err return nil, externalapi.StatusInvalid, err
} }
} }
@ -139,14 +139,14 @@ func (bp *blockProcessor) validateAndInsertBlock(stagingArea *model.StagingArea,
// Attempt to add the block to the virtual // Attempt to add the block to the virtual
selectedParentChainChanges, virtualUTXODiff, reversalData, err = bp.consensusStateManager.AddBlock(stagingArea, blockHash, shouldValidateAgainstUTXO) selectedParentChainChanges, virtualUTXODiff, reversalData, err = bp.consensusStateManager.AddBlock(stagingArea, blockHash, shouldValidateAgainstUTXO)
if err != nil { if err != nil {
return nil, err return nil, externalapi.StatusInvalid, err
} }
} }
if hasHeaderSelectedTip { if hasHeaderSelectedTip {
err := bp.updateReachabilityReindexRoot(stagingArea, oldHeadersSelectedTip) err := bp.updateReachabilityReindexRoot(stagingArea, oldHeadersSelectedTip)
if err != nil { if err != nil {
return nil, err return nil, externalapi.StatusInvalid, err
} }
} }
@ -154,13 +154,13 @@ func (bp *blockProcessor) validateAndInsertBlock(stagingArea *model.StagingArea,
// Trigger pruning, which will check if the pruning point changed and delete the data if it did. // Trigger pruning, which will check if the pruning point changed and delete the data if it did.
err = bp.pruningManager.UpdatePruningPointByVirtual(stagingArea) err = bp.pruningManager.UpdatePruningPointByVirtual(stagingArea)
if err != nil { if err != nil {
return nil, err return nil, externalapi.StatusInvalid, err
} }
} }
err = staging.CommitAllChanges(bp.databaseContext, stagingArea) err = staging.CommitAllChanges(bp.databaseContext, stagingArea)
if err != nil { if err != nil {
return nil, err return nil, externalapi.StatusInvalid, err
} }
if reversalData != nil { if reversalData != nil {
@ -171,13 +171,13 @@ func (bp *blockProcessor) validateAndInsertBlock(stagingArea *model.StagingArea,
if errors.Is(err, consensusstatemanager.ErrReverseUTXODiffsUTXODiffChildNotFound) { if errors.Is(err, consensusstatemanager.ErrReverseUTXODiffsUTXODiffChildNotFound) {
log.Errorf("Could not reverse UTXO diffs while resolving virtual: %s", err) log.Errorf("Could not reverse UTXO diffs while resolving virtual: %s", err)
} else if err != nil { } else if err != nil {
return nil, err return nil, externalapi.StatusInvalid, err
} }
} }
err = bp.pruningManager.UpdatePruningPointIfRequired() err = bp.pruningManager.UpdatePruningPointIfRequired()
if err != nil { if err != nil {
return nil, err return nil, externalapi.StatusInvalid, err
} }
log.Debug(logger.NewLogClosure(func() string { log.Debug(logger.NewLogClosure(func() string {
@ -202,14 +202,14 @@ func (bp *blockProcessor) validateAndInsertBlock(stagingArea *model.StagingArea,
virtualGhostDAGData.BlueScore(), blockCount, headerCount) virtualGhostDAGData.BlueScore(), blockCount, headerCount)
})) }))
if logClosureErr != nil { if logClosureErr != nil {
return nil, logClosureErr return nil, externalapi.StatusInvalid, logClosureErr
} }
virtualParents, err := bp.dagTopologyManager.Parents(stagingArea, model.VirtualBlockHash) virtualParents, err := bp.dagTopologyManager.Parents(stagingArea, model.VirtualBlockHash)
if database.IsNotFoundError(err) { if database.IsNotFoundError(err) {
virtualParents = nil virtualParents = nil
} else if err != nil { } else if err != nil {
return nil, err return nil, externalapi.StatusInvalid, err
} }
bp.pastMedianTimeManager.InvalidateVirtualPastMedianTimeCache() bp.pastMedianTimeManager.InvalidateVirtualPastMedianTimeCache()
@ -220,7 +220,7 @@ func (bp *blockProcessor) validateAndInsertBlock(stagingArea *model.StagingArea,
VirtualSelectedParentChainChanges: selectedParentChainChanges, VirtualSelectedParentChainChanges: selectedParentChainChanges,
VirtualUTXODiff: virtualUTXODiff, VirtualUTXODiff: virtualUTXODiff,
VirtualParents: virtualParents, VirtualParents: virtualParents,
}, nil }, status, nil
} }
func (bp *blockProcessor) loadUTXODataForGenesis(stagingArea *model.StagingArea, block *externalapi.DomainBlock) { func (bp *blockProcessor) loadUTXODataForGenesis(stagingArea *model.StagingArea, block *externalapi.DomainBlock) {

View File

@ -8,7 +8,7 @@ import (
) )
func (bp *blockProcessor) validateAndInsertBlockWithTrustedData(stagingArea *model.StagingArea, func (bp *blockProcessor) validateAndInsertBlockWithTrustedData(stagingArea *model.StagingArea,
block *externalapi.BlockWithTrustedData, validateUTXO bool) (*externalapi.VirtualChangeSet, error) { block *externalapi.BlockWithTrustedData, validateUTXO bool) (*externalapi.VirtualChangeSet, externalapi.BlockStatus, error) {
blockHash := consensushashing.BlockHash(block.Block) blockHash := consensushashing.BlockHash(block.Block)
for i, daaBlock := range block.DAAWindow { for i, daaBlock := range block.DAAWindow {
@ -22,7 +22,7 @@ func (bp *blockProcessor) validateAndInsertBlockWithTrustedData(stagingArea *mod
blockReplacedGHOSTDAGData, err := bp.ghostdagDataWithoutPrunedBlocks(stagingArea, block.GHOSTDAGData[0].GHOSTDAGData) blockReplacedGHOSTDAGData, err := bp.ghostdagDataWithoutPrunedBlocks(stagingArea, block.GHOSTDAGData[0].GHOSTDAGData)
if err != nil { if err != nil {
return nil, err return nil, externalapi.StatusInvalid, err
} }
bp.ghostdagDataStore.Stage(stagingArea, blockHash, blockReplacedGHOSTDAGData, false) bp.ghostdagDataStore.Stage(stagingArea, blockHash, blockReplacedGHOSTDAGData, false)

View File

@ -60,7 +60,7 @@ func (tc *testConsensus) AddBlock(parentHashes []*externalapi.DomainHash, coinba
return nil, nil, err return nil, nil, err
} }
virtualChangeSet, err := tc.blockProcessor.ValidateAndInsertBlock(block, true) virtualChangeSet, _, err := tc.blockProcessor.ValidateAndInsertBlock(block, true)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -80,7 +80,7 @@ func (tc *testConsensus) AddUTXOInvalidHeader(parentHashes []*externalapi.Domain
return nil, nil, err return nil, nil, err
} }
virtualChangeSet, err := tc.blockProcessor.ValidateAndInsertBlock(&externalapi.DomainBlock{ virtualChangeSet, _, err := tc.blockProcessor.ValidateAndInsertBlock(&externalapi.DomainBlock{
Header: header, Header: header,
Transactions: nil, Transactions: nil,
}, true) }, true)
@ -103,7 +103,7 @@ func (tc *testConsensus) AddUTXOInvalidBlock(parentHashes []*externalapi.DomainH
return nil, nil, err return nil, nil, err
} }
virtualChangeSet, err := tc.blockProcessor.ValidateAndInsertBlock(block, true) virtualChangeSet, _, err := tc.blockProcessor.ValidateAndInsertBlock(block, true)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }

View File

@ -1,11 +1,12 @@
package domain package domain
import ( import (
"github.com/kaspanet/kaspad/domain/consensusreference"
"sync" "sync"
"sync/atomic" "sync/atomic"
"unsafe" "unsafe"
"github.com/kaspanet/kaspad/domain/consensusreference"
"github.com/kaspanet/kaspad/domain/consensus" "github.com/kaspanet/kaspad/domain/consensus"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi" "github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"github.com/kaspanet/kaspad/domain/miningmanager" "github.com/kaspanet/kaspad/domain/miningmanager"
@ -24,21 +25,21 @@ type Domain interface {
InitStagingConsensusWithoutGenesis() error InitStagingConsensusWithoutGenesis() error
CommitStagingConsensus() error CommitStagingConsensus() error
DeleteStagingConsensus() error DeleteStagingConsensus() error
VirtualChangeChannel() chan *externalapi.VirtualChangeSet ConsensusEventsChannel() chan externalapi.ConsensusEvent
} }
type domain struct { type domain struct {
miningManager miningmanager.MiningManager miningManager miningmanager.MiningManager
consensus *externalapi.Consensus consensus *externalapi.Consensus
stagingConsensus *externalapi.Consensus stagingConsensus *externalapi.Consensus
stagingConsensusLock sync.RWMutex stagingConsensusLock sync.RWMutex
consensusConfig *consensus.Config consensusConfig *consensus.Config
db infrastructuredatabase.Database db infrastructuredatabase.Database
virtualChangeChan chan *externalapi.VirtualChangeSet consensusEventsChannel chan externalapi.ConsensusEvent
} }
func (d *domain) VirtualChangeChannel() chan *externalapi.VirtualChangeSet { func (d *domain) ConsensusEventsChannel() chan externalapi.ConsensusEvent {
return d.virtualChangeChan return d.consensusEventsChannel
} }
func (d *domain) Consensus() externalapi.Consensus { func (d *domain) Consensus() externalapi.Consensus {
@ -92,7 +93,7 @@ func (d *domain) initStagingConsensus(cfg *consensus.Config) error {
consensusFactory := consensus.NewFactory() consensusFactory := consensus.NewFactory()
consensusInstance, shouldMigrate, err := consensusFactory.NewConsensus(cfg, d.db, inactivePrefix, d.virtualChangeChan) consensusInstance, shouldMigrate, err := consensusFactory.NewConsensus(cfg, d.db, inactivePrefix, d.consensusEventsChannel)
if err != nil { if err != nil {
return err return err
} }
@ -196,18 +197,18 @@ func New(consensusConfig *consensus.Config, mempoolConfig *mempool.Config, db in
} }
} }
virtualChangeChan := make(chan *externalapi.VirtualChangeSet, 100e3) consensusEventsChan := make(chan externalapi.ConsensusEvent, 100e3)
consensusFactory := consensus.NewFactory() consensusFactory := consensus.NewFactory()
consensusInstance, shouldMigrate, err := consensusFactory.NewConsensus(consensusConfig, db, activePrefix, virtualChangeChan) consensusInstance, shouldMigrate, err := consensusFactory.NewConsensus(consensusConfig, db, activePrefix, consensusEventsChan)
if err != nil { if err != nil {
return nil, err return nil, err
} }
domainInstance := &domain{ domainInstance := &domain{
consensus: &consensusInstance, consensus: &consensusInstance,
consensusConfig: consensusConfig, consensusConfig: consensusConfig,
db: db, db: db,
virtualChangeChan: virtualChangeChan, consensusEventsChannel: consensusEventsChan,
} }
if shouldMigrate { if shouldMigrate {