diff --git a/tools/functional-tester/build b/tools/functional-tester/build index 0786f7346..ef1168202 100755 --- a/tools/functional-tester/build +++ b/tools/functional-tester/build @@ -7,4 +7,5 @@ fi CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags "-s" -o bin/etcd-agent ./cmd/tools/functional-tester/etcd-agent CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags "-s" -o bin/etcd-tester ./cmd/tools/functional-tester/etcd-tester +CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags "-s" -o bin/etcd-runner ./cmd/tools/functional-tester/etcd-runner diff --git a/tools/functional-tester/etcd-tester/checks.go b/tools/functional-tester/etcd-tester/checks.go index 7cff67c57..f3c5de9b4 100644 --- a/tools/functional-tester/etcd-tester/checks.go +++ b/tools/functional-tester/etcd-tester/checks.go @@ -245,6 +245,19 @@ func (cchecker *compositeChecker) Check() error { return errsToError(errs) } +type runnerChecker struct { + errc chan error +} + +func (rc *runnerChecker) Check() error { + select { + case err := <-rc.errc: + return err + default: + return nil + } +} + type noChecker struct{} func newNoChecker() Checker { return &noChecker{} } diff --git a/tools/functional-tester/etcd-tester/etcd_runner_stresser.go b/tools/functional-tester/etcd-tester/etcd_runner_stresser.go new file mode 100644 index 000000000..23636bf5a --- /dev/null +++ b/tools/functional-tester/etcd-tester/etcd_runner_stresser.go @@ -0,0 +1,97 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "io/ioutil" + "os/exec" + "syscall" + + "golang.org/x/time/rate" +) + +type runnerStresser struct { + cmd *exec.Cmd + cmdStr string + args []string + rl *rate.Limiter + reqRate int + + errc chan error + donec chan struct{} +} + +func newRunnerStresser(cmdStr string, args []string, rl *rate.Limiter, reqRate int) *runnerStresser { + rl.SetLimit(rl.Limit() - rate.Limit(reqRate)) + return &runnerStresser{ + cmdStr: cmdStr, + args: args, + rl: rl, + reqRate: reqRate, + errc: make(chan error, 1), + donec: make(chan struct{}), + } +} + +func (rs *runnerStresser) setupOnce() (err error) { + if rs.cmd != nil { + return nil + } + + rs.cmd = exec.Command(rs.cmdStr, rs.args...) + stderr, err := rs.cmd.StderrPipe() + if err != nil { + return err + } + + go func() { + defer close(rs.donec) + out, err := ioutil.ReadAll(stderr) + if err != nil { + rs.errc <- err + } else { + rs.errc <- fmt.Errorf("(%v %v) stderr %v", rs.cmdStr, rs.args, string(out)) + } + }() + + return rs.cmd.Start() +} + +func (rs *runnerStresser) Stress() (err error) { + if err = rs.setupOnce(); err != nil { + return err + } + return syscall.Kill(rs.cmd.Process.Pid, syscall.SIGCONT) +} + +func (rs *runnerStresser) Pause() { + syscall.Kill(rs.cmd.Process.Pid, syscall.SIGSTOP) +} + +func (rs *runnerStresser) Close() { + syscall.Kill(rs.cmd.Process.Pid, syscall.SIGINT) + rs.cmd.Wait() + <-rs.donec + rs.rl.SetLimit(rs.rl.Limit() + rate.Limit(rs.reqRate)) +} + +func (rs *runnerStresser) ModifiedKeys() int64 { + return 1 +} + +func (rs *runnerStresser) Checker() Checker { + return &runnerChecker{rs.errc} +} diff --git a/tools/functional-tester/etcd-tester/key_stresser.go b/tools/functional-tester/etcd-tester/key_stresser.go index e049a62c9..1e351b7e1 100644 --- a/tools/functional-tester/etcd-tester/key_stresser.go +++ b/tools/functional-tester/etcd-tester/key_stresser.go @@ -140,11 +140,16 @@ func (s *keyStresser) run(ctx context.Context) { } } -func (s *keyStresser) Cancel() { +func (s *keyStresser) Pause() { + s.Close() +} + +func (s *keyStresser) Close() { s.cancel() s.conn.Close() s.wg.Wait() - plog.Infof("keyStresser %q is canceled", s.Endpoint) + plog.Infof("keyStresser %q is closed", s.Endpoint) + } func (s *keyStresser) ModifiedKeys() int64 { diff --git a/tools/functional-tester/etcd-tester/lease_stresser.go b/tools/functional-tester/etcd-tester/lease_stresser.go index fe334952d..0767ccc2b 100644 --- a/tools/functional-tester/etcd-tester/lease_stresser.go +++ b/tools/functional-tester/etcd-tester/lease_stresser.go @@ -361,13 +361,17 @@ func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) { return false, ls.ctx.Err() } -func (ls *leaseStresser) Cancel() { - plog.Debugf("lease stresser %q is canceling...", ls.endpoint) +func (ls *leaseStresser) Pause() { + ls.Close() +} + +func (ls *leaseStresser) Close() { + plog.Debugf("lease stresser %q is closing...", ls.endpoint) ls.cancel() ls.runWg.Wait() ls.aliveWg.Wait() ls.conn.Close() - plog.Infof("lease stresser %q is canceled", ls.endpoint) + plog.Infof("lease stresser %q is closed", ls.endpoint) } func (ls *leaseStresser) ModifiedKeys() int64 { diff --git a/tools/functional-tester/etcd-tester/main.go b/tools/functional-tester/etcd-tester/main.go index 265157c72..0c04347e0 100644 --- a/tools/functional-tester/etcd-tester/main.go +++ b/tools/functional-tester/etcd-tester/main.go @@ -51,7 +51,8 @@ func main() { stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.") schedCases := flag.String("schedule-cases", "", "test case schedule") consistencyCheck := flag.Bool("consistency-check", true, "true to check consistency (revision, hash)") - stresserType := flag.String("stresser", "keys,lease", "comma separated list of stressers (keys, lease, v2keys, nop).") + stresserType := flag.String("stresser", "keys,lease", "comma separated list of stressers (keys, lease, v2keys, nop, election-runner, watch-runner, lock-racer-runner, lease-runner).") + etcdRunnerPath := flag.String("etcd-runner", "", "specify a path of etcd runner binary") failureTypes := flag.String("failures", "default,failpoints", "specify failures (concat of \"default\" and \"failpoints\").") externalFailures := flag.String("external-failures", "", "specify a path of script for enabling/disabling an external fault injector") enablePprof := flag.Bool("enable-pprof", false, "true to enable pprof") @@ -120,6 +121,8 @@ func main() { keySuffixRange: int(*stressKeySuffixRange), numLeases: 10, keysPerLease: 10, + + etcdRunnerPath: *etcdRunnerPath, } t := &tester{ diff --git a/tools/functional-tester/etcd-tester/stresser.go b/tools/functional-tester/etcd-tester/stresser.go index ea8968d58..30e8d47d7 100644 --- a/tools/functional-tester/etcd-tester/stresser.go +++ b/tools/functional-tester/etcd-tester/stresser.go @@ -15,6 +15,7 @@ package main import ( + "fmt" "strings" "sync" "time" @@ -28,8 +29,10 @@ func init() { grpclog.SetLogger(plog) } type Stresser interface { // Stress starts to stress the etcd cluster Stress() error - // Cancel cancels the stress test on the etcd cluster - Cancel() + // Pause stops the stresser from sending requests to etcd. Resume by calling Stress. + Pause() + // Close releases all of the Stresser's resources. + Close() // ModifiedKeys reports the number of keys created and deleted by stresser ModifiedKeys() int64 // Checker returns an invariant checker for after the stresser is canceled. @@ -43,7 +46,8 @@ type nopStresser struct { } func (s *nopStresser) Stress() error { return nil } -func (s *nopStresser) Cancel() {} +func (s *nopStresser) Pause() {} +func (s *nopStresser) Close() {} func (s *nopStresser) ModifiedKeys() int64 { return 0 } @@ -59,7 +63,7 @@ func (cs *compositeStresser) Stress() error { for i, s := range cs.stressers { if err := s.Stress(); err != nil { for j := 0; j < i; j++ { - cs.stressers[i].Cancel() + cs.stressers[i].Close() } return err } @@ -67,13 +71,25 @@ func (cs *compositeStresser) Stress() error { return nil } -func (cs *compositeStresser) Cancel() { +func (cs *compositeStresser) Pause() { var wg sync.WaitGroup wg.Add(len(cs.stressers)) for i := range cs.stressers { go func(s Stresser) { defer wg.Done() - s.Cancel() + s.Pause() + }(cs.stressers[i]) + } + wg.Wait() +} + +func (cs *compositeStresser) Close() { + var wg sync.WaitGroup + wg.Add(len(cs.stressers)) + for i := range cs.stressers { + go func(s Stresser) { + defer wg.Done() + s.Close() }(cs.stressers[i]) } wg.Wait() @@ -108,6 +124,8 @@ type stressConfig struct { keysPerLease int rateLimiter *rate.Limiter + + etcdRunnerPath string } // NewStresser creates stresser from a comma separated list of stresser types. @@ -149,6 +167,49 @@ func NewStresser(s string, sc *stressConfig, m *member) Stresser { keysPerLease: sc.keysPerLease, rateLimiter: sc.rateLimiter, } + case "election-runner": + reqRate := 100 + args := []string{ + "election", + fmt.Sprintf("%v", time.Now().UnixNano()), // election name as current nano time + "--dial-timeout=10s", + "--endpoints", m.grpcAddr(), + "--total-client-connections=10", + "--rounds=0", // runs forever + "--req-rate", fmt.Sprintf("%v", reqRate), + } + return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate) + case "watch-runner": + reqRate := 100 + args := []string{ + "watcher", + "--prefix", fmt.Sprintf("%v", time.Now().UnixNano()), // prefix all keys with nano time + "--total-keys=1", + "--total-prefixes=1", + "--watch-per-prefix=1", + "--endpoints", m.grpcAddr(), + "--rounds=0", // runs forever + "--req-rate", fmt.Sprintf("%v", reqRate), + } + return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate) + case "lock-racer-runner": + reqRate := 100 + args := []string{ + "lock-racer", + fmt.Sprintf("%v", time.Now().UnixNano()), // locker name as current nano time + "--endpoints", m.grpcAddr(), + "--total-client-connections=10", + "--rounds=0", // runs forever + "--req-rate", fmt.Sprintf("%v", reqRate), + } + return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate) + case "lease-runner": + args := []string{ + "lease-renewer", + "--ttl=30", + "--endpoints", m.grpcAddr(), + } + return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, 0) default: plog.Panicf("unknown stresser type: %s\n", s) } diff --git a/tools/functional-tester/etcd-tester/tester.go b/tools/functional-tester/etcd-tester/tester.go index 5eecf292d..f1b49b45e 100644 --- a/tools/functional-tester/etcd-tester/tester.go +++ b/tools/functional-tester/etcd-tester/tester.go @@ -114,7 +114,7 @@ func (tt *tester) doRound(round int) error { return fmt.Errorf("recovery error: %v", err) } plog.Infof("%s recovered failure", tt.logPrefix()) - tt.cancelStresser() + tt.pauseStresser() plog.Infof("%s wait until cluster is healthy", tt.logPrefix()) if err := tt.cluster.WaitHealth(); err != nil { return fmt.Errorf("wait full health error: %v", err) @@ -161,7 +161,7 @@ func (tt *tester) checkConsistency() (err error) { } func (tt *tester) compact(rev int64, timeout time.Duration) (err error) { - tt.cancelStresser() + tt.pauseStresser() defer func() { if err == nil { err = tt.startStresser() @@ -217,7 +217,7 @@ func (tt *tester) cleanup() error { } caseFailedTotalCounter.WithLabelValues(desc).Inc() - tt.cancelStresser() + tt.closeStresser() if err := tt.cluster.Cleanup(); err != nil { plog.Warningf("%s cleanup error: %v", tt.logPrefix(), err) return err @@ -229,10 +229,10 @@ func (tt *tester) cleanup() error { return tt.resetStressCheck() } -func (tt *tester) cancelStresser() { - plog.Infof("%s canceling the stressers...", tt.logPrefix()) - tt.stresser.Cancel() - plog.Infof("%s canceled stressers", tt.logPrefix()) +func (tt *tester) pauseStresser() { + plog.Infof("%s pausing the stressers...", tt.logPrefix()) + tt.stresser.Pause() + plog.Infof("%s paused stressers", tt.logPrefix()) } func (tt *tester) startStresser() (err error) { @@ -242,6 +242,12 @@ func (tt *tester) startStresser() (err error) { return err } +func (tt *tester) closeStresser() { + plog.Infof("%s closing the stressers...", tt.logPrefix()) + tt.stresser.Close() + plog.Infof("%s closed stressers", tt.logPrefix()) +} + func (tt *tester) resetStressCheck() error { plog.Infof("%s resetting stressers and checkers...", tt.logPrefix()) cs := &compositeStresser{} diff --git a/tools/functional-tester/etcd-tester/v2_stresser.go b/tools/functional-tester/etcd-tester/v2_stresser.go index 39fbd722c..620532e0c 100644 --- a/tools/functional-tester/etcd-tester/v2_stresser.go +++ b/tools/functional-tester/etcd-tester/v2_stresser.go @@ -93,11 +93,15 @@ func (s *v2Stresser) run(ctx context.Context, kv clientV2.KeysAPI) { } } -func (s *v2Stresser) Cancel() { +func (s *v2Stresser) Pause() { s.cancel() s.wg.Wait() } +func (s *v2Stresser) Close() { + s.Pause() +} + func (s *v2Stresser) ModifiedKeys() int64 { return atomic.LoadInt64(&s.atomicModifiedKey) }