From d8f0ef0e800d86e6343b073c406b7fe0e2cb3940 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Thu, 14 Jul 2016 17:47:30 -0700 Subject: [PATCH] clientv3: use grpc.FailFast(false) for all calls --- clientv3/auth.go | 32 ++++++++++++++++---------------- clientv3/cluster.go | 9 +++++---- clientv3/kv.go | 9 +++++---- clientv3/lease.go | 9 +++++---- clientv3/maintenance.go | 11 ++++++----- clientv3/txn.go | 3 ++- clientv3/watch.go | 3 ++- 7 files changed, 41 insertions(+), 35 deletions(-) diff --git a/clientv3/auth.go b/clientv3/auth.go index 5928f2267..183904243 100644 --- a/clientv3/auth.go +++ b/clientv3/auth.go @@ -115,52 +115,52 @@ func NewAuth(c *Client) Auth { } func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) { - resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}) + resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, grpc.FailFast(false)) return (*AuthEnableResponse)(resp), toErr(ctx, err) } func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) { - resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}) + resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, grpc.FailFast(false)) return (*AuthDisableResponse)(resp), toErr(ctx, err) } func (auth *auth) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) { - resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}) + resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}, grpc.FailFast(false)) return (*AuthUserAddResponse)(resp), toErr(ctx, err) } func (auth *auth) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) { - resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}) + resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}, grpc.FailFast(false)) return (*AuthUserDeleteResponse)(resp), toErr(ctx, err) } func (auth *auth) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) { - resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}) + resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}, grpc.FailFast(false)) return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err) } func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) { - resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}) + resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}, grpc.FailFast(false)) return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err) } func (auth *auth) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) { - resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}) + resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}, grpc.FailFast(false)) return (*AuthUserGetResponse)(resp), toErr(ctx, err) } func (auth *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) { - resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}) + resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}, grpc.FailFast(false)) return (*AuthUserListResponse)(resp), toErr(ctx, err) } func (auth *auth) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) { - resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}) + resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}, grpc.FailFast(false)) return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) { - resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}) + resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}, grpc.FailFast(false)) return (*AuthRoleAddResponse)(resp), toErr(ctx, err) } @@ -170,27 +170,27 @@ func (auth *auth) RoleGrantPermission(ctx context.Context, name string, key, ran RangeEnd: []byte(rangeEnd), PermType: authpb.Permission_Type(permType), } - resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm}) + resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm}, grpc.FailFast(false)) return (*AuthRoleGrantPermissionResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) { - resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}) + resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}, grpc.FailFast(false)) return (*AuthRoleGetResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) { - resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}) + resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}, grpc.FailFast(false)) return (*AuthRoleListResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) { - resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}) + resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}, grpc.FailFast(false)) return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) { - resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}) + resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}, grpc.FailFast(false)) return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err) } @@ -208,7 +208,7 @@ type authenticator struct { } func (auth *authenticator) authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) { - resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}) + resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}, grpc.FailFast(false)) return (*AuthenticateResponse)(resp), toErr(ctx, err) } diff --git a/clientv3/cluster.go b/clientv3/cluster.go index b981e0310..ee0244e4e 100644 --- a/clientv3/cluster.go +++ b/clientv3/cluster.go @@ -17,6 +17,7 @@ package clientv3 import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" + "google.golang.org/grpc" ) type ( @@ -51,7 +52,7 @@ func NewCluster(c *Client) Cluster { func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) { r := &pb.MemberAddRequest{PeerURLs: peerAddrs} - resp, err := c.remote.MemberAdd(ctx, r) + resp, err := c.remote.MemberAdd(ctx, r, grpc.FailFast(false)) if err == nil { return (*MemberAddResponse)(resp), nil } @@ -63,7 +64,7 @@ func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAdd func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) { r := &pb.MemberRemoveRequest{ID: id} - resp, err := c.remote.MemberRemove(ctx, r) + resp, err := c.remote.MemberRemove(ctx, r, grpc.FailFast(false)) if err == nil { return (*MemberRemoveResponse)(resp), nil } @@ -77,7 +78,7 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin // it is safe to retry on update. for { r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs} - resp, err := c.remote.MemberUpdate(ctx, r) + resp, err := c.remote.MemberUpdate(ctx, r, grpc.FailFast(false)) if err == nil { return (*MemberUpdateResponse)(resp), nil } @@ -90,7 +91,7 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) { // it is safe to retry on list. for { - resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{}) + resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{}, grpc.FailFast(false)) if err == nil { return (*MemberListResponse)(resp), nil } diff --git a/clientv3/kv.go b/clientv3/kv.go index 2a97c865d..e279e7dc7 100644 --- a/clientv3/kv.go +++ b/clientv3/kv.go @@ -17,6 +17,7 @@ package clientv3 import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" + "google.golang.org/grpc" ) type ( @@ -100,7 +101,7 @@ func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*Delete } func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) { - resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest()) + resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest(), grpc.FailFast(false)) if err != nil { return nil, toErr(ctx, err) } @@ -150,21 +151,21 @@ func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) { r.SortTarget = pb.RangeRequest_SortTarget(op.sort.Target) } - resp, err = kv.remote.Range(ctx, r) + resp, err = kv.remote.Range(ctx, r, grpc.FailFast(false)) if err == nil { return OpResponse{get: (*GetResponse)(resp)}, nil } case tPut: var resp *pb.PutResponse r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV} - resp, err = kv.remote.Put(ctx, r) + resp, err = kv.remote.Put(ctx, r, grpc.FailFast(false)) if err == nil { return OpResponse{put: (*PutResponse)(resp)}, nil } case tDeleteRange: var resp *pb.DeleteRangeResponse r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV} - resp, err = kv.remote.DeleteRange(ctx, r) + resp, err = kv.remote.DeleteRange(ctx, r, grpc.FailFast(false)) if err == nil { return OpResponse{del: (*DeleteResponse)(resp)}, nil } diff --git a/clientv3/lease.go b/clientv3/lease.go index 839ff9d3d..5bbdefe4d 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -21,6 +21,7 @@ import ( "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" + "google.golang.org/grpc" ) type ( @@ -129,7 +130,7 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err for { r := &pb.LeaseGrantRequest{TTL: ttl} - resp, err := l.remote.LeaseGrant(cctx, r) + resp, err := l.remote.LeaseGrant(cctx, r, grpc.FailFast(false)) if err == nil { gresp := &LeaseGrantResponse{ ResponseHeader: resp.GetHeader(), @@ -155,7 +156,7 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, for { r := &pb.LeaseRevokeRequest{ID: int64(id)} - resp, err := l.remote.LeaseRevoke(cctx, r) + resp, err := l.remote.LeaseRevoke(cctx, r, grpc.FailFast(false)) if err == nil { return (*LeaseRevokeResponse)(resp), nil @@ -261,7 +262,7 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive cctx, cancel := context.WithCancel(ctx) defer cancel() - stream, err := l.remote.LeaseKeepAlive(cctx) + stream, err := l.remote.LeaseKeepAlive(cctx, grpc.FailFast(false)) if err != nil { return nil, toErr(ctx, err) } @@ -418,7 +419,7 @@ func (l *lessor) getKeepAliveStream() pb.Lease_LeaseKeepAliveClient { func (l *lessor) newStream() error { sctx, cancel := context.WithCancel(l.stopCtx) - stream, err := l.remote.LeaseKeepAlive(sctx) + stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false)) if err != nil { cancel() return toErr(sctx, err) diff --git a/clientv3/maintenance.go b/clientv3/maintenance.go index 1c9647ec1..718356250 100644 --- a/clientv3/maintenance.go +++ b/clientv3/maintenance.go @@ -19,6 +19,7 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" + "google.golang.org/grpc" ) type ( @@ -67,7 +68,7 @@ func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) { Alarm: pb.AlarmType_NONE, // all } for { - resp, err := m.remote.Alarm(ctx, req) + resp, err := m.remote.Alarm(ctx, req, grpc.FailFast(false)) if err == nil { return (*AlarmResponse)(resp), nil } @@ -100,7 +101,7 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR return &ret, nil } - resp, err := m.remote.Alarm(ctx, req) + resp, err := m.remote.Alarm(ctx, req, grpc.FailFast(false)) if err == nil { return (*AlarmResponse)(resp), nil } @@ -114,7 +115,7 @@ func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*Defragm } defer conn.Close() remote := pb.NewMaintenanceClient(conn) - resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}) + resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}, grpc.FailFast(false)) if err != nil { return nil, toErr(ctx, err) } @@ -128,7 +129,7 @@ func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusRespo } defer conn.Close() remote := pb.NewMaintenanceClient(conn) - resp, err := remote.Status(ctx, &pb.StatusRequest{}) + resp, err := remote.Status(ctx, &pb.StatusRequest{}, grpc.FailFast(false)) if err != nil { return nil, toErr(ctx, err) } @@ -136,7 +137,7 @@ func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusRespo } func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) { - ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}) + ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, grpc.FailFast(false)) if err != nil { return nil, toErr(ctx, err) } diff --git a/clientv3/txn.go b/clientv3/txn.go index a451e33ac..a76bd174c 100644 --- a/clientv3/txn.go +++ b/clientv3/txn.go @@ -19,6 +19,7 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" + "google.golang.org/grpc" ) // Txn is the interface that wraps mini-transactions. @@ -152,7 +153,7 @@ func (txn *txn) Commit() (*TxnResponse, error) { func (txn *txn) commit() (*TxnResponse, error) { r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas} - resp, err := txn.kv.remote.Txn(txn.ctx, r) + resp, err := txn.kv.remote.Txn(txn.ctx, r, grpc.FailFast(false)) if err != nil { return nil, err } diff --git a/clientv3/watch.go b/clientv3/watch.go index 7a612a4a1..48a33ef0c 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -23,6 +23,7 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" mvccpb "github.com/coreos/etcd/mvcc/mvccpb" "golang.org/x/net/context" + "google.golang.org/grpc" ) const ( @@ -626,7 +627,7 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) return nil, err default: } - if ws, err = w.remote.Watch(w.ctx); ws != nil && err == nil { + if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil { break } if isHaltErr(w.ctx, err) {