diff --git a/client/v3/watch.go b/client/v3/watch.go index bb0b5485e..e35eb528c 100644 --- a/client/v3/watch.go +++ b/client/v3/watch.go @@ -18,7 +18,6 @@ import ( "context" "errors" "fmt" - "strings" "sync" "time" @@ -46,6 +45,11 @@ const ( InvalidWatchID = -1 ) +var ( + errMsgGRPCInvalidAuthToken = v3rpc.ErrGRPCInvalidAuthToken.Error() + errMsgGRPCAuthOldRevision = v3rpc.ErrGRPCAuthOldRevision.Error() +) + type Event mvccpb.Event type WatchChan <-chan WatchResponse @@ -588,8 +592,7 @@ func (w *watchGrpcStream) run() { switch { case pbresp.Created: - cancelReasonError := v3rpc.Error(errors.New(pbresp.CancelReason)) - if shouldRetryWatch(cancelReasonError) { + if pbresp.Canceled && shouldRetryWatch(pbresp.CancelReason) { var newErr error if wc, newErr = w.newWatchClient(); newErr != nil { w.lg.Error("failed to create a new watch client", zap.Error(newErr)) @@ -717,9 +720,12 @@ func (w *watchGrpcStream) run() { } } -func shouldRetryWatch(cancelReasonError error) bool { - return (strings.Compare(cancelReasonError.Error(), v3rpc.ErrGRPCInvalidAuthToken.Error()) == 0) || - (strings.Compare(cancelReasonError.Error(), v3rpc.ErrGRPCAuthOldRevision.Error()) == 0) +func shouldRetryWatch(cancelReason string) bool { + if cancelReason == "" { + return false + } + return (cancelReason == errMsgGRPCInvalidAuthToken) || + (cancelReason == errMsgGRPCAuthOldRevision) } // nextResume chooses the next resuming to register with the grpc stream. Abandoned diff --git a/client/v3/watch_test.go b/client/v3/watch_test.go index 2a56ca4a9..0a94f08cd 100644 --- a/client/v3/watch_test.go +++ b/client/v3/watch_test.go @@ -17,7 +17,10 @@ package clientv3 import ( "testing" + "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/api/v3/mvccpb" + "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" ) func TestEvent(t *testing.T) { @@ -53,3 +56,39 @@ func TestEvent(t *testing.T) { } } } + +func TestShouldRetryWatch(t *testing.T) { + testCases := []struct { + name string + msg string + expectedRetry bool + }{ + { + name: "equal to ErrGRPCInvalidAuthToken", + msg: rpctypes.ErrGRPCInvalidAuthToken.Error(), + expectedRetry: true, + }, + { + name: "equal to ErrGRPCAuthOldRevision", + msg: rpctypes.ErrGRPCAuthOldRevision.Error(), + expectedRetry: true, + }, + { + name: "valid grpc error but not equal to ErrGRPCInvalidAuthToken or ErrGRPCAuthOldRevision", + msg: rpctypes.ErrGRPCUserEmpty.Error(), + expectedRetry: false, + }, + { + name: "invalid grpc error and not equal to ErrGRPCInvalidAuthToken or ErrGRPCAuthOldRevision", + msg: "whatever error message", + expectedRetry: false, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.expectedRetry, shouldRetryWatch(tc.msg)) + }) + } +}