diff --git a/clientv3/snapshot/v3_snapshot.go b/clientv3/snapshot/v3_snapshot.go index 54f8c67c9..791035e7d 100644 --- a/clientv3/snapshot/v3_snapshot.go +++ b/clientv3/snapshot/v3_snapshot.go @@ -39,6 +39,7 @@ import ( "go.etcd.io/etcd/mvcc" "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/pkg/fileutil" + "go.etcd.io/etcd/pkg/traceutil" "go.etcd.io/etcd/pkg/types" "go.etcd.io/etcd/raft" "go.etcd.io/etcd/raft/raftpb" @@ -384,7 +385,7 @@ func (s *v3Manager) saveDB() error { lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}) mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit), mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32}) - txn := mvs.Write() + txn := mvs.Write(traceutil.TODO()) btx := be.BatchTx() del := func(k, v []byte) error { txn.DeleteRange(k, nil) diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 02fdea731..c1ea27687 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -179,7 +179,14 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, err error) { resp = &pb.PutResponse{} resp.Header = &pb.ResponseHeader{} - + trace := traceutil.New("put", + a.s.getLogger(), + traceutil.Field{Key: "key", Value: string(p.Key)}, + traceutil.Field{Key: "value", Value: string(p.Value)}, + ) + defer func() { + trace.LogIfLong(warnApplyDuration) + }() val, leaseID := p.Value, lease.LeaseID(p.Lease) if txn == nil { if leaseID != lease.NoLease { @@ -187,16 +194,18 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu return nil, lease.ErrLeaseNotFound } } - txn = a.s.KV().Write() + txn = a.s.KV().Write(trace) defer txn.End() } var rr *mvcc.RangeResult if p.IgnoreValue || p.IgnoreLease || p.PrevKv { + trace.StepBegin() rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{}) if err != nil { return nil, err } + trace.StepEnd("get previous kv pair") } if p.IgnoreValue || p.IgnoreLease { if rr == nil || len(rr.KVs) == 0 { @@ -226,7 +235,7 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ end := mkGteRange(dr.RangeEnd) if txn == nil { - txn = a.s.kv.Write() + txn = a.s.kv.Write(traceutil.TODO()) defer txn.End() } @@ -369,7 +378,7 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { // be the revision of the write txn. if isWrite { txn.End() - txn = a.s.KV().Write() + txn = a.s.KV().Write(traceutil.TODO()) } a.applyTxn(txn, rt, txnPath, txnResp) rev := txn.Rev() diff --git a/etcdserver/server.go b/etcdserver/server.go index 78daa0ea9..e2a5fa004 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -50,6 +50,7 @@ import ( "go.etcd.io/etcd/pkg/pbutil" "go.etcd.io/etcd/pkg/runtime" "go.etcd.io/etcd/pkg/schedule" + "go.etcd.io/etcd/pkg/traceutil" "go.etcd.io/etcd/pkg/types" "go.etcd.io/etcd/pkg/wait" "go.etcd.io/etcd/raft" @@ -1178,7 +1179,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { plog.Info("recovering lessor...") } - s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write() }) + s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write(traceutil.TODO()) }) if lg != nil { lg.Info("restored lease store") diff --git a/mvcc/kv.go b/mvcc/kv.go index 065b90799..b7d2a1472 100644 --- a/mvcc/kv.go +++ b/mvcc/kv.go @@ -106,7 +106,7 @@ type KV interface { Read(trace *traceutil.Trace) TxnRead // Write creates a write transaction. - Write() TxnWrite + Write(trace *traceutil.Trace) TxnWrite // Hash computes the hash of the KV's backend. Hash() (hash uint32, revision int64, err error) diff --git a/mvcc/kv_test.go b/mvcc/kv_test.go index 6c72d0879..38e17e1ca 100644 --- a/mvcc/kv_test.go +++ b/mvcc/kv_test.go @@ -57,7 +57,7 @@ var ( return kv.Put(key, value, lease) } txnPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 { - txn := kv.Write() + txn := kv.Write(traceutil.TODO()) defer txn.End() return txn.Put(key, value, lease) } @@ -66,7 +66,7 @@ var ( return kv.DeleteRange(key, end) } txnDeleteRangeFunc = func(kv KV, key, end []byte) (n, rev int64) { - txn := kv.Write() + txn := kv.Write(traceutil.TODO()) defer txn.End() return txn.DeleteRange(key, end) } @@ -410,7 +410,7 @@ func TestKVTxnBlockWriteOperations(t *testing.T) { func() { s.DeleteRange([]byte("foo"), nil) }, } for i, tt := range tests { - txn := s.Write() + txn := s.Write(traceutil.TODO()) done := make(chan struct{}, 1) go func() { tt() @@ -439,7 +439,7 @@ func TestKVTxnNonBlockRange(t *testing.T) { s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) defer cleanup(s, b, tmpPath) - txn := s.Write() + txn := s.Write(traceutil.TODO()) defer txn.End() donec := make(chan struct{}) @@ -461,7 +461,7 @@ func TestKVTxnOperationInSequence(t *testing.T) { defer cleanup(s, b, tmpPath) for i := 0; i < 10; i++ { - txn := s.Write() + txn := s.Write(traceutil.TODO()) base := int64(i + 1) // put foo diff --git a/mvcc/kv_view.go b/mvcc/kv_view.go index 9750fd764..d4f0ca688 100644 --- a/mvcc/kv_view.go +++ b/mvcc/kv_view.go @@ -42,13 +42,13 @@ func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err type writeView struct{ kv KV } func (wv *writeView) DeleteRange(key, end []byte) (n, rev int64) { - tw := wv.kv.Write() + tw := wv.kv.Write(traceutil.TODO()) defer tw.End() return tw.DeleteRange(key, end) } func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) { - tw := wv.kv.Write() + tw := wv.kv.Write(traceutil.TODO()) defer tw.End() return tw.Put(key, value, lease) } diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index c2ef4b252..f398dd59f 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -29,6 +29,7 @@ import ( "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/mvcc/mvccpb" "go.etcd.io/etcd/pkg/schedule" + "go.etcd.io/etcd/pkg/traceutil" "github.com/coreos/pkg/capnslog" "go.uber.org/zap" @@ -140,7 +141,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentI s.ReadView = &readView{s} s.WriteView = &writeView{s} if s.le != nil { - s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() }) + s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) }) } tx := s.b.BatchTx() diff --git a/mvcc/kvstore_bench_test.go b/mvcc/kvstore_bench_test.go index 4e7c9a497..e6a4af840 100644 --- a/mvcc/kvstore_bench_test.go +++ b/mvcc/kvstore_bench_test.go @@ -20,6 +20,7 @@ import ( "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc/backend" + "go.etcd.io/etcd/pkg/traceutil" "go.uber.org/zap" ) @@ -130,7 +131,7 @@ func BenchmarkStoreTxnPut(b *testing.B) { b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - txn := s.Write() + txn := s.Write(traceutil.TODO()) txn.Put(keys[i], vals[i], lease.NoLease) txn.End() } @@ -151,7 +152,7 @@ func benchmarkStoreRestore(revsPerKey int, b *testing.B) { for i := 0; i < b.N; i++ { for j := 0; j < revsPerKey; j++ { - txn := s.Write() + txn := s.Write(traceutil.TODO()) txn.Put(keys[i], vals[i], lease.NoLease) txn.End() } diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go index c4180c963..fc2b33204 100644 --- a/mvcc/kvstore_test.go +++ b/mvcc/kvstore_test.go @@ -640,7 +640,7 @@ func TestTxnPut(t *testing.T) { defer cleanup(s, b, tmpPath) for i := 0; i < sliceN; i++ { - txn := s.Write() + txn := s.Write(traceutil.TODO()) base := int64(i + 2) if rev := txn.Put(keys[i], vals[i], lease.NoLease); rev != base { t.Errorf("#%d: rev = %d, want %d", i, rev, base) @@ -731,7 +731,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) { defer wg.Done() time.Sleep(time.Duration(mrand.Intn(100)) * time.Millisecond) // random starting time - tx := s.Write() + tx := s.Write(traceutil.TODO()) numOfPuts := mrand.Intn(maxNumOfPutsPerWrite) + 1 var pendingKvs kvs for j := 0; j < numOfPuts; j++ { diff --git a/mvcc/kvstore_txn.go b/mvcc/kvstore_txn.go index 27afe889b..716a6d82f 100644 --- a/mvcc/kvstore_txn.go +++ b/mvcc/kvstore_txn.go @@ -64,12 +64,12 @@ type storeTxnWrite struct { changes []mvccpb.KeyValue } -func (s *store) Write() TxnWrite { +func (s *store) Write(trace *traceutil.Trace) TxnWrite { s.mu.RLock() tx := s.b.BatchTx() tx.Lock() tw := &storeTxnWrite{ - storeTxnRead: storeTxnRead{s, tx, 0, 0, traceutil.TODO()}, + storeTxnRead: storeTxnRead{s, tx, 0, 0, trace}, tx: tx, beginRev: s.currentRev, changes: make([]mvccpb.KeyValue, 0, 4), @@ -183,7 +183,7 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { c = created.main oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)}) } - + tw.trace.Step("get key's previous created_revision and leaseID") ibytes := newRevBytes() idxRev := revision{main: rev, sub: int64(len(tw.changes))} revToBytes(idxRev, ibytes) @@ -210,9 +210,11 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { } } + tw.trace.Step("marshal mvccpb.KeyValue") tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) tw.s.kvindex.Put(key, idxRev) tw.changes = append(tw.changes, kv) + tw.trace.Step("store kv pair into bolt db") if oldLease != lease.NoLease { if tw.s.le == nil { @@ -239,6 +241,7 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { panic("unexpected error from lease Attach") } } + tw.trace.Step("attach lease to kv pair") } func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 { diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index 3cf491d1f..a51e5aa52 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -21,6 +21,7 @@ import ( "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/mvcc/mvccpb" + "go.etcd.io/etcd/pkg/traceutil" "go.uber.org/zap" ) @@ -84,7 +85,7 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig Co s.store.WriteView = &writeView{s} if s.le != nil { // use this store as the deleter so revokes trigger watch events - s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() }) + s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) }) } s.wg.Add(2) go s.syncWatchersLoop() diff --git a/mvcc/watchable_store_bench_test.go b/mvcc/watchable_store_bench_test.go index 0f8fb578d..0f553493f 100644 --- a/mvcc/watchable_store_bench_test.go +++ b/mvcc/watchable_store_bench_test.go @@ -21,6 +21,7 @@ import ( "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc/backend" + "go.etcd.io/etcd/pkg/traceutil" "go.uber.org/zap" ) @@ -59,7 +60,7 @@ func BenchmarkWatchableStoreTxnPut(b *testing.B) { b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - txn := s.Write() + txn := s.Write(traceutil.TODO()) txn.Put(keys[i], vals[i], lease.NoLease) txn.End() } diff --git a/mvcc/watchable_store_txn.go b/mvcc/watchable_store_txn.go index 3bcfa4d75..70b12983d 100644 --- a/mvcc/watchable_store_txn.go +++ b/mvcc/watchable_store_txn.go @@ -14,7 +14,10 @@ package mvcc -import "go.etcd.io/etcd/mvcc/mvccpb" +import ( + "go.etcd.io/etcd/mvcc/mvccpb" + "go.etcd.io/etcd/pkg/traceutil" +) func (tw *watchableStoreTxnWrite) End() { changes := tw.Changes() @@ -48,4 +51,6 @@ type watchableStoreTxnWrite struct { s *watchableStore } -func (s *watchableStore) Write() TxnWrite { return &watchableStoreTxnWrite{s.store.Write(), s} } +func (s *watchableStore) Write(trace *traceutil.Trace) TxnWrite { + return &watchableStoreTxnWrite{s.store.Write(trace), s} +} diff --git a/pkg/traceutil/trace.go b/pkg/traceutil/trace.go index f0b71bb9c..d056097a0 100644 --- a/pkg/traceutil/trace.go +++ b/pkg/traceutil/trace.go @@ -56,6 +56,7 @@ type Trace struct { fields []Field startTime time.Time steps []step + inStep bool } type step struct { @@ -81,7 +82,18 @@ func Get(ctx context.Context) *Trace { } func (t *Trace) Step(msg string, fields ...Field) { - t.steps = append(t.steps, step{time: time.Now(), msg: msg, fields: fields}) + if !t.inStep { + t.steps = append(t.steps, step{time: time.Now(), msg: msg, fields: fields}) + } +} + +func (t *Trace) StepBegin() { + t.inStep = true +} + +func (t *Trace) StepEnd(msg string, fields ...Field) { + t.inStep = false + t.Step(msg, fields...) } func (t *Trace) AddField(fields ...Field) {