diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 81b16f39a..822b5e322 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -55,7 +55,7 @@ type applierV3 interface { Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) - Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) + Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) @@ -129,7 +129,7 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { case r.Txn != nil: ar.resp, ar.err = a.s.applyV3.Txn(r.Txn) case r.Compaction != nil: - ar.resp, ar.physc, ar.err = a.s.applyV3.Compaction(r.Compaction) + ar.resp, ar.physc, ar.trace, ar.err = a.s.applyV3.Compaction(r.Compaction) case r.LeaseGrant != nil: ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant) case r.LeaseRevoke != nil: @@ -182,7 +182,7 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu trace = traceutil.New("put", a.s.getLogger(), traceutil.Field{Key: "key", Value: string(p.Key)}, - traceutil.Field{Key: "value", Value: string(p.Value)}, + traceutil.Field{Key: "req_size", Value: proto.Size(p)}, ) val, leaseID := p.Value, lease.LeaseID(p.Lease) if txn == nil { @@ -197,12 +197,13 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu var rr *mvcc.RangeResult if p.IgnoreValue || p.IgnoreLease || p.PrevKv { - trace.StepBegin() + trace.DisableStep() rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{}) if err != nil { return nil, nil, err } - trace.StepEnd("get previous kv pair") + trace.EnableStep() + trace.Step("get previous kv pair") } if p.IgnoreValue || p.IgnoreLease { if rr == nil || len(rr.KVs) == 0 { @@ -223,6 +224,7 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu } resp.Header.Revision = txn.Put(p.Key, val, leaseID) + trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision}) return resp, trace, nil } @@ -568,17 +570,22 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat return txns } -func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) { +func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) { resp := &pb.CompactionResponse{} resp.Header = &pb.ResponseHeader{} - ch, err := a.s.KV().Compact(compaction.Revision) + trace := traceutil.New("compact", + a.s.getLogger(), + traceutil.Field{Key: "revision", Value: compaction.Revision}, + ) + + ch, err := a.s.KV().Compact(trace, compaction.Revision) if err != nil { - return nil, ch, err + return nil, ch, nil, err } // get the current revision. which key to get is not important. rr, _ := a.s.KV().Range([]byte("compaction"), nil, mvcc.RangeOptions{}) resp.Header.Revision = rr.Rev - return resp, ch, err + return resp, ch, trace, err } func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { diff --git a/etcdserver/corrupt.go b/etcdserver/corrupt.go index 07f306424..2351eef44 100644 --- a/etcdserver/corrupt.go +++ b/etcdserver/corrupt.go @@ -399,8 +399,8 @@ func (a *applierV3Corrupt) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { return nil, ErrCorrupt } -func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) { - return nil, nil, ErrCorrupt +func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) { + return nil, nil, nil, ErrCorrupt } func (a *applierV3Corrupt) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index a005d8e2c..bfe08ea35 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -39,8 +39,7 @@ const ( // However, if the committed entries are very heavy to apply, the gap might grow. // We should stop accepting new proposals if the gap growing to a certain point. maxGapBetweenApplyAndCommitIndex = 5000 - rangeTraceThreshold = 100 * time.Millisecond - putTraceThreshold = 100 * time.Millisecond + traceThreshold = 100 * time.Millisecond ) type RaftKV interface { @@ -93,7 +92,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe traceutil.Field{Key: "range_begin", Value: string(r.Key)}, traceutil.Field{Key: "range_end", Value: string(r.RangeEnd)}, ) - ctx = context.WithValue(ctx, traceutil.CtxKey, trace) + ctx = context.WithValue(ctx, traceutil.TraceKey, trace) var resp *pb.RangeResponse var err error @@ -105,7 +104,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe traceutil.Field{Key: "response_revision", Value: resp.Header.Revision}, ) } - trace.LogIfLong(rangeTraceThreshold) + trace.LogIfLong(traceThreshold) }(time.Now()) if !r.Serializable { @@ -128,7 +127,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe } func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { - ctx = context.WithValue(ctx, "time", time.Now()) + ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now()) resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r}) if err != nil { return nil, err @@ -205,7 +204,18 @@ func isTxnReadonly(r *pb.TxnRequest) bool { } func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { + startTime := time.Now() result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r}) + trace := traceutil.TODO() + if result != nil && result.trace != nil { + trace = result.trace + defer func() { + trace.LogIfLong(traceThreshold) + }() + applyStart := result.trace.GetStartTime() + result.trace.SetStartTime(startTime) + trace.InsertStep(0, applyStart, "process raft request") + } if r.Physical && result != nil && result.physc != nil { <-result.physc // The compaction is done deleting keys; the hash is now settled @@ -214,6 +224,7 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb. // if the compaction resumes. Force the finished compaction to // commit so it won't resume following a crash. s.be.ForceCommit() + trace.Step("physically apply compaction") } if err != nil { return nil, err @@ -229,6 +240,7 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb. resp.Header = &pb.ResponseHeader{} } resp.Header.Revision = s.kv.Rev() + trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision}) return resp, nil } @@ -552,10 +564,14 @@ func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftReque if result.err != nil { return nil, result.err } - if startTime, ok := ctx.Value("time").(time.Time); ok && result.trace != nil { - applyStart := result.trace.ResetStartTime(startTime) + if startTime, ok := ctx.Value(traceutil.StartTimeKey).(time.Time); ok && result.trace != nil { + applyStart := result.trace.GetStartTime() + // The trace object is created in apply. Here reset the start time to trace + // the raft request time by the difference between the request start time + // and apply start time + result.trace.SetStartTime(startTime) result.trace.InsertStep(0, applyStart, "process raft request") - result.trace.LogIfLong(putTraceThreshold) + result.trace.LogIfLong(traceThreshold) } return result.resp, nil } diff --git a/integration/v3_alarm_test.go b/integration/v3_alarm_test.go index 443c2aae1..0b2dd05ce 100644 --- a/integration/v3_alarm_test.go +++ b/integration/v3_alarm_test.go @@ -27,6 +27,7 @@ import ( "go.etcd.io/etcd/mvcc" "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/pkg/testutil" + "go.etcd.io/etcd/pkg/traceutil" "go.uber.org/zap" ) @@ -173,7 +174,7 @@ func TestV3CorruptAlarm(t *testing.T) { // NOTE: cluster_proxy mode with namespacing won't set 'k', but namespace/'k'. s.Put([]byte("abc"), []byte("def"), 0) s.Put([]byte("xyz"), []byte("123"), 0) - s.Compact(5) + s.Compact(traceutil.TODO(), 5) s.Commit() s.Close() be.Close() diff --git a/mvcc/kv.go b/mvcc/kv.go index b7d2a1472..c057f9261 100644 --- a/mvcc/kv.go +++ b/mvcc/kv.go @@ -115,7 +115,7 @@ type KV interface { HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error) // Compact frees all superseded keys with revisions less than rev. - Compact(rev int64) (<-chan struct{}, error) + Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) // Commit commits outstanding txns into the underlying backend. Commit() diff --git a/mvcc/kv_test.go b/mvcc/kv_test.go index 38e17e1ca..466040790 100644 --- a/mvcc/kv_test.go +++ b/mvcc/kv_test.go @@ -183,7 +183,7 @@ func testKVRangeBadRev(t *testing.T, f rangeFunc) { defer cleanup(s, b, tmpPath) put3TestKVs(s) - if _, err := s.Compact(4); err != nil { + if _, err := s.Compact(traceutil.TODO(), 4); err != nil { t.Fatalf("compact error (%v)", err) } @@ -545,7 +545,7 @@ func TestKVCompactReserveLastValue(t *testing.T) { }, } for i, tt := range tests { - _, err := s.Compact(tt.rev) + _, err := s.Compact(traceutil.TODO(), tt.rev) if err != nil { t.Errorf("#%d: unexpect compact error %v", i, err) } @@ -581,7 +581,7 @@ func TestKVCompactBad(t *testing.T) { {100, ErrFutureRev}, } for i, tt := range tests { - _, err := s.Compact(tt.rev) + _, err := s.Compact(traceutil.TODO(), tt.rev) if err != tt.werr { t.Errorf("#%d: compact error = %v, want %v", i, err, tt.werr) } @@ -627,7 +627,7 @@ func TestKVRestore(t *testing.T) { func(kv KV) { kv.Put([]byte("foo"), []byte("bar0"), 1) kv.Put([]byte("foo"), []byte("bar1"), 2) - kv.Compact(1) + kv.Compact(traceutil.TODO(), 1) }, } for i, tt := range tests { diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index f99900660..7e6c0046b 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -271,9 +271,10 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) { return nil, nil } -func (s *store) compact(rev int64) (<-chan struct{}, error) { +func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) { start := time.Now() keep := s.kvindex.Compact(rev) + trace.Step("compact in-memory index tree") ch := make(chan struct{}) var j = func(ctx context.Context) { if ctx.Err() != nil { @@ -290,6 +291,7 @@ func (s *store) compact(rev int64) (<-chan struct{}, error) { s.fifoSched.Schedule(j) indexCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond)) + trace.Step("schedule compaction") return ch, nil } @@ -299,21 +301,21 @@ func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) { return ch, err } - return s.compact(rev) + return s.compact(traceutil.TODO(), rev) } -func (s *store) Compact(rev int64) (<-chan struct{}, error) { +func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) { s.mu.Lock() ch, err := s.updateCompactRev(rev) - + trace.Step("check and update compact revision") if err != nil { s.mu.Unlock() return ch, err } s.mu.Unlock() - return s.compact(rev) + return s.compact(trace, rev) } // DefaultIgnores is a map of keys to ignore in hash checking. diff --git a/mvcc/kvstore_compaction_test.go b/mvcc/kvstore_compaction_test.go index 1d5c63261..d1e576dcb 100644 --- a/mvcc/kvstore_compaction_test.go +++ b/mvcc/kvstore_compaction_test.go @@ -22,6 +22,7 @@ import ( "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc/backend" + "go.etcd.io/etcd/pkg/traceutil" "go.uber.org/zap" ) @@ -109,7 +110,7 @@ func TestCompactAllAndRestore(t *testing.T) { rev := s0.Rev() // compact all keys - done, err := s0.Compact(rev) + done, err := s0.Compact(traceutil.TODO(), rev) if err != nil { t.Fatal(err) } diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go index fc2b33204..eb9b1f130 100644 --- a/mvcc/kvstore_test.go +++ b/mvcc/kvstore_test.go @@ -332,7 +332,7 @@ func TestStoreCompact(t *testing.T) { key2 := newTestKeyBytes(revision{2, 0}, false) b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, nil} - s.Compact(3) + s.Compact(traceutil.TODO(), 3) s.fifoSched.WaitFinish(1) if s.compactMainRev != 3 { @@ -583,7 +583,7 @@ func TestHashKVWhenCompacting(t *testing.T) { go func() { defer wg.Done() for i := 100; i >= 0; i-- { - _, err := s.Compact(int64(rev - 1 - i)) + _, err := s.Compact(traceutil.TODO(), int64(rev-1-i)) if err != nil { t.Error(err) } @@ -610,7 +610,7 @@ func TestHashKVZeroRevision(t *testing.T) { for i := 2; i <= rev; i++ { s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease) } - if _, err := s.Compact(int64(rev / 2)); err != nil { + if _, err := s.Compact(traceutil.TODO(), int64(rev/2)); err != nil { t.Fatal(err) } diff --git a/mvcc/watchable_store_test.go b/mvcc/watchable_store_test.go index fd496ad75..e4d0cd62e 100644 --- a/mvcc/watchable_store_test.go +++ b/mvcc/watchable_store_test.go @@ -26,6 +26,7 @@ import ( "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/mvcc/mvccpb" + "go.etcd.io/etcd/pkg/traceutil" "go.uber.org/zap" ) @@ -237,7 +238,7 @@ func TestWatchCompacted(t *testing.T) { for i := 0; i < maxRev; i++ { s.Put(testKey, testValue, lease.NoLease) } - _, err := s.Compact(compactRev) + _, err := s.Compact(traceutil.TODO(), compactRev) if err != nil { t.Fatalf("failed to compact kv (%v)", err) } diff --git a/pkg/traceutil/trace.go b/pkg/traceutil/trace.go index 2628db665..2d247dd9a 100644 --- a/pkg/traceutil/trace.go +++ b/pkg/traceutil/trace.go @@ -25,7 +25,10 @@ import ( "go.uber.org/zap" ) -const CtxKey = "trace" +const ( + TraceKey = "trace" + StartTimeKey = "startTime" +) // Field is a kv pair to record additional details of the trace. type Field struct { @@ -51,12 +54,12 @@ func writeFields(fields []Field) string { } type Trace struct { - operation string - lg *zap.Logger - fields []Field - startTime time.Time - steps []step - inStep bool + operation string + lg *zap.Logger + fields []Field + startTime time.Time + steps []step + stepDisabled bool } type step struct { @@ -75,16 +78,18 @@ func TODO() *Trace { } func Get(ctx context.Context) *Trace { - if trace, ok := ctx.Value(CtxKey).(*Trace); ok && trace != nil { + if trace, ok := ctx.Value(TraceKey).(*Trace); ok && trace != nil { return trace } return TODO() } -func (t *Trace) ResetStartTime(time time.Time) (prev time.Time) { - prev = t.startTime +func (t *Trace) GetStartTime() time.Time { + return t.startTime +} + +func (t *Trace) SetStartTime(time time.Time) { t.startTime = time - return prev } func (t *Trace) InsertStep(at int, time time.Time, msg string, fields ...Field) { @@ -96,19 +101,22 @@ func (t *Trace) InsertStep(at int, time time.Time, msg string, fields ...Field) t.steps = append(t.steps, newStep) } } + +// Step adds step to trace func (t *Trace) Step(msg string, fields ...Field) { - if !t.inStep { + if !t.stepDisabled { t.steps = append(t.steps, step{time: time.Now(), msg: msg, fields: fields}) } } -func (t *Trace) StepBegin() { - t.inStep = true +// DisableStep sets the flag to prevent the trace from adding steps +func (t *Trace) DisableStep() { + t.stepDisabled = true } -func (t *Trace) StepEnd(msg string, fields ...Field) { - t.inStep = false - t.Step(msg, fields...) +// EnableStep re-enable the trace to add steps +func (t *Trace) EnableStep() { + t.stepDisabled = false } func (t *Trace) AddField(fields ...Field) { @@ -149,7 +157,7 @@ func (t *Trace) logInfo(threshold time.Duration) (string, []zap.Field) { for _, step := range t.steps { stepDuration := step.time.Sub(lastStepTime) if stepDuration > threshold { - steps = append(steps, fmt.Sprintf("trace[%d] step '%v' %s (duration: %v)", + steps = append(steps, fmt.Sprintf("trace[%d] '%v' %s (duration: %v)", traceNum, step.msg, writeFields(step.fields), stepDuration)) } lastStepTime = step.time diff --git a/pkg/traceutil/trace_test.go b/pkg/traceutil/trace_test.go index 59111ec89..9b9928876 100644 --- a/pkg/traceutil/trace_test.go +++ b/pkg/traceutil/trace_test.go @@ -41,7 +41,7 @@ func TestGet(t *testing.T) { }, { name: "When the context has trace", - inputCtx: context.WithValue(context.Background(), CtxKey, traceForTest), + inputCtx: context.WithValue(context.Background(), TraceKey, traceForTest), outputTrace: traceForTest, }, } diff --git a/tools/benchmark/cmd/mvcc-put.go b/tools/benchmark/cmd/mvcc-put.go index 026693efe..200db9f02 100644 --- a/tools/benchmark/cmd/mvcc-put.go +++ b/tools/benchmark/cmd/mvcc-put.go @@ -23,6 +23,7 @@ import ( "go.etcd.io/etcd/lease" "go.etcd.io/etcd/pkg/report" + "go.etcd.io/etcd/pkg/traceutil" "github.com/spf13/cobra" ) @@ -114,7 +115,7 @@ func mvccPutFunc(cmd *cobra.Command, args []string) { for i := 0; i < mvccTotalRequests; i++ { st := time.Now() - tw := s.Write() + tw := s.Write(traceutil.TODO()) for j := i; j < i+nrTxnOps; j++ { tw.Put(keys[j], vals[j], lease.NoLease) }