From dd248518d10b6c62065218f0ffa5758b414095ec Mon Sep 17 00:00:00 2001
From: Marek Siarkowicz
Date: Wed, 10 May 2023 11:42:57 +0200
Subject: [PATCH] tests/robustness: Move request progress field from traffic
 to watch config and pass testScenario to reduce number of arguments

Signed-off-by: Marek Siarkowicz
---
 tests/robustness/failpoints.go           | 24 ++++----
 tests/robustness/linearizability_test.go | 73 ++++++++++++------------
 tests/robustness/traffic/etcd.go         | 21 -------
 tests/robustness/traffic/traffic.go      | 11 ++--
 tests/robustness/watch.go                | 12 ++--
 5 files changed, 62 insertions(+), 79 deletions(-)

diff --git a/tests/robustness/failpoints.go b/tests/robustness/failpoints.go
index a70a9fbb4..33c920a02 100644
--- a/tests/robustness/failpoints.go
+++ b/tests/robustness/failpoints.go
@@ -33,6 +33,8 @@ import (
 const (
 	triggerTimeout               = time.Minute
 	waitBetweenFailpointTriggers = time.Second
+	failpointInjectionsCount     = 1
+	failpointInjectionsRetries   = 3
 )
 
 var (
 	}}
 )
 
-func injectFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config FailpointConfig) {
+func injectFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint) {
 	ctx, cancel := context.WithTimeout(ctx, triggerTimeout)
 	defer cancel()
 
@@ -85,22 +87,22 @@ func injectFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e
 	successes := 0
 	failures := 0
 	for _, proc := range clus.Procs {
-		if !config.failpoint.Available(*clus.Cfg, proc) {
-			t.Errorf("Failpoint %q not available on %s", config.failpoint.Name(), proc.Config().Name)
+		if !failpoint.Available(*clus.Cfg, proc) {
+			t.Errorf("Failpoint %q not available on %s", failpoint.Name(), proc.Config().Name)
 			return
 		}
 	}
-	for successes < config.count && failures < config.retries {
-		time.Sleep(config.waitBetweenTriggers)
+	for successes < failpointInjectionsCount && failures < failpointInjectionsRetries {
+		time.Sleep(waitBetweenFailpointTriggers)
 
-		lg.Info("Verifying cluster health before failpoint", zap.String("failpoint", config.failpoint.Name()))
+		lg.Info("Verifying cluster health before failpoint", zap.String("failpoint", failpoint.Name()))
 		if err = verifyClusterHealth(ctx, t, clus); err != nil {
 			t.Errorf("failed to verify cluster health before failpoint injection, err: %v", err)
 			return
 		}
 
-		lg.Info("Triggering failpoint", zap.String("failpoint", config.failpoint.Name()))
-		err = config.failpoint.Inject(ctx, t, lg, clus)
+		lg.Info("Triggering failpoint", zap.String("failpoint", failpoint.Name()))
+		err = failpoint.Inject(ctx, t, lg, clus)
 		if err != nil {
 			select {
 			case <-ctx.Done():
@@ -108,12 +110,12 @@ func injectFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e
 				return
 			default:
 			}
-			lg.Info("Failed to trigger failpoint", zap.String("failpoint", config.failpoint.Name()), zap.Error(err))
+			lg.Info("Failed to trigger failpoint", zap.String("failpoint", failpoint.Name()), zap.Error(err))
 			failures++
 			continue
 		}
 
-		lg.Info("Verifying cluster health after failpoint", zap.String("failpoint", config.failpoint.Name()))
+		lg.Info("Verifying cluster health after failpoint", zap.String("failpoint", failpoint.Name()))
 		if err = verifyClusterHealth(ctx, t, clus); err != nil {
 			t.Errorf("failed to verify cluster health after failpoint injection, err: %v", err)
 			return
@@ -121,7 +123,7 @@ func injectFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e
 		}
 		successes++
 	}
-	if successes < config.count || failures >= config.retries {
+	if successes < failpointInjectionsCount || failures >= failpointInjectionsRetries {
 		t.Errorf("failed to trigger failpoints enough times, err: %v", err)
 	}
 
diff --git a/tests/robustness/linearizability_test.go b/tests/robustness/linearizability_test.go
index a00846055..35b986981 100644
--- a/tests/robustness/linearizability_test.go
+++ b/tests/robustness/linearizability_test.go
@@ -36,19 +36,13 @@ func TestRobustness(t *testing.T) {
 	if err != nil {
 		t.Fatalf("Failed checking etcd version binary, binary: %q, err: %v", e2e.BinPath.Etcd, err)
 	}
-	type scenario struct {
-		name      string
-		failpoint Failpoint
-		config    e2e.EtcdProcessClusterConfig
-		traffic   *traffic.Config
-	}
-	scenarios := []scenario{}
+	scenarios := []testScenario{}
 	for _, traffic := range []traffic.Config{traffic.LowTraffic, traffic.HighTraffic, traffic.KubernetesTraffic} {
-		scenarios = append(scenarios, scenario{
+		scenarios = append(scenarios, testScenario{
 			name:      "ClusterOfSize1/" + traffic.Name,
 			failpoint: RandomFailpoint,
 			traffic:   &traffic,
-			config: *e2e.NewConfig(
+			cluster: *e2e.NewConfig(
 				e2e.WithClusterSize(1),
 				e2e.WithSnapshotCount(100),
 				e2e.WithGoFailEnabled(true),
@@ -67,51 +61,53 @@ func TestRobustness(t *testing.T) {
 		if !v.LessThan(version.V3_6) {
 			clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithSnapshotCatchUpEntries(100))
 		}
-		scenarios = append(scenarios, scenario{
+		scenarios = append(scenarios, testScenario{
 			name:      "ClusterOfSize3/" + traffic.Name,
 			failpoint: RandomFailpoint,
 			traffic:   &traffic,
-			config: *e2e.NewConfig(clusterOfSize3Options...),
+			cluster: *e2e.NewConfig(clusterOfSize3Options...),
 		})
 	}
-	scenarios = append(scenarios, scenario{
+	scenarios = append(scenarios, testScenario{
 		name:      "Issue14370",
 		failpoint: RaftBeforeSavePanic,
-		config: *e2e.NewConfig(
+		cluster: *e2e.NewConfig(
 			e2e.WithClusterSize(1),
 			e2e.WithGoFailEnabled(true),
 		),
 	})
-	scenarios = append(scenarios, scenario{
+	scenarios = append(scenarios, testScenario{
 		name:      "Issue14685",
 		failpoint: DefragBeforeCopyPanic,
-		config: *e2e.NewConfig(
+		cluster: *e2e.NewConfig(
 			e2e.WithClusterSize(1),
 			e2e.WithGoFailEnabled(true),
 		),
 	})
-	scenarios = append(scenarios, scenario{
+	scenarios = append(scenarios, testScenario{
 		name:      "Issue13766",
 		failpoint: KillFailpoint,
 		traffic:   &traffic.HighTraffic,
-		config: *e2e.NewConfig(
+		cluster: *e2e.NewConfig(
 			e2e.WithSnapshotCount(100),
 		),
 	})
-	scenarios = append(scenarios, scenario{
+	scenarios = append(scenarios, testScenario{
 		name:      "Issue15220",
 		failpoint: RandomFailpoint,
-		traffic:   &traffic.ReqProgTraffic,
-		config: *e2e.NewConfig(
+		watch: watchConfig{
+			requestProgress: true,
+		},
+		cluster: *e2e.NewConfig(
 			e2e.WithClusterSize(1),
 		),
 	})
 	if v.Compare(version.V3_5) >= 0 {
-		scenarios = append(scenarios, scenario{
+		scenarios = append(scenarios, testScenario{
 			name:      "Issue15271",
 			failpoint: BlackholeUntilSnapshot,
 			traffic:   &traffic.HighTraffic,
-			config: *e2e.NewConfig(
+			cluster: *e2e.NewConfig(
 				e2e.WithSnapshotCount(100),
 				e2e.WithPeerProxy(true),
 				e2e.WithIsPeerTLS(true),
@@ -125,22 +121,25 @@ func TestRobustness(t *testing.T) {
 		t.Run(scenario.name, func(t *testing.T) {
 			lg := zaptest.NewLogger(t)
-			scenario.config.Logger = lg
+			scenario.cluster.Logger = lg
 			ctx := context.Background()
-			testRobustness(ctx, t, lg, scenario.config, scenario.traffic, FailpointConfig{
-				failpoint:           scenario.failpoint,
-				count:               1,
-				retries:             3,
-				waitBetweenTriggers: waitBetweenFailpointTriggers,
-			})
+			testRobustness(ctx, t, lg, scenario)
 		})
 	}
 }
 
-func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2e.EtcdProcessClusterConfig, traffic *traffic.Config, failpoint FailpointConfig) {
+type testScenario struct {
+	name      string
+	failpoint Failpoint
+	cluster   e2e.EtcdProcessClusterConfig
+	traffic   *traffic.Config
+	watch     watchConfig
+}
+
+func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testScenario) {
 	r := report{lg: lg}
 	var err error
-	r.clus, err = e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&config))
+	r.clus, err = e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&s.cluster))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -153,11 +152,11 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2
 	defer func() {
 		r.Report(t, panicked)
 	}()
-	r.operations, r.responses = runScenario(ctx, t, lg, r.clus, *traffic, failpoint)
+	r.operations, r.responses = s.run(ctx, t, lg, r.clus)
 	forcestopCluster(r.clus)
 
 	watchProgressNotifyEnabled := r.clus.Cfg.WatchProcessNotifyInterval != 0
-	validateWatchResponses(t, r.clus, r.responses, traffic.RequestProgress || watchProgressNotifyEnabled)
+	validateWatchResponses(t, r.clus, r.responses, s.watch.requestProgress || watchProgressNotifyEnabled)
 	r.events = watchEvents(r.responses)
 	validateEventsMatch(t, r.events)
@@ -168,25 +167,25 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2
 	panicked = false
 }
 
-func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, tCfg traffic.Config, failpoint FailpointConfig) (operations []porcupine.Operation, responses [][]watchResponse) {
+func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) (operations []porcupine.Operation, responses [][]watchResponse) {
 	g := errgroup.Group{}
 	finishTraffic := make(chan struct{})
 	g.Go(func() error {
 		defer close(finishTraffic)
-		injectFailpoints(ctx, t, lg, clus, failpoint)
+		injectFailpoints(ctx, t, lg, clus, s.failpoint)
 		time.Sleep(time.Second)
 		return nil
 	})
 	maxRevisionChan := make(chan int64, 1)
 	g.Go(func() error {
 		defer close(maxRevisionChan)
-		operations = traffic.SimulateTraffic(ctx, t, lg, clus, tCfg, finishTraffic)
+		operations = traffic.SimulateTraffic(ctx, t, lg, clus, *s.traffic, finishTraffic)
 		maxRevisionChan <- operationsMaxRevision(operations)
 		return nil
 	})
 	g.Go(func() error {
-		responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, tCfg.RequestProgress)
+		responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch)
 		return nil
 	})
 	g.Wait()
diff --git a/tests/robustness/traffic/etcd.go b/tests/robustness/traffic/etcd.go
index 9e3233e5f..7f21fc0af 100644
--- a/tests/robustness/traffic/etcd.go
+++ b/tests/robustness/traffic/etcd.go
@@ -48,27 +48,6 @@ var (
 			},
 		},
 	}
-	ReqProgTraffic = Config{
-		Name:            "RequestProgressTraffic",
-		minimalQPS:      100,
-		maximalQPS:      200,
-		clientCount:     8,
-		RequestProgress: true,
-		traffic: etcdTraffic{
-			keyCount:     10,
-			leaseTTL:     DefaultLeaseTTL,
-			largePutSize: 32769,
-			writeChoices: []choiceWeight[etcdRequestType]{
-				{choice: Put, weight: 45},
-				{choice: LargePut, weight: 5},
-				{choice: Delete, weight: 10},
-				{choice: MultiOpTxn, weight: 10},
-				{choice: PutWithLease, weight: 10},
-				{choice: LeaseRevoke, weight: 10},
-				{choice: CompareAndSet, weight: 10},
-			},
-		},
-	}
 	HighTraffic = Config{
 		Name:        "HighTraffic",
 		minimalQPS:  200,
diff --git a/tests/robustness/traffic/traffic.go b/tests/robustness/traffic/traffic.go
index bf4c8513e..c895ca6bb 100644
--- a/tests/robustness/traffic/traffic.go
+++ b/tests/robustness/traffic/traffic.go
@@ -90,12 +90,11 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
 }
 
 type Config struct {
-	Name            string
-	minimalQPS      float64
-	maximalQPS      float64
-	clientCount     int
-	traffic         Traffic
-	RequestProgress bool // Request progress notifications while watching this traffic
+	Name        string
+	minimalQPS  float64
+	maximalQPS  float64
+	clientCount int
+	traffic     Traffic
 }
 
 type Traffic interface {
diff --git a/tests/robustness/watch.go b/tests/robustness/watch.go
index 89ad5bf48..e2f36b99f 100644
--- a/tests/robustness/watch.go
+++ b/tests/robustness/watch.go
@@ -31,7 +31,7 @@ import (
 	"go.etcd.io/etcd/tests/v3/robustness/model"
 )
 
-func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64, requestProgress bool) [][]watchResponse {
+func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64, cfg watchConfig) [][]watchResponse {
 	mux := sync.Mutex{}
 	var wg sync.WaitGroup
 	memberResponses := make([][]watchResponse, len(clus.Procs))
@@ -52,7 +52,7 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.Etcd
 		go func(i int, c *clientv3.Client) {
 			defer wg.Done()
 			defer c.Close()
-			responses := watchMember(ctx, t, c, memberChan, requestProgress)
+			responses := watchMember(ctx, t, c, memberChan, cfg)
 			mux.Lock()
 			memberResponses[i] = responses
 			mux.Unlock()
@@ -70,8 +70,12 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.Etcd
 	return memberResponses
 }
 
+type watchConfig struct {
+	requestProgress bool
+}
+
 // watchMember collects all responses until context is cancelled, it has observed revision provided via maxRevisionChan or maxRevisionChan was closed.
-func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64, requestProgress bool) (resps []watchResponse) {
+func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64, cfg watchConfig) (resps []watchResponse) {
 	var maxRevision int64 = 0
 	var lastRevision int64 = 0
 	ctx, cancel := context.WithCancel(ctx)
@@ -101,7 +105,7 @@ func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevis
 			}
 		}
 		case resp := <-watch:
-			if requestProgress {
+			if cfg.requestProgress {
 				c.RequestProgress(ctx)
 			}
 			if resp.Err() == nil {
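
For orientation, a condensed sketch of the shape this change leaves behind (illustrative only, not applied by the patch): the types are copied from the hunks above, and the usage mirrors the Issue15220 scenario in linearizability_test.go, with ctx, t and lg assumed to come from the test harness.

// testScenario bundles everything a single robustness test case needs, so
// testRobustness and the former runScenario no longer take long argument lists.
type testScenario struct {
	name      string
	failpoint Failpoint
	cluster   e2e.EtcdProcessClusterConfig
	traffic   *traffic.Config
	watch     watchConfig
}

// watchConfig now owns the request-progress knob that previously lived on
// traffic.Config as the exported RequestProgress field.
type watchConfig struct {
	requestProgress bool
}

// Declaring and running a case then reduces to:
//
//	s := testScenario{
//		name:      "Issue15220",
//		failpoint: RandomFailpoint,
//		watch:     watchConfig{requestProgress: true},
//		cluster:   *e2e.NewConfig(e2e.WithClusterSize(1)),
//	}
//	testRobustness(ctx, t, lg, s)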