diff --git a/Documentation/metrics.md b/Documentation/metrics.md index 527ef53ec..205c180c4 100644 --- a/Documentation/metrics.md +++ b/Documentation/metrics.md @@ -70,6 +70,8 @@ All these metrics are prefixed with `etcd_network_` |---------------------------|--------------------------------------------------------------------|---------------| | peer_sent_bytes_total | The total number of bytes sent to the peer with ID `To`. | Counter(To) | | peer_received_bytes_total | The total number of bytes received from the peer with ID `From`. | Counter(From) | +| peer_sent_failures_total | The total number of send failures to the peer with ID `To`. | Counter(To) | +| peer_received_failures_total | The total number of receive failures from the peer with ID `From`. | Counter(From) | | peer_round_trip_time_seconds | Round-Trip-Time histogram between peers. | Histogram(To) | | client_grpc_sent_bytes_total | The total number of bytes sent to grpc clients. | Counter | | client_grpc_received_bytes_total| The total number of bytes received to grpc clients. 
| Counter | diff --git a/rafthttp/http.go b/rafthttp/http.go index 05f246113..471028a61 100644 --- a/rafthttp/http.go +++ b/rafthttp/http.go @@ -104,6 +104,7 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if err != nil { plog.Errorf("failed to read raft message (%v)", err) http.Error(w, "error reading raft message", http.StatusBadRequest) + recvFailures.WithLabelValues(r.RemoteAddr).Inc() return } @@ -111,6 +112,7 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if err := m.Unmarshal(b); err != nil { plog.Errorf("failed to unmarshal raft message (%v)", err) http.Error(w, "error unmarshaling raft message", http.StatusBadRequest) + recvFailures.WithLabelValues(r.RemoteAddr).Inc() return } @@ -186,6 +188,7 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { msg := fmt.Sprintf("failed to decode raft message (%v)", err) plog.Errorf(msg) http.Error(w, msg, http.StatusBadRequest) + recvFailures.WithLabelValues(r.RemoteAddr).Inc() return } diff --git a/rafthttp/metrics.go b/rafthttp/metrics.go index e9550c3bb..320bfe726 100644 --- a/rafthttp/metrics.go +++ b/rafthttp/metrics.go @@ -16,7 +16,6 @@ package rafthttp import "github.com/prometheus/client_golang/prometheus" -// TODO: record write/recv failures. 
var ( sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "etcd", @@ -36,6 +35,24 @@ var ( []string{"From"}, ) + sentFailures = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "network", + Name: "peer_sent_failures_total", + Help: "The total number of send failures from peers.", + }, + []string{"To"}, + ) + + recvFailures = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "network", + Name: "peer_received_failures_total", + Help: "The total number of receive failures from peers.", + }, + []string{"From"}, + ) + rtts = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "etcd", Subsystem: "network", @@ -50,5 +67,7 @@ var ( func init() { prometheus.MustRegister(sentBytes) prometheus.MustRegister(receivedBytes) + prometheus.MustRegister(sentFailures) + prometheus.MustRegister(recvFailures) prometheus.MustRegister(rtts) } diff --git a/rafthttp/pipeline.go b/rafthttp/pipeline.go index 1f07f3e49..ccd9eb786 100644 --- a/rafthttp/pipeline.go +++ b/rafthttp/pipeline.go @@ -93,6 +93,7 @@ func (p *pipeline) handle() { if isMsgSnap(m) { p.raft.ReportSnapshot(m.To, raft.SnapshotFailure) } + sentFailures.WithLabelValues(types.ID(m.To).String()).Inc() continue } diff --git a/rafthttp/snapshot_sender.go b/rafthttp/snapshot_sender.go index 3d73ac809..105b33072 100644 --- a/rafthttp/snapshot_sender.go +++ b/rafthttp/snapshot_sender.go @@ -91,6 +91,7 @@ func (s *snapshotSender) send(merged snap.Message) { // machine knows about it, it would pause a while and retry sending // new snapshot message. 
s.r.ReportSnapshot(m.To, raft.SnapshotFailure) + sentFailures.WithLabelValues(types.ID(m.To).String()).Inc() return } s.status.activate() diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 875df82a4..e7109a023 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -158,6 +158,7 @@ func (cw *streamWriter) run() { cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error()) + sentFailures.WithLabelValues(cw.peerID.String()).Inc() cw.close() plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t) heartbeatc, msgc = nil, nil @@ -184,6 +185,7 @@ func (cw *streamWriter) run() { plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t) heartbeatc, msgc = nil, nil cw.r.ReportUnreachable(m.To) + sentFailures.WithLabelValues(cw.peerID.String()).Inc() case conn := <-cw.connc: cw.mu.Lock() @@ -388,6 +390,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { plog.MergeWarningf("dropped internal raft message from %s since receiving buffer is full (overloaded network)", types.ID(m.From)) } plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From)) + recvFailures.WithLabelValues(types.ID(m.From).String()).Inc() } } }