From e9c4bad2d11bfb14a6ce4c98078d8a79fd8706db Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Wed, 11 Apr 2018 19:10:01 -0700 Subject: [PATCH] functional/tester: add "Checker", remove compositeChecker Signed-off-by: Gyuho Lee --- functional.yaml | 7 +- functional/tester/checker.go | 25 +++ functional/tester/checker_kv_hash.go | 91 +++++++++++ .../{checks.go => checker_lease_expire.go} | 142 +++--------------- functional/tester/checker_no_check.go | 23 +++ functional/tester/checker_runner.go | 42 ++++++ functional/tester/cluster.go | 73 ++++++--- functional/tester/cluster_read_config.go | 11 +- functional/tester/cluster_run.go | 4 +- functional/tester/cluster_test.go | 4 +- functional/tester/{stress.go => stresser.go} | 22 ++- ...ess_composite.go => stresser_composite.go} | 13 -- .../tester/{stress_key.go => stresser_key.go} | 4 +- .../{stress_lease.go => stresser_lease.go} | 6 +- .../{stress_runner.go => stresser_runner.go} | 8 +- 15 files changed, 287 insertions(+), 188 deletions(-) create mode 100644 functional/tester/checker.go create mode 100644 functional/tester/checker_kv_hash.go rename functional/tester/{checks.go => checker_lease_expire.go} (61%) create mode 100644 functional/tester/checker_no_check.go create mode 100644 functional/tester/checker_runner.go rename functional/tester/{stress.go => stresser.go} (87%) rename functional/tester/{stress_composite.go => stresser_composite.go} (87%) rename functional/tester/{stress_key.go => stresser_key.go} (99%) rename functional/tester/{stress_lease.go => stresser_lease.go} (99%) rename functional/tester/{stress_runner.go => stresser_runner.go} (94%) diff --git a/functional.yaml b/functional.yaml index 6f5527659..933bb2d3c 100644 --- a/functional.yaml +++ b/functional.yaml @@ -157,7 +157,6 @@ tester-config: round-limit: 1 exit-on-failure: true - consistency-check: true enable-pprof: true case-delay-ms: 7000 @@ -205,7 +204,7 @@ tester-config: runner-exec-path: ./bin/etcd-runner external-exec-path: "" - stress-types: + stressers: - KV - LEASE # - ELECTION_RUNNER @@ -213,6 +212,10 @@ tester-config: # - LOCK_RACER_RUNNER # - LEASE_RUNNER + checkers: + - KV_HASH + - LEASE_EXPIRE + stress-key-size: 100 stress-key-size-large: 32769 stress-key-suffix-range: 250000 diff --git a/functional/tester/checker.go b/functional/tester/checker.go new file mode 100644 index 000000000..c4894f46d --- /dev/null +++ b/functional/tester/checker.go @@ -0,0 +1,25 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tester + +import "github.com/coreos/etcd/functional/rpcpb" + +// Checker checks cluster consistency. +type Checker interface { + // Type returns the checker type. + Type() rpcpb.Checker + // Check returns an error if the system fails a consistency check. + Check() error +} diff --git a/functional/tester/checker_kv_hash.go b/functional/tester/checker_kv_hash.go new file mode 100644 index 000000000..d7acf4a8e --- /dev/null +++ b/functional/tester/checker_kv_hash.go @@ -0,0 +1,91 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tester + +import ( + "fmt" + "time" + + "github.com/coreos/etcd/functional/rpcpb" + + "go.uber.org/zap" +) + +const retries = 7 + +type hashRevGetter interface { + getRevisionHash() (revs map[string]int64, hashes map[string]int64, err error) +} + +type kvHashChecker struct { + ctype rpcpb.Checker + lg *zap.Logger + hrg hashRevGetter +} + +func newKVHashChecker(lg *zap.Logger, hrg hashRevGetter) Checker { + return &kvHashChecker{ + ctype: rpcpb.Checker_KV_HASH, + lg: lg, + hrg: hrg, + } +} + +func (hc *kvHashChecker) checkRevAndHashes() (err error) { + var ( + revs map[string]int64 + hashes map[string]int64 + ) + // retries in case of transient failure or etcd cluster has not stablized yet. + for i := 0; i < retries; i++ { + revs, hashes, err = hc.hrg.getRevisionHash() + if err != nil { + hc.lg.Warn( + "failed to get revision and hash", + zap.Int("retries", i), + zap.Error(err), + ) + } else { + sameRev := getSameValue(revs) + sameHashes := getSameValue(hashes) + if sameRev && sameHashes { + return nil + } + hc.lg.Warn( + "retrying; etcd cluster is not stable", + zap.Int("retries", i), + zap.Bool("same-revisions", sameRev), + zap.Bool("same-hashes", sameHashes), + zap.String("revisions", fmt.Sprintf("%+v", revs)), + zap.String("hashes", fmt.Sprintf("%+v", hashes)), + ) + } + time.Sleep(time.Second) + } + + if err != nil { + return fmt.Errorf("failed revision and hash check (%v)", err) + } + + return fmt.Errorf("etcd cluster is not stable: [revisions: %v] and [hashes: %v]", revs, hashes) +} + +func (hc *kvHashChecker) Type() rpcpb.Checker { + return hc.ctype +} + +func (hc *kvHashChecker) Check() error { + return hc.checkRevAndHashes() +} diff --git a/functional/tester/checks.go b/functional/tester/checker_lease_expire.go similarity index 61% rename from functional/tester/checks.go rename to functional/tester/checker_lease_expire.go index d628e218f..95ca33f0d 100644 --- a/functional/tester/checks.go +++ b/functional/tester/checker_lease_expire.go @@ -27,83 +27,30 @@ import ( "google.golang.org/grpc" ) -const retries = 7 - -// Checker checks cluster consistency. -type Checker interface { - // Check returns an error if the system fails a consistency check. - Check() error +type leaseExpireChecker struct { + ctype rpcpb.Checker + lg *zap.Logger + m *rpcpb.Member + ls *leaseStresser + cli *clientv3.Client } -type hashAndRevGetter interface { - getRevisionHash() (revs map[string]int64, hashes map[string]int64, err error) -} - -type hashChecker struct { - lg *zap.Logger - hrg hashAndRevGetter -} - -func newHashChecker(lg *zap.Logger, hrg hashAndRevGetter) Checker { - return &hashChecker{ - lg: lg, - hrg: hrg, +func newLeaseExpireChecker(ls *leaseStresser) Checker { + return &leaseExpireChecker{ + ctype: rpcpb.Checker_LEASE_EXPIRE, + lg: ls.lg, + m: ls.m, + ls: ls, } } -const leaseCheckerTimeout = 10 * time.Second +const leaseExpireCheckerTimeout = 10 * time.Second -func (hc *hashChecker) checkRevAndHashes() (err error) { - var ( - revs map[string]int64 - hashes map[string]int64 - ) - // retries in case of transient failure or etcd cluster has not stablized yet. - for i := 0; i < retries; i++ { - revs, hashes, err = hc.hrg.getRevisionHash() - if err != nil { - hc.lg.Warn( - "failed to get revision and hash", - zap.Int("retries", i), - zap.Error(err), - ) - } else { - sameRev := getSameValue(revs) - sameHashes := getSameValue(hashes) - if sameRev && sameHashes { - return nil - } - hc.lg.Warn( - "retrying; etcd cluster is not stable", - zap.Int("retries", i), - zap.Bool("same-revisions", sameRev), - zap.Bool("same-hashes", sameHashes), - zap.String("revisions", fmt.Sprintf("%+v", revs)), - zap.String("hashes", fmt.Sprintf("%+v", hashes)), - ) - } - time.Sleep(time.Second) - } - - if err != nil { - return fmt.Errorf("failed revision and hash check (%v)", err) - } - - return fmt.Errorf("etcd cluster is not stable: [revisions: %v] and [hashes: %v]", revs, hashes) +func (lc *leaseExpireChecker) Type() rpcpb.Checker { + return lc.ctype } -func (hc *hashChecker) Check() error { - return hc.checkRevAndHashes() -} - -type leaseChecker struct { - lg *zap.Logger - m *rpcpb.Member - ls *leaseStresser - cli *clientv3.Client -} - -func (lc *leaseChecker) Check() error { +func (lc *leaseExpireChecker) Check() error { if lc.ls == nil { return nil } @@ -135,8 +82,8 @@ func (lc *leaseChecker) Check() error { } // checkShortLivedLeases ensures leases expire. -func (lc *leaseChecker) checkShortLivedLeases() error { - ctx, cancel := context.WithTimeout(context.Background(), leaseCheckerTimeout) +func (lc *leaseExpireChecker) checkShortLivedLeases() error { + ctx, cancel := context.WithTimeout(context.Background(), leaseExpireCheckerTimeout) errc := make(chan error) defer cancel() for leaseID := range lc.ls.shortLivedLeases.leases { @@ -154,7 +101,7 @@ func (lc *leaseChecker) checkShortLivedLeases() error { return errsToError(errs) } -func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64) (err error) { +func (lc *leaseExpireChecker) checkShortLivedLease(ctx context.Context, leaseID int64) (err error) { // retry in case of transient failure or lease is expired but not yet revoked due to the fact that etcd cluster didn't have enought time to delete it. var resp *clientv3.LeaseTimeToLiveResponse for i := 0; i < retries; i++ { @@ -199,7 +146,7 @@ func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64) return err } -func (lc *leaseChecker) checkLease(ctx context.Context, expired bool, leaseID int64) error { +func (lc *leaseExpireChecker) checkLease(ctx context.Context, expired bool, leaseID int64) error { keysExpired, err := lc.hasKeysAttachedToLeaseExpired(ctx, leaseID) if err != nil { lc.lg.Warn( @@ -227,8 +174,8 @@ func (lc *leaseChecker) checkLease(ctx context.Context, expired bool, leaseID in return nil } -func (lc *leaseChecker) check(expired bool, leases map[int64]time.Time) error { - ctx, cancel := context.WithTimeout(context.Background(), leaseCheckerTimeout) +func (lc *leaseExpireChecker) check(expired bool, leases map[int64]time.Time) error { + ctx, cancel := context.WithTimeout(context.Background(), leaseExpireCheckerTimeout) defer cancel() for leaseID := range leases { if err := lc.checkLease(ctx, expired, leaseID); err != nil { @@ -239,7 +186,7 @@ func (lc *leaseChecker) check(expired bool, leases map[int64]time.Time) error { } // TODO: handle failures from "grpc.FailFast(false)" -func (lc *leaseChecker) getLeaseByID(ctx context.Context, leaseID int64) (*clientv3.LeaseTimeToLiveResponse, error) { +func (lc *leaseExpireChecker) getLeaseByID(ctx context.Context, leaseID int64) (*clientv3.LeaseTimeToLiveResponse, error) { return lc.cli.TimeToLive( ctx, clientv3.LeaseID(leaseID), @@ -247,7 +194,7 @@ func (lc *leaseChecker) getLeaseByID(ctx context.Context, leaseID int64) (*clien ) } -func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) { +func (lc *leaseExpireChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) { // keep retrying until lease's state is known or ctx is being canceled for ctx.Err() == nil { resp, err := lc.getLeaseByID(ctx, leaseID) @@ -272,7 +219,7 @@ func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (boo // The keys attached to the lease has the format of "_" where idx is the ordering key creation // Since the format of keys contains about leaseID, finding keys base on "" prefix // determines whether the attached keys for a given leaseID has been deleted or not -func (lc *leaseChecker) hasKeysAttachedToLeaseExpired(ctx context.Context, leaseID int64) (bool, error) { +func (lc *leaseExpireChecker) hasKeysAttachedToLeaseExpired(ctx context.Context, leaseID int64) (bool, error) { resp, err := lc.cli.Get(ctx, fmt.Sprintf("%d", leaseID), clientv3.WithPrefix()) if err != nil { lc.lg.Warn( @@ -285,42 +232,3 @@ func (lc *leaseChecker) hasKeysAttachedToLeaseExpired(ctx context.Context, lease } return len(resp.Kvs) == 0, nil } - -// compositeChecker implements a checker that runs a slice of Checkers concurrently. -type compositeChecker struct{ checkers []Checker } - -func newCompositeChecker(checkers []Checker) Checker { - return &compositeChecker{checkers} -} - -func (cchecker *compositeChecker) Check() error { - errc := make(chan error) - for _, c := range cchecker.checkers { - go func(chk Checker) { errc <- chk.Check() }(c) - } - var errs []error - for range cchecker.checkers { - if err := <-errc; err != nil { - errs = append(errs, err) - } - } - return errsToError(errs) -} - -type runnerChecker struct { - errc chan error -} - -func (rc *runnerChecker) Check() error { - select { - case err := <-rc.errc: - return err - default: - return nil - } -} - -type noChecker struct{} - -func newNoChecker() Checker { return &noChecker{} } -func (nc *noChecker) Check() error { return nil } diff --git a/functional/tester/checker_no_check.go b/functional/tester/checker_no_check.go new file mode 100644 index 000000000..e841703d7 --- /dev/null +++ b/functional/tester/checker_no_check.go @@ -0,0 +1,23 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tester + +import "github.com/coreos/etcd/functional/rpcpb" + +type noCheck struct{} + +func newNoChecker() Checker { return &noCheck{} } +func (nc *noCheck) Type() rpcpb.Checker { return rpcpb.Checker_NO_CHECK } +func (nc *noCheck) Check() error { return nil } diff --git a/functional/tester/checker_runner.go b/functional/tester/checker_runner.go new file mode 100644 index 000000000..6a09349d9 --- /dev/null +++ b/functional/tester/checker_runner.go @@ -0,0 +1,42 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tester + +import "github.com/coreos/etcd/functional/rpcpb" + +type runnerChecker struct { + ctype rpcpb.Checker + errc chan error +} + +func newRunnerChecker(errc chan error) Checker { + return &runnerChecker{ + ctype: rpcpb.Checker_RUNNER, + errc: errc, + } +} + +func (rc *runnerChecker) Type() rpcpb.Checker { + return rc.ctype +} + +func (rc *runnerChecker) Check() error { + select { + case err := <-rc.errc: + return err + default: + return nil + } +} diff --git a/functional/tester/cluster.go b/functional/tester/cluster.go index e5ff19c42..b606ddb60 100644 --- a/functional/tester/cluster.go +++ b/functional/tester/cluster.go @@ -56,7 +56,7 @@ type Cluster struct { rateLimiter *rate.Limiter stresser Stresser - checker Checker + checkers []Checker currentRevision int64 rd int @@ -118,7 +118,7 @@ func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) { int(clus.Tester.StressQPS), ) - clus.updateStresserChecker() + clus.setStresserChecker() return clus, nil } @@ -274,26 +274,49 @@ func (clus *Cluster) UpdateDelayLatencyMs() { } } -func (clus *Cluster) updateStresserChecker() { - cs := &compositeStresser{} +func (clus *Cluster) setStresserChecker() { + css := &compositeStresser{} + lss := []*leaseStresser{} + rss := []*runnerStresser{} for _, m := range clus.Members { - cs.stressers = append(cs.stressers, newStresser(clus, m)) - } - clus.stresser = cs - - if clus.Tester.ConsistencyCheck { - clus.checker = newHashChecker(clus.lg, hashAndRevGetter(clus)) - if schk := cs.Checker(); schk != nil { - clus.checker = newCompositeChecker([]Checker{clus.checker, schk}) + sss := newStresser(clus, m) + css.stressers = append(css.stressers, &compositeStresser{sss}) + for _, s := range sss { + if v, ok := s.(*leaseStresser); ok { + lss = append(lss, v) + clus.lg.Info("added lease stresser", zap.String("endpoint", m.EtcdClientEndpoint)) + } + if v, ok := s.(*runnerStresser); ok { + rss = append(rss, v) + clus.lg.Info("added lease stresser", zap.String("endpoint", m.EtcdClientEndpoint)) + } } - } else { - clus.checker = newNoChecker() } + clus.stresser = css + for _, cs := range clus.Tester.Checkers { + switch cs { + case "KV_HASH": + clus.checkers = append(clus.checkers, newKVHashChecker(clus.lg, hashRevGetter(clus))) + + case "LEASE_EXPIRE": + for _, ls := range lss { + clus.checkers = append(clus.checkers, newLeaseExpireChecker(ls)) + } + + case "RUNNER": + for _, rs := range rss { + clus.checkers = append(clus.checkers, newRunnerChecker(rs.errc)) + } + + case "NO_CHECK": + clus.checkers = append(clus.checkers, newNoChecker()) + } + } clus.lg.Info("updated stressers") } -func (clus *Cluster) checkConsistency() (err error) { +func (clus *Cluster) runCheckers() (err error) { defer func() { if err != nil { return @@ -307,15 +330,19 @@ func (clus *Cluster) checkConsistency() (err error) { } }() - if err = clus.checker.Check(); err != nil { - clus.lg.Warn( - "consistency check FAIL", - zap.Int("round", clus.rd), - zap.Int("case", clus.cs), - zap.Error(err), - ) - return err + for _, chk := range clus.checkers { + if err = chk.Check(); err != nil { + clus.lg.Warn( + "consistency check FAIL", + zap.String("checker", chk.Type().String()), + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + zap.Error(err), + ) + return err + } } + clus.lg.Info( "consistency check ALL PASS", zap.Int("round", clus.rd), diff --git a/functional/tester/cluster_read_config.go b/functional/tester/cluster_read_config.go index cdef81de0..223265e66 100644 --- a/functional/tester/cluster_read_config.go +++ b/functional/tester/cluster_read_config.go @@ -336,9 +336,14 @@ func read(lg *zap.Logger, fpath string) (*Cluster, error) { } } - for _, v := range clus.Tester.StressTypes { - if _, ok := rpcpb.StressType_value[v]; !ok { - return nil, fmt.Errorf("StressType is unknown; got %q", v) + for _, v := range clus.Tester.Stressers { + if _, ok := rpcpb.Stresser_value[v]; !ok { + return nil, fmt.Errorf("Stresser is unknown; got %q", v) + } + } + for _, v := range clus.Tester.Checkers { + if _, ok := rpcpb.Checker_value[v]; !ok { + return nil, fmt.Errorf("Checker is unknown; got %q", v) } } diff --git a/functional/tester/cluster_run.go b/functional/tester/cluster_run.go index e406fb1be..f016d9088 100644 --- a/functional/tester/cluster_run.go +++ b/functional/tester/cluster_run.go @@ -237,7 +237,7 @@ func (clus *Cluster) doRound() error { zap.Int("case-total", len(clus.cases)), zap.String("desc", fa.Desc()), ) - if err := clus.checkConsistency(); err != nil { + if err := clus.runCheckers(); err != nil { return fmt.Errorf("consistency check error (%v)", err) } @@ -362,6 +362,6 @@ func (clus *Cluster) cleanup() error { return err } - clus.updateStresserChecker() + clus.setStresserChecker() return nil } diff --git a/functional/tester/cluster_test.go b/functional/tester/cluster_test.go index 5eef6fd63..e3ab8a135 100644 --- a/functional/tester/cluster_test.go +++ b/functional/tester/cluster_test.go @@ -190,7 +190,6 @@ func Test_read(t *testing.T) { UpdatedDelayLatencyMs: 5000, RoundLimit: 1, ExitOnCaseFail: true, - ConsistencyCheck: true, EnablePprof: true, CaseDelayMs: 7000, CaseShuffle: true, @@ -230,7 +229,8 @@ func Test_read(t *testing.T) { FailpointCommands: []string{`panic("etcd-tester")`}, RunnerExecPath: "./bin/etcd-runner", ExternalExecPath: "", - StressTypes: []string{"KV", "LEASE"}, + Stressers: []string{"KV", "LEASE"}, + Checkers: []string{"KV_HASH", "LEASE_EXPIRE"}, StressKeySize: 100, StressKeySizeLarge: 32769, StressKeySuffixRange: 250000, diff --git a/functional/tester/stress.go b/functional/tester/stresser.go similarity index 87% rename from functional/tester/stress.go rename to functional/tester/stresser.go index 7671853b3..7753bf8ea 100644 --- a/functional/tester/stress.go +++ b/functional/tester/stresser.go @@ -33,14 +33,12 @@ type Stresser interface { Close() map[string]int // ModifiedKeys reports the number of keys created and deleted by stresser ModifiedKeys() int64 - // Checker returns an invariant checker for after the stresser is canceled. - Checker() Checker } // newStresser creates stresser from a comma separated list of stresser types. -func newStresser(clus *Cluster, m *rpcpb.Member) Stresser { - stressers := make([]Stresser, len(clus.Tester.StressTypes)) - for i, stype := range clus.Tester.StressTypes { +func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) { + stressers = make([]Stresser, len(clus.Tester.Stressers)) + for i, stype := range clus.Tester.Stressers { clus.lg.Info( "creating stresser", zap.String("type", stype), @@ -52,7 +50,7 @@ func newStresser(clus *Cluster, m *rpcpb.Member) Stresser { // TODO: Too intensive stressing clients can panic etcd member with // 'out of memory' error. Put rate limits in server side. stressers[i] = &keyStresser{ - stype: rpcpb.StressType_KV, + stype: rpcpb.Stresser_KV, lg: clus.lg, m: m, keySize: int(clus.Tester.StressKeySize), @@ -66,7 +64,7 @@ func newStresser(clus *Cluster, m *rpcpb.Member) Stresser { case "LEASE": stressers[i] = &leaseStresser{ - stype: rpcpb.StressType_LEASE, + stype: rpcpb.Stresser_LEASE, lg: clus.lg, m: m, numLeases: 10, // TODO: configurable @@ -86,7 +84,7 @@ func newStresser(clus *Cluster, m *rpcpb.Member) Stresser { "--req-rate", fmt.Sprintf("%v", reqRate), } stressers[i] = newRunnerStresser( - rpcpb.StressType_ELECTION_RUNNER, + rpcpb.Stresser_ELECTION_RUNNER, clus.lg, clus.Tester.RunnerExecPath, args, @@ -107,7 +105,7 @@ func newStresser(clus *Cluster, m *rpcpb.Member) Stresser { "--req-rate", fmt.Sprintf("%v", reqRate), } stressers[i] = newRunnerStresser( - rpcpb.StressType_WATCH_RUNNER, + rpcpb.Stresser_WATCH_RUNNER, clus.lg, clus.Tester.RunnerExecPath, args, @@ -126,7 +124,7 @@ func newStresser(clus *Cluster, m *rpcpb.Member) Stresser { "--req-rate", fmt.Sprintf("%v", reqRate), } stressers[i] = newRunnerStresser( - rpcpb.StressType_LOCK_RACER_RUNNER, + rpcpb.Stresser_LOCK_RACER_RUNNER, clus.lg, clus.Tester.RunnerExecPath, args, @@ -141,7 +139,7 @@ func newStresser(clus *Cluster, m *rpcpb.Member) Stresser { "--endpoints", m.EtcdClientEndpoint, } stressers[i] = newRunnerStresser( - rpcpb.StressType_LEASE_RUNNER, + rpcpb.Stresser_LEASE_RUNNER, clus.lg, clus.Tester.RunnerExecPath, args, @@ -150,5 +148,5 @@ func newStresser(clus *Cluster, m *rpcpb.Member) Stresser { ) } } - return &compositeStresser{stressers} + return stressers } diff --git a/functional/tester/stress_composite.go b/functional/tester/stresser_composite.go similarity index 87% rename from functional/tester/stress_composite.go rename to functional/tester/stresser_composite.go index c19f764ff..6492458a2 100644 --- a/functional/tester/stress_composite.go +++ b/functional/tester/stresser_composite.go @@ -74,16 +74,3 @@ func (cs *compositeStresser) ModifiedKeys() (modifiedKey int64) { } return modifiedKey } - -func (cs *compositeStresser) Checker() Checker { - var chks []Checker - for _, s := range cs.stressers { - if chk := s.Checker(); chk != nil { - chks = append(chks, chk) - } - } - if len(chks) == 0 { - return nil - } - return newCompositeChecker(chks) -} diff --git a/functional/tester/stress_key.go b/functional/tester/stresser_key.go similarity index 99% rename from functional/tester/stress_key.go rename to functional/tester/stresser_key.go index 509748b8a..2fc1bf2b0 100644 --- a/functional/tester/stress_key.go +++ b/functional/tester/stresser_key.go @@ -35,7 +35,7 @@ import ( ) type keyStresser struct { - stype rpcpb.StressType + stype rpcpb.Stresser lg *zap.Logger m *rpcpb.Member @@ -204,8 +204,6 @@ func (s *keyStresser) ModifiedKeys() int64 { return atomic.LoadInt64(&s.atomicModifiedKeys) } -func (s *keyStresser) Checker() Checker { return nil } - type stressFunc func(ctx context.Context) (err error, modifiedKeys int64) type stressEntry struct { diff --git a/functional/tester/stress_lease.go b/functional/tester/stresser_lease.go similarity index 99% rename from functional/tester/stress_lease.go rename to functional/tester/stresser_lease.go index c3797f47f..8510a0765 100644 --- a/functional/tester/stress_lease.go +++ b/functional/tester/stresser_lease.go @@ -38,7 +38,7 @@ const ( ) type leaseStresser struct { - stype rpcpb.StressType + stype rpcpb.Stresser lg *zap.Logger m *rpcpb.Member @@ -485,7 +485,3 @@ func (ls *leaseStresser) Close() map[string]int { func (ls *leaseStresser) ModifiedKeys() int64 { return atomic.LoadInt64(&ls.atomicModifiedKey) } - -func (ls *leaseStresser) Checker() Checker { - return &leaseChecker{lg: ls.lg, m: ls.m, ls: ls} -} diff --git a/functional/tester/stress_runner.go b/functional/tester/stresser_runner.go similarity index 94% rename from functional/tester/stress_runner.go rename to functional/tester/stresser_runner.go index d52b94cb8..7a3da4259 100644 --- a/functional/tester/stress_runner.go +++ b/functional/tester/stresser_runner.go @@ -27,7 +27,7 @@ import ( ) type runnerStresser struct { - stype rpcpb.StressType + stype rpcpb.Stresser lg *zap.Logger cmd *exec.Cmd @@ -41,7 +41,7 @@ type runnerStresser struct { } func newRunnerStresser( - stype rpcpb.StressType, + stype rpcpb.Stresser, lg *zap.Logger, cmdStr string, args []string, @@ -115,7 +115,3 @@ func (rs *runnerStresser) Close() map[string]int { func (rs *runnerStresser) ModifiedKeys() int64 { return 1 } - -func (rs *runnerStresser) Checker() Checker { - return &runnerChecker{rs.errc} -}