From e8473850a22072b2967beaa8742a494b52025373 Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Fri, 23 Sep 2016 16:40:29 -0700
Subject: [PATCH] integration: test canceling watchers when disconnected

---
 clientv3/integration/watch_test.go | 80 ++++++++++++++++++++++++++++++
 1 file changed, 80 insertions(+)

diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go
index efd2ef65c..eab92eb04 100644
--- a/clientv3/integration/watch_test.go
+++ b/clientv3/integration/watch_test.go
@@ -673,3 +673,83 @@ func TestWatchWithRequireLeader(t *testing.T) {
 		t.Fatalf("expected response, got closed channel")
 	}
 }
+
+// TestWatchOverlapContextCancel stresses the watcher stream teardown path by
+// creating and canceling watchers to ensure that new watchers are not taken
+// down by a torn-down watch stream. The sort of race being detected:
+//  1. create w1 using a cancelable ctx whose "%v" value keys its grpc stream
+//  2. cancel ctx
+//  3. watcher client begins tearing down the watcher grpc stream since it has no more watchers
+//  4. start creating watcher w2 using a new ctx (not canceled); it attaches to the old grpc stream
+//  5. watcher client finishes tearing down the stream for the old ctx
+//  6. w2 comes back canceled
+func TestWatchOverlapContextCancel(t *testing.T) {
+	f := func(clus *integration.ClusterV3) {}
+	testWatchOverlapContextCancel(t, f)
+}
+
+func TestWatchOverlapDropConnContextCancel(t *testing.T) {
+	f := func(clus *integration.ClusterV3) {
+		clus.Members[0].DropConnections()
+	}
+	testWatchOverlapContextCancel(t, f)
+}
+
+func testWatchOverlapContextCancel(t *testing.T, f func(*integration.ClusterV3)) {
+	defer testutil.AfterTest(t)
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	// each unique context ("%v" value) gets its own grpc watch stream
+	n := 100
+	ctxs, ctxc := make([]context.Context, 5), make([]chan struct{}, 5)
+	for i := range ctxs {
+		// make the context's "%v" value unique
+		ctxs[i] = context.WithValue(context.TODO(), "key", i)
+		// limit the number of outstanding watchers per stream
+		ctxc[i] = make(chan struct{}, 2)
+	}
+
+	// issue concurrent watches on "abc" with cancel
+	cli := clus.RandClient()
+	if _, err := cli.Put(context.TODO(), "abc", "def"); err != nil {
+		t.Fatal(err)
+	}
+	ch := make(chan struct{}, n)
+	for i := 0; i < n; i++ {
+		go func() {
+			defer func() { ch <- struct{}{} }()
+			idx := rand.Intn(len(ctxs))
+			ctx, cancel := context.WithCancel(ctxs[idx])
+			ctxc[idx] <- struct{}{}
+			wch := cli.Watch(ctx, "abc", clientv3.WithRev(1))
+			f(clus)
+			select {
+			case _, ok := <-wch:
+				if !ok {
+					t.Errorf("unexpected closed channel %p", wch)
+				}
+			// may take a second or two to reestablish a watcher because of
+			// grpc backoff policies for disconnects
+			case <-time.After(5 * time.Second):
+				t.Errorf("timed out waiting for watch on %p", wch)
+			}
+			// randomize how cancel overlaps with watch creation
+			if rand.Intn(2) == 0 {
+				<-ctxc[idx]
+				cancel()
+			} else {
+				cancel()
+				<-ctxc[idx]
+			}
+		}()
+	}
+	// join on watches
+	for i := 0; i < n; i++ {
+		select {
+		case <-ch:
+		case <-time.After(5 * time.Second):
+			t.Fatalf("timed out waiting for completed watch")
+		}
+	}
+}
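
Usage note (not part of the patch itself): since these tests stress a concurrent stream-teardown race, they are most useful when run under the Go race detector. Assuming a standard checkout of the etcd repository, something like the following should exercise both new tests:

    go test -race -run TestWatchOverlap ./clientv3/integration/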