diff --git a/rafthttp/probing_status.go b/rafthttp/probing_status.go index c7a3c7ab9..109a0aea0 100644 --- a/rafthttp/probing_status.go +++ b/rafthttp/probing_status.go @@ -17,6 +17,7 @@ package rafthttp import ( "time" + "github.com/prometheus/client_golang/prometheus" "github.com/xiang90/probing" ) @@ -28,7 +29,15 @@ var ( statusErrorInterval = 5 * time.Second ) -func addPeerToProber(p probing.Prober, id string, us []string) { +const ( + // RoundTripperNameRaftMessage is the name of the round-tripper that sends + // all Raft messages other than "snap.Message". + RoundTripperNameRaftMessage = "ROUND_TRIPPER_RAFT_MESSAGE" + // RoundTripperNameSnapshot is the name of the round-tripper that sends merged snapshot messages. + RoundTripperNameSnapshot = "ROUND_TRIPPER_SNAPSHOT" +) + +func addPeerToProber(p probing.Prober, id string, us []string, roundTripperName string, rttSecProm *prometheus.HistogramVec) { hus := make([]string, len(us)) for i := range us { hus[i] = us[i] + ProbingPrefix @@ -40,26 +49,26 @@ func addPeerToProber(p probing.Prober, id string, us []string) { if err != nil { plog.Errorf("failed to add peer %s into prober", id) } else { - go monitorProbingStatus(s, id) + go monitorProbingStatus(s, id, roundTripperName, rttSecProm) } } -func monitorProbingStatus(s probing.Status, id string) { +func monitorProbingStatus(s probing.Status, id string, roundTripperName string, rttSecProm *prometheus.HistogramVec) { // set the first interval short to log error early. 
interval := statusErrorInterval for { select { case <-time.After(interval): if !s.Health() { - plog.Warningf("health check for peer %s could not connect: %v", id, s.Err()) + plog.Warningf("health check for peer %s could not connect: %v (prober %q)", id, s.Err(), roundTripperName) interval = statusErrorInterval } else { interval = statusMonitoringInterval } if s.ClockDiff() > time.Second { - plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second) + plog.Warningf("the clock difference against peer %s is too high [%v > %v] (prober %q)", id, s.ClockDiff(), time.Second, roundTripperName) } - rtts.WithLabelValues(id).Observe(s.SRTT().Seconds()) + rttSecProm.WithLabelValues(id).Observe(s.SRTT().Seconds()) case <-s.StopNotify(): return } diff --git a/rafthttp/transport.go b/rafthttp/transport.go index f96149aa3..d26b280b0 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -26,6 +26,7 @@ import ( "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/snap" + "github.com/coreos/pkg/capnslog" "github.com/xiang90/probing" "golang.org/x/net/context" @@ -121,7 +122,8 @@ type Transport struct { remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up peers map[types.ID]Peer // peers map - prober probing.Prober + pipelineProber probing.Prober + streamProber probing.Prober } func (t *Transport) Start() error { @@ -136,7 +138,8 @@ func (t *Transport) Start() error { } t.remotes = make(map[types.ID]*remote) t.peers = make(map[types.ID]Peer) - t.prober = probing.NewProber(t.pipelineRt) + t.pipelineProber = probing.NewProber(t.pipelineRt) + t.streamProber = probing.NewProber(t.streamRt) return nil } @@ -197,7 +200,8 @@ func (t *Transport) Stop() { for _, p := range t.peers { p.stop() } - t.prober.RemoveAll() + t.pipelineProber.RemoveAll() + t.streamProber.RemoveAll() if tr, ok := t.streamRt.(*http.Transport); ok { tr.CloseIdleConnections() } @@ -276,8 
+280,8 @@ func (t *Transport) AddPeer(id types.ID, us []string) { } fs := t.LeaderStats.Follower(id.String()) t.peers[id] = startPeer(t, urls, id, fs) - addPeerToProber(t.prober, id.String(), us) - + addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts) + addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts) plog.Infof("added peer %s", id) } @@ -304,7 +308,8 @@ func (t *Transport) removePeer(id types.ID) { } delete(t.peers, id) delete(t.LeaderStats.Followers, id.String()) - t.prober.Remove(id.String()) + t.pipelineProber.Remove(id.String()) + t.streamProber.Remove(id.String()) plog.Infof("removed peer %s", id) } @@ -321,8 +326,10 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) { } t.peers[id].update(urls) - t.prober.Remove(id.String()) - addPeerToProber(t.prober, id.String(), us) + t.pipelineProber.Remove(id.String()) + addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts) + t.streamProber.Remove(id.String()) + addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts) plog.Infof("updated peer %s", id) } diff --git a/rafthttp/transport_test.go b/rafthttp/transport_test.go index e4cf37154..4a82a97a4 100644 --- a/rafthttp/transport_test.go +++ b/rafthttp/transport_test.go @@ -33,8 +33,10 @@ func TestTransportSend(t *testing.T) { peer1 := newFakePeer() peer2 := newFakePeer() tr := &Transport{ - ServerStats: stats.NewServerStats("", ""), - peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2}, + ServerStats: stats.NewServerStats("", ""), + peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2}, + pipelineProber: probing.NewProber(nil), + streamProber: probing.NewProber(nil), } wmsgsIgnored := []raftpb.Message{ // bad local message @@ -68,8 +70,10 @@ func TestTransportCutMend(t *testing.T) { peer1 := newFakePeer() peer2 := newFakePeer() tr := &Transport{ - ServerStats: stats.NewServerStats("", ""), - peers: 
map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2}, + ServerStats: stats.NewServerStats("", ""), + peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2}, + pipelineProber: probing.NewProber(nil), + streamProber: probing.NewProber(nil), } tr.CutPeer(types.ID(1)) @@ -96,10 +100,11 @@ func TestTransportCutMend(t *testing.T) { func TestTransportAdd(t *testing.T) { ls := stats.NewLeaderStats("") tr := &Transport{ - LeaderStats: ls, - streamRt: &roundTripperRecorder{}, - peers: make(map[types.ID]Peer), - prober: probing.NewProber(nil), + LeaderStats: ls, + streamRt: &roundTripperRecorder{}, + peers: make(map[types.ID]Peer), + pipelineProber: probing.NewProber(nil), + streamProber: probing.NewProber(nil), } tr.AddPeer(1, []string{"http://localhost:2380"}) @@ -124,10 +129,11 @@ func TestTransportAdd(t *testing.T) { func TestTransportRemove(t *testing.T) { tr := &Transport{ - LeaderStats: stats.NewLeaderStats(""), - streamRt: &roundTripperRecorder{}, - peers: make(map[types.ID]Peer), - prober: probing.NewProber(nil), + LeaderStats: stats.NewLeaderStats(""), + streamRt: &roundTripperRecorder{}, + peers: make(map[types.ID]Peer), + pipelineProber: probing.NewProber(nil), + streamProber: probing.NewProber(nil), } tr.AddPeer(1, []string{"http://localhost:2380"}) tr.RemovePeer(types.ID(1)) @@ -141,8 +147,9 @@ func TestTransportRemove(t *testing.T) { func TestTransportUpdate(t *testing.T) { peer := newFakePeer() tr := &Transport{ - peers: map[types.ID]Peer{types.ID(1): peer}, - prober: probing.NewProber(nil), + peers: map[types.ID]Peer{types.ID(1): peer}, + pipelineProber: probing.NewProber(nil), + streamProber: probing.NewProber(nil), } u := "http://localhost:2380" tr.UpdatePeer(types.ID(1), []string{u}) @@ -155,13 +162,14 @@ func TestTransportUpdate(t *testing.T) { func TestTransportErrorc(t *testing.T) { errorc := make(chan error, 1) tr := &Transport{ - Raft: &fakeRaft{}, - LeaderStats: stats.NewLeaderStats(""), - ErrorC: errorc, - streamRt: 
newRespRoundTripper(http.StatusForbidden, nil), - pipelineRt: newRespRoundTripper(http.StatusForbidden, nil), - peers: make(map[types.ID]Peer), - prober: probing.NewProber(nil), + Raft: &fakeRaft{}, + LeaderStats: stats.NewLeaderStats(""), + ErrorC: errorc, + streamRt: newRespRoundTripper(http.StatusForbidden, nil), + pipelineRt: newRespRoundTripper(http.StatusForbidden, nil), + peers: make(map[types.ID]Peer), + pipelineProber: probing.NewProber(nil), + streamProber: probing.NewProber(nil), } tr.AddPeer(1, []string{"http://localhost:2380"}) defer tr.Stop()