Use a channel for utxo change events (#2052)

* Use a channel from within consensus in order to raise change events in order -- note that this is only a draft commit for discussion

* Fix compilation

* Check for nil

* Allow nil virtualChangeChan

* Remove redundant comments

* Call notifyVirtualChange instead of notifyUTXOsChanged

* Remove redundant comment

* Add a separate function for initVirtualChangeHandler

* Remove redundant type

* Check for nil in the right place

* Fix integration test

* Add data to virtual changeset and cleanup block added event logic

* Renames

* Comment

Co-authored-by: Ori Newman <orinewman1@gmail.com>
This commit is contained in:
Michael Sutton 2022-05-19 14:07:48 +03:00 committed by GitHub
parent 5d24e2afbc
commit 016ddfdfce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 176 additions and 122 deletions

View File

@ -2,6 +2,7 @@ package app
import ( import (
"fmt" "fmt"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"sync/atomic" "sync/atomic"
"github.com/kaspanet/kaspad/domain/miningmanager/mempool" "github.com/kaspanet/kaspad/domain/miningmanager/mempool"
@ -67,6 +68,7 @@ func (a *ComponentManager) Stop() {
} }
a.protocolManager.Close() a.protocolManager.Close()
close(a.protocolManager.Context().Domain().VirtualChangeChannel())
return return
} }
@ -118,7 +120,7 @@ func NewComponentManager(cfg *config.Config, db infrastructuredatabase.Database,
if err != nil { if err != nil {
return nil, err return nil, err
} }
rpcManager := setupRPC(cfg, domain, netAdapter, protocolManager, connectionManager, addressManager, utxoIndex, interrupt) rpcManager := setupRPC(cfg, domain, netAdapter, protocolManager, connectionManager, addressManager, utxoIndex, domain.VirtualChangeChannel(), interrupt)
return &ComponentManager{ return &ComponentManager{
cfg: cfg, cfg: cfg,
@ -139,6 +141,7 @@ func setupRPC(
connectionManager *connmanager.ConnectionManager, connectionManager *connmanager.ConnectionManager,
addressManager *addressmanager.AddressManager, addressManager *addressmanager.AddressManager,
utxoIndex *utxoindex.UTXOIndex, utxoIndex *utxoindex.UTXOIndex,
virtualChangeChan chan *externalapi.VirtualChangeSet,
shutDownChan chan<- struct{}, shutDownChan chan<- struct{},
) *rpc.Manager { ) *rpc.Manager {
@ -150,9 +153,9 @@ func setupRPC(
connectionManager, connectionManager,
addressManager, addressManager,
utxoIndex, utxoIndex,
virtualChangeChan,
shutDownChan, shutDownChan,
) )
protocolManager.SetOnVirtualChange(rpcManager.NotifyVirtualChange)
protocolManager.SetOnNewBlockTemplateHandler(rpcManager.NotifyNewBlockTemplate) protocolManager.SetOnNewBlockTemplateHandler(rpcManager.NotifyNewBlockTemplate)
protocolManager.SetOnBlockAddedToDAGHandler(rpcManager.NotifyBlockAddedToDAG) protocolManager.SetOnBlockAddedToDAGHandler(rpcManager.NotifyBlockAddedToDAG)
protocolManager.SetOnPruningPointUTXOSetOverrideHandler(rpcManager.NotifyPruningPointUTXOSetOverride) protocolManager.SetOnPruningPointUTXOSetOverrideHandler(rpcManager.NotifyPruningPointUTXOSetOverride)

View File

@ -16,29 +16,24 @@ import (
// OnNewBlock updates the mempool after a new block arrival, and // OnNewBlock updates the mempool after a new block arrival, and
// relays newly unorphaned transactions and possibly rebroadcast // relays newly unorphaned transactions and possibly rebroadcast
// manually added transactions when not in IBD. // manually added transactions when not in IBD.
func (f *FlowContext) OnNewBlock(block *externalapi.DomainBlock, func (f *FlowContext) OnNewBlock(block *externalapi.DomainBlock) error {
virtualChangeSet *externalapi.VirtualChangeSet) error {
hash := consensushashing.BlockHash(block) hash := consensushashing.BlockHash(block)
log.Tracef("OnNewBlock start for block %s", hash) log.Tracef("OnNewBlock start for block %s", hash)
defer log.Tracef("OnNewBlock end for block %s", hash) defer log.Tracef("OnNewBlock end for block %s", hash)
unorphaningResults, err := f.UnorphanBlocks(block) unorphanedBlocks, err := f.UnorphanBlocks(block)
if err != nil { if err != nil {
return err return err
} }
log.Debugf("OnNewBlock: block %s unorphaned %d blocks", hash, len(unorphaningResults)) log.Debugf("OnNewBlock: block %s unorphaned %d blocks", hash, len(unorphanedBlocks))
newBlocks := []*externalapi.DomainBlock{block} newBlocks := []*externalapi.DomainBlock{block}
newVirtualChangeSets := []*externalapi.VirtualChangeSet{virtualChangeSet} newBlocks = append(newBlocks, unorphanedBlocks...)
for _, unorphaningResult := range unorphaningResults {
newBlocks = append(newBlocks, unorphaningResult.block)
newVirtualChangeSets = append(newVirtualChangeSets, unorphaningResult.virtualChangeSet)
}
allAcceptedTransactions := make([]*externalapi.DomainTransaction, 0) allAcceptedTransactions := make([]*externalapi.DomainTransaction, 0)
for i, newBlock := range newBlocks { for _, newBlock := range newBlocks {
log.Debugf("OnNewBlock: passing block %s transactions to mining manager", hash) log.Debugf("OnNewBlock: passing block %s transactions to mining manager", hash)
acceptedTransactions, err := f.Domain().MiningManager().HandleNewBlockTransactions(newBlock.Transactions) acceptedTransactions, err := f.Domain().MiningManager().HandleNewBlockTransactions(newBlock.Transactions)
if err != nil { if err != nil {
@ -48,8 +43,7 @@ func (f *FlowContext) OnNewBlock(block *externalapi.DomainBlock,
if f.onBlockAddedToDAGHandler != nil { if f.onBlockAddedToDAGHandler != nil {
log.Debugf("OnNewBlock: calling f.onBlockAddedToDAGHandler for block %s", hash) log.Debugf("OnNewBlock: calling f.onBlockAddedToDAGHandler for block %s", hash)
virtualChangeSet = newVirtualChangeSets[i] err := f.onBlockAddedToDAGHandler(newBlock)
err := f.onBlockAddedToDAGHandler(newBlock, virtualChangeSet)
if err != nil { if err != nil {
return err return err
} }
@ -59,15 +53,6 @@ func (f *FlowContext) OnNewBlock(block *externalapi.DomainBlock,
return f.broadcastTransactionsAfterBlockAdded(newBlocks, allAcceptedTransactions) return f.broadcastTransactionsAfterBlockAdded(newBlocks, allAcceptedTransactions)
} }
// OnVirtualChange calls the handler function whenever the virtual block changes.
func (f *FlowContext) OnVirtualChange(virtualChangeSet *externalapi.VirtualChangeSet) error {
if f.onVirtualChangeHandler != nil && virtualChangeSet != nil {
return f.onVirtualChangeHandler(virtualChangeSet)
}
return nil
}
// OnNewBlockTemplate calls the handler function whenever a new block template is available for miners. // OnNewBlockTemplate calls the handler function whenever a new block template is available for miners.
func (f *FlowContext) OnNewBlockTemplate() error { func (f *FlowContext) OnNewBlockTemplate() error {
// Clear current template cache. Note we call this even if the handler is nil, in order to keep the // Clear current template cache. Note we call this even if the handler is nil, in order to keep the
@ -130,7 +115,7 @@ func (f *FlowContext) AddBlock(block *externalapi.DomainBlock) error {
return protocolerrors.Errorf(false, "cannot add header only block") return protocolerrors.Errorf(false, "cannot add header only block")
} }
virtualChangeSet, err := f.Domain().Consensus().ValidateAndInsertBlock(block, true) _, err := f.Domain().Consensus().ValidateAndInsertBlock(block, true)
if err != nil { if err != nil {
if errors.As(err, &ruleerrors.RuleError{}) { if errors.As(err, &ruleerrors.RuleError{}) {
log.Warnf("Validation failed for block %s: %s", consensushashing.BlockHash(block), err) log.Warnf("Validation failed for block %s: %s", consensushashing.BlockHash(block), err)
@ -141,7 +126,7 @@ func (f *FlowContext) AddBlock(block *externalapi.DomainBlock) error {
if err != nil { if err != nil {
return err return err
} }
err = f.OnNewBlock(block, virtualChangeSet) err = f.OnNewBlock(block)
if err != nil { if err != nil {
return err return err
} }

View File

@ -20,10 +20,7 @@ import (
// OnBlockAddedToDAGHandler is a handler function that's triggered // OnBlockAddedToDAGHandler is a handler function that's triggered
// when a block is added to the DAG // when a block is added to the DAG
type OnBlockAddedToDAGHandler func(block *externalapi.DomainBlock, virtualChangeSet *externalapi.VirtualChangeSet) error type OnBlockAddedToDAGHandler func(block *externalapi.DomainBlock) error
// OnVirtualChangeHandler is a handler function that's triggered when the virtual changes
type OnVirtualChangeHandler func(virtualChangeSet *externalapi.VirtualChangeSet) error
// OnNewBlockTemplateHandler is a handler function that's triggered when a new block template is available // OnNewBlockTemplateHandler is a handler function that's triggered when a new block template is available
type OnNewBlockTemplateHandler func() error type OnNewBlockTemplateHandler func() error
@ -47,7 +44,6 @@ type FlowContext struct {
timeStarted int64 timeStarted int64
onVirtualChangeHandler OnVirtualChangeHandler
onBlockAddedToDAGHandler OnBlockAddedToDAGHandler onBlockAddedToDAGHandler OnBlockAddedToDAGHandler
onNewBlockTemplateHandler OnNewBlockTemplateHandler onNewBlockTemplateHandler OnNewBlockTemplateHandler
onPruningPointUTXOSetOverrideHandler OnPruningPointUTXOSetOverrideHandler onPruningPointUTXOSetOverrideHandler OnPruningPointUTXOSetOverrideHandler
@ -111,11 +107,6 @@ func (f *FlowContext) IsNearlySynced() (bool, error) {
return f.Domain().Consensus().IsNearlySynced() return f.Domain().Consensus().IsNearlySynced()
} }
// SetOnVirtualChangeHandler sets the onVirtualChangeHandler handler
func (f *FlowContext) SetOnVirtualChangeHandler(onVirtualChangeHandler OnVirtualChangeHandler) {
f.onVirtualChangeHandler = onVirtualChangeHandler
}
// SetOnBlockAddedToDAGHandler sets the onBlockAddedToDAG handler // SetOnBlockAddedToDAGHandler sets the onBlockAddedToDAG handler
func (f *FlowContext) SetOnBlockAddedToDAGHandler(onBlockAddedToDAGHandler OnBlockAddedToDAGHandler) { func (f *FlowContext) SetOnBlockAddedToDAGHandler(onBlockAddedToDAGHandler OnBlockAddedToDAGHandler) {
f.onBlockAddedToDAGHandler = onBlockAddedToDAGHandler f.onBlockAddedToDAGHandler = onBlockAddedToDAGHandler

View File

@ -15,12 +15,6 @@ import (
// on: 2^orphanResolutionRange * PHANTOM K. // on: 2^orphanResolutionRange * PHANTOM K.
const maxOrphans = 600 const maxOrphans = 600
// UnorphaningResult is the result of unorphaning a block
type UnorphaningResult struct {
block *externalapi.DomainBlock
virtualChangeSet *externalapi.VirtualChangeSet
}
// AddOrphan adds the block to the orphan set // AddOrphan adds the block to the orphan set
func (f *FlowContext) AddOrphan(orphanBlock *externalapi.DomainBlock) { func (f *FlowContext) AddOrphan(orphanBlock *externalapi.DomainBlock) {
f.orphansMutex.Lock() f.orphansMutex.Lock()
@ -57,7 +51,7 @@ func (f *FlowContext) IsOrphan(blockHash *externalapi.DomainHash) bool {
} }
// UnorphanBlocks removes the block from the orphan set, and remove all of the blocks that are not orphans anymore. // UnorphanBlocks removes the block from the orphan set, and remove all of the blocks that are not orphans anymore.
func (f *FlowContext) UnorphanBlocks(rootBlock *externalapi.DomainBlock) ([]*UnorphaningResult, error) { func (f *FlowContext) UnorphanBlocks(rootBlock *externalapi.DomainBlock) ([]*externalapi.DomainBlock, error) {
f.orphansMutex.Lock() f.orphansMutex.Lock()
defer f.orphansMutex.Unlock() defer f.orphansMutex.Unlock()
@ -66,7 +60,7 @@ func (f *FlowContext) UnorphanBlocks(rootBlock *externalapi.DomainBlock) ([]*Uno
rootBlockHash := consensushashing.BlockHash(rootBlock) rootBlockHash := consensushashing.BlockHash(rootBlock)
processQueue := f.addChildOrphansToProcessQueue(rootBlockHash, []externalapi.DomainHash{}) processQueue := f.addChildOrphansToProcessQueue(rootBlockHash, []externalapi.DomainHash{})
var unorphaningResults []*UnorphaningResult var unorphanedBlocks []*externalapi.DomainBlock
for len(processQueue) > 0 { for len(processQueue) > 0 {
var orphanHash externalapi.DomainHash var orphanHash externalapi.DomainHash
orphanHash, processQueue = processQueue[0], processQueue[1:] orphanHash, processQueue = processQueue[0], processQueue[1:]
@ -90,21 +84,18 @@ func (f *FlowContext) UnorphanBlocks(rootBlock *externalapi.DomainBlock) ([]*Uno
} }
} }
if canBeUnorphaned { if canBeUnorphaned {
virtualChangeSet, unorphaningSucceeded, err := f.unorphanBlock(orphanHash) unorphaningSucceeded, err := f.unorphanBlock(orphanHash)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if unorphaningSucceeded { if unorphaningSucceeded {
unorphaningResults = append(unorphaningResults, &UnorphaningResult{ unorphanedBlocks = append(unorphanedBlocks, orphanBlock)
block: orphanBlock,
virtualChangeSet: virtualChangeSet,
})
processQueue = f.addChildOrphansToProcessQueue(&orphanHash, processQueue) processQueue = f.addChildOrphansToProcessQueue(&orphanHash, processQueue)
} }
} }
} }
return unorphaningResults, nil return unorphanedBlocks, nil
} }
// addChildOrphansToProcessQueue finds all child orphans of `blockHash` // addChildOrphansToProcessQueue finds all child orphans of `blockHash`
@ -143,24 +134,24 @@ func (f *FlowContext) findChildOrphansOfBlock(blockHash *externalapi.DomainHash)
return childOrphans return childOrphans
} }
func (f *FlowContext) unorphanBlock(orphanHash externalapi.DomainHash) (*externalapi.VirtualChangeSet, bool, error) { func (f *FlowContext) unorphanBlock(orphanHash externalapi.DomainHash) (bool, error) {
orphanBlock, ok := f.orphans[orphanHash] orphanBlock, ok := f.orphans[orphanHash]
if !ok { if !ok {
return nil, false, errors.Errorf("attempted to unorphan a non-orphan block %s", orphanHash) return false, errors.Errorf("attempted to unorphan a non-orphan block %s", orphanHash)
} }
delete(f.orphans, orphanHash) delete(f.orphans, orphanHash)
virtualChangeSet, err := f.domain.Consensus().ValidateAndInsertBlock(orphanBlock, true) _, err := f.domain.Consensus().ValidateAndInsertBlock(orphanBlock, true)
if err != nil { if err != nil {
if errors.As(err, &ruleerrors.RuleError{}) { if errors.As(err, &ruleerrors.RuleError{}) {
log.Warnf("Validation failed for orphan block %s: %s", orphanHash, err) log.Warnf("Validation failed for orphan block %s: %s", orphanHash, err)
return nil, false, nil return false, nil
} }
return nil, false, err return false, err
} }
log.Infof("Unorphaned block %s", orphanHash) log.Infof("Unorphaned block %s", orphanHash)
return virtualChangeSet, true, nil return true, nil
} }
// GetOrphanRoots returns the roots of the missing ancestors DAG of the given orphan // GetOrphanRoots returns the roots of the missing ancestors DAG of the given orphan

View File

@ -26,8 +26,7 @@ var orphanResolutionRange uint32 = 5
type RelayInvsContext interface { type RelayInvsContext interface {
Domain() domain.Domain Domain() domain.Domain
Config() *config.Config Config() *config.Config
OnNewBlock(block *externalapi.DomainBlock, virtualChangeSet *externalapi.VirtualChangeSet) error OnNewBlock(block *externalapi.DomainBlock) error
OnVirtualChange(virtualChangeSet *externalapi.VirtualChangeSet) error
OnNewBlockTemplate() error OnNewBlockTemplate() error
OnPruningPointUTXOSetOverride() error OnPruningPointUTXOSetOverride() error
SharedRequestedBlocks() *flowcontext.SharedRequestedBlocks SharedRequestedBlocks() *flowcontext.SharedRequestedBlocks
@ -174,7 +173,7 @@ func (flow *handleRelayInvsFlow) start() error {
if err != nil { if err != nil {
return err return err
} }
missingParents, virtualChangeSet, err := flow.processBlock(block) missingParents, err := flow.processBlock(block)
if err != nil { if err != nil {
if errors.Is(err, ruleerrors.ErrPrunedBlock) { if errors.Is(err, ruleerrors.ErrPrunedBlock) {
log.Infof("Ignoring pruned block %s", inv.Hash) log.Infof("Ignoring pruned block %s", inv.Hash)
@ -233,7 +232,7 @@ func (flow *handleRelayInvsFlow) start() error {
} }
log.Infof("Accepted block %s via relay", inv.Hash) log.Infof("Accepted block %s via relay", inv.Hash)
err = flow.OnNewBlock(block, virtualChangeSet) err = flow.OnNewBlock(block)
if err != nil { if err != nil {
return err return err
} }
@ -320,25 +319,25 @@ func (flow *handleRelayInvsFlow) readMsgBlock() (msgBlock *appmessage.MsgBlock,
} }
} }
func (flow *handleRelayInvsFlow) processBlock(block *externalapi.DomainBlock) ([]*externalapi.DomainHash, *externalapi.VirtualChangeSet, error) { func (flow *handleRelayInvsFlow) processBlock(block *externalapi.DomainBlock) ([]*externalapi.DomainHash, error) {
blockHash := consensushashing.BlockHash(block) blockHash := consensushashing.BlockHash(block)
virtualChangeSet, err := flow.Domain().Consensus().ValidateAndInsertBlock(block, true) _, err := flow.Domain().Consensus().ValidateAndInsertBlock(block, true)
if err != nil { if err != nil {
if !errors.As(err, &ruleerrors.RuleError{}) { if !errors.As(err, &ruleerrors.RuleError{}) {
return nil, nil, errors.Wrapf(err, "failed to process block %s", blockHash) return nil, errors.Wrapf(err, "failed to process block %s", blockHash)
} }
missingParentsError := &ruleerrors.ErrMissingParents{} missingParentsError := &ruleerrors.ErrMissingParents{}
if errors.As(err, missingParentsError) { if errors.As(err, missingParentsError) {
return missingParentsError.MissingParentHashes, nil, nil return missingParentsError.MissingParentHashes, nil
} }
// A duplicate block should not appear to the user as a warning and is already reported in the calling function // A duplicate block should not appear to the user as a warning and is already reported in the calling function
if !errors.Is(err, ruleerrors.ErrDuplicateBlock) { if !errors.Is(err, ruleerrors.ErrDuplicateBlock) {
log.Warnf("Rejected block %s from %s: %s", blockHash, flow.peer, err) log.Warnf("Rejected block %s from %s: %s", blockHash, flow.peer, err)
} }
return nil, nil, protocolerrors.Wrapf(true, err, "got invalid block %s from relay", blockHash) return nil, protocolerrors.Wrapf(true, err, "got invalid block %s from relay", blockHash)
} }
return nil, virtualChangeSet, nil return nil, nil
} }
func (flow *handleRelayInvsFlow) relayBlock(block *externalapi.DomainBlock) error { func (flow *handleRelayInvsFlow) relayBlock(block *externalapi.DomainBlock) error {

View File

@ -20,8 +20,7 @@ import (
type IBDContext interface { type IBDContext interface {
Domain() domain.Domain Domain() domain.Domain
Config() *config.Config Config() *config.Config
OnNewBlock(block *externalapi.DomainBlock, virtualChangeSet *externalapi.VirtualChangeSet) error OnNewBlock(block *externalapi.DomainBlock) error
OnVirtualChange(virtualChangeSet *externalapi.VirtualChangeSet) error
OnNewBlockTemplate() error OnNewBlockTemplate() error
OnPruningPointUTXOSetOverride() error OnPruningPointUTXOSetOverride() error
IsIBDRunning() bool IsIBDRunning() bool
@ -655,7 +654,7 @@ func (flow *handleIBDFlow) syncMissingBlockBodies(highHash *externalapi.DomainHa
return err return err
} }
virtualChangeSet, err := flow.Domain().Consensus().ValidateAndInsertBlock(block, false) _, err = flow.Domain().Consensus().ValidateAndInsertBlock(block, false)
if err != nil { if err != nil {
if errors.Is(err, ruleerrors.ErrDuplicateBlock) { if errors.Is(err, ruleerrors.ErrDuplicateBlock) {
log.Debugf("Skipping IBD Block %s as it has already been added to the DAG", blockHash) log.Debugf("Skipping IBD Block %s as it has already been added to the DAG", blockHash)
@ -663,7 +662,7 @@ func (flow *handleIBDFlow) syncMissingBlockBodies(highHash *externalapi.DomainHa
} }
return protocolerrors.ConvertToBanningProtocolErrorIfRuleError(err, "invalid block %s", blockHash) return protocolerrors.ConvertToBanningProtocolErrorIfRuleError(err, "invalid block %s", blockHash)
} }
err = flow.OnNewBlock(block, virtualChangeSet) err = flow.OnNewBlock(block)
if err != nil { if err != nil {
return err return err
} }
@ -706,12 +705,7 @@ func (flow *handleIBDFlow) resolveVirtual(estimatedVirtualDAAScoreTarget uint64)
} }
log.Infof("Resolving virtual. Estimated progress: %d%%", percents) log.Infof("Resolving virtual. Estimated progress: %d%%", percents)
} }
virtualChangeSet, isCompletelyResolved, err := flow.Domain().Consensus().ResolveVirtual() _, isCompletelyResolved, err := flow.Domain().Consensus().ResolveVirtual()
if err != nil {
return err
}
err = flow.OnVirtualChange(virtualChangeSet)
if err != nil { if err != nil {
return err return err
} }

View File

@ -90,11 +90,6 @@ func (m *Manager) runFlows(flows []*common.Flow, peer *peerpkg.Peer, errChan <-c
return <-errChan return <-errChan
} }
// SetOnVirtualChange sets the onVirtualChangeHandler handler
func (m *Manager) SetOnVirtualChange(onVirtualChangeHandler flowcontext.OnVirtualChangeHandler) {
m.context.SetOnVirtualChangeHandler(onVirtualChangeHandler)
}
// SetOnBlockAddedToDAGHandler sets the onBlockAddedToDAG handler // SetOnBlockAddedToDAGHandler sets the onBlockAddedToDAG handler
func (m *Manager) SetOnBlockAddedToDAGHandler(onBlockAddedToDAGHandler flowcontext.OnBlockAddedToDAGHandler) { func (m *Manager) SetOnBlockAddedToDAGHandler(onBlockAddedToDAGHandler flowcontext.OnBlockAddedToDAGHandler) {
m.context.SetOnBlockAddedToDAGHandler(onBlockAddedToDAGHandler) m.context.SetOnBlockAddedToDAGHandler(onBlockAddedToDAGHandler)

View File

@ -28,6 +28,7 @@ func NewManager(
connectionManager *connmanager.ConnectionManager, connectionManager *connmanager.ConnectionManager,
addressManager *addressmanager.AddressManager, addressManager *addressmanager.AddressManager,
utxoIndex *utxoindex.UTXOIndex, utxoIndex *utxoindex.UTXOIndex,
virtualChangeChan chan *externalapi.VirtualChangeSet,
shutDownChan chan<- struct{}) *Manager { shutDownChan chan<- struct{}) *Manager {
manager := Manager{ manager := Manager{
@ -44,14 +45,37 @@ func NewManager(
} }
netAdapter.SetRPCRouterInitializer(manager.routerInitializer) netAdapter.SetRPCRouterInitializer(manager.routerInitializer)
manager.initVirtualChangeHandler(virtualChangeChan)
return &manager return &manager
} }
func (m *Manager) initVirtualChangeHandler(virtualChangeChan chan *externalapi.VirtualChangeSet) {
spawn("virtualChangeHandler", func() {
for {
virtualChangeSet, ok := <-virtualChangeChan
if !ok {
return
}
err := m.notifyVirtualChange(virtualChangeSet)
if err != nil {
panic(err)
}
}
})
}
// NotifyBlockAddedToDAG notifies the manager that a block has been added to the DAG // NotifyBlockAddedToDAG notifies the manager that a block has been added to the DAG
func (m *Manager) NotifyBlockAddedToDAG(block *externalapi.DomainBlock, virtualChangeSet *externalapi.VirtualChangeSet) error { func (m *Manager) NotifyBlockAddedToDAG(block *externalapi.DomainBlock) error {
onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyBlockAddedToDAG") onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyBlockAddedToDAG")
defer onEnd() defer onEnd()
// Before converting the block and populating it, we check if any listeners are interested.
// This is done since most nodes do not use this event.
if !m.context.NotificationManager.HasBlockAddedListeners() {
return nil
}
rpcBlock := appmessage.DomainBlockToRPCBlock(block) rpcBlock := appmessage.DomainBlockToRPCBlock(block)
err := m.context.PopulateBlockWithVerboseData(rpcBlock, block.Header, block, true) err := m.context.PopulateBlockWithVerboseData(rpcBlock, block.Header, block, true)
if err != nil { if err != nil {
@ -63,38 +87,43 @@ func (m *Manager) NotifyBlockAddedToDAG(block *externalapi.DomainBlock, virtualC
return err return err
} }
// When block was added during IBD - it doesn't incur any Virtual change, return nil
// thus no notification is needed.
if len(virtualChangeSet.VirtualSelectedParentChainChanges.Added) == 0 &&
len(virtualChangeSet.VirtualSelectedParentChainChanges.Removed) == 0 {
return nil
}
return m.NotifyVirtualChange(virtualChangeSet)
} }
// NotifyVirtualChange notifies the manager that the virtual block has been changed. // notifyVirtualChange notifies the manager that the virtual block has been changed.
func (m *Manager) NotifyVirtualChange(virtualChangeSet *externalapi.VirtualChangeSet) error { func (m *Manager) notifyVirtualChange(virtualChangeSet *externalapi.VirtualChangeSet) error {
onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyVirtualChange") onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyVirtualChange")
defer onEnd() defer onEnd()
if m.context.Config.UTXOIndex { /*
NOTE: nothing under this function is allowed to acquire the consensus lock, since
the function is triggered by a channel call under consensus lock which might block
*/
if m.context.Config.UTXOIndex && virtualChangeSet.VirtualUTXODiff != nil {
err := m.notifyUTXOsChanged(virtualChangeSet) err := m.notifyUTXOsChanged(virtualChangeSet)
if err != nil { if err != nil {
return err return err
} }
} }
err := m.notifyVirtualSelectedParentBlueScoreChanged() err := m.notifyVirtualSelectedParentBlueScoreChanged(virtualChangeSet.VirtualSelectedParentBlueScore)
if err != nil { if err != nil {
return err return err
} }
err = m.notifyVirtualDaaScoreChanged() err = m.notifyVirtualDaaScoreChanged(virtualChangeSet.VirtualDAAScore)
if err != nil { if err != nil {
return err return err
} }
if virtualChangeSet.VirtualSelectedParentChainChanges == nil ||
(len(virtualChangeSet.VirtualSelectedParentChainChanges.Added) == 0 &&
len(virtualChangeSet.VirtualSelectedParentChainChanges.Removed) == 0) {
return nil
}
err = m.notifyVirtualSelectedParentChainChanged(virtualChangeSet) err = m.notifyVirtualSelectedParentChainChanged(virtualChangeSet)
if err != nil { if err != nil {
return err return err
@ -152,6 +181,7 @@ func (m *Manager) notifyUTXOsChanged(virtualChangeSet *externalapi.VirtualChange
if err != nil { if err != nil {
return err return err
} }
return m.context.NotificationManager.NotifyUTXOsChanged(utxoIndexChanges) return m.context.NotificationManager.NotifyUTXOsChanged(utxoIndexChanges)
} }
@ -167,33 +197,18 @@ func (m *Manager) notifyPruningPointUTXOSetOverride() error {
return m.context.NotificationManager.NotifyPruningPointUTXOSetOverride() return m.context.NotificationManager.NotifyPruningPointUTXOSetOverride()
} }
func (m *Manager) notifyVirtualSelectedParentBlueScoreChanged() error { func (m *Manager) notifyVirtualSelectedParentBlueScoreChanged(virtualSelectedParentBlueScore uint64) error {
onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyVirtualSelectedParentBlueScoreChanged") onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyVirtualSelectedParentBlueScoreChanged")
defer onEnd() defer onEnd()
virtualSelectedParent, err := m.context.Domain.Consensus().GetVirtualSelectedParent() notification := appmessage.NewVirtualSelectedParentBlueScoreChangedNotificationMessage(virtualSelectedParentBlueScore)
if err != nil {
return err
}
blockInfo, err := m.context.Domain.Consensus().GetBlockInfo(virtualSelectedParent)
if err != nil {
return err
}
notification := appmessage.NewVirtualSelectedParentBlueScoreChangedNotificationMessage(blockInfo.BlueScore)
return m.context.NotificationManager.NotifyVirtualSelectedParentBlueScoreChanged(notification) return m.context.NotificationManager.NotifyVirtualSelectedParentBlueScoreChanged(notification)
} }
func (m *Manager) notifyVirtualDaaScoreChanged() error { func (m *Manager) notifyVirtualDaaScoreChanged(virtualDAAScore uint64) error {
onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyVirtualDaaScoreChanged") onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyVirtualDaaScoreChanged")
defer onEnd() defer onEnd()
virtualDAAScore, err := m.context.Domain.Consensus().GetVirtualDAAScore()
if err != nil {
return err
}
notification := appmessage.NewVirtualDaaScoreChangedNotificationMessage(virtualDAAScore) notification := appmessage.NewVirtualDaaScoreChangedNotificationMessage(virtualDAAScore)
return m.context.NotificationManager.NotifyVirtualDaaScoreChanged(notification) return m.context.NotificationManager.NotifyVirtualDaaScoreChanged(notification)
} }

View File

@ -82,6 +82,19 @@ func (nm *NotificationManager) Listener(router *routerpkg.Router) (*Notification
return listener, nil return listener, nil
} }
// HasBlockAddedListeners indicates if the notification manager has any listeners for `BlockAdded` events
func (nm *NotificationManager) HasBlockAddedListeners() bool {
nm.RLock()
defer nm.RUnlock()
for _, listener := range nm.listeners {
if listener.propagateBlockAddedNotifications {
return true
}
}
return false
}
// NotifyBlockAdded notifies the notification manager that a block has been added to the DAG // NotifyBlockAdded notifies the notification manager that a block has been added to the DAG
func (nm *NotificationManager) NotifyBlockAdded(notification *appmessage.BlockAddedNotificationMessage) error { func (nm *NotificationManager) NotifyBlockAdded(notification *appmessage.BlockAddedNotificationMessage) error {
nm.RLock() nm.RLock()

View File

@ -23,6 +23,10 @@ type fakeDomain struct {
testapi.TestConsensus testapi.TestConsensus
} }
func (d fakeDomain) VirtualChangeChannel() chan *externalapi.VirtualChangeSet {
panic("implement me")
}
func (d fakeDomain) DeleteStagingConsensus() error { func (d fakeDomain) DeleteStagingConsensus() error {
panic("implement me") panic("implement me")
} }

View File

@ -58,6 +58,8 @@ type consensus struct {
headersSelectedChainStore model.HeadersSelectedChainStore headersSelectedChainStore model.HeadersSelectedChainStore
daaBlocksStore model.DAABlocksStore daaBlocksStore model.DAABlocksStore
blocksWithTrustedDataDAAWindowStore model.BlocksWithTrustedDataDAAWindowStore blocksWithTrustedDataDAAWindowStore model.BlocksWithTrustedDataDAAWindowStore
virtualChangeChan chan *externalapi.VirtualChangeSet
} }
func (s *consensus) ValidateAndInsertBlockWithTrustedData(block *externalapi.BlockWithTrustedData, validateUTXO bool) (*externalapi.VirtualChangeSet, error) { func (s *consensus) ValidateAndInsertBlockWithTrustedData(block *externalapi.BlockWithTrustedData, validateUTXO bool) (*externalapi.VirtualChangeSet, error) {
@ -190,7 +192,46 @@ func (s *consensus) ValidateAndInsertBlock(block *externalapi.DomainBlock, shoul
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
return s.blockProcessor.ValidateAndInsertBlock(block, shouldValidateAgainstUTXO) virtualChangeSet, err := s.blockProcessor.ValidateAndInsertBlock(block, shouldValidateAgainstUTXO)
if err != nil {
return nil, err
}
err = s.onVirtualChange(virtualChangeSet, shouldValidateAgainstUTXO)
if err != nil {
return nil, err
}
return virtualChangeSet, nil
}
func (s *consensus) onVirtualChange(virtualChangeSet *externalapi.VirtualChangeSet, wasVirtualUpdated bool) error {
if !wasVirtualUpdated || s.virtualChangeChan == nil {
return nil
}
stagingArea := model.NewStagingArea()
virtualGHOSTDAGData, err := s.ghostdagDataStores[0].Get(s.databaseContext, stagingArea, model.VirtualBlockHash, false)
if err != nil {
return err
}
virtualSelectedParentGHOSTDAGData, err := s.ghostdagDataStores[0].Get(s.databaseContext, stagingArea, virtualGHOSTDAGData.SelectedParent(), false)
if err != nil {
return err
}
virtualDAAScore, err := s.daaBlocksStore.DAAScore(s.databaseContext, stagingArea, model.VirtualBlockHash)
if err != nil {
return err
}
// Populate the change set with additional data before sending
virtualChangeSet.VirtualSelectedParentBlueScore = virtualSelectedParentGHOSTDAGData.BlueScore()
virtualChangeSet.VirtualDAAScore = virtualDAAScore
s.virtualChangeChan <- virtualChangeSet
return nil
} }
// ValidateTransactionAndPopulateWithConsensusData validates the given transaction // ValidateTransactionAndPopulateWithConsensusData validates the given transaction
@ -792,6 +833,11 @@ func (s *consensus) ResolveVirtual() (*externalapi.VirtualChangeSet, bool, error
return nil, false, err return nil, false, err
} }
err = s.onVirtualChange(virtualChangeSet, true)
if err != nil {
return nil, false, err
}
return virtualChangeSet, isCompletelyResolved, nil return virtualChangeSet, isCompletelyResolved, nil
} }

View File

@ -76,7 +76,8 @@ type Config struct {
// Factory instantiates new Consensuses // Factory instantiates new Consensuses
type Factory interface { type Factory interface {
NewConsensus(config *Config, db infrastructuredatabase.Database, dbPrefix *prefix.Prefix) ( NewConsensus(config *Config, db infrastructuredatabase.Database, dbPrefix *prefix.Prefix,
virtualChangeChan chan *externalapi.VirtualChangeSet) (
externalapi.Consensus, bool, error) externalapi.Consensus, bool, error)
NewTestConsensus(config *Config, testName string) ( NewTestConsensus(config *Config, testName string) (
tc testapi.TestConsensus, teardown func(keepDataDir bool), err error) tc testapi.TestConsensus, teardown func(keepDataDir bool), err error)
@ -108,7 +109,8 @@ func NewFactory() Factory {
} }
// NewConsensus instantiates a new Consensus // NewConsensus instantiates a new Consensus
func (f *factory) NewConsensus(config *Config, db infrastructuredatabase.Database, dbPrefix *prefix.Prefix) ( func (f *factory) NewConsensus(config *Config, db infrastructuredatabase.Database, dbPrefix *prefix.Prefix,
virtualChangeChan chan *externalapi.VirtualChangeSet) (
consensusInstance externalapi.Consensus, shouldMigrate bool, err error) { consensusInstance externalapi.Consensus, shouldMigrate bool, err error) {
dbManager := consensusdatabase.New(db) dbManager := consensusdatabase.New(db)
@ -510,6 +512,8 @@ func (f *factory) NewConsensus(config *Config, db infrastructuredatabase.Databas
headersSelectedChainStore: headersSelectedChainStore, headersSelectedChainStore: headersSelectedChainStore,
daaBlocksStore: daaBlocksStore, daaBlocksStore: daaBlocksStore,
blocksWithTrustedDataDAAWindowStore: daaWindowStore, blocksWithTrustedDataDAAWindowStore: daaWindowStore,
virtualChangeChan: virtualChangeChan,
} }
if isOldReachabilityInitialized { if isOldReachabilityInitialized {
@ -572,7 +576,7 @@ func (f *factory) NewTestConsensus(config *Config, testName string) (
} }
testConsensusDBPrefix := &prefix.Prefix{} testConsensusDBPrefix := &prefix.Prefix{}
consensusAsInterface, shouldMigrate, err := f.NewConsensus(config, db, testConsensusDBPrefix) consensusAsInterface, shouldMigrate, err := f.NewConsensus(config, db, testConsensusDBPrefix, nil)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }

View File

@ -1,6 +1,7 @@
package consensus package consensus
import ( import (
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"github.com/kaspanet/kaspad/domain/prefixmanager/prefix" "github.com/kaspanet/kaspad/domain/prefixmanager/prefix"
"io/ioutil" "io/ioutil"
"testing" "testing"
@ -24,7 +25,7 @@ func TestNewConsensus(t *testing.T) {
t.Fatalf("error in NewLevelDB: %s", err) t.Fatalf("error in NewLevelDB: %s", err)
} }
_, shouldMigrate, err := f.NewConsensus(config, db, &prefix.Prefix{}) _, shouldMigrate, err := f.NewConsensus(config, db, &prefix.Prefix{}, make(chan *externalapi.VirtualChangeSet))
if err != nil { if err != nil {
t.Fatalf("error in NewConsensus: %+v", err) t.Fatalf("error in NewConsensus: %+v", err)
} }

View File

@ -5,6 +5,8 @@ type VirtualChangeSet struct {
VirtualSelectedParentChainChanges *SelectedChainPath VirtualSelectedParentChainChanges *SelectedChainPath
VirtualUTXODiff UTXODiff VirtualUTXODiff UTXODiff
VirtualParents []*DomainHash VirtualParents []*DomainHash
VirtualSelectedParentBlueScore uint64
VirtualDAAScore uint64
} }
// SelectedChainPath is a path the of the selected chains between two blocks. // SelectedChainPath is a path the of the selected chains between two blocks.

View File

@ -24,6 +24,7 @@ type Domain interface {
InitStagingConsensusWithoutGenesis() error InitStagingConsensusWithoutGenesis() error
CommitStagingConsensus() error CommitStagingConsensus() error
DeleteStagingConsensus() error DeleteStagingConsensus() error
VirtualChangeChannel() chan *externalapi.VirtualChangeSet
} }
type domain struct { type domain struct {
@ -33,6 +34,11 @@ type domain struct {
stagingConsensusLock sync.RWMutex stagingConsensusLock sync.RWMutex
consensusConfig *consensus.Config consensusConfig *consensus.Config
db infrastructuredatabase.Database db infrastructuredatabase.Database
virtualChangeChan chan *externalapi.VirtualChangeSet
}
func (d *domain) VirtualChangeChannel() chan *externalapi.VirtualChangeSet {
return d.virtualChangeChan
} }
func (d *domain) Consensus() externalapi.Consensus { func (d *domain) Consensus() externalapi.Consensus {
@ -86,7 +92,7 @@ func (d *domain) initStagingConsensus(cfg *consensus.Config) error {
consensusFactory := consensus.NewFactory() consensusFactory := consensus.NewFactory()
consensusInstance, shouldMigrate, err := consensusFactory.NewConsensus(cfg, d.db, inactivePrefix) consensusInstance, shouldMigrate, err := consensusFactory.NewConsensus(cfg, d.db, inactivePrefix, d.virtualChangeChan)
if err != nil { if err != nil {
return err return err
} }
@ -190,16 +196,18 @@ func New(consensusConfig *consensus.Config, mempoolConfig *mempool.Config, db in
} }
} }
virtualChangeChan := make(chan *externalapi.VirtualChangeSet, 1000)
consensusFactory := consensus.NewFactory() consensusFactory := consensus.NewFactory()
consensusInstance, shouldMigrate, err := consensusFactory.NewConsensus(consensusConfig, db, activePrefix) consensusInstance, shouldMigrate, err := consensusFactory.NewConsensus(consensusConfig, db, activePrefix, virtualChangeChan)
if err != nil { if err != nil {
return nil, err return nil, err
} }
domainInstance := &domain{ domainInstance := &domain{
consensus: &consensusInstance, consensus: &consensusInstance,
consensusConfig: consensusConfig, consensusConfig: consensusConfig,
db: db, db: db,
virtualChangeChan: virtualChangeChan,
} }
if shouldMigrate { if shouldMigrate {

View File

@ -43,6 +43,9 @@ func New(domain domain.Domain, database database.Database) (*UTXOIndex, error) {
// Reset deletes the whole UTXO index and resyncs it from consensus. // Reset deletes the whole UTXO index and resyncs it from consensus.
func (ui *UTXOIndex) Reset() error { func (ui *UTXOIndex) Reset() error {
ui.mutex.Lock()
defer ui.mutex.Unlock()
err := ui.store.deleteAll() err := ui.store.deleteAll()
if err != nil { if err != nil {
return err return err