Merge pull request #6636 from heyitsanthony/watch-resume-close

clientv3: only receive from closing streams in Watcher close
This commit is contained in:
Anthony Romano 2016-10-21 10:06:03 -07:00 committed by GitHub
commit 60c0a5503e
3 changed files with 27 additions and 1 deletions

View File

@ -86,6 +86,7 @@ func NewFromConfigFile(path string) (*Client, error) {
// Close shuts down the client's etcd connections.
func (c *Client) Close() error {
c.cancel()
c.Watcher.Close()
return toErr(c.ctx, c.conn.Close())
}

View File

@ -868,3 +868,26 @@ func TestWatchCancelAndCloseClient(t *testing.T) {
<-donec
clus.TakeClient(0)
}
// TestWatchStressResumeClose establishes a bunch of watchers, disconnects
// to put them in resuming mode, cancels them so some resumes by cancel fail,
// then closes the watcher interface to ensure correct clean up.
func TestWatchStressResumeClose(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.Client(0)
ctx, cancel := context.WithCancel(context.Background())
// add more watches than can be resumed before the cancel
wchs := make([]clientv3.WatchChan, 2000)
for i := range wchs {
wchs[i] = cli.Watch(ctx, "abc")
}
clus.Members[0].DropConnections()
cancel()
if err := cli.Close(); err != nil {
t.Fatal(err)
}
clus.TakeClient(0)
}

View File

@ -396,15 +396,17 @@ func (w *watchGrpcStream) run() {
for _, ws := range w.substreams {
if _, ok := closing[ws]; !ok {
close(ws.recvc)
closing[ws] = struct{}{}
}
}
for _, ws := range w.resuming {
if _, ok := closing[ws]; ws != nil && !ok {
close(ws.recvc)
closing[ws] = struct{}{}
}
}
w.joinSubstreams()
for toClose := len(w.substreams) + len(w.resuming); toClose > 0; toClose-- {
for range closing {
w.closeSubstream(<-w.closingc)
}