From aa5bc3428004eeebf8889e901e989d36fdcc0815 Mon Sep 17 00:00:00 2001 From: Svarog Date: Sun, 19 Jul 2020 14:57:34 +0300 Subject: [PATCH] [NOD-1148] P2P stabilization (#798) * [NOD-1148] Add lock around route's close operation * [NOD-1148] Added tracing of incoming and outgoing messages * [NOD-1148] Cast to MsgPing should have been to MsgPong * [NOD-1148] Check for NeedMoreAddresses before sending GetAddr message and invert condition --- netadapter/router/route.go | 39 ++++++++++++------- .../server/grpcserver/connection_loops.go | 13 +++++++ .../flows/addressexchange/receiveaddresses.go | 8 ++-- protocol/flows/ping/ping.go | 5 ++- protocol/protocol.go | 2 +- wire/message.go | 7 ++-- 6 files changed, 49 insertions(+), 25 deletions(-) diff --git a/netadapter/router/route.go b/netadapter/router/route.go index 79a4a5799..9df7e0b33 100644 --- a/netadapter/router/route.go +++ b/netadapter/router/route.go @@ -1,9 +1,11 @@ package router import ( + "sync" + "time" + "github.com/kaspanet/kaspad/wire" "github.com/pkg/errors" - "time" ) const ( @@ -20,7 +22,10 @@ type onCapacityReachedHandler func() // Route represents an incoming or outgoing Router route type Route struct { channel chan wire.Message - closed bool + // closed and closeLock are used to protect us from writing to a closed channel + // reads use the channel's built-in mechanism to check if the channel is closed + closed bool + closeLock sync.Mutex onCapacityReachedHandler onCapacityReachedHandler } @@ -35,6 +40,9 @@ func NewRoute() *Route { // Enqueue enqueues a message to the Route func (r *Route) Enqueue(message wire.Message) (isOpen bool) { + r.closeLock.Lock() + defer r.closeLock.Unlock() + if r.closed { return false } @@ -45,17 +53,12 @@ func (r *Route) Enqueue(message wire.Message) (isOpen bool) { return true } -// Dequeue dequeues a message from the Route -func (r *Route) Dequeue() (message wire.Message, isOpen bool) { - if r.closed { - return nil, false - } - return <-r.channel, true -} - // EnqueueWithTimeout attempts to enqueue a message to the Route // and returns an error if the given timeout expires first. func (r *Route) EnqueueWithTimeout(message wire.Message, timeout time.Duration) (isOpen bool, err error) { + r.closeLock.Lock() + defer r.closeLock.Unlock() + if r.closed { return false, nil } @@ -70,17 +73,20 @@ func (r *Route) EnqueueWithTimeout(message wire.Message, timeout time.Duration) } } +// Dequeue dequeues a message from the Route +func (r *Route) Dequeue() (message wire.Message, isOpen bool) { + message, isOpen = <-r.channel + return message, isOpen +} + // DequeueWithTimeout attempts to dequeue a message from the Route // and returns an error if the given timeout expires first. func (r *Route) DequeueWithTimeout(timeout time.Duration) (message wire.Message, isOpen bool, err error) { - if r.closed { - return nil, false, nil - } select { case <-time.After(timeout): return nil, false, errors.Wrapf(ErrTimeout, "got timeout after %s", timeout) - case message := <-r.channel: - return message, true, nil + case message, isOpen = <-r.channel: + return message, isOpen, nil } } @@ -90,6 +96,9 @@ func (r *Route) setOnCapacityReachedHandler(onCapacityReachedHandler onCapacityR // Close closes this route func (r *Route) Close() error { + r.closeLock.Lock() + defer r.closeLock.Unlock() + r.closed = true close(r.channel) return nil diff --git a/netadapter/server/grpcserver/connection_loops.go b/netadapter/server/grpcserver/connection_loops.go index db0c3afeb..3b72d96a6 100644 --- a/netadapter/server/grpcserver/connection_loops.go +++ b/netadapter/server/grpcserver/connection_loops.go @@ -3,6 +3,9 @@ package grpcserver import ( "io" + "github.com/davecgh/go-spew/spew" + "github.com/kaspanet/kaspad/logger" + "github.com/kaspanet/kaspad/netadapter/server/grpcserver/protowire" ) @@ -33,6 +36,11 @@ func (c *gRPCConnection) sendLoop() error { if !isOpen { return nil } + + log.Tracef("outgoing '%s' message to %s: %s", message.Command(), c, logger.NewLogClosure(func() string { + return spew.Sdump(message) + })) + messageProto, err := protowire.FromWireMessage(message) if err != nil { return err @@ -60,6 +68,11 @@ func (c *gRPCConnection) receiveLoop() error { if err != nil { return err } + + log.Tracef("incoming '%s' message from %s: %s", message.Command(), c, logger.NewLogClosure(func() string { + return spew.Sdump(message) + })) + isOpen, err := c.router.EnqueueIncomingMessage(message) if err != nil { return err diff --git a/protocol/flows/addressexchange/receiveaddresses.go b/protocol/flows/addressexchange/receiveaddresses.go index b9c8aeaea..e26dffef6 100644 --- a/protocol/flows/addressexchange/receiveaddresses.go +++ b/protocol/flows/addressexchange/receiveaddresses.go @@ -22,6 +22,10 @@ func ReceiveAddresses(incomingRoute *router.Route, outgoingRoute *router.Route, panic(err) } + if !addressManager.NeedMoreAddresses() { + return false, nil + } + msgGetAddresses := wire.NewMsgGetAddresses(false, subnetworkID) isOpen, err := outgoingRoute.EnqueueWithTimeout(msgGetAddresses, timeout) if err != nil { @@ -31,10 +35,6 @@ func ReceiveAddresses(incomingRoute *router.Route, outgoingRoute *router.Route, return true, nil } - if addressManager.NeedMoreAddresses() { - return false, nil - } - message, isOpen, err := incomingRoute.DequeueWithTimeout(timeout) if err != nil { return false, err diff --git a/protocol/flows/ping/ping.go b/protocol/flows/ping/ping.go index bd6f8b181..e215c46a8 100644 --- a/protocol/flows/ping/ping.go +++ b/protocol/flows/ping/ping.go @@ -1,12 +1,13 @@ package ping import ( + "time" + "github.com/kaspanet/kaspad/netadapter/router" peerpkg "github.com/kaspanet/kaspad/protocol/peer" "github.com/kaspanet/kaspad/protocol/protocolerrors" "github.com/kaspanet/kaspad/util/random" "github.com/kaspanet/kaspad/wire" - "time" ) const pingTimeout = 30 * time.Second @@ -63,7 +64,7 @@ func SendPings(incomingRoute *router.Route, outgoingRoute *router.Route, peer *p if !isOpen { return nil } - pongMessage := message.(*wire.MsgPing) + pongMessage := message.(*wire.MsgPong) if pongMessage.Nonce != pingMessage.Nonce { return protocolerrors.New(true, "nonce mismatch between ping and pong") } diff --git a/protocol/protocol.go b/protocol/protocol.go index afe4efbd1..c47a1e3a9 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -1,7 +1,6 @@ package protocol import ( - "errors" "sync/atomic" "github.com/kaspanet/kaspad/protocol/flows/handshake" @@ -17,6 +16,7 @@ import ( peerpkg "github.com/kaspanet/kaspad/protocol/peer" "github.com/kaspanet/kaspad/protocol/protocolerrors" "github.com/kaspanet/kaspad/wire" + "github.com/pkg/errors" ) // Init initializes the p2p protocol diff --git a/wire/message.go b/wire/message.go index beb15da9a..76bad9af0 100644 --- a/wire/message.go +++ b/wire/message.go @@ -7,9 +7,10 @@ package wire import ( "bytes" "fmt" - "github.com/pkg/errors" "io" + "github.com/pkg/errors" + "github.com/kaspanet/kaspad/util/daghash" ) @@ -65,8 +66,8 @@ const ( var messageCommandToString = map[MessageCommand]string{ CmdVersion: "Version", CmdVerAck: "VerAck", - CmdGetAddresses: "GetAddr", - CmdAddress: "Addr", + CmdGetAddresses: "GetAddress", + CmdAddress: "Address", CmdGetBlockInvs: "GetBlockInvs", CmdInv: "Inv", CmdGetData: "GetData",