From deef16b376b7db1b3b5383d0b325f79e947de987 Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Mon, 19 Sep 2016 15:11:11 -0700
Subject: [PATCH 1/2] integration: test client watchers with overlapped context cancels

---
 clientv3/integration/watch_test.go | 60 ++++++++++++++++++++++++++++++
 1 file changed, 60 insertions(+)

diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go
index d01613daf..395452c36 100644
--- a/clientv3/integration/watch_test.go
+++ b/clientv3/integration/watch_test.go
@@ -759,3 +759,63 @@ func TestWatchCancelOnServer(t *testing.T) {
 		t.Fatalf("expected 0 watchers, got %q", watchers)
 	}
 }
+
+// TestWatchOverlapContextCancel stresses the watcher stream teardown path by
+// creating/canceling watchers to ensure that new watchers are not taken down
+// by a torn-down watch stream. The sort of race being detected:
+// 1. create w1 using a cancelable ctx; its "%v" string keys its grpc stream
+// 2. cancel ctx
+// 3. watcher client begins tearing down the watcher grpc stream since it has no more watchers
+// 4. begin creating watcher w2 on a new "ctx" (not canceled); it attaches to the old grpc stream
+// 5. watcher client finishes tearing down the stream on "ctx"
+// 6. w2 comes back canceled
+func TestWatchOverlapContextCancel(t *testing.T) {
+	defer testutil.AfterTest(t)
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	cli := clus.RandClient()
+	if _, err := cli.Put(context.TODO(), "abc", "def"); err != nil {
+		t.Fatal(err)
+	}
+
+	// each unique context "%v" has a unique grpc stream
+	n := 100
+	ctxs, ctxc := make([]context.Context, 5), make([]chan struct{}, 5)
+	for i := range ctxs {
+		// make "%v" unique
+		ctxs[i] = context.WithValue(context.TODO(), "key", i)
+		// limits the maximum number of outstanding watchers per stream
+		ctxc[i] = make(chan struct{}, 2)
+	}
+	ch := make(chan struct{}, n)
+	// issue concurrent watches with overlapping cancels
+	for i := 0; i < n; i++ {
+		go func() {
+			defer func() { ch <- struct{}{} }()
+			idx := rand.Intn(len(ctxs))
+			ctx, cancel := context.WithCancel(ctxs[idx])
+			ctxc[idx] <- struct{}{}
+			wch := cli.Watch(ctx, "abc", clientv3.WithRev(1))
+			if _, ok := <-wch; !ok {
+				t.Fatalf("unexpected closed channel")
+			}
+			// randomize how cancel overlaps with watch creation
+			if rand.Intn(2) == 0 {
+				<-ctxc[idx]
+				cancel()
+			} else {
+				cancel()
+				<-ctxc[idx]
+			}
+		}()
+	}
+	// join on watches
+	for i := 0; i < n; i++ {
+		select {
+		case <-ch:
+		case <-time.After(time.Second):
+			t.Fatalf("timed out waiting for completed watch")
+		}
+	}
+}

From a32518006c12af0841a14560a21e53ae71ec9882 Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Tue, 20 Sep 2016 16:51:58 -0700
Subject: [PATCH 2/2] clientv3: process closed watcherStreams in watchGrpcStream run loop

Closing the grpc stream once it had no more watchers was racing with
Watch() attaching new watchers to that stream.

Fixes #6476
---
 clientv3/watch.go | 46 +++++++++++++++++++++++++++-------------------
 1 file changed, 27 insertions(+), 19 deletions(-)

diff --git a/clientv3/watch.go b/clientv3/watch.go
index afae2b59a..28eb491e4 100644
--- a/clientv3/watch.go
+++ b/clientv3/watch.go
@@ -131,6 +131,8 @@ type watchGrpcStream struct {
 	donec chan struct{}
 	// errc transmits errors from grpc Recv to the watch stream reconn logic
 	errc chan error
+	// closingc gets the watcherStream of closing watchers
+	closingc chan *watcherStream
 
 	// the error that closed the watch stream
 	closeErr error
@@ -203,11 +205,12 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
 		cancel:  cancel,
 		streams: make(map[int64]*watcherStream),
 
-		respc: make(chan *pb.WatchResponse),
-		reqc:  make(chan *watchRequest),
-		stopc: make(chan struct{}),
-		donec: make(chan struct{}),
-		errc:  make(chan error, 1),
+		respc:    make(chan *pb.WatchResponse),
+		reqc:     make(chan *watchRequest),
+		stopc:    make(chan struct{}),
+		donec:    make(chan struct{}),
+		errc:     make(chan error, 1),
+		closingc: make(chan *watcherStream),
 	}
 	go wgs.run()
 	return wgs
@@ -268,7 +271,6 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
 		case reqc <- wr:
 			ok = true
 		case <-wr.ctx.Done():
-			wgs.stopIfEmpty()
 		case <-donec:
 			if wgs.closeErr != nil {
 				closeCh <- WatchResponse{closeErr: wgs.closeErr}
@@ -378,15 +380,19 @@ func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchReq
 	go w.serveStream(ws)
 }
 
-// closeStream closes the watcher resources and removes it
-func (w *watchGrpcStream) closeStream(ws *watcherStream) {
+func (w *watchGrpcStream) closeStream(ws *watcherStream) bool {
 	w.mu.Lock()
 	// cancels request stream; subscriber receives nil channel
 	close(ws.initReq.retc)
 	// close subscriber's channel
 	close(ws.outc)
 	delete(w.streams, ws.id)
+	empty := len(w.streams) == 0
+	if empty && w.stopc != nil {
+		w.stopc = nil
+	}
 	w.mu.Unlock()
+	return empty
 }
 
 // run is the root of the goroutines for managing a watcher client
@@ -491,6 +497,10 @@ func (w *watchGrpcStream) run() {
 			cancelSet = make(map[int64]struct{})
 		case <-stopc:
 			return
+		case ws := <-w.closingc:
+			if w.closeStream(ws) {
+				return
+			}
 		}
 
 		// send failed; queue for retry
@@ -553,6 +563,15 @@ func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
 
 // serveStream forwards watch responses from run() to the subscriber
 func (w *watchGrpcStream) serveStream(ws *watcherStream) {
+	defer func() {
+		// signal that this watcherStream is finished
+		select {
+		case w.closingc <- ws:
+		case <-w.donec:
+			w.closeStream(ws)
+		}
+	}()
+
 	var closeErr error
 	emptyWr := &WatchResponse{}
 	wrs := []*WatchResponse{}
@@ -641,20 +660,9 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) {
 		}
 	}
 
-	w.closeStream(ws)
-	w.stopIfEmpty()
 	// lazily send cancel message if events on missing id
 }
 
-func (wgs *watchGrpcStream) stopIfEmpty() {
-	wgs.mu.Lock()
-	if len(wgs.streams) == 0 && wgs.stopc != nil {
-		close(wgs.stopc)
-		wgs.stopc = nil
-	}
-	wgs.mu.Unlock()
-}
-
 func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
 	ws, rerr := w.resume()
 	if rerr != nil {
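
The shape of the fix is worth calling out: only the run() goroutine may now conclude that the last watcher is gone and tear the grpc stream down; an exiting serveStream hands its watcherStream over closingc rather than mutating the shared teardown state itself. Below is a minimal, self-contained Go sketch of that handoff pattern. All names here (owner, stream, serve) are illustrative stand-ins, not the clientv3 API; it shows only the concurrency idiom the patch adopts.

package main

import (
	"fmt"
	"sync"
)

type stream struct{ id int64 }

// owner loosely models watchGrpcStream: run() is the only goroutine
// allowed to decide that the last stream is gone, so teardown cannot
// race with concurrent stream creation.
type owner struct {
	mu       sync.Mutex
	streams  map[int64]*stream
	closingc chan *stream  // exiting workers hand their stream to run()
	donec    chan struct{} // closed when run() exits
}

func newOwner() *owner {
	o := &owner{
		streams:  make(map[int64]*stream),
		closingc: make(chan *stream),
		donec:    make(chan struct{}),
	}
	go o.run()
	return o
}

func (o *owner) run() {
	defer close(o.donec)
	for ws := range o.closingc {
		if o.closeStream(ws) {
			// last stream closed; tear down here, in run(), not in a worker
			return
		}
	}
}

// closeStream removes ws and reports whether it was the last stream.
func (o *owner) closeStream(ws *stream) bool {
	o.mu.Lock()
	defer o.mu.Unlock()
	delete(o.streams, ws.id)
	return len(o.streams) == 0
}

// serve loosely models serveStream: on exit it signals run() over
// closingc; if run() has already quit (donec closed), it falls back
// to cleaning up after itself.
func (o *owner) serve(ws *stream) {
	defer func() {
		select {
		case o.closingc <- ws:
		case <-o.donec:
			o.closeStream(ws)
		}
	}()
	// ... forward events to the subscriber until canceled ...
}

func main() {
	o := newOwner()
	ws := &stream{id: 1}
	o.mu.Lock()
	o.streams[ws.id] = ws
	o.mu.Unlock()

	o.serve(ws) // worker exits and hands ws to run()
	<-o.donec   // run() saw the last stream close and shut down
	fmt.Println("torn down cleanly")
}

Because the handoff select also watches donec, a worker that exits after run() has already gone away still cleans up instead of blocking forever; that is the same escape hatch the patch adds in serveStream's defer.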