clientv3: log warning in case of error sending request

This commit is contained in:
Ted Yu 2019-12-20 15:08:33 -08:00 committed by jingyih
parent 18dfb9cca3
commit e800c62eca

View File

@ -141,6 +141,7 @@ type watcher struct {
// streams holds all the active grpc streams keyed by ctx value.
streams map[string]*watchGrpcStream
lg *zap.Logger
}
// watchGrpcStream tracks all watch resources attached to a single grpc stream.
@ -243,6 +244,7 @@ func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
}
if c != nil {
w.callOpts = c.callOpts
w.lg = c.lg
}
return w
}
@ -545,10 +547,14 @@ func (w *watchGrpcStream) run() {
w.resuming = append(w.resuming, ws)
if len(w.resuming) == 1 {
// head of resume queue, can register a new watcher
wc.Send(ws.initReq.toPB())
if err := wc.Send(ws.initReq.toPB()); err != nil {
lg.Warningf("error when sending request: %v", err)
}
}
case *progressRequest:
wc.Send(wreq.toPB())
if err := wc.Send(wreq.toPB()); err != nil {
lg.Warningf("error when sending request: %v", err)
}
}
// new events from the watch client
@ -572,7 +578,9 @@ func (w *watchGrpcStream) run() {
}
if ws := w.nextResume(); ws != nil {
wc.Send(ws.initReq.toPB())
if err := wc.Send(ws.initReq.toPB()); err != nil {
lg.Warningf("error when sending request: %v", err)
}
}
// reset for next iteration
@ -633,7 +641,9 @@ func (w *watchGrpcStream) run() {
return
}
if ws := w.nextResume(); ws != nil {
wc.Send(ws.initReq.toPB())
if err := wc.Send(ws.initReq.toPB()); err != nil {
lg.Warningf("error when sending request: %v", err)
}
}
cancelSet = make(map[int64]struct{})