Merge pull request #15259 from serathius/linearizability-multi-op-txn

tests: Implement multi operation Txn
This commit is contained in:
Marek Siarkowicz 2023-02-15 09:35:16 +01:00 committed by GitHub
commit 202d813c7b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 240 additions and 76 deletions

View File

@ -93,7 +93,20 @@ func (c *recordingClient) CompareAndSet(ctx context.Context, key, expectedValue,
clientv3.OpPut(key, newValue),
).Commit()
returnTime := time.Now()
c.history.AppendTxn(key, expectedValue, newValue, callTime, returnTime, resp, err)
c.history.AppendCompareAndSet(key, expectedValue, newValue, callTime, returnTime, resp, err)
return err
}
func (c *recordingClient) Txn(ctx context.Context, cmp []clientv3.Cmp, ops []clientv3.Op) error {
callTime := time.Now()
txn := c.client.Txn(ctx)
resp, err := txn.If(
cmp...,
).Then(
ops...,
).Commit()
returnTime := time.Now()
c.history.AppendTxn(cmp, ops, callTime, returnTime, resp, err)
return err
}

View File

@ -50,13 +50,14 @@ var (
maximalQPS: 200,
clientCount: 8,
traffic: traffic{
keyCount: 4,
keyCount: 10,
leaseTTL: DefaultLeaseTTL,
largePutSize: 32769,
writes: []requestChance{
{operation: Put, chance: 50},
{operation: Put, chance: 45},
{operation: LargePut, chance: 5},
{operation: Delete, chance: 10},
{operation: MultiOpTxn, chance: 10},
{operation: PutWithLease, chance: 10},
{operation: LeaseRevoke, chance: 10},
{operation: CompareAndSet, chance: 10},
@ -69,11 +70,12 @@ var (
maximalQPS: 1000,
clientCount: 12,
traffic: traffic{
keyCount: 4,
keyCount: 10,
largePutSize: 32769,
leaseTTL: DefaultLeaseTTL,
writes: []requestChance{
{operation: Put, chance: 90},
{operation: Put, chance: 85},
{operation: MultiOpTxn, chance: 10},
{operation: LargePut, chance: 5},
},
},

View File

@ -15,10 +15,12 @@
package model
import (
"fmt"
"time"
"github.com/anishathalye/porcupine"
"go.etcd.io/etcd/api/v3/etcdserverpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/linearizability/identity"
)
@ -161,8 +163,8 @@ func (h *AppendableHistory) AppendDelete(key string, start, end time.Time, resp
})
}
func (h *AppendableHistory) AppendTxn(key, expectValue, newValue string, start, end time.Time, resp *clientv3.TxnResponse, err error) {
request := txnRequest(key, expectValue, newValue)
func (h *AppendableHistory) AppendCompareAndSet(key, expectValue, newValue string, start, end time.Time, resp *clientv3.TxnResponse, err error) {
request := compareAndSetRequest(key, expectValue, newValue)
if err != nil {
h.appendFailed(request, start, err)
return
@ -175,11 +177,96 @@ func (h *AppendableHistory) AppendTxn(key, expectValue, newValue string, start,
ClientId: h.id,
Input: request,
Call: start.UnixNano(),
Output: txnResponse(resp.Succeeded, revision),
Output: compareAndSetResponse(resp.Succeeded, revision),
Return: end.UnixNano(),
})
}
func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, onSuccess []clientv3.Op, start, end time.Time, resp *clientv3.TxnResponse, err error) {
conds := []EtcdCondition{}
for _, cmp := range cmp {
conds = append(conds, toEtcdCondition(cmp))
}
ops := []EtcdOperation{}
for _, op := range onSuccess {
ops = append(ops, toEtcdOperation(op))
}
request := txnRequest(conds, ops)
if err != nil {
h.appendFailed(request, start, err)
return
}
var revision int64
if resp != nil && resp.Header != nil {
revision = resp.Header.Revision
}
results := []EtcdOperationResult{}
for _, resp := range resp.Responses {
results = append(results, toEtcdOperationResult(resp))
}
h.successful = append(h.successful, porcupine.Operation{
ClientId: h.id,
Input: request,
Call: start.UnixNano(),
Output: txnResponse(results, resp.Succeeded, revision),
Return: end.UnixNano(),
})
}
func toEtcdCondition(cmp clientv3.Cmp) (cond EtcdCondition) {
switch {
case cmp.Result == etcdserverpb.Compare_EQUAL && cmp.Target == etcdserverpb.Compare_VALUE:
cond.Key = string(cmp.KeyBytes())
cond.ExpectedValue = ToValueOrHash(string(cmp.ValueBytes()))
case cmp.Result == etcdserverpb.Compare_EQUAL && cmp.Target == etcdserverpb.Compare_CREATE:
cond.Key = string(cmp.KeyBytes())
default:
panic(fmt.Sprintf("Compare not supported, target: %q, result: %q", cmp.Target, cmp.Result))
}
return cond
}
func toEtcdOperation(op clientv3.Op) EtcdOperation {
var opType OperationType
switch {
case op.IsGet():
opType = Get
case op.IsPut():
opType = Put
case op.IsDelete():
opType = Delete
default:
panic("Unsupported operation")
}
return EtcdOperation{
Type: opType,
Key: string(op.KeyBytes()),
Value: ValueOrHash{Value: string(op.ValueBytes())},
}
}
func toEtcdOperationResult(resp *etcdserverpb.ResponseOp) EtcdOperationResult {
switch {
case resp.GetResponseRange() != nil:
getResp := resp.GetResponseRange()
var val string
if len(getResp.Kvs) != 0 {
val = string(getResp.Kvs[0].Value)
}
return EtcdOperationResult{
Value: ToValueOrHash(val),
}
case resp.GetResponsePut() != nil:
return EtcdOperationResult{}
case resp.GetResponseDeleteRange() != nil:
return EtcdOperationResult{
Deleted: resp.GetResponseDeleteRange().Deleted,
}
default:
panic("Unsupported operation")
}
}
func (h *AppendableHistory) AppendDefragment(start, end time.Time, resp *clientv3.DefragmentResponse, err error) {
request := defragmentRequest()
if err != nil {
@ -240,15 +327,23 @@ func deleteResponse(deleted int64, revision int64) EtcdResponse {
return EtcdResponse{Txn: &TxnResponse{OpsResult: []EtcdOperationResult{{Deleted: deleted}}}, Revision: revision}
}
func txnRequest(key, expectValue, newValue string) EtcdRequest {
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Conds: []EtcdCondition{{Key: key, ExpectedValue: ToValueOrHash(expectValue)}}, Ops: []EtcdOperation{{Type: Put, Key: key, Value: ToValueOrHash(newValue)}}}}
func compareAndSetRequest(key, expectValue, newValue string) EtcdRequest {
return txnRequest([]EtcdCondition{{Key: key, ExpectedValue: ToValueOrHash(expectValue)}}, []EtcdOperation{{Type: Put, Key: key, Value: ToValueOrHash(newValue)}})
}
func txnResponse(succeeded bool, revision int64) EtcdResponse {
func compareAndSetResponse(succeeded bool, revision int64) EtcdResponse {
var result []EtcdOperationResult
if succeeded {
result = []EtcdOperationResult{{}}
}
return txnResponse(result, succeeded, revision)
}
func txnRequest(conds []EtcdCondition, onSuccess []EtcdOperation) EtcdRequest {
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Conds: conds, Ops: onSuccess}}
}
func txnResponse(result []EtcdOperationResult, succeeded bool, revision int64) EtcdResponse {
return EtcdResponse{Txn: &TxnResponse{OpsResult: result, TxnResult: !succeeded}, Revision: revision}
}

View File

@ -47,7 +47,7 @@ func TestModelStep(t *testing.T) {
{
name: "First Txn can start from non-zero revision",
operations: []testOperation{
{req: txnRequest("key", "", "42"), resp: txnResponse(false, 42)},
{req: compareAndSetRequest("key", "", "42"), resp: compareAndSetResponse(false, 42)},
},
},
{
@ -118,11 +118,11 @@ func TestModelStep(t *testing.T) {
// Txn failure
{req: getRequest("key"), resp: getResponse("", 1)},
{req: putRequest("key", "1"), resp: failedResponse(errors.New("failed"))},
{req: txnRequest("key", "2", "3"), resp: txnResponse(false, 1)},
{req: compareAndSetRequest("key", "2", "3"), resp: compareAndSetResponse(false, 1)},
// Txn success
{req: putRequest("key", "2"), resp: putResponse(2)},
{req: putRequest("key", "4"), resp: failedResponse(errors.New("failed"))},
{req: txnRequest("key", "2", "5"), resp: txnResponse(true, 3)},
{req: compareAndSetRequest("key", "2", "5"), resp: compareAndSetResponse(true, 3)},
},
},
{
@ -171,11 +171,11 @@ func TestModelStep(t *testing.T) {
// Txn success
{req: getRequest("key"), resp: getResponse("", 1)},
{req: putRequest("key", "2"), resp: failedResponse(errors.New("failed"))},
{req: txnRequest("key", "2", ""), resp: txnResponse(true, 2), failure: true},
{req: txnRequest("key", "2", ""), resp: txnResponse(true, 3)},
{req: compareAndSetRequest("key", "2", ""), resp: compareAndSetResponse(true, 2), failure: true},
{req: compareAndSetRequest("key", "2", ""), resp: compareAndSetResponse(true, 3)},
// Txn failure
{req: putRequest("key", "4"), resp: putResponse(4)},
{req: txnRequest("key", "5", ""), resp: txnResponse(false, 4)},
{req: compareAndSetRequest("key", "5", ""), resp: compareAndSetResponse(false, 4)},
{req: putRequest("key", "5"), resp: failedResponse(errors.New("failed"))},
{req: getRequest("key"), resp: getResponse("5", 5)},
},
@ -282,21 +282,21 @@ func TestModelStep(t *testing.T) {
// Txn success
{req: getRequest("key"), resp: getResponse("1", 1)},
{req: deleteRequest("key"), resp: failedResponse(errors.New("failed"))},
{req: txnRequest("key", "", "3"), resp: txnResponse(true, 3)},
{req: compareAndSetRequest("key", "", "3"), resp: compareAndSetResponse(true, 3)},
// Txn failure
{req: putRequest("key", "4"), resp: putResponse(4)},
{req: deleteRequest("key"), resp: failedResponse(errors.New("failed"))},
{req: txnRequest("key", "4", "5"), resp: txnResponse(false, 5)},
{req: compareAndSetRequest("key", "4", "5"), resp: compareAndSetResponse(false, 5)},
},
},
{
name: "Txn sets new value if value matches expected",
operations: []testOperation{
{req: getRequest("key"), resp: getResponse("1", 1)},
{req: txnRequest("key", "1", "2"), resp: txnResponse(true, 1), failure: true},
{req: txnRequest("key", "1", "2"), resp: txnResponse(false, 2), failure: true},
{req: txnRequest("key", "1", "2"), resp: txnResponse(false, 1), failure: true},
{req: txnRequest("key", "1", "2"), resp: txnResponse(true, 2)},
{req: compareAndSetRequest("key", "1", "2"), resp: compareAndSetResponse(true, 1), failure: true},
{req: compareAndSetRequest("key", "1", "2"), resp: compareAndSetResponse(false, 2), failure: true},
{req: compareAndSetRequest("key", "1", "2"), resp: compareAndSetResponse(false, 1), failure: true},
{req: compareAndSetRequest("key", "1", "2"), resp: compareAndSetResponse(true, 2)},
{req: getRequest("key"), resp: getResponse("1", 1), failure: true},
{req: getRequest("key"), resp: getResponse("1", 2), failure: true},
{req: getRequest("key"), resp: getResponse("2", 1), failure: true},
@ -307,19 +307,19 @@ func TestModelStep(t *testing.T) {
name: "Txn can expect on empty key",
operations: []testOperation{
{req: getRequest("key1"), resp: getResponse("", 1)},
{req: txnRequest("key1", "", "2"), resp: txnResponse(true, 2)},
{req: txnRequest("key2", "", "3"), resp: txnResponse(true, 3)},
{req: txnRequest("key3", "4", "4"), resp: txnResponse(false, 4), failure: true},
{req: compareAndSetRequest("key1", "", "2"), resp: compareAndSetResponse(true, 2)},
{req: compareAndSetRequest("key2", "", "3"), resp: compareAndSetResponse(true, 3)},
{req: compareAndSetRequest("key3", "4", "4"), resp: compareAndSetResponse(false, 4), failure: true},
},
},
{
name: "Txn doesn't do anything if value doesn't match expected",
operations: []testOperation{
{req: getRequest("key"), resp: getResponse("1", 1)},
{req: txnRequest("key", "2", "3"), resp: txnResponse(true, 2), failure: true},
{req: txnRequest("key", "2", "3"), resp: txnResponse(true, 1), failure: true},
{req: txnRequest("key", "2", "3"), resp: txnResponse(false, 2), failure: true},
{req: txnRequest("key", "2", "3"), resp: txnResponse(false, 1)},
{req: compareAndSetRequest("key", "2", "3"), resp: compareAndSetResponse(true, 2), failure: true},
{req: compareAndSetRequest("key", "2", "3"), resp: compareAndSetResponse(true, 1), failure: true},
{req: compareAndSetRequest("key", "2", "3"), resp: compareAndSetResponse(false, 2), failure: true},
{req: compareAndSetRequest("key", "2", "3"), resp: compareAndSetResponse(false, 1)},
{req: getRequest("key"), resp: getResponse("2", 1), failure: true},
{req: getRequest("key"), resp: getResponse("2", 2), failure: true},
{req: getRequest("key"), resp: getResponse("3", 1), failure: true},
@ -331,7 +331,7 @@ func TestModelStep(t *testing.T) {
name: "Txn can fail and be lost before get",
operations: []testOperation{
{req: getRequest("key"), resp: getResponse("1", 1)},
{req: txnRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))},
{req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))},
{req: getRequest("key"), resp: getResponse("1", 1)},
{req: getRequest("key"), resp: getResponse("2", 2), failure: true},
},
@ -340,7 +340,7 @@ func TestModelStep(t *testing.T) {
name: "Txn can fail and be lost before delete",
operations: []testOperation{
{req: getRequest("key"), resp: getResponse("1", 1)},
{req: txnRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))},
{req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))},
{req: deleteRequest("key"), resp: deleteResponse(1, 2)},
},
},
@ -348,7 +348,7 @@ func TestModelStep(t *testing.T) {
name: "Txn can fail and be lost before put",
operations: []testOperation{
{req: getRequest("key"), resp: getResponse("1", 1)},
{req: txnRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))},
{req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))},
{req: putRequest("key", "3"), resp: putResponse(2)},
},
},
@ -357,13 +357,13 @@ func TestModelStep(t *testing.T) {
operations: []testOperation{
// One failed request, one persisted.
{req: getRequest("key"), resp: getResponse("1", 1)},
{req: txnRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))},
{req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))},
{req: getRequest("key"), resp: getResponse("2", 1), failure: true},
{req: getRequest("key"), resp: getResponse("2", 2)},
// Two failed request, two persisted.
{req: putRequest("key", "3"), resp: putResponse(3)},
{req: txnRequest("key", "3", "4"), resp: failedResponse(errors.New("failed"))},
{req: txnRequest("key", "4", "5"), resp: failedResponse(errors.New("failed"))},
{req: compareAndSetRequest("key", "3", "4"), resp: failedResponse(errors.New("failed"))},
{req: compareAndSetRequest("key", "4", "5"), resp: failedResponse(errors.New("failed"))},
{req: getRequest("key"), resp: getResponse("5", 5)},
},
},
@ -372,12 +372,12 @@ func TestModelStep(t *testing.T) {
operations: []testOperation{
// One failed request, one persisted.
{req: getRequest("key"), resp: getResponse("1", 1)},
{req: txnRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))},
{req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))},
{req: putRequest("key", "3"), resp: putResponse(3)},
// Two failed request, two persisted.
{req: putRequest("key", "4"), resp: putResponse(4)},
{req: txnRequest("key", "4", "5"), resp: failedResponse(errors.New("failed"))},
{req: txnRequest("key", "5", "6"), resp: failedResponse(errors.New("failed"))},
{req: compareAndSetRequest("key", "4", "5"), resp: failedResponse(errors.New("failed"))},
{req: compareAndSetRequest("key", "5", "6"), resp: failedResponse(errors.New("failed"))},
{req: putRequest("key", "7"), resp: putResponse(7)},
},
},
@ -386,12 +386,12 @@ func TestModelStep(t *testing.T) {
operations: []testOperation{
// One failed request, one persisted.
{req: getRequest("key"), resp: getResponse("1", 1)},
{req: txnRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))},
{req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))},
{req: deleteRequest("key"), resp: deleteResponse(1, 3)},
// Two failed request, two persisted.
{req: putRequest("key", "4"), resp: putResponse(4)},
{req: txnRequest("key", "4", "5"), resp: failedResponse(errors.New("failed"))},
{req: txnRequest("key", "5", "6"), resp: failedResponse(errors.New("failed"))},
{req: compareAndSetRequest("key", "4", "5"), resp: failedResponse(errors.New("failed"))},
{req: compareAndSetRequest("key", "5", "6"), resp: failedResponse(errors.New("failed"))},
{req: deleteRequest("key"), resp: deleteResponse(1, 7)},
},
},
@ -400,17 +400,17 @@ func TestModelStep(t *testing.T) {
operations: []testOperation{
// One failed request, one persisted with success.
{req: getRequest("key"), resp: getResponse("1", 1)},
{req: txnRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))},
{req: txnRequest("key", "2", "3"), resp: txnResponse(true, 3)},
{req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))},
{req: compareAndSetRequest("key", "2", "3"), resp: compareAndSetResponse(true, 3)},
// Two failed request, two persisted with success.
{req: putRequest("key", "4"), resp: putResponse(4)},
{req: txnRequest("key", "4", "5"), resp: failedResponse(errors.New("failed"))},
{req: txnRequest("key", "5", "6"), resp: failedResponse(errors.New("failed"))},
{req: txnRequest("key", "6", "7"), resp: txnResponse(true, 7)},
{req: compareAndSetRequest("key", "4", "5"), resp: failedResponse(errors.New("failed"))},
{req: compareAndSetRequest("key", "5", "6"), resp: failedResponse(errors.New("failed"))},
{req: compareAndSetRequest("key", "6", "7"), resp: compareAndSetResponse(true, 7)},
// One failed request, one persisted with failure.
{req: putRequest("key", "8"), resp: putResponse(8)},
{req: txnRequest("key", "8", "9"), resp: failedResponse(errors.New("failed"))},
{req: txnRequest("key", "8", "10"), resp: txnResponse(false, 9)},
{req: compareAndSetRequest("key", "8", "9"), resp: failedResponse(errors.New("failed"))},
{req: compareAndSetRequest("key", "8", "10"), resp: compareAndSetResponse(false, 9)},
},
},
{
@ -537,7 +537,7 @@ func TestModelStep(t *testing.T) {
{req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)},
{req: putRequest("key", "4"), resp: putResponse(4)},
{req: getRequest("key"), resp: getResponse("4", 4)},
{req: txnRequest("key", "4", "5"), resp: txnResponse(true, 5)},
{req: compareAndSetRequest("key", "4", "5"), resp: compareAndSetResponse(true, 5)},
{req: deleteRequest("key"), resp: deleteResponse(1, 6)},
{req: defragmentRequest(), resp: defragmentResponse()},
},
@ -556,7 +556,7 @@ func TestModelStep(t *testing.T) {
{req: defragmentRequest(), resp: defragmentResponse()},
{req: getRequest("key"), resp: getResponse("4", 4)},
{req: defragmentRequest(), resp: defragmentResponse()},
{req: txnRequest("key", "4", "5"), resp: txnResponse(true, 5)},
{req: compareAndSetRequest("key", "4", "5"), resp: compareAndSetResponse(true, 5)},
{req: defragmentRequest(), resp: defragmentResponse()},
{req: deleteRequest("key"), resp: deleteResponse(1, 6)},
{req: defragmentRequest(), resp: defragmentResponse()},
@ -576,7 +576,7 @@ func TestModelStep(t *testing.T) {
{req: defragmentRequest(), resp: failedResponse(errors.New("failed"))},
{req: getRequest("key"), resp: getResponse("4", 4)},
{req: defragmentRequest(), resp: failedResponse(errors.New("failed"))},
{req: txnRequest("key", "4", "5"), resp: txnResponse(true, 5)},
{req: compareAndSetRequest("key", "4", "5"), resp: compareAndSetResponse(true, 5)},
{req: defragmentRequest(), resp: failedResponse(errors.New("failed"))},
{req: deleteRequest("key"), resp: deleteResponse(1, 6)},
{req: defragmentRequest(), resp: failedResponse(errors.New("failed"))},
@ -664,20 +664,25 @@ func TestModelDescribe(t *testing.T) {
expectDescribe: `delete("key6") -> err: "failed"`,
},
{
req: txnRequest("key7", "7", "77"),
resp: txnResponse(false, 7),
req: compareAndSetRequest("key7", "7", "77"),
resp: compareAndSetResponse(false, 7),
expectDescribe: `if(key7=="7").then(put("key7", "77", nil)) -> txn failed, rev: 7`,
},
{
req: txnRequest("key8", "8", "88"),
resp: txnResponse(true, 8),
req: compareAndSetRequest("key8", "8", "88"),
resp: compareAndSetResponse(true, 8),
expectDescribe: `if(key8=="8").then(put("key8", "88", nil)) -> ok, rev: 8`,
},
{
req: txnRequest("key9", "9", "99"),
req: compareAndSetRequest("key9", "9", "99"),
resp: failedResponse(errors.New("failed")),
expectDescribe: `if(key9=="9").then(put("key9", "99", nil)) -> err: "failed"`,
},
{
req: txnRequest(nil, []EtcdOperation{{Type: Get, Key: "10"}, {Type: Put, Key: "11", Value: ValueOrHash{Value: "111"}}, {Type: Delete, Key: "12"}}),
resp: txnResponse([]EtcdOperationResult{{Value: ValueOrHash{Value: "110"}}, {}, {Deleted: 1}}, true, 10),
expectDescribe: `get("10"), put("11", "111", nil), delete("12") -> "110", ok, deleted: 1, rev: 10`,
},
{
req: defragmentRequest(),
resp: defragmentResponse(),
@ -791,42 +796,42 @@ func TestModelResponseMatch(t *testing.T) {
expectMatch: false,
},
{
resp1: txnResponse(false, 7),
resp2: txnResponse(false, 7),
resp1: compareAndSetResponse(false, 7),
resp2: compareAndSetResponse(false, 7),
expectMatch: true,
},
{
resp1: txnResponse(true, 7),
resp2: txnResponse(false, 7),
resp1: compareAndSetResponse(true, 7),
resp2: compareAndSetResponse(false, 7),
expectMatch: false,
},
{
resp1: txnResponse(false, 7),
resp2: txnResponse(false, 8),
resp1: compareAndSetResponse(false, 7),
resp2: compareAndSetResponse(false, 8),
expectMatch: false,
},
{
resp1: txnResponse(false, 7),
resp1: compareAndSetResponse(false, 7),
resp2: failedResponse(errors.New("failed request")),
expectMatch: false,
},
{
resp1: txnResponse(true, 7),
resp1: compareAndSetResponse(true, 7),
resp2: unknownResponse(7),
expectMatch: true,
},
{
resp1: txnResponse(false, 7),
resp1: compareAndSetResponse(false, 7),
resp2: unknownResponse(7),
expectMatch: true,
},
{
resp1: txnResponse(true, 7),
resp1: compareAndSetResponse(true, 7),
resp2: unknownResponse(0),
expectMatch: false,
},
{
resp1: txnResponse(false, 7),
resp1: compareAndSetResponse(false, 7),
resp2: unknownResponse(0),
expectMatch: false,
},

View File

@ -24,12 +24,15 @@ import (
"golang.org/x/time/rate"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/linearizability/identity"
"go.etcd.io/etcd/tests/v3/linearizability/model"
)
var (
DefaultLeaseTTL int64 = 7200
RequestTimeout = 40 * time.Millisecond
DefaultLeaseTTL int64 = 7200
RequestTimeout = 40 * time.Millisecond
MultiOpTxnOpCount = 4
)
type TrafficRequestType string
@ -39,6 +42,7 @@ const (
Put TrafficRequestType = "put"
LargePut TrafficRequestType = "largePut"
Delete TrafficRequestType = "delete"
MultiOpTxn TrafficRequestType = "multiOpTxn"
PutWithLease TrafficRequestType = "putWithLease"
LeaseRevoke TrafficRequestType = "leaseRevoke"
CompareAndSet TrafficRequestType = "compareAndSet"
@ -75,8 +79,7 @@ func (t traffic) Run(ctx context.Context, clientId int, c *recordingClient, limi
if err != nil {
continue
}
// Provide each write with unique id to make it easier to validate operation history.
t.Write(ctx, c, limiter, key, fmt.Sprintf("%d", ids.RequestId()), lm, clientId, resp)
t.Write(ctx, c, limiter, key, ids, lm, clientId, resp)
}
}
@ -90,23 +93,25 @@ func (t traffic) Read(ctx context.Context, c *recordingClient, limiter *rate.Lim
return resp, err
}
func (t traffic) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, newValue string, lm identity.LeaseIdStorage, cid int, lastValues []*mvccpb.KeyValue) error {
func (t traffic) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage, cid int, lastValues []*mvccpb.KeyValue) error {
writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
var err error
switch t.pickWriteRequest() {
case Put:
err = c.Put(writeCtx, key, newValue)
err = c.Put(writeCtx, key, fmt.Sprintf("%d", id.RequestId()))
case LargePut:
err = c.Put(writeCtx, key, randString(t.largePutSize))
case Delete:
err = c.Delete(writeCtx, key)
case MultiOpTxn:
err = c.Txn(writeCtx, nil, t.pickMultiTxnOps(id))
case CompareAndSet:
var expectValue string
if len(lastValues) != 0 {
expectValue = string(lastValues[0].Value)
}
err = c.CompareAndSet(writeCtx, key, expectValue, newValue)
err = c.CompareAndSet(writeCtx, key, expectValue, fmt.Sprintf("%d", id.RequestId()))
case PutWithLease:
leaseId := lm.LeaseId(cid)
if leaseId == 0 {
@ -118,7 +123,7 @@ func (t traffic) Write(ctx context.Context, c *recordingClient, limiter *rate.Li
}
if leaseId != 0 {
putCtx, putCancel := context.WithTimeout(ctx, RequestTimeout)
err = c.PutWithLease(putCtx, key, newValue, leaseId)
err = c.PutWithLease(putCtx, key, fmt.Sprintf("%d", id.RequestId()), leaseId)
putCancel()
}
case LeaseRevoke:
@ -157,6 +162,50 @@ func (t traffic) pickWriteRequest() TrafficRequestType {
panic("unexpected")
}
func (t traffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op) {
keys := rand.Perm(t.keyCount)
opTypes := make([]model.OperationType, 4)
atLeastOnePut := false
for i := 0; i < MultiOpTxnOpCount; i++ {
opTypes[i] = t.pickOperationType()
if opTypes[i] == model.Put {
atLeastOnePut = true
}
}
// Ensure at least one put to make operation unique
if !atLeastOnePut {
opTypes[0] = model.Put
}
for i, opType := range opTypes {
key := fmt.Sprintf("%d", keys[i])
switch opType {
case model.Get:
ops = append(ops, clientv3.OpGet(key))
case model.Put:
value := fmt.Sprintf("%d", ids.RequestId())
ops = append(ops, clientv3.OpPut(key, value))
case model.Delete:
ops = append(ops, clientv3.OpDelete(key))
default:
panic("unsuported operation type")
}
}
return ops
}
func (t traffic) pickOperationType() model.OperationType {
roll := rand.Int() % 100
if roll < 10 {
return model.Delete
}
if roll < 50 {
return model.Get
}
return model.Put
}
func randString(size int) string {
data := strings.Builder{}
data.Grow(size)