*: simplify network metrics

Xiang Li 2016-05-06 18:02:55 -07:00
parent 34fbec118a
commit 98dbdd5fbb
10 changed files with 82 additions and 85 deletions

View File

@@ -32,6 +32,22 @@ is totally unavailable.

`leader_changes_seen_total` counts the number of leader changes the member has seen since its start. Rapid leadership changes impact the performance of etcd significantly. It also signals that the leader is unstable, perhaps due to network connectivity issues or excessive load hitting the etcd cluster.

+### network
+
+These metrics describe the status of the network.
+
+All these metrics are prefixed with `etcd_network_`
+
+| Name                      | Description                                                         | Type          |
+|---------------------------|---------------------------------------------------------------------|---------------|
+| sent_bytes_total          | The total number of bytes sent to the member with ID `TO`.          | Counter(To)   |
+| received_bytes_total      | The total number of bytes received from the member with ID `From`.  | Counter(From) |
+| round_trip_time_seconds   | Round-Trip-Time histogram between members.                          | Histogram(To) |
+
+`sent_bytes_total` counts the total number of bytes sent to a specific member. Usually the leader member sends more data than other members since it is responsible for transmitting replicated data.
+
+`received_bytes_total` counts the total number of bytes received from a specific member. Usually follower members receive data only from the leader member.
+
### gRPC requests

These metrics describe the requests served by a specific etcd member: total received requests, total failed requests, and processing latency. They are useful for tracking user-generated traffic hitting the etcd cluster.
@@ -94,24 +110,6 @@ Abnormally high fsync duration (`fsync_duration_seconds`) indicates disk issues

Abnormally high snapshot duration (`snapshot_save_total_duration_seconds`) indicates disk issues and might cause the cluster to be unstable.

-### rafthttp
-
-| Name                              | Description                                | Type         | Labels                         |
-|-----------------------------------|--------------------------------------------|--------------|--------------------------------|
-| message_sent_latency_seconds      | The latency distributions of messages sent | HistogramVec | sendingType, msgType, remoteID |
-| message_sent_failed_total         | The total number of failed messages sent   | Summary      | sendingType, msgType, remoteID |
-
-Abnormally high message duration (`message_sent_latency_seconds`) indicates network issues and might cause the cluster to be unstable.
-
-An increase in message failures (`message_sent_failed_total`) indicates more severe network issues and might cause the cluster to be unstable.
-
-Label `sendingType` is the connection type to send messages. `message`, `msgapp` and `msgappv2` use HTTP streaming, while `pipeline` does HTTP request for each message.
-
-Label `msgType` is the type of raft message. `MsgApp` is log replication messages; `MsgSnap` is snapshot install messages; `MsgProp` is proposal forward messages; the others maintain internal raft status. Given large snapshots, a lengthy msgSnap transmission latency should be expected. For other types of messages, given enough network bandwidth, latencies comparable to ping latency should be expected.
-
-Label `remoteID` is the member ID of the message destination.
-
## Prometheus supplied metrics

The Prometheus client library provides a number of metrics under the `go` and `process` namespaces. There are a few that are particularly interesting.
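
The new counters documented above are plain Prometheus `CounterVec`s keyed by the remote member ID, which is what the rewritten `rafthttp` metrics file later in this commit registers. For readers unfamiliar with the client library, a minimal standalone sketch of the same pattern; the peer ID and byte count here are made up for illustration:

```go
package main

import "github.com/prometheus/client_golang/prometheus"

// sentBytes mirrors the shape of the new rafthttp counter: one time series
// per destination member ID, carried in the "To" label.
var sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
	Namespace: "etcd",
	Subsystem: "network",
	Name:      "sent_bytes_total",
	Help:      "The total number of bytes sent.",
},
	[]string{"To"},
)

func main() {
	prometheus.MustRegister(sentBytes)

	// Record 1024 bytes sent to a hypothetical peer whose member ID is
	// "8e9e05c52164694d"; the exposed series becomes
	// etcd_network_sent_bytes_total{To="8e9e05c52164694d"} 1024.
	sentBytes.WithLabelValues("8e9e05c52164694d").Add(1024)
}
```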

View File

@@ -39,8 +39,9 @@ func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64,
		plog.Panicf("store save should never fail: %v", err)
	}

+	dbsnap := s.be.Snapshot()
	// get a snapshot of v3 KV as readCloser
-	rc := newSnapshotReaderCloser(s.be.Snapshot())
+	rc := newSnapshotReaderCloser(dbsnap)

	// put the []byte snapshot of store into raft snapshot and return the merged snapshot with
	// KV readCloser snapshot.
@@ -54,7 +55,7 @@ func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64,
	}
	m.Snapshot = snapshot

-	return *snap.NewMessage(m, rc)
+	return *snap.NewMessage(m, rc, dbsnap.Size())
}

func newSnapshotReaderCloser(snapshot backend.Snapshot) io.ReadCloser {
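
Capturing the backend snapshot in `dbsnap` lets a single snapshot supply both the data stream and its byte size, which `snap.NewMessage` now uses to fill in the message's `TotalSize` (see the `snap/message.go` hunk below). A rough sketch of that relationship, with a hypothetical minimal interface standing in for `backend.Snapshot`:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// kvSnapshot is a hypothetical stand-in for backend.Snapshot: something that
// knows its on-disk size and can be streamed out to a writer.
type kvSnapshot interface {
	Size() int64
	WriteTo(w io.Writer) (int64, error)
}

// totalSize mirrors how snap.Message computes TotalSize in this commit:
// the raft message size plus the size of the attached database snapshot.
func totalSize(raftMsgSize int, dbsnap kvSnapshot) int64 {
	return int64(raftMsgSize) + dbsnap.Size()
}

type fakeSnap struct{ data string }

func (f fakeSnap) Size() int64 { return int64(len(f.data)) }
func (f fakeSnap) WriteTo(w io.Writer) (int64, error) {
	return io.Copy(w, strings.NewReader(f.data))
}

func main() {
	fmt.Println(totalSize(128, fakeSnap{data: "snapshot-bytes"})) // 128 + 14 = 142
}
```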

View File

@@ -106,12 +106,16 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
		http.Error(w, "error reading raft message", http.StatusBadRequest)
		return
	}

	var m raftpb.Message
	if err := m.Unmarshal(b); err != nil {
		plog.Errorf("failed to unmarshal raft message (%v)", err)
		http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
		return
	}

+	receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(len(b)))
+
	if err := h.r.Process(context.TODO(), m); err != nil {
		switch v := err.(type) {
		case writerToResponse:
@@ -181,6 +185,9 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
		http.Error(w, msg, http.StatusBadRequest)
		return
	}

+	receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
+
	if m.Type != raftpb.MsgSnap {
		plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
		http.Error(w, "wrong raft message type", http.StatusBadRequest)
@@ -189,13 +196,15 @@
	plog.Infof("receiving database snapshot [index:%d, from %s] ...", m.Snapshot.Metadata.Index, types.ID(m.From))
	// save incoming database snapshot.
-	if err := h.snapshotter.SaveDBFrom(r.Body, m.Snapshot.Metadata.Index); err != nil {
+	if n, err := h.snapshotter.SaveDBFrom(r.Body, m.Snapshot.Metadata.Index); err != nil {
		msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
		plog.Error(msg)
		http.Error(w, msg, http.StatusInternalServerError)
		return
+	} else {
+		receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(n))
+		plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))
	}
-	plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))

	if err := h.r.Process(context.TODO(), m); err != nil {
		switch v := err.(type) {

View File

@@ -14,57 +14,41 @@
package rafthttp

-import (
-	"time"
-
-	"github.com/coreos/etcd/pkg/types"
-	"github.com/coreos/etcd/raft/raftpb"
-	"github.com/prometheus/client_golang/prometheus"
-)
-
-// TODO: record write/recv failures.
+import "github.com/prometheus/client_golang/prometheus"

var (
-	// TODO: create a separate histogram for recording
-	// snapshot sending metric. snapshot can be large and
-	// take a long time to send. So it needs a different
-	// time range than other type of messages.
-	msgSentDuration = prometheus.NewHistogramVec(
-		prometheus.HistogramOpts{
-			Namespace: "etcd_debugging",
-			Subsystem: "rafthttp",
-			Name:      "message_sent_latency_seconds",
-			Help:      "message sent latency distributions.",
-			Buckets:   prometheus.ExponentialBuckets(0.0005, 2, 13),
-		},
-		[]string{"sendingType", "remoteID", "msgType"},
+	sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "sent_bytes_total",
+		Help:      "The total number of bytes sent.",
+	},
+		[]string{"To"},
	)

-	msgSentFailed = prometheus.NewCounterVec(prometheus.CounterOpts{
-		Namespace: "etcd_debugging",
-		Subsystem: "rafthttp",
-		Name:      "message_sent_failed_total",
-		Help:      "The total number of failed messages sent.",
+	receivedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "received_bytes_total",
+		Help:      "The total number of bytes received.",
	},
-		[]string{"sendingType", "remoteID", "msgType"},
+		[]string{"From"},
+	)
+
+	rtts = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "round_trip_time_seconds",
+		Help:      "Round-Trip-Time histogram between members.",
+		Buckets:   prometheus.ExponentialBuckets(0.0001, 2, 14),
+	},
+		[]string{"To"},
	)
)

func init() {
-	prometheus.MustRegister(msgSentDuration)
-	prometheus.MustRegister(msgSentFailed)
-}
-
-func reportSentDuration(sendingType string, m raftpb.Message, duration time.Duration) {
-	typ := m.Type.String()
-	if isLinkHeartbeatMessage(m) {
-		typ = "MsgLinkHeartbeat"
-	}
-	msgSentDuration.WithLabelValues(sendingType, types.ID(m.To).String(), typ).Observe(float64(duration) / float64(time.Second))
-}
-
-func reportSentFailure(sendingType string, m raftpb.Message) {
-	typ := m.Type.String()
-	if isLinkHeartbeatMessage(m) {
-		typ = "MsgLinkHeartbeat"
-	}
-	msgSentFailed.WithLabelValues(sendingType, types.ID(m.To).String(), typ).Inc()
+	prometheus.MustRegister(sentBytes)
+	prometheus.MustRegister(receivedBytes)
+	prometheus.MustRegister(rtts)
}
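
Once registered in `init`, these three collectors are exported through whatever HTTP handler serves the default Prometheus registry. A minimal sketch of such an endpoint using `promhttp`; the port and path are illustrative, not etcd's actual metrics wiring:

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// promhttp.Handler() serves everything registered via prometheus.MustRegister,
	// so sent_bytes_total, received_bytes_total and round_trip_time_seconds show
	// up here once the package defining them is linked into the binary.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":2381", nil))
}
```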

View File

@@ -96,7 +96,6 @@ func (p *pipeline) handle() {
			if err != nil {
				p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error())
-				reportSentFailure(pipelineMsg, m)

				if m.Type == raftpb.MsgApp && p.fs != nil {
					p.fs.Fail()
				}
@@ -114,7 +113,7 @@
			if isMsgSnap(m) {
				p.r.ReportSnapshot(m.To, raft.SnapshotFinish)
			}
-			reportSentDuration(pipelineMsg, m, time.Since(start))
+			sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(m.Size()))
		case <-p.stopc:
			return
		}

View File

@@ -53,6 +53,7 @@ func monitorProbingStatus(s probing.Status, id string) {
			if s.ClockDiff() > time.Second {
				plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second)
			}
+			rtts.WithLabelValues(id).Observe(s.SRTT().Seconds())
		case <-s.StopNotify():
			return
		}
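
For context, `prometheus.ExponentialBuckets(0.0001, 2, 14)` used by `round_trip_time_seconds` produces 14 upper bounds doubling from 0.1 ms up to roughly 0.82 s, which covers everything from same-rack to cross-region peer RTTs. A quick sketch to print the boundaries:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Prints 0.0001 0.0002 0.0004 ... 0.8192, the bucket upper bounds (in seconds)
	// behind etcd_network_round_trip_time_seconds.
	fmt.Println(prometheus.ExponentialBuckets(0.0001, 2, 14))
}
```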

View File

@@ -65,8 +65,6 @@ func (s *snapshotSender) stop() { close(s.stopc) }
func (s *snapshotSender) send(merged snap.Message) {
	m := merged.Message

-	start := time.Now()
	body := createSnapBody(merged)
	defer body.Close()
@@ -87,7 +85,6 @@
		}
		s.picker.unreachable(u)
-		reportSentFailure(sendSnap, m)
		s.status.deactivate(failureType{source: sendSnap, action: "post"}, err.Error())
		s.r.ReportUnreachable(m.To)
		// report SnapshotFailure to raft state machine. After raft state
@@ -96,10 +93,11 @@
		s.r.ReportSnapshot(m.To, raft.SnapshotFailure)
		return
	}
-	reportSentDuration(sendSnap, m, time.Since(start))
	s.status.activate()
	s.r.ReportSnapshot(m.To, raft.SnapshotFinish)
	plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
+	sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(merged.TotalSize))
}

// post posts the given request.

View File

@@ -139,40 +139,42 @@ func (cw *streamWriter) run() {
		batched int
	)
	tickc := time.Tick(ConnReadTimeout / 3)
+	unflushed := 0

	for {
		select {
		case <-heartbeatc:
-			start := time.Now()
			err := enc.encode(linkHeartbeatMessage)
+			unflushed += linkHeartbeatMessage.Size()
			if err == nil {
				flusher.Flush()
				batched = 0
-				reportSentDuration(string(t), linkHeartbeatMessage, time.Since(start))
+				sentBytes.WithLabelValues(cw.id.String()).Add(float64(unflushed))
+				unflushed = 0
				continue
			}

-			reportSentFailure(string(t), linkHeartbeatMessage)
			cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error())
			cw.close()
			heartbeatc, msgc = nil, nil

		case m := <-msgc:
-			start := time.Now()
			err := enc.encode(m)
			if err == nil {
+				unflushed += m.Size()
				if len(msgc) == 0 || batched > streamBufSize/2 {
					flusher.Flush()
+					sentBytes.WithLabelValues(cw.id.String()).Add(float64(unflushed))
+					unflushed = 0
					batched = 0
				} else {
					batched++
				}
-				reportSentDuration(string(t), m, time.Since(start))
				continue
			}

-			reportSentFailure(string(t), m)
			cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
			cw.close()
			heartbeatc, msgc = nil, nil
@@ -190,6 +192,7 @@ func (cw *streamWriter) run() {
			plog.Panicf("unhandled stream type %s", conn.t)
		}
		flusher = conn.Flusher
+		unflushed = 0
		cw.mu.Lock()
		cw.status.activate()
		cw.closer = conn.Closer
@@ -332,6 +335,8 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
			return err
		}

+		receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
+
		cr.mu.Lock()
		paused := cr.paused
		cr.mu.Unlock()
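
Because the stream writer encodes messages into a buffered writer, the new `unflushed` counter accumulates encoded sizes and `sent_bytes_total` is only incremented when the buffer is actually flushed to the peer. A simplified standalone sketch of that accounting pattern; the buffer, peer ID, and batching condition are stand-ins rather than the real rafthttp types:

```go
package main

import (
	"bufio"
	"bytes"

	"github.com/prometheus/client_golang/prometheus"
)

var exampleSentBytes = prometheus.NewCounterVec(
	prometheus.CounterOpts{Name: "example_sent_bytes_total", Help: "Bytes sent to a peer."},
	[]string{"To"},
)

func main() {
	prometheus.MustRegister(exampleSentBytes)

	var conn bytes.Buffer       // stand-in for the peer connection
	w := bufio.NewWriter(&conn) // messages are batched here before flushing
	peerID := "example-peer"    // stand-in for cw.id.String()
	unflushed := 0

	for _, msg := range [][]byte{[]byte("a"), []byte("bb"), []byte("ccc")} {
		w.Write(msg)
		unflushed += len(msg) // count encoded bytes, but do not report them yet

		if w.Buffered() > 2 { // stand-in for the real batching condition
			w.Flush()
			// Only now have the bytes actually gone out, so account for them.
			exampleSentBytes.WithLabelValues(peerID).Add(float64(unflushed))
			unflushed = 0
		}
	}
}
```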

View File

@@ -26,10 +26,10 @@ import (

// SaveDBFrom saves snapshot of the database from the given reader. It
// guarantees the save operation is atomic.
-func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) error {
+func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
	f, err := ioutil.TempFile(s.dir, "tmp")
	if err != nil {
-		return err
+		return 0, err
	}
	var n int64
	n, err = io.Copy(f, r)
@@ -39,22 +39,22 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) error {
	f.Close()
	if err != nil {
		os.Remove(f.Name())
-		return err
+		return n, err
	}
	fn := path.Join(s.dir, fmt.Sprintf("%016x.snap.db", id))
	if fileutil.Exist(fn) {
		os.Remove(f.Name())
-		return nil
+		return n, nil
	}
	err = os.Rename(f.Name(), fn)
	if err != nil {
		os.Remove(f.Name())
-		return err
+		return n, err
	}

	plog.Infof("saved database snapshot to disk [total bytes: %d]", n)

-	return nil
+	return n, nil
}

// DBFilePath returns the file path for the snapshot of the database with

View File

@@ -31,13 +31,15 @@ import (
type Message struct {
	raftpb.Message
	ReadCloser io.ReadCloser
+	TotalSize  int64
	closeC     chan bool
}

-func NewMessage(rs raftpb.Message, rc io.ReadCloser) *Message {
+func NewMessage(rs raftpb.Message, rc io.ReadCloser, rcSize int64) *Message {
	return &Message{
		Message:    rs,
		ReadCloser: rc,
+		TotalSize:  int64(rs.Size()) + rcSize,
		closeC:     make(chan bool, 1),
	}
}