From eb03d4803419b694e5156f6926ddfc33bae721cc Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 28 Jan 2016 15:15:17 -0800 Subject: [PATCH] clientv3: hook up KV and Txn --- clientv3/kv.go | 54 +++++++++++++++++++++++++++++++++++++++++++------ clientv3/txn.go | 6 +++--- 2 files changed, 51 insertions(+), 9 deletions(-) diff --git a/clientv3/kv.go b/clientv3/kv.go index 4dc4c33a7..8d36ad199 100644 --- a/clientv3/kv.go +++ b/clientv3/kv.go @@ -66,6 +66,18 @@ type kv struct { c *Client } +func NewKV(c *Client) KV { + conn := c.activeConnection() + remote := pb.NewKVClient(conn) + + return &kv{ + conn: c.activeConnection(), + remote: remote, + + c: c, + } +} + func (kv *kv) Put(key, val string, leaseID lease.LeaseID) (*PutResponse, error) { r, err := kv.do(OpPut(key, val, leaseID)) if err != nil { @@ -106,6 +118,30 @@ func (kv *kv) Delete(key string) (*DeleteResponse, error) { return (*DeleteResponse)(r.GetResponseDeleteRange()), nil } +func (kv *kv) Compact(rev int64) error { + for { + r := &pb.CompactionRequest{Revision: rev} + _, err := kv.remote.Compact(context.TODO(), r) + if err == nil { + return nil + } + + if isRPCError(err) { + return err + } + + if nerr := kv.switchRemote(err); nerr != nil { + return nerr + } + } +} + +func (kv *kv) Txn() Txn { + return &txn{ + kv: kv, + } +} + func (kv *kv) do(op Op) (*pb.ResponseUnion, error) { for { var err error @@ -148,12 +184,18 @@ func (kv *kv) do(op Op) (*pb.ResponseUnion, error) { return nil, err } - newConn, cerr := kv.c.retryConnection(kv.conn, err) - if cerr != nil { - // TODO: return client lib defined connection error - return nil, cerr + if nerr := kv.switchRemote(err); nerr != nil { + return nil, nerr } - kv.conn = newConn - kv.remote = pb.NewKVClient(kv.conn) } } + +func (kv *kv) switchRemote(prevErr error) error { + newConn, err := kv.c.retryConnection(kv.conn, prevErr) + if err != nil { + return err + } + kv.conn = newConn + kv.remote = pb.NewKVClient(kv.conn) + return nil +} diff --git a/clientv3/txn.go b/clientv3/txn.go index 2b83def24..5042ea0f9 100644 --- a/clientv3/txn.go +++ b/clientv3/txn.go @@ -63,7 +63,7 @@ type txn struct { fas []*pb.RequestUnion } -func (txn *txn) If(cs ...Cmp) *txn { +func (txn *txn) If(cs ...Cmp) Txn { txn.mu.Lock() defer txn.mu.Unlock() @@ -86,7 +86,7 @@ func (txn *txn) If(cs ...Cmp) *txn { return txn } -func (txn *txn) Then(ops ...Op) *txn { +func (txn *txn) Then(ops ...Op) Txn { txn.mu.Lock() defer txn.mu.Unlock() @@ -106,7 +106,7 @@ func (txn *txn) Then(ops ...Op) *txn { return txn } -func (txn *txn) Else(ops ...Op) *txn { +func (txn *txn) Else(ops ...Op) Txn { txn.mu.Lock() defer txn.mu.Unlock()