From 4669aaa9a2f1bb66918234e6c5171991aa9268a7 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Sun, 30 Jul 2017 21:35:38 -0700 Subject: [PATCH] clientv3: only retry mutable KV RPCs if no endpoints found Was retrying when it shouldn't, causing multiple puts --- clientv3/client.go | 11 +++------ clientv3/retry.go | 59 ++++++++++++++++++++++++++++------------------ 2 files changed, 39 insertions(+), 31 deletions(-) diff --git a/clientv3/client.go b/clientv3/client.go index 9dbc34b1e..1f8c83f57 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -52,11 +52,9 @@ type Client struct { conn *grpc.ClientConn dialerrc chan error - cfg Config - creds *credentials.TransportCredentials - balancer *simpleBalancer - retryWrapper retryRpcFunc - retryAuthWrapper retryRpcFunc + cfg Config + creds *credentials.TransportCredentials + balancer *simpleBalancer ctx context.Context cancel context.CancelFunc @@ -387,8 +385,6 @@ func newClient(cfg *Config) (*Client, error) { return nil, err } client.conn = conn - client.retryWrapper = client.newRetryWrapper() - client.retryAuthWrapper = client.newAuthRetryWrapper() // wait for a connection if cfg.DialTimeout > 0 { @@ -510,7 +506,6 @@ func toErr(ctx context.Context, err error) error { err = ctx.Err() } case codes.Unavailable: - err = ErrNoAvailableEndpoints case codes.FailedPrecondition: err = grpc.ErrClientConnClosing } diff --git a/clientv3/retry.go b/clientv3/retry.go index 78f31a8c4..b4b8b3d38 100644 --- a/clientv3/retry.go +++ b/clientv3/retry.go @@ -24,26 +24,29 @@ import ( type rpcFunc func(ctx context.Context) error type retryRpcFunc func(context.Context, rpcFunc) error +type retryStopErrFunc func(error) bool -func (c *Client) newRetryWrapper() retryRpcFunc { +func isReadStopError(err error) bool { + eErr := rpctypes.Error(err) + // always stop retry on etcd errors + if _, ok := eErr.(rpctypes.EtcdError); ok { + return true + } + // only retry if unavailable + return grpc.Code(err) != codes.Unavailable +} + +func isWriteStopError(err error) bool { + return grpc.Code(err) != codes.Unavailable || + grpc.ErrorDesc(err) != "there is no address available" +} + +func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc { return func(rpcCtx context.Context, f rpcFunc) error { for { - err := f(rpcCtx) - if err == nil { - return nil - } - - eErr := rpctypes.Error(err) - // always stop retry on etcd errors - if _, ok := eErr.(rpctypes.EtcdError); ok { + if err := f(rpcCtx); err == nil || isStop(err) { return err } - - // only retry if unavailable - if grpc.Code(err) != codes.Unavailable { - return err - } - select { case <-c.balancer.ConnectNotify(): case <-rpcCtx.Done(): @@ -79,17 +82,24 @@ func (c *Client) newAuthRetryWrapper() retryRpcFunc { // RetryKVClient implements a KVClient that uses the client's FailFast retry policy. func RetryKVClient(c *Client) pb.KVClient { - retryWrite := &retryWriteKVClient{pb.NewKVClient(c.conn), c.retryWrapper} - return &retryKVClient{&retryWriteKVClient{retryWrite, c.retryAuthWrapper}} + readRetry := c.newRetryWrapper(isReadStopError) + writeRetry := c.newRetryWrapper(isWriteStopError) + conn := pb.NewKVClient(c.conn) + retryBasic := &retryKVClient{&retryWriteKVClient{conn, writeRetry}, readRetry} + retryAuthWrapper := c.newAuthRetryWrapper() + return &retryKVClient{ + &retryWriteKVClient{retryBasic, retryAuthWrapper}, + retryAuthWrapper} } type retryKVClient struct { *retryWriteKVClient + readRetry retryRpcFunc } func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (resp *pb.RangeResponse, err error) { - err = rkv.retryf(ctx, func(rctx context.Context) error { - resp, err = rkv.retryWriteKVClient.Range(rctx, in, opts...) + err = rkv.readRetry(ctx, func(rctx context.Context) error { + resp, err = rkv.KVClient.Range(rctx, in, opts...) return err }) return resp, err @@ -139,8 +149,11 @@ type retryLeaseClient struct { // RetryLeaseClient implements a LeaseClient that uses the client's FailFast retry policy. func RetryLeaseClient(c *Client) pb.LeaseClient { - retry := &retryLeaseClient{pb.NewLeaseClient(c.conn), c.retryWrapper} - return &retryLeaseClient{retry, c.retryAuthWrapper} + retry := &retryLeaseClient{ + pb.NewLeaseClient(c.conn), + c.newRetryWrapper(isReadStopError), + } + return &retryLeaseClient{retry, c.newAuthRetryWrapper()} } func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (resp *pb.LeaseGrantResponse, err error) { @@ -167,7 +180,7 @@ type retryClusterClient struct { // RetryClusterClient implements a ClusterClient that uses the client's FailFast retry policy. func RetryClusterClient(c *Client) pb.ClusterClient { - return &retryClusterClient{pb.NewClusterClient(c.conn), c.retryWrapper} + return &retryClusterClient{pb.NewClusterClient(c.conn), c.newRetryWrapper(isWriteStopError)} } func (rcc *retryClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRequest, opts ...grpc.CallOption) (resp *pb.MemberAddResponse, err error) { @@ -201,7 +214,7 @@ type retryAuthClient struct { // RetryAuthClient implements a AuthClient that uses the client's FailFast retry policy. func RetryAuthClient(c *Client) pb.AuthClient { - return &retryAuthClient{pb.NewAuthClient(c.conn), c.retryWrapper} + return &retryAuthClient{pb.NewAuthClient(c.conn), c.newRetryWrapper(isWriteStopError)} } func (rac *retryAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableRequest, opts ...grpc.CallOption) (resp *pb.AuthEnableResponse, err error) {