mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
clientv3: remove redundant retries in Cluster, set FailFast=true
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
This commit is contained in:
parent
fecd26f141
commit
c09a89d834
@ -18,7 +18,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
|
|
||||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||||
"google.golang.org/grpc"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
@ -75,27 +74,19 @@ func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveRes
|
|||||||
|
|
||||||
func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) {
|
func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) {
|
||||||
// it is safe to retry on update.
|
// it is safe to retry on update.
|
||||||
for {
|
r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
|
||||||
r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
|
resp, err := c.remote.MemberUpdate(ctx, r)
|
||||||
resp, err := c.remote.MemberUpdate(ctx, r, grpc.FailFast(false))
|
if err == nil {
|
||||||
if err == nil {
|
return (*MemberUpdateResponse)(resp), nil
|
||||||
return (*MemberUpdateResponse)(resp), nil
|
|
||||||
}
|
|
||||||
if isHaltErr(ctx, err) {
|
|
||||||
return nil, toErr(ctx, err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
return nil, toErr(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
|
func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
|
||||||
// it is safe to retry on list.
|
// it is safe to retry on list.
|
||||||
for {
|
resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{})
|
||||||
resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{}, grpc.FailFast(false))
|
if err == nil {
|
||||||
if err == nil {
|
return (*MemberListResponse)(resp), nil
|
||||||
return (*MemberListResponse)(resp), nil
|
|
||||||
}
|
|
||||||
if isHaltErr(ctx, err) {
|
|
||||||
return nil, toErr(ctx, err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
return nil, toErr(ctx, err)
|
||||||
}
|
}
|
||||||
|
@ -220,34 +220,50 @@ func (rlc *retryLeaseClient) LeaseKeepAlive(ctx context.Context, opts ...grpc.Ca
|
|||||||
}
|
}
|
||||||
|
|
||||||
type retryClusterClient struct {
|
type retryClusterClient struct {
|
||||||
pb.ClusterClient
|
*nonRepeatableClusterClient
|
||||||
writeRetry retryRPCFunc
|
repeatableRetry retryRPCFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// RetryClusterClient implements a ClusterClient that uses the client's FailFast retry policy.
|
// RetryClusterClient implements a ClusterClient.
|
||||||
func RetryClusterClient(c *Client) pb.ClusterClient {
|
func RetryClusterClient(c *Client) pb.ClusterClient {
|
||||||
return &retryClusterClient{pb.NewClusterClient(c.conn), c.newRetryWrapper(isNonRepeatableStopError)}
|
repeatableRetry := c.newRetryWrapper(isRepeatableStopError)
|
||||||
|
nonRepeatableRetry := c.newRetryWrapper(isNonRepeatableStopError)
|
||||||
|
cc := pb.NewClusterClient(c.conn)
|
||||||
|
return &retryClusterClient{&nonRepeatableClusterClient{cc, nonRepeatableRetry}, repeatableRetry}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rcc *retryClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRequest, opts ...grpc.CallOption) (resp *pb.MemberAddResponse, err error) {
|
func (rcc *retryClusterClient) MemberList(ctx context.Context, in *pb.MemberListRequest, opts ...grpc.CallOption) (resp *pb.MemberListResponse, err error) {
|
||||||
err = rcc.writeRetry(ctx, func(rctx context.Context) error {
|
err = rcc.repeatableRetry(ctx, func(rctx context.Context) error {
|
||||||
resp, err = rcc.ClusterClient.MemberAdd(rctx, in, opts...)
|
resp, err = rcc.cc.MemberList(rctx, in, opts...)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rcc *retryClusterClient) MemberRemove(ctx context.Context, in *pb.MemberRemoveRequest, opts ...grpc.CallOption) (resp *pb.MemberRemoveResponse, err error) {
|
type nonRepeatableClusterClient struct {
|
||||||
err = rcc.writeRetry(ctx, func(rctx context.Context) error {
|
cc pb.ClusterClient
|
||||||
resp, err = rcc.ClusterClient.MemberRemove(rctx, in, opts...)
|
nonRepeatableRetry retryRPCFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rcc *nonRepeatableClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRequest, opts ...grpc.CallOption) (resp *pb.MemberAddResponse, err error) {
|
||||||
|
err = rcc.nonRepeatableRetry(ctx, func(rctx context.Context) error {
|
||||||
|
resp, err = rcc.cc.MemberAdd(rctx, in, opts...)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rcc *retryClusterClient) MemberUpdate(ctx context.Context, in *pb.MemberUpdateRequest, opts ...grpc.CallOption) (resp *pb.MemberUpdateResponse, err error) {
|
func (rcc *nonRepeatableClusterClient) MemberRemove(ctx context.Context, in *pb.MemberRemoveRequest, opts ...grpc.CallOption) (resp *pb.MemberRemoveResponse, err error) {
|
||||||
err = rcc.writeRetry(ctx, func(rctx context.Context) error {
|
err = rcc.nonRepeatableRetry(ctx, func(rctx context.Context) error {
|
||||||
resp, err = rcc.ClusterClient.MemberUpdate(rctx, in, opts...)
|
resp, err = rcc.cc.MemberRemove(rctx, in, opts...)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rcc *nonRepeatableClusterClient) MemberUpdate(ctx context.Context, in *pb.MemberUpdateRequest, opts ...grpc.CallOption) (resp *pb.MemberUpdateResponse, err error) {
|
||||||
|
err = rcc.nonRepeatableRetry(ctx, func(rctx context.Context) error {
|
||||||
|
resp, err = rcc.cc.MemberUpdate(rctx, in, opts...)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
return resp, err
|
return resp, err
|
||||||
|
Loading…
x
Reference in New Issue
Block a user