mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
tests/robustness: Implement Kubernetes optimistic concurrency operations
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
parent
0efa1c19ef
commit
6e53792568
@ -42,11 +42,15 @@ func describeEtcdResponse(request EtcdRequest, response EtcdResponse) string {
|
|||||||
func describeEtcdRequest(request EtcdRequest) string {
|
func describeEtcdRequest(request EtcdRequest) string {
|
||||||
switch request.Type {
|
switch request.Type {
|
||||||
case Txn:
|
case Txn:
|
||||||
describeOperations := describeEtcdOperations(request.Txn.OperationsOnSuccess)
|
onSuccess := describeEtcdOperations(request.Txn.OperationsOnSuccess)
|
||||||
if len(request.Txn.Conditions) != 0 {
|
if len(request.Txn.Conditions) != 0 {
|
||||||
return fmt.Sprintf("if(%s).then(%s)", describeEtcdConditions(request.Txn.Conditions), describeOperations)
|
if len(request.Txn.OperationsOnFailure) == 0 {
|
||||||
|
return fmt.Sprintf("if(%s).then(%s)", describeEtcdConditions(request.Txn.Conditions), onSuccess)
|
||||||
|
}
|
||||||
|
onFailure := describeEtcdOperations(request.Txn.OperationsOnFailure)
|
||||||
|
return fmt.Sprintf("if(%s).then(%s).else(%s)", describeEtcdConditions(request.Txn.Conditions), onSuccess, onFailure)
|
||||||
}
|
}
|
||||||
return describeOperations
|
return onSuccess
|
||||||
case LeaseGrant:
|
case LeaseGrant:
|
||||||
return fmt.Sprintf("leaseGrant(%d)", request.LeaseGrant.LeaseID)
|
return fmt.Sprintf("leaseGrant(%d)", request.LeaseGrant.LeaseID)
|
||||||
case LeaseRevoke:
|
case LeaseRevoke:
|
||||||
@ -75,14 +79,23 @@ func describeEtcdOperations(ops []EtcdOperation) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func describeTxnResponse(request *TxnRequest, response *TxnResponse) string {
|
func describeTxnResponse(request *TxnRequest, response *TxnResponse) string {
|
||||||
if response.Failure {
|
|
||||||
return fmt.Sprintf("txn failed")
|
|
||||||
}
|
|
||||||
respDescription := make([]string, len(response.Results))
|
respDescription := make([]string, len(response.Results))
|
||||||
for i := range response.Results {
|
for i, result := range response.Results {
|
||||||
respDescription[i] = describeEtcdOperationResponse(request.OperationsOnSuccess[i], response.Results[i])
|
if response.Failure {
|
||||||
|
respDescription[i] = describeEtcdOperationResponse(request.OperationsOnFailure[i], result)
|
||||||
|
} else {
|
||||||
|
respDescription[i] = describeEtcdOperationResponse(request.OperationsOnSuccess[i], result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
description := strings.Join(respDescription, ", ")
|
||||||
|
if len(request.Conditions) == 0 {
|
||||||
|
return description
|
||||||
|
}
|
||||||
|
if response.Failure {
|
||||||
|
return fmt.Sprintf("failure(%s)", description)
|
||||||
|
} else {
|
||||||
|
return fmt.Sprintf("success(%s)", description)
|
||||||
}
|
}
|
||||||
return strings.Join(respDescription, ", ")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func describeEtcdOperation(op EtcdOperation) string {
|
func describeEtcdOperation(op EtcdOperation) string {
|
||||||
|
@ -81,13 +81,13 @@ func TestModelDescribe(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
req: compareRevisionAndPutRequest("key7", 7, "77"),
|
req: compareRevisionAndPutRequest("key7", 7, "77"),
|
||||||
resp: compareRevisionAndPutResponse(false, 7),
|
resp: txnEmptyResponse(false, 7),
|
||||||
expectDescribe: `if(mod_rev(key7)==7).then(put("key7", "77")) -> txn failed, rev: 7`,
|
expectDescribe: `if(mod_rev(key7)==7).then(put("key7", "77")) -> failure(), rev: 7`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
req: compareRevisionAndPutRequest("key8", 8, "88"),
|
req: compareRevisionAndPutRequest("key8", 8, "88"),
|
||||||
resp: compareRevisionAndPutResponse(true, 8),
|
resp: txnPutResponse(true, 8),
|
||||||
expectDescribe: `if(mod_rev(key8)==8).then(put("key8", "88")) -> ok, rev: 8`,
|
expectDescribe: `if(mod_rev(key8)==8).then(put("key8", "88")) -> success(ok), rev: 8`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
req: compareRevisionAndPutRequest("key9", 9, "99"),
|
req: compareRevisionAndPutRequest("key9", 9, "99"),
|
||||||
@ -95,7 +95,17 @@ func TestModelDescribe(t *testing.T) {
|
|||||||
expectDescribe: `if(mod_rev(key9)==9).then(put("key9", "99")) -> err: "failed"`,
|
expectDescribe: `if(mod_rev(key9)==9).then(put("key9", "99")) -> err: "failed"`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
req: txnRequest(nil, []EtcdOperation{{Type: Range, Key: "10"}, {Type: Put, Key: "11", Value: ValueOrHash{Value: "111"}}, {Type: Delete, Key: "12"}}),
|
req: txnRequest([]EtcdCondition{{Key: "key9b", ExpectedRevision: 9}}, []EtcdOperation{{Type: Put, Key: "key9b", Value: ValueOrHash{Value: "991"}}}, []EtcdOperation{{Type: Range, Key: "key9b"}}),
|
||||||
|
resp: txnResponse([]EtcdOperationResult{{}}, true, 10),
|
||||||
|
expectDescribe: `if(mod_rev(key9b)==9).then(put("key9b", "991")).else(get("key9b")) -> success(ok), rev: 10`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
req: txnRequest([]EtcdCondition{{Key: "key9c", ExpectedRevision: 9}}, []EtcdOperation{{Type: Put, Key: "key9c", Value: ValueOrHash{Value: "992"}}}, []EtcdOperation{{Type: Range, Key: "key9c"}}),
|
||||||
|
resp: txnResponse([]EtcdOperationResult{{KVs: []KeyValue{{Key: "key9c", ValueRevision: ValueRevision{Value: ValueOrHash{Value: "993"}, ModRevision: 10}}}}}, false, 10),
|
||||||
|
expectDescribe: `if(mod_rev(key9c)==9).then(put("key9c", "992")).else(get("key9c")) -> failure("993"), rev: 10`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
req: txnRequest(nil, []EtcdOperation{{Type: Range, Key: "10"}, {Type: Put, Key: "11", Value: ValueOrHash{Value: "111"}}, {Type: Delete, Key: "12"}}, nil),
|
||||||
resp: txnResponse([]EtcdOperationResult{{KVs: []KeyValue{{ValueRevision: ValueRevision{Value: ValueOrHash{Value: "110"}}}}}, {}, {Deleted: 1}}, true, 10),
|
resp: txnResponse([]EtcdOperationResult{{KVs: []KeyValue{{ValueRevision: ValueRevision{Value: ValueOrHash{Value: "110"}}}}}, {}, {Deleted: 1}}, true, 10),
|
||||||
expectDescribe: `get("10"), put("11", "111"), delete("12") -> "110", ok, deleted: 1, rev: 10`,
|
expectDescribe: `get("10"), put("11", "111"), delete("12") -> "110", ok, deleted: 1, rev: 10`,
|
||||||
},
|
},
|
||||||
|
@ -127,19 +127,20 @@ func (s etcdState) step(request EtcdRequest) (etcdState, EtcdResponse) {
|
|||||||
s.KeyValues = newKVs
|
s.KeyValues = newKVs
|
||||||
switch request.Type {
|
switch request.Type {
|
||||||
case Txn:
|
case Txn:
|
||||||
success := true
|
failure := false
|
||||||
for _, cond := range request.Txn.Conditions {
|
for _, cond := range request.Txn.Conditions {
|
||||||
if val := s.KeyValues[cond.Key]; val.ModRevision != cond.ExpectedRevision {
|
if val := s.KeyValues[cond.Key]; val.ModRevision != cond.ExpectedRevision {
|
||||||
success = false
|
failure = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !success {
|
operations := request.Txn.OperationsOnSuccess
|
||||||
return s, EtcdResponse{Revision: s.Revision, Txn: &TxnResponse{Failure: true}}
|
if failure {
|
||||||
|
operations = request.Txn.OperationsOnFailure
|
||||||
}
|
}
|
||||||
opResp := make([]EtcdOperationResult, len(request.Txn.OperationsOnSuccess))
|
opResp := make([]EtcdOperationResult, len(operations))
|
||||||
increaseRevision := false
|
increaseRevision := false
|
||||||
for i, op := range request.Txn.OperationsOnSuccess {
|
for i, op := range operations {
|
||||||
switch op.Type {
|
switch op.Type {
|
||||||
case Range:
|
case Range:
|
||||||
opResp[i] = EtcdOperationResult{
|
opResp[i] = EtcdOperationResult{
|
||||||
@ -198,7 +199,7 @@ func (s etcdState) step(request EtcdRequest) (etcdState, EtcdResponse) {
|
|||||||
if increaseRevision {
|
if increaseRevision {
|
||||||
s.Revision += 1
|
s.Revision += 1
|
||||||
}
|
}
|
||||||
return s, EtcdResponse{Txn: &TxnResponse{Results: opResp}, Revision: s.Revision}
|
return s, EtcdResponse{Txn: &TxnResponse{Failure: failure, Results: opResp}, Revision: s.Revision}
|
||||||
case LeaseGrant:
|
case LeaseGrant:
|
||||||
lease := EtcdLease{
|
lease := EtcdLease{
|
||||||
LeaseID: request.LeaseGrant.LeaseID,
|
LeaseID: request.LeaseGrant.LeaseID,
|
||||||
@ -266,6 +267,7 @@ type EtcdRequest struct {
|
|||||||
type TxnRequest struct {
|
type TxnRequest struct {
|
||||||
Conditions []EtcdCondition
|
Conditions []EtcdCondition
|
||||||
OperationsOnSuccess []EtcdOperation
|
OperationsOnSuccess []EtcdOperation
|
||||||
|
OperationsOnFailure []EtcdOperation
|
||||||
}
|
}
|
||||||
|
|
||||||
type EtcdCondition struct {
|
type EtcdCondition struct {
|
||||||
|
@ -191,7 +191,7 @@ func TestModelBase(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Txn sets new value if value matches expected",
|
name: "Txn executes onSuccess if revision matches expected",
|
||||||
operations: []testOperation{
|
operations: []testOperation{
|
||||||
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1).EtcdResponse},
|
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1).EtcdResponse},
|
||||||
{req: compareRevisionAndPutRequest("key", 1, "2"), resp: compareRevisionAndPutResponse(true, 1).EtcdResponse, failure: true},
|
{req: compareRevisionAndPutRequest("key", 1, "2"), resp: compareRevisionAndPutResponse(true, 1).EtcdResponse, failure: true},
|
||||||
@ -206,28 +206,26 @@ func TestModelBase(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Txn can expect on empty key",
|
name: "Txn can expect on key not existing",
|
||||||
operations: []testOperation{
|
operations: []testOperation{
|
||||||
{req: getRequest("key1"), resp: emptyGetResponse(1).EtcdResponse},
|
{req: getRequest("key1"), resp: emptyGetResponse(1).EtcdResponse},
|
||||||
{req: compareRevisionAndPutRequest("key1", 0, "2"), resp: compareRevisionAndPutResponse(true, 2).EtcdResponse},
|
{req: compareRevisionAndPutRequest("key1", 0, "2"), resp: compareRevisionAndPutResponse(true, 2).EtcdResponse},
|
||||||
{req: compareRevisionAndPutRequest("key2", 0, "3"), resp: compareRevisionAndPutResponse(true, 3).EtcdResponse},
|
{req: compareRevisionAndPutRequest("key1", 0, "3"), resp: compareRevisionAndPutResponse(true, 3).EtcdResponse, failure: true},
|
||||||
{req: compareRevisionAndPutRequest("key3", 4, "4"), resp: compareRevisionAndPutResponse(false, 4).EtcdResponse, failure: true},
|
{req: txnRequestSingleOperation(compareRevision("key1", 0), putOperation("key1", "4"), putOperation("key1", "5")), resp: txnPutResponse(false, 3).EtcdResponse},
|
||||||
|
{req: getRequest("key1"), resp: getResponse("key1", "5", 3, 3).EtcdResponse},
|
||||||
|
{req: compareRevisionAndPutRequest("key2", 0, "6"), resp: compareRevisionAndPutResponse(true, 4).EtcdResponse},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Txn doesn't do anything if value doesn't match expected",
|
name: "Txn executes onFailure if revision doesn't match expected",
|
||||||
operations: []testOperation{
|
operations: []testOperation{
|
||||||
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1).EtcdResponse},
|
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1).EtcdResponse},
|
||||||
{req: compareRevisionAndPutRequest("key", 2, "3"), resp: compareRevisionAndPutResponse(true, 2).EtcdResponse, failure: true},
|
{req: txnRequestSingleOperation(compareRevision("key", 1), nil, putOperation("key", "2")), resp: txnPutResponse(false, 2).EtcdResponse, failure: true},
|
||||||
{req: compareRevisionAndPutRequest("key", 2, "3"), resp: compareRevisionAndPutResponse(true, 1).EtcdResponse, failure: true},
|
{req: txnRequestSingleOperation(compareRevision("key", 1), nil, putOperation("key", "2")), resp: txnEmptyResponse(false, 2).EtcdResponse, failure: true},
|
||||||
{req: compareRevisionAndPutRequest("key", 2, "3"), resp: compareRevisionAndPutResponse(false, 2).EtcdResponse, failure: true},
|
{req: txnRequestSingleOperation(compareRevision("key", 1), nil, putOperation("key", "2")), resp: txnEmptyResponse(true, 2).EtcdResponse, failure: true},
|
||||||
{req: compareRevisionAndPutRequest("key", 2, "3"), resp: compareRevisionAndPutResponse(false, 1).EtcdResponse},
|
{req: txnRequestSingleOperation(compareRevision("key", 1), nil, putOperation("key", "2")), resp: txnPutResponse(true, 1).EtcdResponse, failure: true},
|
||||||
{req: getRequest("key"), resp: getResponse("key", "2", 1, 1).EtcdResponse, failure: true},
|
{req: txnRequestSingleOperation(compareRevision("key", 1), nil, putOperation("key", "2")), resp: txnEmptyResponse(true, 1).EtcdResponse},
|
||||||
{req: getRequest("key"), resp: getResponse("key", "2", 2, 2).EtcdResponse, failure: true},
|
{req: txnRequestSingleOperation(compareRevision("key", 2), nil, putOperation("key", "2")), resp: txnPutResponse(false, 2).EtcdResponse},
|
||||||
{req: getRequest("key"), resp: getResponse("key", "3", 1, 1).EtcdResponse, failure: true},
|
|
||||||
{req: getRequest("key"), resp: getResponse("key", "3", 1, 2).EtcdResponse, failure: true},
|
|
||||||
{req: getRequest("key"), resp: getResponse("key", "3", 2, 2).EtcdResponse, failure: true},
|
|
||||||
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1).EtcdResponse},
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -189,58 +189,20 @@ func (h *AppendableHistory) AppendDelete(key string, start, end time.Duration, r
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *AppendableHistory) AppendCompareRevisionAndDelete(key string, expectedRevision int64, start, end time.Duration, resp *clientv3.TxnResponse, err error) {
|
func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, clientOnSuccessOps, clientOnFailure []clientv3.Op, start, end time.Duration, resp *clientv3.TxnResponse, err error) {
|
||||||
request := compareRevisionAndDeleteRequest(key, expectedRevision)
|
|
||||||
if err != nil {
|
|
||||||
h.appendFailed(request, start.Nanoseconds(), err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
var revision int64
|
|
||||||
if resp != nil && resp.Header != nil {
|
|
||||||
revision = resp.Header.Revision
|
|
||||||
}
|
|
||||||
var deleted int64
|
|
||||||
if resp != nil && len(resp.Responses) > 0 {
|
|
||||||
deleted = resp.Responses[0].GetResponseDeleteRange().Deleted
|
|
||||||
}
|
|
||||||
h.appendSuccessful(porcupine.Operation{
|
|
||||||
ClientId: h.streamId,
|
|
||||||
Input: request,
|
|
||||||
Call: start.Nanoseconds(),
|
|
||||||
Output: compareRevisionAndDeleteResponse(resp.Succeeded, deleted, revision),
|
|
||||||
Return: end.Nanoseconds(),
|
|
||||||
})
|
|
||||||
|
|
||||||
}
|
|
||||||
func (h *AppendableHistory) AppendCompareRevisionAndPut(key string, expectedRevision int64, value string, start, end time.Duration, resp *clientv3.TxnResponse, err error) {
|
|
||||||
request := compareRevisionAndPutRequest(key, expectedRevision, value)
|
|
||||||
if err != nil {
|
|
||||||
h.appendFailed(request, start.Nanoseconds(), err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
var revision int64
|
|
||||||
if resp != nil && resp.Header != nil {
|
|
||||||
revision = resp.Header.Revision
|
|
||||||
}
|
|
||||||
h.appendSuccessful(porcupine.Operation{
|
|
||||||
ClientId: h.streamId,
|
|
||||||
Input: request,
|
|
||||||
Call: start.Nanoseconds(),
|
|
||||||
Output: compareRevisionAndPutResponse(resp.Succeeded, revision),
|
|
||||||
Return: end.Nanoseconds(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, onSuccess []clientv3.Op, start, end time.Duration, resp *clientv3.TxnResponse, err error) {
|
|
||||||
conds := []EtcdCondition{}
|
conds := []EtcdCondition{}
|
||||||
for _, cmp := range cmp {
|
for _, cmp := range cmp {
|
||||||
conds = append(conds, toEtcdCondition(cmp))
|
conds = append(conds, toEtcdCondition(cmp))
|
||||||
}
|
}
|
||||||
ops := []EtcdOperation{}
|
modelOnSuccess := []EtcdOperation{}
|
||||||
for _, op := range onSuccess {
|
for _, op := range clientOnSuccessOps {
|
||||||
ops = append(ops, toEtcdOperation(op))
|
modelOnSuccess = append(modelOnSuccess, toEtcdOperation(op))
|
||||||
}
|
}
|
||||||
request := txnRequest(conds, ops)
|
modelOnFailure := []EtcdOperation{}
|
||||||
|
for _, op := range clientOnFailure {
|
||||||
|
modelOnFailure = append(modelOnFailure, toEtcdOperation(op))
|
||||||
|
}
|
||||||
|
request := txnRequest(conds, modelOnSuccess, modelOnFailure)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.appendFailed(request, start.Nanoseconds(), err)
|
h.appendFailed(request, start.Nanoseconds(), err)
|
||||||
return
|
return
|
||||||
@ -293,6 +255,7 @@ func toEtcdCondition(cmp clientv3.Cmp) (cond EtcdCondition) {
|
|||||||
default:
|
default:
|
||||||
panic(fmt.Sprintf("Compare not supported, target: %q, result: %q", cmp.Target, cmp.Result))
|
panic(fmt.Sprintf("Compare not supported, target: %q, result: %q", cmp.Target, cmp.Result))
|
||||||
}
|
}
|
||||||
|
cond.ExpectedRevision = cmp.TargetUnion.(*etcdserverpb.Compare_ModRevision).ModRevision
|
||||||
return cond
|
return cond
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -447,32 +410,51 @@ func deleteResponse(deleted int64, revision int64) EtcdNonDeterministicResponse
|
|||||||
return EtcdNonDeterministicResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{Results: []EtcdOperationResult{{Deleted: deleted}}}, Revision: revision}}
|
return EtcdNonDeterministicResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{Results: []EtcdOperationResult{{Deleted: deleted}}}, Revision: revision}}
|
||||||
}
|
}
|
||||||
|
|
||||||
func compareRevisionAndDeleteRequest(key string, expectedRevision int64) EtcdRequest {
|
|
||||||
return txnRequest([]EtcdCondition{{Key: key, ExpectedRevision: expectedRevision}}, []EtcdOperation{{Type: Delete, Key: key}})
|
|
||||||
}
|
|
||||||
|
|
||||||
func compareRevisionAndPutRequest(key string, expectedRevision int64, value string) EtcdRequest {
|
func compareRevisionAndPutRequest(key string, expectedRevision int64, value string) EtcdRequest {
|
||||||
return txnRequest([]EtcdCondition{{Key: key, ExpectedRevision: expectedRevision}}, []EtcdOperation{{Type: Put, Key: key, Value: ToValueOrHash(value)}})
|
return txnRequestSingleOperation(compareRevision(key, expectedRevision), putOperation(key, value), nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func compareRevisionAndPutResponse(succeeded bool, revision int64) EtcdNonDeterministicResponse {
|
func compareRevisionAndPutResponse(succeeded bool, revision int64) EtcdNonDeterministicResponse {
|
||||||
var result []EtcdOperationResult
|
|
||||||
if succeeded {
|
if succeeded {
|
||||||
result = []EtcdOperationResult{{}}
|
return txnPutResponse(succeeded, revision)
|
||||||
}
|
}
|
||||||
return txnResponse(result, succeeded, revision)
|
return txnEmptyResponse(succeeded, revision)
|
||||||
}
|
}
|
||||||
|
|
||||||
func compareRevisionAndDeleteResponse(succeeded bool, deleted, revision int64) EtcdNonDeterministicResponse {
|
func compareRevision(key string, expectedRevision int64) *EtcdCondition {
|
||||||
var result []EtcdOperationResult
|
return &EtcdCondition{Key: key, ExpectedRevision: expectedRevision}
|
||||||
if succeeded {
|
|
||||||
result = []EtcdOperationResult{{Deleted: deleted}}
|
|
||||||
}
|
|
||||||
return txnResponse(result, succeeded, revision)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func txnRequest(conds []EtcdCondition, onSuccess []EtcdOperation) EtcdRequest {
|
func putOperation(key, value string) *EtcdOperation {
|
||||||
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Conditions: conds, OperationsOnSuccess: onSuccess}}
|
return &EtcdOperation{Type: Put, Key: key, Value: ToValueOrHash(value)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func txnRequestSingleOperation(cond *EtcdCondition, onSuccess, onFailure *EtcdOperation) EtcdRequest {
|
||||||
|
var conds []EtcdCondition
|
||||||
|
if cond != nil {
|
||||||
|
conds = []EtcdCondition{*cond}
|
||||||
|
}
|
||||||
|
var onSuccess2 []EtcdOperation
|
||||||
|
if onSuccess != nil {
|
||||||
|
onSuccess2 = []EtcdOperation{*onSuccess}
|
||||||
|
}
|
||||||
|
var onFailure2 []EtcdOperation
|
||||||
|
if onFailure != nil {
|
||||||
|
onFailure2 = []EtcdOperation{*onFailure}
|
||||||
|
}
|
||||||
|
return txnRequest(conds, onSuccess2, onFailure2)
|
||||||
|
}
|
||||||
|
|
||||||
|
func txnRequest(conds []EtcdCondition, onSuccess, onFailure []EtcdOperation) EtcdRequest {
|
||||||
|
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Conditions: conds, OperationsOnSuccess: onSuccess, OperationsOnFailure: onFailure}}
|
||||||
|
}
|
||||||
|
|
||||||
|
func txnPutResponse(succeeded bool, revision int64) EtcdNonDeterministicResponse {
|
||||||
|
return txnResponse([]EtcdOperationResult{{}}, succeeded, revision)
|
||||||
|
}
|
||||||
|
|
||||||
|
func txnEmptyResponse(succeeded bool, revision int64) EtcdNonDeterministicResponse {
|
||||||
|
return txnResponse([]EtcdOperationResult{}, succeeded, revision)
|
||||||
}
|
}
|
||||||
|
|
||||||
func txnResponse(result []EtcdOperationResult, succeeded bool, revision int64) EtcdNonDeterministicResponse {
|
func txnResponse(result []EtcdOperationResult, succeeded bool, revision int64) EtcdNonDeterministicResponse {
|
||||||
|
@ -131,56 +131,21 @@ func (c *RecordingClient) Delete(ctx context.Context, key string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RecordingClient) CompareRevisionAndDelete(ctx context.Context, key string, expectedRevision int64) error {
|
func (c *RecordingClient) Txn(ctx context.Context, conditions []clientv3.Cmp, onSuccess []clientv3.Op, onFailure []clientv3.Op) (*clientv3.TxnResponse, error) {
|
||||||
txn := c.compareRevisionTxn(ctx, key, expectedRevision, clientv3.OpDelete(key))
|
|
||||||
c.opMux.Lock()
|
|
||||||
defer c.opMux.Unlock()
|
|
||||||
callTime := time.Since(c.baseTime)
|
|
||||||
resp, err := txn.Commit()
|
|
||||||
returnTime := time.Since(c.baseTime)
|
|
||||||
c.operations.AppendCompareRevisionAndDelete(key, expectedRevision, callTime, returnTime, resp, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *RecordingClient) CompareRevisionAndPut(ctx context.Context, key, value string, expectedRevision int64) error {
|
|
||||||
txn := c.compareRevisionTxn(ctx, key, expectedRevision, clientv3.OpPut(key, value))
|
|
||||||
c.opMux.Lock()
|
|
||||||
defer c.opMux.Unlock()
|
|
||||||
callTime := time.Since(c.baseTime)
|
|
||||||
resp, err := txn.Commit()
|
|
||||||
returnTime := time.Since(c.baseTime)
|
|
||||||
c.operations.AppendCompareRevisionAndPut(key, expectedRevision, value, callTime, returnTime, resp, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *RecordingClient) compareRevisionTxn(ctx context.Context, key string, expectedRevision int64, op clientv3.Op) clientv3.Txn {
|
|
||||||
txn := c.client.Txn(ctx)
|
|
||||||
var cmp clientv3.Cmp
|
|
||||||
if expectedRevision == 0 {
|
|
||||||
cmp = clientv3.Compare(clientv3.CreateRevision(key), "=", 0)
|
|
||||||
} else {
|
|
||||||
cmp = clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)
|
|
||||||
}
|
|
||||||
return txn.If(
|
|
||||||
cmp,
|
|
||||||
).Then(
|
|
||||||
op,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *RecordingClient) Txn(ctx context.Context, cmp []clientv3.Cmp, ops []clientv3.Op) error {
|
|
||||||
txn := c.client.Txn(ctx).If(
|
txn := c.client.Txn(ctx).If(
|
||||||
cmp...,
|
conditions...,
|
||||||
).Then(
|
).Then(
|
||||||
ops...,
|
onSuccess...,
|
||||||
|
).Else(
|
||||||
|
onFailure...,
|
||||||
)
|
)
|
||||||
c.opMux.Lock()
|
c.opMux.Lock()
|
||||||
defer c.opMux.Unlock()
|
defer c.opMux.Unlock()
|
||||||
callTime := time.Since(c.baseTime)
|
callTime := time.Since(c.baseTime)
|
||||||
resp, err := txn.Commit()
|
resp, err := txn.Commit()
|
||||||
returnTime := time.Since(c.baseTime)
|
returnTime := time.Since(c.baseTime)
|
||||||
c.operations.AppendTxn(cmp, ops, callTime, returnTime, resp, err)
|
c.operations.AppendTxn(conditions, onSuccess, onFailure, callTime, returnTime, resp, err)
|
||||||
return err
|
return resp, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (int64, error) {
|
func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (int64, error) {
|
||||||
|
@ -122,7 +122,7 @@ func (t etcdTraffic) Write(ctx context.Context, c *RecordingClient, limiter *rat
|
|||||||
writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
|
writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
switch etcdRequestType(pickRandom(t.writeChoices)) {
|
switch pickRandom(t.writeChoices) {
|
||||||
case Put:
|
case Put:
|
||||||
err = c.Put(writeCtx, key, fmt.Sprintf("%d", id.NewRequestId()))
|
err = c.Put(writeCtx, key, fmt.Sprintf("%d", id.NewRequestId()))
|
||||||
case LargePut:
|
case LargePut:
|
||||||
@ -130,13 +130,14 @@ func (t etcdTraffic) Write(ctx context.Context, c *RecordingClient, limiter *rat
|
|||||||
case Delete:
|
case Delete:
|
||||||
err = c.Delete(writeCtx, key)
|
err = c.Delete(writeCtx, key)
|
||||||
case MultiOpTxn:
|
case MultiOpTxn:
|
||||||
err = c.Txn(writeCtx, nil, t.pickMultiTxnOps(id))
|
_, err = c.Txn(writeCtx, nil, t.pickMultiTxnOps(id), nil)
|
||||||
case CompareAndSet:
|
case CompareAndSet:
|
||||||
var expectRevision int64
|
var expectedRevision int64
|
||||||
if lastValues != nil {
|
if lastValues != nil {
|
||||||
expectRevision = lastValues.ModRevision
|
expectedRevision = lastValues.ModRevision
|
||||||
}
|
}
|
||||||
err = c.CompareRevisionAndPut(writeCtx, key, fmt.Sprintf("%d", id.NewRequestId()), expectRevision)
|
value := fmt.Sprintf("%d", id.NewRequestId())
|
||||||
|
_, err = c.Txn(ctx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{clientv3.OpPut(key, value)}, nil)
|
||||||
case PutWithLease:
|
case PutWithLease:
|
||||||
leaseId := lm.LeaseId(cid)
|
leaseId := lm.LeaseId(cid)
|
||||||
if leaseId == 0 {
|
if leaseId == 0 {
|
||||||
|
@ -56,15 +56,8 @@ type kubernetesTraffic struct {
|
|||||||
writeChoices []choiceWeight[KubernetesRequestType]
|
writeChoices []choiceWeight[KubernetesRequestType]
|
||||||
}
|
}
|
||||||
|
|
||||||
type KubernetesRequestType string
|
|
||||||
|
|
||||||
const (
|
|
||||||
KubernetesUpdate KubernetesRequestType = "update"
|
|
||||||
KubernetesCreate KubernetesRequestType = "create"
|
|
||||||
KubernetesDelete KubernetesRequestType = "delete"
|
|
||||||
)
|
|
||||||
|
|
||||||
func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) {
|
func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) {
|
||||||
|
kc := &kubernetesClient{client: c}
|
||||||
s := newStorage()
|
s := newStorage()
|
||||||
keyPrefix := "/registry/" + t.resource + "/"
|
keyPrefix := "/registry/" + t.resource + "/"
|
||||||
g := errgroup.Group{}
|
g := errgroup.Group{}
|
||||||
@ -78,7 +71,9 @@ func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *RecordingCl
|
|||||||
return nil
|
return nil
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
resp, err := t.Range(ctx, c, keyPrefix, true)
|
listCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
|
||||||
|
resp, err := kc.List(listCtx, keyPrefix)
|
||||||
|
cancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -103,14 +98,18 @@ func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *RecordingCl
|
|||||||
}
|
}
|
||||||
// Avoid multiple failed writes in a row
|
// Avoid multiple failed writes in a row
|
||||||
if lastWriteFailed {
|
if lastWriteFailed {
|
||||||
resp, err := t.Range(ctx, c, keyPrefix, true)
|
listCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
|
||||||
|
resp, err := kc.List(listCtx, keyPrefix)
|
||||||
|
cancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
s.Reset(resp)
|
s.Reset(resp)
|
||||||
limiter.Wait(ctx)
|
limiter.Wait(ctx)
|
||||||
}
|
}
|
||||||
err := t.Write(ctx, c, ids, s)
|
writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
|
||||||
|
err := t.Write(writeCtx, kc, ids, s)
|
||||||
|
cancel()
|
||||||
lastWriteFailed = err != nil
|
lastWriteFailed = err != nil
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
@ -121,28 +120,26 @@ func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *RecordingCl
|
|||||||
g.Wait()
|
g.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t kubernetesTraffic) Write(ctx context.Context, c *RecordingClient, ids identity.Provider, s *storage) (err error) {
|
func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids identity.Provider, s *storage) (err error) {
|
||||||
writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
|
|
||||||
defer cancel()
|
|
||||||
count := s.Count()
|
count := s.Count()
|
||||||
if count < t.averageKeyCount/2 {
|
if count < t.averageKeyCount/2 {
|
||||||
err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId()))
|
err = kc.OptimisticCreate(ctx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId()))
|
||||||
} else {
|
} else {
|
||||||
key, rev := s.PickRandom()
|
key, rev := s.PickRandom()
|
||||||
if rev == 0 {
|
if rev == 0 {
|
||||||
return errors.New("storage empty")
|
return errors.New("storage empty")
|
||||||
}
|
}
|
||||||
if count > t.averageKeyCount*3/2 {
|
if count > t.averageKeyCount*3/2 {
|
||||||
err = t.Delete(writeCtx, c, key, rev)
|
_, err = kc.OptimisticDelete(ctx, key, rev)
|
||||||
} else {
|
} else {
|
||||||
op := pickRandom(t.writeChoices)
|
op := pickRandom(t.writeChoices)
|
||||||
switch op {
|
switch op {
|
||||||
case KubernetesDelete:
|
case KubernetesDelete:
|
||||||
err = t.Delete(writeCtx, c, key, rev)
|
_, err = kc.OptimisticDelete(ctx, key, rev)
|
||||||
case KubernetesUpdate:
|
case KubernetesUpdate:
|
||||||
err = t.Update(writeCtx, c, key, fmt.Sprintf("%d", ids.NewRequestId()), rev)
|
_, err = kc.OptimisticUpdate(ctx, key, fmt.Sprintf("%d", ids.NewRequestId()), rev)
|
||||||
case KubernetesCreate:
|
case KubernetesCreate:
|
||||||
err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId()))
|
err = kc.OptimisticCreate(ctx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId()))
|
||||||
default:
|
default:
|
||||||
panic(fmt.Sprintf("invalid choice: %q", op))
|
panic(fmt.Sprintf("invalid choice: %q", op))
|
||||||
}
|
}
|
||||||
@ -155,29 +152,56 @@ func (t kubernetesTraffic) generateKey() string {
|
|||||||
return fmt.Sprintf("/registry/%s/%s/%s", t.resource, t.namespace, stringutil.RandString(5))
|
return fmt.Sprintf("/registry/%s/%s/%s", t.resource, t.namespace, stringutil.RandString(5))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t kubernetesTraffic) Range(ctx context.Context, c *RecordingClient, key string, withPrefix bool) (*clientv3.GetResponse, error) {
|
type KubernetesRequestType string
|
||||||
ctx, cancel := context.WithTimeout(ctx, RequestTimeout)
|
|
||||||
resp, err := c.Range(ctx, key, withPrefix)
|
const (
|
||||||
cancel()
|
KubernetesDelete KubernetesRequestType = "delete"
|
||||||
|
KubernetesUpdate KubernetesRequestType = "update"
|
||||||
|
KubernetesCreate KubernetesRequestType = "create"
|
||||||
|
)
|
||||||
|
|
||||||
|
type kubernetesClient struct {
|
||||||
|
client *RecordingClient
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k kubernetesClient) List(ctx context.Context, key string) (*clientv3.GetResponse, error) {
|
||||||
|
resp, err := k.client.Range(ctx, key, true)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t kubernetesTraffic) Create(ctx context.Context, c *RecordingClient, key, value string) error {
|
func (k kubernetesClient) OptimisticDelete(ctx context.Context, key string, expectedRevision int64) (*mvccpb.KeyValue, error) {
|
||||||
return t.Update(ctx, c, key, value, 0)
|
return k.optimisticOperationOrGet(ctx, key, clientv3.OpDelete(key), expectedRevision)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t kubernetesTraffic) Update(ctx context.Context, c *RecordingClient, key, value string, expectedRevision int64) error {
|
func (k kubernetesClient) OptimisticUpdate(ctx context.Context, key, value string, expectedRevision int64) (*mvccpb.KeyValue, error) {
|
||||||
ctx, cancel := context.WithTimeout(ctx, RequestTimeout)
|
return k.optimisticOperationOrGet(ctx, key, clientv3.OpPut(key, value), expectedRevision)
|
||||||
err := c.CompareRevisionAndPut(ctx, key, value, expectedRevision)
|
}
|
||||||
cancel()
|
|
||||||
|
func (k kubernetesClient) OptimisticCreate(ctx context.Context, key, value string) error {
|
||||||
|
_, err := k.client.Txn(ctx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", 0)}, []clientv3.Op{clientv3.OpPut(key, value)}, nil)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t kubernetesTraffic) Delete(ctx context.Context, c *RecordingClient, key string, expectedRevision int64) error {
|
// Kubernetes optimistically assumes that key didn't change since it was last observed, so it executes operations within a transaction conditioned on key not changing.
|
||||||
ctx, cancel := context.WithTimeout(ctx, RequestTimeout)
|
// However, if the keys value changed it wants imminently to read it, thus the Get operation on failure.
|
||||||
err := c.CompareRevisionAndDelete(ctx, key, expectedRevision)
|
func (k kubernetesClient) optimisticOperationOrGet(ctx context.Context, key string, operation clientv3.Op, expectedRevision int64) (*mvccpb.KeyValue, error) {
|
||||||
cancel()
|
resp, err := k.client.Txn(ctx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{operation}, []clientv3.Op{clientv3.OpGet(key)})
|
||||||
return err
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if !resp.Succeeded {
|
||||||
|
getResp := (*clientv3.GetResponse)(resp.Responses[0].GetResponseRange())
|
||||||
|
if err != nil || len(getResp.Kvs) == 0 {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(getResp.Kvs) == 1 {
|
||||||
|
return getResp.Kvs[0], err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
type storage struct {
|
type storage struct {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user