mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver/api/rafthttp: add v3 snapshot send/receive metrics
The histogram bucket distribution would be: 0.1 second or more, ..., 25.6 seconds or more, 51.2 seconds or more.
New metrics: etcd_network_snapshot_send_success, etcd_network_snapshot_send_failures, etcd_network_snapshot_send_total_duration_seconds, etcd_network_snapshot_receive_success, etcd_network_snapshot_receive_failures, etcd_network_snapshot_receive_total_duration_seconds.
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
This commit is contained in:
parent
c392cd20cf
commit
6f4c509ad8
@ -22,6 +22,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"path"
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||||
pioutil "github.com/coreos/etcd/pkg/ioutil"
|
pioutil "github.com/coreos/etcd/pkg/ioutil"
|
||||||
@ -185,6 +186,8 @@ func newSnapshotHandler(t *Transport, r Raft, snapshotter *snap.Snapshotter, cid
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const unknownSnapshotSender = "UNKNOWN_SNAPSHOT_SENDER"
|
||||||
|
|
||||||
// ServeHTTP serves HTTP request to receive and process snapshot message.
|
// ServeHTTP serves HTTP request to receive and process snapshot message.
|
||||||
//
|
//
|
||||||
// If request sender dies without closing underlying TCP connection,
|
// If request sender dies without closing underlying TCP connection,
|
||||||
@ -195,9 +198,12 @@ func newSnapshotHandler(t *Transport, r Raft, snapshotter *snap.Snapshotter, cid
|
|||||||
// received and processed.
|
// received and processed.
|
||||||
// 2. this case should happen rarely, so no further optimization is done.
|
// 2. this case should happen rarely, so no further optimization is done.
|
||||||
func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
if r.Method != "POST" {
|
if r.Method != "POST" {
|
||||||
w.Header().Set("Allow", "POST")
|
w.Header().Set("Allow", "POST")
|
||||||
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
|
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
|
||||||
|
snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -205,6 +211,7 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil {
|
if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusPreconditionFailed)
|
http.Error(w, err.Error(), http.StatusPreconditionFailed)
|
||||||
|
snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -213,13 +220,14 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
dec := &messageDecoder{r: r.Body}
|
dec := &messageDecoder{r: r.Body}
|
||||||
// let snapshots be very large since they can exceed 512MB for large installations
|
// let snapshots be very large since they can exceed 512MB for large installations
|
||||||
m, err := dec.decodeLimit(uint64(1 << 63))
|
m, err := dec.decodeLimit(uint64(1 << 63))
|
||||||
|
from := types.ID(m.From).String()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
msg := fmt.Sprintf("failed to decode raft message (%v)", err)
|
msg := fmt.Sprintf("failed to decode raft message (%v)", err)
|
||||||
if h.lg != nil {
|
if h.lg != nil {
|
||||||
h.lg.Warn(
|
h.lg.Warn(
|
||||||
"failed to decode Raft message",
|
"failed to decode Raft message",
|
||||||
zap.String("local-member-id", h.localID.String()),
|
zap.String("local-member-id", h.localID.String()),
|
||||||
zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
|
zap.String("remote-snapshot-sender-id", from),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
@ -227,24 +235,26 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
http.Error(w, msg, http.StatusBadRequest)
|
http.Error(w, msg, http.StatusBadRequest)
|
||||||
recvFailures.WithLabelValues(r.RemoteAddr).Inc()
|
recvFailures.WithLabelValues(r.RemoteAddr).Inc()
|
||||||
|
snapshotReceiveFailures.WithLabelValues(from).Inc()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
msgSize := m.Size()
|
msgSize := m.Size()
|
||||||
receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(msgSize))
|
receivedBytes.WithLabelValues(from).Add(float64(msgSize))
|
||||||
|
|
||||||
if m.Type != raftpb.MsgSnap {
|
if m.Type != raftpb.MsgSnap {
|
||||||
if h.lg != nil {
|
if h.lg != nil {
|
||||||
h.lg.Warn(
|
h.lg.Warn(
|
||||||
"unexpected Raft message type",
|
"unexpected Raft message type",
|
||||||
zap.String("local-member-id", h.localID.String()),
|
zap.String("local-member-id", h.localID.String()),
|
||||||
zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
|
zap.String("remote-snapshot-sender-id", from),
|
||||||
zap.String("message-type", m.Type.String()),
|
zap.String("message-type", m.Type.String()),
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
|
plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
|
||||||
}
|
}
|
||||||
http.Error(w, "wrong raft message type", http.StatusBadRequest)
|
http.Error(w, "wrong raft message type", http.StatusBadRequest)
|
||||||
|
snapshotReceiveFailures.WithLabelValues(from).Inc()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -252,7 +262,7 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
h.lg.Info(
|
h.lg.Info(
|
||||||
"receiving database snapshot",
|
"receiving database snapshot",
|
||||||
zap.String("local-member-id", h.localID.String()),
|
zap.String("local-member-id", h.localID.String()),
|
||||||
zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
|
zap.String("remote-snapshot-sender-id", from),
|
||||||
zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
|
zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
|
||||||
zap.Int("incoming-snapshot-message-size-bytes", msgSize),
|
zap.Int("incoming-snapshot-message-size-bytes", msgSize),
|
||||||
zap.String("incoming-snapshot-message-size", humanize.Bytes(uint64(msgSize))),
|
zap.String("incoming-snapshot-message-size", humanize.Bytes(uint64(msgSize))),
|
||||||
@ -269,7 +279,7 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
h.lg.Warn(
|
h.lg.Warn(
|
||||||
"failed to save incoming database snapshot",
|
"failed to save incoming database snapshot",
|
||||||
zap.String("local-member-id", h.localID.String()),
|
zap.String("local-member-id", h.localID.String()),
|
||||||
zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
|
zap.String("remote-snapshot-sender-id", from),
|
||||||
zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
|
zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
@ -277,16 +287,17 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
plog.Error(msg)
|
plog.Error(msg)
|
||||||
}
|
}
|
||||||
http.Error(w, msg, http.StatusInternalServerError)
|
http.Error(w, msg, http.StatusInternalServerError)
|
||||||
|
snapshotReceiveFailures.WithLabelValues(from).Inc()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(n))
|
receivedBytes.WithLabelValues(from).Add(float64(n))
|
||||||
|
|
||||||
if h.lg != nil {
|
if h.lg != nil {
|
||||||
h.lg.Info(
|
h.lg.Info(
|
||||||
"received and saved database snapshot",
|
"received and saved database snapshot",
|
||||||
zap.String("local-member-id", h.localID.String()),
|
zap.String("local-member-id", h.localID.String()),
|
||||||
zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
|
zap.String("remote-snapshot-sender-id", from),
|
||||||
zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
|
zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
|
||||||
zap.Int64("incoming-snapshot-size-bytes", n),
|
zap.Int64("incoming-snapshot-size-bytes", n),
|
||||||
zap.String("incoming-snapshot-size", humanize.Bytes(uint64(n))),
|
zap.String("incoming-snapshot-size", humanize.Bytes(uint64(n))),
|
||||||
@ -307,13 +318,14 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
h.lg.Warn(
|
h.lg.Warn(
|
||||||
"failed to process Raft message",
|
"failed to process Raft message",
|
||||||
zap.String("local-member-id", h.localID.String()),
|
zap.String("local-member-id", h.localID.String()),
|
||||||
zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
|
zap.String("remote-snapshot-sender-id", from),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
plog.Error(msg)
|
plog.Error(msg)
|
||||||
}
|
}
|
||||||
http.Error(w, msg, http.StatusInternalServerError)
|
http.Error(w, msg, http.StatusInternalServerError)
|
||||||
|
snapshotReceiveFailures.WithLabelValues(from).Inc()
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -321,6 +333,9 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
// Write StatusNoContent header after the message has been processed by
|
// Write StatusNoContent header after the message has been processed by
|
||||||
// raft, which facilitates the client to report MsgSnap status.
|
// raft, which facilitates the client to report MsgSnap status.
|
||||||
w.WriteHeader(http.StatusNoContent)
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
|
||||||
|
snapshotReceive.WithLabelValues(from).Inc()
|
||||||
|
snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds())
|
||||||
}
|
}
|
||||||
|
|
||||||
type streamHandler struct {
|
type streamHandler struct {
|
||||||
|
@ -71,6 +71,68 @@ var (
|
|||||||
[]string{"From"},
|
[]string{"From"},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
snapshotSend = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||||
|
Namespace: "etcd",
|
||||||
|
Subsystem: "network",
|
||||||
|
Name: "snapshot_send_success",
|
||||||
|
Help: "Total number of successful snapshot sends",
|
||||||
|
},
|
||||||
|
[]string{"To"},
|
||||||
|
)
|
||||||
|
|
||||||
|
snapshotSendFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||||
|
Namespace: "etcd",
|
||||||
|
Subsystem: "network",
|
||||||
|
Name: "snapshot_send_failures",
|
||||||
|
Help: "Total number of snapshot send failures",
|
||||||
|
},
|
||||||
|
[]string{"To"},
|
||||||
|
)
|
||||||
|
|
||||||
|
snapshotSendSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||||
|
Namespace: "etcd",
|
||||||
|
Subsystem: "network",
|
||||||
|
Name: "snapshot_send_total_duration_seconds",
|
||||||
|
Help: "Total latency distributions of v3 snapshot sends",
|
||||||
|
|
||||||
|
// the lowest bucket has an upper bound of 0.1 sec (100 ms); each following bucket doubles it (factor 2)
|
||||||
|
// the highest bucket has an upper bound of 0.1 sec * 2^9 == 51.2 sec
|
||||||
|
Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
|
||||||
|
},
|
||||||
|
[]string{"To"},
|
||||||
|
)
|
||||||
|
|
||||||
|
snapshotReceive = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||||
|
Namespace: "etcd",
|
||||||
|
Subsystem: "network",
|
||||||
|
Name: "snapshot_receive_success",
|
||||||
|
Help: "Total number of successful snapshot receives",
|
||||||
|
},
|
||||||
|
[]string{"From"},
|
||||||
|
)
|
||||||
|
|
||||||
|
snapshotReceiveFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||||
|
Namespace: "etcd",
|
||||||
|
Subsystem: "network",
|
||||||
|
Name: "snapshot_receive_failures",
|
||||||
|
Help: "Total number of snapshot receive failures",
|
||||||
|
},
|
||||||
|
[]string{"From"},
|
||||||
|
)
|
||||||
|
|
||||||
|
snapshotReceiveSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||||
|
Namespace: "etcd",
|
||||||
|
Subsystem: "network",
|
||||||
|
Name: "snapshot_receive_total_duration_seconds",
|
||||||
|
Help: "Total latency distributions of v3 snapshot receives",
|
||||||
|
|
||||||
|
// the lowest bucket has an upper bound of 0.1 sec (100 ms); each following bucket doubles it (factor 2)
|
||||||
|
// the highest bucket has an upper bound of 0.1 sec * 2^9 == 51.2 sec
|
||||||
|
Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
|
||||||
|
},
|
||||||
|
[]string{"From"},
|
||||||
|
)
|
||||||
|
|
||||||
rttSec = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
rttSec = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||||
Namespace: "etcd",
|
Namespace: "etcd",
|
||||||
Subsystem: "network",
|
Subsystem: "network",
|
||||||
@ -92,5 +154,13 @@ func init() {
|
|||||||
prometheus.MustRegister(receivedBytes)
|
prometheus.MustRegister(receivedBytes)
|
||||||
prometheus.MustRegister(sentFailures)
|
prometheus.MustRegister(sentFailures)
|
||||||
prometheus.MustRegister(recvFailures)
|
prometheus.MustRegister(recvFailures)
|
||||||
|
|
||||||
|
prometheus.MustRegister(snapshotSend)
|
||||||
|
prometheus.MustRegister(snapshotSendFailures)
|
||||||
|
prometheus.MustRegister(snapshotSendSeconds)
|
||||||
|
prometheus.MustRegister(snapshotReceive)
|
||||||
|
prometheus.MustRegister(snapshotReceiveFailures)
|
||||||
|
prometheus.MustRegister(snapshotReceiveSeconds)
|
||||||
|
|
||||||
prometheus.MustRegister(rttSec)
|
prometheus.MustRegister(rttSec)
|
||||||
}
|
}
|
||||||
|
@ -67,7 +67,10 @@ func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *pe
|
|||||||
func (s *snapshotSender) stop() { close(s.stopc) }
|
func (s *snapshotSender) stop() { close(s.stopc) }
|
||||||
|
|
||||||
func (s *snapshotSender) send(merged snap.Message) {
|
func (s *snapshotSender) send(merged snap.Message) {
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
m := merged.Message
|
m := merged.Message
|
||||||
|
to := types.ID(m.To).String()
|
||||||
|
|
||||||
body := createSnapBody(s.tr.Logger, merged)
|
body := createSnapBody(s.tr.Logger, merged)
|
||||||
defer body.Close()
|
defer body.Close()
|
||||||
@ -79,7 +82,7 @@ func (s *snapshotSender) send(merged snap.Message) {
|
|||||||
s.tr.Logger.Info(
|
s.tr.Logger.Info(
|
||||||
"sending database snapshot",
|
"sending database snapshot",
|
||||||
zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
|
zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
|
||||||
zap.String("remote-peer-id", types.ID(m.To).String()),
|
zap.String("remote-peer-id", to),
|
||||||
zap.Int64("bytes", merged.TotalSize),
|
zap.Int64("bytes", merged.TotalSize),
|
||||||
zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
|
zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
|
||||||
)
|
)
|
||||||
@ -94,7 +97,7 @@ func (s *snapshotSender) send(merged snap.Message) {
|
|||||||
s.tr.Logger.Warn(
|
s.tr.Logger.Warn(
|
||||||
"failed to send database snapshot",
|
"failed to send database snapshot",
|
||||||
zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
|
zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
|
||||||
zap.String("remote-peer-id", types.ID(m.To).String()),
|
zap.String("remote-peer-id", to),
|
||||||
zap.Int64("bytes", merged.TotalSize),
|
zap.Int64("bytes", merged.TotalSize),
|
||||||
zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
|
zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
@ -116,7 +119,8 @@ func (s *snapshotSender) send(merged snap.Message) {
|
|||||||
// machine knows about it, it would pause a while and retry sending
|
// machine knows about it, it would pause a while and retry sending
|
||||||
// new snapshot message.
|
// new snapshot message.
|
||||||
s.r.ReportSnapshot(m.To, raft.SnapshotFailure)
|
s.r.ReportSnapshot(m.To, raft.SnapshotFailure)
|
||||||
sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
|
sentFailures.WithLabelValues(to).Inc()
|
||||||
|
snapshotSendFailures.WithLabelValues(to).Inc()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
s.status.activate()
|
s.status.activate()
|
||||||
@ -126,7 +130,7 @@ func (s *snapshotSender) send(merged snap.Message) {
|
|||||||
s.tr.Logger.Info(
|
s.tr.Logger.Info(
|
||||||
"sent database snapshot",
|
"sent database snapshot",
|
||||||
zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
|
zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
|
||||||
zap.String("remote-peer-id", types.ID(m.To).String()),
|
zap.String("remote-peer-id", to),
|
||||||
zap.Int64("bytes", merged.TotalSize),
|
zap.Int64("bytes", merged.TotalSize),
|
||||||
zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
|
zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
|
||||||
)
|
)
|
||||||
@ -134,7 +138,10 @@ func (s *snapshotSender) send(merged snap.Message) {
|
|||||||
plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
|
plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
|
||||||
}
|
}
|
||||||
|
|
||||||
sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(merged.TotalSize))
|
sentBytes.WithLabelValues(to).Add(float64(merged.TotalSize))
|
||||||
|
|
||||||
|
snapshotSend.WithLabelValues(to).Inc()
|
||||||
|
snapshotSendSeconds.WithLabelValues(to).Observe(time.Since(start).Seconds())
|
||||||
}
|
}
|
||||||
|
|
||||||
// post posts the given request.
|
// post posts the given request.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user