Signed-off-by: ganhy4@chinatelecom.cn <ganhy4@chinatelecom.cn>
This commit is contained in:
ganhy4@chinatelecom.cn 2024-03-21 10:30:24 +08:00
parent 6f55dfa26e
commit 5b227645a3

View File

@ -455,12 +455,13 @@ func (w *watcher) closeStream(wgs *watchGRPCStream) {
w.mu.Unlock()
}
func (w *watchGRPCStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
func (w *watchGRPCStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream, closing map[*watcherStream]struct{}) {
// check watch ID for backward compatibility (<= v3.3)
if resp.WatchId == InvalidWatchID || (resp.Canceled && resp.CancelReason != "") {
w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
// failed; no channel
close(ws.recvc)
closing[ws] = struct{}{}
return
}
ws.id = resp.WatchId
@ -591,7 +592,7 @@ func (w *watchGRPCStream) run() {
// response to head of queue creation
if len(w.resuming) != 0 {
if ws := w.resuming[0]; ws != nil {
w.addSubstream(pbresp, ws)
w.addSubstream(pbresp, ws, closing)
w.dispatchEvent(pbresp)
w.resuming[0] = nil
}