From 73ad0adf72e67cc6d0bb8a1e2910e901b51dae21 Mon Sep 17 00:00:00 2001
From: Ori Newman <orinewman1@gmail.com>
Date: Mon, 4 May 2020 09:09:23 +0300
Subject: [PATCH] [NOD-913] Use sync rate in getBlockTemplate (#705)

* [NOD-913] Use sync rate in getBlockTemplate

* [NOD-913] Rename addBlockProcessTime->addBlockProcessTimestamp, maxDiff->maxTipAge

* [NOD-913] Pass maxDeviation as an argument

* [NOD-913] Change maxDeviation to +5%

* [NOD-913] Rename variables

* [NOD-913] Rename variables and functions and change comments

* [NOD-913] Split addBlockProcessingTimestamp
---
 blockdag/dag.go                         |  4 ++
 blockdag/process.go                     |  2 +
 blockdag/sync_rate.go                   | 57 +++++++++++++++++++++++++
 server/p2p/p2p.go                       | 26 -----------
 server/rpc/handle_get_block_template.go | 22 ++++++++--
 server/rpc/rpcserver.go                 | 23 +++++-----
 6 files changed, 92 insertions(+), 42 deletions(-)
 create mode 100644 blockdag/sync_rate.go

diff --git a/blockdag/dag.go b/blockdag/dag.go
index ee117f3b7..ad1bb4182 100644
--- a/blockdag/dag.go
+++ b/blockdag/dag.go
@@ -153,6 +153,9 @@ type BlockDAG struct {
 	utxoDiffStore     *utxoDiffStore
 	reachabilityStore *reachabilityStore
 	multisetStore     *multisetStore
+
+	recentBlockProcessingTimestamps []time.Time
+	startTime                       time.Time
 }
 
 // IsKnownBlock returns whether or not the DAG instance has the block represented
@@ -2092,6 +2095,7 @@ func New(config *Config) (*BlockDAG, error) {
 		deploymentCaches:               newThresholdCaches(dagconfig.DefinedDeployments),
 		blockCount:                     0,
 		subnetworkID:                   config.SubnetworkID,
+		startTime:                      time.Now(),
 	}
 
 	dag.virtual = newVirtualBlock(dag, nil)
diff --git a/blockdag/process.go b/blockdag/process.go
index 8af27846b..437b6f6c6 100644
--- a/blockdag/process.go
+++ b/blockdag/process.go
@@ -253,6 +253,8 @@ func (dag *BlockDAG) processBlockNoLock(block *util.Block, flags BehaviorFlags)
 		}
 	}
 
+	dag.addBlockProcessingTimestamp()
+
 	log.Debugf("Accepted block %s", blockHash)
 
 	return false, false, nil
