diff --git a/tools/functional-tester/rpcpb/member.go b/tools/functional-tester/rpcpb/member.go index f82fd4d9e..a1b00f19b 100644 --- a/tools/functional-tester/rpcpb/member.go +++ b/tools/functional-tester/rpcpb/member.go @@ -41,15 +41,17 @@ func (m *Member) DialEtcdGRPCServer(opts ...grpc.DialOption) (*grpc.ClientConn, } // CreateEtcdClient creates a client from member. -func (m *Member) CreateEtcdClient() (*clientv3.Client, error) { +func (m *Member) CreateEtcdClient(opts ...grpc.DialOption) (*clientv3.Client, error) { + cfg := clientv3.Config{ + Endpoints: []string{m.EtcdClientEndpoint}, + DialTimeout: 5 * time.Second, + DialOptions: opts, + } if m.EtcdClientTLS { // TODO: support TLS panic("client TLS not supported yet") } - return clientv3.New(clientv3.Config{ - Endpoints: []string{m.EtcdClientEndpoint}, - DialTimeout: 5 * time.Second, - }) + return clientv3.New(cfg) } // CheckCompact ensures that historical data before given revision has been compacted. @@ -124,6 +126,21 @@ func (m *Member) Rev(ctx context.Context) (int64, error) { return resp.Header.Revision, nil } +// Compact compacts member storage with given revision. +// It blocks until it's physically done. +func (m *Member) Compact(rev int64, timeout time.Duration) error { + cli, err := m.CreateEtcdClient() + if err != nil { + return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint) + } + defer cli.Close() + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + _, err = cli.Compact(ctx, rev, clientv3.WithCompactPhysical()) + cancel() + return err +} + // IsLeader returns true if this member is the current cluster leader. func (m *Member) IsLeader() (bool, error) { cli, err := m.CreateEtcdClient() diff --git a/tools/functional-tester/tester/checks.go b/tools/functional-tester/tester/checks.go index 6c1c0a75b..0a5207fe9 100644 --- a/tools/functional-tester/tester/checks.go +++ b/tools/functional-tester/tester/checks.go @@ -21,7 +21,6 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/tools/functional-tester/rpcpb" "go.uber.org/zap" @@ -101,22 +100,20 @@ type leaseChecker struct { lg *zap.Logger m *rpcpb.Member ls *leaseStresser - lsc pb.LeaseClient - kvc pb.KVClient + cli *clientv3.Client } func (lc *leaseChecker) Check() error { - conn, err := lc.m.DialEtcdGRPCServer(grpc.WithBackoffMaxDelay(time.Second)) + cli, err := lc.m.CreateEtcdClient(grpc.WithBackoffMaxDelay(time.Second)) if err != nil { return fmt.Errorf("%v (%q)", err, lc.m.EtcdClientEndpoint) } defer func() { - if conn != nil { - conn.Close() + if cli != nil { + cli.Close() } }() - lc.kvc = pb.NewKVClient(conn) - lc.lsc = pb.NewLeaseClient(conn) + lc.cli = cli if err := lc.check(true, lc.ls.revokedLeases.leases); err != nil { return err } @@ -148,7 +145,7 @@ func (lc *leaseChecker) checkShortLivedLeases() error { func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64) (err error) { // retry in case of transient failure or lease is expired but not yet revoked due to the fact that etcd cluster didn't have enought time to delete it. - var resp *pb.LeaseTimeToLiveResponse + var resp *clientv3.LeaseTimeToLiveResponse for i := 0; i < retries; i++ { resp, err = lc.getLeaseByID(ctx, leaseID) // lease not found, for ~v3.1 compatibilities, check ErrLeaseNotFound @@ -230,9 +227,13 @@ func (lc *leaseChecker) check(expired bool, leases map[int64]time.Time) error { return nil } -func (lc *leaseChecker) getLeaseByID(ctx context.Context, leaseID int64) (*pb.LeaseTimeToLiveResponse, error) { - ltl := &pb.LeaseTimeToLiveRequest{ID: leaseID, Keys: true} - return lc.lsc.LeaseTimeToLive(ctx, ltl, grpc.FailFast(false)) +// TODO: handle failures from "grpc.FailFast(false)" +func (lc *leaseChecker) getLeaseByID(ctx context.Context, leaseID int64) (*clientv3.LeaseTimeToLiveResponse, error) { + return lc.cli.TimeToLive( + ctx, + clientv3.LeaseID(leaseID), + clientv3.WithAttachedKeys(), + ) } func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) { @@ -261,10 +262,7 @@ func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (boo // Since the format of keys contains about leaseID, finding keys base on "" prefix // determines whether the attached keys for a given leaseID has been deleted or not func (lc *leaseChecker) hasKeysAttachedToLeaseExpired(ctx context.Context, leaseID int64) (bool, error) { - resp, err := lc.kvc.Range(ctx, &pb.RangeRequest{ - Key: []byte(fmt.Sprintf("%d", leaseID)), - RangeEnd: []byte(clientv3.GetPrefixRangeEnd(fmt.Sprintf("%d", leaseID))), - }, grpc.FailFast(false)) + resp, err := lc.cli.Get(ctx, fmt.Sprintf("%d", leaseID), clientv3.WithPrefix()) if err != nil { lc.lg.Warn( "hasKeysAttachedToLeaseExpired failed", diff --git a/tools/functional-tester/tester/cluster.go b/tools/functional-tester/tester/cluster.go index f20953a0e..3113e8465 100644 --- a/tools/functional-tester/tester/cluster.go +++ b/tools/functional-tester/tester/cluster.go @@ -25,7 +25,6 @@ import ( "strings" "time" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/debugutil" "github.com/coreos/etcd/tools/functional-tester/rpcpb" @@ -681,31 +680,14 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) { } for i, m := range clus.Members { - conn, derr := m.DialEtcdGRPCServer() - if derr != nil { - clus.lg.Warn( - "compactKV dial failed", - zap.String("endpoint", m.EtcdClientEndpoint), - zap.Error(derr), - ) - err = derr - continue - } - kvc := pb.NewKVClient(conn) - clus.lg.Info( "compacting", zap.String("endpoint", m.EtcdClientEndpoint), zap.Int64("compact-revision", rev), zap.Duration("timeout", timeout), ) - now := time.Now() - ctx, cancel := context.WithTimeout(context.Background(), timeout) - _, cerr := kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev, Physical: true}, grpc.FailFast(false)) - cancel() - - conn.Close() + cerr := m.Compact(rev, timeout) succeed := true if cerr != nil { if strings.Contains(cerr.Error(), "required revision has been compacted") && i > 0 { diff --git a/tools/functional-tester/tester/stress_key.go b/tools/functional-tester/tester/stress_key.go index 9f795b937..e3868c2ae 100644 --- a/tools/functional-tester/tester/stress_key.go +++ b/tools/functional-tester/tester/stress_key.go @@ -22,9 +22,9 @@ import ( "sync/atomic" "time" + "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/tools/functional-tester/rpcpb" "go.uber.org/zap" @@ -51,7 +51,7 @@ type keyStresser struct { wg sync.WaitGroup cancel func() - conn *grpc.ClientConn + cli *clientv3.Client // atomicModifiedKeys records the number of keys created and deleted by the stresser. atomicModifiedKeys int64 @@ -60,35 +60,33 @@ type keyStresser struct { func (s *keyStresser) Stress() error { // TODO: add backoff option - conn, err := s.m.DialEtcdGRPCServer() + cli, err := s.m.CreateEtcdClient() if err != nil { return fmt.Errorf("%v (%q)", err, s.m.EtcdClientEndpoint) } ctx, cancel := context.WithCancel(context.Background()) s.wg.Add(s.N) - s.conn = conn + s.cli = cli s.cancel = cancel - kvc := pb.NewKVClient(conn) - var stressEntries = []stressEntry{ - {weight: 0.7, f: newStressPut(kvc, s.keySuffixRange, s.keySize)}, + {weight: 0.7, f: newStressPut(cli, s.keySuffixRange, s.keySize)}, { weight: 0.7 * float32(s.keySize) / float32(s.keyLargeSize), - f: newStressPut(kvc, s.keySuffixRange, s.keyLargeSize), + f: newStressPut(cli, s.keySuffixRange, s.keyLargeSize), }, - {weight: 0.07, f: newStressRange(kvc, s.keySuffixRange)}, - {weight: 0.07, f: newStressRangeInterval(kvc, s.keySuffixRange)}, - {weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)}, - {weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange)}, + {weight: 0.07, f: newStressRange(cli, s.keySuffixRange)}, + {weight: 0.07, f: newStressRangeInterval(cli, s.keySuffixRange)}, + {weight: 0.07, f: newStressDelete(cli, s.keySuffixRange)}, + {weight: 0.07, f: newStressDeleteInterval(cli, s.keySuffixRange)}, } if s.keyTxnSuffixRange > 0 { // adjust to make up ±70% of workloads with writes stressEntries[0].weight = 0.35 stressEntries = append(stressEntries, stressEntry{ weight: 0.35, - f: newStressTxn(kvc, s.keyTxnSuffixRange, s.keyTxnOps), + f: newStressTxn(cli, s.keyTxnSuffixRange, s.keyTxnOps), }) } s.stressTable = createStressTable(stressEntries) @@ -167,7 +165,7 @@ func (s *keyStresser) Pause() { func (s *keyStresser) Close() { s.cancel() - s.conn.Close() + s.cli.Close() s.wg.Wait() s.lg.Info( @@ -216,25 +214,26 @@ func (st *stressTable) choose() stressFunc { return st.entries[idx].f } -func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc { +func newStressPut(cli *clientv3.Client, keySuffixRange, keySize int) stressFunc { return func(ctx context.Context) (error, int64) { - _, err := kvc.Put(ctx, &pb.PutRequest{ - Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))), - Value: randBytes(keySize), - }, grpc.FailFast(false)) + _, err := cli.Put( + ctx, + fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange)), + string(randBytes(keySize)), + ) return err, 1 } } -func newStressTxn(kvc pb.KVClient, keyTxnSuffixRange, txnOps int) stressFunc { +func newStressTxn(cli *clientv3.Client, keyTxnSuffixRange, txnOps int) stressFunc { keys := make([]string, keyTxnSuffixRange) for i := range keys { keys[i] = fmt.Sprintf("/k%03d", i) } - return writeTxn(kvc, keys, txnOps) + return writeTxn(cli, keys, txnOps) } -func writeTxn(kvc pb.KVClient, keys []string, txnOps int) stressFunc { +func writeTxn(cli *clientv3.Client, keys []string, txnOps int) stressFunc { return func(ctx context.Context) (error, int64) { ks := make(map[string]struct{}, txnOps) for len(ks) != txnOps { @@ -244,99 +243,75 @@ func writeTxn(kvc pb.KVClient, keys []string, txnOps int) stressFunc { for k := range ks { selected = append(selected, k) } - com, delOp, putOp := getTxnReqs(selected[0], "bar00") - txnReq := &pb.TxnRequest{ - Compare: []*pb.Compare{com}, - Success: []*pb.RequestOp{delOp}, - Failure: []*pb.RequestOp{putOp}, - } - - // add nested txns if any - for i := 1; i < txnOps; i++ { + com, delOp, putOp := getTxnOps(selected[0], "bar00") + thenOps := []clientv3.Op{delOp} + elseOps := []clientv3.Op{putOp} + for i := 1; i < txnOps; i++ { // nested txns k, v := selected[i], fmt.Sprintf("bar%02d", i) - com, delOp, putOp = getTxnReqs(k, v) - nested := &pb.RequestOp{ - Request: &pb.RequestOp_RequestTxn{ - RequestTxn: &pb.TxnRequest{ - Compare: []*pb.Compare{com}, - Success: []*pb.RequestOp{delOp}, - Failure: []*pb.RequestOp{putOp}, - }, - }, - } - txnReq.Success = append(txnReq.Success, nested) - txnReq.Failure = append(txnReq.Failure, nested) + com, delOp, putOp = getTxnOps(k, v) + txnOp := clientv3.OpTxn( + []clientv3.Cmp{com}, + []clientv3.Op{delOp}, + []clientv3.Op{putOp}, + ) + thenOps = append(thenOps, txnOp) + elseOps = append(elseOps, txnOp) } - - _, err := kvc.Txn(ctx, txnReq, grpc.FailFast(false)) + _, err := cli.Txn(ctx). + If(com). + Then(thenOps...). + Else(elseOps...). + Commit() return err, int64(txnOps) } } -func getTxnReqs(key, val string) (com *pb.Compare, delOp *pb.RequestOp, putOp *pb.RequestOp) { +func getTxnOps(k, v string) ( + cmp clientv3.Cmp, + dop clientv3.Op, + pop clientv3.Op) { // if key exists (version > 0) - com = &pb.Compare{ - Key: []byte(key), - Target: pb.Compare_VERSION, - Result: pb.Compare_GREATER, - TargetUnion: &pb.Compare_Version{Version: 0}, - } - delOp = &pb.RequestOp{ - Request: &pb.RequestOp_RequestDeleteRange{ - RequestDeleteRange: &pb.DeleteRangeRequest{ - Key: []byte(key), - }, - }, - } - putOp = &pb.RequestOp{ - Request: &pb.RequestOp_RequestPut{ - RequestPut: &pb.PutRequest{ - Key: []byte(key), - Value: []byte(val), - }, - }, - } - return com, delOp, putOp + cmp = clientv3.Compare(clientv3.Version(k), ">", 0) + dop = clientv3.OpDelete(k) + pop = clientv3.OpPut(k, v) + return cmp, dop, pop } -func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc { +func newStressRange(cli *clientv3.Client, keySuffixRange int) stressFunc { return func(ctx context.Context) (error, int64) { - _, err := kvc.Range(ctx, &pb.RangeRequest{ - Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))), - }, grpc.FailFast(false)) + _, err := cli.Get(ctx, fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))) return err, 0 } } -func newStressRangeInterval(kvc pb.KVClient, keySuffixRange int) stressFunc { +func newStressRangeInterval(cli *clientv3.Client, keySuffixRange int) stressFunc { return func(ctx context.Context) (error, int64) { start := rand.Intn(keySuffixRange) end := start + 500 - _, err := kvc.Range(ctx, &pb.RangeRequest{ - Key: []byte(fmt.Sprintf("foo%016x", start)), - RangeEnd: []byte(fmt.Sprintf("foo%016x", end)), - }, grpc.FailFast(false)) + _, err := cli.Get( + ctx, + fmt.Sprintf("foo%016x", start), + clientv3.WithRange(fmt.Sprintf("foo%016x", end)), + ) return err, 0 } } -func newStressDelete(kvc pb.KVClient, keySuffixRange int) stressFunc { +func newStressDelete(cli *clientv3.Client, keySuffixRange int) stressFunc { return func(ctx context.Context) (error, int64) { - _, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{ - Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))), - }, grpc.FailFast(false)) + _, err := cli.Delete(ctx, fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))) return err, 1 } } -func newStressDeleteInterval(kvc pb.KVClient, keySuffixRange int) stressFunc { +func newStressDeleteInterval(cli *clientv3.Client, keySuffixRange int) stressFunc { return func(ctx context.Context) (error, int64) { start := rand.Intn(keySuffixRange) end := start + 500 - resp, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{ - Key: []byte(fmt.Sprintf("foo%016x", start)), - RangeEnd: []byte(fmt.Sprintf("foo%016x", end)), - }, grpc.FailFast(false)) + resp, err := cli.Delete(ctx, + fmt.Sprintf("foo%016x", start), + clientv3.WithRange(fmt.Sprintf("foo%016x", end)), + ) if err == nil { return nil, resp.Deleted } diff --git a/tools/functional-tester/tester/stress_lease.go b/tools/functional-tester/tester/stress_lease.go index 890992f75..264f3ee99 100644 --- a/tools/functional-tester/tester/stress_lease.go +++ b/tools/functional-tester/tester/stress_lease.go @@ -22,8 +22,8 @@ import ( "sync/atomic" "time" + "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/tools/functional-tester/rpcpb" "go.uber.org/zap" @@ -41,11 +41,9 @@ type leaseStresser struct { lg *zap.Logger m *rpcpb.Member - cancel func() - conn *grpc.ClientConn - kvc pb.KVClient - lc pb.LeaseClient + cli *clientv3.Client ctx context.Context + cancel func() rateLimiter *rate.Limiter // atomicModifiedKey records the number of keys created and deleted during a test case @@ -118,7 +116,6 @@ func (ls *leaseStresser) setupOnce() error { } ls.aliveLeases = &atomicLeases{leases: make(map[int64]time.Time)} - return nil } @@ -132,20 +129,19 @@ func (ls *leaseStresser) Stress() error { return err } - conn, err := ls.m.DialEtcdGRPCServer(grpc.WithBackoffMaxDelay(1 * time.Second)) + ctx, cancel := context.WithCancel(context.Background()) + ls.ctx = ctx + ls.cancel = cancel + + cli, err := ls.m.CreateEtcdClient(grpc.WithBackoffMaxDelay(1 * time.Second)) if err != nil { return fmt.Errorf("%v (%s)", err, ls.m.EtcdClientEndpoint) } - ls.conn = conn - ls.kvc = pb.NewKVClient(conn) - ls.lc = pb.NewLeaseClient(conn) + ls.cli = cli + ls.revokedLeases = &atomicLeases{leases: make(map[int64]time.Time)} ls.shortLivedLeases = &atomicLeases{leases: make(map[int64]time.Time)} - ctx, cancel := context.WithCancel(context.Background()) - ls.cancel = cancel - ls.ctx = ctx - ls.runWg.Add(1) go ls.run() return nil @@ -299,17 +295,17 @@ func (ls *leaseStresser) randomlyDropLeases() { } func (ls *leaseStresser) createLease(ttl int64) (int64, error) { - resp, err := ls.lc.LeaseGrant(ls.ctx, &pb.LeaseGrantRequest{TTL: ttl}) + resp, err := ls.cli.Grant(ls.ctx, ttl) if err != nil { return -1, err } - return resp.ID, nil + return int64(resp.ID), nil } func (ls *leaseStresser) keepLeaseAlive(leaseID int64) { defer ls.aliveWg.Done() ctx, cancel := context.WithCancel(ls.ctx) - stream, err := ls.lc.LeaseKeepAlive(ctx) + stream, err := ls.cli.KeepAlive(ctx, clientv3.LeaseID(leaseID)) defer func() { cancel() }() for { select { @@ -347,42 +343,36 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) { ) cancel() ctx, cancel = context.WithCancel(ls.ctx) - stream, err = ls.lc.LeaseKeepAlive(ctx) + stream, err = ls.cli.KeepAlive(ctx, clientv3.LeaseID(leaseID)) cancel() continue } + if err != nil { + ls.lg.Debug( + "keepLeaseAlive failed to receive lease keepalive response", + zap.String("endpoint", ls.m.EtcdClientEndpoint), + zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), + zap.Error(err), + ) + continue + } ls.lg.Debug( - "keepLeaseAlive stream sends lease keepalive request", + "keepLeaseAlive waiting on lease stream", zap.String("endpoint", ls.m.EtcdClientEndpoint), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), ) - err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID}) - if err != nil { - ls.lg.Debug( - "keepLeaseAlive stream failed to send lease keepalive request", - zap.String("endpoint", ls.m.EtcdClientEndpoint), - zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), - zap.Error(err), - ) - continue - } leaseRenewTime := time.Now() - ls.lg.Debug( - "keepLeaseAlive stream sent lease keepalive request", - zap.String("endpoint", ls.m.EtcdClientEndpoint), - zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), - ) - respRC, err := stream.Recv() - if err != nil { + respRC := <-stream + if respRC == nil { ls.lg.Debug( - "keepLeaseAlive stream failed to receive lease keepalive response", + "keepLeaseAlive received nil lease keepalive response", zap.String("endpoint", ls.m.EtcdClientEndpoint), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), - zap.Error(err), ) continue } + // lease expires after TTL become 0 // don't send keepalive if the lease has expired if respRC.TTL <= 0 { @@ -409,16 +399,18 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) { // the format of key is the concat of leaseID + '_' + '' // e.g 5186835655248304152_0 for first created key and 5186835655248304152_1 for second created key func (ls *leaseStresser) attachKeysWithLease(leaseID int64) error { - var txnPuts []*pb.RequestOp + var txnPuts []clientv3.Op for j := 0; j < ls.keysPerLease; j++ { - txnput := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte(fmt.Sprintf("%d%s%d", leaseID, "_", j)), - Value: []byte(fmt.Sprintf("bar")), Lease: leaseID}}} + txnput := clientv3.OpPut( + fmt.Sprintf("%d%s%d", leaseID, "_", j), + fmt.Sprintf("bar"), + clientv3.WithLease(clientv3.LeaseID(leaseID)), + ) txnPuts = append(txnPuts, txnput) } // keep retrying until lease is not found or ctx is being canceled for ls.ctx.Err() == nil { - txn := &pb.TxnRequest{Success: txnPuts} - _, err := ls.kvc.Txn(ls.ctx, txn) + _, err := ls.cli.Txn(ls.ctx).Then(txnPuts...).Commit() if err == nil { // since all created keys will be deleted too, the number of operations on keys will be roughly 2x the number of created keys atomic.AddInt64(&ls.atomicModifiedKey, 2*int64(ls.keysPerLease)) @@ -437,9 +429,10 @@ func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) { if rand.Intn(2) != 0 { return false, nil } + // keep retrying until a lease is dropped or ctx is being canceled for ls.ctx.Err() == nil { - _, err := ls.lc.LeaseRevoke(ls.ctx, &pb.LeaseRevokeRequest{ID: leaseID}) + _, err := ls.cli.Revoke(ls.ctx, clientv3.LeaseID(leaseID)) if err == nil || rpctypes.Error(err) == rpctypes.ErrLeaseNotFound { return true, nil } @@ -454,7 +447,9 @@ func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) { return false, ls.ctx.Err() } -func (ls *leaseStresser) Pause() { ls.Close() } +func (ls *leaseStresser) Pause() { + ls.Close() +} func (ls *leaseStresser) Close() { ls.lg.Info( @@ -464,7 +459,7 @@ func (ls *leaseStresser) Close() { ls.cancel() ls.runWg.Wait() ls.aliveWg.Wait() - ls.conn.Close() + ls.cli.Close() ls.lg.Info( "lease stresser is closed", zap.String("endpoint", ls.m.EtcdClientEndpoint),