mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
parent
23cced240b
commit
12c7e4a9f8
@ -285,7 +285,12 @@ func (w *watcher) Close() (err error) {
|
||||
}
|
||||
|
||||
func (w *watchGrpcStream) Close() (err error) {
|
||||
close(w.stopc)
|
||||
w.mu.Lock()
|
||||
if w.stopc != nil {
|
||||
close(w.stopc)
|
||||
w.stopc = nil
|
||||
}
|
||||
w.mu.Unlock()
|
||||
<-w.donec
|
||||
select {
|
||||
case err = <-w.errc:
|
||||
@ -348,11 +353,17 @@ func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchReq
|
||||
|
||||
// closeStream closes the watcher resources and removes it
|
||||
func (w *watchGrpcStream) closeStream(ws *watcherStream) {
|
||||
w.mu.Lock()
|
||||
// cancels request stream; subscriber receives nil channel
|
||||
close(ws.initReq.retc)
|
||||
// close subscriber's channel
|
||||
close(ws.outc)
|
||||
delete(w.streams, ws.id)
|
||||
if len(w.streams) == 0 && w.stopc != nil {
|
||||
close(w.stopc)
|
||||
w.stopc = nil
|
||||
}
|
||||
w.mu.Unlock()
|
||||
}
|
||||
|
||||
// run is the root of the goroutines for managing a watcher client
|
||||
@ -378,6 +389,7 @@ func (w *watchGrpcStream) run() {
|
||||
|
||||
var pendingReq, failedReq *watchRequest
|
||||
curReqC := w.reqc
|
||||
stopc := w.stopc
|
||||
cancelSet := make(map[int64]struct{})
|
||||
|
||||
for {
|
||||
@ -446,7 +458,7 @@ func (w *watchGrpcStream) run() {
|
||||
failedReq = pendingReq
|
||||
}
|
||||
cancelSet = make(map[int64]struct{})
|
||||
case <-w.stopc:
|
||||
case <-stopc:
|
||||
return
|
||||
}
|
||||
|
||||
@ -586,9 +598,7 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) {
|
||||
}
|
||||
}
|
||||
|
||||
w.mu.Lock()
|
||||
w.closeStream(ws)
|
||||
w.mu.Unlock()
|
||||
// lazily send cancel message if events on missing id
|
||||
}
|
||||
|
||||
@ -616,13 +626,14 @@ func (w *watchGrpcStream) resume() (ws pb.Watch_WatchClient, err error) {
|
||||
// openWatchClient retries opening a watchclient until retryConnection fails
|
||||
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
|
||||
for {
|
||||
select {
|
||||
case <-w.stopc:
|
||||
w.mu.Lock()
|
||||
stopc := w.stopc
|
||||
w.mu.Unlock()
|
||||
if stopc == nil {
|
||||
if err == nil {
|
||||
err = context.Canceled
|
||||
}
|
||||
return nil, err
|
||||
default:
|
||||
}
|
||||
if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil {
|
||||
break
|
||||
|
Loading…
x
Reference in New Issue
Block a user