rafthttp: probe all raft transports

This PR adds another probing routine to monitor the connections
used for Raft message transports. Previously, we only probed the
snapshot transport connections.

In our production cluster, we found one TCP connection with >8-sec
latencies to a remote peer, while the "etcd_network_peer_round_trip_time_seconds"
metric showed a <1-sec latency distribution. This means the etcd server
was not sampling enough, because such latency spikes happened
outside of the snapshot pipeline connection.
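
For illustration only (not part of this change), the sketch below shows
the per-transport probing pattern as a standalone program, using the same
probing library. The peer ID, endpoint URL, probe path, and intervals are
made up; NewProber, Status, Health, SRTT, and StopNotify appear in the
diff below, while AddHTTP is assumed to be the prober's registration call.

    package main

    import (
        "fmt"
        "net/http"
        "time"

        "github.com/xiang90/probing"
    )

    // watch registers one probe target and periodically reports its health
    // and smoothed RTT, tagged with the round-tripper name.
    func watch(name string, p probing.Prober, endpoints []string) error {
        // "peer-1" and the 5s/30s intervals are arbitrary for this sketch.
        if err := p.AddHTTP("peer-1", 5*time.Second, endpoints); err != nil {
            return err
        }
        s, err := p.Status("peer-1")
        if err != nil {
            return err
        }
        go func() {
            for {
                select {
                case <-time.After(30 * time.Second):
                    fmt.Printf("prober %q health=%v srtt=%v\n", name, s.Health(), s.SRTT())
                case <-s.StopNotify():
                    return
                }
            }
        }()
        return nil
    }

    func main() {
        // One prober per round-tripper, so RTT is sampled on both the
        // snapshot pipeline connection and the Raft message connection.
        pipelineProber := probing.NewProber(&http.Transport{})
        streamProber := probing.NewProber(&http.Transport{})

        // Hypothetical probing endpoint exposed by a remote peer.
        endpoints := []string{"http://127.0.0.1:2380/raft/probing"}

        if err := watch("ROUND_TRIPPER_SNAPSHOT", pipelineProber, endpoints); err != nil {
            fmt.Println("snapshot prober:", err)
        }
        if err := watch("ROUND_TRIPPER_RAFT_MESSAGE", streamProber, endpoints); err != nil {
            fmt.Println("raft message prober:", err)
        }

        select {} // keep sampling
    }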

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
Gyuho Lee committed 2018-10-09 18:16:08 -07:00
parent 86fdbdc7f9
commit b6d11019e0
3 changed files with 59 additions and 35 deletions


@@ -17,6 +17,7 @@ package rafthttp
import (
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/xiang90/probing"
)
@@ -28,7 +29,15 @@ var (
statusErrorInterval = 5 * time.Second
)
func addPeerToProber(p probing.Prober, id string, us []string) {
const (
// RoundTripperNameRaftMessage is the name of round-tripper that sends
// all other Raft messages, other than "snap.Message".
RoundTripperNameRaftMessage = "ROUND_TRIPPER_RAFT_MESSAGE"
// RoundTripperNameSnapshot is the name of round-tripper that sends merged snapshot message.
RoundTripperNameSnapshot = "ROUND_TRIPPER_SNAPSHOT"
)
func addPeerToProber(p probing.Prober, id string, us []string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
hus := make([]string, len(us))
for i := range us {
hus[i] = us[i] + ProbingPrefix
@@ -40,26 +49,26 @@ func addPeerToProber(p probing.Prober, id string, us []string) {
if err != nil {
plog.Errorf("failed to add peer %s into prober", id)
} else {
go monitorProbingStatus(s, id)
go monitorProbingStatus(s, id, roundTripperName, rttSecProm)
}
}
func monitorProbingStatus(s probing.Status, id string) {
func monitorProbingStatus(s probing.Status, id string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
// set the first interval short to log error early.
interval := statusErrorInterval
for {
select {
case <-time.After(interval):
if !s.Health() {
plog.Warningf("health check for peer %s could not connect: %v", id, s.Err())
plog.Warningf("health check for peer %s could not connect: %v (prober %q)", id, s.Err(), roundTripperName)
interval = statusErrorInterval
} else {
interval = statusMonitoringInterval
}
if s.ClockDiff() > time.Second {
plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second)
plog.Warningf("the clock difference against peer %s is too high [%v > %v] (prober %q)", id, s.ClockDiff(), time.Second, roundTripperName)
}
rtts.WithLabelValues(id).Observe(s.SRTT().Seconds())
rttSecProm.WithLabelValues(id).Observe(s.SRTT().Seconds())
case <-s.StopNotify():
return
}
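
For context (editorial note, not part of the diff): the rttSecProm argument
is expected to be a per-peer Prometheus HistogramVec, like the rtts collector
observed above. A rough, assumed shape of such a declaration (only the metric
name comes from the commit message; the label name and buckets here are
illustrative guesses):

    var rtts = prometheus.NewHistogramVec(
        prometheus.HistogramOpts{
            Namespace: "etcd",
            Subsystem: "network",
            Name:      "peer_round_trip_time_seconds",
            Help:      "Round-trip latency to peers, as sampled by the probers.",
            Buckets:   prometheus.ExponentialBuckets(0.0001, 2, 14), // illustrative bucket layout
        },
        []string{"To"}, // peer ID label, matching WithLabelValues(id) above
    )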


@@ -26,6 +26,7 @@ import (
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/pkg/capnslog"
"github.com/xiang90/probing"
"golang.org/x/net/context"
@@ -121,7 +122,8 @@ type Transport struct {
remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
peers map[types.ID]Peer // peers map
prober probing.Prober
pipelineProber probing.Prober
streamProber probing.Prober
}
func (t *Transport) Start() error {
@@ -136,7 +138,8 @@ func (t *Transport) Start() error {
}
t.remotes = make(map[types.ID]*remote)
t.peers = make(map[types.ID]Peer)
t.prober = probing.NewProber(t.pipelineRt)
t.pipelineProber = probing.NewProber(t.pipelineRt)
t.streamProber = probing.NewProber(t.streamRt)
return nil
}
@@ -197,7 +200,8 @@ func (t *Transport) Stop() {
for _, p := range t.peers {
p.stop()
}
t.prober.RemoveAll()
t.pipelineProber.RemoveAll()
t.streamProber.RemoveAll()
if tr, ok := t.streamRt.(*http.Transport); ok {
tr.CloseIdleConnections()
}
@@ -276,8 +280,8 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
}
fs := t.LeaderStats.Follower(id.String())
t.peers[id] = startPeer(t, urls, id, fs)
addPeerToProber(t.prober, id.String(), us)
addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts)
addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts)
plog.Infof("added peer %s", id)
}
@@ -304,7 +308,8 @@ func (t *Transport) removePeer(id types.ID) {
}
delete(t.peers, id)
delete(t.LeaderStats.Followers, id.String())
t.prober.Remove(id.String())
t.pipelineProber.Remove(id.String())
t.streamProber.Remove(id.String())
plog.Infof("removed peer %s", id)
}
@@ -321,8 +326,10 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) {
}
t.peers[id].update(urls)
t.prober.Remove(id.String())
addPeerToProber(t.prober, id.String(), us)
t.pipelineProber.Remove(id.String())
addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts)
t.streamProber.Remove(id.String())
addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts)
plog.Infof("updated peer %s", id)
}


@@ -33,8 +33,10 @@ func TestTransportSend(t *testing.T) {
peer1 := newFakePeer()
peer2 := newFakePeer()
tr := &Transport{
ServerStats: stats.NewServerStats("", ""),
peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
ServerStats: stats.NewServerStats("", ""),
peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
wmsgsIgnored := []raftpb.Message{
// bad local message
@@ -68,8 +70,10 @@ func TestTransportCutMend(t *testing.T) {
peer1 := newFakePeer()
peer2 := newFakePeer()
tr := &Transport{
ServerStats: stats.NewServerStats("", ""),
peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
ServerStats: stats.NewServerStats("", ""),
peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
tr.CutPeer(types.ID(1))
@@ -96,10 +100,11 @@ func TestTransportCutMend(t *testing.T) {
func TestTransportAdd(t *testing.T) {
ls := stats.NewLeaderStats("")
tr := &Transport{
LeaderStats: ls,
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
LeaderStats: ls,
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
tr.AddPeer(1, []string{"http://localhost:2380"})
@@ -124,10 +129,11 @@ func TestTransportAdd(t *testing.T) {
func TestTransportRemove(t *testing.T) {
tr := &Transport{
LeaderStats: stats.NewLeaderStats(""),
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
LeaderStats: stats.NewLeaderStats(""),
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
tr.AddPeer(1, []string{"http://localhost:2380"})
tr.RemovePeer(types.ID(1))
@@ -141,8 +147,9 @@ func TestTransportRemove(t *testing.T) {
func TestTransportUpdate(t *testing.T) {
peer := newFakePeer()
tr := &Transport{
peers: map[types.ID]Peer{types.ID(1): peer},
prober: probing.NewProber(nil),
peers: map[types.ID]Peer{types.ID(1): peer},
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
u := "http://localhost:2380"
tr.UpdatePeer(types.ID(1), []string{u})
@@ -155,13 +162,14 @@ func TestTransportUpdate(t *testing.T) {
func TestTransportErrorc(t *testing.T) {
errorc := make(chan error, 1)
tr := &Transport{
Raft: &fakeRaft{},
LeaderStats: stats.NewLeaderStats(""),
ErrorC: errorc,
streamRt: newRespRoundTripper(http.StatusForbidden, nil),
pipelineRt: newRespRoundTripper(http.StatusForbidden, nil),
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
Raft: &fakeRaft{},
LeaderStats: stats.NewLeaderStats(""),
ErrorC: errorc,
streamRt: newRespRoundTripper(http.StatusForbidden, nil),
pipelineRt: newRespRoundTripper(http.StatusForbidden, nil),
peers: make(map[types.ID]Peer),
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
tr.AddPeer(1, []string{"http://localhost:2380"})
defer tr.Stop()
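
An editorial note on the test changes: AddPeer and UpdatePeer now go through
both probers, so each Transport literal in these tests sets both pipelineProber
and streamProber; leaving either as its nil zero value would panic as soon as
addPeerToProber calls into it. A minimal sketch of the pattern, reusing the
field names from the diff:

    tr := &Transport{
        LeaderStats:    stats.NewLeaderStats(""),
        streamRt:       &roundTripperRecorder{},
        peers:          make(map[types.ID]Peer),
        pipelineProber: probing.NewProber(nil), // nil round-tripper is fine here:
        streamProber:   probing.NewProber(nil), // the tests only check peer bookkeeping
    }
    tr.AddPeer(1, []string{"http://localhost:2380"})
    defer tr.Stop()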