diff --git a/blockdag/blockheap.go b/blockdag/blockheap.go index 4917d6390..f66919329 100644 --- a/blockdag/blockheap.go +++ b/blockdag/blockheap.go @@ -24,12 +24,26 @@ func (h *baseHeap) Pop() interface{} { return popped } -func (h baseHeap) Less(i, j int) bool { - if h[i].height == h[j].height { - return daghash.HashToBig(&h[i].hash).Cmp(daghash.HashToBig(&h[j].hash)) > 0 +// upHeap extends baseHeap to include Less operation that traverses from bottom to top +type upHeap struct{ baseHeap } + +func (h upHeap) Less(i, j int) bool { + if h.baseHeap[i].height == h.baseHeap[j].height { + return daghash.HashToBig(&h.baseHeap[i].hash).Cmp(daghash.HashToBig(&h.baseHeap[j].hash)) < 0 } - return h[i].height > h[j].height + return h.baseHeap[i].height < h.baseHeap[j].height +} + +// downHeap extends baseHeap to include Less operation that traverses from top to bottom +type downHeap struct{ baseHeap } + +func (h downHeap) Less(i, j int) bool { + if h.baseHeap[i].height == h.baseHeap[j].height { + return daghash.HashToBig(&h.baseHeap[i].hash).Cmp(daghash.HashToBig(&h.baseHeap[j].hash)) > 0 + } + + return h.baseHeap[i].height > h.baseHeap[j].height } // BlockHeap represents a mutable heap of Blocks, sorted by their height @@ -37,9 +51,16 @@ type BlockHeap struct { impl heap.Interface } -// NewHeap initializes and returns a new BlockHeap -func NewHeap() BlockHeap { - h := BlockHeap{impl: &baseHeap{}} +// NewDownHeap initializes and returns a new BlockHeap +func NewDownHeap() BlockHeap { + h := BlockHeap{impl: &downHeap{}} + heap.Init(h.impl) + return h +} + +// NewUpHeap initializes and returns a new BlockHeap +func NewUpHeap() BlockHeap { + h := BlockHeap{impl: &upHeap{}} heap.Init(h.impl) return h } @@ -54,6 +75,13 @@ func (bh BlockHeap) Push(block *blockNode) { heap.Push(bh.impl, block) } +// pushSet pushes a blockset to the heap. +func (bh BlockHeap) pushSet(bs blockSet) { + for _, block := range bs { + heap.Push(bh.impl, block) + } +} + // Len returns the length of this heap func (bh BlockHeap) Len() int { return bh.impl.Len() diff --git a/blockdag/blockheap_test.go b/blockdag/blockheap_test.go index 0f1c77d13..cd10e4f88 100644 --- a/blockdag/blockheap_test.go +++ b/blockdag/blockheap_test.go @@ -19,72 +19,102 @@ func TestBlockHeap(t *testing.T) { block0smallHash.hash = daghash.Hash{} tests := []struct { - name string - toPush []*blockNode - expectedLength int - expectedPop *blockNode + name string + toPush []*blockNode + expectedLength int + expectedPopUp *blockNode + expectedPopDown *blockNode }{ { - name: "empty heap must have length 0", - toPush: []*blockNode{}, - expectedLength: 0, - expectedPop: nil, + name: "empty heap must have length 0", + toPush: []*blockNode{}, + expectedLength: 0, + expectedPopDown: nil, + expectedPopUp: nil, }, { - name: "heap with one push must have length 1", - toPush: []*blockNode{block0}, - expectedLength: 1, - expectedPop: nil, + name: "heap with one push must have length 1", + toPush: []*blockNode{block0}, + expectedLength: 1, + expectedPopDown: nil, + expectedPopUp: nil, }, { - name: "heap with one push and one pop", - toPush: []*blockNode{block0}, - expectedLength: 0, - expectedPop: block0, + name: "heap with one push and one pop", + toPush: []*blockNode{block0}, + expectedLength: 0, + expectedPopDown: block0, + expectedPopUp: block0, }, { - name: "push two blocks with different heights, heap shouldn't have to rebalance", - toPush: []*blockNode{block100000, block0}, - expectedLength: 1, - expectedPop: block100000, + name: "push two blocks with different heights, heap shouldn't have to rebalance " + + "for down direction, but will have to rebalance for up direction", + toPush: []*blockNode{block100000, block0}, + expectedLength: 1, + expectedPopDown: block100000, + expectedPopUp: block0, }, { - name: "push two blocks with different heights, heap must rebalance", - toPush: []*blockNode{block0, block100000}, - expectedLength: 1, - expectedPop: block100000, + name: "push two blocks with different heights, heap shouldn't have to rebalance " + + "for up direction, but will have to rebalance for down direction", + toPush: []*blockNode{block0, block100000}, + expectedLength: 1, + expectedPopDown: block100000, + expectedPopUp: block0, }, { - name: "push two blocks with equal heights but different hashes, heap shouldn't have to rebalance", - toPush: []*blockNode{block0, block0smallHash}, - expectedLength: 1, - expectedPop: block0, + name: "push two blocks with equal heights but different hashes, heap shouldn't have to rebalance " + + "for down direction, but will have to rebalance for up direction", + toPush: []*blockNode{block0, block0smallHash}, + expectedLength: 1, + expectedPopDown: block0, + expectedPopUp: block0smallHash, }, { - name: "push two blocks with equal heights but different hashes, heap must rebalance", - toPush: []*blockNode{block0smallHash, block0}, - expectedLength: 1, - expectedPop: block0, + name: "push two blocks with equal heights but different hashes, heap shouldn't have to rebalance " + + "for up direction, but will have to rebalance for down direction", + toPush: []*blockNode{block0smallHash, block0}, + expectedLength: 1, + expectedPopDown: block0, + expectedPopUp: block0smallHash, }, } for _, test := range tests { - heap := NewHeap() + dHeap := NewDownHeap() for _, block := range test.toPush { - heap.Push(block) + dHeap.Push(block) } var poppedBlock *blockNode - if test.expectedPop != nil { - poppedBlock = heap.pop() + if test.expectedPopDown != nil { + poppedBlock = dHeap.pop() } - if heap.Len() != test.expectedLength { - t.Errorf("unexpected heap length in test \"%s\". "+ - "Expected: %v, got: %v", test.name, test.expectedLength, heap.Len()) + if dHeap.Len() != test.expectedLength { + t.Errorf("unexpected down heap length in test \"%s\". "+ + "Expected: %v, got: %v", test.name, test.expectedLength, dHeap.Len()) } - if poppedBlock != test.expectedPop { - t.Errorf("unexpected popped block in test \"%s\". "+ - "Expected: %v, got: %v", test.name, test.expectedPop, poppedBlock) + if poppedBlock != test.expectedPopDown { + t.Errorf("unexpected popped block for down heap in test \"%s\". "+ + "Expected: %v, got: %v", test.name, test.expectedPopDown, poppedBlock) + } + + uHeap := NewUpHeap() + for _, block := range test.toPush { + uHeap.Push(block) + } + + poppedBlock = nil + if test.expectedPopUp != nil { + poppedBlock = uHeap.pop() + } + if uHeap.Len() != test.expectedLength { + t.Errorf("unexpected up heap length in test \"%s\". "+ + "Expected: %v, got: %v", test.name, test.expectedLength, uHeap.Len()) + } + if poppedBlock != test.expectedPopUp { + t.Errorf("unexpected popped block for up heap in test \"%s\". "+ + "Expected: %v, got: %v", test.name, test.expectedPopDown, poppedBlock) } } } diff --git a/blockdag/dag.go b/blockdag/dag.go index a45b57110..6495ae38b 100644 --- a/blockdag/dag.go +++ b/blockdag/dag.go @@ -158,7 +158,7 @@ type BlockDAG struct { // // This function is safe for concurrent access. func (dag *BlockDAG) HaveBlock(hash *daghash.Hash) (bool, error) { - exists, err := dag.blockExists(hash) + exists, err := dag.BlockExists(hash) if err != nil { return false, err } @@ -204,30 +204,41 @@ func (dag *BlockDAG) IsKnownOrphan(hash *daghash.Hash) bool { return exists } -// GetOrphanRoot returns the head of the chain for the provided hash from the -// map of orphan blocks. +// GetOrphanMissingAncestorHashes returns all of the missing parents in the orphan's sub-DAG // // This function is safe for concurrent access. -func (dag *BlockDAG) GetOrphanRoot(hash *daghash.Hash) *daghash.Hash { +func (dag *BlockDAG) GetOrphanMissingAncestorHashes(hash *daghash.Hash) ([]*daghash.Hash, error) { // Protect concurrent access. Using a read lock only so multiple // readers can query without blocking each other. dag.orphanLock.RLock() defer dag.orphanLock.RUnlock() - // Keep looping while the parent of each orphaned block is - // known and is an orphan itself. - orphanRoot := hash - parentHash := hash - for { - orphan, exists := dag.orphans[*parentHash] - if !exists { - break - } - orphanRoot = parentHash - parentHash = orphan.block.MsgBlock().Header.SelectedParentHash() - } + missingAncestorsHashes := make([]*daghash.Hash, 0) - return orphanRoot + visited := make(map[daghash.Hash]bool) + queue := []*daghash.Hash{hash} + for len(queue) > 0 { + var current *daghash.Hash + current, queue = queue[0], queue[1:] + if !visited[*current] { + visited[*current] = true + orphan, orphanExists := dag.orphans[*current] + if orphanExists { + for _, parentHash := range orphan.block.MsgBlock().Header.ParentHashes { + queue = append(queue, &parentHash) + } + } else { + existsInDag, err := dag.BlockExists(current) + if err != nil { + return nil, err + } + if !existsInDag { + missingAncestorsHashes = append(missingAncestorsHashes, current) + } + } + } + } + return missingAncestorsHashes, nil } // removeOrphanBlock removes the passed orphan block from the orphan pool and @@ -1037,8 +1048,6 @@ func (dag *BlockDAG) IsCurrent() bool { // selectedTip returns the current selected tip for the DAG. // It will return nil if there is no tip. -// -// This function is safe for concurrent access. func (dag *BlockDAG) selectedTip() *blockNode { return dag.virtual.selectedParent } @@ -1056,6 +1065,19 @@ func (dag *BlockDAG) SelectedTipHeader() *wire.BlockHeader { return selectedTip.Header() } +// SelectedTipHash returns the hash of the current selected tip for the DAG. +// It will return nil if there is no tip. +// +// This function is safe for concurrent access. +func (dag *BlockDAG) SelectedTipHash() *daghash.Hash { + selectedTip := dag.selectedTip() + if selectedTip == nil { + return nil + } + + return &selectedTip.hash +} + // UTXOSet returns the DAG's UTXO set func (dag *BlockDAG) UTXOSet() *FullUTXOSet { return dag.virtual.utxoSet @@ -1125,19 +1147,26 @@ func (dag *BlockDAG) HeaderByHash(hash *daghash.Hash) (*wire.BlockHeader, error) return node.Header(), nil } -// BlockLocatorFromHash returns a block locator for the passed block hash. +// BlockLocatorFromHash traverses the selected parent chain of the given block hash +// until it finds a block that exists in the virtual's selected parent chain, and +// then it returns its block locator. // See BlockLocator for details on the algorithm used to create a block locator. // // In addition to the general algorithm referenced above, this function will -// return the block locator for the latest known tip of the main (best) chain if +// return the block locator for the selected tip if // the passed hash is not currently known. // // This function is safe for concurrent access. func (dag *BlockDAG) BlockLocatorFromHash(hash *daghash.Hash) BlockLocator { dag.dagLock.RLock() + defer dag.dagLock.RUnlock() node := dag.index.LookupNode(hash) + if node != nil { + for !dag.IsInSelectedPathChain(&node.hash) { + node = node.selectedParent + } + } locator := dag.blockLocator(node) - dag.dagLock.RUnlock() return locator } @@ -1145,11 +1174,11 @@ func (dag *BlockDAG) BlockLocatorFromHash(hash *daghash.Hash) BlockLocator { // main (best) chain. // // This function is safe for concurrent access. -func (dag *BlockDAG) LatestBlockLocator() (BlockLocator, error) { +func (dag *BlockDAG) LatestBlockLocator() BlockLocator { dag.dagLock.RLock() + defer dag.dagLock.RUnlock() locator := dag.blockLocator(nil) - dag.dagLock.RUnlock() - return locator, nil + return locator } // blockLocator returns a block locator for the passed block node. The passed @@ -1348,24 +1377,16 @@ func (dag *BlockDAG) locateInventory(locator BlockLocator, hashStop *daghash.Has } } - // Start at the block after the most recently known block. When there - // is no next block it means the most recently known block is the tip of - // the best chain, so there is nothing more to do. - startNode = startNode.diffChild - if startNode == nil { - return nil, 0 - } - - // Calculate how many entries are needed. - total := uint32((dag.selectedTip().height - startNode.height) + 1) + // Estimate how many entries are needed. + estimatedEntries := uint32((dag.selectedTip().blueScore - startNode.blueScore) + 1) if stopNode != nil && stopNode.height >= startNode.height { - total = uint32((stopNode.height - startNode.height) + 1) + estimatedEntries = uint32((stopNode.blueScore - startNode.blueScore) + 1) } - if total > maxEntries { - total = maxEntries + if estimatedEntries > maxEntries { + estimatedEntries = maxEntries } - return startNode, total + return startNode, estimatedEntries } // locateBlocks returns the hashes of the blocks after the first known block in @@ -1376,23 +1397,47 @@ func (dag *BlockDAG) locateInventory(locator BlockLocator, hashStop *daghash.Has // // This function MUST be called with the DAG state lock held (for reads). func (dag *BlockDAG) locateBlocks(locator BlockLocator, hashStop *daghash.Hash, maxHashes uint32) []daghash.Hash { - // Find the node after the first known block in the locator and the - // total number of nodes after it needed while respecting the stop hash - // and max entries. - node, total := dag.locateInventory(locator, hashStop, maxHashes) - if total == 0 { - return nil - } - - // Populate and return the found hashes. - hashes := make([]daghash.Hash, 0, total) - for i := uint32(0); i < total; i++ { - hashes = append(hashes, node.hash) - node = node.diffChild + nodes := dag.locateBlockNodes(locator, hashStop, maxHashes) + hashes := make([]daghash.Hash, len(nodes)) + for i, node := range nodes { + hashes[i] = node.hash } return hashes } +func (dag *BlockDAG) locateBlockNodes(locator BlockLocator, hashStop *daghash.Hash, maxEntries uint32) []*blockNode { + // Find the first known block in the locator and the + // estimated number of nodes after it needed while respecting the stop hash + // and max entries. + node, estimatedEntries := dag.locateInventory(locator, hashStop, maxEntries) + if estimatedEntries == 0 { + return nil + } + stopNode := dag.index.LookupNode(hashStop) + + // Populate and return the found nodes. + nodes := make([]*blockNode, 0, estimatedEntries) + queue := NewUpHeap() + queue.pushSet(node.children) + + visited := newSet() + for i := uint32(0); queue.Len() > 0 && i < maxEntries; i++ { + var current *blockNode + current = queue.pop() + if !visited.contains(current) { + visited.add(current) + isBeforeStop := (stopNode == nil) || (current.height < stopNode.height) + if isBeforeStop || current.hash.IsEqual(hashStop) { + nodes = append(nodes, current) + } + if isBeforeStop { + queue.pushSet(current.children) + } + } + } + return nodes +} + // LocateBlocks returns the hashes of the blocks after the first known block in // the locator until the provided stop hash is reached, or up to the provided // max number of block hashes. @@ -1421,19 +1466,10 @@ func (dag *BlockDAG) LocateBlocks(locator BlockLocator, hashStop *daghash.Hash, // // This function MUST be called with the DAG state lock held (for reads). func (dag *BlockDAG) locateHeaders(locator BlockLocator, hashStop *daghash.Hash, maxHeaders uint32) []*wire.BlockHeader { - // Find the node after the first known block in the locator and the - // total number of nodes after it needed while respecting the stop hash - // and max entries. - node, total := dag.locateInventory(locator, hashStop, maxHeaders) - if total == 0 { - return nil - } - - // Populate and return the found headers. - headers := make([]*wire.BlockHeader, 0, total) - for i := uint32(0); i < total; i++ { - headers = append(headers, node.Header()) - node = node.diffChild + nodes := dag.locateBlockNodes(locator, hashStop, maxHeaders) + headers := make([]*wire.BlockHeader, len(nodes)) + for i, node := range nodes { + headers[i] = node.Header() } return headers } diff --git a/blockdag/phantom.go b/blockdag/phantom.go index 453d90352..44ca39af6 100644 --- a/blockdag/phantom.go +++ b/blockdag/phantom.go @@ -77,7 +77,7 @@ func blueCandidates(chainStart *blockNode) blockSet { func traverseCandidates(newBlock *blockNode, candidates blockSet, selectedParent *blockNode) []*blockNode { blues := []*blockNode{} selectedParentPast := newSet() - queue := NewHeap() + queue := NewDownHeap() visited := newSet() for _, parent := range newBlock.parents { diff --git a/blockdag/process.go b/blockdag/process.go index 915603cec..de5bb34a0 100644 --- a/blockdag/process.go +++ b/blockdag/process.go @@ -33,11 +33,11 @@ const ( BFNone BehaviorFlags = 0 ) -// blockExists determines whether a block with the given hash exists either in -// the main chain or any side chains. +// BlockExists determines whether a block with the given hash exists in +// the DAG. // // This function is safe for concurrent access. -func (dag *BlockDAG) blockExists(hash *daghash.Hash) (bool, error) { +func (dag *BlockDAG) BlockExists(hash *daghash.Hash) (bool, error) { // Check block index first (could be main chain or side chain blocks). if dag.index.HaveBlock(hash) { return true, nil @@ -148,7 +148,7 @@ func (dag *BlockDAG) ProcessBlock(block *util.Block, flags BehaviorFlags) (bool, log.Tracef("Processing block %s", blockHash) // The block must not already exist in the main chain or side chains. - exists, err := dag.blockExists(blockHash) + exists, err := dag.BlockExists(blockHash) if err != nil { return false, err } @@ -212,7 +212,7 @@ func (dag *BlockDAG) ProcessBlock(block *util.Block, flags BehaviorFlags) (bool, // Handle orphan blocks. allParentsExist := true for _, parentHash := range blockHeader.ParentHashes { - parentExists, err := dag.blockExists(&parentHash) + parentExists, err := dag.BlockExists(&parentHash) if err != nil { return false, err } diff --git a/blockdag/validate.go b/blockdag/validate.go index 8fce6de39..3a2ea644d 100644 --- a/blockdag/validate.go +++ b/blockdag/validate.go @@ -754,7 +754,7 @@ func (dag *BlockDAG) validateDifficulty(header *wire.BlockHeader, bluestParent * // validateParents validates that no parent is an ancestor of another parent func validateParents(blockHeader *wire.BlockHeader, parents blockSet) error { minHeight := int32(math.MaxInt32) - queue := NewHeap() + queue := NewDownHeap() visited := newSet() for _, parent := range parents { if parent.height < minHeight { diff --git a/blockdag/virtualblock_test.go b/blockdag/virtualblock_test.go index de6b93d71..8c0714453 100644 --- a/blockdag/virtualblock_test.go +++ b/blockdag/virtualblock_test.go @@ -32,7 +32,7 @@ func TestVirtualBlock(t *testing.T) { // Set its tips to tipsToSet // Add to it all the tips in tipsToAdd, one after the other // Call .Tips() on it and compare the result to expectedTips - // Call .SelectedTip() on it and compare the result to expectedSelectedParent + // Call .selectedTip() on it and compare the result to expectedSelectedParent tests := []struct { name string tipsToSet []*blockNode diff --git a/btcjson/dagsvrresults.go b/btcjson/dagsvrresults.go index 7f0df5fc8..5f0f2dad9 100644 --- a/btcjson/dagsvrresults.go +++ b/btcjson/dagsvrresults.go @@ -229,26 +229,25 @@ type GetNetworkInfoResult struct { // GetPeerInfoResult models the data returned from the getpeerinfo command. type GetPeerInfoResult struct { - ID int32 `json:"id"` - Addr string `json:"addr"` - Services string `json:"services"` - RelayTxes bool `json:"relayTxes"` - LastSend int64 `json:"lastSend"` - LastRecv int64 `json:"lastRecv"` - BytesSent uint64 `json:"bytesSent"` - BytesRecv uint64 `json:"bytesRecv"` - ConnTime int64 `json:"connTime"` - TimeOffset int64 `json:"timeOffset"` - PingTime float64 `json:"pingTime"` - PingWait float64 `json:"pingWait,omitempty"` - Version uint32 `json:"version"` - SubVer string `json:"subVer"` - Inbound bool `json:"inbound"` - StartingHeight int32 `json:"startingHeight"` - CurrentHeight int32 `json:"currentHeight,omitempty"` - BanScore int32 `json:"banScore"` - FeeFilter int64 `json:"feeFilter"` - SyncNode bool `json:"syncNode"` + ID int32 `json:"id"` + Addr string `json:"addr"` + Services string `json:"services"` + RelayTxes bool `json:"relayTxes"` + LastSend int64 `json:"lastSend"` + LastRecv int64 `json:"lastRecv"` + BytesSent uint64 `json:"bytesSent"` + BytesRecv uint64 `json:"bytesRecv"` + ConnTime int64 `json:"connTime"` + TimeOffset int64 `json:"timeOffset"` + PingTime float64 `json:"pingTime"` + PingWait float64 `json:"pingWait,omitempty"` + Version uint32 `json:"version"` + SubVer string `json:"subVer"` + Inbound bool `json:"inbound"` + SelectedTip string `json:"selectedTip,omitempty"` + BanScore int32 `json:"banScore"` + FeeFilter int64 `json:"feeFilter"` + SyncNode bool `json:"syncNode"` } // GetRawMempoolVerboseResult models the data returned from the getrawmempool diff --git a/mining/cpuminer/cpuminer.go b/mining/cpuminer/cpuminer.go index ca4890046..d1946a019 100644 --- a/mining/cpuminer/cpuminer.go +++ b/mining/cpuminer/cpuminer.go @@ -71,6 +71,11 @@ type Config struct { // found blocks to. ConnectedCount func() int32 + // ShouldMineOnGenesis checks if the node is connected to at least one + // peer, and at least one of its peers knows of any blocks that were mined + // on top of the genesis block. + ShouldMineOnGenesis func() bool + // IsCurrent defines the function to use to obtain whether or not the // block chain is current. This is used by the automatic persistent // mining routine to determine whether or it should attempt mining. @@ -309,14 +314,14 @@ out: continue } - // No point in searching for a solution before the chain is + // No point in searching for a solution before the DAG is // synced. Also, grab the same lock as used for block // submission, since the current block will be changing and // this would otherwise end up building a new block template on // a block that is in the process of becoming stale. m.submitBlockLock.Lock() curHeight := m.g.DAGHeight() - if curHeight != 0 && !m.cfg.IsCurrent() { + if (curHeight != 0 && !m.cfg.IsCurrent()) || (curHeight == 0 && !m.cfg.ShouldMineOnGenesis()) { m.submitBlockLock.Unlock() time.Sleep(time.Second) continue diff --git a/netsync/interface.go b/netsync/interface.go index 80019484a..a3592739d 100644 --- a/netsync/interface.go +++ b/netsync/interface.go @@ -7,9 +7,7 @@ package netsync import ( "github.com/daglabs/btcd/blockdag" "github.com/daglabs/btcd/dagconfig" - "github.com/daglabs/btcd/dagconfig/daghash" "github.com/daglabs/btcd/mempool" - "github.com/daglabs/btcd/peer" "github.com/daglabs/btcd/util" "github.com/daglabs/btcd/wire" ) @@ -20,8 +18,6 @@ import ( type PeerNotifier interface { AnnounceNewTransactions(newTxs []*mempool.TxDesc) - UpdatePeerHeights(latestBlkHash *daghash.Hash, latestHeight int32, updateSource *peer.Peer) - RelayInventory(invVect *wire.InvVect, data interface{}) TransactionConfirmed(tx *util.Tx) diff --git a/netsync/manager.go b/netsync/manager.go index f1c27a9c4..90b6365fa 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -131,9 +131,17 @@ type headerNode struct { // about a peer. type peerSyncState struct { syncCandidate bool - requestQueue []*wire.InvVect - requestedTxns map[daghash.TxID]struct{} - requestedBlocks map[daghash.Hash]struct{} + requestQueueMtx sync.Mutex + // relayedInvsRequestQueue contains invs of blocks and transactions + // which are relayed to our node. + relayedInvsRequestQueue []*wire.InvVect + // requestQueue contains all of the invs that are not relayed to + // us; we get them by requesting them or by manually creating them. + requestQueue []*wire.InvVect + relayedInvsRequestQueueSet map[daghash.Hash]struct{} + requestQueueSet map[daghash.Hash]struct{} + requestedTxns map[daghash.TxID]struct{} + requestedBlocks map[daghash.Hash]struct{} } // SyncManager is used to communicate block related messages with peers. The @@ -230,13 +238,14 @@ func (sm *SyncManager) startSync() { continue } - // Remove sync candidate peers that are no longer candidates due - // to passing their latest known block. NOTE: The < is - // intentional as opposed to <=. While technically the peer - // doesn't have a later block when it's equal, it will likely - // have one soon so it is a reasonable choice. It also allows - // the case where both are at 0 such as during regression test. - if peer.LastBlock() < sm.dag.Height() { //TODO: (Ori) This is probably wrong. Done only for compilation + isCandidate, err := peer.IsSyncCandidate() + if err != nil { + log.Errorf("Failed to check if peer %s is"+ + "a sync candidate: %s", peer, err) + return + } + + if !isCandidate { state.syncCandidate = false continue } @@ -253,15 +262,10 @@ func (sm *SyncManager) startSync() { // to send. sm.requestedBlocks = make(map[daghash.Hash]struct{}) - locator, err := sm.dag.LatestBlockLocator() - if err != nil { - log.Errorf("Failed to get block locator for the "+ - "latest block: %s", err) - return - } + locator := sm.dag.LatestBlockLocator() - log.Infof("Syncing to block height %d from peer %s", - bestPeer.LastBlock(), bestPeer.Addr()) + log.Infof("Syncing to block %s from peer %s", + bestPeer.SelectedTip(), bestPeer.Addr()) // When the current height is less than a known checkpoint we // can use block headers to learn about which blocks comprise @@ -342,9 +346,11 @@ func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) { // Initialize the peer state isSyncCandidate := sm.isSyncCandidate(peer) sm.peerStates[peer] = &peerSyncState{ - syncCandidate: isSyncCandidate, - requestedTxns: make(map[daghash.TxID]struct{}), - requestedBlocks: make(map[daghash.Hash]struct{}), + syncCandidate: isSyncCandidate, + requestedTxns: make(map[daghash.TxID]struct{}), + requestedBlocks: make(map[daghash.Hash]struct{}), + requestQueueSet: make(map[daghash.Hash]struct{}), + relayedInvsRequestQueueSet: make(map[daghash.Hash]struct{}), } // Start syncing by choosing the best candidate if needed. @@ -466,22 +472,26 @@ func (sm *SyncManager) handleTxMsg(tmsg *txMsg) { // current returns true if we believe we are synced with our peers, false if we // still have blocks to check func (sm *SyncManager) current() bool { - if !sm.dag.IsCurrent() { - return false - } + return sm.dag.IsCurrent() +} - // if blockChain thinks we are current and we have no syncPeer it - // is probably right. - if sm.syncPeer == nil { - return true +// restartSyncIfNeeded finds a new sync candidate if we're not expecting any +// blocks from the current one. +func (sm *SyncManager) restartSyncIfNeeded() { + if sm.syncPeer != nil { + syncPeerState, exists := sm.peerStates[sm.syncPeer] + if exists { + isWaitingForBlocks := func() bool { + syncPeerState.requestQueueMtx.Lock() + defer syncPeerState.requestQueueMtx.Unlock() + return len(syncPeerState.requestedBlocks) != 0 || len(syncPeerState.requestQueue) != 0 + }() + if isWaitingForBlocks { + return + } + } } - - // No matter what chain thinks, if we are below the block we are syncing - // to we are not current. - if sm.dag.Height() < sm.syncPeer.LastBlock() { //TODO: (Ori) This is probably wrong. Done only for compilation - return false - } - return true + sm.startSync() } // handleBlockMsg handles block messages from all peers. @@ -533,14 +543,17 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { } } - // Remove block from request maps. Either chain will know about it and - // so we shouldn't have any more instances of trying to fetch it, or we - // will fail the insert and thus we'll retry next time we get an inv. + // Process the block to include validation, orphan handling, etc. + isOrphan, err := sm.dag.ProcessBlock(bmsg.block, behaviorFlags) + + // Remove block from request maps. Either DAG knows about it and + // so we shouldn't have any more instances of trying to fetch it, or + // the insertion fails and thus we'll retry next time we get an inv. delete(state.requestedBlocks, *blockHash) delete(sm.requestedBlocks, *blockHash) - // Process the block to include validation, orphan handling, etc. - isOrphan, err := sm.dag.ProcessBlock(bmsg.block, behaviorFlags) + sm.restartSyncIfNeeded() + if err != nil { // When the error is a rule error, it means the block was simply // rejected as opposed to something actually going wrong, so log @@ -565,69 +578,32 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { return } - // Meta-data about the new block this peer is reporting. We use this - // below to update this peer's lastest block height and the heights of - // other peers based on their last announced block hash. This allows us - // to dynamically update the block heights of peers, avoiding stale - // heights when looking for a new sync peer. Upon acceptance of a block - // or recognition of an orphan, we also use this information to update - // the block heights over other peers who's invs may have been ignored - // if we are actively syncing while the chain is not yet current or - // who may have lost the lock announcment race. - var heightUpdate int32 - var blkHashUpdate *daghash.Hash - // Request the parents for the orphan block from the peer that sent it. if isOrphan { - // We've just received an orphan block from a peer. In order - // to update the height of the peer, we try to extract the - // block height from the scriptSig of the coinbase transaction. - // Extraction is only attempted if the block's version is - // high enough (ver 2+). - coinbaseTx := bmsg.block.Transactions()[0] - cbHeight, err := blockdag.ExtractCoinbaseHeight(coinbaseTx) + missingAncestors, err := sm.dag.GetOrphanMissingAncestorHashes(blockHash) if err != nil { - log.Warnf("Unable to extract height from "+ - "coinbase tx: %s", err) - } else { - log.Debugf("Extracted height of %d from "+ - "orphan block", cbHeight) - heightUpdate = cbHeight - blkHashUpdate = blockHash - } - - orphanRoot := sm.dag.GetOrphanRoot(blockHash) - locator, err := sm.dag.LatestBlockLocator() - if err != nil { - log.Warnf("Failed to get block locator for the "+ - "latest block: %s", err) - } else { - peer.PushGetBlocksMsg(locator, orphanRoot) + log.Errorf("Failed to find missing ancestors for block %s: %s", + blockHash, err) + return } + sm.addBlocksToRequestQueue(state, missingAncestors, false) } else { // When the block is not an orphan, log information about it and // update the chain state. sm.progressLogger.LogBlockHeight(bmsg.block) - // Update this peer's latest block height, for future - // potential sync node candidacy. - highestTipHash := sm.dag.HighestTipHash() - heightUpdate = sm.dag.Height() //TODO: (Ori) This is probably wrong. Done only for compilation - blkHashUpdate = &highestTipHash - // Clear the rejected transactions. sm.rejectedTxns = make(map[daghash.TxID]struct{}) } - // Update the block height for this peer. But only send a message to - // the server for updating peer heights if this is an orphan or our - // chain is "current". This avoids sending a spammy amount of messages - // if we're syncing the chain from scratch. - if blkHashUpdate != nil && heightUpdate != 0 { - peer.UpdateLastBlockHeight(heightUpdate) - if isOrphan || sm.current() { - go sm.peerNotifier.UpdatePeerHeights(blkHashUpdate, heightUpdate, - peer) + // We don't want to flood our sync peer with getdata messages, so + // instead of asking it immediately about missing ancestors, we first + // wait until it finishes to send us all of the requested blocks. + if (isOrphan && peer != sm.syncPeer) || (peer == sm.syncPeer && len(state.requestedBlocks) == 0) { + err := sm.sendInvsFromRequestQueue(peer, state) + if err != nil { + log.Errorf("Failed to send invs from queue: %s", err) + return } } @@ -683,6 +659,37 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { } } +func (sm *SyncManager) addBlocksToRequestQueue(state *peerSyncState, hashes []*daghash.Hash, isRelayedInv bool) { + state.requestQueueMtx.Lock() + defer state.requestQueueMtx.Unlock() + for _, hash := range hashes { + if _, exists := sm.requestedBlocks[*hash]; !exists { + iv := wire.NewInvVect(wire.InvTypeBlock, hash) + state.addInvToRequestQueue(iv, isRelayedInv) + } + } +} + +func (state *peerSyncState) addInvToRequestQueue(iv *wire.InvVect, isRelayedInv bool) { + if isRelayedInv { + if _, exists := state.relayedInvsRequestQueueSet[iv.Hash]; !exists { + state.relayedInvsRequestQueueSet[iv.Hash] = struct{}{} + state.relayedInvsRequestQueue = append(state.relayedInvsRequestQueue, iv) + } + } else { + if _, exists := state.requestQueueSet[iv.Hash]; !exists { + state.requestQueueSet[iv.Hash] = struct{}{} + state.requestQueue = append(state.requestQueue, iv) + } + } +} + +func (state *peerSyncState) addInvToRequestQueueWithLock(iv *wire.InvVect, isRelayedInv bool) { + state.requestQueueMtx.Lock() + defer state.requestQueueMtx.Unlock() + state.addInvToRequestQueue(iv, isRelayedInv) +} + // fetchHeaderBlocks creates and sends a request to the syncPeer for the next // list of blocks to be downloaded based on the current list of headers. func (sm *SyncManager) fetchHeaderBlocks() { @@ -845,9 +852,10 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) { // are in the memory pool (either the main pool or orphan pool). func (sm *SyncManager) haveInventory(invVect *wire.InvVect) (bool, error) { switch invVect.Type { + case wire.InvTypeSyncBlock: + fallthrough case wire.InvTypeBlock: - // Ask chain if the block is known to it in any form (main - // chain, side chain, or orphan). + // Ask DAG if the block is known to it in any form (in DAG or as an orphan). return sm.dag.HaveBlock(&invVect.Hash) case wire.InvTypeTx: @@ -900,30 +908,12 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) { lastBlock := -1 invVects := imsg.inv.InvList for i := len(invVects) - 1; i >= 0; i-- { - if invVects[i].Type == wire.InvTypeBlock { + if invVects[i].IsBlockOrSyncBlock() { lastBlock = i break } } - // If this inv contains a block announcement, and this isn't coming from - // our current sync peer or we're current, then update the last - // announced block for this peer. We'll use this information later to - // update the heights of peers based on blocks we've accepted that they - // previously announced. - if lastBlock != -1 && (peer != sm.syncPeer || sm.current()) { - peer.UpdateLastAnnouncedBlock(&invVects[lastBlock].Hash) - } - - // If our chain is current and a peer announces a block we already - // know of, then update their current block height. - if lastBlock != -1 && sm.current() { - blkHeight, err := sm.dag.BlockHeightByHash(&invVects[lastBlock].Hash) - if err == nil { - peer.UpdateLastBlockHeight(blkHeight) - } - } - // Request the advertised inventory if we don't already have it. Also, // request parent blocks of orphans if we receive one we already have. // Finally, attempt to detect potential stalls due to long side chains @@ -932,6 +922,7 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) { // Ignore unsupported inventory types. switch iv.Type { case wire.InvTypeBlock: + case wire.InvTypeSyncBlock: case wire.InvTypeTx: default: continue @@ -964,11 +955,11 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) { } // Add it to the request queue. - state.requestQueue = append(state.requestQueue, iv) + state.addInvToRequestQueueWithLock(iv, iv.Type != wire.InvTypeSyncBlock) continue } - if iv.Type == wire.InvTypeBlock { + if iv.IsBlockOrSyncBlock() { // The block is an orphan block that we already have. // When the existing orphan was processed, it requested // the missing parent blocks. When this scenario @@ -980,27 +971,23 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) { // to signal there are more missing blocks that need to // be requested. if sm.dag.IsKnownOrphan(&iv.Hash) { - // Request blocks starting at the latest known - // up to the root of the orphan that just came - // in. - orphanRoot := sm.dag.GetOrphanRoot(&iv.Hash) - locator, err := sm.dag.LatestBlockLocator() + missingAncestors, err := sm.dag.GetOrphanMissingAncestorHashes(&iv.Hash) if err != nil { - log.Errorf("PEER: Failed to get block "+ - "locator for the latest block: "+ - "%s", err) - continue + log.Errorf("Failed to find missing ancestors for block %s: %s", + iv.Hash, err) + return } - peer.PushGetBlocksMsg(locator, orphanRoot) + sm.addBlocksToRequestQueue(state, missingAncestors, iv.Type != wire.InvTypeSyncBlock) continue } // We already have the final block advertised by this // inventory message, so force a request for more. This - // should only happen if we're on a really long side - // chain. - if i == lastBlock { - // Request blocks after this one up to the + // should only happen if our DAG and the peer's DAG have + // diverged long time ago. + if i == lastBlock && peer == sm.syncPeer { + // Request blocks after the first block's ancestor that exists + // in the selected path chain, one up to the // final one the remote peer knows about (zero // stop hash). locator := sm.dag.BlockLocatorFromHash(&iv.Hash) @@ -1009,30 +996,58 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) { } } - // Request as much as possible at once. Anything that won't fit into - // the request will be requested on the next inv message. - numRequested := 0 - gdmsg := wire.NewMsgGetData() - requestQueue := state.requestQueue + err := sm.sendInvsFromRequestQueue(peer, state) + if err != nil { + log.Errorf("Failed to send invs from queue: %s", err) + } +} + +func (sm *SyncManager) addInvsToGetDataMessageFromQueue(gdmsg *wire.MsgGetData, state *peerSyncState, requestQueue []*wire.InvVect) ([]*wire.InvVect, error) { + var invsNum int + leftSpaceInGdmsg := wire.MaxInvPerMsg - len(gdmsg.InvList) + if len(requestQueue) > leftSpaceInGdmsg { + invsNum = leftSpaceInGdmsg + } else { + invsNum = len(requestQueue) + } + invsToAdd := make([]*wire.InvVect, 0, invsNum) + for len(requestQueue) != 0 { iv := requestQueue[0] requestQueue[0] = nil requestQueue = requestQueue[1:] - switch iv.Type { - case wire.InvTypeBlock: - // Request the block if there is not already a pending - // request. - if _, exists := sm.requestedBlocks[iv.Hash]; !exists { - sm.requestedBlocks[iv.Hash] = struct{}{} - sm.limitHashMap(sm.requestedBlocks, maxRequestedBlocks) - state.requestedBlocks[iv.Hash] = struct{}{} + exists, err := sm.haveInventory(iv) + if err != nil { + return nil, err + } + if !exists { + invsToAdd = append(invsToAdd, iv) + } + } - gdmsg.AddInvVect(iv) - numRequested++ - } + addBlockInv := func(iv *wire.InvVect) { + // Request the block if there is not already a pending + // request. + if _, exists := sm.requestedBlocks[iv.Hash]; !exists { + sm.requestedBlocks[iv.Hash] = struct{}{} + sm.limitHashMap(sm.requestedBlocks, maxRequestedBlocks) + state.requestedBlocks[iv.Hash] = struct{}{} + + gdmsg.AddInvVect(iv) + } + } + for _, iv := range invsToAdd { + switch iv.Type { + case wire.InvTypeSyncBlock: + delete(state.requestQueueSet, iv.Hash) + addBlockInv(iv) + case wire.InvTypeBlock: + delete(state.relayedInvsRequestQueueSet, iv.Hash) + addBlockInv(iv) case wire.InvTypeTx: + delete(state.relayedInvsRequestQueueSet, iv.Hash) // Request the transaction if there is not already a // pending request. if _, exists := sm.requestedTxns[daghash.TxID(iv.Hash)]; !exists { @@ -1041,18 +1056,36 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) { state.requestedTxns[daghash.TxID(iv.Hash)] = struct{}{} gdmsg.AddInvVect(iv) - numRequested++ } } - if numRequested >= wire.MaxInvPerMsg { + if len(requestQueue) >= wire.MaxInvPerMsg { break } } - state.requestQueue = requestQueue + return requestQueue, nil +} + +func (sm *SyncManager) sendInvsFromRequestQueue(peer *peerpkg.Peer, state *peerSyncState) error { + state.requestQueueMtx.Lock() + defer state.requestQueueMtx.Unlock() + gdmsg := wire.NewMsgGetData() + newRequestQueue, err := sm.addInvsToGetDataMessageFromQueue(gdmsg, state, state.requestQueue) + if err != nil { + return err + } + state.requestQueue = newRequestQueue + if sm.current() { + newRequestQueue, err := sm.addInvsToGetDataMessageFromQueue(gdmsg, state, state.relayedInvsRequestQueue) + if err != nil { + return err + } + state.relayedInvsRequestQueue = newRequestQueue + } if len(gdmsg.InvList) > 0 { peer.QueueMessage(gdmsg, nil) } + return nil } // limitTxIDMap is a helper function for maps that require a maximum limit by diff --git a/peer/example_test.go b/peer/example_test.go index 894ffa3c9..ae2e0ad08 100644 --- a/peer/example_test.go +++ b/peer/example_test.go @@ -25,6 +25,7 @@ func mockRemotePeer() error { UserAgentVersion: "1.0.0", // User agent version to advertise. DAGParams: &dagconfig.SimNetParams, SubnetworkID: subnetworkid.SubnetworkIDSupportsAll, + SelectedTip: fakeSelectedTipFn, } // Accept connections on the simnet port. @@ -80,6 +81,7 @@ func Example_newOutboundPeer() { }, }, SubnetworkID: subnetworkid.SubnetworkIDSupportsAll, + SelectedTip: fakeSelectedTipFn, } p, err := peer.NewOutboundPeer(peerCfg, "127.0.0.1:18555") if err != nil { diff --git a/peer/log.go b/peer/log.go index 16be368bc..a9f3b7b79 100644 --- a/peer/log.go +++ b/peer/log.go @@ -74,6 +74,8 @@ func invSummary(invList []*wire.InvVect) string { return fmt.Sprintf("error %s", iv.Hash) case wire.InvTypeBlock: return fmt.Sprintf("block %s", iv.Hash) + case wire.InvTypeSyncBlock: + return fmt.Sprintf("sync block %s", iv.Hash) case wire.InvTypeTx: return fmt.Sprintf("tx %s", iv.Hash) } @@ -124,8 +126,8 @@ func sanitizeString(str string, maxLength uint) string { func messageSummary(msg wire.Message) string { switch msg := msg.(type) { case *wire.MsgVersion: - return fmt.Sprintf("agent %s, pver %d, block %d", - msg.UserAgent, msg.ProtocolVersion, msg.LastBlock) + return fmt.Sprintf("agent %s, pver %d, selected tip %s", + msg.UserAgent, msg.ProtocolVersion, msg.SelectedTip) case *wire.MsgVerAck: // No summary. diff --git a/peer/peer.go b/peer/peer.go index 2bb7bad05..3695d099d 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -216,12 +216,13 @@ type MessageListeners struct { // Config is the struct to hold configuration options useful to Peer. type Config struct { - // NewestBlock specifies a callback which provides the newest block - // details to the peer as needed. This can be nil in which case the - // peer will report a block height of 0, however it is good practice for - // peers to specify this so their currently best known is accurately - // reported. - NewestBlock HashFunc + // SelectedTip specifies a callback which provides the selected tip + // to the peer as needed. + SelectedTip func() *daghash.Hash + + // SelectedTip specifies a callback which provides the selected tip + // to the peer as needed. + BlockExists func(*daghash.Hash) (bool, error) // HostToNetAddress returns the netaddress for the given host. This can be // nil in which case the host will be parsed as an IP address. @@ -369,8 +370,7 @@ type StatsSnap struct { Version uint32 UserAgent string Inbound bool - StartingHeight int32 - LastBlock int32 + SelectedTip *daghash.Hash LastPingNonce uint64 LastPingTime time.Time LastPingMicros int64 @@ -452,15 +452,13 @@ type Peer struct { // These fields keep track of statistics for the peer and are protected // by the statsMtx mutex. - statsMtx sync.RWMutex - timeOffset int64 - timeConnected time.Time - startingHeight int32 - lastBlock int32 - lastAnnouncedBlock *daghash.Hash - lastPingNonce uint64 // Set to nonce if we have a pending ping. - lastPingTime time.Time // Time we sent last ping. - lastPingMicros int64 // Time for last ping to return. + statsMtx sync.RWMutex + timeOffset int64 + timeConnected time.Time + selectedTip *daghash.Hash + lastPingNonce uint64 // Set to nonce if we have a pending ping. + lastPingTime time.Time // Time we sent last ping. + lastPingMicros int64 // Time for last ping to return. stallControl chan stallControlMsg outputQueue chan outMsg @@ -481,29 +479,6 @@ func (p *Peer) String() string { return fmt.Sprintf("%s (%s)", p.addr, logger.DirectionString(p.inbound)) } -// UpdateLastBlockHeight updates the last known block for the peer. -// -// This function is safe for concurrent access. -func (p *Peer) UpdateLastBlockHeight(newHeight int32) { - p.statsMtx.Lock() - log.Tracef("Updating last block height of peer %s from %s to %s", - p.addr, p.lastBlock, newHeight) - p.lastBlock = newHeight - p.statsMtx.Unlock() -} - -// UpdateLastAnnouncedBlock updates meta-data about the last block hash this -// peer is known to have announced. -// -// This function is safe for concurrent access. -func (p *Peer) UpdateLastAnnouncedBlock(blkHash *daghash.Hash) { - log.Tracef("Updating last blk for peer %s, %s", p.addr, blkHash) - - p.statsMtx.Lock() - p.lastAnnouncedBlock = blkHash - p.statsMtx.Unlock() -} - // AddKnownInventory adds the passed inventory to the cache of known inventory // for the peer. // @@ -540,8 +515,7 @@ func (p *Peer) StatsSnapshot() *StatsSnap { TimeOffset: p.timeOffset, Version: protocolVersion, Inbound: p.inbound, - StartingHeight: p.startingHeight, - LastBlock: p.lastBlock, + SelectedTip: p.selectedTip, LastPingNonce: p.lastPingNonce, LastPingMicros: p.lastPingMicros, LastPingTime: p.lastPingTime, @@ -620,17 +594,6 @@ func (p *Peer) SubnetworkID() *subnetworkid.SubnetworkID { return subnetworkID } -// LastAnnouncedBlock returns the last announced block of the remote peer. -// -// This function is safe for concurrent access. -func (p *Peer) LastAnnouncedBlock() *daghash.Hash { - p.statsMtx.RLock() - lastAnnouncedBlock := p.lastAnnouncedBlock - p.statsMtx.RUnlock() - - return lastAnnouncedBlock -} - // LastPingNonce returns the last ping nonce of the remote peer. // // This function is safe for concurrent access. @@ -699,15 +662,26 @@ func (p *Peer) ProtocolVersion() uint32 { return protocolVersion } -// LastBlock returns the last block of the peer. +// SelectedTip returns the selected tip of the peer. // // This function is safe for concurrent access. -func (p *Peer) LastBlock() int32 { +func (p *Peer) SelectedTip() *daghash.Hash { p.statsMtx.RLock() - lastBlock := p.lastBlock + selectedTip := p.selectedTip p.statsMtx.RUnlock() - return lastBlock + return selectedTip +} + +// IsSyncCandidate returns whether or not this peer is a sync candidate. +// +// This function is safe for concurrent access. +func (p *Peer) IsSyncCandidate() (bool, error) { + exists, err := p.cfg.BlockExists(p.selectedTip) + if err != nil { + return false, err + } + return !exists, nil } // LastSend returns the last send time of the peer. @@ -762,18 +736,6 @@ func (p *Peer) TimeOffset() int64 { return timeOffset } -// StartingHeight returns the last known height the peer reported during the -// initial negotiation phase. -// -// This function is safe for concurrent access. -func (p *Peer) StartingHeight() int32 { - p.statsMtx.RLock() - startingHeight := p.startingHeight - p.statsMtx.RUnlock() - - return startingHeight -} - // WantsHeaders returns if the peer wants header messages instead of // inventory vectors for blocks. // @@ -789,15 +751,7 @@ func (p *Peer) WantsHeaders() bool { // localVersionMsg creates a version message that can be used to send to the // remote peer. func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) { - var blockNum int32 - if p.cfg.NewestBlock != nil { - var err error - _, blockNum, err = p.cfg.NewestBlock() - if err != nil { - return nil, err - } - } - + selectedTip := p.cfg.SelectedTip() theirNA := p.na // If we are behind a proxy and the connection comes from the proxy then @@ -833,7 +787,7 @@ func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) { subnetworkID := p.cfg.SubnetworkID // Version message. - msg := wire.NewMsgVersion(ourNA, theirNA, nonce, blockNum, subnetworkID) + msg := wire.NewMsgVersion(ourNA, theirNA, nonce, selectedTip, subnetworkID) msg.AddUserAgent(p.cfg.UserAgentName, p.cfg.UserAgentVersion, p.cfg.UserAgentComments...) @@ -1057,8 +1011,7 @@ func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error { // Updating a bunch of stats including block based stats, and the // peer's time offset. p.statsMtx.Lock() - p.lastBlock = msg.LastBlock - p.startingHeight = msg.LastBlock + p.selectedTip = msg.SelectedTip p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix() p.statsMtx.Unlock() diff --git a/peer/peer_test.go b/peer/peer_test.go index fc5178c82..a308708aa 100644 --- a/peer/peer_test.go +++ b/peer/peer_test.go @@ -101,8 +101,6 @@ type peerStats struct { wantConnected bool wantVersionKnown bool wantVerAckReceived bool - wantLastBlock int32 - wantStartingHeight int32 wantLastPingTime time.Time wantLastPingNonce uint64 wantLastPingMicros int64 @@ -153,11 +151,6 @@ func testPeer(t *testing.T, p *peer.Peer, s peerStats) { return } - if p.LastBlock() != s.wantLastBlock { - t.Errorf("testPeer: wrong LastBlock - got %v, want %v", p.LastBlock(), s.wantLastBlock) - return - } - // Allow for a deviation of 1s, as the second may tick when the message is // in transit and the protocol doesn't support any further precision. if p.TimeOffset() != s.wantTimeOffset && p.TimeOffset() != s.wantTimeOffset-1 { @@ -176,11 +169,6 @@ func testPeer(t *testing.T, p *peer.Peer, s peerStats) { return } - if p.StartingHeight() != s.wantStartingHeight { - t.Errorf("testPeer: wrong StartingHeight - got %v, want %v", p.StartingHeight(), s.wantStartingHeight) - return - } - if p.Connected() != s.wantConnected { t.Errorf("testPeer: wrong Connected - got %v, want %v", p.Connected(), s.wantConnected) return @@ -231,6 +219,7 @@ func TestPeerConnection(t *testing.T) { ProtocolVersion: wire.ProtocolVersion, // Configure with older version Services: 0, SubnetworkID: subnetworkid.SubnetworkIDSupportsAll, + SelectedTip: fakeSelectedTipFn, } peer2Cfg := &peer.Config{ Listeners: peer1Cfg.Listeners, @@ -241,6 +230,7 @@ func TestPeerConnection(t *testing.T) { ProtocolVersion: wire.ProtocolVersion + 1, Services: wire.SFNodeNetwork, SubnetworkID: subnetworkid.SubnetworkIDSupportsAll, + SelectedTip: fakeSelectedTipFn, } wantStats1 := peerStats{ @@ -254,8 +244,8 @@ func TestPeerConnection(t *testing.T) { wantLastPingNonce: uint64(0), wantLastPingMicros: int64(0), wantTimeOffset: int64(0), - wantBytesSent: 187, // 163 version + 24 verack - wantBytesReceived: 187, + wantBytesSent: 215, // 191 version + 24 verack + wantBytesReceived: 215, } wantStats2 := peerStats{ wantUserAgent: wire.DefaultUserAgent + "peer:1.0(comment)/", @@ -268,8 +258,8 @@ func TestPeerConnection(t *testing.T) { wantLastPingNonce: uint64(0), wantLastPingMicros: int64(0), wantTimeOffset: int64(0), - wantBytesSent: 187, // 163 version + 24 verack - wantBytesReceived: 187, + wantBytesSent: 215, // 191 version + 24 verack + wantBytesReceived: 215, } tests := []struct { @@ -443,6 +433,7 @@ func TestPeerListeners(t *testing.T) { DAGParams: &dagconfig.MainNetParams, Services: wire.SFNodeBloom, SubnetworkID: subnetworkid.SubnetworkIDSupportsAll, + SelectedTip: fakeSelectedTipFn, } inConn, outConn := pipe( &conn{raddr: "10.0.0.1:8333"}, @@ -603,10 +594,9 @@ func TestPeerListeners(t *testing.T) { // TestOutboundPeer tests that the outbound peer works as expected. func TestOutboundPeer(t *testing.T) { - peerCfg := &peer.Config{ - NewestBlock: func() (*daghash.Hash, int32, error) { - return nil, 0, errors.New("newest block not found") + SelectedTip: func() *daghash.Hash { + return &daghash.ZeroHash }, UserAgentName: "peer", UserAgentVersion: "1.0", @@ -628,23 +618,7 @@ func TestOutboundPeer(t *testing.T) { // Test trying to connect twice. p.AssociateConnection(c) p.AssociateConnection(c) - - disconnected := make(chan struct{}) - go func() { - p.WaitForDisconnect() - disconnected <- struct{}{} - }() - - select { - case <-disconnected: - close(disconnected) - case <-time.After(time.Second): - t.Fatal("Peer did not automatically disconnect.") - } - - if p.Connected() { - t.Fatalf("Should not be connected as NewestBlock produces error.") - } + p.Disconnect() // Test Queue Inv fakeBlockHash := &daghash.Hash{0: 0x00, 1: 0x01} @@ -662,17 +636,17 @@ func TestOutboundPeer(t *testing.T) { <-done p.Disconnect() - // Test NewestBlock - var newestBlock = func() (*daghash.Hash, int32, error) { + // Test SelectedTip + var selectedTip = func() *daghash.Hash { hashStr := "14a0810ac680a3eb3f82edc878cea25ec41d6b790744e5daeef" hash, err := daghash.NewHashFromStr(hashStr) if err != nil { - return nil, 0, err + t.Fatalf("daghash.NewHashFromStr: %s", err) } - return hash, 234439, nil + return hash } - peerCfg.NewestBlock = newestBlock + peerCfg.SelectedTip = selectedTip r1, w1 := io.Pipe() c1 := &conn{raddr: "10.0.0.1:8333", Writer: w1, Reader: r1} p1, err := peer.NewOutboundPeer(peerCfg, "10.0.0.1:8333") @@ -682,20 +656,6 @@ func TestOutboundPeer(t *testing.T) { } p1.AssociateConnection(c1) - // Test update latest block - latestBlockHash, err := daghash.NewHashFromStr("1a63f9cdff1752e6375c8c76e543a71d239e1a2e5c6db1aa679") - if err != nil { - t.Errorf("NewHashFromStr: unexpected err %v\n", err) - return - } - p1.UpdateLastAnnouncedBlock(latestBlockHash) - p1.UpdateLastBlockHeight(234440) - if p1.LastAnnouncedBlock() != latestBlockHash { - t.Errorf("LastAnnouncedBlock: wrong block - got %v, want %v", - p1.LastAnnouncedBlock(), latestBlockHash) - return - } - // Test Queue Inv after connection p1.QueueInventory(fakeInv) p1.Disconnect() @@ -755,6 +715,7 @@ func TestUnsupportedVersionPeer(t *testing.T) { DAGParams: &dagconfig.MainNetParams, Services: 0, SubnetworkID: subnetworkid.SubnetworkIDSupportsAll, + SelectedTip: fakeSelectedTipFn, } localNA := wire.NewNetAddressIPPort( @@ -811,7 +772,7 @@ func TestUnsupportedVersionPeer(t *testing.T) { } // Remote peer writes version message advertising invalid protocol version 0 - invalidVersionMsg := wire.NewMsgVersion(remoteNA, localNA, 0, 0, subnetworkid.SubnetworkIDSupportsAll) + invalidVersionMsg := wire.NewMsgVersion(remoteNA, localNA, 0, &daghash.ZeroHash, subnetworkid.SubnetworkIDSupportsAll) invalidVersionMsg.ProtocolVersion = 0 _, err = wire.WriteMessageN( @@ -853,3 +814,7 @@ func init() { // Allow self connection when running the tests. peer.TstAllowSelfConns() } + +func fakeSelectedTipFn() *daghash.Hash { + return &daghash.Hash{0x12, 0x34} +} diff --git a/server/p2p/p2p.go b/server/p2p/p2p.go index b63d5abaa..ce39e5cad 100644 --- a/server/p2p/p2p.go +++ b/server/p2p/p2p.go @@ -190,24 +190,51 @@ func (ps *peerState) Count() int { len(ps.persistentPeers) } -// forAllOutboundPeers is a helper function that runs closure on all outbound +// forAllOutboundPeers is a helper function that runs a callback on all outbound // peers known to peerState. -func (ps *peerState) forAllOutboundPeers(closure func(sp *Peer)) { +// The loop stops and returns false if one of the callback calls returns false. +// Otherwise the function should return true. +func (ps *peerState) forAllOutboundPeers(callback func(sp *Peer) bool) bool { for _, e := range ps.outboundPeers { - closure(e) + shouldContinue := callback(e) + if !shouldContinue { + return false + } } for _, e := range ps.persistentPeers { - closure(e) + shouldContinue := callback(e) + if !shouldContinue { + return false + } } + return true } -// forAllPeers is a helper function that runs closure on all peers known to -// peerState. -func (ps *peerState) forAllPeers(closure func(sp *Peer)) { +// forAllInboundPeers is a helper function that runs a callback on all inbound +// peers known to peerState. +// The loop stops and returns false if one of the callback calls returns false. +// Otherwise the function should return true. +func (ps *peerState) forAllInboundPeers(callback func(sp *Peer) bool) bool { for _, e := range ps.inboundPeers { - closure(e) + shouldContinue := callback(e) + if !shouldContinue { + return false + } } - ps.forAllOutboundPeers(closure) + return true +} + +// forAllPeers is a helper function that runs a callback on all peers known to +// peerState. +// The loop stops and returns false if one of the callback calls returns false. +// Otherwise the function should return true. +func (ps *peerState) forAllPeers(callback func(sp *Peer) bool) bool { + shouldContinue := ps.forAllInboundPeers(callback) + if !shouldContinue { + return false + } + ps.forAllOutboundPeers(callback) + return true } // cfHeaderKV is a tuple of a filter header and its associated block hash. The @@ -243,7 +270,6 @@ type Server struct { Query chan interface{} relayInv chan relayMsg broadcast chan broadcastMsg - peerHeightsUpdate chan updatePeerHeightsMsg wg sync.WaitGroup quit chan struct{} nat serverutils.NAT @@ -286,11 +312,15 @@ func newServerPeer(s *Server, isPersistent bool) *Peer { } } -// newestBlock returns the current best block hash and height using the format -// required by the configuration for the peer package. -func (sp *Peer) newestBlock() (*daghash.Hash, int32, error) { - highestTipHash := sp.server.DAG.HighestTipHash() - return &highestTipHash, sp.server.DAG.Height(), nil //TODO: (Ori) This is probably wrong. Done only for compilation +// selectedTip returns the current selected tip +func (sp *Peer) selectedTip() *daghash.Hash { + return sp.server.DAG.SelectedTipHash() +} + +// blockExists determines whether a block with the given hash exists in +// the DAG. +func (sp *Peer) blockExists(hash *daghash.Hash) (bool, error) { + return sp.server.DAG.BlockExists(hash) } // addKnownAddresses adds the given addresses to the set of known addresses to @@ -617,6 +647,8 @@ func (sp *Peer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) { switch iv.Type { case wire.InvTypeTx: err = sp.server.pushTxMsg(sp, (*daghash.TxID)(&iv.Hash), c, waitChan) + case wire.InvTypeSyncBlock: + fallthrough case wire.InvTypeBlock: err = sp.server.pushBlockMsg(sp, &iv.Hash, c, waitChan) case wire.InvTypeFilteredBlock: @@ -675,7 +707,7 @@ func (sp *Peer) OnGetBlocks(_ *peer.Peer, msg *wire.MsgGetBlocks) { // Generate inventory message. invMsg := wire.NewMsgInv() for i := range hashList { - iv := wire.NewInvVect(wire.InvTypeBlock, &hashList[i]) + iv := wire.NewInvVect(wire.InvTypeSyncBlock, &hashList[i]) invMsg.AddInvVect(iv) } @@ -712,8 +744,8 @@ func (sp *Peer) OnGetHeaders(_ *peer.Peer, msg *wire.MsgGetHeaders) { // over with the genesis block if unknown block locators are provided. // // This mirrors the behavior in the reference implementation. - chain := sp.server.DAG - headers := chain.LocateHeaders(msg.BlockLocatorHashes, &msg.HashStop) + dag := sp.server.DAG + headers := dag.LocateHeaders(msg.BlockLocatorHashes, &msg.HashStop) // Send found headers to the requesting peer. blockHeaders := make([]*wire.BlockHeader, len(headers)) @@ -1381,34 +1413,6 @@ func (s *Server) pushMerkleBlockMsg(sp *Peer, hash *daghash.Hash, return nil } -// handleUpdatePeerHeight updates the heights of all peers who were known to -// announce a block we recently accepted. -func (s *Server) handleUpdatePeerHeights(state *peerState, umsg updatePeerHeightsMsg) { - state.forAllPeers(func(sp *Peer) { - // The origin peer should already have the updated height. - if sp.Peer == umsg.originPeer { - return - } - - // This is a pointer to the underlying memory which doesn't - // change. - latestBlkHash := sp.LastAnnouncedBlock() - - // Skip this peer if it hasn't recently announced any new blocks. - if latestBlkHash == nil { - return - } - - // If the peer has recently announced a block, and this block - // matches our newly accepted block, then update their block - // height. - if *latestBlkHash == *umsg.newHash { - sp.UpdateLastBlockHeight(umsg.newHeight) - sp.UpdateLastAnnouncedBlock(nil) - } - }) -} - // handleAddPeerMsg deals with adding new peers. It is invoked from the // peerHandler goroutine. func (s *Server) handleAddPeerMsg(state *peerState, sp *Peer) bool { @@ -1524,9 +1528,9 @@ func (s *Server) handleBanPeerMsg(state *peerState, sp *Peer) { // handleRelayInvMsg deals with relaying inventory to peers that are not already // known to have it. It is invoked from the peerHandler goroutine. func (s *Server) handleRelayInvMsg(state *peerState, msg relayMsg) { - state.forAllPeers(func(sp *Peer) { + state.forAllPeers(func(sp *Peer) bool { if !sp.Connected() { - return + return true } // If the inventory is a block and the peer prefers headers, @@ -1537,23 +1541,23 @@ func (s *Server) handleRelayInvMsg(state *peerState, msg relayMsg) { if !ok { peerLog.Warnf("Underlying data for headers" + " is not a block header") - return + return true } msgHeaders := wire.NewMsgHeaders() if err := msgHeaders.AddBlockHeader(&blockHeader); err != nil { peerLog.Errorf("Failed to add block"+ " header: %s", err) - return + return true } sp.QueueMessage(msgHeaders, nil) - return + return true } if msg.invVect.Type == wire.InvTypeTx { // Don't relay the transaction to the peer when it has // transaction relaying disabled. if sp.relayTxDisabled() { - return + return true } txD, ok := msg.data.(*mempool.TxDesc) @@ -1561,28 +1565,28 @@ func (s *Server) handleRelayInvMsg(state *peerState, msg relayMsg) { peerLog.Warnf("Underlying data for tx inv "+ "relay is not a *mempool.TxDesc: %T", msg.data) - return + return true } // Don't relay the transaction if the transaction fee-per-kb // is less than the peer's feefilter. feeFilter := uint64(atomic.LoadInt64(&sp.FeeFilterInt)) if feeFilter > 0 && txD.FeePerKB < feeFilter { - return + return true } // Don't relay the transaction if there is a bloom // filter loaded and the transaction doesn't match it. if sp.filter.IsLoaded() { if !sp.filter.MatchTxAndUpdate(txD.Tx) { - return + return true } } // Don't relay the transaction if the peer's subnetwork is // incompatible with it. if !txD.Tx.MsgTx().IsSubnetworkCompatible(sp.Peer.SubnetworkID()) { - return + return true } } @@ -1590,24 +1594,26 @@ func (s *Server) handleRelayInvMsg(state *peerState, msg relayMsg) { // It will be ignored if the peer is already known to // have the inventory. sp.QueueInventory(msg.invVect) + return true }) } // handleBroadcastMsg deals with broadcasting messages to peers. It is invoked // from the peerHandler goroutine. func (s *Server) handleBroadcastMsg(state *peerState, bmsg *broadcastMsg) { - state.forAllPeers(func(sp *Peer) { + state.forAllPeers(func(sp *Peer) bool { if !sp.Connected() { - return + return true } for _, ep := range bmsg.excludePeers { if sp == ep { - return + return true } } sp.QueueMessage(bmsg.message, nil) + return true }) } @@ -1615,6 +1621,10 @@ type getConnCountMsg struct { reply chan int32 } +type getShouldMineOnGenesisMsg struct { + reply chan bool +} + //GetPeersMsg is the message type which is used by the rpc server to get the peers list from the p2p server type GetPeersMsg struct { Reply chan []*Peer @@ -1655,20 +1665,36 @@ func (s *Server) handleQuery(state *peerState, querymsg interface{}) { switch msg := querymsg.(type) { case getConnCountMsg: nconnected := int32(0) - state.forAllPeers(func(sp *Peer) { + state.forAllPeers(func(sp *Peer) bool { if sp.Connected() { nconnected++ } + return true }) msg.reply <- nconnected + case getShouldMineOnGenesisMsg: + shouldMineOnGenesis := true + if state.Count() != 0 { + shouldMineOnGenesis = state.forAllPeers(func(sp *Peer) bool { + if !sp.SelectedTip().IsEqual(s.DAGParams.GenesisHash) { + return false + } + return true + }) + } else { + shouldMineOnGenesis = false + } + msg.reply <- shouldMineOnGenesis + case GetPeersMsg: peers := make([]*Peer, 0, state.Count()) - state.forAllPeers(func(sp *Peer) { + state.forAllPeers(func(sp *Peer) bool { if !sp.Connected() { - return + return true } peers = append(peers, sp) + return true }) msg.Reply <- peers @@ -1816,7 +1842,8 @@ func newPeerConfig(sp *Peer) *peer.Config { // other implementations' alert messages, we will not relay theirs. OnAlert: nil, }, - NewestBlock: sp.newestBlock, + SelectedTip: sp.selectedTip, + BlockExists: sp.blockExists, HostToNetAddress: sp.server.addrManager.HostToNetAddress, Proxy: config.MainConfig().Proxy, UserAgentName: userAgentName, @@ -1937,10 +1964,6 @@ out: case p := <-s.donePeers: s.handleDonePeerMsg(state, p) - // Block accepted in mainchain or orphan, update peer height. - case umsg := <-s.peerHeightsUpdate: - s.handleUpdatePeerHeights(state, umsg) - // Peer to ban. case p := <-s.banPeers: s.handleBanPeerMsg(state, p) @@ -1959,9 +1982,10 @@ out: case <-s.quit: // Disconnect all peers on server shutdown. - state.forAllPeers(func(sp *Peer) { + state.forAllPeers(func(sp *Peer) bool { srvrLog.Tracef("Shutdown peer %s", sp) sp.Disconnect() + return true }) break out } @@ -1978,7 +2002,6 @@ cleanup: select { case <-s.newPeers: case <-s.donePeers: - case <-s.peerHeightsUpdate: case <-s.relayInv: case <-s.broadcast: case <-s.Query: @@ -2024,6 +2047,17 @@ func (s *Server) ConnectedCount() int32 { return <-replyChan } +// ShouldMineOnGenesis checks if the node is connected to at least one +// peer, and at least one of its peers knows of any blocks that were mined +// on top of the genesis block. +func (s *Server) ShouldMineOnGenesis() bool { + replyChan := make(chan bool) + + s.Query <- getShouldMineOnGenesisMsg{reply: replyChan} + + return <-replyChan +} + // OutboundGroupCount returns the number of peers connected to the given // outbound group key. func (s *Server) OutboundGroupCount(key string) int { @@ -2051,18 +2085,6 @@ func (s *Server) NetTotals() (uint64, uint64) { atomic.LoadUint64(&s.bytesSent) } -// UpdatePeerHeights updates the heights of all peers who have have announced -// the latest connected main chain block, or a recognized orphan. These height -// updates allow us to dynamically refresh peer heights, ensuring sync peer -// selection has access to the latest block heights for each peer. -func (s *Server) UpdatePeerHeights(latestBlkHash *daghash.Hash, latestHeight int32, updateSource *peer.Peer) { - s.peerHeightsUpdate <- updatePeerHeightsMsg{ - newHash: latestBlkHash, - newHeight: latestHeight, - originPeer: updateSource, - } -} - // rebroadcastHandler keeps track of user submitted inventories that we have // sent out but have not yet made it into a block. We periodically rebroadcast // them in case our peers restarted or otherwise lost track of them. @@ -2350,7 +2372,6 @@ func NewServer(listenAddrs []string, db database.DB, dagParams *dagconfig.Params broadcast: make(chan broadcastMsg, config.MainConfig().MaxPeers), quit: make(chan struct{}), modifyRebroadcastInv: make(chan interface{}), - peerHeightsUpdate: make(chan updatePeerHeightsMsg), nat: nat, db: db, TimeSource: blockdag.NewMedianTime(), diff --git a/server/rpc/rpcserver.go b/server/rpc/rpcserver.go index 926a8ceab..f57533550 100644 --- a/server/rpc/rpcserver.go +++ b/server/rpc/rpcserver.go @@ -2450,25 +2450,24 @@ func handleGetPeerInfo(s *Server, cmd interface{}, closeChan <-chan struct{}) (i for _, p := range peers { statsSnap := p.ToPeer().StatsSnapshot() info := &btcjson.GetPeerInfoResult{ - ID: statsSnap.ID, - Addr: statsSnap.Addr, - Services: fmt.Sprintf("%08d", uint64(statsSnap.Services)), - RelayTxes: !p.IsTxRelayDisabled(), - LastSend: statsSnap.LastSend.Unix(), - LastRecv: statsSnap.LastRecv.Unix(), - BytesSent: statsSnap.BytesSent, - BytesRecv: statsSnap.BytesRecv, - ConnTime: statsSnap.ConnTime.Unix(), - PingTime: float64(statsSnap.LastPingMicros), - TimeOffset: statsSnap.TimeOffset, - Version: statsSnap.Version, - SubVer: statsSnap.UserAgent, - Inbound: statsSnap.Inbound, - StartingHeight: statsSnap.StartingHeight, - CurrentHeight: statsSnap.LastBlock, - BanScore: int32(p.BanScore()), - FeeFilter: p.FeeFilter(), - SyncNode: statsSnap.ID == syncPeerID, + ID: statsSnap.ID, + Addr: statsSnap.Addr, + Services: fmt.Sprintf("%08d", uint64(statsSnap.Services)), + RelayTxes: !p.IsTxRelayDisabled(), + LastSend: statsSnap.LastSend.Unix(), + LastRecv: statsSnap.LastRecv.Unix(), + BytesSent: statsSnap.BytesSent, + BytesRecv: statsSnap.BytesRecv, + ConnTime: statsSnap.ConnTime.Unix(), + PingTime: float64(statsSnap.LastPingMicros), + TimeOffset: statsSnap.TimeOffset, + Version: statsSnap.Version, + SubVer: statsSnap.UserAgent, + Inbound: statsSnap.Inbound, + SelectedTip: statsSnap.SelectedTip.String(), + BanScore: int32(p.BanScore()), + FeeFilter: p.FeeFilter(), + SyncNode: statsSnap.ID == syncPeerID, } if p.ToPeer().LastPingNonce() != 0 { wait := float64(time.Since(statsSnap.LastPingTime).Nanoseconds()) diff --git a/server/rpc/rpcserverhelp.go b/server/rpc/rpcserverhelp.go index 3f82aabcb..e5b58153a 100644 --- a/server/rpc/rpcserverhelp.go +++ b/server/rpc/rpcserverhelp.go @@ -450,26 +450,25 @@ var helpDescsEnUS = map[string]string{ "getNetTotalsResult-timeMillis": "Number of milliseconds since 1 Jan 1970 GMT", // GetPeerInfoResult help. - "getPeerInfoResult-id": "A unique node ID", - "getPeerInfoResult-addr": "The ip address and port of the peer", - "getPeerInfoResult-services": "Services bitmask which represents the services supported by the peer", - "getPeerInfoResult-relayTxes": "Peer has requested transactions be relayed to it", - "getPeerInfoResult-lastSend": "Time the last message was received in seconds since 1 Jan 1970 GMT", - "getPeerInfoResult-lastRecv": "Time the last message was sent in seconds since 1 Jan 1970 GMT", - "getPeerInfoResult-bytesSent": "Total bytes sent", - "getPeerInfoResult-bytesRecv": "Total bytes received", - "getPeerInfoResult-connTime": "Time the connection was made in seconds since 1 Jan 1970 GMT", - "getPeerInfoResult-timeOffset": "The time offset of the peer", - "getPeerInfoResult-pingTime": "Number of microseconds the last ping took", - "getPeerInfoResult-pingWait": "Number of microseconds a queued ping has been waiting for a response", - "getPeerInfoResult-version": "The protocol version of the peer", - "getPeerInfoResult-subVer": "The user agent of the peer", - "getPeerInfoResult-inbound": "Whether or not the peer is an inbound connection", - "getPeerInfoResult-startingHeight": "The latest block height the peer knew about when the connection was established", - "getPeerInfoResult-currentHeight": "The current height of the peer", - "getPeerInfoResult-banScore": "The ban score", - "getPeerInfoResult-feeFilter": "The requested minimum fee a transaction must have to be announced to the peer", - "getPeerInfoResult-syncNode": "Whether or not the peer is the sync peer", + "getPeerInfoResult-id": "A unique node ID", + "getPeerInfoResult-addr": "The ip address and port of the peer", + "getPeerInfoResult-services": "Services bitmask which represents the services supported by the peer", + "getPeerInfoResult-relayTxes": "Peer has requested transactions be relayed to it", + "getPeerInfoResult-lastSend": "Time the last message was received in seconds since 1 Jan 1970 GMT", + "getPeerInfoResult-lastRecv": "Time the last message was sent in seconds since 1 Jan 1970 GMT", + "getPeerInfoResult-bytesSent": "Total bytes sent", + "getPeerInfoResult-bytesRecv": "Total bytes received", + "getPeerInfoResult-connTime": "Time the connection was made in seconds since 1 Jan 1970 GMT", + "getPeerInfoResult-timeOffset": "The time offset of the peer", + "getPeerInfoResult-pingTime": "Number of microseconds the last ping took", + "getPeerInfoResult-pingWait": "Number of microseconds a queued ping has been waiting for a response", + "getPeerInfoResult-version": "The protocol version of the peer", + "getPeerInfoResult-subVer": "The user agent of the peer", + "getPeerInfoResult-inbound": "Whether or not the peer is an inbound connection", + "getPeerInfoResult-selectedTip": "The selected tip of the peer", + "getPeerInfoResult-banScore": "The ban score", + "getPeerInfoResult-feeFilter": "The requested minimum fee a transaction must have to be announced to the peer", + "getPeerInfoResult-syncNode": "Whether or not the peer is the sync peer", // GetPeerInfoCmd help. "getPeerInfo--synopsis": "Returns data about each connected network peer as an array of json objects.", diff --git a/server/server.go b/server/server.go index b3c6e92f9..742778cd9 100644 --- a/server/server.go +++ b/server/server.go @@ -114,6 +114,7 @@ func NewServer(listenAddrs []string, db database.DB, dagParams *dagconfig.Params MiningAddrs: cfg.MiningAddrs, ProcessBlock: s.p2pServer.SyncManager.ProcessBlock, ConnectedCount: s.p2pServer.ConnectedCount, + ShouldMineOnGenesis: s.p2pServer.ShouldMineOnGenesis, IsCurrent: s.p2pServer.SyncManager.IsCurrent, }) diff --git a/wire/invvect.go b/wire/invvect.go index dc755557b..bd1f4337d 100644 --- a/wire/invvect.go +++ b/wire/invvect.go @@ -29,6 +29,7 @@ const ( InvTypeTx InvType = 1 InvTypeBlock InvType = 2 InvTypeFilteredBlock InvType = 3 + InvTypeSyncBlock InvType = 4 ) // Map of service flags back to their constant names for pretty printing. @@ -37,6 +38,7 @@ var ivStrings = map[InvType]string{ InvTypeTx: "MSG_TX", InvTypeBlock: "MSG_BLOCK", InvTypeFilteredBlock: "MSG_FILTERED_BLOCK", + InvTypeSyncBlock: "MSG_SYNC_BLOCK", } // String returns the InvType in human-readable form. @@ -78,3 +80,8 @@ func writeInvVect(w io.Writer, pver uint32, iv *InvVect) error { func (iv *InvVect) String() string { return fmt.Sprintf("{%s:%s}", iv.Type, iv.Hash) } + +// IsBlockOrSyncBlock returns true if the inv type is InvTypeBlock or InvTypeSyncBlock +func (iv *InvVect) IsBlockOrSyncBlock() bool { + return iv.Type == InvTypeBlock || iv.Type == InvTypeSyncBlock +} diff --git a/wire/message_test.go b/wire/message_test.go index 6ef325032..33613375d 100644 --- a/wire/message_test.go +++ b/wire/message_test.go @@ -47,7 +47,7 @@ func TestMessage(t *testing.T) { addrMe := &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8333} me := NewNetAddress(addrMe, SFNodeNetwork) me.Timestamp = time.Time{} // Version message has zero value timestamp. - msgVersion := NewMsgVersion(me, you, 123123, 0, subnetworkid.SubnetworkIDSupportsAll) + msgVersion := NewMsgVersion(me, you, 123123, &daghash.ZeroHash, subnetworkid.SubnetworkIDSupportsAll) msgVerack := NewMsgVerAck() msgGetAddr := NewMsgGetAddr(nil) @@ -87,7 +87,7 @@ func TestMessage(t *testing.T) { btcnet BitcoinNet // Network to use for wire encoding bytes int // Expected num bytes read/written }{ - {msgVersion, msgVersion, pver, MainNet, 145}, + {msgVersion, msgVersion, pver, MainNet, 173}, {msgVerack, msgVerack, pver, MainNet, 24}, {msgGetAddr, msgGetAddr, pver, MainNet, 25}, {msgAddr, msgAddr, pver, MainNet, 26}, diff --git a/wire/msgversion.go b/wire/msgversion.go index fc1f3c31f..8458d93db 100644 --- a/wire/msgversion.go +++ b/wire/msgversion.go @@ -11,6 +11,7 @@ import ( "strings" "time" + "github.com/daglabs/btcd/dagconfig/daghash" "github.com/daglabs/btcd/util/subnetworkid" ) @@ -52,8 +53,8 @@ type MsgVersion struct { // on the wire. This has a max length of MaxUserAgentLen. UserAgent string - // Last block seen by the generator of the version message. - LastBlock int32 + // The selected tip of the generator of the version message. + SelectedTip *daghash.Hash // Don't announce transactions to peer. DisableRelayTx bool @@ -104,55 +105,36 @@ func (msg *MsgVersion) BtcDecode(r io.Reader, pver uint32) error { return err } - // Protocol versions >= 106 added a from address, nonce, and user agent - // field and they are only considered present if there are bytes - // remaining in the message. - if buf.Len() > 0 { - err = readNetAddress(buf, pver, &msg.AddrMe, false) - if err != nil { - return err - } + err = readNetAddress(buf, pver, &msg.AddrMe, false) + if err != nil { + return err } - if buf.Len() > 0 { - err = readElement(buf, &msg.Nonce) - if err != nil { - return err - } + err = readElement(buf, &msg.Nonce) + if err != nil { + return err } - if buf.Len() > 0 { - userAgent, err := ReadVarString(buf, pver) - if err != nil { - return err - } - err = validateUserAgent(userAgent) - if err != nil { - return err - } - msg.UserAgent = userAgent + userAgent, err := ReadVarString(buf, pver) + if err != nil { + return err + } + err = validateUserAgent(userAgent) + if err != nil { + return err + } + msg.UserAgent = userAgent + + msg.SelectedTip = &daghash.Hash{} + err = readElement(buf, msg.SelectedTip) + if err != nil { + return err } - // Protocol versions >= 209 added a last known block field. It is only - // considered present if there are bytes remaining in the message. - if buf.Len() > 0 { - err = readElement(buf, &msg.LastBlock) - if err != nil { - return err - } - } - - // There was no relay transactions field before BIP0037Version, but - // the default behavior prior to the addition of the field was to always - // relay transactions. - if buf.Len() > 0 { - // It's safe to ignore the error here since the buffer has at - // least one byte and that byte will result in a boolean value - // regardless of its value. Also, the wire encoding for the - // field is true when transactions should be relayed, so reverse - // it for the DisableRelayTx field. - var relayTx bool - readElement(r, &relayTx) - msg.DisableRelayTx = !relayTx + var relayTx bool + err = readElement(r, &relayTx) + if err != nil { + return err } + msg.DisableRelayTx = !relayTx return nil } @@ -196,7 +178,7 @@ func (msg *MsgVersion) BtcEncode(w io.Writer, pver uint32) error { return err } - err = writeElement(w, msg.LastBlock) + err = writeElement(w, msg.SelectedTip) if err != nil { return err } @@ -233,7 +215,7 @@ func (msg *MsgVersion) MaxPayloadLength(pver uint32) uint32 { // Message interface using the passed parameters and defaults for the remaining // fields. func NewMsgVersion(me *NetAddress, you *NetAddress, nonce uint64, - lastBlock int32, subnetworkID *subnetworkid.SubnetworkID) *MsgVersion { + selectedTip *daghash.Hash, subnetworkID *subnetworkid.SubnetworkID) *MsgVersion { // Limit the timestamp to one second precision since the protocol // doesn't support better. @@ -245,7 +227,7 @@ func NewMsgVersion(me *NetAddress, you *NetAddress, nonce uint64, AddrMe: *me, Nonce: nonce, UserAgent: DefaultUserAgent, - LastBlock: lastBlock, + SelectedTip: selectedTip, DisableRelayTx: false, SubnetworkID: *subnetworkID, } diff --git a/wire/msgversion_test.go b/wire/msgversion_test.go index f927986a9..0986ae9da 100644 --- a/wire/msgversion_test.go +++ b/wire/msgversion_test.go @@ -13,6 +13,7 @@ import ( "testing" "time" + "github.com/daglabs/btcd/dagconfig/daghash" "github.com/daglabs/btcd/util/random" "github.com/daglabs/btcd/util/subnetworkid" "github.com/davecgh/go-spew/spew" @@ -23,7 +24,7 @@ func TestVersion(t *testing.T) { pver := ProtocolVersion // Create version message data. - lastBlock := int32(234234) + selectedTip := &daghash.Hash{12, 34} tcpAddrMe := &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8333} me := NewNetAddress(tcpAddrMe, SFNodeNetwork) tcpAddrYou := &net.TCPAddr{IP: net.ParseIP("192.168.0.1"), Port: 8333} @@ -34,7 +35,7 @@ func TestVersion(t *testing.T) { } // Ensure we get the correct data back out. - msg := NewMsgVersion(me, you, nonce, lastBlock, subnetworkid.SubnetworkIDSupportsAll) + msg := NewMsgVersion(me, you, nonce, selectedTip, subnetworkid.SubnetworkIDSupportsAll) if msg.ProtocolVersion != int32(pver) { t.Errorf("NewMsgVersion: wrong protocol version - got %v, want %v", msg.ProtocolVersion, pver) @@ -55,9 +56,9 @@ func TestVersion(t *testing.T) { t.Errorf("NewMsgVersion: wrong user agent - got %v, want %v", msg.UserAgent, DefaultUserAgent) } - if msg.LastBlock != lastBlock { - t.Errorf("NewMsgVersion: wrong last block - got %v, want %v", - msg.LastBlock, lastBlock) + if !msg.SelectedTip.IsEqual(selectedTip) { + t.Errorf("NewMsgVersion: wrong selected tip - got %s, want %s", + msg.SelectedTip, selectedTip) } if msg.DisableRelayTx { t.Errorf("NewMsgVersion: disable relay tx is not false by "+ @@ -131,13 +132,12 @@ func TestVersion(t *testing.T) { // TestVersionWire tests the MsgVersion wire encode and decode for various // protocol versions. func TestVersionWire(t *testing.T) { - // verRelayTxFalse and verRelayTxFalseEncoded is a version message as of - // BIP0037Version with the transaction relay disabled. - baseVersionBIP0037Copy := *baseVersionBIP0037 - verRelayTxFalse := &baseVersionBIP0037Copy + // verRelayTxFalse and verRelayTxFalseEncoded is a version message with the transaction relay disabled. + baseVersionWithRelayTxCopy := *baseVersionWithRelayTx + verRelayTxFalse := &baseVersionWithRelayTxCopy verRelayTxFalse.DisableRelayTx = true - verRelayTxFalseEncoded := make([]byte, len(baseVersionBIP0037Encoded)) - copy(verRelayTxFalseEncoded, baseVersionBIP0037Encoded) + verRelayTxFalseEncoded := make([]byte, len(baseVersionWithRelayTxEncoded)) + copy(verRelayTxFalseEncoded, baseVersionWithRelayTxEncoded) verRelayTxFalseEncoded[len(verRelayTxFalseEncoded)-1] = 0 tests := []struct { @@ -148,9 +148,15 @@ func TestVersionWire(t *testing.T) { }{ // Latest protocol version. { - baseVersionBIP0037, - baseVersionBIP0037, - baseVersionBIP0037Encoded, + baseVersionWithRelayTx, + baseVersionWithRelayTx, + baseVersionWithRelayTxEncoded, + ProtocolVersion, + }, + { + verRelayTxFalse, + verRelayTxFalse, + verRelayTxFalseEncoded, ProtocolVersion, }, } @@ -299,107 +305,6 @@ func TestVersionWireErrors(t *testing.T) { } } -// TestVersionOptionalFields performs tests to ensure that an encoded version -// messages that omit optional fields are handled correctly. -func TestVersionOptionalFields(t *testing.T) { - // onlyRequiredVersion is a version message that only contains the - // required versions and all other values set to their default values. - onlyRequiredVersion := MsgVersion{ - ProtocolVersion: 60002, - Services: SFNodeNetwork, - Timestamp: time.Unix(0x495fab29, 0), // 2009-01-03 12:15:05 -0600 CST) - AddrYou: NetAddress{ - Timestamp: time.Time{}, // Zero value -- no timestamp in version - Services: SFNodeNetwork, - IP: net.ParseIP("192.168.0.1"), - Port: 8333, - }, - } - onlyRequiredVersionEncoded := make([]byte, len(baseVersionEncoded)-55) - copy(onlyRequiredVersionEncoded, baseVersionEncoded) - - // addrMeVersion is a version message that contains all fields through - // the AddrMe field. - addrMeVersion := onlyRequiredVersion - addrMeVersion.AddrMe = NetAddress{ - Timestamp: time.Time{}, // Zero value -- no timestamp in version - Services: SFNodeNetwork, - IP: net.ParseIP("127.0.0.1"), - Port: 8333, - } - addrMeVersionEncoded := make([]byte, len(baseVersionEncoded)-29) - copy(addrMeVersionEncoded, baseVersionEncoded) - - // nonceVersion is a version message that contains all fields through - // the Nonce field. - nonceVersion := addrMeVersion - nonceVersion.Nonce = 123123 // 0x1e0f3 - nonceVersionEncoded := make([]byte, len(baseVersionEncoded)-21) - copy(nonceVersionEncoded, baseVersionEncoded) - - // uaVersion is a version message that contains all fields through - // the UserAgent field. - uaVersion := nonceVersion - uaVersion.UserAgent = "/btcdtest:0.0.1/" - uaVersionEncoded := make([]byte, len(baseVersionEncoded)-4) - copy(uaVersionEncoded, baseVersionEncoded) - - // lastBlockVersion is a version message that contains all fields - // through the LastBlock field. - lastBlockVersion := uaVersion - lastBlockVersion.LastBlock = 234234 // 0x392fa - lastBlockVersionEncoded := make([]byte, len(baseVersionEncoded)) - copy(lastBlockVersionEncoded, baseVersionEncoded) - - tests := []struct { - msg *MsgVersion // Expected message - buf []byte // Wire encoding - pver uint32 // Protocol version for wire encoding - }{ - { - &onlyRequiredVersion, - onlyRequiredVersionEncoded, - ProtocolVersion, - }, - { - &addrMeVersion, - addrMeVersionEncoded, - ProtocolVersion, - }, - { - &nonceVersion, - nonceVersionEncoded, - ProtocolVersion, - }, - { - &uaVersion, - uaVersionEncoded, - ProtocolVersion, - }, - { - &lastBlockVersion, - lastBlockVersionEncoded, - ProtocolVersion, - }, - } - - for i, test := range tests { - // Decode the message from wire format. - var msg MsgVersion - rbuf := bytes.NewBuffer(test.buf) - err := msg.BtcDecode(rbuf, test.pver) - if err != nil { - t.Errorf("BtcDecode #%d error %v", i, err) - continue - } - if !reflect.DeepEqual(&msg, test.msg) { - t.Errorf("BtcDecode #%d\n got: %s want: %s", i, - spew.Sdump(msg), spew.Sdump(test.msg)) - continue - } - } -} - // baseVersion is used in the various tests as a baseline MsgVersion. var baseVersion = &MsgVersion{ ProtocolVersion: 60002, @@ -417,9 +322,9 @@ var baseVersion = &MsgVersion{ IP: net.ParseIP("127.0.0.1"), Port: 8333, }, - Nonce: 123123, // 0x1e0f3 - UserAgent: "/btcdtest:0.0.1/", - LastBlock: 234234, // 0x392fa + Nonce: 123123, // 0x1e0f3 + UserAgent: "/btcdtest:0.0.1/", + SelectedTip: &daghash.Hash{0x12, 0x34}, } // baseVersionEncoded is the wire encoded bytes for baseVersion using protocol @@ -445,12 +350,14 @@ var baseVersionEncoded = []byte{ 0x10, // Varint for user agent length 0x2f, 0x62, 0x74, 0x63, 0x64, 0x74, 0x65, 0x73, 0x74, 0x3a, 0x30, 0x2e, 0x30, 0x2e, 0x31, 0x2f, // User agent - 0xfa, 0x92, 0x03, 0x00, // Last block + 0x12, 0x34, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // Selected Tip } -// baseVersionBIP0037 is used in the various tests as a baseline MsgVersion for -// BIP0037. -var baseVersionBIP0037 = &MsgVersion{ +// baseVersionWithRelayTx is used in the various tests as a baseline MsgVersion +var baseVersionWithRelayTx = &MsgVersion{ ProtocolVersion: 70001, Services: SFNodeNetwork, Timestamp: time.Unix(0x495fab29, 0), // 2009-01-03 12:15:05 -0600 CST) @@ -466,14 +373,14 @@ var baseVersionBIP0037 = &MsgVersion{ IP: net.ParseIP("127.0.0.1"), Port: 8333, }, - Nonce: 123123, // 0x1e0f3 - UserAgent: "/btcdtest:0.0.1/", - LastBlock: 234234, // 0x392fa + Nonce: 123123, // 0x1e0f3 + UserAgent: "/btcdtest:0.0.1/", + SelectedTip: &daghash.Hash{0x12, 0x34}, } -// baseVersionBIP0037Encoded is the wire encoded bytes for baseVersionBIP0037 -// using protocol version BIP0037Version and is used in the various tests. -var baseVersionBIP0037Encoded = []byte{ +// baseVersionWithRelayTxEncoded is the wire encoded bytes for +// baseVersionWithRelayTx and is used in the various tests. +var baseVersionWithRelayTxEncoded = []byte{ 0x71, 0x11, 0x01, 0x00, // Protocol version 70001 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // SFNodeNetwork 0x29, 0xab, 0x5f, 0x49, 0x00, 0x00, 0x00, 0x00, // 64-bit Timestamp @@ -490,10 +397,13 @@ var baseVersionBIP0037Encoded = []byte{ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x7f, 0x00, 0x00, 0x01, // IP 127.0.0.1 0x20, 0x8d, // Port 8333 in big-endian - 0xf3, 0xe0, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, // Fake Nonce. TODO: (Ori) Replace to a real nonce + 0xf3, 0xe0, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, // Nonce 0x10, // Varint for user agent length 0x2f, 0x62, 0x74, 0x63, 0x64, 0x74, 0x65, 0x73, 0x74, 0x3a, 0x30, 0x2e, 0x30, 0x2e, 0x31, 0x2f, // User agent - 0xfa, 0x92, 0x03, 0x00, // Last block + 0x12, 0x34, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // Selected Tip 0x01, // Relay tx }