mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver/api/rafthttp: add v3 snapshot send/receive metrics
Distribution would be: 0.1 second or more ... 25.6 seconds or more 51.2 seconds or more etcd_network_snapshot_send_success etcd_network_snapshot_send_failures etcd_network_snapshot_send_total_duration_seconds etcd_network_snapshot_receive_success etcd_network_snapshot_receive_failures etcd_network_snapshot_receive_total_duration_seconds Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
This commit is contained in:
parent
7ec9ff62b5
commit
d838e24f80
@ -22,6 +22,7 @@ import (
|
||||
"net/http"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
pioutil "github.com/coreos/etcd/pkg/ioutil"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
@ -149,6 +150,8 @@ func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, c
|
||||
}
|
||||
}
|
||||
|
||||
const unknownSnapshotSender = "UNKNOWN_SNAPSHOT_SENDER"
|
||||
|
||||
// ServeHTTP serves HTTP request to receive and process snapshot message.
|
||||
//
|
||||
// If request sender dies without closing underlying TCP connection,
|
||||
@ -159,9 +162,12 @@ func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, c
|
||||
// received and processed.
|
||||
// 2. this case should happen rarely, so no further optimization is done.
|
||||
func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
start := time.Now()
|
||||
|
||||
if r.Method != "POST" {
|
||||
w.Header().Set("Allow", "POST")
|
||||
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
|
||||
snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
|
||||
return
|
||||
}
|
||||
|
||||
@ -169,6 +175,7 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusPreconditionFailed)
|
||||
snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
|
||||
return
|
||||
}
|
||||
|
||||
@ -177,19 +184,22 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
dec := &messageDecoder{r: r.Body}
|
||||
// let snapshots be very large since they can exceed 512MB for large installations
|
||||
m, err := dec.decodeLimit(uint64(1 << 63))
|
||||
from := types.ID(m.From).String()
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("failed to decode raft message (%v)", err)
|
||||
plog.Errorf(msg)
|
||||
http.Error(w, msg, http.StatusBadRequest)
|
||||
recvFailures.WithLabelValues(r.RemoteAddr).Inc()
|
||||
snapshotReceiveFailures.WithLabelValues(from).Inc()
|
||||
return
|
||||
}
|
||||
|
||||
receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
|
||||
receivedBytes.WithLabelValues(from).Add(float64(m.Size()))
|
||||
|
||||
if m.Type != raftpb.MsgSnap {
|
||||
plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
|
||||
http.Error(w, "wrong raft message type", http.StatusBadRequest)
|
||||
snapshotReceiveFailures.WithLabelValues(from).Inc()
|
||||
return
|
||||
}
|
||||
|
||||
@ -200,9 +210,10 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
|
||||
plog.Error(msg)
|
||||
http.Error(w, msg, http.StatusInternalServerError)
|
||||
snapshotReceiveFailures.WithLabelValues(from).Inc()
|
||||
return
|
||||
}
|
||||
receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(n))
|
||||
receivedBytes.WithLabelValues(from).Add(float64(n))
|
||||
plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))
|
||||
|
||||
if err := h.r.Process(context.TODO(), m); err != nil {
|
||||
@ -215,12 +226,16 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
msg := fmt.Sprintf("failed to process raft message (%v)", err)
|
||||
plog.Warningf(msg)
|
||||
http.Error(w, msg, http.StatusInternalServerError)
|
||||
snapshotReceiveFailures.WithLabelValues(from).Inc()
|
||||
}
|
||||
return
|
||||
}
|
||||
// Write StatusNoContent header after the message has been processed by
|
||||
// raft, which facilitates the client to report MsgSnap status.
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
|
||||
snapshotReceive.WithLabelValues(from).Inc()
|
||||
snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds())
|
||||
}
|
||||
|
||||
type streamHandler struct {
|
||||
|
@ -53,6 +53,68 @@ var (
|
||||
[]string{"From"},
|
||||
)
|
||||
|
||||
snapshotSend = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "network",
|
||||
Name: "snapshot_send_success",
|
||||
Help: "Total number of successful snapshot sends",
|
||||
},
|
||||
[]string{"To"},
|
||||
)
|
||||
|
||||
snapshotSendFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "network",
|
||||
Name: "snapshot_send_failures",
|
||||
Help: "Total number of snapshot send failures",
|
||||
},
|
||||
[]string{"To"},
|
||||
)
|
||||
|
||||
snapshotSendSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "network",
|
||||
Name: "snapshot_send_total_duration_seconds",
|
||||
Help: "Total latency distributions of v3 snapshot sends",
|
||||
|
||||
// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
|
||||
// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
|
||||
Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
|
||||
},
|
||||
[]string{"To"},
|
||||
)
|
||||
|
||||
snapshotReceive = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "network",
|
||||
Name: "snapshot_receive_success",
|
||||
Help: "Total number of successful snapshot receives",
|
||||
},
|
||||
[]string{"From"},
|
||||
)
|
||||
|
||||
snapshotReceiveFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "network",
|
||||
Name: "snapshot_receive_failures",
|
||||
Help: "Total number of snapshot receive failures",
|
||||
},
|
||||
[]string{"From"},
|
||||
)
|
||||
|
||||
snapshotReceiveSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "network",
|
||||
Name: "snapshot_receive_total_duration_seconds",
|
||||
Help: "Total latency distributions of v3 snapshot receives",
|
||||
|
||||
// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
|
||||
// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
|
||||
Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
|
||||
},
|
||||
[]string{"From"},
|
||||
)
|
||||
|
||||
rtts = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "network",
|
||||
@ -69,5 +131,13 @@ func init() {
|
||||
prometheus.MustRegister(receivedBytes)
|
||||
prometheus.MustRegister(sentFailures)
|
||||
prometheus.MustRegister(recvFailures)
|
||||
|
||||
prometheus.MustRegister(snapshotSend)
|
||||
prometheus.MustRegister(snapshotSendFailures)
|
||||
prometheus.MustRegister(snapshotSendSeconds)
|
||||
prometheus.MustRegister(snapshotReceive)
|
||||
prometheus.MustRegister(snapshotReceiveFailures)
|
||||
prometheus.MustRegister(snapshotReceiveSeconds)
|
||||
|
||||
prometheus.MustRegister(rtts)
|
||||
}
|
||||
|
@ -64,7 +64,10 @@ func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *pe
|
||||
func (s *snapshotSender) stop() { close(s.stopc) }
|
||||
|
||||
func (s *snapshotSender) send(merged snap.Message) {
|
||||
start := time.Now()
|
||||
|
||||
m := merged.Message
|
||||
to := types.ID(m.To).String()
|
||||
|
||||
body := createSnapBody(merged)
|
||||
defer body.Close()
|
||||
@ -92,14 +95,18 @@ func (s *snapshotSender) send(merged snap.Message) {
|
||||
// machine knows about it, it would pause a while and retry sending
|
||||
// new snapshot message.
|
||||
s.r.ReportSnapshot(m.To, raft.SnapshotFailure)
|
||||
sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
|
||||
sentFailures.WithLabelValues(to).Inc()
|
||||
snapshotSendFailures.WithLabelValues(to).Inc()
|
||||
return
|
||||
}
|
||||
s.status.activate()
|
||||
s.r.ReportSnapshot(m.To, raft.SnapshotFinish)
|
||||
plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
|
||||
|
||||
sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(merged.TotalSize))
|
||||
sentBytes.WithLabelValues(to).Add(float64(merged.TotalSize))
|
||||
|
||||
snapshotSend.WithLabelValues(to).Inc()
|
||||
snapshotSendSeconds.WithLabelValues(to).Observe(time.Since(start).Seconds())
|
||||
}
|
||||
|
||||
// post posts the given request.
|
||||
|
Loading…
x
Reference in New Issue
Block a user