Merge pull request #16113 from serathius/robustness-range

tests/robustness: Implement proper range requests
This commit is contained in:
Marek Siarkowicz 2023-06-22 20:50:27 +02:00 committed by GitHub
commit f985890ac3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 191 additions and 134 deletions

View File

@ -44,7 +44,7 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin
func describeEtcdRequest(request EtcdRequest) string {
switch request.Type {
case Range:
return describeRangeRequest(request.Range.Key, request.Range.Revision, request.Range.RangeOptions)
return describeRangeRequest(request.Range.RangeOptions, request.Range.Revision)
case Txn:
onSuccess := describeEtcdOperations(request.Txn.OperationsOnSuccess)
if len(request.Txn.Conditions) != 0 {
@ -105,20 +105,20 @@ func describeTxnResponse(request *TxnRequest, response *TxnResponse) string {
func describeEtcdOperation(op EtcdOperation) string {
switch op.Type {
case RangeOperation:
return describeRangeRequest(op.Key, 0, op.RangeOptions)
return describeRangeRequest(op.Range, 0)
case PutOperation:
if op.LeaseID != 0 {
return fmt.Sprintf("put(%q, %s, %d)", op.Key, describeValueOrHash(op.Value), op.LeaseID)
if op.Put.LeaseID != 0 {
return fmt.Sprintf("put(%q, %s, %d)", op.Put.Key, describeValueOrHash(op.Put.Value), op.Put.LeaseID)
}
return fmt.Sprintf("put(%q, %s)", op.Key, describeValueOrHash(op.Value))
return fmt.Sprintf("put(%q, %s)", op.Put.Key, describeValueOrHash(op.Put.Value))
case DeleteOperation:
return fmt.Sprintf("delete(%q)", op.Key)
return fmt.Sprintf("delete(%q)", op.Delete.Key)
default:
return fmt.Sprintf("<! unknown op: %q !>", op.Type)
}
}
func describeRangeRequest(key string, revision int64, opts RangeOptions) string {
func describeRangeRequest(opts RangeOptions, revision int64) string {
kwargs := []string{}
if revision != 0 {
kwargs = append(kwargs, fmt.Sprintf("rev=%d", revision))
@ -126,31 +126,35 @@ func describeRangeRequest(key string, revision int64, opts RangeOptions) string
if opts.Limit != 0 {
kwargs = append(kwargs, fmt.Sprintf("limit=%d", opts.Limit))
}
command := "get"
if opts.WithPrefix {
command = "range"
kwargsString := strings.Join(kwargs, ", ")
if kwargsString != "" {
kwargsString = ", " + kwargsString
}
if len(kwargs) == 0 {
return fmt.Sprintf("%s(%q)", command, key)
switch {
case opts.End == "":
return fmt.Sprintf("get(%q%s)", opts.Start, kwargsString)
case opts.End == prefixEnd(opts.Start):
return fmt.Sprintf("list(%q%s)", opts.Start, kwargsString)
default:
return fmt.Sprintf("range(%q..%q%s)", opts.Start, opts.End, kwargsString)
}
return fmt.Sprintf("%s(%q, %s)", command, key, strings.Join(kwargs, ", "))
}
func describeEtcdOperationResponse(req EtcdOperation, resp EtcdOperationResult) string {
switch req.Type {
func describeEtcdOperationResponse(op EtcdOperation, resp EtcdOperationResult) string {
switch op.Type {
case RangeOperation:
return describeRangeResponse(req.RangeOptions, resp.RangeResponse)
return describeRangeResponse(op.Range, resp.RangeResponse)
case PutOperation:
return fmt.Sprintf("ok")
case DeleteOperation:
return fmt.Sprintf("deleted: %d", resp.Deleted)
default:
return fmt.Sprintf("<! unknown op: %q !>", req.Type)
return fmt.Sprintf("<! unknown op: %q !>", op.Type)
}
}
func describeRangeResponse(opts RangeOptions, response RangeResponse) string {
if opts.WithPrefix {
func describeRangeResponse(request RangeOptions, response RangeResponse) string {
if request.End != "" {
kvs := make([]string, len(response.KVs))
for i, kv := range response.KVs {
kvs[i] = describeValueOrHash(kv.Value)

View File

@ -18,9 +18,9 @@ import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/api/v3/mvccpb"
"github.com/stretchr/testify/assert"
)
func TestModelDescribe(t *testing.T) {
@ -95,17 +95,17 @@ func TestModelDescribe(t *testing.T) {
expectDescribe: `if(mod_rev(key9)==9).then(put("key9", "99")) -> err: "failed"`,
},
{
req: txnRequest([]EtcdCondition{{Key: "key9b", ExpectedRevision: 9}}, []EtcdOperation{{Type: PutOperation, Key: "key9b", PutOptions: PutOptions{Value: ValueOrHash{Value: "991"}}}}, []EtcdOperation{{Type: RangeOperation, Key: "key9b"}}),
req: txnRequest([]EtcdCondition{{Key: "key9b", ExpectedRevision: 9}}, []EtcdOperation{{Type: PutOperation, Put: PutOptions{Key: "key9b", Value: ValueOrHash{Value: "991"}}}}, []EtcdOperation{{Type: RangeOperation, Range: RangeOptions{Start: "key9b"}}}),
resp: txnResponse([]EtcdOperationResult{{}}, true, 10),
expectDescribe: `if(mod_rev(key9b)==9).then(put("key9b", "991")).else(get("key9b")) -> success(ok), rev: 10`,
},
{
req: txnRequest([]EtcdCondition{{Key: "key9c", ExpectedRevision: 9}}, []EtcdOperation{{Type: PutOperation, Key: "key9c", PutOptions: PutOptions{Value: ValueOrHash{Value: "992"}}}}, []EtcdOperation{{Type: RangeOperation, Key: "key9c"}}),
req: txnRequest([]EtcdCondition{{Key: "key9c", ExpectedRevision: 9}}, []EtcdOperation{{Type: PutOperation, Put: PutOptions{Key: "key9c", Value: ValueOrHash{Value: "992"}}}}, []EtcdOperation{{Type: RangeOperation, Range: RangeOptions{Start: "key9c"}}}),
resp: txnResponse([]EtcdOperationResult{{RangeResponse: RangeResponse{KVs: []KeyValue{{Key: "key9c", ValueRevision: ValueRevision{Value: ValueOrHash{Value: "993"}, ModRevision: 10}}}}}}, false, 10),
expectDescribe: `if(mod_rev(key9c)==9).then(put("key9c", "992")).else(get("key9c")) -> failure("993"), rev: 10`,
},
{
req: txnRequest(nil, []EtcdOperation{{Type: RangeOperation, Key: "10"}, {Type: PutOperation, Key: "11", PutOptions: PutOptions{Value: ValueOrHash{Value: "111"}}}, {Type: DeleteOperation, Key: "12"}}, nil),
req: txnRequest(nil, []EtcdOperation{{Type: RangeOperation, Range: RangeOptions{Start: "10"}}, {Type: PutOperation, Put: PutOptions{Key: "11", Value: ValueOrHash{Value: "111"}}}, {Type: DeleteOperation, Delete: DeleteOptions{Key: "12"}}}, nil),
resp: txnResponse([]EtcdOperationResult{{RangeResponse: RangeResponse{KVs: []KeyValue{{ValueRevision: ValueRevision{Value: ValueOrHash{Value: "110"}}}}}}, {}, {Deleted: 1}}, true, 10),
expectDescribe: `get("10"), put("11", "111"), delete("12") -> "110", ok, deleted: 1, rev: 10`,
},
@ -115,29 +115,44 @@ func TestModelDescribe(t *testing.T) {
expectDescribe: `defragment() -> ok, rev: 10`,
},
{
req: rangeRequest("key11", true, 0),
req: listRequest("key11", 0),
resp: rangeResponse(nil, 0, 11),
expectDescribe: `range("key11") -> [], count: 0, rev: 11`,
expectDescribe: `list("key11") -> [], count: 0, rev: 11`,
},
{
req: rangeRequest("key12", true, 0),
req: listRequest("key12", 0),
resp: rangeResponse([]*mvccpb.KeyValue{{Value: []byte("12")}}, 2, 12),
expectDescribe: `range("key12") -> ["12"], count: 2, rev: 12`,
expectDescribe: `list("key12") -> ["12"], count: 2, rev: 12`,
},
{
req: rangeRequest("key13", true, 0),
req: listRequest("key13", 0),
resp: rangeResponse([]*mvccpb.KeyValue{{Value: []byte("01234567890123456789")}}, 1, 13),
expectDescribe: `range("key13") -> [hash: 2945867837], count: 1, rev: 13`,
expectDescribe: `list("key13") -> [hash: 2945867837], count: 1, rev: 13`,
},
{
req: rangeRequest("key14", true, 14),
req: listRequest("key14", 14),
resp: rangeResponse(nil, 0, 14),
expectDescribe: `range("key14", limit=14) -> [], count: 0, rev: 14`,
expectDescribe: `list("key14", limit=14) -> [], count: 0, rev: 14`,
},
{
req: staleRangeRequest("key15", true, 0, 15),
req: staleListRequest("key15", 0, 15),
resp: rangeResponse(nil, 0, 15),
expectDescribe: `range("key15", rev=15) -> [], count: 0, rev: 15`,
expectDescribe: `list("key15", rev=15) -> [], count: 0, rev: 15`,
},
{
req: staleListRequest("key15", 2, 15),
resp: rangeResponse(nil, 0, 15),
expectDescribe: `list("key15", rev=15, limit=2) -> [], count: 0, rev: 15`,
},
{
req: rangeRequest("key16", "key16b", 0),
resp: rangeResponse(nil, 0, 16),
expectDescribe: `range("key16".."key16b") -> [], count: 0, rev: 16`,
},
{
req: rangeRequest("key16", "key16b", 2),
resp: rangeResponse(nil, 0, 16),
expectDescribe: `range("key16".."key16b", limit=2) -> [], count: 0, rev: 16`,
},
}
for _, tc := range tcs {

View File

@ -21,7 +21,6 @@ import (
"hash/fnv"
"reflect"
"sort"
"strings"
"github.com/anishathalye/porcupine"
)
@ -95,7 +94,7 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
switch request.Type {
case Range:
if request.Range.Revision == 0 || request.Range.Revision == s.Revision {
resp := s.getRange(request.Range.Key, request.Range.RangeOptions)
resp := s.getRange(request.Range.RangeOptions)
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Range: &resp, Revision: s.Revision}}
} else {
if request.Range.Revision > s.Revision {
@ -121,27 +120,27 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
switch op.Type {
case RangeOperation:
opResp[i] = EtcdOperationResult{
RangeResponse: s.getRange(op.Key, op.RangeOptions),
RangeResponse: s.getRange(op.Range),
}
case PutOperation:
_, leaseExists := s.Leases[op.LeaseID]
if op.LeaseID != 0 && !leaseExists {
_, leaseExists := s.Leases[op.Put.LeaseID]
if op.Put.LeaseID != 0 && !leaseExists {
break
}
s.KeyValues[op.Key] = ValueRevision{
Value: op.Value,
s.KeyValues[op.Put.Key] = ValueRevision{
Value: op.Put.Value,
ModRevision: s.Revision + 1,
}
increaseRevision = true
s = detachFromOldLease(s, op.Key)
s = detachFromOldLease(s, op.Put.Key)
if leaseExists {
s = attachToNewLease(s, op.LeaseID, op.Key)
s = attachToNewLease(s, op.Put.LeaseID, op.Put.Key)
}
case DeleteOperation:
if _, ok := s.KeyValues[op.Key]; ok {
delete(s.KeyValues, op.Key)
if _, ok := s.KeyValues[op.Delete.Key]; ok {
delete(s.KeyValues, op.Delete.Key)
increaseRevision = true
s = detachFromOldLease(s, op.Key)
s = detachFromOldLease(s, op.Delete.Key)
opResp[i].Deleted = 1
}
default:
@ -185,14 +184,14 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
}
}
func (s EtcdState) getRange(key string, options RangeOptions) RangeResponse {
func (s EtcdState) getRange(options RangeOptions) RangeResponse {
response := RangeResponse{
KVs: []KeyValue{},
}
if options.WithPrefix {
if options.End != "" {
var count int64
for k, v := range s.KeyValues {
if strings.HasPrefix(k, key) {
if k >= options.Start && k < options.End {
response.KVs = append(response.KVs, KeyValue{Key: k, ValueRevision: v})
count += 1
}
@ -205,10 +204,10 @@ func (s EtcdState) getRange(key string, options RangeOptions) RangeResponse {
}
response.Count = count
} else {
value, ok := s.KeyValues[key]
value, ok := s.KeyValues[options.Start]
if ok {
response.KVs = append(response.KVs, KeyValue{
Key: key,
Key: options.Start,
ValueRevision: value,
})
response.Count = 1
@ -251,21 +250,26 @@ type EtcdRequest struct {
}
type RangeRequest struct {
Key string
RangeOptions
Revision int64
}
type RangeOptions struct {
WithPrefix bool
Limit int64
Start string
End string
Limit int64
}
type PutOptions struct {
Key string
Value ValueOrHash
LeaseID int64
}
type DeleteOptions struct {
Key string
}
type TxnRequest struct {
Conditions []EtcdCondition
OperationsOnSuccess []EtcdOperation
@ -278,10 +282,10 @@ type EtcdCondition struct {
}
type EtcdOperation struct {
Type OperationType
Key string
RangeOptions
PutOptions
Type OperationType
Range RangeOptions
Put PutOptions
Delete DeleteOptions
}
type OperationType string

View File

@ -83,8 +83,8 @@ var commonTestScenarios = []modelTestCase{
operations: []testOperation{
{req: putRequest("key1", "1"), resp: putResponse(2)},
{req: putRequest("key2", "2"), resp: putResponse(3)},
{req: rangeRequest("key", true, 0), resp: rangeResponse([]*mvccpb.KeyValue{{Key: []byte("key1"), Value: []byte("1"), ModRevision: 2}, {Key: []byte("key2"), Value: []byte("2"), ModRevision: 3}}, 2, 3)},
{req: rangeRequest("key", true, 0), resp: rangeResponse([]*mvccpb.KeyValue{{Key: []byte("key1"), Value: []byte("1"), ModRevision: 2}, {Key: []byte("key2"), Value: []byte("2"), ModRevision: 3}}, 2, 3)},
{req: listRequest("key", 0), resp: rangeResponse([]*mvccpb.KeyValue{{Key: []byte("key1"), Value: []byte("1"), ModRevision: 2}, {Key: []byte("key2"), Value: []byte("2"), ModRevision: 3}}, 2, 3)},
{req: listRequest("key", 0), resp: rangeResponse([]*mvccpb.KeyValue{{Key: []byte("key1"), Value: []byte("1"), ModRevision: 2}, {Key: []byte("key2"), Value: []byte("2"), ModRevision: 3}}, 2, 3)},
},
},
{
@ -93,26 +93,26 @@ var commonTestScenarios = []modelTestCase{
{req: putRequest("key1", "1"), resp: putResponse(2)},
{req: putRequest("key2", "2"), resp: putResponse(3)},
{req: putRequest("key3", "3"), resp: putResponse(4)},
{req: rangeRequest("key", true, 0), resp: rangeResponse([]*mvccpb.KeyValue{
{req: listRequest("key", 0), resp: rangeResponse([]*mvccpb.KeyValue{
{Key: []byte("key1"), Value: []byte("1"), ModRevision: 2},
{Key: []byte("key2"), Value: []byte("2"), ModRevision: 3},
{Key: []byte("key3"), Value: []byte("3"), ModRevision: 4},
}, 3, 4)},
{req: rangeRequest("key", true, 4), resp: rangeResponse([]*mvccpb.KeyValue{
{req: listRequest("key", 4), resp: rangeResponse([]*mvccpb.KeyValue{
{Key: []byte("key1"), Value: []byte("1"), ModRevision: 2},
{Key: []byte("key2"), Value: []byte("2"), ModRevision: 3},
{Key: []byte("key3"), Value: []byte("3"), ModRevision: 4},
}, 3, 4)},
{req: rangeRequest("key", true, 3), resp: rangeResponse([]*mvccpb.KeyValue{
{req: listRequest("key", 3), resp: rangeResponse([]*mvccpb.KeyValue{
{Key: []byte("key1"), Value: []byte("1"), ModRevision: 2},
{Key: []byte("key2"), Value: []byte("2"), ModRevision: 3},
{Key: []byte("key3"), Value: []byte("3"), ModRevision: 4},
}, 3, 4)},
{req: rangeRequest("key", true, 2), resp: rangeResponse([]*mvccpb.KeyValue{
{req: listRequest("key", 2), resp: rangeResponse([]*mvccpb.KeyValue{
{Key: []byte("key1"), Value: []byte("1"), ModRevision: 2},
{Key: []byte("key2"), Value: []byte("2"), ModRevision: 3},
}, 3, 4)},
{req: rangeRequest("key", true, 1), resp: rangeResponse([]*mvccpb.KeyValue{
{req: listRequest("key", 1), resp: rangeResponse([]*mvccpb.KeyValue{
{Key: []byte("key1"), Value: []byte("1"), ModRevision: 2},
}, 3, 4)},
},
@ -123,17 +123,17 @@ var commonTestScenarios = []modelTestCase{
{req: putRequest("key3", "3"), resp: putResponse(2)},
{req: putRequest("key2", "1"), resp: putResponse(3)},
{req: putRequest("key1", "2"), resp: putResponse(4)},
{req: rangeRequest("key", true, 0), resp: rangeResponse([]*mvccpb.KeyValue{
{req: listRequest("key", 0), resp: rangeResponse([]*mvccpb.KeyValue{
{Key: []byte("key1"), Value: []byte("2"), ModRevision: 4},
{Key: []byte("key2"), Value: []byte("1"), ModRevision: 3},
{Key: []byte("key3"), Value: []byte("3"), ModRevision: 2},
}, 3, 4)},
{req: rangeRequest("key", true, 0), resp: rangeResponse([]*mvccpb.KeyValue{
{req: listRequest("key", 0), resp: rangeResponse([]*mvccpb.KeyValue{
{Key: []byte("key2"), Value: []byte("1"), ModRevision: 3},
{Key: []byte("key1"), Value: []byte("2"), ModRevision: 4},
{Key: []byte("key3"), Value: []byte("3"), ModRevision: 2},
}, 3, 4), expectFailure: true},
{req: rangeRequest("key", true, 0), resp: rangeResponse([]*mvccpb.KeyValue{
{req: listRequest("key", 0), resp: rangeResponse([]*mvccpb.KeyValue{
{Key: []byte("key3"), Value: []byte("3"), ModRevision: 2},
{Key: []byte("key2"), Value: []byte("1"), ModRevision: 3},
{Key: []byte("key1"), Value: []byte("2"), ModRevision: 4},

View File

@ -59,9 +59,13 @@ func (h *AppendableHistory) AppendRange(key string, withPrefix bool, revision in
if resp != nil && resp.Header != nil {
respRevision = resp.Header.Revision
}
var keyEnd string
if withPrefix {
keyEnd = prefixEnd(key)
}
h.appendSuccessful(porcupine.Operation{
ClientId: h.streamId,
Input: staleRangeRequest(key, withPrefix, 0, revision),
Input: staleRangeRequest(key, keyEnd, 0, revision),
Call: start.Nanoseconds(),
Output: rangeResponse(resp.Kvs, resp.Count, respRevision),
Return: end.Nanoseconds(),
@ -239,23 +243,29 @@ func toEtcdCondition(cmp clientv3.Cmp) (cond EtcdCondition) {
return cond
}
func toEtcdOperation(op clientv3.Op) EtcdOperation {
var opType OperationType
func toEtcdOperation(option clientv3.Op) (op EtcdOperation) {
switch {
case op.IsGet():
opType = RangeOperation
case op.IsPut():
opType = PutOperation
case op.IsDelete():
opType = DeleteOperation
case option.IsGet():
op.Type = RangeOperation
op.Range = RangeOptions{
Start: string(option.KeyBytes()),
End: string(option.RangeBytes()),
}
case option.IsPut():
op.Type = PutOperation
op.Put = PutOptions{
Key: string(option.KeyBytes()),
Value: ValueOrHash{Value: string(option.ValueBytes())},
}
case option.IsDelete():
op.Type = DeleteOperation
op.Delete = DeleteOptions{
Key: string(option.KeyBytes()),
}
default:
panic("Unsupported operation")
}
return EtcdOperation{
Type: opType,
Key: string(op.KeyBytes()),
PutOptions: PutOptions{Value: ValueOrHash{Value: string(op.ValueBytes())}},
}
return op
}
func toEtcdOperationResult(resp *etcdserverpb.ResponseOp) EtcdOperationResult {
@ -337,19 +347,42 @@ func (h *AppendableHistory) appendFailed(request EtcdRequest, call int64, err er
}
func getRequest(key string) EtcdRequest {
return rangeRequest(key, false, 0)
return rangeRequest(key, "", 0)
}
func staleGetRequest(key string, revision int64) EtcdRequest {
return staleRangeRequest(key, false, 0, revision)
return staleRangeRequest(key, "", 0, revision)
}
func rangeRequest(key string, withPrefix bool, limit int64) EtcdRequest {
return staleRangeRequest(key, withPrefix, limit, 0)
func rangeRequest(start, end string, limit int64) EtcdRequest {
return staleRangeRequest(start, end, limit, 0)
}
func staleRangeRequest(key string, withPrefix bool, limit, revision int64) EtcdRequest {
return EtcdRequest{Type: Range, Range: &RangeRequest{Key: key, RangeOptions: RangeOptions{WithPrefix: withPrefix, Limit: limit}, Revision: revision}}
func listRequest(key string, limit int64) EtcdRequest {
return staleListRequest(key, limit, 0)
}
func staleListRequest(key string, limit, revision int64) EtcdRequest {
return staleRangeRequest(key, prefixEnd(key), limit, revision)
}
// prefixEnd gets the range end of the prefix.
// Notice: Keep in sync with /client/v3/op.go getPrefix function.
func prefixEnd(key string) string {
end := make([]byte, len(key))
copy(end, key)
for i := len(end) - 1; i >= 0; i-- {
if end[i] < 0xff {
end[i] = end[i] + 1
end = end[:i+1]
return string(end)
}
}
return "\x00"
}
func staleRangeRequest(start, end string, limit, revision int64) EtcdRequest {
return EtcdRequest{Type: Range, Range: &RangeRequest{RangeOptions: RangeOptions{Start: start, End: end, Limit: limit}, Revision: revision}}
}
func emptyGetResponse(revision int64) MaybeEtcdResponse {
@ -384,7 +417,7 @@ func partialResponse(revision int64) MaybeEtcdResponse {
}
func putRequest(key, value string) EtcdRequest {
return EtcdRequest{Type: Txn, Txn: &TxnRequest{OperationsOnSuccess: []EtcdOperation{{Type: PutOperation, Key: key, PutOptions: PutOptions{Value: ToValueOrHash(value)}}}}}
return EtcdRequest{Type: Txn, Txn: &TxnRequest{OperationsOnSuccess: []EtcdOperation{{Type: PutOperation, Put: PutOptions{Key: key, Value: ToValueOrHash(value)}}}}}
}
func putResponse(revision int64) MaybeEtcdResponse {
@ -392,7 +425,7 @@ func putResponse(revision int64) MaybeEtcdResponse {
}
func deleteRequest(key string) EtcdRequest {
return EtcdRequest{Type: Txn, Txn: &TxnRequest{OperationsOnSuccess: []EtcdOperation{{Type: DeleteOperation, Key: key}}}}
return EtcdRequest{Type: Txn, Txn: &TxnRequest{OperationsOnSuccess: []EtcdOperation{{Type: DeleteOperation, Delete: DeleteOptions{Key: key}}}}}
}
func deleteResponse(deleted int64, revision int64) MaybeEtcdResponse {
@ -415,7 +448,7 @@ func compareRevision(key string, expectedRevision int64) *EtcdCondition {
}
func putOperation(key, value string) *EtcdOperation {
return &EtcdOperation{Type: PutOperation, Key: key, PutOptions: PutOptions{Value: ToValueOrHash(value)}}
return &EtcdOperation{Type: PutOperation, Put: PutOptions{Key: key, Value: ToValueOrHash(value)}}
}
func txnRequestSingleOperation(cond *EtcdCondition, onSuccess, onFailure *EtcdOperation) EtcdRequest {
@ -451,7 +484,7 @@ func txnResponse(result []EtcdOperationResult, succeeded bool, revision int64) M
}
func putWithLeaseRequest(key, value string, leaseID int64) EtcdRequest {
return EtcdRequest{Type: Txn, Txn: &TxnRequest{OperationsOnSuccess: []EtcdOperation{{Type: PutOperation, Key: key, PutOptions: PutOptions{Value: ToValueOrHash(value), LeaseID: leaseID}}}}}
return EtcdRequest{Type: Txn, Txn: &TxnRequest{OperationsOnSuccess: []EtcdOperation{{Type: PutOperation, Put: PutOptions{Key: key, Value: ToValueOrHash(value), LeaseID: leaseID}}}}}
}
func leaseGrantRequest(leaseID int64) EtcdRequest {

View File

@ -32,7 +32,7 @@ func TestModelNonDeterministic(t *testing.T) {
operations: []testOperation{
{req: putRequest("key1", "1"), resp: failedResponse(errors.New("failed"))},
{req: putRequest("key2", "2"), resp: putResponse(3)},
{req: rangeRequest("key", true, 0), resp: rangeResponse([]*mvccpb.KeyValue{{Key: []byte("key1"), Value: []byte("1"), ModRevision: 2}, {Key: []byte("key2"), Value: []byte("2"), ModRevision: 3}}, 2, 3)},
{req: listRequest("key", 0), resp: rangeResponse([]*mvccpb.KeyValue{{Key: []byte("key1"), Value: []byte("1"), ModRevision: 2}, {Key: []byte("key2"), Value: []byte("2"), ModRevision: 3}}, 2, 3)},
},
},
{
@ -40,7 +40,7 @@ func TestModelNonDeterministic(t *testing.T) {
operations: []testOperation{
{req: putRequest("key1", "1"), resp: failedResponse(errors.New("failed"))},
{req: putRequest("key2", "2"), resp: putResponse(2)},
{req: rangeRequest("key", true, 0), resp: rangeResponse([]*mvccpb.KeyValue{{Key: []byte("key2"), Value: []byte("2"), ModRevision: 2}}, 1, 2)},
{req: listRequest("key", 0), resp: rangeResponse([]*mvccpb.KeyValue{{Key: []byte("key2"), Value: []byte("2"), ModRevision: 2}}, 1, 2)},
},
},
{

View File

@ -73,7 +73,19 @@ func (r *EtcdReplay) next() (request EtcdRequest, revision int64, index int) {
index = r.eventHistoryIndex
operations := []EtcdOperation{}
for r.eventHistory[index].Revision == revision {
operations = append(operations, r.eventHistory[index].Op)
event := r.eventHistory[index]
switch event.Type {
case PutOperation:
operations = append(operations, EtcdOperation{
Type: event.Type,
Put: PutOptions{Key: event.Key, Value: event.Value},
})
case DeleteOperation:
operations = append(operations, EtcdOperation{
Type: event.Type,
Delete: DeleteOptions{Key: event.Key},
})
}
index++
}
return EtcdRequest{
@ -84,16 +96,13 @@ func (r *EtcdReplay) next() (request EtcdRequest, revision int64, index int) {
}, revision, index
}
func operationToRequest(op EtcdOperation) EtcdRequest {
return EtcdRequest{
Type: Txn,
Txn: &TxnRequest{
OperationsOnSuccess: []EtcdOperation{op},
},
}
}
type WatchEvent struct {
Op EtcdOperation
Event
Revision int64
}
type Event struct {
Type OperationType
Key string
Value ValueOrHash
}

View File

@ -254,22 +254,18 @@ func ToWatchResponse(r clientv3.WatchResponse, baseTime time.Time) WatchResponse
return resp
}
func toWatchEvent(event clientv3.Event) model.WatchEvent {
var op model.OperationType
func toWatchEvent(event clientv3.Event) (watch model.WatchEvent) {
watch.Revision = event.Kv.ModRevision
watch.Key = string(event.Kv.Key)
watch.Value = model.ToValueOrHash(string(event.Kv.Value))
switch event.Type {
case mvccpb.PUT:
op = model.PutOperation
watch.Type = model.PutOperation
case mvccpb.DELETE:
op = model.DeleteOperation
watch.Type = model.DeleteOperation
default:
panic(fmt.Sprintf("Unexpected event type: %s", event.Type))
}
return model.WatchEvent{
Revision: event.Kv.ModRevision,
Op: model.EtcdOperation{
Type: op,
Key: string(event.Kv.Key),
PutOptions: model.PutOptions{Value: model.ToValueOrHash(string(event.Kv.Value))},
},
}
return watch
}

View File

@ -21,7 +21,7 @@ import (
"go.etcd.io/etcd/tests/v3/robustness/traffic"
)
func patchOperationsWithWatchEvents(operations []porcupine.Operation, watchEvents map[model.EtcdOperation]traffic.TimedWatchEvent) []porcupine.Operation {
func patchOperationsWithWatchEvents(operations []porcupine.Operation, watchEvents map[model.Event]traffic.TimedWatchEvent) []porcupine.Operation {
newOperations := make([]porcupine.Operation, 0, len(operations))
lastObservedOperation := lastOperationObservedInWatch(operations, watchEvents)
@ -51,7 +51,7 @@ func patchOperationsWithWatchEvents(operations []porcupine.Operation, watchEvent
return newOperations
}
func lastOperationObservedInWatch(operations []porcupine.Operation, watchEvents map[model.EtcdOperation]traffic.TimedWatchEvent) porcupine.Operation {
func lastOperationObservedInWatch(operations []porcupine.Operation, watchEvents map[model.Event]traffic.TimedWatchEvent) porcupine.Operation {
var maxCallTime int64
var lastOperation porcupine.Operation
for _, op := range operations {
@ -68,16 +68,13 @@ func lastOperationObservedInWatch(operations []porcupine.Operation, watchEvents
return lastOperation
}
func matchWatchEvent(request *model.TxnRequest, watchEvents map[model.EtcdOperation]traffic.TimedWatchEvent) *traffic.TimedWatchEvent {
func matchWatchEvent(request *model.TxnRequest, watchEvents map[model.Event]traffic.TimedWatchEvent) *traffic.TimedWatchEvent {
for _, etcdOp := range append(request.OperationsOnSuccess, request.OperationsOnFailure...) {
if etcdOp.Type == model.PutOperation {
// Remove LeaseID which is not exposed in watch.
event, ok := watchEvents[model.EtcdOperation{
Type: etcdOp.Type,
Key: etcdOp.Key,
PutOptions: model.PutOptions{
Value: etcdOp.Value,
},
event, ok := watchEvents[model.Event{
Type: etcdOp.Type,
Key: etcdOp.Put.Key,
Value: etcdOp.Put.Value,
}]
if ok {
return &event

View File

@ -41,12 +41,12 @@ func operations(reports []traffic.ClientReport) []porcupine.Operation {
return ops
}
func uniqueWatchEvents(reports []traffic.ClientReport) map[model.EtcdOperation]traffic.TimedWatchEvent {
persisted := map[model.EtcdOperation]traffic.TimedWatchEvent{}
func uniqueWatchEvents(reports []traffic.ClientReport) map[model.Event]traffic.TimedWatchEvent {
persisted := map[model.Event]traffic.TimedWatchEvent{}
for _, r := range reports {
for _, resp := range r.Watch {
for _, event := range resp.Events {
persisted[event.Op] = traffic.TimedWatchEvent{Time: resp.Time, WatchEvent: event}
persisted[event.Event] = traffic.TimedWatchEvent{Time: resp.Time, WatchEvent: event}
}
}
}

View File

@ -79,11 +79,10 @@ func validateUnique(t *testing.T, expectUniqueRevision bool, report traffic.Clie
key = struct {
revision int64
key string
}{event.Revision, event.Op.Key}
}{event.Revision, event.Key}
}
if _, found := uniqueOperations[key]; found {
t.Errorf("Broke watch guarantee: Unique - an event will never appear on a watch twice, key: %q, revision: %d, client: %d", event.Op.Key, event.Revision, report.ClientId)
t.Errorf("Broke watch guarantee: Unique - an event will never appear on a watch twice, key: %q, revision: %d, client: %d", event.Key, event.Revision, report.ClientId)
}
uniqueOperations[key] = struct{}{}
}
@ -137,7 +136,7 @@ func validateEventsMatch(t *testing.T, reports []traffic.ClientReport) {
for _, r := range reports {
for _, resp := range r.Watch {
for _, event := range resp.Events {
rk := revisionKey{key: event.Op.Key, revision: event.Revision}
rk := revisionKey{key: event.Key, revision: event.Revision}
if prev, found := revisionKeyToEvent[rk]; found {
if prev.WatchEvent != event {
t.Errorf("Events between clients %d and %d don't match, key: %q, revision: %d, diff: %s", prev.ClientId, r.ClientId, rk.key, rk.revision, cmp.Diff(prev, event))