server: Added config parameter experimental-warning-apply-duration

Signed-off-by: Sam Batschelet <sbatsche@redhat.com>
This commit is contained in:
Sam Batschelet 2021-03-03 11:23:04 -05:00
parent aa7126864d
commit 9aeabe447d
9 changed files with 26 additions and 19 deletions

View File

@ -53,6 +53,7 @@ const (
DefaultMaxSnapshots = 5 DefaultMaxSnapshots = 5
DefaultMaxWALs = 5 DefaultMaxWALs = 5
DefaultMaxTxnOps = uint(128) DefaultMaxTxnOps = uint(128)
DefaultWarningApplyDuration = 100 * time.Millisecond
DefaultMaxRequestBytes = 1.5 * 1024 * 1024 DefaultMaxRequestBytes = 1.5 * 1024 * 1024
DefaultGRPCKeepAliveMinTime = 5 * time.Second DefaultGRPCKeepAliveMinTime = 5 * time.Second
DefaultGRPCKeepAliveInterval = 2 * time.Hour DefaultGRPCKeepAliveInterval = 2 * time.Hour
@ -285,6 +286,9 @@ type Config struct {
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"` ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"` ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"` ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
// ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request
// takes more time than this value.
ExperimentalWarningApplyDuration time.Duration `json:"experimental-warning-apply-duration"`
// ForceNewCluster starts a new cluster even if previously started; unsafe. // ForceNewCluster starts a new cluster even if previously started; unsafe.
ForceNewCluster bool `json:"force-new-cluster"` ForceNewCluster bool `json:"force-new-cluster"`
@ -388,8 +392,9 @@ func NewConfig() *Config {
SnapshotCount: etcdserver.DefaultSnapshotCount, SnapshotCount: etcdserver.DefaultSnapshotCount,
SnapshotCatchUpEntries: etcdserver.DefaultSnapshotCatchUpEntries, SnapshotCatchUpEntries: etcdserver.DefaultSnapshotCatchUpEntries,
MaxTxnOps: DefaultMaxTxnOps, MaxTxnOps: DefaultMaxTxnOps,
MaxRequestBytes: DefaultMaxRequestBytes, MaxRequestBytes: DefaultMaxRequestBytes,
ExperimentalWarningApplyDuration: DefaultWarningApplyDuration,
GRPCKeepAliveMinTime: DefaultGRPCKeepAliveMinTime, GRPCKeepAliveMinTime: DefaultGRPCKeepAliveMinTime,
GRPCKeepAliveInterval: DefaultGRPCKeepAliveInterval, GRPCKeepAliveInterval: DefaultGRPCKeepAliveInterval,

View File

@ -209,6 +209,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint, EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit, CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval, WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
} }
print(e.cfg.logger, *cfg, srvcfg, memberInitialized) print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil { if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {

View File

@ -258,6 +258,7 @@ func newConfig() *config {
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.") fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.") fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.") fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.")
// unsafe // unsafe
fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.") fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")

View File

@ -212,6 +212,8 @@ Experimental feature:
Skip verification of SAN field in client certificate for peer connections. Skip verification of SAN field in client certificate for peer connections.
--experimental-watch-progress-notify-interval '10m' --experimental-watch-progress-notify-interval '10m'
Duration of periodical watch progress notification. Duration of periodical watch progress notification.
--experimental-warning-apply-duration '100ms'
Warning is generated if requests take more than this duration.
Unsafe feature: Unsafe feature:
--force-new-cluster 'false' --force-new-cluster 'false'

View File

@ -33,10 +33,6 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
const (
warnApplyDuration = 100 * time.Millisecond
)
type applyResult struct { type applyResult struct {
resp proto.Message resp proto.Message
err error err error
@ -115,7 +111,7 @@ func (s *EtcdServer) newApplierV3() applierV3 {
func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
ar := &applyResult{} ar := &applyResult{}
defer func(start time.Time) { defer func(start time.Time) {
warnOfExpensiveRequest(a.s.getLogger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err) warnOfExpensiveRequest(a.s.getLogger(), a.s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
if ar.err != nil { if ar.err != nil {
warnOfFailedRequest(a.s.getLogger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err) warnOfFailedRequest(a.s.getLogger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
} }

View File

@ -119,7 +119,7 @@ func (s *EtcdServer) applyV2Request(r *RequestV2) Response {
stringer: r, stringer: r,
alternative: func() string { return fmt.Sprintf("id:%d,method:%s,path:%s", r.ID, r.Method, r.Path) }, alternative: func() string { return fmt.Sprintf("id:%d,method:%s,path:%s", r.ID, r.Method, r.Path) },
} }
defer warnOfExpensiveRequest(s.getLogger(), time.Now(), stringer, nil, nil) defer warnOfExpensiveRequest(s.getLogger(), s.Cfg.WarningApplyDuration, time.Now(), stringer, nil, nil)
switch r.Method { switch r.Method {
case "POST": case "POST":

View File

@ -119,6 +119,8 @@ type ServerConfig struct {
// MaxRequestBytes is the maximum request size to send over raft. // MaxRequestBytes is the maximum request size to send over raft.
MaxRequestBytes uint MaxRequestBytes uint
WarningApplyDuration time.Duration
StrictReconfigCheck bool StrictReconfigCheck bool
// ClientCertAuthEnabled is true when cert has been signed by the client CA. // ClientCertAuthEnabled is true when cert has been signed by the client CA.

View File

@ -103,12 +103,12 @@ func (nc *notifier) notify(err error) {
close(nc.c) close(nc.c)
} }
func warnOfExpensiveRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) { func warnOfExpensiveRequest(lg *zap.Logger, warningApplyDuration time.Duration, now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
var resp string var resp string
if !isNil(respMsg) { if !isNil(respMsg) {
resp = fmt.Sprintf("size:%d", proto.Size(respMsg)) resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
} }
warnOfExpensiveGenericRequest(lg, now, reqStringer, "", resp, err) warnOfExpensiveGenericRequest(lg, warningApplyDuration, now, reqStringer, "", resp, err)
} }
func warnOfFailedRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) { func warnOfFailedRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
@ -130,7 +130,7 @@ func warnOfFailedRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer
} }
} }
func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) { func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, warningApplyDuration time.Duration, now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) {
reqStringer := pb.NewLoggableTxnRequest(r) reqStringer := pb.NewLoggableTxnRequest(r)
var resp string var resp string
if !isNil(txnResponse) { if !isNil(txnResponse) {
@ -145,25 +145,25 @@ func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnR
} }
resp = fmt.Sprintf("responses:<%s> size:%d", strings.Join(resps, " "), proto.Size(txnResponse)) resp = fmt.Sprintf("responses:<%s> size:%d", strings.Join(resps, " "), proto.Size(txnResponse))
} }
warnOfExpensiveGenericRequest(lg, now, reqStringer, "read-only range ", resp, err) warnOfExpensiveGenericRequest(lg, warningApplyDuration, now, reqStringer, "read-only range ", resp, err)
} }
func warnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) { func warnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, warningApplyDuration time.Duration, now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) {
var resp string var resp string
if !isNil(rangeResponse) { if !isNil(rangeResponse) {
resp = fmt.Sprintf("range_response_count:%d size:%d", len(rangeResponse.Kvs), proto.Size(rangeResponse)) resp = fmt.Sprintf("range_response_count:%d size:%d", len(rangeResponse.Kvs), proto.Size(rangeResponse))
} }
warnOfExpensiveGenericRequest(lg, now, reqStringer, "read-only range ", resp, err) warnOfExpensiveGenericRequest(lg, warningApplyDuration, now, reqStringer, "read-only range ", resp, err)
} }
func warnOfExpensiveGenericRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) { func warnOfExpensiveGenericRequest(lg *zap.Logger, warningApplyDuration time.Duration, now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) {
d := time.Since(now) d := time.Since(now)
if d > warnApplyDuration { if d > warningApplyDuration {
if lg != nil { if lg != nil {
lg.Warn( lg.Warn(
"apply request took too long", "apply request took too long",
zap.Duration("took", d), zap.Duration("took", d),
zap.Duration("expected-duration", warnApplyDuration), zap.Duration("expected-duration", warningApplyDuration),
zap.String("prefix", prefix), zap.String("prefix", prefix),
zap.String("request", reqStringer.String()), zap.String("request", reqStringer.String()),
zap.String("response", resp), zap.String("response", resp),

View File

@ -97,7 +97,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
var resp *pb.RangeResponse var resp *pb.RangeResponse
var err error var err error
defer func(start time.Time) { defer func(start time.Time) {
warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), start, r, resp, err) warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
if resp != nil { if resp != nil {
trace.AddField( trace.AddField(
traceutil.Field{Key: "response_count", Value: len(resp.Kvs)}, traceutil.Field{Key: "response_count", Value: len(resp.Kvs)},
@ -158,7 +158,7 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
} }
defer func(start time.Time) { defer func(start time.Time) {
warnOfExpensiveReadOnlyTxnRequest(s.getLogger(), start, r, resp, err) warnOfExpensiveReadOnlyTxnRequest(s.getLogger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
}(time.Now()) }(time.Now())
get := func() { resp, err = s.applyV3Base.Txn(r) } get := func() { resp, err = s.applyV3Base.Txn(r) }