diff --git a/server/etcdserver/apply.go b/server/etcdserver/apply.go
index 179a4bd96..ec24b4316 100644
--- a/server/etcdserver/apply.go
+++ b/server/etcdserver/apply.go
@@ -19,6 +19,7 @@ import (
 	"context"
 	"fmt"
 	"sort"
+	"strconv"
 	"time"
 
 	"github.com/coreos/go-semver/semver"
@@ -39,6 +40,7 @@ import (
 
 const (
 	warnApplyDuration = 100 * time.Millisecond
+	v3Version         = "v3"
 )
 
 type applyResult struct {
@@ -130,10 +132,14 @@ func (s *EtcdServer) newApplierV3() applierV3 {
 }
 
 func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
+	// op is the operation label for the apply duration metric; each case below overwrites it.
+	op := "unknown"
 	ar := &applyResult{}
 	defer func(start time.Time) {
+		success := ar.err == nil || ar.err == mvcc.ErrCompacted
+		applySec.WithLabelValues(v3Version, op, strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
 		warnOfExpensiveRequest(a.s.getLogger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
-		if ar.err != nil && ar.err != mvcc.ErrCompacted {
+		if !success {
 			warnOfFailedRequest(a.s.getLogger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
 		}
 	}(time.Now())
@@ -141,62 +147,91 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
 	// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
 	switch {
 	case r.Range != nil:
+		op = "Range"
 		ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range)
 	case r.Put != nil:
+		op = "Put"
 		ar.resp, ar.trace, ar.err = a.s.applyV3.Put(context.TODO(), nil, r.Put)
 	case r.DeleteRange != nil:
+		op = "DeleteRange"
 		ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange)
 	case r.Txn != nil:
+		op = "Txn"
 		ar.resp, ar.trace, ar.err = a.s.applyV3.Txn(context.TODO(), r.Txn)
 	case r.Compaction != nil:
+		op = "Compaction"
 		ar.resp, ar.physc, ar.trace, ar.err = a.s.applyV3.Compaction(r.Compaction)
 	case r.LeaseGrant != nil:
+		op = "LeaseGrant"
 		ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)
 	case r.LeaseRevoke != nil:
+		op = "LeaseRevoke"
 		ar.resp, ar.err = a.s.applyV3.LeaseRevoke(r.LeaseRevoke)
 	case r.LeaseCheckpoint != nil:
+		op = "LeaseCheckpoint"
 		ar.resp, ar.err = a.s.applyV3.LeaseCheckpoint(r.LeaseCheckpoint)
 	case r.Alarm != nil:
+		op = "Alarm"
 		ar.resp, ar.err = a.s.applyV3.Alarm(r.Alarm)
 	case r.Authenticate != nil:
+		op = "Authenticate"
 		ar.resp, ar.err = a.s.applyV3.Authenticate(r.Authenticate)
 	case r.AuthEnable != nil:
+		op = "AuthEnable"
 		ar.resp, ar.err = a.s.applyV3.AuthEnable()
 	case r.AuthDisable != nil:
+		op = "AuthDisable"
 		ar.resp, ar.err = a.s.applyV3.AuthDisable()
 	case r.AuthStatus != nil:
+		op = "AuthStatus"
 		ar.resp, ar.err = a.s.applyV3.AuthStatus()
 	case r.AuthUserAdd != nil:
+		op = "AuthUserAdd"
 		ar.resp, ar.err = a.s.applyV3.UserAdd(r.AuthUserAdd)
 	case r.AuthUserDelete != nil:
+		op = "AuthUserDelete"
 		ar.resp, ar.err = a.s.applyV3.UserDelete(r.AuthUserDelete)
 	case r.AuthUserChangePassword != nil:
+		op = "AuthUserChangePassword"
 		ar.resp, ar.err = a.s.applyV3.UserChangePassword(r.AuthUserChangePassword)
 	case r.AuthUserGrantRole != nil:
+		op = "AuthUserGrantRole"
 		ar.resp, ar.err = a.s.applyV3.UserGrantRole(r.AuthUserGrantRole)
 	case r.AuthUserGet != nil:
+		op = "AuthUserGet"
 		ar.resp, ar.err = a.s.applyV3.UserGet(r.AuthUserGet)
 	case r.AuthUserRevokeRole != nil:
+		op = "AuthUserRevokeRole"
 		ar.resp, ar.err = a.s.applyV3.UserRevokeRole(r.AuthUserRevokeRole)
 	case r.AuthRoleAdd != nil:
+		op = "AuthRoleAdd"
 		ar.resp, ar.err = a.s.applyV3.RoleAdd(r.AuthRoleAdd)
 	case r.AuthRoleGrantPermission != nil:
+		op = "AuthRoleGrantPermission"
 		ar.resp, ar.err = a.s.applyV3.RoleGrantPermission(r.AuthRoleGrantPermission)
 	case r.AuthRoleGet != nil:
+		op = "AuthRoleGet"
 		ar.resp, ar.err = a.s.applyV3.RoleGet(r.AuthRoleGet)
 	case r.AuthRoleRevokePermission != nil:
+		op = "AuthRoleRevokePermission"
 		ar.resp, ar.err = a.s.applyV3.RoleRevokePermission(r.AuthRoleRevokePermission)
 	case r.AuthRoleDelete != nil:
+		op = "AuthRoleDelete"
 		ar.resp, ar.err = a.s.applyV3.RoleDelete(r.AuthRoleDelete)
 	case r.AuthUserList != nil:
+		op = "AuthUserList"
 		ar.resp, ar.err = a.s.applyV3.UserList(r.AuthUserList)
 	case r.AuthRoleList != nil:
+		op = "AuthRoleList"
 		ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList)
 	case r.ClusterVersionSet != nil:
+		op = "ClusterVersionSet"
 		a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet)
 	case r.ClusterMemberAttrSet != nil:
+		op = "ClusterMemberAttrSet"
 		a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet)
 	case r.DowngradeInfoSet != nil:
+		op = "DowngradeInfoSet"
 		a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet)
 	default:
 		panic("not implemented")
diff --git a/server/etcdserver/apply_v2.go b/server/etcdserver/apply_v2.go
index 06cd86938..386f325ca 100644
--- a/server/etcdserver/apply_v2.go
+++ b/server/etcdserver/apply_v2.go
@@ -18,6 +18,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"path"
+	"strconv"
 	"time"
 
 	"go.etcd.io/etcd/pkg/v3/pbutil"
@@ -27,6 +28,8 @@ import (
 	"go.uber.org/zap"
 )
 
+const v2Version = "v2"
+
 // ApplierV2 is the interface for processing V2 raft messages
 type ApplierV2 interface {
 	Delete(r *RequestV2) Response
@@ -109,12 +112,17 @@ func (a *applierV2store) Sync(r *RequestV2) Response {
 
 // applyV2Request interprets r as a call to v2store.X
 // and returns a Response interpreted from v2store.Event
-func (s *EtcdServer) applyV2Request(r *RequestV2) Response {
+func (s *EtcdServer) applyV2Request(r *RequestV2) (resp Response) {
 	stringer := panicAlternativeStringer{
 		stringer:    r,
 		alternative: func() string { return fmt.Sprintf("id:%d,method:%s,path:%s", r.ID, r.Method, r.Path) },
 	}
-	defer warnOfExpensiveRequest(s.getLogger(), time.Now(), stringer, nil, nil)
+	defer func(start time.Time) {
+		// resp is a named result so this deferred closure can read the returned error.
+		success := resp.Err == nil
+		applySec.WithLabelValues(v2Version, r.Method, strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
+		warnOfExpensiveRequest(s.getLogger(), start, stringer, nil, nil)
+	}(time.Now())
 
 	switch r.Method {
 	case "POST":
diff --git a/server/etcdserver/metrics.go b/server/etcdserver/metrics.go
index e4a1d3bec..06263a9cd 100644
--- a/server/etcdserver/metrics.go
+++ b/server/etcdserver/metrics.go
@@ -164,6 +164,18 @@ var (
 		Name:      "limit",
 		Help:      "The file descriptor limit.",
 	})
+	// applySec records apply latency in seconds, labelled by API version, operation and success.
+	applySec = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "server",
+		Name:      "apply_duration_seconds",
+		Help:      "The latency distributions of v2 and v3 apply called by backend.",
+
+		// lowest bucket start of upper bound 0.0001 sec (0.1 ms) with factor 2
+		// highest bucket start of 0.0001 sec * 2^19 == 52.4288 sec
+		Buckets: prometheus.ExponentialBuckets(0.0001, 2, 20),
+	},
+		[]string{"version", "op", "success"})
 )
 
 func init() {
@@ -189,6 +201,7 @@ func init() {
 	prometheus.MustRegister(learnerPromoteFailed)
 	prometheus.MustRegister(fdUsed)
 	prometheus.MustRegister(fdLimit)
+	prometheus.MustRegister(applySec)
 
 	currentVersion.With(prometheus.Labels{
 		"server_version": version.Version,