diff --git a/tests/robustness/linearizability_test.go b/tests/robustness/linearizability_test.go index 358533e1f..070556c75 100644 --- a/tests/robustness/linearizability_test.go +++ b/tests/robustness/linearizability_test.go @@ -36,10 +36,11 @@ const ( var ( LowTraffic = trafficConfig{ - name: "LowTraffic", - minimalQPS: 100, - maximalQPS: 200, - clientCount: 8, + name: "LowTraffic", + minimalQPS: 100, + maximalQPS: 200, + clientCount: 8, + requestProgress: false, traffic: traffic{ keyCount: 10, leaseTTL: DefaultLeaseTTL, @@ -56,10 +57,11 @@ var ( }, } HighTraffic = trafficConfig{ - name: "HighTraffic", - minimalQPS: 200, - maximalQPS: 1000, - clientCount: 12, + name: "HighTraffic", + minimalQPS: 200, + maximalQPS: 1000, + clientCount: 12, + requestProgress: false, traffic: traffic{ keyCount: 10, largePutSize: 32769, @@ -71,6 +73,22 @@ var ( }, }, } + ReqProgTraffic = trafficConfig{ + name: "RequestProgressTraffic", + minimalQPS: 200, + maximalQPS: 1000, + clientCount: 12, + requestProgress: true, + traffic: traffic{ + keyCount: 10, + largePutSize: 8196, + leaseTTL: DefaultLeaseTTL, + writes: []requestChance{ + {operation: Put, chance: 95}, + {operation: LargePut, chance: 5}, + }, + }, + } defaultTraffic = LowTraffic trafficList = []trafficConfig{ LowTraffic, HighTraffic, @@ -141,6 +159,14 @@ func TestRobustness(t *testing.T) { e2e.WithSnapshotCount(100), ), }) + scenarios = append(scenarios, scenario{ + name: "Issue15220", + failpoint: RandomOneNodeClusterFailpoint, + traffic: &ReqProgTraffic, + config: *e2e.NewConfig( + e2e.WithClusterSize(1), + ), + }) snapshotOptions := []e2e.EPClusterOption{ e2e.WithGoFailEnabled(true), e2e.WithSnapshotCount(100), @@ -191,7 +217,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2 forcestopCluster(r.clus) watchProgressNotifyEnabled := r.clus.Cfg.WatchProcessNotifyInterval != 0 - validateWatchResponses(t, r.responses, watchProgressNotifyEnabled) + validateWatchResponses(t, r.responses, traffic.requestProgress || watchProgressNotifyEnabled) r.events = watchEvents(r.responses) validateEventsMatch(t, r.events) @@ -218,7 +244,7 @@ func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.Et return nil }) g.Go(func() error { - responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan) + responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, traffic.requestProgress) return nil }) g.Wait() diff --git a/tests/robustness/traffic.go b/tests/robustness/traffic.go index 842989570..fa5889fc9 100644 --- a/tests/robustness/traffic.go +++ b/tests/robustness/traffic.go @@ -109,11 +109,12 @@ func simulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 } type trafficConfig struct { - name string - minimalQPS float64 - maximalQPS float64 - clientCount int - traffic Traffic + name string + minimalQPS float64 + maximalQPS float64 + clientCount int + traffic Traffic + requestProgress bool // Request progress notifications while watching this traffic } type Traffic interface { diff --git a/tests/robustness/watch.go b/tests/robustness/watch.go index 9038d9618..79ba408a6 100644 --- a/tests/robustness/watch.go +++ b/tests/robustness/watch.go @@ -31,7 +31,7 @@ import ( "go.etcd.io/etcd/tests/v3/robustness/model" ) -func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64) [][]watchResponse { +func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64, requestProgress bool) [][]watchResponse { mux := sync.Mutex{} var wg sync.WaitGroup memberResponses := make([][]watchResponse, len(clus.Procs)) @@ -52,7 +52,7 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.Etcd go func(i int, c *clientv3.Client) { defer wg.Done() defer c.Close() - responses := watchMember(ctx, t, c, memberChan) + responses := watchMember(ctx, t, c, memberChan, requestProgress) mux.Lock() memberResponses[i] = responses mux.Unlock() @@ -71,7 +71,7 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.Etcd } // watchMember collects all responses until context is cancelled, it has observed revision provided via maxRevisionChan or maxRevisionChan was closed. -func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64) (resps []watchResponse) { +func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64, requestProgress bool) (resps []watchResponse) { var maxRevision int64 = 0 var lastRevision int64 = 0 ctx, cancel := context.WithCancel(ctx) @@ -101,6 +101,9 @@ func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevis } } case resp := <-watch: + if requestProgress { + c.RequestProgress(ctx) + } if resp.Err() == nil { resps = append(resps, watchResponse{resp, time.Now()}) } else if !resp.Canceled {