Merge pull request #4376 from heyitsanthony/txn-no-duplicate-putkey

etcdserver: reject v3 txns with duplicate put keys
This commit is contained in:
Anthony Romano 2016-02-02 13:17:47 -08:00
commit 71e000de65
3 changed files with 145 additions and 3 deletions

View File

@ -23,6 +23,7 @@ import (
var (
ErrEmptyKey = grpc.Errorf(codes.InvalidArgument, "key is not provided")
ErrTooManyOps = grpc.Errorf(codes.InvalidArgument, "too many operations in txn request")
ErrDuplicateKey = grpc.Errorf(codes.InvalidArgument, "duplicate key given in txn request")
ErrCompacted = grpc.Errorf(codes.OutOfRange, storage.ErrCompacted.Error())
ErrFutureRev = grpc.Errorf(codes.OutOfRange, storage.ErrFutureRev.Error())
ErrLeaseNotFound = grpc.Errorf(codes.NotFound, "requested lease not found")

View File

@ -16,6 +16,8 @@
package v3rpc
import (
"sort"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
@ -176,12 +178,78 @@ func checkTxnRequest(r *pb.TxnRequest) error {
return err
}
}
if err := checkRequestDupKeys(r.Success); err != nil {
return err
}
for _, u := range r.Failure {
if err := checkRequestUnion(u); err != nil {
return err
}
}
if err := checkRequestDupKeys(r.Failure); err != nil {
return err
}
return nil
}
// checkRequestDupKeys gives ErrDuplicateKey if the same key is modified twice
func checkRequestDupKeys(reqs []*pb.RequestUnion) error {
// check put overlap
keys := make(map[string]struct{})
for _, requ := range reqs {
tv, ok := requ.Request.(*pb.RequestUnion_RequestPut)
if !ok {
continue
}
preq := tv.RequestPut
if preq == nil {
continue
}
key := string(preq.Key)
if _, ok := keys[key]; ok {
return ErrDuplicateKey
}
keys[key] = struct{}{}
}
// no need to check deletes if no puts; delete overlaps are permitted
if len(keys) == 0 {
return nil
}
// sort keys for range checking
sortedKeys := []string{}
for k := range keys {
sortedKeys = append(sortedKeys, k)
}
sort.Strings(sortedKeys)
// check put overlap with deletes
for _, requ := range reqs {
tv, ok := requ.Request.(*pb.RequestUnion_RequestDeleteRange)
if !ok {
continue
}
dreq := tv.RequestDeleteRange
if dreq == nil {
continue
}
key := string(dreq.Key)
if dreq.RangeEnd == nil {
if _, found := keys[key]; found {
return ErrDuplicateKey
}
} else {
lo := sort.SearchStrings(sortedKeys, key)
hi := sort.SearchStrings(sortedKeys, string(dreq.RangeEnd))
if lo != hi {
// element between lo and hi => overlap
return ErrDuplicateKey
}
}
}
return nil
}

View File

@ -84,12 +84,19 @@ func TestV3TxnTooManyOps(t *testing.T) {
kvc := clus.RandClient().KV
// unique keys
i := new(int)
keyf := func() []byte {
*i++
return []byte(fmt.Sprintf("key-%d", i))
}
addCompareOps := func(txn *pb.TxnRequest) {
txn.Compare = append(txn.Compare,
&pb.Compare{
Result: pb.Compare_GREATER,
Target: pb.Compare_CREATE,
Key: []byte("bar"),
Key: keyf(),
})
}
addSuccessOps := func(txn *pb.TxnRequest) {
@ -97,7 +104,7 @@ func TestV3TxnTooManyOps(t *testing.T) {
&pb.RequestUnion{
Request: &pb.RequestUnion_RequestPut{
RequestPut: &pb.PutRequest{
Key: []byte("bar"),
Key: keyf(),
Value: []byte("bar"),
},
},
@ -108,7 +115,7 @@ func TestV3TxnTooManyOps(t *testing.T) {
&pb.RequestUnion{
Request: &pb.RequestUnion_RequestPut{
RequestPut: &pb.PutRequest{
Key: []byte("bar"),
Key: keyf(),
Value: []byte("bar"),
},
},
@ -134,6 +141,72 @@ func TestV3TxnTooManyOps(t *testing.T) {
}
}
func TestV3TxnDuplicateKeys(t *testing.T) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
putreq := &pb.RequestUnion{Request: &pb.RequestUnion_RequestPut{RequestPut: &pb.PutRequest{Key: []byte("abc"), Value: []byte("def")}}}
delKeyReq := &pb.RequestUnion{Request: &pb.RequestUnion_RequestDeleteRange{
RequestDeleteRange: &pb.DeleteRangeRequest{
Key: []byte("abc"),
},
},
}
delInRangeReq := &pb.RequestUnion{Request: &pb.RequestUnion_RequestDeleteRange{
RequestDeleteRange: &pb.DeleteRangeRequest{
Key: []byte("a"), RangeEnd: []byte("b"),
},
},
}
delOutOfRangeReq := &pb.RequestUnion{Request: &pb.RequestUnion_RequestDeleteRange{
RequestDeleteRange: &pb.DeleteRangeRequest{
Key: []byte("abb"), RangeEnd: []byte("abc"),
},
},
}
kvc := clus.RandClient().KV
tests := []struct {
txnSuccess []*pb.RequestUnion
werr error
}{
{
txnSuccess: []*pb.RequestUnion{putreq, putreq},
werr: v3rpc.ErrDuplicateKey,
},
{
txnSuccess: []*pb.RequestUnion{putreq, delKeyReq},
werr: v3rpc.ErrDuplicateKey,
},
{
txnSuccess: []*pb.RequestUnion{putreq, delInRangeReq},
werr: v3rpc.ErrDuplicateKey,
},
{
txnSuccess: []*pb.RequestUnion{delKeyReq, delInRangeReq, delKeyReq, delInRangeReq},
werr: nil,
},
{
txnSuccess: []*pb.RequestUnion{putreq, delOutOfRangeReq},
werr: nil,
},
}
for i, tt := range tests {
txn := &pb.TxnRequest{Success: tt.txnSuccess}
_, err := kvc.Txn(context.Background(), txn)
if err != tt.werr {
t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
}
}
}
// TestV3PutMissingLease ensures that a Put on a key with a bogus lease fails.
func TestV3PutMissingLease(t *testing.T) {
defer testutil.AfterTest(t)