From 712090fc09ccc292fe37a0b563843bdbe8a65827 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 4 May 2016 23:39:16 -0700 Subject: [PATCH] clientv3: keep watcher client active if reconnect has network error Otherwise watchers created after a long disconnect period will always close immediately. --- clientv3/watch.go | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/clientv3/watch.go b/clientv3/watch.go index 8f3aad677..f2a23ff63 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -201,10 +201,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch } func (w *watcher) Close() error { - select { - case w.stopc <- struct{}{}: - case <-w.donec: - } + close(w.stopc) <-w.donec return v3rpc.Error(<-w.errc) } @@ -274,15 +271,20 @@ func (w *watcher) closeStream(ws *watcherStream) { // run is the root of the goroutines for managing a watcher client func (w *watcher) run() { + var wc pb.Watch_WatchClient + var closeErr error + defer func() { + select { + case w.errc <- closeErr: + default: + } close(w.donec) w.cancel() }() // start a stream with the etcd grpc server - wc, wcerr := w.newWatchClient() - if wcerr != nil { - w.errc <- wcerr + if wc, closeErr = w.newWatchClient(); closeErr != nil { return } @@ -332,8 +334,7 @@ func (w *watcher) run() { // watch client failed to recv; spawn another if possible // TODO report watch client errors from errc? case <-w.errc: - if wc, wcerr = w.newWatchClient(); wcerr != nil { - w.errc <- wcerr + if wc, closeErr = w.newWatchClient(); closeErr != nil { return } curReqC = w.reqc @@ -342,7 +343,6 @@ func (w *watcher) run() { } cancelSet = make(map[int64]struct{}) case <-w.stopc: - w.errc <- nil return } @@ -500,14 +500,20 @@ func (w *watcher) resume() (ws pb.Watch_WatchClient, err error) { // openWatchClient retries opening a watchclient until retryConnection fails func (w *watcher) openWatchClient() (ws pb.Watch_WatchClient, err error) { for { - if ws, err = w.remote.Watch(w.ctx); ws != nil { + select { + case <-w.stopc: + if err == nil { + err = context.Canceled + } + return nil, err + default: + } + if ws, err = w.remote.Watch(w.ctx); ws != nil && err == nil { break } else if isHaltErr(w.ctx, err) { return nil, v3rpc.Error(err) } - if nerr := w.remoteConn.reconnectWait(w.ctx, nil); nerr != nil { - return nil, nerr - } + err = w.rc.reconnectWait(w.ctx, nil) } return ws, nil }