diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index c383ca012..442921e60 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -185,6 +185,7 @@ type EtcdProcessClusterConfig struct { WarningUnaryRequestDuration time.Duration ExperimentalWarningUnaryRequestDuration time.Duration PeerProxy bool + WatchProcessNotifyInterval time.Duration } func DefaultConfig() *EtcdProcessClusterConfig { @@ -336,6 +337,10 @@ func WithCompactionBatchLimit(limit int) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.CompactionBatchLimit = limit } } +func WithWatchProcessNotifyInterval(interval time.Duration) EPClusterOption { + return func(c *EtcdProcessClusterConfig) { c.WatchProcessNotifyInterval = interval } +} + func WithPeerProxy(enabled bool) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.PeerProxy = enabled } } @@ -573,6 +578,9 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in if cfg.ExperimentalWarningUnaryRequestDuration != 0 { args = append(args, "--experimental-warning-unary-request-duration", cfg.ExperimentalWarningUnaryRequestDuration.String()) } + if cfg.WatchProcessNotifyInterval != 0 { + args = append(args, "--experimental-watch-progress-notify-interval", cfg.WatchProcessNotifyInterval.String()) + } if cfg.SnapshotCatchUpEntries > 0 { args = append(args, "--experimental-snapshot-catchup-entries", fmt.Sprintf("%d", cfg.SnapshotCatchUpEntries)) } diff --git a/tests/linearizability/linearizability_test.go b/tests/linearizability/linearizability_test.go index e5e7b3786..5a0c771ad 100644 --- a/tests/linearizability/linearizability_test.go +++ b/tests/linearizability/linearizability_test.go @@ -103,6 +103,7 @@ func TestLinearizability(t *testing.T) { e2e.WithSnapshotCount(100), e2e.WithGoFailEnabled(true), e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints + e2e.WithWatchProcessNotifyInterval(100*time.Millisecond), ), }) scenarios = append(scenarios, scenario{ @@ -114,6 +115,7 @@ func TestLinearizability(t *testing.T) { e2e.WithPeerProxy(true), e2e.WithGoFailEnabled(true), e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints + e2e.WithWatchProcessNotifyInterval(100*time.Millisecond), ), }) } @@ -177,7 +179,8 @@ func TestLinearizability(t *testing.T) { waitBetweenTriggers: waitBetweenFailpointTriggers, }, *scenario.traffic) forcestopCluster(clus) - validateWatchResponses(t, watchResponses) + watchProgressNotifyEnabled := clus.Cfg.WatchProcessNotifyInterval != 0 + validateWatchResponses(t, watchResponses, watchProgressNotifyEnabled) longestHistory, remainingEvents := watchEventHistory(watchResponses) validateEventsMatch(t, longestHistory, remainingEvents) operations = patchOperationBasedOnWatchEvents(operations, longestHistory) diff --git a/tests/linearizability/watch.go b/tests/linearizability/watch.go index 2aaf59b18..d4fc5a8cb 100644 --- a/tests/linearizability/watch.go +++ b/tests/linearizability/watch.go @@ -56,6 +56,7 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, lg *zap.Logger wg.Wait() return memberResponses } + func watchMember(ctx context.Context, lg *zap.Logger, c *clientv3.Client) (resps []watchResponse) { var lastRevision int64 = 0 for { @@ -64,7 +65,7 @@ func watchMember(ctx context.Context, lg *zap.Logger, c *clientv3.Client) (resps return resps default: } - for resp := range c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1)) { + for resp := range c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1), clientv3.WithProgressNotify()) { resps = append(resps, watchResponse{resp, time.Now()}) lastRevision = resp.Header.Revision if resp.Err() != nil { @@ -74,18 +75,22 @@ func watchMember(ctx context.Context, lg *zap.Logger, c *clientv3.Client) (resps } } -func validateWatchResponses(t *testing.T, responses [][]watchResponse) { +func validateWatchResponses(t *testing.T, responses [][]watchResponse, expectProgressNotify bool) { for _, memberResponses := range responses { - validateMemberWatchResponses(t, memberResponses) + validateMemberWatchResponses(t, memberResponses, expectProgressNotify) } } -func validateMemberWatchResponses(t *testing.T, responses []watchResponse) { +func validateMemberWatchResponses(t *testing.T, responses []watchResponse, expectProgressNotify bool) { + var gotProgressNotify = false var lastRevision int64 = 1 for _, resp := range responses { if resp.Header.Revision < lastRevision { t.Errorf("Revision should never decrease") } + if resp.IsProgressNotify() && resp.Header.Revision == lastRevision { + gotProgressNotify = true + } if resp.Header.Revision == lastRevision && len(resp.Events) != 0 { t.Errorf("Got two non-empty responses about same revision") } @@ -100,6 +105,9 @@ func validateMemberWatchResponses(t *testing.T, responses []watchResponse) { } lastRevision = resp.Header.Revision } + if gotProgressNotify != expectProgressNotify { + t.Errorf("Expected progress notify: %v, got: %v", expectProgressNotify, gotProgressNotify) + } } func toWatchEvents(responses []watchResponse) (events []watchEvent) {