mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
clientv3: remove redundant retries in Maintenance, set FailFast=true
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
This commit is contained in:
parent
c09a89d834
commit
141170c1d4
@ -19,8 +19,6 @@ import (
|
||||
"io"
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type (
|
||||
@ -77,9 +75,9 @@ func NewMaintenance(c *Client) Maintenance {
|
||||
return nil, nil, err
|
||||
}
|
||||
cancel := func() { conn.Close() }
|
||||
return pb.NewMaintenanceClient(conn), cancel, nil
|
||||
return RetryMaintenanceClient(c, conn), cancel, nil
|
||||
},
|
||||
remote: pb.NewMaintenanceClient(c.conn),
|
||||
remote: RetryMaintenanceClient(c, c.conn),
|
||||
}
|
||||
}
|
||||
|
||||
@ -98,15 +96,11 @@ func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
|
||||
MemberID: 0, // all
|
||||
Alarm: pb.AlarmType_NONE, // all
|
||||
}
|
||||
for {
|
||||
resp, err := m.remote.Alarm(ctx, req, grpc.FailFast(false))
|
||||
if err == nil {
|
||||
return (*AlarmResponse)(resp), nil
|
||||
}
|
||||
if isHaltErr(ctx, err) {
|
||||
return nil, toErr(ctx, err)
|
||||
}
|
||||
resp, err := m.remote.Alarm(ctx, req)
|
||||
if err == nil {
|
||||
return (*AlarmResponse)(resp), nil
|
||||
}
|
||||
return nil, toErr(ctx, err)
|
||||
}
|
||||
|
||||
func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmResponse, error) {
|
||||
@ -132,7 +126,7 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR
|
||||
return &ret, nil
|
||||
}
|
||||
|
||||
resp, err := m.remote.Alarm(ctx, req, grpc.FailFast(false))
|
||||
resp, err := m.remote.Alarm(ctx, req)
|
||||
if err == nil {
|
||||
return (*AlarmResponse)(resp), nil
|
||||
}
|
||||
@ -145,7 +139,7 @@ func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*Defragm
|
||||
return nil, toErr(ctx, err)
|
||||
}
|
||||
defer cancel()
|
||||
resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}, grpc.FailFast(false))
|
||||
resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{})
|
||||
if err != nil {
|
||||
return nil, toErr(ctx, err)
|
||||
}
|
||||
@ -158,7 +152,7 @@ func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusRespo
|
||||
return nil, toErr(ctx, err)
|
||||
}
|
||||
defer cancel()
|
||||
resp, err := remote.Status(ctx, &pb.StatusRequest{}, grpc.FailFast(false))
|
||||
resp, err := remote.Status(ctx, &pb.StatusRequest{})
|
||||
if err != nil {
|
||||
return nil, toErr(ctx, err)
|
||||
}
|
||||
@ -171,7 +165,7 @@ func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*
|
||||
return nil, toErr(ctx, err)
|
||||
}
|
||||
defer cancel()
|
||||
resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev}, grpc.FailFast(false))
|
||||
resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev})
|
||||
if err != nil {
|
||||
return nil, toErr(ctx, err)
|
||||
}
|
||||
@ -179,7 +173,7 @@ func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*
|
||||
}
|
||||
|
||||
func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
|
||||
ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, grpc.FailFast(false))
|
||||
ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{})
|
||||
if err != nil {
|
||||
return nil, toErr(ctx, err)
|
||||
}
|
||||
@ -206,6 +200,6 @@ func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
|
||||
}
|
||||
|
||||
func (m *maintenance) MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error) {
|
||||
resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID}, grpc.FailFast(false))
|
||||
resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID})
|
||||
return (*MoveLeaderResponse)(resp), toErr(ctx, err)
|
||||
}
|
||||
|
@ -269,6 +269,80 @@ func (rcc *nonRepeatableClusterClient) MemberUpdate(ctx context.Context, in *pb.
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// RetryMaintenanceClient implements a Maintenance.
|
||||
func RetryMaintenanceClient(c *Client, conn *grpc.ClientConn) pb.MaintenanceClient {
|
||||
repeatableRetry := c.newRetryWrapper(isRepeatableStopError)
|
||||
nonRepeatableRetry := c.newRetryWrapper(isNonRepeatableStopError)
|
||||
mc := pb.NewMaintenanceClient(conn)
|
||||
return &retryMaintenanceClient{&nonRepeatableMaintenanceClient{mc, nonRepeatableRetry}, repeatableRetry}
|
||||
}
|
||||
|
||||
type retryMaintenanceClient struct {
|
||||
*nonRepeatableMaintenanceClient
|
||||
repeatableRetry retryRPCFunc
|
||||
}
|
||||
|
||||
func (rmc *retryMaintenanceClient) Alarm(ctx context.Context, in *pb.AlarmRequest, opts ...grpc.CallOption) (resp *pb.AlarmResponse, err error) {
|
||||
err = rmc.repeatableRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rmc.mc.Alarm(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (rmc *retryMaintenanceClient) Status(ctx context.Context, in *pb.StatusRequest, opts ...grpc.CallOption) (resp *pb.StatusResponse, err error) {
|
||||
err = rmc.repeatableRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rmc.mc.Status(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (rmc *retryMaintenanceClient) Hash(ctx context.Context, in *pb.HashRequest, opts ...grpc.CallOption) (resp *pb.HashResponse, err error) {
|
||||
err = rmc.repeatableRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rmc.mc.Hash(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (rmc *retryMaintenanceClient) HashKV(ctx context.Context, in *pb.HashKVRequest, opts ...grpc.CallOption) (resp *pb.HashKVResponse, err error) {
|
||||
err = rmc.repeatableRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rmc.mc.HashKV(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (rmc *retryMaintenanceClient) Snapshot(ctx context.Context, in *pb.SnapshotRequest, opts ...grpc.CallOption) (stream pb.Maintenance_SnapshotClient, err error) {
|
||||
err = rmc.repeatableRetry(ctx, func(rctx context.Context) error {
|
||||
stream, err = rmc.mc.Snapshot(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
return stream, err
|
||||
}
|
||||
|
||||
func (rmc *retryMaintenanceClient) MoveLeader(ctx context.Context, in *pb.MoveLeaderRequest, opts ...grpc.CallOption) (resp *pb.MoveLeaderResponse, err error) {
|
||||
err = rmc.repeatableRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rmc.mc.MoveLeader(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
return resp, err
|
||||
}
|
||||
|
||||
type nonRepeatableMaintenanceClient struct {
|
||||
mc pb.MaintenanceClient
|
||||
nonRepeatableRetry retryRPCFunc
|
||||
}
|
||||
|
||||
func (rmc *nonRepeatableMaintenanceClient) Defragment(ctx context.Context, in *pb.DefragmentRequest, opts ...grpc.CallOption) (resp *pb.DefragmentResponse, err error) {
|
||||
err = rmc.nonRepeatableRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rmc.mc.Defragment(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
return resp, err
|
||||
}
|
||||
|
||||
type retryAuthClient struct {
|
||||
pb.AuthClient
|
||||
writeRetry retryRPCFunc
|
||||
|
Loading…
x
Reference in New Issue
Block a user