Merge pull request #12455 from mborsz/metrics

Add etcd_server_apply_duration_seconds
This commit is contained in:
Jingyi Hu 2020-11-10 00:47:11 +08:00 committed by GitHub
commit 01844fd285
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 55 additions and 3 deletions

View File

@ -19,6 +19,7 @@ import (
"context"
"fmt"
"sort"
"strconv"
"time"
"github.com/coreos/go-semver/semver"
@ -39,6 +40,7 @@ import (
const (
warnApplyDuration = 100 * time.Millisecond
v3Version = "v3"
)
type applyResult struct {
@ -130,10 +132,13 @@ func (s *EtcdServer) newApplierV3() applierV3 {
}
func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
op := "unknown"
ar := &applyResult{}
defer func(start time.Time) {
success := ar.err == nil || ar.err == mvcc.ErrCompacted
applySec.WithLabelValues(v3Version, op, strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
warnOfExpensiveRequest(a.s.getLogger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
if ar.err != nil && ar.err != mvcc.ErrCompacted {
if !success {
warnOfFailedRequest(a.s.getLogger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
}
}(time.Now())
@ -141,62 +146,90 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
switch {
case r.Range != nil:
op = "Range"
ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range)
case r.Put != nil:
op = "Put"
ar.resp, ar.trace, ar.err = a.s.applyV3.Put(context.TODO(), nil, r.Put)
case r.DeleteRange != nil:
op = "DeleteRange"
ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange)
case r.Txn != nil:
op = "Txn"
ar.resp, ar.trace, ar.err = a.s.applyV3.Txn(context.TODO(), r.Txn)
case r.Compaction != nil:
op = "Compaction"
ar.resp, ar.physc, ar.trace, ar.err = a.s.applyV3.Compaction(r.Compaction)
case r.LeaseGrant != nil:
op = "LeaseGrant"
ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)
case r.LeaseRevoke != nil:
op = "LeaseRevoke"
ar.resp, ar.err = a.s.applyV3.LeaseRevoke(r.LeaseRevoke)
case r.LeaseCheckpoint != nil:
op = "LeaseCheckpoint"
ar.resp, ar.err = a.s.applyV3.LeaseCheckpoint(r.LeaseCheckpoint)
case r.Alarm != nil:
op = "Alarm"
ar.resp, ar.err = a.s.applyV3.Alarm(r.Alarm)
case r.Authenticate != nil:
op = "Authenticate"
ar.resp, ar.err = a.s.applyV3.Authenticate(r.Authenticate)
case r.AuthEnable != nil:
op = "AuthEnable"
ar.resp, ar.err = a.s.applyV3.AuthEnable()
case r.AuthDisable != nil:
op = "AuthDisable"
ar.resp, ar.err = a.s.applyV3.AuthDisable()
case r.AuthStatus != nil:
ar.resp, ar.err = a.s.applyV3.AuthStatus()
case r.AuthUserAdd != nil:
op = "AuthUserAdd"
ar.resp, ar.err = a.s.applyV3.UserAdd(r.AuthUserAdd)
case r.AuthUserDelete != nil:
op = "AuthUserDelete"
ar.resp, ar.err = a.s.applyV3.UserDelete(r.AuthUserDelete)
case r.AuthUserChangePassword != nil:
op = "AuthUserChangePassword"
ar.resp, ar.err = a.s.applyV3.UserChangePassword(r.AuthUserChangePassword)
case r.AuthUserGrantRole != nil:
op = "AuthUserGrantRole"
ar.resp, ar.err = a.s.applyV3.UserGrantRole(r.AuthUserGrantRole)
case r.AuthUserGet != nil:
op = "AuthUserGet"
ar.resp, ar.err = a.s.applyV3.UserGet(r.AuthUserGet)
case r.AuthUserRevokeRole != nil:
op = "AuthUserRevokeRole"
ar.resp, ar.err = a.s.applyV3.UserRevokeRole(r.AuthUserRevokeRole)
case r.AuthRoleAdd != nil:
op = "AuthRoleAdd"
ar.resp, ar.err = a.s.applyV3.RoleAdd(r.AuthRoleAdd)
case r.AuthRoleGrantPermission != nil:
op = "AuthRoleGrantPermission"
ar.resp, ar.err = a.s.applyV3.RoleGrantPermission(r.AuthRoleGrantPermission)
case r.AuthRoleGet != nil:
op = "AuthRoleGet"
ar.resp, ar.err = a.s.applyV3.RoleGet(r.AuthRoleGet)
case r.AuthRoleRevokePermission != nil:
op = "AuthRoleRevokePermission"
ar.resp, ar.err = a.s.applyV3.RoleRevokePermission(r.AuthRoleRevokePermission)
case r.AuthRoleDelete != nil:
op = "AuthRoleDelete"
ar.resp, ar.err = a.s.applyV3.RoleDelete(r.AuthRoleDelete)
case r.AuthUserList != nil:
op = "AuthUserList"
ar.resp, ar.err = a.s.applyV3.UserList(r.AuthUserList)
case r.AuthRoleList != nil:
op = "AuthRoleList"
ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList)
case r.ClusterVersionSet != nil:
op = "ClusterVersionSet"
a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet)
case r.ClusterMemberAttrSet != nil:
op = "ClusterMemberAttrSet"
a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet)
case r.DowngradeInfoSet != nil:
op = "DowngradeInfoSet"
a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet)
default:
panic("not implemented")

View File

@ -18,6 +18,7 @@ import (
"encoding/json"
"fmt"
"path"
"strconv"
"time"
"go.etcd.io/etcd/pkg/v3/pbutil"
@ -27,6 +28,8 @@ import (
"go.uber.org/zap"
)
const v2Version = "v2"
// ApplierV2 is the interface for processing V2 raft messages
type ApplierV2 interface {
Delete(r *RequestV2) Response
@ -109,12 +112,16 @@ func (a *applierV2store) Sync(r *RequestV2) Response {
// applyV2Request interprets r as a call to v2store.X
// and returns a Response interpreted from v2store.Event
func (s *EtcdServer) applyV2Request(r *RequestV2) Response {
func (s *EtcdServer) applyV2Request(r *RequestV2) (resp Response) {
stringer := panicAlternativeStringer{
stringer: r,
alternative: func() string { return fmt.Sprintf("id:%d,method:%s,path:%s", r.ID, r.Method, r.Path) },
}
defer warnOfExpensiveRequest(s.getLogger(), time.Now(), stringer, nil, nil)
defer func(start time.Time) {
success := resp.Err == nil
applySec.WithLabelValues(v2Version, r.Method, strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
warnOfExpensiveRequest(s.getLogger(), start, stringer, nil, nil)
}(time.Now())
switch r.Method {
case "POST":

View File

@ -164,6 +164,17 @@ var (
Name: "limit",
Help: "The file descriptor limit.",
})
applySec = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "apply_duration_seconds",
Help: "The latency distributions of v2 apply called by backend.",
// lowest bucket start of upper bound 0.0001 sec (0.1 ms) with factor 2
// highest bucket start of 0.0001 sec * 2^19 == 52.4288 sec
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 20),
},
[]string{"version", "op", "success"})
)
func init() {
@ -189,6 +200,7 @@ func init() {
prometheus.MustRegister(learnerPromoteFailed)
prometheus.MustRegister(fdUsed)
prometheus.MustRegister(fdLimit)
prometheus.MustRegister(applySec)
currentVersion.With(prometheus.Labels{
"server_version": version.Version,