mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
rafthttp: make metrics naming consistent
This commit is contained in:
parent
ab33c068b7
commit
a32abdbb0f
@ -23,15 +23,15 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
msgWriteDuration = prometheus.NewSummaryVec(
|
msgSentDuration = prometheus.NewSummaryVec(
|
||||||
prometheus.SummaryOpts{
|
prometheus.SummaryOpts{
|
||||||
Name: "rafthttp_message_sending_latency_microseconds",
|
Name: "rafthttp_message_sent_latency_microseconds",
|
||||||
Help: "message sending latency distributions.",
|
Help: "message sent latency distributions.",
|
||||||
},
|
},
|
||||||
[]string{"channel", "remoteID", "msgType"},
|
[]string{"channel", "remoteID", "msgType"},
|
||||||
)
|
)
|
||||||
|
|
||||||
msgWriteFailed = prometheus.NewCounterVec(prometheus.CounterOpts{
|
msgSentFailed = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||||
Name: "rafthttp_message_sent_failed_total",
|
Name: "rafthttp_message_sent_failed_total",
|
||||||
Help: "The total number of failed messages sent.",
|
Help: "The total number of failed messages sent.",
|
||||||
},
|
},
|
||||||
@ -40,22 +40,22 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
prometheus.MustRegister(msgWriteDuration)
|
prometheus.MustRegister(msgSentDuration)
|
||||||
prometheus.MustRegister(msgWriteFailed)
|
prometheus.MustRegister(msgSentFailed)
|
||||||
}
|
}
|
||||||
|
|
||||||
func reportSendingDuration(channel string, m raftpb.Message, duration time.Duration) {
|
func reportSentDuration(channel string, m raftpb.Message, duration time.Duration) {
|
||||||
typ := m.Type.String()
|
typ := m.Type.String()
|
||||||
if isLinkHeartbeatMessage(m) {
|
if isLinkHeartbeatMessage(m) {
|
||||||
typ = "MsgLinkHeartbeat"
|
typ = "MsgLinkHeartbeat"
|
||||||
}
|
}
|
||||||
msgWriteDuration.WithLabelValues(channel, types.ID(m.To).String(), typ).Observe(float64(duration.Nanoseconds() / int64(time.Microsecond)))
|
msgSentDuration.WithLabelValues(channel, types.ID(m.To).String(), typ).Observe(float64(duration.Nanoseconds() / int64(time.Microsecond)))
|
||||||
}
|
}
|
||||||
|
|
||||||
func reportMessageFailure(channel string, m raftpb.Message) {
|
func reportSentFailure(channel string, m raftpb.Message) {
|
||||||
typ := m.Type.String()
|
typ := m.Type.String()
|
||||||
if isLinkHeartbeatMessage(m) {
|
if isLinkHeartbeatMessage(m) {
|
||||||
typ = "MsgLinkHeartbeat"
|
typ = "MsgLinkHeartbeat"
|
||||||
}
|
}
|
||||||
msgWriteFailed.WithLabelValues(channel, types.ID(m.To).String(), typ).Inc()
|
msgSentFailed.WithLabelValues(channel, types.ID(m.To).String(), typ).Inc()
|
||||||
}
|
}
|
||||||
|
@ -94,7 +94,7 @@ func (p *pipeline) handle() {
|
|||||||
|
|
||||||
p.Lock()
|
p.Lock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
reportMessageFailure(pipelineMsg, m)
|
reportSentFailure(pipelineMsg, m)
|
||||||
|
|
||||||
if p.errored == nil || p.errored.Error() != err.Error() {
|
if p.errored == nil || p.errored.Error() != err.Error() {
|
||||||
log.Printf("pipeline: error posting to %s: %v", p.id, err)
|
log.Printf("pipeline: error posting to %s: %v", p.id, err)
|
||||||
@ -123,7 +123,7 @@ func (p *pipeline) handle() {
|
|||||||
if isMsgSnap(m) {
|
if isMsgSnap(m) {
|
||||||
p.r.ReportSnapshot(m.To, raft.SnapshotFinish)
|
p.r.ReportSnapshot(m.To, raft.SnapshotFinish)
|
||||||
}
|
}
|
||||||
reportSendingDuration(pipelineMsg, m, time.Since(start))
|
reportSentDuration(pipelineMsg, m, time.Since(start))
|
||||||
}
|
}
|
||||||
p.Unlock()
|
p.Unlock()
|
||||||
}
|
}
|
||||||
|
@ -102,7 +102,7 @@ func (cw *streamWriter) run() {
|
|||||||
case <-heartbeatc:
|
case <-heartbeatc:
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
if err := enc.encode(linkHeartbeatMessage); err != nil {
|
if err := enc.encode(linkHeartbeatMessage); err != nil {
|
||||||
reportMessageFailure(string(t), linkHeartbeatMessage)
|
reportSentFailure(string(t), linkHeartbeatMessage)
|
||||||
|
|
||||||
log.Printf("rafthttp: failed to heartbeat on stream %s due to %v. waiting for a new stream to be established.", t, err)
|
log.Printf("rafthttp: failed to heartbeat on stream %s due to %v. waiting for a new stream to be established.", t, err)
|
||||||
cw.resetCloser()
|
cw.resetCloser()
|
||||||
@ -110,7 +110,7 @@ func (cw *streamWriter) run() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
flusher.Flush()
|
flusher.Flush()
|
||||||
reportSendingDuration(string(t), linkHeartbeatMessage, time.Since(start))
|
reportSentDuration(string(t), linkHeartbeatMessage, time.Since(start))
|
||||||
case m := <-msgc:
|
case m := <-msgc:
|
||||||
if t == streamTypeMsgApp && m.Term != msgAppTerm {
|
if t == streamTypeMsgApp && m.Term != msgAppTerm {
|
||||||
// TODO: reasonable retry logic
|
// TODO: reasonable retry logic
|
||||||
@ -122,7 +122,7 @@ func (cw *streamWriter) run() {
|
|||||||
}
|
}
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
if err := enc.encode(m); err != nil {
|
if err := enc.encode(m); err != nil {
|
||||||
reportMessageFailure(string(t), m)
|
reportSentFailure(string(t), m)
|
||||||
|
|
||||||
log.Printf("rafthttp: failed to send message on stream %s due to %v. waiting for a new stream to be established.", t, err)
|
log.Printf("rafthttp: failed to send message on stream %s due to %v. waiting for a new stream to be established.", t, err)
|
||||||
cw.resetCloser()
|
cw.resetCloser()
|
||||||
@ -131,7 +131,7 @@ func (cw *streamWriter) run() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
flusher.Flush()
|
flusher.Flush()
|
||||||
reportSendingDuration(string(t), m, time.Since(start))
|
reportSentDuration(string(t), m, time.Since(start))
|
||||||
case conn := <-cw.connc:
|
case conn := <-cw.connc:
|
||||||
cw.resetCloser()
|
cw.resetCloser()
|
||||||
t = conn.t
|
t = conn.t
|
||||||
|
Loading…
x
Reference in New Issue
Block a user