diff --git a/clientv3/compare.go b/clientv3/compare.go index 0a0a2137b..0624be580 100644 --- a/clientv3/compare.go +++ b/clientv3/compare.go @@ -14,5 +14,76 @@ package clientv3 -type Compare struct { +import ( + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +type CompareTarget int +type CompareResult int + +const ( + CompareVersion CompareTarget = iota + CompareCreated + CompareModified + CompareValue +) + +type Cmp pb.Compare + +func Compare(key string, t pb.Compare_CompareTarget, result string, v interface{}) Cmp { + var r pb.Compare_CompareResult + + switch result { + case "=": + r = pb.Compare_EQUAL + case ">": + r = pb.Compare_GREATER + case "<": + r = pb.Compare_LESS + default: + panic("Unknown result op") + } + + switch t { + case pb.Compare_VALUE: + val, ok := v.(string) + if !ok { + panic("bad compare value") + } + return Cmp{Key: []byte(key), Result: r, Target: t, TargetUnion: &pb.Compare_Value{Value: []byte(val)}} + case pb.Compare_VERSION: + return Cmp{Key: []byte(key), Result: r, Target: t, TargetUnion: &pb.Compare_Version{Version: mustInt64(v)}} + case pb.Compare_CREATE: + return Cmp{Key: []byte(key), Result: r, Target: t, TargetUnion: &pb.Compare_CreateRevision{CreateRevision: mustInt64(v)}} + case pb.Compare_MOD: + return Cmp{Key: []byte(key), Result: r, Target: t, TargetUnion: &pb.Compare_ModRevision{ModRevision: mustInt64(v)}} + default: + panic("Unknown compare type") + } +} + +func Value(key string) (string, pb.Compare_CompareTarget) { + return key, pb.Compare_VALUE +} + +func Version(key string) (string, pb.Compare_CompareTarget) { + return key, pb.Compare_VERSION +} + +func CreatedRevision(key string) (string, pb.Compare_CompareTarget) { + return key, pb.Compare_CREATE +} + +func ModifiedRevision(key string) (string, pb.Compare_CompareTarget) { + return key, pb.Compare_MOD +} + +func mustInt64(val interface{}) int64 { + if v, ok := val.(int64); ok { + return v + } + if v, ok := val.(int); ok { + return int64(v) + } + panic("bad value") } diff --git a/clientv3/kv.go b/clientv3/kv.go index f36496cb2..beea695fd 100644 --- a/clientv3/kv.go +++ b/clientv3/kv.go @@ -59,35 +59,6 @@ type KV interface { Txn() Txn } -// -// Tx.If( -// CmpValue(k1, ">", v1), -// CmpVersion(k1, "=", 2) -// ).Then( -// OpPut(k2,v2), OpPut(k3,v3) -// ).Else( -// OpPut(k4,v4), OpPut(k5,v5) -// ).Commit() -type Txn interface { - // If takes a list of comparison. If all comparisons passed in succeed, - // the operations passed into Then() will be executed. Or the operations - // passed into Else() will be executed. - If(cs ...Compare) Txn - - // Then takes a list of operations. The Ops list will be executed, if the - // comparisons passed in If() succeed. - Then(ops ...Op) Txn - - // Else takes a list of operations. The Ops list will be executed, if the - // comparisons passed in If() fail. - Else(ops ...Op) Txn - - // Commit tries to commit the transaction. - Commit() (*TxnResponse, error) - - // TODO: add a Do for shortcut the txn without any condition? -} - type kv struct { conn *grpc.ClientConn // conn in-use remote pb.KVClient diff --git a/clientv3/op.go b/clientv3/op.go index 82d9c3156..fdc333469 100644 --- a/clientv3/op.go +++ b/clientv3/op.go @@ -14,7 +14,10 @@ package clientv3 -import "github.com/coreos/etcd/lease" +import ( + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/lease" +) type opType int @@ -41,6 +44,26 @@ type Op struct { leaseID lease.LeaseID } +func (op Op) toRequestUnion() *pb.RequestUnion { + switch op.t { + case tRange: + r := &pb.RangeRequest{Key: op.key, RangeEnd: op.end, Limit: op.limit, Revision: op.rev} + if op.sort != nil { + r.SortOrder = pb.RangeRequest_SortOrder(op.sort.Order) + r.SortTarget = pb.RangeRequest_SortTarget(op.sort.Target) + } + return &pb.RequestUnion{Request: &pb.RequestUnion_RequestRange{RequestRange: r}} + case tPut: + r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)} + return &pb.RequestUnion{Request: &pb.RequestUnion_RequestPut{RequestPut: r}} + case tDeleteRange: + r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end} + return &pb.RequestUnion{Request: &pb.RequestUnion_RequestDeleteRange{RequestDeleteRange: r}} + default: + panic("Unknown Op") + } +} + func OpRange(key, end string, limit, rev int64, sort *SortOption) Op { return Op{ t: tRange, diff --git a/clientv3/txn.go b/clientv3/txn.go new file mode 100644 index 000000000..2b83def24 --- /dev/null +++ b/clientv3/txn.go @@ -0,0 +1,144 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package clientv3 + +import ( + "sync" + + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +// +// Tx.If( +// Compare(Value(k1), ">", v1), +// Compare(Version(k1), "=", 2) +// ).Then( +// OpPut(k2,v2), OpPut(k3,v3) +// ).Else( +// OpPut(k4,v4), OpPut(k5,v5) +// ).Commit() +type Txn interface { + // If takes a list of comparison. If all comparisons passed in succeed, + // the operations passed into Then() will be executed. Or the operations + // passed into Else() will be executed. + If(cs ...Cmp) Txn + + // Then takes a list of operations. The Ops list will be executed, if the + // comparisons passed in If() succeed. + Then(ops ...Op) Txn + + // Else takes a list of operations. The Ops list will be executed, if the + // comparisons passed in If() fail. + Else(ops ...Op) Txn + + // Commit tries to commit the transaction. + Commit() (*TxnResponse, error) + + // TODO: add a Do for shortcut the txn without any condition? +} + +type txn struct { + kv *kv + + mu sync.Mutex + cif bool + cthen bool + celse bool + + cmps []*pb.Compare + + sus []*pb.RequestUnion + fas []*pb.RequestUnion +} + +func (txn *txn) If(cs ...Cmp) *txn { + txn.mu.Lock() + defer txn.mu.Unlock() + + if txn.cif { + panic("cannot call If twice!") + } + + if txn.cthen { + panic("cannot call If after Then!") + } + + if txn.celse { + panic("cannot call If after Else!") + } + + for _, cmp := range cs { + txn.cmps = append(txn.cmps, (*pb.Compare)(&cmp)) + } + + return txn +} + +func (txn *txn) Then(ops ...Op) *txn { + txn.mu.Lock() + defer txn.mu.Unlock() + + if txn.cthen { + panic("cannot call Then twice!") + } + if txn.celse { + panic("cannot call Then after Else!") + } + + txn.cthen = true + + for _, op := range ops { + txn.sus = append(txn.sus, op.toRequestUnion()) + } + + return txn +} + +func (txn *txn) Else(ops ...Op) *txn { + txn.mu.Lock() + defer txn.mu.Unlock() + + if txn.celse { + panic("cannot call Else twice!") + } + + txn.celse = true + + for _, op := range ops { + txn.fas = append(txn.fas, op.toRequestUnion()) + } + + return txn +} + +func (txn *txn) Commit() (*TxnResponse, error) { + kv := txn.kv + for { + r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas} + resp, err := kv.remote.Txn(context.TODO(), r) + if err == nil { + return (*TxnResponse)(resp), nil + } + + // TODO: this can cause data race with other kv operation. + newConn, cerr := kv.c.retryConnection(kv.conn, err) + if cerr != nil { + // TODO: return client lib defined connection error + return nil, cerr + } + kv.conn = newConn + kv.remote = pb.NewKVClient(kv.conn) + } +}