etcdserver: fix checkIntervals will not validate correctly

Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com>
This commit is contained in:
bsbds 2023-08-09 19:08:18 +08:00
parent 43f10cbd57
commit 91b830d350
No known key found for this signature in database
GPG Key ID: 0700493FC8E7881B
2 changed files with 206 additions and 18 deletions

View File

@ -182,6 +182,7 @@ func checkTxnRequest(r *pb.TxnRequest, maxTxnOps int) error {
// there is an overlap, returns an error. If no overlap, return put and delete // there is an overlap, returns an error. If no overlap, return put and delete
// sets for recursive evaluation. // sets for recursive evaluation.
func checkIntervals(reqs []*pb.RequestOp) (map[string]struct{}, adt.IntervalTree, error) { func checkIntervals(reqs []*pb.RequestOp) (map[string]struct{}, adt.IntervalTree, error) {
puts := make(map[string]struct{})
dels := adt.NewIntervalTree() dels := adt.NewIntervalTree()
// collect deletes from this level; build first to check lower level overlapped puts // collect deletes from this level; build first to check lower level overlapped puts
@ -203,8 +204,13 @@ func checkIntervals(reqs []*pb.RequestOp) (map[string]struct{}, adt.IntervalTree
dels.Insert(iv, struct{}{}) dels.Insert(iv, struct{}{})
} }
// collect children puts/deletes type Txn struct {
puts := make(map[string]struct{}) Puts map[string]struct{}
Dels adt.IntervalTree
}
var txns []Txn
// for each txn, collect all puts and deletes of children level
for _, req := range reqs { for _, req := range reqs {
tv, ok := req.Request.(*pb.RequestOp_RequestTxn) tv, ok := req.Request.(*pb.RequestOp_RequestTxn)
if !ok { if !ok {
@ -218,33 +224,53 @@ func checkIntervals(reqs []*pb.RequestOp) (map[string]struct{}, adt.IntervalTree
if err != nil { if err != nil {
return nil, dels, err return nil, dels, err
} }
for k := range putsThen {
if _, ok := puts[k]; ok { delsThen.Union(delsElse, adt.NewStringAffineInterval("\x00", ""))
return nil, dels, rpctypes.ErrGRPCDuplicateKey
}
if dels.Intersects(adt.NewStringAffinePoint(k)) {
return nil, dels, rpctypes.ErrGRPCDuplicateKey
}
puts[k] = struct{}{}
}
for k := range putsElse { for k := range putsElse {
if _, ok := puts[k]; ok { putsThen[k] = struct{}{}
// if key is from putsThen, overlap is OK since
// either then/else are mutually exclusive
if _, isSafe := putsThen[k]; !isSafe {
return nil, dels, rpctypes.ErrGRPCDuplicateKey
} }
} txns = append(txns, Txn{Puts: putsThen, Dels: delsThen})
if dels.Intersects(adt.NewStringAffinePoint(k)) {
return nil, dels, rpctypes.ErrGRPCDuplicateKey
}
puts[k] = struct{}{}
}
dels.Union(delsThen, adt.NewStringAffineInterval("\x00", ""))
dels.Union(delsElse, adt.NewStringAffineInterval("\x00", ""))
} }
// collect and check this level's puts // 1. puts in a txn's children level should not overlap with deletes in another txn's children level
//
// Note: This operation is necessary despite the relatively high time complexity.
// The rationale behind this is that for a single txn, its success and failure branches can safely overlap,
// which means that we cannot simply test overlapping using a union all deletes.
for i, txn_x := range txns {
for _, txn_y := range txns[i+1:] {
for k := range txn_x.Puts {
if txn_y.Dels.Intersects(adt.NewStringAffinePoint(k)) {
return nil, dels, rpctypes.ErrGRPCDuplicateKey
}
}
for k := range txn_y.Puts {
if txn_x.Dels.Intersects(adt.NewStringAffinePoint(k)) {
return nil, dels, rpctypes.ErrGRPCDuplicateKey
}
}
}
}
// 2. puts in a txn's children level also should not overlap with deletes in current level
for _, txn := range txns {
for k := range txn.Puts {
if _, ok := puts[k]; ok {
return nil, dels, rpctypes.ErrGRPCDuplicateKey
}
if dels.Intersects(adt.NewStringAffinePoint(k)) {
return nil, dels, rpctypes.ErrGRPCDuplicateKey
}
puts[k] = struct{}{}
}
}
for _, txn := range txns {
dels.Union(txn.Dels, adt.NewStringAffineInterval("\x00", ""))
}
// 3. puts in current level should not overlap with deletes in current level and deletes
// in all txn children levels
for _, req := range reqs { for _, req := range reqs {
tv, ok := req.Request.(*pb.RequestOp_RequestPut) tv, ok := req.Request.(*pb.RequestOp_RequestPut)
if !ok || tv.RequestPut == nil { if !ok || tv.RequestPut == nil {
@ -259,6 +285,7 @@ func checkIntervals(reqs []*pb.RequestOp) (map[string]struct{}, adt.IntervalTree
} }
puts[k] = struct{}{} puts[k] = struct{}{}
} }
return puts, dels, nil return puts, dels, nil
} }

View File

@ -19,6 +19,8 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb" pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"github.com/stretchr/testify/assert"
) )
func TestCheckRangeRequest(t *testing.T) { func TestCheckRangeRequest(t *testing.T) {
@ -66,3 +68,162 @@ func getError(err error) string {
return err.Error() return err.Error()
} }
func TestCheckIntervals(t *testing.T) {
tests := []struct {
name string
requestOp []*pb.RequestOp
err error
}{
// check ok
{
name: "No duplicate key is ok",
requestOp: []*pb.RequestOp{put_op, put_op1},
err: nil,
},
{
name: "Nested no duplicate key is ok",
requestOp: []*pb.RequestOp{txn_op_put_success, txn_op_put_success1},
err: nil,
},
{
name: "Overlap in different branch is ok",
requestOp: []*pb.RequestOp{
{
Request: &pb.RequestOp_RequestTxn{
RequestTxn: &pb.TxnRequest{
Success: []*pb.RequestOp{put_op},
Failure: []*pb.RequestOp{del_op},
},
},
},
},
err: nil,
},
// check err
{
name: "Duplicate key should fail",
requestOp: []*pb.RequestOp{put_op, put_op},
err: rpctypes.ErrGRPCDuplicateKey,
},
{
name: "Nested duplicate key should fail",
requestOp: []*pb.RequestOp{txn_op_put_success, txn_op_put_success},
err: rpctypes.ErrGRPCDuplicateKey,
},
{
name: "Overlap put and delete should fail",
requestOp: []*pb.RequestOp{put_op, del_op},
err: rpctypes.ErrGRPCDuplicateKey,
},
{
name: "Overlap put-in-txn and delete should fail",
requestOp: []*pb.RequestOp{txn_op_put_success, del_op},
err: rpctypes.ErrGRPCDuplicateKey,
},
{
name: "Overlap put and delete-in-txn should fail",
requestOp: []*pb.RequestOp{put_op, txn_op_del_success},
err: rpctypes.ErrGRPCDuplicateKey,
},
{
name: "Nested overlap put and delete should fail, case 0",
requestOp: []*pb.RequestOp{txn_op_put_success, txn_op_del_success},
err: rpctypes.ErrGRPCDuplicateKey,
},
{
name: "Nested overlap put and delete should fail, case 1",
requestOp: []*pb.RequestOp{txn_op_put_success, txn_op_del_failure},
err: rpctypes.ErrGRPCDuplicateKey,
},
{
name: "Nested overlap put and delete should fail, case 2",
requestOp: []*pb.RequestOp{txn_op_put_failure, txn_op_del_success},
err: rpctypes.ErrGRPCDuplicateKey,
},
{
name: "Nested overlap put and delete should fail, case 3 (with the order swapped)",
requestOp: []*pb.RequestOp{txn_op_del_success, txn_op_put_success},
err: rpctypes.ErrGRPCDuplicateKey,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, _, err := checkIntervals(tt.requestOp)
assert.Equal(t, tt.err, err)
})
}
}
// TestCheckIntervals variables setup.
var (
put_op = &pb.RequestOp{
Request: &pb.RequestOp_RequestPut{
RequestPut: &pb.PutRequest{
Key: []byte("foo1"),
},
},
}
put_op1 = &pb.RequestOp{
Request: &pb.RequestOp_RequestPut{
RequestPut: &pb.PutRequest{
Key: []byte("foo2"),
},
},
}
// overlap with `put_op`
del_op = &pb.RequestOp{
Request: &pb.RequestOp_RequestDeleteRange{
RequestDeleteRange: &pb.DeleteRangeRequest{
Key: []byte("foo0"),
RangeEnd: []byte("foo3"),
},
},
}
// does not overlap with `put_op`
del_op1 = &pb.RequestOp{
Request: &pb.RequestOp_RequestDeleteRange{
RequestDeleteRange: &pb.DeleteRangeRequest{
Key: []byte("foo2"),
RangeEnd: []byte("foo3"),
},
},
}
txn_op_put_success = &pb.RequestOp{
Request: &pb.RequestOp_RequestTxn{
RequestTxn: &pb.TxnRequest{
Success: []*pb.RequestOp{put_op},
},
},
}
txn_op_put_success1 = &pb.RequestOp{
Request: &pb.RequestOp_RequestTxn{
RequestTxn: &pb.TxnRequest{
Success: []*pb.RequestOp{put_op1},
},
},
}
txn_op_put_failure = &pb.RequestOp{
Request: &pb.RequestOp_RequestTxn{
RequestTxn: &pb.TxnRequest{
Success: []*pb.RequestOp{put_op},
},
},
}
txn_op_del_success = &pb.RequestOp{
Request: &pb.RequestOp_RequestTxn{
RequestTxn: &pb.TxnRequest{
Success: []*pb.RequestOp{del_op},
},
},
}
txn_op_del_failure = &pb.RequestOp{
Request: &pb.RequestOp_RequestTxn{
RequestTxn: &pb.TxnRequest{
Failure: []*pb.RequestOp{del_op},
},
},
}
)