[NOD-1137] Implement handshake protocol (#792)

* [NOD-1126] Implement block relay flow

* [NOD-1126] Implement block relay flow

* [NOD-1126] Add StartGetRelayBlocksListener

* [NOD-1126] Integrate with new interface

* [NOD-1126] Fix comments

* [NOD-1126] Refactor protocol.go

* [NOD-1126] Split long lines

* [NOD-1126] Fix comment

* [NOD-1126] move sharedRequestedBlocks to a separate file

* [NOD-1126] Fix error message

* [NOD-1126] Move handleInv to StartBlockRelay

* [NOD-1126] Create hashesQueueSet type

* [NOD-1126] Make deleteFromRequestedBlocks a method

* [NOD-1126] Fix comment

* [NOD-1126] Add block logger

* [NOD-1126] Rename advertisedProtoVer->advertisedProtocolVer

* [NOD-1126] Fix comment and an error message

* [NOD-1126] Remove redundant loop

* [NOD-1126] Move requestBlocks upper

* [NOD-1126] Remove exiting blocks in requestedBlocks from hashesToRequest

* [NOD-1126] Change comment

* [NOD-1126] Rename stallResponseTimeout->timeout

* [NOD-1126] Use switch inside readMsgBlock

* [NOD-1126] Fix error message and remove redundant log

* [NOD-1126] Rename pacakge names

* [NOD-1126] Fix comment

* [NOD-1126] Change file names

* [NOD-1126] Convert block to partial if needed

* [NOD-1126] Remove function redeclaration

* [NOD-1126] continue instead of return

* [NOD-1126] Rename LogBlockBlueScore->LogBlock

* [NOD-1126] Add minimum functions to utils

* [NOD-1126] Flip condition on readInv

* [NOD-1126] Rename utilMath->mathUtil

* [NOD-1126] Fix comment

* [NOD-1137] Implement handshake

* [NOD-1137] Replace version's nonce with ID

* [NOD-1137] Remove redundant function

* [NOD-1137] Move handshake to a separate file

* [NOD-1137] Add todo

* [NOD-1137] Replace peer internal id with global peer ID

* [NOD-1137] Add serializer/deserializer to ID

* [NOD-1137] Remove validation from AddUserAgent

* [NOD-1137] Add missing id package

* [NOD-1137] Rename variables

* [NOD-1137] Add comment

* [NOD-1137] Implement GetBestLocalAddress

* [NOD-1137] Implement TODOs

* [NOD-1137] Rename variables

* [NOD-1137] Move errors.Is inside err!=nil branch

* [NOD-1137] Fix erroneous condition on Dequeue

* [NOD-1137] Fix bug in GetReadyPeerIDs

* [NOD-1137] Handle external IP on GetBestLocalAddress

* [NOD-1137] Remove version and verack message types when handshake is over

* [NOD-1137] Add FromBytes to id package

* [NOD-1137] Add protocol error

* [NOD-1137] Add ErrTimeout

* [NOD-1137] Log error only if exists

* [NOD-1137] Replace idFromBytes->id.FromBytes

* [NOD-1137] Add comments

* [NOD-1137] Remove ErrTimeout

* [NOD-1137] Unremove ErrTimeout

* [NOD-1137] Change comment

* [NOD-1137] Use EnqueueWithTimeout everywhere in protocol
This commit is contained in:
Ori Newman 2020-07-14 17:20:29 +03:00 committed by GitHub
parent f8e53d309c
commit 04b578cee1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 728 additions and 1449 deletions

View File

@ -2,6 +2,8 @@ package main
import (
"fmt"
"github.com/kaspanet/kaspad/addrmgr"
"github.com/kaspanet/kaspad/server/serverutils"
"sync/atomic"
"github.com/kaspanet/kaspad/util/panics"
@ -89,7 +91,8 @@ func newKaspad(listenAddrs []string, interrupt <-chan struct{}) (*kaspad, error)
txMempool := setupMempool(dag, sigCache)
protocolManager, err := protocol.NewManager(listenAddrs, dag)
addressManager := addrmgr.New(serverutils.KaspadLookup, config.ActiveConfig().SubnetworkID)
protocolManager, err := protocol.NewManager(listenAddrs, dag, addressManager)
if err != nil {
return nil, err
}

View File

@ -54,6 +54,7 @@ var (
blkrLog = BackendLog.Logger("BLKR")
gbrlLog = BackendLog.Logger("GBRL")
blprLog = BackendLog.Logger("BLPR")
snvrLog = BackendLog.Logger("SNVR")
)
// SubsystemTags is an enum of all sub system tags
@ -83,7 +84,8 @@ var SubsystemTags = struct {
BLKR,
NTAR,
GBRL,
BLPR string
BLPR,
SNVR string
}{
ADXR: "ADXR",
AMGR: "AMGR",
@ -111,6 +113,7 @@ var SubsystemTags = struct {
GBRL: "GBRL",
NTAR: "NTAR",
BLPR: "BLPR",
SNVR: "SNVR",
}
// subsystemLoggers maps each subsystem identifier to its associated logger.
@ -141,6 +144,7 @@ var subsystemLoggers = map[string]*logs.Logger{
SubsystemTags.GBRL: gbrlLog,
SubsystemTags.NTAR: ntarLog,
SubsystemTags.BLPR: blprLog,
SubsystemTags.SNVR: snvrLog,
}
// InitLog attaches log file and error log file to the backend log.

View File

@ -1,12 +1,14 @@
package id
import (
"bytes"
"crypto/rand"
"encoding/hex"
"errors"
"io"
)
const idLength = 16
// IDLength of array used to store the ID.
const IDLength = 16
// ID identifies a network connection
type ID struct {
@ -15,22 +17,43 @@ type ID struct {
// GenerateID generates a new ID
func GenerateID() (*ID, error) {
bytes := make([]byte, idLength)
_, err := rand.Read(bytes)
id := new(ID)
err := id.Deserialize(rand.Reader)
if err != nil {
return nil, err
}
return NewID(bytes)
return id, nil
}
// NewID creates an ID from the given bytes
func NewID(bytes []byte) (*ID, error) {
if len(bytes) != idLength {
return nil, errors.New("invalid bytes length")
}
return &ID{bytes: bytes}, nil
// IsEqual returns whether id equals to other.
func (id *ID) IsEqual(other *ID) bool {
return bytes.Equal(id.bytes, other.bytes)
}
func (id *ID) String() string {
return hex.EncodeToString(id.bytes)
}
// Deserialize decodes a block from r into the receiver.
func (id *ID) Deserialize(r io.Reader) error {
id.bytes = make([]byte, IDLength)
_, err := io.ReadFull(r, id.bytes)
return err
}
// Serialize serializes the receiver into the given writer.
func (id *ID) Serialize(w io.Writer) error {
_, err := w.Write(id.bytes)
return err
}
// FromBytes returns an ID deserialized from the given byte slice.
func FromBytes(serializedID []byte) *ID {
r := bytes.NewReader(serializedID)
newID := new(ID)
err := newID.Deserialize(r)
if err != nil {
panic(err)
}
return newID
}

View File

@ -3,6 +3,8 @@ package netadapter
import (
"github.com/kaspanet/kaspad/netadapter/id"
routerpkg "github.com/kaspanet/kaspad/netadapter/router"
"net"
"strconv"
"sync"
"sync/atomic"
@ -165,3 +167,58 @@ func (na *NetAdapter) Broadcast(connectionIDs []*id.ID, message wire.Message) er
}
return nil
}
// GetBestLocalAddress returns the most appropriate local address to use
// for the given remote address.
func (na *NetAdapter) GetBestLocalAddress() (*wire.NetAddress, error) {
//TODO(libp2p) Reimplement this, and check reachability to the other node
if len(config.ActiveConfig().ExternalIPs) > 0 {
host, portString, err := net.SplitHostPort(config.ActiveConfig().ExternalIPs[0])
if err != nil {
portString = config.ActiveConfig().NetParams().DefaultPort
}
portInt, err := strconv.Atoi(portString)
if err != nil {
return nil, err
}
ip := net.ParseIP(host)
if ip == nil {
hostAddrs, err := net.LookupHost(host)
if err != nil {
return nil, err
}
ip = net.ParseIP(hostAddrs[0])
if ip == nil {
return nil, errors.Errorf("Cannot resolve IP address for host '%s'", host)
}
}
return wire.NewNetAddressIPPort(ip, uint16(portInt), wire.SFNodeNetwork), nil
}
listenAddress := config.ActiveConfig().Listeners[0]
_, portString, err := net.SplitHostPort(listenAddress)
if err != nil {
portString = config.ActiveConfig().NetParams().DefaultPort
}
portInt, err := strconv.Atoi(portString)
if err != nil {
return nil, err
}
addresses, err := net.InterfaceAddrs()
if err != nil {
return nil, err
}
for _, address := range addresses {
ip, _, err := net.ParseCIDR(address.String())
if err != nil {
continue
}
return wire.NewNetAddressIPPort(ip, uint16(portInt), wire.SFNodeNetwork), nil
}
return nil, errors.New("no address was found")
}

View File

@ -1,8 +1,8 @@
package router
import (
"errors"
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
"time"
)
@ -10,6 +10,9 @@ const (
maxMessages = 100
)
// ErrTimeout signifies that one of the router functions had a timeout.
var ErrTimeout = errors.New("timeout expired")
// onCapacityReachedHandler is a function that is to be
// called when a route reaches capacity.
type onCapacityReachedHandler func()
@ -61,7 +64,7 @@ func (r *Route) EnqueueWithTimeout(message wire.Message, timeout time.Duration)
}
select {
case <-time.After(timeout):
return false, errors.New("timeout expired")
return false, errors.Wrapf(ErrTimeout, "got timeout after %s", timeout)
case r.channel <- message:
return true, nil
}
@ -75,7 +78,7 @@ func (r *Route) DequeueWithTimeout(timeout time.Duration) (message wire.Message,
}
select {
case <-time.After(timeout):
return nil, false, errors.New("timeout expired")
return nil, false, errors.Wrapf(ErrTimeout, "got timeout after %s", timeout)
case message := <-r.channel:
return message, true, nil
}

View File

@ -1,127 +0,0 @@
// Copyright (c) 2015-2016 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package peer_test
import (
"fmt"
"net"
"time"
"github.com/kaspanet/kaspad/util/daghash"
"github.com/kaspanet/kaspad/peer"
"github.com/kaspanet/kaspad/dagconfig"
"github.com/kaspanet/kaspad/wire"
)
func fakeSelectedTipFn() *daghash.Hash {
return &daghash.Hash{0x12, 0x34}
}
// mockRemotePeer creates a basic inbound peer listening on the simnet port for
// use with Example_peerConnection. It does not return until the listner is
// active.
func mockRemotePeer() error {
// Configure peer to act as a simnet node that offers no services.
peerCfg := &peer.Config{
UserAgentName: "peer", // User agent name to advertise.
UserAgentVersion: "1.0.0", // User agent version to advertise.
DAGParams: &dagconfig.SimnetParams,
SelectedTipHash: fakeSelectedTipFn,
SubnetworkID: nil,
}
// Accept connections on the simnet port.
listener, err := net.Listen("tcp", "127.0.0.1:18555")
if err != nil {
return err
}
go func() {
conn, err := listener.Accept()
if err != nil {
fmt.Printf("Accept: error %v\n", err)
return
}
// Create and start the inbound peer.
p := peer.NewInboundPeer(peerCfg)
err = p.AssociateConnection(conn)
if err != nil {
fmt.Printf("AssociateConnection: error %+v\n", err)
return
}
}()
return nil
}
// This example demonstrates the basic process for initializing and creating an
// outbound peer. Peers negotiate by exchanging version and verack messages.
// For demonstration, a simple handler for version message is attached to the
// peer.
func Example_newOutboundPeer() {
// Ordinarily this will not be needed since the outbound peer will be
// connecting to a remote peer, however, since this example is executed
// and tested, a mock remote peer is needed to listen for the outbound
// peer.
if err := mockRemotePeer(); err != nil {
fmt.Printf("mockRemotePeer: unexpected error %v\n", err)
return
}
// Create an outbound peer that is configured to act as a simnet node
// that offers no services and has listeners for the version and verack
// messages. The verack listener is used here to signal the code below
// when the handshake has been finished by signalling a channel.
verack := make(chan struct{})
peerCfg := &peer.Config{
UserAgentName: "peer", // User agent name to advertise.
UserAgentVersion: "1.0.0", // User agent version to advertise.
DAGParams: &dagconfig.SimnetParams,
Services: 0,
Listeners: peer.MessageListeners{
OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) {
fmt.Println("outbound: received version")
},
OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
verack <- struct{}{}
},
},
SelectedTipHash: fakeSelectedTipFn,
SubnetworkID: nil,
}
p, err := peer.NewOutboundPeer(peerCfg, "127.0.0.1:18555")
if err != nil {
fmt.Printf("NewOutboundPeer: error %v\n", err)
return
}
// Establish the connection to the peer address and mark it connected.
conn, err := net.Dial("tcp", p.Addr())
if err != nil {
fmt.Printf("net.Dial: error %+v\n", err)
return
}
err = p.AssociateConnection(conn)
if err != nil {
fmt.Printf("AssociateConnection: error %+v\n", err)
return
}
// Wait for the verack message or timeout in case of failure.
select {
case <-verack:
case <-time.After(time.Second * 1):
fmt.Printf("Example_peerConnection: verack timeout")
}
// Disconnect the peer.
p.Disconnect()
p.WaitForDisconnect()
// Output:
// outbound: received version
}

View File

@ -1,18 +0,0 @@
// Copyright (c) 2015 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
/*
This test file is part of the peer package rather than than the peer_test
package so it can bridge access to the internals to properly test cases which
are either not possible or can't reliably be tested via the public interface.
The functions are only exported while the tests are being run.
*/
package peer
// TstAllowSelfConns allows the test package to allow self connections by
// disabling the detection logic.
func TstAllowSelfConns() {
allowSelfConns = true
}

View File

@ -1,170 +0,0 @@
// Copyright (c) 2013-2016 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package peer
import (
"crypto/rand"
"fmt"
"testing"
"github.com/kaspanet/kaspad/util/daghash"
"github.com/kaspanet/kaspad/wire"
)
// TestMruInventoryMap ensures the MruInventoryMap behaves as expected including
// limiting, eviction of least-recently used entries, specific entry removal,
// and existence tests.
func TestMruInventoryMap(t *testing.T) {
// Create a bunch of fake inventory vectors to use in testing the mru
// inventory code.
numInvVects := 10
invVects := make([]*wire.InvVect, 0, numInvVects)
for i := 0; i < numInvVects; i++ {
hash := &daghash.Hash{byte(i)}
iv := wire.NewInvVect(wire.InvTypeBlock, hash)
invVects = append(invVects, iv)
}
tests := []struct {
name string
limit int
}{
{name: "limit 0", limit: 0},
{name: "limit 1", limit: 1},
{name: "limit 5", limit: 5},
{name: "limit 7", limit: 7},
{name: "limit one less than available", limit: numInvVects - 1},
{name: "limit all available", limit: numInvVects},
}
testLoop:
for i, test := range tests {
// Create a new mru inventory map limited by the specified test
// limit and add all of the test inventory vectors. This will
// cause evicition since there are more test inventory vectors
// than the limits.
mruInvMap := newMruInventoryMap(uint(test.limit))
for j := 0; j < numInvVects; j++ {
mruInvMap.Add(invVects[j])
}
// Ensure the limited number of most recent entries in the
// inventory vector list exist.
for j := numInvVects - test.limit; j < numInvVects; j++ {
if !mruInvMap.Exists(invVects[j]) {
t.Errorf("Exists #%d (%s) entry %s does not "+
"exist", i, test.name, *invVects[j])
continue testLoop
}
}
// Ensure the entries before the limited number of most recent
// entries in the inventory vector list do not exist.
for j := 0; j < numInvVects-test.limit; j++ {
if mruInvMap.Exists(invVects[j]) {
t.Errorf("Exists #%d (%s) entry %s exists", i,
test.name, *invVects[j])
continue testLoop
}
}
// Readd the entry that should currently be the least-recently
// used entry so it becomes the most-recently used entry, then
// force an eviction by adding an entry that doesn't exist and
// ensure the evicted entry is the new least-recently used
// entry.
//
// This check needs at least 2 entries.
if test.limit > 1 {
origLruIndex := numInvVects - test.limit
mruInvMap.Add(invVects[origLruIndex])
iv := wire.NewInvVect(wire.InvTypeBlock,
&daghash.Hash{0x00, 0x01})
mruInvMap.Add(iv)
// Ensure the original lru entry still exists since it
// was updated and should've have become the mru entry.
if !mruInvMap.Exists(invVects[origLruIndex]) {
t.Errorf("MRU #%d (%s) entry %s does not exist",
i, test.name, *invVects[origLruIndex])
continue testLoop
}
// Ensure the entry that should've become the new lru
// entry was evicted.
newLruIndex := origLruIndex + 1
if mruInvMap.Exists(invVects[newLruIndex]) {
t.Errorf("MRU #%d (%s) entry %s exists", i,
test.name, *invVects[newLruIndex])
continue testLoop
}
}
// Delete all of the entries in the inventory vector list,
// including those that don't exist in the map, and ensure they
// no longer exist.
for j := 0; j < numInvVects; j++ {
mruInvMap.Delete(invVects[j])
if mruInvMap.Exists(invVects[j]) {
t.Errorf("Delete #%d (%s) entry %s exists", i,
test.name, *invVects[j])
continue testLoop
}
}
}
}
// TestMruInventoryMapStringer tests the stringized output for the
// MruInventoryMap type.
func TestMruInventoryMapStringer(t *testing.T) {
// Create a couple of fake inventory vectors to use in testing the mru
// inventory stringer code.
hash1 := &daghash.Hash{0x01}
hash2 := &daghash.Hash{0x02}
iv1 := wire.NewInvVect(wire.InvTypeBlock, hash1)
iv2 := wire.NewInvVect(wire.InvTypeBlock, hash2)
// Create new mru inventory map and add the inventory vectors.
mruInvMap := newMruInventoryMap(uint(2))
mruInvMap.Add(iv1)
mruInvMap.Add(iv2)
// Ensure the stringer gives the expected result. Since map iteration
// is not ordered, either entry could be first, so account for both
// cases.
wantStr1 := fmt.Sprintf("<%d>[%s, %s]", 2, iv1, iv2)
wantStr2 := fmt.Sprintf("<%d>[%s, %s]", 2, iv2, iv1)
gotStr := mruInvMap.String()
if gotStr != wantStr1 && gotStr != wantStr2 {
t.Fatalf("unexpected string representation - got %q, want %q "+
"or %q", gotStr, wantStr1, wantStr2)
}
}
// BenchmarkMruInventoryList performs basic benchmarks on the most recently
// used inventory handling.
func BenchmarkMruInventoryList(b *testing.B) {
// Create a bunch of fake inventory vectors to use in benchmarking
// the mru inventory code.
b.StopTimer()
numInvVects := 100000
invVects := make([]*wire.InvVect, 0, numInvVects)
for i := 0; i < numInvVects; i++ {
hashBytes := make([]byte, daghash.HashSize)
rand.Read(hashBytes)
hash, _ := daghash.NewHash(hashBytes)
iv := wire.NewInvVect(wire.InvTypeBlock, hash)
invVects = append(invVects, iv)
}
b.StartTimer()
// Benchmark the add plus evicition code.
limit := 20000
mruInvMap := newMruInventoryMap(uint(limit))
for i := 0; i < b.N; i++ {
mruInvMap.Add(invVects[i%numInvVects])
}
}

View File

@ -1,152 +0,0 @@
// Copyright (c) 2015 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package peer
import (
"fmt"
"testing"
)
// TestMruNonceMap ensures the mruNonceMap behaves as expected including
// limiting, eviction of least-recently used entries, specific entry removal,
// and existence tests.
func TestMruNonceMap(t *testing.T) {
// Create a bunch of fake nonces to use in testing the mru nonce code.
numNonces := 10
nonces := make([]uint64, 0, numNonces)
for i := 0; i < numNonces; i++ {
nonces = append(nonces, uint64(i))
}
tests := []struct {
name string
limit int
}{
{name: "limit 0", limit: 0},
{name: "limit 1", limit: 1},
{name: "limit 5", limit: 5},
{name: "limit 7", limit: 7},
{name: "limit one less than available", limit: numNonces - 1},
{name: "limit all available", limit: numNonces},
}
testLoop:
for i, test := range tests {
// Create a new mru nonce map limited by the specified test
// limit and add all of the test nonces. This will cause
// evicition since there are more test nonces than the limits.
mruNonceMap := newMruNonceMap(uint(test.limit))
for j := 0; j < numNonces; j++ {
mruNonceMap.Add(nonces[j])
}
// Ensure the limited number of most recent entries in the list
// exist.
for j := numNonces - test.limit; j < numNonces; j++ {
if !mruNonceMap.Exists(nonces[j]) {
t.Errorf("Exists #%d (%s) entry %d does not "+
"exist", i, test.name, nonces[j])
continue testLoop
}
}
// Ensure the entries before the limited number of most recent
// entries in the list do not exist.
for j := 0; j < numNonces-test.limit; j++ {
if mruNonceMap.Exists(nonces[j]) {
t.Errorf("Exists #%d (%s) entry %d exists", i,
test.name, nonces[j])
continue testLoop
}
}
// Readd the entry that should currently be the least-recently
// used entry so it becomes the most-recently used entry, then
// force an eviction by adding an entry that doesn't exist and
// ensure the evicted entry is the new least-recently used
// entry.
//
// This check needs at least 2 entries.
if test.limit > 1 {
origLruIndex := numNonces - test.limit
mruNonceMap.Add(nonces[origLruIndex])
mruNonceMap.Add(uint64(numNonces) + 1)
// Ensure the original lru entry still exists since it
// was updated and should've have become the mru entry.
if !mruNonceMap.Exists(nonces[origLruIndex]) {
t.Errorf("MRU #%d (%s) entry %d does not exist",
i, test.name, nonces[origLruIndex])
continue testLoop
}
// Ensure the entry that should've become the new lru
// entry was evicted.
newLruIndex := origLruIndex + 1
if mruNonceMap.Exists(nonces[newLruIndex]) {
t.Errorf("MRU #%d (%s) entry %d exists", i,
test.name, nonces[newLruIndex])
continue testLoop
}
}
// Delete all of the entries in the list, including those that
// don't exist in the map, and ensure they no longer exist.
for j := 0; j < numNonces; j++ {
mruNonceMap.Delete(nonces[j])
if mruNonceMap.Exists(nonces[j]) {
t.Errorf("Delete #%d (%s) entry %d exists", i,
test.name, nonces[j])
continue testLoop
}
}
}
}
// TestMruNonceMapStringer tests the stringized output for the mruNonceMap type.
func TestMruNonceMapStringer(t *testing.T) {
// Create a couple of fake nonces to use in testing the mru nonce
// stringer code.
nonce1 := uint64(10)
nonce2 := uint64(20)
// Create new mru nonce map and add the nonces.
mruNonceMap := newMruNonceMap(uint(2))
mruNonceMap.Add(nonce1)
mruNonceMap.Add(nonce2)
// Ensure the stringer gives the expected result. Since map iteration
// is not ordered, either entry could be first, so account for both
// cases.
wantStr1 := fmt.Sprintf("<%d>[%d, %d]", 2, nonce1, nonce2)
wantStr2 := fmt.Sprintf("<%d>[%d, %d]", 2, nonce2, nonce1)
gotStr := mruNonceMap.String()
if gotStr != wantStr1 && gotStr != wantStr2 {
t.Fatalf("unexpected string representation - got %q, want %q "+
"or %q", gotStr, wantStr1, wantStr2)
}
}
// BenchmarkMruNonceList performs basic benchmarks on the most recently used
// nonce handling.
func BenchmarkMruNonceList(b *testing.B) {
// Create a bunch of fake nonces to use in benchmarking the mru nonce
// code.
b.StopTimer()
numNonces := 100000
nonces := make([]uint64, 0, numNonces)
for i := 0; i < numNonces; i++ {
nonces = append(nonces, uint64(i))
}
b.StartTimer()
// Benchmark the add plus evicition code.
limit := 20000
mruNonceMap := newMruNonceMap(uint(limit))
for i := 0; i < b.N; i++ {
mruNonceMap.Add(nonces[i%numNonces])
}
}

View File

@ -713,58 +713,33 @@ func (p *Peer) TimeOffset() int64 {
// localVersionMsg creates a version message that can be used to send to the
// remote peer.
func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
selectedTipHash := p.cfg.SelectedTipHash()
theirNA := p.na
// If we are behind a proxy and the connection comes from the proxy then
// we return an unroutable address as their address. This is to prevent
// leaking the tor proxy address.
if p.cfg.Proxy != "" {
proxyaddress, _, err := net.SplitHostPort(p.cfg.Proxy)
// invalid proxy means poorly configured, be on the safe side.
if err != nil || p.na.IP.String() == proxyaddress {
theirNA = wire.NewNetAddressIPPort(net.IP([]byte{0, 0, 0, 0}), 0, 0)
}
}
// Create a wire.NetAddress with only the services set to use as the
// "addrme" in the version message.
//TODO(libp2p) Remove this function
panic("not supported anymore")
//selectedTipHash := p.cfg.SelectedTipHash()
//
// Older nodes previously added the IP and port information to the
// address manager which proved to be unreliable as an inbound
// connection from a peer didn't necessarily mean the peer itself
// accepted inbound connections.
//// Generate a unique nonce for this peer so self connections can be
//// detected. This is accomplished by adding it to a size-limited map of
//// recently seen nonces.
//nonce := uint64(rand.Int63())
//sentNonces.Add(nonce)
//
// Also, the timestamp is unused in the version message.
ourNA := &wire.NetAddress{
Services: p.cfg.Services,
}
// Generate a unique nonce for this peer so self connections can be
// detected. This is accomplished by adding it to a size-limited map of
// recently seen nonces.
nonce := uint64(rand.Int63())
sentNonces.Add(nonce)
subnetworkID := p.cfg.SubnetworkID
// Version message.
msg := wire.NewMsgVersion(ourNA, theirNA, nonce, selectedTipHash, subnetworkID)
msg.AddUserAgent(p.cfg.UserAgentName, p.cfg.UserAgentVersion,
p.cfg.UserAgentComments...)
msg.AddrYou.Services = wire.SFNodeNetwork
// Advertise the services flag
msg.Services = p.cfg.Services
// Advertise our max supported protocol version.
msg.ProtocolVersion = p.cfg.ProtocolVersion
// Advertise if inv messages for transactions are desired.
msg.DisableRelayTx = p.cfg.DisableRelayTx
return msg, nil
//subnetworkID := p.cfg.SubnetworkID
//
//// Version message.
//msg := wire.NewMsgVersion(nonce, selectedTipHash, subnetworkID)
//msg.AddUserAgent(p.cfg.UserAgentName, p.cfg.UserAgentVersion,
// p.cfg.UserAgentComments...)
//
//// Advertise the services flag
//msg.Services = p.cfg.Services
//
//// Advertise our max supported protocol version.
//msg.ProtocolVersion = p.cfg.ProtocolVersion
//
//// Advertise if inv messages for transactions are desired.
//msg.DisableRelayTx = p.cfg.DisableRelayTx
//
//return msg, nil
}
// PushAddrMsg sends an addr message to the connected peer using the provided
@ -890,10 +865,11 @@ func (p *Peer) PushRejectMsg(command string, code wire.RejectCode, reason string
// from the remote peer. It will return an error if the remote peer's version
// is not compatible with ours.
func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error {
// Detect self connections.
if !allowSelfConns && sentNonces.Exists(msg.Nonce) {
return errors.New("disconnecting peer connected to self")
}
//TODO(libp2p) Remove this function
//// Detect self connections.
//if !allowSelfConns && sentNonces.Exists(msg.Nonce) {
// return errors.New("disconnecting peer connected to self")
//}
// Notify and disconnect clients that have a protocol version that is
// too old.
@ -901,7 +877,7 @@ func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error {
// NOTE: If minAcceptableProtocolVersion is raised to be higher than
// wire.RejectVersion, this should send a reject packet before
// disconnecting.
if uint32(msg.ProtocolVersion) < minAcceptableProtocolVersion {
if msg.ProtocolVersion < minAcceptableProtocolVersion {
reason := fmt.Sprintf("protocol version must be %d or greater",
minAcceptableProtocolVersion)
return errors.New(reason)

View File

@ -1,778 +0,0 @@
// Copyright (c) 2015-2016 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package peer
import (
"io"
"net"
"strconv"
"strings"
"testing"
"time"
"github.com/kaspanet/kaspad/util/testtools"
"github.com/pkg/errors"
"github.com/btcsuite/go-socks/socks"
"github.com/kaspanet/kaspad/dagconfig"
"github.com/kaspanet/kaspad/util/daghash"
"github.com/kaspanet/kaspad/wire"
)
// conn mocks a network connection by implementing the net.Conn interface. It
// is used to test peer connection without actually opening a network
// connection.
type conn struct {
io.Reader
io.Writer
io.Closer
// local network, address for the connection.
lnet, laddr string
// remote network, address for the connection.
rnet, raddr string
// mocks socks proxy if true
proxy bool
}
// LocalAddr returns the local address for the connection.
func (c conn) LocalAddr() net.Addr {
return &addr{c.lnet, c.laddr}
}
// Remote returns the remote address for the connection.
func (c conn) RemoteAddr() net.Addr {
if !c.proxy {
return &addr{c.rnet, c.raddr}
}
host, strPort, _ := net.SplitHostPort(c.raddr)
port, _ := strconv.Atoi(strPort)
return &socks.ProxiedAddr{
Net: c.rnet,
Host: host,
Port: port,
}
}
// Close handles closing the connection.
func (c conn) Close() error {
if c.Closer == nil {
return nil
}
return c.Closer.Close()
}
func (c conn) SetDeadline(t time.Time) error { return nil }
func (c conn) SetReadDeadline(t time.Time) error { return nil }
func (c conn) SetWriteDeadline(t time.Time) error { return nil }
// addr mocks a network address
type addr struct {
net, address string
}
func (m addr) Network() string { return m.net }
func (m addr) String() string { return m.address }
// pipe turns two mock connections into a full-duplex connection similar to
// net.Pipe to allow pipe's with (fake) addresses.
func pipe(c1, c2 *conn) (*conn, *conn) {
r1, w1 := io.Pipe()
r2, w2 := io.Pipe()
c1.Writer = w1
c1.Closer = w1
c2.Reader = r1
c1.Reader = r2
c2.Writer = w2
c2.Closer = w2
return c1, c2
}
// peerStats holds the expected peer stats used for testing peer.
type peerStats struct {
wantUserAgent string
wantServices wire.ServiceFlag
wantProtocolVersion uint32
wantConnected bool
wantVersionKnown bool
wantVerAckReceived bool
wantLastPingTime time.Time
wantLastPingNonce uint64
wantLastPingMicros int64
wantTimeOffset int64
wantBytesSent uint64
wantBytesReceived uint64
}
// testPeer tests the given peer's flags and stats
func testPeer(t *testing.T, p *Peer, s peerStats) {
if p.UserAgent() != s.wantUserAgent {
t.Errorf("testPeer: wrong UserAgent - got %v, want %v", p.UserAgent(), s.wantUserAgent)
return
}
if p.Services() != s.wantServices {
t.Errorf("testPeer: wrong Services - got %v, want %v", p.Services(), s.wantServices)
return
}
if !p.LastPingTime().Equal(s.wantLastPingTime) {
t.Errorf("testPeer: wrong LastPingTime - got %v, want %v", p.LastPingTime(), s.wantLastPingTime)
return
}
if p.LastPingNonce() != s.wantLastPingNonce {
t.Errorf("testPeer: wrong LastPingNonce - got %v, want %v", p.LastPingNonce(), s.wantLastPingNonce)
return
}
if p.LastPingMicros() != s.wantLastPingMicros {
t.Errorf("testPeer: wrong LastPingMicros - got %v, want %v", p.LastPingMicros(), s.wantLastPingMicros)
return
}
if p.VerAckReceived() != s.wantVerAckReceived {
t.Errorf("testPeer: wrong VerAckReceived - got %v, want %v", p.VerAckReceived(), s.wantVerAckReceived)
return
}
if p.VersionKnown() != s.wantVersionKnown {
t.Errorf("testPeer: wrong VersionKnown - got %v, want %v", p.VersionKnown(), s.wantVersionKnown)
return
}
if p.ProtocolVersion() != s.wantProtocolVersion {
t.Errorf("testPeer: wrong ProtocolVersion - got %v, want %v", p.ProtocolVersion(), s.wantProtocolVersion)
return
}
// Allow for a deviation of 1s.
secondsInMs := time.Second.Milliseconds()
if p.TimeOffset() > s.wantTimeOffset+secondsInMs && p.TimeOffset() < s.wantTimeOffset-secondsInMs {
t.Errorf("testPeer: wrong TimeOffset - got %v, want between %v and %v", s.wantTimeOffset-secondsInMs,
s.wantTimeOffset, s.wantTimeOffset+secondsInMs)
return
}
if p.BytesSent() != s.wantBytesSent {
t.Errorf("testPeer: wrong BytesSent - got %v, want %v", p.BytesSent(), s.wantBytesSent)
return
}
if p.BytesReceived() != s.wantBytesReceived {
t.Errorf("testPeer: wrong BytesReceived - got %v, want %v", p.BytesReceived(), s.wantBytesReceived)
return
}
if p.Connected() != s.wantConnected {
t.Errorf("testPeer: wrong Connected - got %v, want %v", p.Connected(), s.wantConnected)
return
}
stats := p.StatsSnapshot()
if p.ID() != stats.ID {
t.Errorf("testPeer: wrong ID - got %v, want %v", p.ID(), stats.ID)
return
}
if p.Addr() != stats.Addr {
t.Errorf("testPeer: wrong Addr - got %v, want %v", p.Addr(), stats.Addr)
return
}
if p.LastSend() != stats.LastSend {
t.Errorf("testPeer: wrong LastSend - got %v, want %v", p.LastSend(), stats.LastSend)
return
}
if p.LastRecv() != stats.LastRecv {
t.Errorf("testPeer: wrong LastRecv - got %v, want %v", p.LastRecv(), stats.LastRecv)
return
}
}
// TestPeerConnection tests connection between inbound and outbound peers.
func TestPeerConnection(t *testing.T) {
inPeerVerack, outPeerVerack, inPeerOnWriteVerack, outPeerOnWriteVerack :=
make(chan struct{}, 1), make(chan struct{}, 1), make(chan struct{}, 1), make(chan struct{}, 1)
inPeerCfg := &Config{
Listeners: MessageListeners{
OnVerAck: func(p *Peer, msg *wire.MsgVerAck) {
inPeerVerack <- struct{}{}
},
OnWrite: func(p *Peer, bytesWritten int, msg wire.Message,
err error) {
if _, ok := msg.(*wire.MsgVerAck); ok {
inPeerOnWriteVerack <- struct{}{}
}
},
},
UserAgentName: "peer",
UserAgentVersion: "1.0",
UserAgentComments: []string{"comment"},
DAGParams: &dagconfig.MainnetParams,
ProtocolVersion: wire.ProtocolVersion, // Configure with older version
Services: 0,
SelectedTipHash: fakeSelectedTipFn,
SubnetworkID: nil,
}
outPeerCfg := &Config{
Listeners: MessageListeners{
OnVerAck: func(p *Peer, msg *wire.MsgVerAck) {
outPeerVerack <- struct{}{}
},
OnWrite: func(p *Peer, bytesWritten int, msg wire.Message,
err error) {
if _, ok := msg.(*wire.MsgVerAck); ok {
outPeerOnWriteVerack <- struct{}{}
}
},
},
UserAgentName: "peer",
UserAgentVersion: "1.0",
UserAgentComments: []string{"comment"},
DAGParams: &dagconfig.MainnetParams,
ProtocolVersion: wire.ProtocolVersion + 1,
Services: wire.SFNodeNetwork,
SelectedTipHash: fakeSelectedTipFn,
SubnetworkID: nil,
}
wantStats1 := peerStats{
wantUserAgent: wire.DefaultUserAgent + "peer:1.0(comment)/",
wantServices: 0,
wantProtocolVersion: wire.ProtocolVersion,
wantConnected: true,
wantVersionKnown: true,
wantVerAckReceived: true,
wantLastPingTime: time.Time{},
wantLastPingNonce: uint64(0),
wantLastPingMicros: int64(0),
wantTimeOffset: int64(0),
wantBytesSent: 195, // 171 version + 24 verack
wantBytesReceived: 195,
}
wantStats2 := peerStats{
wantUserAgent: wire.DefaultUserAgent + "peer:1.0(comment)/",
wantServices: wire.SFNodeNetwork,
wantProtocolVersion: wire.ProtocolVersion,
wantConnected: true,
wantVersionKnown: true,
wantVerAckReceived: true,
wantLastPingTime: time.Time{},
wantLastPingNonce: uint64(0),
wantLastPingMicros: int64(0),
wantTimeOffset: int64(0),
wantBytesSent: 195, // 171 version + 24 verack
wantBytesReceived: 195,
}
tests := []struct {
name string
setup func() (*Peer, *Peer, error)
}{
{
"basic handshake",
func() (*Peer, *Peer, error) {
inPeer, outPeer, err := setupPeers(inPeerCfg, outPeerCfg)
if err != nil {
return nil, nil, err
}
// wait for 4 veracks
if !testtools.WaitTillAllCompleteOrTimeout(time.Second,
inPeerVerack, inPeerOnWriteVerack, outPeerVerack, outPeerOnWriteVerack) {
return nil, nil, errors.New("handshake timeout")
}
return inPeer, outPeer, nil
},
},
{
"socks proxy",
func() (*Peer, *Peer, error) {
inConn, outConn := pipe(
&conn{raddr: "10.0.0.1:16111", proxy: true},
&conn{raddr: "10.0.0.2:16111"},
)
inPeer, outPeer, err := setupPeersWithConns(inPeerCfg, outPeerCfg, inConn, outConn)
if err != nil {
return nil, nil, err
}
// wait for 4 veracks
if !testtools.WaitTillAllCompleteOrTimeout(time.Second,
inPeerVerack, inPeerOnWriteVerack, outPeerVerack, outPeerOnWriteVerack) {
return nil, nil, errors.New("handshake timeout")
}
return inPeer, outPeer, nil
},
},
}
t.Logf("Running %d tests", len(tests))
for i, test := range tests {
inPeer, outPeer, err := test.setup()
if err != nil {
t.Errorf("TestPeerConnection setup #%d: unexpected err %v", i, err)
return
}
testPeer(t, inPeer, wantStats2)
testPeer(t, outPeer, wantStats1)
inPeer.Disconnect()
outPeer.Disconnect()
inPeer.WaitForDisconnect()
outPeer.WaitForDisconnect()
}
}
// TestPeerListeners tests that the peer listeners are called as expected.
func TestPeerListeners(t *testing.T) {
inPeerVerack, outPeerVerack := make(chan struct{}, 1), make(chan struct{}, 1)
ok := make(chan wire.Message, 20)
inPeerCfg := &Config{
Listeners: MessageListeners{
OnGetAddr: func(p *Peer, msg *wire.MsgGetAddr) {
ok <- msg
},
OnAddr: func(p *Peer, msg *wire.MsgAddr) {
ok <- msg
},
OnPing: func(p *Peer, msg *wire.MsgPing) {
ok <- msg
},
OnPong: func(p *Peer, msg *wire.MsgPong) {
ok <- msg
},
OnTx: func(p *Peer, msg *wire.MsgTx) {
ok <- msg
},
OnBlock: func(p *Peer, msg *wire.MsgBlock, buf []byte) {
ok <- msg
},
OnInv: func(p *Peer, msg *wire.MsgInv) {
ok <- msg
},
OnNotFound: func(p *Peer, msg *wire.MsgNotFound) {
ok <- msg
},
OnGetData: func(p *Peer, msg *wire.MsgGetData) {
ok <- msg
},
OnGetBlockInvs: func(p *Peer, msg *wire.MsgGetBlockInvs) {
ok <- msg
},
OnFeeFilter: func(p *Peer, msg *wire.MsgFeeFilter) {
ok <- msg
},
OnFilterAdd: func(p *Peer, msg *wire.MsgFilterAdd) {
ok <- msg
},
OnFilterClear: func(p *Peer, msg *wire.MsgFilterClear) {
ok <- msg
},
OnFilterLoad: func(p *Peer, msg *wire.MsgFilterLoad) {
ok <- msg
},
OnMerkleBlock: func(p *Peer, msg *wire.MsgMerkleBlock) {
ok <- msg
},
OnVersion: func(p *Peer, msg *wire.MsgVersion) {
ok <- msg
},
OnVerAck: func(p *Peer, msg *wire.MsgVerAck) {
inPeerVerack <- struct{}{}
},
OnReject: func(p *Peer, msg *wire.MsgReject) {
ok <- msg
},
},
UserAgentName: "peer",
UserAgentVersion: "1.0",
UserAgentComments: []string{"comment"},
DAGParams: &dagconfig.MainnetParams,
Services: wire.SFNodeBloom,
SelectedTipHash: fakeSelectedTipFn,
SubnetworkID: nil,
}
outPeerCfg := &Config{}
*outPeerCfg = *inPeerCfg // copy inPeerCfg
outPeerCfg.Listeners.OnVerAck = func(p *Peer, msg *wire.MsgVerAck) {
outPeerVerack <- struct{}{}
}
inPeer, outPeer, err := setupPeers(inPeerCfg, outPeerCfg)
if err != nil {
t.Errorf("TestPeerListeners: %v", err)
}
// wait for 2 veracks
if !testtools.WaitTillAllCompleteOrTimeout(time.Second, inPeerVerack, outPeerVerack) {
t.Errorf("TestPeerListeners: Timout waiting for veracks")
}
tests := []struct {
listener string
msg wire.Message
}{
{
"OnGetAddr",
wire.NewMsgGetAddr(false, nil),
},
{
"OnAddr",
wire.NewMsgAddr(false, nil),
},
{
"OnPing",
wire.NewMsgPing(42),
},
{
"OnPong",
wire.NewMsgPong(42),
},
{
"OnTx",
wire.NewNativeMsgTx(wire.TxVersion, nil, nil),
},
{
"OnBlock",
wire.NewMsgBlock(wire.NewBlockHeader(1,
[]*daghash.Hash{}, &daghash.Hash{}, &daghash.Hash{}, &daghash.Hash{}, 1, 1)),
},
{
"OnInv",
wire.NewMsgInv(),
},
{
"OnNotFound",
wire.NewMsgNotFound(),
},
{
"OnGetData",
wire.NewMsgGetData(),
},
{
"OnGetBlockInvs",
wire.NewMsgGetBlockInvs(&daghash.Hash{}, &daghash.Hash{}),
},
{
"OnFeeFilter",
wire.NewMsgFeeFilter(15000),
},
{
"OnFilterAdd",
wire.NewMsgFilterAdd([]byte{0x01}),
},
{
"OnFilterClear",
wire.NewMsgFilterClear(),
},
{
"OnFilterLoad",
wire.NewMsgFilterLoad([]byte{0x01}, 10, 0, wire.BloomUpdateNone),
},
{
"OnMerkleBlock",
wire.NewMsgMerkleBlock(wire.NewBlockHeader(1,
[]*daghash.Hash{}, &daghash.Hash{}, &daghash.Hash{}, &daghash.Hash{}, 1, 1)),
},
// only one version message is allowed
// only one verack message is allowed
{
"OnReject",
wire.NewMsgReject("block", wire.RejectDuplicate, "dupe block"),
},
}
t.Logf("Running %d tests", len(tests))
for _, test := range tests {
// Queue the test message
outPeer.QueueMessage(test.msg, nil)
select {
case <-ok:
case <-time.After(time.Second * 1):
t.Errorf("TestPeerListeners: %s timeout", test.listener)
return
}
}
inPeer.Disconnect()
outPeer.Disconnect()
}
// TestOutboundPeer tests that the outbound peer works as expected.
func TestOutboundPeer(t *testing.T) {
peerCfg := &Config{
SelectedTipHash: func() *daghash.Hash {
return &daghash.ZeroHash
},
UserAgentName: "peer",
UserAgentVersion: "1.0",
UserAgentComments: []string{"comment"},
DAGParams: &dagconfig.MainnetParams,
Services: 0,
SubnetworkID: nil,
}
_, p, err := setupPeers(peerCfg, peerCfg)
if err != nil {
t.Fatalf("TestOuboundPeer: unexpected err in setupPeers - %v\n", err)
}
// Test trying to connect for a second time and make sure nothing happens.
err = p.AssociateConnection(p.conn)
if err != nil {
t.Fatalf("AssociateConnection for the second time didn't return nil")
}
p.Disconnect()
// Test Queue Inv
fakeBlockHash := &daghash.Hash{0: 0x00, 1: 0x01}
fakeInv := wire.NewInvVect(wire.InvTypeBlock, fakeBlockHash)
// Should be noops as the peer could not connect.
p.QueueInventory(fakeInv)
p.AddKnownInventory(fakeInv)
p.QueueInventory(fakeInv)
fakeMsg := wire.NewMsgVerAck()
p.QueueMessage(fakeMsg, nil)
done := make(chan struct{})
p.QueueMessage(fakeMsg, done)
<-done
p.Disconnect()
// Test SelectedTipHashAndBlueScore
var selectedTipHash = func() *daghash.Hash {
hashStr := "14a0810ac680a3eb3f82edc878cea25ec41d6b790744e5daeef"
hash, err := daghash.NewHashFromStr(hashStr)
if err != nil {
t.Fatalf("daghash.NewHashFromStr: %s", err)
}
return hash
}
peerCfg.SelectedTipHash = selectedTipHash
_, p1, err := setupPeers(peerCfg, peerCfg)
if err != nil {
t.Fatalf("TestOuboundPeer: unexpected err in setupPeers - %v\n", err)
}
// Test Queue Inv after connection
p1.QueueInventory(fakeInv)
p1.Disconnect()
// Test regression
peerCfg.DAGParams = &dagconfig.RegressionNetParams
peerCfg.Services = wire.SFNodeBloom
_, p2, err := setupPeers(peerCfg, peerCfg)
if err != nil {
t.Fatalf("NewOutboundPeer: unexpected err - %v\n", err)
}
// Test PushXXX
var addrs []*wire.NetAddress
for i := 0; i < 5; i++ {
na := wire.NetAddress{}
addrs = append(addrs, &na)
}
if _, err := p2.PushAddrMsg(addrs, nil); err != nil {
t.Fatalf("PushAddrMsg: unexpected err %v\n", err)
}
if err := p2.PushGetBlockInvsMsg(&daghash.Hash{}, &daghash.Hash{}); err != nil {
t.Fatalf("PushGetBlockInvsMsg: unexpected err %v\n", err)
}
p2.PushRejectMsg("block", wire.RejectMalformed, "malformed", nil, false)
p2.PushRejectMsg("block", wire.RejectInvalid, "invalid", nil, false)
// Test Queue Messages
p2.QueueMessage(wire.NewMsgGetAddr(false, nil), nil)
p2.QueueMessage(wire.NewMsgPing(1), nil)
p2.QueueMessage(wire.NewMsgGetData(), nil)
p2.QueueMessage(wire.NewMsgFeeFilter(20000), nil)
p2.Disconnect()
}
// Tests that the node disconnects from peers with an unsupported protocol
// version.
func TestUnsupportedVersionPeer(t *testing.T) {
peerCfg := &Config{
UserAgentName: "peer",
UserAgentVersion: "1.0",
UserAgentComments: []string{"comment"},
DAGParams: &dagconfig.MainnetParams,
Services: 0,
SelectedTipHash: fakeSelectedTipFn,
}
localNA := wire.NewNetAddressIPPort(
net.ParseIP("10.0.0.1:16111"),
uint16(16111),
wire.SFNodeNetwork,
)
remoteNA := wire.NewNetAddressIPPort(
net.ParseIP("10.0.0.2:16111"),
uint16(16111),
wire.SFNodeNetwork,
)
localConn, remoteConn := pipe(
&conn{laddr: "10.0.0.1:16111", raddr: "10.0.0.2:16111"},
&conn{laddr: "10.0.0.2:16111", raddr: "10.0.0.1:16111"},
)
p, err := NewOutboundPeer(peerCfg, "10.0.0.1:16111")
if err != nil {
t.Fatalf("NewOutboundPeer: unexpected err - %v\n", err)
}
go func() {
err := p.AssociateConnection(localConn)
wantErrorMessage := "protocol version must be 1 or greater"
if err == nil {
t.Fatalf("No error from AssociateConnection to invalid protocol version")
}
gotErrorMessage := err.Error()
if !strings.Contains(gotErrorMessage, wantErrorMessage) {
t.Fatalf("Wrong error message from AssociateConnection to invalid protocol version.\nWant: '%s'\nGot: '%s'",
wantErrorMessage, gotErrorMessage)
}
}()
// Read outbound messages to peer into a channel
outboundMessages := make(chan wire.Message)
go func() {
for {
_, msg, _, err := wire.ReadMessageN(
remoteConn,
p.ProtocolVersion(),
peerCfg.DAGParams.Net,
)
if err == io.EOF {
close(outboundMessages)
return
}
if err != nil {
t.Errorf("Error reading message from local node: %v\n", err)
return
}
outboundMessages <- msg
}
}()
// Read version message sent to remote peer
select {
case msg := <-outboundMessages:
if _, ok := msg.(*wire.MsgVersion); !ok {
t.Fatalf("Expected version message, got [%s]", msg.Command())
}
case <-time.After(time.Second):
t.Fatal("Peer did not send version message")
}
// Remote peer writes version message advertising invalid protocol version 0
invalidVersionMsg := wire.NewMsgVersion(remoteNA, localNA, 0, &daghash.ZeroHash, nil)
invalidVersionMsg.ProtocolVersion = 0
_, err = wire.WriteMessageN(
remoteConn.Writer,
invalidVersionMsg,
uint32(invalidVersionMsg.ProtocolVersion),
peerCfg.DAGParams.Net,
)
if err != nil {
t.Fatalf("wire.WriteMessageN: unexpected err - %v\n", err)
}
// Expect peer to disconnect automatically
disconnected := make(chan struct{})
go func() {
p.WaitForDisconnect()
disconnected <- struct{}{}
}()
select {
case <-disconnected:
close(disconnected)
case <-time.After(time.Second):
t.Fatal("Peer did not automatically disconnect")
}
// Expect no further outbound messages from peer
select {
case msg, chanOpen := <-outboundMessages:
if chanOpen {
t.Fatalf("Expected no further messages, received [%s]", msg.Command())
}
case <-time.After(time.Second):
t.Fatal("Timeout waiting for remote reader to close")
}
}
func init() {
// Allow self connection when running the tests.
TstAllowSelfConns()
}
func fakeSelectedTipFn() *daghash.Hash {
return &daghash.Hash{0x12, 0x34}
}
func setupPeers(inPeerCfg, outPeerCfg *Config) (inPeer *Peer, outPeer *Peer, err error) {
inConn, outConn := pipe(
&conn{raddr: "10.0.0.1:16111"},
&conn{raddr: "10.0.0.2:16111"},
)
return setupPeersWithConns(inPeerCfg, outPeerCfg, inConn, outConn)
}
func setupPeersWithConns(inPeerCfg, outPeerCfg *Config, inConn, outConn *conn) (inPeer *Peer, outPeer *Peer, err error) {
inPeer = NewInboundPeer(inPeerCfg)
inPeerDone := make(chan struct{})
var inPeerErr error
go func() {
inPeerErr = inPeer.AssociateConnection(inConn)
inPeerDone <- struct{}{}
}()
outPeer, err = NewOutboundPeer(outPeerCfg, outConn.raddr)
if err != nil {
return nil, nil, err
}
outPeerDone := make(chan struct{})
var outPeerErr error
go func() {
outPeerErr = outPeer.AssociateConnection(outConn)
outPeerDone <- struct{}{}
}()
// wait for AssociateConnection to complete in all instances
if !testtools.WaitTillAllCompleteOrTimeout(2*time.Second, inPeerDone, outPeerDone) {
return nil, nil, errors.New("handshake timeout")
}
if inPeerErr != nil && outPeerErr != nil {
return nil, nil, errors.Errorf("both inPeer and outPeer failed connecting: \nInPeer: %+v\nOutPeer: %+v",
inPeerErr, outPeerErr)
}
if inPeerErr != nil {
return nil, nil, inPeerErr
}
if outPeerErr != nil {
return nil, nil, outPeerErr
}
return inPeer, outPeer, nil
}

View File

@ -4,6 +4,7 @@ import (
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/netadapter/router"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/protocol/protocolerrors"
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
)
@ -23,7 +24,7 @@ func HandleRelayBlockRequests(incomingRoute *router.Route, outgoingRoute *router
// Fetch the block from the database.
block, err := dag.BlockByHash(hash)
if blockdag.IsNotInDAGErr(err) {
return errors.Errorf("block %s not found", hash)
return protocolerrors.Errorf(true, "block %s not found", hash)
} else if err != nil {
panic(errors.Wrapf(err, "unable to fetch requested block hash %s", hash))
}
@ -34,7 +35,7 @@ func HandleRelayBlockRequests(incomingRoute *router.Route, outgoingRoute *router
nodeSubnetworkID := dag.SubnetworkID()
peerSubnetworkID, err := peer.SubnetworkID()
if err != nil {
panic(err)
return err
}
isNodeFull := nodeSubnetworkID == nil

View File

@ -6,6 +6,7 @@ import (
"github.com/kaspanet/kaspad/netadapter/router"
"github.com/kaspanet/kaspad/protocol/blocklogger"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/protocol/protocolerrors"
"github.com/kaspanet/kaspad/util"
"github.com/kaspanet/kaspad/util/daghash"
mathUtil "github.com/kaspanet/kaspad/util/math"
@ -14,6 +15,8 @@ import (
"time"
)
const timeout = 30 * time.Second
// HandleRelayInvs listens to wire.MsgInvRelayBlock messages, requests their corresponding blocks if they
// are missing, adds them to the DAG and propagates them to the rest of the network.
func HandleRelayInvs(incomingRoute *router.Route, outgoingRoute *router.Route,
@ -31,7 +34,7 @@ func HandleRelayInvs(incomingRoute *router.Route, outgoingRoute *router.Route,
if dag.IsKnownBlock(inv.Hash) {
if dag.IsKnownInvalid(inv.Hash) {
return errors.Errorf("sent inv of an invalid block %s",
return protocolerrors.Errorf(true, "sent inv of an invalid block %s",
inv.Hash)
}
continue
@ -68,7 +71,7 @@ func readInv(incomingRoute *router.Route, invsQueue *[]*wire.MsgInvRelayBlock) (
inv, ok := msg.(*wire.MsgInvRelayBlock)
if !ok {
return nil, false, errors.Errorf("unexpected %s message in the block relay flow while "+
return nil, false, protocolerrors.Errorf(true, "unexpected %s message in the block relay flow while "+
"expecting an inv message", msg.Command())
}
return inv, false, nil
@ -98,7 +101,10 @@ func requestBlocks(netAdapater *netadapter.NetAdapter, outgoingRoute *router.Rou
defer requestedBlocks.removeSet(pendingBlocks)
getRelayBlocksMsg := wire.NewMsgGetRelayBlocks(filteredHashesToRequest)
isOpen := outgoingRoute.Enqueue(getRelayBlocksMsg)
isOpen, err := outgoingRoute.EnqueueWithTimeout(getRelayBlocksMsg, timeout)
if err != nil {
return false, err
}
if !isOpen {
return true, nil
}
@ -115,7 +121,7 @@ func requestBlocks(netAdapater *netadapter.NetAdapter, outgoingRoute *router.Rou
block := util.NewBlock(msgBlock)
blockHash := block.Hash()
if _, ok := pendingBlocks[*blockHash]; !ok {
return false, errors.Errorf("got unrequested block %s", block.Hash())
return false, protocolerrors.Errorf(true, "got unrequested block %s", block.Hash())
}
delete(pendingBlocks, *blockHash)
requestedBlocks.remove(blockHash)
@ -138,7 +144,6 @@ func readMsgBlock(incomingRoute *router.Route, invsQueue *[]*wire.MsgInvRelayBlo
msgBlock *wire.MsgBlock, shouldStop bool, err error) {
for {
const timeout = 30 * time.Second
message, isOpen, err := incomingRoute.DequeueWithTimeout(timeout)
if err != nil {
return nil, false, err
@ -174,7 +179,7 @@ func processAndRelayBlock(netAdapter *netadapter.NetAdapter, peer *peerpkg.Peer,
log.Infof("Rejected block %s from %s: %s", blockHash,
peer, err)
return false, errors.Wrap(err, "got invalid block: %s")
return false, protocolerrors.Wrap(true, err, "got invalid block")
}
if isDelayed {
@ -184,7 +189,8 @@ func processAndRelayBlock(netAdapter *netadapter.NetAdapter, peer *peerpkg.Peer,
if isOrphan {
blueScore, err := block.BlueScore()
if err != nil {
return false, errors.Errorf("received an orphan block %s with malformed blue score", blockHash)
return false, protocolerrors.Errorf(true, "received an orphan "+
"block %s with malformed blue score", blockHash)
}
const maxOrphanBlueScoreDiff = 10000
@ -205,7 +211,7 @@ func processAndRelayBlock(netAdapter *netadapter.NetAdapter, peer *peerpkg.Peer,
}
err = blocklogger.LogBlock(block)
if err != nil {
panic(err)
return false, err
}
//TODO(libp2p)
//// When the block is not an orphan, log information about it and

110
protocol/handshake.go Normal file
View File

@ -0,0 +1,110 @@
package protocol
import (
"github.com/kaspanet/kaspad/addrmgr"
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/netadapter"
routerpkg "github.com/kaspanet/kaspad/netadapter/router"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/protocol/receiveversion"
"github.com/kaspanet/kaspad/protocol/sendversion"
"github.com/kaspanet/kaspad/util/locks"
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
"sync"
"sync/atomic"
)
func handshake(router *routerpkg.Router, netAdapter *netadapter.NetAdapter, peer *peerpkg.Peer,
dag *blockdag.BlockDAG, addressManager *addrmgr.AddrManager) (closed bool, err error) {
receiveVersionRoute, err := router.AddIncomingRoute([]string{wire.CmdVersion})
if err != nil {
panic(err)
}
sendVersionRoute, err := router.AddIncomingRoute([]string{wire.CmdVerAck})
if err != nil {
panic(err)
}
// For the handshake to finish, we need to get from the other node
// a version and verack messages, so we increase the wait group by 2
// and block the handshake with wg.Wait().
wg := sync.WaitGroup{}
wg.Add(2)
errChanUsed := uint32(0)
errChan := make(chan error)
var peerAddress *wire.NetAddress
spawn(func() {
defer wg.Done()
address, closed, err := receiveversion.ReceiveVersion(receiveVersionRoute, router.OutgoingRoute(), netAdapter, peer, dag)
if err != nil {
log.Errorf("error from ReceiveVersion: %s", err)
}
if err != nil || closed {
if atomic.AddUint32(&errChanUsed, 1) != 1 {
errChan <- err
}
return
}
peerAddress = address
})
spawn(func() {
defer wg.Done()
closed, err := sendversion.SendVersion(sendVersionRoute, router.OutgoingRoute(), netAdapter, dag)
if err != nil {
log.Errorf("error from SendVersion: %s", err)
}
if err != nil || closed {
if atomic.AddUint32(&errChanUsed, 1) != 1 {
errChan <- err
}
return
}
})
select {
case err := <-errChan:
if err != nil {
return false, err
}
return true, nil
case <-locks.ReceiveFromChanWhenDone(func() { wg.Wait() }):
}
err = peerpkg.AddToReadyPeers(peer)
if err != nil {
if errors.Is(err, peerpkg.ErrPeerWithSameIDExists) {
return false, err
}
panic(err)
}
peerID, err := peer.ID()
if err != nil {
panic(err)
}
err = netAdapter.AssociateRouterID(router, peerID)
if err != nil {
panic(err)
}
if peerAddress != nil {
subnetworkID, err := peer.SubnetworkID()
if err != nil {
panic(err)
}
addressManager.AddAddress(peerAddress, peerAddress, subnetworkID)
}
err = router.RemoveRoute([]string{wire.CmdVersion, wire.CmdVerAck})
if err != nil {
panic(err)
}
return false, nil
}

View File

@ -0,0 +1,42 @@
package peer
// Ban scores for misbehaving nodes
const (
BanScoreUnrequestedBlock = 100
BanScoreInvalidBlock = 100
BanScoreInvalidInvBlock = 100
BanScoreOrphanInvAsPartOfNetsync = 100
BanScoreMalformedBlueScoreInOrphan = 100
BanScoreRequestNonExistingBlock = 10
BanScoreUnrequestedSelectedTip = 20
BanScoreUnrequestedTx = 20
BanScoreInvalidTx = 100
BanScoreMalformedMessage = 10
BanScoreNonVersionFirstMessage = 1
BanScoreDuplicateVersion = 1
BanScoreDuplicateVerack = 1
BanScoreSentTooManyAddresses = 20
BanScoreMsgAddrWithInvalidSubnetwork = 10
BanScoreInvalidFeeFilter = 100
BanScoreNoFilterLoaded = 5
BanScoreInvalidMsgGetBlockInvs = 10
BanScoreInvalidMsgGetBlockLocator = 100
BanScoreEmptyBlockLocator = 100
BanScoreSentTxToBlocksOnly = 20
BanScoreNodeBloomFlagViolation = 100
BanScoreStallTimeout = 1
BanScoreUnrequestedMessage = 100
)

View File

@ -3,6 +3,7 @@ package peer
import (
"github.com/kaspanet/kaspad/netadapter/id"
"github.com/kaspanet/kaspad/util/daghash"
mathUtil "github.com/kaspanet/kaspad/util/math"
"github.com/kaspanet/kaspad/util/subnetworkid"
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
@ -18,7 +19,7 @@ type Peer struct {
selectedTipHashMtx sync.RWMutex
selectedTipHash *daghash.Hash
id uint32
id *id.ID
userAgent string
services wire.ServiceFlag
advertisedProtocolVer uint32 // protocol version advertised by remote
@ -62,6 +63,14 @@ func (p *Peer) SubnetworkID() (*subnetworkid.SubnetworkID, error) {
return p.subnetworkID, nil
}
// ID returns the peer ID.
func (p *Peer) ID() (*id.ID, error) {
if atomic.LoadUint32(&p.ready) == 0 {
return nil, errors.New("peer is not ready yet")
}
return p.id, nil
}
// MarkAsReady marks the peer as ready.
func (p *Peer) MarkAsReady() error {
if atomic.AddUint32(&p.ready, 1) != 1 {
@ -71,15 +80,15 @@ func (p *Peer) MarkAsReady() error {
}
// UpdateFieldsFromMsgVersion updates the peer with the data from the version message.
func (p *Peer) UpdateFieldsFromMsgVersion(msg *wire.MsgVersion, peerID uint32) {
func (p *Peer) UpdateFieldsFromMsgVersion(msg *wire.MsgVersion) {
// Negotiate the protocol version.
p.advertisedProtocolVer = msg.ProtocolVersion
p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtocolVer)
p.protocolVersion = mathUtil.MinUint32(p.protocolVersion, p.advertisedProtocolVer)
log.Debugf("Negotiated protocol version %d for peer %s",
p.protocolVersion, p)
// Set the peer's ID.
p.id = peerID
p.id = msg.ID
// Set the supported services for the peer to what the remote peer
// advertised.
@ -116,17 +125,47 @@ func (p *Peer) String() string {
panic("unimplemented")
}
// minUint32 is a helper function to return the minimum of two uint32s.
// This avoids a math import and the need to cast to floats.
func minUint32(a, b uint32) uint32 {
if a < b {
return a
var (
readyPeers = make(map[*id.ID]*Peer, 0)
readyPeersMutex sync.RWMutex
)
// ErrPeerWithSameIDExists signifies that a peer with the same ID already exist.
var ErrPeerWithSameIDExists = errors.New("ready with the same ID already exists")
// AddToReadyPeers marks this peer as ready and adds it to the ready peers list.
func AddToReadyPeers(peer *Peer) error {
readyPeersMutex.RLock()
defer readyPeersMutex.RUnlock()
if _, ok := readyPeers[peer.id]; ok {
return errors.Wrapf(ErrPeerWithSameIDExists, "peer with ID %s already exists", peer.id)
}
return b
err := peer.MarkAsReady()
if err != nil {
return err
}
readyPeers[peer.id] = peer
return nil
}
// GetReadyPeerIDs returns the peer IDs of all the ready peers.
func GetReadyPeerIDs() []*id.ID {
// TODO(libp2p)
panic("unimplemented")
readyPeersMutex.RLock()
defer readyPeersMutex.RUnlock()
peerIDs := make([]*id.ID, len(readyPeers))
i := 0
for peerID := range readyPeers {
peerIDs[i] = peerID
i++
}
return peerIDs
}
// IDExists returns whether there's a peer with the given ID.
func IDExists(peerID *id.ID) bool {
_, ok := readyPeers[peerID]
return ok
}

View File

@ -1,9 +1,9 @@
package ping
import (
"errors"
"github.com/kaspanet/kaspad/netadapter/router"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/protocol/protocolerrors"
"github.com/kaspanet/kaspad/util/random"
"github.com/kaspanet/kaspad/wire"
"time"
@ -65,7 +65,7 @@ func SendPings(incomingRoute *router.Route, outgoingRoute *router.Route, peer *p
}
pongMessage := message.(*wire.MsgPing)
if pongMessage.Nonce != pingMessage.Nonce {
return errors.New("nonce mismatch between ping and pong")
return protocolerrors.New(true, "nonce mismatch between ping and pong")
}
peer.SetPingIdle()
}

View File

@ -1,6 +1,7 @@
package protocol
import (
"github.com/kaspanet/kaspad/addrmgr"
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/netadapter"
routerpkg "github.com/kaspanet/kaspad/netadapter/router"
@ -8,7 +9,9 @@ import (
"github.com/kaspanet/kaspad/protocol/handlerelayinvs"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/protocol/ping"
"github.com/kaspanet/kaspad/protocol/protocolerrors"
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
"sync/atomic"
)
@ -18,13 +21,15 @@ type Manager struct {
}
// NewManager creates a new instance of the p2p protocol manager
func NewManager(listeningAddrs []string, dag *blockdag.BlockDAG) (*Manager, error) {
func NewManager(listeningAddrs []string, dag *blockdag.BlockDAG,
addressManager *addrmgr.AddrManager) (*Manager, error) {
netAdapter, err := netadapter.NewNetAdapter(listeningAddrs)
if err != nil {
return nil, err
}
routerInitializer := newRouterInitializer(netAdapter, dag)
routerInitializer := newRouterInitializer(netAdapter, addressManager, dag)
netAdapter.SetRouterInitializer(routerInitializer)
manager := Manager{
@ -43,26 +48,48 @@ func (p *Manager) Stop() error {
return p.netAdapter.Stop()
}
func newRouterInitializer(netAdapter *netadapter.NetAdapter, dag *blockdag.BlockDAG) netadapter.RouterInitializer {
func newRouterInitializer(netAdapter *netadapter.NetAdapter,
addressManager *addrmgr.AddrManager, dag *blockdag.BlockDAG) netadapter.RouterInitializer {
return func() (*routerpkg.Router, error) {
router := routerpkg.NewRouter()
spawn(func() {
err := startFlows(netAdapter, router, dag)
err := startFlows(netAdapter, router, dag, addressManager)
if err != nil {
// TODO(libp2p) Ban peer
if protocolErr := &(protocolerrors.ProtocolError{}); errors.As(err, &protocolErr) {
if protocolErr.ShouldBan {
// TODO(libp2p) Ban peer
panic("unimplemented")
}
// TODO(libp2p) Disconnect from peer
panic("unimplemented")
}
if errors.Is(err, routerpkg.ErrTimeout) {
// TODO(libp2p) Disconnect peer
panic("unimplemented")
}
panic(err)
}
})
return router, nil
}
}
func startFlows(netAdapter *netadapter.NetAdapter, router *routerpkg.Router, dag *blockdag.BlockDAG) error {
func startFlows(netAdapter *netadapter.NetAdapter, router *routerpkg.Router, dag *blockdag.BlockDAG,
addressManager *addrmgr.AddrManager) error {
stop := make(chan error)
stopped := uint32(0)
outgoingRoute := router.OutgoingRoute()
peer := new(peerpkg.Peer)
closed, err := handshake(router, netAdapter, peer, dag, addressManager)
if err != nil {
return err
}
if closed {
return nil
}
addFlow("HandleRelayInvs", router, []string{wire.CmdInvRelayBlock, wire.CmdBlock}, &stopped, stop,
func(incomingRoute *routerpkg.Route) error {
return handlerelayinvs.HandleRelayInvs(incomingRoute, outgoingRoute, peer, netAdapter, dag)
@ -87,7 +114,7 @@ func startFlows(netAdapter *netadapter.NetAdapter, router *routerpkg.Router, dag
},
)
err := <-stop
err = <-stop
return err
}

View File

@ -0,0 +1,55 @@
package protocolerrors
import "github.com/pkg/errors"
// ProtocolError is an error that signifies a violation
// of the peer-to-peer protocol
type ProtocolError struct {
ShouldBan bool
Cause error
}
func (e *ProtocolError) Error() string {
return e.Cause.Error()
}
func (e *ProtocolError) Unwrap() error {
return e.Cause
}
// Errorf formats according to a format specifier and returns the string
// as a value that satisfies error.
// Errorf also records the stack trace at the point it was called.
func Errorf(shouldBan bool, format string, args ...interface{}) error {
return &ProtocolError{
ShouldBan: shouldBan,
Cause: errors.Errorf(format, args...),
}
}
// New returns an error with the supplied message.
// New also records the stack trace at the point it was called.
func New(shouldBan bool, message string) error {
return &ProtocolError{
ShouldBan: shouldBan,
Cause: errors.New(message),
}
}
// Wrap returns an error annotating err with a stack trace
// at the point Wrap is called, and the supplied message.
func Wrap(shouldBan bool, err error, message string) error {
return &ProtocolError{
ShouldBan: shouldBan,
Cause: errors.Wrap(err, message),
}
}
// Wrapf returns an error annotating err with a stack trace
// at the point Wrapf is called, and the format specifier.
func Wrapf(shouldBan bool, err error, format string, args ...interface{}) error {
return &ProtocolError{
ShouldBan: shouldBan,
Cause: errors.Wrapf(err, format, args...),
}
}

View File

@ -0,0 +1,85 @@
package receiveversion
import (
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/netadapter"
"github.com/kaspanet/kaspad/netadapter/router"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/protocol/protocolerrors"
"github.com/kaspanet/kaspad/wire"
"time"
)
var (
// allowSelfConnections is only used to allow the tests to bypass the self
// connection detecting and disconnect logic since they intentionally
// do so for testing purposes.
allowSelfConnections bool
// minAcceptableProtocolVersion is the lowest protocol version that a
// connected peer may support.
minAcceptableProtocolVersion = wire.ProtocolVersion
)
const timeout = 30 * time.Second
// ReceiveVersion waits for the peer to send a version message, sends a
// verack in response, and updates its info accordingly.
func ReceiveVersion(incomingRoute *router.Route, outgoingRoute *router.Route, netAdapter *netadapter.NetAdapter,
peer *peerpkg.Peer, dag *blockdag.BlockDAG) (addr *wire.NetAddress, routeClosed bool, err error) {
message, isOpen := incomingRoute.Dequeue()
if !isOpen {
return nil, true, nil
}
msgVersion, ok := message.(*wire.MsgVersion)
if !ok {
return nil, false, protocolerrors.New(true, "a version message must precede all others")
}
if !allowSelfConnections && netAdapter.ID().IsEqual(msgVersion.ID) {
return nil, false, protocolerrors.New(true, "connected to self")
}
// Notify and disconnect clients that have a protocol version that is
// too old.
//
// NOTE: If minAcceptableProtocolVersion is raised to be higher than
// wire.RejectVersion, this should send a reject packet before
// disconnecting.
if msgVersion.ProtocolVersion < minAcceptableProtocolVersion {
//TODO(libp2p) create error type for disconnect but don't ban
return nil, false, protocolerrors.Errorf(false, "protocol version must be %d or greater",
minAcceptableProtocolVersion)
}
// Disconnect from partial nodes in networks that don't allow them
if !dag.Params.EnableNonNativeSubnetworks && msgVersion.SubnetworkID != nil {
return nil, false, protocolerrors.New(true, "partial nodes are not allowed")
}
// TODO(libp2p)
//// Disconnect if:
//// - we are a full node and the outbound connection we've initiated is a partial node
//// - the remote node is partial and our subnetwork doesn't match their subnetwork
//localSubnetworkID := config.ActiveConfig().SubnetworkID
//isLocalNodeFull := localSubnetworkID == nil
//isRemoteNodeFull := msgVersion.SubnetworkID == nil
//if (isLocalNodeFull && !isRemoteNodeFull && !connection.IsInbound()) ||
// (!isLocalNodeFull && !isRemoteNodeFull && !msgVersion.SubnetworkID.IsEqual(localSubnetworkID)) {
//
// return nil, false, errors.New("incompatible subnetworks")
//}
peer.UpdateFieldsFromMsgVersion(msgVersion)
isOpen, err = outgoingRoute.EnqueueWithTimeout(wire.NewMsgVerAck(), timeout)
if err != nil {
return nil, false, err
}
if !isOpen {
return nil, true, nil
}
// TODO(libp2p) Register peer ID
return msgVersion.Address, false, nil
}

View File

@ -0,0 +1,9 @@
package sendversion
import (
"github.com/kaspanet/kaspad/logger"
"github.com/kaspanet/kaspad/util/panics"
)
var log, _ = logger.Get(logger.SubsystemTags.GBRL)
var spawn = panics.GoroutineWrapperFunc(log)

View File

@ -0,0 +1,73 @@
package sendversion
import (
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/config"
"github.com/kaspanet/kaspad/netadapter"
"github.com/kaspanet/kaspad/netadapter/router"
"github.com/kaspanet/kaspad/version"
"github.com/kaspanet/kaspad/wire"
"time"
)
var (
// userAgentName is the user agent name and is used to help identify
// ourselves to other kaspa peers.
userAgentName = "kaspad"
// userAgentVersion is the user agent version and is used to help
// identify ourselves to other kaspa peers.
userAgentVersion = version.Version()
// defaultServices describes the default services that are supported by
// the server.
defaultServices = wire.SFNodeNetwork | wire.SFNodeBloom | wire.SFNodeCF
// defaultRequiredServices describes the default services that are
// required to be supported by outbound peers.
defaultRequiredServices = wire.SFNodeNetwork
)
const timeout = 30 * time.Second
// SendVersion sends a version to a peer and waits for verack.
func SendVersion(incomingRoute *router.Route, outgoingRoute *router.Route, netAdapter *netadapter.NetAdapter,
dag *blockdag.BlockDAG) (routeClosed bool, err error) {
selectedTipHash := dag.SelectedTipHash()
subnetworkID := config.ActiveConfig().SubnetworkID
// Version message.
localAddr, err := netAdapter.GetBestLocalAddress()
if err != nil {
panic(err)
}
msg := wire.NewMsgVersion(localAddr, netAdapter.ID(), selectedTipHash, subnetworkID)
msg.AddUserAgent(userAgentName, userAgentVersion, config.ActiveConfig().UserAgentComments...)
// Advertise the services flag
msg.Services = defaultServices
// Advertise our max supported protocol version.
msg.ProtocolVersion = wire.ProtocolVersion
// Advertise if inv messages for transactions are desired.
msg.DisableRelayTx = config.ActiveConfig().BlocksOnly
isOpen, err := outgoingRoute.EnqueueWithTimeout(msg, timeout)
if err != nil {
return false, err
}
if !isOpen {
return true, nil
}
_, isOpen, err = incomingRoute.DequeueWithTimeout(timeout)
if err != nil {
return false, err
}
if !isOpen {
return true, nil
}
return false, nil
}

9
util/locks/log.go Normal file
View File

@ -0,0 +1,9 @@
package locks
import (
"github.com/kaspanet/kaspad/logger"
"github.com/kaspanet/kaspad/util/panics"
)
var log, _ = logger.Get(logger.SubsystemTags.UTIL)
var spawn = panics.GoroutineWrapperFunc(log)

View File

@ -0,0 +1,11 @@
package locks
// ReceiveFromChanWhenDone takes a blocking function and returns a channel that sends an empty struct when the function is done.
func ReceiveFromChanWhenDone(callback func()) <-chan struct{} {
ch := make(chan struct{})
spawn(func() {
callback()
close(ch)
})
return ch
}

View File

@ -7,6 +7,7 @@ package wire
import (
"encoding/binary"
"fmt"
"github.com/kaspanet/kaspad/netadapter/id"
"github.com/kaspanet/kaspad/util/binaryserializer"
"github.com/kaspanet/kaspad/util/daghash"
"github.com/kaspanet/kaspad/util/mstime"
@ -128,6 +129,9 @@ func ReadElement(r io.Reader, element interface{}) error {
}
return nil
case *id.ID:
return e.Deserialize(r)
case *subnetworkid.SubnetworkID:
_, err := io.ReadFull(r, e[:])
if err != nil {
@ -269,6 +273,9 @@ func WriteElement(w io.Writer, element interface{}) error {
}
return nil
case *id.ID:
return e.Serialize(w)
case *subnetworkid.SubnetworkID:
_, err := w.Write(e[:])
if err != nil {

View File

@ -168,7 +168,10 @@ func readMessageHeader(r io.Reader) (int, *messageHeader, error) {
// Create and populate a messageHeader struct from the raw header bytes.
hdr := messageHeader{}
var command [CommandSize]byte
readElements(hr, &hdr.magic, &command, &hdr.length, &hdr.checksum)
err = readElements(hr, &hdr.magic, &command, &hdr.length, &hdr.checksum)
if err != nil {
return 0, nil, err
}
// Strip trailing zeros from command string.
hdr.command = string(bytes.TrimRight(command[:], string(0)))
@ -249,7 +252,10 @@ func WriteMessageN(w io.Writer, msg Message, pver uint32, kaspaNet KaspaNet) (in
// rather than directly to the writer since writeElements doesn't
// return the number of bytes written.
hw := bytes.NewBuffer(make([]byte, 0, MessageHeaderSize))
writeElements(hw, hdr.magic, command, hdr.length, hdr.checksum)
err = writeElements(hw, hdr.magic, command, hdr.length, hdr.checksum)
if err != nil {
return 0, err
}
// Write header.
n, err := w.Write(hw.Bytes())

View File

@ -7,6 +7,7 @@ package wire
import (
"bytes"
"encoding/binary"
"github.com/kaspanet/kaspad/netadapter/id"
"github.com/kaspanet/kaspad/util/mstime"
"github.com/pkg/errors"
"io"
@ -47,7 +48,10 @@ func TestMessage(t *testing.T) {
addrMe := &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 16111}
me := NewNetAddress(addrMe, SFNodeNetwork)
me.Timestamp = mstime.Time{} // Version message has zero value timestamp.
msgVersion := NewMsgVersion(me, you, 123123, &daghash.ZeroHash, nil)
idMeBytes := make([]byte, id.IDLength)
idMeBytes[0] = 0xff
idMe := id.FromBytes(idMeBytes)
msgVersion := NewMsgVersion(me, idMe, &daghash.ZeroHash, nil)
msgVerack := NewMsgVerAck()
msgGetAddr := NewMsgGetAddr(false, nil)
@ -77,7 +81,7 @@ func TestMessage(t *testing.T) {
kaspaNet KaspaNet // Network to use for wire encoding
bytes int // Expected num bytes read/written
}{
{msgVersion, msgVersion, pver, Mainnet, 153},
{msgVersion, msgVersion, pver, Mainnet, 136},
{msgVerack, msgVerack, pver, Mainnet, 24},
{msgGetAddr, msgGetAddr, pver, Mainnet, 26},
{msgAddr, msgAddr, pver, Mainnet, 27},
@ -119,7 +123,7 @@ func TestMessage(t *testing.T) {
rbuf := bytes.NewReader(buf.Bytes())
nr, msg, _, err := ReadMessageN(rbuf, test.pver, test.kaspaNet)
if err != nil {
t.Errorf("ReadMessage #%d error %v, msg %v", i, err,
t.Errorf("ReadMessage #%d error %v, msg %+v", i, err,
spew.Sdump(msg))
continue
}

View File

@ -7,6 +7,7 @@ package wire
import (
"bytes"
"fmt"
"github.com/kaspanet/kaspad/netadapter/id"
"github.com/kaspanet/kaspad/util/mstime"
"github.com/kaspanet/kaspad/version"
"github.com/pkg/errors"
@ -41,15 +42,11 @@ type MsgVersion struct {
// Time the message was generated. This is encoded as an int64 on the wire.
Timestamp mstime.Time
// Address of the remote peer.
AddrYou NetAddress
// Address of the local peer.
AddrMe NetAddress
Address *NetAddress
// Unique value associated with message that is used to detect self
// connections.
Nonce uint64
// The peer unique ID
ID *id.ID
// The user agent that generated messsage. This is a encoded as a varString
// on the wire. This has a max length of MaxUserAgentLen.
@ -114,16 +111,22 @@ func (msg *MsgVersion) KaspaDecode(r io.Reader, pver uint32) error {
msg.SubnetworkID = &subnetworkID
}
err = readNetAddress(buf, pver, &msg.AddrYou, false)
var hasAddress bool
err = ReadElement(r, &hasAddress)
if err != nil {
return err
}
err = readNetAddress(buf, pver, &msg.AddrMe, false)
if err != nil {
return err
if hasAddress {
msg.Address = new(NetAddress)
err = readNetAddress(buf, pver, msg.Address, false)
if err != nil {
return err
}
}
err = ReadElement(buf, &msg.Nonce)
msg.ID = new(id.ID)
err = ReadElement(buf, msg.ID)
if err != nil {
return err
}
@ -180,17 +183,19 @@ func (msg *MsgVersion) KaspaEncode(w io.Writer, pver uint32) error {
}
}
err = writeNetAddress(w, pver, &msg.AddrYou, false)
if err != nil {
return err
if msg.Address != nil {
err = WriteElement(w, true)
if err != nil {
return err
}
err = writeNetAddress(w, pver, msg.Address, false)
if err != nil {
return err
}
}
err = writeNetAddress(w, pver, &msg.AddrMe, false)
if err != nil {
return err
}
err = WriteElement(w, msg.Nonce)
err = WriteElement(w, msg.ID)
if err != nil {
return err
}
@ -234,7 +239,7 @@ func (msg *MsgVersion) MaxPayloadLength(pver uint32) uint32 {
// NewMsgVersion returns a new kaspa version message that conforms to the
// Message interface using the passed parameters and defaults for the remaining
// fields.
func NewMsgVersion(me *NetAddress, you *NetAddress, nonce uint64,
func NewMsgVersion(addr *NetAddress, id *id.ID,
selectedTipHash *daghash.Hash, subnetworkID *subnetworkid.SubnetworkID) *MsgVersion {
// Limit the timestamp to one millisecond precision since the protocol
@ -243,9 +248,8 @@ func NewMsgVersion(me *NetAddress, you *NetAddress, nonce uint64,
ProtocolVersion: ProtocolVersion,
Services: 0,
Timestamp: mstime.Now(),
AddrYou: *you,
AddrMe: *me,
Nonce: nonce,
Address: addr,
ID: id,
UserAgent: DefaultUserAgent,
SelectedTipHash: selectedTipHash,
DisableRelayTx: false,
@ -267,7 +271,7 @@ func validateUserAgent(userAgent string) error {
// message. The version string is not defined to any strict format, although
// it is recommended to use the form "major.minor.revision" e.g. "2.6.41".
func (msg *MsgVersion) AddUserAgent(name string, version string,
comments ...string) error {
comments ...string) {
newUserAgent := fmt.Sprintf("%s:%s", name, version)
if len(comments) != 0 {
@ -275,10 +279,5 @@ func (msg *MsgVersion) AddUserAgent(name string, version string,
strings.Join(comments, "; "))
}
newUserAgent = fmt.Sprintf("%s%s/", msg.UserAgent, newUserAgent)
err := validateUserAgent(newUserAgent)
if err != nil {
return err
}
msg.UserAgent = newUserAgent
return nil
}

View File

@ -7,9 +7,9 @@ package wire
import (
"bytes"
"github.com/davecgh/go-spew/spew"
id "github.com/kaspanet/kaspad/netadapter/id"
"github.com/kaspanet/kaspad/util/daghash"
"github.com/kaspanet/kaspad/util/mstime"
"github.com/kaspanet/kaspad/util/random"
"github.com/pkg/errors"
"io"
"net"
@ -26,30 +26,24 @@ func TestVersion(t *testing.T) {
selectedTipHash := &daghash.Hash{12, 34}
tcpAddrMe := &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 16111}
me := NewNetAddress(tcpAddrMe, SFNodeNetwork)
tcpAddrYou := &net.TCPAddr{IP: net.ParseIP("192.168.0.1"), Port: 16111}
you := NewNetAddress(tcpAddrYou, SFNodeNetwork)
nonce, err := random.Uint64()
generatedID, err := id.GenerateID()
if err != nil {
t.Errorf("random.Uint64: error generating nonce: %v", err)
t.Fatalf("id.GenerateID: %s", err)
}
// Ensure we get the correct data back out.
msg := NewMsgVersion(me, you, nonce, selectedTipHash, nil)
msg := NewMsgVersion(me, generatedID, selectedTipHash, nil)
if msg.ProtocolVersion != pver {
t.Errorf("NewMsgVersion: wrong protocol version - got %v, want %v",
msg.ProtocolVersion, pver)
}
if !reflect.DeepEqual(&msg.AddrMe, me) {
if !reflect.DeepEqual(msg.Address, me) {
t.Errorf("NewMsgVersion: wrong me address - got %v, want %v",
spew.Sdump(&msg.AddrMe), spew.Sdump(me))
spew.Sdump(&msg.Address), spew.Sdump(me))
}
if !reflect.DeepEqual(&msg.AddrYou, you) {
t.Errorf("NewMsgVersion: wrong you address - got %v, want %v",
spew.Sdump(&msg.AddrYou), spew.Sdump(you))
}
if msg.Nonce != nonce {
t.Errorf("NewMsgVersion: wrong nonce - got %v, want %v",
msg.Nonce, nonce)
if msg.ID.String() != generatedID.String() {
t.Errorf("NewMsgVersion: wrong nonce - got %s, want %s",
msg.ID, generatedID)
}
if msg.UserAgent != DefaultUserAgent {
t.Errorf("NewMsgVersion: wrong user agent - got %v, want %v",
@ -78,15 +72,6 @@ func TestVersion(t *testing.T) {
msg.UserAgent, customUserAgent)
}
// accounting for ":", "/"
err = msg.AddUserAgent(strings.Repeat("t",
MaxUserAgentLen-len(customUserAgent)-2+1), "")
if msgErr := &(MessageError{}); !errors.As(err, &msgErr) {
t.Errorf("AddUserAgent: expected error not received "+
"- got %v, want %T", err, MessageError{})
}
// Version message should not have any services set by default.
if msg.Services != 0 {
t.Errorf("NewMsgVersion: wrong default services - got %v, want %v",
@ -224,10 +209,10 @@ func TestVersionWireErrors(t *testing.T) {
newLen := len(baseVersionEncoded) - len(baseVersion.UserAgent)
newLen = newLen + len(newUAVarIntBuf.Bytes()) - 1 + len(newUA)
exceedUAVerEncoded := make([]byte, newLen)
copy(exceedUAVerEncoded, baseVersionEncoded[0:81])
copy(exceedUAVerEncoded[81:], newUAVarIntBuf.Bytes())
copy(exceedUAVerEncoded[84:], []byte(newUA))
copy(exceedUAVerEncoded[84+len(newUA):], baseVersionEncoded[98:101])
copy(exceedUAVerEncoded, baseVersionEncoded[0:64])
copy(exceedUAVerEncoded[64:], newUAVarIntBuf.Bytes())
copy(exceedUAVerEncoded[67:], []byte(newUA))
copy(exceedUAVerEncoded[64+len(newUA):], baseVersionEncoded[78:81])
tests := []struct {
in *MsgVersion // Value to encode
@ -245,18 +230,16 @@ func TestVersionWireErrors(t *testing.T) {
{baseVersion, baseVersionEncoded, pver, 12, io.ErrShortWrite, io.EOF},
// Force error in subnetworkID.
{baseVersion, baseVersionEncoded, pver, 20, io.ErrShortWrite, io.EOF},
// Force error in remote address.
{baseVersion, baseVersionEncoded, pver, 21, io.ErrShortWrite, io.EOF},
// Force error in local address.
{baseVersion, baseVersionEncoded, pver, 48, io.ErrShortWrite, io.ErrUnexpectedEOF},
// Force error in nonce.
{baseVersion, baseVersionEncoded, pver, 74, io.ErrShortWrite, io.ErrUnexpectedEOF},
{baseVersion, baseVersionEncoded, pver, 21, io.ErrShortWrite, io.EOF},
// Force error in ID.
{baseVersion, baseVersionEncoded, pver, 49, io.ErrShortWrite, io.ErrUnexpectedEOF},
// Force error in user agent length.
{baseVersion, baseVersionEncoded, pver, 82, io.ErrShortWrite, io.EOF},
{baseVersion, baseVersionEncoded, pver, 65, io.ErrShortWrite, io.EOF},
// Force error in user agent.
{baseVersion, baseVersionEncoded, pver, 83, io.ErrShortWrite, io.ErrUnexpectedEOF},
{baseVersion, baseVersionEncoded, pver, 66, io.ErrShortWrite, io.ErrUnexpectedEOF},
// Force error in last block.
{baseVersion, baseVersionEncoded, pver, 99, io.ErrShortWrite, io.ErrUnexpectedEOF},
{baseVersion, baseVersionEncoded, pver, 82, io.ErrShortWrite, io.ErrUnexpectedEOF},
// Force error due to user agent too big
{exceedUAVer, exceedUAVerEncoded, pver, newLen, wireErr, wireErr},
}
@ -304,24 +287,23 @@ func TestVersionWireErrors(t *testing.T) {
}
}
var baseVersionID = id.FromBytes([]byte{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
})
// baseVersion is used in the various tests as a baseline MsgVersion.
var baseVersion = &MsgVersion{
ProtocolVersion: 60002,
Services: SFNodeNetwork,
Timestamp: mstime.UnixMilliseconds(0x495fab29000),
AddrYou: NetAddress{
Timestamp: mstime.Time{}, // Zero value -- no timestamp in version
Services: SFNodeNetwork,
IP: net.ParseIP("192.168.0.1"),
Port: 16111,
},
AddrMe: NetAddress{
Address: &NetAddress{
Timestamp: mstime.Time{}, // Zero value -- no timestamp in version
Services: SFNodeNetwork,
IP: net.ParseIP("127.0.0.1"),
Port: 16111,
},
Nonce: 123123, // 0x1e0f3
ID: baseVersionID,
UserAgent: "/kaspadtest:0.0.1/",
SelectedTipHash: &daghash.Hash{0x12, 0x34},
}
@ -333,17 +315,14 @@ var baseVersionEncoded = []byte{
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // SFNodeNetwork
0x29, 0xab, 0x5f, 0x49, 0x00, 0x00, 0x00, 0x00, // 64-bit Timestamp
0x01, // is full node
// AddrYou -- No timestamp for NetAddress in version message
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // SFNodeNetwork
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0xff, 0xff, 0xc0, 0xa8, 0x00, 0x01, // IP 192.168.0.1
0x3e, 0xef, // Port 16111 in big-endian
// AddrMe -- No timestamp for NetAddress in version message
// Address -- No timestamp for NetAddress in version message
0x01, // Has address
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // SFNodeNetwork
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0xff, 0xff, 0x7f, 0x00, 0x00, 0x01, // IP 127.0.0.1
0x3e, 0xef, // Port 16111 in big-endian
0xf3, 0xe0, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, // Fake Nonce. TODO: (Ori) Replace to a real nonce
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // ID
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0x12, // Varint for user agent length
0x2f, 0x6b, 0x61, 0x73, 0x70, 0x61, 0x64, 0x74,
0x65, 0x73, 0x74, 0x3a, 0x30, 0x2e, 0x30, 0x2e, // User agent
@ -354,24 +333,23 @@ var baseVersionEncoded = []byte{
0x00, 0x00,
}
var baseVersionWithRelayTxID = id.FromBytes([]byte{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0xfe, 0xfe, 0xfe, 0xfe, 0xfe, 0xfe, 0xfe, 0xfe,
})
// baseVersionWithRelayTx is used in the various tests as a baseline MsgVersion
var baseVersionWithRelayTx = &MsgVersion{
ProtocolVersion: 70001,
Services: SFNodeNetwork,
Timestamp: mstime.UnixMilliseconds(0x17315ed0f99),
AddrYou: NetAddress{
Timestamp: mstime.Time{}, // Zero value -- no timestamp in version
Services: SFNodeNetwork,
IP: net.ParseIP("192.168.0.1"),
Port: 16111,
},
AddrMe: NetAddress{
Address: &NetAddress{
Timestamp: mstime.Time{}, // Zero value -- no timestamp in version
Services: SFNodeNetwork,
IP: net.ParseIP("127.0.0.1"),
Port: 16111,
},
Nonce: 123123, // 0x1e0f3
ID: baseVersionWithRelayTxID,
UserAgent: "/kaspadtest:0.0.1/",
SelectedTipHash: &daghash.Hash{0x12, 0x34},
}
@ -383,17 +361,14 @@ var baseVersionWithRelayTxEncoded = []byte{
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // SFNodeNetwork
0x99, 0x0f, 0xed, 0x15, 0x73, 0x01, 0x00, 0x00, // Timestamp
0x01, // is full node
// AddrYou -- No timestamp for NetAddress in version message
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // SFNodeNetwork
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0xff, 0xff, 0xc0, 0xa8, 0x00, 0x01, // IP 192.168.0.1
0x3e, 0xef, // Port 16111 in big-endian
// AddrMe -- No timestamp for NetAddress in version message
// Address -- No timestamp for NetAddress in version message
0x01, // Has address
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // SFNodeNetwork
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0xff, 0xff, 0x7f, 0x00, 0x00, 0x01, // IP 127.0.0.1
0x3e, 0xef, // Port 16111 in big-endian
0xf3, 0xe0, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, // Nonce
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // ID
0xfe, 0xfe, 0xfe, 0xfe, 0xfe, 0xfe, 0xfe, 0xfe,
0x12, // Varint for user agent length
0x2f, 0x6b, 0x61, 0x73, 0x70, 0x61, 0x64, 0x74,
0x65, 0x73, 0x74, 0x3a, 0x30, 0x2e, 0x30, 0x2e, // User agent