Merge pull request #17692 from sheyt0/12985-to-34

[3.4] backport for fix retry requests when receiving ErrGPRCNotSupportedForLearner
This commit is contained in:
Benjamin Wang 2024-04-04 08:32:53 +01:00 committed by GitHub
commit 7090967a79
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -19,6 +19,7 @@ package clientv3
import ( import (
"context" "context"
"errors"
"io" "io"
"sync" "sync"
"time" "time"
@ -85,7 +86,7 @@ func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOpt
} }
continue continue
} }
if !isSafeRetry(c.lg, lastErr, callOpts) { if !isSafeRetry(c, lastErr, callOpts) {
return lastErr return lastErr
} }
} }
@ -279,7 +280,7 @@ func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{}
return true, err return true, err
} }
return isSafeRetry(s.client.lg, err, s.callOpts), err return isSafeRetry(s.client, err, s.callOpts), err
} }
func (s *serverStreamingRetryingStream) reestablishStreamAndResendBuffer(callCtx context.Context) (grpc.ClientStream, error) { func (s *serverStreamingRetryingStream) reestablishStreamAndResendBuffer(callCtx context.Context) (grpc.ClientStream, error) {
@ -319,17 +320,28 @@ func waitRetryBackoff(ctx context.Context, attempt uint, callOpts *options) erro
} }
// isSafeRetry returns "true", if request is safe for retry with the given error. // isSafeRetry returns "true", if request is safe for retry with the given error.
func isSafeRetry(lg *zap.Logger, err error, callOpts *options) bool { func isSafeRetry(c *Client, err error, callOpts *options) bool {
if isContextError(err) { if isContextError(err) {
return false return false
} }
// Situation when learner refuses RPC it is supposed to not serve is from the server
// perspective not retryable.
// But for backward-compatibility reasons we need to support situation that
// customer provides mix of learners (not yet voters) and voters with an
// expectation to pick voter in the next attempt.
// TODO: Ideally client should be 'aware' which endpoint represents: leader/voter/learner with high probability.
if errors.Is(err, rpctypes.ErrGPRCNotSupportedForLearner) && len(c.Endpoints()) > 1 {
return true
}
switch callOpts.retryPolicy { switch callOpts.retryPolicy {
case repeatable: case repeatable:
return isSafeRetryImmutableRPC(err) return isSafeRetryImmutableRPC(err)
case nonRepeatable: case nonRepeatable:
return isSafeRetryMutableRPC(err) return isSafeRetryMutableRPC(err)
default: default:
lg.Warn("unrecognized retry policy", zap.String("retryPolicy", callOpts.retryPolicy.String())) c.lg.Warn("unrecognized retry policy", zap.String("retryPolicy", callOpts.retryPolicy.String()))
return false return false
} }
} }