mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #12985 from ptabor/20210517-fix-not-a-learner-retries
Fix not retryable error codes from: Unavailable -> FailedPrecondition
This commit is contained in:
commit
159d1916fe
@ -68,19 +68,19 @@ var (
|
|||||||
ErrGRPCNoLeader = status.New(codes.Unavailable, "etcdserver: no leader").Err()
|
ErrGRPCNoLeader = status.New(codes.Unavailable, "etcdserver: no leader").Err()
|
||||||
ErrGRPCNotLeader = status.New(codes.FailedPrecondition, "etcdserver: not leader").Err()
|
ErrGRPCNotLeader = status.New(codes.FailedPrecondition, "etcdserver: not leader").Err()
|
||||||
ErrGRPCLeaderChanged = status.New(codes.Unavailable, "etcdserver: leader changed").Err()
|
ErrGRPCLeaderChanged = status.New(codes.Unavailable, "etcdserver: leader changed").Err()
|
||||||
ErrGRPCNotCapable = status.New(codes.Unavailable, "etcdserver: not capable").Err()
|
ErrGRPCNotCapable = status.New(codes.FailedPrecondition, "etcdserver: not capable").Err()
|
||||||
ErrGRPCStopped = status.New(codes.Unavailable, "etcdserver: server stopped").Err()
|
ErrGRPCStopped = status.New(codes.Unavailable, "etcdserver: server stopped").Err()
|
||||||
ErrGRPCTimeout = status.New(codes.Unavailable, "etcdserver: request timed out").Err()
|
ErrGRPCTimeout = status.New(codes.Unavailable, "etcdserver: request timed out").Err()
|
||||||
ErrGRPCTimeoutDueToLeaderFail = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to previous leader failure").Err()
|
ErrGRPCTimeoutDueToLeaderFail = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to previous leader failure").Err()
|
||||||
ErrGRPCTimeoutDueToConnectionLost = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost").Err()
|
ErrGRPCTimeoutDueToConnectionLost = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost").Err()
|
||||||
ErrGRPCUnhealthy = status.New(codes.Unavailable, "etcdserver: unhealthy cluster").Err()
|
ErrGRPCUnhealthy = status.New(codes.Unavailable, "etcdserver: unhealthy cluster").Err()
|
||||||
ErrGRPCCorrupt = status.New(codes.DataLoss, "etcdserver: corrupt cluster").Err()
|
ErrGRPCCorrupt = status.New(codes.DataLoss, "etcdserver: corrupt cluster").Err()
|
||||||
ErrGPRCNotSupportedForLearner = status.New(codes.Unavailable, "etcdserver: rpc not supported for learner").Err()
|
ErrGPRCNotSupportedForLearner = status.New(codes.FailedPrecondition, "etcdserver: rpc not supported for learner").Err()
|
||||||
ErrGRPCBadLeaderTransferee = status.New(codes.FailedPrecondition, "etcdserver: bad leader transferee").Err()
|
ErrGRPCBadLeaderTransferee = status.New(codes.FailedPrecondition, "etcdserver: bad leader transferee").Err()
|
||||||
|
|
||||||
ErrGRPCClusterVersionUnavailable = status.New(codes.Unavailable, "etcdserver: cluster version not found during downgrade").Err()
|
|
||||||
ErrGRPCWrongDowngradeVersionFormat = status.New(codes.InvalidArgument, "etcdserver: wrong downgrade target version format").Err()
|
ErrGRPCWrongDowngradeVersionFormat = status.New(codes.InvalidArgument, "etcdserver: wrong downgrade target version format").Err()
|
||||||
ErrGRPCInvalidDowngradeTargetVersion = status.New(codes.InvalidArgument, "etcdserver: invalid downgrade target version").Err()
|
ErrGRPCInvalidDowngradeTargetVersion = status.New(codes.InvalidArgument, "etcdserver: invalid downgrade target version").Err()
|
||||||
|
ErrGRPCClusterVersionUnavailable = status.New(codes.FailedPrecondition, "etcdserver: cluster version not found during downgrade").Err()
|
||||||
ErrGRPCDowngradeInProcess = status.New(codes.FailedPrecondition, "etcdserver: cluster has a downgrade job in progress").Err()
|
ErrGRPCDowngradeInProcess = status.New(codes.FailedPrecondition, "etcdserver: cluster has a downgrade job in progress").Err()
|
||||||
ErrGRPCNoInflightDowngrade = status.New(codes.FailedPrecondition, "etcdserver: no inflight downgrade job").Err()
|
ErrGRPCNoInflightDowngrade = status.New(codes.FailedPrecondition, "etcdserver: no inflight downgrade job").Err()
|
||||||
|
|
||||||
|
@ -19,6 +19,7 @@ package clientv3
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -91,7 +92,7 @@ func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClien
|
|||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if !isSafeRetry(c.lg, lastErr, callOpts) {
|
if !isSafeRetry(c, lastErr, callOpts) {
|
||||||
return lastErr
|
return lastErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -257,7 +258,7 @@ func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{}
|
|||||||
return true, err
|
return true, err
|
||||||
|
|
||||||
}
|
}
|
||||||
return isSafeRetry(s.client.lg, err, s.callOpts), err
|
return isSafeRetry(s.client, err, s.callOpts), err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *serverStreamingRetryingStream) reestablishStreamAndResendBuffer(callCtx context.Context) (grpc.ClientStream, error) {
|
func (s *serverStreamingRetryingStream) reestablishStreamAndResendBuffer(callCtx context.Context) (grpc.ClientStream, error) {
|
||||||
@ -297,17 +298,28 @@ func waitRetryBackoff(ctx context.Context, attempt uint, callOpts *options) erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
// isSafeRetry returns "true", if request is safe for retry with the given error.
|
// isSafeRetry returns "true", if request is safe for retry with the given error.
|
||||||
func isSafeRetry(lg *zap.Logger, err error, callOpts *options) bool {
|
func isSafeRetry(c *Client, err error, callOpts *options) bool {
|
||||||
if isContextError(err) {
|
if isContextError(err) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Situation when learner refuses RPC it is supposed to not serve is from the server
|
||||||
|
// perspective not retryable.
|
||||||
|
// But for backward-compatibility reasons we need to support situation that
|
||||||
|
// customer provides mix of learners (not yet voters) and voters with an
|
||||||
|
// expectation to pick voter in the next attempt.
|
||||||
|
// TODO: Ideally client should be 'aware' which endpoint represents: leader/voter/learner with high probability.
|
||||||
|
if errors.Is(err, rpctypes.ErrGPRCNotSupportedForLearner) && len(c.Endpoints()) > 1 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
switch callOpts.retryPolicy {
|
switch callOpts.retryPolicy {
|
||||||
case repeatable:
|
case repeatable:
|
||||||
return isSafeRetryImmutableRPC(err)
|
return isSafeRetryImmutableRPC(err)
|
||||||
case nonRepeatable:
|
case nonRepeatable:
|
||||||
return isSafeRetryMutableRPC(err)
|
return isSafeRetryMutableRPC(err)
|
||||||
default:
|
default:
|
||||||
lg.Warn("unrecognized retry policy", zap.String("retryPolicy", callOpts.retryPolicy.String()))
|
c.lg.Warn("unrecognized retry policy", zap.String("retryPolicy", callOpts.retryPolicy.String()))
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user