diff --git a/clientv3/auth.go b/clientv3/auth.go index dddbcb4f6..a64b8caca 100644 --- a/clientv3/auth.go +++ b/clientv3/auth.go @@ -101,60 +101,65 @@ type Auth interface { } type auth struct { - remote pb.AuthClient + remote pb.AuthClient + callOpts []grpc.CallOption } func NewAuth(c *Client) Auth { - return &auth{remote: RetryAuthClient(c)} + api := &auth{remote: RetryAuthClient(c)} + if c != nil { + api.callOpts = c.callOpts + } + return api } func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) { - resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}) + resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, auth.callOpts...) return (*AuthEnableResponse)(resp), toErr(ctx, err) } func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) { - resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}) + resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, auth.callOpts...) return (*AuthDisableResponse)(resp), toErr(ctx, err) } func (auth *auth) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) { - resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}) + resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}, auth.callOpts...) return (*AuthUserAddResponse)(resp), toErr(ctx, err) } func (auth *auth) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) { - resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}) + resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}, auth.callOpts...) return (*AuthUserDeleteResponse)(resp), toErr(ctx, err) } func (auth *auth) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) { - resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}) + resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}, auth.callOpts...) return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err) } func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) { - resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}) + resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}, auth.callOpts...) return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err) } func (auth *auth) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) { - resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}) + resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}, auth.callOpts...) return (*AuthUserGetResponse)(resp), toErr(ctx, err) } func (auth *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) { - resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}) + resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}, auth.callOpts...) return (*AuthUserListResponse)(resp), toErr(ctx, err) } func (auth *auth) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) { - resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}) + resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}, auth.callOpts...) return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) { - resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}) + resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}, auth.callOpts...) return (*AuthRoleAddResponse)(resp), toErr(ctx, err) } @@ -164,27 +169,27 @@ func (auth *auth) RoleGrantPermission(ctx context.Context, name string, key, ran RangeEnd: []byte(rangeEnd), PermType: authpb.Permission_Type(permType), } - resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm}) + resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm}, auth.callOpts...) return (*AuthRoleGrantPermissionResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) { - resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}) + resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}, auth.callOpts...) return (*AuthRoleGetResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) { - resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}) + resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}, auth.callOpts...) return (*AuthRoleListResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) { - resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}) + resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}, auth.callOpts...) return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) { - resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}) + resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}, auth.callOpts...) return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err) } @@ -197,12 +202,13 @@ func StrToPermissionType(s string) (PermissionType, error) { } type authenticator struct { - conn *grpc.ClientConn // conn in-use - remote pb.AuthClient + conn *grpc.ClientConn // conn in-use + remote pb.AuthClient + callOpts []grpc.CallOption } func (auth *authenticator) authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) { - resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}) + resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}, auth.callOpts...) return (*AuthenticateResponse)(resp), toErr(ctx, err) } @@ -210,14 +216,18 @@ func (auth *authenticator) close() { auth.conn.Close() } -func newAuthenticator(endpoint string, opts []grpc.DialOption) (*authenticator, error) { +func newAuthenticator(endpoint string, opts []grpc.DialOption, c *Client) (*authenticator, error) { conn, err := grpc.Dial(endpoint, opts...) if err != nil { return nil, err } - return &authenticator{ + api := &authenticator{ conn: conn, remote: pb.NewAuthClient(conn), - }, nil + } + if c != nil { + api.callOpts = c.callOpts + } + return api, nil } diff --git a/clientv3/client.go b/clientv3/client.go index 507983a6a..2bdd92877 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -297,7 +297,7 @@ func (c *Client) getToken(ctx context.Context) error { endpoint := c.cfg.Endpoints[i] host := getHost(endpoint) // use dial options without dopts to avoid reusing the client balancer - auth, err = newAuthenticator(host, c.dialSetupOpts(endpoint)) + auth, err = newAuthenticator(host, c.dialSetupOpts(endpoint), c) if err != nil { continue } diff --git a/clientv3/cluster.go b/clientv3/cluster.go index 2df9f2951..545d676e7 100644 --- a/clientv3/cluster.go +++ b/clientv3/cluster.go @@ -18,6 +18,7 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" + "google.golang.org/grpc" ) type ( @@ -43,20 +44,29 @@ type Cluster interface { } type cluster struct { - remote pb.ClusterClient + remote pb.ClusterClient + callOpts []grpc.CallOption } func NewCluster(c *Client) Cluster { - return &cluster{remote: RetryClusterClient(c)} + api := &cluster{remote: RetryClusterClient(c)} + if c != nil { + api.callOpts = c.callOpts + } + return api } -func NewClusterFromClusterClient(remote pb.ClusterClient) Cluster { - return &cluster{remote: remote} +func NewClusterFromClusterClient(remote pb.ClusterClient, c *Client) Cluster { + api := &cluster{remote: remote} + if c != nil { + api.callOpts = c.callOpts + } + return api } func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) { r := &pb.MemberAddRequest{PeerURLs: peerAddrs} - resp, err := c.remote.MemberAdd(ctx, r) + resp, err := c.remote.MemberAdd(ctx, r, c.callOpts...) if err != nil { return nil, toErr(ctx, err) } @@ -65,7 +75,7 @@ func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAdd func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) { r := &pb.MemberRemoveRequest{ID: id} - resp, err := c.remote.MemberRemove(ctx, r) + resp, err := c.remote.MemberRemove(ctx, r, c.callOpts...) if err != nil { return nil, toErr(ctx, err) } @@ -75,7 +85,7 @@ func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveRes func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) { // it is safe to retry on update. r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs} - resp, err := c.remote.MemberUpdate(ctx, r) + resp, err := c.remote.MemberUpdate(ctx, r, c.callOpts...) if err == nil { return (*MemberUpdateResponse)(resp), nil } @@ -84,7 +94,7 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) { // it is safe to retry on list. - resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{}) + resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{}, c.callOpts...) if err == nil { return (*MemberListResponse)(resp), nil } diff --git a/clientv3/lease.go b/clientv3/lease.go index d90531bf2..e74e1d6b5 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -22,6 +22,7 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" + "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) @@ -139,6 +140,8 @@ type lessor struct { // firstKeepAliveOnce ensures stream starts after first KeepAlive call. firstKeepAliveOnce sync.Once + + callOpts []grpc.CallOption } // keepAlive multiplexes a keepalive for a lease over multiple channels @@ -154,10 +157,10 @@ type keepAlive struct { } func NewLease(c *Client) Lease { - return NewLeaseFromLeaseClient(RetryLeaseClient(c), c.cfg.DialTimeout+time.Second) + return NewLeaseFromLeaseClient(RetryLeaseClient(c), c, c.cfg.DialTimeout+time.Second) } -func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Duration) Lease { +func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout time.Duration) Lease { l := &lessor{ donec: make(chan struct{}), keepAlives: make(map[LeaseID]*keepAlive), @@ -167,6 +170,9 @@ func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Durati if l.firstKeepAliveTimeout == time.Second { l.firstKeepAliveTimeout = defaultTTL } + if c != nil { + l.callOpts = c.callOpts + } reqLeaderCtx := WithRequireLeader(context.Background()) l.stopCtx, l.stopCancel = context.WithCancel(reqLeaderCtx) return l @@ -174,7 +180,7 @@ func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Durati func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) { r := &pb.LeaseGrantRequest{TTL: ttl} - resp, err := l.remote.LeaseGrant(ctx, r) + resp, err := l.remote.LeaseGrant(ctx, r, l.callOpts...) if err == nil { gresp := &LeaseGrantResponse{ ResponseHeader: resp.GetHeader(), @@ -189,7 +195,7 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) { r := &pb.LeaseRevokeRequest{ID: int64(id)} - resp, err := l.remote.LeaseRevoke(ctx, r) + resp, err := l.remote.LeaseRevoke(ctx, r, l.callOpts...) if err == nil { return (*LeaseRevokeResponse)(resp), nil } @@ -198,7 +204,7 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) { r := toLeaseTimeToLiveRequest(id, opts...) - resp, err := l.remote.LeaseTimeToLive(ctx, r) + resp, err := l.remote.LeaseTimeToLive(ctx, r, l.callOpts...) if err == nil { gresp := &LeaseTimeToLiveResponse{ ResponseHeader: resp.GetHeader(), @@ -350,7 +356,7 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive cctx, cancel := context.WithCancel(ctx) defer cancel() - stream, err := l.remote.LeaseKeepAlive(cctx) + stream, err := l.remote.LeaseKeepAlive(cctx, l.callOpts...) if err != nil { return nil, toErr(ctx, err) } @@ -421,7 +427,7 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) { // resetRecv opens a new lease stream and starts sending keep alive requests. func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) { sctx, cancel := context.WithCancel(l.stopCtx) - stream, err := l.remote.LeaseKeepAlive(sctx) + stream, err := l.remote.LeaseKeepAlive(sctx, l.callOpts...) if err != nil { cancel() return nil, err diff --git a/clientv3/maintenance.go b/clientv3/maintenance.go index ca2f445b8..67b928fcf 100644 --- a/clientv3/maintenance.go +++ b/clientv3/maintenance.go @@ -20,6 +20,7 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" + "google.golang.org/grpc" ) type ( @@ -53,12 +54,13 @@ type Maintenance interface { } type maintenance struct { - dial func(endpoint string) (pb.MaintenanceClient, func(), error) - remote pb.MaintenanceClient + dial func(endpoint string) (pb.MaintenanceClient, func(), error) + remote pb.MaintenanceClient + callOpts []grpc.CallOption } func NewMaintenance(c *Client) Maintenance { - return &maintenance{ + api := &maintenance{ dial: func(endpoint string) (pb.MaintenanceClient, func(), error) { conn, err := c.dial(endpoint) if err != nil { @@ -69,15 +71,23 @@ func NewMaintenance(c *Client) Maintenance { }, remote: RetryMaintenanceClient(c, c.conn), } + if c != nil { + api.callOpts = c.callOpts + } + return api } -func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient) Maintenance { - return &maintenance{ +func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client) Maintenance { + api := &maintenance{ dial: func(string) (pb.MaintenanceClient, func(), error) { return remote, func() {}, nil }, remote: remote, } + if c != nil { + api.callOpts = c.callOpts + } + return api } func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) { @@ -86,7 +96,7 @@ func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) { MemberID: 0, // all Alarm: pb.AlarmType_NONE, // all } - resp, err := m.remote.Alarm(ctx, req) + resp, err := m.remote.Alarm(ctx, req, m.callOpts...) if err == nil { return (*AlarmResponse)(resp), nil } @@ -116,7 +126,7 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR return &ret, nil } - resp, err := m.remote.Alarm(ctx, req) + resp, err := m.remote.Alarm(ctx, req, m.callOpts...) if err == nil { return (*AlarmResponse)(resp), nil } @@ -129,7 +139,7 @@ func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*Defragm return nil, toErr(ctx, err) } defer cancel() - resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}) + resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}, m.callOpts...) if err != nil { return nil, toErr(ctx, err) } @@ -142,7 +152,7 @@ func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusRespo return nil, toErr(ctx, err) } defer cancel() - resp, err := remote.Status(ctx, &pb.StatusRequest{}) + resp, err := remote.Status(ctx, &pb.StatusRequest{}, m.callOpts...) if err != nil { return nil, toErr(ctx, err) } @@ -150,7 +160,7 @@ func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusRespo } func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) { - ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}) + ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, m.callOpts...) if err != nil { return nil, toErr(ctx, err) } diff --git a/clientv3/watch.go b/clientv3/watch.go index 12977aed8..16a91fdff 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -106,7 +106,8 @@ func (wr *WatchResponse) IsProgressNotify() bool { // watcher implements the Watcher interface type watcher struct { - remote pb.WatchClient + remote pb.WatchClient + callOpts []grpc.CallOption // mu protects the grpc streams map mu sync.RWMutex @@ -117,8 +118,9 @@ type watcher struct { // watchGrpcStream tracks all watch resources attached to a single grpc stream. type watchGrpcStream struct { - owner *watcher - remote pb.WatchClient + owner *watcher + remote pb.WatchClient + callOpts []grpc.CallOption // ctx controls internal remote.Watch requests ctx context.Context @@ -189,14 +191,18 @@ type watcherStream struct { } func NewWatcher(c *Client) Watcher { - return NewWatchFromWatchClient(pb.NewWatchClient(c.conn)) + return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c) } -func NewWatchFromWatchClient(wc pb.WatchClient) Watcher { - return &watcher{ +func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher { + w := &watcher{ remote: wc, streams: make(map[string]*watchGrpcStream), } + if c != nil { + w.callOpts = c.callOpts + } + return w } // never closes @@ -215,6 +221,7 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream { wgs := &watchGrpcStream{ owner: w, remote: w.remote, + callOpts: w.callOpts, ctx: ctx, ctxKey: streamKeyFromCtx(inctx), cancel: cancel, @@ -775,7 +782,7 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) return nil, err default: } - if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil { + if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil { break } if isHaltErr(w.ctx, err) { diff --git a/etcdserver/api/v3client/v3client.go b/etcdserver/api/v3client/v3client.go index 445d0408f..c0c07c8d7 100644 --- a/etcdserver/api/v3client/v3client.go +++ b/etcdserver/api/v3client/v3client.go @@ -35,16 +35,16 @@ func New(s *etcdserver.EtcdServer) *clientv3.Client { c.KV = clientv3.NewKVFromKVClient(kvc, c) lc := adapter.LeaseServerToLeaseClient(v3rpc.NewQuotaLeaseServer(s)) - c.Lease = clientv3.NewLeaseFromLeaseClient(lc, time.Second) + c.Lease = clientv3.NewLeaseFromLeaseClient(lc, c, time.Second) wc := adapter.WatchServerToWatchClient(v3rpc.NewWatchServer(s)) - c.Watcher = &watchWrapper{clientv3.NewWatchFromWatchClient(wc)} + c.Watcher = &watchWrapper{clientv3.NewWatchFromWatchClient(wc, c)} mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s)) - c.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(mc) + c.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(mc, c) clc := adapter.ClusterServerToClusterClient(v3rpc.NewClusterServer(s)) - c.Cluster = clientv3.NewClusterFromClusterClient(clc) + c.Cluster = clientv3.NewClusterFromClusterClient(clc, c) // TODO: implement clientv3.Auth interface? diff --git a/integration/cluster_proxy.go b/integration/cluster_proxy.go index 6d0cd40d3..15094358e 100644 --- a/integration/cluster_proxy.go +++ b/integration/cluster_proxy.go @@ -102,9 +102,9 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) { c.KV = clientv3.NewKVFromKVClient(rpc.KV, c) pmu.Lock() lc := c.Lease - c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, cfg.DialTimeout) + c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, c, cfg.DialTimeout) c.Watcher = &proxyCloser{ - Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch), + Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch, c), wdonec: proxies[c].wdonec, kvdonec: proxies[c].kvdonec, lclose: func() { lc.Close() },