Apply encapsulation: Cleanup metrics reporting.

Side effect: applySec(0.4s) used to be reported as 0s, now it's correctly 0.4s.
This commit is contained in:
Piotr Tabor 2022-04-04 20:41:11 +02:00
parent 47a771871b
commit 4e04770bac
9 changed files with 173 additions and 117 deletions

View File

@ -23,6 +23,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm" "go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm"
"go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/etcdserver/txn"
"go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/mvcc" "go.etcd.io/etcd/server/v3/storage/mvcc"
@ -111,13 +112,12 @@ func (a *UberApplier) dispatch(ctx context.Context, r *pb.InternalRaftRequest, s
op := "unknown" op := "unknown"
ar := &ApplyResult{} ar := &ApplyResult{}
defer func(start time.Time) { defer func(start time.Time) {
op += " " success := ar.Err == nil || ar.Err == mvcc.ErrCompacted
// success := ar.Err == nil || ar.Err == mvcc.ErrCompacted txn.ApplySecObserve(v3Version, op, success, time.Since(start))
//etcdserver.applySec.WithLabelValues(v3Version, op, strconv.FormatBool(success)).Observe(time.Since(start).Seconds()) txn.WarnOfExpensiveRequest(a.lg, a.warningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, ar.Resp, ar.Err)
//etcdserver.warnOfExpensiveRequest(a.lg, a.warningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, ar.Resp, ar.Err) if !success {
//if !success { txn.WarnOfFailedRequest(a.lg, start, &pb.InternalRaftStringer{Request: r}, ar.Resp, ar.Err)
// etcdserver.warnOfFailedRequest(a.lg, start, &pb.InternalRaftStringer{Request: r}, ar.Resp, ar.Err) }
//}
}(time.Now()) }(time.Now())
switch { switch {

View File

@ -18,7 +18,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"path" "path"
"strconv"
"time" "time"
"unicode/utf8" "unicode/utf8"
@ -28,6 +27,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/etcderrors" "go.etcd.io/etcd/server/v3/etcdserver/etcderrors"
"go.etcd.io/etcd/server/v3/etcdserver/txn"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -130,8 +130,8 @@ func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.Shoul
return return
} }
success := resp.Err == nil success := resp.Err == nil
applySec.WithLabelValues(v2Version, r.Method, strconv.FormatBool(success)).Observe(time.Since(start).Seconds()) txn.ApplySecObserve(v2Version, r.Method, success, time.Since(start))
warnOfExpensiveRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, stringer, nil, nil) txn.WarnOfExpensiveRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, stringer, nil, nil)
}(time.Now()) }(time.Now())
switch r.Method { switch r.Method {

View File

@ -70,12 +70,6 @@ var (
Name: "heartbeat_send_failures_total", Name: "heartbeat_send_failures_total",
Help: "The total number of leader heartbeat send failures (likely overloaded from slow disk).", Help: "The total number of leader heartbeat send failures (likely overloaded from slow disk).",
}) })
slowApplies = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "slow_apply_total",
Help: "The total number of slow apply requests (likely overloaded from slow disk).",
})
applySnapshotInProgress = prometheus.NewGauge(prometheus.GaugeOpts{ applySnapshotInProgress = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd", Namespace: "etcd",
Subsystem: "server", Subsystem: "server",
@ -159,17 +153,6 @@ var (
Name: "limit", Name: "limit",
Help: "The file descriptor limit.", Help: "The file descriptor limit.",
}) })
applySec = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "apply_duration_seconds",
Help: "The latency distributions of v2 apply called by backend.",
// lowest bucket start of upper bound 0.0001 sec (0.1 ms) with factor 2
// highest bucket start of 0.0001 sec * 2^19 == 52.4288 sec
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 20),
},
[]string{"version", "op", "success"})
) )
func init() { func init() {
@ -177,7 +160,6 @@ func init() {
prometheus.MustRegister(isLeader) prometheus.MustRegister(isLeader)
prometheus.MustRegister(leaderChanges) prometheus.MustRegister(leaderChanges)
prometheus.MustRegister(heartbeatSendFailures) prometheus.MustRegister(heartbeatSendFailures)
prometheus.MustRegister(slowApplies)
prometheus.MustRegister(applySnapshotInProgress) prometheus.MustRegister(applySnapshotInProgress)
prometheus.MustRegister(proposalsCommitted) prometheus.MustRegister(proposalsCommitted)
prometheus.MustRegister(proposalsApplied) prometheus.MustRegister(proposalsApplied)
@ -194,7 +176,6 @@ func init() {
prometheus.MustRegister(learnerPromoteFailed) prometheus.MustRegister(learnerPromoteFailed)
prometheus.MustRegister(fdUsed) prometheus.MustRegister(fdUsed)
prometheus.MustRegister(fdLimit) prometheus.MustRegister(fdLimit)
prometheus.MustRegister(applySec)
currentVersion.With(prometheus.Labels{ currentVersion.With(prometheus.Labels{
"server_version": version.Version, "server_version": version.Version,

View File

@ -33,6 +33,7 @@ import (
) )
func TestGetIDs(t *testing.T) { func TestGetIDs(t *testing.T) {
lg := zaptest.NewLogger(t)
addcc := &raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2} addcc := &raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2}
addEntry := raftpb.Entry{Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(addcc)} addEntry := raftpb.Entry{Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(addcc)}
removecc := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 2} removecc := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 2}
@ -67,7 +68,7 @@ func TestGetIDs(t *testing.T) {
if tt.confState != nil { if tt.confState != nil {
snap.Metadata.ConfState = *tt.confState snap.Metadata.ConfState = *tt.confState
} }
idSet := serverstorage.GetEffectiveNodeIDsFromWalEntries(zaptest.NewLogger(t), &snap, tt.ents) idSet := serverstorage.GetEffectiveNodeIDsFromWalEntries(lg, &snap, tt.ents)
if !reflect.DeepEqual(idSet, tt.widSet) { if !reflect.DeepEqual(idSet, tt.widSet) {
t.Errorf("#%d: idset = %#v, want %#v", i, idSet, tt.widSet) t.Errorf("#%d: idset = %#v, want %#v", i, idSet, tt.widSet)
} }
@ -75,6 +76,7 @@ func TestGetIDs(t *testing.T) {
} }
func TestCreateConfigChangeEnts(t *testing.T) { func TestCreateConfigChangeEnts(t *testing.T) {
lg := zaptest.NewLogger(t)
m := membership.Member{ m := membership.Member{
ID: types.ID(1), ID: types.ID(1),
RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:2380"}}, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:2380"}},
@ -147,7 +149,7 @@ func TestCreateConfigChangeEnts(t *testing.T) {
} }
for i, tt := range tests { for i, tt := range tests {
gents := serverstorage.CreateConfigChangeEnts(zaptest.NewLogger(t), tt.ids, tt.self, tt.term, tt.index) gents := serverstorage.CreateConfigChangeEnts(lg, tt.ids, tt.self, tt.term, tt.index)
if !reflect.DeepEqual(gents, tt.wents) { if !reflect.DeepEqual(gents, tt.wents) {
t.Errorf("#%d: ents = %v, want %v", i, gents, tt.wents) t.Errorf("#%d: ents = %v, want %v", i, gents, tt.wents)
} }

View File

@ -0,0 +1,51 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package txn
import (
"strconv"
"time"
"github.com/prometheus/client_golang/prometheus"
)
var (
slowApplies = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "slow_apply_total",
Help: "The total number of slow apply requests (likely overloaded from slow disk).",
})
applySec = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "apply_duration_seconds",
Help: "The latency distributions of v2 apply called by backend.",
// lowest bucket start of upper bound 0.0001 sec (0.1 ms) with factor 2
// highest bucket start of 0.0001 sec * 2^19 == 52.4288 sec
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 20),
},
[]string{"version", "op", "success"})
)
func ApplySecObserve(version, op string, success bool, latency time.Duration) {
applySec.WithLabelValues(version, op, strconv.FormatBool(success)).Observe(float64(latency.Microseconds()) / 1000000.0)
}
func init() {
prometheus.MustRegister(applySec)
prometheus.MustRegister(slowApplies)
}

View File

@ -0,0 +1,103 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package txn
import (
"fmt"
"reflect"
"strings"
"time"
"github.com/golang/protobuf/proto"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.uber.org/zap"
)
func WarnOfExpensiveRequest(lg *zap.Logger, warningApplyDuration time.Duration, now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
if time.Since(now) <= warningApplyDuration {
return
}
var resp string
if !isNil(respMsg) {
resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
}
warnOfExpensiveGenericRequest(lg, warningApplyDuration, now, reqStringer, "", resp, err)
}
func WarnOfFailedRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
var resp string
if !isNil(respMsg) {
resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
}
d := time.Since(now)
lg.Warn(
"failed to apply request",
zap.Duration("took", d),
zap.String("request", reqStringer.String()),
zap.String("response", resp),
zap.Error(err),
)
}
func WarnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, warningApplyDuration time.Duration, now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) {
if time.Since(now) <= warningApplyDuration {
return
}
reqStringer := pb.NewLoggableTxnRequest(r)
var resp string
if !isNil(txnResponse) {
var resps []string
for _, r := range txnResponse.Responses {
switch op := r.Response.(type) {
case *pb.ResponseOp_ResponseRange:
resps = append(resps, fmt.Sprintf("range_response_count:%d", len(op.ResponseRange.Kvs)))
default:
// only range responses should be in a read only txn request
}
}
resp = fmt.Sprintf("responses:<%s> size:%d", strings.Join(resps, " "), txnResponse.Size())
}
warnOfExpensiveGenericRequest(lg, warningApplyDuration, now, reqStringer, "read-only txn ", resp, err)
}
func WarnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, warningApplyDuration time.Duration, now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) {
if time.Since(now) <= warningApplyDuration {
return
}
var resp string
if !isNil(rangeResponse) {
resp = fmt.Sprintf("range_response_count:%d size:%d", len(rangeResponse.Kvs), rangeResponse.Size())
}
warnOfExpensiveGenericRequest(lg, warningApplyDuration, now, reqStringer, "read-only range ", resp, err)
}
// callers need make sure time has passed warningApplyDuration
func warnOfExpensiveGenericRequest(lg *zap.Logger, warningApplyDuration time.Duration, now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) {
lg.Warn(
"apply request took too long",
zap.Duration("took", time.Since(now)),
zap.Duration("expected-duration", warningApplyDuration),
zap.String("prefix", prefix),
zap.String("request", reqStringer.String()),
zap.String("response", resp),
zap.Error(err),
)
slowApplies.Inc()
}
func isNil(msg proto.Message) bool {
return msg == nil || reflect.ValueOf(msg).IsNil()
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package etcdserver package txn
import ( import (
"errors" "errors"
@ -46,7 +46,8 @@ func BenchmarkWarnOfExpensiveRequestNoLog(b *testing.B) {
Context: nil, Context: nil,
} }
err := errors.New("benchmarking warn of expensive request") err := errors.New("benchmarking warn of expensive request")
lg := zaptest.NewLogger(b)
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
warnOfExpensiveRequest(zaptest.NewLogger(b), time.Second, time.Now(), nil, m, err) WarnOfExpensiveRequest(lg, time.Second, time.Now(), nil, m, err)
} }
} }

