clientv3: only receive from closing streams in Watcher close

Was overcounting the number of expected closing messages; the resuming
list may have nil entries. Also the full client wasn't closing the watcher
client, only canceling its context, so client closes weren't joining with
the watcher shutdown.

Fixes #6605
This commit is contained in:
Anthony Romano 2016-10-12 15:54:12 -07:00
parent a66c25121b
commit c100e40715
2 changed files with 4 additions and 1 deletions

View File

@ -86,6 +86,7 @@ func NewFromConfigFile(path string) (*Client, error) {
// Close shuts down the client's etcd connections. // Close shuts down the client's etcd connections.
func (c *Client) Close() error { func (c *Client) Close() error {
c.cancel() c.cancel()
c.Watcher.Close()
return toErr(c.ctx, c.conn.Close()) return toErr(c.ctx, c.conn.Close())
} }

View File

@ -396,15 +396,17 @@ func (w *watchGrpcStream) run() {
for _, ws := range w.substreams { for _, ws := range w.substreams {
if _, ok := closing[ws]; !ok { if _, ok := closing[ws]; !ok {
close(ws.recvc) close(ws.recvc)
closing[ws] = struct{}{}
} }
} }
for _, ws := range w.resuming { for _, ws := range w.resuming {
if _, ok := closing[ws]; ws != nil && !ok { if _, ok := closing[ws]; ws != nil && !ok {
close(ws.recvc) close(ws.recvc)
closing[ws] = struct{}{}
} }
} }
w.joinSubstreams() w.joinSubstreams()
for toClose := len(w.substreams) + len(w.resuming); toClose > 0; toClose-- { for range closing {
w.closeSubstream(<-w.closingc) w.closeSubstream(<-w.closingc)
} }