clientv3: initial txn

This commit is contained in:
Xiang Li 2016-01-28 13:58:12 -08:00
parent 5bd930c9d2
commit 92653dcbfb
4 changed files with 240 additions and 31 deletions

View File

@ -14,5 +14,76 @@
package clientv3
type Compare struct {
import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
type CompareTarget int
type CompareResult int
const (
CompareVersion CompareTarget = iota
CompareCreated
CompareModified
CompareValue
)
type Cmp pb.Compare
func Compare(key string, t pb.Compare_CompareTarget, result string, v interface{}) Cmp {
var r pb.Compare_CompareResult
switch result {
case "=":
r = pb.Compare_EQUAL
case ">":
r = pb.Compare_GREATER
case "<":
r = pb.Compare_LESS
default:
panic("Unknown result op")
}
switch t {
case pb.Compare_VALUE:
val, ok := v.(string)
if !ok {
panic("bad compare value")
}
return Cmp{Key: []byte(key), Result: r, Target: t, TargetUnion: &pb.Compare_Value{Value: []byte(val)}}
case pb.Compare_VERSION:
return Cmp{Key: []byte(key), Result: r, Target: t, TargetUnion: &pb.Compare_Version{Version: mustInt64(v)}}
case pb.Compare_CREATE:
return Cmp{Key: []byte(key), Result: r, Target: t, TargetUnion: &pb.Compare_CreateRevision{CreateRevision: mustInt64(v)}}
case pb.Compare_MOD:
return Cmp{Key: []byte(key), Result: r, Target: t, TargetUnion: &pb.Compare_ModRevision{ModRevision: mustInt64(v)}}
default:
panic("Unknown compare type")
}
}
func Value(key string) (string, pb.Compare_CompareTarget) {
return key, pb.Compare_VALUE
}
func Version(key string) (string, pb.Compare_CompareTarget) {
return key, pb.Compare_VERSION
}
func CreatedRevision(key string) (string, pb.Compare_CompareTarget) {
return key, pb.Compare_CREATE
}
func ModifiedRevision(key string) (string, pb.Compare_CompareTarget) {
return key, pb.Compare_MOD
}
func mustInt64(val interface{}) int64 {
if v, ok := val.(int64); ok {
return v
}
if v, ok := val.(int); ok {
return int64(v)
}
panic("bad value")
}

View File

@ -59,35 +59,6 @@ type KV interface {
Txn() Txn
}
//
// Tx.If(
// CmpValue(k1, ">", v1),
// CmpVersion(k1, "=", 2)
// ).Then(
// OpPut(k2,v2), OpPut(k3,v3)
// ).Else(
// OpPut(k4,v4), OpPut(k5,v5)
// ).Commit()
type Txn interface {
// If takes a list of comparison. If all comparisons passed in succeed,
// the operations passed into Then() will be executed. Or the operations
// passed into Else() will be executed.
If(cs ...Compare) Txn
// Then takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() succeed.
Then(ops ...Op) Txn
// Else takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() fail.
Else(ops ...Op) Txn
// Commit tries to commit the transaction.
Commit() (*TxnResponse, error)
// TODO: add a Do for shortcut the txn without any condition?
}
type kv struct {
conn *grpc.ClientConn // conn in-use
remote pb.KVClient

View File

@ -14,7 +14,10 @@
package clientv3
import "github.com/coreos/etcd/lease"
import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/lease"
)
type opType int
@ -41,6 +44,26 @@ type Op struct {
leaseID lease.LeaseID
}
func (op Op) toRequestUnion() *pb.RequestUnion {
switch op.t {
case tRange:
r := &pb.RangeRequest{Key: op.key, RangeEnd: op.end, Limit: op.limit, Revision: op.rev}
if op.sort != nil {
r.SortOrder = pb.RangeRequest_SortOrder(op.sort.Order)
r.SortTarget = pb.RangeRequest_SortTarget(op.sort.Target)
}
return &pb.RequestUnion{Request: &pb.RequestUnion_RequestRange{RequestRange: r}}
case tPut:
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)}
return &pb.RequestUnion{Request: &pb.RequestUnion_RequestPut{RequestPut: r}}
case tDeleteRange:
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end}
return &pb.RequestUnion{Request: &pb.RequestUnion_RequestDeleteRange{RequestDeleteRange: r}}
default:
panic("Unknown Op")
}
}
func OpRange(key, end string, limit, rev int64, sort *SortOption) Op {
return Op{
t: tRange,

144
clientv3/txn.go Normal file
View File

@ -0,0 +1,144 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clientv3
import (
"sync"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
//
// Tx.If(
// Compare(Value(k1), ">", v1),
// Compare(Version(k1), "=", 2)
// ).Then(
// OpPut(k2,v2), OpPut(k3,v3)
// ).Else(
// OpPut(k4,v4), OpPut(k5,v5)
// ).Commit()
type Txn interface {
// If takes a list of comparison. If all comparisons passed in succeed,
// the operations passed into Then() will be executed. Or the operations
// passed into Else() will be executed.
If(cs ...Cmp) Txn
// Then takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() succeed.
Then(ops ...Op) Txn
// Else takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() fail.
Else(ops ...Op) Txn
// Commit tries to commit the transaction.
Commit() (*TxnResponse, error)
// TODO: add a Do for shortcut the txn without any condition?
}
type txn struct {
kv *kv
mu sync.Mutex
cif bool
cthen bool
celse bool
cmps []*pb.Compare
sus []*pb.RequestUnion
fas []*pb.RequestUnion
}
func (txn *txn) If(cs ...Cmp) *txn {
txn.mu.Lock()
defer txn.mu.Unlock()
if txn.cif {
panic("cannot call If twice!")
}
if txn.cthen {
panic("cannot call If after Then!")
}
if txn.celse {
panic("cannot call If after Else!")
}
for _, cmp := range cs {
txn.cmps = append(txn.cmps, (*pb.Compare)(&cmp))
}
return txn
}
func (txn *txn) Then(ops ...Op) *txn {
txn.mu.Lock()
defer txn.mu.Unlock()
if txn.cthen {
panic("cannot call Then twice!")
}
if txn.celse {
panic("cannot call Then after Else!")
}
txn.cthen = true
for _, op := range ops {
txn.sus = append(txn.sus, op.toRequestUnion())
}
return txn
}
func (txn *txn) Else(ops ...Op) *txn {
txn.mu.Lock()
defer txn.mu.Unlock()
if txn.celse {
panic("cannot call Else twice!")
}
txn.celse = true
for _, op := range ops {
txn.fas = append(txn.fas, op.toRequestUnion())
}
return txn
}
func (txn *txn) Commit() (*TxnResponse, error) {
kv := txn.kv
for {
r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
resp, err := kv.remote.Txn(context.TODO(), r)
if err == nil {
return (*TxnResponse)(resp), nil
}
// TODO: this can cause data race with other kv operation.
newConn, cerr := kv.c.retryConnection(kv.conn, err)
if cerr != nil {
// TODO: return client lib defined connection error
return nil, cerr
}
kv.conn = newConn
kv.remote = pb.NewKVClient(kv.conn)
}
}