[NOD-16] implement initial sync first version (#234)

* [NOD-58] Replace lastBlock with selected tip in version message (#210)

* [NOD-58] Replace lastBlock with selected tip in version message

* [NOD-58] Fix typo in comment

* [NOD-58] Add mutex to SelectedTipHash

* [NOD-58] Remove redundant comment

* [NOD-58] Remove wantStartingHeight from peerStats

* [NOD-58] Remove lock from SelectedTipHash

* [NOD-53] Change getheaders message to handle new block locator (#213)

* [NOD-53] Change getheaders message to handle the new block locator mechanism

* [NOD-53] Use heap in locateHeaders

* [NOD-53] Create a constructor for each heap direction

* [NOD-57] Check if a node is synced only by timestamps (#214)

* [NOD-60] implement isSyncCandidate (#218)

* [NOD-60] Implement isSyncCandidate

* [NOD-60] Fix typo

* [NOD-65] Fix netsync related tests and remove fields optionality from… (#220)

* [NOD-65] Fix netsync related tests and remove fields optionality from msgversion

* [NOD-65] gofmt rpcserver.go

* [NOD-65] add missing test for verRelayTxFalse

* [NOD-62] Change getblocks message to handle the new block locator mechanism (#219)

* [NOD-62] Change getblocks message to handle the new block locator mechanism

* [NOD-62] Add locateBlockNodes function

* [NOD-68] Adjust orphan parents requesting for a DAG (#222)

* [NOD-68] Adjust orphan parents requesting for a DAG

* [NOD-68] Add sendInvsFromRequestedQueue and trigger it when the requested blocks slice is empty, or immediately if we're not in sync mode

* [NOD-68] Prevent duplicates from entering state.requestQueue and add wrapping locks to addBlocksToRequestQueue

* [NOD-68] Fix Lock -> Unlock in sendInvsFromRequestedQueue

* [NOD-74] Starts syncing again when the current sync peer is done (#225)

* [NOD-74] Starts syncing again when the current sync peer is done

* [NOD-74] Unlock mtx before netsync is restarted

* [NOD-74] Fix name isSyncPeerFree -> isWaitingForBlocks

* [NOD-75] fixing netsync bugs (#227)

* [NOD-74] Starts syncing again when the current sync peer is done

* [NOD-74] Unlock mtx before netsync is restarted

* [NOD-75] Fixing netsync bugs

* [NOD-80] Request block data from block propagation just after you are… (#231)

* [NOD-80] Request block data from block propagation just after you are current

* [NOD-80] Fix adding to both queues in addInvToRequestQueue

* [NOD-81] Start to mine on top of genesis iff all peers' selected tip is genesis (#232)

* [NOD-81] Start to mine on top of genesis only if all of your peers' selected tip is genesis

* [NOD-81] Explain forAllPeers/forAllOutboundPeers shouldContinue behaviour in comments

* [NOD-81] Add forAllInboundPeers and add return values for forAllPeers/forAllOutboundPeers/forAllInboundPeers functions

* [NOD-16] Add pushSet to the BlockHeap type

* [NOD-16] Fixed syntax error
Ori Newman 2019-03-31 16:47:28 +03:00 committed by Svarog
parent 1fe1b11823
commit 62d14bf2bd
23 changed files with 703 additions and 735 deletions

View File

@ -24,12 +24,26 @@ func (h *baseHeap) Pop() interface{} {
return popped
}
func (h baseHeap) Less(i, j int) bool {
if h[i].height == h[j].height {
return daghash.HashToBig(&h[i].hash).Cmp(daghash.HashToBig(&h[j].hash)) > 0
// upHeap extends baseHeap to include Less operation that traverses from bottom to top
type upHeap struct{ baseHeap }
func (h upHeap) Less(i, j int) bool {
if h.baseHeap[i].height == h.baseHeap[j].height {
return daghash.HashToBig(&h.baseHeap[i].hash).Cmp(daghash.HashToBig(&h.baseHeap[j].hash)) < 0
}
return h[i].height > h[j].height
return h.baseHeap[i].height < h.baseHeap[j].height
}
// downHeap extends baseHeap to include Less operation that traverses from top to bottom
type downHeap struct{ baseHeap }
func (h downHeap) Less(i, j int) bool {
if h.baseHeap[i].height == h.baseHeap[j].height {
return daghash.HashToBig(&h.baseHeap[i].hash).Cmp(daghash.HashToBig(&h.baseHeap[j].hash)) > 0
}
return h.baseHeap[i].height > h.baseHeap[j].height
}
// BlockHeap represents a mutable heap of Blocks, sorted by their height
@ -37,9 +51,16 @@ type BlockHeap struct {
impl heap.Interface
}
// NewHeap initializes and returns a new BlockHeap
func NewHeap() BlockHeap {
h := BlockHeap{impl: &baseHeap{}}
// NewDownHeap initializes and returns a new BlockHeap
func NewDownHeap() BlockHeap {
h := BlockHeap{impl: &downHeap{}}
heap.Init(h.impl)
return h
}
// NewUpHeap initializes and returns a new BlockHeap
func NewUpHeap() BlockHeap {
h := BlockHeap{impl: &upHeap{}}
heap.Init(h.impl)
return h
}
@ -54,6 +75,13 @@ func (bh BlockHeap) Push(block *blockNode) {
heap.Push(bh.impl, block)
}
// pushSet pushes a blockset to the heap.
func (bh BlockHeap) pushSet(bs blockSet) {
for _, block := range bs {
heap.Push(bh.impl, block)
}
}
// Len returns the length of this heap
func (bh BlockHeap) Len() int {
return bh.impl.Len()
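
The hunk above splits the old single Less implementation into two directional heaps. Below is a minimal standalone sketch of that pattern using container/heap; the node type, the package main wrapper, and the sample heights are invented for illustration and are not part of this commit.

package main

import (
    "container/heap"
    "fmt"
)

type node struct{ height int32 }

type baseHeap []*node

func (h baseHeap) Len() int            { return len(h) }
func (h baseHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *baseHeap) Push(x interface{}) { *h = append(*h, x.(*node)) }
func (h *baseHeap) Pop() interface{} {
    old := *h
    popped := old[len(old)-1]
    *h = old[:len(old)-1]
    return popped
}

// upHeap pops the lowest-height node first (bottom-to-top traversal).
type upHeap struct{ baseHeap }

func (h upHeap) Less(i, j int) bool { return h.baseHeap[i].height < h.baseHeap[j].height }

// downHeap pops the highest-height node first (top-to-bottom traversal).
type downHeap struct{ baseHeap }

func (h downHeap) Less(i, j int) bool { return h.baseHeap[i].height > h.baseHeap[j].height }

func main() {
    up, down := &upHeap{}, &downHeap{}
    for _, n := range []*node{{height: 3}, {height: 1}, {height: 2}} {
        heap.Push(up, n)
        heap.Push(down, n)
    }
    fmt.Println(heap.Pop(up).(*node).height)   // 1
    fmt.Println(heap.Pop(down).(*node).height) // 3
}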

View File

@ -19,72 +19,102 @@ func TestBlockHeap(t *testing.T) {
block0smallHash.hash = daghash.Hash{}
tests := []struct {
name string
toPush []*blockNode
expectedLength int
expectedPop *blockNode
name string
toPush []*blockNode
expectedLength int
expectedPopUp *blockNode
expectedPopDown *blockNode
}{
{
name: "empty heap must have length 0",
toPush: []*blockNode{},
expectedLength: 0,
expectedPop: nil,
name: "empty heap must have length 0",
toPush: []*blockNode{},
expectedLength: 0,
expectedPopDown: nil,
expectedPopUp: nil,
},
{
name: "heap with one push must have length 1",
toPush: []*blockNode{block0},
expectedLength: 1,
expectedPop: nil,
name: "heap with one push must have length 1",
toPush: []*blockNode{block0},
expectedLength: 1,
expectedPopDown: nil,
expectedPopUp: nil,
},
{
name: "heap with one push and one pop",
toPush: []*blockNode{block0},
expectedLength: 0,
expectedPop: block0,
name: "heap with one push and one pop",
toPush: []*blockNode{block0},
expectedLength: 0,
expectedPopDown: block0,
expectedPopUp: block0,
},
{
name: "push two blocks with different heights, heap shouldn't have to rebalance",
toPush: []*blockNode{block100000, block0},
expectedLength: 1,
expectedPop: block100000,
name: "push two blocks with different heights, heap shouldn't have to rebalance " +
"for down direction, but will have to rebalance for up direction",
toPush: []*blockNode{block100000, block0},
expectedLength: 1,
expectedPopDown: block100000,
expectedPopUp: block0,
},
{
name: "push two blocks with different heights, heap must rebalance",
toPush: []*blockNode{block0, block100000},
expectedLength: 1,
expectedPop: block100000,
name: "push two blocks with different heights, heap shouldn't have to rebalance " +
"for up direction, but will have to rebalance for down direction",
toPush: []*blockNode{block0, block100000},
expectedLength: 1,
expectedPopDown: block100000,
expectedPopUp: block0,
},
{
name: "push two blocks with equal heights but different hashes, heap shouldn't have to rebalance",
toPush: []*blockNode{block0, block0smallHash},
expectedLength: 1,
expectedPop: block0,
name: "push two blocks with equal heights but different hashes, heap shouldn't have to rebalance " +
"for down direction, but will have to rebalance for up direction",
toPush: []*blockNode{block0, block0smallHash},
expectedLength: 1,
expectedPopDown: block0,
expectedPopUp: block0smallHash,
},
{
name: "push two blocks with equal heights but different hashes, heap must rebalance",
toPush: []*blockNode{block0smallHash, block0},
expectedLength: 1,
expectedPop: block0,
name: "push two blocks with equal heights but different hashes, heap shouldn't have to rebalance " +
"for up direction, but will have to rebalance for down direction",
toPush: []*blockNode{block0smallHash, block0},
expectedLength: 1,
expectedPopDown: block0,
expectedPopUp: block0smallHash,
},
}
for _, test := range tests {
heap := NewHeap()
dHeap := NewDownHeap()
for _, block := range test.toPush {
heap.Push(block)
dHeap.Push(block)
}
var poppedBlock *blockNode
if test.expectedPop != nil {
poppedBlock = heap.pop()
if test.expectedPopDown != nil {
poppedBlock = dHeap.pop()
}
if heap.Len() != test.expectedLength {
t.Errorf("unexpected heap length in test \"%s\". "+
"Expected: %v, got: %v", test.name, test.expectedLength, heap.Len())
if dHeap.Len() != test.expectedLength {
t.Errorf("unexpected down heap length in test \"%s\". "+
"Expected: %v, got: %v", test.name, test.expectedLength, dHeap.Len())
}
if poppedBlock != test.expectedPop {
t.Errorf("unexpected popped block in test \"%s\". "+
"Expected: %v, got: %v", test.name, test.expectedPop, poppedBlock)
if poppedBlock != test.expectedPopDown {
t.Errorf("unexpected popped block for down heap in test \"%s\". "+
"Expected: %v, got: %v", test.name, test.expectedPopDown, poppedBlock)
}
uHeap := NewUpHeap()
for _, block := range test.toPush {
uHeap.Push(block)
}
poppedBlock = nil
if test.expectedPopUp != nil {
poppedBlock = uHeap.pop()
}
if uHeap.Len() != test.expectedLength {
t.Errorf("unexpected up heap length in test \"%s\". "+
"Expected: %v, got: %v", test.name, test.expectedLength, uHeap.Len())
}
if poppedBlock != test.expectedPopUp {
t.Errorf("unexpected popped block for up heap in test \"%s\". "+
"Expected: %v, got: %v", test.name, test.expectedPopDown, poppedBlock)
}
}
}

View File

@ -158,7 +158,7 @@ type BlockDAG struct {
//
// This function is safe for concurrent access.
func (dag *BlockDAG) HaveBlock(hash *daghash.Hash) (bool, error) {
exists, err := dag.blockExists(hash)
exists, err := dag.BlockExists(hash)
if err != nil {
return false, err
}
@ -204,30 +204,41 @@ func (dag *BlockDAG) IsKnownOrphan(hash *daghash.Hash) bool {
return exists
}
// GetOrphanRoot returns the head of the chain for the provided hash from the
// map of orphan blocks.
// GetOrphanMissingAncestorHashes returns all of the missing parents in the orphan's sub-DAG
//
// This function is safe for concurrent access.
func (dag *BlockDAG) GetOrphanRoot(hash *daghash.Hash) *daghash.Hash {
func (dag *BlockDAG) GetOrphanMissingAncestorHashes(hash *daghash.Hash) ([]*daghash.Hash, error) {
// Protect concurrent access. Using a read lock only so multiple
// readers can query without blocking each other.
dag.orphanLock.RLock()
defer dag.orphanLock.RUnlock()
// Keep looping while the parent of each orphaned block is
// known and is an orphan itself.
orphanRoot := hash
parentHash := hash
for {
orphan, exists := dag.orphans[*parentHash]
if !exists {
break
}
orphanRoot = parentHash
parentHash = orphan.block.MsgBlock().Header.SelectedParentHash()
}
missingAncestorsHashes := make([]*daghash.Hash, 0)
return orphanRoot
visited := make(map[daghash.Hash]bool)
queue := []*daghash.Hash{hash}
for len(queue) > 0 {
var current *daghash.Hash
current, queue = queue[0], queue[1:]
if !visited[*current] {
visited[*current] = true
orphan, orphanExists := dag.orphans[*current]
if orphanExists {
for _, parentHash := range orphan.block.MsgBlock().Header.ParentHashes {
queue = append(queue, &parentHash)
}
} else {
existsInDag, err := dag.BlockExists(current)
if err != nil {
return nil, err
}
if !existsInDag {
missingAncestorsHashes = append(missingAncestorsHashes, current)
}
}
}
}
return missingAncestorsHashes, nil
}
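
For context, here is a minimal standalone restatement of the breadth-first walk that GetOrphanMissingAncestorHashes performs above: follow parent hashes through the orphan pool and collect every parent that is neither an orphan itself nor already in the DAG. The hash, orphan, and toyDAG types are invented stand-ins, not the repository's types.

package main

import "fmt"

type hash string

type orphan struct{ parents []hash }

type toyDAG struct {
    orphans map[hash]*orphan
    known   map[hash]bool
}

// missingAncestors follows parent hashes through the orphan pool, breadth
// first, and returns every parent that is neither an orphan nor in the DAG.
func (d *toyDAG) missingAncestors(start hash) []hash {
    var missing []hash
    visited := make(map[hash]bool)
    queue := []hash{start}
    for len(queue) > 0 {
        current := queue[0]
        queue = queue[1:]
        if visited[current] {
            continue
        }
        visited[current] = true
        if o, isOrphan := d.orphans[current]; isOrphan {
            queue = append(queue, o.parents...)
        } else if !d.known[current] {
            missing = append(missing, current)
        }
    }
    return missing
}

func main() {
    dag := &toyDAG{
        orphans: map[hash]*orphan{
            "c": {parents: []hash{"b", "x"}},
            "b": {parents: []hash{"a"}},
        },
        known: map[hash]bool{"a": true},
    }
    fmt.Println(dag.missingAncestors("c")) // [x]
}
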
// removeOrphanBlock removes the passed orphan block from the orphan pool and
@ -1037,8 +1048,6 @@ func (dag *BlockDAG) IsCurrent() bool {
// selectedTip returns the current selected tip for the DAG.
// It will return nil if there is no tip.
//
// This function is safe for concurrent access.
func (dag *BlockDAG) selectedTip() *blockNode {
return dag.virtual.selectedParent
}
@ -1056,6 +1065,19 @@ func (dag *BlockDAG) SelectedTipHeader() *wire.BlockHeader {
return selectedTip.Header()
}
// SelectedTipHash returns the hash of the current selected tip for the DAG.
// It will return nil if there is no tip.
//
// This function is safe for concurrent access.
func (dag *BlockDAG) SelectedTipHash() *daghash.Hash {
selectedTip := dag.selectedTip()
if selectedTip == nil {
return nil
}
return &selectedTip.hash
}
// UTXOSet returns the DAG's UTXO set
func (dag *BlockDAG) UTXOSet() *FullUTXOSet {
return dag.virtual.utxoSet
@ -1125,19 +1147,26 @@ func (dag *BlockDAG) HeaderByHash(hash *daghash.Hash) (*wire.BlockHeader, error)
return node.Header(), nil
}
// BlockLocatorFromHash returns a block locator for the passed block hash.
// BlockLocatorFromHash traverses the selected parent chain of the given block hash
// until it finds a block that exists in the virtual's selected parent chain, and
// then it returns its block locator.
// See BlockLocator for details on the algorithm used to create a block locator.
//
// In addition to the general algorithm referenced above, this function will
// return the block locator for the latest known tip of the main (best) chain if
// return the block locator for the selected tip if
// the passed hash is not currently known.
//
// This function is safe for concurrent access.
func (dag *BlockDAG) BlockLocatorFromHash(hash *daghash.Hash) BlockLocator {
dag.dagLock.RLock()
defer dag.dagLock.RUnlock()
node := dag.index.LookupNode(hash)
if node != nil {
for !dag.IsInSelectedPathChain(&node.hash) {
node = node.selectedParent
}
}
locator := dag.blockLocator(node)
dag.dagLock.RUnlock()
return locator
}
@ -1145,11 +1174,11 @@ func (dag *BlockDAG) BlockLocatorFromHash(hash *daghash.Hash) BlockLocator {
// main (best) chain.
//
// This function is safe for concurrent access.
func (dag *BlockDAG) LatestBlockLocator() (BlockLocator, error) {
func (dag *BlockDAG) LatestBlockLocator() BlockLocator {
dag.dagLock.RLock()
defer dag.dagLock.RUnlock()
locator := dag.blockLocator(nil)
dag.dagLock.RUnlock()
return locator, nil
return locator
}
// blockLocator returns a block locator for the passed block node. The passed
@ -1348,24 +1377,16 @@ func (dag *BlockDAG) locateInventory(locator BlockLocator, hashStop *daghash.Has
}
}
// Start at the block after the most recently known block. When there
// is no next block it means the most recently known block is the tip of
// the best chain, so there is nothing more to do.
startNode = startNode.diffChild
if startNode == nil {
return nil, 0
}
// Calculate how many entries are needed.
total := uint32((dag.selectedTip().height - startNode.height) + 1)
// Estimate how many entries are needed.
estimatedEntries := uint32((dag.selectedTip().blueScore - startNode.blueScore) + 1)
if stopNode != nil && stopNode.height >= startNode.height {
total = uint32((stopNode.height - startNode.height) + 1)
estimatedEntries = uint32((stopNode.blueScore - startNode.blueScore) + 1)
}
if total > maxEntries {
total = maxEntries
if estimatedEntries > maxEntries {
estimatedEntries = maxEntries
}
return startNode, total
return startNode, estimatedEntries
}
// locateBlocks returns the hashes of the blocks after the first known block in
@ -1376,23 +1397,47 @@ func (dag *BlockDAG) locateInventory(locator BlockLocator, hashStop *daghash.Has
//
// This function MUST be called with the DAG state lock held (for reads).
func (dag *BlockDAG) locateBlocks(locator BlockLocator, hashStop *daghash.Hash, maxHashes uint32) []daghash.Hash {
// Find the node after the first known block in the locator and the
// total number of nodes after it needed while respecting the stop hash
// and max entries.
node, total := dag.locateInventory(locator, hashStop, maxHashes)
if total == 0 {
return nil
}
// Populate and return the found hashes.
hashes := make([]daghash.Hash, 0, total)
for i := uint32(0); i < total; i++ {
hashes = append(hashes, node.hash)
node = node.diffChild
nodes := dag.locateBlockNodes(locator, hashStop, maxHashes)
hashes := make([]daghash.Hash, len(nodes))
for i, node := range nodes {
hashes[i] = node.hash
}
return hashes
}
func (dag *BlockDAG) locateBlockNodes(locator BlockLocator, hashStop *daghash.Hash, maxEntries uint32) []*blockNode {
// Find the first known block in the locator and the
// estimated number of nodes after it needed while respecting the stop hash
// and max entries.
node, estimatedEntries := dag.locateInventory(locator, hashStop, maxEntries)
if estimatedEntries == 0 {
return nil
}
stopNode := dag.index.LookupNode(hashStop)
// Populate and return the found nodes.
nodes := make([]*blockNode, 0, estimatedEntries)
queue := NewUpHeap()
queue.pushSet(node.children)
visited := newSet()
for i := uint32(0); queue.Len() > 0 && i < maxEntries; i++ {
var current *blockNode
current = queue.pop()
if !visited.contains(current) {
visited.add(current)
isBeforeStop := (stopNode == nil) || (current.height < stopNode.height)
if isBeforeStop || current.hash.IsEqual(hashStop) {
nodes = append(nodes, current)
}
if isBeforeStop {
queue.pushSet(current.children)
}
}
}
return nodes
}
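
A simplified, self-contained sketch of the traversal that locateBlockNodes introduces above: children are visited in ascending height order via an up-heap, a visited set prevents re-processing nodes reachable through multiple parents, and collection stops at the stop block or the entry cap. The block type and the height-based stop condition are stand-ins for the repository's blockNode and hashStop handling.

package main

import (
    "container/heap"
    "fmt"
)

type block struct {
    height   int32
    children []*block
}

// upHeap orders blocks by ascending height so the traversal below visits
// lower blocks before higher ones.
type upHeap []*block

func (h upHeap) Len() int            { return len(h) }
func (h upHeap) Less(i, j int) bool  { return h[i].height < h[j].height }
func (h upHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *upHeap) Push(x interface{}) { *h = append(*h, x.(*block)) }
func (h *upHeap) Pop() interface{} {
    old := *h
    popped := old[len(old)-1]
    *h = old[:len(old)-1]
    return popped
}

// locate walks the descendants of start in ascending height order and
// collects them until the stop height or maxEntries is reached.
func locate(start *block, stopHeight int32, maxEntries int) []*block {
    var found []*block
    queue := &upHeap{}
    visited := make(map[*block]bool)
    for _, child := range start.children {
        heap.Push(queue, child)
    }
    for queue.Len() > 0 && len(found) < maxEntries {
        current := heap.Pop(queue).(*block)
        if visited[current] {
            continue
        }
        visited[current] = true
        isBeforeStop := current.height < stopHeight
        if isBeforeStop || current.height == stopHeight {
            found = append(found, current)
        }
        if isBeforeStop {
            for _, child := range current.children {
                heap.Push(queue, child)
            }
        }
    }
    return found
}

func main() {
    tip := &block{height: 2}
    middle := &block{height: 1, children: []*block{tip}}
    genesis := &block{height: 0, children: []*block{middle}}
    for _, b := range locate(genesis, 2, 10) {
        fmt.Println(b.height) // 1, then 2
    }
}
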
// LocateBlocks returns the hashes of the blocks after the first known block in
// the locator until the provided stop hash is reached, or up to the provided
// max number of block hashes.
@ -1421,19 +1466,10 @@ func (dag *BlockDAG) LocateBlocks(locator BlockLocator, hashStop *daghash.Hash,
//
// This function MUST be called with the DAG state lock held (for reads).
func (dag *BlockDAG) locateHeaders(locator BlockLocator, hashStop *daghash.Hash, maxHeaders uint32) []*wire.BlockHeader {
// Find the node after the first known block in the locator and the
// total number of nodes after it needed while respecting the stop hash
// and max entries.
node, total := dag.locateInventory(locator, hashStop, maxHeaders)
if total == 0 {
return nil
}
// Populate and return the found headers.
headers := make([]*wire.BlockHeader, 0, total)
for i := uint32(0); i < total; i++ {
headers = append(headers, node.Header())
node = node.diffChild
nodes := dag.locateBlockNodes(locator, hashStop, maxHeaders)
headers := make([]*wire.BlockHeader, len(nodes))
for i, node := range nodes {
headers[i] = node.Header()
}
return headers
}

View File

@ -77,7 +77,7 @@ func blueCandidates(chainStart *blockNode) blockSet {
func traverseCandidates(newBlock *blockNode, candidates blockSet, selectedParent *blockNode) []*blockNode {
blues := []*blockNode{}
selectedParentPast := newSet()
queue := NewHeap()
queue := NewDownHeap()
visited := newSet()
for _, parent := range newBlock.parents {

View File

@ -33,11 +33,11 @@ const (
BFNone BehaviorFlags = 0
)
// blockExists determines whether a block with the given hash exists either in
// the main chain or any side chains.
// BlockExists determines whether a block with the given hash exists in
// the DAG.
//
// This function is safe for concurrent access.
func (dag *BlockDAG) blockExists(hash *daghash.Hash) (bool, error) {
func (dag *BlockDAG) BlockExists(hash *daghash.Hash) (bool, error) {
// Check block index first (could be main chain or side chain blocks).
if dag.index.HaveBlock(hash) {
return true, nil
@ -148,7 +148,7 @@ func (dag *BlockDAG) ProcessBlock(block *util.Block, flags BehaviorFlags) (bool,
log.Tracef("Processing block %s", blockHash)
// The block must not already exist in the main chain or side chains.
exists, err := dag.blockExists(blockHash)
exists, err := dag.BlockExists(blockHash)
if err != nil {
return false, err
}
@ -212,7 +212,7 @@ func (dag *BlockDAG) ProcessBlock(block *util.Block, flags BehaviorFlags) (bool,
// Handle orphan blocks.
allParentsExist := true
for _, parentHash := range blockHeader.ParentHashes {
parentExists, err := dag.blockExists(&parentHash)
parentExists, err := dag.BlockExists(&parentHash)
if err != nil {
return false, err
}

View File

@ -754,7 +754,7 @@ func (dag *BlockDAG) validateDifficulty(header *wire.BlockHeader, bluestParent *
// validateParents validates that no parent is an ancestor of another parent
func validateParents(blockHeader *wire.BlockHeader, parents blockSet) error {
minHeight := int32(math.MaxInt32)
queue := NewHeap()
queue := NewDownHeap()
visited := newSet()
for _, parent := range parents {
if parent.height < minHeight {

View File

@ -32,7 +32,7 @@ func TestVirtualBlock(t *testing.T) {
// Set its tips to tipsToSet
// Add to it all the tips in tipsToAdd, one after the other
// Call .Tips() on it and compare the result to expectedTips
// Call .SelectedTip() on it and compare the result to expectedSelectedParent
// Call .selectedTip() on it and compare the result to expectedSelectedParent
tests := []struct {
name string
tipsToSet []*blockNode

View File

@ -229,26 +229,25 @@ type GetNetworkInfoResult struct {
// GetPeerInfoResult models the data returned from the getpeerinfo command.
type GetPeerInfoResult struct {
ID int32 `json:"id"`
Addr string `json:"addr"`
Services string `json:"services"`
RelayTxes bool `json:"relayTxes"`
LastSend int64 `json:"lastSend"`
LastRecv int64 `json:"lastRecv"`
BytesSent uint64 `json:"bytesSent"`
BytesRecv uint64 `json:"bytesRecv"`
ConnTime int64 `json:"connTime"`
TimeOffset int64 `json:"timeOffset"`
PingTime float64 `json:"pingTime"`
PingWait float64 `json:"pingWait,omitempty"`
Version uint32 `json:"version"`
SubVer string `json:"subVer"`
Inbound bool `json:"inbound"`
StartingHeight int32 `json:"startingHeight"`
CurrentHeight int32 `json:"currentHeight,omitempty"`
BanScore int32 `json:"banScore"`
FeeFilter int64 `json:"feeFilter"`
SyncNode bool `json:"syncNode"`
ID int32 `json:"id"`
Addr string `json:"addr"`
Services string `json:"services"`
RelayTxes bool `json:"relayTxes"`
LastSend int64 `json:"lastSend"`
LastRecv int64 `json:"lastRecv"`
BytesSent uint64 `json:"bytesSent"`
BytesRecv uint64 `json:"bytesRecv"`
ConnTime int64 `json:"connTime"`
TimeOffset int64 `json:"timeOffset"`
PingTime float64 `json:"pingTime"`
PingWait float64 `json:"pingWait,omitempty"`
Version uint32 `json:"version"`
SubVer string `json:"subVer"`
Inbound bool `json:"inbound"`
SelectedTip string `json:"selectedTip,omitempty"`
BanScore int32 `json:"banScore"`
FeeFilter int64 `json:"feeFilter"`
SyncNode bool `json:"syncNode"`
}
// GetRawMempoolVerboseResult models the data returned from the getrawmempool

View File

@ -71,6 +71,11 @@ type Config struct {
// found blocks to.
ConnectedCount func() int32
// ShouldMineOnGenesis checks if the node is connected to at least one
// peer and none of its peers knows of any blocks that were mined on
// top of the genesis block, i.e. every peer's selected tip is genesis.
ShouldMineOnGenesis func() bool
// IsCurrent defines the function to use to obtain whether or not the
// block chain is current. This is used by the automatic persistent
// mining routine to determine whether or it should attempt mining.
@ -309,14 +314,14 @@ out:
continue
}
// No point in searching for a solution before the chain is
// No point in searching for a solution before the DAG is
// synced. Also, grab the same lock as used for block
// submission, since the current block will be changing and
// this would otherwise end up building a new block template on
// a block that is in the process of becoming stale.
m.submitBlockLock.Lock()
curHeight := m.g.DAGHeight()
if curHeight != 0 && !m.cfg.IsCurrent() {
if (curHeight != 0 && !m.cfg.IsCurrent()) || (curHeight == 0 && !m.cfg.ShouldMineOnGenesis()) {
m.submitBlockLock.Unlock()
time.Sleep(time.Second)
continue

View File

@ -7,9 +7,7 @@ package netsync
import (
"github.com/daglabs/btcd/blockdag"
"github.com/daglabs/btcd/dagconfig"
"github.com/daglabs/btcd/dagconfig/daghash"
"github.com/daglabs/btcd/mempool"
"github.com/daglabs/btcd/peer"
"github.com/daglabs/btcd/util"
"github.com/daglabs/btcd/wire"
)
@ -20,8 +18,6 @@ import (
type PeerNotifier interface {
AnnounceNewTransactions(newTxs []*mempool.TxDesc)
UpdatePeerHeights(latestBlkHash *daghash.Hash, latestHeight int32, updateSource *peer.Peer)
RelayInventory(invVect *wire.InvVect, data interface{})
TransactionConfirmed(tx *util.Tx)

View File

@ -131,9 +131,17 @@ type headerNode struct {
// about a peer.
type peerSyncState struct {
syncCandidate bool
requestQueue []*wire.InvVect
requestedTxns map[daghash.TxID]struct{}
requestedBlocks map[daghash.Hash]struct{}
requestQueueMtx sync.Mutex
// relayedInvsRequestQueue contains invs of blocks and transactions
// which are relayed to our node.
relayedInvsRequestQueue []*wire.InvVect
// requestQueue contains all of the invs that are not relayed to
// us; we get them by requesting them or by manually creating them.
requestQueue []*wire.InvVect
relayedInvsRequestQueueSet map[daghash.Hash]struct{}
requestQueueSet map[daghash.Hash]struct{}
requestedTxns map[daghash.TxID]struct{}
requestedBlocks map[daghash.Hash]struct{}
}
// SyncManager is used to communicate block related messages with peers. The
@ -230,13 +238,14 @@ func (sm *SyncManager) startSync() {
continue
}
// Remove sync candidate peers that are no longer candidates due
// to passing their latest known block. NOTE: The < is
// intentional as opposed to <=. While technically the peer
// doesn't have a later block when it's equal, it will likely
// have one soon so it is a reasonable choice. It also allows
// the case where both are at 0 such as during regression test.
if peer.LastBlock() < sm.dag.Height() { //TODO: (Ori) This is probably wrong. Done only for compilation
isCandidate, err := peer.IsSyncCandidate()
if err != nil {
log.Errorf("Failed to check if peer %s is"+
"a sync candidate: %s", peer, err)
return
}
if !isCandidate {
state.syncCandidate = false
continue
}
@ -253,15 +262,10 @@ func (sm *SyncManager) startSync() {
// to send.
sm.requestedBlocks = make(map[daghash.Hash]struct{})
locator, err := sm.dag.LatestBlockLocator()
if err != nil {
log.Errorf("Failed to get block locator for the "+
"latest block: %s", err)
return
}
locator := sm.dag.LatestBlockLocator()
log.Infof("Syncing to block height %d from peer %s",
bestPeer.LastBlock(), bestPeer.Addr())
log.Infof("Syncing to block %s from peer %s",
bestPeer.SelectedTip(), bestPeer.Addr())
// When the current height is less than a known checkpoint we
// can use block headers to learn about which blocks comprise
@ -342,9 +346,11 @@ func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
// Initialize the peer state
isSyncCandidate := sm.isSyncCandidate(peer)
sm.peerStates[peer] = &peerSyncState{
syncCandidate: isSyncCandidate,
requestedTxns: make(map[daghash.TxID]struct{}),
requestedBlocks: make(map[daghash.Hash]struct{}),
syncCandidate: isSyncCandidate,
requestedTxns: make(map[daghash.TxID]struct{}),
requestedBlocks: make(map[daghash.Hash]struct{}),
requestQueueSet: make(map[daghash.Hash]struct{}),
relayedInvsRequestQueueSet: make(map[daghash.Hash]struct{}),
}
// Start syncing by choosing the best candidate if needed.
@ -466,22 +472,26 @@ func (sm *SyncManager) handleTxMsg(tmsg *txMsg) {
// current returns true if we believe we are synced with our peers, false if we
// still have blocks to check
func (sm *SyncManager) current() bool {
if !sm.dag.IsCurrent() {
return false
}
return sm.dag.IsCurrent()
}
// if blockChain thinks we are current and we have no syncPeer it
// is probably right.
if sm.syncPeer == nil {
return true
// restartSyncIfNeeded finds a new sync candidate if we're not expecting any
// blocks from the current one.
func (sm *SyncManager) restartSyncIfNeeded() {
if sm.syncPeer != nil {
syncPeerState, exists := sm.peerStates[sm.syncPeer]
if exists {
isWaitingForBlocks := func() bool {
syncPeerState.requestQueueMtx.Lock()
defer syncPeerState.requestQueueMtx.Unlock()
return len(syncPeerState.requestedBlocks) != 0 || len(syncPeerState.requestQueue) != 0
}()
if isWaitingForBlocks {
return
}
}
}
// No matter what chain thinks, if we are below the block we are syncing
// to we are not current.
if sm.dag.Height() < sm.syncPeer.LastBlock() { //TODO: (Ori) This is probably wrong. Done only for compilation
return false
}
return true
sm.startSync()
}
// handleBlockMsg handles block messages from all peers.
@ -533,14 +543,17 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
}
}
// Remove block from request maps. Either chain will know about it and
// so we shouldn't have any more instances of trying to fetch it, or we
// will fail the insert and thus we'll retry next time we get an inv.
// Process the block to include validation, orphan handling, etc.
isOrphan, err := sm.dag.ProcessBlock(bmsg.block, behaviorFlags)
// Remove block from request maps. Either DAG knows about it and
// so we shouldn't have any more instances of trying to fetch it, or
// the insertion fails and thus we'll retry next time we get an inv.
delete(state.requestedBlocks, *blockHash)
delete(sm.requestedBlocks, *blockHash)
// Process the block to include validation, orphan handling, etc.
isOrphan, err := sm.dag.ProcessBlock(bmsg.block, behaviorFlags)
sm.restartSyncIfNeeded()
if err != nil {
// When the error is a rule error, it means the block was simply
// rejected as opposed to something actually going wrong, so log
@ -565,69 +578,32 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
return
}
// Meta-data about the new block this peer is reporting. We use this
// below to update this peer's lastest block height and the heights of
// other peers based on their last announced block hash. This allows us
// to dynamically update the block heights of peers, avoiding stale
// heights when looking for a new sync peer. Upon acceptance of a block
// or recognition of an orphan, we also use this information to update
// the block heights over other peers who's invs may have been ignored
// if we are actively syncing while the chain is not yet current or
// who may have lost the lock announcment race.
var heightUpdate int32
var blkHashUpdate *daghash.Hash
// Request the parents for the orphan block from the peer that sent it.
if isOrphan {
// We've just received an orphan block from a peer. In order
// to update the height of the peer, we try to extract the
// block height from the scriptSig of the coinbase transaction.
// Extraction is only attempted if the block's version is
// high enough (ver 2+).
coinbaseTx := bmsg.block.Transactions()[0]
cbHeight, err := blockdag.ExtractCoinbaseHeight(coinbaseTx)
missingAncestors, err := sm.dag.GetOrphanMissingAncestorHashes(blockHash)
if err != nil {
log.Warnf("Unable to extract height from "+
"coinbase tx: %s", err)
} else {
log.Debugf("Extracted height of %d from "+
"orphan block", cbHeight)
heightUpdate = cbHeight
blkHashUpdate = blockHash
}
orphanRoot := sm.dag.GetOrphanRoot(blockHash)
locator, err := sm.dag.LatestBlockLocator()
if err != nil {
log.Warnf("Failed to get block locator for the "+
"latest block: %s", err)
} else {
peer.PushGetBlocksMsg(locator, orphanRoot)
log.Errorf("Failed to find missing ancestors for block %s: %s",
blockHash, err)
return
}
sm.addBlocksToRequestQueue(state, missingAncestors, false)
} else {
// When the block is not an orphan, log information about it and
// update the chain state.
sm.progressLogger.LogBlockHeight(bmsg.block)
// Update this peer's latest block height, for future
// potential sync node candidacy.
highestTipHash := sm.dag.HighestTipHash()
heightUpdate = sm.dag.Height() //TODO: (Ori) This is probably wrong. Done only for compilation
blkHashUpdate = &highestTipHash
// Clear the rejected transactions.
sm.rejectedTxns = make(map[daghash.TxID]struct{})
}
// Update the block height for this peer. But only send a message to
// the server for updating peer heights if this is an orphan or our
// chain is "current". This avoids sending a spammy amount of messages
// if we're syncing the chain from scratch.
if blkHashUpdate != nil && heightUpdate != 0 {
peer.UpdateLastBlockHeight(heightUpdate)
if isOrphan || sm.current() {
go sm.peerNotifier.UpdatePeerHeights(blkHashUpdate, heightUpdate,
peer)
// We don't want to flood our sync peer with getdata messages, so
// instead of asking it immediately about missing ancestors, we first
// wait until it finishes sending us all of the requested blocks.
if (isOrphan && peer != sm.syncPeer) || (peer == sm.syncPeer && len(state.requestedBlocks) == 0) {
err := sm.sendInvsFromRequestQueue(peer, state)
if err != nil {
log.Errorf("Failed to send invs from queue: %s", err)
return
}
}
@ -683,6 +659,37 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
}
}
func (sm *SyncManager) addBlocksToRequestQueue(state *peerSyncState, hashes []*daghash.Hash, isRelayedInv bool) {
state.requestQueueMtx.Lock()
defer state.requestQueueMtx.Unlock()
for _, hash := range hashes {
if _, exists := sm.requestedBlocks[*hash]; !exists {
iv := wire.NewInvVect(wire.InvTypeBlock, hash)
state.addInvToRequestQueue(iv, isRelayedInv)
}
}
}
func (state *peerSyncState) addInvToRequestQueue(iv *wire.InvVect, isRelayedInv bool) {
if isRelayedInv {
if _, exists := state.relayedInvsRequestQueueSet[iv.Hash]; !exists {
state.relayedInvsRequestQueueSet[iv.Hash] = struct{}{}
state.relayedInvsRequestQueue = append(state.relayedInvsRequestQueue, iv)
}
} else {
if _, exists := state.requestQueueSet[iv.Hash]; !exists {
state.requestQueueSet[iv.Hash] = struct{}{}
state.requestQueue = append(state.requestQueue, iv)
}
}
}
func (state *peerSyncState) addInvToRequestQueueWithLock(iv *wire.InvVect, isRelayedInv bool) {
state.requestQueueMtx.Lock()
defer state.requestQueueMtx.Unlock()
state.addInvToRequestQueue(iv, isRelayedInv)
}
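
A minimal sketch of the queue-plus-set pattern used above to keep duplicate invs out of requestQueue and relayedInvsRequestQueue, with a mutex standing in for requestQueueMtx. The dedupQueue type and the string hashes are invented for illustration.

package main

import (
    "fmt"
    "sync"
)

// dedupQueue pairs a FIFO slice with a membership set so that enqueueing the
// same hash twice is a no-op.
type dedupQueue struct {
    mtx   sync.Mutex
    queue []string
    set   map[string]struct{}
}

func newDedupQueue() *dedupQueue {
    return &dedupQueue{set: make(map[string]struct{})}
}

// enqueue appends hash only if it is not already queued.
func (q *dedupQueue) enqueue(hash string) {
    q.mtx.Lock()
    defer q.mtx.Unlock()
    if _, exists := q.set[hash]; exists {
        return
    }
    q.set[hash] = struct{}{}
    q.queue = append(q.queue, hash)
}

// dequeue pops the oldest queued hash, if any.
func (q *dedupQueue) dequeue() (string, bool) {
    q.mtx.Lock()
    defer q.mtx.Unlock()
    if len(q.queue) == 0 {
        return "", false
    }
    hash := q.queue[0]
    q.queue = q.queue[1:]
    delete(q.set, hash)
    return hash, true
}

func main() {
    q := newDedupQueue()
    q.enqueue("block-a")
    q.enqueue("block-a") // duplicate, ignored
    q.enqueue("block-b")
    for hash, ok := q.dequeue(); ok; hash, ok = q.dequeue() {
        fmt.Println(hash) // block-a, block-b
    }
}
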
// fetchHeaderBlocks creates and sends a request to the syncPeer for the next
// list of blocks to be downloaded based on the current list of headers.
func (sm *SyncManager) fetchHeaderBlocks() {
@ -845,9 +852,10 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
// are in the memory pool (either the main pool or orphan pool).
func (sm *SyncManager) haveInventory(invVect *wire.InvVect) (bool, error) {
switch invVect.Type {
case wire.InvTypeSyncBlock:
fallthrough
case wire.InvTypeBlock:
// Ask chain if the block is known to it in any form (main
// chain, side chain, or orphan).
// Ask DAG if the block is known to it in any form (in DAG or as an orphan).
return sm.dag.HaveBlock(&invVect.Hash)
case wire.InvTypeTx:
@ -900,30 +908,12 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
lastBlock := -1
invVects := imsg.inv.InvList
for i := len(invVects) - 1; i >= 0; i-- {
if invVects[i].Type == wire.InvTypeBlock {
if invVects[i].IsBlockOrSyncBlock() {
lastBlock = i
break
}
}
// If this inv contains a block announcement, and this isn't coming from
// our current sync peer or we're current, then update the last
// announced block for this peer. We'll use this information later to
// update the heights of peers based on blocks we've accepted that they
// previously announced.
if lastBlock != -1 && (peer != sm.syncPeer || sm.current()) {
peer.UpdateLastAnnouncedBlock(&invVects[lastBlock].Hash)
}
// If our chain is current and a peer announces a block we already
// know of, then update their current block height.
if lastBlock != -1 && sm.current() {
blkHeight, err := sm.dag.BlockHeightByHash(&invVects[lastBlock].Hash)
if err == nil {
peer.UpdateLastBlockHeight(blkHeight)
}
}
// Request the advertised inventory if we don't already have it. Also,
// request parent blocks of orphans if we receive one we already have.
// Finally, attempt to detect potential stalls due to long side chains
@ -932,6 +922,7 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
// Ignore unsupported inventory types.
switch iv.Type {
case wire.InvTypeBlock:
case wire.InvTypeSyncBlock:
case wire.InvTypeTx:
default:
continue
@ -964,11 +955,11 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
}
// Add it to the request queue.
state.requestQueue = append(state.requestQueue, iv)
state.addInvToRequestQueueWithLock(iv, iv.Type != wire.InvTypeSyncBlock)
continue
}
if iv.Type == wire.InvTypeBlock {
if iv.IsBlockOrSyncBlock() {
// The block is an orphan block that we already have.
// When the existing orphan was processed, it requested
// the missing parent blocks. When this scenario
@ -980,27 +971,23 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
// to signal there are more missing blocks that need to
// be requested.
if sm.dag.IsKnownOrphan(&iv.Hash) {
// Request blocks starting at the latest known
// up to the root of the orphan that just came
// in.
orphanRoot := sm.dag.GetOrphanRoot(&iv.Hash)
locator, err := sm.dag.LatestBlockLocator()
missingAncestors, err := sm.dag.GetOrphanMissingAncestorHashes(&iv.Hash)
if err != nil {
log.Errorf("PEER: Failed to get block "+
"locator for the latest block: "+
"%s", err)
continue
log.Errorf("Failed to find missing ancestors for block %s: %s",
iv.Hash, err)
return
}
peer.PushGetBlocksMsg(locator, orphanRoot)
sm.addBlocksToRequestQueue(state, missingAncestors, iv.Type != wire.InvTypeSyncBlock)
continue
}
// We already have the final block advertised by this
// inventory message, so force a request for more. This
// should only happen if we're on a really long side
// chain.
if i == lastBlock {
// Request blocks after this one up to the
// should only happen if our DAG and the peer's DAG have
// diverged a long time ago.
if i == lastBlock && peer == sm.syncPeer {
// Request blocks starting after the first block's ancestor that
// exists in the selected path chain, up to the final one the
// remote peer knows about (zero stop hash).
locator := sm.dag.BlockLocatorFromHash(&iv.Hash)
@ -1009,30 +996,58 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
}
}
// Request as much as possible at once. Anything that won't fit into
// the request will be requested on the next inv message.
numRequested := 0
gdmsg := wire.NewMsgGetData()
requestQueue := state.requestQueue
err := sm.sendInvsFromRequestQueue(peer, state)
if err != nil {
log.Errorf("Failed to send invs from queue: %s", err)
}
}
func (sm *SyncManager) addInvsToGetDataMessageFromQueue(gdmsg *wire.MsgGetData, state *peerSyncState, requestQueue []*wire.InvVect) ([]*wire.InvVect, error) {
var invsNum int
leftSpaceInGdmsg := wire.MaxInvPerMsg - len(gdmsg.InvList)
if len(requestQueue) > leftSpaceInGdmsg {
invsNum = leftSpaceInGdmsg
} else {
invsNum = len(requestQueue)
}
invsToAdd := make([]*wire.InvVect, 0, invsNum)
for len(requestQueue) != 0 {
iv := requestQueue[0]
requestQueue[0] = nil
requestQueue = requestQueue[1:]
switch iv.Type {
case wire.InvTypeBlock:
// Request the block if there is not already a pending
// request.
if _, exists := sm.requestedBlocks[iv.Hash]; !exists {
sm.requestedBlocks[iv.Hash] = struct{}{}
sm.limitHashMap(sm.requestedBlocks, maxRequestedBlocks)
state.requestedBlocks[iv.Hash] = struct{}{}
exists, err := sm.haveInventory(iv)
if err != nil {
return nil, err
}
if !exists {
invsToAdd = append(invsToAdd, iv)
}
}
gdmsg.AddInvVect(iv)
numRequested++
}
addBlockInv := func(iv *wire.InvVect) {
// Request the block if there is not already a pending
// request.
if _, exists := sm.requestedBlocks[iv.Hash]; !exists {
sm.requestedBlocks[iv.Hash] = struct{}{}
sm.limitHashMap(sm.requestedBlocks, maxRequestedBlocks)
state.requestedBlocks[iv.Hash] = struct{}{}
gdmsg.AddInvVect(iv)
}
}
for _, iv := range invsToAdd {
switch iv.Type {
case wire.InvTypeSyncBlock:
delete(state.requestQueueSet, iv.Hash)
addBlockInv(iv)
case wire.InvTypeBlock:
delete(state.relayedInvsRequestQueueSet, iv.Hash)
addBlockInv(iv)
case wire.InvTypeTx:
delete(state.relayedInvsRequestQueueSet, iv.Hash)
// Request the transaction if there is not already a
// pending request.
if _, exists := sm.requestedTxns[daghash.TxID(iv.Hash)]; !exists {
@ -1041,18 +1056,36 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
state.requestedTxns[daghash.TxID(iv.Hash)] = struct{}{}
gdmsg.AddInvVect(iv)
numRequested++
}
}
if numRequested >= wire.MaxInvPerMsg {
if len(requestQueue) >= wire.MaxInvPerMsg {
break
}
}
state.requestQueue = requestQueue
return requestQueue, nil
}
func (sm *SyncManager) sendInvsFromRequestQueue(peer *peerpkg.Peer, state *peerSyncState) error {
state.requestQueueMtx.Lock()
defer state.requestQueueMtx.Unlock()
gdmsg := wire.NewMsgGetData()
newRequestQueue, err := sm.addInvsToGetDataMessageFromQueue(gdmsg, state, state.requestQueue)
if err != nil {
return err
}
state.requestQueue = newRequestQueue
if sm.current() {
newRequestQueue, err := sm.addInvsToGetDataMessageFromQueue(gdmsg, state, state.relayedInvsRequestQueue)
if err != nil {
return err
}
state.relayedInvsRequestQueue = newRequestQueue
}
if len(gdmsg.InvList) > 0 {
peer.QueueMessage(gdmsg, nil)
}
return nil
}
// limitTxIDMap is a helper function for maps that require a maximum limit by

View File

@ -25,6 +25,7 @@ func mockRemotePeer() error {
UserAgentVersion: "1.0.0", // User agent version to advertise.
DAGParams: &dagconfig.SimNetParams,
SubnetworkID: subnetworkid.SubnetworkIDSupportsAll,
SelectedTip: fakeSelectedTipFn,
}
// Accept connections on the simnet port.
@ -80,6 +81,7 @@ func Example_newOutboundPeer() {
},
},
SubnetworkID: subnetworkid.SubnetworkIDSupportsAll,
SelectedTip: fakeSelectedTipFn,
}
p, err := peer.NewOutboundPeer(peerCfg, "127.0.0.1:18555")
if err != nil {

View File

@ -74,6 +74,8 @@ func invSummary(invList []*wire.InvVect) string {
return fmt.Sprintf("error %s", iv.Hash)
case wire.InvTypeBlock:
return fmt.Sprintf("block %s", iv.Hash)
case wire.InvTypeSyncBlock:
return fmt.Sprintf("sync block %s", iv.Hash)
case wire.InvTypeTx:
return fmt.Sprintf("tx %s", iv.Hash)
}
@ -124,8 +126,8 @@ func sanitizeString(str string, maxLength uint) string {
func messageSummary(msg wire.Message) string {
switch msg := msg.(type) {
case *wire.MsgVersion:
return fmt.Sprintf("agent %s, pver %d, block %d",
msg.UserAgent, msg.ProtocolVersion, msg.LastBlock)
return fmt.Sprintf("agent %s, pver %d, selected tip %s",
msg.UserAgent, msg.ProtocolVersion, msg.SelectedTip)
case *wire.MsgVerAck:
// No summary.

View File

@ -216,12 +216,13 @@ type MessageListeners struct {
// Config is the struct to hold configuration options useful to Peer.
type Config struct {
// NewestBlock specifies a callback which provides the newest block
// details to the peer as needed. This can be nil in which case the
// peer will report a block height of 0, however it is good practice for
// peers to specify this so their currently best known is accurately
// reported.
NewestBlock HashFunc
// SelectedTip specifies a callback which provides the selected tip
// to the peer as needed.
SelectedTip func() *daghash.Hash
// BlockExists specifies a callback which determines whether a block
// with the given hash exists in the DAG.
BlockExists func(*daghash.Hash) (bool, error)
// HostToNetAddress returns the netaddress for the given host. This can be
// nil in which case the host will be parsed as an IP address.
@ -369,8 +370,7 @@ type StatsSnap struct {
Version uint32
UserAgent string
Inbound bool
StartingHeight int32
LastBlock int32
SelectedTip *daghash.Hash
LastPingNonce uint64
LastPingTime time.Time
LastPingMicros int64
@ -452,15 +452,13 @@ type Peer struct {
// These fields keep track of statistics for the peer and are protected
// by the statsMtx mutex.
statsMtx sync.RWMutex
timeOffset int64
timeConnected time.Time
startingHeight int32
lastBlock int32
lastAnnouncedBlock *daghash.Hash
lastPingNonce uint64 // Set to nonce if we have a pending ping.
lastPingTime time.Time // Time we sent last ping.
lastPingMicros int64 // Time for last ping to return.
statsMtx sync.RWMutex
timeOffset int64
timeConnected time.Time
selectedTip *daghash.Hash
lastPingNonce uint64 // Set to nonce if we have a pending ping.
lastPingTime time.Time // Time we sent last ping.
lastPingMicros int64 // Time for last ping to return.
stallControl chan stallControlMsg
outputQueue chan outMsg
@ -481,29 +479,6 @@ func (p *Peer) String() string {
return fmt.Sprintf("%s (%s)", p.addr, logger.DirectionString(p.inbound))
}
// UpdateLastBlockHeight updates the last known block for the peer.
//
// This function is safe for concurrent access.
func (p *Peer) UpdateLastBlockHeight(newHeight int32) {
p.statsMtx.Lock()
log.Tracef("Updating last block height of peer %s from %s to %s",
p.addr, p.lastBlock, newHeight)
p.lastBlock = newHeight
p.statsMtx.Unlock()
}
// UpdateLastAnnouncedBlock updates meta-data about the last block hash this
// peer is known to have announced.
//
// This function is safe for concurrent access.
func (p *Peer) UpdateLastAnnouncedBlock(blkHash *daghash.Hash) {
log.Tracef("Updating last blk for peer %s, %s", p.addr, blkHash)
p.statsMtx.Lock()
p.lastAnnouncedBlock = blkHash
p.statsMtx.Unlock()
}
// AddKnownInventory adds the passed inventory to the cache of known inventory
// for the peer.
//
@ -540,8 +515,7 @@ func (p *Peer) StatsSnapshot() *StatsSnap {
TimeOffset: p.timeOffset,
Version: protocolVersion,
Inbound: p.inbound,
StartingHeight: p.startingHeight,
LastBlock: p.lastBlock,
SelectedTip: p.selectedTip,
LastPingNonce: p.lastPingNonce,
LastPingMicros: p.lastPingMicros,
LastPingTime: p.lastPingTime,
@ -620,17 +594,6 @@ func (p *Peer) SubnetworkID() *subnetworkid.SubnetworkID {
return subnetworkID
}
// LastAnnouncedBlock returns the last announced block of the remote peer.
//
// This function is safe for concurrent access.
func (p *Peer) LastAnnouncedBlock() *daghash.Hash {
p.statsMtx.RLock()
lastAnnouncedBlock := p.lastAnnouncedBlock
p.statsMtx.RUnlock()
return lastAnnouncedBlock
}
// LastPingNonce returns the last ping nonce of the remote peer.
//
// This function is safe for concurrent access.
@ -699,15 +662,26 @@ func (p *Peer) ProtocolVersion() uint32 {
return protocolVersion
}
// LastBlock returns the last block of the peer.
// SelectedTip returns the selected tip of the peer.
//
// This function is safe for concurrent access.
func (p *Peer) LastBlock() int32 {
func (p *Peer) SelectedTip() *daghash.Hash {
p.statsMtx.RLock()
lastBlock := p.lastBlock
selectedTip := p.selectedTip
p.statsMtx.RUnlock()
return lastBlock
return selectedTip
}
// IsSyncCandidate returns whether or not this peer is a sync candidate.
//
// This function is safe for concurrent access.
func (p *Peer) IsSyncCandidate() (bool, error) {
exists, err := p.cfg.BlockExists(p.selectedTip)
if err != nil {
return false, err
}
return !exists, nil
}
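
A toy restatement of the sync-candidacy rule introduced above: a peer is worth syncing from only if its advertised selected tip is not already in our DAG. The toyDAG and toyPeer types are invented; the real check goes through the BlockExists callback shown earlier in this file.

package main

import "fmt"

type hash string

type toyDAG struct{ known map[hash]bool }

func (d *toyDAG) blockExists(h hash) bool { return d.known[h] }

type toyPeer struct{ selectedTip hash }

// isSyncCandidate mirrors Peer.IsSyncCandidate above: if the peer's selected
// tip is already known to our DAG, the peer has nothing new to sync from.
func isSyncCandidate(d *toyDAG, p *toyPeer) bool {
    return !d.blockExists(p.selectedTip)
}

func main() {
    dag := &toyDAG{known: map[hash]bool{"genesis": true, "block-1": true}}
    fmt.Println(isSyncCandidate(dag, &toyPeer{selectedTip: "block-1"})) // false
    fmt.Println(isSyncCandidate(dag, &toyPeer{selectedTip: "block-9"})) // true
}
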
// LastSend returns the last send time of the peer.
@ -762,18 +736,6 @@ func (p *Peer) TimeOffset() int64 {
return timeOffset
}
// StartingHeight returns the last known height the peer reported during the
// initial negotiation phase.
//
// This function is safe for concurrent access.
func (p *Peer) StartingHeight() int32 {
p.statsMtx.RLock()
startingHeight := p.startingHeight
p.statsMtx.RUnlock()
return startingHeight
}
// WantsHeaders returns if the peer wants header messages instead of
// inventory vectors for blocks.
//
@ -789,15 +751,7 @@ func (p *Peer) WantsHeaders() bool {
// localVersionMsg creates a version message that can be used to send to the
// remote peer.
func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
var blockNum int32
if p.cfg.NewestBlock != nil {
var err error
_, blockNum, err = p.cfg.NewestBlock()
if err != nil {
return nil, err
}
}
selectedTip := p.cfg.SelectedTip()
theirNA := p.na
// If we are behind a proxy and the connection comes from the proxy then
@ -833,7 +787,7 @@ func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
subnetworkID := p.cfg.SubnetworkID
// Version message.
msg := wire.NewMsgVersion(ourNA, theirNA, nonce, blockNum, subnetworkID)
msg := wire.NewMsgVersion(ourNA, theirNA, nonce, selectedTip, subnetworkID)
msg.AddUserAgent(p.cfg.UserAgentName, p.cfg.UserAgentVersion,
p.cfg.UserAgentComments...)
@ -1057,8 +1011,7 @@ func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error {
// Updating a bunch of stats including block based stats, and the
// peer's time offset.
p.statsMtx.Lock()
p.lastBlock = msg.LastBlock
p.startingHeight = msg.LastBlock
p.selectedTip = msg.SelectedTip
p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix()
p.statsMtx.Unlock()

View File

@ -101,8 +101,6 @@ type peerStats struct {
wantConnected bool
wantVersionKnown bool
wantVerAckReceived bool
wantLastBlock int32
wantStartingHeight int32
wantLastPingTime time.Time
wantLastPingNonce uint64
wantLastPingMicros int64
@ -153,11 +151,6 @@ func testPeer(t *testing.T, p *peer.Peer, s peerStats) {
return
}
if p.LastBlock() != s.wantLastBlock {
t.Errorf("testPeer: wrong LastBlock - got %v, want %v", p.LastBlock(), s.wantLastBlock)
return
}
// Allow for a deviation of 1s, as the second may tick when the message is
// in transit and the protocol doesn't support any further precision.
if p.TimeOffset() != s.wantTimeOffset && p.TimeOffset() != s.wantTimeOffset-1 {
@ -176,11 +169,6 @@ func testPeer(t *testing.T, p *peer.Peer, s peerStats) {
return
}
if p.StartingHeight() != s.wantStartingHeight {
t.Errorf("testPeer: wrong StartingHeight - got %v, want %v", p.StartingHeight(), s.wantStartingHeight)
return
}
if p.Connected() != s.wantConnected {
t.Errorf("testPeer: wrong Connected - got %v, want %v", p.Connected(), s.wantConnected)
return
@ -231,6 +219,7 @@ func TestPeerConnection(t *testing.T) {
ProtocolVersion: wire.ProtocolVersion, // Configure with older version
Services: 0,
SubnetworkID: subnetworkid.SubnetworkIDSupportsAll,
SelectedTip: fakeSelectedTipFn,
}
peer2Cfg := &peer.Config{
Listeners: peer1Cfg.Listeners,
@ -241,6 +230,7 @@ func TestPeerConnection(t *testing.T) {
ProtocolVersion: wire.ProtocolVersion + 1,
Services: wire.SFNodeNetwork,
SubnetworkID: subnetworkid.SubnetworkIDSupportsAll,
SelectedTip: fakeSelectedTipFn,
}
wantStats1 := peerStats{
@ -254,8 +244,8 @@ func TestPeerConnection(t *testing.T) {
wantLastPingNonce: uint64(0),
wantLastPingMicros: int64(0),
wantTimeOffset: int64(0),
wantBytesSent: 187, // 163 version + 24 verack
wantBytesReceived: 187,
wantBytesSent: 215, // 191 version + 24 verack
wantBytesReceived: 215,
}
wantStats2 := peerStats{
wantUserAgent: wire.DefaultUserAgent + "peer:1.0(comment)/",
@ -268,8 +258,8 @@ func TestPeerConnection(t *testing.T) {
wantLastPingNonce: uint64(0),
wantLastPingMicros: int64(0),
wantTimeOffset: int64(0),
wantBytesSent: 187, // 163 version + 24 verack
wantBytesReceived: 187,
wantBytesSent: 215, // 191 version + 24 verack
wantBytesReceived: 215,
}
tests := []struct {
@ -443,6 +433,7 @@ func TestPeerListeners(t *testing.T) {
DAGParams: &dagconfig.MainNetParams,
Services: wire.SFNodeBloom,
SubnetworkID: subnetworkid.SubnetworkIDSupportsAll,
SelectedTip: fakeSelectedTipFn,
}
inConn, outConn := pipe(
&conn{raddr: "10.0.0.1:8333"},
@ -603,10 +594,9 @@ func TestPeerListeners(t *testing.T) {
// TestOutboundPeer tests that the outbound peer works as expected.
func TestOutboundPeer(t *testing.T) {
peerCfg := &peer.Config{
NewestBlock: func() (*daghash.Hash, int32, error) {
return nil, 0, errors.New("newest block not found")
SelectedTip: func() *daghash.Hash {
return &daghash.ZeroHash
},
UserAgentName: "peer",
UserAgentVersion: "1.0",
@ -628,23 +618,7 @@ func TestOutboundPeer(t *testing.T) {
// Test trying to connect twice.
p.AssociateConnection(c)
p.AssociateConnection(c)
disconnected := make(chan struct{})
go func() {
p.WaitForDisconnect()
disconnected <- struct{}{}
}()
select {
case <-disconnected:
close(disconnected)
case <-time.After(time.Second):
t.Fatal("Peer did not automatically disconnect.")
}
if p.Connected() {
t.Fatalf("Should not be connected as NewestBlock produces error.")
}
p.Disconnect()
// Test Queue Inv
fakeBlockHash := &daghash.Hash{0: 0x00, 1: 0x01}
@ -662,17 +636,17 @@ func TestOutboundPeer(t *testing.T) {
<-done
p.Disconnect()
// Test NewestBlock
var newestBlock = func() (*daghash.Hash, int32, error) {
// Test SelectedTip
var selectedTip = func() *daghash.Hash {
hashStr := "14a0810ac680a3eb3f82edc878cea25ec41d6b790744e5daeef"
hash, err := daghash.NewHashFromStr(hashStr)
if err != nil {
return nil, 0, err
t.Fatalf("daghash.NewHashFromStr: %s", err)
}
return hash, 234439, nil
return hash
}
peerCfg.NewestBlock = newestBlock
peerCfg.SelectedTip = selectedTip
r1, w1 := io.Pipe()
c1 := &conn{raddr: "10.0.0.1:8333", Writer: w1, Reader: r1}
p1, err := peer.NewOutboundPeer(peerCfg, "10.0.0.1:8333")
@ -682,20 +656,6 @@ func TestOutboundPeer(t *testing.T) {
}
p1.AssociateConnection(c1)
// Test update latest block
latestBlockHash, err := daghash.NewHashFromStr("1a63f9cdff1752e6375c8c76e543a71d239e1a2e5c6db1aa679")
if err != nil {
t.Errorf("NewHashFromStr: unexpected err %v\n", err)
return
}
p1.UpdateLastAnnouncedBlock(latestBlockHash)
p1.UpdateLastBlockHeight(234440)
if p1.LastAnnouncedBlock() != latestBlockHash {
t.Errorf("LastAnnouncedBlock: wrong block - got %v, want %v",
p1.LastAnnouncedBlock(), latestBlockHash)
return
}
// Test Queue Inv after connection
p1.QueueInventory(fakeInv)
p1.Disconnect()
@ -755,6 +715,7 @@ func TestUnsupportedVersionPeer(t *testing.T) {
DAGParams: &dagconfig.MainNetParams,
Services: 0,
SubnetworkID: subnetworkid.SubnetworkIDSupportsAll,
SelectedTip: fakeSelectedTipFn,
}
localNA := wire.NewNetAddressIPPort(
@ -811,7 +772,7 @@ func TestUnsupportedVersionPeer(t *testing.T) {
}
// Remote peer writes version message advertising invalid protocol version 0
invalidVersionMsg := wire.NewMsgVersion(remoteNA, localNA, 0, 0, subnetworkid.SubnetworkIDSupportsAll)
invalidVersionMsg := wire.NewMsgVersion(remoteNA, localNA, 0, &daghash.ZeroHash, subnetworkid.SubnetworkIDSupportsAll)
invalidVersionMsg.ProtocolVersion = 0
_, err = wire.WriteMessageN(
@ -853,3 +814,7 @@ func init() {
// Allow self connection when running the tests.
peer.TstAllowSelfConns()
}
func fakeSelectedTipFn() *daghash.Hash {
return &daghash.Hash{0x12, 0x34}
}

View File

@ -190,24 +190,51 @@ func (ps *peerState) Count() int {
len(ps.persistentPeers)
}
// forAllOutboundPeers is a helper function that runs closure on all outbound
// forAllOutboundPeers is a helper function that runs a callback on all outbound
// peers known to peerState.
func (ps *peerState) forAllOutboundPeers(closure func(sp *Peer)) {
// The loop stops and returns false if one of the callback calls returns false.
// Otherwise, it returns true.
func (ps *peerState) forAllOutboundPeers(callback func(sp *Peer) bool) bool {
for _, e := range ps.outboundPeers {
closure(e)
shouldContinue := callback(e)
if !shouldContinue {
return false
}
}
for _, e := range ps.persistentPeers {
closure(e)
shouldContinue := callback(e)
if !shouldContinue {
return false
}
}
return true
}
// forAllPeers is a helper function that runs closure on all peers known to
// peerState.
func (ps *peerState) forAllPeers(closure func(sp *Peer)) {
// forAllInboundPeers is a helper function that runs a callback on all inbound
// peers known to peerState.
// The loop stops and returns false if one of the callback calls returns false.
// Otherwise, it returns true.
func (ps *peerState) forAllInboundPeers(callback func(sp *Peer) bool) bool {
for _, e := range ps.inboundPeers {
closure(e)
shouldContinue := callback(e)
if !shouldContinue {
return false
}
}
ps.forAllOutboundPeers(closure)
return true
}
// forAllPeers is a helper function that runs a callback on all peers known to
// peerState.
// The loop stops and returns false if one of the callback calls returns false.
// Otherwise, it returns true.
func (ps *peerState) forAllPeers(callback func(sp *Peer) bool) bool {
shouldContinue := ps.forAllInboundPeers(callback)
if !shouldContinue {
return false
}
ps.forAllOutboundPeers(callback)
return true
}
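
The shouldContinue contract documented above makes it easy to express "all peers satisfy X" checks. Below is a hedged sketch of how the ShouldMineOnGenesis callback from the mining config change earlier in this commit could be built on forAllPeers; the server-side implementation is not shown in this diff, so the toy types and the exact predicate are assumptions.

package main

import "fmt"

type toyPeer struct{ selectedTip string }

type toyPeerState struct{ peers []*toyPeer }

// forAllPeers runs callback on every peer and stops, returning false, as soon
// as one callback returns false; it returns true if every callback did.
func (ps *toyPeerState) forAllPeers(callback func(p *toyPeer) bool) bool {
    for _, p := range ps.peers {
        if !callback(p) {
            return false
        }
    }
    return true
}

// shouldMineOnGenesis (assumed behaviour, per the NOD-81 commit messages)
// returns true only if at least one peer is connected and every peer's
// selected tip is still the genesis block.
func shouldMineOnGenesis(ps *toyPeerState) bool {
    if len(ps.peers) == 0 {
        return false
    }
    return ps.forAllPeers(func(p *toyPeer) bool {
        return p.selectedTip == "genesis"
    })
}

func main() {
    ps := &toyPeerState{peers: []*toyPeer{{selectedTip: "genesis"}, {selectedTip: "genesis"}}}
    fmt.Println(shouldMineOnGenesis(ps)) // true
    ps.peers = append(ps.peers, &toyPeer{selectedTip: "block-42"})
    fmt.Println(shouldMineOnGenesis(ps)) // false
}
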
// cfHeaderKV is a tuple of a filter header and its associated block hash. The
@ -243,7 +270,6 @@ type Server struct {
Query chan interface{}
relayInv chan relayMsg
broadcast chan broadcastMsg
peerHeightsUpdate chan updatePeerHeightsMsg
wg sync.WaitGroup
quit chan struct{}
nat serverutils.NAT
@ -286,11 +312,15 @@ func newServerPeer(s *Server, isPersistent bool) *Peer {
}
}
// newestBlock returns the current best block hash and height using the format
// required by the configuration for the peer package.
func (sp *Peer) newestBlock() (*daghash.Hash, int32, error) {
highestTipHash := sp.server.DAG.HighestTipHash()
return &highestTipHash, sp.server.DAG.Height(), nil //TODO: (Ori) This is probably wrong. Done only for compilation
// selectedTip returns the current selected tip
func (sp *Peer) selectedTip() *daghash.Hash {
return sp.server.DAG.SelectedTipHash()
}
// blockExists determines whether a block with the given hash exists in
// the DAG.
func (sp *Peer) blockExists(hash *daghash.Hash) (bool, error) {
return sp.server.DAG.BlockExists(hash)
}
// addKnownAddresses adds the given addresses to the set of known addresses to
@ -617,6 +647,8 @@ func (sp *Peer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) {
switch iv.Type {
case wire.InvTypeTx:
err = sp.server.pushTxMsg(sp, (*daghash.TxID)(&iv.Hash), c, waitChan)
case wire.InvTypeSyncBlock:
fallthrough
case wire.InvTypeBlock:
err = sp.server.pushBlockMsg(sp, &iv.Hash, c, waitChan)
case wire.InvTypeFilteredBlock:
@ -675,7 +707,7 @@ func (sp *Peer) OnGetBlocks(_ *peer.Peer, msg *wire.MsgGetBlocks) {
// Generate inventory message.
invMsg := wire.NewMsgInv()
for i := range hashList {
iv := wire.NewInvVect(wire.InvTypeBlock, &hashList[i])
iv := wire.NewInvVect(wire.InvTypeSyncBlock, &hashList[i])
invMsg.AddInvVect(iv)
}
@ -712,8 +744,8 @@ func (sp *Peer) OnGetHeaders(_ *peer.Peer, msg *wire.MsgGetHeaders) {
// over with the genesis block if unknown block locators are provided.
//
// This mirrors the behavior in the reference implementation.
chain := sp.server.DAG
headers := chain.LocateHeaders(msg.BlockLocatorHashes, &msg.HashStop)
dag := sp.server.DAG
headers := dag.LocateHeaders(msg.BlockLocatorHashes, &msg.HashStop)
// Send found headers to the requesting peer.
blockHeaders := make([]*wire.BlockHeader, len(headers))
@ -1381,34 +1413,6 @@ func (s *Server) pushMerkleBlockMsg(sp *Peer, hash *daghash.Hash,
return nil
}
// handleUpdatePeerHeights updates the heights of all peers who were known to
// announce a block we recently accepted.
func (s *Server) handleUpdatePeerHeights(state *peerState, umsg updatePeerHeightsMsg) {
state.forAllPeers(func(sp *Peer) {
// The origin peer should already have the updated height.
if sp.Peer == umsg.originPeer {
return
}
// This is a pointer to the underlying memory which doesn't
// change.
latestBlkHash := sp.LastAnnouncedBlock()
// Skip this peer if it hasn't recently announced any new blocks.
if latestBlkHash == nil {
return
}
// If the peer has recently announced a block, and this block
// matches our newly accepted block, then update their block
// height.
if *latestBlkHash == *umsg.newHash {
sp.UpdateLastBlockHeight(umsg.newHeight)
sp.UpdateLastAnnouncedBlock(nil)
}
})
}
// handleAddPeerMsg deals with adding new peers. It is invoked from the
// peerHandler goroutine.
func (s *Server) handleAddPeerMsg(state *peerState, sp *Peer) bool {
@ -1524,9 +1528,9 @@ func (s *Server) handleBanPeerMsg(state *peerState, sp *Peer) {
// handleRelayInvMsg deals with relaying inventory to peers that are not already
// known to have it. It is invoked from the peerHandler goroutine.
func (s *Server) handleRelayInvMsg(state *peerState, msg relayMsg) {
state.forAllPeers(func(sp *Peer) {
state.forAllPeers(func(sp *Peer) bool {
if !sp.Connected() {
return
return true
}
// If the inventory is a block and the peer prefers headers,
@ -1537,23 +1541,23 @@ func (s *Server) handleRelayInvMsg(state *peerState, msg relayMsg) {
if !ok {
peerLog.Warnf("Underlying data for headers" +
" is not a block header")
return
return true
}
msgHeaders := wire.NewMsgHeaders()
if err := msgHeaders.AddBlockHeader(&blockHeader); err != nil {
peerLog.Errorf("Failed to add block"+
" header: %s", err)
return
return true
}
sp.QueueMessage(msgHeaders, nil)
return
return true
}
if msg.invVect.Type == wire.InvTypeTx {
// Don't relay the transaction to the peer when it has
// transaction relaying disabled.
if sp.relayTxDisabled() {
return
return true
}
txD, ok := msg.data.(*mempool.TxDesc)
@ -1561,28 +1565,28 @@ func (s *Server) handleRelayInvMsg(state *peerState, msg relayMsg) {
peerLog.Warnf("Underlying data for tx inv "+
"relay is not a *mempool.TxDesc: %T",
msg.data)
return
return true
}
// Don't relay the transaction if the transaction fee-per-kb
// is less than the peer's feefilter.
feeFilter := uint64(atomic.LoadInt64(&sp.FeeFilterInt))
if feeFilter > 0 && txD.FeePerKB < feeFilter {
return
return true
}
// Don't relay the transaction if there is a bloom
// filter loaded and the transaction doesn't match it.
if sp.filter.IsLoaded() {
if !sp.filter.MatchTxAndUpdate(txD.Tx) {
return
return true
}
}
// Don't relay the transaction if the peer's subnetwork is
// incompatible with it.
if !txD.Tx.MsgTx().IsSubnetworkCompatible(sp.Peer.SubnetworkID()) {
return
return true
}
}
@ -1590,24 +1594,26 @@ func (s *Server) handleRelayInvMsg(state *peerState, msg relayMsg) {
// It will be ignored if the peer is already known to
// have the inventory.
sp.QueueInventory(msg.invVect)
return true
})
}
// handleBroadcastMsg deals with broadcasting messages to peers. It is invoked
// from the peerHandler goroutine.
func (s *Server) handleBroadcastMsg(state *peerState, bmsg *broadcastMsg) {
state.forAllPeers(func(sp *Peer) {
state.forAllPeers(func(sp *Peer) bool {
if !sp.Connected() {
return
return true
}
for _, ep := range bmsg.excludePeers {
if sp == ep {
return
return true
}
}
sp.QueueMessage(bmsg.message, nil)
return true
})
}
@ -1615,6 +1621,10 @@ type getConnCountMsg struct {
reply chan int32
}
type getShouldMineOnGenesisMsg struct {
reply chan bool
}
//GetPeersMsg is the message type which is used by the rpc server to get the peers list from the p2p server
type GetPeersMsg struct {
Reply chan []*Peer
@ -1655,20 +1665,36 @@ func (s *Server) handleQuery(state *peerState, querymsg interface{}) {
switch msg := querymsg.(type) {
case getConnCountMsg:
nconnected := int32(0)
state.forAllPeers(func(sp *Peer) {
state.forAllPeers(func(sp *Peer) bool {
if sp.Connected() {
nconnected++
}
return true
})
msg.reply <- nconnected
case getShouldMineOnGenesisMsg:
shouldMineOnGenesis := true
if state.Count() != 0 {
shouldMineOnGenesis = state.forAllPeers(func(sp *Peer) bool {
if !sp.SelectedTip().IsEqual(s.DAGParams.GenesisHash) {
return false
}
return true
})
} else {
shouldMineOnGenesis = false
}
msg.reply <- shouldMineOnGenesis
case GetPeersMsg:
peers := make([]*Peer, 0, state.Count())
state.forAllPeers(func(sp *Peer) {
state.forAllPeers(func(sp *Peer) bool {
if !sp.Connected() {
return
return true
}
peers = append(peers, sp)
return true
})
msg.Reply <- peers
@ -1816,7 +1842,8 @@ func newPeerConfig(sp *Peer) *peer.Config {
// other implementations' alert messages, we will not relay theirs.
OnAlert: nil,
},
NewestBlock: sp.newestBlock,
SelectedTip: sp.selectedTip,
BlockExists: sp.blockExists,
HostToNetAddress: sp.server.addrManager.HostToNetAddress,
Proxy: config.MainConfig().Proxy,
UserAgentName: userAgentName,
@ -1937,10 +1964,6 @@ out:
case p := <-s.donePeers:
s.handleDonePeerMsg(state, p)
// Block accepted in mainchain or orphan, update peer height.
case umsg := <-s.peerHeightsUpdate:
s.handleUpdatePeerHeights(state, umsg)
// Peer to ban.
case p := <-s.banPeers:
s.handleBanPeerMsg(state, p)
@ -1959,9 +1982,10 @@ out:
case <-s.quit:
// Disconnect all peers on server shutdown.
state.forAllPeers(func(sp *Peer) {
state.forAllPeers(func(sp *Peer) bool {
srvrLog.Tracef("Shutdown peer %s", sp)
sp.Disconnect()
return true
})
break out
}
@ -1978,7 +2002,6 @@ cleanup:
select {
case <-s.newPeers:
case <-s.donePeers:
case <-s.peerHeightsUpdate:
case <-s.relayInv:
case <-s.broadcast:
case <-s.Query:
@ -2024,6 +2047,17 @@ func (s *Server) ConnectedCount() int32 {
return <-replyChan
}
// ShouldMineOnGenesis returns true if the node is connected to at least one
// peer and none of its peers knows of any block that was mined on top of the
// genesis block, i.e. every peer's selected tip is the genesis block.
func (s *Server) ShouldMineOnGenesis() bool {
replyChan := make(chan bool)
s.Query <- getShouldMineOnGenesisMsg{reply: replyChan}
return <-replyChan
}
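// Example (editorial sketch, not part of this change): a hypothetical caller
// on the mining side could combine ShouldMineOnGenesis with the sync
// manager's IsCurrent to decide whether to build a block template on top of
// the genesis block:
//
//	if !s.SyncManager.IsCurrent() && !s.ShouldMineOnGenesis() {
//		// Not synced yet, and at least one peer already has a selected
//		// tip other than genesis, so wait for the sync to finish.
//		return
//	}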
// OutboundGroupCount returns the number of peers connected to the given
// outbound group key.
func (s *Server) OutboundGroupCount(key string) int {
@ -2051,18 +2085,6 @@ func (s *Server) NetTotals() (uint64, uint64) {
atomic.LoadUint64(&s.bytesSent)
}
// UpdatePeerHeights updates the heights of all peers who have announced
// the latest connected main chain block, or a recognized orphan. These height
// updates allow us to dynamically refresh peer heights, ensuring sync peer
// selection has access to the latest block heights for each peer.
func (s *Server) UpdatePeerHeights(latestBlkHash *daghash.Hash, latestHeight int32, updateSource *peer.Peer) {
s.peerHeightsUpdate <- updatePeerHeightsMsg{
newHash: latestBlkHash,
newHeight: latestHeight,
originPeer: updateSource,
}
}
// rebroadcastHandler keeps track of user submitted inventories that we have
// sent out but have not yet made it into a block. We periodically rebroadcast
// them in case our peers restarted or otherwise lost track of them.
@ -2350,7 +2372,6 @@ func NewServer(listenAddrs []string, db database.DB, dagParams *dagconfig.Params
broadcast: make(chan broadcastMsg, config.MainConfig().MaxPeers),
quit: make(chan struct{}),
modifyRebroadcastInv: make(chan interface{}),
peerHeightsUpdate: make(chan updatePeerHeightsMsg),
nat: nat,
db: db,
TimeSource: blockdag.NewMedianTime(),


@ -2450,25 +2450,24 @@ func handleGetPeerInfo(s *Server, cmd interface{}, closeChan <-chan struct{}) (i
for _, p := range peers {
statsSnap := p.ToPeer().StatsSnapshot()
info := &btcjson.GetPeerInfoResult{
ID: statsSnap.ID,
Addr: statsSnap.Addr,
Services: fmt.Sprintf("%08d", uint64(statsSnap.Services)),
RelayTxes: !p.IsTxRelayDisabled(),
LastSend: statsSnap.LastSend.Unix(),
LastRecv: statsSnap.LastRecv.Unix(),
BytesSent: statsSnap.BytesSent,
BytesRecv: statsSnap.BytesRecv,
ConnTime: statsSnap.ConnTime.Unix(),
PingTime: float64(statsSnap.LastPingMicros),
TimeOffset: statsSnap.TimeOffset,
Version: statsSnap.Version,
SubVer: statsSnap.UserAgent,
Inbound: statsSnap.Inbound,
StartingHeight: statsSnap.StartingHeight,
CurrentHeight: statsSnap.LastBlock,
BanScore: int32(p.BanScore()),
FeeFilter: p.FeeFilter(),
SyncNode: statsSnap.ID == syncPeerID,
ID: statsSnap.ID,
Addr: statsSnap.Addr,
Services: fmt.Sprintf("%08d", uint64(statsSnap.Services)),
RelayTxes: !p.IsTxRelayDisabled(),
LastSend: statsSnap.LastSend.Unix(),
LastRecv: statsSnap.LastRecv.Unix(),
BytesSent: statsSnap.BytesSent,
BytesRecv: statsSnap.BytesRecv,
ConnTime: statsSnap.ConnTime.Unix(),
PingTime: float64(statsSnap.LastPingMicros),
TimeOffset: statsSnap.TimeOffset,
Version: statsSnap.Version,
SubVer: statsSnap.UserAgent,
Inbound: statsSnap.Inbound,
SelectedTip: statsSnap.SelectedTip.String(),
BanScore: int32(p.BanScore()),
FeeFilter: p.FeeFilter(),
SyncNode: statsSnap.ID == syncPeerID,
}
if p.ToPeer().LastPingNonce() != 0 {
wait := float64(time.Since(statsSnap.LastPingTime).Nanoseconds())


@ -450,26 +450,25 @@ var helpDescsEnUS = map[string]string{
"getNetTotalsResult-timeMillis": "Number of milliseconds since 1 Jan 1970 GMT",
// GetPeerInfoResult help.
"getPeerInfoResult-id": "A unique node ID",
"getPeerInfoResult-addr": "The ip address and port of the peer",
"getPeerInfoResult-services": "Services bitmask which represents the services supported by the peer",
"getPeerInfoResult-relayTxes": "Peer has requested transactions be relayed to it",
"getPeerInfoResult-lastSend": "Time the last message was received in seconds since 1 Jan 1970 GMT",
"getPeerInfoResult-lastRecv": "Time the last message was sent in seconds since 1 Jan 1970 GMT",
"getPeerInfoResult-bytesSent": "Total bytes sent",
"getPeerInfoResult-bytesRecv": "Total bytes received",
"getPeerInfoResult-connTime": "Time the connection was made in seconds since 1 Jan 1970 GMT",
"getPeerInfoResult-timeOffset": "The time offset of the peer",
"getPeerInfoResult-pingTime": "Number of microseconds the last ping took",
"getPeerInfoResult-pingWait": "Number of microseconds a queued ping has been waiting for a response",
"getPeerInfoResult-version": "The protocol version of the peer",
"getPeerInfoResult-subVer": "The user agent of the peer",
"getPeerInfoResult-inbound": "Whether or not the peer is an inbound connection",
"getPeerInfoResult-startingHeight": "The latest block height the peer knew about when the connection was established",
"getPeerInfoResult-currentHeight": "The current height of the peer",
"getPeerInfoResult-banScore": "The ban score",
"getPeerInfoResult-feeFilter": "The requested minimum fee a transaction must have to be announced to the peer",
"getPeerInfoResult-syncNode": "Whether or not the peer is the sync peer",
"getPeerInfoResult-id": "A unique node ID",
"getPeerInfoResult-addr": "The ip address and port of the peer",
"getPeerInfoResult-services": "Services bitmask which represents the services supported by the peer",
"getPeerInfoResult-relayTxes": "Peer has requested transactions be relayed to it",
"getPeerInfoResult-lastSend": "Time the last message was received in seconds since 1 Jan 1970 GMT",
"getPeerInfoResult-lastRecv": "Time the last message was sent in seconds since 1 Jan 1970 GMT",
"getPeerInfoResult-bytesSent": "Total bytes sent",
"getPeerInfoResult-bytesRecv": "Total bytes received",
"getPeerInfoResult-connTime": "Time the connection was made in seconds since 1 Jan 1970 GMT",
"getPeerInfoResult-timeOffset": "The time offset of the peer",
"getPeerInfoResult-pingTime": "Number of microseconds the last ping took",
"getPeerInfoResult-pingWait": "Number of microseconds a queued ping has been waiting for a response",
"getPeerInfoResult-version": "The protocol version of the peer",
"getPeerInfoResult-subVer": "The user agent of the peer",
"getPeerInfoResult-inbound": "Whether or not the peer is an inbound connection",
"getPeerInfoResult-selectedTip": "The selected tip of the peer",
"getPeerInfoResult-banScore": "The ban score",
"getPeerInfoResult-feeFilter": "The requested minimum fee a transaction must have to be announced to the peer",
"getPeerInfoResult-syncNode": "Whether or not the peer is the sync peer",
// GetPeerInfoCmd help.
"getPeerInfo--synopsis": "Returns data about each connected network peer as an array of json objects.",


@ -114,6 +114,7 @@ func NewServer(listenAddrs []string, db database.DB, dagParams *dagconfig.Params
MiningAddrs: cfg.MiningAddrs,
ProcessBlock: s.p2pServer.SyncManager.ProcessBlock,
ConnectedCount: s.p2pServer.ConnectedCount,
ShouldMineOnGenesis: s.p2pServer.ShouldMineOnGenesis,
IsCurrent: s.p2pServer.SyncManager.IsCurrent,
})


@ -29,6 +29,7 @@ const (
InvTypeTx InvType = 1
InvTypeBlock InvType = 2
InvTypeFilteredBlock InvType = 3
InvTypeSyncBlock InvType = 4
)
// Map of service flags back to their constant names for pretty printing.
@ -37,6 +38,7 @@ var ivStrings = map[InvType]string{
InvTypeTx: "MSG_TX",
InvTypeBlock: "MSG_BLOCK",
InvTypeFilteredBlock: "MSG_FILTERED_BLOCK",
InvTypeSyncBlock: "MSG_SYNC_BLOCK",
}
// String returns the InvType in human-readable form.
@ -78,3 +80,8 @@ func writeInvVect(w io.Writer, pver uint32, iv *InvVect) error {
func (iv *InvVect) String() string {
return fmt.Sprintf("{%s:%s}", iv.Type, iv.Hash)
}
// IsBlockOrSyncBlock returns true if the inv type is InvTypeBlock or InvTypeSyncBlock
func (iv *InvVect) IsBlockOrSyncBlock() bool {
return iv.Type == InvTypeBlock || iv.Type == InvTypeSyncBlock
}
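// Example (editorial sketch, not part of this change): inventory vectors
// created for initial-sync blocks carry the new type but are treated like
// regular block inventory wherever IsBlockOrSyncBlock is consulted.
// `blockHash` below is a placeholder *daghash.Hash:
//
//	iv := NewInvVect(InvTypeSyncBlock, blockHash)
//	if iv.IsBlockOrSyncBlock() {
//		// handled by the same code path as InvTypeBlock
//	}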


@ -47,7 +47,7 @@ func TestMessage(t *testing.T) {
addrMe := &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8333}
me := NewNetAddress(addrMe, SFNodeNetwork)
me.Timestamp = time.Time{} // Version message has zero value timestamp.
msgVersion := NewMsgVersion(me, you, 123123, 0, subnetworkid.SubnetworkIDSupportsAll)
msgVersion := NewMsgVersion(me, you, 123123, &daghash.ZeroHash, subnetworkid.SubnetworkIDSupportsAll)
msgVerack := NewMsgVerAck()
msgGetAddr := NewMsgGetAddr(nil)
@ -87,7 +87,7 @@ func TestMessage(t *testing.T) {
btcnet BitcoinNet // Network to use for wire encoding
bytes int // Expected num bytes read/written
}{
{msgVersion, msgVersion, pver, MainNet, 145},
{msgVersion, msgVersion, pver, MainNet, 173},
{msgVerack, msgVerack, pver, MainNet, 24},
{msgGetAddr, msgGetAddr, pver, MainNet, 25},
{msgAddr, msgAddr, pver, MainNet, 26},
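// Editorial note: the expected wire size of msgVersion grows from 145 to 173
// bytes because the 4-byte LastBlock int32 is replaced by a 32-byte selected
// tip hash: 145 - 4 + 32 = 173.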


@ -11,6 +11,7 @@ import (
"strings"
"time"
"github.com/daglabs/btcd/dagconfig/daghash"
"github.com/daglabs/btcd/util/subnetworkid"
)
@ -52,8 +53,8 @@ type MsgVersion struct {
// on the wire. This has a max length of MaxUserAgentLen.
UserAgent string
// Last block seen by the generator of the version message.
LastBlock int32
// The selected tip of the generator of the version message.
SelectedTip *daghash.Hash
// Don't announce transactions to peer.
DisableRelayTx bool
@ -104,55 +105,36 @@ func (msg *MsgVersion) BtcDecode(r io.Reader, pver uint32) error {
return err
}
// Protocol versions >= 106 added a from address, nonce, and user agent
// field and they are only considered present if there are bytes
// remaining in the message.
if buf.Len() > 0 {
err = readNetAddress(buf, pver, &msg.AddrMe, false)
if err != nil {
return err
}
err = readNetAddress(buf, pver, &msg.AddrMe, false)
if err != nil {
return err
}
if buf.Len() > 0 {
err = readElement(buf, &msg.Nonce)
if err != nil {
return err
}
err = readElement(buf, &msg.Nonce)
if err != nil {
return err
}
if buf.Len() > 0 {
userAgent, err := ReadVarString(buf, pver)
if err != nil {
return err
}
err = validateUserAgent(userAgent)
if err != nil {
return err
}
msg.UserAgent = userAgent
userAgent, err := ReadVarString(buf, pver)
if err != nil {
return err
}
err = validateUserAgent(userAgent)
if err != nil {
return err
}
msg.UserAgent = userAgent
msg.SelectedTip = &daghash.Hash{}
err = readElement(buf, msg.SelectedTip)
if err != nil {
return err
}
// Protocol versions >= 209 added a last known block field. It is only
// considered present if there are bytes remaining in the message.
if buf.Len() > 0 {
err = readElement(buf, &msg.LastBlock)
if err != nil {
return err
}
}
// There was no relay transactions field before BIP0037Version, but
// the default behavior prior to the addition of the field was to always
// relay transactions.
if buf.Len() > 0 {
// It's safe to ignore the error here since the buffer has at
// least one byte and that byte will result in a boolean value
// regardless of its value. Also, the wire encoding for the
// field is true when transactions should be relayed, so reverse
// it for the DisableRelayTx field.
var relayTx bool
readElement(r, &relayTx)
msg.DisableRelayTx = !relayTx
var relayTx bool
err = readElement(r, &relayTx)
if err != nil {
return err
}
msg.DisableRelayTx = !relayTx
return nil
}
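// Example (editorial sketch, not part of this change): with the
// optional-field handling removed, every field up to and including the relay
// flag must be present on the wire. A minimal decoding sketch, assuming
// `encoded` holds a complete version payload:
//
//	var msg MsgVersion
//	err := msg.BtcDecode(bytes.NewBuffer(encoded), ProtocolVersion)
//	if err != nil {
//		// A truncated payload now surfaces as an error instead of being
//		// accepted with zero-valued fields.
//	}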
@ -196,7 +178,7 @@ func (msg *MsgVersion) BtcEncode(w io.Writer, pver uint32) error {
return err
}
err = writeElement(w, msg.LastBlock)
err = writeElement(w, msg.SelectedTip)
if err != nil {
return err
}
@ -233,7 +215,7 @@ func (msg *MsgVersion) MaxPayloadLength(pver uint32) uint32 {
// Message interface using the passed parameters and defaults for the remaining
// fields.
func NewMsgVersion(me *NetAddress, you *NetAddress, nonce uint64,
lastBlock int32, subnetworkID *subnetworkid.SubnetworkID) *MsgVersion {
selectedTip *daghash.Hash, subnetworkID *subnetworkid.SubnetworkID) *MsgVersion {
// Limit the timestamp to one second precision since the protocol
// doesn't support better.
@ -245,7 +227,7 @@ func NewMsgVersion(me *NetAddress, you *NetAddress, nonce uint64,
AddrMe: *me,
Nonce: nonce,
UserAgent: DefaultUserAgent,
LastBlock: lastBlock,
SelectedTip: selectedTip,
DisableRelayTx: false,
SubnetworkID: *subnetworkID,
}
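// Example (editorial sketch, not part of this change): callers now pass the
// sender's selected tip hash instead of a last-block height. A minimal
// sketch, assuming `me`, `you`, `nonce` and a BlockDAG handle `dag` are in
// scope for a hypothetical caller:
//
//	msg := NewMsgVersion(me, you, nonce, dag.SelectedTipHash(),
//		subnetworkid.SubnetworkIDSupportsAll)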


@ -13,6 +13,7 @@ import (
"testing"
"time"
"github.com/daglabs/btcd/dagconfig/daghash"
"github.com/daglabs/btcd/util/random"
"github.com/daglabs/btcd/util/subnetworkid"
"github.com/davecgh/go-spew/spew"
@ -23,7 +24,7 @@ func TestVersion(t *testing.T) {
pver := ProtocolVersion
// Create version message data.
lastBlock := int32(234234)
selectedTip := &daghash.Hash{12, 34}
tcpAddrMe := &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8333}
me := NewNetAddress(tcpAddrMe, SFNodeNetwork)
tcpAddrYou := &net.TCPAddr{IP: net.ParseIP("192.168.0.1"), Port: 8333}
@ -34,7 +35,7 @@ func TestVersion(t *testing.T) {
}
// Ensure we get the correct data back out.
msg := NewMsgVersion(me, you, nonce, lastBlock, subnetworkid.SubnetworkIDSupportsAll)
msg := NewMsgVersion(me, you, nonce, selectedTip, subnetworkid.SubnetworkIDSupportsAll)
if msg.ProtocolVersion != int32(pver) {
t.Errorf("NewMsgVersion: wrong protocol version - got %v, want %v",
msg.ProtocolVersion, pver)
@ -55,9 +56,9 @@ func TestVersion(t *testing.T) {
t.Errorf("NewMsgVersion: wrong user agent - got %v, want %v",
msg.UserAgent, DefaultUserAgent)
}
if msg.LastBlock != lastBlock {
t.Errorf("NewMsgVersion: wrong last block - got %v, want %v",
msg.LastBlock, lastBlock)
if !msg.SelectedTip.IsEqual(selectedTip) {
t.Errorf("NewMsgVersion: wrong selected tip - got %s, want %s",
msg.SelectedTip, selectedTip)
}
if msg.DisableRelayTx {
t.Errorf("NewMsgVersion: disable relay tx is not false by "+
@ -131,13 +132,12 @@ func TestVersion(t *testing.T) {
// TestVersionWire tests the MsgVersion wire encode and decode for various
// protocol versions.
func TestVersionWire(t *testing.T) {
// verRelayTxFalse and verRelayTxFalseEncoded is a version message as of
// BIP0037Version with the transaction relay disabled.
baseVersionBIP0037Copy := *baseVersionBIP0037
verRelayTxFalse := &baseVersionBIP0037Copy
// verRelayTxFalse and verRelayTxFalseEncoded are a version message and its
// wire encoding with the transaction relay disabled.
baseVersionWithRelayTxCopy := *baseVersionWithRelayTx
verRelayTxFalse := &baseVersionWithRelayTxCopy
verRelayTxFalse.DisableRelayTx = true
verRelayTxFalseEncoded := make([]byte, len(baseVersionBIP0037Encoded))
copy(verRelayTxFalseEncoded, baseVersionBIP0037Encoded)
verRelayTxFalseEncoded := make([]byte, len(baseVersionWithRelayTxEncoded))
copy(verRelayTxFalseEncoded, baseVersionWithRelayTxEncoded)
verRelayTxFalseEncoded[len(verRelayTxFalseEncoded)-1] = 0
tests := []struct {
@ -148,9 +148,15 @@ func TestVersionWire(t *testing.T) {
}{
// Latest protocol version.
{
baseVersionBIP0037,
baseVersionBIP0037,
baseVersionBIP0037Encoded,
baseVersionWithRelayTx,
baseVersionWithRelayTx,
baseVersionWithRelayTxEncoded,
ProtocolVersion,
},
{
verRelayTxFalse,
verRelayTxFalse,
verRelayTxFalseEncoded,
ProtocolVersion,
},
}
@ -299,107 +305,6 @@ func TestVersionWireErrors(t *testing.T) {
}
}
// TestVersionOptionalFields performs tests to ensure that an encoded version
// messages that omit optional fields are handled correctly.
func TestVersionOptionalFields(t *testing.T) {
// onlyRequiredVersion is a version message that only contains the
// required versions and all other values set to their default values.
onlyRequiredVersion := MsgVersion{
ProtocolVersion: 60002,
Services: SFNodeNetwork,
Timestamp: time.Unix(0x495fab29, 0), // 2009-01-03 12:15:05 -0600 CST)
AddrYou: NetAddress{
Timestamp: time.Time{}, // Zero value -- no timestamp in version
Services: SFNodeNetwork,
IP: net.ParseIP("192.168.0.1"),
Port: 8333,
},
}
onlyRequiredVersionEncoded := make([]byte, len(baseVersionEncoded)-55)
copy(onlyRequiredVersionEncoded, baseVersionEncoded)
// addrMeVersion is a version message that contains all fields through
// the AddrMe field.
addrMeVersion := onlyRequiredVersion
addrMeVersion.AddrMe = NetAddress{
Timestamp: time.Time{}, // Zero value -- no timestamp in version
Services: SFNodeNetwork,
IP: net.ParseIP("127.0.0.1"),
Port: 8333,
}
addrMeVersionEncoded := make([]byte, len(baseVersionEncoded)-29)
copy(addrMeVersionEncoded, baseVersionEncoded)
// nonceVersion is a version message that contains all fields through
// the Nonce field.
nonceVersion := addrMeVersion
nonceVersion.Nonce = 123123 // 0x1e0f3
nonceVersionEncoded := make([]byte, len(baseVersionEncoded)-21)
copy(nonceVersionEncoded, baseVersionEncoded)
// uaVersion is a version message that contains all fields through
// the UserAgent field.
uaVersion := nonceVersion
uaVersion.UserAgent = "/btcdtest:0.0.1/"
uaVersionEncoded := make([]byte, len(baseVersionEncoded)-4)
copy(uaVersionEncoded, baseVersionEncoded)
// lastBlockVersion is a version message that contains all fields
// through the LastBlock field.
lastBlockVersion := uaVersion
lastBlockVersion.LastBlock = 234234 // 0x392fa
lastBlockVersionEncoded := make([]byte, len(baseVersionEncoded))
copy(lastBlockVersionEncoded, baseVersionEncoded)
tests := []struct {
msg *MsgVersion // Expected message
buf []byte // Wire encoding
pver uint32 // Protocol version for wire encoding
}{
{
&onlyRequiredVersion,
onlyRequiredVersionEncoded,
ProtocolVersion,
},
{
&addrMeVersion,
addrMeVersionEncoded,
ProtocolVersion,
},
{
&nonceVersion,
nonceVersionEncoded,
ProtocolVersion,
},
{
&uaVersion,
uaVersionEncoded,
ProtocolVersion,
},
{
&lastBlockVersion,
lastBlockVersionEncoded,
ProtocolVersion,
},
}
for i, test := range tests {
// Decode the message from wire format.
var msg MsgVersion
rbuf := bytes.NewBuffer(test.buf)
err := msg.BtcDecode(rbuf, test.pver)
if err != nil {
t.Errorf("BtcDecode #%d error %v", i, err)
continue
}
if !reflect.DeepEqual(&msg, test.msg) {
t.Errorf("BtcDecode #%d\n got: %s want: %s", i,
spew.Sdump(msg), spew.Sdump(test.msg))
continue
}
}
}
// baseVersion is used in the various tests as a baseline MsgVersion.
var baseVersion = &MsgVersion{
ProtocolVersion: 60002,
@ -417,9 +322,9 @@ var baseVersion = &MsgVersion{
IP: net.ParseIP("127.0.0.1"),
Port: 8333,
},
Nonce: 123123, // 0x1e0f3
UserAgent: "/btcdtest:0.0.1/",
LastBlock: 234234, // 0x392fa
Nonce: 123123, // 0x1e0f3
UserAgent: "/btcdtest:0.0.1/",
SelectedTip: &daghash.Hash{0x12, 0x34},
}
// baseVersionEncoded is the wire encoded bytes for baseVersion using protocol
@ -445,12 +350,14 @@ var baseVersionEncoded = []byte{
0x10, // Varint for user agent length
0x2f, 0x62, 0x74, 0x63, 0x64, 0x74, 0x65, 0x73,
0x74, 0x3a, 0x30, 0x2e, 0x30, 0x2e, 0x31, 0x2f, // User agent
0xfa, 0x92, 0x03, 0x00, // Last block
0x12, 0x34, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // Selected Tip
}
// baseVersionBIP0037 is used in the various tests as a baseline MsgVersion for
// BIP0037.
var baseVersionBIP0037 = &MsgVersion{
// baseVersionWithRelayTx is used in the various tests as a baseline MsgVersion
var baseVersionWithRelayTx = &MsgVersion{
ProtocolVersion: 70001,
Services: SFNodeNetwork,
Timestamp: time.Unix(0x495fab29, 0), // 2009-01-03 12:15:05 -0600 CST)
@ -466,14 +373,14 @@ var baseVersionBIP0037 = &MsgVersion{
IP: net.ParseIP("127.0.0.1"),
Port: 8333,
},
Nonce: 123123, // 0x1e0f3
UserAgent: "/btcdtest:0.0.1/",
LastBlock: 234234, // 0x392fa
Nonce: 123123, // 0x1e0f3
UserAgent: "/btcdtest:0.0.1/",
SelectedTip: &daghash.Hash{0x12, 0x34},
}
// baseVersionBIP0037Encoded is the wire encoded bytes for baseVersionBIP0037
// using protocol version BIP0037Version and is used in the various tests.
var baseVersionBIP0037Encoded = []byte{
// baseVersionWithRelayTxEncoded is the wire encoded bytes for
// baseVersionWithRelayTx and is used in the various tests.
var baseVersionWithRelayTxEncoded = []byte{
0x71, 0x11, 0x01, 0x00, // Protocol version 70001
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // SFNodeNetwork
0x29, 0xab, 0x5f, 0x49, 0x00, 0x00, 0x00, 0x00, // 64-bit Timestamp
@ -490,10 +397,13 @@ var baseVersionBIP0037Encoded = []byte{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0xff, 0xff, 0x7f, 0x00, 0x00, 0x01, // IP 127.0.0.1
0x20, 0x8d, // Port 8333 in big-endian
0xf3, 0xe0, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, // Fake Nonce. TODO: (Ori) Replace to a real nonce
0xf3, 0xe0, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, // Nonce
0x10, // Varint for user agent length
0x2f, 0x62, 0x74, 0x63, 0x64, 0x74, 0x65, 0x73,
0x74, 0x3a, 0x30, 0x2e, 0x30, 0x2e, 0x31, 0x2f, // User agent
0xfa, 0x92, 0x03, 0x00, // Last block
0x12, 0x34, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // Selected Tip
0x01, // Relay tx
}