diff --git a/clientv3/balancer.go b/clientv3/balancer.go index 2a55ca83a..0fef9c549 100644 --- a/clientv3/balancer.go +++ b/clientv3/balancer.go @@ -15,19 +15,19 @@ package clientv3 import ( - "errors" "net/url" "strings" "sync" "golang.org/x/net/context" "google.golang.org/grpc" + "google.golang.org/grpc/codes" ) // ErrNoAddrAvilable is returned by Get() when the balancer does not have // any active connection to endpoints at the time. // This error is returned only when opts.BlockingWait is true. -var ErrNoAddrAvilable = errors.New("there is no address available") +var ErrNoAddrAvilable = grpc.Errorf(codes.Unavailable, "there is no address available") // simpleBalancer does the bare minimum to expose multiple eps // to the grpc reconnection code path @@ -173,9 +173,14 @@ func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) // an address it has notified via Notify immediately instead of blocking. if !opts.BlockingWait { b.mu.RLock() + closed := b.closed addr = b.pinAddr upEps := len(b.upEps) b.mu.RUnlock() + if closed { + return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing + } + if upEps == 0 { return grpc.Address{Addr: ""}, nil, ErrNoAddrAvilable } diff --git a/clientv3/kv.go b/clientv3/kv.go index 4a1434d9f..c8350f926 100644 --- a/clientv3/kv.go +++ b/clientv3/kv.go @@ -125,6 +125,7 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) { if err == nil { return resp, nil } + if isHaltErr(ctx, err) { return resp, toErr(ctx, err) } diff --git a/clientv3/retry.go b/clientv3/retry.go index 1084c63da..cad8dbfc2 100644 --- a/clientv3/retry.go +++ b/clientv3/retry.go @@ -23,29 +23,32 @@ import ( ) type rpcFunc func(ctx context.Context) error -type retryRpcFunc func(context.Context, rpcFunc) +type retryRpcFunc func(context.Context, rpcFunc) error func (c *Client) newRetryWrapper() retryRpcFunc { - return func(rpcCtx context.Context, f rpcFunc) { + return func(rpcCtx context.Context, f rpcFunc) error { for { err := f(rpcCtx) if err == nil { - return + return nil } + // only retry if unavailable if grpc.Code(err) != codes.Unavailable { - return + return err } // always stop retry on etcd errors eErr := rpctypes.Error(err) if _, ok := eErr.(rpctypes.EtcdError); ok { - return + return err } + select { case <-c.balancer.ConnectNotify(): case <-rpcCtx.Done(): + return rpcCtx.Err() case <-c.ctx.Done(): - return + return c.ctx.Err() } } } @@ -62,7 +65,7 @@ func RetryKVClient(c *Client) pb.KVClient { } func (rkv *retryKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (resp *pb.PutResponse, err error) { - rkv.retryf(ctx, func(rctx context.Context) error { + err = rkv.retryf(ctx, func(rctx context.Context) error { resp, err = rkv.KVClient.Put(rctx, in, opts...) return err }) @@ -70,7 +73,7 @@ func (rkv *retryKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...gr } func (rkv *retryKVClient) DeleteRange(ctx context.Context, in *pb.DeleteRangeRequest, opts ...grpc.CallOption) (resp *pb.DeleteRangeResponse, err error) { - rkv.retryf(ctx, func(rctx context.Context) error { + err = rkv.retryf(ctx, func(rctx context.Context) error { resp, err = rkv.KVClient.DeleteRange(rctx, in, opts...) return err }) @@ -78,7 +81,7 @@ func (rkv *retryKVClient) DeleteRange(ctx context.Context, in *pb.DeleteRangeReq } func (rkv *retryKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallOption) (resp *pb.TxnResponse, err error) { - rkv.retryf(ctx, func(rctx context.Context) error { + err = rkv.retryf(ctx, func(rctx context.Context) error { resp, err = rkv.KVClient.Txn(rctx, in, opts...) return err }) @@ -86,7 +89,7 @@ func (rkv *retryKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts ...gr } func (rkv *retryKVClient) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (resp *pb.CompactionResponse, err error) { - rkv.retryf(ctx, func(rctx context.Context) error { + err = rkv.retryf(ctx, func(rctx context.Context) error { resp, err = rkv.KVClient.Compact(rctx, in, opts...) return err }) @@ -104,7 +107,7 @@ func RetryLeaseClient(c *Client) pb.LeaseClient { } func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (resp *pb.LeaseGrantResponse, err error) { - rlc.retryf(ctx, func(rctx context.Context) error { + err = rlc.retryf(ctx, func(rctx context.Context) error { resp, err = rlc.LeaseClient.LeaseGrant(rctx, in, opts...) return err }) @@ -113,7 +116,7 @@ func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRe } func (rlc *retryLeaseClient) LeaseRevoke(ctx context.Context, in *pb.LeaseRevokeRequest, opts ...grpc.CallOption) (resp *pb.LeaseRevokeResponse, err error) { - rlc.retryf(ctx, func(rctx context.Context) error { + err = rlc.retryf(ctx, func(rctx context.Context) error { resp, err = rlc.LeaseClient.LeaseRevoke(rctx, in, opts...) return err }) @@ -131,7 +134,7 @@ func RetryClusterClient(c *Client) pb.ClusterClient { } func (rcc *retryClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRequest, opts ...grpc.CallOption) (resp *pb.MemberAddResponse, err error) { - rcc.retryf(ctx, func(rctx context.Context) error { + err = rcc.retryf(ctx, func(rctx context.Context) error { resp, err = rcc.ClusterClient.MemberAdd(rctx, in, opts...) return err }) @@ -139,7 +142,7 @@ func (rcc *retryClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRe } func (rcc *retryClusterClient) MemberRemove(ctx context.Context, in *pb.MemberRemoveRequest, opts ...grpc.CallOption) (resp *pb.MemberRemoveResponse, err error) { - rcc.retryf(ctx, func(rctx context.Context) error { + err = rcc.retryf(ctx, func(rctx context.Context) error { resp, err = rcc.ClusterClient.MemberRemove(rctx, in, opts...) return err }) @@ -147,7 +150,7 @@ func (rcc *retryClusterClient) MemberRemove(ctx context.Context, in *pb.MemberRe } func (rcc *retryClusterClient) MemberUpdate(ctx context.Context, in *pb.MemberUpdateRequest, opts ...grpc.CallOption) (resp *pb.MemberUpdateResponse, err error) { - rcc.retryf(ctx, func(rctx context.Context) error { + err = rcc.retryf(ctx, func(rctx context.Context) error { resp, err = rcc.ClusterClient.MemberUpdate(rctx, in, opts...) return err }) @@ -165,7 +168,7 @@ func RetryAuthClient(c *Client) pb.AuthClient { } func (rac *retryAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableRequest, opts ...grpc.CallOption) (resp *pb.AuthEnableResponse, err error) { - rac.retryf(ctx, func(rctx context.Context) error { + err = rac.retryf(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.AuthEnable(rctx, in, opts...) return err }) @@ -173,7 +176,7 @@ func (rac *retryAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableReq } func (rac *retryAuthClient) AuthDisable(ctx context.Context, in *pb.AuthDisableRequest, opts ...grpc.CallOption) (resp *pb.AuthDisableResponse, err error) { - rac.retryf(ctx, func(rctx context.Context) error { + err = rac.retryf(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.AuthDisable(rctx, in, opts...) return err }) @@ -181,7 +184,7 @@ func (rac *retryAuthClient) AuthDisable(ctx context.Context, in *pb.AuthDisableR } func (rac *retryAuthClient) UserAdd(ctx context.Context, in *pb.AuthUserAddRequest, opts ...grpc.CallOption) (resp *pb.AuthUserAddResponse, err error) { - rac.retryf(ctx, func(rctx context.Context) error { + err = rac.retryf(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.UserAdd(rctx, in, opts...) return err }) @@ -189,7 +192,7 @@ func (rac *retryAuthClient) UserAdd(ctx context.Context, in *pb.AuthUserAddReque } func (rac *retryAuthClient) UserDelete(ctx context.Context, in *pb.AuthUserDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthUserDeleteResponse, err error) { - rac.retryf(ctx, func(rctx context.Context) error { + err = rac.retryf(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.UserDelete(rctx, in, opts...) return err }) @@ -197,7 +200,7 @@ func (rac *retryAuthClient) UserDelete(ctx context.Context, in *pb.AuthUserDelet } func (rac *retryAuthClient) UserChangePassword(ctx context.Context, in *pb.AuthUserChangePasswordRequest, opts ...grpc.CallOption) (resp *pb.AuthUserChangePasswordResponse, err error) { - rac.retryf(ctx, func(rctx context.Context) error { + err = rac.retryf(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.UserChangePassword(rctx, in, opts...) return err }) @@ -205,7 +208,7 @@ func (rac *retryAuthClient) UserChangePassword(ctx context.Context, in *pb.AuthU } func (rac *retryAuthClient) UserGrantRole(ctx context.Context, in *pb.AuthUserGrantRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserGrantRoleResponse, err error) { - rac.retryf(ctx, func(rctx context.Context) error { + err = rac.retryf(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.UserGrantRole(rctx, in, opts...) return err }) @@ -213,7 +216,7 @@ func (rac *retryAuthClient) UserGrantRole(ctx context.Context, in *pb.AuthUserGr } func (rac *retryAuthClient) UserRevokeRole(ctx context.Context, in *pb.AuthUserRevokeRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserRevokeRoleResponse, err error) { - rac.retryf(ctx, func(rctx context.Context) error { + err = rac.retryf(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.UserRevokeRole(rctx, in, opts...) return err }) @@ -221,7 +224,7 @@ func (rac *retryAuthClient) UserRevokeRole(ctx context.Context, in *pb.AuthUserR } func (rac *retryAuthClient) RoleAdd(ctx context.Context, in *pb.AuthRoleAddRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleAddResponse, err error) { - rac.retryf(ctx, func(rctx context.Context) error { + err = rac.retryf(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.RoleAdd(rctx, in, opts...) return err }) @@ -229,7 +232,7 @@ func (rac *retryAuthClient) RoleAdd(ctx context.Context, in *pb.AuthRoleAddReque } func (rac *retryAuthClient) RoleDelete(ctx context.Context, in *pb.AuthRoleDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleDeleteResponse, err error) { - rac.retryf(ctx, func(rctx context.Context) error { + err = rac.retryf(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.RoleDelete(rctx, in, opts...) return err }) @@ -237,7 +240,7 @@ func (rac *retryAuthClient) RoleDelete(ctx context.Context, in *pb.AuthRoleDelet } func (rac *retryAuthClient) RoleGrantPermission(ctx context.Context, in *pb.AuthRoleGrantPermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGrantPermissionResponse, err error) { - rac.retryf(ctx, func(rctx context.Context) error { + err = rac.retryf(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.RoleGrantPermission(rctx, in, opts...) return err }) @@ -245,7 +248,7 @@ func (rac *retryAuthClient) RoleGrantPermission(ctx context.Context, in *pb.Auth } func (rac *retryAuthClient) RoleRevokePermission(ctx context.Context, in *pb.AuthRoleRevokePermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleRevokePermissionResponse, err error) { - rac.retryf(ctx, func(rctx context.Context) error { + err = rac.retryf(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.RoleRevokePermission(rctx, in, opts...) return err })