From 9b5680c5f1c10e77dc9d8ca504f5148e2ba2ee93 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 3 May 2023 10:08:10 +0200 Subject: [PATCH] tests/robustness: Implement first step in validating the Kubernetes-etcd contract. * Use mod revision for optimistic concurrency. * Introduce range requests as more general then get * Add kubernetes specific traffic generation, for now using pull, but expected to evolve to use watch. * Introduce kubernetes specific test scenario Signed-off-by: Marek Siarkowicz --- tests/robustness/client.go | 32 +- tests/robustness/linearizability_test.go | 11 +- tests/robustness/model/history.go | 67 +++-- tests/robustness/model/model.go | 115 ++++++-- tests/robustness/model/model_test.go | 360 ++++++++++++++--------- tests/robustness/traffic.go | 84 +++++- 6 files changed, 451 insertions(+), 218 deletions(-) diff --git a/tests/robustness/client.go b/tests/robustness/client.go index 625d561aa..8f236ea7f 100644 --- a/tests/robustness/client.go +++ b/tests/robustness/client.go @@ -16,6 +16,7 @@ package robustness import ( "context" + "fmt" "time" "go.uber.org/zap" @@ -53,14 +54,29 @@ func (c *recordingClient) Close() error { return c.client.Close() } -func (c *recordingClient) Get(ctx context.Context, key string) ([]*mvccpb.KeyValue, error) { +func (c *recordingClient) Get(ctx context.Context, key string) (*mvccpb.KeyValue, error) { + resp, err := c.Range(ctx, key, false) + if err != nil || len(resp) == 0 { + return nil, err + } + if len(resp) == 1 { + return resp[0], err + } + panic(fmt.Sprintf("Unexpected response size: %d", len(resp))) +} + +func (c *recordingClient) Range(ctx context.Context, key string, withPrefix bool) ([]*mvccpb.KeyValue, error) { callTime := time.Since(c.baseTime) - resp, err := c.client.Get(ctx, key) + ops := []clientv3.OpOption{} + if withPrefix { + ops = append(ops, clientv3.WithPrefix()) + } + resp, err := c.client.Get(ctx, key, ops...) returnTime := time.Since(c.baseTime) if err != nil { return nil, err } - c.history.AppendGet(key, callTime, returnTime, resp) + c.history.AppendRange(key, withPrefix, callTime, returnTime, resp) return resp.Kvs, nil } @@ -80,22 +96,22 @@ func (c *recordingClient) Delete(ctx context.Context, key string) error { return nil } -func (c *recordingClient) CompareAndSet(ctx context.Context, key, expectedValue, newValue string) error { +func (c *recordingClient) CompareAndSet(ctx context.Context, key, value string, expectedRevision int64) error { callTime := time.Since(c.baseTime) txn := c.client.Txn(ctx) var cmp clientv3.Cmp - if expectedValue == "" { + if expectedRevision == 0 { cmp = clientv3.Compare(clientv3.CreateRevision(key), "=", 0) } else { - cmp = clientv3.Compare(clientv3.Value(key), "=", expectedValue) + cmp = clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision) } resp, err := txn.If( cmp, ).Then( - clientv3.OpPut(key, newValue), + clientv3.OpPut(key, value), ).Commit() returnTime := time.Since(c.baseTime) - c.history.AppendCompareAndSet(key, expectedValue, newValue, callTime, returnTime, resp, err) + c.history.AppendCompareAndSet(key, expectedRevision, value, callTime, returnTime, resp, err) return err } diff --git a/tests/robustness/linearizability_test.go b/tests/robustness/linearizability_test.go index 86533a7c7..217ff2f22 100644 --- a/tests/robustness/linearizability_test.go +++ b/tests/robustness/linearizability_test.go @@ -73,6 +73,15 @@ var ( }, }, } + KubernetesTraffic = trafficConfig{ + name: "Kubernetes", + minimalQPS: 200, + maximalQPS: 1000, + clientCount: 12, + traffic: kubernetesTraffic{ + keyCount: 5, + }, + } ReqProgTraffic = trafficConfig{ name: "RequestProgressTraffic", minimalQPS: 200, @@ -91,7 +100,7 @@ var ( } defaultTraffic = LowTraffic trafficList = []trafficConfig{ - LowTraffic, HighTraffic, + LowTraffic, HighTraffic, KubernetesTraffic, } ) diff --git a/tests/robustness/model/history.go b/tests/robustness/model/history.go index fe5e6e821..a38f38902 100644 --- a/tests/robustness/model/history.go +++ b/tests/robustness/model/history.go @@ -23,6 +23,7 @@ import ( "go.uber.org/zap" "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/robustness/identity" ) @@ -64,20 +65,16 @@ func NewAppendableHistory(ids identity.Provider) *AppendableHistory { } } -func (h *AppendableHistory) AppendGet(key string, start, end time.Duration, resp *clientv3.GetResponse) { - var readData string - if len(resp.Kvs) == 1 { - readData = string(resp.Kvs[0].Value) - } +func (h *AppendableHistory) AppendRange(key string, withPrefix bool, start, end time.Duration, resp *clientv3.GetResponse) { var revision int64 if resp != nil && resp.Header != nil { revision = resp.Header.Revision } h.successful = append(h.successful, porcupine.Operation{ ClientId: h.id, - Input: getRequest(key), + Input: rangeRequest(key, withPrefix), Call: start.Nanoseconds(), - Output: getResponse(readData, revision), + Output: rangeResponse(resp.Kvs, revision), Return: end.Nanoseconds(), }) } @@ -183,8 +180,8 @@ func (h *AppendableHistory) AppendDelete(key string, start, end time.Duration, r }) } -func (h *AppendableHistory) AppendCompareAndSet(key, expectValue, newValue string, start, end time.Duration, resp *clientv3.TxnResponse, err error) { - request := compareAndSetRequest(key, expectValue, newValue) +func (h *AppendableHistory) AppendCompareAndSet(key string, expectedRevision int64, value string, start, end time.Duration, resp *clientv3.TxnResponse, err error) { + request := compareAndSetRequest(key, expectedRevision, value) if err != nil { h.appendFailed(request, start, err) return @@ -235,9 +232,8 @@ func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, onSuccess []clientv3.O func toEtcdCondition(cmp clientv3.Cmp) (cond EtcdCondition) { switch { - case cmp.Result == etcdserverpb.Compare_EQUAL && cmp.Target == etcdserverpb.Compare_VALUE: + case cmp.Result == etcdserverpb.Compare_EQUAL && cmp.Target == etcdserverpb.Compare_MOD: cond.Key = string(cmp.KeyBytes()) - cond.ExpectedValue = ToValueOrHash(string(cmp.ValueBytes())) case cmp.Result == etcdserverpb.Compare_EQUAL && cmp.Target == etcdserverpb.Compare_CREATE: cond.Key = string(cmp.KeyBytes()) default: @@ -250,7 +246,7 @@ func toEtcdOperation(op clientv3.Op) EtcdOperation { var opType OperationType switch { case op.IsGet(): - opType = Get + opType = Range case op.IsPut(): opType = Put case op.IsDelete(): @@ -269,12 +265,18 @@ func toEtcdOperationResult(resp *etcdserverpb.ResponseOp) EtcdOperationResult { switch { case resp.GetResponseRange() != nil: getResp := resp.GetResponseRange() - var val string - if len(getResp.Kvs) != 0 { - val = string(getResp.Kvs[0].Value) + kvs := make([]KeyValue, len(getResp.Kvs)) + for i, kv := range getResp.Kvs { + kvs[i] = KeyValue{ + Key: string(kv.Key), + ValueRevision: ValueRevision{ + Value: ToValueOrHash(string(kv.Value)), + ModRevision: kv.ModRevision, + }, + } } return EtcdOperationResult{ - Value: ToValueOrHash(val), + KVs: kvs, } case resp.GetResponsePut() != nil: return EtcdOperationResult{} @@ -316,11 +318,34 @@ func (h *AppendableHistory) appendFailed(request EtcdRequest, start time.Duratio } func getRequest(key string) EtcdRequest { - return EtcdRequest{Type: Txn, Txn: &TxnRequest{Ops: []EtcdOperation{{Type: Get, Key: key}}}} + return rangeRequest(key, false) } -func getResponse(value string, revision int64) EtcdResponse { - return EtcdResponse{Txn: &TxnResponse{OpsResult: []EtcdOperationResult{{Value: ToValueOrHash(value)}}}, Revision: revision} +func rangeRequest(key string, withPrefix bool) EtcdRequest { + return EtcdRequest{Type: Txn, Txn: &TxnRequest{Ops: []EtcdOperation{{Type: Range, Key: key, WithPrefix: withPrefix}}}} +} + +func emptyGetResponse(revision int64) EtcdResponse { + return rangeResponse([]*mvccpb.KeyValue{}, revision) +} + +func getResponse(key, value string, modRevision, revision int64) EtcdResponse { + return rangeResponse([]*mvccpb.KeyValue{{Key: []byte(key), Value: []byte(value), ModRevision: modRevision}}, revision) +} + +func rangeResponse(kvs []*mvccpb.KeyValue, revision int64) EtcdResponse { + result := EtcdOperationResult{KVs: make([]KeyValue, len(kvs))} + + for i, kv := range kvs { + result.KVs[i] = KeyValue{ + Key: string(kv.Key), + ValueRevision: ValueRevision{ + Value: ToValueOrHash(string(kv.Value)), + ModRevision: kv.ModRevision, + }, + } + } + return EtcdResponse{Txn: &TxnResponse{OpsResult: []EtcdOperationResult{result}}, Revision: revision} } func failedResponse(err error) EtcdResponse { @@ -347,8 +372,8 @@ func deleteResponse(deleted int64, revision int64) EtcdResponse { return EtcdResponse{Txn: &TxnResponse{OpsResult: []EtcdOperationResult{{Deleted: deleted}}}, Revision: revision} } -func compareAndSetRequest(key, expectValue, newValue string) EtcdRequest { - return txnRequest([]EtcdCondition{{Key: key, ExpectedValue: ToValueOrHash(expectValue)}}, []EtcdOperation{{Type: Put, Key: key, Value: ToValueOrHash(newValue)}}) +func compareAndSetRequest(key string, expectedRevision int64, value string) EtcdRequest { + return txnRequest([]EtcdCondition{{Key: key, ExpectedRevision: expectedRevision}}, []EtcdOperation{{Type: Put, Key: key, Value: ToValueOrHash(value)}}) } func compareAndSetResponse(succeeded bool, revision int64) EtcdResponse { diff --git a/tests/robustness/model/model.go b/tests/robustness/model/model.go index ae00a4b1c..8dde5764d 100644 --- a/tests/robustness/model/model.go +++ b/tests/robustness/model/model.go @@ -19,6 +19,7 @@ import ( "fmt" "hash/fnv" "reflect" + "sort" "strings" "github.com/anishathalye/porcupine" @@ -27,7 +28,7 @@ import ( type OperationType string const ( - Get OperationType = "get" + Range OperationType = "range" Put OperationType = "put" Delete OperationType = "delete" ) @@ -77,15 +78,16 @@ type TxnRequest struct { } type EtcdCondition struct { - Key string - ExpectedValue ValueOrHash + Key string + ExpectedRevision int64 } type EtcdOperation struct { - Type OperationType - Key string - Value ValueOrHash - LeaseID int64 + Type OperationType + Key string + WithPrefix bool + Value ValueOrHash + LeaseID int64 } type LeaseGrantRequest struct { @@ -122,10 +124,15 @@ func Match(r1, r2 EtcdResponse) bool { } type EtcdOperationResult struct { - Value ValueOrHash + KVs []KeyValue Deleted int64 } +type KeyValue struct { + Key string + ValueRevision +} + var leased = struct{}{} type EtcdLease struct { @@ -136,11 +143,16 @@ type PossibleStates []EtcdState type EtcdState struct { Revision int64 - KeyValues map[string]ValueOrHash + KeyValues map[string]ValueRevision KeyLeases map[string]int64 Leases map[int64]EtcdLease } +type ValueRevision struct { + Value ValueOrHash + ModRevision int64 +} + type ValueOrHash struct { Value string Hash uint32 @@ -200,7 +212,7 @@ func describeEtcdRequest(request EtcdRequest) string { func describeEtcdConditions(conds []EtcdCondition) string { opsDescription := make([]string, len(conds)) for i := range conds { - opsDescription[i] = fmt.Sprintf("%s==%s", conds[i].Key, describeValueOrHash(conds[i].ExpectedValue)) + opsDescription[i] = fmt.Sprintf("mod_rev(%s)==%d", conds[i].Key, conds[i].ExpectedRevision) } return strings.Join(opsDescription, " && ") } @@ -219,20 +231,23 @@ func describeTxnResponse(request *TxnRequest, response *TxnResponse) string { } respDescription := make([]string, len(response.OpsResult)) for i := range response.OpsResult { - respDescription[i] = describeEtcdOperationResponse(request.Ops[i].Type, response.OpsResult[i]) + respDescription[i] = describeEtcdOperationResponse(request.Ops[i], response.OpsResult[i]) } return strings.Join(respDescription, ", ") } func describeEtcdOperation(op EtcdOperation) string { switch op.Type { - case Get: + case Range: + if op.WithPrefix { + return fmt.Sprintf("range(%q)", op.Key) + } return fmt.Sprintf("get(%q)", op.Key) case Put: if op.LeaseID != 0 { return fmt.Sprintf("put(%q, %s, %d)", op.Key, describeValueOrHash(op.Value), op.LeaseID) } - return fmt.Sprintf("put(%q, %s, nil)", op.Key, describeValueOrHash(op.Value)) + return fmt.Sprintf("put(%q, %s)", op.Key, describeValueOrHash(op.Value)) case Delete: return fmt.Sprintf("delete(%q)", op.Key) default: @@ -240,16 +255,28 @@ func describeEtcdOperation(op EtcdOperation) string { } } -func describeEtcdOperationResponse(op OperationType, resp EtcdOperationResult) string { - switch op { - case Get: - return describeValueOrHash(resp.Value) +func describeEtcdOperationResponse(req EtcdOperation, resp EtcdOperationResult) string { + switch req.Type { + case Range: + if req.WithPrefix { + kvs := make([]string, len(resp.KVs)) + for i, kv := range resp.KVs { + kvs[i] = describeValueOrHash(kv.Value) + } + return fmt.Sprintf("[%s]", strings.Join(kvs, ",")) + } else { + if len(resp.KVs) == 0 { + return "nil" + } else { + return describeValueOrHash(resp.KVs[0].Value) + } + } case Put: return fmt.Sprintf("ok") case Delete: return fmt.Sprintf("deleted: %d", resp.Deleted) default: - return fmt.Sprintf("", op) + return fmt.Sprintf("", req.Type) } } @@ -283,7 +310,7 @@ func step(states PossibleStates, request EtcdRequest, response EtcdResponse) (bo func initState(request EtcdRequest, response EtcdResponse) EtcdState { state := EtcdState{ Revision: response.Revision, - KeyValues: map[string]ValueOrHash{}, + KeyValues: map[string]ValueRevision{}, KeyLeases: map[string]int64{}, Leases: map[int64]EtcdLease{}, } @@ -292,15 +319,24 @@ func initState(request EtcdRequest, response EtcdResponse) EtcdState { if response.Txn.TxnResult { return state } + if len(request.Txn.Ops) != len(response.Txn.OpsResult) { + panic(fmt.Sprintf("Incorrect request %s, response %+v", describeEtcdRequest(request), describeEtcdResponse(request, response))) + } for i, op := range request.Txn.Ops { opResp := response.Txn.OpsResult[i] switch op.Type { - case Get: - if opResp.Value.Value != "" && opResp.Value.Hash == 0 { - state.KeyValues[op.Key] = opResp.Value + case Range: + for _, kv := range opResp.KVs { + state.KeyValues[kv.Key] = ValueRevision{ + Value: kv.Value, + ModRevision: kv.ModRevision, + } } case Put: - state.KeyValues[op.Key] = op.Value + state.KeyValues[op.Key] = ValueRevision{ + Value: op.Value, + ModRevision: response.Revision, + } case Delete: default: panic("Unknown operation") @@ -345,7 +381,7 @@ func applyRequest(states PossibleStates, request EtcdRequest, response EtcdRespo // applyRequestToSingleState handles a successful request, returning updated state and response it would generate. func applyRequestToSingleState(s EtcdState, request EtcdRequest) (EtcdState, EtcdResponse) { - newKVs := map[string]ValueOrHash{} + newKVs := map[string]ValueRevision{} for k, v := range s.KeyValues { newKVs[k] = v } @@ -354,7 +390,7 @@ func applyRequestToSingleState(s EtcdState, request EtcdRequest) (EtcdState, Etc case Txn: success := true for _, cond := range request.Txn.Conds { - if val := s.KeyValues[cond.Key]; val != cond.ExpectedValue { + if val := s.KeyValues[cond.Key]; val.ModRevision != cond.ExpectedRevision { success = false break } @@ -366,14 +402,37 @@ func applyRequestToSingleState(s EtcdState, request EtcdRequest) (EtcdState, Etc increaseRevision := false for i, op := range request.Txn.Ops { switch op.Type { - case Get: - opResp[i].Value = s.KeyValues[op.Key] + case Range: + opResp[i] = EtcdOperationResult{ + KVs: []KeyValue{}, + } + if op.WithPrefix { + for k, v := range s.KeyValues { + if strings.HasPrefix(k, op.Key) { + opResp[i].KVs = append(opResp[i].KVs, KeyValue{Key: k, ValueRevision: v}) + } + } + sort.Slice(opResp[i].KVs, func(j, k int) bool { + return opResp[i].KVs[j].Key < opResp[i].KVs[k].Key + }) + } else { + value, ok := s.KeyValues[op.Key] + if ok { + opResp[i].KVs = append(opResp[i].KVs, KeyValue{ + Key: op.Key, + ValueRevision: value, + }) + } + } case Put: _, leaseExists := s.Leases[op.LeaseID] if op.LeaseID != 0 && !leaseExists { break } - s.KeyValues[op.Key] = op.Value + s.KeyValues[op.Key] = ValueRevision{ + Value: op.Value, + ModRevision: s.Revision + 1, + } increaseRevision = true s = detachFromOldLease(s, op.Key) if leaseExists { diff --git a/tests/robustness/model/model_test.go b/tests/robustness/model/model_test.go index 91946358c..e412eaa24 100644 --- a/tests/robustness/model/model_test.go +++ b/tests/robustness/model/model_test.go @@ -15,10 +15,14 @@ package model import ( + "encoding/json" "errors" "testing" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" + + "go.etcd.io/etcd/api/v3/mvccpb" ) func TestModelStep(t *testing.T) { @@ -29,7 +33,22 @@ func TestModelStep(t *testing.T) { { name: "First Get can start from non-empty value and non-zero revision", operations: []testOperation{ - {req: getRequest("key"), resp: getResponse("", 42)}, + {req: getRequest("key"), resp: getResponse("key", "1", 42, 42)}, + {req: getRequest("key"), resp: getResponse("key", "1", 42, 42)}, + }, + }, + { + name: "First Range can start from non-empty value and non-zero revision", + operations: []testOperation{ + {req: rangeRequest("key", true), resp: rangeResponse([]*mvccpb.KeyValue{{Key: []byte("key"), Value: []byte("1")}}, 42)}, + {req: rangeRequest("key", true), resp: rangeResponse([]*mvccpb.KeyValue{{Key: []byte("key"), Value: []byte("1")}}, 42)}, + }, + }, + { + name: "First Range can start from non-zero revision", + operations: []testOperation{ + {req: rangeRequest("key", true), resp: rangeResponse(nil, 1)}, + {req: rangeRequest("key", true), resp: rangeResponse(nil, 1)}, }, }, { @@ -47,7 +66,7 @@ func TestModelStep(t *testing.T) { { name: "First Txn can start from non-zero revision", operations: []testOperation{ - {req: compareAndSetRequest("key", "", "42"), resp: compareAndSetResponse(false, 42)}, + {req: compareAndSetRequest("key", 0, "42"), resp: compareAndSetResponse(false, 42)}, }, }, { @@ -55,31 +74,50 @@ func TestModelStep(t *testing.T) { operations: []testOperation{ {req: putRequest("key1", "11"), resp: putResponse(1)}, {req: putRequest("key2", "12"), resp: putResponse(2)}, - {req: getRequest("key1"), resp: getResponse("11", 1), failure: true}, - {req: getRequest("key1"), resp: getResponse("12", 1), failure: true}, - {req: getRequest("key1"), resp: getResponse("12", 2), failure: true}, - {req: getRequest("key1"), resp: getResponse("11", 2)}, - {req: getRequest("key2"), resp: getResponse("11", 2), failure: true}, - {req: getRequest("key2"), resp: getResponse("12", 1), failure: true}, - {req: getRequest("key2"), resp: getResponse("11", 1), failure: true}, - {req: getRequest("key2"), resp: getResponse("12", 2)}, + {req: getRequest("key1"), resp: getResponse("key1", "11", 1, 1), failure: true}, + {req: getRequest("key1"), resp: getResponse("key1", "12", 1, 1), failure: true}, + {req: getRequest("key1"), resp: getResponse("key1", "12", 2, 2), failure: true}, + {req: getRequest("key1"), resp: getResponse("key1", "11", 1, 2)}, + {req: getRequest("key2"), resp: getResponse("key2", "11", 2, 2), failure: true}, + {req: getRequest("key2"), resp: getResponse("key2", "12", 1, 1), failure: true}, + {req: getRequest("key2"), resp: getResponse("key2", "11", 1, 1), failure: true}, + {req: getRequest("key2"), resp: getResponse("key2", "12", 2, 2)}, }, }, { - name: "Get response data should match large put", + name: "Range response data should match put", + operations: []testOperation{ + {req: putRequest("key1", "1"), resp: putResponse(1)}, + {req: putRequest("key2", "2"), resp: putResponse(2)}, + {req: rangeRequest("key", true), resp: rangeResponse([]*mvccpb.KeyValue{{Key: []byte("key1"), Value: []byte("1"), ModRevision: 1}, {Key: []byte("key2"), Value: []byte("2"), ModRevision: 2}}, 2)}, + {req: rangeRequest("key", true), resp: rangeResponse([]*mvccpb.KeyValue{{Key: []byte("key1"), Value: []byte("1"), ModRevision: 1}, {Key: []byte("key2"), Value: []byte("2"), ModRevision: 2}}, 2)}, + }, + }, + { + name: "Range response should be ordered by key", + operations: []testOperation{ + {req: rangeRequest("key", true), resp: rangeResponse([]*mvccpb.KeyValue{ + {Key: []byte("key1"), Value: []byte("2"), ModRevision: 3}, + {Key: []byte("key2"), Value: []byte("1"), ModRevision: 2}, + {Key: []byte("key3"), Value: []byte("3"), ModRevision: 1}, + }, 3)}, + }, + }, + { + name: "Range response data should match large put", operations: []testOperation{ {req: putRequest("key", "012345678901234567890"), resp: putResponse(1)}, - {req: getRequest("key"), resp: getResponse("123456789012345678901", 1), failure: true}, - {req: getRequest("key"), resp: getResponse("012345678901234567890", 1)}, + {req: getRequest("key"), resp: getResponse("key", "123456789012345678901", 1, 1), failure: true}, + {req: getRequest("key"), resp: getResponse("key", "012345678901234567890", 1, 1)}, {req: putRequest("key", "123456789012345678901"), resp: putResponse(2)}, - {req: getRequest("key"), resp: getResponse("123456789012345678901", 2)}, - {req: getRequest("key"), resp: getResponse("012345678901234567890", 2), failure: true}, + {req: getRequest("key"), resp: getResponse("key", "123456789012345678901", 2, 2)}, + {req: getRequest("key"), resp: getResponse("key", "012345678901234567890", 2, 2), failure: true}, }, }, { name: "Put must increase revision by 1", operations: []testOperation{ - {req: getRequest("key"), resp: getResponse("", 1)}, + {req: getRequest("key"), resp: emptyGetResponse(1)}, {req: putRequest("key", "1"), resp: putResponse(1), failure: true}, {req: putRequest("key", "1"), resp: putResponse(3), failure: true}, {req: putRequest("key", "1"), resp: putResponse(2)}, @@ -90,18 +128,18 @@ func TestModelStep(t *testing.T) { operations: []testOperation{ {req: putRequest("key", "1"), resp: putResponse(1)}, {req: putRequest("key", "1"), resp: failedResponse(errors.New("failed"))}, - {req: getRequest("key"), resp: getResponse("1", 1)}, - {req: getRequest("key"), resp: getResponse("2", 1), failure: true}, - {req: getRequest("key"), resp: getResponse("1", 2), failure: true}, - {req: getRequest("key"), resp: getResponse("2", 2), failure: true}, + {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, + {req: getRequest("key"), resp: getResponse("key", "2", 1, 1), failure: true}, + {req: getRequest("key"), resp: getResponse("key", "1", 1, 2), failure: true}, + {req: getRequest("key"), resp: getResponse("key", "2", 1, 2), failure: true}, }, }, { name: "Put can fail and be lost before put", operations: []testOperation{ - {req: getRequest("key"), resp: getResponse("", 1)}, + {req: getRequest("key"), resp: emptyGetResponse(1)}, {req: putRequest("key", "1"), resp: failedResponse(errors.New("failed"))}, - {req: putRequest("key", "3"), resp: getResponse("", 2)}, + {req: putRequest("key", "3"), resp: putResponse(2)}, }, }, { @@ -116,13 +154,13 @@ func TestModelStep(t *testing.T) { name: "Put can fail and be lost before txn", operations: []testOperation{ // Txn failure - {req: getRequest("key"), resp: getResponse("", 1)}, + {req: getRequest("key"), resp: emptyGetResponse(1)}, {req: putRequest("key", "1"), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", "2", "3"), resp: compareAndSetResponse(false, 1)}, + {req: compareAndSetRequest("key", 2, "3"), resp: compareAndSetResponse(false, 1)}, // Txn success {req: putRequest("key", "2"), resp: putResponse(2)}, {req: putRequest("key", "4"), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", "2", "5"), resp: compareAndSetResponse(true, 3)}, + {req: compareAndSetRequest("key", 2, "5"), resp: compareAndSetResponse(true, 3)}, }, }, { @@ -135,13 +173,14 @@ func TestModelStep(t *testing.T) { // One failed request, one persisted. {req: putRequest("key", "1"), resp: putResponse(1)}, {req: putRequest("key", "2"), resp: failedResponse(errors.New("failed"))}, - {req: getRequest("key"), resp: getResponse("3", 2), failure: true}, - {req: getRequest("key"), resp: getResponse("2", 1), failure: true}, - {req: getRequest("key"), resp: getResponse("2", 2)}, + {req: getRequest("key"), resp: getResponse("key", "3", 2, 2), failure: true}, + {req: getRequest("key"), resp: getResponse("key", "3", 1, 2), failure: true}, + {req: getRequest("key"), resp: getResponse("key", "2", 1, 1), failure: true}, + {req: getRequest("key"), resp: getResponse("key", "2", 2, 2)}, // Two failed request, two persisted. {req: putRequest("key", "3"), resp: failedResponse(errors.New("failed"))}, {req: putRequest("key", "4"), resp: failedResponse(errors.New("failed"))}, - {req: getRequest("key"), resp: getResponse("4", 4)}, + {req: getRequest("key"), resp: getResponse("key", "4", 4, 4)}, }, }, { @@ -169,15 +208,15 @@ func TestModelStep(t *testing.T) { name: "Put can fail but be persisted before txn", operations: []testOperation{ // Txn success - {req: getRequest("key"), resp: getResponse("", 1)}, + {req: getRequest("key"), resp: emptyGetResponse(1)}, {req: putRequest("key", "2"), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", "2", ""), resp: compareAndSetResponse(true, 2), failure: true}, - {req: compareAndSetRequest("key", "2", ""), resp: compareAndSetResponse(true, 3)}, + {req: compareAndSetRequest("key", 2, ""), resp: compareAndSetResponse(true, 2), failure: true}, + {req: compareAndSetRequest("key", 2, ""), resp: compareAndSetResponse(true, 3)}, // Txn failure {req: putRequest("key", "4"), resp: putResponse(4)}, - {req: compareAndSetRequest("key", "5", ""), resp: compareAndSetResponse(false, 4)}, + {req: compareAndSetRequest("key", 5, ""), resp: compareAndSetResponse(false, 4)}, {req: putRequest("key", "5"), resp: failedResponse(errors.New("failed"))}, - {req: getRequest("key"), resp: getResponse("5", 5)}, + {req: getRequest("key"), resp: getResponse("key", "5", 5, 5)}, }, }, { @@ -194,7 +233,7 @@ func TestModelStep(t *testing.T) { { name: "Delete not existing key", operations: []testOperation{ - {req: getRequest("key"), resp: getResponse("", 1)}, + {req: getRequest("key"), resp: emptyGetResponse(1)}, {req: deleteRequest("key"), resp: deleteResponse(1, 2), failure: true}, {req: deleteRequest("key"), resp: deleteResponse(0, 1)}, }, @@ -202,11 +241,12 @@ func TestModelStep(t *testing.T) { { name: "Delete clears value", operations: []testOperation{ - {req: getRequest("key"), resp: getResponse("1", 1)}, + {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, {req: deleteRequest("key"), resp: deleteResponse(1, 2)}, - {req: getRequest("key"), resp: getResponse("1", 1), failure: true}, - {req: getRequest("key"), resp: getResponse("1", 2), failure: true}, - {req: getRequest("key"), resp: getResponse("", 2)}, + {req: getRequest("key"), resp: getResponse("key", "1", 1, 1), failure: true}, + {req: getRequest("key"), resp: getResponse("key", "1", 2, 2), failure: true}, + {req: getRequest("key"), resp: getResponse("key", "1", 1, 2), failure: true}, + {req: getRequest("key"), resp: emptyGetResponse(2)}, }, }, { @@ -214,8 +254,10 @@ func TestModelStep(t *testing.T) { operations: []testOperation{ {req: putRequest("key", "1"), resp: putResponse(1)}, {req: deleteRequest("key"), resp: failedResponse(errors.New("failed"))}, - {req: getRequest("key"), resp: getResponse("1", 1)}, - {req: getRequest("key"), resp: getResponse("", 2), failure: true}, + {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, + {req: getRequest("key"), resp: emptyGetResponse(2), failure: true}, + {req: getRequest("key"), resp: emptyGetResponse(2), failure: true}, + {req: getRequest("key"), resp: emptyGetResponse(1), failure: true}, }, }, { @@ -241,12 +283,12 @@ func TestModelStep(t *testing.T) { // One failed request, one persisted. {req: putRequest("key", "1"), resp: putResponse(1)}, {req: deleteRequest("key"), resp: failedResponse(errors.New("failed"))}, - {req: getRequest("key"), resp: getResponse("", 2)}, + {req: getRequest("key"), resp: emptyGetResponse(2)}, // Two failed request, one persisted. {req: putRequest("key", "3"), resp: putResponse(3)}, {req: deleteRequest("key"), resp: failedResponse(errors.New("failed"))}, {req: deleteRequest("key"), resp: failedResponse(errors.New("failed"))}, - {req: getRequest("key"), resp: getResponse("", 4)}, + {req: getRequest("key"), resp: emptyGetResponse(4)}, }, }, { @@ -280,75 +322,77 @@ func TestModelStep(t *testing.T) { name: "Delete can fail but be persisted before txn", operations: []testOperation{ // Txn success - {req: getRequest("key"), resp: getResponse("1", 1)}, + {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, {req: deleteRequest("key"), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", "", "3"), resp: compareAndSetResponse(true, 3)}, + {req: compareAndSetRequest("key", 0, "3"), resp: compareAndSetResponse(true, 3)}, // Txn failure {req: putRequest("key", "4"), resp: putResponse(4)}, {req: deleteRequest("key"), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", "4", "5"), resp: compareAndSetResponse(false, 5)}, + {req: compareAndSetRequest("key", 4, "5"), resp: compareAndSetResponse(false, 5)}, }, }, { name: "Txn sets new value if value matches expected", operations: []testOperation{ - {req: getRequest("key"), resp: getResponse("1", 1)}, - {req: compareAndSetRequest("key", "1", "2"), resp: compareAndSetResponse(true, 1), failure: true}, - {req: compareAndSetRequest("key", "1", "2"), resp: compareAndSetResponse(false, 2), failure: true}, - {req: compareAndSetRequest("key", "1", "2"), resp: compareAndSetResponse(false, 1), failure: true}, - {req: compareAndSetRequest("key", "1", "2"), resp: compareAndSetResponse(true, 2)}, - {req: getRequest("key"), resp: getResponse("1", 1), failure: true}, - {req: getRequest("key"), resp: getResponse("1", 2), failure: true}, - {req: getRequest("key"), resp: getResponse("2", 1), failure: true}, - {req: getRequest("key"), resp: getResponse("2", 2)}, + {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, + {req: compareAndSetRequest("key", 1, "2"), resp: compareAndSetResponse(true, 1), failure: true}, + {req: compareAndSetRequest("key", 1, "2"), resp: compareAndSetResponse(false, 2), failure: true}, + {req: compareAndSetRequest("key", 1, "2"), resp: compareAndSetResponse(false, 1), failure: true}, + {req: compareAndSetRequest("key", 1, "2"), resp: compareAndSetResponse(true, 2)}, + {req: getRequest("key"), resp: getResponse("key", "1", 1, 1), failure: true}, + {req: getRequest("key"), resp: getResponse("key", "1", 1, 2), failure: true}, + {req: getRequest("key"), resp: getResponse("key", "1", 2, 2), failure: true}, + {req: getRequest("key"), resp: getResponse("key", "2", 1, 1), failure: true}, + {req: getRequest("key"), resp: getResponse("key", "2", 2, 2)}, }, }, { name: "Txn can expect on empty key", operations: []testOperation{ - {req: getRequest("key1"), resp: getResponse("", 1)}, - {req: compareAndSetRequest("key1", "", "2"), resp: compareAndSetResponse(true, 2)}, - {req: compareAndSetRequest("key2", "", "3"), resp: compareAndSetResponse(true, 3)}, - {req: compareAndSetRequest("key3", "4", "4"), resp: compareAndSetResponse(false, 4), failure: true}, + {req: getRequest("key1"), resp: emptyGetResponse(1)}, + {req: compareAndSetRequest("key1", 0, "2"), resp: compareAndSetResponse(true, 2)}, + {req: compareAndSetRequest("key2", 0, "3"), resp: compareAndSetResponse(true, 3)}, + {req: compareAndSetRequest("key3", 4, "4"), resp: compareAndSetResponse(false, 4), failure: true}, }, }, { name: "Txn doesn't do anything if value doesn't match expected", operations: []testOperation{ - {req: getRequest("key"), resp: getResponse("1", 1)}, - {req: compareAndSetRequest("key", "2", "3"), resp: compareAndSetResponse(true, 2), failure: true}, - {req: compareAndSetRequest("key", "2", "3"), resp: compareAndSetResponse(true, 1), failure: true}, - {req: compareAndSetRequest("key", "2", "3"), resp: compareAndSetResponse(false, 2), failure: true}, - {req: compareAndSetRequest("key", "2", "3"), resp: compareAndSetResponse(false, 1)}, - {req: getRequest("key"), resp: getResponse("2", 1), failure: true}, - {req: getRequest("key"), resp: getResponse("2", 2), failure: true}, - {req: getRequest("key"), resp: getResponse("3", 1), failure: true}, - {req: getRequest("key"), resp: getResponse("3", 2), failure: true}, - {req: getRequest("key"), resp: getResponse("1", 1)}, + {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, + {req: compareAndSetRequest("key", 2, "3"), resp: compareAndSetResponse(true, 2), failure: true}, + {req: compareAndSetRequest("key", 2, "3"), resp: compareAndSetResponse(true, 1), failure: true}, + {req: compareAndSetRequest("key", 2, "3"), resp: compareAndSetResponse(false, 2), failure: true}, + {req: compareAndSetRequest("key", 2, "3"), resp: compareAndSetResponse(false, 1)}, + {req: getRequest("key"), resp: getResponse("key", "2", 1, 1), failure: true}, + {req: getRequest("key"), resp: getResponse("key", "2", 2, 2), failure: true}, + {req: getRequest("key"), resp: getResponse("key", "3", 1, 1), failure: true}, + {req: getRequest("key"), resp: getResponse("key", "3", 1, 2), failure: true}, + {req: getRequest("key"), resp: getResponse("key", "3", 2, 2), failure: true}, + {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, }, }, { name: "Txn can fail and be lost before get", operations: []testOperation{ - {req: getRequest("key"), resp: getResponse("1", 1)}, - {req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))}, - {req: getRequest("key"), resp: getResponse("1", 1)}, - {req: getRequest("key"), resp: getResponse("2", 2), failure: true}, + {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, + {req: compareAndSetRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))}, + {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, + {req: getRequest("key"), resp: getResponse("key", "2", 2, 2), failure: true}, }, }, { name: "Txn can fail and be lost before delete", operations: []testOperation{ - {req: getRequest("key"), resp: getResponse("1", 1)}, - {req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))}, + {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, + {req: compareAndSetRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))}, {req: deleteRequest("key"), resp: deleteResponse(1, 2)}, }, }, { name: "Txn can fail and be lost before put", operations: []testOperation{ - {req: getRequest("key"), resp: getResponse("1", 1)}, - {req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))}, + {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, + {req: compareAndSetRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))}, {req: putRequest("key", "3"), resp: putResponse(2)}, }, }, @@ -356,28 +400,28 @@ func TestModelStep(t *testing.T) { name: "Txn can fail but be persisted before get", operations: []testOperation{ // One failed request, one persisted. - {req: getRequest("key"), resp: getResponse("1", 1)}, - {req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))}, - {req: getRequest("key"), resp: getResponse("2", 1), failure: true}, - {req: getRequest("key"), resp: getResponse("2", 2)}, + {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, + {req: compareAndSetRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))}, + {req: getRequest("key"), resp: getResponse("key", "2", 1, 1), failure: true}, + {req: getRequest("key"), resp: getResponse("key", "2", 2, 2)}, // Two failed request, two persisted. {req: putRequest("key", "3"), resp: putResponse(3)}, - {req: compareAndSetRequest("key", "3", "4"), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", "4", "5"), resp: failedResponse(errors.New("failed"))}, - {req: getRequest("key"), resp: getResponse("5", 5)}, + {req: compareAndSetRequest("key", 3, "4"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", 4, "5"), resp: failedResponse(errors.New("failed"))}, + {req: getRequest("key"), resp: getResponse("key", "5", 5, 5)}, }, }, { name: "Txn can fail but be persisted before put", operations: []testOperation{ // One failed request, one persisted. - {req: getRequest("key"), resp: getResponse("1", 1)}, - {req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))}, + {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, + {req: compareAndSetRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))}, {req: putRequest("key", "3"), resp: putResponse(3)}, // Two failed request, two persisted. {req: putRequest("key", "4"), resp: putResponse(4)}, - {req: compareAndSetRequest("key", "4", "5"), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", "5", "6"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", 4, "5"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", 5, "6"), resp: failedResponse(errors.New("failed"))}, {req: putRequest("key", "7"), resp: putResponse(7)}, }, }, @@ -385,13 +429,13 @@ func TestModelStep(t *testing.T) { name: "Txn can fail but be persisted before delete", operations: []testOperation{ // One failed request, one persisted. - {req: getRequest("key"), resp: getResponse("1", 1)}, - {req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))}, + {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, + {req: compareAndSetRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))}, {req: deleteRequest("key"), resp: deleteResponse(1, 3)}, // Two failed request, two persisted. {req: putRequest("key", "4"), resp: putResponse(4)}, - {req: compareAndSetRequest("key", "4", "5"), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", "5", "6"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", 4, "5"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", 5, "6"), resp: failedResponse(errors.New("failed"))}, {req: deleteRequest("key"), resp: deleteResponse(1, 7)}, }, }, @@ -399,18 +443,18 @@ func TestModelStep(t *testing.T) { name: "Txn can fail but be persisted before txn", operations: []testOperation{ // One failed request, one persisted with success. - {req: getRequest("key"), resp: getResponse("1", 1)}, - {req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", "2", "3"), resp: compareAndSetResponse(true, 3)}, + {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, + {req: compareAndSetRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", 2, "3"), resp: compareAndSetResponse(true, 3)}, // Two failed request, two persisted with success. {req: putRequest("key", "4"), resp: putResponse(4)}, - {req: compareAndSetRequest("key", "4", "5"), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", "5", "6"), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", "6", "7"), resp: compareAndSetResponse(true, 7)}, + {req: compareAndSetRequest("key", 4, "5"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", 5, "6"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", 6, "7"), resp: compareAndSetResponse(true, 7)}, // One failed request, one persisted with failure. {req: putRequest("key", "8"), resp: putResponse(8)}, - {req: compareAndSetRequest("key", "8", "9"), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", "8", "10"), resp: compareAndSetResponse(false, 9)}, + {req: compareAndSetRequest("key", 8, "9"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", 8, "10"), resp: compareAndSetResponse(false, 9)}, }, }, { @@ -419,7 +463,7 @@ func TestModelStep(t *testing.T) { {req: leaseGrantRequest(1), resp: leaseGrantResponse(1)}, {req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)}, {req: putWithLeaseRequest("key", "3", 2), resp: putResponse(3), failure: true}, - {req: getRequest("key"), resp: getResponse("2", 2)}, + {req: getRequest("key"), resp: getResponse("key", "2", 2, 2)}, }, }, { @@ -427,10 +471,10 @@ func TestModelStep(t *testing.T) { operations: []testOperation{ {req: leaseGrantRequest(1), resp: leaseGrantResponse(1)}, {req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)}, - {req: getRequest("key"), resp: getResponse("2", 2)}, + {req: getRequest("key"), resp: getResponse("key", "2", 2, 2)}, {req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)}, {req: putWithLeaseRequest("key", "4", 1), resp: putResponse(4), failure: true}, - {req: getRequest("key"), resp: getResponse("", 3)}, + {req: getRequest("key"), resp: emptyGetResponse(3)}, }, }, { @@ -439,7 +483,7 @@ func TestModelStep(t *testing.T) { {req: leaseGrantRequest(1), resp: leaseGrantResponse(1)}, {req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)}, {req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)}, - {req: getRequest("key"), resp: getResponse("", 3)}, + {req: getRequest("key"), resp: emptyGetResponse(3)}, }, }, { @@ -449,7 +493,7 @@ func TestModelStep(t *testing.T) { {req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)}, {req: putRequest("key", "3"), resp: putResponse(3)}, {req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)}, - {req: getRequest("key"), resp: getResponse("3", 3)}, + {req: getRequest("key"), resp: getResponse("key", "3", 3, 3)}, }, }, { @@ -460,9 +504,9 @@ func TestModelStep(t *testing.T) { {req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)}, {req: putWithLeaseRequest("key", "3", 2), resp: putResponse(3)}, {req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)}, - {req: getRequest("key"), resp: getResponse("3", 3)}, + {req: getRequest("key"), resp: getResponse("key", "3", 3, 3)}, {req: leaseRevokeRequest(2), resp: leaseRevokeResponse(4)}, - {req: getRequest("key"), resp: getResponse("", 4)}, + {req: getRequest("key"), resp: emptyGetResponse(4)}, }, }, { @@ -471,7 +515,7 @@ func TestModelStep(t *testing.T) { {req: leaseGrantRequest(1), resp: leaseGrantResponse(1)}, {req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)}, {req: putWithLeaseRequest("key", "3", 1), resp: putResponse(3)}, - {req: getRequest("key"), resp: getResponse("3", 3)}, + {req: getRequest("key"), resp: getResponse("key", "3", 3, 3)}, }, }, { @@ -508,10 +552,10 @@ func TestModelStep(t *testing.T) { {req: deleteRequest("key4"), resp: deleteResponse(1, 8)}, {req: leaseRevokeRequest(1), resp: leaseRevokeResponse(9)}, {req: deleteRequest("key2"), resp: deleteResponse(0, 9)}, - {req: getRequest("key1"), resp: getResponse("", 9)}, - {req: getRequest("key2"), resp: getResponse("", 9)}, - {req: getRequest("key3"), resp: getResponse("", 9)}, - {req: getRequest("key4"), resp: getResponse("", 9)}, + {req: getRequest("key1"), resp: emptyGetResponse(9)}, + {req: getRequest("key2"), resp: emptyGetResponse(9)}, + {req: getRequest("key3"), resp: emptyGetResponse(9)}, + {req: getRequest("key4"), resp: emptyGetResponse(9)}, }, }, { @@ -536,8 +580,8 @@ func TestModelStep(t *testing.T) { {req: putWithLeaseRequest("key", "1", 1), resp: putResponse(2)}, {req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)}, {req: putRequest("key", "4"), resp: putResponse(4)}, - {req: getRequest("key"), resp: getResponse("4", 4)}, - {req: compareAndSetRequest("key", "4", "5"), resp: compareAndSetResponse(true, 5)}, + {req: getRequest("key"), resp: getResponse("key", "4", 4, 4)}, + {req: compareAndSetRequest("key", 4, "5"), resp: compareAndSetResponse(true, 5)}, {req: deleteRequest("key"), resp: deleteResponse(1, 6)}, {req: defragmentRequest(), resp: defragmentResponse()}, }, @@ -554,9 +598,9 @@ func TestModelStep(t *testing.T) { {req: defragmentRequest(), resp: defragmentResponse()}, {req: putRequest("key", "4"), resp: putResponse(4)}, {req: defragmentRequest(), resp: defragmentResponse()}, - {req: getRequest("key"), resp: getResponse("4", 4)}, + {req: getRequest("key"), resp: getResponse("key", "4", 4, 4)}, {req: defragmentRequest(), resp: defragmentResponse()}, - {req: compareAndSetRequest("key", "4", "5"), resp: compareAndSetResponse(true, 5)}, + {req: compareAndSetRequest("key", 4, "5"), resp: compareAndSetResponse(true, 5)}, {req: defragmentRequest(), resp: defragmentResponse()}, {req: deleteRequest("key"), resp: deleteResponse(1, 6)}, {req: defragmentRequest(), resp: defragmentResponse()}, @@ -574,9 +618,9 @@ func TestModelStep(t *testing.T) { {req: defragmentRequest(), resp: failedResponse(errors.New("failed"))}, {req: putRequest("key", "4"), resp: putResponse(4)}, {req: defragmentRequest(), resp: failedResponse(errors.New("failed"))}, - {req: getRequest("key"), resp: getResponse("4", 4)}, + {req: getRequest("key"), resp: getResponse("key", "4", 4, 4)}, {req: defragmentRequest(), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", "4", "5"), resp: compareAndSetResponse(true, 5)}, + {req: compareAndSetRequest("key", 4, "5"), resp: compareAndSetResponse(true, 5)}, {req: defragmentRequest(), resp: failedResponse(errors.New("failed"))}, {req: deleteRequest("key"), resp: deleteResponse(1, 6)}, {req: defragmentRequest(), resp: failedResponse(errors.New("failed"))}, @@ -589,8 +633,16 @@ func TestModelStep(t *testing.T) { for _, op := range tc.operations { ok, newState := Etcd.Step(state, op.req, op.resp) if ok != !op.failure { - t.Logf("state: %v", state) t.Errorf("Unexpected operation result, expect: %v, got: %v, operation: %s", !op.failure, ok, Etcd.DescribeOperation(op.req, op.resp)) + var states PossibleStates + err := json.Unmarshal([]byte(state.(string)), &states) + if err != nil { + panic(err) + } + for _, s := range states { + _, gotResp := applyRequestToSingleState(s, op.req) + t.Logf("For state: %v, diff: %s", state, cmp.Diff(op.resp, gotResp)) + } } if ok { state = newState @@ -615,23 +667,23 @@ func TestModelDescribe(t *testing.T) { }{ { req: getRequest("key1"), - resp: getResponse("", 1), + resp: emptyGetResponse(1), expectDescribe: `get("key1") -> nil, rev: 1`, }, { req: getRequest("key2"), - resp: getResponse("2", 2), + resp: getResponse("key", "2", 2, 2), expectDescribe: `get("key2") -> "2", rev: 2`, }, { req: getRequest("key2b"), - resp: getResponse("01234567890123456789", 2), + resp: getResponse("key2b", "01234567890123456789", 2, 2), expectDescribe: `get("key2b") -> hash: 2945867837, rev: 2`, }, { req: putRequest("key3", "3"), resp: putResponse(3), - expectDescribe: `put("key3", "3", nil) -> ok, rev: 3`, + expectDescribe: `put("key3", "3") -> ok, rev: 3`, }, { req: putWithLeaseRequest("key3b", "3b", 3), @@ -641,17 +693,17 @@ func TestModelDescribe(t *testing.T) { { req: putRequest("key3c", "01234567890123456789"), resp: putResponse(3), - expectDescribe: `put("key3c", hash: 2945867837, nil) -> ok, rev: 3`, + expectDescribe: `put("key3c", hash: 2945867837) -> ok, rev: 3`, }, { req: putRequest("key4", "4"), resp: failedResponse(errors.New("failed")), - expectDescribe: `put("key4", "4", nil) -> err: "failed"`, + expectDescribe: `put("key4", "4") -> err: "failed"`, }, { req: putRequest("key4b", "4b"), resp: unknownResponse(42), - expectDescribe: `put("key4b", "4b", nil) -> unknown, rev: 42`, + expectDescribe: `put("key4b", "4b") -> unknown, rev: 42`, }, { req: deleteRequest("key5"), @@ -664,30 +716,45 @@ func TestModelDescribe(t *testing.T) { expectDescribe: `delete("key6") -> err: "failed"`, }, { - req: compareAndSetRequest("key7", "7", "77"), + req: compareAndSetRequest("key7", 7, "77"), resp: compareAndSetResponse(false, 7), - expectDescribe: `if(key7=="7").then(put("key7", "77", nil)) -> txn failed, rev: 7`, + expectDescribe: `if(mod_rev(key7)==7).then(put("key7", "77")) -> txn failed, rev: 7`, }, { - req: compareAndSetRequest("key8", "8", "88"), + req: compareAndSetRequest("key8", 8, "88"), resp: compareAndSetResponse(true, 8), - expectDescribe: `if(key8=="8").then(put("key8", "88", nil)) -> ok, rev: 8`, + expectDescribe: `if(mod_rev(key8)==8).then(put("key8", "88")) -> ok, rev: 8`, }, { - req: compareAndSetRequest("key9", "9", "99"), + req: compareAndSetRequest("key9", 9, "99"), resp: failedResponse(errors.New("failed")), - expectDescribe: `if(key9=="9").then(put("key9", "99", nil)) -> err: "failed"`, + expectDescribe: `if(mod_rev(key9)==9).then(put("key9", "99")) -> err: "failed"`, }, { - req: txnRequest(nil, []EtcdOperation{{Type: Get, Key: "10"}, {Type: Put, Key: "11", Value: ValueOrHash{Value: "111"}}, {Type: Delete, Key: "12"}}), - resp: txnResponse([]EtcdOperationResult{{Value: ValueOrHash{Value: "110"}}, {}, {Deleted: 1}}, true, 10), - expectDescribe: `get("10"), put("11", "111", nil), delete("12") -> "110", ok, deleted: 1, rev: 10`, + req: txnRequest(nil, []EtcdOperation{{Type: Range, Key: "10"}, {Type: Put, Key: "11", Value: ValueOrHash{Value: "111"}}, {Type: Delete, Key: "12"}}), + resp: txnResponse([]EtcdOperationResult{{KVs: []KeyValue{{ValueRevision: ValueRevision{Value: ValueOrHash{Value: "110"}}}}}, {}, {Deleted: 1}}, true, 10), + expectDescribe: `get("10"), put("11", "111"), delete("12") -> "110", ok, deleted: 1, rev: 10`, }, { req: defragmentRequest(), resp: defragmentResponse(), expectDescribe: `defragment() -> ok`, }, + { + req: rangeRequest("key11", true), + resp: rangeResponse(nil, 11), + expectDescribe: `range("key11") -> [], rev: 11`, + }, + { + req: rangeRequest("key12", true), + resp: rangeResponse([]*mvccpb.KeyValue{{Value: []byte("12")}}, 12), + expectDescribe: `range("key12") -> ["12"], rev: 12`, + }, + { + req: rangeRequest("key13", true), + resp: rangeResponse([]*mvccpb.KeyValue{{Value: []byte("01234567890123456789")}}, 13), + expectDescribe: `range("key13") -> [hash: 2945867837], rev: 13`, + }, } for _, tc := range tcs { assert.Equal(t, tc.expectDescribe, Etcd.DescribeOperation(tc.req, tc.resp)) @@ -701,32 +768,37 @@ func TestModelResponseMatch(t *testing.T) { expectMatch bool }{ { - resp1: getResponse("a", 1), - resp2: getResponse("a", 1), + resp1: getResponse("key", "a", 1, 1), + resp2: getResponse("key", "a", 1, 1), expectMatch: true, }, { - resp1: getResponse("a", 1), - resp2: getResponse("b", 1), + resp1: getResponse("key", "a", 1, 1), + resp2: getResponse("key", "b", 1, 1), expectMatch: false, }, { - resp1: getResponse("a", 1), - resp2: getResponse("a", 2), + resp1: getResponse("key", "a", 1, 1), + resp2: getResponse("key", "a", 2, 1), expectMatch: false, }, { - resp1: getResponse("a", 1), + resp1: getResponse("key", "a", 1, 1), + resp2: getResponse("key", "a", 1, 2), + expectMatch: false, + }, + { + resp1: getResponse("key", "a", 1, 1), resp2: failedResponse(errors.New("failed request")), expectMatch: false, }, { - resp1: getResponse("a", 1), + resp1: getResponse("key", "a", 1, 1), resp2: unknownResponse(1), expectMatch: true, }, { - resp1: getResponse("a", 1), + resp1: getResponse("key", "a", 1, 1), resp2: unknownResponse(0), expectMatch: false, }, diff --git a/tests/robustness/traffic.go b/tests/robustness/traffic.go index fa5889fc9..3bb433c8f 100644 --- a/tests/robustness/traffic.go +++ b/tests/robustness/traffic.go @@ -29,6 +29,7 @@ import ( "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/pkg/v3/stringutil" "go.etcd.io/etcd/tests/v3/framework/e2e" "go.etcd.io/etcd/tests/v3/robustness/identity" "go.etcd.io/etcd/tests/v3/robustness/model" @@ -133,6 +134,58 @@ type requestChance struct { chance int } +type kubernetesTraffic struct { + keyCount int +} + +func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) { + for { + select { + case <-ctx.Done(): + return + case <-finish: + return + default: + } + resource := "pods" + + pods, err := t.Range(ctx, c, "/registry/"+resource+"/", true) + if err != nil { + continue + } + limiter.Wait(ctx) + if len(pods) < t.keyCount { + err = t.Create(ctx, c, fmt.Sprintf("/registry/%s/default/%s", resource, stringutil.RandString(5)), fmt.Sprintf("%d", ids.RequestId())) + continue + } else { + randomPod := pods[rand.Intn(len(pods))] + err = t.Update(ctx, c, string(randomPod.Key), fmt.Sprintf("%d", ids.RequestId()), randomPod.ModRevision) + } + if err != nil { + continue + } + limiter.Wait(ctx) + } +} + +func (t kubernetesTraffic) Range(ctx context.Context, c *recordingClient, key string, withPrefix bool) ([]*mvccpb.KeyValue, error) { + ctx, cancel := context.WithTimeout(ctx, RequestTimeout) + resp, err := c.Range(ctx, key, withPrefix) + cancel() + return resp, err +} + +func (t kubernetesTraffic) Create(ctx context.Context, c *recordingClient, key, value string) error { + return t.Update(ctx, c, key, value, 0) +} + +func (t kubernetesTraffic) Update(ctx context.Context, c *recordingClient, key, value string, expectedRevision int64) error { + ctx, cancel := context.WithTimeout(ctx, RequestTimeout) + err := c.CompareAndSet(ctx, key, value, expectedRevision) + cancel() + return err +} + func (t traffic) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) { for { @@ -145,25 +198,27 @@ func (t traffic) Run(ctx context.Context, clientId int, c *recordingClient, limi } key := fmt.Sprintf("%d", rand.Int()%t.keyCount) // Execute one read per one write to avoid operation history include too many failed writes when etcd is down. - resp, err := t.Read(ctx, c, limiter, key) + resp, err := t.Read(ctx, c, key) if err != nil { continue } - t.Write(ctx, c, limiter, key, ids, lm, clientId, resp) + limiter.Wait(ctx) + err = t.Write(ctx, c, limiter, key, ids, lm, clientId, resp) + if err != nil { + continue + } + limiter.Wait(ctx) } } -func (t traffic) Read(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string) ([]*mvccpb.KeyValue, error) { +func (t traffic) Read(ctx context.Context, c *recordingClient, key string) (*mvccpb.KeyValue, error) { getCtx, cancel := context.WithTimeout(ctx, RequestTimeout) resp, err := c.Get(getCtx, key) cancel() - if err == nil { - limiter.Wait(ctx) - } return resp, err } -func (t traffic) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage, cid int, lastValues []*mvccpb.KeyValue) error { +func (t traffic) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage, cid int, lastValues *mvccpb.KeyValue) error { writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout) var err error @@ -177,11 +232,11 @@ func (t traffic) Write(ctx context.Context, c *recordingClient, limiter *rate.Li case MultiOpTxn: err = c.Txn(writeCtx, nil, t.pickMultiTxnOps(id)) case CompareAndSet: - var expectValue string - if len(lastValues) != 0 { - expectValue = string(lastValues[0].Value) + var expectRevision int64 + if lastValues != nil { + expectRevision = lastValues.ModRevision } - err = c.CompareAndSet(writeCtx, key, expectValue, fmt.Sprintf("%d", id.RequestId())) + err = c.CompareAndSet(writeCtx, key, fmt.Sprintf("%d", id.RequestId()), expectRevision) case PutWithLease: leaseId := lm.LeaseId(cid) if leaseId == 0 { @@ -211,9 +266,6 @@ func (t traffic) Write(ctx context.Context, c *recordingClient, limiter *rate.Li panic("invalid operation") } cancel() - if err == nil { - limiter.Wait(ctx) - } return err } @@ -251,7 +303,7 @@ func (t traffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op) { for i, opType := range opTypes { key := fmt.Sprintf("%d", keys[i]) switch opType { - case model.Get: + case model.Range: ops = append(ops, clientv3.OpGet(key)) case model.Put: value := fmt.Sprintf("%d", ids.RequestId()) @@ -271,7 +323,7 @@ func (t traffic) pickOperationType() model.OperationType { return model.Delete } if roll < 50 { - return model.Get + return model.Range } return model.Put }