diff --git a/blockdag/dag.go b/blockdag/dag.go index ee117f3b7..ad1bb4182 100644 --- a/blockdag/dag.go +++ b/blockdag/dag.go @@ -153,6 +153,9 @@ type BlockDAG struct { utxoDiffStore *utxoDiffStore reachabilityStore *reachabilityStore multisetStore *multisetStore + + recentBlockProcessingTimestamps []time.Time + startTime time.Time } // IsKnownBlock returns whether or not the DAG instance has the block represented @@ -2092,6 +2095,7 @@ func New(config *Config) (*BlockDAG, error) { deploymentCaches: newThresholdCaches(dagconfig.DefinedDeployments), blockCount: 0, subnetworkID: config.SubnetworkID, + startTime: time.Now(), } dag.virtual = newVirtualBlock(dag, nil) diff --git a/blockdag/process.go b/blockdag/process.go index 8af27846b..437b6f6c6 100644 --- a/blockdag/process.go +++ b/blockdag/process.go @@ -253,6 +253,8 @@ func (dag *BlockDAG) processBlockNoLock(block *util.Block, flags BehaviorFlags) } } + dag.addBlockProcessingTimestamp() + log.Debugf("Accepted block %s", blockHash) return false, false, nil diff --git a/blockdag/sync_rate.go b/blockdag/sync_rate.go new file mode 100644 index 000000000..0ab955621 --- /dev/null +++ b/blockdag/sync_rate.go @@ -0,0 +1,57 @@ +package blockdag + +import "time" + +const syncRateWindowDuration = 15 * time.Minute + +// addBlockProcessingTimestamp adds the last block processing timestamp in order to measure the recent sync rate. +// +// This function MUST be called with the DAG state lock held (for writes). +func (dag *BlockDAG) addBlockProcessingTimestamp() { + now := time.Now() + dag.recentBlockProcessingTimestamps = append(dag.recentBlockProcessingTimestamps, now) + dag.removeNonRecentTimestampsFromRecentBlockProcessingTimestamps() +} + +// removeNonRecentTimestampsFromRecentBlockProcessingTimestamps removes timestamps older than syncRateWindowDuration +// from dag.recentBlockProcessingTimestamps +// +// This function MUST be called with the DAG state lock held (for writes). +func (dag *BlockDAG) removeNonRecentTimestampsFromRecentBlockProcessingTimestamps() { + dag.recentBlockProcessingTimestamps = dag.recentBlockProcessingTimestampsRelevantWindow() +} + +func (dag *BlockDAG) recentBlockProcessingTimestampsRelevantWindow() []time.Time { + minTime := time.Now().Add(-syncRateWindowDuration) + windowStartIndex := len(dag.recentBlockProcessingTimestamps) + for i, processTime := range dag.recentBlockProcessingTimestamps { + if processTime.After(minTime) { + windowStartIndex = i + break + } + } + return dag.recentBlockProcessingTimestamps[windowStartIndex:] +} + +// syncRate returns the rate of processed +// blocks in the last syncRateWindowDuration +// duration. +func (dag *BlockDAG) syncRate() float64 { + dag.RLock() + defer dag.RUnlock() + return float64(len(dag.recentBlockProcessingTimestampsRelevantWindow())) / syncRateWindowDuration.Seconds() +} + +// IsSyncRateBelowThreshold checks whether the sync rate +// is below the expected threshold. +func (dag *BlockDAG) IsSyncRateBelowThreshold(maxDeviation float64) bool { + if dag.uptime() < syncRateWindowDuration { + return false + } + + return dag.syncRate() < 1/dag.dagParams.TargetTimePerBlock.Seconds()*maxDeviation +} + +func (dag *BlockDAG) uptime() time.Duration { + return time.Now().Sub(dag.startTime) +} diff --git a/server/p2p/p2p.go b/server/p2p/p2p.go index 05832eb64..8a41757b8 100644 --- a/server/p2p/p2p.go +++ b/server/p2p/p2p.go @@ -780,10 +780,6 @@ type getConnCountMsg struct { reply chan int32 } -type getShouldMineOnGenesisMsg struct { - reply chan bool -} - //GetPeersMsg is the message type which is used by the rpc server to get the peers list from the p2p server type GetPeersMsg struct { Reply chan []*Peer @@ -832,17 +828,6 @@ func (s *Server) handleQuery(state *peerState, querymsg interface{}) { }) msg.reply <- nconnected - case getShouldMineOnGenesisMsg: - shouldMineOnGenesis := true - if state.Count() != 0 { - shouldMineOnGenesis = state.forAllPeers(func(sp *Peer) bool { - return sp.SelectedTipHash().IsEqual(s.DAGParams.GenesisHash) - }) - } else { - shouldMineOnGenesis = false - } - msg.reply <- shouldMineOnGenesis - case GetPeersMsg: peers := make([]*Peer, 0, state.Count()) state.forAllPeers(func(sp *Peer) bool { @@ -1241,17 +1226,6 @@ func (s *Server) ConnectedCount() int32 { return <-replyChan } -// ShouldMineOnGenesis checks if the node is connected to at least one -// peer, and at least one of its peers knows of any blocks that were mined -// on top of the genesis block. -func (s *Server) ShouldMineOnGenesis() bool { - replyChan := make(chan bool) - - s.Query <- getShouldMineOnGenesisMsg{reply: replyChan} - - return <-replyChan -} - // OutboundGroupCount returns the number of peers connected to the given // outbound group key. func (s *Server) OutboundGroupCount(key string) int { diff --git a/server/rpc/handle_get_block_template.go b/server/rpc/handle_get_block_template.go index 7248f0fcb..7e9a7da3a 100644 --- a/server/rpc/handle_get_block_template.go +++ b/server/rpc/handle_get_block_template.go @@ -109,9 +109,7 @@ func handleGetBlockTemplate(s *Server, cmd interface{}, closeChan <-chan struct{ // the DAG is synced. Note that we make a special check for when // we have nothing besides the genesis block (blueScore == 0), // because in that state IsCurrent may still return true. - currentBlueScore := s.cfg.DAG.SelectedTipBlueScore() - if (currentBlueScore != 0 && !s.cfg.DAG.IsCurrent()) || - (currentBlueScore == 0 && !s.cfg.shouldMineOnGenesis()) { + if !isSyncedForMining(s) { return nil, &rpcmodel.RPCError{ Code: rpcmodel.ErrRPCClientInInitialDownload, Message: "Kaspa is downloading blocks...", @@ -131,6 +129,24 @@ func handleGetBlockTemplate(s *Server, cmd interface{}, closeChan <-chan struct{ } } +// isSyncedForMining checks if the node is synced enough for mining blocks +// on top of its world view. +// To do that, first it checks if the selected tip timestamp is not older than maxTipAge. If that's the case, it means +// the node is synced since blocks' timestamps are not allowed to deviate too much into the future. +// If that's not the case it checks the rate it added new blocks to the DAG recently. If it's faster than +// blockRate * maxSyncRateDeviation it means the node is not synced, since when the node is synced it shouldn't add +// blocks to the DAG faster than the block rate. +func isSyncedForMining(s *Server) bool { + const maxTipAge = 5 * time.Minute + isCloseToCurrentTime := s.cfg.DAG.Now().Sub(s.cfg.DAG.SelectedTipHeader().Timestamp) <= maxTipAge + if isCloseToCurrentTime { + return true + } + + const maxSyncRateDeviation = 1.05 + return s.cfg.DAG.IsSyncRateBelowThreshold(maxSyncRateDeviation) +} + // handleGetBlockTemplateRequest is a helper for handleGetBlockTemplate which // deals with generating and returning block templates to the caller. It // handles both long poll requests as specified by BIP 0022 as well as regular diff --git a/server/rpc/rpcserver.go b/server/rpc/rpcserver.go index da7c14705..90cb640a1 100644 --- a/server/rpc/rpcserver.go +++ b/server/rpc/rpcserver.go @@ -783,8 +783,6 @@ type rpcserverConfig struct { // These fields define any optional indexes the RPC server can make use // of to provide additional data when queried. AcceptanceIndex *indexers.AcceptanceIndex - - shouldMineOnGenesis func() bool } // setupRPCListeners returns a slice of listeners that are configured for use @@ -853,17 +851,16 @@ func NewRPCServer( return nil, errors.New("RPCS: No valid listen address") } cfg := &rpcserverConfig{ - Listeners: rpcListeners, - StartupTime: startupTime, - ConnMgr: &rpcConnManager{p2pServer}, - SyncMgr: &rpcSyncMgr{p2pServer, p2pServer.SyncManager}, - TimeSource: p2pServer.TimeSource, - DAGParams: p2pServer.DAGParams, - TxMemPool: p2pServer.TxMemPool, - Generator: blockTemplateGenerator, - AcceptanceIndex: p2pServer.AcceptanceIndex, - DAG: p2pServer.DAG, - shouldMineOnGenesis: p2pServer.ShouldMineOnGenesis, + Listeners: rpcListeners, + StartupTime: startupTime, + ConnMgr: &rpcConnManager{p2pServer}, + SyncMgr: &rpcSyncMgr{p2pServer, p2pServer.SyncManager}, + TimeSource: p2pServer.TimeSource, + DAGParams: p2pServer.DAGParams, + TxMemPool: p2pServer.TxMemPool, + Generator: blockTemplateGenerator, + AcceptanceIndex: p2pServer.AcceptanceIndex, + DAG: p2pServer.DAG, } rpc := Server{ cfg: *cfg,