diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index b01b01814..395017320 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -82,7 +82,18 @@ type serverWatchStream struct { nextWatcherID int64 } +func (sws *serverWatchStream) close() { + close(sws.watchCh) + close(sws.ctrlCh) + for _, ws := range sws.singles { + ws.stop() + } + sws.groups.stop() +} + func (sws *serverWatchStream) recvLoop() error { + defer sws.close() + for { req, err := sws.gRPCStream.Recv() if err == io.EOF { diff --git a/proxy/grpcproxy/watcher_groups.go b/proxy/grpcproxy/watcher_groups.go index d903017b1..25d40c19e 100644 --- a/proxy/grpcproxy/watcher_groups.go +++ b/proxy/grpcproxy/watcher_groups.go @@ -86,3 +86,11 @@ func (wgs *watchergroups) maybeJoinWatcherSingle(rid receiverID, ws watcherSingl return false } + +func (wgs *watchergroups) stop() { + wgs.mu.Lock() + defer wgs.mu.Unlock() + for _, wg := range wgs.groups { + wg.stop() + } +}