// Copyright (c) 2013-2017 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

package netsync

import (
	"fmt"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"github.com/kaspanet/kaspad/blockdag"
	"github.com/kaspanet/kaspad/dagconfig"
	"github.com/kaspanet/kaspad/database"
	"github.com/kaspanet/kaspad/mempool"
	peerpkg "github.com/kaspanet/kaspad/peer"
	"github.com/kaspanet/kaspad/util"
	"github.com/kaspanet/kaspad/util/daghash"
	"github.com/kaspanet/kaspad/wire"
	"github.com/pkg/errors"
)

const (
	// maxRejectedTxns is the maximum number of rejected transaction
	// hashes to store in memory.
	maxRejectedTxns = 1000

	// maxRequestedBlocks is the maximum number of requested block
	// hashes to store in memory.
	maxRequestedBlocks = wire.MaxInvPerMsg

	// maxRequestedTxns is the maximum number of requested transaction
	// hashes to store in memory.
	maxRequestedTxns = wire.MaxInvPerMsg

	// minGetSelectedTipInterval is the minimum interval to wait between
	// getSelectedTip requests to the same peer.
	minGetSelectedTipInterval = time.Minute

	// minDAGTimeDelay is how far the DAG's past median time must lag
	// behind the adjusted time before peers are queried for their
	// selected tips.
	minDAGTimeDelay = time.Minute
)

// newPeerMsg signifies a newly connected peer to the block handler.
type newPeerMsg struct {
	peer *peerpkg.Peer
}

// blockMsg packages a kaspa block message and the peer it came from together
// so the block handler has access to that information.
type blockMsg struct {
	block          *util.Block
	peer           *peerpkg.Peer
	isDelayedBlock bool
	reply          chan struct{}
}

// invMsg packages a kaspa inv message and the peer it came from together
// so the block handler has access to that information.
type invMsg struct {
	inv  *wire.MsgInv
	peer *peerpkg.Peer
}

// donePeerMsg signifies a newly disconnected peer to the block handler.
type donePeerMsg struct {
	peer *peerpkg.Peer
}

// removeFromSyncCandidatesMsg signifies to remove the given peer
// from the sync candidates.
type removeFromSyncCandidatesMsg struct {
	peer *peerpkg.Peer
}

// txMsg packages a kaspa tx message and the peer it came from together
// so the block handler has access to that information.
type txMsg struct {
	tx    *util.Tx
	peer  *peerpkg.Peer
	reply chan struct{}
}

// getSyncPeerMsg is a message type to be sent across the message channel for
// retrieving the current sync peer.
type getSyncPeerMsg struct {
	reply chan int32
}

// processBlockResponse is a response sent to the reply channel of a
// processBlockMsg.
type processBlockResponse struct {
	isOrphan bool
	err      error
}

// processBlockMsg is a message type to be sent across the message channel
// for requesting that a block be processed. Note this call differs from
// blockMsg above in that blockMsg is intended for blocks that came from peers
// and have extra handling whereas this message essentially is just a
// concurrent safe way to call ProcessBlock on the internal block DAG instance.
type processBlockMsg struct {
	block *util.Block
	flags blockdag.BehaviorFlags
	reply chan processBlockResponse
}

// isCurrentMsg is a message type to be sent across the message channel for
// requesting whether or not the sync manager believes it is synced with the
// currently connected peers.
type isCurrentMsg struct {
	reply chan bool
}

// pauseMsg is a message type to be sent across the message channel for
// pausing the sync manager. This effectively provides the caller with
// exclusive access over the manager until a receive is performed on the
// unpause channel.
type pauseMsg struct {
	unpause <-chan struct{}
}

// selectedTipMsg packages a kaspa selectedTip message and the peer it came
// from together so the block handler has access to that information.
type selectedTipMsg struct {
	selectedTipHash *daghash.Hash
	peer            *peerpkg.Peer
	reply           chan struct{}
}

// requestQueueAndSet holds a queue of inventory vectors that are pending
// request, along with a set of their hashes for constant-time duplicate
// checks.
type requestQueueAndSet struct {
	queue []*wire.InvVect
	set   map[daghash.Hash]struct{}
}

// peerSyncState stores additional information that the SyncManager tracks
// about a peer.
type peerSyncState struct {
	syncCandidate           bool
	lastSelectedTipRequest  time.Time
	isPendingForSelectedTip bool
	requestQueueMtx         sync.Mutex
	requestQueues           map[wire.InvType]*requestQueueAndSet
	requestedTxns           map[daghash.TxID]struct{}
	requestedBlocks         map[daghash.Hash]struct{}
}

// SyncManager is used to communicate block related messages with peers. The
// SyncManager is started by executing Start() in a goroutine. Once started,
// it selects peers to sync from and starts the initial block download. Once the
// DAG is in sync, the SyncManager handles incoming block and header
// notifications and relays announcements of new blocks to peers.
type SyncManager struct {
	peerNotifier   PeerNotifier
	started        int32
	shutdown       int32
	dag            *blockdag.BlockDAG
	txMemPool      *mempool.TxPool
	dagParams      *dagconfig.Params
	progressLogger *blockProgressLogger
	msgChan        chan interface{}
	wg             sync.WaitGroup
	quit           chan struct{}

	// These fields should only be accessed from the messageHandler thread
	rejectedTxns    map[daghash.TxID]struct{}
	requestedTxns   map[daghash.TxID]struct{}
	requestedBlocks map[daghash.Hash]struct{}
	syncPeer        *peerpkg.Peer
	peerStates      map[*peerpkg.Peer]*peerSyncState
}
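
// A minimal lifecycle sketch (illustrative only; "cfg" is assumed to be a
// *Config populated with PeerNotifier, DAG, TxMemPool, DAGParams and
// MaxPeers, as consumed by New below):
//
//	sm, err := netsync.New(cfg)
//	if err != nil {
//		// handle the error
//	}
//	sm.Start()
//	defer sm.Stop()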

// startSync will choose the sync peer among the available candidate peers to
// download/sync the blockDAG from. When syncing is already running, it
// simply returns. If no sync peer can be selected, it queries the remaining
// candidates for their selected tips, so that a sync peer may be chosen once
// their responses arrive.
func (sm *SyncManager) startSync() {
	// Return now if we're already syncing.
	if sm.syncPeer != nil {
		return
	}

	var syncPeer *peerpkg.Peer
	for peer, state := range sm.peerStates {
		if !state.syncCandidate {
			continue
		}

		if !peer.IsSelectedTipKnown() {
			continue
		}

		// TODO(davec): Use a better algorithm to choose the sync peer.
		// For now, just pick the first available candidate.
		syncPeer = peer
	}

	// Start syncing from the sync peer if one was selected.
	if syncPeer != nil {
		// Clear the requestedBlocks if the sync peer changes, otherwise
		// we may ignore blocks we need that the last sync peer failed
		// to send.
		sm.requestedBlocks = make(map[daghash.Hash]struct{})

		log.Infof("Syncing to block %s from peer %s",
			syncPeer.SelectedTipHash(), syncPeer.Addr())

		syncPeer.PushGetBlockLocatorMsg(syncPeer.SelectedTipHash(), sm.dagParams.GenesisHash)
		sm.syncPeer = syncPeer
		return
	}

	if sm.shouldQueryPeerSelectedTips() {
		hasSyncCandidates := false
		for peer, state := range sm.peerStates {
			if !state.syncCandidate {
				continue
			}
			hasSyncCandidates = true

			if time.Since(state.lastSelectedTipRequest) < minGetSelectedTipInterval {
				continue
			}

			queueMsgGetSelectedTip(peer, state)
		}
		if !hasSyncCandidates {
			log.Warnf("No sync peer candidates available")
		}
	}
}

// shouldQueryPeerSelectedTips returns whether the DAG is far enough behind
// the adjusted time that it is worth asking peers for their selected tips.
func (sm *SyncManager) shouldQueryPeerSelectedTips() bool {
	return sm.dag.AdjustedTime().Sub(sm.dag.CalcPastMedianTime()) > minDAGTimeDelay
}

// queueMsgGetSelectedTip queues a getSelectedTip message to the given peer
// and marks the peer as pending for a selected tip response.
func queueMsgGetSelectedTip(peer *peerpkg.Peer, state *peerSyncState) {
	state.lastSelectedTipRequest = time.Now()
	state.isPendingForSelectedTip = true
	peer.QueueMessage(wire.NewMsgGetSelectedTip(), nil)
}

// isSyncCandidate returns whether or not the peer is a candidate to consider
// syncing from.
func (sm *SyncManager) isSyncCandidate(peer *peerpkg.Peer) bool {
	// Typically a peer is not a candidate for sync if it's not a full node,
	// however regression test is special in that the regression tool is
	// not a full node and still needs to be considered a sync candidate.
	if sm.dagParams == &dagconfig.RegressionNetParams {
		// The peer is not a candidate if it's not coming from localhost
		// or the hostname can't be determined for some reason.
		host, _, err := net.SplitHostPort(peer.Addr())
		if err != nil {
			return false
		}

		if host != "127.0.0.1" && host != "localhost" {
			return false
		}
	} else {
		// The peer is not a candidate for sync if it's not a full
		// node.
		nodeServices := peer.Services()
		if nodeServices&wire.SFNodeNetwork != wire.SFNodeNetwork {
			return false
		}
	}

	// Candidate if all checks passed.
	return true
}

// handleNewPeerMsg deals with new peers that have signalled they may
// be considered as a sync peer (they have already successfully negotiated). It
// also starts syncing if needed. It is invoked from the syncHandler goroutine.
func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
	// Ignore if in the process of shutting down.
	if atomic.LoadInt32(&sm.shutdown) != 0 {
		return
	}

	log.Infof("New valid peer %s (%s)", peer, peer.UserAgent())

	// Initialize the peer state
	isSyncCandidate := sm.isSyncCandidate(peer)
	requestQueues := make(map[wire.InvType]*requestQueueAndSet)
	requestQueueInvTypes := []wire.InvType{wire.InvTypeTx, wire.InvTypeBlock, wire.InvTypeSyncBlock}
	for _, invType := range requestQueueInvTypes {
		requestQueues[invType] = &requestQueueAndSet{
			set: make(map[daghash.Hash]struct{}),
		}
	}
	sm.peerStates[peer] = &peerSyncState{
		syncCandidate:   isSyncCandidate,
		requestedTxns:   make(map[daghash.TxID]struct{}),
		requestedBlocks: make(map[daghash.Hash]struct{}),
		requestQueues:   requestQueues,
	}

	// Start syncing by choosing the best candidate if needed.
	if isSyncCandidate && sm.syncPeer == nil {
		sm.startSync()
	}
}

// handleDonePeerMsg deals with peers that have signalled they are done. It
// removes the peer as a candidate for syncing and in the case where it was
// the current sync peer, attempts to select a new best peer to sync from. It
// is invoked from the syncHandler goroutine.
func (sm *SyncManager) handleDonePeerMsg(peer *peerpkg.Peer) {
	state, exists := sm.peerStates[peer]
	if !exists {
		log.Warnf("Received done peer message for unknown peer %s", peer)
		return
	}

	// Remove the peer from the list of candidate peers.
	delete(sm.peerStates, peer)

	log.Infof("Lost peer %s", peer)

	// Remove requested transactions from the global map so that they will
	// be fetched from elsewhere next time we get an inv.
	for txHash := range state.requestedTxns {
		delete(sm.requestedTxns, txHash)
	}

	// Remove requested blocks from the global map so that they will be
	// fetched from elsewhere next time we get an inv.
	// TODO: we could possibly here check which peers have these blocks
	// and request them now to speed things up a little.
	for blockHash := range state.requestedBlocks {
		delete(sm.requestedBlocks, blockHash)
	}

	sm.stopSyncFromPeer(peer)
}

// stopSyncFromPeer clears the sync peer and attempts to select a new one if
// the given peer is the current sync peer.
func (sm *SyncManager) stopSyncFromPeer(peer *peerpkg.Peer) {
	// Attempt to find a new peer to sync from if the quitting peer is the
	// sync peer.
	if sm.syncPeer == peer {
		sm.syncPeer = nil
		sm.startSync()
	}
}

// handleRemoveFromSyncCandidatesMsg removes the given peer from the sync
// candidates and stops syncing from it if it is the current sync peer.
func (sm *SyncManager) handleRemoveFromSyncCandidatesMsg(peer *peerpkg.Peer) {
	sm.peerStates[peer].syncCandidate = false
	sm.stopSyncFromPeer(peer)
}

// handleTxMsg handles transaction messages from all peers.
func (sm *SyncManager) handleTxMsg(tmsg *txMsg) {
	peer := tmsg.peer
	state, exists := sm.peerStates[peer]
	if !exists {
		log.Warnf("Received tx message from unknown peer %s", peer)
		return
	}

	// If we didn't ask for this transaction then the peer is misbehaving.
	txID := tmsg.tx.ID()
	if _, exists = state.requestedTxns[*txID]; !exists {
		log.Warnf("Got unrequested transaction %s from %s -- "+
			"disconnecting", txID, peer.Addr())
		peer.Disconnect()
		return
	}

	// Ignore transactions that we have already rejected. Do not
	// send a reject message here because if the transaction was already
	// rejected, the transaction was unsolicited.
	if _, exists = sm.rejectedTxns[*txID]; exists {
		log.Debugf("Ignoring unsolicited previously rejected "+
			"transaction %s from %s", txID, peer)
		return
	}

	// Process the transaction to include validation, insertion in the
	// memory pool, orphan handling, etc.
	acceptedTxs, err := sm.txMemPool.ProcessTransaction(tmsg.tx,
		true, mempool.Tag(peer.ID()))

	// Remove transaction from request maps. Either the mempool/DAG
	// already knows about it and as such we shouldn't have any more
	// instances of trying to fetch it, or we failed to insert and thus
	// we'll retry next time we get an inv.
	delete(state.requestedTxns, *txID)
	delete(sm.requestedTxns, *txID)

	if err != nil {
		// Do not request this transaction again until a new block
		// has been processed.
		sm.rejectedTxns[*txID] = struct{}{}
		sm.limitTxIDMap(sm.rejectedTxns, maxRejectedTxns)

		// When the error is a rule error, it means the transaction was
		// simply rejected as opposed to something actually going wrong,
		// so log it as such. Otherwise, something really did go wrong,
		// so log it as an actual error.
		if errors.As(err, &mempool.RuleError{}) {
			log.Debugf("Rejected transaction %s from %s: %s",
				txID, peer, err)
		} else {
			log.Errorf("Failed to process transaction %s: %s",
				txID, err)
		}

		// Convert the error into an appropriate reject message and
		// send it.
		code, reason := mempool.ErrToRejectErr(err)
		peer.PushRejectMsg(wire.CmdTx, code, reason, (*daghash.Hash)(txID), false)
		return
	}

	sm.peerNotifier.AnnounceNewTransactions(acceptedTxs)
}

// current returns true if we believe we are synced with our peers, false if we
// still have blocks to check.
//
// We consider ourselves current iff both of the following are true:
// 1. there's no syncPeer, a.k.a. all connected peers are at the same tip
// 2. the DAG considers itself current - to prevent attacks where a peer sends
//    an unknown tip but never lets us sync to it.
func (sm *SyncManager) current() bool {
	return sm.syncPeer == nil && sm.dag.IsCurrent()
}
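
// Note: current is consulted both before relaying newly added blocks (see
// handleBlockDAGNotification) and before requesting relayed block and
// transaction invs (see sendInvsFromRequestQueue), so a node that is still
// syncing neither fetches nor advertises non-sync inventory.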

// restartSyncIfNeeded finds a new sync candidate if we're not expecting any
// blocks from the current one.
func (sm *SyncManager) restartSyncIfNeeded() {
	if sm.syncPeer != nil {
		syncPeerState, exists := sm.peerStates[sm.syncPeer]
		if exists {
			isWaitingForBlocks := func() bool {
				syncPeerState.requestQueueMtx.Lock()
				defer syncPeerState.requestQueueMtx.Unlock()
				return len(syncPeerState.requestedBlocks) != 0 ||
					len(syncPeerState.requestQueues[wire.InvTypeSyncBlock].queue) != 0
			}()
			if isWaitingForBlocks {
				return
			}
		}
	}

	sm.syncPeer = nil
	sm.startSync()
}

// handleBlockMsg handles block messages from all peers.
func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
	peer := bmsg.peer
	state, exists := sm.peerStates[peer]
	if !exists {
		log.Warnf("Received block message from unknown peer %s", peer)
		return
	}

	// If we didn't ask for this block then the peer is misbehaving.
	blockHash := bmsg.block.Hash()
	if _, exists = state.requestedBlocks[*blockHash]; !exists {
		// The regression test intentionally sends some blocks twice
		// to test that duplicate block insertion fails. Don't disconnect
		// the peer or ignore the block when we're in regression test
		// mode in this case so the DAG code is actually fed the
		// duplicate blocks.
		if sm.dagParams != &dagconfig.RegressionNetParams {
			log.Warnf("Got unrequested block %s from %s -- "+
				"disconnecting", blockHash, peer.Addr())
			peer.Disconnect()
			return
		}
	}

	behaviorFlags := blockdag.BFNone
	if bmsg.isDelayedBlock {
		behaviorFlags |= blockdag.BFAfterDelay
	}
	if bmsg.peer == sm.syncPeer {
		behaviorFlags |= blockdag.BFIsSync
	}

	// Process the block to include validation, orphan handling, etc.
	isOrphan, isDelayed, err := sm.dag.ProcessBlock(bmsg.block, behaviorFlags)

	// Remove block from request maps. Either the DAG knows about it and
	// so we shouldn't have any more instances of trying to fetch it, or
	// the insertion fails and thus we'll retry next time we get an inv.
	delete(state.requestedBlocks, *blockHash)
	delete(sm.requestedBlocks, *blockHash)

	sm.restartSyncIfNeeded()

	if err != nil {
		// When the error is a rule error, it means the block was simply
		// rejected as opposed to something actually going wrong, so log
		// it as such. Otherwise, something really did go wrong, so log
		// it as an actual error.
		if errors.As(err, &blockdag.RuleError{}) {
			log.Infof("Rejected block %s from %s: %s", blockHash,
				peer, err)
		} else {
			log.Errorf("Failed to process block %s: %s",
				blockHash, err)
		}
		var dbErr database.Error
		if ok := errors.As(err, &dbErr); ok && dbErr.ErrorCode ==
			database.ErrCorruption {
			panic(dbErr)
		}

		// Convert the error into an appropriate reject message and
		// send it.
		code, reason := mempool.ErrToRejectErr(err)
		peer.PushRejectMsg(wire.CmdBlock, code, reason, blockHash, false)

		// Disconnect from the misbehaving peer.
		peer.Disconnect()
		return
	}

	if isDelayed {
		return
	}

	// Request the parents for the orphan block from the peer that sent it.
	if isOrphan {
		missingAncestors, err := sm.dag.GetOrphanMissingAncestorHashes(blockHash)
		if err != nil {
			log.Errorf("Failed to find missing ancestors for block %s: %s",
				blockHash, err)
			return
		}
		sm.addBlocksToRequestQueue(state, missingAncestors, false)
	} else {
		// When the block is not an orphan, log information about it and
		// update the DAG state.
		blockBlueScore, err := sm.dag.BlueScoreByBlockHash(blockHash)
		if err != nil {
			log.Errorf("Failed to get blue score for block %s: %s", blockHash, err)
		}
		sm.progressLogger.LogBlockBlueScore(bmsg.block, blockBlueScore)

		// Clear the rejected transactions.
		sm.rejectedTxns = make(map[daghash.TxID]struct{})
	}

	// We don't want to flood our sync peer with getdata messages, so
	// instead of asking it immediately about missing ancestors, we first
	// wait until it finishes sending us all of the requested blocks.
	if (isOrphan && peer != sm.syncPeer) || (peer == sm.syncPeer && len(state.requestedBlocks) == 0) {
		err := sm.sendInvsFromRequestQueue(peer, state)
		if err != nil {
			log.Errorf("Failed to send invs from queue: %s", err)
			return
		}
	}
}

// addBlocksToRequestQueue adds the given block hashes to the peer's request
// queue. isRelayedInv tells whether the blocks were advertised via a relayed
// inv (InvTypeBlock) rather than as part of the sync flow (InvTypeSyncBlock).
func (sm *SyncManager) addBlocksToRequestQueue(state *peerSyncState, hashes []*daghash.Hash, isRelayedInv bool) {
	state.requestQueueMtx.Lock()
	defer state.requestQueueMtx.Unlock()
	for _, hash := range hashes {
		if _, exists := sm.requestedBlocks[*hash]; !exists {
			invType := wire.InvTypeSyncBlock
			if isRelayedInv {
				invType = wire.InvTypeBlock
			}
			iv := wire.NewInvVect(invType, hash)
			state.addInvToRequestQueueNoLock(iv)
		}
	}
}

// addInvToRequestQueueNoLock adds the given inventory vector to the matching
// request queue if it isn't already queued. The caller must hold
// requestQueueMtx.
func (state *peerSyncState) addInvToRequestQueueNoLock(iv *wire.InvVect) {
	requestQueue, ok := state.requestQueues[iv.Type]
	if !ok {
		panic(errors.Errorf("got unsupported inventory type %s", iv.Type))
	}
	if _, exists := requestQueue.set[*iv.Hash]; !exists {
		requestQueue.set[*iv.Hash] = struct{}{}
		requestQueue.queue = append(requestQueue.queue, iv)
	}
}

// addInvToRequestQueue is the concurrency-safe version of
// addInvToRequestQueueNoLock.
func (state *peerSyncState) addInvToRequestQueue(iv *wire.InvVect) {
	state.requestQueueMtx.Lock()
	defer state.requestQueueMtx.Unlock()
	state.addInvToRequestQueueNoLock(iv)
}

// haveInventory returns whether or not the inventory represented by the passed
// inventory vector is known. This includes checking all of the various places
// inventory can be when it is in different states such as blocks that are part
// of the DAG, in the orphan pool, and transactions that are in the memory pool
// (either the main pool or orphan pool).
func (sm *SyncManager) haveInventory(invVect *wire.InvVect) (bool, error) {
	switch invVect.Type {
	case wire.InvTypeSyncBlock:
		fallthrough
	case wire.InvTypeBlock:
		// Ask the DAG if the block is known to it in any form (in the
		// DAG or as an orphan).
		return sm.dag.IsKnownBlock(invVect.Hash), nil

	case wire.InvTypeTx:
		// Ask the transaction memory pool if the transaction is known
		// to it in any form (main pool or orphan).
		if sm.txMemPool.HaveTransaction((*daghash.TxID)(invVect.Hash)) {
			return true, nil
		}

		// Check if the transaction exists from the point of view of the
		// DAG's virtual block. Note that this is only a best effort
		// since it is expensive to check existence of every output and
		// the only purpose of this check is to avoid downloading
		// already known transactions. Only the first two outputs are
		// checked because the vast majority of transactions consist of
		// two outputs where one is some form of "pay-to-somebody-else"
		// and the other is a change output.
		prevOut := wire.Outpoint{TxID: daghash.TxID(*invVect.Hash)}
		for i := uint32(0); i < 2; i++ {
			prevOut.Index = i
			entry, ok := sm.dag.GetUTXOEntry(prevOut)
			if !ok {
				return false, nil
			}
			if entry != nil {
				return true, nil
			}
		}

		return false, nil
	}

	// The requested inventory is an unsupported type, so just claim
	// it is known to avoid requesting it.
	return true, nil
}

// handleInvMsg handles inv messages from all peers.
// We examine the inventory advertised by the remote peer and act accordingly.
func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
	peer := imsg.peer
	state, exists := sm.peerStates[peer]
	if !exists {
		log.Warnf("Received inv message from unknown peer %s", peer)
		return
	}

	// Attempt to find the final block in the inventory list. There may
	// not be one.
	lastBlock := -1
	invVects := imsg.inv.InvList
	for i := len(invVects) - 1; i >= 0; i-- {
		if invVects[i].IsBlockOrSyncBlock() {
			lastBlock = i
			break
		}
	}

	// Request the advertised inventory if we don't already have it. Also,
	// request parent blocks of orphans if we receive one we already have.
	// Finally, attempt to detect potential stalls due to big orphan DAGs
	// we already have and request more blocks to prevent them.
	for i, iv := range invVects {
		// Ignore unsupported inventory types.
		switch iv.Type {
		case wire.InvTypeBlock:
		case wire.InvTypeSyncBlock:
		case wire.InvTypeTx:
		default:
			continue
		}

		// Add the inventory to the cache of known inventory
		// for the peer.
		peer.AddKnownInventory(iv)

		// Request the inventory if we don't already have it.
		haveInv, err := sm.haveInventory(iv)
		if err != nil {
			log.Warnf("Unexpected failure when checking for "+
				"existing inventory during inv message "+
				"processing: %s", err)
			continue
		}
		if !haveInv {
			if iv.Type == wire.InvTypeTx {
				// Skip the transaction if it has already been rejected.
				if _, exists := sm.rejectedTxns[daghash.TxID(*iv.Hash)]; exists {
					continue
				}

				// Skip the transaction if it had previously been requested.
				if _, exists := state.requestedTxns[daghash.TxID(*iv.Hash)]; exists {
					continue
				}
			}

			// Add it to the request queue.
			state.addInvToRequestQueue(iv)
			continue
		}

		if iv.IsBlockOrSyncBlock() {
			// The block is an orphan block that we already have.
			// When the existing orphan was processed, it requested
			// the missing parent blocks. When this scenario
			// happens, it means there were more blocks missing
			// than are allowed into a single inventory message. As
			// a result, once this peer requested the final
			// advertised block, the remote peer noticed and is now
			// resending the orphan block as an available block
			// to signal there are more missing blocks that need to
			// be requested.
			if sm.dag.IsKnownOrphan(iv.Hash) {
				missingAncestors, err := sm.dag.GetOrphanMissingAncestorHashes(iv.Hash)
				if err != nil {
					log.Errorf("Failed to find missing ancestors for block %s: %s",
						iv.Hash, err)
					return
				}
				sm.addBlocksToRequestQueue(state, missingAncestors, iv.Type != wire.InvTypeSyncBlock)
				continue
			}

			// We already have the final block advertised by this
			// inventory message, so force a request for more. This
			// should only happen if our DAG and the peer's DAG have
			// diverged a long time ago.
			if i == lastBlock && peer == sm.syncPeer {
				// Request blocks after the first block's ancestor
				// that exists in the selected path chain, up to the
				// final one the remote peer knows about.
				peer.PushGetBlockLocatorMsg(iv.Hash, sm.dagParams.GenesisHash)
			}
		}
	}

	err := sm.sendInvsFromRequestQueue(peer, state)
	if err != nil {
		log.Errorf("Failed to send invs from queue: %s", err)
	}
}

// addInvsToGetDataMessageFromQueue pops up to maxInvsToAdd inventory vectors
// of the given type from the peer's request queue and adds those that are
// still missing to gdmsg, marking them as requested. The caller must hold
// requestQueueMtx.
func (sm *SyncManager) addInvsToGetDataMessageFromQueue(gdmsg *wire.MsgGetData, state *peerSyncState, invType wire.InvType, maxInvsToAdd int) error {
	requestQueue, ok := state.requestQueues[invType]
	if !ok {
		panic(errors.Errorf("got unsupported inventory type %s", invType))
	}
	queue := requestQueue.queue
	var invsNum int
	leftSpaceInGdmsg := wire.MaxInvPerGetDataMsg - len(gdmsg.InvList)
	if len(queue) > leftSpaceInGdmsg {
		invsNum = leftSpaceInGdmsg
	} else {
		invsNum = len(queue)
	}
	if invsNum > maxInvsToAdd {
		invsNum = maxInvsToAdd
	}
	invsToAdd := make([]*wire.InvVect, 0, invsNum)
	for len(queue) != 0 && len(invsToAdd) < invsNum {
		var iv *wire.InvVect
		iv, queue = queue[0], queue[1:]

		exists, err := sm.haveInventory(iv)
		if err != nil {
			return err
		}
		if !exists {
			invsToAdd = append(invsToAdd, iv)
		}
	}

	addBlockInv := func(iv *wire.InvVect) {
		// Request the block if there is not already a pending
		// request.
		if _, exists := sm.requestedBlocks[*iv.Hash]; !exists {
			sm.requestedBlocks[*iv.Hash] = struct{}{}
			sm.limitHashMap(sm.requestedBlocks, maxRequestedBlocks)
			state.requestedBlocks[*iv.Hash] = struct{}{}

			gdmsg.AddInvVect(iv)
		}
	}
	for _, iv := range invsToAdd {
		delete(requestQueue.set, *iv.Hash)
		switch invType {
		case wire.InvTypeSyncBlock, wire.InvTypeBlock:
			addBlockInv(iv)

		case wire.InvTypeTx:
			// Request the transaction if there is not already a
			// pending request.
			if _, exists := sm.requestedTxns[daghash.TxID(*iv.Hash)]; !exists {
				sm.requestedTxns[daghash.TxID(*iv.Hash)] = struct{}{}
				sm.limitTxIDMap(sm.requestedTxns, maxRequestedTxns)
				state.requestedTxns[daghash.TxID(*iv.Hash)] = struct{}{}

				gdmsg.AddInvVect(iv)
			}
		}

		if len(queue) >= wire.MaxInvPerGetDataMsg {
			break
		}
	}
	requestQueue.queue = queue
	return nil
}
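
// Design note: two budgets bound each getdata message here. maxInvsToAdd
// caps how many invs of a single type may be added per call, while
// wire.MaxInvPerGetDataMsg caps the total size of the message across all
// types; leftSpaceInGdmsg above enforces the latter.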

// sendInvsFromRequestQueue drains the peer's request queues into a getdata
// message and queues it to the peer. Sync blocks are always requested;
// relayed blocks and transactions are requested only when we believe we are
// current.
func (sm *SyncManager) sendInvsFromRequestQueue(peer *peerpkg.Peer, state *peerSyncState) error {
	state.requestQueueMtx.Lock()
	defer state.requestQueueMtx.Unlock()
	gdmsg := wire.NewMsgGetData()
	err := sm.addInvsToGetDataMessageFromQueue(gdmsg, state, wire.InvTypeSyncBlock, wire.MaxSyncBlockInvPerGetDataMsg)
	if err != nil {
		return err
	}
	if sm.current() {
		err := sm.addInvsToGetDataMessageFromQueue(gdmsg, state, wire.InvTypeBlock, wire.MaxInvPerGetDataMsg)
		if err != nil {
			return err
		}

		err = sm.addInvsToGetDataMessageFromQueue(gdmsg, state, wire.InvTypeTx, wire.MaxInvPerGetDataMsg)
		if err != nil {
			return err
		}
	}
	if len(gdmsg.InvList) > 0 {
		peer.QueueMessage(gdmsg, nil)
	}
	return nil
}

// limitTxIDMap is a helper function for maps that require a maximum limit by
// evicting a random transaction if adding a new value would cause it to
// overflow the maximum allowed.
func (sm *SyncManager) limitTxIDMap(m map[daghash.TxID]struct{}, limit int) {
	if len(m)+1 > limit {
		// Remove a random entry from the map. For most compilers, Go's
		// range statement iterates starting at a random item although
		// that is not 100% guaranteed by the spec. The iteration order
		// is not important here because an adversary would have to be
		// able to pull off preimage attacks on the hashing function in
		// order to target eviction of specific entries anyways.
		for txID := range m {
			delete(m, txID)
			return
		}
	}
}

// limitHashMap is a helper function for maps that require a maximum limit by
// evicting a random item if adding a new value would cause it to
// overflow the maximum allowed.
func (sm *SyncManager) limitHashMap(m map[daghash.Hash]struct{}, limit int) {
	if len(m)+1 > limit {
		// Remove a random entry from the map. For most compilers, Go's
		// range statement iterates starting at a random item although
		// that is not 100% guaranteed by the spec. The iteration order
		// is not important here because an adversary would have to be
		// able to pull off preimage attacks on the hashing function in
		// order to target eviction of specific entries anyways.
		for hash := range m {
			delete(m, hash)
			return
		}
	}
}

// handleProcessBlockMsg processes a block submitted via the processBlockMsg
// channel (typically from RPC). Delayed blocks are disallowed and reported
// as an error.
func (sm *SyncManager) handleProcessBlockMsg(msg processBlockMsg) (isOrphan bool, err error) {
	isOrphan, isDelayed, err := sm.dag.ProcessBlock(
		msg.block, msg.flags|blockdag.BFDisallowDelay)
	if err != nil {
		return false, err
	}
	if isDelayed {
		return false, errors.New("cannot process blocks from RPC beyond the allowed time offset")
	}

	return isOrphan, nil
}

// handleSelectedTipMsg handles a selectedTip message received from a peer in
// response to a previous getSelectedTip request.
func (sm *SyncManager) handleSelectedTipMsg(msg *selectedTipMsg) {
	peer := msg.peer
	selectedTipHash := msg.selectedTipHash
	state := sm.peerStates[peer]
	if !state.isPendingForSelectedTip {
		log.Warnf("Got unrequested selected tip message from %s -- "+
			"disconnecting", peer.Addr())
		peer.Disconnect()
		return
	}
	state.isPendingForSelectedTip = false
	if selectedTipHash.IsEqual(peer.SelectedTipHash()) {
		return
	}
	peer.SetSelectedTipHash(selectedTipHash)
	sm.startSync()
}

// messageHandler is the main handler for the sync manager. It must be run as a
// goroutine. It processes block and inv messages in a separate goroutine
// from the peer handlers so the block (MsgBlock) messages are handled by a
// single thread without needing to lock memory data structures. This is
// important because the sync manager controls which blocks are needed and how
// the fetching should proceed.
func (sm *SyncManager) messageHandler() {
out:
	for {
		select {
		case m := <-sm.msgChan:
			switch msg := m.(type) {
			case *newPeerMsg:
				sm.handleNewPeerMsg(msg.peer)

			case *txMsg:
				sm.handleTxMsg(msg)
				msg.reply <- struct{}{}

			case *blockMsg:
				sm.handleBlockMsg(msg)
				msg.reply <- struct{}{}

			case *invMsg:
				sm.handleInvMsg(msg)

			case *donePeerMsg:
				sm.handleDonePeerMsg(msg.peer)

			case *removeFromSyncCandidatesMsg:
				sm.handleRemoveFromSyncCandidatesMsg(msg.peer)

			case getSyncPeerMsg:
				var peerID int32
				if sm.syncPeer != nil {
					peerID = sm.syncPeer.ID()
				}
				msg.reply <- peerID

			case processBlockMsg:
				isOrphan, err := sm.handleProcessBlockMsg(msg)
				msg.reply <- processBlockResponse{
					isOrphan: isOrphan,
					err:      err,
				}

			case isCurrentMsg:
				msg.reply <- sm.current()

			case pauseMsg:
				// Wait until the sender unpauses the manager.
				<-msg.unpause

			case *selectedTipMsg:
				sm.handleSelectedTipMsg(msg)
				msg.reply <- struct{}{}

			default:
				log.Warnf("Invalid message type in block "+
					"handler: %T", msg)
			}

		case <-sm.quit:
			break out
		}
	}

	sm.wg.Done()
	log.Trace("Block handler done")
}

// handleBlockDAGNotification handles notifications from blockDAG. It does
// things such as request orphan block parents and relay accepted blocks to
// connected peers.
func (sm *SyncManager) handleBlockDAGNotification(notification *blockdag.Notification) {
	switch notification.Type {
	// A block has been accepted into the blockDAG. Relay it to other peers.
	case blockdag.NTBlockAdded:
		data, ok := notification.Data.(*blockdag.BlockAddedNotificationData)
		if !ok {
			log.Warnf("Block Added notification data is of wrong type.")
			break
		}
		block := data.Block

		// Update mempool
		ch := make(chan mempool.NewBlockMsg)
		spawn(func() {
			err := sm.txMemPool.HandleNewBlock(block, ch)
			close(ch)
			if err != nil {
				panic(fmt.Sprintf("HandleNewBlock failed to handle block %s", block.Hash()))
			}
		})

		// Relay if we are current and the block was not just now unorphaned.
		// Otherwise peers that are current should already know about it.
		if sm.current() && !data.WasUnorphaned {
			iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
			sm.peerNotifier.RelayInventory(iv, block.MsgBlock().Header)
		}

		for msg := range ch {
			sm.peerNotifier.TransactionConfirmed(msg.Tx)
			sm.peerNotifier.AnnounceNewTransactions(msg.AcceptedTxs)
		}
	}
}

// NewPeer informs the sync manager of a newly active peer.
func (sm *SyncManager) NewPeer(peer *peerpkg.Peer) {
	// Ignore if we are shutting down.
	if atomic.LoadInt32(&sm.shutdown) != 0 {
		return
	}
	sm.msgChan <- &newPeerMsg{peer: peer}
}

// QueueTx adds the passed transaction message and peer to the block handling
// queue. Responds to the done channel argument after the tx message is
// processed.
func (sm *SyncManager) QueueTx(tx *util.Tx, peer *peerpkg.Peer, done chan struct{}) {
	// Don't accept more transactions if we're shutting down.
	if atomic.LoadInt32(&sm.shutdown) != 0 {
		done <- struct{}{}
		return
	}

	sm.msgChan <- &txMsg{tx: tx, peer: peer, reply: done}
}
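
// A usage sketch (illustrative): callers that need to block until the
// transaction has been handled wait on the done channel:
//
//	done := make(chan struct{})
//	sm.QueueTx(tx, peer, done)
//	<-done
//
// QueueBlock below follows the same pattern for blocks.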

// QueueBlock adds the passed block message and peer to the block handling
// queue. Responds to the done channel argument after the block message is
// processed.
func (sm *SyncManager) QueueBlock(block *util.Block, peer *peerpkg.Peer, isDelayedBlock bool, done chan struct{}) {
	// Don't accept more blocks if we're shutting down.
	if atomic.LoadInt32(&sm.shutdown) != 0 {
		done <- struct{}{}
		return
	}

	sm.msgChan <- &blockMsg{block: block, peer: peer, isDelayedBlock: isDelayedBlock, reply: done}
}

// QueueInv adds the passed inv message and peer to the block handling queue.
func (sm *SyncManager) QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) {
	// No channel handling here because peers do not need to block on inv
	// messages.
	if atomic.LoadInt32(&sm.shutdown) != 0 {
		return
	}

	sm.msgChan <- &invMsg{inv: inv, peer: peer}
}

// QueueSelectedTipMsg adds the passed selected tip message and peer to the
// block handling queue. Responds to the done channel argument after it finished
// handling the message.
func (sm *SyncManager) QueueSelectedTipMsg(msg *wire.MsgSelectedTip, peer *peerpkg.Peer, done chan struct{}) {
	sm.msgChan <- &selectedTipMsg{
		selectedTipHash: msg.SelectedTipHash,
		peer:            peer,
		reply:           done,
	}
}

// DonePeer informs the sync manager that a peer has disconnected.
func (sm *SyncManager) DonePeer(peer *peerpkg.Peer) {
	// Ignore if we are shutting down.
	if atomic.LoadInt32(&sm.shutdown) != 0 {
		return
	}

	sm.msgChan <- &donePeerMsg{peer: peer}
}

// RemoveFromSyncCandidates tells the sync manager to remove a peer as
// a sync candidate.
func (sm *SyncManager) RemoveFromSyncCandidates(peer *peerpkg.Peer) {
	// Ignore if we are shutting down.
	if atomic.LoadInt32(&sm.shutdown) != 0 {
		return
	}

	sm.msgChan <- &removeFromSyncCandidatesMsg{peer: peer}
}

// Start begins the core block handler which processes block and inv messages.
func (sm *SyncManager) Start() {
	// Already started?
	if atomic.AddInt32(&sm.started, 1) != 1 {
		return
	}

	log.Trace("Starting sync manager")
	sm.wg.Add(1)
	spawn(sm.messageHandler)
}

// Stop gracefully shuts down the sync manager by stopping all asynchronous
// handlers and waiting for them to finish.
func (sm *SyncManager) Stop() error {
	if atomic.AddInt32(&sm.shutdown, 1) != 1 {
		log.Warnf("Sync manager is already in the process of " +
			"shutting down")
		return nil
	}

	log.Infof("Sync manager shutting down")
	close(sm.quit)
	sm.wg.Wait()
	return nil
}

// SyncPeerID returns the ID of the current sync peer, or 0 if there is none.
func (sm *SyncManager) SyncPeerID() int32 {
	reply := make(chan int32)
	sm.msgChan <- getSyncPeerMsg{reply: reply}
	return <-reply
}

// ProcessBlock makes use of ProcessBlock on an internal instance of a blockDAG.
func (sm *SyncManager) ProcessBlock(block *util.Block, flags blockdag.BehaviorFlags) (bool, error) {
	reply := make(chan processBlockResponse)
	sm.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply}
	response := <-reply
	return response.isOrphan, response.err
}
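
// For example (illustrative), an RPC handler submitting a mined block could
// call:
//
//	isOrphan, err := sm.ProcessBlock(block, blockdag.BFNone)
//
// The call blocks until the messageHandler goroutine has processed the block.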

// IsCurrent returns whether or not the sync manager believes it is synced with
// the connected peers.
func (sm *SyncManager) IsCurrent() bool {
	reply := make(chan bool)
	sm.msgChan <- isCurrentMsg{reply: reply}
	return <-reply
}

// Pause pauses the sync manager until the returned channel is closed.
//
// Note that while paused, all peer and block processing is halted. The
// message sender should avoid pausing the sync manager for long durations.
func (sm *SyncManager) Pause() chan<- struct{} {
	c := make(chan struct{})
	sm.msgChan <- pauseMsg{c}
	return c
}
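
// A usage sketch (illustrative): the caller has exclusive access until it
// closes the returned channel:
//
//	unpause := sm.Pause()
//	// ... inspect or mutate state the sync manager also touches ...
//	close(unpause)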

// New constructs a new SyncManager. Use Start to begin processing asynchronous
// block, tx, and inv updates.
func New(config *Config) (*SyncManager, error) {
	sm := SyncManager{
		peerNotifier:    config.PeerNotifier,
		dag:             config.DAG,
		txMemPool:       config.TxMemPool,
		dagParams:       config.DAGParams,
		rejectedTxns:    make(map[daghash.TxID]struct{}),
		requestedTxns:   make(map[daghash.TxID]struct{}),
		requestedBlocks: make(map[daghash.Hash]struct{}),
		peerStates:      make(map[*peerpkg.Peer]*peerSyncState),
		progressLogger:  newBlockProgressLogger("Processed", log),
		msgChan:         make(chan interface{}, config.MaxPeers*3),
		quit:            make(chan struct{}),
	}

	sm.dag.Subscribe(sm.handleBlockDAGNotification)

	return &sm, nil
}