From 85bfbfa5adaee43ac02424cb5ee15c59ed31e889 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 28 Jan 2016 16:41:09 -0800 Subject: [PATCH] clientv3: threadsafe --- clientv3/kv.go | 25 +++++++++++++++++++------ clientv3/txn.go | 19 +++++++++++-------- 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/clientv3/kv.go b/clientv3/kv.go index 8d36ad199..5c252974a 100644 --- a/clientv3/kv.go +++ b/clientv3/kv.go @@ -15,6 +15,8 @@ package clientv3 import ( + "sync" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" @@ -60,10 +62,11 @@ type KV interface { } type kv struct { + c *Client + + mu sync.Mutex // guards all fields conn *grpc.ClientConn // conn in-use remote pb.KVClient - - c *Client } func NewKV(c *Client) KV { @@ -121,7 +124,7 @@ func (kv *kv) Delete(key string) (*DeleteResponse, error) { func (kv *kv) Compact(rev int64) error { for { r := &pb.CompactionRequest{Revision: rev} - _, err := kv.remote.Compact(context.TODO(), r) + _, err := kv.getRemote().Compact(context.TODO(), r) if err == nil { return nil } @@ -155,7 +158,7 @@ func (kv *kv) do(op Op) (*pb.ResponseUnion, error) { r.SortTarget = pb.RangeRequest_SortTarget(op.sort.Target) } - resp, err = kv.remote.Range(context.TODO(), r) + resp, err = kv.getRemote().Range(context.TODO(), r) if err == nil { respu := &pb.ResponseUnion_ResponseRange{ResponseRange: resp} return &pb.ResponseUnion{Response: respu}, nil @@ -163,7 +166,7 @@ func (kv *kv) do(op Op) (*pb.ResponseUnion, error) { case tPut: var resp *pb.PutResponse r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)} - resp, err = kv.remote.Put(context.TODO(), r) + resp, err = kv.getRemote().Put(context.TODO(), r) if err == nil { respu := &pb.ResponseUnion_ResponsePut{ResponsePut: resp} return &pb.ResponseUnion{Response: respu}, nil @@ -171,7 +174,7 @@ func (kv *kv) do(op Op) (*pb.ResponseUnion, error) { case tDeleteRange: var resp *pb.DeleteRangeResponse r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end} - resp, err = kv.remote.DeleteRange(context.TODO(), r) + resp, err = kv.getRemote().DeleteRange(context.TODO(), r) if err == nil { respu := &pb.ResponseUnion_ResponseDeleteRange{ResponseDeleteRange: resp} return &pb.ResponseUnion{Response: respu}, nil @@ -195,7 +198,17 @@ func (kv *kv) switchRemote(prevErr error) error { if err != nil { return err } + + kv.mu.Lock() + defer kv.mu.Unlock() + kv.conn = newConn kv.remote = pb.NewKVClient(kv.conn) return nil } + +func (kv *kv) getRemote() pb.KVClient { + kv.mu.Lock() + defer kv.mu.Unlock() + return kv.remote +} diff --git a/clientv3/txn.go b/clientv3/txn.go index 5042ea0f9..12ca17075 100644 --- a/clientv3/txn.go +++ b/clientv3/txn.go @@ -124,21 +124,24 @@ func (txn *txn) Else(ops ...Op) Txn { } func (txn *txn) Commit() (*TxnResponse, error) { + txn.mu.Lock() + defer txn.mu.Unlock() + kv := txn.kv + for { r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas} - resp, err := kv.remote.Txn(context.TODO(), r) + resp, err := kv.getRemote().Txn(context.TODO(), r) if err == nil { return (*TxnResponse)(resp), nil } - // TODO: this can cause data race with other kv operation. - newConn, cerr := kv.c.retryConnection(kv.conn, err) - if cerr != nil { - // TODO: return client lib defined connection error - return nil, cerr + if isRPCError(err) { + return nil, err + } + + if nerr := kv.switchRemote(err); nerr != nil { + return nil, nerr } - kv.conn = newConn - kv.remote = pb.NewKVClient(kv.conn) } }