From f574a9aaed5af98b6fd8215332ef710f6c48f87c Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Wed, 11 Apr 2018 19:22:54 -0700 Subject: [PATCH] functional/tester: add "EtcdClientEndpoints" to "Checker" Signed-off-by: Gyuho Lee --- functional/tester/checker.go | 3 +++ functional/tester/checker_kv_hash.go | 21 ++++++++++----------- functional/tester/checker_lease_expire.go | 8 ++++++-- functional/tester/checker_no_check.go | 7 ++++--- functional/tester/checker_runner.go | 16 +++++++++++----- functional/tester/cluster.go | 14 ++++++++++++-- functional/tester/stresser.go | 4 ++++ functional/tester/stresser_runner.go | 21 ++++++++++++--------- 8 files changed, 62 insertions(+), 32 deletions(-) diff --git a/functional/tester/checker.go b/functional/tester/checker.go index c4894f46d..48e98cb0d 100644 --- a/functional/tester/checker.go +++ b/functional/tester/checker.go @@ -20,6 +20,9 @@ import "github.com/coreos/etcd/functional/rpcpb" type Checker interface { // Type returns the checker type. Type() rpcpb.Checker + // EtcdClientEndpoints returns the client endpoints of + // all checker target nodes.. + EtcdClientEndpoints() []string // Check returns an error if the system fails a consistency check. Check() error } diff --git a/functional/tester/checker_kv_hash.go b/functional/tester/checker_kv_hash.go index d7acf4a8e..20343bc01 100644 --- a/functional/tester/checker_kv_hash.go +++ b/functional/tester/checker_kv_hash.go @@ -25,21 +25,16 @@ import ( const retries = 7 -type hashRevGetter interface { - getRevisionHash() (revs map[string]int64, hashes map[string]int64, err error) -} - type kvHashChecker struct { ctype rpcpb.Checker lg *zap.Logger - hrg hashRevGetter + clus *Cluster } -func newKVHashChecker(lg *zap.Logger, hrg hashRevGetter) Checker { +func newKVHashChecker(clus *Cluster) Checker { return &kvHashChecker{ ctype: rpcpb.Checker_KV_HASH, - lg: lg, - hrg: hrg, + clus: clus, } } @@ -50,9 +45,9 @@ func (hc *kvHashChecker) checkRevAndHashes() (err error) { ) // retries in case of transient failure or etcd cluster has not stablized yet. for i := 0; i < retries; i++ { - revs, hashes, err = hc.hrg.getRevisionHash() + revs, hashes, err = hc.clus.getRevisionHash() if err != nil { - hc.lg.Warn( + hc.clus.lg.Warn( "failed to get revision and hash", zap.Int("retries", i), zap.Error(err), @@ -63,7 +58,7 @@ func (hc *kvHashChecker) checkRevAndHashes() (err error) { if sameRev && sameHashes { return nil } - hc.lg.Warn( + hc.clus.lg.Warn( "retrying; etcd cluster is not stable", zap.Int("retries", i), zap.Bool("same-revisions", sameRev), @@ -86,6 +81,10 @@ func (hc *kvHashChecker) Type() rpcpb.Checker { return hc.ctype } +func (hc *kvHashChecker) EtcdClientEndpoints() []string { + return hc.clus.EtcdClientEndpoints() +} + func (hc *kvHashChecker) Check() error { return hc.checkRevAndHashes() } diff --git a/functional/tester/checker_lease_expire.go b/functional/tester/checker_lease_expire.go index 95ca33f0d..a89742128 100644 --- a/functional/tester/checker_lease_expire.go +++ b/functional/tester/checker_lease_expire.go @@ -44,12 +44,14 @@ func newLeaseExpireChecker(ls *leaseStresser) Checker { } } -const leaseExpireCheckerTimeout = 10 * time.Second - func (lc *leaseExpireChecker) Type() rpcpb.Checker { return lc.ctype } +func (lc *leaseExpireChecker) EtcdClientEndpoints() []string { + return []string{lc.m.EtcdClientEndpoint} +} + func (lc *leaseExpireChecker) Check() error { if lc.ls == nil { return nil @@ -81,6 +83,8 @@ func (lc *leaseExpireChecker) Check() error { return lc.checkShortLivedLeases() } +const leaseExpireCheckerTimeout = 10 * time.Second + // checkShortLivedLeases ensures leases expire. func (lc *leaseExpireChecker) checkShortLivedLeases() error { ctx, cancel := context.WithTimeout(context.Background(), leaseExpireCheckerTimeout) diff --git a/functional/tester/checker_no_check.go b/functional/tester/checker_no_check.go index e841703d7..d36702319 100644 --- a/functional/tester/checker_no_check.go +++ b/functional/tester/checker_no_check.go @@ -18,6 +18,7 @@ import "github.com/coreos/etcd/functional/rpcpb" type noCheck struct{} -func newNoChecker() Checker { return &noCheck{} } -func (nc *noCheck) Type() rpcpb.Checker { return rpcpb.Checker_NO_CHECK } -func (nc *noCheck) Check() error { return nil } +func newNoChecker() Checker { return &noCheck{} } +func (nc *noCheck) Type() rpcpb.Checker { return rpcpb.Checker_NO_CHECK } +func (nc *noCheck) EtcdClientEndpoints() []string { return nil } +func (nc *noCheck) Check() error { return nil } diff --git a/functional/tester/checker_runner.go b/functional/tester/checker_runner.go index 6a09349d9..a5b7ff4d1 100644 --- a/functional/tester/checker_runner.go +++ b/functional/tester/checker_runner.go @@ -17,14 +17,16 @@ package tester import "github.com/coreos/etcd/functional/rpcpb" type runnerChecker struct { - ctype rpcpb.Checker - errc chan error + ctype rpcpb.Checker + etcdClientEndpoint string + errc chan error } -func newRunnerChecker(errc chan error) Checker { +func newRunnerChecker(ep string, errc chan error) Checker { return &runnerChecker{ - ctype: rpcpb.Checker_RUNNER, - errc: errc, + ctype: rpcpb.Checker_RUNNER, + etcdClientEndpoint: ep, + errc: errc, } } @@ -32,6 +34,10 @@ func (rc *runnerChecker) Type() rpcpb.Checker { return rc.ctype } +func (rc *runnerChecker) EtcdClientEndpoints() []string { + return []string{rc.etcdClientEndpoint} +} + func (rc *runnerChecker) Check() error { select { case err := <-rc.errc: diff --git a/functional/tester/cluster.go b/functional/tester/cluster.go index b606ddb60..5bd7a1709 100644 --- a/functional/tester/cluster.go +++ b/functional/tester/cluster.go @@ -123,6 +123,15 @@ func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) { return clus, nil } +// EtcdClientEndpoints returns all etcd client endpoints. +func (clus *Cluster) EtcdClientEndpoints() (css []string) { + css = make([]string, len(clus.Members)) + for i := range clus.Members { + css[i] = clus.Members[i].EtcdClientEndpoint + } + return css +} + func (clus *Cluster) serveTesterServer() { clus.lg.Info( "started tester HTTP server", @@ -297,7 +306,7 @@ func (clus *Cluster) setStresserChecker() { for _, cs := range clus.Tester.Checkers { switch cs { case "KV_HASH": - clus.checkers = append(clus.checkers, newKVHashChecker(clus.lg, hashRevGetter(clus))) + clus.checkers = append(clus.checkers, newKVHashChecker(clus)) case "LEASE_EXPIRE": for _, ls := range lss { @@ -306,7 +315,7 @@ func (clus *Cluster) setStresserChecker() { case "RUNNER": for _, rs := range rss { - clus.checkers = append(clus.checkers, newRunnerChecker(rs.errc)) + clus.checkers = append(clus.checkers, newRunnerChecker(rs.etcdClientEndpoint, rs.errc)) } case "NO_CHECK": @@ -335,6 +344,7 @@ func (clus *Cluster) runCheckers() (err error) { clus.lg.Warn( "consistency check FAIL", zap.String("checker", chk.Type().String()), + zap.Strings("client-endpoints", chk.EtcdClientEndpoints()), zap.Int("round", clus.rd), zap.Int("case", clus.cs), zap.Error(err), diff --git a/functional/tester/stresser.go b/functional/tester/stresser.go index 7753bf8ea..b74b84b15 100644 --- a/functional/tester/stresser.go +++ b/functional/tester/stresser.go @@ -85,6 +85,7 @@ func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) { } stressers[i] = newRunnerStresser( rpcpb.Stresser_ELECTION_RUNNER, + m.EtcdClientEndpoint, clus.lg, clus.Tester.RunnerExecPath, args, @@ -106,6 +107,7 @@ func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) { } stressers[i] = newRunnerStresser( rpcpb.Stresser_WATCH_RUNNER, + m.EtcdClientEndpoint, clus.lg, clus.Tester.RunnerExecPath, args, @@ -125,6 +127,7 @@ func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) { } stressers[i] = newRunnerStresser( rpcpb.Stresser_LOCK_RACER_RUNNER, + m.EtcdClientEndpoint, clus.lg, clus.Tester.RunnerExecPath, args, @@ -140,6 +143,7 @@ func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) { } stressers[i] = newRunnerStresser( rpcpb.Stresser_LEASE_RUNNER, + m.EtcdClientEndpoint, clus.lg, clus.Tester.RunnerExecPath, args, diff --git a/functional/tester/stresser_runner.go b/functional/tester/stresser_runner.go index 7a3da4259..18487f402 100644 --- a/functional/tester/stresser_runner.go +++ b/functional/tester/stresser_runner.go @@ -27,8 +27,9 @@ import ( ) type runnerStresser struct { - stype rpcpb.Stresser - lg *zap.Logger + stype rpcpb.Stresser + etcdClientEndpoint string + lg *zap.Logger cmd *exec.Cmd cmdStr string @@ -42,6 +43,7 @@ type runnerStresser struct { func newRunnerStresser( stype rpcpb.Stresser, + ep string, lg *zap.Logger, cmdStr string, args []string, @@ -50,13 +52,14 @@ func newRunnerStresser( ) *runnerStresser { rl.SetLimit(rl.Limit() - rate.Limit(reqRate)) return &runnerStresser{ - stype: stype, - cmdStr: cmdStr, - args: args, - rl: rl, - reqRate: reqRate, - errc: make(chan error, 1), - donec: make(chan struct{}), + stype: stype, + etcdClientEndpoint: ep, + cmdStr: cmdStr, + args: args, + rl: rl, + reqRate: reqRate, + errc: make(chan error, 1), + donec: make(chan struct{}), } }