From a66c25121b90b9f95b7c9f1c905bf56a19be1061 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 12 Oct 2016 16:42:27 -0700 Subject: [PATCH 1/2] integration: stress closing while resuming watchers --- clientv3/integration/watch_test.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index 058745e22..6a8f3cb5e 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -868,3 +868,26 @@ func TestWatchCancelAndCloseClient(t *testing.T) { <-donec clus.TakeClient(0) } + +// TestWatchStressResumeClose establishes a bunch of watchers, disconnects +// to put them in resuming mode, cancels them so some resumes by cancel fail, +// then closes the watcher interface to ensure correct clean up. +func TestWatchStressResumeClose(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + cli := clus.Client(0) + + ctx, cancel := context.WithCancel(context.Background()) + // add more watches than can be resumed before the cancel + wchs := make([]clientv3.WatchChan, 2000) + for i := range wchs { + wchs[i] = cli.Watch(ctx, "abc") + } + clus.Members[0].DropConnections() + cancel() + if err := cli.Close(); err != nil { + t.Fatal(err) + } + clus.TakeClient(0) +} From c100e40715c623cbc56012ca957a4b2676870830 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 12 Oct 2016 15:54:12 -0700 Subject: [PATCH 2/2] clientv3: only receive from closing streams in Watcher close Was overcounting the number of expected closing messages; the resuming list may have nil entries. Also the full client wasn't closing the watcher client, only canceling its context, so client closes weren't joining with the watcher shutdown. Fixes #6605 --- clientv3/client.go | 1 + clientv3/watch.go | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/clientv3/client.go b/clientv3/client.go index 148addea8..7f7d42770 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -86,6 +86,7 @@ func NewFromConfigFile(path string) (*Client, error) { // Close shuts down the client's etcd connections. func (c *Client) Close() error { c.cancel() + c.Watcher.Close() return toErr(c.ctx, c.conn.Close()) } diff --git a/clientv3/watch.go b/clientv3/watch.go index 1d652c81a..5281f8f5c 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -396,15 +396,17 @@ func (w *watchGrpcStream) run() { for _, ws := range w.substreams { if _, ok := closing[ws]; !ok { close(ws.recvc) + closing[ws] = struct{}{} } } for _, ws := range w.resuming { if _, ok := closing[ws]; ws != nil && !ok { close(ws.recvc) + closing[ws] = struct{}{} } } w.joinSubstreams() - for toClose := len(w.substreams) + len(w.resuming); toClose > 0; toClose-- { + for range closing { w.closeSubstream(<-w.closingc) }