diff --git a/clientv3/watch.go b/clientv3/watch.go index a224ddc3b..e6aa440a0 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -16,6 +16,7 @@ package clientv3 import ( "context" + "errors" "fmt" "sync" "time" @@ -333,7 +334,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch case <-wr.ctx.Done(): case <-donec: if wgs.closeErr != nil { - closeCh <- WatchResponse{closeErr: wgs.closeErr} + closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr} break } // retry; may have dropped stream from no ctxs @@ -348,7 +349,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch case <-ctx.Done(): case <-donec: if wgs.closeErr != nil { - closeCh <- WatchResponse{closeErr: wgs.closeErr} + closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr} break } // retry; may have dropped stream from no ctxs @@ -432,6 +433,7 @@ func (w *watcher) closeStream(wgs *watchGrpcStream) { func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) { // check watch ID for backward compatibility (<= v3.3) if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") { + w.closeErr = v3rpc.Error(errors.New(resp.CancelReason)) // failed; no channel close(ws.recvc) return @@ -457,7 +459,7 @@ func (w *watchGrpcStream) closeSubstream(ws *watcherStream) { } // close subscriber's channel if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil { - go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr}) + go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr}) } else if ws.outc != nil { close(ws.outc) }