diff --git a/clientv3/watch.go b/clientv3/watch.go index 4ae3a0b33..1af25e41e 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -141,6 +141,7 @@ type watcher struct { // streams holds all the active grpc streams keyed by ctx value. streams map[string]*watchGrpcStream + lg *zap.Logger } // watchGrpcStream tracks all watch resources attached to a single grpc stream. @@ -243,6 +244,7 @@ func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher { } if c != nil { w.callOpts = c.callOpts + w.lg = c.lg } return w } @@ -545,10 +547,14 @@ func (w *watchGrpcStream) run() { w.resuming = append(w.resuming, ws) if len(w.resuming) == 1 { // head of resume queue, can register a new watcher - wc.Send(ws.initReq.toPB()) + if err := wc.Send(ws.initReq.toPB()); err != nil { + lg.Warningf("error when sending request: %v", err) + } } case *progressRequest: - wc.Send(wreq.toPB()) + if err := wc.Send(wreq.toPB()); err != nil { + lg.Warningf("error when sending request: %v", err) + } } // new events from the watch client @@ -572,7 +578,9 @@ func (w *watchGrpcStream) run() { } if ws := w.nextResume(); ws != nil { - wc.Send(ws.initReq.toPB()) + if err := wc.Send(ws.initReq.toPB()); err != nil { + lg.Warningf("error when sending request: %v", err) + } } // reset for next iteration @@ -633,7 +641,9 @@ func (w *watchGrpcStream) run() { return } if ws := w.nextResume(); ws != nil { - wc.Send(ws.initReq.toPB()) + if err := wc.Send(ws.initReq.toPB()); err != nil { + lg.Warningf("error when sending request: %v", err) + } } cancelSet = make(map[int64]struct{})