diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index 2c070b31c..42d196ca2 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -122,23 +122,23 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { defer func() { stopc <- struct{}{} }() wps.sendLoop() }() - if leaderc != nil { - go func() { - defer func() { stopc <- struct{}{} }() - select { - case <-leaderc: - case <-ctx.Done(): - } - }() - } + // tear down watch if leader goes down or entire watch proxy is terminated + go func() { + defer func() { stopc <- struct{}{} }() + select { + case <-leaderc: + case <-ctx.Done(): + case <-wp.ctx.Done(): + } + }() <-stopc + cancel() + // recv/send may only shutdown after function exits; // goroutine notifies proxy that stream is through go func() { - if leaderc != nil { - <-stopc - } + <-stopc <-stopc wps.close() wp.wg.Done()