mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
tests/robustness: Implement first step in validating the Kubernetes-etcd contract.
* Use mod revision for optimistic concurrency. * Introduce range requests as more general then get * Add kubernetes specific traffic generation, for now using pull, but expected to evolve to use watch. * Introduce kubernetes specific test scenario Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
parent
7462c61b31
commit
9b5680c5f1
@ -16,6 +16,7 @@ package robustness
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
@ -53,14 +54,29 @@ func (c *recordingClient) Close() error {
|
||||
return c.client.Close()
|
||||
}
|
||||
|
||||
func (c *recordingClient) Get(ctx context.Context, key string) ([]*mvccpb.KeyValue, error) {
|
||||
func (c *recordingClient) Get(ctx context.Context, key string) (*mvccpb.KeyValue, error) {
|
||||
resp, err := c.Range(ctx, key, false)
|
||||
if err != nil || len(resp) == 0 {
|
||||
return nil, err
|
||||
}
|
||||
if len(resp) == 1 {
|
||||
return resp[0], err
|
||||
}
|
||||
panic(fmt.Sprintf("Unexpected response size: %d", len(resp)))
|
||||
}
|
||||
|
||||
func (c *recordingClient) Range(ctx context.Context, key string, withPrefix bool) ([]*mvccpb.KeyValue, error) {
|
||||
callTime := time.Since(c.baseTime)
|
||||
resp, err := c.client.Get(ctx, key)
|
||||
ops := []clientv3.OpOption{}
|
||||
if withPrefix {
|
||||
ops = append(ops, clientv3.WithPrefix())
|
||||
}
|
||||
resp, err := c.client.Get(ctx, key, ops...)
|
||||
returnTime := time.Since(c.baseTime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.history.AppendGet(key, callTime, returnTime, resp)
|
||||
c.history.AppendRange(key, withPrefix, callTime, returnTime, resp)
|
||||
return resp.Kvs, nil
|
||||
}
|
||||
|
||||
@ -80,22 +96,22 @@ func (c *recordingClient) Delete(ctx context.Context, key string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *recordingClient) CompareAndSet(ctx context.Context, key, expectedValue, newValue string) error {
|
||||
func (c *recordingClient) CompareAndSet(ctx context.Context, key, value string, expectedRevision int64) error {
|
||||
callTime := time.Since(c.baseTime)
|
||||
txn := c.client.Txn(ctx)
|
||||
var cmp clientv3.Cmp
|
||||
if expectedValue == "" {
|
||||
if expectedRevision == 0 {
|
||||
cmp = clientv3.Compare(clientv3.CreateRevision(key), "=", 0)
|
||||
} else {
|
||||
cmp = clientv3.Compare(clientv3.Value(key), "=", expectedValue)
|
||||
cmp = clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)
|
||||
}
|
||||
resp, err := txn.If(
|
||||
cmp,
|
||||
).Then(
|
||||
clientv3.OpPut(key, newValue),
|
||||
clientv3.OpPut(key, value),
|
||||
).Commit()
|
||||
returnTime := time.Since(c.baseTime)
|
||||
c.history.AppendCompareAndSet(key, expectedValue, newValue, callTime, returnTime, resp, err)
|
||||
c.history.AppendCompareAndSet(key, expectedRevision, value, callTime, returnTime, resp, err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -73,6 +73,15 @@ var (
|
||||
},
|
||||
},
|
||||
}
|
||||
KubernetesTraffic = trafficConfig{
|
||||
name: "Kubernetes",
|
||||
minimalQPS: 200,
|
||||
maximalQPS: 1000,
|
||||
clientCount: 12,
|
||||
traffic: kubernetesTraffic{
|
||||
keyCount: 5,
|
||||
},
|
||||
}
|
||||
ReqProgTraffic = trafficConfig{
|
||||
name: "RequestProgressTraffic",
|
||||
minimalQPS: 200,
|
||||
@ -91,7 +100,7 @@ var (
|
||||
}
|
||||
defaultTraffic = LowTraffic
|
||||
trafficList = []trafficConfig{
|
||||
LowTraffic, HighTraffic,
|
||||
LowTraffic, HighTraffic, KubernetesTraffic,
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"go.uber.org/zap"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/tests/v3/robustness/identity"
|
||||
)
|
||||
@ -64,20 +65,16 @@ func NewAppendableHistory(ids identity.Provider) *AppendableHistory {
|
||||
}
|
||||
}
|
||||
|
||||
func (h *AppendableHistory) AppendGet(key string, start, end time.Duration, resp *clientv3.GetResponse) {
|
||||
var readData string
|
||||
if len(resp.Kvs) == 1 {
|
||||
readData = string(resp.Kvs[0].Value)
|
||||
}
|
||||
func (h *AppendableHistory) AppendRange(key string, withPrefix bool, start, end time.Duration, resp *clientv3.GetResponse) {
|
||||
var revision int64
|
||||
if resp != nil && resp.Header != nil {
|
||||
revision = resp.Header.Revision
|
||||
}
|
||||
h.successful = append(h.successful, porcupine.Operation{
|
||||
ClientId: h.id,
|
||||
Input: getRequest(key),
|
||||
Input: rangeRequest(key, withPrefix),
|
||||
Call: start.Nanoseconds(),
|
||||
Output: getResponse(readData, revision),
|
||||
Output: rangeResponse(resp.Kvs, revision),
|
||||
Return: end.Nanoseconds(),
|
||||
})
|
||||
}
|
||||
@ -183,8 +180,8 @@ func (h *AppendableHistory) AppendDelete(key string, start, end time.Duration, r
|
||||
})
|
||||
}
|
||||
|
||||
func (h *AppendableHistory) AppendCompareAndSet(key, expectValue, newValue string, start, end time.Duration, resp *clientv3.TxnResponse, err error) {
|
||||
request := compareAndSetRequest(key, expectValue, newValue)
|
||||
func (h *AppendableHistory) AppendCompareAndSet(key string, expectedRevision int64, value string, start, end time.Duration, resp *clientv3.TxnResponse, err error) {
|
||||
request := compareAndSetRequest(key, expectedRevision, value)
|
||||
if err != nil {
|
||||
h.appendFailed(request, start, err)
|
||||
return
|
||||
@ -235,9 +232,8 @@ func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, onSuccess []clientv3.O
|
||||
|
||||
func toEtcdCondition(cmp clientv3.Cmp) (cond EtcdCondition) {
|
||||
switch {
|
||||
case cmp.Result == etcdserverpb.Compare_EQUAL && cmp.Target == etcdserverpb.Compare_VALUE:
|
||||
case cmp.Result == etcdserverpb.Compare_EQUAL && cmp.Target == etcdserverpb.Compare_MOD:
|
||||
cond.Key = string(cmp.KeyBytes())
|
||||
cond.ExpectedValue = ToValueOrHash(string(cmp.ValueBytes()))
|
||||
case cmp.Result == etcdserverpb.Compare_EQUAL && cmp.Target == etcdserverpb.Compare_CREATE:
|
||||
cond.Key = string(cmp.KeyBytes())
|
||||
default:
|
||||
@ -250,7 +246,7 @@ func toEtcdOperation(op clientv3.Op) EtcdOperation {
|
||||
var opType OperationType
|
||||
switch {
|
||||
case op.IsGet():
|
||||
opType = Get
|
||||
opType = Range
|
||||
case op.IsPut():
|
||||
opType = Put
|
||||
case op.IsDelete():
|
||||
@ -269,12 +265,18 @@ func toEtcdOperationResult(resp *etcdserverpb.ResponseOp) EtcdOperationResult {
|
||||
switch {
|
||||
case resp.GetResponseRange() != nil:
|
||||
getResp := resp.GetResponseRange()
|
||||
var val string
|
||||
if len(getResp.Kvs) != 0 {
|
||||
val = string(getResp.Kvs[0].Value)
|
||||
kvs := make([]KeyValue, len(getResp.Kvs))
|
||||
for i, kv := range getResp.Kvs {
|
||||
kvs[i] = KeyValue{
|
||||
Key: string(kv.Key),
|
||||
ValueRevision: ValueRevision{
|
||||
Value: ToValueOrHash(string(kv.Value)),
|
||||
ModRevision: kv.ModRevision,
|
||||
},
|
||||
}
|
||||
}
|
||||
return EtcdOperationResult{
|
||||
Value: ToValueOrHash(val),
|
||||
KVs: kvs,
|
||||
}
|
||||
case resp.GetResponsePut() != nil:
|
||||
return EtcdOperationResult{}
|
||||
@ -316,11 +318,34 @@ func (h *AppendableHistory) appendFailed(request EtcdRequest, start time.Duratio
|
||||
}
|
||||
|
||||
func getRequest(key string) EtcdRequest {
|
||||
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Ops: []EtcdOperation{{Type: Get, Key: key}}}}
|
||||
return rangeRequest(key, false)
|
||||
}
|
||||
|
||||
func getResponse(value string, revision int64) EtcdResponse {
|
||||
return EtcdResponse{Txn: &TxnResponse{OpsResult: []EtcdOperationResult{{Value: ToValueOrHash(value)}}}, Revision: revision}
|
||||
func rangeRequest(key string, withPrefix bool) EtcdRequest {
|
||||
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Ops: []EtcdOperation{{Type: Range, Key: key, WithPrefix: withPrefix}}}}
|
||||
}
|
||||
|
||||
func emptyGetResponse(revision int64) EtcdResponse {
|
||||
return rangeResponse([]*mvccpb.KeyValue{}, revision)
|
||||
}
|
||||
|
||||
func getResponse(key, value string, modRevision, revision int64) EtcdResponse {
|
||||
return rangeResponse([]*mvccpb.KeyValue{{Key: []byte(key), Value: []byte(value), ModRevision: modRevision}}, revision)
|
||||
}
|
||||
|
||||
func rangeResponse(kvs []*mvccpb.KeyValue, revision int64) EtcdResponse {
|
||||
result := EtcdOperationResult{KVs: make([]KeyValue, len(kvs))}
|
||||
|
||||
for i, kv := range kvs {
|
||||
result.KVs[i] = KeyValue{
|
||||
Key: string(kv.Key),
|
||||
ValueRevision: ValueRevision{
|
||||
Value: ToValueOrHash(string(kv.Value)),
|
||||
ModRevision: kv.ModRevision,
|
||||
},
|
||||
}
|
||||
}
|
||||
return EtcdResponse{Txn: &TxnResponse{OpsResult: []EtcdOperationResult{result}}, Revision: revision}
|
||||
}
|
||||
|
||||
func failedResponse(err error) EtcdResponse {
|
||||
@ -347,8 +372,8 @@ func deleteResponse(deleted int64, revision int64) EtcdResponse {
|
||||
return EtcdResponse{Txn: &TxnResponse{OpsResult: []EtcdOperationResult{{Deleted: deleted}}}, Revision: revision}
|
||||
}
|
||||
|
||||
func compareAndSetRequest(key, expectValue, newValue string) EtcdRequest {
|
||||
return txnRequest([]EtcdCondition{{Key: key, ExpectedValue: ToValueOrHash(expectValue)}}, []EtcdOperation{{Type: Put, Key: key, Value: ToValueOrHash(newValue)}})
|
||||
func compareAndSetRequest(key string, expectedRevision int64, value string) EtcdRequest {
|
||||
return txnRequest([]EtcdCondition{{Key: key, ExpectedRevision: expectedRevision}}, []EtcdOperation{{Type: Put, Key: key, Value: ToValueOrHash(value)}})
|
||||
}
|
||||
|
||||
func compareAndSetResponse(succeeded bool, revision int64) EtcdResponse {
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/anishathalye/porcupine"
|
||||
@ -27,7 +28,7 @@ import (
|
||||
type OperationType string
|
||||
|
||||
const (
|
||||
Get OperationType = "get"
|
||||
Range OperationType = "range"
|
||||
Put OperationType = "put"
|
||||
Delete OperationType = "delete"
|
||||
)
|
||||
@ -77,15 +78,16 @@ type TxnRequest struct {
|
||||
}
|
||||
|
||||
type EtcdCondition struct {
|
||||
Key string
|
||||
ExpectedValue ValueOrHash
|
||||
Key string
|
||||
ExpectedRevision int64
|
||||
}
|
||||
|
||||
type EtcdOperation struct {
|
||||
Type OperationType
|
||||
Key string
|
||||
Value ValueOrHash
|
||||
LeaseID int64
|
||||
Type OperationType
|
||||
Key string
|
||||
WithPrefix bool
|
||||
Value ValueOrHash
|
||||
LeaseID int64
|
||||
}
|
||||
|
||||
type LeaseGrantRequest struct {
|
||||
@ -122,10 +124,15 @@ func Match(r1, r2 EtcdResponse) bool {
|
||||
}
|
||||
|
||||
type EtcdOperationResult struct {
|
||||
Value ValueOrHash
|
||||
KVs []KeyValue
|
||||
Deleted int64
|
||||
}
|
||||
|
||||
type KeyValue struct {
|
||||
Key string
|
||||
ValueRevision
|
||||
}
|
||||
|
||||
var leased = struct{}{}
|
||||
|
||||
type EtcdLease struct {
|
||||
@ -136,11 +143,16 @@ type PossibleStates []EtcdState
|
||||
|
||||
type EtcdState struct {
|
||||
Revision int64
|
||||
KeyValues map[string]ValueOrHash
|
||||
KeyValues map[string]ValueRevision
|
||||
KeyLeases map[string]int64
|
||||
Leases map[int64]EtcdLease
|
||||
}
|
||||
|
||||
type ValueRevision struct {
|
||||
Value ValueOrHash
|
||||
ModRevision int64
|
||||
}
|
||||
|
||||
type ValueOrHash struct {
|
||||
Value string
|
||||
Hash uint32
|
||||
@ -200,7 +212,7 @@ func describeEtcdRequest(request EtcdRequest) string {
|
||||
func describeEtcdConditions(conds []EtcdCondition) string {
|
||||
opsDescription := make([]string, len(conds))
|
||||
for i := range conds {
|
||||
opsDescription[i] = fmt.Sprintf("%s==%s", conds[i].Key, describeValueOrHash(conds[i].ExpectedValue))
|
||||
opsDescription[i] = fmt.Sprintf("mod_rev(%s)==%d", conds[i].Key, conds[i].ExpectedRevision)
|
||||
}
|
||||
return strings.Join(opsDescription, " && ")
|
||||
}
|
||||
@ -219,20 +231,23 @@ func describeTxnResponse(request *TxnRequest, response *TxnResponse) string {
|
||||
}
|
||||
respDescription := make([]string, len(response.OpsResult))
|
||||
for i := range response.OpsResult {
|
||||
respDescription[i] = describeEtcdOperationResponse(request.Ops[i].Type, response.OpsResult[i])
|
||||
respDescription[i] = describeEtcdOperationResponse(request.Ops[i], response.OpsResult[i])
|
||||
}
|
||||
return strings.Join(respDescription, ", ")
|
||||
}
|
||||
|
||||
func describeEtcdOperation(op EtcdOperation) string {
|
||||
switch op.Type {
|
||||
case Get:
|
||||
case Range:
|
||||
if op.WithPrefix {
|
||||
return fmt.Sprintf("range(%q)", op.Key)
|
||||
}
|
||||
return fmt.Sprintf("get(%q)", op.Key)
|
||||
case Put:
|
||||
if op.LeaseID != 0 {
|
||||
return fmt.Sprintf("put(%q, %s, %d)", op.Key, describeValueOrHash(op.Value), op.LeaseID)
|
||||
}
|
||||
return fmt.Sprintf("put(%q, %s, nil)", op.Key, describeValueOrHash(op.Value))
|
||||
return fmt.Sprintf("put(%q, %s)", op.Key, describeValueOrHash(op.Value))
|
||||
case Delete:
|
||||
return fmt.Sprintf("delete(%q)", op.Key)
|
||||
default:
|
||||
@ -240,16 +255,28 @@ func describeEtcdOperation(op EtcdOperation) string {
|
||||
}
|
||||
}
|
||||
|
||||
func describeEtcdOperationResponse(op OperationType, resp EtcdOperationResult) string {
|
||||
switch op {
|
||||
case Get:
|
||||
return describeValueOrHash(resp.Value)
|
||||
func describeEtcdOperationResponse(req EtcdOperation, resp EtcdOperationResult) string {
|
||||
switch req.Type {
|
||||
case Range:
|
||||
if req.WithPrefix {
|
||||
kvs := make([]string, len(resp.KVs))
|
||||
for i, kv := range resp.KVs {
|
||||
kvs[i] = describeValueOrHash(kv.Value)
|
||||
}
|
||||
return fmt.Sprintf("[%s]", strings.Join(kvs, ","))
|
||||
} else {
|
||||
if len(resp.KVs) == 0 {
|
||||
return "nil"
|
||||
} else {
|
||||
return describeValueOrHash(resp.KVs[0].Value)
|
||||
}
|
||||
}
|
||||
case Put:
|
||||
return fmt.Sprintf("ok")
|
||||
case Delete:
|
||||
return fmt.Sprintf("deleted: %d", resp.Deleted)
|
||||
default:
|
||||
return fmt.Sprintf("<! unknown op: %q !>", op)
|
||||
return fmt.Sprintf("<! unknown op: %q !>", req.Type)
|
||||
}
|
||||
}
|
||||
|
||||
@ -283,7 +310,7 @@ func step(states PossibleStates, request EtcdRequest, response EtcdResponse) (bo
|
||||
func initState(request EtcdRequest, response EtcdResponse) EtcdState {
|
||||
state := EtcdState{
|
||||
Revision: response.Revision,
|
||||
KeyValues: map[string]ValueOrHash{},
|
||||
KeyValues: map[string]ValueRevision{},
|
||||
KeyLeases: map[string]int64{},
|
||||
Leases: map[int64]EtcdLease{},
|
||||
}
|
||||
@ -292,15 +319,24 @@ func initState(request EtcdRequest, response EtcdResponse) EtcdState {
|
||||
if response.Txn.TxnResult {
|
||||
return state
|
||||
}
|
||||
if len(request.Txn.Ops) != len(response.Txn.OpsResult) {
|
||||
panic(fmt.Sprintf("Incorrect request %s, response %+v", describeEtcdRequest(request), describeEtcdResponse(request, response)))
|
||||
}
|
||||
for i, op := range request.Txn.Ops {
|
||||
opResp := response.Txn.OpsResult[i]
|
||||
switch op.Type {
|
||||
case Get:
|
||||
if opResp.Value.Value != "" && opResp.Value.Hash == 0 {
|
||||
state.KeyValues[op.Key] = opResp.Value
|
||||
case Range:
|
||||
for _, kv := range opResp.KVs {
|
||||
state.KeyValues[kv.Key] = ValueRevision{
|
||||
Value: kv.Value,
|
||||
ModRevision: kv.ModRevision,
|
||||
}
|
||||
}
|
||||
case Put:
|
||||
state.KeyValues[op.Key] = op.Value
|
||||
state.KeyValues[op.Key] = ValueRevision{
|
||||
Value: op.Value,
|
||||
ModRevision: response.Revision,
|
||||
}
|
||||
case Delete:
|
||||
default:
|
||||
panic("Unknown operation")
|
||||
@ -345,7 +381,7 @@ func applyRequest(states PossibleStates, request EtcdRequest, response EtcdRespo
|
||||
|
||||
// applyRequestToSingleState handles a successful request, returning updated state and response it would generate.
|
||||
func applyRequestToSingleState(s EtcdState, request EtcdRequest) (EtcdState, EtcdResponse) {
|
||||
newKVs := map[string]ValueOrHash{}
|
||||
newKVs := map[string]ValueRevision{}
|
||||
for k, v := range s.KeyValues {
|
||||
newKVs[k] = v
|
||||
}
|
||||
@ -354,7 +390,7 @@ func applyRequestToSingleState(s EtcdState, request EtcdRequest) (EtcdState, Etc
|
||||
case Txn:
|
||||
success := true
|
||||
for _, cond := range request.Txn.Conds {
|
||||
if val := s.KeyValues[cond.Key]; val != cond.ExpectedValue {
|
||||
if val := s.KeyValues[cond.Key]; val.ModRevision != cond.ExpectedRevision {
|
||||
success = false
|
||||
break
|
||||
}
|
||||
@ -366,14 +402,37 @@ func applyRequestToSingleState(s EtcdState, request EtcdRequest) (EtcdState, Etc
|
||||
increaseRevision := false
|
||||
for i, op := range request.Txn.Ops {
|
||||
switch op.Type {
|
||||
case Get:
|
||||
opResp[i].Value = s.KeyValues[op.Key]
|
||||
case Range:
|
||||
opResp[i] = EtcdOperationResult{
|
||||
KVs: []KeyValue{},
|
||||
}
|
||||
if op.WithPrefix {
|
||||
for k, v := range s.KeyValues {
|
||||
if strings.HasPrefix(k, op.Key) {
|
||||
opResp[i].KVs = append(opResp[i].KVs, KeyValue{Key: k, ValueRevision: v})
|
||||
}
|
||||
}
|
||||
sort.Slice(opResp[i].KVs, func(j, k int) bool {
|
||||
return opResp[i].KVs[j].Key < opResp[i].KVs[k].Key
|
||||
})
|
||||
} else {
|
||||
value, ok := s.KeyValues[op.Key]
|
||||
if ok {
|
||||
opResp[i].KVs = append(opResp[i].KVs, KeyValue{
|
||||
Key: op.Key,
|
||||
ValueRevision: value,
|
||||
})
|
||||
}
|
||||
}
|
||||
case Put:
|
||||
_, leaseExists := s.Leases[op.LeaseID]
|
||||
if op.LeaseID != 0 && !leaseExists {
|
||||
break
|
||||
}
|
||||
s.KeyValues[op.Key] = op.Value
|
||||
s.KeyValues[op.Key] = ValueRevision{
|
||||
Value: op.Value,
|
||||
ModRevision: s.Revision + 1,
|
||||
}
|
||||
increaseRevision = true
|
||||
s = detachFromOldLease(s, op.Key)
|
||||
if leaseExists {
|
||||
|
@ -15,10 +15,14 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
)
|
||||
|
||||
func TestModelStep(t *testing.T) {
|
||||
@ -29,7 +33,22 @@ func TestModelStep(t *testing.T) {
|
||||
{
|
||||
name: "First Get can start from non-empty value and non-zero revision",
|
||||
operations: []testOperation{
|
||||
{req: getRequest("key"), resp: getResponse("", 42)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "1", 42, 42)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "1", 42, 42)},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "First Range can start from non-empty value and non-zero revision",
|
||||
operations: []testOperation{
|
||||
{req: rangeRequest("key", true), resp: rangeResponse([]*mvccpb.KeyValue{{Key: []byte("key"), Value: []byte("1")}}, 42)},
|
||||
{req: rangeRequest("key", true), resp: rangeResponse([]*mvccpb.KeyValue{{Key: []byte("key"), Value: []byte("1")}}, 42)},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "First Range can start from non-zero revision",
|
||||
operations: []testOperation{
|
||||
{req: rangeRequest("key", true), resp: rangeResponse(nil, 1)},
|
||||
{req: rangeRequest("key", true), resp: rangeResponse(nil, 1)},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -47,7 +66,7 @@ func TestModelStep(t *testing.T) {
|
||||
{
|
||||
name: "First Txn can start from non-zero revision",
|
||||
operations: []testOperation{
|
||||
{req: compareAndSetRequest("key", "", "42"), resp: compareAndSetResponse(false, 42)},
|
||||
{req: compareAndSetRequest("key", 0, "42"), resp: compareAndSetResponse(false, 42)},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -55,31 +74,50 @@ func TestModelStep(t *testing.T) {
|
||||
operations: []testOperation{
|
||||
{req: putRequest("key1", "11"), resp: putResponse(1)},
|
||||
{req: putRequest("key2", "12"), resp: putResponse(2)},
|
||||
{req: getRequest("key1"), resp: getResponse("11", 1), failure: true},
|
||||
{req: getRequest("key1"), resp: getResponse("12", 1), failure: true},
|
||||
{req: getRequest("key1"), resp: getResponse("12", 2), failure: true},
|
||||
{req: getRequest("key1"), resp: getResponse("11", 2)},
|
||||
{req: getRequest("key2"), resp: getResponse("11", 2), failure: true},
|
||||
{req: getRequest("key2"), resp: getResponse("12", 1), failure: true},
|
||||
{req: getRequest("key2"), resp: getResponse("11", 1), failure: true},
|
||||
{req: getRequest("key2"), resp: getResponse("12", 2)},
|
||||
{req: getRequest("key1"), resp: getResponse("key1", "11", 1, 1), failure: true},
|
||||
{req: getRequest("key1"), resp: getResponse("key1", "12", 1, 1), failure: true},
|
||||
{req: getRequest("key1"), resp: getResponse("key1", "12", 2, 2), failure: true},
|
||||
{req: getRequest("key1"), resp: getResponse("key1", "11", 1, 2)},
|
||||
{req: getRequest("key2"), resp: getResponse("key2", "11", 2, 2), failure: true},
|
||||
{req: getRequest("key2"), resp: getResponse("key2", "12", 1, 1), failure: true},
|
||||
{req: getRequest("key2"), resp: getResponse("key2", "11", 1, 1), failure: true},
|
||||
{req: getRequest("key2"), resp: getResponse("key2", "12", 2, 2)},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Get response data should match large put",
|
||||
name: "Range response data should match put",
|
||||
operations: []testOperation{
|
||||
{req: putRequest("key1", "1"), resp: putResponse(1)},
|
||||
{req: putRequest("key2", "2"), resp: putResponse(2)},
|
||||
{req: rangeRequest("key", true), resp: rangeResponse([]*mvccpb.KeyValue{{Key: []byte("key1"), Value: []byte("1"), ModRevision: 1}, {Key: []byte("key2"), Value: []byte("2"), ModRevision: 2}}, 2)},
|
||||
{req: rangeRequest("key", true), resp: rangeResponse([]*mvccpb.KeyValue{{Key: []byte("key1"), Value: []byte("1"), ModRevision: 1}, {Key: []byte("key2"), Value: []byte("2"), ModRevision: 2}}, 2)},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Range response should be ordered by key",
|
||||
operations: []testOperation{
|
||||
{req: rangeRequest("key", true), resp: rangeResponse([]*mvccpb.KeyValue{
|
||||
{Key: []byte("key1"), Value: []byte("2"), ModRevision: 3},
|
||||
{Key: []byte("key2"), Value: []byte("1"), ModRevision: 2},
|
||||
{Key: []byte("key3"), Value: []byte("3"), ModRevision: 1},
|
||||
}, 3)},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Range response data should match large put",
|
||||
operations: []testOperation{
|
||||
{req: putRequest("key", "012345678901234567890"), resp: putResponse(1)},
|
||||
{req: getRequest("key"), resp: getResponse("123456789012345678901", 1), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("012345678901234567890", 1)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "123456789012345678901", 1, 1), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("key", "012345678901234567890", 1, 1)},
|
||||
{req: putRequest("key", "123456789012345678901"), resp: putResponse(2)},
|
||||
{req: getRequest("key"), resp: getResponse("123456789012345678901", 2)},
|
||||
{req: getRequest("key"), resp: getResponse("012345678901234567890", 2), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("key", "123456789012345678901", 2, 2)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "012345678901234567890", 2, 2), failure: true},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Put must increase revision by 1",
|
||||
operations: []testOperation{
|
||||
{req: getRequest("key"), resp: getResponse("", 1)},
|
||||
{req: getRequest("key"), resp: emptyGetResponse(1)},
|
||||
{req: putRequest("key", "1"), resp: putResponse(1), failure: true},
|
||||
{req: putRequest("key", "1"), resp: putResponse(3), failure: true},
|
||||
{req: putRequest("key", "1"), resp: putResponse(2)},
|
||||
@ -90,18 +128,18 @@ func TestModelStep(t *testing.T) {
|
||||
operations: []testOperation{
|
||||
{req: putRequest("key", "1"), resp: putResponse(1)},
|
||||
{req: putRequest("key", "1"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: getRequest("key"), resp: getResponse("1", 1)},
|
||||
{req: getRequest("key"), resp: getResponse("2", 1), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("1", 2), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("2", 2), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "2", 1, 1), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("key", "1", 1, 2), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("key", "2", 1, 2), failure: true},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Put can fail and be lost before put",
|
||||
operations: []testOperation{
|
||||
{req: getRequest("key"), resp: getResponse("", 1)},
|
||||
{req: getRequest("key"), resp: emptyGetResponse(1)},
|
||||
{req: putRequest("key", "1"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: putRequest("key", "3"), resp: getResponse("", 2)},
|
||||
{req: putRequest("key", "3"), resp: putResponse(2)},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -116,13 +154,13 @@ func TestModelStep(t *testing.T) {
|
||||
name: "Put can fail and be lost before txn",
|
||||
operations: []testOperation{
|
||||
// Txn failure
|
||||
{req: getRequest("key"), resp: getResponse("", 1)},
|
||||
{req: getRequest("key"), resp: emptyGetResponse(1)},
|
||||
{req: putRequest("key", "1"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: compareAndSetRequest("key", "2", "3"), resp: compareAndSetResponse(false, 1)},
|
||||
{req: compareAndSetRequest("key", 2, "3"), resp: compareAndSetResponse(false, 1)},
|
||||
// Txn success
|
||||
{req: putRequest("key", "2"), resp: putResponse(2)},
|
||||
{req: putRequest("key", "4"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: compareAndSetRequest("key", "2", "5"), resp: compareAndSetResponse(true, 3)},
|
||||
{req: compareAndSetRequest("key", 2, "5"), resp: compareAndSetResponse(true, 3)},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -135,13 +173,14 @@ func TestModelStep(t *testing.T) {
|
||||
// One failed request, one persisted.
|
||||
{req: putRequest("key", "1"), resp: putResponse(1)},
|
||||
{req: putRequest("key", "2"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: getRequest("key"), resp: getResponse("3", 2), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("2", 1), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("2", 2)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "3", 2, 2), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("key", "3", 1, 2), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("key", "2", 1, 1), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("key", "2", 2, 2)},
|
||||
// Two failed request, two persisted.
|
||||
{req: putRequest("key", "3"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: putRequest("key", "4"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: getRequest("key"), resp: getResponse("4", 4)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "4", 4, 4)},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -169,15 +208,15 @@ func TestModelStep(t *testing.T) {
|
||||
name: "Put can fail but be persisted before txn",
|
||||
operations: []testOperation{
|
||||
// Txn success
|
||||
{req: getRequest("key"), resp: getResponse("", 1)},
|
||||
{req: getRequest("key"), resp: emptyGetResponse(1)},
|
||||
{req: putRequest("key", "2"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: compareAndSetRequest("key", "2", ""), resp: compareAndSetResponse(true, 2), failure: true},
|
||||
{req: compareAndSetRequest("key", "2", ""), resp: compareAndSetResponse(true, 3)},
|
||||
{req: compareAndSetRequest("key", 2, ""), resp: compareAndSetResponse(true, 2), failure: true},
|
||||
{req: compareAndSetRequest("key", 2, ""), resp: compareAndSetResponse(true, 3)},
|
||||
// Txn failure
|
||||
{req: putRequest("key", "4"), resp: putResponse(4)},
|
||||
{req: compareAndSetRequest("key", "5", ""), resp: compareAndSetResponse(false, 4)},
|
||||
{req: compareAndSetRequest("key", 5, ""), resp: compareAndSetResponse(false, 4)},
|
||||
{req: putRequest("key", "5"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: getRequest("key"), resp: getResponse("5", 5)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "5", 5, 5)},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -194,7 +233,7 @@ func TestModelStep(t *testing.T) {
|
||||
{
|
||||
name: "Delete not existing key",
|
||||
operations: []testOperation{
|
||||
{req: getRequest("key"), resp: getResponse("", 1)},
|
||||
{req: getRequest("key"), resp: emptyGetResponse(1)},
|
||||
{req: deleteRequest("key"), resp: deleteResponse(1, 2), failure: true},
|
||||
{req: deleteRequest("key"), resp: deleteResponse(0, 1)},
|
||||
},
|
||||
@ -202,11 +241,12 @@ func TestModelStep(t *testing.T) {
|
||||
{
|
||||
name: "Delete clears value",
|
||||
operations: []testOperation{
|
||||
{req: getRequest("key"), resp: getResponse("1", 1)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1)},
|
||||
{req: deleteRequest("key"), resp: deleteResponse(1, 2)},
|
||||
{req: getRequest("key"), resp: getResponse("1", 1), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("1", 2), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("", 2)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("key", "1", 2, 2), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("key", "1", 1, 2), failure: true},
|
||||
{req: getRequest("key"), resp: emptyGetResponse(2)},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -214,8 +254,10 @@ func TestModelStep(t *testing.T) {
|
||||
operations: []testOperation{
|
||||
{req: putRequest("key", "1"), resp: putResponse(1)},
|
||||
{req: deleteRequest("key"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: getRequest("key"), resp: getResponse("1", 1)},
|
||||
{req: getRequest("key"), resp: getResponse("", 2), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1)},
|
||||
{req: getRequest("key"), resp: emptyGetResponse(2), failure: true},
|
||||
{req: getRequest("key"), resp: emptyGetResponse(2), failure: true},
|
||||
{req: getRequest("key"), resp: emptyGetResponse(1), failure: true},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -241,12 +283,12 @@ func TestModelStep(t *testing.T) {
|
||||
// One failed request, one persisted.
|
||||
{req: putRequest("key", "1"), resp: putResponse(1)},
|
||||
{req: deleteRequest("key"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: getRequest("key"), resp: getResponse("", 2)},
|
||||
{req: getRequest("key"), resp: emptyGetResponse(2)},
|
||||
// Two failed request, one persisted.
|
||||
{req: putRequest("key", "3"), resp: putResponse(3)},
|
||||
{req: deleteRequest("key"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: deleteRequest("key"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: getRequest("key"), resp: getResponse("", 4)},
|
||||
{req: getRequest("key"), resp: emptyGetResponse(4)},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -280,75 +322,77 @@ func TestModelStep(t *testing.T) {
|
||||
name: "Delete can fail but be persisted before txn",
|
||||
operations: []testOperation{
|
||||
// Txn success
|
||||
{req: getRequest("key"), resp: getResponse("1", 1)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1)},
|
||||
{req: deleteRequest("key"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: compareAndSetRequest("key", "", "3"), resp: compareAndSetResponse(true, 3)},
|
||||
{req: compareAndSetRequest("key", 0, "3"), resp: compareAndSetResponse(true, 3)},
|
||||
// Txn failure
|
||||
{req: putRequest("key", "4"), resp: putResponse(4)},
|
||||
{req: deleteRequest("key"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: compareAndSetRequest("key", "4", "5"), resp: compareAndSetResponse(false, 5)},
|
||||
{req: compareAndSetRequest("key", 4, "5"), resp: compareAndSetResponse(false, 5)},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Txn sets new value if value matches expected",
|
||||
operations: []testOperation{
|
||||
{req: getRequest("key"), resp: getResponse("1", 1)},
|
||||
{req: compareAndSetRequest("key", "1", "2"), resp: compareAndSetResponse(true, 1), failure: true},
|
||||
{req: compareAndSetRequest("key", "1", "2"), resp: compareAndSetResponse(false, 2), failure: true},
|
||||
{req: compareAndSetRequest("key", "1", "2"), resp: compareAndSetResponse(false, 1), failure: true},
|
||||
{req: compareAndSetRequest("key", "1", "2"), resp: compareAndSetResponse(true, 2)},
|
||||
{req: getRequest("key"), resp: getResponse("1", 1), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("1", 2), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("2", 1), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("2", 2)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1)},
|
||||
{req: compareAndSetRequest("key", 1, "2"), resp: compareAndSetResponse(true, 1), failure: true},
|
||||
{req: compareAndSetRequest("key", 1, "2"), resp: compareAndSetResponse(false, 2), failure: true},
|
||||
{req: compareAndSetRequest("key", 1, "2"), resp: compareAndSetResponse(false, 1), failure: true},
|
||||
{req: compareAndSetRequest("key", 1, "2"), resp: compareAndSetResponse(true, 2)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("key", "1", 1, 2), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("key", "1", 2, 2), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("key", "2", 1, 1), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("key", "2", 2, 2)},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Txn can expect on empty key",
|
||||
operations: []testOperation{
|
||||
{req: getRequest("key1"), resp: getResponse("", 1)},
|
||||
{req: compareAndSetRequest("key1", "", "2"), resp: compareAndSetResponse(true, 2)},
|
||||
{req: compareAndSetRequest("key2", "", "3"), resp: compareAndSetResponse(true, 3)},
|
||||
{req: compareAndSetRequest("key3", "4", "4"), resp: compareAndSetResponse(false, 4), failure: true},
|
||||
{req: getRequest("key1"), resp: emptyGetResponse(1)},
|
||||
{req: compareAndSetRequest("key1", 0, "2"), resp: compareAndSetResponse(true, 2)},
|
||||
{req: compareAndSetRequest("key2", 0, "3"), resp: compareAndSetResponse(true, 3)},
|
||||
{req: compareAndSetRequest("key3", 4, "4"), resp: compareAndSetResponse(false, 4), failure: true},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Txn doesn't do anything if value doesn't match expected",
|
||||
operations: []testOperation{
|
||||
{req: getRequest("key"), resp: getResponse("1", 1)},
|
||||
{req: compareAndSetRequest("key", "2", "3"), resp: compareAndSetResponse(true, 2), failure: true},
|
||||
{req: compareAndSetRequest("key", "2", "3"), resp: compareAndSetResponse(true, 1), failure: true},
|
||||
{req: compareAndSetRequest("key", "2", "3"), resp: compareAndSetResponse(false, 2), failure: true},
|
||||
{req: compareAndSetRequest("key", "2", "3"), resp: compareAndSetResponse(false, 1)},
|
||||
{req: getRequest("key"), resp: getResponse("2", 1), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("2", 2), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("3", 1), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("3", 2), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("1", 1)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1)},
|
||||
{req: compareAndSetRequest("key", 2, "3"), resp: compareAndSetResponse(true, 2), failure: true},
|
||||
{req: compareAndSetRequest("key", 2, "3"), resp: compareAndSetResponse(true, 1), failure: true},
|
||||
{req: compareAndSetRequest("key", 2, "3"), resp: compareAndSetResponse(false, 2), failure: true},
|
||||
{req: compareAndSetRequest("key", 2, "3"), resp: compareAndSetResponse(false, 1)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "2", 1, 1), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("key", "2", 2, 2), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("key", "3", 1, 1), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("key", "3", 1, 2), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("key", "3", 2, 2), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1)},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Txn can fail and be lost before get",
|
||||
operations: []testOperation{
|
||||
{req: getRequest("key"), resp: getResponse("1", 1)},
|
||||
{req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: getRequest("key"), resp: getResponse("1", 1)},
|
||||
{req: getRequest("key"), resp: getResponse("2", 2), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1)},
|
||||
{req: compareAndSetRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "2", 2, 2), failure: true},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Txn can fail and be lost before delete",
|
||||
operations: []testOperation{
|
||||
{req: getRequest("key"), resp: getResponse("1", 1)},
|
||||
{req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1)},
|
||||
{req: compareAndSetRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: deleteRequest("key"), resp: deleteResponse(1, 2)},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Txn can fail and be lost before put",
|
||||
operations: []testOperation{
|
||||
{req: getRequest("key"), resp: getResponse("1", 1)},
|
||||
{req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1)},
|
||||
{req: compareAndSetRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: putRequest("key", "3"), resp: putResponse(2)},
|
||||
},
|
||||
},
|
||||
@ -356,28 +400,28 @@ func TestModelStep(t *testing.T) {
|
||||
name: "Txn can fail but be persisted before get",
|
||||
operations: []testOperation{
|
||||
// One failed request, one persisted.
|
||||
{req: getRequest("key"), resp: getResponse("1", 1)},
|
||||
{req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: getRequest("key"), resp: getResponse("2", 1), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("2", 2)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1)},
|
||||
{req: compareAndSetRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: getRequest("key"), resp: getResponse("key", "2", 1, 1), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("key", "2", 2, 2)},
|
||||
// Two failed request, two persisted.
|
||||
{req: putRequest("key", "3"), resp: putResponse(3)},
|
||||
{req: compareAndSetRequest("key", "3", "4"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: compareAndSetRequest("key", "4", "5"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: getRequest("key"), resp: getResponse("5", 5)},
|
||||
{req: compareAndSetRequest("key", 3, "4"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: compareAndSetRequest("key", 4, "5"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: getRequest("key"), resp: getResponse("key", "5", 5, 5)},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Txn can fail but be persisted before put",
|
||||
operations: []testOperation{
|
||||
// One failed request, one persisted.
|
||||
{req: getRequest("key"), resp: getResponse("1", 1)},
|
||||
{req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1)},
|
||||
{req: compareAndSetRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: putRequest("key", "3"), resp: putResponse(3)},
|
||||
// Two failed request, two persisted.
|
||||
{req: putRequest("key", "4"), resp: putResponse(4)},
|
||||
{req: compareAndSetRequest("key", "4", "5"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: compareAndSetRequest("key", "5", "6"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: compareAndSetRequest("key", 4, "5"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: compareAndSetRequest("key", 5, "6"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: putRequest("key", "7"), resp: putResponse(7)},
|
||||
},
|
||||
},
|
||||
@ -385,13 +429,13 @@ func TestModelStep(t *testing.T) {
|
||||
name: "Txn can fail but be persisted before delete",
|
||||
operations: []testOperation{
|
||||
// One failed request, one persisted.
|
||||
{req: getRequest("key"), resp: getResponse("1", 1)},
|
||||
{req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1)},
|
||||
{req: compareAndSetRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: deleteRequest("key"), resp: deleteResponse(1, 3)},
|
||||
// Two failed request, two persisted.
|
||||
{req: putRequest("key", "4"), resp: putResponse(4)},
|
||||
{req: compareAndSetRequest("key", "4", "5"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: compareAndSetRequest("key", "5", "6"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: compareAndSetRequest("key", 4, "5"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: compareAndSetRequest("key", 5, "6"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: deleteRequest("key"), resp: deleteResponse(1, 7)},
|
||||
},
|
||||
},
|
||||
@ -399,18 +443,18 @@ func TestModelStep(t *testing.T) {
|
||||
name: "Txn can fail but be persisted before txn",
|
||||
operations: []testOperation{
|
||||
// One failed request, one persisted with success.
|
||||
{req: getRequest("key"), resp: getResponse("1", 1)},
|
||||
{req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: compareAndSetRequest("key", "2", "3"), resp: compareAndSetResponse(true, 3)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1)},
|
||||
{req: compareAndSetRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: compareAndSetRequest("key", 2, "3"), resp: compareAndSetResponse(true, 3)},
|
||||
// Two failed request, two persisted with success.
|
||||
{req: putRequest("key", "4"), resp: putResponse(4)},
|
||||
{req: compareAndSetRequest("key", "4", "5"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: compareAndSetRequest("key", "5", "6"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: compareAndSetRequest("key", "6", "7"), resp: compareAndSetResponse(true, 7)},
|
||||
{req: compareAndSetRequest("key", 4, "5"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: compareAndSetRequest("key", 5, "6"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: compareAndSetRequest("key", 6, "7"), resp: compareAndSetResponse(true, 7)},
|
||||
// One failed request, one persisted with failure.
|
||||
{req: putRequest("key", "8"), resp: putResponse(8)},
|
||||
{req: compareAndSetRequest("key", "8", "9"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: compareAndSetRequest("key", "8", "10"), resp: compareAndSetResponse(false, 9)},
|
||||
{req: compareAndSetRequest("key", 8, "9"), resp: failedResponse(errors.New("failed"))},
|
||||
{req: compareAndSetRequest("key", 8, "10"), resp: compareAndSetResponse(false, 9)},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -419,7 +463,7 @@ func TestModelStep(t *testing.T) {
|
||||
{req: leaseGrantRequest(1), resp: leaseGrantResponse(1)},
|
||||
{req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)},
|
||||
{req: putWithLeaseRequest("key", "3", 2), resp: putResponse(3), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("2", 2)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "2", 2, 2)},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -427,10 +471,10 @@ func TestModelStep(t *testing.T) {
|
||||
operations: []testOperation{
|
||||
{req: leaseGrantRequest(1), resp: leaseGrantResponse(1)},
|
||||
{req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)},
|
||||
{req: getRequest("key"), resp: getResponse("2", 2)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "2", 2, 2)},
|
||||
{req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)},
|
||||
{req: putWithLeaseRequest("key", "4", 1), resp: putResponse(4), failure: true},
|
||||
{req: getRequest("key"), resp: getResponse("", 3)},
|
||||
{req: getRequest("key"), resp: emptyGetResponse(3)},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -439,7 +483,7 @@ func TestModelStep(t *testing.T) {
|
||||
{req: leaseGrantRequest(1), resp: leaseGrantResponse(1)},
|
||||
{req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)},
|
||||
{req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)},
|
||||
{req: getRequest("key"), resp: getResponse("", 3)},
|
||||
{req: getRequest("key"), resp: emptyGetResponse(3)},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -449,7 +493,7 @@ func TestModelStep(t *testing.T) {
|
||||
{req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)},
|
||||
{req: putRequest("key", "3"), resp: putResponse(3)},
|
||||
{req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)},
|
||||
{req: getRequest("key"), resp: getResponse("3", 3)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "3", 3, 3)},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -460,9 +504,9 @@ func TestModelStep(t *testing.T) {
|
||||
{req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)},
|
||||
{req: putWithLeaseRequest("key", "3", 2), resp: putResponse(3)},
|
||||
{req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)},
|
||||
{req: getRequest("key"), resp: getResponse("3", 3)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "3", 3, 3)},
|
||||
{req: leaseRevokeRequest(2), resp: leaseRevokeResponse(4)},
|
||||
{req: getRequest("key"), resp: getResponse("", 4)},
|
||||
{req: getRequest("key"), resp: emptyGetResponse(4)},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -471,7 +515,7 @@ func TestModelStep(t *testing.T) {
|
||||
{req: leaseGrantRequest(1), resp: leaseGrantResponse(1)},
|
||||
{req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)},
|
||||
{req: putWithLeaseRequest("key", "3", 1), resp: putResponse(3)},
|
||||
{req: getRequest("key"), resp: getResponse("3", 3)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "3", 3, 3)},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -508,10 +552,10 @@ func TestModelStep(t *testing.T) {
|
||||
{req: deleteRequest("key4"), resp: deleteResponse(1, 8)},
|
||||
{req: leaseRevokeRequest(1), resp: leaseRevokeResponse(9)},
|
||||
{req: deleteRequest("key2"), resp: deleteResponse(0, 9)},
|
||||
{req: getRequest("key1"), resp: getResponse("", 9)},
|
||||
{req: getRequest("key2"), resp: getResponse("", 9)},
|
||||
{req: getRequest("key3"), resp: getResponse("", 9)},
|
||||
{req: getRequest("key4"), resp: getResponse("", 9)},
|
||||
{req: getRequest("key1"), resp: emptyGetResponse(9)},
|
||||
{req: getRequest("key2"), resp: emptyGetResponse(9)},
|
||||
{req: getRequest("key3"), resp: emptyGetResponse(9)},
|
||||
{req: getRequest("key4"), resp: emptyGetResponse(9)},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -536,8 +580,8 @@ func TestModelStep(t *testing.T) {
|
||||
{req: putWithLeaseRequest("key", "1", 1), resp: putResponse(2)},
|
||||
{req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)},
|
||||
{req: putRequest("key", "4"), resp: putResponse(4)},
|
||||
{req: getRequest("key"), resp: getResponse("4", 4)},
|
||||
{req: compareAndSetRequest("key", "4", "5"), resp: compareAndSetResponse(true, 5)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "4", 4, 4)},
|
||||
{req: compareAndSetRequest("key", 4, "5"), resp: compareAndSetResponse(true, 5)},
|
||||
{req: deleteRequest("key"), resp: deleteResponse(1, 6)},
|
||||
{req: defragmentRequest(), resp: defragmentResponse()},
|
||||
},
|
||||
@ -554,9 +598,9 @@ func TestModelStep(t *testing.T) {
|
||||
{req: defragmentRequest(), resp: defragmentResponse()},
|
||||
{req: putRequest("key", "4"), resp: putResponse(4)},
|
||||
{req: defragmentRequest(), resp: defragmentResponse()},
|
||||
{req: getRequest("key"), resp: getResponse("4", 4)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "4", 4, 4)},
|
||||
{req: defragmentRequest(), resp: defragmentResponse()},
|
||||
{req: compareAndSetRequest("key", "4", "5"), resp: compareAndSetResponse(true, 5)},
|
||||
{req: compareAndSetRequest("key", 4, "5"), resp: compareAndSetResponse(true, 5)},
|
||||
{req: defragmentRequest(), resp: defragmentResponse()},
|
||||
{req: deleteRequest("key"), resp: deleteResponse(1, 6)},
|
||||
{req: defragmentRequest(), resp: defragmentResponse()},
|
||||
@ -574,9 +618,9 @@ func TestModelStep(t *testing.T) {
|
||||
{req: defragmentRequest(), resp: failedResponse(errors.New("failed"))},
|
||||
{req: putRequest("key", "4"), resp: putResponse(4)},
|
||||
{req: defragmentRequest(), resp: failedResponse(errors.New("failed"))},
|
||||
{req: getRequest("key"), resp: getResponse("4", 4)},
|
||||
{req: getRequest("key"), resp: getResponse("key", "4", 4, 4)},
|
||||
{req: defragmentRequest(), resp: failedResponse(errors.New("failed"))},
|
||||
{req: compareAndSetRequest("key", "4", "5"), resp: compareAndSetResponse(true, 5)},
|
||||
{req: compareAndSetRequest("key", 4, "5"), resp: compareAndSetResponse(true, 5)},
|
||||
{req: defragmentRequest(), resp: failedResponse(errors.New("failed"))},
|
||||
{req: deleteRequest("key"), resp: deleteResponse(1, 6)},
|
||||
{req: defragmentRequest(), resp: failedResponse(errors.New("failed"))},
|
||||
@ -589,8 +633,16 @@ func TestModelStep(t *testing.T) {
|
||||
for _, op := range tc.operations {
|
||||
ok, newState := Etcd.Step(state, op.req, op.resp)
|
||||
if ok != !op.failure {
|
||||
t.Logf("state: %v", state)
|
||||
t.Errorf("Unexpected operation result, expect: %v, got: %v, operation: %s", !op.failure, ok, Etcd.DescribeOperation(op.req, op.resp))
|
||||
var states PossibleStates
|
||||
err := json.Unmarshal([]byte(state.(string)), &states)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
for _, s := range states {
|
||||
_, gotResp := applyRequestToSingleState(s, op.req)
|
||||
t.Logf("For state: %v, diff: %s", state, cmp.Diff(op.resp, gotResp))
|
||||
}
|
||||
}
|
||||
if ok {
|
||||
state = newState
|
||||
@ -615,23 +667,23 @@ func TestModelDescribe(t *testing.T) {
|
||||
}{
|
||||
{
|
||||
req: getRequest("key1"),
|
||||
resp: getResponse("", 1),
|
||||
resp: emptyGetResponse(1),
|
||||
expectDescribe: `get("key1") -> nil, rev: 1`,
|
||||
},
|
||||
{
|
||||
req: getRequest("key2"),
|
||||
resp: getResponse("2", 2),
|
||||
resp: getResponse("key", "2", 2, 2),
|
||||
expectDescribe: `get("key2") -> "2", rev: 2`,
|
||||
},
|
||||
{
|
||||
req: getRequest("key2b"),
|
||||
resp: getResponse("01234567890123456789", 2),
|
||||
resp: getResponse("key2b", "01234567890123456789", 2, 2),
|
||||
expectDescribe: `get("key2b") -> hash: 2945867837, rev: 2`,
|
||||
},
|
||||
{
|
||||
req: putRequest("key3", "3"),
|
||||
resp: putResponse(3),
|
||||
expectDescribe: `put("key3", "3", nil) -> ok, rev: 3`,
|
||||
expectDescribe: `put("key3", "3") -> ok, rev: 3`,
|
||||
},
|
||||
{
|
||||
req: putWithLeaseRequest("key3b", "3b", 3),
|
||||
@ -641,17 +693,17 @@ func TestModelDescribe(t *testing.T) {
|
||||
{
|
||||
req: putRequest("key3c", "01234567890123456789"),
|
||||
resp: putResponse(3),
|
||||
expectDescribe: `put("key3c", hash: 2945867837, nil) -> ok, rev: 3`,
|
||||
expectDescribe: `put("key3c", hash: 2945867837) -> ok, rev: 3`,
|
||||
},
|
||||
{
|
||||
req: putRequest("key4", "4"),
|
||||
resp: failedResponse(errors.New("failed")),
|
||||
expectDescribe: `put("key4", "4", nil) -> err: "failed"`,
|
||||
expectDescribe: `put("key4", "4") -> err: "failed"`,
|
||||
},
|
||||
{
|
||||
req: putRequest("key4b", "4b"),
|
||||
resp: unknownResponse(42),
|
||||
expectDescribe: `put("key4b", "4b", nil) -> unknown, rev: 42`,
|
||||
expectDescribe: `put("key4b", "4b") -> unknown, rev: 42`,
|
||||
},
|
||||
{
|
||||
req: deleteRequest("key5"),
|
||||
@ -664,30 +716,45 @@ func TestModelDescribe(t *testing.T) {
|
||||
expectDescribe: `delete("key6") -> err: "failed"`,
|
||||
},
|
||||
{
|
||||
req: compareAndSetRequest("key7", "7", "77"),
|
||||
req: compareAndSetRequest("key7", 7, "77"),
|
||||
resp: compareAndSetResponse(false, 7),
|
||||
expectDescribe: `if(key7=="7").then(put("key7", "77", nil)) -> txn failed, rev: 7`,
|
||||
expectDescribe: `if(mod_rev(key7)==7).then(put("key7", "77")) -> txn failed, rev: 7`,
|
||||
},
|
||||
{
|
||||
req: compareAndSetRequest("key8", "8", "88"),
|
||||
req: compareAndSetRequest("key8", 8, "88"),
|
||||
resp: compareAndSetResponse(true, 8),
|
||||
expectDescribe: `if(key8=="8").then(put("key8", "88", nil)) -> ok, rev: 8`,
|
||||
expectDescribe: `if(mod_rev(key8)==8).then(put("key8", "88")) -> ok, rev: 8`,
|
||||
},
|
||||
{
|
||||
req: compareAndSetRequest("key9", "9", "99"),
|
||||
req: compareAndSetRequest("key9", 9, "99"),
|
||||
resp: failedResponse(errors.New("failed")),
|
||||
expectDescribe: `if(key9=="9").then(put("key9", "99", nil)) -> err: "failed"`,
|
||||
expectDescribe: `if(mod_rev(key9)==9).then(put("key9", "99")) -> err: "failed"`,
|
||||
},
|
||||
{
|
||||
req: txnRequest(nil, []EtcdOperation{{Type: Get, Key: "10"}, {Type: Put, Key: "11", Value: ValueOrHash{Value: "111"}}, {Type: Delete, Key: "12"}}),
|
||||
resp: txnResponse([]EtcdOperationResult{{Value: ValueOrHash{Value: "110"}}, {}, {Deleted: 1}}, true, 10),
|
||||
expectDescribe: `get("10"), put("11", "111", nil), delete("12") -> "110", ok, deleted: 1, rev: 10`,
|
||||
req: txnRequest(nil, []EtcdOperation{{Type: Range, Key: "10"}, {Type: Put, Key: "11", Value: ValueOrHash{Value: "111"}}, {Type: Delete, Key: "12"}}),
|
||||
resp: txnResponse([]EtcdOperationResult{{KVs: []KeyValue{{ValueRevision: ValueRevision{Value: ValueOrHash{Value: "110"}}}}}, {}, {Deleted: 1}}, true, 10),
|
||||
expectDescribe: `get("10"), put("11", "111"), delete("12") -> "110", ok, deleted: 1, rev: 10`,
|
||||
},
|
||||
{
|
||||
req: defragmentRequest(),
|
||||
resp: defragmentResponse(),
|
||||
expectDescribe: `defragment() -> ok`,
|
||||
},
|
||||
{
|
||||
req: rangeRequest("key11", true),
|
||||
resp: rangeResponse(nil, 11),
|
||||
expectDescribe: `range("key11") -> [], rev: 11`,
|
||||
},
|
||||
{
|
||||
req: rangeRequest("key12", true),
|
||||
resp: rangeResponse([]*mvccpb.KeyValue{{Value: []byte("12")}}, 12),
|
||||
expectDescribe: `range("key12") -> ["12"], rev: 12`,
|
||||
},
|
||||
{
|
||||
req: rangeRequest("key13", true),
|
||||
resp: rangeResponse([]*mvccpb.KeyValue{{Value: []byte("01234567890123456789")}}, 13),
|
||||
expectDescribe: `range("key13") -> [hash: 2945867837], rev: 13`,
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
assert.Equal(t, tc.expectDescribe, Etcd.DescribeOperation(tc.req, tc.resp))
|
||||
@ -701,32 +768,37 @@ func TestModelResponseMatch(t *testing.T) {
|
||||
expectMatch bool
|
||||
}{
|
||||
{
|
||||
resp1: getResponse("a", 1),
|
||||
resp2: getResponse("a", 1),
|
||||
resp1: getResponse("key", "a", 1, 1),
|
||||
resp2: getResponse("key", "a", 1, 1),
|
||||
expectMatch: true,
|
||||
},
|
||||
{
|
||||
resp1: getResponse("a", 1),
|
||||
resp2: getResponse("b", 1),
|
||||
resp1: getResponse("key", "a", 1, 1),
|
||||
resp2: getResponse("key", "b", 1, 1),
|
||||
expectMatch: false,
|
||||
},
|
||||
{
|
||||
resp1: getResponse("a", 1),
|
||||
resp2: getResponse("a", 2),
|
||||
resp1: getResponse("key", "a", 1, 1),
|
||||
resp2: getResponse("key", "a", 2, 1),
|
||||
expectMatch: false,
|
||||
},
|
||||
{
|
||||
resp1: getResponse("a", 1),
|
||||
resp1: getResponse("key", "a", 1, 1),
|
||||
resp2: getResponse("key", "a", 1, 2),
|
||||
expectMatch: false,
|
||||
},
|
||||
{
|
||||
resp1: getResponse("key", "a", 1, 1),
|
||||
resp2: failedResponse(errors.New("failed request")),
|
||||
expectMatch: false,
|
||||
},
|
||||
{
|
||||
resp1: getResponse("a", 1),
|
||||
resp1: getResponse("key", "a", 1, 1),
|
||||
resp2: unknownResponse(1),
|
||||
expectMatch: true,
|
||||
},
|
||||
{
|
||||
resp1: getResponse("a", 1),
|
||||
resp1: getResponse("key", "a", 1, 1),
|
||||
resp2: unknownResponse(0),
|
||||
expectMatch: false,
|
||||
},
|
||||
|
@ -29,6 +29,7 @@ import (
|
||||
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/pkg/v3/stringutil"
|
||||
"go.etcd.io/etcd/tests/v3/framework/e2e"
|
||||
"go.etcd.io/etcd/tests/v3/robustness/identity"
|
||||
"go.etcd.io/etcd/tests/v3/robustness/model"
|
||||
@ -133,6 +134,58 @@ type requestChance struct {
|
||||
chance int
|
||||
}
|
||||
|
||||
type kubernetesTraffic struct {
|
||||
keyCount int
|
||||
}
|
||||
|
||||
func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-finish:
|
||||
return
|
||||
default:
|
||||
}
|
||||
resource := "pods"
|
||||
|
||||
pods, err := t.Range(ctx, c, "/registry/"+resource+"/", true)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
limiter.Wait(ctx)
|
||||
if len(pods) < t.keyCount {
|
||||
err = t.Create(ctx, c, fmt.Sprintf("/registry/%s/default/%s", resource, stringutil.RandString(5)), fmt.Sprintf("%d", ids.RequestId()))
|
||||
continue
|
||||
} else {
|
||||
randomPod := pods[rand.Intn(len(pods))]
|
||||
err = t.Update(ctx, c, string(randomPod.Key), fmt.Sprintf("%d", ids.RequestId()), randomPod.ModRevision)
|
||||
}
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
limiter.Wait(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
func (t kubernetesTraffic) Range(ctx context.Context, c *recordingClient, key string, withPrefix bool) ([]*mvccpb.KeyValue, error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, RequestTimeout)
|
||||
resp, err := c.Range(ctx, key, withPrefix)
|
||||
cancel()
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (t kubernetesTraffic) Create(ctx context.Context, c *recordingClient, key, value string) error {
|
||||
return t.Update(ctx, c, key, value, 0)
|
||||
}
|
||||
|
||||
func (t kubernetesTraffic) Update(ctx context.Context, c *recordingClient, key, value string, expectedRevision int64) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, RequestTimeout)
|
||||
err := c.CompareAndSet(ctx, key, value, expectedRevision)
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
func (t traffic) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) {
|
||||
|
||||
for {
|
||||
@ -145,25 +198,27 @@ func (t traffic) Run(ctx context.Context, clientId int, c *recordingClient, limi
|
||||
}
|
||||
key := fmt.Sprintf("%d", rand.Int()%t.keyCount)
|
||||
// Execute one read per one write to avoid operation history include too many failed writes when etcd is down.
|
||||
resp, err := t.Read(ctx, c, limiter, key)
|
||||
resp, err := t.Read(ctx, c, key)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
t.Write(ctx, c, limiter, key, ids, lm, clientId, resp)
|
||||
limiter.Wait(ctx)
|
||||
err = t.Write(ctx, c, limiter, key, ids, lm, clientId, resp)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
limiter.Wait(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
func (t traffic) Read(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string) ([]*mvccpb.KeyValue, error) {
|
||||
func (t traffic) Read(ctx context.Context, c *recordingClient, key string) (*mvccpb.KeyValue, error) {
|
||||
getCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
|
||||
resp, err := c.Get(getCtx, key)
|
||||
cancel()
|
||||
if err == nil {
|
||||
limiter.Wait(ctx)
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (t traffic) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage, cid int, lastValues []*mvccpb.KeyValue) error {
|
||||
func (t traffic) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage, cid int, lastValues *mvccpb.KeyValue) error {
|
||||
writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
|
||||
|
||||
var err error
|
||||
@ -177,11 +232,11 @@ func (t traffic) Write(ctx context.Context, c *recordingClient, limiter *rate.Li
|
||||
case MultiOpTxn:
|
||||
err = c.Txn(writeCtx, nil, t.pickMultiTxnOps(id))
|
||||
case CompareAndSet:
|
||||
var expectValue string
|
||||
if len(lastValues) != 0 {
|
||||
expectValue = string(lastValues[0].Value)
|
||||
var expectRevision int64
|
||||
if lastValues != nil {
|
||||
expectRevision = lastValues.ModRevision
|
||||
}
|
||||
err = c.CompareAndSet(writeCtx, key, expectValue, fmt.Sprintf("%d", id.RequestId()))
|
||||
err = c.CompareAndSet(writeCtx, key, fmt.Sprintf("%d", id.RequestId()), expectRevision)
|
||||
case PutWithLease:
|
||||
leaseId := lm.LeaseId(cid)
|
||||
if leaseId == 0 {
|
||||
@ -211,9 +266,6 @@ func (t traffic) Write(ctx context.Context, c *recordingClient, limiter *rate.Li
|
||||
panic("invalid operation")
|
||||
}
|
||||
cancel()
|
||||
if err == nil {
|
||||
limiter.Wait(ctx)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@ -251,7 +303,7 @@ func (t traffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op) {
|
||||
for i, opType := range opTypes {
|
||||
key := fmt.Sprintf("%d", keys[i])
|
||||
switch opType {
|
||||
case model.Get:
|
||||
case model.Range:
|
||||
ops = append(ops, clientv3.OpGet(key))
|
||||
case model.Put:
|
||||
value := fmt.Sprintf("%d", ids.RequestId())
|
||||
@ -271,7 +323,7 @@ func (t traffic) pickOperationType() model.OperationType {
|
||||
return model.Delete
|
||||
}
|
||||
if roll < 50 {
|
||||
return model.Get
|
||||
return model.Range
|
||||
}
|
||||
return model.Put
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user