mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #7582 from heyitsanthony/fix-watch-stream-leak
clientv3: use waitgroup to wait for substream goroutine teardown
This commit is contained in:
commit
36ece32a61
@ -132,6 +132,8 @@ type watchGrpcStream struct {
|
|||||||
errc chan error
|
errc chan error
|
||||||
// closingc gets the watcherStream of closing watchers
|
// closingc gets the watcherStream of closing watchers
|
||||||
closingc chan *watcherStream
|
closingc chan *watcherStream
|
||||||
|
// wg is Done when all substream goroutines have exited
|
||||||
|
wg sync.WaitGroup
|
||||||
|
|
||||||
// resumec closes to signal that all substreams should begin resuming
|
// resumec closes to signal that all substreams should begin resuming
|
||||||
resumec chan struct{}
|
resumec chan struct{}
|
||||||
@ -406,7 +408,7 @@ func (w *watchGrpcStream) run() {
|
|||||||
for range closing {
|
for range closing {
|
||||||
w.closeSubstream(<-w.closingc)
|
w.closeSubstream(<-w.closingc)
|
||||||
}
|
}
|
||||||
|
w.wg.Wait()
|
||||||
w.owner.closeStream(w)
|
w.owner.closeStream(w)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -431,6 +433,7 @@ func (w *watchGrpcStream) run() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ws.donec = make(chan struct{})
|
ws.donec = make(chan struct{})
|
||||||
|
w.wg.Add(1)
|
||||||
go w.serveSubstream(ws, w.resumec)
|
go w.serveSubstream(ws, w.resumec)
|
||||||
|
|
||||||
// queue up for watcher creation/resume
|
// queue up for watcher creation/resume
|
||||||
@ -576,6 +579,7 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
|
|||||||
if !resuming {
|
if !resuming {
|
||||||
w.closingc <- ws
|
w.closingc <- ws
|
||||||
}
|
}
|
||||||
|
w.wg.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
emptyWr := &WatchResponse{}
|
emptyWr := &WatchResponse{}
|
||||||
@ -674,6 +678,7 @@ func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
ws.donec = make(chan struct{})
|
ws.donec = make(chan struct{})
|
||||||
|
w.wg.Add(1)
|
||||||
go w.serveSubstream(ws, w.resumec)
|
go w.serveSubstream(ws, w.resumec)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user