From 83739dc9cbb9af9eb2e2687041fded1499ecd703 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Wed, 4 Apr 2018 13:42:30 -0700 Subject: [PATCH] functional-tester/tester: clean up stresser logic for liveness mode Signed-off-by: Gyuho Lee --- tools/functional-tester/tester/cluster.go | 44 ------------- .../tester/cluster_tester.go | 64 +++++++++---------- 2 files changed, 32 insertions(+), 76 deletions(-) diff --git a/tools/functional-tester/tester/cluster.go b/tools/functional-tester/tester/cluster.go index 01d6d36c6..303b93f97 100644 --- a/tools/functional-tester/tester/cluster.go +++ b/tools/functional-tester/tester/cluster.go @@ -383,49 +383,6 @@ func (clus *Cluster) updateStresserChecker() { ) } -func (clus *Cluster) startStresser() (err error) { - clus.lg.Info( - "starting stressers", - zap.Int("round", clus.rd), - zap.Int("case", clus.cs), - ) - err = clus.stresser.Stress() - clus.lg.Info( - "started stressers", - zap.Int("round", clus.rd), - zap.Int("case", clus.cs), - ) - return err -} - -func (clus *Cluster) closeStresser() { - clus.lg.Info( - "closing stressers", - zap.Int("round", clus.rd), - zap.Int("case", clus.cs), - ) - clus.stresser.Close() - clus.lg.Info( - "closed stressers", - zap.Int("round", clus.rd), - zap.Int("case", clus.cs), - ) -} - -func (clus *Cluster) pauseStresser() { - clus.lg.Info( - "pausing stressers", - zap.Int("round", clus.rd), - zap.Int("case", clus.cs), - ) - clus.stresser.Pause() - clus.lg.Info( - "paused stressers", - zap.Int("round", clus.rd), - zap.Int("case", clus.cs), - ) -} - func (clus *Cluster) checkConsistency() (err error) { defer func() { if err != nil { @@ -438,7 +395,6 @@ func (clus *Cluster) checkConsistency() (err error) { ) return } - err = clus.startStresser() }() clus.lg.Info( diff --git a/tools/functional-tester/tester/cluster_tester.go b/tools/functional-tester/tester/cluster_tester.go index 032d44f0b..453556462 100644 --- a/tools/functional-tester/tester/cluster_tester.go +++ b/tools/functional-tester/tester/cluster_tester.go @@ -31,7 +31,6 @@ const compactQPS = 50000 // StartTester starts tester. func (clus *Cluster) StartTester() { // TODO: upate status - clus.startStresser() var preModifiedKey int64 for round := 0; round < int(clus.Tester.RoundLimit) || clus.Tester.RoundLimit == -1; round++ { @@ -123,19 +122,34 @@ func (clus *Cluster) doRound() error { clus.cs = i caseTotalCounter.WithLabelValues(fa.Desc()).Inc() + clus.lg.Info( + "failure case START", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + zap.String("desc", fa.Desc()), + ) clus.lg.Info("wait health before injecting failures") if err := clus.WaitHealth(); err != nil { return fmt.Errorf("wait full health error: %v", err) } - if fa.FailureCase() == rpcpb.FailureCase_NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS { - clus.lg.Info("pausing stresser after before injecting failures") - clus.pauseStresser() + stressStarted := false + if fa.FailureCase() != rpcpb.FailureCase_NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS { + clus.lg.Info( + "starting stressers before injecting failures", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + zap.String("desc", fa.Desc()), + ) + if err := clus.stresser.Stress(); err != nil { + return fmt.Errorf("start stresser error: %v", err) + } + stressStarted = true } clus.lg.Info( - "injecting failure", + "injecting", zap.Int("round", clus.rd), zap.Int("case", clus.cs), zap.String("desc", fa.Desc()), @@ -143,18 +157,12 @@ func (clus *Cluster) doRound() error { if err := fa.Inject(clus); err != nil { return fmt.Errorf("injection error: %v", err) } - clus.lg.Info( - "injected failure", - zap.Int("round", clus.rd), - zap.Int("case", clus.cs), - zap.String("desc", fa.Desc()), - ) // if run local, recovering server may conflict // with stressing client ports // TODO: use unix for local tests clus.lg.Info( - "recovering failure", + "recovering", zap.Int("round", clus.rd), zap.Int("case", clus.cs), zap.String("desc", fa.Desc()), @@ -162,30 +170,24 @@ func (clus *Cluster) doRound() error { if err := fa.Recover(clus); err != nil { return fmt.Errorf("recovery error: %v", err) } - clus.lg.Info( - "recovered failure", - zap.Int("round", clus.rd), - zap.Int("case", clus.cs), - zap.String("desc", fa.Desc()), - ) - if fa.FailureCase() != rpcpb.FailureCase_NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS { + if stressStarted { clus.lg.Info("pausing stresser after failure recovery, before wait health") - clus.pauseStresser() + clus.stresser.Pause() } - clus.lg.Info("wait health after recovering failures") + clus.lg.Info("wait health after recover") if err := clus.WaitHealth(); err != nil { return fmt.Errorf("wait full health error: %v", err) } - clus.lg.Info("check consistency after recovering failures") + clus.lg.Info("check consistency after recover") if err := clus.checkConsistency(); err != nil { return fmt.Errorf("tt.checkConsistency error (%v)", err) } clus.lg.Info( - "failure case passed", + "failure case PASS", zap.Int("round", clus.rd), zap.Int("case", clus.cs), zap.String("desc", fa.Desc()), @@ -216,14 +218,6 @@ func (clus *Cluster) updateRevision() error { } func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) { - clus.lg.Info("pausing stresser before compact") - clus.pauseStresser() - defer func() { - if err == nil { - err = clus.startStresser() - } - }() - clus.lg.Info( "compacting storage", zap.Int64("current-revision", clus.currentRevision), @@ -285,7 +279,13 @@ func (clus *Cluster) cleanup() error { } caseFailedTotalCounter.WithLabelValues(desc).Inc() - clus.closeStresser() + clus.lg.Info( + "closing stressers before archiving failure data", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + ) + clus.stresser.Close() + if err := clus.FailArchive(); err != nil { clus.lg.Warn( "cleanup failed",