View File

@ -16,17 +16,11 @@ package etcdserver
import ( import (
"fmt" "fmt"
"reflect"
"strings"
"time" "time"
"github.com/golang/protobuf/proto"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
"go.uber.org/zap"
) )
// isConnectedToQuorumSince checks whether the local member is connected to the // isConnectedToQuorumSince checks whether the local member is connected to the
@ -103,82 +97,6 @@ func (nc *notifier) notify(err error) {
close(nc.c) close(nc.c)
} }
func warnOfExpensiveRequest(lg *zap.Logger, warningApplyDuration time.Duration, now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
if time.Since(now) <= warningApplyDuration {
return
}
var resp string
if !isNil(respMsg) {
resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
}
warnOfExpensiveGenericRequest(lg, warningApplyDuration, now, reqStringer, "", resp, err)
}
func warnOfFailedRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
var resp string
if !isNil(respMsg) {
resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
}
d := time.Since(now)
lg.Warn(
"failed to apply request",
zap.Duration("took", d),
zap.String("request", reqStringer.String()),
zap.String("response", resp),
zap.Error(err),
)
}
func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, warningApplyDuration time.Duration, now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) {
if time.Since(now) <= warningApplyDuration {
return
}
reqStringer := pb.NewLoggableTxnRequest(r)
var resp string
if !isNil(txnResponse) {
var resps []string
for _, r := range txnResponse.Responses {
switch op := r.Response.(type) {
case *pb.ResponseOp_ResponseRange:
resps = append(resps, fmt.Sprintf("range_response_count:%d", len(op.ResponseRange.Kvs)))
default:
// only range responses should be in a read only txn request
}
}
resp = fmt.Sprintf("responses:<%s> size:%d", strings.Join(resps, " "), txnResponse.Size())
}
warnOfExpensiveGenericRequest(lg, warningApplyDuration, now, reqStringer, "read-only txn ", resp, err)
}
func warnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, warningApplyDuration time.Duration, now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) {
if time.Since(now) <= warningApplyDuration {
return
}
var resp string
if !isNil(rangeResponse) {
resp = fmt.Sprintf("range_response_count:%d size:%d", len(rangeResponse.Kvs), rangeResponse.Size())
}
warnOfExpensiveGenericRequest(lg, warningApplyDuration, now, reqStringer, "read-only range ", resp, err)
}
// callers need make sure time has passed warningApplyDuration
func warnOfExpensiveGenericRequest(lg *zap.Logger, warningApplyDuration time.Duration, now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) {
lg.Warn(
"apply request took too long",
zap.Duration("took", time.Since(now)),
zap.Duration("expected-duration", warningApplyDuration),
zap.String("prefix", prefix),
zap.String("request", reqStringer.String()),
zap.String("response", resp),
zap.Error(err),
)
slowApplies.Inc()
}
func isNil(msg proto.Message) bool {
return msg == nil || reflect.ValueOf(msg).IsNil()
}
// panicAlternativeStringer wraps a fmt.Stringer, and if calling String() panics, calls the alternative instead. // panicAlternativeStringer wraps a fmt.Stringer, and if calling String() panics, calls the alternative instead.
// This is needed to ensure logging slow v2 requests does not panic, which occurs when running integration tests // This is needed to ensure logging slow v2 requests does not panic, which occurs when running integration tests
// with the embedded server with github.com/golang/protobuf v1.4.0+. See https://github.com/etcd-io/etcd/issues/12197. // with the embedded server with github.com/golang/protobuf v1.4.0+. See https://github.com/etcd-io/etcd/issues/12197.

View File

@ -110,7 +110,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
var resp *pb.RangeResponse var resp *pb.RangeResponse
var err error var err error
defer func(start time.Time) { defer func(start time.Time) {
warnOfExpensiveReadOnlyRangeRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, r, resp, err) txn.WarnOfExpensiveReadOnlyRangeRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
if resp != nil { if resp != nil {
trace.AddField( trace.AddField(
traceutil.Field{Key: "response_count", Value: len(resp.Kvs)}, traceutil.Field{Key: "response_count", Value: len(resp.Kvs)},
@ -177,7 +177,7 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
} }
defer func(start time.Time) { defer func(start time.Time) {
warnOfExpensiveReadOnlyTxnRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, r, resp, err) txn.WarnOfExpensiveReadOnlyTxnRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
trace.LogIfLong(traceThreshold) trace.LogIfLong(traceThreshold)
}(time.Now()) }(time.Now())