From f8dbcd86ec941644df3f7e90d854303fc7f74206 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 14 Jun 2017 11:06:08 -0700 Subject: [PATCH] clientv3: support nested Txns with OpTxn --- clientv3/kv.go | 9 ++++++++- clientv3/op.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/clientv3/kv.go b/clientv3/kv.go index dc45fa4ad..f887e0441 100644 --- a/clientv3/kv.go +++ b/clientv3/kv.go @@ -66,11 +66,13 @@ type OpResponse struct { put *PutResponse get *GetResponse del *DeleteResponse + txn *TxnResponse } func (op OpResponse) Put() *PutResponse { return op.put } func (op OpResponse) Get() *GetResponse { return op.get } func (op OpResponse) Del() *DeleteResponse { return op.del } +func (op OpResponse) Txn() *TxnResponse { return op.txn } type kv struct { remote pb.KVClient @@ -134,7 +136,6 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) { func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) { var err error switch op.t { - // TODO: handle other ops case tRange: var resp *pb.RangeResponse resp, err = kv.remote.Range(ctx, op.toRangeRequest(), grpc.FailFast(false)) @@ -155,6 +156,12 @@ func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) { if err == nil { return OpResponse{del: (*DeleteResponse)(resp)}, nil } + case tTxn: + var resp *pb.TxnResponse + resp, err = kv.remote.Txn(ctx, op.toTxnRequest()) + if err == nil { + return OpResponse{txn: (*TxnResponse)(resp)}, nil + } default: panic("Unknown op") } diff --git a/clientv3/op.go b/clientv3/op.go index 63b99501e..ef6f04368 100644 --- a/clientv3/op.go +++ b/clientv3/op.go @@ -23,6 +23,7 @@ const ( tRange opType = iota + 1 tPut tDeleteRange + tTxn ) var ( @@ -67,10 +68,18 @@ type Op struct { // for put val []byte leaseID LeaseID + + // txn + cmps []Cmp + thenOps []Op + elseOps []Op } // accesors / mutators +func (op Op) IsTxn() bool { return op.t == tTxn } +func (op Op) Txn() ([]Cmp, []Op, []Op) { return op.cmps, op.thenOps, op.elseOps } + // KeyBytes returns the byte slice holding the Op's key. func (op Op) KeyBytes() []byte { return op.key } @@ -113,6 +122,22 @@ func (op Op) toRangeRequest() *pb.RangeRequest { return r } +func (op Op) toTxnRequest() *pb.TxnRequest { + thenOps := make([]*pb.RequestOp, len(op.thenOps)) + for i, tOp := range op.thenOps { + thenOps[i] = tOp.toRequestOp() + } + elseOps := make([]*pb.RequestOp, len(op.elseOps)) + for i, eOp := range op.elseOps { + elseOps[i] = eOp.toRequestOp() + } + cmps := make([]*pb.Compare, len(op.cmps)) + for i := range op.cmps { + cmps[i] = (*pb.Compare)(&op.cmps[i]) + } + return &pb.TxnRequest{Compare: cmps, Success: thenOps, Failure: elseOps} +} + func (op Op) toRequestOp() *pb.RequestOp { switch op.t { case tRange: @@ -123,12 +148,27 @@ func (op Op) toRequestOp() *pb.RequestOp { case tDeleteRange: r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV} return &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: r}} + case tTxn: + return &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{RequestTxn: op.toTxnRequest()}} default: panic("Unknown Op") } } func (op Op) isWrite() bool { + if op.t == tTxn { + for _, tOp := range op.thenOps { + if tOp.isWrite() { + return true + } + } + for _, tOp := range op.elseOps { + if tOp.isWrite() { + return true + } + } + return false + } return op.t != tRange } @@ -194,6 +234,10 @@ func OpPut(key, val string, opts ...OpOption) Op { return ret } +func OpTxn(cmps []Cmp, thenOps []Op, elseOps []Op) Op { + return Op{t: tTxn, cmps: cmps, thenOps: thenOps, elseOps: elseOps} +} + func opWatch(key string, opts ...OpOption) Op { ret := Op{t: tRange, key: []byte(key)} ret.applyOpts(opts)