From 98dbdd5fbb43d244964873d8cd2eb8d5e3fd1f48 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 6 May 2016 18:02:55 -0700 Subject: [PATCH] *: simplify network metrics --- Documentation/metrics.md | 34 ++++++++--------- etcdserver/snapshot_merge.go | 5 ++- rafthttp/http.go | 13 ++++++- rafthttp/metrics.go | 72 ++++++++++++++---------------------- rafthttp/pipeline.go | 3 +- rafthttp/probing_status.go | 1 + rafthttp/snapshot_sender.go | 6 +-- rafthttp/stream.go | 17 ++++++--- snap/db.go | 12 +++--- snap/message.go | 4 +- 10 files changed, 82 insertions(+), 85 deletions(-) diff --git a/Documentation/metrics.md b/Documentation/metrics.md index 529268bcd..3988146db 100644 --- a/Documentation/metrics.md +++ b/Documentation/metrics.md @@ -32,6 +32,22 @@ is totally unavailable. `leader_changes_seen_total` counts the number of leader changes the member has seen since its start. Rapid leadership changes impact the performance of etcd significantly. It also signals that the leader is unstable, perhaps due to network connectivity issues or excessive load hitting the etcd cluster. +### network + +These metrics describe the status of the network. + +All these metrics are prefixed with `etcd_network_` + +| Name | Description | Type | +|---------------------------|--------------------------------------------------------------------|---------------| +| sent_bytes_total | The total number of bytes sent to the member with ID `TO`. | Counter(To) | +| received_bytes_total | The total number of bytes received from the member with ID `From`. | Counter(From) | +| round_trip_time_seconds | Round-Trip-Time histogram between members. | Histogram(To) | + +`sent_bytes_total` counts the total number of bytes sent to a specific member. Usually the leader member sends more data than other members since it is responsible for transmitting replicated data. + +`received_bytes_total` counts the total number of bytes received from a specific member. Usually follower members receive data only from the leader member. + ### gRPC requests These metrics describe the requests served by a specific etcd member: total received requests, total failed requests, and processing latency. They are useful for tracking user-generated traffic hitting the etcd cluster. @@ -94,24 +110,6 @@ Abnormally high fsync duration (`fsync_duration_seconds`) indicates disk issues Abnormally high snapshot duration (`snapshot_save_total_duration_seconds`) indicates disk issues and might cause the cluster to be unstable. -### rafthttp - -| Name | Description | Type | Labels | -|-----------------------------------|--------------------------------------------|--------------|--------------------------------| -| message_sent_latency_seconds | The latency distributions of messages sent | HistogramVec | sendingType, msgType, remoteID | -| message_sent_failed_total | The total number of failed messages sent | Summary | sendingType, msgType, remoteID | - - -Abnormally high message duration (`message_sent_latency_seconds`) indicates network issues and might cause the cluster to be unstable. - -An increase in message failures (`message_sent_failed_total`) indicates more severe network issues and might cause the cluster to be unstable. - -Label `sendingType` is the connection type to send messages. `message`, `msgapp` and `msgappv2` use HTTP streaming, while `pipeline` does HTTP request for each message. - -Label `msgType` is the type of raft message. `MsgApp` is log replication messages; `MsgSnap` is snapshot install messages; `MsgProp` is proposal forward messages; the others maintain internal raft status. Given large snapshots, a lengthy msgSnap transmission latency should be expected. For other types of messages, given enough network bandwidth, latencies comparable to ping latency should be expected. - -Label `remoteID` is the member ID of the message destination. - ## Prometheus supplied metrics The Prometheus client library provides a number of metrics under the `go` and `process` namespaces. There are a few that are particlarly interesting. diff --git a/etcdserver/snapshot_merge.go b/etcdserver/snapshot_merge.go index c98463b2b..a567fd994 100644 --- a/etcdserver/snapshot_merge.go +++ b/etcdserver/snapshot_merge.go @@ -39,8 +39,9 @@ func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64, plog.Panicf("store save should never fail: %v", err) } + dbsnap := s.be.Snapshot() // get a snapshot of v3 KV as readCloser - rc := newSnapshotReaderCloser(s.be.Snapshot()) + rc := newSnapshotReaderCloser(dbsnap) // put the []byte snapshot of store into raft snapshot and return the merged snapshot with // KV readCloser snapshot. @@ -54,7 +55,7 @@ func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64, } m.Snapshot = snapshot - return *snap.NewMessage(m, rc) + return *snap.NewMessage(m, rc, dbsnap.Size()) } func newSnapshotReaderCloser(snapshot backend.Snapshot) io.ReadCloser { diff --git a/rafthttp/http.go b/rafthttp/http.go index 662b856b9..fd84d3113 100644 --- a/rafthttp/http.go +++ b/rafthttp/http.go @@ -106,12 +106,16 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, "error reading raft message", http.StatusBadRequest) return } + var m raftpb.Message if err := m.Unmarshal(b); err != nil { plog.Errorf("failed to unmarshal raft message (%v)", err) http.Error(w, "error unmarshaling raft message", http.StatusBadRequest) return } + + receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(len(b))) + if err := h.r.Process(context.TODO(), m); err != nil { switch v := err.(type) { case writerToResponse: @@ -181,6 +185,9 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, msg, http.StatusBadRequest) return } + + receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size())) + if m.Type != raftpb.MsgSnap { plog.Errorf("unexpected raft message type %s on snapshot path", m.Type) http.Error(w, "wrong raft message type", http.StatusBadRequest) @@ -189,13 +196,15 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { plog.Infof("receiving database snapshot [index:%d, from %s] ...", m.Snapshot.Metadata.Index, types.ID(m.From)) // save incoming database snapshot. - if err := h.snapshotter.SaveDBFrom(r.Body, m.Snapshot.Metadata.Index); err != nil { + if n, err := h.snapshotter.SaveDBFrom(r.Body, m.Snapshot.Metadata.Index); err != nil { msg := fmt.Sprintf("failed to save KV snapshot (%v)", err) plog.Error(msg) http.Error(w, msg, http.StatusInternalServerError) return + } else { + receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(n)) + plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From)) } - plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From)) if err := h.r.Process(context.TODO(), m); err != nil { switch v := err.(type) { diff --git a/rafthttp/metrics.go b/rafthttp/metrics.go index cb03de789..f93b4e5f9 100644 --- a/rafthttp/metrics.go +++ b/rafthttp/metrics.go @@ -14,57 +14,41 @@ package rafthttp -import ( - "time" - - "github.com/coreos/etcd/pkg/types" - "github.com/coreos/etcd/raft/raftpb" - "github.com/prometheus/client_golang/prometheus" -) +import "github.com/prometheus/client_golang/prometheus" +// TODO: record write/recv failures. var ( - // TODO: create a separate histogram for recording - // snapshot sending metric. snapshot can be large and - // take a long time to send. So it needs a different - // time range than other type of messages. - msgSentDuration = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "etcd_debugging", - Subsystem: "rafthttp", - Name: "message_sent_latency_seconds", - Help: "message sent latency distributions.", - Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13), - }, - []string{"sendingType", "remoteID", "msgType"}, + sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "network", + Name: "sent_bytes_total", + Help: "The total number of bytes sent.", + }, + []string{"To"}, ) - msgSentFailed = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "etcd_debugging", - Subsystem: "rafthttp", - Name: "message_sent_failed_total", - Help: "The total number of failed messages sent.", + receivedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "network", + Name: "received_bytes_total", + Help: "The total number of bytes received.", }, - []string{"sendingType", "remoteID", "msgType"}, + []string{"From"}, + ) + + rtts = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "etcd", + Subsystem: "network", + Name: "round_trip_time_seconds", + Help: "Round-Trip-Time histogram between members.", + Buckets: prometheus.ExponentialBuckets(0.0001, 2, 14), + }, + []string{"To"}, ) ) func init() { - prometheus.MustRegister(msgSentDuration) - prometheus.MustRegister(msgSentFailed) -} - -func reportSentDuration(sendingType string, m raftpb.Message, duration time.Duration) { - typ := m.Type.String() - if isLinkHeartbeatMessage(m) { - typ = "MsgLinkHeartbeat" - } - msgSentDuration.WithLabelValues(sendingType, types.ID(m.To).String(), typ).Observe(float64(duration) / float64(time.Second)) -} - -func reportSentFailure(sendingType string, m raftpb.Message) { - typ := m.Type.String() - if isLinkHeartbeatMessage(m) { - typ = "MsgLinkHeartbeat" - } - msgSentFailed.WithLabelValues(sendingType, types.ID(m.To).String(), typ).Inc() + prometheus.MustRegister(sentBytes) + prometheus.MustRegister(receivedBytes) + prometheus.MustRegister(rtts) } diff --git a/rafthttp/pipeline.go b/rafthttp/pipeline.go index b47dd5614..47e43de5d 100644 --- a/rafthttp/pipeline.go +++ b/rafthttp/pipeline.go @@ -96,7 +96,6 @@ func (p *pipeline) handle() { if err != nil { p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error()) - reportSentFailure(pipelineMsg, m) if m.Type == raftpb.MsgApp && p.fs != nil { p.fs.Fail() } @@ -114,7 +113,7 @@ func (p *pipeline) handle() { if isMsgSnap(m) { p.r.ReportSnapshot(m.To, raft.SnapshotFinish) } - reportSentDuration(pipelineMsg, m, time.Since(start)) + sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(m.Size())) case <-p.stopc: return } diff --git a/rafthttp/probing_status.go b/rafthttp/probing_status.go index b3027a96f..5d27d1194 100644 --- a/rafthttp/probing_status.go +++ b/rafthttp/probing_status.go @@ -53,6 +53,7 @@ func monitorProbingStatus(s probing.Status, id string) { if s.ClockDiff() > time.Second { plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second) } + rtts.WithLabelValues(id).Observe(s.SRTT().Seconds()) case <-s.StopNotify(): return } diff --git a/rafthttp/snapshot_sender.go b/rafthttp/snapshot_sender.go index a871b886a..aad63be99 100644 --- a/rafthttp/snapshot_sender.go +++ b/rafthttp/snapshot_sender.go @@ -65,8 +65,6 @@ func (s *snapshotSender) stop() { close(s.stopc) } func (s *snapshotSender) send(merged snap.Message) { m := merged.Message - start := time.Now() - body := createSnapBody(merged) defer body.Close() @@ -87,7 +85,6 @@ func (s *snapshotSender) send(merged snap.Message) { } s.picker.unreachable(u) - reportSentFailure(sendSnap, m) s.status.deactivate(failureType{source: sendSnap, action: "post"}, err.Error()) s.r.ReportUnreachable(m.To) // report SnapshotFailure to raft state machine. After raft state @@ -96,10 +93,11 @@ func (s *snapshotSender) send(merged snap.Message) { s.r.ReportSnapshot(m.To, raft.SnapshotFailure) return } - reportSentDuration(sendSnap, m, time.Since(start)) s.status.activate() s.r.ReportSnapshot(m.To, raft.SnapshotFinish) plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To)) + + sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(merged.TotalSize)) } // post posts the given request. diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 1d84311a8..d4cab47bf 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -139,40 +139,42 @@ func (cw *streamWriter) run() { batched int ) tickc := time.Tick(ConnReadTimeout / 3) + unflushed := 0 for { select { case <-heartbeatc: - start := time.Now() err := enc.encode(linkHeartbeatMessage) + unflushed += linkHeartbeatMessage.Size() if err == nil { flusher.Flush() batched = 0 - reportSentDuration(string(t), linkHeartbeatMessage, time.Since(start)) + sentBytes.WithLabelValues(cw.id.String()).Add(float64(unflushed)) + unflushed = 0 continue } - reportSentFailure(string(t), linkHeartbeatMessage) cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error()) cw.close() heartbeatc, msgc = nil, nil case m := <-msgc: - start := time.Now() err := enc.encode(m) if err == nil { + unflushed += m.Size() + if len(msgc) == 0 || batched > streamBufSize/2 { flusher.Flush() + sentBytes.WithLabelValues(cw.id.String()).Add(float64(unflushed)) + unflushed = 0 batched = 0 } else { batched++ } - reportSentDuration(string(t), m, time.Since(start)) continue } - reportSentFailure(string(t), m) cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error()) cw.close() heartbeatc, msgc = nil, nil @@ -190,6 +192,7 @@ func (cw *streamWriter) run() { plog.Panicf("unhandled stream type %s", conn.t) } flusher = conn.Flusher + unflushed = 0 cw.mu.Lock() cw.status.activate() cw.closer = conn.Closer @@ -332,6 +335,8 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { return err } + receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size())) + cr.mu.Lock() paused := cr.paused cr.mu.Unlock() diff --git a/snap/db.go b/snap/db.go index 4a2083fe5..ed5f9a2a1 100644 --- a/snap/db.go +++ b/snap/db.go @@ -26,10 +26,10 @@ import ( // SaveDBFrom saves snapshot of the database from the given reader. It // guarantees the save operation is atomic. -func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) error { +func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) { f, err := ioutil.TempFile(s.dir, "tmp") if err != nil { - return err + return 0, err } var n int64 n, err = io.Copy(f, r) @@ -39,22 +39,22 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) error { f.Close() if err != nil { os.Remove(f.Name()) - return err + return n, err } fn := path.Join(s.dir, fmt.Sprintf("%016x.snap.db", id)) if fileutil.Exist(fn) { os.Remove(f.Name()) - return nil + return n, nil } err = os.Rename(f.Name(), fn) if err != nil { os.Remove(f.Name()) - return err + return n, err } plog.Infof("saved database snapshot to disk [total bytes: %d]", n) - return nil + return n, nil } // DBFilePath returns the file path for the snapshot of the database with diff --git a/snap/message.go b/snap/message.go index 2d2b21106..deb0160e2 100644 --- a/snap/message.go +++ b/snap/message.go @@ -31,13 +31,15 @@ import ( type Message struct { raftpb.Message ReadCloser io.ReadCloser + TotalSize int64 closeC chan bool } -func NewMessage(rs raftpb.Message, rc io.ReadCloser) *Message { +func NewMessage(rs raftpb.Message, rc io.ReadCloser, rcSize int64) *Message { return &Message{ Message: rs, ReadCloser: rc, + TotalSize: int64(rs.Size()) + rcSize, closeC: make(chan bool, 1), } }