diff --git a/api/v3rpc/rpctypes/error.go b/api/v3rpc/rpctypes/error.go index c03e91cfb..73dd84ee7 100644 --- a/api/v3rpc/rpctypes/error.go +++ b/api/v3rpc/rpctypes/error.go @@ -68,19 +68,19 @@ var ( ErrGRPCNoLeader = status.New(codes.Unavailable, "etcdserver: no leader").Err() ErrGRPCNotLeader = status.New(codes.FailedPrecondition, "etcdserver: not leader").Err() ErrGRPCLeaderChanged = status.New(codes.Unavailable, "etcdserver: leader changed").Err() - ErrGRPCNotCapable = status.New(codes.Unavailable, "etcdserver: not capable").Err() + ErrGRPCNotCapable = status.New(codes.FailedPrecondition, "etcdserver: not capable").Err() ErrGRPCStopped = status.New(codes.Unavailable, "etcdserver: server stopped").Err() ErrGRPCTimeout = status.New(codes.Unavailable, "etcdserver: request timed out").Err() ErrGRPCTimeoutDueToLeaderFail = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to previous leader failure").Err() ErrGRPCTimeoutDueToConnectionLost = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost").Err() ErrGRPCUnhealthy = status.New(codes.Unavailable, "etcdserver: unhealthy cluster").Err() ErrGRPCCorrupt = status.New(codes.DataLoss, "etcdserver: corrupt cluster").Err() - ErrGPRCNotSupportedForLearner = status.New(codes.Unavailable, "etcdserver: rpc not supported for learner").Err() + ErrGPRCNotSupportedForLearner = status.New(codes.FailedPrecondition, "etcdserver: rpc not supported for learner").Err() ErrGRPCBadLeaderTransferee = status.New(codes.FailedPrecondition, "etcdserver: bad leader transferee").Err() - ErrGRPCClusterVersionUnavailable = status.New(codes.Unavailable, "etcdserver: cluster version not found during downgrade").Err() ErrGRPCWrongDowngradeVersionFormat = status.New(codes.InvalidArgument, "etcdserver: wrong downgrade target version format").Err() ErrGRPCInvalidDowngradeTargetVersion = status.New(codes.InvalidArgument, "etcdserver: invalid downgrade target version").Err() + ErrGRPCClusterVersionUnavailable = status.New(codes.FailedPrecondition, "etcdserver: cluster version not found during downgrade").Err() ErrGRPCDowngradeInProcess = status.New(codes.FailedPrecondition, "etcdserver: cluster has a downgrade job in progress").Err() ErrGRPCNoInflightDowngrade = status.New(codes.FailedPrecondition, "etcdserver: no inflight downgrade job").Err() diff --git a/client/v3/retry_interceptor.go b/client/v3/retry_interceptor.go index 9586c334a..9ce2c7696 100644 --- a/client/v3/retry_interceptor.go +++ b/client/v3/retry_interceptor.go @@ -19,6 +19,7 @@ package clientv3 import ( "context" + "errors" "io" "sync" "time" @@ -91,7 +92,7 @@ func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClien } continue } - if !isSafeRetry(c.lg, lastErr, callOpts) { + if !isSafeRetry(c, lastErr, callOpts) { return lastErr } } @@ -257,7 +258,7 @@ func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{} return true, err } - return isSafeRetry(s.client.lg, err, s.callOpts), err + return isSafeRetry(s.client, err, s.callOpts), err } func (s *serverStreamingRetryingStream) reestablishStreamAndResendBuffer(callCtx context.Context) (grpc.ClientStream, error) { @@ -297,17 +298,28 @@ func waitRetryBackoff(ctx context.Context, attempt uint, callOpts *options) erro } // isSafeRetry returns "true", if request is safe for retry with the given error. -func isSafeRetry(lg *zap.Logger, err error, callOpts *options) bool { +func isSafeRetry(c *Client, err error, callOpts *options) bool { if isContextError(err) { return false } + + // Situation when learner refuses RPC it is supposed to not serve is from the server + // perspective not retryable. + // But for backward-compatibility reasons we need to support situation that + // customer provides mix of learners (not yet voters) and voters with an + // expectation to pick voter in the next attempt. + // TODO: Ideally client should be 'aware' which endpoint represents: leader/voter/learner with high probability. + if errors.Is(err, rpctypes.ErrGPRCNotSupportedForLearner) && len(c.Endpoints()) > 1 { + return true + } + switch callOpts.retryPolicy { case repeatable: return isSafeRetryImmutableRPC(err) case nonRepeatable: return isSafeRetryMutableRPC(err) default: - lg.Warn("unrecognized retry policy", zap.String("retryPolicy", callOpts.retryPolicy.String())) + c.lg.Warn("unrecognized retry policy", zap.String("retryPolicy", callOpts.retryPolicy.String())) return false } }