From 629e5a0e7a41060ea8ef49e94243f10e8d1760c0 Mon Sep 17 00:00:00 2001
From: Gyuho Lee
Date: Tue, 3 Apr 2018 10:58:32 -0700
Subject: [PATCH] functional-tester/tester: use "*rpcpb.Member" directly

Signed-off-by: Gyuho Lee
---
 tools/functional-tester/tester/checks.go      | 31 ++++----
 tools/functional-tester/tester/stress.go      |  4 +-
 tools/functional-tester/tester/stress_key.go  | 13 ++--
 .../functional-tester/tester/stress_lease.go  | 71 +++++++++----------
 4 files changed, 57 insertions(+), 62 deletions(-)

diff --git a/tools/functional-tester/tester/checks.go b/tools/functional-tester/tester/checks.go
index f18e391ad..6c1c0a75b 100644
--- a/tools/functional-tester/tester/checks.go
+++ b/tools/functional-tester/tester/checks.go
@@ -22,6 +22,7 @@ import (
 	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/tools/functional-tester/rpcpb"
 
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
@@ -29,6 +30,7 @@ import (
 
 const retries = 7
 
+// Checker checks cluster consistency.
 type Checker interface {
 	// Check returns an error if the system fails a consistency check.
 	Check() error
@@ -57,7 +59,6 @@ func (hc *hashChecker) checkRevAndHashes() (err error) {
 		revs   map[string]int64
 		hashes map[string]int64
 	)
-
 	// retries in case of transient failure or etcd cluster has not stablized yet.
 	for i := 0; i < retries; i++ {
 		revs, hashes, err = hc.hrg.getRevisionHash()
@@ -97,19 +98,17 @@ func (hc *hashChecker) Check() error {
 }
 
 type leaseChecker struct {
-	lg *zap.Logger
-
-	endpoint string // TODO: use Member
-
-	ls          *leaseStresser
-	leaseClient pb.LeaseClient
-	kvc         pb.KVClient
+	lg  *zap.Logger
+	m   *rpcpb.Member
+	ls  *leaseStresser
+	lsc pb.LeaseClient
+	kvc pb.KVClient
 }
 
 func (lc *leaseChecker) Check() error {
-	conn, err := grpc.Dial(lc.ls.endpoint, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(1))
+	conn, err := lc.m.DialEtcdGRPCServer(grpc.WithBackoffMaxDelay(time.Second))
 	if err != nil {
-		return fmt.Errorf("%v (%s)", err, lc.ls.endpoint)
+		return fmt.Errorf("%v (%q)", err, lc.m.EtcdClientEndpoint)
 	}
 	defer func() {
 		if conn != nil {
@@ -117,7 +116,7 @@ func (lc *leaseChecker) Check() error {
 		}
 	}()
 	lc.kvc = pb.NewKVClient(conn)
-	lc.leaseClient = pb.NewLeaseClient(conn)
+	lc.lsc = pb.NewLeaseClient(conn)
 	if err := lc.check(true, lc.ls.revokedLeases.leases); err != nil {
 		return err
 	}
@@ -197,7 +196,7 @@ func (lc *leaseChecker) checkLease(ctx context.Context, expired bool, leaseID in
 	if err != nil {
 		lc.lg.Warn(
 			"hasKeysAttachedToLeaseExpired failed",
-			zap.String("endpoint", lc.endpoint),
+			zap.String("endpoint", lc.m.EtcdClientEndpoint),
 			zap.Error(err),
 		)
 		return err
@@ -206,7 +205,7 @@ func (lc *leaseChecker) checkLease(ctx context.Context, expired bool, leaseID in
 	if err != nil {
 		lc.lg.Warn(
 			"hasLeaseExpired failed",
-			zap.String("endpoint", lc.endpoint),
+			zap.String("endpoint", lc.m.EtcdClientEndpoint),
 			zap.Error(err),
 		)
 		return err
@@ -233,7 +232,7 @@ func (lc *leaseChecker) check(expired bool, leases map[int64]time.Time) error {
 
 func (lc *leaseChecker) getLeaseByID(ctx context.Context, leaseID int64) (*pb.LeaseTimeToLiveResponse, error) {
 	ltl := &pb.LeaseTimeToLiveRequest{ID: leaseID, Keys: true}
-	return lc.leaseClient.LeaseTimeToLive(ctx, ltl, grpc.FailFast(false))
+	return lc.lsc.LeaseTimeToLive(ctx, ltl, grpc.FailFast(false))
 }
 
 func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
@@ -250,7 +249,7 @@ func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (boo
 		}
 		lc.lg.Warn(
 			"hasLeaseExpired getLeaseByID failed",
-			zap.String("endpoint", lc.endpoint),
+			zap.String("endpoint", lc.m.EtcdClientEndpoint),
 			zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 			zap.Error(err),
 		)
@@ -269,7 +268,7 @@ func (lc *leaseChecker) hasKeysAttachedToLeaseExpired(ctx context.Context, lease
 	if err != nil {
 		lc.lg.Warn(
 			"hasKeysAttachedToLeaseExpired failed",
-			zap.String("endpoint", lc.endpoint),
+			zap.String("endpoint", lc.m.EtcdClientEndpoint),
 			zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 			zap.Error(err),
 		)
diff --git a/tools/functional-tester/tester/stress.go b/tools/functional-tester/tester/stress.go
index 646fed1f8..421a6d93f 100644
--- a/tools/functional-tester/tester/stress.go
+++ b/tools/functional-tester/tester/stress.go
@@ -50,7 +50,7 @@ func newStresser(clus *Cluster, idx int) Stresser {
 			// 'out of memory' error. Put rate limits in server side.
 			stressers[i] = &keyStresser{
 				lg:             clus.lg,
-				Endpoint:       clus.Members[idx].EtcdClientEndpoint,
+				m:              clus.Members[idx],
 				keySize:        int(clus.Tester.StressKeySize),
 				keyLargeSize:   int(clus.Tester.StressKeySizeLarge),
 				keySuffixRange: int(clus.Tester.StressKeySuffixRange),
@@ -63,7 +63,7 @@ func newStresser(clus *Cluster, idx int) Stresser {
 		case "LEASE":
 			stressers[i] = &leaseStresser{
 				lg:           clus.lg,
-				endpoint:     clus.Members[idx].EtcdClientEndpoint,
+				m:            clus.Members[idx],
 				numLeases:    10, // TODO: configurable
 				keysPerLease: 10, // TODO: configurable
 				rateLimiter:  clus.rateLimiter,
diff --git a/tools/functional-tester/tester/stress_key.go b/tools/functional-tester/tester/stress_key.go
index 49fba0b26..9f795b937 100644
--- a/tools/functional-tester/tester/stress_key.go
+++ b/tools/functional-tester/tester/stress_key.go
@@ -25,6 +25,7 @@ import (
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/tools/functional-tester/rpcpb"
 
 	"go.uber.org/zap"
 	"golang.org/x/time/rate"
@@ -35,7 +36,7 @@ import (
 type keyStresser struct {
 	lg *zap.Logger
 
-	Endpoint string // TODO: use Member
+	m *rpcpb.Member
 
 	keySize        int
 	keyLargeSize   int
@@ -59,9 +60,9 @@ type keyStresser struct {
 
 func (s *keyStresser) Stress() error {
 	// TODO: add backoff option
-	conn, err := grpc.Dial(s.Endpoint, grpc.WithInsecure())
+	conn, err := s.m.DialEtcdGRPCServer()
 	if err != nil {
-		return fmt.Errorf("%v (%s)", err, s.Endpoint)
+		return fmt.Errorf("%v (%q)", err, s.m.EtcdClientEndpoint)
 	}
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -98,7 +99,7 @@ func (s *keyStresser) Stress() error {
 
 	s.lg.Info(
 		"key stresser started in background",
-		zap.String("endpoint", s.Endpoint),
+		zap.String("endpoint", s.m.EtcdClientEndpoint),
 	)
 	return nil
 }
@@ -152,7 +153,7 @@ func (s *keyStresser) run(ctx context.Context) {
 		default:
 			s.lg.Warn(
 				"key stresser exited with error",
-				zap.String("endpoint", s.Endpoint),
+				zap.String("endpoint", s.m.EtcdClientEndpoint),
 				zap.Error(err),
 			)
 			return
@@ -171,7 +172,7 @@ func (s *keyStresser) Close() {
 
 	s.lg.Info(
 		"key stresser is closed",
-		zap.String("endpoint", s.Endpoint),
+		zap.String("endpoint", s.m.EtcdClientEndpoint),
 	)
 }
 
diff --git a/tools/functional-tester/tester/stress_lease.go b/tools/functional-tester/tester/stress_lease.go
index 3c5f3c2d9..890992f75 100644
--- a/tools/functional-tester/tester/stress_lease.go
+++ b/tools/functional-tester/tester/stress_lease.go
@@ -24,6 +24,7 @@ import (
 
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/tools/functional-tester/rpcpb"
 
 	"go.uber.org/zap"
 	"golang.org/x/time/rate"
@@ -39,12 +40,12 @@ const (
 type leaseStresser struct {
 	lg *zap.Logger
 
-	endpoint string
-	cancel   func()
-	conn     *grpc.ClientConn
-	kvc      pb.KVClient
-	lc       pb.LeaseClient
-	ctx      context.Context
+	m      *rpcpb.Member
+	cancel func()
+	conn   *grpc.ClientConn
+	kvc    pb.KVClient
+	lc     pb.LeaseClient
+	ctx    context.Context
 
 	rateLimiter *rate.Limiter
 	// atomicModifiedKey records the number of keys created and deleted during a test case
@@ -124,16 +125,16 @@ func (ls *leaseStresser) setupOnce() error {
 func (ls *leaseStresser) Stress() error {
 	ls.lg.Info(
 		"lease stresser is started",
-		zap.String("endpoint", ls.endpoint),
+		zap.String("endpoint", ls.m.EtcdClientEndpoint),
 	)
 
 	if err := ls.setupOnce(); err != nil {
 		return err
 	}
 
-	conn, err := grpc.Dial(ls.endpoint, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(1*time.Second))
+	conn, err := ls.m.DialEtcdGRPCServer(grpc.WithBackoffMaxDelay(1 * time.Second))
 	if err != nil {
-		return fmt.Errorf("%v (%s)", err, ls.endpoint)
+		return fmt.Errorf("%v (%s)", err, ls.m.EtcdClientEndpoint)
 	}
 	ls.conn = conn
 	ls.kvc = pb.NewKVClient(conn)
@@ -163,22 +164,22 @@ func (ls *leaseStresser) run() {
 
 		ls.lg.Debug(
 			"lease stresser is creating leases",
-			zap.String("endpoint", ls.endpoint),
+			zap.String("endpoint", ls.m.EtcdClientEndpoint),
 		)
 		ls.createLeases()
 		ls.lg.Debug(
 			"lease stresser created leases",
-			zap.String("endpoint", ls.endpoint),
+			zap.String("endpoint", ls.m.EtcdClientEndpoint),
 		)
 
 		ls.lg.Debug(
 			"lease stresser is dropped leases",
-			zap.String("endpoint", ls.endpoint),
+			zap.String("endpoint", ls.m.EtcdClientEndpoint),
 		)
 		ls.randomlyDropLeases()
 		ls.lg.Debug(
 			"lease stresser dropped leases",
-			zap.String("endpoint", ls.endpoint),
+			zap.String("endpoint", ls.m.EtcdClientEndpoint),
 		)
 	}
 }
@@ -208,7 +209,7 @@ func (ls *leaseStresser) createAliveLeases() {
 			if err != nil {
 				ls.lg.Debug(
 					"createLeaseWithKeys failed",
-					zap.String("endpoint", ls.endpoint),
+					zap.String("endpoint", ls.m.EtcdClientEndpoint),
 					zap.Error(err),
 				)
 				return
@@ -246,7 +247,7 @@ func (ls *leaseStresser) createLeaseWithKeys(ttl int64) (int64, error) {
 	if err != nil {
 		ls.lg.Debug(
 			"createLease failed",
-			zap.String("endpoint", ls.endpoint),
+			zap.String("endpoint", ls.m.EtcdClientEndpoint),
 			zap.Error(err),
 		)
 		return -1, err
@@ -254,7 +255,7 @@ func (ls *leaseStresser) createLeaseWithKeys(ttl int64) (int64, error) {
 
 	ls.lg.Debug(
 		"createLease created lease",
-		zap.String("endpoint", ls.endpoint),
+		zap.String("endpoint", ls.m.EtcdClientEndpoint),
 		zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 	)
 	if err := ls.attachKeysWithLease(leaseID); err != nil {
@@ -275,7 +276,7 @@ func (ls *leaseStresser) randomlyDropLeases() {
 			if err != nil {
 				ls.lg.Debug(
 					"randomlyDropLease failed",
-					zap.String("endpoint", ls.endpoint),
+					zap.String("endpoint", ls.m.EtcdClientEndpoint),
 					zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 					zap.Error(err),
 				)
@@ -287,7 +288,7 @@ func (ls *leaseStresser) randomlyDropLeases() {
 			}
 			ls.lg.Debug(
 				"randomlyDropLease dropped a lease",
-				zap.String("endpoint", ls.endpoint),
+				zap.String("endpoint", ls.m.EtcdClientEndpoint),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 			)
 			ls.revokedLeases.add(leaseID, time.Now())
@@ -316,7 +317,7 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 		case <-ls.ctx.Done():
 			ls.lg.Debug(
 				"keepLeaseAlive context canceled",
-				zap.String("endpoint", ls.endpoint),
+				zap.String("endpoint", ls.m.EtcdClientEndpoint),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 				zap.Error(ls.ctx.Err()),
 			)
@@ -330,7 +331,7 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 				ls.aliveLeases.remove(leaseID)
 				ls.lg.Debug(
 					"keepLeaseAlive lease has not been renewed, dropped it",
-					zap.String("endpoint", ls.endpoint),
+					zap.String("endpoint", ls.m.EtcdClientEndpoint),
 					zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 				)
 			}
@@ -340,7 +341,7 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 			if err != nil {
 				ls.lg.Debug(
 					"keepLeaseAlive lease creates stream error",
-					zap.String("endpoint", ls.endpoint),
+					zap.String("endpoint", ls.m.EtcdClientEndpoint),
 					zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 					zap.Error(err),
 				)
@@ -353,14 +354,14 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 
 		ls.lg.Debug(
 			"keepLeaseAlive stream sends lease keepalive request",
-			zap.String("endpoint", ls.endpoint),
+			zap.String("endpoint", ls.m.EtcdClientEndpoint),
 			zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 		)
 		err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID})
 		if err != nil {
 			ls.lg.Debug(
 				"keepLeaseAlive stream failed to send lease keepalive request",
-				zap.String("endpoint", ls.endpoint),
+				zap.String("endpoint", ls.m.EtcdClientEndpoint),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 				zap.Error(err),
 			)
@@ -369,14 +370,14 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 		leaseRenewTime := time.Now()
 		ls.lg.Debug(
 			"keepLeaseAlive stream sent lease keepalive request",
-			zap.String("endpoint", ls.endpoint),
+			zap.String("endpoint", ls.m.EtcdClientEndpoint),
 			zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 		)
 		respRC, err := stream.Recv()
 		if err != nil {
 			ls.lg.Debug(
 				"keepLeaseAlive stream failed to receive lease keepalive response",
-				zap.String("endpoint", ls.endpoint),
+				zap.String("endpoint", ls.m.EtcdClientEndpoint),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 				zap.Error(err),
 			)
@@ -387,7 +388,7 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 		if respRC.TTL <= 0 {
 			ls.lg.Debug(
 				"keepLeaseAlive stream received lease keepalive response TTL <= 0",
-				zap.String("endpoint", ls.endpoint),
+				zap.String("endpoint", ls.m.EtcdClientEndpoint),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 				zap.Int64("ttl", respRC.TTL),
 			)
@@ -397,7 +398,7 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 		// renew lease timestamp only if lease is present
 		ls.lg.Debug(
 			"keepLeaseAlive renewed a lease",
-			zap.String("endpoint", ls.endpoint),
+			zap.String("endpoint", ls.m.EtcdClientEndpoint),
 			zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 		)
 		ls.aliveLeases.update(leaseID, leaseRenewTime)
@@ -446,21 +447,19 @@ func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) {
 
 	ls.lg.Debug(
 		"randomlyDropLease error",
-		zap.String("endpoint", ls.endpoint),
+		zap.String("endpoint", ls.m.EtcdClientEndpoint),
 		zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 		zap.Error(ls.ctx.Err()),
 	)
 	return false, ls.ctx.Err()
 }
 
-func (ls *leaseStresser) Pause() {
-	ls.Close()
-}
+func (ls *leaseStresser) Pause() { ls.Close() }
 
 func (ls *leaseStresser) Close() {
 	ls.lg.Info(
 		"lease stresser is closing",
-		zap.String("endpoint", ls.endpoint),
+		zap.String("endpoint", ls.m.EtcdClientEndpoint),
 	)
 	ls.cancel()
 	ls.runWg.Wait()
@@ -468,7 +467,7 @@ func (ls *leaseStresser) Close() {
 	ls.conn.Close()
 	ls.lg.Info(
 		"lease stresser is closed",
-		zap.String("endpoint", ls.endpoint),
+		zap.String("endpoint", ls.m.EtcdClientEndpoint),
 	)
 }
 
@@ -477,9 +476,5 @@ func (ls *leaseStresser) ModifiedKeys() int64 {
 }
 
 func (ls *leaseStresser) Checker() Checker {
-	return &leaseChecker{
-		lg:       ls.lg,
-		endpoint: ls.endpoint,
-		ls:       ls,
-	}
+	return &leaseChecker{lg: ls.lg, m: ls.m, ls: ls}
 }