[NOD-1142] Implement ping flow (#793)

* [NOD-1124] Move Router to the router package.

* [NOD-1124] Implement SetOnRouteCapacityReachedHandler.

* [NOD-1124] Use Routes instead of bare channels.

* [NOD-1124] Fix merge errors.

* [NOD-1124] Connect the Router to the Connection.

* [NOD-1124] Fix merge errors.

* [NOD-1124] Move some variables around.

* [NOD-1124] Fix unreachable code.

* [NOD-1124] Fix a variable name.

* [NOD-1142] Implement ping flows.

* [NOD-1142] Add ping flows to startFlows.

* [NOD-1142] Fix merge errors.

* [NOD-1142] Fix a typo.

* [NOD-1142] Add comments to exported functions.

* [NOD-1142] Fix bad flow name.

* [NOD-1142] Remove a redundant empty line.

* [NOD-1142] Fix a typo.

* [NOD-1142] Simplify for loop.

* [NOD-1142] Rename HandlePing to HandleIncomingPings and StartPingLoop to StartSendingPings.

* [NOD-1142] Fix no-longer-infinite loop.

* [NOD-1142] Represent ping duration as time.Duration instead of an int64.

* [NOD-1142] Rename HandleIncomingPings to ReceivePings and StartSendingPings to SendPings.

* [NOD-1142] Move pingInterval to within SendPings.

* [NOD-1142] Rephrase a comment.
This commit is contained in:
stasatdaglabs 2020-07-14 12:41:37 +03:00 committed by GitHub
parent 05db135d23
commit 6076309b3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 97 additions and 21 deletions

View File

@ -8,6 +8,7 @@ import (
"github.com/pkg/errors"
"sync"
"sync/atomic"
"time"
)
// Peer holds data about a peer.
@ -24,6 +25,11 @@ type Peer struct {
protocolVersion uint32 // negotiated protocol version
disableRelayTx bool
subnetworkID *subnetworkid.SubnetworkID
pingLock sync.RWMutex
lastPingNonce uint64 // The nonce of the last ping we sent
lastPingTime time.Time // Time we sent last ping
lastPingDuration time.Duration // Time for last ping to return
}
// SelectedTipHash returns the selected tip of the peer.
@ -87,6 +93,24 @@ func (p *Peer) UpdateFieldsFromMsgVersion(msg *wire.MsgVersion, peerID uint32) {
p.subnetworkID = msg.SubnetworkID
}
// SetPingPending sets the ping state of the peer to 'pending'
func (p *Peer) SetPingPending(nonce uint64) {
p.pingLock.Lock()
defer p.pingLock.Unlock()
p.lastPingNonce = nonce
p.lastPingTime = time.Now()
}
// SetPingIdle sets the ping state of the peer to 'idle'
func (p *Peer) SetPingIdle() {
p.pingLock.Lock()
defer p.pingLock.Unlock()
p.lastPingNonce = 0
p.lastPingDuration = time.Since(p.lastPingTime)
}
func (p *Peer) String() string {
//TODO(libp2p)
panic("unimplemented")

62
protocol/ping/ping.go Normal file
View File

@ -0,0 +1,62 @@
package ping
import (
"errors"
"github.com/kaspanet/kaspad/netadapter/router"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/util/random"
"github.com/kaspanet/kaspad/wire"
"time"
)
// ReceivePings handles all ping messages coming through incomingRoute.
// This function assumes that incomingRoute will only return MsgPing.
func ReceivePings(incomingRoute *router.Route, outgoingRoute *router.Route) error {
for {
message, isOpen := incomingRoute.Dequeue()
if !isOpen {
return nil
}
pingMessage := message.(*wire.MsgPing)
pongMessage := wire.NewMsgPong(pingMessage.Nonce)
isOpen = outgoingRoute.Enqueue(pongMessage)
if !isOpen {
return nil
}
}
}
// SendPings starts sending MsgPings every pingInterval seconds to the
// given peer.
// This function assumes that incomingRoute will only return MsgPong.
func SendPings(incomingRoute *router.Route, outgoingRoute *router.Route, peer *peerpkg.Peer) error {
const pingInterval = 2 * time.Minute
ticker := time.NewTicker(pingInterval)
defer ticker.Stop()
for range ticker.C {
nonce, err := random.Uint64()
if err != nil {
return err
}
peer.SetPingPending(nonce)
pingMessage := wire.NewMsgPing(nonce)
isOpen := outgoingRoute.Enqueue(pingMessage)
if !isOpen {
return nil
}
message, isOpen := incomingRoute.Dequeue()
if !isOpen {
return nil
}
pongMessage := message.(*wire.MsgPing)
if pongMessage.Nonce != pingMessage.Nonce {
return errors.New("nonce mismatch between ping and pong")
}
peer.SetPingIdle()
}
return nil
}

View File

@ -7,6 +7,7 @@ import (
"github.com/kaspanet/kaspad/protocol/handlerelayblockrequests"
"github.com/kaspanet/kaspad/protocol/handlerelayinvs"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/protocol/ping"
"github.com/kaspanet/kaspad/wire"
"sync/atomic"
)
@ -74,28 +75,17 @@ func startFlows(netAdapter *netadapter.NetAdapter, router *routerpkg.Router, dag
},
)
// TODO(libp2p): Remove this and change it with a real Ping-Pong flow.
addFlow("PingPong", router, []string{wire.CmdPing, wire.CmdPong},
&stopped, stop, func(incomingRoute *routerpkg.Route) error {
addFlow("ReceivePings", router, []string{wire.CmdPing}, &stopped, stop,
func(incomingRoute *routerpkg.Route) error {
return ping.ReceivePings(incomingRoute, outgoingRoute)
},
)
isOpen := outgoingRoute.Enqueue(wire.NewMsgPing(666))
if !isOpen {
return nil
}
message, isOpen := incomingRoute.Dequeue()
if !isOpen {
return nil
}
for {
log.Infof("Got message: %+v", message.Command())
if message.Command() == "ping" {
isOpen := outgoingRoute.Enqueue(wire.NewMsgPong(666))
if !isOpen {
return nil
}
}
}
})
addFlow("SendPings", router, []string{wire.CmdPong}, &stopped, stop,
func(incomingRoute *routerpkg.Route) error {
return ping.SendPings(incomingRoute, outgoingRoute, peer)
},
)
err := <-stop
return err