From f0e6c10abaa3c58043f902312324723b601e914c Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Thu, 16 Aug 2018 22:18:20 -0500 Subject: [PATCH 1/2] grpcproxy: return error to client during watch create Now returns errors from checkPermissionForWatch() via the CancelReason field. This allows clients to understand why the watch was canceled. Additionally, this change protects a watch from starting and that otherwise might hang indefinitely. --- proxy/grpcproxy/watch.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index 603095f27..4172755c3 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -229,11 +229,14 @@ func (wps *watchProxyStream) recvLoop() error { case *pb.WatchRequest_CreateRequest: cr := uv.CreateRequest - if err = wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil && err == rpctypes.ErrPermissionDenied { - // Return WatchResponse which is caused by permission checking if and only if - // the error is permission denied. For other errors (e.g. timeout or connection closed), - // the permission checking mechanism should do nothing for preserving error code. - wps.watchCh <- &pb.WatchResponse{Header: &pb.ResponseHeader{}, WatchId: -1, Created: true, Canceled: true} + if err := wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil { + wps.watchCh <- &pb.WatchResponse{ + Header: &pb.ResponseHeader{}, + WatchId: -1, + Created: true, + Canceled: true, + CancelReason: err.Error(), + } continue } From eb9c8d3c2f7632c5cc13d6dd370c69ead79498f0 Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Thu, 16 Aug 2018 22:22:59 -0500 Subject: [PATCH 2/2] clientv3: return reason to user when server cancels watch This change allows Watch users to retrieve the cancel reason when a watch is canceled by the server. Additionally WatchResponses with closeErr set now have Canceled set true which is in line with the documentation for the Canceled field. --- clientv3/watch.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/clientv3/watch.go b/clientv3/watch.go index a224ddc3b..e6aa440a0 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -16,6 +16,7 @@ package clientv3 import ( "context" + "errors" "fmt" "sync" "time" @@ -333,7 +334,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch case <-wr.ctx.Done(): case <-donec: if wgs.closeErr != nil { - closeCh <- WatchResponse{closeErr: wgs.closeErr} + closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr} break } // retry; may have dropped stream from no ctxs @@ -348,7 +349,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch case <-ctx.Done(): case <-donec: if wgs.closeErr != nil { - closeCh <- WatchResponse{closeErr: wgs.closeErr} + closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr} break } // retry; may have dropped stream from no ctxs @@ -432,6 +433,7 @@ func (w *watcher) closeStream(wgs *watchGrpcStream) { func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) { // check watch ID for backward compatibility (<= v3.3) if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") { + w.closeErr = v3rpc.Error(errors.New(resp.CancelReason)) // failed; no channel close(ws.recvc) return @@ -457,7 +459,7 @@ func (w *watchGrpcStream) closeSubstream(ws *watcherStream) { } // close subscriber's channel if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil { - go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr}) + go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr}) } else if ws.outc != nil { close(ws.outc) }