P2P upgrade mechanism (#1921)

* Implement upgrade mechanism for p2p

* Remove dependencies from flowcontext to v3

* Add p2p v4

* Add Ready flow

* Remove copy paste code of v3

* Register SendAddresses flow at the top level

* Add option to set protocol version from CLI and add TestAddressExchangeV3V4

* Send ready message on minimal net adapter

* Rename defaultMaxProtocolVersion->maxAcceptableProtocolVersion
This commit is contained in:
Ori Newman 2022-01-09 09:59:45 +02:00 committed by GitHub
parent 14b2bcbd81
commit d2379608ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
64 changed files with 1529 additions and 1014 deletions

View File

@ -66,6 +66,7 @@ const (
CmdPruningPoints CmdPruningPoints
CmdRequestPruningPointProof CmdRequestPruningPointProof
CmdPruningPointProof CmdPruningPointProof
CmdReady
// rpc // rpc
CmdGetCurrentNetworkRequestMessage CmdGetCurrentNetworkRequestMessage
@ -187,6 +188,7 @@ var ProtocolMessageCommandToString = map[MessageCommand]string{
CmdPruningPoints: "PruningPoints", CmdPruningPoints: "PruningPoints",
CmdRequestPruningPointProof: "RequestPruningPointProof", CmdRequestPruningPointProof: "RequestPruningPointProof",
CmdPruningPointProof: "PruningPointProof", CmdPruningPointProof: "PruningPointProof",
CmdReady: "Ready",
} }
// RPCMessageCommandToString maps all MessageCommands to their string representation // RPCMessageCommandToString maps all MessageCommands to their string representation

View File

@ -18,7 +18,7 @@ import (
// TestBlock tests the MsgBlock API. // TestBlock tests the MsgBlock API.
func TestBlock(t *testing.T) { func TestBlock(t *testing.T) {
pver := ProtocolVersion pver := uint32(4)
// Block 1 header. // Block 1 header.
parents := blockOne.Header.Parents parents := blockOne.Header.Parents

View File

@ -22,7 +22,7 @@ import (
// TestTx tests the MsgTx API. // TestTx tests the MsgTx API.
func TestTx(t *testing.T) { func TestTx(t *testing.T) {
pver := ProtocolVersion pver := uint32(4)
txIDStr := "000000000003ba27aa200b1cecaad478d2b00432346c3f1f3986da1afd33e506" txIDStr := "000000000003ba27aa200b1cecaad478d2b00432346c3f1f3986da1afd33e506"
txID, err := transactionid.FromString(txIDStr) txID, err := transactionid.FromString(txIDStr)

View File

@ -82,12 +82,12 @@ func (msg *MsgVersion) Command() MessageCommand {
// Message interface using the passed parameters and defaults for the remaining // Message interface using the passed parameters and defaults for the remaining
// fields. // fields.
func NewMsgVersion(addr *NetAddress, id *id.ID, network string, func NewMsgVersion(addr *NetAddress, id *id.ID, network string,
subnetworkID *externalapi.DomainSubnetworkID) *MsgVersion { subnetworkID *externalapi.DomainSubnetworkID, protocolVersion uint32) *MsgVersion {
// Limit the timestamp to one millisecond precision since the protocol // Limit the timestamp to one millisecond precision since the protocol
// doesn't support better. // doesn't support better.
return &MsgVersion{ return &MsgVersion{
ProtocolVersion: ProtocolVersion, ProtocolVersion: protocolVersion,
Network: network, Network: network,
Services: 0, Services: 0,
Timestamp: mstime.Now(), Timestamp: mstime.Now(),

View File

@ -15,7 +15,7 @@ import (
// TestVersion tests the MsgVersion API. // TestVersion tests the MsgVersion API.
func TestVersion(t *testing.T) { func TestVersion(t *testing.T) {
pver := ProtocolVersion pver := uint32(4)
// Create version message data. // Create version message data.
tcpAddrMe := &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 16111} tcpAddrMe := &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 16111}
@ -26,7 +26,7 @@ func TestVersion(t *testing.T) {
} }
// Ensure we get the correct data back out. // Ensure we get the correct data back out.
msg := NewMsgVersion(me, generatedID, "mainnet", nil) msg := NewMsgVersion(me, generatedID, "mainnet", nil, 4)
if msg.ProtocolVersion != pver { if msg.ProtocolVersion != pver {
t.Errorf("NewMsgVersion: wrong protocol version - got %v, want %v", t.Errorf("NewMsgVersion: wrong protocol version - got %v, want %v",
msg.ProtocolVersion, pver) msg.ProtocolVersion, pver)

View File

@ -0,0 +1,22 @@
package appmessage
// MsgReady implements the Message interface and represents a kaspa
// Ready message. It is used to notify that the peer is ready to receive
// messages.
//
// This message has no payload.
type MsgReady struct {
baseMessage
}
// Command returns the protocol command string for the message. This is part
// of the Message interface implementation.
func (msg *MsgReady) Command() MessageCommand {
return CmdReady
}
// NewMsgReady returns a new kaspa Ready message that conforms to the
// Message interface.
func NewMsgReady() *MsgReady {
return &MsgReady{}
}

View File

@ -11,9 +11,6 @@ import (
) )
const ( const (
// ProtocolVersion is the latest protocol version this package supports.
ProtocolVersion uint32 = 3
// DefaultServices describes the default services that are supported by // DefaultServices describes the default services that are supported by
// the server. // the server.
DefaultServices = SFNodeNetwork | SFNodeBloom | SFNodeCF DefaultServices = SFNodeNetwork | SFNodeBloom | SFNodeCF

View File

@ -1,6 +1,8 @@
package common package common
import ( import (
peerpkg "github.com/kaspanet/kaspad/app/protocol/peer"
routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -12,3 +14,14 @@ const DefaultTimeout = 120 * time.Second
// ErrPeerWithSameIDExists signifies that a peer with the same ID already exist. // ErrPeerWithSameIDExists signifies that a peer with the same ID already exist.
var ErrPeerWithSameIDExists = errors.New("ready peer with the same ID already exists") var ErrPeerWithSameIDExists = errors.New("ready peer with the same ID already exists")
type flowExecuteFunc func(peer *peerpkg.Peer)
// Flow is a a data structure that is used in order to associate a p2p flow to some route in a router.
type Flow struct {
Name string
ExecuteFunc flowExecuteFunc
}
// FlowInitializeFunc is a function that is used in order to initialize a flow
type FlowInitializeFunc func(route *routerpkg.Route, peer *peerpkg.Peer) error

View File

@ -11,7 +11,6 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/kaspanet/kaspad/app/appmessage" "github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/protocol/flows/blockrelay"
) )
// OnNewBlock updates the mempool after a new block arrival, and // OnNewBlock updates the mempool after a new block arrival, and
@ -109,7 +108,7 @@ func (f *FlowContext) broadcastTransactionsAfterBlockAdded(
// SharedRequestedBlocks returns a *blockrelay.SharedRequestedBlocks for sharing // SharedRequestedBlocks returns a *blockrelay.SharedRequestedBlocks for sharing
// data about requested blocks between different peers. // data about requested blocks between different peers.
func (f *FlowContext) SharedRequestedBlocks() *blockrelay.SharedRequestedBlocks { func (f *FlowContext) SharedRequestedBlocks() *SharedRequestedBlocks {
return f.sharedRequestedBlocks return f.sharedRequestedBlocks
} }

View File

@ -10,8 +10,6 @@ import (
"github.com/kaspanet/kaspad/domain" "github.com/kaspanet/kaspad/domain"
"github.com/kaspanet/kaspad/app/protocol/flows/blockrelay"
"github.com/kaspanet/kaspad/app/protocol/flows/transactionrelay"
peerpkg "github.com/kaspanet/kaspad/app/protocol/peer" peerpkg "github.com/kaspanet/kaspad/app/protocol/peer"
"github.com/kaspanet/kaspad/infrastructure/config" "github.com/kaspanet/kaspad/infrastructure/config"
"github.com/kaspanet/kaspad/infrastructure/network/addressmanager" "github.com/kaspanet/kaspad/infrastructure/network/addressmanager"
@ -52,9 +50,9 @@ type FlowContext struct {
onTransactionAddedToMempoolHandler OnTransactionAddedToMempoolHandler onTransactionAddedToMempoolHandler OnTransactionAddedToMempoolHandler
lastRebroadcastTime time.Time lastRebroadcastTime time.Time
sharedRequestedTransactions *transactionrelay.SharedRequestedTransactions sharedRequestedTransactions *SharedRequestedTransactions
sharedRequestedBlocks *blockrelay.SharedRequestedBlocks sharedRequestedBlocks *SharedRequestedBlocks
ibdPeer *peerpkg.Peer ibdPeer *peerpkg.Peer
ibdPeerMutex sync.RWMutex ibdPeerMutex sync.RWMutex
@ -82,8 +80,8 @@ func New(cfg *config.Config, domain domain.Domain, addressManager *addressmanage
domain: domain, domain: domain,
addressManager: addressManager, addressManager: addressManager,
connectionManager: connectionManager, connectionManager: connectionManager,
sharedRequestedTransactions: transactionrelay.NewSharedRequestedTransactions(), sharedRequestedTransactions: NewSharedRequestedTransactions(),
sharedRequestedBlocks: blockrelay.NewSharedRequestedBlocks(), sharedRequestedBlocks: NewSharedRequestedBlocks(),
peers: make(map[id.ID]*peerpkg.Peer), peers: make(map[id.ID]*peerpkg.Peer),
orphans: make(map[externalapi.DomainHash]*externalapi.DomainBlock), orphans: make(map[externalapi.DomainHash]*externalapi.DomainBlock),
timeStarted: mstime.Now().UnixMilliseconds(), timeStarted: mstime.Now().UnixMilliseconds(),

View File

@ -1,4 +1,4 @@
package blockrelay package flowcontext
import ( import (
"sync" "sync"
@ -13,13 +13,15 @@ type SharedRequestedBlocks struct {
sync.Mutex sync.Mutex
} }
func (s *SharedRequestedBlocks) remove(hash *externalapi.DomainHash) { // Remove removes a block from the set.
func (s *SharedRequestedBlocks) Remove(hash *externalapi.DomainHash) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
delete(s.blocks, *hash) delete(s.blocks, *hash)
} }
func (s *SharedRequestedBlocks) removeSet(blockHashes map[externalapi.DomainHash]struct{}) { // RemoveSet removes a set of blocks from the set.
func (s *SharedRequestedBlocks) RemoveSet(blockHashes map[externalapi.DomainHash]struct{}) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
for hash := range blockHashes { for hash := range blockHashes {
@ -27,7 +29,8 @@ func (s *SharedRequestedBlocks) removeSet(blockHashes map[externalapi.DomainHash
} }
} }
func (s *SharedRequestedBlocks) addIfNotExists(hash *externalapi.DomainHash) (exists bool) { // AddIfNotExists adds a block to the set if it doesn't exist yet.
func (s *SharedRequestedBlocks) AddIfNotExists(hash *externalapi.DomainHash) (exists bool) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
_, ok := s.blocks[*hash] _, ok := s.blocks[*hash]

View File

@ -1,4 +1,4 @@
package transactionrelay package flowcontext
import ( import (
"sync" "sync"
@ -13,13 +13,15 @@ type SharedRequestedTransactions struct {
sync.Mutex sync.Mutex
} }
func (s *SharedRequestedTransactions) remove(txID *externalapi.DomainTransactionID) { // Remove removes a transaction from the set.
func (s *SharedRequestedTransactions) Remove(txID *externalapi.DomainTransactionID) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
delete(s.transactions, *txID) delete(s.transactions, *txID)
} }
func (s *SharedRequestedTransactions) removeMany(txIDs []*externalapi.DomainTransactionID) { // RemoveMany removes a set of transactions from the set.
func (s *SharedRequestedTransactions) RemoveMany(txIDs []*externalapi.DomainTransactionID) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
for _, txID := range txIDs { for _, txID := range txIDs {
@ -27,7 +29,8 @@ func (s *SharedRequestedTransactions) removeMany(txIDs []*externalapi.DomainTran
} }
} }
func (s *SharedRequestedTransactions) addIfNotExists(txID *externalapi.DomainTransactionID) (exists bool) { // AddIfNotExists adds a transaction to the set if it doesn't exist yet.
func (s *SharedRequestedTransactions) AddIfNotExists(txID *externalapi.DomainTransactionID) (exists bool) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
_, ok := s.transactions[*txID] _, ok := s.transactions[*txID]

View File

@ -4,7 +4,6 @@ import (
"time" "time"
"github.com/kaspanet/kaspad/app/appmessage" "github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/protocol/flows/transactionrelay"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi" "github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"github.com/kaspanet/kaspad/domain/consensus/utils/consensushashing" "github.com/kaspanet/kaspad/domain/consensus/utils/consensushashing"
) )
@ -30,7 +29,7 @@ func (f *FlowContext) shouldRebroadcastTransactions() bool {
// SharedRequestedTransactions returns a *transactionrelay.SharedRequestedTransactions for sharing // SharedRequestedTransactions returns a *transactionrelay.SharedRequestedTransactions for sharing
// data about requested transactions between different peers. // data about requested transactions between different peers.
func (f *FlowContext) SharedRequestedTransactions() *transactionrelay.SharedRequestedTransactions { func (f *FlowContext) SharedRequestedTransactions() *SharedRequestedTransactions {
return f.sharedRequestedTransactions return f.sharedRequestedTransactions
} }

View File

@ -28,7 +28,7 @@ type HandleHandshakeContext interface {
HandleError(err error, flowName string, isStopping *uint32, errChan chan<- error) HandleError(err error, flowName string, isStopping *uint32, errChan chan<- error)
} }
// HandleHandshake sets up the handshake protocol - It sends a version message and waits for an incoming // HandleHandshake sets up the new_handshake protocol - It sends a version message and waits for an incoming
// version message, as well as a verack for the sent version // version message, as well as a verack for the sent version
func HandleHandshake(context HandleHandshakeContext, netConnection *netadapter.NetConnection, func HandleHandshake(context HandleHandshakeContext, netConnection *netadapter.NetConnection,
receiveVersionRoute *routerpkg.Route, sendVersionRoute *routerpkg.Route, outgoingRoute *routerpkg.Route, receiveVersionRoute *routerpkg.Route, sendVersionRoute *routerpkg.Route, outgoingRoute *routerpkg.Route,
@ -98,7 +98,7 @@ func HandleHandshake(context HandleHandshakeContext, netConnection *netadapter.N
} }
// Handshake is different from other flows, since in it should forward router.ErrRouteClosed to errChan // Handshake is different from other flows, since in it should forward router.ErrRouteClosed to errChan
// Therefore we implement a separate handleError for handshake // Therefore we implement a separate handleError for new_handshake
func handleError(err error, flowName string, isStopping *uint32, errChan chan error) { func handleError(err error, flowName string, isStopping *uint32, errChan chan error) {
if errors.Is(err, routerpkg.ErrRouteClosed) { if errors.Is(err, routerpkg.ErrRouteClosed) {
if atomic.AddUint32(isStopping, 1) == 1 { if atomic.AddUint32(isStopping, 1) == 1 {

View File

@ -7,6 +7,7 @@ import (
"github.com/kaspanet/kaspad/app/protocol/protocolerrors" "github.com/kaspanet/kaspad/app/protocol/protocolerrors"
"github.com/kaspanet/kaspad/infrastructure/logger" "github.com/kaspanet/kaspad/infrastructure/logger"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"github.com/pkg/errors"
) )
var ( var (
@ -17,7 +18,9 @@ var (
// minAcceptableProtocolVersion is the lowest protocol version that a // minAcceptableProtocolVersion is the lowest protocol version that a
// connected peer may support. // connected peer may support.
minAcceptableProtocolVersion = appmessage.ProtocolVersion minAcceptableProtocolVersion = uint32(3)
maxAcceptableProtocolVersion = uint32(4)
) )
type receiveVersionFlow struct { type receiveVersionFlow struct {
@ -97,7 +100,12 @@ func (flow *receiveVersionFlow) start() (*appmessage.NetAddress, error) {
return nil, protocolerrors.New(false, "incompatible subnetworks") return nil, protocolerrors.New(false, "incompatible subnetworks")
} }
flow.peer.UpdateFieldsFromMsgVersion(msgVersion) if flow.Config().ProtocolVersion > maxAcceptableProtocolVersion {
return nil, errors.Errorf("%d is a non existing protocol version", flow.Config().ProtocolVersion)
}
maxProtocolVersion := flow.Config().ProtocolVersion
flow.peer.UpdateFieldsFromMsgVersion(msgVersion, maxProtocolVersion)
err = flow.outgoingRoute.Enqueue(appmessage.NewMsgVerAck()) err = flow.outgoingRoute.Enqueue(appmessage.NewMsgVerAck())
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -7,6 +7,7 @@ import (
"github.com/kaspanet/kaspad/infrastructure/logger" "github.com/kaspanet/kaspad/infrastructure/logger"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"github.com/kaspanet/kaspad/version" "github.com/kaspanet/kaspad/version"
"github.com/pkg/errors"
) )
var ( var (
@ -56,15 +57,18 @@ func (flow *sendVersionFlow) start() error {
// Version message. // Version message.
localAddress := flow.AddressManager().BestLocalAddress(flow.peer.Connection().NetAddress()) localAddress := flow.AddressManager().BestLocalAddress(flow.peer.Connection().NetAddress())
subnetworkID := flow.Config().SubnetworkID subnetworkID := flow.Config().SubnetworkID
if flow.Config().ProtocolVersion < minAcceptableProtocolVersion {
return errors.Errorf("configured protocol version %d is obsolete", flow.Config().ProtocolVersion)
}
msg := appmessage.NewMsgVersion(localAddress, flow.NetAdapter().ID(), msg := appmessage.NewMsgVersion(localAddress, flow.NetAdapter().ID(),
flow.Config().ActiveNetParams.Name, subnetworkID) flow.Config().ActiveNetParams.Name, subnetworkID, flow.Config().ProtocolVersion)
msg.AddUserAgent(userAgentName, userAgentVersion, flow.Config().UserAgentComments...) msg.AddUserAgent(userAgentName, userAgentVersion, flow.Config().UserAgentComments...)
// Advertise the services flag // Advertise the services flag
msg.Services = defaultServices msg.Services = defaultServices
// Advertise our max supported protocol version. // Advertise our max supported protocol version.
msg.ProtocolVersion = appmessage.ProtocolVersion msg.ProtocolVersion = flow.Config().ProtocolVersion
// Advertise if inv messages for transactions are desired. // Advertise if inv messages for transactions are desired.
msg.DisableRelayTx = flow.Config().BlocksOnly msg.DisableRelayTx = flow.Config().BlocksOnly

View File

@ -0,0 +1,9 @@
package ready
import (
"github.com/kaspanet/kaspad/infrastructure/logger"
"github.com/kaspanet/kaspad/util/panics"
)
var log = logger.RegisterSubSystem("PROT")
var spawn = panics.GoroutineWrapperFunc(log)

View File

@ -0,0 +1,56 @@
package ready
import (
"github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/protocol/common"
"sync/atomic"
peerpkg "github.com/kaspanet/kaspad/app/protocol/peer"
"github.com/kaspanet/kaspad/app/protocol/protocolerrors"
routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"github.com/pkg/errors"
)
// HandleReady notify the other peer that peer is ready for messages, and wait for the other peer
// to send a ready message before start running the flows.
func HandleReady(incomingRoute *routerpkg.Route, outgoingRoute *routerpkg.Route,
peer *peerpkg.Peer,
) error {
log.Debugf("Sending ready message to %s", peer)
isStopping := uint32(0)
err := outgoingRoute.Enqueue(appmessage.NewMsgReady())
if err != nil {
return handleError(err, "HandleReady", &isStopping)
}
_, err = incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
if err != nil {
return handleError(err, "HandleReady", &isStopping)
}
log.Debugf("Got ready message from %s", peer)
return nil
}
// Ready is different from other flows, since in it should forward router.ErrRouteClosed to errChan
// Therefore we implement a separate handleError for 'ready'
func handleError(err error, flowName string, isStopping *uint32) error {
if errors.Is(err, routerpkg.ErrRouteClosed) {
if atomic.AddUint32(isStopping, 1) == 1 {
return err
}
return nil
}
if protocolErr := (protocolerrors.ProtocolError{}); errors.As(err, &protocolErr) {
log.Errorf("Ready protocol error from %s: %s", flowName, err)
if atomic.AddUint32(isStopping, 1) == 1 {
return err
}
return nil
}
panic(err)
}

View File

@ -3,6 +3,7 @@ package blockrelay
import ( import (
"github.com/kaspanet/kaspad/app/appmessage" "github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/protocol/common" "github.com/kaspanet/kaspad/app/protocol/common"
"github.com/kaspanet/kaspad/app/protocol/flowcontext"
peerpkg "github.com/kaspanet/kaspad/app/protocol/peer" peerpkg "github.com/kaspanet/kaspad/app/protocol/peer"
"github.com/kaspanet/kaspad/app/protocol/protocolerrors" "github.com/kaspanet/kaspad/app/protocol/protocolerrors"
"github.com/kaspanet/kaspad/domain" "github.com/kaspanet/kaspad/domain"
@ -26,7 +27,7 @@ type RelayInvsContext interface {
OnNewBlock(block *externalapi.DomainBlock, virtualChangeSet *externalapi.VirtualChangeSet) error OnNewBlock(block *externalapi.DomainBlock, virtualChangeSet *externalapi.VirtualChangeSet) error
OnVirtualChange(virtualChangeSet *externalapi.VirtualChangeSet) error OnVirtualChange(virtualChangeSet *externalapi.VirtualChangeSet) error
OnPruningPointUTXOSetOverride() error OnPruningPointUTXOSetOverride() error
SharedRequestedBlocks() *SharedRequestedBlocks SharedRequestedBlocks() *flowcontext.SharedRequestedBlocks
Broadcast(message appmessage.Message) error Broadcast(message appmessage.Message) error
AddOrphan(orphanBlock *externalapi.DomainBlock) AddOrphan(orphanBlock *externalapi.DomainBlock)
GetOrphanRoots(orphanHash *externalapi.DomainHash) ([]*externalapi.DomainHash, bool, error) GetOrphanRoots(orphanHash *externalapi.DomainHash) ([]*externalapi.DomainHash, bool, error)
@ -194,14 +195,14 @@ func (flow *handleRelayInvsFlow) readInv() (*appmessage.MsgInvRelayBlock, error)
} }
func (flow *handleRelayInvsFlow) requestBlock(requestHash *externalapi.DomainHash) (*externalapi.DomainBlock, bool, error) { func (flow *handleRelayInvsFlow) requestBlock(requestHash *externalapi.DomainHash) (*externalapi.DomainBlock, bool, error) {
exists := flow.SharedRequestedBlocks().addIfNotExists(requestHash) exists := flow.SharedRequestedBlocks().AddIfNotExists(requestHash)
if exists { if exists {
return nil, true, nil return nil, true, nil
} }
// In case the function returns earlier than expected, we want to make sure flow.SharedRequestedBlocks() is // In case the function returns earlier than expected, we want to make sure flow.SharedRequestedBlocks() is
// clean from any pending blocks. // clean from any pending blocks.
defer flow.SharedRequestedBlocks().remove(requestHash) defer flow.SharedRequestedBlocks().Remove(requestHash)
getRelayBlocksMsg := appmessage.NewMsgRequestRelayBlocks([]*externalapi.DomainHash{requestHash}) getRelayBlocksMsg := appmessage.NewMsgRequestRelayBlocks([]*externalapi.DomainHash{requestHash})
err := flow.outgoingRoute.Enqueue(getRelayBlocksMsg) err := flow.outgoingRoute.Enqueue(getRelayBlocksMsg)

View File

@ -0,0 +1,179 @@
package v3
import (
"github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/protocol/common"
"github.com/kaspanet/kaspad/app/protocol/flowcontext"
"github.com/kaspanet/kaspad/app/protocol/flows/v3/addressexchange"
"github.com/kaspanet/kaspad/app/protocol/flows/v3/blockrelay"
"github.com/kaspanet/kaspad/app/protocol/flows/v3/ping"
"github.com/kaspanet/kaspad/app/protocol/flows/v3/rejects"
"github.com/kaspanet/kaspad/app/protocol/flows/v3/transactionrelay"
peerpkg "github.com/kaspanet/kaspad/app/protocol/peer"
routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
)
type protocolManager interface {
RegisterFlow(name string, router *routerpkg.Router, messageTypes []appmessage.MessageCommand, isStopping *uint32,
errChan chan error, initializeFunc common.FlowInitializeFunc) *common.Flow
RegisterOneTimeFlow(name string, router *routerpkg.Router, messageTypes []appmessage.MessageCommand,
isStopping *uint32, stopChan chan error, initializeFunc common.FlowInitializeFunc) *common.Flow
RegisterFlowWithCapacity(name string, capacity int, router *routerpkg.Router,
messageTypes []appmessage.MessageCommand, isStopping *uint32,
errChan chan error, initializeFunc common.FlowInitializeFunc) *common.Flow
Context() *flowcontext.FlowContext
}
// Register is used in order to register all the protocol flows to the given router.
func Register(m protocolManager, router *routerpkg.Router, errChan chan error, isStopping *uint32) (flows []*common.Flow) {
flows = registerAddressFlows(m, router, isStopping, errChan)
flows = append(flows, registerBlockRelayFlows(m, router, isStopping, errChan)...)
flows = append(flows, registerPingFlows(m, router, isStopping, errChan)...)
flows = append(flows, registerTransactionRelayFlow(m, router, isStopping, errChan)...)
flows = append(flows, registerRejectsFlow(m, router, isStopping, errChan)...)
return flows
}
func registerAddressFlows(m protocolManager, router *routerpkg.Router, isStopping *uint32, errChan chan error) []*common.Flow {
outgoingRoute := router.OutgoingRoute()
return []*common.Flow{
m.RegisterOneTimeFlow("ReceiveAddresses", router, []appmessage.MessageCommand{appmessage.CmdAddresses}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return addressexchange.ReceiveAddresses(m.Context(), incomingRoute, outgoingRoute, peer)
},
),
}
}
func registerBlockRelayFlows(m protocolManager, router *routerpkg.Router, isStopping *uint32, errChan chan error) []*common.Flow {
outgoingRoute := router.OutgoingRoute()
return []*common.Flow{
m.RegisterOneTimeFlow("SendVirtualSelectedParentInv", router, []appmessage.MessageCommand{},
isStopping, errChan, func(route *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.SendVirtualSelectedParentInv(m.Context(), outgoingRoute, peer)
}),
m.RegisterFlow("HandleRelayInvs", router, []appmessage.MessageCommand{
appmessage.CmdInvRelayBlock, appmessage.CmdBlock, appmessage.CmdBlockLocator,
appmessage.CmdDoneHeaders, appmessage.CmdUnexpectedPruningPoint, appmessage.CmdPruningPointUTXOSetChunk,
appmessage.CmdBlockHeaders, appmessage.CmdIBDBlockLocatorHighestHash, appmessage.CmdBlockWithTrustedData,
appmessage.CmdDoneBlocksWithTrustedData, appmessage.CmdIBDBlockLocatorHighestHashNotFound,
appmessage.CmdDonePruningPointUTXOSetChunks, appmessage.CmdIBDBlock, appmessage.CmdPruningPoints,
appmessage.CmdPruningPointProof,
},
isStopping, errChan, func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandleRelayInvs(m.Context(), incomingRoute,
outgoingRoute, peer)
},
),
m.RegisterFlow("HandleRelayBlockRequests", router, []appmessage.MessageCommand{appmessage.CmdRequestRelayBlocks}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandleRelayBlockRequests(m.Context(), incomingRoute, outgoingRoute, peer)
},
),
m.RegisterFlow("HandleRequestBlockLocator", router,
[]appmessage.MessageCommand{appmessage.CmdRequestBlockLocator}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandleRequestBlockLocator(m.Context(), incomingRoute, outgoingRoute)
},
),
m.RegisterFlow("HandleRequestHeaders", router,
[]appmessage.MessageCommand{appmessage.CmdRequestHeaders, appmessage.CmdRequestNextHeaders}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandleRequestHeaders(m.Context(), incomingRoute, outgoingRoute, peer)
},
),
m.RegisterFlow("HandleIBDBlockRequests", router,
[]appmessage.MessageCommand{appmessage.CmdRequestIBDBlocks}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandleIBDBlockRequests(m.Context(), incomingRoute, outgoingRoute)
},
),
m.RegisterFlow("HandleRequestPruningPointUTXOSet", router,
[]appmessage.MessageCommand{appmessage.CmdRequestPruningPointUTXOSet,
appmessage.CmdRequestNextPruningPointUTXOSetChunk}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandleRequestPruningPointUTXOSet(m.Context(), incomingRoute, outgoingRoute)
},
),
m.RegisterFlow("HandlePruningPointAndItsAnticoneRequests", router,
[]appmessage.MessageCommand{appmessage.CmdRequestPruningPointAndItsAnticone}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandlePruningPointAndItsAnticoneRequests(m.Context(), incomingRoute, outgoingRoute, peer)
},
),
m.RegisterFlow("HandleIBDBlockLocator", router,
[]appmessage.MessageCommand{appmessage.CmdIBDBlockLocator}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandleIBDBlockLocator(m.Context(), incomingRoute, outgoingRoute, peer)
},
),
m.RegisterFlow("HandlePruningPointProofRequests", router,
[]appmessage.MessageCommand{appmessage.CmdRequestPruningPointProof}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandlePruningPointProofRequests(m.Context(), incomingRoute, outgoingRoute, peer)
},
),
}
}
func registerPingFlows(m protocolManager, router *routerpkg.Router, isStopping *uint32, errChan chan error) []*common.Flow {
outgoingRoute := router.OutgoingRoute()
return []*common.Flow{
m.RegisterFlow("ReceivePings", router, []appmessage.MessageCommand{appmessage.CmdPing}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return ping.ReceivePings(m.Context(), incomingRoute, outgoingRoute)
},
),
m.RegisterFlow("SendPings", router, []appmessage.MessageCommand{appmessage.CmdPong}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return ping.SendPings(m.Context(), incomingRoute, outgoingRoute, peer)
},
),
}
}
func registerTransactionRelayFlow(m protocolManager, router *routerpkg.Router, isStopping *uint32, errChan chan error) []*common.Flow {
outgoingRoute := router.OutgoingRoute()
return []*common.Flow{
m.RegisterFlowWithCapacity("HandleRelayedTransactions", 10_000, router,
[]appmessage.MessageCommand{appmessage.CmdInvTransaction, appmessage.CmdTx, appmessage.CmdTransactionNotFound}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return transactionrelay.HandleRelayedTransactions(m.Context(), incomingRoute, outgoingRoute)
},
),
m.RegisterFlow("HandleRequestTransactions", router,
[]appmessage.MessageCommand{appmessage.CmdRequestTransactions}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return transactionrelay.HandleRequestedTransactions(m.Context(), incomingRoute, outgoingRoute)
},
),
}
}
func registerRejectsFlow(m protocolManager, router *routerpkg.Router, isStopping *uint32, errChan chan error) []*common.Flow {
outgoingRoute := router.OutgoingRoute()
return []*common.Flow{
m.RegisterFlow("HandleRejects", router,
[]appmessage.MessageCommand{appmessage.CmdReject}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return rejects.HandleRejects(m.Context(), incomingRoute, outgoingRoute)
},
),
}
}

View File

@ -5,7 +5,7 @@ import (
"time" "time"
"github.com/kaspanet/kaspad/app/appmessage" "github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/protocol/flows/addressexchange" "github.com/kaspanet/kaspad/app/protocol/flows/v3/addressexchange"
peerpkg "github.com/kaspanet/kaspad/app/protocol/peer" peerpkg "github.com/kaspanet/kaspad/app/protocol/peer"
"github.com/kaspanet/kaspad/domain/consensus" "github.com/kaspanet/kaspad/domain/consensus"
"github.com/kaspanet/kaspad/domain/consensus/utils/testutils" "github.com/kaspanet/kaspad/domain/consensus/utils/testutils"

View File

@ -3,6 +3,7 @@ package transactionrelay
import ( import (
"github.com/kaspanet/kaspad/app/appmessage" "github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/protocol/common" "github.com/kaspanet/kaspad/app/protocol/common"
"github.com/kaspanet/kaspad/app/protocol/flowcontext"
"github.com/kaspanet/kaspad/app/protocol/protocolerrors" "github.com/kaspanet/kaspad/app/protocol/protocolerrors"
"github.com/kaspanet/kaspad/domain" "github.com/kaspanet/kaspad/domain"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi" "github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
@ -18,7 +19,7 @@ import (
type TransactionsRelayContext interface { type TransactionsRelayContext interface {
NetAdapter() *netadapter.NetAdapter NetAdapter() *netadapter.NetAdapter
Domain() domain.Domain Domain() domain.Domain
SharedRequestedTransactions() *SharedRequestedTransactions SharedRequestedTransactions() *flowcontext.SharedRequestedTransactions
OnTransactionAddedToMempool() OnTransactionAddedToMempool()
EnqueueTransactionIDsForPropagation(transactionIDs []*externalapi.DomainTransactionID) error EnqueueTransactionIDsForPropagation(transactionIDs []*externalapi.DomainTransactionID) error
} }
@ -68,7 +69,7 @@ func (flow *handleRelayedTransactionsFlow) requestInvTransactions(
if flow.isKnownTransaction(txID) { if flow.isKnownTransaction(txID) {
continue continue
} }
exists := flow.SharedRequestedTransactions().addIfNotExists(txID) exists := flow.SharedRequestedTransactions().AddIfNotExists(txID)
if exists { if exists {
continue continue
} }
@ -82,7 +83,7 @@ func (flow *handleRelayedTransactionsFlow) requestInvTransactions(
msgGetTransactions := appmessage.NewMsgRequestTransactions(idsToRequest) msgGetTransactions := appmessage.NewMsgRequestTransactions(idsToRequest)
err = flow.outgoingRoute.Enqueue(msgGetTransactions) err = flow.outgoingRoute.Enqueue(msgGetTransactions)
if err != nil { if err != nil {
flow.SharedRequestedTransactions().removeMany(idsToRequest) flow.SharedRequestedTransactions().RemoveMany(idsToRequest)
return nil, err return nil, err
} }
return idsToRequest, nil return idsToRequest, nil
@ -151,7 +152,7 @@ func (flow *handleRelayedTransactionsFlow) readMsgTxOrNotFound() (
func (flow *handleRelayedTransactionsFlow) receiveTransactions(requestedTransactions []*externalapi.DomainTransactionID) error { func (flow *handleRelayedTransactionsFlow) receiveTransactions(requestedTransactions []*externalapi.DomainTransactionID) error {
// In case the function returns earlier than expected, we want to make sure sharedRequestedTransactions is // In case the function returns earlier than expected, we want to make sure sharedRequestedTransactions is
// clean from any pending transactions. // clean from any pending transactions.
defer flow.SharedRequestedTransactions().removeMany(requestedTransactions) defer flow.SharedRequestedTransactions().RemoveMany(requestedTransactions)
for _, expectedID := range requestedTransactions { for _, expectedID := range requestedTransactions {
msgTx, msgTxNotFound, err := flow.readMsgTxOrNotFound() msgTx, msgTxNotFound, err := flow.readMsgTxOrNotFound()
if err != nil { if err != nil {

View File

@ -2,10 +2,11 @@ package transactionrelay_test
import ( import (
"errors" "errors"
"github.com/kaspanet/kaspad/app/protocol/flowcontext"
"strings" "strings"
"testing" "testing"
"github.com/kaspanet/kaspad/app/protocol/flows/transactionrelay" "github.com/kaspanet/kaspad/app/protocol/flows/v3/transactionrelay"
"github.com/kaspanet/kaspad/app/protocol/protocolerrors" "github.com/kaspanet/kaspad/app/protocol/protocolerrors"
"github.com/kaspanet/kaspad/domain" "github.com/kaspanet/kaspad/domain"
"github.com/kaspanet/kaspad/domain/consensus" "github.com/kaspanet/kaspad/domain/consensus"
@ -24,7 +25,7 @@ import (
type mocTransactionsRelayContext struct { type mocTransactionsRelayContext struct {
netAdapter *netadapter.NetAdapter netAdapter *netadapter.NetAdapter
domain domain.Domain domain domain.Domain
sharedRequestedTransactions *transactionrelay.SharedRequestedTransactions sharedRequestedTransactions *flowcontext.SharedRequestedTransactions
} }
func (m *mocTransactionsRelayContext) NetAdapter() *netadapter.NetAdapter { func (m *mocTransactionsRelayContext) NetAdapter() *netadapter.NetAdapter {
@ -35,7 +36,7 @@ func (m *mocTransactionsRelayContext) Domain() domain.Domain {
return m.domain return m.domain
} }
func (m *mocTransactionsRelayContext) SharedRequestedTransactions() *transactionrelay.SharedRequestedTransactions { func (m *mocTransactionsRelayContext) SharedRequestedTransactions() *flowcontext.SharedRequestedTransactions {
return m.sharedRequestedTransactions return m.sharedRequestedTransactions
} }
@ -60,7 +61,7 @@ func TestHandleRelayedTransactionsNotFound(t *testing.T) {
} }
defer teardown(false) defer teardown(false)
sharedRequestedTransactions := transactionrelay.NewSharedRequestedTransactions() sharedRequestedTransactions := flowcontext.NewSharedRequestedTransactions()
adapter, err := netadapter.NewNetAdapter(config.DefaultConfig()) adapter, err := netadapter.NewNetAdapter(config.DefaultConfig())
if err != nil { if err != nil {
t.Fatalf("Failed to create a NetAdapter: %v", err) t.Fatalf("Failed to create a NetAdapter: %v", err)
@ -153,7 +154,7 @@ func TestOnClosedIncomingRoute(t *testing.T) {
} }
defer teardown(false) defer teardown(false)
sharedRequestedTransactions := transactionrelay.NewSharedRequestedTransactions() sharedRequestedTransactions := flowcontext.NewSharedRequestedTransactions()
adapter, err := netadapter.NewNetAdapter(config.DefaultConfig()) adapter, err := netadapter.NewNetAdapter(config.DefaultConfig())
if err != nil { if err != nil {
t.Fatalf("Failed to creat a NetAdapter : %v", err) t.Fatalf("Failed to creat a NetAdapter : %v", err)

View File

@ -1,10 +1,11 @@
package transactionrelay_test package transactionrelay_test
import ( import (
"github.com/kaspanet/kaspad/app/protocol/flowcontext"
"testing" "testing"
"github.com/kaspanet/kaspad/app/appmessage" "github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/protocol/flows/transactionrelay" "github.com/kaspanet/kaspad/app/protocol/flows/v3/transactionrelay"
"github.com/kaspanet/kaspad/domain" "github.com/kaspanet/kaspad/domain"
"github.com/kaspanet/kaspad/domain/consensus" "github.com/kaspanet/kaspad/domain/consensus"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi" "github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
@ -31,7 +32,7 @@ func TestHandleRequestedTransactionsNotFound(t *testing.T) {
} }
defer teardown(false) defer teardown(false)
sharedRequestedTransactions := transactionrelay.NewSharedRequestedTransactions() sharedRequestedTransactions := flowcontext.NewSharedRequestedTransactions()
adapter, err := netadapter.NewNetAdapter(config.DefaultConfig()) adapter, err := netadapter.NewNetAdapter(config.DefaultConfig())
if err != nil { if err != nil {
t.Fatalf("Failed to create a NetAdapter: %v", err) t.Fatalf("Failed to create a NetAdapter: %v", err)

View File

@ -0,0 +1,187 @@
package v4
import (
"github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/protocol/common"
"github.com/kaspanet/kaspad/app/protocol/flowcontext"
"github.com/kaspanet/kaspad/app/protocol/flows/v3/addressexchange"
"github.com/kaspanet/kaspad/app/protocol/flows/v3/blockrelay"
"github.com/kaspanet/kaspad/app/protocol/flows/v3/ping"
"github.com/kaspanet/kaspad/app/protocol/flows/v3/rejects"
"github.com/kaspanet/kaspad/app/protocol/flows/v3/transactionrelay"
peerpkg "github.com/kaspanet/kaspad/app/protocol/peer"
routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
)
type protocolManager interface {
RegisterFlow(name string, router *routerpkg.Router, messageTypes []appmessage.MessageCommand, isStopping *uint32,
errChan chan error, initializeFunc common.FlowInitializeFunc) *common.Flow
RegisterOneTimeFlow(name string, router *routerpkg.Router, messageTypes []appmessage.MessageCommand,
isStopping *uint32, stopChan chan error, initializeFunc common.FlowInitializeFunc) *common.Flow
RegisterFlowWithCapacity(name string, capacity int, router *routerpkg.Router,
messageTypes []appmessage.MessageCommand, isStopping *uint32,
errChan chan error, initializeFunc common.FlowInitializeFunc) *common.Flow
Context() *flowcontext.FlowContext
}
// Register is used in order to register all the protocol flows to the given router.
func Register(m protocolManager, router *routerpkg.Router, errChan chan error, isStopping *uint32) (flows []*common.Flow) {
flows = registerAddressFlows(m, router, isStopping, errChan)
flows = append(flows, registerBlockRelayFlows(m, router, isStopping, errChan)...)
flows = append(flows, registerPingFlows(m, router, isStopping, errChan)...)
flows = append(flows, registerTransactionRelayFlow(m, router, isStopping, errChan)...)
flows = append(flows, registerRejectsFlow(m, router, isStopping, errChan)...)
return flows
}
func registerAddressFlows(m protocolManager, router *routerpkg.Router, isStopping *uint32, errChan chan error) []*common.Flow {
outgoingRoute := router.OutgoingRoute()
return []*common.Flow{
// TODO: This code was moved to the upper level to prevent a race condition when connecting to v3 peers. This should be uncommented
// and removed from the upper level once v3 is obsolete.
//m.RegisterFlow("SendAddresses", router, []appmessage.MessageCommand{appmessage.CmdRequestAddresses}, isStopping, errChan,
// func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
// return addressexchange.SendAddresses(m.Context(), incomingRoute, outgoingRoute)
// },
//),
m.RegisterOneTimeFlow("ReceiveAddresses", router, []appmessage.MessageCommand{appmessage.CmdAddresses}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return addressexchange.ReceiveAddresses(m.Context(), incomingRoute, outgoingRoute, peer)
},
),
}
}
func registerBlockRelayFlows(m protocolManager, router *routerpkg.Router, isStopping *uint32, errChan chan error) []*common.Flow {
outgoingRoute := router.OutgoingRoute()
return []*common.Flow{
m.RegisterOneTimeFlow("SendVirtualSelectedParentInv", router, []appmessage.MessageCommand{},
isStopping, errChan, func(route *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.SendVirtualSelectedParentInv(m.Context(), outgoingRoute, peer)
}),
m.RegisterFlow("HandleRelayInvs", router, []appmessage.MessageCommand{
appmessage.CmdInvRelayBlock, appmessage.CmdBlock, appmessage.CmdBlockLocator,
appmessage.CmdDoneHeaders, appmessage.CmdUnexpectedPruningPoint, appmessage.CmdPruningPointUTXOSetChunk,
appmessage.CmdBlockHeaders, appmessage.CmdIBDBlockLocatorHighestHash, appmessage.CmdBlockWithTrustedData,
appmessage.CmdDoneBlocksWithTrustedData, appmessage.CmdIBDBlockLocatorHighestHashNotFound,
appmessage.CmdDonePruningPointUTXOSetChunks, appmessage.CmdIBDBlock, appmessage.CmdPruningPoints,
appmessage.CmdPruningPointProof,
},
isStopping, errChan, func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandleRelayInvs(m.Context(), incomingRoute,
outgoingRoute, peer)
},
),
m.RegisterFlow("HandleRelayBlockRequests", router, []appmessage.MessageCommand{appmessage.CmdRequestRelayBlocks}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandleRelayBlockRequests(m.Context(), incomingRoute, outgoingRoute, peer)
},
),
m.RegisterFlow("HandleRequestBlockLocator", router,
[]appmessage.MessageCommand{appmessage.CmdRequestBlockLocator}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandleRequestBlockLocator(m.Context(), incomingRoute, outgoingRoute)
},
),
m.RegisterFlow("HandleRequestHeaders", router,
[]appmessage.MessageCommand{appmessage.CmdRequestHeaders, appmessage.CmdRequestNextHeaders}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandleRequestHeaders(m.Context(), incomingRoute, outgoingRoute, peer)
},
),
m.RegisterFlow("HandleIBDBlockRequests", router,
[]appmessage.MessageCommand{appmessage.CmdRequestIBDBlocks}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandleIBDBlockRequests(m.Context(), incomingRoute, outgoingRoute)
},
),
m.RegisterFlow("HandleRequestPruningPointUTXOSet", router,
[]appmessage.MessageCommand{appmessage.CmdRequestPruningPointUTXOSet,
appmessage.CmdRequestNextPruningPointUTXOSetChunk}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandleRequestPruningPointUTXOSet(m.Context(), incomingRoute, outgoingRoute)
},
),
m.RegisterFlow("HandlePruningPointAndItsAnticoneRequests", router,
[]appmessage.MessageCommand{appmessage.CmdRequestPruningPointAndItsAnticone}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandlePruningPointAndItsAnticoneRequests(m.Context(), incomingRoute, outgoingRoute, peer)
},
),
m.RegisterFlow("HandleIBDBlockLocator", router,
[]appmessage.MessageCommand{appmessage.CmdIBDBlockLocator}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandleIBDBlockLocator(m.Context(), incomingRoute, outgoingRoute, peer)
},
),
m.RegisterFlow("HandlePruningPointProofRequests", router,
[]appmessage.MessageCommand{appmessage.CmdRequestPruningPointProof}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandlePruningPointProofRequests(m.Context(), incomingRoute, outgoingRoute, peer)
},
),
}
}
func registerPingFlows(m protocolManager, router *routerpkg.Router, isStopping *uint32, errChan chan error) []*common.Flow {
outgoingRoute := router.OutgoingRoute()
return []*common.Flow{
m.RegisterFlow("ReceivePings", router, []appmessage.MessageCommand{appmessage.CmdPing}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return ping.ReceivePings(m.Context(), incomingRoute, outgoingRoute)
},
),
m.RegisterFlow("SendPings", router, []appmessage.MessageCommand{appmessage.CmdPong}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return ping.SendPings(m.Context(), incomingRoute, outgoingRoute, peer)
},
),
}
}
func registerTransactionRelayFlow(m protocolManager, router *routerpkg.Router, isStopping *uint32, errChan chan error) []*common.Flow {
outgoingRoute := router.OutgoingRoute()
return []*common.Flow{
m.RegisterFlowWithCapacity("HandleRelayedTransactions", 10_000, router,
[]appmessage.MessageCommand{appmessage.CmdInvTransaction, appmessage.CmdTx, appmessage.CmdTransactionNotFound}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return transactionrelay.HandleRelayedTransactions(m.Context(), incomingRoute, outgoingRoute)
},
),
m.RegisterFlow("HandleRequestTransactions", router,
[]appmessage.MessageCommand{appmessage.CmdRequestTransactions}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return transactionrelay.HandleRequestedTransactions(m.Context(), incomingRoute, outgoingRoute)
},
),
}
}
func registerRejectsFlow(m protocolManager, router *routerpkg.Router, isStopping *uint32, errChan chan error) []*common.Flow {
outgoingRoute := router.OutgoingRoute()
return []*common.Flow{
m.RegisterFlow("HandleRejects", router,
[]appmessage.MessageCommand{appmessage.CmdReject}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return rejects.HandleRejects(m.Context(), incomingRoute, outgoingRoute)
},
),
}
}

View File

@ -2,6 +2,7 @@ package protocol
import ( import (
"fmt" "fmt"
"github.com/kaspanet/kaspad/app/protocol/common"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -71,11 +72,16 @@ func (m *Manager) AddBlock(block *externalapi.DomainBlock) error {
return m.context.AddBlock(block) return m.context.AddBlock(block)
} }
func (m *Manager) runFlows(flows []*flow, peer *peerpkg.Peer, errChan <-chan error, flowsWaitGroup *sync.WaitGroup) error { // Context returns the manager's flow context
func (m *Manager) Context() *flowcontext.FlowContext {
return m.context
}
func (m *Manager) runFlows(flows []*common.Flow, peer *peerpkg.Peer, errChan <-chan error, flowsWaitGroup *sync.WaitGroup) error {
flowsWaitGroup.Add(len(flows)) flowsWaitGroup.Add(len(flows))
for _, flow := range flows { for _, flow := range flows {
executeFunc := flow.executeFunc // extract to new variable so that it's not overwritten executeFunc := flow.ExecuteFunc // extract to new variable so that it's not overwritten
spawn(fmt.Sprintf("flow-%s", flow.name), func() { spawn(fmt.Sprintf("flow-%s", flow.Name), func() {
executeFunc(peer) executeFunc(peer)
flowsWaitGroup.Done() flowsWaitGroup.Done()
}) })

View File

@ -13,10 +13,6 @@ import (
"github.com/kaspanet/kaspad/util/mstime" "github.com/kaspanet/kaspad/util/mstime"
) )
// maxProtocolVersion version is the maximum supported protocol
// version this kaspad node supports
const maxProtocolVersion = 3
// Peer holds data about a peer. // Peer holds data about a peer.
type Peer struct { type Peer struct {
connection *netadapter.NetConnection connection *netadapter.NetConnection
@ -76,6 +72,11 @@ func (p *Peer) AdvertisedProtocolVersion() uint32 {
return p.advertisedProtocolVerion return p.advertisedProtocolVerion
} }
// ProtocolVersion returns the protocol version which is used when communicating with the peer.
func (p *Peer) ProtocolVersion() uint32 {
return p.protocolVersion
}
// TimeConnected returns the time since the connection to this been has been started. // TimeConnected returns the time since the connection to this been has been started.
func (p *Peer) TimeConnected() time.Duration { func (p *Peer) TimeConnected() time.Duration {
return time.Since(p.connectionStarted) return time.Since(p.connectionStarted)
@ -87,7 +88,7 @@ func (p *Peer) IsOutbound() bool {
} }
// UpdateFieldsFromMsgVersion updates the peer with the data from the version message. // UpdateFieldsFromMsgVersion updates the peer with the data from the version message.
func (p *Peer) UpdateFieldsFromMsgVersion(msg *appmessage.MsgVersion) { func (p *Peer) UpdateFieldsFromMsgVersion(msg *appmessage.MsgVersion, maxProtocolVersion uint32) {
// Negotiate the protocol version. // Negotiate the protocol version.
p.advertisedProtocolVerion = msg.ProtocolVersion p.advertisedProtocolVerion = msg.ProtocolVersion
p.protocolVersion = mathUtil.MinUint32(maxProtocolVersion, p.advertisedProtocolVerion) p.protocolVersion = mathUtil.MinUint32(maxProtocolVersion, p.advertisedProtocolVerion)

View File

@ -1,16 +1,16 @@
package protocol package protocol
import ( import (
"github.com/kaspanet/kaspad/app/protocol/common"
"github.com/kaspanet/kaspad/app/protocol/flows/ready"
v3 "github.com/kaspanet/kaspad/app/protocol/flows/v3"
"github.com/kaspanet/kaspad/app/protocol/flows/v3/addressexchange"
v4 "github.com/kaspanet/kaspad/app/protocol/flows/v4"
"sync" "sync"
"sync/atomic" "sync/atomic"
"github.com/kaspanet/kaspad/app/appmessage" "github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/protocol/flows/addressexchange"
"github.com/kaspanet/kaspad/app/protocol/flows/blockrelay"
"github.com/kaspanet/kaspad/app/protocol/flows/handshake" "github.com/kaspanet/kaspad/app/protocol/flows/handshake"
"github.com/kaspanet/kaspad/app/protocol/flows/ping"
"github.com/kaspanet/kaspad/app/protocol/flows/rejects"
"github.com/kaspanet/kaspad/app/protocol/flows/transactionrelay"
peerpkg "github.com/kaspanet/kaspad/app/protocol/peer" peerpkg "github.com/kaspanet/kaspad/app/protocol/peer"
"github.com/kaspanet/kaspad/app/protocol/protocolerrors" "github.com/kaspanet/kaspad/app/protocol/protocolerrors"
"github.com/kaspanet/kaspad/infrastructure/network/addressmanager" "github.com/kaspanet/kaspad/infrastructure/network/addressmanager"
@ -20,14 +20,6 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
) )
type flowInitializeFunc func(route *routerpkg.Route, peer *peerpkg.Peer) error
type flowExecuteFunc func(peer *peerpkg.Peer)
type flow struct {
name string
executeFunc flowExecuteFunc
}
func (m *Manager) routerInitializer(router *routerpkg.Router, netConnection *netadapter.NetConnection) { func (m *Manager) routerInitializer(router *routerpkg.Router, netConnection *netadapter.NetConnection) {
// isStopping flag is raised the moment that the connection associated with this router is disconnected // isStopping flag is raised the moment that the connection associated with this router is disconnected
// errChan is used by the flow goroutines to return to runFlows when an error occurs. // errChan is used by the flow goroutines to return to runFlows when an error occurs.
@ -35,8 +27,7 @@ func (m *Manager) routerInitializer(router *routerpkg.Router, netConnection *net
isStopping := uint32(0) isStopping := uint32(0)
errChan := make(chan error) errChan := make(chan error)
flows := m.registerFlows(router, errChan, &isStopping) receiveVersionRoute, sendVersionRoute, receiveReadyRoute := registerHandshakeRoutes(router)
receiveVersionRoute, sendVersionRoute := registerHandshakeRoutes(router)
// After flows were registered - spawn a new thread that will wait for connection to finish initializing // After flows were registered - spawn a new thread that will wait for connection to finish initializing
// and start receiving messages // and start receiving messages
@ -64,6 +55,14 @@ func (m *Manager) routerInitializer(router *routerpkg.Router, netConnection *net
} }
}) })
// TODO: This code was moved here to prevent a race condition when connecting to v3 peers. This should be moved to v4.registerAddressFlows
// once v3 is obsolete.
sendAddressesFlow := m.RegisterFlow("SendAddresses", router, []appmessage.MessageCommand{appmessage.CmdRequestAddresses}, &isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return addressexchange.SendAddresses(m.Context(), incomingRoute, router.OutgoingRoute())
},
)
peer, err := handshake.HandleHandshake(m.context, netConnection, receiveVersionRoute, peer, err := handshake.HandleHandshake(m.context, netConnection, receiveVersionRoute,
sendVersionRoute, router.OutgoingRoute()) sendVersionRoute, router.OutgoingRoute())
@ -84,6 +83,26 @@ func (m *Manager) routerInitializer(router *routerpkg.Router, netConnection *net
} }
defer m.context.RemoveFromPeers(peer) defer m.context.RemoveFromPeers(peer)
var flows []*common.Flow
log.Infof("Registering p2p flows for peer %s for protocol version %d", peer, peer.ProtocolVersion())
switch peer.ProtocolVersion() {
case 3:
flows = v3.Register(m, router, errChan, &isStopping)
case 4:
flows = v4.Register(m, router, errChan, &isStopping)
default:
panic(errors.Errorf("no way to handle protocol version %d", peer.ProtocolVersion()))
}
flows = append(flows, sendAddressesFlow)
if peer.ProtocolVersion() > 3 {
err = ready.HandleReady(receiveReadyRoute, router.OutgoingRoute(), peer)
if err != nil {
m.handleError(err, netConnection, router.OutgoingRoute())
return
}
}
removeHandshakeRoutes(router) removeHandshakeRoutes(router)
flowsWaitGroup := &sync.WaitGroup{} flowsWaitGroup := &sync.WaitGroup{}
@ -130,167 +149,9 @@ func (m *Manager) handleError(err error, netConnection *netadapter.NetConnection
panic(err) panic(err)
} }
func (m *Manager) registerFlows(router *routerpkg.Router, errChan chan error, isStopping *uint32) (flows []*flow) { // RegisterFlow registers a flow to the given router.
flows = m.registerAddressFlows(router, isStopping, errChan) func (m *Manager) RegisterFlow(name string, router *routerpkg.Router, messageTypes []appmessage.MessageCommand, isStopping *uint32,
flows = append(flows, m.registerBlockRelayFlows(router, isStopping, errChan)...) errChan chan error, initializeFunc common.FlowInitializeFunc) *common.Flow {
flows = append(flows, m.registerPingFlows(router, isStopping, errChan)...)
flows = append(flows, m.registerTransactionRelayFlow(router, isStopping, errChan)...)
flows = append(flows, m.registerRejectsFlow(router, isStopping, errChan)...)
return flows
}
func (m *Manager) registerAddressFlows(router *routerpkg.Router, isStopping *uint32, errChan chan error) []*flow {
outgoingRoute := router.OutgoingRoute()
return []*flow{
m.registerFlow("SendAddresses", router, []appmessage.MessageCommand{appmessage.CmdRequestAddresses}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return addressexchange.SendAddresses(m.context, incomingRoute, outgoingRoute)
},
),
m.registerOneTimeFlow("ReceiveAddresses", router, []appmessage.MessageCommand{appmessage.CmdAddresses}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return addressexchange.ReceiveAddresses(m.context, incomingRoute, outgoingRoute, peer)
},
),
}
}
func (m *Manager) registerBlockRelayFlows(router *routerpkg.Router, isStopping *uint32, errChan chan error) []*flow {
outgoingRoute := router.OutgoingRoute()
return []*flow{
m.registerOneTimeFlow("SendVirtualSelectedParentInv", router, []appmessage.MessageCommand{},
isStopping, errChan, func(route *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.SendVirtualSelectedParentInv(m.context, outgoingRoute, peer)
}),
m.registerFlow("HandleRelayInvs", router, []appmessage.MessageCommand{
appmessage.CmdInvRelayBlock, appmessage.CmdBlock, appmessage.CmdBlockLocator,
appmessage.CmdDoneHeaders, appmessage.CmdUnexpectedPruningPoint, appmessage.CmdPruningPointUTXOSetChunk,
appmessage.CmdBlockHeaders, appmessage.CmdIBDBlockLocatorHighestHash, appmessage.CmdBlockWithTrustedData,
appmessage.CmdDoneBlocksWithTrustedData, appmessage.CmdIBDBlockLocatorHighestHashNotFound,
appmessage.CmdDonePruningPointUTXOSetChunks, appmessage.CmdIBDBlock, appmessage.CmdPruningPoints,
appmessage.CmdPruningPointProof,
},
isStopping, errChan, func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandleRelayInvs(m.context, incomingRoute,
outgoingRoute, peer)
},
),
m.registerFlow("HandleRelayBlockRequests", router, []appmessage.MessageCommand{appmessage.CmdRequestRelayBlocks}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandleRelayBlockRequests(m.context, incomingRoute, outgoingRoute, peer)
},
),
m.registerFlow("HandleRequestBlockLocator", router,
[]appmessage.MessageCommand{appmessage.CmdRequestBlockLocator}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandleRequestBlockLocator(m.context, incomingRoute, outgoingRoute)
},
),
m.registerFlow("HandleRequestHeaders", router,
[]appmessage.MessageCommand{appmessage.CmdRequestHeaders, appmessage.CmdRequestNextHeaders}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandleRequestHeaders(m.context, incomingRoute, outgoingRoute, peer)
},
),
m.registerFlow("HandleIBDBlockRequests", router,
[]appmessage.MessageCommand{appmessage.CmdRequestIBDBlocks}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandleIBDBlockRequests(m.context, incomingRoute, outgoingRoute)
},
),
m.registerFlow("HandleRequestPruningPointUTXOSet", router,
[]appmessage.MessageCommand{appmessage.CmdRequestPruningPointUTXOSet,
appmessage.CmdRequestNextPruningPointUTXOSetChunk}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandleRequestPruningPointUTXOSet(m.context, incomingRoute, outgoingRoute)
},
),
m.registerFlow("HandlePruningPointAndItsAnticoneRequests", router,
[]appmessage.MessageCommand{appmessage.CmdRequestPruningPointAndItsAnticone}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandlePruningPointAndItsAnticoneRequests(m.context, incomingRoute, outgoingRoute, peer)
},
),
m.registerFlow("HandleIBDBlockLocator", router,
[]appmessage.MessageCommand{appmessage.CmdIBDBlockLocator}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandleIBDBlockLocator(m.context, incomingRoute, outgoingRoute, peer)
},
),
m.registerFlow("HandlePruningPointProofRequests", router,
[]appmessage.MessageCommand{appmessage.CmdRequestPruningPointProof}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return blockrelay.HandlePruningPointProofRequests(m.context, incomingRoute, outgoingRoute, peer)
},
),
}
}
func (m *Manager) registerPingFlows(router *routerpkg.Router, isStopping *uint32, errChan chan error) []*flow {
outgoingRoute := router.OutgoingRoute()
return []*flow{
m.registerFlow("ReceivePings", router, []appmessage.MessageCommand{appmessage.CmdPing}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return ping.ReceivePings(m.context, incomingRoute, outgoingRoute)
},
),
m.registerFlow("SendPings", router, []appmessage.MessageCommand{appmessage.CmdPong}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return ping.SendPings(m.context, incomingRoute, outgoingRoute, peer)
},
),
}
}
func (m *Manager) registerTransactionRelayFlow(router *routerpkg.Router, isStopping *uint32, errChan chan error) []*flow {
outgoingRoute := router.OutgoingRoute()
return []*flow{
m.registerFlowWithCapacity("HandleRelayedTransactions", 10_000, router,
[]appmessage.MessageCommand{appmessage.CmdInvTransaction, appmessage.CmdTx, appmessage.CmdTransactionNotFound}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return transactionrelay.HandleRelayedTransactions(m.context, incomingRoute, outgoingRoute)
},
),
m.registerFlow("HandleRequestTransactions", router,
[]appmessage.MessageCommand{appmessage.CmdRequestTransactions}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return transactionrelay.HandleRequestedTransactions(m.context, incomingRoute, outgoingRoute)
},
),
}
}
func (m *Manager) registerRejectsFlow(router *routerpkg.Router, isStopping *uint32, errChan chan error) []*flow {
outgoingRoute := router.OutgoingRoute()
return []*flow{
m.registerFlow("HandleRejects", router,
[]appmessage.MessageCommand{appmessage.CmdReject}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return rejects.HandleRejects(m.context, incomingRoute, outgoingRoute)
},
),
}
}
func (m *Manager) registerFlow(name string, router *routerpkg.Router, messageTypes []appmessage.MessageCommand, isStopping *uint32,
errChan chan error, initializeFunc flowInitializeFunc) *flow {
route, err := router.AddIncomingRoute(name, messageTypes) route, err := router.AddIncomingRoute(name, messageTypes)
if err != nil { if err != nil {
@ -300,9 +161,10 @@ func (m *Manager) registerFlow(name string, router *routerpkg.Router, messageTyp
return m.registerFlowForRoute(route, name, isStopping, errChan, initializeFunc) return m.registerFlowForRoute(route, name, isStopping, errChan, initializeFunc)
} }
func (m *Manager) registerFlowWithCapacity(name string, capacity int, router *routerpkg.Router, // RegisterFlowWithCapacity registers a flow to the given router with a custom capacity.
func (m *Manager) RegisterFlowWithCapacity(name string, capacity int, router *routerpkg.Router,
messageTypes []appmessage.MessageCommand, isStopping *uint32, messageTypes []appmessage.MessageCommand, isStopping *uint32,
errChan chan error, initializeFunc flowInitializeFunc) *flow { errChan chan error, initializeFunc common.FlowInitializeFunc) *common.Flow {
route, err := router.AddIncomingRouteWithCapacity(name, capacity, messageTypes) route, err := router.AddIncomingRouteWithCapacity(name, capacity, messageTypes)
if err != nil { if err != nil {
@ -313,11 +175,11 @@ func (m *Manager) registerFlowWithCapacity(name string, capacity int, router *ro
} }
func (m *Manager) registerFlowForRoute(route *routerpkg.Route, name string, isStopping *uint32, func (m *Manager) registerFlowForRoute(route *routerpkg.Route, name string, isStopping *uint32,
errChan chan error, initializeFunc flowInitializeFunc) *flow { errChan chan error, initializeFunc common.FlowInitializeFunc) *common.Flow {
return &flow{ return &common.Flow{
name: name, Name: name,
executeFunc: func(peer *peerpkg.Peer) { ExecuteFunc: func(peer *peerpkg.Peer) {
err := initializeFunc(route, peer) err := initializeFunc(route, peer)
if err != nil { if err != nil {
m.context.HandleError(err, name, isStopping, errChan) m.context.HandleError(err, name, isStopping, errChan)
@ -327,17 +189,18 @@ func (m *Manager) registerFlowForRoute(route *routerpkg.Route, name string, isSt
} }
} }
func (m *Manager) registerOneTimeFlow(name string, router *routerpkg.Router, messageTypes []appmessage.MessageCommand, // RegisterOneTimeFlow registers a one-time flow (that exits once some operations are done) to the given router.
isStopping *uint32, stopChan chan error, initializeFunc flowInitializeFunc) *flow { func (m *Manager) RegisterOneTimeFlow(name string, router *routerpkg.Router, messageTypes []appmessage.MessageCommand,
isStopping *uint32, stopChan chan error, initializeFunc common.FlowInitializeFunc) *common.Flow {
route, err := router.AddIncomingRoute(name, messageTypes) route, err := router.AddIncomingRoute(name, messageTypes)
if err != nil { if err != nil {
panic(err) panic(err)
} }
return &flow{ return &common.Flow{
name: name, Name: name,
executeFunc: func(peer *peerpkg.Peer) { ExecuteFunc: func(peer *peerpkg.Peer) {
defer func() { defer func() {
err := router.RemoveRoute(messageTypes) err := router.RemoveRoute(messageTypes)
if err != nil { if err != nil {
@ -355,7 +218,7 @@ func (m *Manager) registerOneTimeFlow(name string, router *routerpkg.Router, mes
} }
func registerHandshakeRoutes(router *routerpkg.Router) ( func registerHandshakeRoutes(router *routerpkg.Router) (
receiveVersionRoute *routerpkg.Route, sendVersionRoute *routerpkg.Route) { receiveVersionRoute, sendVersionRoute, receiveReadyRoute *routerpkg.Route) {
receiveVersionRoute, err := router.AddIncomingRoute("recieveVersion - incoming", []appmessage.MessageCommand{appmessage.CmdVersion}) receiveVersionRoute, err := router.AddIncomingRoute("recieveVersion - incoming", []appmessage.MessageCommand{appmessage.CmdVersion})
if err != nil { if err != nil {
panic(err) panic(err)
@ -366,11 +229,16 @@ func registerHandshakeRoutes(router *routerpkg.Router) (
panic(err) panic(err)
} }
return receiveVersionRoute, sendVersionRoute receiveReadyRoute, err = router.AddIncomingRoute("recieveReady - incoming", []appmessage.MessageCommand{appmessage.CmdReady})
if err != nil {
panic(err)
}
return receiveVersionRoute, sendVersionRoute, receiveReadyRoute
} }
func removeHandshakeRoutes(router *routerpkg.Router) { func removeHandshakeRoutes(router *routerpkg.Router) {
err := router.RemoveRoute([]appmessage.MessageCommand{appmessage.CmdVersion, appmessage.CmdVerAck}) err := router.RemoveRoute([]appmessage.MessageCommand{appmessage.CmdVersion, appmessage.CmdVerAck, appmessage.CmdReady})
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@ -513,6 +513,9 @@ func (pm *pruningManager) ArePruningPointsViolatingFinality(stagingArea *model.S
return false, err return false, err
} }
// We need to check if virtualFinalityPointFinalityPoint is in the selected chain of
// the most recent known pruning point, so we iterate the pruning points from the most
// recent one until we find a known pruning point.
for i := len(pruningPoints) - 1; i >= 0; i-- { for i := len(pruningPoints) - 1; i >= 0; i-- {
blockHash := consensushashing.HeaderHash(pruningPoints[i]) blockHash := consensushashing.HeaderHash(pruningPoints[i])
exists, err := pm.blockStatusStore.Exists(pm.databaseContext, stagingArea, blockHash) exists, err := pm.blockStatusStore.Exists(pm.databaseContext, stagingArea, blockHash)

View File

@ -52,6 +52,7 @@ const (
defaultSigCacheMaxSize = 100000 defaultSigCacheMaxSize = 100000
sampleConfigFilename = "sample-kaspad.conf" sampleConfigFilename = "sample-kaspad.conf"
defaultMaxUTXOCacheSize = 5000000000 defaultMaxUTXOCacheSize = 5000000000
defaultProtocolVersion = 4
) )
var ( var (
@ -122,6 +123,7 @@ type Flags struct {
IsArchivalNode bool `long:"archival" description:"Run as an archival node: don't delete old block data when moving the pruning point (Warning: heavy disk usage)'"` IsArchivalNode bool `long:"archival" description:"Run as an archival node: don't delete old block data when moving the pruning point (Warning: heavy disk usage)'"`
AllowSubmitBlockWhenNotSynced bool `long:"allow-submit-block-when-not-synced" hidden:"true" description:"Allow the node to accept blocks from RPC while not synced (this flag is mainly used for testing)"` AllowSubmitBlockWhenNotSynced bool `long:"allow-submit-block-when-not-synced" hidden:"true" description:"Allow the node to accept blocks from RPC while not synced (this flag is mainly used for testing)"`
EnableSanityCheckPruningUTXOSet bool `long:"enable-sanity-check-pruning-utxo" hidden:"true" description:"When moving the pruning point - check that the utxo set matches the utxo commitment"` EnableSanityCheckPruningUTXOSet bool `long:"enable-sanity-check-pruning-utxo" hidden:"true" description:"When moving the pruning point - check that the utxo set matches the utxo commitment"`
ProtocolVersion uint32 `long:"protocol-version" description:"Use non default p2p protocol version"`
NetworkFlags NetworkFlags
ServiceOptions *ServiceOptions ServiceOptions *ServiceOptions
} }
@ -188,6 +190,7 @@ func defaultFlags() *Flags {
MinRelayTxFee: defaultMinRelayTxFee, MinRelayTxFee: defaultMinRelayTxFee,
MaxUTXOCacheSize: defaultMaxUTXOCacheSize, MaxUTXOCacheSize: defaultMaxUTXOCacheSize,
ServiceOptions: &ServiceOptions{}, ServiceOptions: &ServiceOptions{},
ProtocolVersion: defaultProtocolVersion,
} }
} }

View File

@ -44,6 +44,7 @@ message KaspadMessage {
PruningPointsMessage pruningPoints = 47; PruningPointsMessage pruningPoints = 47;
RequestPruningPointProofMessage requestPruningPointProof = 48; RequestPruningPointProofMessage requestPruningPointProof = 48;
PruningPointProofMessage pruningPointProof = 49; PruningPointProofMessage pruningPointProof = 49;
ReadyMessage ready = 50;
GetCurrentNetworkRequestMessage getCurrentNetworkRequest = 1001; GetCurrentNetworkRequestMessage getCurrentNetworkRequest = 1001;
GetCurrentNetworkResponseMessage getCurrentNetworkResponse = 1002; GetCurrentNetworkResponseMessage getCurrentNetworkResponse = 1002;

View File

@ -11,8 +11,7 @@ import (
// This is a compile-time assertion to ensure that this generated file // This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against. // is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion6
const _ = grpc.SupportPackageIsVersion7
// P2PClient is the client API for P2P service. // P2PClient is the client API for P2P service.
// //
@ -30,7 +29,7 @@ func NewP2PClient(cc grpc.ClientConnInterface) P2PClient {
} }
func (c *p2PClient) MessageStream(ctx context.Context, opts ...grpc.CallOption) (P2P_MessageStreamClient, error) { func (c *p2PClient) MessageStream(ctx context.Context, opts ...grpc.CallOption) (P2P_MessageStreamClient, error) {
stream, err := c.cc.NewStream(ctx, &P2P_ServiceDesc.Streams[0], "/protowire.P2P/MessageStream", opts...) stream, err := c.cc.NewStream(ctx, &_P2P_serviceDesc.Streams[0], "/protowire.P2P/MessageStream", opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -72,20 +71,13 @@ type P2PServer interface {
type UnimplementedP2PServer struct { type UnimplementedP2PServer struct {
} }
func (UnimplementedP2PServer) MessageStream(P2P_MessageStreamServer) error { func (*UnimplementedP2PServer) MessageStream(P2P_MessageStreamServer) error {
return status.Errorf(codes.Unimplemented, "method MessageStream not implemented") return status.Errorf(codes.Unimplemented, "method MessageStream not implemented")
} }
func (UnimplementedP2PServer) mustEmbedUnimplementedP2PServer() {} func (*UnimplementedP2PServer) mustEmbedUnimplementedP2PServer() {}
// UnsafeP2PServer may be embedded to opt out of forward compatibility for this service. func RegisterP2PServer(s *grpc.Server, srv P2PServer) {
// Use of this interface is not recommended, as added methods to P2PServer will s.RegisterService(&_P2P_serviceDesc, srv)
// result in compilation errors.
type UnsafeP2PServer interface {
mustEmbedUnimplementedP2PServer()
}
func RegisterP2PServer(s grpc.ServiceRegistrar, srv P2PServer) {
s.RegisterService(&P2P_ServiceDesc, srv)
} }
func _P2P_MessageStream_Handler(srv interface{}, stream grpc.ServerStream) error { func _P2P_MessageStream_Handler(srv interface{}, stream grpc.ServerStream) error {
@ -114,10 +106,7 @@ func (x *p2PMessageStreamServer) Recv() (*KaspadMessage, error) {
return m, nil return m, nil
} }
// P2P_ServiceDesc is the grpc.ServiceDesc for P2P service. var _P2P_serviceDesc = grpc.ServiceDesc{
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var P2P_ServiceDesc = grpc.ServiceDesc{
ServiceName: "protowire.P2P", ServiceName: "protowire.P2P",
HandlerType: (*P2PServer)(nil), HandlerType: (*P2PServer)(nil),
Methods: []grpc.MethodDesc{}, Methods: []grpc.MethodDesc{},
@ -148,7 +137,7 @@ func NewRPCClient(cc grpc.ClientConnInterface) RPCClient {
} }
func (c *rPCClient) MessageStream(ctx context.Context, opts ...grpc.CallOption) (RPC_MessageStreamClient, error) { func (c *rPCClient) MessageStream(ctx context.Context, opts ...grpc.CallOption) (RPC_MessageStreamClient, error) {
stream, err := c.cc.NewStream(ctx, &RPC_ServiceDesc.Streams[0], "/protowire.RPC/MessageStream", opts...) stream, err := c.cc.NewStream(ctx, &_RPC_serviceDesc.Streams[0], "/protowire.RPC/MessageStream", opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -190,20 +179,13 @@ type RPCServer interface {
type UnimplementedRPCServer struct { type UnimplementedRPCServer struct {
} }
func (UnimplementedRPCServer) MessageStream(RPC_MessageStreamServer) error { func (*UnimplementedRPCServer) MessageStream(RPC_MessageStreamServer) error {
return status.Errorf(codes.Unimplemented, "method MessageStream not implemented") return status.Errorf(codes.Unimplemented, "method MessageStream not implemented")
} }
func (UnimplementedRPCServer) mustEmbedUnimplementedRPCServer() {} func (*UnimplementedRPCServer) mustEmbedUnimplementedRPCServer() {}
// UnsafeRPCServer may be embedded to opt out of forward compatibility for this service. func RegisterRPCServer(s *grpc.Server, srv RPCServer) {
// Use of this interface is not recommended, as added methods to RPCServer will s.RegisterService(&_RPC_serviceDesc, srv)
// result in compilation errors.
type UnsafeRPCServer interface {
mustEmbedUnimplementedRPCServer()
}
func RegisterRPCServer(s grpc.ServiceRegistrar, srv RPCServer) {
s.RegisterService(&RPC_ServiceDesc, srv)
} }
func _RPC_MessageStream_Handler(srv interface{}, stream grpc.ServerStream) error { func _RPC_MessageStream_Handler(srv interface{}, stream grpc.ServerStream) error {
@ -232,10 +214,7 @@ func (x *rPCMessageStreamServer) Recv() (*KaspadMessage, error) {
return m, nil return m, nil
} }
// RPC_ServiceDesc is the grpc.ServiceDesc for RPC service. var _RPC_serviceDesc = grpc.ServiceDesc{
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var RPC_ServiceDesc = grpc.ServiceDesc{
ServiceName: "protowire.RPC", ServiceName: "protowire.RPC",
HandlerType: (*RPCServer)(nil), HandlerType: (*RPCServer)(nil),
Methods: []grpc.MethodDesc{}, Methods: []grpc.MethodDesc{},

View File

@ -1,12 +1,13 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // versions:
// protoc-gen-go v1.26.0 // protoc-gen-go v1.25.0
// protoc v3.12.3 // protoc v3.12.3
// source: p2p.proto // source: p2p.proto
package protowire package protowire
import ( import (
proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl" protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect" reflect "reflect"
@ -20,6 +21,10 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
) )
// This is a compile-time assertion that a sufficiently up-to-date version
// of the legacy proto package is being used.
const _ = proto.ProtoPackageIsVersion4
type RequestAddressesMessage struct { type RequestAddressesMessage struct {
state protoimpl.MessageState state protoimpl.MessageState
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
@ -2798,6 +2803,44 @@ func (x *PruningPointProofHeaderArray) GetHeaders() []*BlockHeader {
return nil return nil
} }
type ReadyMessage struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
func (x *ReadyMessage) Reset() {
*x = ReadyMessage{}
if protoimpl.UnsafeEnabled {
mi := &file_p2p_proto_msgTypes[52]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ReadyMessage) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ReadyMessage) ProtoMessage() {}
func (x *ReadyMessage) ProtoReflect() protoreflect.Message {
mi := &file_p2p_proto_msgTypes[52]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ReadyMessage.ProtoReflect.Descriptor instead.
func (*ReadyMessage) Descriptor() ([]byte, []int) {
return file_p2p_proto_rawDescGZIP(), []int{52}
}
var File_p2p_proto protoreflect.FileDescriptor var File_p2p_proto protoreflect.FileDescriptor
var file_p2p_proto_rawDesc = []byte{ var file_p2p_proto_rawDesc = []byte{
@ -3137,7 +3180,8 @@ var file_p2p_proto_rawDesc = []byte{
0x72, 0x72, 0x61, 0x79, 0x12, 0x30, 0x0a, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x72, 0x72, 0x61, 0x79, 0x12, 0x30, 0x0a, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18,
0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x77, 0x69, 0x72, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x77, 0x69, 0x72,
0x65, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x07, 0x68, 0x65, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x07, 0x68,
0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x42, 0x26, 0x5a, 0x24, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x22, 0x0e, 0x0a, 0x0c, 0x52, 0x65, 0x61, 0x64, 0x79, 0x4d,
0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x42, 0x26, 0x5a, 0x24, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62,
0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6b, 0x61, 0x73, 0x70, 0x61, 0x6e, 0x65, 0x74, 0x2f, 0x6b, 0x61, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6b, 0x61, 0x73, 0x70, 0x61, 0x6e, 0x65, 0x74, 0x2f, 0x6b, 0x61,
0x73, 0x70, 0x61, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x77, 0x69, 0x72, 0x65, 0x62, 0x06, 0x73, 0x70, 0x61, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x77, 0x69, 0x72, 0x65, 0x62, 0x06,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
@ -3155,7 +3199,7 @@ func file_p2p_proto_rawDescGZIP() []byte {
return file_p2p_proto_rawDescData return file_p2p_proto_rawDescData
} }
var file_p2p_proto_msgTypes = make([]protoimpl.MessageInfo, 52) var file_p2p_proto_msgTypes = make([]protoimpl.MessageInfo, 53)
var file_p2p_proto_goTypes = []interface{}{ var file_p2p_proto_goTypes = []interface{}{
(*RequestAddressesMessage)(nil), // 0: protowire.RequestAddressesMessage (*RequestAddressesMessage)(nil), // 0: protowire.RequestAddressesMessage
(*AddressesMessage)(nil), // 1: protowire.AddressesMessage (*AddressesMessage)(nil), // 1: protowire.AddressesMessage
@ -3209,6 +3253,7 @@ var file_p2p_proto_goTypes = []interface{}{
(*RequestPruningPointProofMessage)(nil), // 49: protowire.RequestPruningPointProofMessage (*RequestPruningPointProofMessage)(nil), // 49: protowire.RequestPruningPointProofMessage
(*PruningPointProofMessage)(nil), // 50: protowire.PruningPointProofMessage (*PruningPointProofMessage)(nil), // 50: protowire.PruningPointProofMessage
(*PruningPointProofHeaderArray)(nil), // 51: protowire.PruningPointProofHeaderArray (*PruningPointProofHeaderArray)(nil), // 51: protowire.PruningPointProofHeaderArray
(*ReadyMessage)(nil), // 52: protowire.ReadyMessage
} }
var file_p2p_proto_depIdxs = []int32{ var file_p2p_proto_depIdxs = []int32{
3, // 0: protowire.RequestAddressesMessage.subnetworkId:type_name -> protowire.SubnetworkId 3, // 0: protowire.RequestAddressesMessage.subnetworkId:type_name -> protowire.SubnetworkId
@ -3900,6 +3945,18 @@ func file_p2p_proto_init() {
return nil return nil
} }
} }
file_p2p_proto_msgTypes[52].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReadyMessage); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
} }
type x struct{} type x struct{}
out := protoimpl.TypeBuilder{ out := protoimpl.TypeBuilder{
@ -3907,7 +3964,7 @@ func file_p2p_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(), GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_p2p_proto_rawDesc, RawDescriptor: file_p2p_proto_rawDesc,
NumEnums: 0, NumEnums: 0,
NumMessages: 52, NumMessages: 53,
NumExtensions: 0, NumExtensions: 0,
NumServices: 0, NumServices: 0,
}, },

View File

@ -78,7 +78,7 @@ message BlockHeader{
} }
message BlockLevelParents { message BlockLevelParents {
repeated Hash parentHashes = 1; repeated Hash parentHashes = 1;
} }
message Hash{ message Hash{
@ -252,3 +252,6 @@ message PruningPointProofMessage {
message PruningPointProofHeaderArray { message PruningPointProofHeaderArray {
repeated BlockHeader headers = 1; repeated BlockHeader headers = 1;
} }
message ReadyMessage {
}

View File

@ -0,0 +1,17 @@
package protowire
import (
"github.com/kaspanet/kaspad/app/appmessage"
"github.com/pkg/errors"
)
func (x *KaspadMessage_Ready) toAppMessage() (appmessage.Message, error) {
if x == nil {
return nil, errors.Wrapf(errorNil, "KaspadMessage_Ready is nil")
}
return &appmessage.MsgReady{}, nil
}
func (x *KaspadMessage_Ready) fromAppMessage(_ *appmessage.MsgReady) error {
return nil
}

View File

@ -10,13 +10,14 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // versions:
// protoc-gen-go v1.26.0 // protoc-gen-go v1.25.0
// protoc v3.12.3 // protoc v3.12.3
// source: rpc.proto // source: rpc.proto
package protowire package protowire
import ( import (
proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl" protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect" reflect "reflect"
@ -30,6 +31,10 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
) )
// This is a compile-time assertion that a sufficiently up-to-date version
// of the legacy proto package is being used.
const _ = proto.ProtoPackageIsVersion4
type SubmitBlockResponseMessage_RejectReason int32 type SubmitBlockResponseMessage_RejectReason int32
const ( const (

View File

@ -310,6 +310,13 @@ func toP2PPayload(message appmessage.Message) (isKaspadMessage_Payload, error) {
return nil, err return nil, err
} }
return payload, nil return payload, nil
case *appmessage.MsgReady:
payload := new(KaspadMessage_Ready)
err := payload.fromAppMessage(message)
if err != nil {
return nil, err
}
return payload, nil
default: default:
return nil, nil return nil, nil
} }

View File

@ -214,6 +214,11 @@ outerLoop:
panic(errors.Wrap(err, "error registering everythingElseRoute")) panic(errors.Wrap(err, "error registering everythingElseRoute"))
} }
err = router.OutgoingRoute().Enqueue(appmessage.NewMsgReady())
if err != nil {
panic(errors.Wrap(err, "error sending ready message"))
}
spawn("netAdapterMock-routeInitializer-sendRoutesToChan", func() { spawn("netAdapterMock-routeInitializer-sendRoutesToChan", func() {
routesChan <- &Routes{ routesChan <- &Routes{
netConnection: netConnection, netConnection: netConnection,

View File

@ -31,3 +31,51 @@ func TestAddressExchange(t *testing.T) {
t.Errorf("Didn't find testAddress in list of addresses of appHarness3") t.Errorf("Didn't find testAddress in list of addresses of appHarness3")
} }
func TestAddressExchangeV3V4(t *testing.T) {
harnesses, teardown := setupHarnesses(t, []*harnessParams{
{
p2pAddress: p2pAddress1,
rpcAddress: rpcAddress1,
miningAddress: miningAddress1,
miningAddressPrivateKey: miningAddress1PrivateKey,
},
{
p2pAddress: p2pAddress2,
rpcAddress: rpcAddress2,
miningAddress: miningAddress2,
miningAddressPrivateKey: miningAddress2PrivateKey,
}, {
p2pAddress: p2pAddress3,
rpcAddress: rpcAddress3,
miningAddress: miningAddress3,
miningAddressPrivateKey: miningAddress3PrivateKey,
protocolVersion: 3,
},
})
defer teardown()
appHarness1, appHarness2, appHarness3 := harnesses[0], harnesses[1], harnesses[2]
testAddress := "1.2.3.4:6789"
err := addressmanager.AddAddressByIP(appHarness1.app.AddressManager(), testAddress, nil)
if err != nil {
t.Fatalf("Error adding address to addressManager: %+v", err)
}
connect(t, appHarness1, appHarness2)
connect(t, appHarness2, appHarness3)
peerAddresses, err := appHarness3.rpcClient.GetPeerAddresses()
if err != nil {
t.Fatalf("Error getting peer addresses: %+v", err)
}
for _, peerAddress := range peerAddresses.Addresses {
if peerAddress.Addr == testAddress {
return
}
}
t.Errorf("Didn't find testAddress in list of addresses of appHarness3")
}

View File

@ -30,13 +30,16 @@ const (
defaultTimeout = 10 * time.Second defaultTimeout = 10 * time.Second
) )
func setConfig(t *testing.T, harness *appHarness) { func setConfig(t *testing.T, harness *appHarness, protocolVersion uint32) {
harness.config = commonConfig() harness.config = commonConfig()
harness.config.AppDir = randomDirectory(t) harness.config.AppDir = randomDirectory(t)
harness.config.Listeners = []string{harness.p2pAddress} harness.config.Listeners = []string{harness.p2pAddress}
harness.config.RPCListeners = []string{harness.rpcAddress} harness.config.RPCListeners = []string{harness.rpcAddress}
harness.config.UTXOIndex = harness.utxoIndex harness.config.UTXOIndex = harness.utxoIndex
harness.config.AllowSubmitBlockWhenNotSynced = true harness.config.AllowSubmitBlockWhenNotSynced = true
if protocolVersion != 0 {
harness.config.ProtocolVersion = protocolVersion
}
if harness.overrideDAGParams != nil { if harness.overrideDAGParams != nil {
harness.config.ActiveNetParams = harness.overrideDAGParams harness.config.ActiveNetParams = harness.overrideDAGParams

View File

@ -34,6 +34,7 @@ type harnessParams struct {
miningAddressPrivateKey string miningAddressPrivateKey string
utxoIndex bool utxoIndex bool
overrideDAGParams *dagconfig.Params overrideDAGParams *dagconfig.Params
protocolVersion uint32
} }
// setupHarness creates a single appHarness with given parameters // setupHarness creates a single appHarness with given parameters
@ -47,7 +48,7 @@ func setupHarness(t *testing.T, params *harnessParams) (harness *appHarness, tea
overrideDAGParams: params.overrideDAGParams, overrideDAGParams: params.overrideDAGParams,
} }
setConfig(t, harness) setConfig(t, harness, params.protocolVersion)
setDatabaseContext(t, harness) setDatabaseContext(t, harness)
setApp(t, harness) setApp(t, harness)
harness.app.Start() harness.app.Start()

View File

@ -11,7 +11,7 @@ const validCharacters = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrs
const ( const (
appMajor uint = 0 appMajor uint = 0
appMinor uint = 11 appMinor uint = 11
appPatch uint = 9 appPatch uint = 10
) )
// appBuild is defined as a variable so it can be overridden during the build // appBuild is defined as a variable so it can be overridden during the build