diff --git a/protocol/peer/peer.go b/protocol/peer/peer.go index 0016cc554..ccc61881e 100644 --- a/protocol/peer/peer.go +++ b/protocol/peer/peer.go @@ -8,6 +8,7 @@ import ( "github.com/pkg/errors" "sync" "sync/atomic" + "time" ) // Peer holds data about a peer. @@ -24,6 +25,11 @@ type Peer struct { protocolVersion uint32 // negotiated protocol version disableRelayTx bool subnetworkID *subnetworkid.SubnetworkID + + pingLock sync.RWMutex + lastPingNonce uint64 // The nonce of the last ping we sent + lastPingTime time.Time // Time we sent last ping + lastPingDuration time.Duration // Time for last ping to return } // SelectedTipHash returns the selected tip of the peer. @@ -87,6 +93,24 @@ func (p *Peer) UpdateFieldsFromMsgVersion(msg *wire.MsgVersion, peerID uint32) { p.subnetworkID = msg.SubnetworkID } +// SetPingPending sets the ping state of the peer to 'pending' +func (p *Peer) SetPingPending(nonce uint64) { + p.pingLock.Lock() + defer p.pingLock.Unlock() + + p.lastPingNonce = nonce + p.lastPingTime = time.Now() +} + +// SetPingIdle sets the ping state of the peer to 'idle' +func (p *Peer) SetPingIdle() { + p.pingLock.Lock() + defer p.pingLock.Unlock() + + p.lastPingNonce = 0 + p.lastPingDuration = time.Since(p.lastPingTime) +} + func (p *Peer) String() string { //TODO(libp2p) panic("unimplemented") diff --git a/protocol/ping/ping.go b/protocol/ping/ping.go new file mode 100644 index 000000000..1c184ca36 --- /dev/null +++ b/protocol/ping/ping.go @@ -0,0 +1,62 @@ +package ping + +import ( + "errors" + "github.com/kaspanet/kaspad/netadapter/router" + peerpkg "github.com/kaspanet/kaspad/protocol/peer" + "github.com/kaspanet/kaspad/util/random" + "github.com/kaspanet/kaspad/wire" + "time" +) + +// ReceivePings handles all ping messages coming through incomingRoute. +// This function assumes that incomingRoute will only return MsgPing. +func ReceivePings(incomingRoute *router.Route, outgoingRoute *router.Route) error { + for { + message, isOpen := incomingRoute.Dequeue() + if !isOpen { + return nil + } + pingMessage := message.(*wire.MsgPing) + + pongMessage := wire.NewMsgPong(pingMessage.Nonce) + isOpen = outgoingRoute.Enqueue(pongMessage) + if !isOpen { + return nil + } + } +} + +// SendPings starts sending MsgPings every pingInterval seconds to the +// given peer. +// This function assumes that incomingRoute will only return MsgPong. +func SendPings(incomingRoute *router.Route, outgoingRoute *router.Route, peer *peerpkg.Peer) error { + const pingInterval = 2 * time.Minute + ticker := time.NewTicker(pingInterval) + defer ticker.Stop() + + for range ticker.C { + nonce, err := random.Uint64() + if err != nil { + return err + } + peer.SetPingPending(nonce) + + pingMessage := wire.NewMsgPing(nonce) + isOpen := outgoingRoute.Enqueue(pingMessage) + if !isOpen { + return nil + } + + message, isOpen := incomingRoute.Dequeue() + if !isOpen { + return nil + } + pongMessage := message.(*wire.MsgPing) + if pongMessage.Nonce != pingMessage.Nonce { + return errors.New("nonce mismatch between ping and pong") + } + peer.SetPingIdle() + } + return nil +} diff --git a/protocol/protocol.go b/protocol/protocol.go index a6debd5a2..edd4196f2 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -7,6 +7,7 @@ import ( "github.com/kaspanet/kaspad/protocol/handlerelayblockrequests" "github.com/kaspanet/kaspad/protocol/handlerelayinvs" peerpkg "github.com/kaspanet/kaspad/protocol/peer" + "github.com/kaspanet/kaspad/protocol/ping" "github.com/kaspanet/kaspad/wire" "sync/atomic" ) @@ -74,28 +75,17 @@ func startFlows(netAdapter *netadapter.NetAdapter, router *routerpkg.Router, dag }, ) - // TODO(libp2p): Remove this and change it with a real Ping-Pong flow. - addFlow("PingPong", router, []string{wire.CmdPing, wire.CmdPong}, - &stopped, stop, func(incomingRoute *routerpkg.Route) error { + addFlow("ReceivePings", router, []string{wire.CmdPing}, &stopped, stop, + func(incomingRoute *routerpkg.Route) error { + return ping.ReceivePings(incomingRoute, outgoingRoute) + }, + ) - isOpen := outgoingRoute.Enqueue(wire.NewMsgPing(666)) - if !isOpen { - return nil - } - message, isOpen := incomingRoute.Dequeue() - if !isOpen { - return nil - } - for { - log.Infof("Got message: %+v", message.Command()) - if message.Command() == "ping" { - isOpen := outgoingRoute.Enqueue(wire.NewMsgPong(666)) - if !isOpen { - return nil - } - } - } - }) + addFlow("SendPings", router, []string{wire.CmdPong}, &stopped, stop, + func(incomingRoute *routerpkg.Route) error { + return ping.SendPings(incomingRoute, outgoingRoute, peer) + }, + ) err := <-stop return err