clientv3: handle watchGrpcStream shutdown if prior to goroutine start

Fixes #6141
This commit is contained in:
Anthony Romano 2016-08-09 19:28:16 -07:00
parent 88a77f30e1
commit 1c83a46c6d

View File

@ -268,6 +268,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
case reqc <- wr:
ok = true
case <-wr.ctx.Done():
wgs.stopIfEmpty()
case <-donec:
if wgs.closeErr != nil {
closeCh <- WatchResponse{closeErr: wgs.closeErr}
@ -385,10 +386,6 @@ func (w *watchGrpcStream) closeStream(ws *watcherStream) {
// close subscriber's channel
close(ws.outc)
delete(w.streams, ws.id)
if len(w.streams) == 0 && w.stopc != nil {
close(w.stopc)
w.stopc = nil
}
w.mu.Unlock()
}
@ -408,6 +405,14 @@ func (w *watchGrpcStream) run() {
w.cancel()
}()
// already stopped?
w.mu.RLock()
stopc := w.stopc
w.mu.RUnlock()
if stopc == nil {
return
}
// start a stream with the etcd grpc server
if wc, closeErr = w.newWatchClient(); closeErr != nil {
return
@ -415,7 +420,6 @@ func (w *watchGrpcStream) run() {
var pendingReq, failedReq *watchRequest
curReqC := w.reqc
stopc := w.stopc
cancelSet := make(map[int64]struct{})
for {
@ -638,9 +642,19 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) {
}
w.closeStream(ws)
w.stopIfEmpty()
// lazily send cancel message if events on missing id
}
func (wgs *watchGrpcStream) stopIfEmpty() {
wgs.mu.Lock()
if len(wgs.streams) == 0 && wgs.stopc != nil {
close(wgs.stopc)
wgs.stopc = nil
}
wgs.mu.Unlock()
}
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
ws, rerr := w.resume()
if rerr != nil {