mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: Add response byte size and range response count to took too long warning
This commit is contained in:
parent
ef154094b3
commit
07f833ae3e
@ -107,9 +107,10 @@ func (s *EtcdServer) newApplierV3() applierV3 {
|
||||
}
|
||||
|
||||
func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
|
||||
defer warnOfExpensiveRequest(time.Now(), &pb.InternalRaftStringer{Request: r})
|
||||
|
||||
ar := &applyResult{}
|
||||
defer func(start time.Time) {
|
||||
warnOfExpensiveRequest(start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
|
||||
}(time.Now())
|
||||
|
||||
// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
|
||||
switch {
|
||||
|
@ -107,7 +107,7 @@ func (a *applierV2store) Sync(r *RequestV2) Response {
|
||||
// applyV2Request interprets r as a call to store.X and returns a Response interpreted
|
||||
// from store.Event
|
||||
func (s *EtcdServer) applyV2Request(r *RequestV2) Response {
|
||||
defer warnOfExpensiveRequest(time.Now(), r)
|
||||
defer warnOfExpensiveRequest(time.Now(), r, nil, nil)
|
||||
|
||||
switch r.Method {
|
||||
case "POST":
|
||||
|
@ -16,11 +16,15 @@ package etcdserver
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/etcdserver/membership"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/rafthttp"
|
||||
"github.com/golang/protobuf/proto"
|
||||
)
|
||||
|
||||
// isConnectedToQuorumSince checks whether the local member is connected to the
|
||||
@ -97,18 +101,53 @@ func (nc *notifier) notify(err error) {
|
||||
close(nc.c)
|
||||
}
|
||||
|
||||
func warnOfExpensiveRequest(now time.Time, stringer fmt.Stringer) {
|
||||
warnOfExpensiveGenericRequest(now, stringer, "")
|
||||
func warnOfExpensiveRequest(now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
|
||||
var resp string
|
||||
if !isNil(respMsg) {
|
||||
resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
|
||||
}
|
||||
warnOfExpensiveGenericRequest(now, reqStringer, "", resp, err)
|
||||
}
|
||||
|
||||
func warnOfExpensiveReadOnlyRangeRequest(now time.Time, stringer fmt.Stringer) {
|
||||
warnOfExpensiveGenericRequest(now, stringer, "read-only range ")
|
||||
func warnOfExpensiveReadOnlyTxnRequest(now time.Time, reqStringer fmt.Stringer, txnResponse *pb.TxnResponse, err error) {
|
||||
var resp string
|
||||
if !isNil(txnResponse) {
|
||||
var resps []string
|
||||
for _, r := range txnResponse.Responses {
|
||||
switch op := r.Response.(type) {
|
||||
case *pb.ResponseOp_ResponseRange:
|
||||
resps = append(resps, fmt.Sprintf("range_response_count:%d", len(op.ResponseRange.Kvs)))
|
||||
default:
|
||||
// only range responses should be in a read only txn request
|
||||
}
|
||||
}
|
||||
resp = fmt.Sprintf("responses:<%s> size:%d", strings.Join(resps, " "), proto.Size(txnResponse))
|
||||
}
|
||||
warnOfExpensiveGenericRequest(now, reqStringer, "read-only range ", resp, err)
|
||||
}
|
||||
|
||||
func warnOfExpensiveGenericRequest(now time.Time, stringer fmt.Stringer, prefix string) {
|
||||
func warnOfExpensiveReadOnlyRangeRequest(now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) {
|
||||
var resp string
|
||||
if !isNil(rangeResponse) {
|
||||
resp = fmt.Sprintf("range_response_count:%d size:%d", len(rangeResponse.Kvs), proto.Size(rangeResponse))
|
||||
}
|
||||
warnOfExpensiveGenericRequest(now, reqStringer, "read-only range ", resp, err)
|
||||
}
|
||||
|
||||
func warnOfExpensiveGenericRequest(now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) {
|
||||
// TODO: add metrics
|
||||
d := time.Since(now)
|
||||
if d > warnApplyDuration {
|
||||
plog.Warningf("%srequest %q took too long (%v) to execute", prefix, stringer.String(), d)
|
||||
var result string
|
||||
if err != nil {
|
||||
result = fmt.Sprintf("error:%v", err)
|
||||
} else {
|
||||
result = resp
|
||||
}
|
||||
plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d)
|
||||
}
|
||||
}
|
||||
|
||||
func isNil(msg proto.Message) bool {
|
||||
return msg == nil || reflect.ValueOf(msg).IsNil()
|
||||
}
|
||||
|
@ -84,23 +84,26 @@ type Authenticator interface {
|
||||
}
|
||||
|
||||
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||
defer warnOfExpensiveReadOnlyRangeRequest(time.Now(), r)
|
||||
var resp *pb.RangeResponse
|
||||
var err error
|
||||
defer func(start time.Time) {
|
||||
warnOfExpensiveReadOnlyRangeRequest(start, r, resp, err)
|
||||
}(time.Now())
|
||||
|
||||
if !r.Serializable {
|
||||
err := s.linearizableReadNotify(ctx)
|
||||
err = s.linearizableReadNotify(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
var resp *pb.RangeResponse
|
||||
var err error
|
||||
chk := func(ai *auth.AuthInfo) error {
|
||||
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
|
||||
}
|
||||
|
||||
get := func() { resp, err = s.applyV3Base.Range(nil, r) }
|
||||
if serr := s.doSerialize(ctx, chk, get); serr != nil {
|
||||
return nil, serr
|
||||
err = serr
|
||||
return nil, err
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
@ -135,7 +138,9 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
|
||||
return checkTxnAuth(s.authStore, ai, r)
|
||||
}
|
||||
|
||||
defer warnOfExpensiveReadOnlyRangeRequest(time.Now(), r)
|
||||
defer func(start time.Time) {
|
||||
warnOfExpensiveReadOnlyTxnRequest(start, r, resp, err)
|
||||
}(time.Now())
|
||||
|
||||
get := func() { resp, err = s.applyV3Base.Txn(r) }
|
||||
if serr := s.doSerialize(ctx, chk, get); serr != nil {
|
||||
|
Loading…
x
Reference in New Issue
Block a user