mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #9838 from jpbetz/automated-cherry-pick-of-#9821-origin-release-3.1-1528833932
etcdserver: Automated cherry pick of detailed "took too long" warnings to release-3.1
This commit is contained in:
commit
1d7a2ca520
@ -95,6 +95,9 @@ func (s *EtcdServer) newApplierV3() applierV3 {
|
|||||||
|
|
||||||
func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
|
func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
|
||||||
ar := &applyResult{}
|
ar := &applyResult{}
|
||||||
|
defer func(start time.Time) {
|
||||||
|
warnOfExpensiveRequest(start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
|
||||||
|
}(time.Now())
|
||||||
|
|
||||||
// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
|
// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
|
||||||
switch {
|
switch {
|
||||||
|
@ -105,10 +105,12 @@ func (a *applierV2store) Sync(r *pb.Request) Response {
|
|||||||
return Response{}
|
return Response{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// applyV2Request interprets r as a call to store.X and returns a Response interpreted
|
// applyV2Request interprets r as a call to v2store.X
|
||||||
// from store.Event
|
// and returns a Response interpreted from v2store.Event
|
||||||
func (s *EtcdServer) applyV2Request(r *pb.Request) Response {
|
func (s *EtcdServer) applyV2Request(r *pb.Request) Response {
|
||||||
|
defer warnOfExpensiveRequest(time.Now(), r, nil, nil)
|
||||||
toTTLOptions(r)
|
toTTLOptions(r)
|
||||||
|
|
||||||
switch r.Method {
|
switch r.Method {
|
||||||
case "POST":
|
case "POST":
|
||||||
return s.applyV2.Post(r)
|
return s.applyV2.Post(r)
|
||||||
|
175
etcdserver/etcdserverpb/raft_internal_stringer.go
Normal file
175
etcdserver/etcdserverpb/raft_internal_stringer.go
Normal file
@ -0,0 +1,175 @@
|
|||||||
|
// Copyright 2018 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package etcdserverpb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
proto "github.com/golang/protobuf/proto"
|
||||||
|
)
|
||||||
|
|
||||||
|
// InternalRaftStringer implements custom proto Stringer:
|
||||||
|
// redact password, replace value fields with value_size fields.
|
||||||
|
type InternalRaftStringer struct {
|
||||||
|
Request *InternalRaftRequest
|
||||||
|
}
|
||||||
|
|
||||||
|
func (as *InternalRaftStringer) String() string {
|
||||||
|
switch {
|
||||||
|
case as.Request.LeaseGrant != nil:
|
||||||
|
return fmt.Sprintf("header:<%s> lease_grant:<ttl:%d-second id:%016x>",
|
||||||
|
as.Request.Header.String(),
|
||||||
|
as.Request.LeaseGrant.TTL,
|
||||||
|
as.Request.LeaseGrant.ID,
|
||||||
|
)
|
||||||
|
case as.Request.LeaseRevoke != nil:
|
||||||
|
return fmt.Sprintf("header:<%s> lease_revoke:<id:%016x>",
|
||||||
|
as.Request.Header.String(),
|
||||||
|
as.Request.LeaseRevoke.ID,
|
||||||
|
)
|
||||||
|
case as.Request.Authenticate != nil:
|
||||||
|
return fmt.Sprintf("header:<%s> authenticate:<name:%s simple_token:%s>",
|
||||||
|
as.Request.Header.String(),
|
||||||
|
as.Request.Authenticate.Name,
|
||||||
|
as.Request.Authenticate.SimpleToken,
|
||||||
|
)
|
||||||
|
case as.Request.AuthUserAdd != nil:
|
||||||
|
return fmt.Sprintf("header:<%s> auth_user_add:<name:%s>",
|
||||||
|
as.Request.Header.String(),
|
||||||
|
as.Request.AuthUserAdd.Name,
|
||||||
|
)
|
||||||
|
case as.Request.AuthUserChangePassword != nil:
|
||||||
|
return fmt.Sprintf("header:<%s> auth_user_change_password:<name:%s>",
|
||||||
|
as.Request.Header.String(),
|
||||||
|
as.Request.AuthUserChangePassword.Name,
|
||||||
|
)
|
||||||
|
case as.Request.Put != nil:
|
||||||
|
return fmt.Sprintf("header:<%s> put:<%s>",
|
||||||
|
as.Request.Header.String(),
|
||||||
|
newLoggablePutRequest(as.Request.Put).String(),
|
||||||
|
)
|
||||||
|
case as.Request.Txn != nil:
|
||||||
|
return fmt.Sprintf("header:<%s> txn:<%s>",
|
||||||
|
as.Request.Header.String(),
|
||||||
|
NewLoggableTxnRequest(as.Request.Txn).String(),
|
||||||
|
)
|
||||||
|
default:
|
||||||
|
// nothing to redact
|
||||||
|
}
|
||||||
|
return as.Request.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
// txnRequestStringer implements a custom proto String to replace value bytes fields with value size
|
||||||
|
// fields in any nested txn and put operations.
|
||||||
|
type txnRequestStringer struct {
|
||||||
|
Request *TxnRequest
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewLoggableTxnRequest(request *TxnRequest) *txnRequestStringer {
|
||||||
|
return &txnRequestStringer{request}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (as *txnRequestStringer) String() string {
|
||||||
|
var compare []string
|
||||||
|
for _, c := range as.Request.Compare {
|
||||||
|
switch cv := c.TargetUnion.(type) {
|
||||||
|
case *Compare_Value:
|
||||||
|
compare = append(compare, newLoggableValueCompare(c, cv).String())
|
||||||
|
default:
|
||||||
|
// nothing to redact
|
||||||
|
compare = append(compare, c.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
var success []string
|
||||||
|
for _, s := range as.Request.Success {
|
||||||
|
success = append(success, newLoggableRequestOp(s).String())
|
||||||
|
}
|
||||||
|
var failure []string
|
||||||
|
for _, f := range as.Request.Failure {
|
||||||
|
failure = append(failure, newLoggableRequestOp(f).String())
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("compare:<%s> success:<%s> failure:<%s>",
|
||||||
|
strings.Join(compare, " "),
|
||||||
|
strings.Join(success, " "),
|
||||||
|
strings.Join(failure, " "),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// requestOpStringer implements a custom proto String to replace value bytes fields with value
|
||||||
|
// size fields in any nested txn and put operations.
|
||||||
|
type requestOpStringer struct {
|
||||||
|
Op *RequestOp
|
||||||
|
}
|
||||||
|
|
||||||
|
func newLoggableRequestOp(op *RequestOp) *requestOpStringer {
|
||||||
|
return &requestOpStringer{op}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (as *requestOpStringer) String() string {
|
||||||
|
switch op := as.Op.Request.(type) {
|
||||||
|
case *RequestOp_RequestPut:
|
||||||
|
return fmt.Sprintf("request_put:<%s>", newLoggablePutRequest(op.RequestPut).String())
|
||||||
|
default:
|
||||||
|
// nothing to redact
|
||||||
|
}
|
||||||
|
return as.Op.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
// loggableValueCompare implements a custom proto String for Compare.Value union member types to
|
||||||
|
// replace the value bytes field with a value size field.
|
||||||
|
// To preserve proto encoding of the key and range_end bytes, a faked out proto type is used here.
|
||||||
|
type loggableValueCompare struct {
|
||||||
|
Result Compare_CompareResult `protobuf:"varint,1,opt,name=result,proto3,enum=etcdserverpb.Compare_CompareResult"`
|
||||||
|
Target Compare_CompareTarget `protobuf:"varint,2,opt,name=target,proto3,enum=etcdserverpb.Compare_CompareTarget"`
|
||||||
|
Key []byte `protobuf:"bytes,3,opt,name=key,proto3"`
|
||||||
|
ValueSize int `protobuf:"bytes,7,opt,name=value_size,proto3"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func newLoggableValueCompare(c *Compare, cv *Compare_Value) *loggableValueCompare {
|
||||||
|
return &loggableValueCompare{
|
||||||
|
c.Result,
|
||||||
|
c.Target,
|
||||||
|
c.Key,
|
||||||
|
len(cv.Value),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *loggableValueCompare) Reset() { *m = loggableValueCompare{} }
|
||||||
|
func (m *loggableValueCompare) String() string { return proto.CompactTextString(m) }
|
||||||
|
func (*loggableValueCompare) ProtoMessage() {}
|
||||||
|
|
||||||
|
// loggablePutRequest implements a custom proto String to replace value bytes field with a value
|
||||||
|
// size field.
|
||||||
|
// To preserve proto encoding of the key bytes, a faked out proto type is used here.
|
||||||
|
type loggablePutRequest struct {
|
||||||
|
Key []byte `protobuf:"bytes,1,opt,name=key,proto3"`
|
||||||
|
ValueSize int `protobuf:"varint,2,opt,name=value_size,proto3"`
|
||||||
|
Lease int64 `protobuf:"varint,3,opt,name=lease,proto3"`
|
||||||
|
PrevKv bool `protobuf:"varint,4,opt,name=prev_kv,proto3"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func newLoggablePutRequest(request *PutRequest) *loggablePutRequest {
|
||||||
|
return &loggablePutRequest{
|
||||||
|
request.Key,
|
||||||
|
len(request.Value),
|
||||||
|
request.Lease,
|
||||||
|
request.PrevKv,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *loggablePutRequest) Reset() { *m = loggablePutRequest{} }
|
||||||
|
func (m *loggablePutRequest) String() string { return proto.CompactTextString(m) }
|
||||||
|
func (*loggablePutRequest) ProtoMessage() {}
|
@ -800,14 +800,8 @@ func (s *EtcdServer) run() {
|
|||||||
|
|
||||||
func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
|
func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
|
||||||
s.applySnapshot(ep, apply)
|
s.applySnapshot(ep, apply)
|
||||||
st := time.Now()
|
|
||||||
s.applyEntries(ep, apply)
|
s.applyEntries(ep, apply)
|
||||||
d := time.Since(st)
|
|
||||||
entriesNum := len(apply.entries)
|
|
||||||
if entriesNum != 0 && d > time.Duration(entriesNum)*warnApplyDuration {
|
|
||||||
plog.Warningf("apply entries took too long [%v for %d entries]", d, len(apply.entries))
|
|
||||||
plog.Warningf("avoid queries with large range/delete range!")
|
|
||||||
}
|
|
||||||
proposalsApplied.Set(float64(ep.appliedi))
|
proposalsApplied.Set(float64(ep.appliedi))
|
||||||
s.applyWait.Trigger(ep.appliedi)
|
s.applyWait.Trigger(ep.appliedi)
|
||||||
// wait for the raft routine to finish the disk writes before triggering a
|
// wait for the raft routine to finish the disk writes before triggering a
|
||||||
|
@ -15,11 +15,16 @@
|
|||||||
package etcdserver
|
package etcdserver
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||||
"github.com/coreos/etcd/etcdserver/membership"
|
"github.com/coreos/etcd/etcdserver/membership"
|
||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
"github.com/coreos/etcd/rafthttp"
|
"github.com/coreos/etcd/rafthttp"
|
||||||
|
"github.com/golang/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
// isConnectedToQuorumSince checks whether the local member is connected to the
|
// isConnectedToQuorumSince checks whether the local member is connected to the
|
||||||
@ -95,3 +100,55 @@ func (nc *notifier) notify(err error) {
|
|||||||
nc.err = err
|
nc.err = err
|
||||||
close(nc.c)
|
close(nc.c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func warnOfExpensiveRequest(now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
|
||||||
|
var resp string
|
||||||
|
if !isNil(respMsg) {
|
||||||
|
resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
|
||||||
|
}
|
||||||
|
warnOfExpensiveGenericRequest(now, reqStringer, "", resp, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func warnOfExpensiveReadOnlyTxnRequest(now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) {
|
||||||
|
reqStringer := pb.NewLoggableTxnRequest(r)
|
||||||
|
var resp string
|
||||||
|
if !isNil(txnResponse) {
|
||||||
|
var resps []string
|
||||||
|
for _, r := range txnResponse.Responses {
|
||||||
|
switch op := r.Response.(type) {
|
||||||
|
case *pb.ResponseOp_ResponseRange:
|
||||||
|
resps = append(resps, fmt.Sprintf("range_response_count:%d", len(op.ResponseRange.Kvs)))
|
||||||
|
default:
|
||||||
|
// only range responses should be in a read only txn request
|
||||||
|
}
|
||||||
|
}
|
||||||
|
resp = fmt.Sprintf("responses:<%s> size:%d", strings.Join(resps, " "), proto.Size(txnResponse))
|
||||||
|
}
|
||||||
|
warnOfExpensiveGenericRequest(now, reqStringer, "read-only range ", resp, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func warnOfExpensiveReadOnlyRangeRequest(now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) {
|
||||||
|
var resp string
|
||||||
|
if !isNil(rangeResponse) {
|
||||||
|
resp = fmt.Sprintf("range_response_count:%d size:%d", len(rangeResponse.Kvs), proto.Size(rangeResponse))
|
||||||
|
}
|
||||||
|
warnOfExpensiveGenericRequest(now, reqStringer, "read-only range ", resp, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func warnOfExpensiveGenericRequest(now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) {
|
||||||
|
// TODO: add metrics
|
||||||
|
d := time.Since(now)
|
||||||
|
if d > warnApplyDuration {
|
||||||
|
var result string
|
||||||
|
if err != nil {
|
||||||
|
result = fmt.Sprintf("error:%v", err)
|
||||||
|
} else {
|
||||||
|
result = resp
|
||||||
|
}
|
||||||
|
plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func isNil(msg proto.Message) bool {
|
||||||
|
return msg == nil || reflect.ValueOf(msg).IsNil()
|
||||||
|
}
|
||||||
|
@ -95,21 +95,25 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
|
|||||||
if s.ClusterVersion() == nil || s.ClusterVersion().LessThan(newRangeClusterVersion) {
|
if s.ClusterVersion() == nil || s.ClusterVersion().LessThan(newRangeClusterVersion) {
|
||||||
return s.legacyRange(ctx, r)
|
return s.legacyRange(ctx, r)
|
||||||
}
|
}
|
||||||
|
var resp *pb.RangeResponse
|
||||||
|
var err error
|
||||||
|
defer func(start time.Time) {
|
||||||
|
warnOfExpensiveReadOnlyRangeRequest(start, r, resp, err)
|
||||||
|
}(time.Now())
|
||||||
|
|
||||||
if !r.Serializable {
|
if !r.Serializable {
|
||||||
err := s.linearizableReadNotify(ctx)
|
err = s.linearizableReadNotify(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
var resp *pb.RangeResponse
|
|
||||||
var err error
|
|
||||||
chk := func(ai *auth.AuthInfo) error {
|
chk := func(ai *auth.AuthInfo) error {
|
||||||
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
|
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
|
||||||
}
|
}
|
||||||
get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
|
get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
|
||||||
if serr := s.doSerialize(ctx, chk, get); serr != nil {
|
if serr := s.doSerialize(ctx, chk, get); serr != nil {
|
||||||
return nil, serr
|
err = serr
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
@ -178,6 +182,11 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
|
|||||||
chk := func(ai *auth.AuthInfo) error {
|
chk := func(ai *auth.AuthInfo) error {
|
||||||
return checkTxnAuth(s.authStore, ai, r)
|
return checkTxnAuth(s.authStore, ai, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer func(start time.Time) {
|
||||||
|
warnOfExpensiveReadOnlyTxnRequest(start, r, resp, err)
|
||||||
|
}(time.Now())
|
||||||
|
|
||||||
get := func() { resp, err = s.applyV3Base.Txn(r) }
|
get := func() { resp, err = s.applyV3Base.Txn(r) }
|
||||||
if serr := s.doSerialize(ctx, chk, get); serr != nil {
|
if serr := s.doSerialize(ctx, chk, get); serr != nil {
|
||||||
return nil, serr
|
return nil, serr
|
||||||
|
Loading…
x
Reference in New Issue
Block a user