mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #12189 from jingyih/automated-cherry-pick-of-#11452-#12187-upstream-release-3.4
Automated cherry pick of #11452 #12187 on release 3.4
This commit is contained in:
commit
8a4afdbcc2
@ -141,6 +141,7 @@ type watcher struct {
|
|||||||
|
|
||||||
// streams holds all the active grpc streams keyed by ctx value.
|
// streams holds all the active grpc streams keyed by ctx value.
|
||||||
streams map[string]*watchGrpcStream
|
streams map[string]*watchGrpcStream
|
||||||
|
lg *zap.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// watchGrpcStream tracks all watch resources attached to a single grpc stream.
|
// watchGrpcStream tracks all watch resources attached to a single grpc stream.
|
||||||
@ -177,6 +178,8 @@ type watchGrpcStream struct {
|
|||||||
resumec chan struct{}
|
resumec chan struct{}
|
||||||
// closeErr is the error that closed the watch stream
|
// closeErr is the error that closed the watch stream
|
||||||
closeErr error
|
closeErr error
|
||||||
|
|
||||||
|
lg *zap.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// watchStreamRequest is a union of the supported watch request operation types
|
// watchStreamRequest is a union of the supported watch request operation types
|
||||||
@ -243,6 +246,7 @@ func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
|
|||||||
}
|
}
|
||||||
if c != nil {
|
if c != nil {
|
||||||
w.callOpts = c.callOpts
|
w.callOpts = c.callOpts
|
||||||
|
w.lg = c.lg
|
||||||
}
|
}
|
||||||
return w
|
return w
|
||||||
}
|
}
|
||||||
@ -274,6 +278,7 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
|
|||||||
errc: make(chan error, 1),
|
errc: make(chan error, 1),
|
||||||
closingc: make(chan *watcherStream),
|
closingc: make(chan *watcherStream),
|
||||||
resumec: make(chan struct{}),
|
resumec: make(chan struct{}),
|
||||||
|
lg: w.lg,
|
||||||
}
|
}
|
||||||
go wgs.run()
|
go wgs.run()
|
||||||
return wgs
|
return wgs
|
||||||
@ -545,10 +550,18 @@ func (w *watchGrpcStream) run() {
|
|||||||
w.resuming = append(w.resuming, ws)
|
w.resuming = append(w.resuming, ws)
|
||||||
if len(w.resuming) == 1 {
|
if len(w.resuming) == 1 {
|
||||||
// head of resume queue, can register a new watcher
|
// head of resume queue, can register a new watcher
|
||||||
wc.Send(ws.initReq.toPB())
|
if err := wc.Send(ws.initReq.toPB()); err != nil {
|
||||||
|
if w.lg != nil {
|
||||||
|
w.lg.Debug("error when sending request", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
case *progressRequest:
|
case *progressRequest:
|
||||||
wc.Send(wreq.toPB())
|
if err := wc.Send(wreq.toPB()); err != nil {
|
||||||
|
if w.lg != nil {
|
||||||
|
w.lg.Debug("error when sending request", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// new events from the watch client
|
// new events from the watch client
|
||||||
@ -572,7 +585,11 @@ func (w *watchGrpcStream) run() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if ws := w.nextResume(); ws != nil {
|
if ws := w.nextResume(); ws != nil {
|
||||||
wc.Send(ws.initReq.toPB())
|
if err := wc.Send(ws.initReq.toPB()); err != nil {
|
||||||
|
if w.lg != nil {
|
||||||
|
w.lg.Debug("error when sending request", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset for next iteration
|
// reset for next iteration
|
||||||
@ -617,9 +634,13 @@ func (w *watchGrpcStream) run() {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
req := &pb.WatchRequest{RequestUnion: cr}
|
req := &pb.WatchRequest{RequestUnion: cr}
|
||||||
lg.Info("sending watch cancel request for failed dispatch", zap.Int64("watch-id", pbresp.WatchId))
|
if w.lg != nil {
|
||||||
|
w.lg.Debug("sending watch cancel request for failed dispatch", zap.Int64("watch-id", pbresp.WatchId))
|
||||||
|
}
|
||||||
if err := wc.Send(req); err != nil {
|
if err := wc.Send(req); err != nil {
|
||||||
lg.Warning("failed to send watch cancel request", zap.Int64("watch-id", pbresp.WatchId), zap.Error(err))
|
if w.lg != nil {
|
||||||
|
w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", pbresp.WatchId), zap.Error(err))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -633,7 +654,11 @@ func (w *watchGrpcStream) run() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if ws := w.nextResume(); ws != nil {
|
if ws := w.nextResume(); ws != nil {
|
||||||
wc.Send(ws.initReq.toPB())
|
if err := wc.Send(ws.initReq.toPB()); err != nil {
|
||||||
|
if w.lg != nil {
|
||||||
|
w.lg.Debug("error when sending request", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
cancelSet = make(map[int64]struct{})
|
cancelSet = make(map[int64]struct{})
|
||||||
|
|
||||||
@ -651,9 +676,13 @@ func (w *watchGrpcStream) run() {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
req := &pb.WatchRequest{RequestUnion: cr}
|
req := &pb.WatchRequest{RequestUnion: cr}
|
||||||
lg.Info("sending watch cancel request for closed watcher", zap.Int64("watch-id", ws.id))
|
if w.lg != nil {
|
||||||
|
w.lg.Debug("sending watch cancel request for closed watcher", zap.Int64("watch-id", ws.id))
|
||||||
|
}
|
||||||
if err := wc.Send(req); err != nil {
|
if err := wc.Send(req); err != nil {
|
||||||
lg.Warning("failed to send watch cancel request", zap.Int64("watch-id", ws.id), zap.Error(err))
|
if w.lg != nil {
|
||||||
|
w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", ws.id), zap.Error(err))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
w.closeSubstream(ws)
|
w.closeSubstream(ws)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user