tests: Refactor how linearizability test components are run in parallel

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz 2023-01-02 09:08:15 +01:00
parent 6821e226dd
commit 8a9f848d33

View File

@ -25,6 +25,7 @@ import (
"github.com/anishathalye/porcupine" "github.com/anishathalye/porcupine"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate" "golang.org/x/time/rate"
"go.etcd.io/etcd/tests/v3/framework/e2e" "go.etcd.io/etcd/tests/v3/framework/e2e"
@ -84,51 +85,52 @@ func TestLinearizability(t *testing.T) {
} }
for _, tc := range tcs { for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
failpoint := FailpointConfig{ ctx := context.Background()
failpoint: tc.failpoint, clus, err := e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&tc.config))
count: 1,
retries: 3,
waitBetweenTriggers: waitBetweenFailpointTriggers,
}
traffic := trafficConfig{
minimalQPS: minimalQPS,
maximalQPS: maximalQPS,
clientCount: 8,
traffic: DefaultTraffic,
}
testLinearizability(context.Background(), t, tc.config, failpoint, traffic)
})
}
}
func testLinearizability(ctx context.Context, t *testing.T, config e2e.EtcdProcessClusterConfig, failpoint FailpointConfig, traffic trafficConfig) {
clus, err := e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&config))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer clus.Close() defer clus.Close()
trafficCtx, trafficCancel := context.WithCancel(ctx) operations, events := testLinearizability(ctx, t, clus, FailpointConfig{
go func() { failpoint: tc.failpoint,
defer trafficCancel() count: 1,
triggerFailpoints(ctx, t, clus, failpoint) retries: 3,
// Wait second to collect traffic after triggering last failpoint. waitBetweenTriggers: waitBetweenFailpointTriggers,
time.Sleep(time.Second) }, trafficConfig{
}() minimalQPS: minimalQPS,
watchCtx, watchCancel := context.WithCancel(ctx) maximalQPS: maximalQPS,
var operations []porcupine.Operation clientCount: 8,
go func() { traffic: DefaultTraffic,
defer watchCancel() })
operations = simulateTraffic(trafficCtx, t, clus, traffic)
// Wait second to collect watch events after all traffic was sent.
time.Sleep(time.Second)
}()
events := collectClusterWatchEvents(watchCtx, t, clus)
err = clus.Stop()
if err != nil {
t.Error(err)
}
validateEventsMatch(t, events) validateEventsMatch(t, events)
checkOperationsAndPersistResults(t, operations, clus) checkOperationsAndPersistResults(t, operations, clus)
})
}
}
func testLinearizability(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, failpoint FailpointConfig, traffic trafficConfig) (operations []porcupine.Operation, events [][]watchEvent) {
// Run multiple test components (traffic, failpoints, etc) in parallel and use canceling context to propagate stop signal.
g := errgroup.Group{}
trafficCtx, trafficCancel := context.WithCancel(ctx)
g.Go(func() error {
triggerFailpoints(ctx, t, clus, failpoint)
time.Sleep(time.Second)
trafficCancel()
return nil
})
watchCtx, watchCancel := context.WithCancel(ctx)
g.Go(func() error {
operations = simulateTraffic(trafficCtx, t, clus, traffic)
time.Sleep(time.Second)
watchCancel()
return nil
})
g.Go(func() error {
events = collectClusterWatchEvents(watchCtx, t, clus)
return nil
})
g.Wait()
return operations, events
} }
func triggerFailpoints(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, config FailpointConfig) { func triggerFailpoints(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, config FailpointConfig) {