functional-tester/tester: clean up stresser logic for liveness mode

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
Gyuho Lee 2018-04-04 13:42:30 -07:00
parent a43bd84631
commit 83739dc9cb
2 changed files with 32 additions and 76 deletions

View File

@ -383,49 +383,6 @@ func (clus *Cluster) updateStresserChecker() {
)
}
func (clus *Cluster) startStresser() (err error) {
clus.lg.Info(
"starting stressers",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
err = clus.stresser.Stress()
clus.lg.Info(
"started stressers",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
return err
}
func (clus *Cluster) closeStresser() {
clus.lg.Info(
"closing stressers",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
clus.stresser.Close()
clus.lg.Info(
"closed stressers",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
}
func (clus *Cluster) pauseStresser() {
clus.lg.Info(
"pausing stressers",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
clus.stresser.Pause()
clus.lg.Info(
"paused stressers",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
}
func (clus *Cluster) checkConsistency() (err error) {
defer func() {
if err != nil {
@ -438,7 +395,6 @@ func (clus *Cluster) checkConsistency() (err error) {
)
return
}
err = clus.startStresser()
}()
clus.lg.Info(

View File

@ -31,7 +31,6 @@ const compactQPS = 50000
// StartTester starts tester.
func (clus *Cluster) StartTester() {
// TODO: upate status
clus.startStresser()
var preModifiedKey int64
for round := 0; round < int(clus.Tester.RoundLimit) || clus.Tester.RoundLimit == -1; round++ {
@ -123,19 +122,34 @@ func (clus *Cluster) doRound() error {
clus.cs = i
caseTotalCounter.WithLabelValues(fa.Desc()).Inc()
clus.lg.Info(
"failure case START",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.String("desc", fa.Desc()),
)
clus.lg.Info("wait health before injecting failures")
if err := clus.WaitHealth(); err != nil {
return fmt.Errorf("wait full health error: %v", err)
}
if fa.FailureCase() == rpcpb.FailureCase_NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS {
clus.lg.Info("pausing stresser after before injecting failures")
clus.pauseStresser()
stressStarted := false
if fa.FailureCase() != rpcpb.FailureCase_NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS {
clus.lg.Info(
"starting stressers before injecting failures",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.String("desc", fa.Desc()),
)
if err := clus.stresser.Stress(); err != nil {
return fmt.Errorf("start stresser error: %v", err)
}
stressStarted = true
}
clus.lg.Info(
"injecting failure",
"injecting",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.String("desc", fa.Desc()),
@ -143,18 +157,12 @@ func (clus *Cluster) doRound() error {
if err := fa.Inject(clus); err != nil {
return fmt.Errorf("injection error: %v", err)
}
clus.lg.Info(
"injected failure",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.String("desc", fa.Desc()),
)
// if run local, recovering server may conflict
// with stressing client ports
// TODO: use unix for local tests
clus.lg.Info(
"recovering failure",
"recovering",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.String("desc", fa.Desc()),
@ -162,30 +170,24 @@ func (clus *Cluster) doRound() error {
if err := fa.Recover(clus); err != nil {
return fmt.Errorf("recovery error: %v", err)
}
clus.lg.Info(
"recovered failure",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.String("desc", fa.Desc()),
)
if fa.FailureCase() != rpcpb.FailureCase_NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS {
if stressStarted {
clus.lg.Info("pausing stresser after failure recovery, before wait health")
clus.pauseStresser()
clus.stresser.Pause()
}
clus.lg.Info("wait health after recovering failures")
clus.lg.Info("wait health after recover")
if err := clus.WaitHealth(); err != nil {
return fmt.Errorf("wait full health error: %v", err)
}
clus.lg.Info("check consistency after recovering failures")
clus.lg.Info("check consistency after recover")
if err := clus.checkConsistency(); err != nil {
return fmt.Errorf("tt.checkConsistency error (%v)", err)
}
clus.lg.Info(
"failure case passed",
"failure case PASS",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.String("desc", fa.Desc()),
@ -216,14 +218,6 @@ func (clus *Cluster) updateRevision() error {
}
func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) {
clus.lg.Info("pausing stresser before compact")
clus.pauseStresser()
defer func() {
if err == nil {
err = clus.startStresser()
}
}()
clus.lg.Info(
"compacting storage",
zap.Int64("current-revision", clus.currentRevision),
@ -285,7 +279,13 @@ func (clus *Cluster) cleanup() error {
}
caseFailedTotalCounter.WithLabelValues(desc).Inc()
clus.closeStresser()
clus.lg.Info(
"closing stressers before archiving failure data",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
clus.stresser.Close()
if err := clus.FailArchive(); err != nil {
clus.lg.Warn(
"cleanup failed",