clientv3: revert the client side change in 14547

In order to fix https://github.com/etcd-io/etcd/issues/12385,
PR https://github.com/etcd-io/etcd/pull/14322 introduced a change
in which the client side may retry based on the error message
returned from server side.

This is not good, as it's too fragile and it's also changed the
protocol between client and server. Please see the discussion
in https://github.com/kubernetes/kubernetes/pull/114403

Note: The issue https://github.com/etcd-io/etcd/issues/12385 only
happens when auth is enabled, and client side reuse the same client
to watch.

So we decided to rollback the change on 3.5, reasons:
1.K8s doesn't enable auth at all. It has no any impact on K8s.
2.It's very easy for client application to workaround the issue.
  The client just needs to create a new client each time before watching.

Signed-off-by: Benjamin Wang <wachao@vmware.com>
This commit is contained in:
Benjamin Wang
2022-12-15 02:12:49 +08:00
parent 127e9c05b0
commit 1fdfb4292c
2 changed files with 0 additions and 59 deletions

View File

@@ -18,7 +18,6 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
@@ -589,26 +588,6 @@ func (w *watchGrpcStream) run() {
switch {
case pbresp.Created:
cancelReasonError := v3rpc.Error(errors.New(pbresp.CancelReason))
if shouldRetryWatch(cancelReasonError) {
var newErr error
if wc, newErr = w.newWatchClient(); newErr != nil {
w.lg.Error("failed to create a new watch client", zap.Error(newErr))
return
}
if len(w.resuming) != 0 {
if ws := w.resuming[0]; ws != nil {
if err := wc.Send(ws.initReq.toPB()); err != nil {
w.lg.Debug("error when sending request", zap.Error(err))
}
}
}
cur = nil
continue
}
// response to head of queue creation
if len(w.resuming) != 0 {
if ws := w.resuming[0]; ws != nil {
@@ -718,11 +697,6 @@ func (w *watchGrpcStream) run() {
}
}
func shouldRetryWatch(cancelReasonError error) bool {
return (strings.Compare(cancelReasonError.Error(), v3rpc.ErrGRPCInvalidAuthToken.Error()) == 0) ||
(strings.Compare(cancelReasonError.Error(), v3rpc.ErrGRPCAuthOldRevision.Error()) == 0)
}
// nextResume chooses the next resuming to register with the grpc stream. Abandoned
// streams are marked as nil in the queue since the head must wait for its inflight registration.
func (w *watchGrpcStream) nextResume() *watcherStream {