Merge 5b227645a3d697bda8b5eaa09a67271ee9f7cf90 into c86c93ca2951338115159dcdd20711603044e1f1

This commit is contained in:
Hanying Gan 2024-09-26 22:00:14 +01:00 committed by GitHub
commit d4d78b6436
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -455,12 +455,13 @@ func (w *watcher) closeStream(wgs *watchGRPCStream) {
w.mu.Unlock()
}
func (w *watchGRPCStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
func (w *watchGRPCStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream, closing map[*watcherStream]struct{}) {
// check watch ID for backward compatibility (<= v3.3)
if resp.WatchId == InvalidWatchID || (resp.Canceled && resp.CancelReason != "") {
w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
// failed; no channel
close(ws.recvc)
closing[ws] = struct{}{}
return
}
ws.id = resp.WatchId
@ -591,7 +592,7 @@ func (w *watchGRPCStream) run() {
// response to head of queue creation
if len(w.resuming) != 0 {
if ws := w.resuming[0]; ws != nil {
w.addSubstream(pbresp, ws)
w.addSubstream(pbresp, ws, closing)
w.dispatchEvent(pbresp)
w.resuming[0] = nil
}