mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
clientv3: call other APIs with default gRPC call options
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
parent
c67e6d5f5e
commit
348b25f3dc
@ -101,60 +101,65 @@ type Auth interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type auth struct {
|
type auth struct {
|
||||||
remote pb.AuthClient
|
remote pb.AuthClient
|
||||||
|
callOpts []grpc.CallOption
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewAuth(c *Client) Auth {
|
func NewAuth(c *Client) Auth {
|
||||||
return &auth{remote: RetryAuthClient(c)}
|
api := &auth{remote: RetryAuthClient(c)}
|
||||||
|
if c != nil {
|
||||||
|
api.callOpts = c.callOpts
|
||||||
|
}
|
||||||
|
return api
|
||||||
}
|
}
|
||||||
|
|
||||||
func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) {
|
func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) {
|
||||||
resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{})
|
resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, auth.callOpts...)
|
||||||
return (*AuthEnableResponse)(resp), toErr(ctx, err)
|
return (*AuthEnableResponse)(resp), toErr(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) {
|
func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) {
|
||||||
resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{})
|
resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, auth.callOpts...)
|
||||||
return (*AuthDisableResponse)(resp), toErr(ctx, err)
|
return (*AuthDisableResponse)(resp), toErr(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (auth *auth) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) {
|
func (auth *auth) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) {
|
||||||
resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password})
|
resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}, auth.callOpts...)
|
||||||
return (*AuthUserAddResponse)(resp), toErr(ctx, err)
|
return (*AuthUserAddResponse)(resp), toErr(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (auth *auth) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) {
|
func (auth *auth) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) {
|
||||||
resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name})
|
resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}, auth.callOpts...)
|
||||||
return (*AuthUserDeleteResponse)(resp), toErr(ctx, err)
|
return (*AuthUserDeleteResponse)(resp), toErr(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (auth *auth) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) {
|
func (auth *auth) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) {
|
||||||
resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password})
|
resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}, auth.callOpts...)
|
||||||
return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err)
|
return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) {
|
func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) {
|
||||||
resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role})
|
resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}, auth.callOpts...)
|
||||||
return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err)
|
return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (auth *auth) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) {
|
func (auth *auth) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) {
|
||||||
resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name})
|
resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}, auth.callOpts...)
|
||||||
return (*AuthUserGetResponse)(resp), toErr(ctx, err)
|
return (*AuthUserGetResponse)(resp), toErr(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (auth *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) {
|
func (auth *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) {
|
||||||
resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{})
|
resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}, auth.callOpts...)
|
||||||
return (*AuthUserListResponse)(resp), toErr(ctx, err)
|
return (*AuthUserListResponse)(resp), toErr(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (auth *auth) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) {
|
func (auth *auth) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) {
|
||||||
resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role})
|
resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}, auth.callOpts...)
|
||||||
return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err)
|
return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (auth *auth) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) {
|
func (auth *auth) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) {
|
||||||
resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name})
|
resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}, auth.callOpts...)
|
||||||
return (*AuthRoleAddResponse)(resp), toErr(ctx, err)
|
return (*AuthRoleAddResponse)(resp), toErr(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -164,27 +169,27 @@ func (auth *auth) RoleGrantPermission(ctx context.Context, name string, key, ran
|
|||||||
RangeEnd: []byte(rangeEnd),
|
RangeEnd: []byte(rangeEnd),
|
||||||
PermType: authpb.Permission_Type(permType),
|
PermType: authpb.Permission_Type(permType),
|
||||||
}
|
}
|
||||||
resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm})
|
resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm}, auth.callOpts...)
|
||||||
return (*AuthRoleGrantPermissionResponse)(resp), toErr(ctx, err)
|
return (*AuthRoleGrantPermissionResponse)(resp), toErr(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (auth *auth) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) {
|
func (auth *auth) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) {
|
||||||
resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role})
|
resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}, auth.callOpts...)
|
||||||
return (*AuthRoleGetResponse)(resp), toErr(ctx, err)
|
return (*AuthRoleGetResponse)(resp), toErr(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (auth *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) {
|
func (auth *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) {
|
||||||
resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{})
|
resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}, auth.callOpts...)
|
||||||
return (*AuthRoleListResponse)(resp), toErr(ctx, err)
|
return (*AuthRoleListResponse)(resp), toErr(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (auth *auth) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) {
|
func (auth *auth) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) {
|
||||||
resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd})
|
resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}, auth.callOpts...)
|
||||||
return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err)
|
return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (auth *auth) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) {
|
func (auth *auth) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) {
|
||||||
resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role})
|
resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}, auth.callOpts...)
|
||||||
return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err)
|
return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -197,12 +202,13 @@ func StrToPermissionType(s string) (PermissionType, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type authenticator struct {
|
type authenticator struct {
|
||||||
conn *grpc.ClientConn // conn in-use
|
conn *grpc.ClientConn // conn in-use
|
||||||
remote pb.AuthClient
|
remote pb.AuthClient
|
||||||
|
callOpts []grpc.CallOption
|
||||||
}
|
}
|
||||||
|
|
||||||
func (auth *authenticator) authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) {
|
func (auth *authenticator) authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) {
|
||||||
resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password})
|
resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}, auth.callOpts...)
|
||||||
return (*AuthenticateResponse)(resp), toErr(ctx, err)
|
return (*AuthenticateResponse)(resp), toErr(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -210,14 +216,18 @@ func (auth *authenticator) close() {
|
|||||||
auth.conn.Close()
|
auth.conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func newAuthenticator(endpoint string, opts []grpc.DialOption) (*authenticator, error) {
|
func newAuthenticator(endpoint string, opts []grpc.DialOption, c *Client) (*authenticator, error) {
|
||||||
conn, err := grpc.Dial(endpoint, opts...)
|
conn, err := grpc.Dial(endpoint, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &authenticator{
|
api := &authenticator{
|
||||||
conn: conn,
|
conn: conn,
|
||||||
remote: pb.NewAuthClient(conn),
|
remote: pb.NewAuthClient(conn),
|
||||||
}, nil
|
}
|
||||||
|
if c != nil {
|
||||||
|
api.callOpts = c.callOpts
|
||||||
|
}
|
||||||
|
return api, nil
|
||||||
}
|
}
|
||||||
|
@ -297,7 +297,7 @@ func (c *Client) getToken(ctx context.Context) error {
|
|||||||
endpoint := c.cfg.Endpoints[i]
|
endpoint := c.cfg.Endpoints[i]
|
||||||
host := getHost(endpoint)
|
host := getHost(endpoint)
|
||||||
// use dial options without dopts to avoid reusing the client balancer
|
// use dial options without dopts to avoid reusing the client balancer
|
||||||
auth, err = newAuthenticator(host, c.dialSetupOpts(endpoint))
|
auth, err = newAuthenticator(host, c.dialSetupOpts(endpoint), c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,7 @@ import (
|
|||||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||||
|
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
@ -43,20 +44,29 @@ type Cluster interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type cluster struct {
|
type cluster struct {
|
||||||
remote pb.ClusterClient
|
remote pb.ClusterClient
|
||||||
|
callOpts []grpc.CallOption
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCluster(c *Client) Cluster {
|
func NewCluster(c *Client) Cluster {
|
||||||
return &cluster{remote: RetryClusterClient(c)}
|
api := &cluster{remote: RetryClusterClient(c)}
|
||||||
|
if c != nil {
|
||||||
|
api.callOpts = c.callOpts
|
||||||
|
}
|
||||||
|
return api
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClusterFromClusterClient(remote pb.ClusterClient) Cluster {
|
func NewClusterFromClusterClient(remote pb.ClusterClient, c *Client) Cluster {
|
||||||
return &cluster{remote: remote}
|
api := &cluster{remote: remote}
|
||||||
|
if c != nil {
|
||||||
|
api.callOpts = c.callOpts
|
||||||
|
}
|
||||||
|
return api
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
|
func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
|
||||||
r := &pb.MemberAddRequest{PeerURLs: peerAddrs}
|
r := &pb.MemberAddRequest{PeerURLs: peerAddrs}
|
||||||
resp, err := c.remote.MemberAdd(ctx, r)
|
resp, err := c.remote.MemberAdd(ctx, r, c.callOpts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, toErr(ctx, err)
|
return nil, toErr(ctx, err)
|
||||||
}
|
}
|
||||||
@ -65,7 +75,7 @@ func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAdd
|
|||||||
|
|
||||||
func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) {
|
func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) {
|
||||||
r := &pb.MemberRemoveRequest{ID: id}
|
r := &pb.MemberRemoveRequest{ID: id}
|
||||||
resp, err := c.remote.MemberRemove(ctx, r)
|
resp, err := c.remote.MemberRemove(ctx, r, c.callOpts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, toErr(ctx, err)
|
return nil, toErr(ctx, err)
|
||||||
}
|
}
|
||||||
@ -75,7 +85,7 @@ func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveRes
|
|||||||
func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) {
|
func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) {
|
||||||
// it is safe to retry on update.
|
// it is safe to retry on update.
|
||||||
r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
|
r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
|
||||||
resp, err := c.remote.MemberUpdate(ctx, r)
|
resp, err := c.remote.MemberUpdate(ctx, r, c.callOpts...)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return (*MemberUpdateResponse)(resp), nil
|
return (*MemberUpdateResponse)(resp), nil
|
||||||
}
|
}
|
||||||
@ -84,7 +94,7 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin
|
|||||||
|
|
||||||
func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
|
func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
|
||||||
// it is safe to retry on list.
|
// it is safe to retry on list.
|
||||||
resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{})
|
resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{}, c.callOpts...)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return (*MemberListResponse)(resp), nil
|
return (*MemberListResponse)(resp), nil
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||||
|
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -139,6 +140,8 @@ type lessor struct {
|
|||||||
|
|
||||||
// firstKeepAliveOnce ensures stream starts after first KeepAlive call.
|
// firstKeepAliveOnce ensures stream starts after first KeepAlive call.
|
||||||
firstKeepAliveOnce sync.Once
|
firstKeepAliveOnce sync.Once
|
||||||
|
|
||||||
|
callOpts []grpc.CallOption
|
||||||
}
|
}
|
||||||
|
|
||||||
// keepAlive multiplexes a keepalive for a lease over multiple channels
|
// keepAlive multiplexes a keepalive for a lease over multiple channels
|
||||||
@ -154,10 +157,10 @@ type keepAlive struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewLease(c *Client) Lease {
|
func NewLease(c *Client) Lease {
|
||||||
return NewLeaseFromLeaseClient(RetryLeaseClient(c), c.cfg.DialTimeout+time.Second)
|
return NewLeaseFromLeaseClient(RetryLeaseClient(c), c, c.cfg.DialTimeout+time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Duration) Lease {
|
func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout time.Duration) Lease {
|
||||||
l := &lessor{
|
l := &lessor{
|
||||||
donec: make(chan struct{}),
|
donec: make(chan struct{}),
|
||||||
keepAlives: make(map[LeaseID]*keepAlive),
|
keepAlives: make(map[LeaseID]*keepAlive),
|
||||||
@ -167,6 +170,9 @@ func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Durati
|
|||||||
if l.firstKeepAliveTimeout == time.Second {
|
if l.firstKeepAliveTimeout == time.Second {
|
||||||
l.firstKeepAliveTimeout = defaultTTL
|
l.firstKeepAliveTimeout = defaultTTL
|
||||||
}
|
}
|
||||||
|
if c != nil {
|
||||||
|
l.callOpts = c.callOpts
|
||||||
|
}
|
||||||
reqLeaderCtx := WithRequireLeader(context.Background())
|
reqLeaderCtx := WithRequireLeader(context.Background())
|
||||||
l.stopCtx, l.stopCancel = context.WithCancel(reqLeaderCtx)
|
l.stopCtx, l.stopCancel = context.WithCancel(reqLeaderCtx)
|
||||||
return l
|
return l
|
||||||
@ -174,7 +180,7 @@ func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Durati
|
|||||||
|
|
||||||
func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
|
func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
|
||||||
r := &pb.LeaseGrantRequest{TTL: ttl}
|
r := &pb.LeaseGrantRequest{TTL: ttl}
|
||||||
resp, err := l.remote.LeaseGrant(ctx, r)
|
resp, err := l.remote.LeaseGrant(ctx, r, l.callOpts...)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
gresp := &LeaseGrantResponse{
|
gresp := &LeaseGrantResponse{
|
||||||
ResponseHeader: resp.GetHeader(),
|
ResponseHeader: resp.GetHeader(),
|
||||||
@ -189,7 +195,7 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err
|
|||||||
|
|
||||||
func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
|
func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
|
||||||
r := &pb.LeaseRevokeRequest{ID: int64(id)}
|
r := &pb.LeaseRevokeRequest{ID: int64(id)}
|
||||||
resp, err := l.remote.LeaseRevoke(ctx, r)
|
resp, err := l.remote.LeaseRevoke(ctx, r, l.callOpts...)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return (*LeaseRevokeResponse)(resp), nil
|
return (*LeaseRevokeResponse)(resp), nil
|
||||||
}
|
}
|
||||||
@ -198,7 +204,7 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse,
|
|||||||
|
|
||||||
func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
|
func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
|
||||||
r := toLeaseTimeToLiveRequest(id, opts...)
|
r := toLeaseTimeToLiveRequest(id, opts...)
|
||||||
resp, err := l.remote.LeaseTimeToLive(ctx, r)
|
resp, err := l.remote.LeaseTimeToLive(ctx, r, l.callOpts...)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
gresp := &LeaseTimeToLiveResponse{
|
gresp := &LeaseTimeToLiveResponse{
|
||||||
ResponseHeader: resp.GetHeader(),
|
ResponseHeader: resp.GetHeader(),
|
||||||
@ -350,7 +356,7 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
|
|||||||
cctx, cancel := context.WithCancel(ctx)
|
cctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
stream, err := l.remote.LeaseKeepAlive(cctx)
|
stream, err := l.remote.LeaseKeepAlive(cctx, l.callOpts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, toErr(ctx, err)
|
return nil, toErr(ctx, err)
|
||||||
}
|
}
|
||||||
@ -421,7 +427,7 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) {
|
|||||||
// resetRecv opens a new lease stream and starts sending keep alive requests.
|
// resetRecv opens a new lease stream and starts sending keep alive requests.
|
||||||
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
|
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
|
||||||
sctx, cancel := context.WithCancel(l.stopCtx)
|
sctx, cancel := context.WithCancel(l.stopCtx)
|
||||||
stream, err := l.remote.LeaseKeepAlive(sctx)
|
stream, err := l.remote.LeaseKeepAlive(sctx, l.callOpts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cancel()
|
cancel()
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||||
|
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
@ -53,12 +54,13 @@ type Maintenance interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type maintenance struct {
|
type maintenance struct {
|
||||||
dial func(endpoint string) (pb.MaintenanceClient, func(), error)
|
dial func(endpoint string) (pb.MaintenanceClient, func(), error)
|
||||||
remote pb.MaintenanceClient
|
remote pb.MaintenanceClient
|
||||||
|
callOpts []grpc.CallOption
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMaintenance(c *Client) Maintenance {
|
func NewMaintenance(c *Client) Maintenance {
|
||||||
return &maintenance{
|
api := &maintenance{
|
||||||
dial: func(endpoint string) (pb.MaintenanceClient, func(), error) {
|
dial: func(endpoint string) (pb.MaintenanceClient, func(), error) {
|
||||||
conn, err := c.dial(endpoint)
|
conn, err := c.dial(endpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -69,15 +71,23 @@ func NewMaintenance(c *Client) Maintenance {
|
|||||||
},
|
},
|
||||||
remote: RetryMaintenanceClient(c, c.conn),
|
remote: RetryMaintenanceClient(c, c.conn),
|
||||||
}
|
}
|
||||||
|
if c != nil {
|
||||||
|
api.callOpts = c.callOpts
|
||||||
|
}
|
||||||
|
return api
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient) Maintenance {
|
func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client) Maintenance {
|
||||||
return &maintenance{
|
api := &maintenance{
|
||||||
dial: func(string) (pb.MaintenanceClient, func(), error) {
|
dial: func(string) (pb.MaintenanceClient, func(), error) {
|
||||||
return remote, func() {}, nil
|
return remote, func() {}, nil
|
||||||
},
|
},
|
||||||
remote: remote,
|
remote: remote,
|
||||||
}
|
}
|
||||||
|
if c != nil {
|
||||||
|
api.callOpts = c.callOpts
|
||||||
|
}
|
||||||
|
return api
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
|
func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
|
||||||
@ -86,7 +96,7 @@ func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
|
|||||||
MemberID: 0, // all
|
MemberID: 0, // all
|
||||||
Alarm: pb.AlarmType_NONE, // all
|
Alarm: pb.AlarmType_NONE, // all
|
||||||
}
|
}
|
||||||
resp, err := m.remote.Alarm(ctx, req)
|
resp, err := m.remote.Alarm(ctx, req, m.callOpts...)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return (*AlarmResponse)(resp), nil
|
return (*AlarmResponse)(resp), nil
|
||||||
}
|
}
|
||||||
@ -116,7 +126,7 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR
|
|||||||
return &ret, nil
|
return &ret, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := m.remote.Alarm(ctx, req)
|
resp, err := m.remote.Alarm(ctx, req, m.callOpts...)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return (*AlarmResponse)(resp), nil
|
return (*AlarmResponse)(resp), nil
|
||||||
}
|
}
|
||||||
@ -129,7 +139,7 @@ func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*Defragm
|
|||||||
return nil, toErr(ctx, err)
|
return nil, toErr(ctx, err)
|
||||||
}
|
}
|
||||||
defer cancel()
|
defer cancel()
|
||||||
resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{})
|
resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}, m.callOpts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, toErr(ctx, err)
|
return nil, toErr(ctx, err)
|
||||||
}
|
}
|
||||||
@ -142,7 +152,7 @@ func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusRespo
|
|||||||
return nil, toErr(ctx, err)
|
return nil, toErr(ctx, err)
|
||||||
}
|
}
|
||||||
defer cancel()
|
defer cancel()
|
||||||
resp, err := remote.Status(ctx, &pb.StatusRequest{})
|
resp, err := remote.Status(ctx, &pb.StatusRequest{}, m.callOpts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, toErr(ctx, err)
|
return nil, toErr(ctx, err)
|
||||||
}
|
}
|
||||||
@ -150,7 +160,7 @@ func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusRespo
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
|
func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
|
||||||
ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{})
|
ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, m.callOpts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, toErr(ctx, err)
|
return nil, toErr(ctx, err)
|
||||||
}
|
}
|
||||||
|
@ -106,7 +106,8 @@ func (wr *WatchResponse) IsProgressNotify() bool {
|
|||||||
|
|
||||||
// watcher implements the Watcher interface
|
// watcher implements the Watcher interface
|
||||||
type watcher struct {
|
type watcher struct {
|
||||||
remote pb.WatchClient
|
remote pb.WatchClient
|
||||||
|
callOpts []grpc.CallOption
|
||||||
|
|
||||||
// mu protects the grpc streams map
|
// mu protects the grpc streams map
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
@ -117,8 +118,9 @@ type watcher struct {
|
|||||||
|
|
||||||
// watchGrpcStream tracks all watch resources attached to a single grpc stream.
|
// watchGrpcStream tracks all watch resources attached to a single grpc stream.
|
||||||
type watchGrpcStream struct {
|
type watchGrpcStream struct {
|
||||||
owner *watcher
|
owner *watcher
|
||||||
remote pb.WatchClient
|
remote pb.WatchClient
|
||||||
|
callOpts []grpc.CallOption
|
||||||
|
|
||||||
// ctx controls internal remote.Watch requests
|
// ctx controls internal remote.Watch requests
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
@ -189,14 +191,18 @@ type watcherStream struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewWatcher(c *Client) Watcher {
|
func NewWatcher(c *Client) Watcher {
|
||||||
return NewWatchFromWatchClient(pb.NewWatchClient(c.conn))
|
return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWatchFromWatchClient(wc pb.WatchClient) Watcher {
|
func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
|
||||||
return &watcher{
|
w := &watcher{
|
||||||
remote: wc,
|
remote: wc,
|
||||||
streams: make(map[string]*watchGrpcStream),
|
streams: make(map[string]*watchGrpcStream),
|
||||||
}
|
}
|
||||||
|
if c != nil {
|
||||||
|
w.callOpts = c.callOpts
|
||||||
|
}
|
||||||
|
return w
|
||||||
}
|
}
|
||||||
|
|
||||||
// never closes
|
// never closes
|
||||||
@ -215,6 +221,7 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
|
|||||||
wgs := &watchGrpcStream{
|
wgs := &watchGrpcStream{
|
||||||
owner: w,
|
owner: w,
|
||||||
remote: w.remote,
|
remote: w.remote,
|
||||||
|
callOpts: w.callOpts,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
ctxKey: streamKeyFromCtx(inctx),
|
ctxKey: streamKeyFromCtx(inctx),
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
@ -775,7 +782,7 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error)
|
|||||||
return nil, err
|
return nil, err
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil {
|
if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if isHaltErr(w.ctx, err) {
|
if isHaltErr(w.ctx, err) {
|
||||||
|
@ -35,16 +35,16 @@ func New(s *etcdserver.EtcdServer) *clientv3.Client {
|
|||||||
c.KV = clientv3.NewKVFromKVClient(kvc, c)
|
c.KV = clientv3.NewKVFromKVClient(kvc, c)
|
||||||
|
|
||||||
lc := adapter.LeaseServerToLeaseClient(v3rpc.NewQuotaLeaseServer(s))
|
lc := adapter.LeaseServerToLeaseClient(v3rpc.NewQuotaLeaseServer(s))
|
||||||
c.Lease = clientv3.NewLeaseFromLeaseClient(lc, time.Second)
|
c.Lease = clientv3.NewLeaseFromLeaseClient(lc, c, time.Second)
|
||||||
|
|
||||||
wc := adapter.WatchServerToWatchClient(v3rpc.NewWatchServer(s))
|
wc := adapter.WatchServerToWatchClient(v3rpc.NewWatchServer(s))
|
||||||
c.Watcher = &watchWrapper{clientv3.NewWatchFromWatchClient(wc)}
|
c.Watcher = &watchWrapper{clientv3.NewWatchFromWatchClient(wc, c)}
|
||||||
|
|
||||||
mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s))
|
mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s))
|
||||||
c.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(mc)
|
c.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(mc, c)
|
||||||
|
|
||||||
clc := adapter.ClusterServerToClusterClient(v3rpc.NewClusterServer(s))
|
clc := adapter.ClusterServerToClusterClient(v3rpc.NewClusterServer(s))
|
||||||
c.Cluster = clientv3.NewClusterFromClusterClient(clc)
|
c.Cluster = clientv3.NewClusterFromClusterClient(clc, c)
|
||||||
|
|
||||||
// TODO: implement clientv3.Auth interface?
|
// TODO: implement clientv3.Auth interface?
|
||||||
|
|
||||||
|
@ -102,9 +102,9 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {
|
|||||||
c.KV = clientv3.NewKVFromKVClient(rpc.KV, c)
|
c.KV = clientv3.NewKVFromKVClient(rpc.KV, c)
|
||||||
pmu.Lock()
|
pmu.Lock()
|
||||||
lc := c.Lease
|
lc := c.Lease
|
||||||
c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, cfg.DialTimeout)
|
c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, c, cfg.DialTimeout)
|
||||||
c.Watcher = &proxyCloser{
|
c.Watcher = &proxyCloser{
|
||||||
Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch),
|
Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch, c),
|
||||||
wdonec: proxies[c].wdonec,
|
wdonec: proxies[c].wdonec,
|
||||||
kvdonec: proxies[c].kvdonec,
|
kvdonec: proxies[c].kvdonec,
|
||||||
lclose: func() { lc.Close() },
|
lclose: func() { lc.Close() },
|
||||||
|
Loading…
x
Reference in New Issue
Block a user