From b45f5306dc235b1214bcbf1cb7f7df2f39be928a Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Tue, 9 Oct 2018 18:16:08 -0700 Subject: [PATCH] rafthttp: probe all raft transports This PR adds another probing routine to monitor the connection for Raft message transports. Previously, we only monitored snapshot transports. In our production cluster, we found one TCP connection had >8-sec latencies to a remote peer, but "etcd_network_peer_round_trip_time_seconds" metrics shows <1-sec latency distribution, which means etcd server was not sampling enough while such latency spikes happen outside of snapshot pipeline connection. Signed-off-by: Gyuho Lee --- rafthttp/probing_status.go | 21 +++++++++++----- rafthttp/transport.go | 22 +++++++++++------ rafthttp/transport_test.go | 50 ++++++++++++++++++++++---------------- 3 files changed, 58 insertions(+), 35 deletions(-) diff --git a/rafthttp/probing_status.go b/rafthttp/probing_status.go index c7a3c7ab9..109a0aea0 100644 --- a/rafthttp/probing_status.go +++ b/rafthttp/probing_status.go @@ -17,6 +17,7 @@ package rafthttp import ( "time" + "github.com/prometheus/client_golang/prometheus" "github.com/xiang90/probing" ) @@ -28,7 +29,15 @@ var ( statusErrorInterval = 5 * time.Second ) -func addPeerToProber(p probing.Prober, id string, us []string) { +const ( + // RoundTripperNameRaftMessage is the name of round-tripper that sends + // all other Raft messages, other than "snap.Message". + RoundTripperNameRaftMessage = "ROUND_TRIPPER_RAFT_MESSAGE" + // RoundTripperNameSnapshot is the name of round-tripper that sends merged snapshot message. + RoundTripperNameSnapshot = "ROUND_TRIPPER_SNAPSHOT" +) + +func addPeerToProber(p probing.Prober, id string, us []string, roundTripperName string, rttSecProm *prometheus.HistogramVec) { hus := make([]string, len(us)) for i := range us { hus[i] = us[i] + ProbingPrefix @@ -40,26 +49,26 @@ func addPeerToProber(p probing.Prober, id string, us []string) { if err != nil { plog.Errorf("failed to add peer %s into prober", id) } else { - go monitorProbingStatus(s, id) + go monitorProbingStatus(s, id, roundTripperName, rttSecProm) } } -func monitorProbingStatus(s probing.Status, id string) { +func monitorProbingStatus(s probing.Status, id string, roundTripperName string, rttSecProm *prometheus.HistogramVec) { // set the first interval short to log error early. interval := statusErrorInterval for { select { case <-time.After(interval): if !s.Health() { - plog.Warningf("health check for peer %s could not connect: %v", id, s.Err()) + plog.Warningf("health check for peer %s could not connect: %v (prober %q)", id, s.Err(), roundTripperName) interval = statusErrorInterval } else { interval = statusMonitoringInterval } if s.ClockDiff() > time.Second { - plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second) + plog.Warningf("the clock difference against peer %s is too high [%v > %v] (prober %q)", id, s.ClockDiff(), time.Second, roundTripperName) } - rtts.WithLabelValues(id).Observe(s.SRTT().Seconds()) + rttSecProm.WithLabelValues(id).Observe(s.SRTT().Seconds()) case <-s.StopNotify(): return } diff --git a/rafthttp/transport.go b/rafthttp/transport.go index ce7343302..16e854c8a 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -127,7 +127,8 @@ type Transport struct { remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up peers map[types.ID]Peer // peers map - prober probing.Prober + pipelineProber probing.Prober + streamProber probing.Prober } func (t *Transport) Start() error { @@ -142,7 +143,8 @@ func (t *Transport) Start() error { } t.remotes = make(map[types.ID]*remote) t.peers = make(map[types.ID]Peer) - t.prober = probing.NewProber(t.pipelineRt) + t.pipelineProber = probing.NewProber(t.pipelineRt) + t.streamProber = probing.NewProber(t.streamRt) // If client didn't provide dial retry frequency, use the default // (100ms backoff between attempts to create a new stream), @@ -210,7 +212,8 @@ func (t *Transport) Stop() { for _, p := range t.peers { p.stop() } - t.prober.RemoveAll() + t.pipelineProber.RemoveAll() + t.streamProber.RemoveAll() if tr, ok := t.streamRt.(*http.Transport); ok { tr.CloseIdleConnections() } @@ -289,8 +292,8 @@ func (t *Transport) AddPeer(id types.ID, us []string) { } fs := t.LeaderStats.Follower(id.String()) t.peers[id] = startPeer(t, urls, id, fs) - addPeerToProber(t.prober, id.String(), us) - + addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts) + addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts) plog.Infof("added peer %s", id) } @@ -317,7 +320,8 @@ func (t *Transport) removePeer(id types.ID) { } delete(t.peers, id) delete(t.LeaderStats.Followers, id.String()) - t.prober.Remove(id.String()) + t.pipelineProber.Remove(id.String()) + t.streamProber.Remove(id.String()) plog.Infof("removed peer %s", id) } @@ -334,8 +338,10 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) { } t.peers[id].update(urls) - t.prober.Remove(id.String()) - addPeerToProber(t.prober, id.String(), us) + t.pipelineProber.Remove(id.String()) + addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts) + t.streamProber.Remove(id.String()) + addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts) plog.Infof("updated peer %s", id) } diff --git a/rafthttp/transport_test.go b/rafthttp/transport_test.go index e4cf37154..4a82a97a4 100644 --- a/rafthttp/transport_test.go +++ b/rafthttp/transport_test.go @@ -33,8 +33,10 @@ func TestTransportSend(t *testing.T) { peer1 := newFakePeer() peer2 := newFakePeer() tr := &Transport{ - ServerStats: stats.NewServerStats("", ""), - peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2}, + ServerStats: stats.NewServerStats("", ""), + peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2}, + pipelineProber: probing.NewProber(nil), + streamProber: probing.NewProber(nil), } wmsgsIgnored := []raftpb.Message{ // bad local message @@ -68,8 +70,10 @@ func TestTransportCutMend(t *testing.T) { peer1 := newFakePeer() peer2 := newFakePeer() tr := &Transport{ - ServerStats: stats.NewServerStats("", ""), - peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2}, + ServerStats: stats.NewServerStats("", ""), + peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2}, + pipelineProber: probing.NewProber(nil), + streamProber: probing.NewProber(nil), } tr.CutPeer(types.ID(1)) @@ -96,10 +100,11 @@ func TestTransportCutMend(t *testing.T) { func TestTransportAdd(t *testing.T) { ls := stats.NewLeaderStats("") tr := &Transport{ - LeaderStats: ls, - streamRt: &roundTripperRecorder{}, - peers: make(map[types.ID]Peer), - prober: probing.NewProber(nil), + LeaderStats: ls, + streamRt: &roundTripperRecorder{}, + peers: make(map[types.ID]Peer), + pipelineProber: probing.NewProber(nil), + streamProber: probing.NewProber(nil), } tr.AddPeer(1, []string{"http://localhost:2380"}) @@ -124,10 +129,11 @@ func TestTransportAdd(t *testing.T) { func TestTransportRemove(t *testing.T) { tr := &Transport{ - LeaderStats: stats.NewLeaderStats(""), - streamRt: &roundTripperRecorder{}, - peers: make(map[types.ID]Peer), - prober: probing.NewProber(nil), + LeaderStats: stats.NewLeaderStats(""), + streamRt: &roundTripperRecorder{}, + peers: make(map[types.ID]Peer), + pipelineProber: probing.NewProber(nil), + streamProber: probing.NewProber(nil), } tr.AddPeer(1, []string{"http://localhost:2380"}) tr.RemovePeer(types.ID(1)) @@ -141,8 +147,9 @@ func TestTransportRemove(t *testing.T) { func TestTransportUpdate(t *testing.T) { peer := newFakePeer() tr := &Transport{ - peers: map[types.ID]Peer{types.ID(1): peer}, - prober: probing.NewProber(nil), + peers: map[types.ID]Peer{types.ID(1): peer}, + pipelineProber: probing.NewProber(nil), + streamProber: probing.NewProber(nil), } u := "http://localhost:2380" tr.UpdatePeer(types.ID(1), []string{u}) @@ -155,13 +162,14 @@ func TestTransportUpdate(t *testing.T) { func TestTransportErrorc(t *testing.T) { errorc := make(chan error, 1) tr := &Transport{ - Raft: &fakeRaft{}, - LeaderStats: stats.NewLeaderStats(""), - ErrorC: errorc, - streamRt: newRespRoundTripper(http.StatusForbidden, nil), - pipelineRt: newRespRoundTripper(http.StatusForbidden, nil), - peers: make(map[types.ID]Peer), - prober: probing.NewProber(nil), + Raft: &fakeRaft{}, + LeaderStats: stats.NewLeaderStats(""), + ErrorC: errorc, + streamRt: newRespRoundTripper(http.StatusForbidden, nil), + pipelineRt: newRespRoundTripper(http.StatusForbidden, nil), + peers: make(map[types.ID]Peer), + pipelineProber: probing.NewProber(nil), + streamProber: probing.NewProber(nil), } tr.AddPeer(1, []string{"http://localhost:2380"}) defer tr.Stop()