clientv3: remove redundant retries in KV, set FailFast=true

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
This commit is contained in:
Gyu-Ho Lee 2017-10-18 12:43:01 -07:00
parent ad7882590c
commit b46ab2c36e
3 changed files with 31 additions and 67 deletions

View File

@ -18,8 +18,6 @@ import (
"context" "context"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"google.golang.org/grpc"
) )
type ( type (
@ -132,28 +130,11 @@ func (kv *kv) Txn(ctx context.Context) Txn {
} }
func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) { func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
for {
resp, err := kv.do(ctx, op)
if err == nil {
return resp, nil
}
if isHaltErr(ctx, err) {
return resp, toErr(ctx, err)
}
// do not retry on modifications
if op.isWrite() {
return resp, toErr(ctx, err)
}
}
}
func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) {
var err error var err error
switch op.t { switch op.t {
case tRange: case tRange:
var resp *pb.RangeResponse var resp *pb.RangeResponse
resp, err = kv.remote.Range(ctx, op.toRangeRequest(), grpc.FailFast(false)) resp, err = kv.remote.Range(ctx, op.toRangeRequest())
if err == nil { if err == nil {
return OpResponse{get: (*GetResponse)(resp)}, nil return OpResponse{get: (*GetResponse)(resp)}, nil
} }
@ -180,5 +161,5 @@ func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) {
default: default:
panic("Unknown op") panic("Unknown op")
} }
return OpResponse{}, err return OpResponse{}, toErr(ctx, err)
} }

View File

@ -101,63 +101,64 @@ func (c *Client) newAuthRetryWrapper() retryRPCFunc {
} }
} }
// RetryKVClient implements a KVClient that uses the client's FailFast retry policy. // RetryKVClient implements a KVClient.
func RetryKVClient(c *Client) pb.KVClient { func RetryKVClient(c *Client) pb.KVClient {
readRetry := c.newRetryWrapper(isReadStopError) repeatableRetry := c.newRetryWrapper(isReadStopError)
writeRetry := c.newRetryWrapper(isWriteStopError) nonRepeatableRetry := c.newRetryWrapper(isWriteStopError)
conn := pb.NewKVClient(c.conn) conn := pb.NewKVClient(c.conn)
retryBasic := &retryKVClient{&retryWriteKVClient{conn, writeRetry}, readRetry} retryBasic := &retryKVClient{&nonRepeatableKVClient{conn, nonRepeatableRetry}, repeatableRetry}
retryAuthWrapper := c.newAuthRetryWrapper() retryAuthWrapper := c.newAuthRetryWrapper()
return &retryKVClient{ return &retryKVClient{
&retryWriteKVClient{retryBasic, retryAuthWrapper}, &nonRepeatableKVClient{retryBasic, retryAuthWrapper},
retryAuthWrapper} retryAuthWrapper}
} }
type retryKVClient struct { type retryKVClient struct {
*retryWriteKVClient *nonRepeatableKVClient
readRetry retryRPCFunc repeatableRetry retryRPCFunc
} }
func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (resp *pb.RangeResponse, err error) { func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (resp *pb.RangeResponse, err error) {
err = rkv.readRetry(ctx, func(rctx context.Context) error { err = rkv.repeatableRetry(ctx, func(rctx context.Context) error {
resp, err = rkv.KVClient.Range(rctx, in, opts...) resp, err = rkv.kc.Range(rctx, in, opts...)
return err return err
}) })
return resp, err return resp, err
} }
type retryWriteKVClient struct { type nonRepeatableKVClient struct {
pb.KVClient kc pb.KVClient
writeRetry retryRPCFunc nonRepeatableRetry retryRPCFunc
} }
func (rkv *retryWriteKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (resp *pb.PutResponse, err error) { func (rkv *nonRepeatableKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (resp *pb.PutResponse, err error) {
err = rkv.writeRetry(ctx, func(rctx context.Context) error { err = rkv.nonRepeatableRetry(ctx, func(rctx context.Context) error {
resp, err = rkv.KVClient.Put(rctx, in, opts...) resp, err = rkv.kc.Put(rctx, in, opts...)
return err return err
}) })
return resp, err return resp, err
} }
func (rkv *retryWriteKVClient) DeleteRange(ctx context.Context, in *pb.DeleteRangeRequest, opts ...grpc.CallOption) (resp *pb.DeleteRangeResponse, err error) { func (rkv *nonRepeatableKVClient) DeleteRange(ctx context.Context, in *pb.DeleteRangeRequest, opts ...grpc.CallOption) (resp *pb.DeleteRangeResponse, err error) {
err = rkv.writeRetry(ctx, func(rctx context.Context) error { err = rkv.nonRepeatableRetry(ctx, func(rctx context.Context) error {
resp, err = rkv.KVClient.DeleteRange(rctx, in, opts...) resp, err = rkv.kc.DeleteRange(rctx, in, opts...)
return err return err
}) })
return resp, err return resp, err
} }
func (rkv *retryWriteKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallOption) (resp *pb.TxnResponse, err error) { func (rkv *nonRepeatableKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallOption) (resp *pb.TxnResponse, err error) {
err = rkv.writeRetry(ctx, func(rctx context.Context) error { // TODO: repeatableRetry if read-only txn
resp, err = rkv.KVClient.Txn(rctx, in, opts...) err = rkv.nonRepeatableRetry(ctx, func(rctx context.Context) error {
resp, err = rkv.kc.Txn(rctx, in, opts...)
return err return err
}) })
return resp, err return resp, err
} }
func (rkv *retryWriteKVClient) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (resp *pb.CompactionResponse, err error) { func (rkv *nonRepeatableKVClient) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (resp *pb.CompactionResponse, err error) {
err = rkv.writeRetry(ctx, func(rctx context.Context) error { err = rkv.nonRepeatableRetry(ctx, func(rctx context.Context) error {
resp, err = rkv.KVClient.Compact(rctx, in, opts...) resp, err = rkv.kc.Compact(rctx, in, opts...)
return err return err
}) })
return resp, err return resp, err

View File

@ -19,8 +19,6 @@ import (
"sync" "sync"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"google.golang.org/grpc"
) )
// Txn is the interface that wraps mini-transactions. // Txn is the interface that wraps mini-transactions.
@ -136,30 +134,14 @@ func (txn *txn) Else(ops ...Op) Txn {
func (txn *txn) Commit() (*TxnResponse, error) { func (txn *txn) Commit() (*TxnResponse, error) {
txn.mu.Lock() txn.mu.Lock()
defer txn.mu.Unlock() defer txn.mu.Unlock()
for {
resp, err := txn.commit()
if err == nil {
return resp, err
}
if isHaltErr(txn.ctx, err) {
return nil, toErr(txn.ctx, err)
}
if txn.isWrite {
return nil, toErr(txn.ctx, err)
}
}
}
func (txn *txn) commit() (*TxnResponse, error) {
r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas} r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
var opts []grpc.CallOption var resp *pb.TxnResponse
if !txn.isWrite { var err error
opts = []grpc.CallOption{grpc.FailFast(false)} resp, err = txn.kv.remote.Txn(txn.ctx, r)
}
resp, err := txn.kv.remote.Txn(txn.ctx, r, opts...)
if err != nil { if err != nil {
return nil, err return nil, toErr(txn.ctx, err)
} }
return (*TxnResponse)(resp), nil return (*TxnResponse)(resp), nil
} }