[NOD-652] Add selected tip and get selected tip messages (#595)

* [NOD-652] Add selectedTip and getSelectedTip messages

* [NOD-652] Remove peerSyncState.isSelectedTipKnown

* [NOD-652] Do nothing on OnSelectedTip if the peer selected tip hasn't changed

* [NOD-652] Handle selected tip message with block handler

* [NOD-652] Add comments

* [NOD-652] go mod tidy

* [NOD-652] Fix TestVersion

* [NOD-652] Use dag.AdjustedTime instead of dag.timeSource.AdjustedTime

* [NOD-652] Create shouldQueryPeerSelectedTips and queueMsgGetSelectedTip functions

* [NOD-652] Change selectedTip to selectedTipHash where needed

* [NOD-652] add minDAGTimeDelay constant

* [NOD-652] add comments

* [NOD-652] Fix names and comments

* [NOD-652] Put msg.reply push in the right place

* [NOD-652] Fix comments and names
This commit is contained in:
Ori Newman 2020-01-22 16:34:21 +02:00 committed by Svarog
parent 94ec159147
commit 29bcc271b5
26 changed files with 529 additions and 123 deletions

View File

@ -114,7 +114,7 @@ func (dag *BlockDAG) newBlockNode(blockHeader *wire.BlockHeader, parents blockSe
parents: parents,
children: make(blockSet),
blueScore: math.MaxUint64, // Initialized to the max value to avoid collisions with the genesis block
timestamp: dag.timeSource.AdjustedTime().Unix(),
timestamp: dag.AdjustedTime().Unix(),
bluesAnticoneSizes: make(map[daghash.Hash]uint32),
}

View File

@ -1274,10 +1274,17 @@ func (dag *BlockDAG) isCurrent() bool {
} else {
dagTimestamp = selectedTip.timestamp
}
minus24Hours := dag.timeSource.AdjustedTime().Add(-24 * time.Hour).Unix()
minus24Hours := dag.AdjustedTime().Add(-24 * time.Hour).Unix()
return dagTimestamp >= minus24Hours
}
// AdjustedTime returns the adjusted time according to
// dag.timeSource. See MedianTimeSource.AdjustedTime for
// more details.
func (dag *BlockDAG) AdjustedTime() time.Time {
return dag.timeSource.AdjustedTime()
}
// IsCurrent returns whether or not the DAG believes it is current. Several
// factors are used to guess, but the key factors that allow the DAG to
// believe it is current are:
@ -1774,7 +1781,7 @@ func (dag *BlockDAG) SubnetworkID() *subnetworkid.SubnetworkID {
}
func (dag *BlockDAG) addDelayedBlock(block *util.Block, delay time.Duration) error {
processTime := dag.timeSource.AdjustedTime().Add(delay)
processTime := dag.AdjustedTime().Add(delay)
log.Debugf("Adding block to delayed blocks queue (block hash: %s, process time: %s)", block.Hash().String(), processTime)
delayedBlock := &delayedBlock{
block: block,
@ -1792,7 +1799,7 @@ func (dag *BlockDAG) processDelayedBlocks() error {
// Check if the delayed block with the earliest process time should be processed
for dag.delayedBlocksQueue.Len() > 0 {
earliestDelayedBlockProcessTime := dag.peekDelayedBlock().processTime
if earliestDelayedBlockProcessTime.After(dag.timeSource.AdjustedTime()) {
if earliestDelayedBlockProcessTime.After(dag.AdjustedTime()) {
break
}
delayedBlock := dag.popDelayedBlock()

View File

@ -114,7 +114,7 @@ func (dag *BlockDAG) NextBlockTime() time.Time {
// timestamp is truncated to a second boundary before comparison since a
// block timestamp does not supported a precision greater than one
// second.
newTimestamp := dag.timeSource.AdjustedTime()
newTimestamp := dag.AdjustedTime()
minTimestamp := dag.NextBlockMinimumTime()
if newTimestamp.Before(minTimestamp) {
newTimestamp = minTimestamp

View File

@ -251,7 +251,7 @@ func (dag *BlockDAG) maxDelayOfParents(parentHashes []*daghash.Hash) (delay time
for _, parentHash := range parentHashes {
if delayedParent, exists := dag.delayedBlocks[*parentHash]; exists {
isDelayed = true
parentDelay := delayedParent.processTime.Sub(dag.timeSource.AdjustedTime())
parentDelay := delayedParent.processTime.Sub(dag.AdjustedTime())
if parentDelay > delay {
delay = parentDelay
}

View File

@ -435,7 +435,7 @@ func (dag *BlockDAG) checkBlockHeaderSanity(header *wire.BlockHeader, flags Beha
// the duration of time that should be waited before the block becomes valid.
// This check needs to be last as it does not return an error but rather marks the
// header as delayed (and valid).
maxTimestamp := dag.timeSource.AdjustedTime().Add(time.Second *
maxTimestamp := dag.AdjustedTime().Add(time.Second *
time.Duration(int64(dag.TimestampDeviationTolerance)*dag.targetTimePerBlock))
if header.Timestamp.After(maxTimestamp) {
return header.Timestamp.Sub(maxTimestamp), nil

4
go.mod
View File

@ -16,8 +16,8 @@ require (
github.com/jrick/logrotate v1.0.0
github.com/kr/pretty v0.1.0 // indirect
github.com/pkg/errors v0.8.1
golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 // indirect
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 // indirect
golang.org/x/sys v0.0.0-20190426135247-a129542de9ae // indirect
golang.org/x/text v0.3.2 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect

8
go.sum
View File

@ -34,12 +34,12 @@ github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734 h1:p/H982KKEjUnLJkM3tt/LemDnOc1GiZL5FCVlORJ5zo=
golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 h1:ObdrDkeb4kJdCP557AjRjq69pTHfNouLtWZG7j9rPN8=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20180719180050-a680a1efc54d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowKpJ8y4AmooUzdBSR9GU=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190426135247-a129542de9ae h1:mQLHiymj/JXKnnjc62tb7nD5pZLs940/sXJu+Xp3DBA=

View File

@ -18,6 +18,7 @@ import (
"net"
"sync"
"sync/atomic"
"time"
)
const (
@ -32,6 +33,10 @@ const (
// maxRequestedTxns is the maximum number of requested transactions
// hashes to store in memory.
maxRequestedTxns = wire.MaxInvPerMsg
minGetSelectedTipInterval = time.Minute
minDAGTimeDelay = time.Minute
)
// newPeerMsg signifies a newly connected peer to the block handler.
@ -113,6 +118,12 @@ type pauseMsg struct {
unpause <-chan struct{}
}
type selectedTipMsg struct {
selectedTipHash *daghash.Hash
peer *peerpkg.Peer
reply chan struct{}
}
type requestQueueAndSet struct {
queue []*wire.InvVect
set map[daghash.Hash]struct{}
@ -121,11 +132,13 @@ type requestQueueAndSet struct {
// peerSyncState stores additional information that the SyncManager tracks
// about a peer.
type peerSyncState struct {
syncCandidate bool
requestQueueMtx sync.Mutex
requestQueues map[wire.InvType]*requestQueueAndSet
requestedTxns map[daghash.TxID]struct{}
requestedBlocks map[daghash.Hash]struct{}
syncCandidate bool
lastSelectedTipRequest time.Time
isPendingForSelectedTip bool
requestQueueMtx sync.Mutex
requestQueues map[wire.InvType]*requestQueueAndSet
requestedTxns map[daghash.TxID]struct{}
requestedBlocks map[daghash.Hash]struct{}
}
// SyncManager is used to communicate block related messages with peers. The
@ -169,8 +182,7 @@ func (sm *SyncManager) startSync() {
continue
}
if !peer.IsSyncCandidate() {
state.syncCandidate = false
if !peer.IsSelectedTipKnown() {
continue
}
@ -187,13 +199,38 @@ func (sm *SyncManager) startSync() {
sm.requestedBlocks = make(map[daghash.Hash]struct{})
log.Infof("Syncing to block %s from peer %s",
syncPeer.SelectedTip(), syncPeer.Addr())
syncPeer.SelectedTipHash(), syncPeer.Addr())
syncPeer.PushGetBlockLocatorMsg(syncPeer.SelectedTip(), sm.dagParams.GenesisHash)
syncPeer.PushGetBlockLocatorMsg(syncPeer.SelectedTipHash(), sm.dagParams.GenesisHash)
sm.syncPeer = syncPeer
} else {
log.Warnf("No sync peer candidates available")
return
}
log.Warnf("No sync peer candidates available")
if sm.shouldQueryPeerSelectedTips() {
for peer, state := range sm.peerStates {
if !state.syncCandidate {
continue
}
if time.Now().Sub(state.lastSelectedTipRequest) < minGetSelectedTipInterval {
continue
}
queueMsgGetSelectedTip(peer, state)
}
}
return
}
func (sm *SyncManager) shouldQueryPeerSelectedTips() bool {
return sm.dag.AdjustedTime().Sub(sm.dag.CalcPastMedianTime()) > minDAGTimeDelay
}
func queueMsgGetSelectedTip(peer *peerpkg.Peer, state *peerSyncState) {
state.lastSelectedTipRequest = time.Now()
state.isPendingForSelectedTip = true
peer.QueueMessage(wire.NewMsgGetSelectedTip(), nil)
}
// isSyncCandidate returns whether or not the peer is a candidate to consider
@ -827,6 +864,23 @@ func (sm *SyncManager) limitHashMap(m map[daghash.Hash]struct{}, limit int) {
}
}
func (sm *SyncManager) handleSelectedTipMsg(msg *selectedTipMsg) {
peer := msg.peer
selectedTipHash := msg.selectedTipHash
state := sm.peerStates[peer]
if !state.isPendingForSelectedTip {
log.Warnf("Got unrequested selected tip message from %s -- "+
"disconnecting", peer.Addr())
peer.Disconnect()
}
state.isPendingForSelectedTip = false
if selectedTipHash.IsEqual(peer.SelectedTipHash()) {
return
}
peer.SetSelectedTipHash(selectedTipHash)
sm.startSync()
}
// blockHandler is the main handler for the sync manager. It must be run as a
// goroutine. It processes block and inv messages in a separate goroutine
// from the peer handlers so the block (MsgBlock) messages are handled by a
@ -894,6 +948,10 @@ out:
// Wait until the sender unpauses the manager.
<-msg.unpause
case *selectedTipMsg:
sm.handleSelectedTipMsg(msg)
msg.reply <- struct{}{}
default:
log.Warnf("Invalid message type in block "+
"handler: %T", msg)
@ -992,6 +1050,17 @@ func (sm *SyncManager) QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) {
sm.msgChan <- &invMsg{inv: inv, peer: peer}
}
// QueueSelectedTipMsg adds the passed selected tip message and peer to the
// block handling queue. Responds to the done channel argument after it finished
// handling the message.
func (sm *SyncManager) QueueSelectedTipMsg(msg *wire.MsgSelectedTip, peer *peerpkg.Peer, done chan struct{}) {
sm.msgChan <- &selectedTipMsg{
selectedTipHash: msg.SelectedTipHash,
peer: peer,
reply: done,
}
}
// DonePeer informs the blockmanager that a peer has disconnected.
func (sm *SyncManager) DonePeer(peer *peerpkg.Peer) {
// Ignore if we are shutting down.

View File

@ -23,7 +23,7 @@ func mockRemotePeer() error {
UserAgentName: "peer", // User agent name to advertise.
UserAgentVersion: "1.0.0", // User agent version to advertise.
DAGParams: &dagconfig.SimnetParams,
SelectedTip: fakeSelectedTipFn,
SelectedTipHash: fakeSelectedTipFn,
}
// Accept connections on the simnet port.
@ -78,7 +78,7 @@ func Example_newOutboundPeer() {
verack <- struct{}{}
},
},
SelectedTip: fakeSelectedTipFn,
SelectedTipHash: fakeSelectedTipFn,
}
p, err := peer.NewOutboundPeer(peerCfg, "127.0.0.1:18555")
if err != nil {

View File

@ -96,7 +96,7 @@ func messageSummary(msg wire.Message) string {
switch msg := msg.(type) {
case *wire.MsgVersion:
return fmt.Sprintf("agent %s, pver %d, selected tip %s",
msg.UserAgent, msg.ProtocolVersion, msg.SelectedTip)
msg.UserAgent, msg.ProtocolVersion, msg.SelectedTipHash)
case *wire.MsgVerAck:
// No summary.

View File

@ -168,6 +168,14 @@ type MessageListeners struct {
// message.
OnSendHeaders func(p *Peer, msg *wire.MsgSendHeaders)
// OnGetSelectedTip is invoked when a peer receives a getSelectedTip kaspa
// message.
OnGetSelectedTip func()
// OnSelectedTip is invoked when a peer receives a selectedTip kaspa
// message.
OnSelectedTip func(p *Peer, msg *wire.MsgSelectedTip)
// OnRead is invoked when a peer receives a kaspa message. It
// consists of the number of bytes read, the message, and whether or not
// an error in the read occurred. Typically, callers will opt to use
@ -186,12 +194,12 @@ type MessageListeners struct {
// Config is the struct to hold configuration options useful to Peer.
type Config struct {
// SelectedTip specifies a callback which provides the selected tip
// SelectedTipHash specifies a callback which provides the selected tip
// to the peer as needed.
SelectedTip func() *daghash.Hash
SelectedTipHash func() *daghash.Hash
// SelectedTip specifies a callback which provides the selected tip
// to the peer as needed.
// BlockExists determines whether a block with the given hash exists in
// the DAG.
BlockExists func(*daghash.Hash) bool
// HostToNetAddress returns the netaddress for the given host. This can be
@ -329,22 +337,22 @@ type stallControlMsg struct {
// StatsSnap is a snapshot of peer stats at a point in time.
type StatsSnap struct {
ID int32
Addr string
Services wire.ServiceFlag
LastSend time.Time
LastRecv time.Time
BytesSent uint64
BytesRecv uint64
ConnTime time.Time
TimeOffset int64
Version uint32
UserAgent string
Inbound bool
SelectedTip *daghash.Hash
LastPingNonce uint64
LastPingTime time.Time
LastPingMicros int64
ID int32
Addr string
Services wire.ServiceFlag
LastSend time.Time
LastRecv time.Time
BytesSent uint64
BytesRecv uint64
ConnTime time.Time
TimeOffset int64
Version uint32
UserAgent string
Inbound bool
SelectedTipHash *daghash.Hash
LastPingNonce uint64
LastPingTime time.Time
LastPingMicros int64
}
// HashFunc is a function which returns a block hash, height and error
@ -423,13 +431,13 @@ type Peer struct {
// These fields keep track of statistics for the peer and are protected
// by the statsMtx mutex.
statsMtx sync.RWMutex
timeOffset int64
timeConnected time.Time
selectedTip *daghash.Hash
lastPingNonce uint64 // Set to nonce if we have a pending ping.
lastPingTime time.Time // Time we sent last ping.
lastPingMicros int64 // Time for last ping to return.
statsMtx sync.RWMutex
timeOffset int64
timeConnected time.Time
selectedTipHash *daghash.Hash
lastPingNonce uint64 // Set to nonce if we have a pending ping.
lastPingTime time.Time // Time we sent last ping.
lastPingMicros int64 // Time for last ping to return.
stallControl chan stallControlMsg
outputQueue chan outMsg
@ -474,22 +482,22 @@ func (p *Peer) StatsSnapshot() *StatsSnap {
// Get a copy of all relevant flags and stats.
statsSnap := &StatsSnap{
ID: id,
Addr: addr,
UserAgent: userAgent,
Services: services,
LastSend: p.LastSend(),
LastRecv: p.LastRecv(),
BytesSent: p.BytesSent(),
BytesRecv: p.BytesReceived(),
ConnTime: p.timeConnected,
TimeOffset: p.timeOffset,
Version: protocolVersion,
Inbound: p.inbound,
SelectedTip: p.selectedTip,
LastPingNonce: p.lastPingNonce,
LastPingMicros: p.lastPingMicros,
LastPingTime: p.lastPingTime,
ID: id,
Addr: addr,
UserAgent: userAgent,
Services: services,
LastSend: p.LastSend(),
LastRecv: p.LastRecv(),
BytesSent: p.BytesSent(),
BytesRecv: p.BytesReceived(),
ConnTime: p.timeConnected,
TimeOffset: p.timeOffset,
Version: protocolVersion,
Inbound: p.inbound,
SelectedTipHash: p.selectedTipHash,
LastPingNonce: p.lastPingNonce,
LastPingMicros: p.lastPingMicros,
LastPingTime: p.lastPingTime,
}
p.statsMtx.RUnlock()
@ -633,22 +641,28 @@ func (p *Peer) ProtocolVersion() uint32 {
return protocolVersion
}
// SelectedTip returns the selected tip of the peer.
// SelectedTipHash returns the selected tip of the peer.
//
// This function is safe for concurrent access.
func (p *Peer) SelectedTip() *daghash.Hash {
func (p *Peer) SelectedTipHash() *daghash.Hash {
p.statsMtx.RLock()
selectedTip := p.selectedTip
selectedTipHash := p.selectedTipHash
p.statsMtx.RUnlock()
return selectedTip
return selectedTipHash
}
// IsSyncCandidate returns whether or not this peer is a sync candidate.
// SetSelectedTipHash sets the selected tip of the peer.
func (p *Peer) SetSelectedTipHash(selectedTipHash *daghash.Hash) {
p.selectedTipHash = selectedTipHash
}
// IsSelectedTipKnown returns whether or not this peer selected
// tip is a known block.
//
// This function is safe for concurrent access.
func (p *Peer) IsSyncCandidate() bool {
return !p.cfg.BlockExists(p.selectedTip)
func (p *Peer) IsSelectedTipKnown() bool {
return !p.cfg.BlockExists(p.selectedTipHash)
}
// LastSend returns the last send time of the peer.
@ -718,7 +732,7 @@ func (p *Peer) WantsHeaders() bool {
// localVersionMsg creates a version message that can be used to send to the
// remote peer.
func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
selectedTip := p.cfg.SelectedTip()
selectedTipHash := p.cfg.SelectedTipHash()
theirNA := p.na
// If we are behind a proxy and the connection comes from the proxy then
@ -754,7 +768,7 @@ func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
subnetworkID := p.cfg.SubnetworkID
// Version message.
msg := wire.NewMsgVersion(ourNA, theirNA, nonce, selectedTip, subnetworkID)
msg := wire.NewMsgVersion(ourNA, theirNA, nonce, selectedTipHash, subnetworkID)
msg.AddUserAgent(p.cfg.UserAgentName, p.cfg.UserAgentVersion,
p.cfg.UserAgentComments...)
@ -960,7 +974,7 @@ func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error {
// Updating a bunch of stats including block based stats, and the
// peer's time offset.
p.statsMtx.Lock()
p.selectedTip = msg.SelectedTip
p.selectedTipHash = msg.SelectedTipHash
p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix()
p.statsMtx.Unlock()
@ -1172,6 +1186,10 @@ func (p *Peer) maybeAddDeadline(pendingResponses map[string]time.Time, msgCmd st
// headers.
deadline = time.Now().Add(stallResponseTimeout * 3)
pendingResponses[wire.CmdHeaders] = deadline
case wire.CmdGetSelectedTip:
// Expects a selected tip message.
pendingResponses[wire.CmdSelectedTip] = deadline
}
}
@ -1500,6 +1518,16 @@ out:
p.cfg.Listeners.OnSendHeaders(p, msg)
}
case *wire.MsgGetSelectedTip:
if p.cfg.Listeners.OnGetSelectedTip != nil {
p.cfg.Listeners.OnGetSelectedTip()
}
case *wire.MsgSelectedTip:
if p.cfg.Listeners.OnSelectedTip != nil {
p.cfg.Listeners.OnSelectedTip(p, msg)
}
default:
log.Debugf("Received unhandled message of type %s "+
"from %s", rmsg.Command(), p)

View File

@ -218,7 +218,7 @@ func TestPeerConnection(t *testing.T) {
DAGParams: &dagconfig.MainnetParams,
ProtocolVersion: wire.ProtocolVersion, // Configure with older version
Services: 0,
SelectedTip: fakeSelectedTipFn,
SelectedTipHash: fakeSelectedTipFn,
}
peer2Cfg := &peer.Config{
Listeners: peer1Cfg.Listeners,
@ -228,7 +228,7 @@ func TestPeerConnection(t *testing.T) {
DAGParams: &dagconfig.MainnetParams,
ProtocolVersion: wire.ProtocolVersion + 1,
Services: wire.SFNodeNetwork,
SelectedTip: fakeSelectedTipFn,
SelectedTipHash: fakeSelectedTipFn,
}
wantStats1 := peerStats{
@ -403,7 +403,7 @@ func TestPeerListeners(t *testing.T) {
UserAgentComments: []string{"comment"},
DAGParams: &dagconfig.MainnetParams,
Services: wire.SFNodeBloom,
SelectedTip: fakeSelectedTipFn,
SelectedTipHash: fakeSelectedTipFn,
}
inConn, outConn := pipe(
&conn{raddr: "10.0.0.1:16111"},
@ -528,7 +528,7 @@ func TestPeerListeners(t *testing.T) {
// TestOutboundPeer tests that the outbound peer works as expected.
func TestOutboundPeer(t *testing.T) {
peerCfg := &peer.Config{
SelectedTip: func() *daghash.Hash {
SelectedTipHash: func() *daghash.Hash {
return &daghash.ZeroHash
},
UserAgentName: "peer",
@ -568,8 +568,8 @@ func TestOutboundPeer(t *testing.T) {
<-done
p.Disconnect()
// Test SelectedTip
var selectedTip = func() *daghash.Hash {
// Test SelectedTipHashAndBlueScore
var selectedTipHash = func() *daghash.Hash {
hashStr := "14a0810ac680a3eb3f82edc878cea25ec41d6b790744e5daeef"
hash, err := daghash.NewHashFromStr(hashStr)
if err != nil {
@ -578,7 +578,7 @@ func TestOutboundPeer(t *testing.T) {
return hash
}
peerCfg.SelectedTip = selectedTip
peerCfg.SelectedTipHash = selectedTipHash
r1, w1 := io.Pipe()
c1 := &conn{raddr: "10.0.0.1:16111", Writer: w1, Reader: r1}
p1, err := peer.NewOutboundPeer(peerCfg, "10.0.0.1:16111")
@ -645,7 +645,7 @@ func TestUnsupportedVersionPeer(t *testing.T) {
UserAgentComments: []string{"comment"},
DAGParams: &dagconfig.MainnetParams,
Services: 0,
SelectedTip: fakeSelectedTipFn,
SelectedTipHash: fakeSelectedTipFn,
}
localNA := wire.NewNetAddressIPPort(

View File

@ -33,7 +33,7 @@ func (sp *Peer) OnBlockLocator(_ *peer.Peer, msg *wire.MsgBlockLocator) {
// This is not a mistake. The invs we desire start from the highest
// hash that we know of and end at the highest hash that the peer
// knows of.
err := sp.Peer.PushGetBlockInvsMsg(highHash, sp.Peer.SelectedTip())
err := sp.Peer.PushGetBlockInvsMsg(highHash, sp.Peer.SelectedTipHash())
if err != nil {
peerLog.Errorf("Failed pushing get blocks message for peer %s: %s",
sp, err)

View File

@ -0,0 +1,11 @@
package p2p
import (
"github.com/kaspanet/kaspad/wire"
)
// OnGetSelectedTip is invoked when a peer receives a getSelectedTip kaspa
// message.
func (sp *Peer) OnGetSelectedTip() {
sp.QueueMessage(wire.NewMsgSelectedTip(sp.selectedTipHash()), nil)
}

View File

@ -0,0 +1,14 @@
package p2p
import (
"github.com/kaspanet/kaspad/peer"
"github.com/kaspanet/kaspad/wire"
)
// OnSelectedTip is invoked when a peer receives a selectedTip kaspa
// message.
func (sp *Peer) OnSelectedTip(peer *peer.Peer, msg *wire.MsgSelectedTip) {
done := make(chan struct{})
sp.server.SyncManager.QueueSelectedTipMsg(msg, peer, done)
<-done
}

View File

@ -296,8 +296,8 @@ func newServerPeer(s *Server, isPersistent bool) *Peer {
}
}
// selectedTip returns the current selected tip
func (sp *Peer) selectedTip() *daghash.Hash {
// selectedTipHash returns the current selected tip hash
func (sp *Peer) selectedTipHash() *daghash.Hash {
return sp.server.DAG.SelectedTipHash()
}
@ -894,7 +894,7 @@ func (s *Server) handleQuery(state *peerState, querymsg interface{}) {
shouldMineOnGenesis := true
if state.Count() != 0 {
shouldMineOnGenesis = state.forAllPeers(func(sp *Peer) bool {
if !sp.SelectedTip().IsEqual(s.DAGParams.GenesisHash) {
if !sp.SelectedTipHash().IsEqual(s.DAGParams.GenesisHash) {
return false
}
return true
@ -1046,10 +1046,12 @@ func newPeerConfig(sp *Peer) *peer.Config {
OnFilterLoad: sp.OnFilterLoad,
OnGetAddr: sp.OnGetAddr,
OnAddr: sp.OnAddr,
OnGetSelectedTip: sp.OnGetSelectedTip,
OnSelectedTip: sp.OnSelectedTip,
OnRead: sp.OnRead,
OnWrite: sp.OnWrite,
},
SelectedTip: sp.selectedTip,
SelectedTipHash: sp.selectedTipHash,
BlockExists: sp.blockExists,
HostToNetAddress: sp.server.addrManager.HostToNetAddress,
Proxy: config.ActiveConfig().Proxy,

View File

@ -28,7 +28,7 @@ func handleGetPeerInfo(s *Server, cmd interface{}, closeChan <-chan struct{}) (i
Version: statsSnap.Version,
SubVer: statsSnap.UserAgent,
Inbound: statsSnap.Inbound,
SelectedTip: statsSnap.SelectedTip.String(),
SelectedTip: statsSnap.SelectedTipHash.String(),
BanScore: int32(p.BanScore()),
FeeFilter: p.FeeFilter(),
SyncNode: statsSnap.ID == syncPeerID,

View File

@ -364,7 +364,7 @@ var helpDescsEnUS = map[string]string{
// GetChainFromBlockResult help.
"getChainFromBlockResult-removedChainBlockHashes": "List chain-block hashes that were removed from the selected parent chain in top-to-bottom order",
"getChainFromBlockResult-addedChainBlocks": "List of ChainBlocks from Virtual.SelectedTip to StartHash (excluding StartHash) ordered bottom-to-top.",
"getChainFromBlockResult-addedChainBlocks": "List of ChainBlocks from Virtual.SelectedTipHashAndBlueScore to StartHash (excluding StartHash) ordered bottom-to-top.",
"getChainFromBlockResult-blocks": "If includeBlocks=true - contains the contents of all chain and accepted blocks in the AddedChainBlocks. Otherwise - omitted.",
// GetConnectionCountCmd help.

View File

@ -52,6 +52,8 @@ const (
CmdFeeFilter = "feefilter"
CmdGetBlockLocator = "getlocator"
CmdBlockLocator = "locator"
CmdSelectedTip = "selectedtip"
CmdGetSelectedTip = "getseltip"
)
// Message is an interface that describes a kaspa message. A type that
@ -139,6 +141,12 @@ func makeEmptyMessage(command string) (Message, error) {
case CmdFeeFilter:
msg = &MsgFeeFilter{}
case CmdGetSelectedTip:
msg = &MsgGetSelectedTip{}
case CmdSelectedTip:
msg = &MsgSelectedTip{}
default:
return nil, errors.Errorf("unhandled command [%s]", command)
}

View File

@ -9,8 +9,6 @@ import (
// MsgGetBlockLocator implements the Message interface and represents a kaspa
// getlocator message. It is used to request a block locator between start and stop hash.
// The locator is returned via a locator message (MsgBlockLocator).
//
// This message has no payload.
type MsgGetBlockLocator struct {
HighHash *daghash.Hash
LowHash *daghash.Hash

41
wire/msggetselectedtip.go Normal file
View File

@ -0,0 +1,41 @@
package wire
import (
"io"
)
// MsgGetSelectedTip implements the Message interface and represents a kaspa
// getseltip message. It is used to request the selected tip of another peer.
//
// This message has no payload.
type MsgGetSelectedTip struct{}
// KaspaDecode decodes r using the kaspa protocol encoding into the receiver.
// This is part of the Message interface implementation.
func (msg *MsgGetSelectedTip) KaspaDecode(r io.Reader, pver uint32) error {
return nil
}
// KaspaEncode encodes the receiver to w using the kaspa protocol encoding.
// This is part of the Message interface implementation.
func (msg *MsgGetSelectedTip) KaspaEncode(w io.Writer, pver uint32) error {
return nil
}
// Command returns the protocol command string for the message. This is part
// of the Message interface implementation.
func (msg *MsgGetSelectedTip) Command() string {
return CmdGetSelectedTip
}
// MaxPayloadLength returns the maximum length the payload can be for the
// receiver. This is part of the Message interface implementation.
func (msg *MsgGetSelectedTip) MaxPayloadLength(pver uint32) uint32 {
return 0
}
// NewMsgGetSelectedTip returns a new kaspa getseltip message that conforms to the
// Message interface.
func NewMsgGetSelectedTip() *MsgGetSelectedTip {
return &MsgGetSelectedTip{}
}

View File

@ -0,0 +1,87 @@
// Copyright (c) 2013-2016 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package wire
import (
"bytes"
"reflect"
"testing"
"github.com/davecgh/go-spew/spew"
)
// TestGetSelectedTip tests the MsgGetSelectedTip API.
func TestGetSelectedTip(t *testing.T) {
pver := ProtocolVersion
// Ensure the command is expected value.
wantCmd := "getseltip"
msg := NewMsgGetSelectedTip()
if cmd := msg.Command(); cmd != wantCmd {
t.Errorf("NewMsgGetSelectedTip: wrong command - got %v want %v",
cmd, wantCmd)
}
// Ensure max payload is expected value.
wantPayload := uint32(0)
maxPayload := msg.MaxPayloadLength(pver)
if maxPayload != wantPayload {
t.Errorf("MaxPayloadLength: wrong max payload length for "+
"protocol version %d - got %v, want %v", pver,
maxPayload, wantPayload)
}
}
// TestGetSelectedTipWire tests the MsgGetSelectedTip wire encode and decode for various
// protocol versions.
func TestGetSelectedTipWire(t *testing.T) {
msgGetSelectedTip := NewMsgGetSelectedTip()
msgGetSelectedTipEncoded := []byte{}
tests := []struct {
in *MsgGetSelectedTip // Message to encode
out *MsgGetSelectedTip // Expected decoded message
buf []byte // Wire encoding
pver uint32 // Protocol version for wire encoding
}{
// Latest protocol version.
{
msgGetSelectedTip,
msgGetSelectedTip,
msgGetSelectedTipEncoded,
ProtocolVersion,
},
}
t.Logf("Running %d tests", len(tests))
for i, test := range tests {
// Encode the message to wire format.
var buf bytes.Buffer
err := test.in.KaspaEncode(&buf, test.pver)
if err != nil {
t.Errorf("KaspaEncode #%d error %v", i, err)
continue
}
if !bytes.Equal(buf.Bytes(), test.buf) {
t.Errorf("KaspaEncode #%d\n got: %s want: %s", i,
spew.Sdump(buf.Bytes()), spew.Sdump(test.buf))
continue
}
// Decode the message from wire format.
var msg MsgGetSelectedTip
rbuf := bytes.NewReader(test.buf)
err = msg.KaspaDecode(rbuf, test.pver)
if err != nil {
t.Errorf("KaspaDecode #%d error %v", i, err)
continue
}
if !reflect.DeepEqual(&msg, test.out) {
t.Errorf("KaspaDecode #%d\n got: %s want: %s", i,
spew.Sdump(msg), spew.Sdump(test.out))
continue
}
}
}

53
wire/msgselectedtip.go Normal file
View File

@ -0,0 +1,53 @@
package wire
import (
"github.com/kaspanet/kaspad/util/daghash"
"io"
)
// MsgSelectedTip implements the Message interface and represents a kaspa
// selectedtip message. It is used to answer getseltip messages and tell
// the asking peer what is the selected tip of this peer.
type MsgSelectedTip struct {
// The selected tip hash of the generator of the message.
SelectedTipHash *daghash.Hash
}
// KaspaDecode decodes r using the kaspa protocol encoding into the receiver.
// This is part of the Message interface implementation.
func (msg *MsgSelectedTip) KaspaDecode(r io.Reader, pver uint32) error {
msg.SelectedTipHash = &daghash.Hash{}
err := ReadElement(r, msg.SelectedTipHash)
if err != nil {
return err
}
return nil
}
// KaspaEncode encodes the receiver to w using the kaspa protocol encoding.
// This is part of the Message interface implementation.
func (msg *MsgSelectedTip) KaspaEncode(w io.Writer, pver uint32) error {
return WriteElement(w, msg.SelectedTipHash)
}
// Command returns the protocol command string for the message. This is part
// of the Message interface implementation.
func (msg *MsgSelectedTip) Command() string {
return CmdSelectedTip
}
// MaxPayloadLength returns the maximum length the payload can be for the
// receiver. This is part of the Message interface implementation.
func (msg *MsgSelectedTip) MaxPayloadLength(_ uint32) uint32 {
// selected tip hash 32 bytes
return daghash.HashSize
}
// NewMsgSelectedTip returns a new kaspa selectedtip message that conforms to the
// Message interface.
func NewMsgSelectedTip(selectedTipHash *daghash.Hash) *MsgSelectedTip {
return &MsgSelectedTip{
SelectedTipHash: selectedTipHash,
}
}

View File

@ -0,0 +1,90 @@
package wire
import (
"bytes"
"github.com/kaspanet/kaspad/util/daghash"
"reflect"
"testing"
"github.com/davecgh/go-spew/spew"
)
// TestSelectedTip tests the MsgSelectedTip API.
func TestSelectedTip(t *testing.T) {
pver := ProtocolVersion
// Ensure the command is expected value.
wantCmd := "selectedtip"
msg := NewMsgSelectedTip(&daghash.ZeroHash)
if cmd := msg.Command(); cmd != wantCmd {
t.Errorf("NewMsgSelectedTip: wrong command - got %v want %v",
cmd, wantCmd)
}
// Ensure max payload is expected value.
wantPayload := uint32(32)
maxPayload := msg.MaxPayloadLength(pver)
if maxPayload != wantPayload {
t.Errorf("MaxPayloadLength: wrong max payload length for "+
"protocol version %d - got %v, want %v", pver,
maxPayload, wantPayload)
}
}
// TestSelectedTipWire tests the MsgSelectedTip wire encode and decode for various
// protocol versions.
func TestSelectedTipWire(t *testing.T) {
hash := &daghash.Hash{1, 2, 3}
msgSelectedTip := NewMsgSelectedTip(hash)
msgSelectedTipEncoded := []byte{
0x01, 0x02, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
}
tests := []struct {
in *MsgSelectedTip // Message to encode
out *MsgSelectedTip // Expected decoded message
buf []byte // Wire encoding
pver uint32 // Protocol version for wire encoding
}{
// Latest protocol version.
{
msgSelectedTip,
msgSelectedTip,
msgSelectedTipEncoded,
ProtocolVersion,
},
}
t.Logf("Running %d tests", len(tests))
for i, test := range tests {
// Encode the message to wire format.
var buf bytes.Buffer
err := test.in.KaspaEncode(&buf, test.pver)
if err != nil {
t.Errorf("KaspaEncode #%d error %v", i, err)
continue
}
if !bytes.Equal(buf.Bytes(), test.buf) {
t.Errorf("KaspaEncode #%d\n got: %s want: %s", i,
spew.Sdump(buf.Bytes()), spew.Sdump(test.buf))
continue
}
// Decode the message from wire format.
var msg MsgSelectedTip
rbuf := bytes.NewReader(test.buf)
err = msg.KaspaDecode(rbuf, test.pver)
if err != nil {
t.Errorf("KaspaDecode #%d error %v", i, err)
continue
}
if !reflect.DeepEqual(&msg, test.out) {
t.Errorf("KaspaDecode #%d\n got: %s want: %s", i,
spew.Sdump(msg), spew.Sdump(test.out))
continue
}
}
}

View File

@ -55,8 +55,8 @@ type MsgVersion struct {
// on the wire. This has a max length of MaxUserAgentLen.
UserAgent string
// The selected tip of the generator of the version message.
SelectedTip *daghash.Hash
// The selected tip hash of the generator of the version message.
SelectedTipHash *daghash.Hash
// Don't announce transactions to peer.
DisableRelayTx bool
@ -137,8 +137,8 @@ func (msg *MsgVersion) KaspaDecode(r io.Reader, pver uint32) error {
}
msg.UserAgent = userAgent
msg.SelectedTip = &daghash.Hash{}
err = ReadElement(buf, msg.SelectedTip)
msg.SelectedTipHash = &daghash.Hash{}
err = ReadElement(buf, msg.SelectedTipHash)
if err != nil {
return err
}
@ -200,7 +200,7 @@ func (msg *MsgVersion) KaspaEncode(w io.Writer, pver uint32) error {
return err
}
err = WriteElement(w, msg.SelectedTip)
err = WriteElement(w, msg.SelectedTipHash)
if err != nil {
return err
}
@ -223,21 +223,19 @@ func (msg *MsgVersion) Command() string {
// MaxPayloadLength returns the maximum length the payload can be for the
// receiver. This is part of the Message interface implementation.
func (msg *MsgVersion) MaxPayloadLength(pver uint32) uint32 {
// XXX: <= 106 different
// Protocol version 4 bytes + services 8 bytes + timestamp 16 bytes +
// remote and local net addresses + nonce 8 bytes + length of user
// agent (varInt) + max allowed useragent length + last block 4 bytes +
// agent (varInt) + max allowed useragent length + selected tip hash length +
// relay transactions flag 1 byte.
return 33 + (maxNetAddressPayload(pver) * 2) + MaxVarIntPayload +
MaxUserAgentLen
return 29 + (maxNetAddressPayload(pver) * 2) + MaxVarIntPayload +
MaxUserAgentLen + daghash.HashSize
}
// NewMsgVersion returns a new kaspa version message that conforms to the
// Message interface using the passed parameters and defaults for the remaining
// fields.
func NewMsgVersion(me *NetAddress, you *NetAddress, nonce uint64,
selectedTip *daghash.Hash, subnetworkID *subnetworkid.SubnetworkID) *MsgVersion {
selectedTipHash *daghash.Hash, subnetworkID *subnetworkid.SubnetworkID) *MsgVersion {
// Limit the timestamp to one second precision since the protocol
// doesn't support better.
@ -249,7 +247,7 @@ func NewMsgVersion(me *NetAddress, you *NetAddress, nonce uint64,
AddrMe: *me,
Nonce: nonce,
UserAgent: DefaultUserAgent,
SelectedTip: selectedTip,
SelectedTipHash: selectedTipHash,
DisableRelayTx: false,
SubnetworkID: subnetworkID,
}

View File

@ -23,7 +23,7 @@ func TestVersion(t *testing.T) {
pver := ProtocolVersion
// Create version message data.
selectedTip := &daghash.Hash{12, 34}
selectedTipHash := &daghash.Hash{12, 34}
tcpAddrMe := &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 16111}
me := NewNetAddress(tcpAddrMe, SFNodeNetwork)
tcpAddrYou := &net.TCPAddr{IP: net.ParseIP("192.168.0.1"), Port: 16111}
@ -34,7 +34,7 @@ func TestVersion(t *testing.T) {
}
// Ensure we get the correct data back out.
msg := NewMsgVersion(me, you, nonce, selectedTip, nil)
msg := NewMsgVersion(me, you, nonce, selectedTipHash, nil)
if msg.ProtocolVersion != int32(pver) {
t.Errorf("NewMsgVersion: wrong protocol version - got %v, want %v",
msg.ProtocolVersion, pver)
@ -55,9 +55,9 @@ func TestVersion(t *testing.T) {
t.Errorf("NewMsgVersion: wrong user agent - got %v, want %v",
msg.UserAgent, DefaultUserAgent)
}
if !msg.SelectedTip.IsEqual(selectedTip) {
t.Errorf("NewMsgVersion: wrong selected tip - got %s, want %s",
msg.SelectedTip, selectedTip)
if !msg.SelectedTipHash.IsEqual(selectedTipHash) {
t.Errorf("NewMsgVersion: wrong selected tip hash - got %s, want %s",
msg.SelectedTipHash, selectedTipHash)
}
if msg.DisableRelayTx {
t.Errorf("NewMsgVersion: disable relay tx is not false by "+
@ -106,10 +106,10 @@ func TestVersion(t *testing.T) {
// Ensure max payload is expected value.
// Protocol version 4 bytes + services 8 bytes + timestamp 16 bytes +
// remote and local net addresses + nonce 8 bytes + length of user agent
// (varInt) + max allowed user agent length + last block 4 bytes +
// remote and local net addresses + nonce 8 bytes + length of user
// agent (varInt) + max allowed useragent length + selected tip hash length +
// relay transactions flag 1 byte.
wantPayload := uint32(366)
wantPayload := uint32(394)
maxPayload := msg.MaxPayloadLength(pver)
if maxPayload != wantPayload {
t.Errorf("MaxPayloadLength: wrong max payload length for "+
@ -321,9 +321,9 @@ var baseVersion = &MsgVersion{
IP: net.ParseIP("127.0.0.1"),
Port: 16111,
},
Nonce: 123123, // 0x1e0f3
UserAgent: "/kaspadtest:0.0.1/",
SelectedTip: &daghash.Hash{0x12, 0x34},
Nonce: 123123, // 0x1e0f3
UserAgent: "/kaspadtest:0.0.1/",
SelectedTipHash: &daghash.Hash{0x12, 0x34},
}
// baseVersionEncoded is the wire encoded bytes for baseVersion using protocol
@ -371,9 +371,9 @@ var baseVersionWithRelayTx = &MsgVersion{
IP: net.ParseIP("127.0.0.1"),
Port: 16111,
},
Nonce: 123123, // 0x1e0f3
UserAgent: "/kaspadtest:0.0.1/",
SelectedTip: &daghash.Hash{0x12, 0x34},
Nonce: 123123, // 0x1e0f3
UserAgent: "/kaspadtest:0.0.1/",
SelectedTipHash: &daghash.Hash{0x12, 0x34},
}
// baseVersionWithRelayTxEncoded is the wire encoded bytes for