From af25936fb796e06dc7e902cd985e7796e7430f6c Mon Sep 17 00:00:00 2001 From: Peter Wortmann Date: Fri, 17 Feb 2023 22:22:40 +0000 Subject: [PATCH] tests/integration: Demonstrate manual progress notification race This will fail basically every time, as the progress notification request catches the watcher in an asynchronised state. Signed-off-by: Peter Wortmann --- tests/integration/v3_watch_test.go | 68 ++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/tests/integration/v3_watch_test.go b/tests/integration/v3_watch_test.go index c2a4fa576..ead81abc1 100644 --- a/tests/integration/v3_watch_test.go +++ b/tests/integration/v3_watch_test.go @@ -1397,3 +1397,71 @@ func TestV3WatchCloseCancelRace(t *testing.T) { t.Fatalf("expected %s watch, got %s", expected, minWatches) } } + +// TestV3WatchProgressWaitsForSync checks that progress notifications +// don't get sent until the watcher is synchronised +func TestV3WatchProgressWaitsForSync(t *testing.T) { + + // Disable for gRPC proxy, as it does not support requesting + // progress notifications + if integration.ThroughProxy { + t.Skip("grpc proxy currently does not support requesting progress notifications") + } + + integration.BeforeTest(t) + + clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + client := clus.RandClient() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Write a couple values into key to make sure there's a + // non-trivial amount of history. + count := 1001 + t.Logf("Writing key 'foo' %d times", count) + for i := 0; i < count; i++ { + _, err := client.Put(ctx, "foo", fmt.Sprintf("bar%d", i)) + require.NoError(t, err) + } + + // Create watch channel starting at revision 1 (i.e. it starts + // unsynced because of the update above) + wch := client.Watch(ctx, "foo", clientv3.WithRev(1)) + + // Immediately request a progress notification. As the client + // is unsynchronised, the server will have to defer the + // notification internally. + err := client.RequestProgress(ctx) + require.NoError(t, err) + + // Verify that we get the watch responses first. Note that + // events might be spread across multiple packets. + var event_count = 0 + for event_count < count { + wr := <-wch + if wr.Err() != nil { + t.Fatal(fmt.Errorf("watch error: %w", wr.Err())) + } + if wr.IsProgressNotify() { + t.Fatal("Progress notification from unsynced client!") + } + if wr.Header.Revision != int64(count+1) { + t.Fatal("Incomplete watch response!") + } + event_count += len(wr.Events) + } + + // ... followed by the requested progress notification + wr2 := <-wch + if wr2.Err() != nil { + t.Fatal(fmt.Errorf("watch error: %w", wr2.Err())) + } + if !wr2.IsProgressNotify() { + t.Fatal("Did not receive progress notification!") + } + if wr2.Header.Revision != int64(count+1) { + t.Fatal("Wrong revision in progress notification!") + } +}