mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
clientv3: clarify retry function names, do not retry on dial error
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
parent
dd520cebd4
commit
08da08bb19
@ -47,46 +47,52 @@ type rpcFunc func(ctx context.Context) error
|
||||
type retryRPCFunc func(context.Context, rpcFunc, retryPolicy) error
|
||||
type retryStopErrFunc func(error) bool
|
||||
|
||||
// isSafeRetryImmutableRPC returns "true" when an immutable request is safe for retry.
|
||||
//
|
||||
// immutable requests (e.g. Get) should be retried unless it's
|
||||
// an obvious server-side error (e.g. rpctypes.ErrRequestTooLarge).
|
||||
//
|
||||
// "isRepeatableStopError" returns "true" when an immutable request
|
||||
// is interrupted by server-side or gRPC-side error and its status
|
||||
// code is not transient (!= codes.Unavailable).
|
||||
//
|
||||
// Returning "true" means retry should stop, since client cannot
|
||||
// Returning "false" means retry should stop, since client cannot
|
||||
// handle itself even with retries.
|
||||
func isRepeatableStopError(err error) bool {
|
||||
func isSafeRetryImmutableRPC(err error) bool {
|
||||
eErr := rpctypes.Error(err)
|
||||
// always stop retry on etcd errors
|
||||
if serverErr, ok := eErr.(rpctypes.EtcdError); ok && serverErr.Code() != codes.Unavailable {
|
||||
return true
|
||||
// interrupted by non-transient server-side or gRPC-side error
|
||||
// client cannot handle itself (e.g. rpctypes.ErrCompacted)
|
||||
return false
|
||||
}
|
||||
// only retry if unavailable
|
||||
ev, ok := status.FromError(err)
|
||||
if !ok {
|
||||
// all errors from RPC is typed "grpc/status.(*statusError)"
|
||||
// (ref. https://github.com/grpc/grpc-go/pull/1782)
|
||||
//
|
||||
// if the error type is not "grpc/status.(*statusError)",
|
||||
// it could be from "Dial"
|
||||
// TODO: do not retry for now
|
||||
// ref. https://github.com/grpc/grpc-go/issues/1581
|
||||
return false
|
||||
}
|
||||
return ev.Code() != codes.Unavailable
|
||||
return ev.Code() == codes.Unavailable
|
||||
}
|
||||
|
||||
// isSafeRetryMutableRPC returns "true" when a mutable request is safe for retry.
|
||||
//
|
||||
// mutable requests (e.g. Put, Delete, Txn) should only be retried
|
||||
// when the status code is codes.Unavailable when initial connection
|
||||
// has not been established (no pinned endpoint).
|
||||
// has not been established (no endpoint is up).
|
||||
//
|
||||
// "isNonRepeatableStopError" returns "true" when a mutable request
|
||||
// is interrupted by non-transient error that client cannot handle itself,
|
||||
// or transient error while the connection has already been established
|
||||
// (pinned endpoint exists).
|
||||
//
|
||||
// Returning "true" means retry should stop, otherwise it violates
|
||||
// Returning "false" means retry should stop, otherwise it violates
|
||||
// write-at-most-once semantics.
|
||||
func isNonRepeatableStopError(err error) bool {
|
||||
func isSafeRetryMutableRPC(err error) bool {
|
||||
if ev, ok := status.FromError(err); ok && ev.Code() != codes.Unavailable {
|
||||
return true
|
||||
// not safe for mutable RPCs
|
||||
// e.g. interrupted by non-transient error that client cannot handle itself,
|
||||
// or transient error while the connection has already been established
|
||||
return false
|
||||
}
|
||||
desc := rpctypes.ErrorDesc(err)
|
||||
return desc != "there is no address available" && desc != "there is no connection available"
|
||||
return desc == "there is no address available" || desc == "there is no connection available"
|
||||
}
|
||||
|
||||
type retryKVClient struct {
|
||||
|
@ -70,7 +70,7 @@ func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOpt
|
||||
}
|
||||
continue
|
||||
}
|
||||
if !isRetriable(lastErr, callOpts) {
|
||||
if !isSafeRetry(lastErr, callOpts) {
|
||||
return lastErr
|
||||
}
|
||||
}
|
||||
@ -221,7 +221,7 @@ func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{}
|
||||
return true, err
|
||||
|
||||
}
|
||||
return isRetriable(err, s.callOpts), err
|
||||
return isSafeRetry(err, s.callOpts), err
|
||||
|
||||
}
|
||||
|
||||
@ -261,15 +261,16 @@ func waitRetryBackoff(attempt uint, ctx context.Context, callOpts *options) erro
|
||||
return nil
|
||||
}
|
||||
|
||||
func isRetriable(err error, callOpts *options) bool {
|
||||
// isSafeRetry returns "true", if request is safe for retry with the given error.
|
||||
func isSafeRetry(err error, callOpts *options) bool {
|
||||
if isContextError(err) {
|
||||
return false
|
||||
}
|
||||
switch callOpts.retryPolicy {
|
||||
case repeatable:
|
||||
return !isRepeatableStopError(err)
|
||||
return isSafeRetryImmutableRPC(err)
|
||||
case nonRepeatable:
|
||||
return !isNonRepeatableStopError(err)
|
||||
return isSafeRetryMutableRPC(err)
|
||||
default:
|
||||
logger.Warn("unrecognized retry policy", zap.String("retryPolicy", callOpts.retryPolicy.String()))
|
||||
return false
|
||||
|
Loading…
x
Reference in New Issue
Block a user