Merge pull request #9529 from gyuho/fff

functional-tester: expect no client error in liveness mode
This commit is contained in:
Gyuho Lee 2018-04-04 18:18:07 -07:00 committed by GitHub
commit 95119a769e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 66 additions and 16 deletions

View File

@ -135,7 +135,8 @@ func (clus *Cluster) doRound() error {
}
stressStarted := false
if fa.FailureCase() != rpcpb.FailureCase_NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS {
fcase := fa.FailureCase()
if fcase != rpcpb.FailureCase_NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS {
clus.lg.Info(
"starting stressers before injecting failures",
zap.Int("round", clus.rd),
@ -173,7 +174,21 @@ func (clus *Cluster) doRound() error {
if stressStarted {
clus.lg.Info("pausing stresser after failure recovery, before wait health")
clus.stresser.Pause()
ems := clus.stresser.Pause()
if fcase == rpcpb.FailureCase_NO_FAIL_WITH_STRESS && len(ems) > 0 {
ess := make([]string, 0, len(ems))
cnt := 0
for k, v := range ems {
ess = append(ess, fmt.Sprintf("%s (count %d)", k, v))
cnt += v
}
clus.lg.Warn(
"expected no errors",
zap.String("desc", fa.Desc()),
zap.Strings("errors", ess),
)
return fmt.Errorf("expected no error in %q, got %q", fcase.String(), ess)
}
}
clus.lg.Info("wait health after recover")

View File

@ -28,9 +28,9 @@ type Stresser interface {
// Stress starts to stress the etcd cluster
Stress() error
// Pause stops the stresser from sending requests to etcd. Resume by calling Stress.
Pause()
Pause() map[string]int
// Close releases all of the Stresser's resources.
Close()
Close() map[string]int
// ModifiedKeys reports the number of keys created and deleted by stresser
ModifiedKeys() int64
// Checker returns an invariant checker for after the stresser is canceled.

View File

@ -34,28 +34,38 @@ func (cs *compositeStresser) Stress() error {
return nil
}
func (cs *compositeStresser) Pause() {
func (cs *compositeStresser) Pause() (ems map[string]int) {
ems = make(map[string]int)
var wg sync.WaitGroup
wg.Add(len(cs.stressers))
for i := range cs.stressers {
go func(s Stresser) {
defer wg.Done()
s.Pause()
errs := s.Pause()
for k, v := range errs {
ems[k] += v
}
}(cs.stressers[i])
}
wg.Wait()
return ems
}
func (cs *compositeStresser) Close() {
func (cs *compositeStresser) Close() (ems map[string]int) {
ems = make(map[string]int)
var wg sync.WaitGroup
wg.Add(len(cs.stressers))
for i := range cs.stressers {
go func(s Stresser) {
defer wg.Done()
s.Close()
errs := s.Close()
for k, v := range errs {
ems[k] += v
}
}(cs.stressers[i])
}
wg.Wait()
return ems
}
func (cs *compositeStresser) ModifiedKeys() (modifiedKey int64) {

View File

@ -53,6 +53,10 @@ type keyStresser struct {
cancel func()
cli *clientv3.Client
emu sync.RWMutex
ems map[string]int
paused bool
// atomicModifiedKeys records the number of keys created and deleted by the stresser.
atomicModifiedKeys int64
@ -89,6 +93,10 @@ func (s *keyStresser) Stress() error {
}
s.stressTable = createStressTable(stressEntries)
s.emu.Lock()
s.paused = false
s.ems = make(map[string]int, 100)
s.emu.Unlock()
for i := 0; i < s.clientsN; i++ {
go s.run()
}
@ -154,14 +162,21 @@ func (s *keyStresser) run() {
)
return
}
// only record errors before pausing stressers
s.emu.Lock()
if !s.paused {
s.ems[err.Error()]++
}
s.emu.Unlock()
}
}
func (s *keyStresser) Pause() {
s.Close()
func (s *keyStresser) Pause() map[string]int {
return s.Close()
}
func (s *keyStresser) Close() {
func (s *keyStresser) Close() map[string]int {
s.cancel()
s.cli.Close()
s.wg.Wait()
@ -170,6 +185,13 @@ func (s *keyStresser) Close() {
"key stresser is closed",
zap.String("endpoint", s.m.EtcdClientEndpoint),
)
s.emu.Lock()
s.paused = true
ess := s.ems
s.ems = make(map[string]int, 100)
s.emu.Unlock()
return ess
}
func (s *keyStresser) ModifiedKeys() int64 {

View File

@ -447,11 +447,11 @@ func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) {
return false, ls.ctx.Err()
}
func (ls *leaseStresser) Pause() {
ls.Close()
func (ls *leaseStresser) Pause() map[string]int {
return ls.Close()
}
func (ls *leaseStresser) Close() {
func (ls *leaseStresser) Close() map[string]int {
ls.lg.Info(
"lease stresser is closing",
zap.String("endpoint", ls.m.EtcdClientEndpoint),
@ -464,6 +464,7 @@ func (ls *leaseStresser) Close() {
"lease stresser is closed",
zap.String("endpoint", ls.m.EtcdClientEndpoint),
)
return nil
}
func (ls *leaseStresser) ModifiedKeys() int64 {

View File

@ -77,15 +77,17 @@ func (rs *runnerStresser) Stress() (err error) {
return syscall.Kill(rs.cmd.Process.Pid, syscall.SIGCONT)
}
func (rs *runnerStresser) Pause() {
func (rs *runnerStresser) Pause() map[string]int {
syscall.Kill(rs.cmd.Process.Pid, syscall.SIGSTOP)
return nil
}
func (rs *runnerStresser) Close() {
func (rs *runnerStresser) Close() map[string]int {
syscall.Kill(rs.cmd.Process.Pid, syscall.SIGINT)
rs.cmd.Wait()
<-rs.donec
rs.rl.SetLimit(rs.rl.Limit() + rate.Limit(rs.reqRate))
return nil
}
func (rs *runnerStresser) ModifiedKeys() int64 {