[3.4] backport: add backoff to client config

Signed-off-by: Allen Ray <alray@redhat.com>
This commit is contained in:
Allen Ray 2024-01-17 11:45:36 -05:00
parent ff2304879e
commit 8e62f4af61
3 changed files with 77 additions and 2 deletions

View File

@ -212,15 +212,30 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts
opts = append(opts, grpc.WithInsecure())
}
unaryMaxRetries := defaultUnaryMaxRetries
if c.cfg.MaxUnaryRetries > 0 {
unaryMaxRetries = c.cfg.MaxUnaryRetries
}
backoffWaitBetween := defaultBackoffWaitBetween
if c.cfg.BackoffWaitBetween > 0 {
backoffWaitBetween = c.cfg.BackoffWaitBetween
}
backoffJitterFraction := defaultBackoffJitterFraction
if c.cfg.BackoffJitterFraction > 0 {
backoffJitterFraction = c.cfg.BackoffJitterFraction
}
// Interceptor retry and backoff.
// TODO: Replace all of clientv3/retry.go with RetryPolicy:
// https://github.com/grpc/grpc-proto/blob/cdd9ed5c3d3f87aef62f373b93361cf7bddc620d/grpc/service_config/service_config.proto#L130
rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction))
rrBackoff := withBackoff(c.roundRobinQuorumBackoff(backoffWaitBetween, backoffJitterFraction))
opts = append(opts,
// Disable stream retry by default since go-grpc-middleware/retry does not support client streams.
// Streams that are safe to retry are enabled individually.
grpc.WithStreamInterceptor(c.streamClientInterceptor(c.lg, withMax(0), rrBackoff)),
grpc.WithUnaryInterceptor(c.unaryClientInterceptor(c.lg, withMax(defaultUnaryMaxRetries), rrBackoff)),
grpc.WithUnaryInterceptor(c.unaryClientInterceptor(c.lg, withMax(unaryMaxRetries), rrBackoff)),
)
return opts, nil

View File

@ -142,6 +142,57 @@ func TestDialNoTimeout(t *testing.T) {
c.Close()
}
func TestMaxUnaryRetries(t *testing.T) {
maxUnaryRetries := uint(10)
cfg := Config{
Endpoints: []string{"127.0.0.1:12345"},
MaxUnaryRetries: maxUnaryRetries,
}
c, err := New(cfg)
if c == nil || err != nil {
t.Fatalf("new client with MaxUnaryRetries should succeed, got %v", err)
}
defer c.Close()
if c.cfg.MaxUnaryRetries != maxUnaryRetries {
t.Fatalf("client MaxUnaryRetries should be %d, got %d", maxUnaryRetries, c.cfg.MaxUnaryRetries)
}
}
func TestBackoff(t *testing.T) {
backoffWaitBetween := 100 * time.Millisecond
cfg := Config{
Endpoints: []string{"127.0.0.1:12345"},
BackoffWaitBetween: backoffWaitBetween,
}
c, err := New(cfg)
if c == nil || err != nil {
t.Fatalf("new client with BackoffWaitBetween should succeed, got %v", err)
}
defer c.Close()
if c.cfg.BackoffWaitBetween != backoffWaitBetween {
t.Fatalf("client BackoffWaitBetween should be %v, got %v", backoffWaitBetween, c.cfg.BackoffWaitBetween)
}
}
func TestBackoffJitterFraction(t *testing.T) {
backoffJitterFraction := float64(0.9)
cfg := Config{
Endpoints: []string{"127.0.0.1:12345"},
BackoffJitterFraction: backoffJitterFraction,
}
c, err := New(cfg)
if c == nil || err != nil {
t.Fatalf("new client with BackoffJitterFraction should succeed, got %v", err)
}
defer c.Close()
if c.cfg.BackoffJitterFraction != backoffJitterFraction {
t.Fatalf("client BackoffJitterFraction should be %v, got %v", backoffJitterFraction, c.cfg.BackoffJitterFraction)
}
}
func TestIsHaltErr(t *testing.T) {
if !isHaltErr(nil, fmt.Errorf("etcdserver: some etcdserver error")) {
t.Errorf(`error prefixed with "etcdserver: " should be Halted by default`)

View File

@ -84,5 +84,14 @@ type Config struct {
// PermitWithoutStream when set will allow client to send keepalive pings to server without any active streams(RPCs).
PermitWithoutStream bool `json:"permit-without-stream"`
// MaxUnaryRetries is the maximum number of retries for unary RPCs.
MaxUnaryRetries uint `json:"max-unary-retries"`
// BackoffWaitBetween is the wait time before retrying an RPC.
BackoffWaitBetween time.Duration `json:"backoff-wait-between"`
// BackoffJitterFraction is the jitter fraction to randomize backoff wait time.
BackoffJitterFraction float64 `json:"backoff-jitter-fraction"`
// TODO: support custom balancer picker
}