From deef16b376b7db1b3b5383d0b325f79e947de987 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Mon, 19 Sep 2016 15:11:11 -0700 Subject: [PATCH] integration: test client watchers with overlapped context cancels --- clientv3/integration/watch_test.go | 60 ++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index d01613daf..395452c36 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -759,3 +759,63 @@ func TestWatchCancelOnServer(t *testing.T) { t.Fatalf("expected 0 watchers, got %q", watchers) } } + +// TestWatchOverlapContextCancel stresses the watcher stream teardown path by +// creating/canceling watchers to ensure that new watchers are not taken down +// by a torn down watch stream. The sort of race that's being detected: +// 1. create w1 using a cancelable ctx with %v as "ctx" +// 2. cancel ctx +// 3. watcher client begins tearing down watcher grpc stream since no more watchers +// 3. start creating watcher w2 using a new "ctx" (not canceled), attaches to old grpc stream +// 4. watcher client finishes tearing down stream on "ctx" +// 5. w2 comes back canceled +func TestWatchOverlapContextCancel(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + cli := clus.RandClient() + if _, err := cli.Put(context.TODO(), "abc", "def"); err != nil { + t.Fatal(err) + } + + // each unique context "%v" has a unique grpc stream + n := 100 + ctxs, ctxc := make([]context.Context, 5), make([]chan struct{}, 5) + for i := range ctxs { + // make "%v" unique + ctxs[i] = context.WithValue(context.TODO(), "key", i) + // limits the maximum number of outstanding watchers per stream + ctxc[i] = make(chan struct{}, 2) + } + ch := make(chan struct{}, n) + // issue concurrent watches with cancel + for i := 0; i < n; i++ { + go func() { + defer func() { ch <- struct{}{} }() + idx := rand.Intn(len(ctxs)) + ctx, cancel := context.WithCancel(ctxs[idx]) + ctxc[idx] <- struct{}{} + ch := cli.Watch(ctx, "abc", clientv3.WithRev(1)) + if _, ok := <-ch; !ok { + t.Fatalf("unexpected closed channel") + } + // randomize how cancel overlaps with watch creation + if rand.Intn(2) == 0 { + <-ctxc[idx] + cancel() + } else { + cancel() + <-ctxc[idx] + } + }() + } + // join on watches + for i := 0; i < n; i++ { + select { + case <-ch: + case <-time.After(time.Second): + t.Fatalf("timed out waiting for completed watch") + } + } +}