diff --git a/blockdag/sync_rate.go b/blockdag/sync_rate.go
new file mode 100644
index 000000000..0ab955621
--- /dev/null
+++ b/blockdag/sync_rate.go
@@ -0,0 +1,57 @@
+package blockdag
+
+import "time"
+
+const syncRateWindowDuration = 15 * time.Minute
+
+// addBlockProcessingTimestamp adds the last block processing timestamp in order to measure the recent sync rate.
+//
+// This function MUST be called with the DAG state lock held (for writes).
+func (dag *BlockDAG) addBlockProcessingTimestamp() {
+	now := time.Now()
+	dag.recentBlockProcessingTimestamps = append(dag.recentBlockProcessingTimestamps, now)
+	dag.removeNonRecentTimestampsFromRecentBlockProcessingTimestamps()
+}
+
+// removeNonRecentTimestampsFromRecentBlockProcessingTimestamps removes timestamps older than syncRateWindowDuration
+// from dag.recentBlockProcessingTimestamps
+//
+// This function MUST be called with the DAG state lock held (for writes).
+func (dag *BlockDAG) removeNonRecentTimestampsFromRecentBlockProcessingTimestamps() {
+	dag.recentBlockProcessingTimestamps = dag.recentBlockProcessingTimestampsRelevantWindow()
+}
+
+func (dag *BlockDAG) recentBlockProcessingTimestampsRelevantWindow() []time.Time {
+	minTime := time.Now().Add(-syncRateWindowDuration)
+	windowStartIndex := len(dag.recentBlockProcessingTimestamps)
+	for i, processTime := range dag.recentBlockProcessingTimestamps {
+		if processTime.After(minTime) {
+			windowStartIndex = i
+			break
+		}
+	}
+	return dag.recentBlockProcessingTimestamps[windowStartIndex:]
+}
+
+// syncRate returns the rate of processed
+// blocks in the last syncRateWindowDuration
+// duration.
+func (dag *BlockDAG) syncRate() float64 {
+	dag.RLock()
+	defer dag.RUnlock()
+	return float64(len(dag.recentBlockProcessingTimestampsRelevantWindow())) / syncRateWindowDuration.Seconds()
+}
+
+// IsSyncRateBelowThreshold checks whether the sync rate
+// is below the expected threshold.
+func (dag *BlockDAG) IsSyncRateBelowThreshold(maxDeviation float64) bool {
+	if dag.uptime() < syncRateWindowDuration {
+		return false
+	}
+
+	return dag.syncRate() < 1/dag.dagParams.TargetTimePerBlock.Seconds()*maxDeviation
+}
+
+func (dag *BlockDAG) uptime() time.Duration {
+	return time.Now().Sub(dag.startTime)
+}
diff --git a/server/p2p/p2p.go b/server/p2p/p2p.go
index 05832eb64..8a41757b8 100644
--- a/server/p2p/p2p.go
+++ b/server/p2p/p2p.go
@@ -780,10 +780,6 @@ type getConnCountMsg struct {
 	reply chan int32
 }
 
-type getShouldMineOnGenesisMsg struct {
-	reply chan bool
-}
-
 //GetPeersMsg is the message type which is used by the rpc server to get the peers list from the p2p server
 type GetPeersMsg struct {
 	Reply chan []*Peer
@@ -832,17 +828,6 @@ func (s *Server) handleQuery(state *peerState, querymsg interface{}) {
 		})
 		msg.reply <- nconnected
 
