Merge pull request #11452 from tedyu/watch-grpc-send-err

clientv3: log warning in case of error sending request
This commit is contained in:
Jingyi Hu 2019-12-20 15:31:02 -08:00 committed by GitHub
commit 5adad5e224
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -25,6 +25,7 @@ import (
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
@ -140,6 +141,7 @@ type watcher struct {
// streams holds all the active grpc streams keyed by ctx value.
streams map[string]*watchGrpcStream
lg *zap.Logger
}
// watchGrpcStream tracks all watch resources attached to a single grpc stream.
@ -242,6 +244,7 @@ func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
}
if c != nil {
w.callOpts = c.callOpts
w.lg = c.lg
}
return w
}
@ -541,10 +544,14 @@ func (w *watchGrpcStream) run() {
w.resuming = append(w.resuming, ws)
if len(w.resuming) == 1 {
// head of resume queue, can register a new watcher
wc.Send(ws.initReq.toPB())
if err := wc.Send(ws.initReq.toPB()); err != nil {
lg.Warningf("error when sending request: %v", err)
}
}
case *progressRequest:
wc.Send(wreq.toPB())
if err := wc.Send(wreq.toPB()); err != nil {
lg.Warningf("error when sending request: %v", err)
}
}
// new events from the watch client
@ -568,7 +575,9 @@ func (w *watchGrpcStream) run() {
}
if ws := w.nextResume(); ws != nil {
wc.Send(ws.initReq.toPB())
if err := wc.Send(ws.initReq.toPB()); err != nil {
lg.Warningf("error when sending request: %v", err)
}
}
// reset for next iteration
@ -613,7 +622,9 @@ func (w *watchGrpcStream) run() {
},
}
req := &pb.WatchRequest{RequestUnion: cr}
wc.Send(req)
if err := wc.Send(req); err != nil {
lg.Warningf("error when sending request: %v", err)
}
}
// watch client failed on Recv; spawn another if possible
@ -626,7 +637,9 @@ func (w *watchGrpcStream) run() {
return
}
if ws := w.nextResume(); ws != nil {
wc.Send(ws.initReq.toPB())
if err := wc.Send(ws.initReq.toPB()); err != nil {
lg.Warningf("error when sending request: %v", err)
}
}
cancelSet = make(map[int64]struct{})