diff --git a/tools/functional-tester/tester/stress_lease.go b/tools/functional-tester/tester/stress_lease.go index 890992f75..52d5a2ba0 100644 --- a/tools/functional-tester/tester/stress_lease.go +++ b/tools/functional-tester/tester/stress_lease.go @@ -22,8 +22,8 @@ import ( "sync/atomic" "time" + "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/tools/functional-tester/rpcpb" "go.uber.org/zap" @@ -41,11 +41,9 @@ type leaseStresser struct { lg *zap.Logger m *rpcpb.Member - cancel func() - conn *grpc.ClientConn - kvc pb.KVClient - lc pb.LeaseClient + cli *clientv3.Client ctx context.Context + cancel func() rateLimiter *rate.Limiter // atomicModifiedKey records the number of keys created and deleted during a test case @@ -118,7 +116,6 @@ func (ls *leaseStresser) setupOnce() error { } ls.aliveLeases = &atomicLeases{leases: make(map[int64]time.Time)} - return nil } @@ -132,20 +129,19 @@ func (ls *leaseStresser) Stress() error { return err } - conn, err := ls.m.DialEtcdGRPCServer(grpc.WithBackoffMaxDelay(1 * time.Second)) + ctx, cancel := context.WithCancel(context.Background()) + ls.ctx = ctx + ls.cancel = cancel + + cli, err := ls.m.CreateEtcdClient(grpc.WithBackoffMaxDelay(1 * time.Second)) if err != nil { return fmt.Errorf("%v (%s)", err, ls.m.EtcdClientEndpoint) } - ls.conn = conn - ls.kvc = pb.NewKVClient(conn) - ls.lc = pb.NewLeaseClient(conn) + ls.cli = cli + ls.revokedLeases = &atomicLeases{leases: make(map[int64]time.Time)} ls.shortLivedLeases = &atomicLeases{leases: make(map[int64]time.Time)} - ctx, cancel := context.WithCancel(context.Background()) - ls.cancel = cancel - ls.ctx = ctx - ls.runWg.Add(1) go ls.run() return nil @@ -299,17 +295,17 @@ func (ls *leaseStresser) randomlyDropLeases() { } func (ls *leaseStresser) createLease(ttl int64) (int64, error) { - resp, err := ls.lc.LeaseGrant(ls.ctx, &pb.LeaseGrantRequest{TTL: ttl}) + resp, err := ls.cli.Grant(ls.ctx, ttl) if err != nil { return -1, err } - return resp.ID, nil + return int64(resp.ID), nil } func (ls *leaseStresser) keepLeaseAlive(leaseID int64) { defer ls.aliveWg.Done() ctx, cancel := context.WithCancel(ls.ctx) - stream, err := ls.lc.LeaseKeepAlive(ctx) + stream, err := ls.cli.KeepAlive(ctx, clientv3.LeaseID(leaseID)) defer func() { cancel() }() for { select { @@ -347,36 +343,21 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) { ) cancel() ctx, cancel = context.WithCancel(ls.ctx) - stream, err = ls.lc.LeaseKeepAlive(ctx) + stream, err = ls.cli.KeepAlive(ctx, clientv3.LeaseID(leaseID)) cancel() continue } ls.lg.Debug( - "keepLeaseAlive stream sends lease keepalive request", + "keepLeaseAlive waiting on lease stream", zap.String("endpoint", ls.m.EtcdClientEndpoint), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), ) - err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID}) - if err != nil { - ls.lg.Debug( - "keepLeaseAlive stream failed to send lease keepalive request", - zap.String("endpoint", ls.m.EtcdClientEndpoint), - zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), - zap.Error(err), - ) - continue - } leaseRenewTime := time.Now() - ls.lg.Debug( - "keepLeaseAlive stream sent lease keepalive request", - zap.String("endpoint", ls.m.EtcdClientEndpoint), - zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), - ) - respRC, err := stream.Recv() + respRC := <-stream if err != nil { ls.lg.Debug( - "keepLeaseAlive stream failed to receive lease keepalive response", + "keepLeaseAlive failed to receive lease keepalive response", zap.String("endpoint", ls.m.EtcdClientEndpoint), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.Error(err), @@ -409,16 +390,18 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) { // the format of key is the concat of leaseID + '_' + '' // e.g 5186835655248304152_0 for first created key and 5186835655248304152_1 for second created key func (ls *leaseStresser) attachKeysWithLease(leaseID int64) error { - var txnPuts []*pb.RequestOp + var txnPuts []clientv3.Op for j := 0; j < ls.keysPerLease; j++ { - txnput := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte(fmt.Sprintf("%d%s%d", leaseID, "_", j)), - Value: []byte(fmt.Sprintf("bar")), Lease: leaseID}}} + txnput := clientv3.OpPut( + fmt.Sprintf("%d%s%d", leaseID, "_", j), + fmt.Sprintf("bar"), + clientv3.WithLease(clientv3.LeaseID(leaseID)), + ) txnPuts = append(txnPuts, txnput) } // keep retrying until lease is not found or ctx is being canceled for ls.ctx.Err() == nil { - txn := &pb.TxnRequest{Success: txnPuts} - _, err := ls.kvc.Txn(ls.ctx, txn) + _, err := ls.cli.Txn(ls.ctx).Then(txnPuts...).Commit() if err == nil { // since all created keys will be deleted too, the number of operations on keys will be roughly 2x the number of created keys atomic.AddInt64(&ls.atomicModifiedKey, 2*int64(ls.keysPerLease)) @@ -437,9 +420,10 @@ func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) { if rand.Intn(2) != 0 { return false, nil } + // keep retrying until a lease is dropped or ctx is being canceled for ls.ctx.Err() == nil { - _, err := ls.lc.LeaseRevoke(ls.ctx, &pb.LeaseRevokeRequest{ID: leaseID}) + _, err := ls.cli.Revoke(ls.ctx, clientv3.LeaseID(leaseID)) if err == nil || rpctypes.Error(err) == rpctypes.ErrLeaseNotFound { return true, nil } @@ -454,7 +438,9 @@ func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) { return false, ls.ctx.Err() } -func (ls *leaseStresser) Pause() { ls.Close() } +func (ls *leaseStresser) Pause() { + ls.Close() +} func (ls *leaseStresser) Close() { ls.lg.Info( @@ -464,7 +450,7 @@ func (ls *leaseStresser) Close() { ls.cancel() ls.runWg.Wait() ls.aliveWg.Wait() - ls.conn.Close() + ls.cli.Close() ls.lg.Info( "lease stresser is closed", zap.String("endpoint", ls.m.EtcdClientEndpoint),