clientv3: hook up KV and Txn

This commit is contained in:
Xiang Li 2016-01-28 15:15:17 -08:00
parent aef77f9829
commit eb03d48034
2 changed files with 51 additions and 9 deletions

View File

@ -66,6 +66,18 @@ type kv struct {
c *Client
}
func NewKV(c *Client) KV {
conn := c.activeConnection()
remote := pb.NewKVClient(conn)
return &kv{
conn: c.activeConnection(),
remote: remote,
c: c,
}
}
func (kv *kv) Put(key, val string, leaseID lease.LeaseID) (*PutResponse, error) {
r, err := kv.do(OpPut(key, val, leaseID))
if err != nil {
@ -106,6 +118,30 @@ func (kv *kv) Delete(key string) (*DeleteResponse, error) {
return (*DeleteResponse)(r.GetResponseDeleteRange()), nil
}
func (kv *kv) Compact(rev int64) error {
for {
r := &pb.CompactionRequest{Revision: rev}
_, err := kv.remote.Compact(context.TODO(), r)
if err == nil {
return nil
}
if isRPCError(err) {
return err
}
if nerr := kv.switchRemote(err); nerr != nil {
return nerr
}
}
}
func (kv *kv) Txn() Txn {
return &txn{
kv: kv,
}
}
func (kv *kv) do(op Op) (*pb.ResponseUnion, error) {
for {
var err error
@ -148,12 +184,18 @@ func (kv *kv) do(op Op) (*pb.ResponseUnion, error) {
return nil, err
}
newConn, cerr := kv.c.retryConnection(kv.conn, err)
if cerr != nil {
// TODO: return client lib defined connection error
return nil, cerr
if nerr := kv.switchRemote(err); nerr != nil {
return nil, nerr
}
kv.conn = newConn
kv.remote = pb.NewKVClient(kv.conn)
}
}
func (kv *kv) switchRemote(prevErr error) error {
newConn, err := kv.c.retryConnection(kv.conn, prevErr)
if err != nil {
return err
}
kv.conn = newConn
kv.remote = pb.NewKVClient(kv.conn)
return nil
}

View File

@ -63,7 +63,7 @@ type txn struct {
fas []*pb.RequestUnion
}
func (txn *txn) If(cs ...Cmp) *txn {
func (txn *txn) If(cs ...Cmp) Txn {
txn.mu.Lock()
defer txn.mu.Unlock()
@ -86,7 +86,7 @@ func (txn *txn) If(cs ...Cmp) *txn {
return txn
}
func (txn *txn) Then(ops ...Op) *txn {
func (txn *txn) Then(ops ...Op) Txn {
txn.mu.Lock()
defer txn.mu.Unlock()
@ -106,7 +106,7 @@ func (txn *txn) Then(ops ...Op) *txn {
return txn
}
func (txn *txn) Else(ops ...Op) *txn {
func (txn *txn) Else(ops ...Op) Txn {
txn.mu.Lock()
defer txn.mu.Unlock()