mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #8102 from heyitsanthony/txn-nested
api: nested txns
This commit is contained in:
commit
9cb12deca6
@ -669,6 +669,7 @@ Empty field.
|
|||||||
| request_range | | RangeRequest |
|
| request_range | | RangeRequest |
|
||||||
| request_put | | PutRequest |
|
| request_put | | PutRequest |
|
||||||
| request_delete_range | | DeleteRangeRequest |
|
| request_delete_range | | DeleteRangeRequest |
|
||||||
|
| request_txn | | TxnRequest |
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -691,6 +692,7 @@ Empty field.
|
|||||||
| response_range | | RangeResponse |
|
| response_range | | RangeResponse |
|
||||||
| response_put | | PutResponse |
|
| response_put | | PutResponse |
|
||||||
| response_delete_range | | DeleteRangeResponse |
|
| response_delete_range | | DeleteRangeResponse |
|
||||||
|
| response_txn | | TxnResponse |
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -1954,6 +1954,9 @@
|
|||||||
},
|
},
|
||||||
"request_range": {
|
"request_range": {
|
||||||
"$ref": "#/definitions/etcdserverpbRangeRequest"
|
"$ref": "#/definitions/etcdserverpbRangeRequest"
|
||||||
|
},
|
||||||
|
"request_txn": {
|
||||||
|
"$ref": "#/definitions/etcdserverpbTxnRequest"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -1993,6 +1996,9 @@
|
|||||||
},
|
},
|
||||||
"response_range": {
|
"response_range": {
|
||||||
"$ref": "#/definitions/etcdserverpbRangeResponse"
|
"$ref": "#/definitions/etcdserverpbRangeResponse"
|
||||||
|
},
|
||||||
|
"response_txn": {
|
||||||
|
"$ref": "#/definitions/etcdserverpbTxnResponse"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
@ -106,13 +106,20 @@ func TestTxnReadRetry(t *testing.T) {
|
|||||||
defer clus.Terminate(t)
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
kv := clus.Client(0)
|
kv := clus.Client(0)
|
||||||
|
|
||||||
|
thenOps := [][]clientv3.Op{
|
||||||
|
{clientv3.OpGet("foo")},
|
||||||
|
{clientv3.OpTxn(nil, []clientv3.Op{clientv3.OpGet("foo")}, nil)},
|
||||||
|
{clientv3.OpTxn(nil, nil, nil)},
|
||||||
|
{},
|
||||||
|
}
|
||||||
|
for i := range thenOps {
|
||||||
clus.Members[0].Stop(t)
|
clus.Members[0].Stop(t)
|
||||||
<-clus.Members[0].StopNotify()
|
<-clus.Members[0].StopNotify()
|
||||||
|
|
||||||
donec := make(chan struct{})
|
donec := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
ctx := context.TODO()
|
_, err := kv.Txn(context.TODO()).Then(thenOps[i]...).Commit()
|
||||||
_, err := kv.Txn(ctx).Then(clientv3.OpGet("foo")).Commit()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("expected response, got error %v", err)
|
t.Fatalf("expected response, got error %v", err)
|
||||||
}
|
}
|
||||||
@ -128,6 +135,7 @@ func TestTxnReadRetry(t *testing.T) {
|
|||||||
case <-time.After(2 * clus.Members[1].ServerConfig.ReqTimeout()):
|
case <-time.After(2 * clus.Members[1].ServerConfig.ReqTimeout()):
|
||||||
t.Fatalf("waited too long")
|
t.Fatalf("waited too long")
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTxnSuccess(t *testing.T) {
|
func TestTxnSuccess(t *testing.T) {
|
||||||
@ -179,3 +187,41 @@ func TestTxnCompareRange(t *testing.T) {
|
|||||||
t.Fatal("expected prefix compare to false, got compares as true")
|
t.Fatal("expected prefix compare to false, got compares as true")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTxnNested(t *testing.T) {
|
||||||
|
defer testutil.AfterTest(t)
|
||||||
|
|
||||||
|
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
|
||||||
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
|
kv := clus.Client(0)
|
||||||
|
|
||||||
|
tresp, err := kv.Txn(context.TODO()).
|
||||||
|
If(clientv3.Compare(clientv3.Version("foo"), "=", 0)).
|
||||||
|
Then(
|
||||||
|
clientv3.OpPut("foo", "bar"),
|
||||||
|
clientv3.OpTxn(nil, []clientv3.Op{clientv3.OpPut("abc", "123")}, nil)).
|
||||||
|
Else(clientv3.OpPut("foo", "baz")).Commit()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if len(tresp.Responses) != 2 {
|
||||||
|
t.Errorf("expected 2 top-level txn responses, got %+v", tresp.Responses)
|
||||||
|
}
|
||||||
|
|
||||||
|
// check txn writes were applied
|
||||||
|
resp, err := kv.Get(context.TODO(), "foo")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if len(resp.Kvs) != 1 || string(resp.Kvs[0].Value) != "bar" {
|
||||||
|
t.Errorf("unexpected Get response %+v", resp)
|
||||||
|
}
|
||||||
|
resp, err = kv.Get(context.TODO(), "abc")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if len(resp.Kvs) != 1 || string(resp.Kvs[0].Value) != "123" {
|
||||||
|
t.Errorf("unexpected Get response %+v", resp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -66,11 +66,13 @@ type OpResponse struct {
|
|||||||
put *PutResponse
|
put *PutResponse
|
||||||
get *GetResponse
|
get *GetResponse
|
||||||
del *DeleteResponse
|
del *DeleteResponse
|
||||||
|
txn *TxnResponse
|
||||||
}
|
}
|
||||||
|
|
||||||
func (op OpResponse) Put() *PutResponse { return op.put }
|
func (op OpResponse) Put() *PutResponse { return op.put }
|
||||||
func (op OpResponse) Get() *GetResponse { return op.get }
|
func (op OpResponse) Get() *GetResponse { return op.get }
|
||||||
func (op OpResponse) Del() *DeleteResponse { return op.del }
|
func (op OpResponse) Del() *DeleteResponse { return op.del }
|
||||||
|
func (op OpResponse) Txn() *TxnResponse { return op.txn }
|
||||||
|
|
||||||
type kv struct {
|
type kv struct {
|
||||||
remote pb.KVClient
|
remote pb.KVClient
|
||||||
@ -134,7 +136,6 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
|
|||||||
func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) {
|
func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) {
|
||||||
var err error
|
var err error
|
||||||
switch op.t {
|
switch op.t {
|
||||||
// TODO: handle other ops
|
|
||||||
case tRange:
|
case tRange:
|
||||||
var resp *pb.RangeResponse
|
var resp *pb.RangeResponse
|
||||||
resp, err = kv.remote.Range(ctx, op.toRangeRequest(), grpc.FailFast(false))
|
resp, err = kv.remote.Range(ctx, op.toRangeRequest(), grpc.FailFast(false))
|
||||||
@ -155,6 +156,12 @@ func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) {
|
|||||||
if err == nil {
|
if err == nil {
|
||||||
return OpResponse{del: (*DeleteResponse)(resp)}, nil
|
return OpResponse{del: (*DeleteResponse)(resp)}, nil
|
||||||
}
|
}
|
||||||
|
case tTxn:
|
||||||
|
var resp *pb.TxnResponse
|
||||||
|
resp, err = kv.remote.Txn(ctx, op.toTxnRequest())
|
||||||
|
if err == nil {
|
||||||
|
return OpResponse{txn: (*TxnResponse)(resp)}, nil
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
panic("Unknown op")
|
panic("Unknown op")
|
||||||
}
|
}
|
||||||
|
@ -74,7 +74,7 @@ func (kv *kvPrefix) Delete(ctx context.Context, key string, opts ...clientv3.OpO
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (kv *kvPrefix) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse, error) {
|
func (kv *kvPrefix) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse, error) {
|
||||||
if len(op.KeyBytes()) == 0 {
|
if len(op.KeyBytes()) == 0 && !op.IsTxn() {
|
||||||
return clientv3.OpResponse{}, rpctypes.ErrEmptyKey
|
return clientv3.OpResponse{}, rpctypes.ErrEmptyKey
|
||||||
}
|
}
|
||||||
r, err := kv.KV.Do(ctx, kv.prefixOp(op))
|
r, err := kv.KV.Do(ctx, kv.prefixOp(op))
|
||||||
@ -88,6 +88,8 @@ func (kv *kvPrefix) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse
|
|||||||
kv.unprefixPutResponse(r.Put())
|
kv.unprefixPutResponse(r.Put())
|
||||||
case r.Del() != nil:
|
case r.Del() != nil:
|
||||||
kv.unprefixDeleteResponse(r.Del())
|
kv.unprefixDeleteResponse(r.Del())
|
||||||
|
case r.Txn() != nil:
|
||||||
|
kv.unprefixTxnResponse(r.Txn())
|
||||||
}
|
}
|
||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
@ -102,34 +104,17 @@ func (kv *kvPrefix) Txn(ctx context.Context) clientv3.Txn {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (txn *txnPrefix) If(cs ...clientv3.Cmp) clientv3.Txn {
|
func (txn *txnPrefix) If(cs ...clientv3.Cmp) clientv3.Txn {
|
||||||
newCmps := make([]clientv3.Cmp, len(cs))
|
txn.Txn = txn.Txn.If(txn.kv.prefixCmps(cs)...)
|
||||||
for i := range cs {
|
|
||||||
newCmps[i] = cs[i]
|
|
||||||
pfxKey, endKey := txn.kv.prefixInterval(cs[i].KeyBytes(), cs[i].RangeEnd)
|
|
||||||
newCmps[i].WithKeyBytes(pfxKey)
|
|
||||||
if len(cs[i].RangeEnd) != 0 {
|
|
||||||
newCmps[i].RangeEnd = endKey
|
|
||||||
}
|
|
||||||
}
|
|
||||||
txn.Txn = txn.Txn.If(newCmps...)
|
|
||||||
return txn
|
return txn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (txn *txnPrefix) Then(ops ...clientv3.Op) clientv3.Txn {
|
func (txn *txnPrefix) Then(ops ...clientv3.Op) clientv3.Txn {
|
||||||
newOps := make([]clientv3.Op, len(ops))
|
txn.Txn = txn.Txn.Then(txn.kv.prefixOps(ops)...)
|
||||||
for i := range ops {
|
|
||||||
newOps[i] = txn.kv.prefixOp(ops[i])
|
|
||||||
}
|
|
||||||
txn.Txn = txn.Txn.Then(newOps...)
|
|
||||||
return txn
|
return txn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (txn *txnPrefix) Else(ops ...clientv3.Op) clientv3.Txn {
|
func (txn *txnPrefix) Else(ops ...clientv3.Op) clientv3.Txn {
|
||||||
newOps := make([]clientv3.Op, len(ops))
|
txn.Txn = txn.Txn.Else(txn.kv.prefixOps(ops)...)
|
||||||
for i := range ops {
|
|
||||||
newOps[i] = txn.kv.prefixOp(ops[i])
|
|
||||||
}
|
|
||||||
txn.Txn = txn.Txn.Else(newOps...)
|
|
||||||
return txn
|
return txn
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -143,10 +128,14 @@ func (txn *txnPrefix) Commit() (*clientv3.TxnResponse, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (kv *kvPrefix) prefixOp(op clientv3.Op) clientv3.Op {
|
func (kv *kvPrefix) prefixOp(op clientv3.Op) clientv3.Op {
|
||||||
|
if !op.IsTxn() {
|
||||||
begin, end := kv.prefixInterval(op.KeyBytes(), op.RangeBytes())
|
begin, end := kv.prefixInterval(op.KeyBytes(), op.RangeBytes())
|
||||||
op.WithKeyBytes(begin)
|
op.WithKeyBytes(begin)
|
||||||
op.WithRangeBytes(end)
|
op.WithRangeBytes(end)
|
||||||
return op
|
return op
|
||||||
|
}
|
||||||
|
cmps, thenOps, elseOps := op.Txn()
|
||||||
|
return clientv3.OpTxn(kv.prefixCmps(cmps), kv.prefixOps(thenOps), kv.prefixOps(elseOps))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kv *kvPrefix) unprefixGetResponse(resp *clientv3.GetResponse) {
|
func (kv *kvPrefix) unprefixGetResponse(resp *clientv3.GetResponse) {
|
||||||
@ -182,6 +171,10 @@ func (kv *kvPrefix) unprefixTxnResponse(resp *clientv3.TxnResponse) {
|
|||||||
if tv.ResponseDeleteRange != nil {
|
if tv.ResponseDeleteRange != nil {
|
||||||
kv.unprefixDeleteResponse((*clientv3.DeleteResponse)(tv.ResponseDeleteRange))
|
kv.unprefixDeleteResponse((*clientv3.DeleteResponse)(tv.ResponseDeleteRange))
|
||||||
}
|
}
|
||||||
|
case *pb.ResponseOp_ResponseTxn:
|
||||||
|
if tv.ResponseTxn != nil {
|
||||||
|
kv.unprefixTxnResponse((*clientv3.TxnResponse)(tv.ResponseTxn))
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -190,3 +183,24 @@ func (kv *kvPrefix) unprefixTxnResponse(resp *clientv3.TxnResponse) {
|
|||||||
func (p *kvPrefix) prefixInterval(key, end []byte) (pfxKey []byte, pfxEnd []byte) {
|
func (p *kvPrefix) prefixInterval(key, end []byte) (pfxKey []byte, pfxEnd []byte) {
|
||||||
return prefixInterval(p.pfx, key, end)
|
return prefixInterval(p.pfx, key, end)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (kv *kvPrefix) prefixCmps(cs []clientv3.Cmp) []clientv3.Cmp {
|
||||||
|
newCmps := make([]clientv3.Cmp, len(cs))
|
||||||
|
for i := range cs {
|
||||||
|
newCmps[i] = cs[i]
|
||||||
|
pfxKey, endKey := kv.prefixInterval(cs[i].KeyBytes(), cs[i].RangeEnd)
|
||||||
|
newCmps[i].WithKeyBytes(pfxKey)
|
||||||
|
if len(cs[i].RangeEnd) != 0 {
|
||||||
|
newCmps[i].RangeEnd = endKey
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return newCmps
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kv *kvPrefix) prefixOps(ops []clientv3.Op) []clientv3.Op {
|
||||||
|
newOps := make([]clientv3.Op, len(ops))
|
||||||
|
for i := range ops {
|
||||||
|
newOps[i] = kv.prefixOp(ops[i])
|
||||||
|
}
|
||||||
|
return newOps
|
||||||
|
}
|
||||||
|
@ -23,6 +23,7 @@ const (
|
|||||||
tRange opType = iota + 1
|
tRange opType = iota + 1
|
||||||
tPut
|
tPut
|
||||||
tDeleteRange
|
tDeleteRange
|
||||||
|
tTxn
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -67,10 +68,18 @@ type Op struct {
|
|||||||
// for put
|
// for put
|
||||||
val []byte
|
val []byte
|
||||||
leaseID LeaseID
|
leaseID LeaseID
|
||||||
|
|
||||||
|
// txn
|
||||||
|
cmps []Cmp
|
||||||
|
thenOps []Op
|
||||||
|
elseOps []Op
|
||||||
}
|
}
|
||||||
|
|
||||||
// accesors / mutators
|
// accesors / mutators
|
||||||
|
|
||||||
|
func (op Op) IsTxn() bool { return op.t == tTxn }
|
||||||
|
func (op Op) Txn() ([]Cmp, []Op, []Op) { return op.cmps, op.thenOps, op.elseOps }
|
||||||
|
|
||||||
// KeyBytes returns the byte slice holding the Op's key.
|
// KeyBytes returns the byte slice holding the Op's key.
|
||||||
func (op Op) KeyBytes() []byte { return op.key }
|
func (op Op) KeyBytes() []byte { return op.key }
|
||||||
|
|
||||||
@ -113,6 +122,22 @@ func (op Op) toRangeRequest() *pb.RangeRequest {
|
|||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (op Op) toTxnRequest() *pb.TxnRequest {
|
||||||
|
thenOps := make([]*pb.RequestOp, len(op.thenOps))
|
||||||
|
for i, tOp := range op.thenOps {
|
||||||
|
thenOps[i] = tOp.toRequestOp()
|
||||||
|
}
|
||||||
|
elseOps := make([]*pb.RequestOp, len(op.elseOps))
|
||||||
|
for i, eOp := range op.elseOps {
|
||||||
|
elseOps[i] = eOp.toRequestOp()
|
||||||
|
}
|
||||||
|
cmps := make([]*pb.Compare, len(op.cmps))
|
||||||
|
for i := range op.cmps {
|
||||||
|
cmps[i] = (*pb.Compare)(&op.cmps[i])
|
||||||
|
}
|
||||||
|
return &pb.TxnRequest{Compare: cmps, Success: thenOps, Failure: elseOps}
|
||||||
|
}
|
||||||
|
|
||||||
func (op Op) toRequestOp() *pb.RequestOp {
|
func (op Op) toRequestOp() *pb.RequestOp {
|
||||||
switch op.t {
|
switch op.t {
|
||||||
case tRange:
|
case tRange:
|
||||||
@ -123,12 +148,27 @@ func (op Op) toRequestOp() *pb.RequestOp {
|
|||||||
case tDeleteRange:
|
case tDeleteRange:
|
||||||
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
|
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
|
||||||
return &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: r}}
|
return &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: r}}
|
||||||
|
case tTxn:
|
||||||
|
return &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{RequestTxn: op.toTxnRequest()}}
|
||||||
default:
|
default:
|
||||||
panic("Unknown Op")
|
panic("Unknown Op")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (op Op) isWrite() bool {
|
func (op Op) isWrite() bool {
|
||||||
|
if op.t == tTxn {
|
||||||
|
for _, tOp := range op.thenOps {
|
||||||
|
if tOp.isWrite() {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, tOp := range op.elseOps {
|
||||||
|
if tOp.isWrite() {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
return op.t != tRange
|
return op.t != tRange
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -194,6 +234,10 @@ func OpPut(key, val string, opts ...OpOption) Op {
|
|||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func OpTxn(cmps []Cmp, thenOps []Op, elseOps []Op) Op {
|
||||||
|
return Op{t: tTxn, cmps: cmps, thenOps: thenOps, elseOps: elseOps}
|
||||||
|
}
|
||||||
|
|
||||||
func opWatch(key string, opts ...OpOption) Op {
|
func opWatch(key string, opts ...OpOption) Op {
|
||||||
ret := Op{t: tRange, key: []byte(key)}
|
ret := Op{t: tRange, key: []byte(key)}
|
||||||
ret.applyOpts(opts)
|
ret.applyOpts(opts)
|
||||||
|
@ -16,11 +16,10 @@
|
|||||||
package v3rpc
|
package v3rpc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sort"
|
|
||||||
|
|
||||||
"github.com/coreos/etcd/etcdserver"
|
"github.com/coreos/etcd/etcdserver"
|
||||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||||
|
"github.com/coreos/etcd/pkg/adt"
|
||||||
"github.com/coreos/pkg/capnslog"
|
"github.com/coreos/pkg/capnslog"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
@ -89,6 +88,13 @@ func (s *kvServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse,
|
|||||||
if err := checkTxnRequest(r, int(s.maxTxnOps)); err != nil {
|
if err := checkTxnRequest(r, int(s.maxTxnOps)); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
// check for forbidden put/del overlaps after checking request to avoid quadratic blowup
|
||||||
|
if _, _, err := checkIntervals(r.Success); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if _, _, err := checkIntervals(r.Failure); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
resp, err := s.kv.Txn(ctx, r)
|
resp, err := s.kv.Txn(ctx, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -137,7 +143,14 @@ func checkDeleteRequest(r *pb.DeleteRangeRequest) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func checkTxnRequest(r *pb.TxnRequest, maxTxnOps int) error {
|
func checkTxnRequest(r *pb.TxnRequest, maxTxnOps int) error {
|
||||||
if len(r.Compare) > maxTxnOps || len(r.Success) > maxTxnOps || len(r.Failure) > maxTxnOps {
|
opc := len(r.Compare)
|
||||||
|
if opc < len(r.Success) {
|
||||||
|
opc = len(r.Success)
|
||||||
|
}
|
||||||
|
if opc < len(r.Failure) {
|
||||||
|
opc = len(r.Failure)
|
||||||
|
}
|
||||||
|
if opc > maxTxnOps {
|
||||||
return rpctypes.ErrGRPCTooManyOps
|
return rpctypes.ErrGRPCTooManyOps
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -146,58 +159,29 @@ func checkTxnRequest(r *pb.TxnRequest, maxTxnOps int) error {
|
|||||||
return rpctypes.ErrGRPCEmptyKey
|
return rpctypes.ErrGRPCEmptyKey
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, u := range r.Success {
|
for _, u := range r.Success {
|
||||||
if err := checkRequestOp(u); err != nil {
|
if err := checkRequestOp(u, maxTxnOps-opc); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := checkRequestDupKeys(r.Success); err != nil {
|
for _, u := range r.Failure {
|
||||||
|
if err := checkRequestOp(u, maxTxnOps-opc); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for _, u := range r.Failure {
|
return nil
|
||||||
if err := checkRequestOp(u); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return checkRequestDupKeys(r.Failure)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// checkRequestDupKeys gives rpctypes.ErrGRPCDuplicateKey if the same key is modified twice
|
// checkIntervals tests whether puts and deletes overlap for a list of ops. If
|
||||||
func checkRequestDupKeys(reqs []*pb.RequestOp) error {
|
// there is an overlap, returns an error. If no overlap, return put and delete
|
||||||
// check put overlap
|
// sets for recursive evaluation.
|
||||||
keys := make(map[string]struct{})
|
func checkIntervals(reqs []*pb.RequestOp) (map[string]struct{}, adt.IntervalTree, error) {
|
||||||
for _, requ := range reqs {
|
var dels adt.IntervalTree
|
||||||
tv, ok := requ.Request.(*pb.RequestOp_RequestPut)
|
|
||||||
if !ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
preq := tv.RequestPut
|
|
||||||
if preq == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if _, ok := keys[string(preq.Key)]; ok {
|
|
||||||
return rpctypes.ErrGRPCDuplicateKey
|
|
||||||
}
|
|
||||||
keys[string(preq.Key)] = struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// no need to check deletes if no puts; delete overlaps are permitted
|
// collect deletes from this level; build first to check lower level overlapped puts
|
||||||
if len(keys) == 0 {
|
for _, req := range reqs {
|
||||||
return nil
|
tv, ok := req.Request.(*pb.RequestOp_RequestDeleteRange)
|
||||||
}
|
|
||||||
|
|
||||||
// sort keys for range checking
|
|
||||||
sortedKeys := []string{}
|
|
||||||
for k := range keys {
|
|
||||||
sortedKeys = append(sortedKeys, k)
|
|
||||||
}
|
|
||||||
sort.Strings(sortedKeys)
|
|
||||||
|
|
||||||
// check put overlap with deletes
|
|
||||||
for _, requ := range reqs {
|
|
||||||
tv, ok := requ.Request.(*pb.RequestOp_RequestDeleteRange)
|
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -205,41 +189,87 @@ func checkRequestDupKeys(reqs []*pb.RequestOp) error {
|
|||||||
if dreq == nil {
|
if dreq == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if dreq.RangeEnd == nil {
|
var iv adt.Interval
|
||||||
if _, found := keys[string(dreq.Key)]; found {
|
if len(dreq.RangeEnd) != 0 {
|
||||||
return rpctypes.ErrGRPCDuplicateKey
|
iv = adt.NewStringAffineInterval(string(dreq.Key), string(dreq.RangeEnd))
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
lo := sort.SearchStrings(sortedKeys, string(dreq.Key))
|
iv = adt.NewStringAffinePoint(string(dreq.Key))
|
||||||
hi := sort.SearchStrings(sortedKeys, string(dreq.RangeEnd))
|
|
||||||
if lo != hi {
|
|
||||||
// element between lo and hi => overlap
|
|
||||||
return rpctypes.ErrGRPCDuplicateKey
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
dels.Insert(iv, struct{}{})
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
// collect children puts/deletes
|
||||||
|
puts := make(map[string]struct{})
|
||||||
|
for _, req := range reqs {
|
||||||
|
tv, ok := req.Request.(*pb.RequestOp_RequestTxn)
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
putsThen, delsThen, err := checkIntervals(tv.RequestTxn.Success)
|
||||||
|
if err != nil {
|
||||||
|
return nil, dels, err
|
||||||
|
}
|
||||||
|
putsElse, delsElse, err := checkIntervals(tv.RequestTxn.Failure)
|
||||||
|
if err != nil {
|
||||||
|
return nil, dels, err
|
||||||
|
}
|
||||||
|
for k := range putsThen {
|
||||||
|
if _, ok := puts[k]; ok {
|
||||||
|
return nil, dels, rpctypes.ErrGRPCDuplicateKey
|
||||||
|
}
|
||||||
|
if dels.Intersects(adt.NewStringAffinePoint(k)) {
|
||||||
|
return nil, dels, rpctypes.ErrGRPCDuplicateKey
|
||||||
|
}
|
||||||
|
puts[k] = struct{}{}
|
||||||
|
}
|
||||||
|
for k := range putsElse {
|
||||||
|
if _, ok := puts[k]; ok {
|
||||||
|
// if key is from putsThen, overlap is OK since
|
||||||
|
// either then/else are mutually exclusive
|
||||||
|
if _, isSafe := putsThen[k]; !isSafe {
|
||||||
|
return nil, dels, rpctypes.ErrGRPCDuplicateKey
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if dels.Intersects(adt.NewStringAffinePoint(k)) {
|
||||||
|
return nil, dels, rpctypes.ErrGRPCDuplicateKey
|
||||||
|
}
|
||||||
|
puts[k] = struct{}{}
|
||||||
|
}
|
||||||
|
dels.Union(delsThen, adt.NewStringAffineInterval("\x00", ""))
|
||||||
|
dels.Union(delsElse, adt.NewStringAffineInterval("\x00", ""))
|
||||||
|
}
|
||||||
|
|
||||||
|
// collect and check this level's puts
|
||||||
|
for _, req := range reqs {
|
||||||
|
tv, ok := req.Request.(*pb.RequestOp_RequestPut)
|
||||||
|
if !ok || tv.RequestPut == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
k := string(tv.RequestPut.Key)
|
||||||
|
if _, ok := puts[k]; ok {
|
||||||
|
return nil, dels, rpctypes.ErrGRPCDuplicateKey
|
||||||
|
}
|
||||||
|
if dels.Intersects(adt.NewStringAffinePoint(k)) {
|
||||||
|
return nil, dels, rpctypes.ErrGRPCDuplicateKey
|
||||||
|
}
|
||||||
|
puts[k] = struct{}{}
|
||||||
|
}
|
||||||
|
return puts, dels, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkRequestOp(u *pb.RequestOp) error {
|
func checkRequestOp(u *pb.RequestOp, maxTxnOps int) error {
|
||||||
// TODO: ensure only one of the field is set.
|
// TODO: ensure only one of the field is set.
|
||||||
switch uv := u.Request.(type) {
|
switch uv := u.Request.(type) {
|
||||||
case *pb.RequestOp_RequestRange:
|
case *pb.RequestOp_RequestRange:
|
||||||
if uv.RequestRange != nil {
|
|
||||||
return checkRangeRequest(uv.RequestRange)
|
return checkRangeRequest(uv.RequestRange)
|
||||||
}
|
|
||||||
case *pb.RequestOp_RequestPut:
|
case *pb.RequestOp_RequestPut:
|
||||||
if uv.RequestPut != nil {
|
|
||||||
return checkPutRequest(uv.RequestPut)
|
return checkPutRequest(uv.RequestPut)
|
||||||
}
|
|
||||||
case *pb.RequestOp_RequestDeleteRange:
|
case *pb.RequestOp_RequestDeleteRange:
|
||||||
if uv.RequestDeleteRange != nil {
|
|
||||||
return checkDeleteRequest(uv.RequestDeleteRange)
|
return checkDeleteRequest(uv.RequestDeleteRange)
|
||||||
}
|
case *pb.RequestOp_RequestTxn:
|
||||||
|
return checkTxnRequest(uv.RequestTxn, maxTxnOps)
|
||||||
default:
|
default:
|
||||||
// empty op / nil entry
|
// empty op / nil entry
|
||||||
return rpctypes.ErrGRPCKeyNotFound
|
return rpctypes.ErrGRPCKeyNotFound
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
@ -76,14 +76,30 @@ type applierV3 interface {
|
|||||||
RoleList(ua *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error)
|
RoleList(ua *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type checkReqFunc func(mvcc.ReadView, *pb.RequestOp) error
|
||||||
|
|
||||||
type applierV3backend struct {
|
type applierV3backend struct {
|
||||||
s *EtcdServer
|
s *EtcdServer
|
||||||
|
|
||||||
|
checkPut checkReqFunc
|
||||||
|
checkRange checkReqFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *EtcdServer) newApplierV3Backend() applierV3 {
|
||||||
|
base := &applierV3backend{s: s}
|
||||||
|
base.checkPut = func(rv mvcc.ReadView, req *pb.RequestOp) error {
|
||||||
|
return base.checkRequestPut(rv, req)
|
||||||
|
}
|
||||||
|
base.checkRange = func(rv mvcc.ReadView, req *pb.RequestOp) error {
|
||||||
|
return base.checkRequestRange(rv, req)
|
||||||
|
}
|
||||||
|
return base
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *EtcdServer) newApplierV3() applierV3 {
|
func (s *EtcdServer) newApplierV3() applierV3 {
|
||||||
return newAuthApplierV3(
|
return newAuthApplierV3(
|
||||||
s.AuthStore(),
|
s.AuthStore(),
|
||||||
newQuotaApplierV3(s, &applierV3backend{s}),
|
newQuotaApplierV3(s, s.newApplierV3Backend()),
|
||||||
s.lessor,
|
s.lessor,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
@ -315,24 +331,19 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|
|||||||
isWrite := !isTxnReadonly(rt)
|
isWrite := !isTxnReadonly(rt)
|
||||||
txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read())
|
txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read())
|
||||||
|
|
||||||
reqs, ok := a.compareToOps(txn, rt)
|
txnPath := compareToPath(txn, rt)
|
||||||
if isWrite {
|
if isWrite {
|
||||||
if err := a.checkRequestPut(txn, reqs); err != nil {
|
if _, err := checkRequests(txn, rt, txnPath, a.checkPut); err != nil {
|
||||||
txn.End()
|
txn.End()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := checkRequestRange(txn, reqs); err != nil {
|
if _, err := checkRequests(txn, rt, txnPath, a.checkRange); err != nil {
|
||||||
txn.End()
|
txn.End()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resps := make([]*pb.ResponseOp, len(reqs))
|
txnResp, _ := newTxnResp(rt, txnPath)
|
||||||
txnResp := &pb.TxnResponse{
|
|
||||||
Responses: resps,
|
|
||||||
Succeeded: ok,
|
|
||||||
Header: &pb.ResponseHeader{},
|
|
||||||
}
|
|
||||||
|
|
||||||
// When executing mutable txn ops, etcd must hold the txn lock so
|
// When executing mutable txn ops, etcd must hold the txn lock so
|
||||||
// readers do not see any intermediate results. Since writes are
|
// readers do not see any intermediate results. Since writes are
|
||||||
@ -342,9 +353,7 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|
|||||||
txn.End()
|
txn.End()
|
||||||
txn = a.s.KV().Write()
|
txn = a.s.KV().Write()
|
||||||
}
|
}
|
||||||
for i := range reqs {
|
a.applyTxn(txn, rt, txnPath, txnResp)
|
||||||
resps[i] = a.applyUnion(txn, reqs[i])
|
|
||||||
}
|
|
||||||
rev := txn.Rev()
|
rev := txn.Rev()
|
||||||
if len(txn.Changes()) != 0 {
|
if len(txn.Changes()) != 0 {
|
||||||
rev++
|
rev++
|
||||||
@ -355,13 +364,60 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|
|||||||
return txnResp, nil
|
return txnResp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *applierV3backend) compareToOps(rv mvcc.ReadView, rt *pb.TxnRequest) ([]*pb.RequestOp, bool) {
|
// newTxnResp allocates a txn response for a txn request given a path.
|
||||||
for _, c := range rt.Compare {
|
func newTxnResp(rt *pb.TxnRequest, txnPath []bool) (txnResp *pb.TxnResponse, txnCount int) {
|
||||||
|
reqs := rt.Success
|
||||||
|
if !txnPath[0] {
|
||||||
|
reqs = rt.Failure
|
||||||
|
}
|
||||||
|
resps := make([]*pb.ResponseOp, len(reqs))
|
||||||
|
txnResp = &pb.TxnResponse{
|
||||||
|
Responses: resps,
|
||||||
|
Succeeded: txnPath[0],
|
||||||
|
Header: &pb.ResponseHeader{},
|
||||||
|
}
|
||||||
|
for i, req := range reqs {
|
||||||
|
switch tv := req.Request.(type) {
|
||||||
|
case *pb.RequestOp_RequestRange:
|
||||||
|
resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseRange{}}
|
||||||
|
case *pb.RequestOp_RequestPut:
|
||||||
|
resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponsePut{}}
|
||||||
|
case *pb.RequestOp_RequestDeleteRange:
|
||||||
|
resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseDeleteRange{}}
|
||||||
|
case *pb.RequestOp_RequestTxn:
|
||||||
|
resp, txns := newTxnResp(tv.RequestTxn, txnPath[1:])
|
||||||
|
resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseTxn{ResponseTxn: resp}}
|
||||||
|
txnPath = txnPath[1+txns:]
|
||||||
|
txnCount += txns + 1
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return txnResp, txnCount
|
||||||
|
}
|
||||||
|
|
||||||
|
func compareToPath(rv mvcc.ReadView, rt *pb.TxnRequest) []bool {
|
||||||
|
txnPath := make([]bool, 1)
|
||||||
|
ops := rt.Success
|
||||||
|
if txnPath[0] = applyCompares(rv, rt.Compare); !txnPath[0] {
|
||||||
|
ops = rt.Failure
|
||||||
|
}
|
||||||
|
for _, op := range ops {
|
||||||
|
tv, ok := op.Request.(*pb.RequestOp_RequestTxn)
|
||||||
|
if !ok || tv.RequestTxn == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
txnPath = append(txnPath, compareToPath(rv, tv.RequestTxn)...)
|
||||||
|
}
|
||||||
|
return txnPath
|
||||||
|
}
|
||||||
|
|
||||||
|
func applyCompares(rv mvcc.ReadView, cmps []*pb.Compare) bool {
|
||||||
|
for _, c := range cmps {
|
||||||
if !applyCompare(rv, c) {
|
if !applyCompare(rv, c) {
|
||||||
return rt.Failure, false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return rt.Success, true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// applyCompare applies the compare request.
|
// applyCompare applies the compare request.
|
||||||
@ -431,38 +487,42 @@ func compareKV(c *pb.Compare, ckv mvccpb.KeyValue) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *applierV3backend) applyUnion(txn mvcc.TxnWrite, union *pb.RequestOp) *pb.ResponseOp {
|
func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int) {
|
||||||
switch tv := union.Request.(type) {
|
reqs := rt.Success
|
||||||
|
if !txnPath[0] {
|
||||||
|
reqs = rt.Failure
|
||||||
|
}
|
||||||
|
for i, req := range reqs {
|
||||||
|
respi := tresp.Responses[i].Response
|
||||||
|
switch tv := req.Request.(type) {
|
||||||
case *pb.RequestOp_RequestRange:
|
case *pb.RequestOp_RequestRange:
|
||||||
if tv.RequestRange != nil {
|
|
||||||
resp, err := a.Range(txn, tv.RequestRange)
|
resp, err := a.Range(txn, tv.RequestRange)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
plog.Panicf("unexpected error during txn: %v", err)
|
plog.Panicf("unexpected error during txn: %v", err)
|
||||||
}
|
}
|
||||||
return &pb.ResponseOp{Response: &pb.ResponseOp_ResponseRange{ResponseRange: resp}}
|
respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
|
||||||
}
|
|
||||||
case *pb.RequestOp_RequestPut:
|
case *pb.RequestOp_RequestPut:
|
||||||
if tv.RequestPut != nil {
|
|
||||||
resp, err := a.Put(txn, tv.RequestPut)
|
resp, err := a.Put(txn, tv.RequestPut)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
plog.Panicf("unexpected error during txn: %v", err)
|
plog.Panicf("unexpected error during txn: %v", err)
|
||||||
}
|
}
|
||||||
return &pb.ResponseOp{Response: &pb.ResponseOp_ResponsePut{ResponsePut: resp}}
|
respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp
|
||||||
}
|
|
||||||
case *pb.RequestOp_RequestDeleteRange:
|
case *pb.RequestOp_RequestDeleteRange:
|
||||||
if tv.RequestDeleteRange != nil {
|
|
||||||
resp, err := a.DeleteRange(txn, tv.RequestDeleteRange)
|
resp, err := a.DeleteRange(txn, tv.RequestDeleteRange)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
plog.Panicf("unexpected error during txn: %v", err)
|
plog.Panicf("unexpected error during txn: %v", err)
|
||||||
}
|
}
|
||||||
return &pb.ResponseOp{Response: &pb.ResponseOp_ResponseDeleteRange{ResponseDeleteRange: resp}}
|
respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp
|
||||||
}
|
case *pb.RequestOp_RequestTxn:
|
||||||
|
resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn
|
||||||
|
applyTxns := a.applyTxn(txn, tv.RequestTxn, txnPath[1:], resp)
|
||||||
|
txns += applyTxns + 1
|
||||||
|
txnPath = txnPath[applyTxns+1:]
|
||||||
default:
|
default:
|
||||||
// empty union
|
// empty union
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
return nil
|
}
|
||||||
|
return txns
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) {
|
func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) {
|
||||||
@ -768,19 +828,38 @@ func (s *kvSortByValue) Less(i, j int) bool {
|
|||||||
return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0
|
return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *applierV3backend) checkRequestPut(rv mvcc.ReadView, reqs []*pb.RequestOp) error {
|
func checkRequests(rv mvcc.ReadView, rt *pb.TxnRequest, txnPath []bool, f checkReqFunc) (int, error) {
|
||||||
for _, requ := range reqs {
|
txnCount := 0
|
||||||
tv, ok := requ.Request.(*pb.RequestOp_RequestPut)
|
reqs := rt.Success
|
||||||
if !ok {
|
if !txnPath[0] {
|
||||||
|
reqs = rt.Failure
|
||||||
|
}
|
||||||
|
for _, req := range reqs {
|
||||||
|
if tv, ok := req.Request.(*pb.RequestOp_RequestTxn); ok && tv.RequestTxn != nil {
|
||||||
|
txns, err := checkRequests(rv, tv.RequestTxn, txnPath[1:], f)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
txnCount += txns + 1
|
||||||
|
txnPath = txnPath[txns+1:]
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
preq := tv.RequestPut
|
if err := f(rv, req); err != nil {
|
||||||
if preq == nil {
|
return 0, err
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
if preq.IgnoreValue || preq.IgnoreLease {
|
}
|
||||||
|
return txnCount, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *applierV3backend) checkRequestPut(rv mvcc.ReadView, reqOp *pb.RequestOp) error {
|
||||||
|
tv, ok := reqOp.Request.(*pb.RequestOp_RequestPut)
|
||||||
|
if !ok || tv.RequestPut == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
req := tv.RequestPut
|
||||||
|
if req.IgnoreValue || req.IgnoreLease {
|
||||||
// expects previous key-value, error if not exist
|
// expects previous key-value, error if not exist
|
||||||
rr, err := rv.Range(preq.Key, nil, mvcc.RangeOptions{})
|
rr, err := rv.Range(req.Key, nil, mvcc.RangeOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -788,34 +867,28 @@ func (a *applierV3backend) checkRequestPut(rv mvcc.ReadView, reqs []*pb.RequestO
|
|||||||
return ErrKeyNotFound
|
return ErrKeyNotFound
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if lease.LeaseID(preq.Lease) == lease.NoLease {
|
if lease.LeaseID(req.Lease) != lease.NoLease {
|
||||||
continue
|
if l := a.s.lessor.Lookup(lease.LeaseID(req.Lease)); l == nil {
|
||||||
}
|
|
||||||
if l := a.s.lessor.Lookup(lease.LeaseID(preq.Lease)); l == nil {
|
|
||||||
return lease.ErrLeaseNotFound
|
return lease.ErrLeaseNotFound
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkRequestRange(rv mvcc.ReadView, reqs []*pb.RequestOp) error {
|
func (a *applierV3backend) checkRequestRange(rv mvcc.ReadView, reqOp *pb.RequestOp) error {
|
||||||
for _, requ := range reqs {
|
tv, ok := reqOp.Request.(*pb.RequestOp_RequestRange)
|
||||||
tv, ok := requ.Request.(*pb.RequestOp_RequestRange)
|
if !ok || tv.RequestRange == nil {
|
||||||
if !ok {
|
return nil
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
greq := tv.RequestRange
|
req := tv.RequestRange
|
||||||
if greq == nil || greq.Revision == 0 {
|
switch {
|
||||||
continue
|
case req.Revision == 0:
|
||||||
}
|
return nil
|
||||||
|
case req.Revision > rv.Rev():
|
||||||
if greq.Revision > rv.Rev() {
|
|
||||||
return mvcc.ErrFutureRev
|
return mvcc.ErrFutureRev
|
||||||
}
|
case req.Revision < rv.FirstRev():
|
||||||
if greq.Revision < rv.FirstRev() {
|
|
||||||
return mvcc.ErrCompacted
|
return mvcc.ErrCompacted
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
File diff suppressed because it is too large
Load Diff
@ -469,6 +469,7 @@ message RequestOp {
|
|||||||
RangeRequest request_range = 1;
|
RangeRequest request_range = 1;
|
||||||
PutRequest request_put = 2;
|
PutRequest request_put = 2;
|
||||||
DeleteRangeRequest request_delete_range = 3;
|
DeleteRangeRequest request_delete_range = 3;
|
||||||
|
TxnRequest request_txn = 4;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -478,6 +479,7 @@ message ResponseOp {
|
|||||||
RangeResponse response_range = 1;
|
RangeResponse response_range = 1;
|
||||||
PutResponse response_put = 2;
|
PutResponse response_put = 2;
|
||||||
DeleteRangeResponse response_delete_range = 3;
|
DeleteRangeResponse response_delete_range = 3;
|
||||||
|
TxnResponse response_txn = 4;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -474,7 +474,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
srv.compactor.Run()
|
srv.compactor.Run()
|
||||||
}
|
}
|
||||||
|
|
||||||
srv.applyV3Base = &applierV3backend{srv}
|
srv.applyV3Base = srv.newApplierV3Backend()
|
||||||
if err = srv.restoreAlarms(); err != nil {
|
if err = srv.restoreAlarms(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -192,11 +192,22 @@ func TestV3TxnTooManyOps(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
addTxnOps := func(txn *pb.TxnRequest) {
|
||||||
|
newTxn := &pb.TxnRequest{}
|
||||||
|
addSuccessOps(newTxn)
|
||||||
|
txn.Success = append(txn.Success,
|
||||||
|
&pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
|
||||||
|
RequestTxn: newTxn,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
tests := []func(txn *pb.TxnRequest){
|
tests := []func(txn *pb.TxnRequest){
|
||||||
addCompareOps,
|
addCompareOps,
|
||||||
addSuccessOps,
|
addSuccessOps,
|
||||||
addFailureOps,
|
addFailureOps,
|
||||||
|
addTxnOps,
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
@ -236,6 +247,27 @@ func TestV3TxnDuplicateKeys(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
txnDelReq := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
|
||||||
|
RequestTxn: &pb.TxnRequest{Success: []*pb.RequestOp{delInRangeReq}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
txnDelReqTwoSide := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
|
||||||
|
RequestTxn: &pb.TxnRequest{
|
||||||
|
Success: []*pb.RequestOp{delInRangeReq},
|
||||||
|
Failure: []*pb.RequestOp{delInRangeReq}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
txnPutReq := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
|
||||||
|
RequestTxn: &pb.TxnRequest{Success: []*pb.RequestOp{putreq}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
txnPutReqTwoSide := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
|
||||||
|
RequestTxn: &pb.TxnRequest{
|
||||||
|
Success: []*pb.RequestOp{putreq},
|
||||||
|
Failure: []*pb.RequestOp{putreq}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
kvc := toGRPC(clus.RandClient()).KV
|
kvc := toGRPC(clus.RandClient()).KV
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
@ -258,6 +290,36 @@ func TestV3TxnDuplicateKeys(t *testing.T) {
|
|||||||
|
|
||||||
werr: rpctypes.ErrGRPCDuplicateKey,
|
werr: rpctypes.ErrGRPCDuplicateKey,
|
||||||
},
|
},
|
||||||
|
// Then(Put(a), Then(Del(a)))
|
||||||
|
{
|
||||||
|
txnSuccess: []*pb.RequestOp{putreq, txnDelReq},
|
||||||
|
|
||||||
|
werr: rpctypes.ErrGRPCDuplicateKey,
|
||||||
|
},
|
||||||
|
// Then(Del(a), Then(Put(a)))
|
||||||
|
{
|
||||||
|
txnSuccess: []*pb.RequestOp{delInRangeReq, txnPutReq},
|
||||||
|
|
||||||
|
werr: rpctypes.ErrGRPCDuplicateKey,
|
||||||
|
},
|
||||||
|
// Then((Then(Put(a)), Else(Put(a))), (Then(Put(a)), Else(Put(a)))
|
||||||
|
{
|
||||||
|
txnSuccess: []*pb.RequestOp{txnPutReqTwoSide, txnPutReqTwoSide},
|
||||||
|
|
||||||
|
werr: rpctypes.ErrGRPCDuplicateKey,
|
||||||
|
},
|
||||||
|
// Then(Del(x), (Then(Put(a)), Else(Put(a))))
|
||||||
|
{
|
||||||
|
txnSuccess: []*pb.RequestOp{delOutOfRangeReq, txnPutReqTwoSide},
|
||||||
|
|
||||||
|
werr: nil,
|
||||||
|
},
|
||||||
|
// Then(Then(Del(a)), (Then(Del(a)), Else(Del(a))))
|
||||||
|
{
|
||||||
|
txnSuccess: []*pb.RequestOp{txnDelReq, txnDelReqTwoSide},
|
||||||
|
|
||||||
|
werr: nil,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
txnSuccess: []*pb.RequestOp{delKeyReq, delInRangeReq, delKeyReq, delInRangeReq},
|
txnSuccess: []*pb.RequestOp{delKeyReq, delInRangeReq, delKeyReq, delInRangeReq},
|
||||||
|
|
||||||
@ -469,6 +531,59 @@ func TestV3TxnRangeCompare(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestV3TxnNested tests nested txns follow paths as expected.
|
||||||
|
func TestV3TxnNestedPath(t *testing.T) {
|
||||||
|
defer testutil.AfterTest(t)
|
||||||
|
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
|
||||||
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
|
kvc := toGRPC(clus.RandClient()).KV
|
||||||
|
|
||||||
|
cmpTrue := &pb.Compare{
|
||||||
|
Result: pb.Compare_EQUAL,
|
||||||
|
Target: pb.Compare_VERSION,
|
||||||
|
Key: []byte("k"),
|
||||||
|
TargetUnion: &pb.Compare_Version{Version: int64(0)},
|
||||||
|
}
|
||||||
|
cmpFalse := &pb.Compare{
|
||||||
|
Result: pb.Compare_EQUAL,
|
||||||
|
Target: pb.Compare_VERSION,
|
||||||
|
Key: []byte("k"),
|
||||||
|
TargetUnion: &pb.Compare_Version{Version: int64(1)},
|
||||||
|
}
|
||||||
|
|
||||||
|
// generate random path to eval txns
|
||||||
|
topTxn := &pb.TxnRequest{}
|
||||||
|
txn := topTxn
|
||||||
|
txnPath := make([]bool, 10)
|
||||||
|
for i := range txnPath {
|
||||||
|
nextTxn := &pb.TxnRequest{}
|
||||||
|
op := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{RequestTxn: nextTxn}}
|
||||||
|
txnPath[i] = rand.Intn(2) == 0
|
||||||
|
if txnPath[i] {
|
||||||
|
txn.Compare = append(txn.Compare, cmpTrue)
|
||||||
|
txn.Success = append(txn.Success, op)
|
||||||
|
} else {
|
||||||
|
txn.Compare = append(txn.Compare, cmpFalse)
|
||||||
|
txn.Failure = append(txn.Failure, op)
|
||||||
|
}
|
||||||
|
txn = nextTxn
|
||||||
|
}
|
||||||
|
|
||||||
|
tresp, err := kvc.Txn(context.TODO(), topTxn)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
curTxnResp := tresp
|
||||||
|
for i := range txnPath {
|
||||||
|
if curTxnResp.Succeeded != txnPath[i] {
|
||||||
|
t.Fatalf("expected path %+v, got response %+v", txnPath, *tresp)
|
||||||
|
}
|
||||||
|
curTxnResp = curTxnResp.Responses[0].Response.(*pb.ResponseOp_ResponseTxn).ResponseTxn
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TestV3PutIgnoreValue ensures that writes with ignore_value overwrites with previous key-value pair.
|
// TestV3PutIgnoreValue ensures that writes with ignore_value overwrites with previous key-value pair.
|
||||||
func TestV3PutIgnoreValue(t *testing.T) {
|
func TestV3PutIgnoreValue(t *testing.T) {
|
||||||
defer testutil.AfterTest(t)
|
defer testutil.AfterTest(t)
|
||||||
|
@ -485,6 +485,15 @@ func (ivt *IntervalTree) Stab(iv Interval) (ivs []*IntervalValue) {
|
|||||||
return ivs
|
return ivs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Union merges a given interval tree into the receiver.
|
||||||
|
func (ivt *IntervalTree) Union(inIvt IntervalTree, ivl Interval) {
|
||||||
|
f := func(n *IntervalValue) bool {
|
||||||
|
ivt.Insert(n.Ivl, n.Val)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
inIvt.Visit(ivl, f)
|
||||||
|
}
|
||||||
|
|
||||||
type StringComparable string
|
type StringComparable string
|
||||||
|
|
||||||
func (s StringComparable) Compare(c Comparable) int {
|
func (s StringComparable) Compare(c Comparable) int {
|
||||||
|
@ -99,28 +99,13 @@ func (p *kvProxy) txnToCache(reqs []*pb.RequestOp, resps []*pb.ResponseOp) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *kvProxy) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
|
func (p *kvProxy) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
|
||||||
txn := p.kv.Txn(ctx)
|
op := TxnRequestToOp(r)
|
||||||
cmps := make([]clientv3.Cmp, len(r.Compare))
|
opResp, err := p.kv.Do(ctx, op)
|
||||||
thenops := make([]clientv3.Op, len(r.Success))
|
|
||||||
elseops := make([]clientv3.Op, len(r.Failure))
|
|
||||||
|
|
||||||
for i := range r.Compare {
|
|
||||||
cmps[i] = (clientv3.Cmp)(*r.Compare[i])
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := range r.Success {
|
|
||||||
thenops[i] = requestOpToOp(r.Success[i])
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := range r.Failure {
|
|
||||||
elseops[i] = requestOpToOp(r.Failure[i])
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := txn.If(cmps...).Then(thenops...).Else(elseops...).Commit()
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
resp := opResp.Txn()
|
||||||
|
|
||||||
// txn may claim an outdated key is updated; be safe and invalidate
|
// txn may claim an outdated key is updated; be safe and invalidate
|
||||||
for _, cmp := range r.Compare {
|
for _, cmp := range r.Compare {
|
||||||
p.cache.Invalidate(cmp.Key, cmp.RangeEnd)
|
p.cache.Invalidate(cmp.Key, cmp.RangeEnd)
|
||||||
@ -167,6 +152,10 @@ func requestOpToOp(union *pb.RequestOp) clientv3.Op {
|
|||||||
if tv.RequestDeleteRange != nil {
|
if tv.RequestDeleteRange != nil {
|
||||||
return DelRequestToOp(tv.RequestDeleteRange)
|
return DelRequestToOp(tv.RequestDeleteRange)
|
||||||
}
|
}
|
||||||
|
case *pb.RequestOp_RequestTxn:
|
||||||
|
if tv.RequestTxn != nil {
|
||||||
|
return TxnRequestToOp(tv.RequestTxn)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
panic("unknown request")
|
panic("unknown request")
|
||||||
}
|
}
|
||||||
@ -219,3 +208,19 @@ func DelRequestToOp(r *pb.DeleteRangeRequest) clientv3.Op {
|
|||||||
}
|
}
|
||||||
return clientv3.OpDelete(string(r.Key), opts...)
|
return clientv3.OpDelete(string(r.Key), opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TxnRequestToOp(r *pb.TxnRequest) clientv3.Op {
|
||||||
|
cmps := make([]clientv3.Cmp, len(r.Compare))
|
||||||
|
thenops := make([]clientv3.Op, len(r.Success))
|
||||||
|
elseops := make([]clientv3.Op, len(r.Failure))
|
||||||
|
for i := range r.Compare {
|
||||||
|
cmps[i] = (clientv3.Cmp)(*r.Compare[i])
|
||||||
|
}
|
||||||
|
for i := range r.Success {
|
||||||
|
thenops[i] = requestOpToOp(r.Success[i])
|
||||||
|
}
|
||||||
|
for i := range r.Failure {
|
||||||
|
elseops[i] = requestOpToOp(r.Failure[i])
|
||||||
|
}
|
||||||
|
return clientv3.OpTxn(cmps, thenops, elseops)
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user