From 617f1df443b8f7c24d5f7020c56308e35de14e9f Mon Sep 17 00:00:00 2001 From: Hitoshi Mitake Date: Mon, 5 Feb 2024 20:59:06 +0900 Subject: [PATCH] Remove string dependent error handling in watch and auth Signed-off-by: Hitoshi Mitake --- client/v3/watch.go | 32 ------------------------- client/v3/watch_test.go | 39 ------------------------------- tests/integration/v3_auth_test.go | 33 -------------------------- 3 files changed, 104 deletions(-) diff --git a/client/v3/watch.go b/client/v3/watch.go index c5e83cd4e..db12aff29 100644 --- a/client/v3/watch.go +++ b/client/v3/watch.go @@ -46,11 +46,6 @@ const ( InvalidWatchID = -1 ) -var ( - errMsgGRPCInvalidAuthToken = v3rpc.ErrGRPCInvalidAuthToken.Error() - errMsgGRPCAuthOldRevision = v3rpc.ErrGRPCAuthOldRevision.Error() -) - type Event mvccpb.Event type WatchChan <-chan WatchResponse @@ -593,25 +588,6 @@ func (w *watchGrpcStream) run() { switch { case pbresp.Created: - if pbresp.Canceled && shouldRetryWatch(pbresp.CancelReason) { - var newErr error - if wc, newErr = w.newWatchClient(); newErr != nil { - w.lg.Error("failed to create a new watch client", zap.Error(newErr)) - return - } - - if len(w.resuming) != 0 { - if ws := w.resuming[0]; ws != nil { - if err := wc.Send(ws.initReq.toPB()); err != nil { - w.lg.Debug("error when sending request", zap.Error(err)) - } - } - } - - cur = nil - continue - } - // response to head of queue creation if len(w.resuming) != 0 { if ws := w.resuming[0]; ws != nil { @@ -721,14 +697,6 @@ func (w *watchGrpcStream) run() { } } -func shouldRetryWatch(cancelReason string) bool { - if cancelReason == "" { - return false - } - return (cancelReason == errMsgGRPCInvalidAuthToken) || - (cancelReason == errMsgGRPCAuthOldRevision) -} - // nextResume chooses the next resuming to register with the grpc stream. Abandoned // streams are marked as nil in the queue since the head must wait for its inflight registration. func (w *watchGrpcStream) nextResume() *watcherStream { diff --git a/client/v3/watch_test.go b/client/v3/watch_test.go index 0a94f08cd..2a56ca4a9 100644 --- a/client/v3/watch_test.go +++ b/client/v3/watch_test.go @@ -17,10 +17,7 @@ package clientv3 import ( "testing" - "github.com/stretchr/testify/assert" - "go.etcd.io/etcd/api/v3/mvccpb" - "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" ) func TestEvent(t *testing.T) { @@ -56,39 +53,3 @@ func TestEvent(t *testing.T) { } } } - -func TestShouldRetryWatch(t *testing.T) { - testCases := []struct { - name string - msg string - expectedRetry bool - }{ - { - name: "equal to ErrGRPCInvalidAuthToken", - msg: rpctypes.ErrGRPCInvalidAuthToken.Error(), - expectedRetry: true, - }, - { - name: "equal to ErrGRPCAuthOldRevision", - msg: rpctypes.ErrGRPCAuthOldRevision.Error(), - expectedRetry: true, - }, - { - name: "valid grpc error but not equal to ErrGRPCInvalidAuthToken or ErrGRPCAuthOldRevision", - msg: rpctypes.ErrGRPCUserEmpty.Error(), - expectedRetry: false, - }, - { - name: "invalid grpc error and not equal to ErrGRPCInvalidAuthToken or ErrGRPCAuthOldRevision", - msg: "whatever error message", - expectedRetry: false, - }, - } - - for _, tc := range testCases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - assert.Equal(t, tc.expectedRetry, shouldRetryWatch(tc.msg)) - }) - } -} diff --git a/tests/integration/v3_auth_test.go b/tests/integration/v3_auth_test.go index 74841b2aa..89108bacc 100644 --- a/tests/integration/v3_auth_test.go +++ b/tests/integration/v3_auth_test.go @@ -420,39 +420,6 @@ func TestV3AuthOldRevConcurrent(t *testing.T) { wg.Wait() } -func TestV3AuthWatchAndTokenExpire(t *testing.T) { - integration.BeforeTest(t) - clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1, AuthTokenTTL: 3}) - defer clus.Terminate(t) - - ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) - defer cancel() - - authSetupRoot(t, integration.ToGRPC(clus.Client(0)).Auth) - - c, cerr := integration.NewClient(t, clientv3.Config{Endpoints: clus.Client(0).Endpoints(), Username: "root", Password: "123"}) - if cerr != nil { - t.Fatal(cerr) - } - defer c.Close() - - _, err := c.Put(ctx, "key", "val") - if err != nil { - t.Fatalf("Unexpected error from Put: %v", err) - } - - // The first watch gets a valid auth token through watcher.newWatcherGrpcStream() - // We should discard the first one by waiting TTL after the first watch. - wChan := c.Watch(ctx, "key", clientv3.WithRev(1)) - <-wChan - - time.Sleep(5 * time.Second) - - wChan = c.Watch(ctx, "key", clientv3.WithRev(1)) - watchResponse := <-wChan - testutil.AssertNil(t, watchResponse.Err()) -} - func TestV3AuthWatchErrorAndWatchId0(t *testing.T) { integration.BeforeTest(t) clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})