Merge pull request #15869 from serathius/robustness-watch-config

tests/robustness: Move request progress field from traffic to watch config
This commit is contained in:
Marek Siarkowicz 2023-05-10 13:05:04 +02:00 committed by GitHub
commit 2bb9930ffa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 62 additions and 79 deletions

View File

@ -33,6 +33,8 @@ import (
const ( const (
triggerTimeout = time.Minute triggerTimeout = time.Minute
waitBetweenFailpointTriggers = time.Second waitBetweenFailpointTriggers = time.Second
failpointInjectionsCount = 1
failpointInjectionsRetries = 3
) )
var ( var (
@ -77,7 +79,7 @@ var (
}} }}
) )
func injectFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config FailpointConfig) { func injectFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint) {
ctx, cancel := context.WithTimeout(ctx, triggerTimeout) ctx, cancel := context.WithTimeout(ctx, triggerTimeout)
defer cancel() defer cancel()
@ -85,22 +87,22 @@ func injectFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e
successes := 0 successes := 0
failures := 0 failures := 0
for _, proc := range clus.Procs { for _, proc := range clus.Procs {
if !config.failpoint.Available(*clus.Cfg, proc) { if !failpoint.Available(*clus.Cfg, proc) {
t.Errorf("Failpoint %q not available on %s", config.failpoint.Name(), proc.Config().Name) t.Errorf("Failpoint %q not available on %s", failpoint.Name(), proc.Config().Name)
return return
} }
} }
for successes < config.count && failures < config.retries { for successes < failpointInjectionsCount && failures < failpointInjectionsRetries {
time.Sleep(config.waitBetweenTriggers) time.Sleep(waitBetweenFailpointTriggers)
lg.Info("Verifying cluster health before failpoint", zap.String("failpoint", config.failpoint.Name())) lg.Info("Verifying cluster health before failpoint", zap.String("failpoint", failpoint.Name()))
if err = verifyClusterHealth(ctx, t, clus); err != nil { if err = verifyClusterHealth(ctx, t, clus); err != nil {
t.Errorf("failed to verify cluster health before failpoint injection, err: %v", err) t.Errorf("failed to verify cluster health before failpoint injection, err: %v", err)
return return
} }
lg.Info("Triggering failpoint", zap.String("failpoint", config.failpoint.Name())) lg.Info("Triggering failpoint", zap.String("failpoint", failpoint.Name()))
err = config.failpoint.Inject(ctx, t, lg, clus) err = failpoint.Inject(ctx, t, lg, clus)
if err != nil { if err != nil {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -108,12 +110,12 @@ func injectFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e
return return
default: default:
} }
lg.Info("Failed to trigger failpoint", zap.String("failpoint", config.failpoint.Name()), zap.Error(err)) lg.Info("Failed to trigger failpoint", zap.String("failpoint", failpoint.Name()), zap.Error(err))
failures++ failures++
continue continue
} }
lg.Info("Verifying cluster health after failpoint", zap.String("failpoint", config.failpoint.Name())) lg.Info("Verifying cluster health after failpoint", zap.String("failpoint", failpoint.Name()))
if err = verifyClusterHealth(ctx, t, clus); err != nil { if err = verifyClusterHealth(ctx, t, clus); err != nil {
t.Errorf("failed to verify cluster health after failpoint injection, err: %v", err) t.Errorf("failed to verify cluster health after failpoint injection, err: %v", err)
return return
@ -121,7 +123,7 @@ func injectFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e
successes++ successes++
} }
if successes < config.count || failures >= config.retries { if successes < failpointInjectionsCount || failures >= failpointInjectionsRetries {
t.Errorf("failed to trigger failpoints enough times, err: %v", err) t.Errorf("failed to trigger failpoints enough times, err: %v", err)
} }

View File

@ -36,19 +36,13 @@ func TestRobustness(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("Failed checking etcd version binary, binary: %q, err: %v", e2e.BinPath.Etcd, err) t.Fatalf("Failed checking etcd version binary, binary: %q, err: %v", e2e.BinPath.Etcd, err)
} }
type scenario struct { scenarios := []testScenario{}
name string
failpoint Failpoint
config e2e.EtcdProcessClusterConfig
traffic *traffic.Config
}
scenarios := []scenario{}
for _, traffic := range []traffic.Config{traffic.LowTraffic, traffic.HighTraffic, traffic.KubernetesTraffic} { for _, traffic := range []traffic.Config{traffic.LowTraffic, traffic.HighTraffic, traffic.KubernetesTraffic} {
scenarios = append(scenarios, scenario{ scenarios = append(scenarios, testScenario{
name: "ClusterOfSize1/" + traffic.Name, name: "ClusterOfSize1/" + traffic.Name,
failpoint: RandomFailpoint, failpoint: RandomFailpoint,
traffic: &traffic, traffic: &traffic,
config: *e2e.NewConfig( cluster: *e2e.NewConfig(
e2e.WithClusterSize(1), e2e.WithClusterSize(1),
e2e.WithSnapshotCount(100), e2e.WithSnapshotCount(100),
e2e.WithGoFailEnabled(true), e2e.WithGoFailEnabled(true),
@ -67,51 +61,53 @@ func TestRobustness(t *testing.T) {
if !v.LessThan(version.V3_6) { if !v.LessThan(version.V3_6) {
clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithSnapshotCatchUpEntries(100)) clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithSnapshotCatchUpEntries(100))
} }
scenarios = append(scenarios, scenario{ scenarios = append(scenarios, testScenario{
name: "ClusterOfSize3/" + traffic.Name, name: "ClusterOfSize3/" + traffic.Name,
failpoint: RandomFailpoint, failpoint: RandomFailpoint,
traffic: &traffic, traffic: &traffic,
config: *e2e.NewConfig(clusterOfSize3Options...), cluster: *e2e.NewConfig(clusterOfSize3Options...),
}) })
} }
scenarios = append(scenarios, scenario{ scenarios = append(scenarios, testScenario{
name: "Issue14370", name: "Issue14370",
failpoint: RaftBeforeSavePanic, failpoint: RaftBeforeSavePanic,
config: *e2e.NewConfig( cluster: *e2e.NewConfig(
e2e.WithClusterSize(1), e2e.WithClusterSize(1),
e2e.WithGoFailEnabled(true), e2e.WithGoFailEnabled(true),
), ),
}) })
scenarios = append(scenarios, scenario{ scenarios = append(scenarios, testScenario{
name: "Issue14685", name: "Issue14685",
failpoint: DefragBeforeCopyPanic, failpoint: DefragBeforeCopyPanic,
config: *e2e.NewConfig( cluster: *e2e.NewConfig(
e2e.WithClusterSize(1), e2e.WithClusterSize(1),
e2e.WithGoFailEnabled(true), e2e.WithGoFailEnabled(true),
), ),
}) })
scenarios = append(scenarios, scenario{ scenarios = append(scenarios, testScenario{
name: "Issue13766", name: "Issue13766",
failpoint: KillFailpoint, failpoint: KillFailpoint,
traffic: &traffic.HighTraffic, traffic: &traffic.HighTraffic,
config: *e2e.NewConfig( cluster: *e2e.NewConfig(
e2e.WithSnapshotCount(100), e2e.WithSnapshotCount(100),
), ),
}) })
scenarios = append(scenarios, scenario{ scenarios = append(scenarios, testScenario{
name: "Issue15220", name: "Issue15220",
failpoint: RandomFailpoint, failpoint: RandomFailpoint,
traffic: &traffic.ReqProgTraffic, watch: watchConfig{
config: *e2e.NewConfig( requestProgress: true,
},
cluster: *e2e.NewConfig(
e2e.WithClusterSize(1), e2e.WithClusterSize(1),
), ),
}) })
if v.Compare(version.V3_5) >= 0 { if v.Compare(version.V3_5) >= 0 {
scenarios = append(scenarios, scenario{ scenarios = append(scenarios, testScenario{
name: "Issue15271", name: "Issue15271",
failpoint: BlackholeUntilSnapshot, failpoint: BlackholeUntilSnapshot,
traffic: &traffic.HighTraffic, traffic: &traffic.HighTraffic,
config: *e2e.NewConfig( cluster: *e2e.NewConfig(
e2e.WithSnapshotCount(100), e2e.WithSnapshotCount(100),
e2e.WithPeerProxy(true), e2e.WithPeerProxy(true),
e2e.WithIsPeerTLS(true), e2e.WithIsPeerTLS(true),
@ -125,22 +121,25 @@ func TestRobustness(t *testing.T) {
t.Run(scenario.name, func(t *testing.T) { t.Run(scenario.name, func(t *testing.T) {
lg := zaptest.NewLogger(t) lg := zaptest.NewLogger(t)
scenario.config.Logger = lg scenario.cluster.Logger = lg
ctx := context.Background() ctx := context.Background()
testRobustness(ctx, t, lg, scenario.config, scenario.traffic, FailpointConfig{ testRobustness(ctx, t, lg, scenario)
failpoint: scenario.failpoint,
count: 1,
retries: 3,
waitBetweenTriggers: waitBetweenFailpointTriggers,
})
}) })
} }
} }
func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2e.EtcdProcessClusterConfig, traffic *traffic.Config, failpoint FailpointConfig) { type testScenario struct {
name string
failpoint Failpoint
cluster e2e.EtcdProcessClusterConfig
traffic *traffic.Config
watch watchConfig
}
func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testScenario) {
r := report{lg: lg} r := report{lg: lg}
var err error var err error
r.clus, err = e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&config)) r.clus, err = e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&s.cluster))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -153,11 +152,11 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2
defer func() { defer func() {
r.Report(t, panicked) r.Report(t, panicked)
}() }()
r.operations, r.responses = runScenario(ctx, t, lg, r.clus, *traffic, failpoint) r.operations, r.responses = s.run(ctx, t, lg, r.clus)
forcestopCluster(r.clus) forcestopCluster(r.clus)
watchProgressNotifyEnabled := r.clus.Cfg.WatchProcessNotifyInterval != 0 watchProgressNotifyEnabled := r.clus.Cfg.WatchProcessNotifyInterval != 0
validateWatchResponses(t, r.clus, r.responses, traffic.RequestProgress || watchProgressNotifyEnabled) validateWatchResponses(t, r.clus, r.responses, s.watch.requestProgress || watchProgressNotifyEnabled)
r.events = watchEvents(r.responses) r.events = watchEvents(r.responses)
validateEventsMatch(t, r.events) validateEventsMatch(t, r.events)
@ -168,25 +167,25 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2
panicked = false panicked = false
} }
func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, tCfg traffic.Config, failpoint FailpointConfig) (operations []porcupine.Operation, responses [][]watchResponse) { func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) (operations []porcupine.Operation, responses [][]watchResponse) {
g := errgroup.Group{} g := errgroup.Group{}
finishTraffic := make(chan struct{}) finishTraffic := make(chan struct{})
g.Go(func() error { g.Go(func() error {
defer close(finishTraffic) defer close(finishTraffic)
injectFailpoints(ctx, t, lg, clus, failpoint) injectFailpoints(ctx, t, lg, clus, s.failpoint)
time.Sleep(time.Second) time.Sleep(time.Second)
return nil return nil
}) })
maxRevisionChan := make(chan int64, 1) maxRevisionChan := make(chan int64, 1)
g.Go(func() error { g.Go(func() error {
defer close(maxRevisionChan) defer close(maxRevisionChan)
operations = traffic.SimulateTraffic(ctx, t, lg, clus, tCfg, finishTraffic) operations = traffic.SimulateTraffic(ctx, t, lg, clus, *s.traffic, finishTraffic)
maxRevisionChan <- operationsMaxRevision(operations) maxRevisionChan <- operationsMaxRevision(operations)
return nil return nil
}) })
g.Go(func() error { g.Go(func() error {
responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, tCfg.RequestProgress) responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch)
return nil return nil
}) })
g.Wait() g.Wait()

View File

@ -48,27 +48,6 @@ var (
}, },
}, },
} }
ReqProgTraffic = Config{
Name: "RequestProgressTraffic",
minimalQPS: 100,
maximalQPS: 200,
clientCount: 8,
RequestProgress: true,
traffic: etcdTraffic{
keyCount: 10,
leaseTTL: DefaultLeaseTTL,
largePutSize: 32769,
writeChoices: []choiceWeight[etcdRequestType]{
{choice: Put, weight: 45},
{choice: LargePut, weight: 5},
{choice: Delete, weight: 10},
{choice: MultiOpTxn, weight: 10},
{choice: PutWithLease, weight: 10},
{choice: LeaseRevoke, weight: 10},
{choice: CompareAndSet, weight: 10},
},
},
}
HighTraffic = Config{ HighTraffic = Config{
Name: "HighTraffic", Name: "HighTraffic",
minimalQPS: 200, minimalQPS: 200,

View File

@ -95,7 +95,6 @@ type Config struct {
maximalQPS float64 maximalQPS float64
clientCount int clientCount int
traffic Traffic traffic Traffic
RequestProgress bool // Request progress notifications while watching this traffic
} }
type Traffic interface { type Traffic interface {

View File

@ -31,7 +31,7 @@ import (
"go.etcd.io/etcd/tests/v3/robustness/model" "go.etcd.io/etcd/tests/v3/robustness/model"
) )
func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64, requestProgress bool) [][]watchResponse { func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64, cfg watchConfig) [][]watchResponse {
mux := sync.Mutex{} mux := sync.Mutex{}
var wg sync.WaitGroup var wg sync.WaitGroup
memberResponses := make([][]watchResponse, len(clus.Procs)) memberResponses := make([][]watchResponse, len(clus.Procs))
@ -52,7 +52,7 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.Etcd
go func(i int, c *clientv3.Client) { go func(i int, c *clientv3.Client) {
defer wg.Done() defer wg.Done()
defer c.Close() defer c.Close()
responses := watchMember(ctx, t, c, memberChan, requestProgress) responses := watchMember(ctx, t, c, memberChan, cfg)
mux.Lock() mux.Lock()
memberResponses[i] = responses memberResponses[i] = responses
mux.Unlock() mux.Unlock()
@ -70,8 +70,12 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.Etcd
return memberResponses return memberResponses
} }
type watchConfig struct {
requestProgress bool
}
// watchMember collects all responses until context is cancelled, it has observed revision provided via maxRevisionChan or maxRevisionChan was closed. // watchMember collects all responses until context is cancelled, it has observed revision provided via maxRevisionChan or maxRevisionChan was closed.
func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64, requestProgress bool) (resps []watchResponse) { func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64, cfg watchConfig) (resps []watchResponse) {
var maxRevision int64 = 0 var maxRevision int64 = 0
var lastRevision int64 = 0 var lastRevision int64 = 0
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
@ -101,7 +105,7 @@ func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevis
} }
} }
case resp := <-watch: case resp := <-watch:
if requestProgress { if cfg.requestProgress {
c.RequestProgress(ctx) c.RequestProgress(ctx)
} }
if resp.Err() == nil { if resp.Err() == nil {