From b46ab2c36e355c5da8729ca7d22853f6111bf874 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 18 Oct 2017 12:43:01 -0700 Subject: [PATCH] clientv3: remove redundant retries in KV, set FailFast=true Signed-off-by: Gyu-Ho Lee --- clientv3/kv.go | 23 ++-------------------- clientv3/retry.go | 49 ++++++++++++++++++++++++----------------------- clientv3/txn.go | 26 ++++--------------------- 3 files changed, 31 insertions(+), 67 deletions(-) diff --git a/clientv3/kv.go b/clientv3/kv.go index ead08a082..b578d9ebe 100644 --- a/clientv3/kv.go +++ b/clientv3/kv.go @@ -18,8 +18,6 @@ import ( "context" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" - - "google.golang.org/grpc" ) type ( @@ -132,28 +130,11 @@ func (kv *kv) Txn(ctx context.Context) Txn { } func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) { - for { - resp, err := kv.do(ctx, op) - if err == nil { - return resp, nil - } - - if isHaltErr(ctx, err) { - return resp, toErr(ctx, err) - } - // do not retry on modifications - if op.isWrite() { - return resp, toErr(ctx, err) - } - } -} - -func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) { var err error switch op.t { case tRange: var resp *pb.RangeResponse - resp, err = kv.remote.Range(ctx, op.toRangeRequest(), grpc.FailFast(false)) + resp, err = kv.remote.Range(ctx, op.toRangeRequest()) if err == nil { return OpResponse{get: (*GetResponse)(resp)}, nil } @@ -180,5 +161,5 @@ func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) { default: panic("Unknown op") } - return OpResponse{}, err + return OpResponse{}, toErr(ctx, err) } diff --git a/clientv3/retry.go b/clientv3/retry.go index 408383d27..157f60363 100644 --- a/clientv3/retry.go +++ b/clientv3/retry.go @@ -101,63 +101,64 @@ func (c *Client) newAuthRetryWrapper() retryRPCFunc { } } -// RetryKVClient implements a KVClient that uses the client's FailFast retry policy. +// RetryKVClient implements a KVClient. func RetryKVClient(c *Client) pb.KVClient { - readRetry := c.newRetryWrapper(isReadStopError) - writeRetry := c.newRetryWrapper(isWriteStopError) + repeatableRetry := c.newRetryWrapper(isReadStopError) + nonRepeatableRetry := c.newRetryWrapper(isWriteStopError) conn := pb.NewKVClient(c.conn) - retryBasic := &retryKVClient{&retryWriteKVClient{conn, writeRetry}, readRetry} + retryBasic := &retryKVClient{&nonRepeatableKVClient{conn, nonRepeatableRetry}, repeatableRetry} retryAuthWrapper := c.newAuthRetryWrapper() return &retryKVClient{ - &retryWriteKVClient{retryBasic, retryAuthWrapper}, + &nonRepeatableKVClient{retryBasic, retryAuthWrapper}, retryAuthWrapper} } type retryKVClient struct { - *retryWriteKVClient - readRetry retryRPCFunc + *nonRepeatableKVClient + repeatableRetry retryRPCFunc } func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (resp *pb.RangeResponse, err error) { - err = rkv.readRetry(ctx, func(rctx context.Context) error { - resp, err = rkv.KVClient.Range(rctx, in, opts...) + err = rkv.repeatableRetry(ctx, func(rctx context.Context) error { + resp, err = rkv.kc.Range(rctx, in, opts...) return err }) return resp, err } -type retryWriteKVClient struct { - pb.KVClient - writeRetry retryRPCFunc +type nonRepeatableKVClient struct { + kc pb.KVClient + nonRepeatableRetry retryRPCFunc } -func (rkv *retryWriteKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (resp *pb.PutResponse, err error) { - err = rkv.writeRetry(ctx, func(rctx context.Context) error { - resp, err = rkv.KVClient.Put(rctx, in, opts...) +func (rkv *nonRepeatableKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (resp *pb.PutResponse, err error) { + err = rkv.nonRepeatableRetry(ctx, func(rctx context.Context) error { + resp, err = rkv.kc.Put(rctx, in, opts...) return err }) return resp, err } -func (rkv *retryWriteKVClient) DeleteRange(ctx context.Context, in *pb.DeleteRangeRequest, opts ...grpc.CallOption) (resp *pb.DeleteRangeResponse, err error) { - err = rkv.writeRetry(ctx, func(rctx context.Context) error { - resp, err = rkv.KVClient.DeleteRange(rctx, in, opts...) +func (rkv *nonRepeatableKVClient) DeleteRange(ctx context.Context, in *pb.DeleteRangeRequest, opts ...grpc.CallOption) (resp *pb.DeleteRangeResponse, err error) { + err = rkv.nonRepeatableRetry(ctx, func(rctx context.Context) error { + resp, err = rkv.kc.DeleteRange(rctx, in, opts...) return err }) return resp, err } -func (rkv *retryWriteKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallOption) (resp *pb.TxnResponse, err error) { - err = rkv.writeRetry(ctx, func(rctx context.Context) error { - resp, err = rkv.KVClient.Txn(rctx, in, opts...) +func (rkv *nonRepeatableKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallOption) (resp *pb.TxnResponse, err error) { + // TODO: repeatableRetry if read-only txn + err = rkv.nonRepeatableRetry(ctx, func(rctx context.Context) error { + resp, err = rkv.kc.Txn(rctx, in, opts...) return err }) return resp, err } -func (rkv *retryWriteKVClient) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (resp *pb.CompactionResponse, err error) { - err = rkv.writeRetry(ctx, func(rctx context.Context) error { - resp, err = rkv.KVClient.Compact(rctx, in, opts...) +func (rkv *nonRepeatableKVClient) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (resp *pb.CompactionResponse, err error) { + err = rkv.nonRepeatableRetry(ctx, func(rctx context.Context) error { + resp, err = rkv.kc.Compact(rctx, in, opts...) return err }) return resp, err diff --git a/clientv3/txn.go b/clientv3/txn.go index ea4ec6160..8169b6215 100644 --- a/clientv3/txn.go +++ b/clientv3/txn.go @@ -19,8 +19,6 @@ import ( "sync" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" - - "google.golang.org/grpc" ) // Txn is the interface that wraps mini-transactions. @@ -136,30 +134,14 @@ func (txn *txn) Else(ops ...Op) Txn { func (txn *txn) Commit() (*TxnResponse, error) { txn.mu.Lock() defer txn.mu.Unlock() - for { - resp, err := txn.commit() - if err == nil { - return resp, err - } - if isHaltErr(txn.ctx, err) { - return nil, toErr(txn.ctx, err) - } - if txn.isWrite { - return nil, toErr(txn.ctx, err) - } - } -} -func (txn *txn) commit() (*TxnResponse, error) { r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas} - var opts []grpc.CallOption - if !txn.isWrite { - opts = []grpc.CallOption{grpc.FailFast(false)} - } - resp, err := txn.kv.remote.Txn(txn.ctx, r, opts...) + var resp *pb.TxnResponse + var err error + resp, err = txn.kv.remote.Txn(txn.ctx, r) if err != nil { - return nil, err + return nil, toErr(txn.ctx, err) } return (*TxnResponse)(resp), nil }