-	case getShouldMineOnGenesisMsg:
-		shouldMineOnGenesis := true
-		if state.Count() != 0 {
-			shouldMineOnGenesis = state.forAllPeers(func(sp *Peer) bool {
-				return sp.SelectedTipHash().IsEqual(s.DAGParams.GenesisHash)
-			})
-		} else {
-			shouldMineOnGenesis = false
-		}
-		msg.reply <- shouldMineOnGenesis
-
 	case GetPeersMsg:
 		peers := make([]*Peer, 0, state.Count())
 		state.forAllPeers(func(sp *Peer) bool {
@@ -1241,17 +1226,6 @@ func (s *Server) ConnectedCount() int32 {
 	return <-replyChan
 }
 
-// ShouldMineOnGenesis checks if the node is connected to at least one
-// peer, and at least one of its peers knows of any blocks that were mined
-// on top of the genesis block.
-func (s *Server) ShouldMineOnGenesis() bool {
-	replyChan := make(chan bool)
-
-	s.Query <- getShouldMineOnGenesisMsg{reply: replyChan}
-
-	return <-replyChan
-}
-
 // OutboundGroupCount returns the number of peers connected to the given
 // outbound group key.
 func (s *Server) OutboundGroupCount(key string) int {
diff --git a/server/rpc/handle_get_block_template.go b/server/rpc/handle_get_block_template.go
index 7248f0fcb..7e9a7da3a 100644
--- a/server/rpc/handle_get_block_template.go
+++ b/server/rpc/handle_get_block_template.go
@@ -109,9 +109,7 @@ func handleGetBlockTemplate(s *Server, cmd interface{}, closeChan <-chan struct{
 	// the DAG is synced. Note that we make a special check for when
 	// we have nothing besides the genesis block (blueScore == 0),
 	// because in that state IsCurrent may still return true.
-	currentBlueScore := s.cfg.DAG.SelectedTipBlueScore()
-	if (currentBlueScore != 0 && !s.cfg.DAG.IsCurrent()) ||
-		(currentBlueScore == 0 && !s.cfg.shouldMineOnGenesis()) {
+	if !isSyncedForMining(s) {
 		return nil, &rpcmodel.RPCError{
 			Code:    rpcmodel.ErrRPCClientInInitialDownload,
 			Message: "Kaspa is downloading blocks...",
@@ -131,6 +129,24 @@ func handleGetBlockTemplate(s *Server, cmd interface{}, closeChan <-chan struct{
 	}
 }
 
+// isSyncedForMining checks if the node is synced enough for mining blocks
+// on top of its world view.
+// To do that, first it checks if the selected tip timestamp is not older than maxTipAge. If that's the case, it means
+// the node is synced since blocks' timestamps are not allowed to deviate too much into the future.
+// If that's not the case it checks the rate it added new blocks to the DAG recently. If it's faster than
+// blockRate * maxSyncRateDeviation it means the node is not synced, since when the node is synced it shouldn't add
+// blocks to the DAG faster than the block rate.
+func isSyncedForMining(s *Server) bool {
+	const maxTipAge = 5 * time.Minute
+	isCloseToCurrentTime := s.cfg.DAG.Now().Sub(s.cfg.DAG.SelectedTipHeader().Timestamp) <= maxTipAge
+	if isCloseToCurrentTime {
+		return true
+	}
+
+	const maxSyncRateDeviation = 1.05
+	return s.cfg.DAG.IsSyncRateBelowThreshold(maxSyncRateDeviation)
+}
+
 // handleGetBlockTemplateRequest is a helper for handleGetBlockTemplate which
 // deals with generating and returning block templates to the caller. It
 // handles both long poll requests as specified by BIP 0022 as well as regular
diff --git a/server/rpc/rpcserver.go b/server/rpc/rpcserver.go
index da7c14705..90cb640a1 100644
--- a/server/rpc/rpcserver.go
+++ b/server/rpc/rpcserver.go
@@ -783,8 +783,6 @@ type rpcserverConfig struct {
 	// These fields define any optional indexes the RPC server can make use
 	// of to provide additional data when queried.
 	AcceptanceIndex *indexers.AcceptanceIndex
-
-	shouldMineOnGenesis func() bool
 }
 
 // setupRPCListeners returns a slice of listeners that are configured for use
@@ -853,17 +851,16 @@ func NewRPCServer(
 		return nil, errors.New("RPCS: No valid listen address")
 	}
 	cfg := &rpcserverConfig{
-		Listeners:           rpcListeners,
-		StartupTime:         startupTime,
-		ConnMgr:             &rpcConnManager{p2pServer},
-		SyncMgr:             &rpcSyncMgr{p2pServer, p2pServer.SyncManager},
-		TimeSource:          p2pServer.TimeSource,
-		DAGParams:           p2pServer.DAGParams,
-		TxMemPool:           p2pServer.TxMemPool,
-		Generator:           blockTemplateGenerator,
-		AcceptanceIndex:     p2pServer.AcceptanceIndex,
-		DAG:                 p2pServer.DAG,
-		shouldMineOnGenesis: p2pServer.ShouldMineOnGenesis,
+		Listeners:       rpcListeners,
+		StartupTime:     startupTime,
+		ConnMgr:         &rpcConnManager{p2pServer},
+		SyncMgr:         &rpcSyncMgr{p2pServer, p2pServer.SyncManager},
+		TimeSource:      p2pServer.TimeSource,
+		DAGParams:       p2pServer.DAGParams,
+		TxMemPool:       p2pServer.TxMemPool,
+		Generator:       blockTemplateGenerator,
+		AcceptanceIndex: p2pServer.AcceptanceIndex,
+		DAG:             p2pServer.DAG,
 	}
 	rpc := Server{
 		cfg:                    *cfg,