functional-tester/tester: use "*rpcpb.Member" directly

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
Gyuho Lee 2018-04-03 10:58:32 -07:00
parent b55a5a9771
commit 629e5a0e7a
4 changed files with 57 additions and 62 deletions

View File

@ -22,6 +22,7 @@ import (
"github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/tools/functional-tester/rpcpb"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -29,6 +30,7 @@ import (
const retries = 7 const retries = 7
// Checker checks cluster consistency.
type Checker interface { type Checker interface {
// Check returns an error if the system fails a consistency check. // Check returns an error if the system fails a consistency check.
Check() error Check() error
@ -57,7 +59,6 @@ func (hc *hashChecker) checkRevAndHashes() (err error) {
revs map[string]int64 revs map[string]int64
hashes map[string]int64 hashes map[string]int64
) )
// retries in case of transient failure or etcd cluster has not stablized yet. // retries in case of transient failure or etcd cluster has not stablized yet.
for i := 0; i < retries; i++ { for i := 0; i < retries; i++ {
revs, hashes, err = hc.hrg.getRevisionHash() revs, hashes, err = hc.hrg.getRevisionHash()
@ -97,19 +98,17 @@ func (hc *hashChecker) Check() error {
} }
type leaseChecker struct { type leaseChecker struct {
lg *zap.Logger lg *zap.Logger
m *rpcpb.Member
endpoint string // TODO: use Member ls *leaseStresser
lsc pb.LeaseClient
ls *leaseStresser kvc pb.KVClient
leaseClient pb.LeaseClient
kvc pb.KVClient
} }
func (lc *leaseChecker) Check() error { func (lc *leaseChecker) Check() error {
conn, err := grpc.Dial(lc.ls.endpoint, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(1)) conn, err := lc.m.DialEtcdGRPCServer(grpc.WithBackoffMaxDelay(time.Second))
if err != nil { if err != nil {
return fmt.Errorf("%v (%s)", err, lc.ls.endpoint) return fmt.Errorf("%v (%q)", err, lc.m.EtcdClientEndpoint)
} }
defer func() { defer func() {
if conn != nil { if conn != nil {
@ -117,7 +116,7 @@ func (lc *leaseChecker) Check() error {
} }
}() }()
lc.kvc = pb.NewKVClient(conn) lc.kvc = pb.NewKVClient(conn)
lc.leaseClient = pb.NewLeaseClient(conn) lc.lsc = pb.NewLeaseClient(conn)
if err := lc.check(true, lc.ls.revokedLeases.leases); err != nil { if err := lc.check(true, lc.ls.revokedLeases.leases); err != nil {
return err return err
} }
@ -197,7 +196,7 @@ func (lc *leaseChecker) checkLease(ctx context.Context, expired bool, leaseID in
if err != nil { if err != nil {
lc.lg.Warn( lc.lg.Warn(
"hasKeysAttachedToLeaseExpired failed", "hasKeysAttachedToLeaseExpired failed",
zap.String("endpoint", lc.endpoint), zap.String("endpoint", lc.m.EtcdClientEndpoint),
zap.Error(err), zap.Error(err),
) )
return err return err
@ -206,7 +205,7 @@ func (lc *leaseChecker) checkLease(ctx context.Context, expired bool, leaseID in
if err != nil { if err != nil {
lc.lg.Warn( lc.lg.Warn(
"hasLeaseExpired failed", "hasLeaseExpired failed",
zap.String("endpoint", lc.endpoint), zap.String("endpoint", lc.m.EtcdClientEndpoint),
zap.Error(err), zap.Error(err),
) )
return err return err
@ -233,7 +232,7 @@ func (lc *leaseChecker) check(expired bool, leases map[int64]time.Time) error {
func (lc *leaseChecker) getLeaseByID(ctx context.Context, leaseID int64) (*pb.LeaseTimeToLiveResponse, error) { func (lc *leaseChecker) getLeaseByID(ctx context.Context, leaseID int64) (*pb.LeaseTimeToLiveResponse, error) {
ltl := &pb.LeaseTimeToLiveRequest{ID: leaseID, Keys: true} ltl := &pb.LeaseTimeToLiveRequest{ID: leaseID, Keys: true}
return lc.leaseClient.LeaseTimeToLive(ctx, ltl, grpc.FailFast(false)) return lc.lsc.LeaseTimeToLive(ctx, ltl, grpc.FailFast(false))
} }
func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) { func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
@ -250,7 +249,7 @@ func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (boo
} }
lc.lg.Warn( lc.lg.Warn(
"hasLeaseExpired getLeaseByID failed", "hasLeaseExpired getLeaseByID failed",
zap.String("endpoint", lc.endpoint), zap.String("endpoint", lc.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(err), zap.Error(err),
) )
@ -269,7 +268,7 @@ func (lc *leaseChecker) hasKeysAttachedToLeaseExpired(ctx context.Context, lease
if err != nil { if err != nil {
lc.lg.Warn( lc.lg.Warn(
"hasKeysAttachedToLeaseExpired failed", "hasKeysAttachedToLeaseExpired failed",
zap.String("endpoint", lc.endpoint), zap.String("endpoint", lc.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(err), zap.Error(err),
) )

View File

@ -50,7 +50,7 @@ func newStresser(clus *Cluster, idx int) Stresser {
// 'out of memory' error. Put rate limits in server side. // 'out of memory' error. Put rate limits in server side.
stressers[i] = &keyStresser{ stressers[i] = &keyStresser{
lg: clus.lg, lg: clus.lg,
Endpoint: clus.Members[idx].EtcdClientEndpoint, m: clus.Members[idx],
keySize: int(clus.Tester.StressKeySize), keySize: int(clus.Tester.StressKeySize),
keyLargeSize: int(clus.Tester.StressKeySizeLarge), keyLargeSize: int(clus.Tester.StressKeySizeLarge),
keySuffixRange: int(clus.Tester.StressKeySuffixRange), keySuffixRange: int(clus.Tester.StressKeySuffixRange),
@ -63,7 +63,7 @@ func newStresser(clus *Cluster, idx int) Stresser {
case "LEASE": case "LEASE":
stressers[i] = &leaseStresser{ stressers[i] = &leaseStresser{
lg: clus.lg, lg: clus.lg,
endpoint: clus.Members[idx].EtcdClientEndpoint, m: clus.Members[idx],
numLeases: 10, // TODO: configurable numLeases: 10, // TODO: configurable
keysPerLease: 10, // TODO: configurable keysPerLease: 10, // TODO: configurable
rateLimiter: clus.rateLimiter, rateLimiter: clus.rateLimiter,

View File

@ -25,6 +25,7 @@ import (
"github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/tools/functional-tester/rpcpb"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/time/rate" "golang.org/x/time/rate"
@ -35,7 +36,7 @@ import (
type keyStresser struct { type keyStresser struct {
lg *zap.Logger lg *zap.Logger
Endpoint string // TODO: use Member m *rpcpb.Member
keySize int keySize int
keyLargeSize int keyLargeSize int
@ -59,9 +60,9 @@ type keyStresser struct {
func (s *keyStresser) Stress() error { func (s *keyStresser) Stress() error {
// TODO: add backoff option // TODO: add backoff option
conn, err := grpc.Dial(s.Endpoint, grpc.WithInsecure()) conn, err := s.m.DialEtcdGRPCServer()
if err != nil { if err != nil {
return fmt.Errorf("%v (%s)", err, s.Endpoint) return fmt.Errorf("%v (%q)", err, s.m.EtcdClientEndpoint)
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -98,7 +99,7 @@ func (s *keyStresser) Stress() error {
s.lg.Info( s.lg.Info(
"key stresser started in background", "key stresser started in background",
zap.String("endpoint", s.Endpoint), zap.String("endpoint", s.m.EtcdClientEndpoint),
) )
return nil return nil
} }
@ -152,7 +153,7 @@ func (s *keyStresser) run(ctx context.Context) {
default: default:
s.lg.Warn( s.lg.Warn(
"key stresser exited with error", "key stresser exited with error",
zap.String("endpoint", s.Endpoint), zap.String("endpoint", s.m.EtcdClientEndpoint),
zap.Error(err), zap.Error(err),
) )
return return
@ -171,7 +172,7 @@ func (s *keyStresser) Close() {
s.lg.Info( s.lg.Info(
"key stresser is closed", "key stresser is closed",
zap.String("endpoint", s.Endpoint), zap.String("endpoint", s.m.EtcdClientEndpoint),
) )
} }

View File

@ -24,6 +24,7 @@ import (
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/tools/functional-tester/rpcpb"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/time/rate" "golang.org/x/time/rate"
@ -39,12 +40,12 @@ const (
type leaseStresser struct { type leaseStresser struct {
lg *zap.Logger lg *zap.Logger
endpoint string m *rpcpb.Member
cancel func() cancel func()
conn *grpc.ClientConn conn *grpc.ClientConn
kvc pb.KVClient kvc pb.KVClient
lc pb.LeaseClient lc pb.LeaseClient
ctx context.Context ctx context.Context
rateLimiter *rate.Limiter rateLimiter *rate.Limiter
// atomicModifiedKey records the number of keys created and deleted during a test case // atomicModifiedKey records the number of keys created and deleted during a test case
@ -124,16 +125,16 @@ func (ls *leaseStresser) setupOnce() error {
func (ls *leaseStresser) Stress() error { func (ls *leaseStresser) Stress() error {
ls.lg.Info( ls.lg.Info(
"lease stresser is started", "lease stresser is started",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
) )
if err := ls.setupOnce(); err != nil { if err := ls.setupOnce(); err != nil {
return err return err
} }
conn, err := grpc.Dial(ls.endpoint, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(1*time.Second)) conn, err := ls.m.DialEtcdGRPCServer(grpc.WithBackoffMaxDelay(1 * time.Second))
if err != nil { if err != nil {
return fmt.Errorf("%v (%s)", err, ls.endpoint) return fmt.Errorf("%v (%s)", err, ls.m.EtcdClientEndpoint)
} }
ls.conn = conn ls.conn = conn
ls.kvc = pb.NewKVClient(conn) ls.kvc = pb.NewKVClient(conn)
@ -163,22 +164,22 @@ func (ls *leaseStresser) run() {
ls.lg.Debug( ls.lg.Debug(
"lease stresser is creating leases", "lease stresser is creating leases",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
) )
ls.createLeases() ls.createLeases()
ls.lg.Debug( ls.lg.Debug(
"lease stresser created leases", "lease stresser created leases",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
) )
ls.lg.Debug( ls.lg.Debug(
"lease stresser is dropped leases", "lease stresser is dropped leases",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
) )
ls.randomlyDropLeases() ls.randomlyDropLeases()
ls.lg.Debug( ls.lg.Debug(
"lease stresser dropped leases", "lease stresser dropped leases",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
) )
} }
} }
@ -208,7 +209,7 @@ func (ls *leaseStresser) createAliveLeases() {
if err != nil { if err != nil {
ls.lg.Debug( ls.lg.Debug(
"createLeaseWithKeys failed", "createLeaseWithKeys failed",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.Error(err), zap.Error(err),
) )
return return
@ -246,7 +247,7 @@ func (ls *leaseStresser) createLeaseWithKeys(ttl int64) (int64, error) {
if err != nil { if err != nil {
ls.lg.Debug( ls.lg.Debug(
"createLease failed", "createLease failed",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.Error(err), zap.Error(err),
) )
return -1, err return -1, err
@ -254,7 +255,7 @@ func (ls *leaseStresser) createLeaseWithKeys(ttl int64) (int64, error) {
ls.lg.Debug( ls.lg.Debug(
"createLease created lease", "createLease created lease",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
) )
if err := ls.attachKeysWithLease(leaseID); err != nil { if err := ls.attachKeysWithLease(leaseID); err != nil {
@ -275,7 +276,7 @@ func (ls *leaseStresser) randomlyDropLeases() {
if err != nil { if err != nil {
ls.lg.Debug( ls.lg.Debug(
"randomlyDropLease failed", "randomlyDropLease failed",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(err), zap.Error(err),
) )
@ -287,7 +288,7 @@ func (ls *leaseStresser) randomlyDropLeases() {
} }
ls.lg.Debug( ls.lg.Debug(
"randomlyDropLease dropped a lease", "randomlyDropLease dropped a lease",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
) )
ls.revokedLeases.add(leaseID, time.Now()) ls.revokedLeases.add(leaseID, time.Now())
@ -316,7 +317,7 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
case <-ls.ctx.Done(): case <-ls.ctx.Done():
ls.lg.Debug( ls.lg.Debug(
"keepLeaseAlive context canceled", "keepLeaseAlive context canceled",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(ls.ctx.Err()), zap.Error(ls.ctx.Err()),
) )
@ -330,7 +331,7 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
ls.aliveLeases.remove(leaseID) ls.aliveLeases.remove(leaseID)
ls.lg.Debug( ls.lg.Debug(
"keepLeaseAlive lease has not been renewed, dropped it", "keepLeaseAlive lease has not been renewed, dropped it",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
) )
} }
@ -340,7 +341,7 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
if err != nil { if err != nil {
ls.lg.Debug( ls.lg.Debug(
"keepLeaseAlive lease creates stream error", "keepLeaseAlive lease creates stream error",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(err), zap.Error(err),
) )
@ -353,14 +354,14 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
ls.lg.Debug( ls.lg.Debug(
"keepLeaseAlive stream sends lease keepalive request", "keepLeaseAlive stream sends lease keepalive request",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
) )
err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID}) err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID})
if err != nil { if err != nil {
ls.lg.Debug( ls.lg.Debug(
"keepLeaseAlive stream failed to send lease keepalive request", "keepLeaseAlive stream failed to send lease keepalive request",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(err), zap.Error(err),
) )
@ -369,14 +370,14 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
leaseRenewTime := time.Now() leaseRenewTime := time.Now()
ls.lg.Debug( ls.lg.Debug(
"keepLeaseAlive stream sent lease keepalive request", "keepLeaseAlive stream sent lease keepalive request",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
) )
respRC, err := stream.Recv() respRC, err := stream.Recv()
if err != nil { if err != nil {
ls.lg.Debug( ls.lg.Debug(
"keepLeaseAlive stream failed to receive lease keepalive response", "keepLeaseAlive stream failed to receive lease keepalive response",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(err), zap.Error(err),
) )
@ -387,7 +388,7 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
if respRC.TTL <= 0 { if respRC.TTL <= 0 {
ls.lg.Debug( ls.lg.Debug(
"keepLeaseAlive stream received lease keepalive response TTL <= 0", "keepLeaseAlive stream received lease keepalive response TTL <= 0",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Int64("ttl", respRC.TTL), zap.Int64("ttl", respRC.TTL),
) )
@ -397,7 +398,7 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
// renew lease timestamp only if lease is present // renew lease timestamp only if lease is present
ls.lg.Debug( ls.lg.Debug(
"keepLeaseAlive renewed a lease", "keepLeaseAlive renewed a lease",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
) )
ls.aliveLeases.update(leaseID, leaseRenewTime) ls.aliveLeases.update(leaseID, leaseRenewTime)
@ -446,21 +447,19 @@ func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) {
ls.lg.Debug( ls.lg.Debug(
"randomlyDropLease error", "randomlyDropLease error",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(ls.ctx.Err()), zap.Error(ls.ctx.Err()),
) )
return false, ls.ctx.Err() return false, ls.ctx.Err()
} }
func (ls *leaseStresser) Pause() { func (ls *leaseStresser) Pause() { ls.Close() }
ls.Close()
}
func (ls *leaseStresser) Close() { func (ls *leaseStresser) Close() {
ls.lg.Info( ls.lg.Info(
"lease stresser is closing", "lease stresser is closing",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
) )
ls.cancel() ls.cancel()
ls.runWg.Wait() ls.runWg.Wait()
@ -468,7 +467,7 @@ func (ls *leaseStresser) Close() {
ls.conn.Close() ls.conn.Close()
ls.lg.Info( ls.lg.Info(
"lease stresser is closed", "lease stresser is closed",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
) )
} }
@ -477,9 +476,5 @@ func (ls *leaseStresser) ModifiedKeys() int64 {
} }
func (ls *leaseStresser) Checker() Checker { func (ls *leaseStresser) Checker() Checker {
return &leaseChecker{ return &leaseChecker{lg: ls.lg, m: ls.m, ls: ls}
lg: ls.lg,
endpoint: ls.endpoint,
ls: ls,
}
} }