From 3a3eb24c692e572015ac31481274efc8e2eb96ae Mon Sep 17 00:00:00 2001 From: yoyinzyc Date: Tue, 1 Oct 2019 15:38:52 -0700 Subject: [PATCH] etcdserver: trace raft requests. --- etcdserver/apply.go | 35 ++++++++++++++++------------------- etcdserver/apply_auth.go | 9 +++++---- etcdserver/corrupt.go | 5 +++-- etcdserver/v3_server.go | 8 ++++++++ pkg/traceutil/trace.go | 15 +++++++++++++++ 5 files changed, 47 insertions(+), 25 deletions(-) diff --git a/etcdserver/apply.go b/etcdserver/apply.go index c1ea27687..81b16f39a 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -34,8 +34,7 @@ import ( ) const ( - warnApplyDuration = 100 * time.Millisecond - rangeTraceThreshold = 100 * time.Millisecond + warnApplyDuration = 100 * time.Millisecond ) type applyResult struct { @@ -45,13 +44,14 @@ type applyResult struct { // to being logically reflected by the node. Currently only used for // Compaction requests. physc <-chan struct{} + trace *traceutil.Trace } // applierV3 is the interface for processing V3 raft messages type applierV3 interface { Apply(r *pb.InternalRaftRequest) *applyResult - Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) + Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) @@ -123,7 +123,7 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { case r.Range != nil: ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range) case r.Put != nil: - ar.resp, ar.err = a.s.applyV3.Put(nil, r.Put) + ar.resp, ar.trace, ar.err = a.s.applyV3.Put(nil, r.Put) case r.DeleteRange != nil: ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange) case r.Txn != nil: @@ -176,22 +176,19 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { return ar } -func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, err error) { +func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { resp = &pb.PutResponse{} resp.Header = &pb.ResponseHeader{} - trace := traceutil.New("put", + trace = traceutil.New("put", a.s.getLogger(), traceutil.Field{Key: "key", Value: string(p.Key)}, traceutil.Field{Key: "value", Value: string(p.Value)}, ) - defer func() { - trace.LogIfLong(warnApplyDuration) - }() val, leaseID := p.Value, lease.LeaseID(p.Lease) if txn == nil { if leaseID != lease.NoLease { if l := a.s.lessor.Lookup(leaseID); l == nil { - return nil, lease.ErrLeaseNotFound + return nil, nil, lease.ErrLeaseNotFound } } txn = a.s.KV().Write(trace) @@ -203,14 +200,14 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu trace.StepBegin() rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{}) if err != nil { - return nil, err + return nil, nil, err } trace.StepEnd("get previous kv pair") } if p.IgnoreValue || p.IgnoreLease { if rr == nil || len(rr.KVs) == 0 { // ignore_{lease,value} flag expects previous key-value pair - return nil, ErrKeyNotFound + return nil, nil, ErrKeyNotFound } } if p.IgnoreValue { @@ -226,7 +223,7 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu } resp.Header.Revision = txn.Put(p.Key, val, leaseID) - return resp, nil + return resp, trace, nil } func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { @@ -540,7 +537,7 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat } respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp case *pb.RequestOp_RequestPut: - resp, err := a.Put(txn, tv.RequestPut) + resp, _, err := a.Put(txn, tv.RequestPut) if err != nil { if lg != nil { lg.Panic("unexpected error during txn", zap.Error(err)) @@ -688,8 +685,8 @@ type applierV3Capped struct { // with Puts so that the number of keys in the store is capped. func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} } -func (a *applierV3Capped) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) { - return nil, ErrNoSpace +func (a *applierV3Capped) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { + return nil, nil, ErrNoSpace } func (a *applierV3Capped) Txn(r *pb.TxnRequest) (*pb.TxnResponse, error) { @@ -838,13 +835,13 @@ func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 { return "aApplierV3{app, NewBackendQuota(s, "v3-applier")} } -func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) { +func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { ok := a.q.Available(p) - resp, err := a.applierV3.Put(txn, p) + resp, trace, err := a.applierV3.Put(txn, p) if err == nil && !ok { err = ErrNoSpace } - return resp, err + return resp, trace, err } func (a *quotaApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { diff --git a/etcdserver/apply_auth.go b/etcdserver/apply_auth.go index c31644b3d..269af4758 100644 --- a/etcdserver/apply_auth.go +++ b/etcdserver/apply_auth.go @@ -22,6 +22,7 @@ import ( pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc" + "go.etcd.io/etcd/pkg/traceutil" ) type authApplierV3 struct { @@ -62,9 +63,9 @@ func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest) *applyResult { return ret } -func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutResponse, error) { +func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { if err := aa.as.IsPutPermitted(&aa.authInfo, r.Key); err != nil { - return nil, err + return nil, nil, err } if err := aa.checkLeasePuts(lease.LeaseID(r.Lease)); err != nil { @@ -72,13 +73,13 @@ func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutRespon // be written by this user. It means the user cannot revoke the // lease so attaching the lease to the newly written key should // be forbidden. - return nil, err + return nil, nil, err } if r.PrevKv { err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, nil) if err != nil { - return nil, err + return nil, nil, err } } return aa.applierV3.Put(txn, r) diff --git a/etcdserver/corrupt.go b/etcdserver/corrupt.go index 0f9a4053f..07f306424 100644 --- a/etcdserver/corrupt.go +++ b/etcdserver/corrupt.go @@ -23,6 +23,7 @@ import ( "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/mvcc" + "go.etcd.io/etcd/pkg/traceutil" "go.etcd.io/etcd/pkg/types" "go.uber.org/zap" @@ -382,8 +383,8 @@ type applierV3Corrupt struct { func newApplierV3Corrupt(a applierV3) *applierV3Corrupt { return &applierV3Corrupt{a} } -func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) { - return nil, ErrCorrupt +func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { + return nil, nil, ErrCorrupt } func (a *applierV3Corrupt) Range(ctx context.Context, txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) { diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index 721800dc8..a005d8e2c 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -39,6 +39,8 @@ const ( // However, if the committed entries are very heavy to apply, the gap might grow. // We should stop accepting new proposals if the gap growing to a certain point. maxGapBetweenApplyAndCommitIndex = 5000 + rangeTraceThreshold = 100 * time.Millisecond + putTraceThreshold = 100 * time.Millisecond ) type RaftKV interface { @@ -126,6 +128,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe } func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { + ctx = context.WithValue(ctx, "time", time.Now()) resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r}) if err != nil { return nil, err @@ -549,6 +552,11 @@ func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftReque if result.err != nil { return nil, result.err } + if startTime, ok := ctx.Value("time").(time.Time); ok && result.trace != nil { + applyStart := result.trace.ResetStartTime(startTime) + result.trace.InsertStep(0, applyStart, "process raft request") + result.trace.LogIfLong(putTraceThreshold) + } return result.resp, nil } diff --git a/pkg/traceutil/trace.go b/pkg/traceutil/trace.go index d056097a0..2628db665 100644 --- a/pkg/traceutil/trace.go +++ b/pkg/traceutil/trace.go @@ -81,6 +81,21 @@ func Get(ctx context.Context) *Trace { return TODO() } +func (t *Trace) ResetStartTime(time time.Time) (prev time.Time) { + prev = t.startTime + t.startTime = time + return prev +} + +func (t *Trace) InsertStep(at int, time time.Time, msg string, fields ...Field) { + newStep := step{time, msg, fields} + if at < len(t.steps) { + t.steps = append(t.steps[:at+1], t.steps[at:]...) + t.steps[at] = newStep + } else { + t.steps = append(t.steps, newStep) + } +} func (t *Trace) Step(msg string, fields ...Field) { if !t.inStep { t.steps = append(t.steps, step{time: time.Now(), msg: msg, fields: fields})