From eb9c8d3c2f7632c5cc13d6dd370c69ead79498f0 Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Thu, 16 Aug 2018 22:22:59 -0500 Subject: [PATCH] clientv3: return reason to user when server cancels watch This change allows Watch users to retrieve the cancel reason when a watch is canceled by the server. Additionally WatchResponses with closeErr set now have Canceled set true which is in line with the documentation for the Canceled field. --- clientv3/watch.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/clientv3/watch.go b/clientv3/watch.go index a224ddc3b..e6aa440a0 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -16,6 +16,7 @@ package clientv3 import ( "context" + "errors" "fmt" "sync" "time" @@ -333,7 +334,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch case <-wr.ctx.Done(): case <-donec: if wgs.closeErr != nil { - closeCh <- WatchResponse{closeErr: wgs.closeErr} + closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr} break } // retry; may have dropped stream from no ctxs @@ -348,7 +349,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch case <-ctx.Done(): case <-donec: if wgs.closeErr != nil { - closeCh <- WatchResponse{closeErr: wgs.closeErr} + closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr} break } // retry; may have dropped stream from no ctxs @@ -432,6 +433,7 @@ func (w *watcher) closeStream(wgs *watchGrpcStream) { func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) { // check watch ID for backward compatibility (<= v3.3) if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") { + w.closeErr = v3rpc.Error(errors.New(resp.CancelReason)) // failed; no channel close(ws.recvc) return @@ -457,7 +459,7 @@ func (w *watchGrpcStream) closeSubstream(ws *watcherStream) { } // close subscriber's channel if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil { - go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr}) + go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr}) } else if ws.outc != nil { close(ws.outc) }