From 42a2643df9f0508292412e2a5be88a804770bf9c Mon Sep 17 00:00:00 2001
From: Peter Wortmann
Date: Tue, 28 Mar 2023 22:03:37 +0100
Subject: [PATCH] tests/robustness: Reproduce issue #15220

This issue is somewhat easily reproduced simply by bombarding the
server with requests for progress notifications, which eventually
leads to one being delivered ahead of the payload message. This is
then caught by the watch response validation code previously added
by Marek Siarkowicz.

Signed-off-by: Peter Wortmann
---
 tests/robustness/linearizability_test.go | 46 ++++++++++++++++++------
 tests/robustness/traffic.go              | 11 +++---
 tests/robustness/watch.go                |  9 +++--
 3 files changed, 48 insertions(+), 18 deletions(-)

diff --git a/tests/robustness/linearizability_test.go b/tests/robustness/linearizability_test.go
index 358533e1f..070556c75 100644
--- a/tests/robustness/linearizability_test.go
+++ b/tests/robustness/linearizability_test.go
@@ -36,10 +36,11 @@ const (
 var (
 	LowTraffic = trafficConfig{
-		name:        "LowTraffic",
-		minimalQPS:  100,
-		maximalQPS:  200,
-		clientCount: 8,
+		name:            "LowTraffic",
+		minimalQPS:      100,
+		maximalQPS:      200,
+		clientCount:     8,
+		requestProgress: false,
 		traffic: traffic{
 			keyCount: 10,
 			leaseTTL: DefaultLeaseTTL,
@@ -56,10 +57,11 @@ var (
 		},
 	}
 	HighTraffic = trafficConfig{
-		name:        "HighTraffic",
-		minimalQPS:  200,
-		maximalQPS:  1000,
-		clientCount: 12,
+		name:            "HighTraffic",
+		minimalQPS:      200,
+		maximalQPS:      1000,
+		clientCount:     12,
+		requestProgress: false,
 		traffic: traffic{
 			keyCount:     10,
 			largePutSize: 32769,
@@ -71,6 +73,22 @@ var (
 			},
 		},
 	}
+	ReqProgTraffic = trafficConfig{
+		name:            "RequestProgressTraffic",
+		minimalQPS:      200,
+		maximalQPS:      1000,
+		clientCount:     12,
+		requestProgress: true,
+		traffic: traffic{
+			keyCount:     10,
+			largePutSize: 8196,
+			leaseTTL:     DefaultLeaseTTL,
+			writes: []requestChance{
+				{operation: Put, chance: 95},
+				{operation: LargePut, chance: 5},
+			},
+		},
+	}
 	defaultTraffic = LowTraffic
 	trafficList = []trafficConfig{
 		LowTraffic, HighTraffic,
@@ -141,6 +159,14 @@ func TestRobustness(t *testing.T) {
 			e2e.WithSnapshotCount(100),
 		),
 	})
+	scenarios = append(scenarios, scenario{
+		name:      "Issue15220",
+		failpoint: RandomOneNodeClusterFailpoint,
+		traffic:   &ReqProgTraffic,
+		config: *e2e.NewConfig(
+			e2e.WithClusterSize(1),
+		),
+	})
 	snapshotOptions := []e2e.EPClusterOption{
 		e2e.WithGoFailEnabled(true),
 		e2e.WithSnapshotCount(100),
@@ -191,7 +217,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2
 	forcestopCluster(r.clus)
 
 	watchProgressNotifyEnabled := r.clus.Cfg.WatchProcessNotifyInterval != 0
-	validateWatchResponses(t, r.responses, watchProgressNotifyEnabled)
+	validateWatchResponses(t, r.responses, traffic.requestProgress || watchProgressNotifyEnabled)
 
 	r.events = watchEvents(r.responses)
 	validateEventsMatch(t, r.events)
@@ -218,7 +244,7 @@ func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.Et
 		return nil
 	})
 	g.Go(func() error {
-		responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan)
+		responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, traffic.requestProgress)
 		return nil
 	})
 	g.Wait()
diff --git a/tests/robustness/traffic.go b/tests/robustness/traffic.go
index 842989570..fa5889fc9 100644
--- a/tests/robustness/traffic.go
+++ b/tests/robustness/traffic.go
@@ -109,11 +109,12 @@ func simulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
 }
 
 type trafficConfig struct {
-	name        string
-	minimalQPS  float64
-	maximalQPS  float64
-	clientCount int
-	traffic     Traffic
+	name            string
+	minimalQPS      float64
+	maximalQPS      float64
+	clientCount     int
+	traffic         Traffic
+	requestProgress bool // Request progress notifications while watching this traffic
 }
 
 type Traffic interface {
diff --git a/tests/robustness/watch.go b/tests/robustness/watch.go
index 9038d9618..79ba408a6 100644
--- a/tests/robustness/watch.go
+++ b/tests/robustness/watch.go
@@ -31,7 +31,7 @@ import (
 	"go.etcd.io/etcd/tests/v3/robustness/model"
 )
 
-func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64) [][]watchResponse {
+func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64, requestProgress bool) [][]watchResponse {
 	mux := sync.Mutex{}
 	var wg sync.WaitGroup
 	memberResponses := make([][]watchResponse, len(clus.Procs))
@@ -52,7 +52,7 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.Etcd
 		go func(i int, c *clientv3.Client) {
 			defer wg.Done()
 			defer c.Close()
-			responses := watchMember(ctx, t, c, memberChan)
+			responses := watchMember(ctx, t, c, memberChan, requestProgress)
 			mux.Lock()
 			memberResponses[i] = responses
 			mux.Unlock()
@@ -71,7 +71,7 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.Etcd
 }
 
 // watchMember collects all responses until context is cancelled, it has observed revision provided via maxRevisionChan or maxRevisionChan was closed.
-func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64) (resps []watchResponse) {
+func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64, requestProgress bool) (resps []watchResponse) {
 	var maxRevision int64 = 0
 	var lastRevision int64 = 0
 	ctx, cancel := context.WithCancel(ctx)
@@ -101,6 +101,9 @@ func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevis
 				}
 			}
 		case resp := <-watch:
+			if requestProgress {
+				c.RequestProgress(ctx)
+			}
 			if resp.Err() == nil {
 				resps = append(resps, watchResponse{resp, time.Now()})
 			} else if !resp.Canceled {
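
The sketch below is not part of the patch; it restates the reproduction idea outside the robustness test framework. It keeps an etcd watch open, calls RequestProgress after every response (the pressure the new requestProgress option makes watchMember apply), and flags any event that arrives with a revision at or below a previously delivered progress notification, which is the ordering guarantee issue #15220 violates. The endpoint, key prefix, and background writer are illustrative assumptions; the authoritative check remains validateWatchResponses in the test.

// repro_sketch.go - hypothetical standalone illustration, not part of this patch.
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	c, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"}, // assumed local etcd endpoint
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Background writer so the watch stream always has pending events.
	go func() {
		for i := 0; ctx.Err() == nil; i++ {
			if _, err := c.Put(ctx, "/repro/key", fmt.Sprint(i)); err != nil {
				return
			}
		}
	}()

	// A progress notification at revision N promises that every event with
	// revision <= N has already been delivered on this watch channel.
	var lastProgressRev int64
	watch := c.Watch(ctx, "/repro/", clientv3.WithPrefix())
	for resp := range watch {
		// Bombard the server with progress requests, mirroring the test change.
		if err := c.RequestProgress(ctx); err != nil {
			log.Printf("RequestProgress: %v", err)
		}
		if resp.IsProgressNotify() {
			lastProgressRev = resp.Header.Revision
			continue
		}
		for _, ev := range resp.Events {
			if ev.Kv.ModRevision <= lastProgressRev {
				log.Printf("ordering violated: event at revision %d arrived after progress notify for revision %d",
					ev.Kv.ModRevision, lastProgressRev)
			}
		}
	}
}

In the patch itself the same pressure comes from watchMember, which calls c.RequestProgress(ctx) on every received response, while validateWatchResponses is told to expect progress notifications whenever traffic.requestProgress is set.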