mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #11179 from YoyinZyc/trace
Add tracing to range request in etcd server.
This commit is contained in:
commit
340f0ac797
@ -39,6 +39,7 @@ import (
|
|||||||
"go.etcd.io/etcd/mvcc"
|
"go.etcd.io/etcd/mvcc"
|
||||||
"go.etcd.io/etcd/mvcc/backend"
|
"go.etcd.io/etcd/mvcc/backend"
|
||||||
"go.etcd.io/etcd/pkg/fileutil"
|
"go.etcd.io/etcd/pkg/fileutil"
|
||||||
|
"go.etcd.io/etcd/pkg/traceutil"
|
||||||
"go.etcd.io/etcd/pkg/types"
|
"go.etcd.io/etcd/pkg/types"
|
||||||
"go.etcd.io/etcd/raft"
|
"go.etcd.io/etcd/raft"
|
||||||
"go.etcd.io/etcd/raft/raftpb"
|
"go.etcd.io/etcd/raft/raftpb"
|
||||||
@ -384,7 +385,7 @@ func (s *v3Manager) saveDB() error {
|
|||||||
lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})
|
lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})
|
||||||
|
|
||||||
mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit), mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
|
mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit), mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
|
||||||
txn := mvs.Write()
|
txn := mvs.Write(traceutil.TODO())
|
||||||
btx := be.BatchTx()
|
btx := be.BatchTx()
|
||||||
del := func(k, v []byte) error {
|
del := func(k, v []byte) error {
|
||||||
txn.DeleteRange(k, nil)
|
txn.DeleteRange(k, nil)
|
||||||
|
@ -26,6 +26,7 @@ import (
|
|||||||
"go.etcd.io/etcd/lease"
|
"go.etcd.io/etcd/lease"
|
||||||
"go.etcd.io/etcd/mvcc"
|
"go.etcd.io/etcd/mvcc"
|
||||||
"go.etcd.io/etcd/mvcc/mvccpb"
|
"go.etcd.io/etcd/mvcc/mvccpb"
|
||||||
|
"go.etcd.io/etcd/pkg/traceutil"
|
||||||
"go.etcd.io/etcd/pkg/types"
|
"go.etcd.io/etcd/pkg/types"
|
||||||
|
|
||||||
"github.com/gogo/protobuf/proto"
|
"github.com/gogo/protobuf/proto"
|
||||||
@ -43,17 +44,18 @@ type applyResult struct {
|
|||||||
// to being logically reflected by the node. Currently only used for
|
// to being logically reflected by the node. Currently only used for
|
||||||
// Compaction requests.
|
// Compaction requests.
|
||||||
physc <-chan struct{}
|
physc <-chan struct{}
|
||||||
|
trace *traceutil.Trace
|
||||||
}
|
}
|
||||||
|
|
||||||
// applierV3 is the interface for processing V3 raft messages
|
// applierV3 is the interface for processing V3 raft messages
|
||||||
type applierV3 interface {
|
type applierV3 interface {
|
||||||
Apply(r *pb.InternalRaftRequest) *applyResult
|
Apply(r *pb.InternalRaftRequest) *applyResult
|
||||||
|
|
||||||
Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error)
|
Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
|
||||||
Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
|
Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
|
||||||
DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
|
DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
|
||||||
Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error)
|
Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error)
|
||||||
Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error)
|
Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error)
|
||||||
|
|
||||||
LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
|
LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
|
||||||
LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
|
LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
|
||||||
@ -119,15 +121,15 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
|
|||||||
// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
|
// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
|
||||||
switch {
|
switch {
|
||||||
case r.Range != nil:
|
case r.Range != nil:
|
||||||
ar.resp, ar.err = a.s.applyV3.Range(nil, r.Range)
|
ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range)
|
||||||
case r.Put != nil:
|
case r.Put != nil:
|
||||||
ar.resp, ar.err = a.s.applyV3.Put(nil, r.Put)
|
ar.resp, ar.trace, ar.err = a.s.applyV3.Put(nil, r.Put)
|
||||||
case r.DeleteRange != nil:
|
case r.DeleteRange != nil:
|
||||||
ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange)
|
ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange)
|
||||||
case r.Txn != nil:
|
case r.Txn != nil:
|
||||||
ar.resp, ar.err = a.s.applyV3.Txn(r.Txn)
|
ar.resp, ar.err = a.s.applyV3.Txn(r.Txn)
|
||||||
case r.Compaction != nil:
|
case r.Compaction != nil:
|
||||||
ar.resp, ar.physc, ar.err = a.s.applyV3.Compaction(r.Compaction)
|
ar.resp, ar.physc, ar.trace, ar.err = a.s.applyV3.Compaction(r.Compaction)
|
||||||
case r.LeaseGrant != nil:
|
case r.LeaseGrant != nil:
|
||||||
ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)
|
ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)
|
||||||
case r.LeaseRevoke != nil:
|
case r.LeaseRevoke != nil:
|
||||||
@ -174,32 +176,39 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
|
|||||||
return ar
|
return ar
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, err error) {
|
func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
|
||||||
resp = &pb.PutResponse{}
|
resp = &pb.PutResponse{}
|
||||||
resp.Header = &pb.ResponseHeader{}
|
resp.Header = &pb.ResponseHeader{}
|
||||||
|
trace = traceutil.New("put",
|
||||||
|
a.s.getLogger(),
|
||||||
|
traceutil.Field{Key: "key", Value: string(p.Key)},
|
||||||
|
traceutil.Field{Key: "req_size", Value: proto.Size(p)},
|
||||||
|
)
|
||||||
val, leaseID := p.Value, lease.LeaseID(p.Lease)
|
val, leaseID := p.Value, lease.LeaseID(p.Lease)
|
||||||
if txn == nil {
|
if txn == nil {
|
||||||
if leaseID != lease.NoLease {
|
if leaseID != lease.NoLease {
|
||||||
if l := a.s.lessor.Lookup(leaseID); l == nil {
|
if l := a.s.lessor.Lookup(leaseID); l == nil {
|
||||||
return nil, lease.ErrLeaseNotFound
|
return nil, nil, lease.ErrLeaseNotFound
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
txn = a.s.KV().Write()
|
txn = a.s.KV().Write(trace)
|
||||||
defer txn.End()
|
defer txn.End()
|
||||||
}
|
}
|
||||||
|
|
||||||
var rr *mvcc.RangeResult
|
var rr *mvcc.RangeResult
|
||||||
if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
|
if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
|
||||||
|
trace.DisableStep()
|
||||||
rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{})
|
rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
trace.EnableStep()
|
||||||
|
trace.Step("get previous kv pair")
|
||||||
}
|
}
|
||||||
if p.IgnoreValue || p.IgnoreLease {
|
if p.IgnoreValue || p.IgnoreLease {
|
||||||
if rr == nil || len(rr.KVs) == 0 {
|
if rr == nil || len(rr.KVs) == 0 {
|
||||||
// ignore_{lease,value} flag expects previous key-value pair
|
// ignore_{lease,value} flag expects previous key-value pair
|
||||||
return nil, ErrKeyNotFound
|
return nil, nil, ErrKeyNotFound
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if p.IgnoreValue {
|
if p.IgnoreValue {
|
||||||
@ -215,7 +224,8 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu
|
|||||||
}
|
}
|
||||||
|
|
||||||
resp.Header.Revision = txn.Put(p.Key, val, leaseID)
|
resp.Header.Revision = txn.Put(p.Key, val, leaseID)
|
||||||
return resp, nil
|
trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision})
|
||||||
|
return resp, trace, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
|
func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
|
||||||
@ -224,7 +234,7 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ
|
|||||||
end := mkGteRange(dr.RangeEnd)
|
end := mkGteRange(dr.RangeEnd)
|
||||||
|
|
||||||
if txn == nil {
|
if txn == nil {
|
||||||
txn = a.s.kv.Write()
|
txn = a.s.kv.Write(traceutil.TODO())
|
||||||
defer txn.End()
|
defer txn.End()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -245,12 +255,14 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ
|
|||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||||
|
trace := traceutil.Get(ctx)
|
||||||
|
|
||||||
resp := &pb.RangeResponse{}
|
resp := &pb.RangeResponse{}
|
||||||
resp.Header = &pb.ResponseHeader{}
|
resp.Header = &pb.ResponseHeader{}
|
||||||
|
|
||||||
if txn == nil {
|
if txn == nil {
|
||||||
txn = a.s.kv.Read()
|
txn = a.s.kv.Read(trace)
|
||||||
defer txn.End()
|
defer txn.End()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -327,7 +339,7 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang
|
|||||||
rr.KVs = rr.KVs[:r.Limit]
|
rr.KVs = rr.KVs[:r.Limit]
|
||||||
resp.More = true
|
resp.More = true
|
||||||
}
|
}
|
||||||
|
trace.Step("filter and sort the key-value pairs")
|
||||||
resp.Header.Revision = rr.Rev
|
resp.Header.Revision = rr.Rev
|
||||||
resp.Count = int64(rr.Count)
|
resp.Count = int64(rr.Count)
|
||||||
resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
|
resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
|
||||||
@ -337,12 +349,13 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang
|
|||||||
}
|
}
|
||||||
resp.Kvs[i] = &rr.KVs[i]
|
resp.Kvs[i] = &rr.KVs[i]
|
||||||
}
|
}
|
||||||
|
trace.Step("assemble the response")
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|
func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|
||||||
isWrite := !isTxnReadonly(rt)
|
isWrite := !isTxnReadonly(rt)
|
||||||
txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read())
|
txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(traceutil.TODO()))
|
||||||
|
|
||||||
txnPath := compareToPath(txn, rt)
|
txnPath := compareToPath(txn, rt)
|
||||||
if isWrite {
|
if isWrite {
|
||||||
@ -364,7 +377,7 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|
|||||||
// be the revision of the write txn.
|
// be the revision of the write txn.
|
||||||
if isWrite {
|
if isWrite {
|
||||||
txn.End()
|
txn.End()
|
||||||
txn = a.s.KV().Write()
|
txn = a.s.KV().Write(traceutil.TODO())
|
||||||
}
|
}
|
||||||
a.applyTxn(txn, rt, txnPath, txnResp)
|
a.applyTxn(txn, rt, txnPath, txnResp)
|
||||||
rev := txn.Rev()
|
rev := txn.Rev()
|
||||||
@ -516,7 +529,7 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat
|
|||||||
respi := tresp.Responses[i].Response
|
respi := tresp.Responses[i].Response
|
||||||
switch tv := req.Request.(type) {
|
switch tv := req.Request.(type) {
|
||||||
case *pb.RequestOp_RequestRange:
|
case *pb.RequestOp_RequestRange:
|
||||||
resp, err := a.Range(txn, tv.RequestRange)
|
resp, err := a.Range(context.TODO(), txn, tv.RequestRange)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if lg != nil {
|
if lg != nil {
|
||||||
lg.Panic("unexpected error during txn", zap.Error(err))
|
lg.Panic("unexpected error during txn", zap.Error(err))
|
||||||
@ -526,7 +539,7 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat
|
|||||||
}
|
}
|
||||||
respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
|
respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
|
||||||
case *pb.RequestOp_RequestPut:
|
case *pb.RequestOp_RequestPut:
|
||||||
resp, err := a.Put(txn, tv.RequestPut)
|
resp, _, err := a.Put(txn, tv.RequestPut)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if lg != nil {
|
if lg != nil {
|
||||||
lg.Panic("unexpected error during txn", zap.Error(err))
|
lg.Panic("unexpected error during txn", zap.Error(err))
|
||||||
@ -557,17 +570,22 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat
|
|||||||
return txns
|
return txns
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) {
|
func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
|
||||||
resp := &pb.CompactionResponse{}
|
resp := &pb.CompactionResponse{}
|
||||||
resp.Header = &pb.ResponseHeader{}
|
resp.Header = &pb.ResponseHeader{}
|
||||||
ch, err := a.s.KV().Compact(compaction.Revision)
|
trace := traceutil.New("compact",
|
||||||
|
a.s.getLogger(),
|
||||||
|
traceutil.Field{Key: "revision", Value: compaction.Revision},
|
||||||
|
)
|
||||||
|
|
||||||
|
ch, err := a.s.KV().Compact(trace, compaction.Revision)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ch, err
|
return nil, ch, nil, err
|
||||||
}
|
}
|
||||||
// get the current revision. which key to get is not important.
|
// get the current revision. which key to get is not important.
|
||||||
rr, _ := a.s.KV().Range([]byte("compaction"), nil, mvcc.RangeOptions{})
|
rr, _ := a.s.KV().Range([]byte("compaction"), nil, mvcc.RangeOptions{})
|
||||||
resp.Header.Revision = rr.Rev
|
resp.Header.Revision = rr.Rev
|
||||||
return resp, ch, err
|
return resp, ch, trace, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
|
func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
|
||||||
@ -674,8 +692,8 @@ type applierV3Capped struct {
|
|||||||
// with Puts so that the number of keys in the store is capped.
|
// with Puts so that the number of keys in the store is capped.
|
||||||
func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} }
|
func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} }
|
||||||
|
|
||||||
func (a *applierV3Capped) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
|
func (a *applierV3Capped) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
|
||||||
return nil, ErrNoSpace
|
return nil, nil, ErrNoSpace
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *applierV3Capped) Txn(r *pb.TxnRequest) (*pb.TxnResponse, error) {
|
func (a *applierV3Capped) Txn(r *pb.TxnRequest) (*pb.TxnResponse, error) {
|
||||||
@ -824,13 +842,13 @@ func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 {
|
|||||||
return "aApplierV3{app, NewBackendQuota(s, "v3-applier")}
|
return "aApplierV3{app, NewBackendQuota(s, "v3-applier")}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
|
func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
|
||||||
ok := a.q.Available(p)
|
ok := a.q.Available(p)
|
||||||
resp, err := a.applierV3.Put(txn, p)
|
resp, trace, err := a.applierV3.Put(txn, p)
|
||||||
if err == nil && !ok {
|
if err == nil && !ok {
|
||||||
err = ErrNoSpace
|
err = ErrNoSpace
|
||||||
}
|
}
|
||||||
return resp, err
|
return resp, trace, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *quotaApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|
func (a *quotaApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|
||||||
|
@ -15,12 +15,14 @@
|
|||||||
package etcdserver
|
package etcdserver
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"go.etcd.io/etcd/auth"
|
"go.etcd.io/etcd/auth"
|
||||||
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
|
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
|
||||||
"go.etcd.io/etcd/lease"
|
"go.etcd.io/etcd/lease"
|
||||||
"go.etcd.io/etcd/mvcc"
|
"go.etcd.io/etcd/mvcc"
|
||||||
|
"go.etcd.io/etcd/pkg/traceutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
type authApplierV3 struct {
|
type authApplierV3 struct {
|
||||||
@ -61,9 +63,9 @@ func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest) *applyResult {
|
|||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutResponse, error) {
|
func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
|
||||||
if err := aa.as.IsPutPermitted(&aa.authInfo, r.Key); err != nil {
|
if err := aa.as.IsPutPermitted(&aa.authInfo, r.Key); err != nil {
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := aa.checkLeasePuts(lease.LeaseID(r.Lease)); err != nil {
|
if err := aa.checkLeasePuts(lease.LeaseID(r.Lease)); err != nil {
|
||||||
@ -71,23 +73,23 @@ func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutRespon
|
|||||||
// be written by this user. It means the user cannot revoke the
|
// be written by this user. It means the user cannot revoke the
|
||||||
// lease so attaching the lease to the newly written key should
|
// lease so attaching the lease to the newly written key should
|
||||||
// be forbidden.
|
// be forbidden.
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.PrevKv {
|
if r.PrevKv {
|
||||||
err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, nil)
|
err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return aa.applierV3.Put(txn, r)
|
return aa.applierV3.Put(txn, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (aa *authApplierV3) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
func (aa *authApplierV3) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||||
if err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil {
|
if err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return aa.applierV3.Range(txn, r)
|
return aa.applierV3.Range(ctx, txn, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (aa *authApplierV3) DeleteRange(txn mvcc.TxnWrite, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
|
func (aa *authApplierV3) DeleteRange(txn mvcc.TxnWrite, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
|
||||||
|
@ -23,6 +23,7 @@ import (
|
|||||||
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
|
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||||
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
|
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
|
||||||
"go.etcd.io/etcd/mvcc"
|
"go.etcd.io/etcd/mvcc"
|
||||||
|
"go.etcd.io/etcd/pkg/traceutil"
|
||||||
"go.etcd.io/etcd/pkg/types"
|
"go.etcd.io/etcd/pkg/types"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
@ -382,11 +383,11 @@ type applierV3Corrupt struct {
|
|||||||
|
|
||||||
func newApplierV3Corrupt(a applierV3) *applierV3Corrupt { return &applierV3Corrupt{a} }
|
func newApplierV3Corrupt(a applierV3) *applierV3Corrupt { return &applierV3Corrupt{a} }
|
||||||
|
|
||||||
func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
|
func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
|
||||||
return nil, ErrCorrupt
|
return nil, nil, ErrCorrupt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *applierV3Corrupt) Range(txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) {
|
func (a *applierV3Corrupt) Range(ctx context.Context, txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||||
return nil, ErrCorrupt
|
return nil, ErrCorrupt
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -398,8 +399,8 @@ func (a *applierV3Corrupt) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|
|||||||
return nil, ErrCorrupt
|
return nil, ErrCorrupt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) {
|
func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
|
||||||
return nil, nil, ErrCorrupt
|
return nil, nil, nil, ErrCorrupt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *applierV3Corrupt) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
|
func (a *applierV3Corrupt) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
|
||||||
|
@ -50,6 +50,7 @@ import (
|
|||||||
"go.etcd.io/etcd/pkg/pbutil"
|
"go.etcd.io/etcd/pkg/pbutil"
|
||||||
"go.etcd.io/etcd/pkg/runtime"
|
"go.etcd.io/etcd/pkg/runtime"
|
||||||
"go.etcd.io/etcd/pkg/schedule"
|
"go.etcd.io/etcd/pkg/schedule"
|
||||||
|
"go.etcd.io/etcd/pkg/traceutil"
|
||||||
"go.etcd.io/etcd/pkg/types"
|
"go.etcd.io/etcd/pkg/types"
|
||||||
"go.etcd.io/etcd/pkg/wait"
|
"go.etcd.io/etcd/pkg/wait"
|
||||||
"go.etcd.io/etcd/raft"
|
"go.etcd.io/etcd/raft"
|
||||||
@ -1178,7 +1179,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
|
|||||||
plog.Info("recovering lessor...")
|
plog.Info("recovering lessor...")
|
||||||
}
|
}
|
||||||
|
|
||||||
s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write() })
|
s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write(traceutil.TODO()) })
|
||||||
|
|
||||||
if lg != nil {
|
if lg != nil {
|
||||||
lg.Info("restored lease store")
|
lg.Info("restored lease store")
|
||||||
|
@ -26,6 +26,7 @@ import (
|
|||||||
"go.etcd.io/etcd/lease"
|
"go.etcd.io/etcd/lease"
|
||||||
"go.etcd.io/etcd/lease/leasehttp"
|
"go.etcd.io/etcd/lease/leasehttp"
|
||||||
"go.etcd.io/etcd/mvcc"
|
"go.etcd.io/etcd/mvcc"
|
||||||
|
"go.etcd.io/etcd/pkg/traceutil"
|
||||||
"go.etcd.io/etcd/raft"
|
"go.etcd.io/etcd/raft"
|
||||||
|
|
||||||
"github.com/gogo/protobuf/proto"
|
"github.com/gogo/protobuf/proto"
|
||||||
@ -38,6 +39,7 @@ const (
|
|||||||
// However, if the committed entries are very heavy to apply, the gap might grow.
|
// However, if the committed entries are very heavy to apply, the gap might grow.
|
||||||
// We should stop accepting new proposals if the gap growing to a certain point.
|
// We should stop accepting new proposals if the gap growing to a certain point.
|
||||||
maxGapBetweenApplyAndCommitIndex = 5000
|
maxGapBetweenApplyAndCommitIndex = 5000
|
||||||
|
traceThreshold = 100 * time.Millisecond
|
||||||
)
|
)
|
||||||
|
|
||||||
type RaftKV interface {
|
type RaftKV interface {
|
||||||
@ -85,14 +87,29 @@ type Authenticator interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||||
|
trace := traceutil.New("range",
|
||||||
|
s.getLogger(),
|
||||||
|
traceutil.Field{Key: "range_begin", Value: string(r.Key)},
|
||||||
|
traceutil.Field{Key: "range_end", Value: string(r.RangeEnd)},
|
||||||
|
)
|
||||||
|
ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
|
||||||
|
|
||||||
var resp *pb.RangeResponse
|
var resp *pb.RangeResponse
|
||||||
var err error
|
var err error
|
||||||
defer func(start time.Time) {
|
defer func(start time.Time) {
|
||||||
warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), start, r, resp, err)
|
warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), start, r, resp, err)
|
||||||
|
if resp != nil {
|
||||||
|
trace.AddField(
|
||||||
|
traceutil.Field{Key: "response_count", Value: len(resp.Kvs)},
|
||||||
|
traceutil.Field{Key: "response_revision", Value: resp.Header.Revision},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
trace.LogIfLong(traceThreshold)
|
||||||
}(time.Now())
|
}(time.Now())
|
||||||
|
|
||||||
if !r.Serializable {
|
if !r.Serializable {
|
||||||
err = s.linearizableReadNotify(ctx)
|
err = s.linearizableReadNotify(ctx)
|
||||||
|
trace.Step("agreement among raft nodes before linearized reading")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -101,7 +118,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
|
|||||||
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
|
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
|
||||||
}
|
}
|
||||||
|
|
||||||
get := func() { resp, err = s.applyV3Base.Range(nil, r) }
|
get := func() { resp, err = s.applyV3Base.Range(ctx, nil, r) }
|
||||||
if serr := s.doSerialize(ctx, chk, get); serr != nil {
|
if serr := s.doSerialize(ctx, chk, get); serr != nil {
|
||||||
err = serr
|
err = serr
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -110,6 +127,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
|
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
|
||||||
|
ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
|
||||||
resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
|
resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -186,7 +204,18 @@ func isTxnReadonly(r *pb.TxnRequest) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
|
func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
|
||||||
|
startTime := time.Now()
|
||||||
result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r})
|
result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r})
|
||||||
|
trace := traceutil.TODO()
|
||||||
|
if result != nil && result.trace != nil {
|
||||||
|
trace = result.trace
|
||||||
|
defer func() {
|
||||||
|
trace.LogIfLong(traceThreshold)
|
||||||
|
}()
|
||||||
|
applyStart := result.trace.GetStartTime()
|
||||||
|
result.trace.SetStartTime(startTime)
|
||||||
|
trace.InsertStep(0, applyStart, "process raft request")
|
||||||
|
}
|
||||||
if r.Physical && result != nil && result.physc != nil {
|
if r.Physical && result != nil && result.physc != nil {
|
||||||
<-result.physc
|
<-result.physc
|
||||||
// The compaction is done deleting keys; the hash is now settled
|
// The compaction is done deleting keys; the hash is now settled
|
||||||
@ -195,6 +224,7 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.
|
|||||||
// if the compaction resumes. Force the finished compaction to
|
// if the compaction resumes. Force the finished compaction to
|
||||||
// commit so it won't resume following a crash.
|
// commit so it won't resume following a crash.
|
||||||
s.be.ForceCommit()
|
s.be.ForceCommit()
|
||||||
|
trace.Step("physically apply compaction")
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -210,6 +240,7 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.
|
|||||||
resp.Header = &pb.ResponseHeader{}
|
resp.Header = &pb.ResponseHeader{}
|
||||||
}
|
}
|
||||||
resp.Header.Revision = s.kv.Rev()
|
resp.Header.Revision = s.kv.Rev()
|
||||||
|
trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision})
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -533,6 +564,15 @@ func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftReque
|
|||||||
if result.err != nil {
|
if result.err != nil {
|
||||||
return nil, result.err
|
return nil, result.err
|
||||||
}
|
}
|
||||||
|
if startTime, ok := ctx.Value(traceutil.StartTimeKey).(time.Time); ok && result.trace != nil {
|
||||||
|
applyStart := result.trace.GetStartTime()
|
||||||
|
// The trace object is created in apply. Here reset the start time to trace
|
||||||
|
// the raft request time by the difference between the request start time
|
||||||
|
// and apply start time
|
||||||
|
result.trace.SetStartTime(startTime)
|
||||||
|
result.trace.InsertStep(0, applyStart, "process raft request")
|
||||||
|
result.trace.LogIfLong(traceThreshold)
|
||||||
|
}
|
||||||
return result.resp, nil
|
return result.resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -547,6 +587,7 @@ func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest)
|
|||||||
|
|
||||||
// doSerialize handles the auth logic, with permissions checked by "chk", for a serialized request "get". Returns a non-nil error on authentication failure.
|
// doSerialize handles the auth logic, with permissions checked by "chk", for a serialized request "get". Returns a non-nil error on authentication failure.
|
||||||
func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error {
|
func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error {
|
||||||
|
trace := traceutil.Get(ctx)
|
||||||
ai, err := s.AuthInfoFromCtx(ctx)
|
ai, err := s.AuthInfoFromCtx(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -558,6 +599,7 @@ func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) e
|
|||||||
if err = chk(ai); err != nil {
|
if err = chk(ai); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
trace.Step("get authentication metadata")
|
||||||
// fetch response for serialized request
|
// fetch response for serialized request
|
||||||
get()
|
get()
|
||||||
// check for stale token revision in case the auth store was updated while
|
// check for stale token revision in case the auth store was updated while
|
||||||
|
@ -27,6 +27,7 @@ import (
|
|||||||
"go.etcd.io/etcd/mvcc"
|
"go.etcd.io/etcd/mvcc"
|
||||||
"go.etcd.io/etcd/mvcc/backend"
|
"go.etcd.io/etcd/mvcc/backend"
|
||||||
"go.etcd.io/etcd/pkg/testutil"
|
"go.etcd.io/etcd/pkg/testutil"
|
||||||
|
"go.etcd.io/etcd/pkg/traceutil"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
@ -173,7 +174,7 @@ func TestV3CorruptAlarm(t *testing.T) {
|
|||||||
// NOTE: cluster_proxy mode with namespacing won't set 'k', but namespace/'k'.
|
// NOTE: cluster_proxy mode with namespacing won't set 'k', but namespace/'k'.
|
||||||
s.Put([]byte("abc"), []byte("def"), 0)
|
s.Put([]byte("abc"), []byte("def"), 0)
|
||||||
s.Put([]byte("xyz"), []byte("123"), 0)
|
s.Put([]byte("xyz"), []byte("123"), 0)
|
||||||
s.Compact(5)
|
s.Compact(traceutil.TODO(), 5)
|
||||||
s.Commit()
|
s.Commit()
|
||||||
s.Close()
|
s.Close()
|
||||||
be.Close()
|
be.Close()
|
||||||
|
@ -18,6 +18,7 @@ import (
|
|||||||
"go.etcd.io/etcd/lease"
|
"go.etcd.io/etcd/lease"
|
||||||
"go.etcd.io/etcd/mvcc/backend"
|
"go.etcd.io/etcd/mvcc/backend"
|
||||||
"go.etcd.io/etcd/mvcc/mvccpb"
|
"go.etcd.io/etcd/mvcc/mvccpb"
|
||||||
|
"go.etcd.io/etcd/pkg/traceutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
type RangeOptions struct {
|
type RangeOptions struct {
|
||||||
@ -102,10 +103,10 @@ type KV interface {
|
|||||||
WriteView
|
WriteView
|
||||||
|
|
||||||
// Read creates a read transaction.
|
// Read creates a read transaction.
|
||||||
Read() TxnRead
|
Read(trace *traceutil.Trace) TxnRead
|
||||||
|
|
||||||
// Write creates a write transaction.
|
// Write creates a write transaction.
|
||||||
Write() TxnWrite
|
Write(trace *traceutil.Trace) TxnWrite
|
||||||
|
|
||||||
// Hash computes the hash of the KV's backend.
|
// Hash computes the hash of the KV's backend.
|
||||||
Hash() (hash uint32, revision int64, err error)
|
Hash() (hash uint32, revision int64, err error)
|
||||||
@ -114,7 +115,7 @@ type KV interface {
|
|||||||
HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error)
|
HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error)
|
||||||
|
|
||||||
// Compact frees all superseded keys with revisions less than rev.
|
// Compact frees all superseded keys with revisions less than rev.
|
||||||
Compact(rev int64) (<-chan struct{}, error)
|
Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error)
|
||||||
|
|
||||||
// Commit commits outstanding txns into the underlying backend.
|
// Commit commits outstanding txns into the underlying backend.
|
||||||
Commit()
|
Commit()
|
||||||
|
@ -25,6 +25,7 @@ import (
|
|||||||
"go.etcd.io/etcd/mvcc/backend"
|
"go.etcd.io/etcd/mvcc/backend"
|
||||||
"go.etcd.io/etcd/mvcc/mvccpb"
|
"go.etcd.io/etcd/mvcc/mvccpb"
|
||||||
"go.etcd.io/etcd/pkg/testutil"
|
"go.etcd.io/etcd/pkg/testutil"
|
||||||
|
"go.etcd.io/etcd/pkg/traceutil"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
dto "github.com/prometheus/client_model/go"
|
dto "github.com/prometheus/client_model/go"
|
||||||
@ -47,7 +48,7 @@ var (
|
|||||||
return kv.Range(key, end, ro)
|
return kv.Range(key, end, ro)
|
||||||
}
|
}
|
||||||
txnRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) {
|
txnRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) {
|
||||||
txn := kv.Read()
|
txn := kv.Read(traceutil.TODO())
|
||||||
defer txn.End()
|
defer txn.End()
|
||||||
return txn.Range(key, end, ro)
|
return txn.Range(key, end, ro)
|
||||||
}
|
}
|
||||||
@ -56,7 +57,7 @@ var (
|
|||||||
return kv.Put(key, value, lease)
|
return kv.Put(key, value, lease)
|
||||||
}
|
}
|
||||||
txnPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 {
|
txnPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 {
|
||||||
txn := kv.Write()
|
txn := kv.Write(traceutil.TODO())
|
||||||
defer txn.End()
|
defer txn.End()
|
||||||
return txn.Put(key, value, lease)
|
return txn.Put(key, value, lease)
|
||||||
}
|
}
|
||||||
@ -65,7 +66,7 @@ var (
|
|||||||
return kv.DeleteRange(key, end)
|
return kv.DeleteRange(key, end)
|
||||||
}
|
}
|
||||||
txnDeleteRangeFunc = func(kv KV, key, end []byte) (n, rev int64) {
|
txnDeleteRangeFunc = func(kv KV, key, end []byte) (n, rev int64) {
|
||||||
txn := kv.Write()
|
txn := kv.Write(traceutil.TODO())
|
||||||
defer txn.End()
|
defer txn.End()
|
||||||
return txn.DeleteRange(key, end)
|
return txn.DeleteRange(key, end)
|
||||||
}
|
}
|
||||||
@ -182,7 +183,7 @@ func testKVRangeBadRev(t *testing.T, f rangeFunc) {
|
|||||||
defer cleanup(s, b, tmpPath)
|
defer cleanup(s, b, tmpPath)
|
||||||
|
|
||||||
put3TestKVs(s)
|
put3TestKVs(s)
|
||||||
if _, err := s.Compact(4); err != nil {
|
if _, err := s.Compact(traceutil.TODO(), 4); err != nil {
|
||||||
t.Fatalf("compact error (%v)", err)
|
t.Fatalf("compact error (%v)", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -409,7 +410,7 @@ func TestKVTxnBlockWriteOperations(t *testing.T) {
|
|||||||
func() { s.DeleteRange([]byte("foo"), nil) },
|
func() { s.DeleteRange([]byte("foo"), nil) },
|
||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
txn := s.Write()
|
txn := s.Write(traceutil.TODO())
|
||||||
done := make(chan struct{}, 1)
|
done := make(chan struct{}, 1)
|
||||||
go func() {
|
go func() {
|
||||||
tt()
|
tt()
|
||||||
@ -438,7 +439,7 @@ func TestKVTxnNonBlockRange(t *testing.T) {
|
|||||||
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
|
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
|
||||||
defer cleanup(s, b, tmpPath)
|
defer cleanup(s, b, tmpPath)
|
||||||
|
|
||||||
txn := s.Write()
|
txn := s.Write(traceutil.TODO())
|
||||||
defer txn.End()
|
defer txn.End()
|
||||||
|
|
||||||
donec := make(chan struct{})
|
donec := make(chan struct{})
|
||||||
@ -460,7 +461,7 @@ func TestKVTxnOperationInSequence(t *testing.T) {
|
|||||||
defer cleanup(s, b, tmpPath)
|
defer cleanup(s, b, tmpPath)
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
txn := s.Write()
|
txn := s.Write(traceutil.TODO())
|
||||||
base := int64(i + 1)
|
base := int64(i + 1)
|
||||||
|
|
||||||
// put foo
|
// put foo
|
||||||
@ -544,7 +545,7 @@ func TestKVCompactReserveLastValue(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
_, err := s.Compact(tt.rev)
|
_, err := s.Compact(traceutil.TODO(), tt.rev)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("#%d: unexpect compact error %v", i, err)
|
t.Errorf("#%d: unexpect compact error %v", i, err)
|
||||||
}
|
}
|
||||||
@ -580,7 +581,7 @@ func TestKVCompactBad(t *testing.T) {
|
|||||||
{100, ErrFutureRev},
|
{100, ErrFutureRev},
|
||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
_, err := s.Compact(tt.rev)
|
_, err := s.Compact(traceutil.TODO(), tt.rev)
|
||||||
if err != tt.werr {
|
if err != tt.werr {
|
||||||
t.Errorf("#%d: compact error = %v, want %v", i, err, tt.werr)
|
t.Errorf("#%d: compact error = %v, want %v", i, err, tt.werr)
|
||||||
}
|
}
|
||||||
@ -626,7 +627,7 @@ func TestKVRestore(t *testing.T) {
|
|||||||
func(kv KV) {
|
func(kv KV) {
|
||||||
kv.Put([]byte("foo"), []byte("bar0"), 1)
|
kv.Put([]byte("foo"), []byte("bar0"), 1)
|
||||||
kv.Put([]byte("foo"), []byte("bar1"), 2)
|
kv.Put([]byte("foo"), []byte("bar1"), 2)
|
||||||
kv.Compact(1)
|
kv.Compact(traceutil.TODO(), 1)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
|
@ -14,24 +14,27 @@
|
|||||||
|
|
||||||
package mvcc
|
package mvcc
|
||||||
|
|
||||||
import "go.etcd.io/etcd/lease"
|
import (
|
||||||
|
"go.etcd.io/etcd/lease"
|
||||||
|
"go.etcd.io/etcd/pkg/traceutil"
|
||||||
|
)
|
||||||
|
|
||||||
type readView struct{ kv KV }
|
type readView struct{ kv KV }
|
||||||
|
|
||||||
func (rv *readView) FirstRev() int64 {
|
func (rv *readView) FirstRev() int64 {
|
||||||
tr := rv.kv.Read()
|
tr := rv.kv.Read(traceutil.TODO())
|
||||||
defer tr.End()
|
defer tr.End()
|
||||||
return tr.FirstRev()
|
return tr.FirstRev()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rv *readView) Rev() int64 {
|
func (rv *readView) Rev() int64 {
|
||||||
tr := rv.kv.Read()
|
tr := rv.kv.Read(traceutil.TODO())
|
||||||
defer tr.End()
|
defer tr.End()
|
||||||
return tr.Rev()
|
return tr.Rev()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
|
func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
|
||||||
tr := rv.kv.Read()
|
tr := rv.kv.Read(traceutil.TODO())
|
||||||
defer tr.End()
|
defer tr.End()
|
||||||
return tr.Range(key, end, ro)
|
return tr.Range(key, end, ro)
|
||||||
}
|
}
|
||||||
@ -39,13 +42,13 @@ func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err
|
|||||||
type writeView struct{ kv KV }
|
type writeView struct{ kv KV }
|
||||||
|
|
||||||
func (wv *writeView) DeleteRange(key, end []byte) (n, rev int64) {
|
func (wv *writeView) DeleteRange(key, end []byte) (n, rev int64) {
|
||||||
tw := wv.kv.Write()
|
tw := wv.kv.Write(traceutil.TODO())
|
||||||
defer tw.End()
|
defer tw.End()
|
||||||
return tw.DeleteRange(key, end)
|
return tw.DeleteRange(key, end)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
|
func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
|
||||||
tw := wv.kv.Write()
|
tw := wv.kv.Write(traceutil.TODO())
|
||||||
defer tw.End()
|
defer tw.End()
|
||||||
return tw.Put(key, value, lease)
|
return tw.Put(key, value, lease)
|
||||||
}
|
}
|
||||||
|
@ -29,6 +29,7 @@ import (
|
|||||||
"go.etcd.io/etcd/mvcc/backend"
|
"go.etcd.io/etcd/mvcc/backend"
|
||||||
"go.etcd.io/etcd/mvcc/mvccpb"
|
"go.etcd.io/etcd/mvcc/mvccpb"
|
||||||
"go.etcd.io/etcd/pkg/schedule"
|
"go.etcd.io/etcd/pkg/schedule"
|
||||||
|
"go.etcd.io/etcd/pkg/traceutil"
|
||||||
|
|
||||||
"github.com/coreos/pkg/capnslog"
|
"github.com/coreos/pkg/capnslog"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
@ -140,7 +141,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentI
|
|||||||
s.ReadView = &readView{s}
|
s.ReadView = &readView{s}
|
||||||
s.WriteView = &writeView{s}
|
s.WriteView = &writeView{s}
|
||||||
if s.le != nil {
|
if s.le != nil {
|
||||||
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
|
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
|
||||||
}
|
}
|
||||||
|
|
||||||
tx := s.b.BatchTx()
|
tx := s.b.BatchTx()
|
||||||
@ -270,9 +271,10 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) compact(rev int64) (<-chan struct{}, error) {
|
func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
keep := s.kvindex.Compact(rev)
|
keep := s.kvindex.Compact(rev)
|
||||||
|
trace.Step("compact in-memory index tree")
|
||||||
ch := make(chan struct{})
|
ch := make(chan struct{})
|
||||||
var j = func(ctx context.Context) {
|
var j = func(ctx context.Context) {
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
@ -289,6 +291,7 @@ func (s *store) compact(rev int64) (<-chan struct{}, error) {
|
|||||||
s.fifoSched.Schedule(j)
|
s.fifoSched.Schedule(j)
|
||||||
|
|
||||||
indexCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
|
indexCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
|
||||||
|
trace.Step("schedule compaction")
|
||||||
return ch, nil
|
return ch, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -298,21 +301,21 @@ func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
|
|||||||
return ch, err
|
return ch, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.compact(rev)
|
return s.compact(traceutil.TODO(), rev)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) Compact(rev int64) (<-chan struct{}, error) {
|
func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
|
|
||||||
ch, err := s.updateCompactRev(rev)
|
ch, err := s.updateCompactRev(rev)
|
||||||
|
trace.Step("check and update compact revision")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
return ch, err
|
return ch, err
|
||||||
}
|
}
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
|
|
||||||
return s.compact(rev)
|
return s.compact(trace, rev)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultIgnores is a map of keys to ignore in hash checking.
|
// DefaultIgnores is a map of keys to ignore in hash checking.
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
|
|
||||||
"go.etcd.io/etcd/lease"
|
"go.etcd.io/etcd/lease"
|
||||||
"go.etcd.io/etcd/mvcc/backend"
|
"go.etcd.io/etcd/mvcc/backend"
|
||||||
|
"go.etcd.io/etcd/pkg/traceutil"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
@ -130,7 +131,7 @@ func BenchmarkStoreTxnPut(b *testing.B) {
|
|||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
txn := s.Write()
|
txn := s.Write(traceutil.TODO())
|
||||||
txn.Put(keys[i], vals[i], lease.NoLease)
|
txn.Put(keys[i], vals[i], lease.NoLease)
|
||||||
txn.End()
|
txn.End()
|
||||||
}
|
}
|
||||||
@ -151,7 +152,7 @@ func benchmarkStoreRestore(revsPerKey int, b *testing.B) {
|
|||||||
|
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
for j := 0; j < revsPerKey; j++ {
|
for j := 0; j < revsPerKey; j++ {
|
||||||
txn := s.Write()
|
txn := s.Write(traceutil.TODO())
|
||||||
txn.Put(keys[i], vals[i], lease.NoLease)
|
txn.Put(keys[i], vals[i], lease.NoLease)
|
||||||
txn.End()
|
txn.End()
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
|
|
||||||
"go.etcd.io/etcd/lease"
|
"go.etcd.io/etcd/lease"
|
||||||
"go.etcd.io/etcd/mvcc/backend"
|
"go.etcd.io/etcd/mvcc/backend"
|
||||||
|
"go.etcd.io/etcd/pkg/traceutil"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -109,7 +110,7 @@ func TestCompactAllAndRestore(t *testing.T) {
|
|||||||
|
|
||||||
rev := s0.Rev()
|
rev := s0.Rev()
|
||||||
// compact all keys
|
// compact all keys
|
||||||
done, err := s0.Compact(rev)
|
done, err := s0.Compact(traceutil.TODO(), rev)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -34,6 +34,7 @@ import (
|
|||||||
"go.etcd.io/etcd/mvcc/mvccpb"
|
"go.etcd.io/etcd/mvcc/mvccpb"
|
||||||
"go.etcd.io/etcd/pkg/schedule"
|
"go.etcd.io/etcd/pkg/schedule"
|
||||||
"go.etcd.io/etcd/pkg/testutil"
|
"go.etcd.io/etcd/pkg/testutil"
|
||||||
|
"go.etcd.io/etcd/pkg/traceutil"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
@ -331,7 +332,7 @@ func TestStoreCompact(t *testing.T) {
|
|||||||
key2 := newTestKeyBytes(revision{2, 0}, false)
|
key2 := newTestKeyBytes(revision{2, 0}, false)
|
||||||
b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, nil}
|
b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, nil}
|
||||||
|
|
||||||
s.Compact(3)
|
s.Compact(traceutil.TODO(), 3)
|
||||||
s.fifoSched.WaitFinish(1)
|
s.fifoSched.WaitFinish(1)
|
||||||
|
|
||||||
if s.compactMainRev != 3 {
|
if s.compactMainRev != 3 {
|
||||||
@ -582,7 +583,7 @@ func TestHashKVWhenCompacting(t *testing.T) {
|
|||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for i := 100; i >= 0; i-- {
|
for i := 100; i >= 0; i-- {
|
||||||
_, err := s.Compact(int64(rev - 1 - i))
|
_, err := s.Compact(traceutil.TODO(), int64(rev-1-i))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
@ -609,7 +610,7 @@ func TestHashKVZeroRevision(t *testing.T) {
|
|||||||
for i := 2; i <= rev; i++ {
|
for i := 2; i <= rev; i++ {
|
||||||
s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
|
s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
|
||||||
}
|
}
|
||||||
if _, err := s.Compact(int64(rev / 2)); err != nil {
|
if _, err := s.Compact(traceutil.TODO(), int64(rev/2)); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -639,7 +640,7 @@ func TestTxnPut(t *testing.T) {
|
|||||||
defer cleanup(s, b, tmpPath)
|
defer cleanup(s, b, tmpPath)
|
||||||
|
|
||||||
for i := 0; i < sliceN; i++ {
|
for i := 0; i < sliceN; i++ {
|
||||||
txn := s.Write()
|
txn := s.Write(traceutil.TODO())
|
||||||
base := int64(i + 2)
|
base := int64(i + 2)
|
||||||
if rev := txn.Put(keys[i], vals[i], lease.NoLease); rev != base {
|
if rev := txn.Put(keys[i], vals[i], lease.NoLease); rev != base {
|
||||||
t.Errorf("#%d: rev = %d, want %d", i, rev, base)
|
t.Errorf("#%d: rev = %d, want %d", i, rev, base)
|
||||||
@ -658,7 +659,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) {
|
|||||||
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
|
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
|
||||||
|
|
||||||
// readTx simulates a long read request
|
// readTx simulates a long read request
|
||||||
readTx1 := s.Read()
|
readTx1 := s.Read(traceutil.TODO())
|
||||||
|
|
||||||
// write should not be blocked by reads
|
// write should not be blocked by reads
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
@ -673,7 +674,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// readTx2 simulates a short read request
|
// readTx2 simulates a short read request
|
||||||
readTx2 := s.Read()
|
readTx2 := s.Read(traceutil.TODO())
|
||||||
ro := RangeOptions{Limit: 1, Rev: 0, Count: false}
|
ro := RangeOptions{Limit: 1, Rev: 0, Count: false}
|
||||||
ret, err := readTx2.Range([]byte("foo"), nil, ro)
|
ret, err := readTx2.Range([]byte("foo"), nil, ro)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -730,7 +731,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) {
|
|||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
time.Sleep(time.Duration(mrand.Intn(100)) * time.Millisecond) // random starting time
|
time.Sleep(time.Duration(mrand.Intn(100)) * time.Millisecond) // random starting time
|
||||||
|
|
||||||
tx := s.Write()
|
tx := s.Write(traceutil.TODO())
|
||||||
numOfPuts := mrand.Intn(maxNumOfPutsPerWrite) + 1
|
numOfPuts := mrand.Intn(maxNumOfPutsPerWrite) + 1
|
||||||
var pendingKvs kvs
|
var pendingKvs kvs
|
||||||
for j := 0; j < numOfPuts; j++ {
|
for j := 0; j < numOfPuts; j++ {
|
||||||
@ -756,7 +757,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) {
|
|||||||
mu.Lock()
|
mu.Lock()
|
||||||
wKVs := make(kvs, len(committedKVs))
|
wKVs := make(kvs, len(committedKVs))
|
||||||
copy(wKVs, committedKVs)
|
copy(wKVs, committedKVs)
|
||||||
tx := s.Read()
|
tx := s.Read(traceutil.TODO())
|
||||||
mu.Unlock()
|
mu.Unlock()
|
||||||
// get all keys in backend store, and compare with wKVs
|
// get all keys in backend store, and compare with wKVs
|
||||||
ret, err := tx.Range([]byte("\x00000000"), []byte("\xffffffff"), RangeOptions{})
|
ret, err := tx.Range([]byte("\x00000000"), []byte("\xffffffff"), RangeOptions{})
|
||||||
|
@ -18,6 +18,7 @@ import (
|
|||||||
"go.etcd.io/etcd/lease"
|
"go.etcd.io/etcd/lease"
|
||||||
"go.etcd.io/etcd/mvcc/backend"
|
"go.etcd.io/etcd/mvcc/backend"
|
||||||
"go.etcd.io/etcd/mvcc/mvccpb"
|
"go.etcd.io/etcd/mvcc/mvccpb"
|
||||||
|
"go.etcd.io/etcd/pkg/traceutil"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -27,9 +28,11 @@ type storeTxnRead struct {
|
|||||||
|
|
||||||
firstRev int64
|
firstRev int64
|
||||||
rev int64
|
rev int64
|
||||||
|
|
||||||
|
trace *traceutil.Trace
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) Read() TxnRead {
|
func (s *store) Read(trace *traceutil.Trace) TxnRead {
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
s.revMu.RLock()
|
s.revMu.RLock()
|
||||||
// backend holds b.readTx.RLock() only when creating the concurrentReadTx. After
|
// backend holds b.readTx.RLock() only when creating the concurrentReadTx. After
|
||||||
@ -38,7 +41,7 @@ func (s *store) Read() TxnRead {
|
|||||||
tx.RLock() // RLock is no-op. concurrentReadTx does not need to be locked after it is created.
|
tx.RLock() // RLock is no-op. concurrentReadTx does not need to be locked after it is created.
|
||||||
firstRev, rev := s.compactMainRev, s.currentRev
|
firstRev, rev := s.compactMainRev, s.currentRev
|
||||||
s.revMu.RUnlock()
|
s.revMu.RUnlock()
|
||||||
return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
|
return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev, trace})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
|
func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
|
||||||
@ -61,12 +64,12 @@ type storeTxnWrite struct {
|
|||||||
changes []mvccpb.KeyValue
|
changes []mvccpb.KeyValue
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) Write() TxnWrite {
|
func (s *store) Write(trace *traceutil.Trace) TxnWrite {
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
tx := s.b.BatchTx()
|
tx := s.b.BatchTx()
|
||||||
tx.Lock()
|
tx.Lock()
|
||||||
tw := &storeTxnWrite{
|
tw := &storeTxnWrite{
|
||||||
storeTxnRead: storeTxnRead{s, tx, 0, 0},
|
storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},
|
||||||
tx: tx,
|
tx: tx,
|
||||||
beginRev: s.currentRev,
|
beginRev: s.currentRev,
|
||||||
changes: make([]mvccpb.KeyValue, 0, 4),
|
changes: make([]mvccpb.KeyValue, 0, 4),
|
||||||
@ -124,6 +127,7 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
|
|||||||
}
|
}
|
||||||
|
|
||||||
revpairs := tr.s.kvindex.Revisions(key, end, rev)
|
revpairs := tr.s.kvindex.Revisions(key, end, rev)
|
||||||
|
tr.trace.Step("range keys from in-memory index tree")
|
||||||
if len(revpairs) == 0 {
|
if len(revpairs) == 0 {
|
||||||
return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
|
return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
|
||||||
}
|
}
|
||||||
@ -163,6 +167,7 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
tr.trace.Step("range keys from bolt db")
|
||||||
return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
|
return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -178,7 +183,7 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
|
|||||||
c = created.main
|
c = created.main
|
||||||
oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
|
oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
|
||||||
}
|
}
|
||||||
|
tw.trace.Step("get key's previous created_revision and leaseID")
|
||||||
ibytes := newRevBytes()
|
ibytes := newRevBytes()
|
||||||
idxRev := revision{main: rev, sub: int64(len(tw.changes))}
|
idxRev := revision{main: rev, sub: int64(len(tw.changes))}
|
||||||
revToBytes(idxRev, ibytes)
|
revToBytes(idxRev, ibytes)
|
||||||
@ -205,9 +210,11 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tw.trace.Step("marshal mvccpb.KeyValue")
|
||||||
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
|
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
|
||||||
tw.s.kvindex.Put(key, idxRev)
|
tw.s.kvindex.Put(key, idxRev)
|
||||||
tw.changes = append(tw.changes, kv)
|
tw.changes = append(tw.changes, kv)
|
||||||
|
tw.trace.Step("store kv pair into bolt db")
|
||||||
|
|
||||||
if oldLease != lease.NoLease {
|
if oldLease != lease.NoLease {
|
||||||
if tw.s.le == nil {
|
if tw.s.le == nil {
|
||||||
@ -234,6 +241,7 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
|
|||||||
panic("unexpected error from lease Attach")
|
panic("unexpected error from lease Attach")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
tw.trace.Step("attach lease to kv pair")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
|
func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
|
||||||
|
@ -21,6 +21,7 @@ import (
|
|||||||
"go.etcd.io/etcd/lease"
|
"go.etcd.io/etcd/lease"
|
||||||
"go.etcd.io/etcd/mvcc/backend"
|
"go.etcd.io/etcd/mvcc/backend"
|
||||||
"go.etcd.io/etcd/mvcc/mvccpb"
|
"go.etcd.io/etcd/mvcc/mvccpb"
|
||||||
|
"go.etcd.io/etcd/pkg/traceutil"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -84,7 +85,7 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig Co
|
|||||||
s.store.WriteView = &writeView{s}
|
s.store.WriteView = &writeView{s}
|
||||||
if s.le != nil {
|
if s.le != nil {
|
||||||
// use this store as the deleter so revokes trigger watch events
|
// use this store as the deleter so revokes trigger watch events
|
||||||
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
|
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
|
||||||
}
|
}
|
||||||
s.wg.Add(2)
|
s.wg.Add(2)
|
||||||
go s.syncWatchersLoop()
|
go s.syncWatchersLoop()
|
||||||
|
@ -21,6 +21,7 @@ import (
|
|||||||
|
|
||||||
"go.etcd.io/etcd/lease"
|
"go.etcd.io/etcd/lease"
|
||||||
"go.etcd.io/etcd/mvcc/backend"
|
"go.etcd.io/etcd/mvcc/backend"
|
||||||
|
"go.etcd.io/etcd/pkg/traceutil"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
@ -59,7 +60,7 @@ func BenchmarkWatchableStoreTxnPut(b *testing.B) {
|
|||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
txn := s.Write()
|
txn := s.Write(traceutil.TODO())
|
||||||
txn.Put(keys[i], vals[i], lease.NoLease)
|
txn.Put(keys[i], vals[i], lease.NoLease)
|
||||||
txn.End()
|
txn.End()
|
||||||
}
|
}
|
||||||
|
@ -26,6 +26,7 @@ import (
|
|||||||
"go.etcd.io/etcd/lease"
|
"go.etcd.io/etcd/lease"
|
||||||
"go.etcd.io/etcd/mvcc/backend"
|
"go.etcd.io/etcd/mvcc/backend"
|
||||||
"go.etcd.io/etcd/mvcc/mvccpb"
|
"go.etcd.io/etcd/mvcc/mvccpb"
|
||||||
|
"go.etcd.io/etcd/pkg/traceutil"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -237,7 +238,7 @@ func TestWatchCompacted(t *testing.T) {
|
|||||||
for i := 0; i < maxRev; i++ {
|
for i := 0; i < maxRev; i++ {
|
||||||
s.Put(testKey, testValue, lease.NoLease)
|
s.Put(testKey, testValue, lease.NoLease)
|
||||||
}
|
}
|
||||||
_, err := s.Compact(compactRev)
|
_, err := s.Compact(traceutil.TODO(), compactRev)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to compact kv (%v)", err)
|
t.Fatalf("failed to compact kv (%v)", err)
|
||||||
}
|
}
|
||||||
|
@ -14,7 +14,10 @@
|
|||||||
|
|
||||||
package mvcc
|
package mvcc
|
||||||
|
|
||||||
import "go.etcd.io/etcd/mvcc/mvccpb"
|
import (
|
||||||
|
"go.etcd.io/etcd/mvcc/mvccpb"
|
||||||
|
"go.etcd.io/etcd/pkg/traceutil"
|
||||||
|
)
|
||||||
|
|
||||||
func (tw *watchableStoreTxnWrite) End() {
|
func (tw *watchableStoreTxnWrite) End() {
|
||||||
changes := tw.Changes()
|
changes := tw.Changes()
|
||||||
@ -48,4 +51,6 @@ type watchableStoreTxnWrite struct {
|
|||||||
s *watchableStore
|
s *watchableStore
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *watchableStore) Write() TxnWrite { return &watchableStoreTxnWrite{s.store.Write(), s} }
|
func (s *watchableStore) Write(trace *traceutil.Trace) TxnWrite {
|
||||||
|
return &watchableStoreTxnWrite{s.store.Write(trace), s}
|
||||||
|
}
|
||||||
|
172
pkg/traceutil/trace.go
Normal file
172
pkg/traceutil/trace.go
Normal file
@ -0,0 +1,172 @@
|
|||||||
|
// Copyright 2019 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
// Package traceutil implements tracing utilities using "context".
|
||||||
|
package traceutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
TraceKey = "trace"
|
||||||
|
StartTimeKey = "startTime"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Field is a kv pair to record additional details of the trace.
|
||||||
|
type Field struct {
|
||||||
|
Key string
|
||||||
|
Value interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Field) format() string {
|
||||||
|
return fmt.Sprintf("%s:%v; ", f.Key, f.Value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeFields(fields []Field) string {
|
||||||
|
if len(fields) == 0 {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
var buf bytes.Buffer
|
||||||
|
buf.WriteString("{")
|
||||||
|
for _, f := range fields {
|
||||||
|
buf.WriteString(f.format())
|
||||||
|
}
|
||||||
|
buf.WriteString("}")
|
||||||
|
return buf.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
type Trace struct {
|
||||||
|
operation string
|
||||||
|
lg *zap.Logger
|
||||||
|
fields []Field
|
||||||
|
startTime time.Time
|
||||||
|
steps []step
|
||||||
|
stepDisabled bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type step struct {
|
||||||
|
time time.Time
|
||||||
|
msg string
|
||||||
|
fields []Field
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(op string, lg *zap.Logger, fields ...Field) *Trace {
|
||||||
|
return &Trace{operation: op, lg: lg, startTime: time.Now(), fields: fields}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO returns a non-nil, empty Trace
|
||||||
|
func TODO() *Trace {
|
||||||
|
return &Trace{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Get(ctx context.Context) *Trace {
|
||||||
|
if trace, ok := ctx.Value(TraceKey).(*Trace); ok && trace != nil {
|
||||||
|
return trace
|
||||||
|
}
|
||||||
|
return TODO()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Trace) GetStartTime() time.Time {
|
||||||
|
return t.startTime
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Trace) SetStartTime(time time.Time) {
|
||||||
|
t.startTime = time
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Trace) InsertStep(at int, time time.Time, msg string, fields ...Field) {
|
||||||
|
newStep := step{time, msg, fields}
|
||||||
|
if at < len(t.steps) {
|
||||||
|
t.steps = append(t.steps[:at+1], t.steps[at:]...)
|
||||||
|
t.steps[at] = newStep
|
||||||
|
} else {
|
||||||
|
t.steps = append(t.steps, newStep)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step adds step to trace
|
||||||
|
func (t *Trace) Step(msg string, fields ...Field) {
|
||||||
|
if !t.stepDisabled {
|
||||||
|
t.steps = append(t.steps, step{time: time.Now(), msg: msg, fields: fields})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// DisableStep sets the flag to prevent the trace from adding steps
|
||||||
|
func (t *Trace) DisableStep() {
|
||||||
|
t.stepDisabled = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnableStep re-enable the trace to add steps
|
||||||
|
func (t *Trace) EnableStep() {
|
||||||
|
t.stepDisabled = false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Trace) AddField(fields ...Field) {
|
||||||
|
for _, f := range fields {
|
||||||
|
t.fields = append(t.fields, f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Log dumps all steps in the Trace
|
||||||
|
func (t *Trace) Log() {
|
||||||
|
t.LogWithStepThreshold(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// LogIfLong dumps logs if the duration is longer than threshold
|
||||||
|
func (t *Trace) LogIfLong(threshold time.Duration) {
|
||||||
|
if time.Since(t.startTime) > threshold {
|
||||||
|
stepThreshold := threshold / time.Duration(len(t.steps)+1)
|
||||||
|
t.LogWithStepThreshold(stepThreshold)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// LogWithStepThreshold only dumps step whose duration is longer than step threshold
|
||||||
|
func (t *Trace) LogWithStepThreshold(threshold time.Duration) {
|
||||||
|
msg, fs := t.logInfo(threshold)
|
||||||
|
if t.lg != nil {
|
||||||
|
t.lg.Info(msg, fs...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Trace) logInfo(threshold time.Duration) (string, []zap.Field) {
|
||||||
|
endTime := time.Now()
|
||||||
|
totalDuration := endTime.Sub(t.startTime)
|
||||||
|
traceNum := rand.Int31()
|
||||||
|
msg := fmt.Sprintf("trace[%d] %s", traceNum, t.operation)
|
||||||
|
|
||||||
|
var steps []string
|
||||||
|
lastStepTime := t.startTime
|
||||||
|
for _, step := range t.steps {
|
||||||
|
stepDuration := step.time.Sub(lastStepTime)
|
||||||
|
if stepDuration > threshold {
|
||||||
|
steps = append(steps, fmt.Sprintf("trace[%d] '%v' %s (duration: %v)",
|
||||||
|
traceNum, step.msg, writeFields(step.fields), stepDuration))
|
||||||
|
}
|
||||||
|
lastStepTime = step.time
|
||||||
|
}
|
||||||
|
|
||||||
|
fs := []zap.Field{zap.String("detail", writeFields(t.fields)),
|
||||||
|
zap.Duration("duration", totalDuration),
|
||||||
|
zap.Time("start", t.startTime),
|
||||||
|
zap.Time("end", endTime),
|
||||||
|
zap.Strings("steps", steps)}
|
||||||
|
return msg, fs
|
||||||
|
}
|
262
pkg/traceutil/trace_test.go
Normal file
262
pkg/traceutil/trace_test.go
Normal file
@ -0,0 +1,262 @@
|
|||||||
|
// Copyright 2019 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package traceutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestGet(t *testing.T) {
|
||||||
|
traceForTest := &Trace{operation: "test"}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
inputCtx context.Context
|
||||||
|
outputTrace *Trace
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "When the context does not have trace",
|
||||||
|
inputCtx: context.TODO(),
|
||||||
|
outputTrace: TODO(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "When the context has trace",
|
||||||
|
inputCtx: context.WithValue(context.Background(), TraceKey, traceForTest),
|
||||||
|
outputTrace: traceForTest,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
trace := Get(tt.inputCtx)
|
||||||
|
if trace == nil {
|
||||||
|
t.Errorf("Expected %v; Got nil", tt.outputTrace)
|
||||||
|
}
|
||||||
|
if trace.operation != tt.outputTrace.operation {
|
||||||
|
t.Errorf("Expected %v; Got %v", tt.outputTrace, trace)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCreate(t *testing.T) {
|
||||||
|
var (
|
||||||
|
op = "Test"
|
||||||
|
steps = []string{"Step1, Step2"}
|
||||||
|
fields = []Field{
|
||||||
|
{"traceKey1", "traceValue1"},
|
||||||
|
{"traceKey2", "traceValue2"},
|
||||||
|
}
|
||||||
|
stepFields = []Field{
|
||||||
|
{"stepKey1", "stepValue2"},
|
||||||
|
{"stepKey2", "stepValue2"},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
trace := New(op, nil, fields[0], fields[1])
|
||||||
|
if trace.operation != op {
|
||||||
|
t.Errorf("Expected %v; Got %v", op, trace.operation)
|
||||||
|
}
|
||||||
|
for i, f := range trace.fields {
|
||||||
|
if f.Key != fields[i].Key {
|
||||||
|
t.Errorf("Expected %v; Got %v", fields[i].Key, f.Key)
|
||||||
|
}
|
||||||
|
if f.Value != fields[i].Value {
|
||||||
|
t.Errorf("Expected %v; Got %v", fields[i].Value, f.Value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, v := range steps {
|
||||||
|
trace.Step(v, stepFields[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, v := range trace.steps {
|
||||||
|
if steps[i] != v.msg {
|
||||||
|
t.Errorf("Expected %v; Got %v", steps[i], v.msg)
|
||||||
|
}
|
||||||
|
if stepFields[i].Key != v.fields[0].Key {
|
||||||
|
t.Errorf("Expected %v; Got %v", stepFields[i].Key, v.fields[0].Key)
|
||||||
|
}
|
||||||
|
if stepFields[i].Value != v.fields[0].Value {
|
||||||
|
t.Errorf("Expected %v; Got %v", stepFields[i].Value, v.fields[0].Value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLog(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
trace *Trace
|
||||||
|
fields []Field
|
||||||
|
expectedMsg []string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "When dump all logs",
|
||||||
|
trace: &Trace{
|
||||||
|
operation: "Test",
|
||||||
|
startTime: time.Now().Add(-100 * time.Millisecond),
|
||||||
|
steps: []step{
|
||||||
|
{time: time.Now().Add(-80 * time.Millisecond), msg: "msg1"},
|
||||||
|
{time: time.Now().Add(-50 * time.Millisecond), msg: "msg2"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedMsg: []string{
|
||||||
|
"msg1", "msg2",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "When trace has fields",
|
||||||
|
trace: &Trace{
|
||||||
|
operation: "Test",
|
||||||
|
startTime: time.Now().Add(-100 * time.Millisecond),
|
||||||
|
steps: []step{
|
||||||
|
{
|
||||||
|
time: time.Now().Add(-80 * time.Millisecond),
|
||||||
|
msg: "msg1",
|
||||||
|
fields: []Field{{"stepKey1", "stepValue1"}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
time: time.Now().Add(-50 * time.Millisecond),
|
||||||
|
msg: "msg2",
|
||||||
|
fields: []Field{{"stepKey2", "stepValue2"}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
fields: []Field{
|
||||||
|
{"traceKey1", "traceValue1"},
|
||||||
|
{"count", 1},
|
||||||
|
},
|
||||||
|
expectedMsg: []string{
|
||||||
|
"Test",
|
||||||
|
"msg1", "msg2",
|
||||||
|
"traceKey1:traceValue1", "count:1",
|
||||||
|
"stepKey1:stepValue1", "stepKey2:stepValue2",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
logPath := filepath.Join(os.TempDir(), fmt.Sprintf("test-log-%d", time.Now().UnixNano()))
|
||||||
|
defer os.RemoveAll(logPath)
|
||||||
|
|
||||||
|
lcfg := zap.NewProductionConfig()
|
||||||
|
lcfg.OutputPaths = []string{logPath}
|
||||||
|
lcfg.ErrorOutputPaths = []string{logPath}
|
||||||
|
lg, _ := lcfg.Build()
|
||||||
|
|
||||||
|
for _, f := range tt.fields {
|
||||||
|
tt.trace.AddField(f)
|
||||||
|
}
|
||||||
|
tt.trace.lg = lg
|
||||||
|
tt.trace.Log()
|
||||||
|
data, err := ioutil.ReadFile(logPath)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, msg := range tt.expectedMsg {
|
||||||
|
if !bytes.Contains(data, []byte(msg)) {
|
||||||
|
t.Errorf("Expected to find %v in log", msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLogIfLong(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
threshold time.Duration
|
||||||
|
trace *Trace
|
||||||
|
expectedMsg []string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "When the duration is smaller than threshold",
|
||||||
|
threshold: time.Duration(200 * time.Millisecond),
|
||||||
|
trace: &Trace{
|
||||||
|
operation: "Test",
|
||||||
|
startTime: time.Now().Add(-100 * time.Millisecond),
|
||||||
|
steps: []step{
|
||||||
|
{time: time.Now().Add(-50 * time.Millisecond), msg: "msg1"},
|
||||||
|
{time: time.Now(), msg: "msg2"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedMsg: []string{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "When the duration is longer than threshold",
|
||||||
|
threshold: time.Duration(50 * time.Millisecond),
|
||||||
|
trace: &Trace{
|
||||||
|
operation: "Test",
|
||||||
|
startTime: time.Now().Add(-100 * time.Millisecond),
|
||||||
|
steps: []step{
|
||||||
|
{time: time.Now().Add(-50 * time.Millisecond), msg: "msg1"},
|
||||||
|
{time: time.Now(), msg: "msg2"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedMsg: []string{
|
||||||
|
"msg1", "msg2",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "When not all steps are longer than step threshold",
|
||||||
|
threshold: time.Duration(50 * time.Millisecond),
|
||||||
|
trace: &Trace{
|
||||||
|
operation: "Test",
|
||||||
|
startTime: time.Now().Add(-100 * time.Millisecond),
|
||||||
|
steps: []step{
|
||||||
|
{time: time.Now(), msg: "msg1"},
|
||||||
|
{time: time.Now(), msg: "msg2"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedMsg: []string{
|
||||||
|
"msg1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
logPath := filepath.Join(os.TempDir(), fmt.Sprintf("test-log-%d", time.Now().UnixNano()))
|
||||||
|
defer os.RemoveAll(logPath)
|
||||||
|
|
||||||
|
lcfg := zap.NewProductionConfig()
|
||||||
|
lcfg.OutputPaths = []string{logPath}
|
||||||
|
lcfg.ErrorOutputPaths = []string{logPath}
|
||||||
|
lg, _ := lcfg.Build()
|
||||||
|
|
||||||
|
tt.trace.lg = lg
|
||||||
|
tt.trace.LogIfLong(tt.threshold)
|
||||||
|
data, err := ioutil.ReadFile(logPath)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
for _, msg := range tt.expectedMsg {
|
||||||
|
if !bytes.Contains(data, []byte(msg)) {
|
||||||
|
t.Errorf("Expected to find %v in log", msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
@ -23,6 +23,7 @@ import (
|
|||||||
|
|
||||||
"go.etcd.io/etcd/lease"
|
"go.etcd.io/etcd/lease"
|
||||||
"go.etcd.io/etcd/pkg/report"
|
"go.etcd.io/etcd/pkg/report"
|
||||||
|
"go.etcd.io/etcd/pkg/traceutil"
|
||||||
|
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
)
|
)
|
||||||
@ -114,7 +115,7 @@ func mvccPutFunc(cmd *cobra.Command, args []string) {
|
|||||||
for i := 0; i < mvccTotalRequests; i++ {
|
for i := 0; i < mvccTotalRequests; i++ {
|
||||||
st := time.Now()
|
st := time.Now()
|
||||||
|
|
||||||
tw := s.Write()
|
tw := s.Write(traceutil.TODO())
|
||||||
for j := i; j < i+nrTxnOps; j++ {
|
for j := i; j < i+nrTxnOps; j++ {
|
||||||
tw.Put(keys[j], vals[j], lease.NoLease)
|
tw.Put(keys[j], vals[j], lease.NoLease)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user