From f4e7fc56a711c563c5f38d6829bf51f090130a53 Mon Sep 17 00:00:00 2001 From: yoyinzyc Date: Mon, 9 Sep 2019 15:38:03 -0700 Subject: [PATCH] pkg: create package traceutil for tracing. mvcc: add tracing steps:range from the in-memory index tree; range from boltdb. etcdserver: add tracing steps: agreement among raft nodes before linerized reading; authentication; filter and sort kv pairs; assemble the response. --- etcdserver/apply.go | 23 +++++++++----- etcdserver/apply_auth.go | 5 ++-- etcdserver/corrupt.go | 2 +- etcdserver/util.go | 14 +++++---- etcdserver/v3_server.go | 13 ++++++-- mvcc/kv.go | 3 +- mvcc/kv_test.go | 2 +- mvcc/kv_view.go | 6 ++-- mvcc/kvstore_test.go | 6 ++-- mvcc/kvstore_txn.go | 15 ++++++++-- pkg/traceutil/trace.go | 60 +++++++++++++++++++++++++++++++++++++ pkg/traceutil/trace_test.go | 28 +++++++++++++++++ 12 files changed, 149 insertions(+), 28 deletions(-) create mode 100644 pkg/traceutil/trace.go create mode 100644 pkg/traceutil/trace_test.go diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 1f06ad0dd..f5b3c24b0 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -26,6 +26,7 @@ import ( "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc" "go.etcd.io/etcd/mvcc/mvccpb" + "go.etcd.io/etcd/pkg/traceutil" "go.etcd.io/etcd/pkg/types" "github.com/gogo/protobuf/proto" @@ -50,7 +51,7 @@ type applierV3 interface { Apply(r *pb.InternalRaftRequest) *applyResult Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) - Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) + Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) @@ -119,7 +120,7 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { // call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls switch { case r.Range != nil: - ar.resp, ar.err = a.s.applyV3.Range(nil, r.Range) + ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range) case r.Put != nil: ar.resp, ar.err = a.s.applyV3.Put(nil, r.Put) case r.DeleteRange != nil: @@ -245,12 +246,18 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ return resp, nil } -func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { +func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { + trace, ok := ctx.Value("trace").(*traceutil.Trace) + if !ok || trace == nil { + trace = traceutil.New("Apply Range") + ctx = context.WithValue(ctx, "trace", trace) + } + resp := &pb.RangeResponse{} resp.Header = &pb.ResponseHeader{} if txn == nil { - txn = a.s.kv.Read() + txn = a.s.kv.Read(trace) defer txn.End() } @@ -327,7 +334,7 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang rr.KVs = rr.KVs[:r.Limit] resp.More = true } - + trace.Step("Filter and sort the key-value pairs.") resp.Header.Revision = rr.Rev resp.Count = int64(rr.Count) resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs)) @@ -337,12 +344,14 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang } resp.Kvs[i] = &rr.KVs[i] } + trace.Step("Assemble the response.") return resp, nil } func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { isWrite := !isTxnReadonly(rt) - txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read()) + trace := traceutil.New("ReadOnlyTxn") + txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(trace)) txnPath := compareToPath(txn, rt) if isWrite { @@ -516,7 +525,7 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat respi := tresp.Responses[i].Response switch tv := req.Request.(type) { case *pb.RequestOp_RequestRange: - resp, err := a.Range(txn, tv.RequestRange) + resp, err := a.Range(context.TODO(), txn, tv.RequestRange) if err != nil { if lg != nil { lg.Panic("unexpected error during txn", zap.Error(err)) diff --git a/etcdserver/apply_auth.go b/etcdserver/apply_auth.go index 4b094ad5d..c31644b3d 100644 --- a/etcdserver/apply_auth.go +++ b/etcdserver/apply_auth.go @@ -15,6 +15,7 @@ package etcdserver import ( + "context" "sync" "go.etcd.io/etcd/auth" @@ -83,11 +84,11 @@ func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutRespon return aa.applierV3.Put(txn, r) } -func (aa *authApplierV3) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { +func (aa *authApplierV3) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { if err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil { return nil, err } - return aa.applierV3.Range(txn, r) + return aa.applierV3.Range(ctx, txn, r) } func (aa *authApplierV3) DeleteRange(txn mvcc.TxnWrite, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { diff --git a/etcdserver/corrupt.go b/etcdserver/corrupt.go index 32678a7c5..0f9a4053f 100644 --- a/etcdserver/corrupt.go +++ b/etcdserver/corrupt.go @@ -386,7 +386,7 @@ func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResp return nil, ErrCorrupt } -func (a *applierV3Corrupt) Range(txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) { +func (a *applierV3Corrupt) Range(ctx context.Context, txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) { return nil, ErrCorrupt } diff --git a/etcdserver/util.go b/etcdserver/util.go index fe5024ef0..4db42d065 100644 --- a/etcdserver/util.go +++ b/etcdserver/util.go @@ -24,6 +24,7 @@ import ( "go.etcd.io/etcd/etcdserver/api/membership" "go.etcd.io/etcd/etcdserver/api/rafthttp" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" + "go.etcd.io/etcd/pkg/traceutil" "go.etcd.io/etcd/pkg/types" "go.uber.org/zap" @@ -108,7 +109,7 @@ func warnOfExpensiveRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Strin if !isNil(respMsg) { resp = fmt.Sprintf("size:%d", proto.Size(respMsg)) } - warnOfExpensiveGenericRequest(lg, now, reqStringer, "", resp, err) + warnOfExpensiveGenericRequest(lg, nil, now, reqStringer, "", resp, err) } func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) { @@ -126,18 +127,18 @@ func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnR } resp = fmt.Sprintf("responses:<%s> size:%d", strings.Join(resps, " "), proto.Size(txnResponse)) } - warnOfExpensiveGenericRequest(lg, now, reqStringer, "read-only range ", resp, err) + warnOfExpensiveGenericRequest(lg, nil, now, reqStringer, "read-only range ", resp, err) } -func warnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) { +func warnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, trace *traceutil.Trace, now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) { var resp string if !isNil(rangeResponse) { resp = fmt.Sprintf("range_response_count:%d size:%d", len(rangeResponse.Kvs), proto.Size(rangeResponse)) } - warnOfExpensiveGenericRequest(lg, now, reqStringer, "read-only range ", resp, err) + warnOfExpensiveGenericRequest(lg, trace, now, reqStringer, "read-only range ", resp, err) } -func warnOfExpensiveGenericRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) { +func warnOfExpensiveGenericRequest(lg *zap.Logger, trace *traceutil.Trace, now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) { d := time.Since(now) if d > warnApplyDuration { if lg != nil { @@ -159,6 +160,9 @@ func warnOfExpensiveGenericRequest(lg *zap.Logger, now time.Time, reqStringer fm } plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d) } + if trace != nil { + trace.Log(lg) + } slowApplies.Inc() } } diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index b2084618b..efe3bfe35 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -26,6 +26,7 @@ import ( "go.etcd.io/etcd/lease" "go.etcd.io/etcd/lease/leasehttp" "go.etcd.io/etcd/mvcc" + "go.etcd.io/etcd/pkg/traceutil" "go.etcd.io/etcd/raft" "github.com/gogo/protobuf/proto" @@ -85,14 +86,18 @@ type Authenticator interface { } func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { + trace := traceutil.New("Range") + ctx = context.WithValue(ctx, "trace", trace) + var resp *pb.RangeResponse var err error defer func(start time.Time) { - warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), start, r, resp, err) + warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), trace, start, r, resp, err) }(time.Now()) if !r.Serializable { err = s.linearizableReadNotify(ctx) + trace.Step("Agreement among raft nodes before linearized reading.") if err != nil { return nil, err } @@ -101,7 +106,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd) } - get := func() { resp, err = s.applyV3Base.Range(nil, r) } + get := func() { resp, err = s.applyV3Base.Range(ctx, nil, r) } if serr := s.doSerialize(ctx, chk, get); serr != nil { err = serr return nil, err @@ -558,6 +563,10 @@ func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) e if err = chk(ai); err != nil { return err } + + if trace, ok := ctx.Value("trace").(*traceutil.Trace); ok && trace != nil { + trace.Step("Authentication.") + } // fetch response for serialized request get() // check for stale token revision in case the auth store was updated while diff --git a/mvcc/kv.go b/mvcc/kv.go index 8e898a5ad..065b90799 100644 --- a/mvcc/kv.go +++ b/mvcc/kv.go @@ -18,6 +18,7 @@ import ( "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/mvcc/mvccpb" + "go.etcd.io/etcd/pkg/traceutil" ) type RangeOptions struct { @@ -102,7 +103,7 @@ type KV interface { WriteView // Read creates a read transaction. - Read() TxnRead + Read(trace *traceutil.Trace) TxnRead // Write creates a write transaction. Write() TxnWrite diff --git a/mvcc/kv_test.go b/mvcc/kv_test.go index 012537a4e..673cfba74 100644 --- a/mvcc/kv_test.go +++ b/mvcc/kv_test.go @@ -47,7 +47,7 @@ var ( return kv.Range(key, end, ro) } txnRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) { - txn := kv.Read() + txn := kv.Read(nil) defer txn.End() return txn.Range(key, end, ro) } diff --git a/mvcc/kv_view.go b/mvcc/kv_view.go index bd2e77729..56070d18f 100644 --- a/mvcc/kv_view.go +++ b/mvcc/kv_view.go @@ -19,19 +19,19 @@ import "go.etcd.io/etcd/lease" type readView struct{ kv KV } func (rv *readView) FirstRev() int64 { - tr := rv.kv.Read() + tr := rv.kv.Read(nil) defer tr.End() return tr.FirstRev() } func (rv *readView) Rev() int64 { - tr := rv.kv.Read() + tr := rv.kv.Read(nil) defer tr.End() return tr.Rev() } func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) { - tr := rv.kv.Read() + tr := rv.kv.Read(nil) defer tr.End() return tr.Range(key, end, ro) } diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go index cac11e1f8..cde1954de 100644 --- a/mvcc/kvstore_test.go +++ b/mvcc/kvstore_test.go @@ -658,7 +658,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) { s.Put([]byte("foo"), []byte("bar"), lease.NoLease) // readTx simulates a long read request - readTx1 := s.Read() + readTx1 := s.Read(nil) // write should not be blocked by reads done := make(chan struct{}) @@ -673,7 +673,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) { } // readTx2 simulates a short read request - readTx2 := s.Read() + readTx2 := s.Read(nil) ro := RangeOptions{Limit: 1, Rev: 0, Count: false} ret, err := readTx2.Range([]byte("foo"), nil, ro) if err != nil { @@ -756,7 +756,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) { mu.Lock() wKVs := make(kvs, len(committedKVs)) copy(wKVs, committedKVs) - tx := s.Read() + tx := s.Read(nil) mu.Unlock() // get all keys in backend store, and compare with wKVs ret, err := tx.Range([]byte("\x00000000"), []byte("\xffffffff"), RangeOptions{}) diff --git a/mvcc/kvstore_txn.go b/mvcc/kvstore_txn.go index 969825464..c5e5c973e 100644 --- a/mvcc/kvstore_txn.go +++ b/mvcc/kvstore_txn.go @@ -18,6 +18,7 @@ import ( "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/mvcc/mvccpb" + "go.etcd.io/etcd/pkg/traceutil" "go.uber.org/zap" ) @@ -27,9 +28,11 @@ type storeTxnRead struct { firstRev int64 rev int64 + + trace *traceutil.Trace } -func (s *store) Read() TxnRead { +func (s *store) Read(trace *traceutil.Trace) TxnRead { s.mu.RLock() s.revMu.RLock() // backend holds b.readTx.RLock() only when creating the concurrentReadTx. After @@ -38,7 +41,7 @@ func (s *store) Read() TxnRead { tx.RLock() // RLock is no-op. concurrentReadTx does not need to be locked after it is created. firstRev, rev := s.compactMainRev, s.currentRev s.revMu.RUnlock() - return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev}) + return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev, trace}) } func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev } @@ -66,7 +69,7 @@ func (s *store) Write() TxnWrite { tx := s.b.BatchTx() tx.Lock() tw := &storeTxnWrite{ - storeTxnRead: storeTxnRead{s, tx, 0, 0}, + storeTxnRead: storeTxnRead{s, tx, 0, 0, nil}, tx: tx, beginRev: s.currentRev, changes: make([]mvccpb.KeyValue, 0, 4), @@ -124,6 +127,9 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions } revpairs := tr.s.kvindex.Revisions(key, end, rev) + if tr.trace != nil { + tr.trace.Step("Range keys from in-memory index tree.") + } if len(revpairs) == 0 { return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil } @@ -163,6 +169,9 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions } } } + if tr.trace != nil { + tr.trace.Step("Range keys from bolt db.") + } return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil } diff --git a/pkg/traceutil/trace.go b/pkg/traceutil/trace.go new file mode 100644 index 000000000..e21f6ef09 --- /dev/null +++ b/pkg/traceutil/trace.go @@ -0,0 +1,60 @@ +package traceutil + +import ( + "bytes" + "fmt" + "time" + + "github.com/coreos/pkg/capnslog" + "go.uber.org/zap" +) + +var ( + plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "trace") +) + +type Trace struct { + operation string + startTime time.Time + steps []step +} + +type step struct { + time time.Time + msg string +} + +func New(op string) *Trace { + return &Trace{operation: op, startTime: time.Now()} +} + +func (t *Trace) Step(msg string) { + t.steps = append(t.steps, step{time: time.Now(), msg: msg}) +} + +// Dump all steps in the Trace +func (t *Trace) Log(lg *zap.Logger) { + + var buf bytes.Buffer + + buf.WriteString(fmt.Sprintf("The tracing of %v request:\n", t.operation)) + + buf.WriteString("Request started at:") + buf.WriteString(t.startTime.Format("2006-01-02 15:04:05")) + buf.WriteString(fmt.Sprintf(".%06d", t.startTime.Nanosecond()/1000)) + buf.WriteString("\n") + lastStepTime := t.startTime + for i, step := range t.steps { + buf.WriteString(fmt.Sprintf("Step %d: %v Time cost: %v\n", i, step.msg, step.time.Sub(lastStepTime))) + //fmt.Println(step.msg, " costs: ", step.time.Sub(lastStepTime)) + lastStepTime = step.time + } + buf.WriteString("Trace End\n") + + s := buf.String() + if lg != nil { + lg.Info(s) + } else { + plog.Info(s) + } +} diff --git a/pkg/traceutil/trace_test.go b/pkg/traceutil/trace_test.go new file mode 100644 index 000000000..2517c5463 --- /dev/null +++ b/pkg/traceutil/trace_test.go @@ -0,0 +1,28 @@ +package traceutil + +import ( + "testing" +) + +func TestTrace(t *testing.T) { + var ( + op = "Test" + steps = []string{"Step1, Step2"} + ) + + trace := New(op) + if trace.operation != op { + t.Errorf("Expected %v, got %v\n", op, trace.operation) + } + + for _, v := range steps { + trace.Step(v) + trace.Step(v) + } + + for i, v := range steps { + if v != trace.steps[i].msg { + t.Errorf("Expected %v, got %v\n.", v, trace.steps[i].msg) + } + } +}