From 04ba936ea6506d4853067477300e9d759ded300d Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 8 Feb 2023 00:32:32 +0100 Subject: [PATCH] tests: Implement multi operation Txn Signed-off-by: Marek Siarkowicz --- tests/linearizability/client.go | 15 ++- tests/linearizability/linearizability_test.go | 10 +- tests/linearizability/model/history.go | 107 +++++++++++++++- tests/linearizability/model/model_test.go | 119 +++++++++--------- tests/linearizability/traffic.go | 65 ++++++++-- 5 files changed, 240 insertions(+), 76 deletions(-) diff --git a/tests/linearizability/client.go b/tests/linearizability/client.go index 5526eee08..ad95942c6 100644 --- a/tests/linearizability/client.go +++ b/tests/linearizability/client.go @@ -93,7 +93,20 @@ func (c *recordingClient) CompareAndSet(ctx context.Context, key, expectedValue, clientv3.OpPut(key, newValue), ).Commit() returnTime := time.Now() - c.history.AppendTxn(key, expectedValue, newValue, callTime, returnTime, resp, err) + c.history.AppendCompareAndSet(key, expectedValue, newValue, callTime, returnTime, resp, err) + return err +} + +func (c *recordingClient) Txn(ctx context.Context, cmp []clientv3.Cmp, ops []clientv3.Op) error { + callTime := time.Now() + txn := c.client.Txn(ctx) + resp, err := txn.If( + cmp..., + ).Then( + ops..., + ).Commit() + returnTime := time.Now() + c.history.AppendTxn(cmp, ops, callTime, returnTime, resp, err) return err } diff --git a/tests/linearizability/linearizability_test.go b/tests/linearizability/linearizability_test.go index aa2791542..69ef54a4d 100644 --- a/tests/linearizability/linearizability_test.go +++ b/tests/linearizability/linearizability_test.go @@ -50,13 +50,14 @@ var ( maximalQPS: 200, clientCount: 8, traffic: traffic{ - keyCount: 4, + keyCount: 10, leaseTTL: DefaultLeaseTTL, largePutSize: 32769, writes: []requestChance{ - {operation: Put, chance: 50}, + {operation: Put, chance: 45}, {operation: LargePut, chance: 5}, {operation: Delete, chance: 10}, + {operation: MultiOpTxn, chance: 10}, {operation: PutWithLease, chance: 10}, {operation: LeaseRevoke, chance: 10}, {operation: CompareAndSet, chance: 10}, @@ -69,11 +70,12 @@ var ( maximalQPS: 1000, clientCount: 12, traffic: traffic{ - keyCount: 4, + keyCount: 10, largePutSize: 32769, leaseTTL: DefaultLeaseTTL, writes: []requestChance{ - {operation: Put, chance: 90}, + {operation: Put, chance: 85}, + {operation: MultiOpTxn, chance: 10}, {operation: LargePut, chance: 5}, }, }, diff --git a/tests/linearizability/model/history.go b/tests/linearizability/model/history.go index 2a7fe985e..ea5556072 100644 --- a/tests/linearizability/model/history.go +++ b/tests/linearizability/model/history.go @@ -15,10 +15,12 @@ package model import ( + "fmt" "time" "github.com/anishathalye/porcupine" + "go.etcd.io/etcd/api/v3/etcdserverpb" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/linearizability/identity" ) @@ -161,8 +163,8 @@ func (h *AppendableHistory) AppendDelete(key string, start, end time.Time, resp }) } -func (h *AppendableHistory) AppendTxn(key, expectValue, newValue string, start, end time.Time, resp *clientv3.TxnResponse, err error) { - request := txnRequest(key, expectValue, newValue) +func (h *AppendableHistory) AppendCompareAndSet(key, expectValue, newValue string, start, end time.Time, resp *clientv3.TxnResponse, err error) { + request := compareAndSetRequest(key, expectValue, newValue) if err != nil { h.appendFailed(request, start, err) return @@ -175,11 +177,96 @@ func (h *AppendableHistory) AppendTxn(key, expectValue, newValue string, start, ClientId: h.id, Input: request, Call: start.UnixNano(), - Output: txnResponse(resp.Succeeded, revision), + Output: compareAndSetResponse(resp.Succeeded, revision), Return: end.UnixNano(), }) } +func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, onSuccess []clientv3.Op, start, end time.Time, resp *clientv3.TxnResponse, err error) { + conds := []EtcdCondition{} + for _, cmp := range cmp { + conds = append(conds, toEtcdCondition(cmp)) + } + ops := []EtcdOperation{} + for _, op := range onSuccess { + ops = append(ops, toEtcdOperation(op)) + } + request := txnRequest(conds, ops) + if err != nil { + h.appendFailed(request, start, err) + return + } + var revision int64 + if resp != nil && resp.Header != nil { + revision = resp.Header.Revision + } + results := []EtcdOperationResult{} + for _, resp := range resp.Responses { + results = append(results, toEtcdOperationResult(resp)) + } + h.successful = append(h.successful, porcupine.Operation{ + ClientId: h.id, + Input: request, + Call: start.UnixNano(), + Output: txnResponse(results, resp.Succeeded, revision), + Return: end.UnixNano(), + }) +} + +func toEtcdCondition(cmp clientv3.Cmp) (cond EtcdCondition) { + switch { + case cmp.Result == etcdserverpb.Compare_EQUAL && cmp.Target == etcdserverpb.Compare_VALUE: + cond.Key = string(cmp.KeyBytes()) + cond.ExpectedValue = ToValueOrHash(string(cmp.ValueBytes())) + case cmp.Result == etcdserverpb.Compare_EQUAL && cmp.Target == etcdserverpb.Compare_CREATE: + cond.Key = string(cmp.KeyBytes()) + default: + panic(fmt.Sprintf("Compare not supported, target: %q, result: %q", cmp.Target, cmp.Result)) + } + return cond +} + +func toEtcdOperation(op clientv3.Op) EtcdOperation { + var opType OperationType + switch { + case op.IsGet(): + opType = Get + case op.IsPut(): + opType = Put + case op.IsDelete(): + opType = Delete + default: + panic("Unsupported operation") + } + return EtcdOperation{ + Type: opType, + Key: string(op.KeyBytes()), + Value: ValueOrHash{Value: string(op.ValueBytes())}, + } +} + +func toEtcdOperationResult(resp *etcdserverpb.ResponseOp) EtcdOperationResult { + switch { + case resp.GetResponseRange() != nil: + getResp := resp.GetResponseRange() + var val string + if len(getResp.Kvs) != 0 { + val = string(getResp.Kvs[0].Value) + } + return EtcdOperationResult{ + Value: ToValueOrHash(val), + } + case resp.GetResponsePut() != nil: + return EtcdOperationResult{} + case resp.GetResponseDeleteRange() != nil: + return EtcdOperationResult{ + Deleted: resp.GetResponseDeleteRange().Deleted, + } + default: + panic("Unsupported operation") + } +} + func (h *AppendableHistory) AppendDefragment(start, end time.Time, resp *clientv3.DefragmentResponse, err error) { request := defragmentRequest() if err != nil { @@ -240,15 +327,23 @@ func deleteResponse(deleted int64, revision int64) EtcdResponse { return EtcdResponse{Txn: &TxnResponse{OpsResult: []EtcdOperationResult{{Deleted: deleted}}}, Revision: revision} } -func txnRequest(key, expectValue, newValue string) EtcdRequest { - return EtcdRequest{Type: Txn, Txn: &TxnRequest{Conds: []EtcdCondition{{Key: key, ExpectedValue: ToValueOrHash(expectValue)}}, Ops: []EtcdOperation{{Type: Put, Key: key, Value: ToValueOrHash(newValue)}}}} +func compareAndSetRequest(key, expectValue, newValue string) EtcdRequest { + return txnRequest([]EtcdCondition{{Key: key, ExpectedValue: ToValueOrHash(expectValue)}}, []EtcdOperation{{Type: Put, Key: key, Value: ToValueOrHash(newValue)}}) } -func txnResponse(succeeded bool, revision int64) EtcdResponse { +func compareAndSetResponse(succeeded bool, revision int64) EtcdResponse { var result []EtcdOperationResult if succeeded { result = []EtcdOperationResult{{}} } + return txnResponse(result, succeeded, revision) +} + +func txnRequest(conds []EtcdCondition, onSuccess []EtcdOperation) EtcdRequest { + return EtcdRequest{Type: Txn, Txn: &TxnRequest{Conds: conds, Ops: onSuccess}} +} + +func txnResponse(result []EtcdOperationResult, succeeded bool, revision int64) EtcdResponse { return EtcdResponse{Txn: &TxnResponse{OpsResult: result, TxnResult: !succeeded}, Revision: revision} } diff --git a/tests/linearizability/model/model_test.go b/tests/linearizability/model/model_test.go index d6800263c..91946358c 100644 --- a/tests/linearizability/model/model_test.go +++ b/tests/linearizability/model/model_test.go @@ -47,7 +47,7 @@ func TestModelStep(t *testing.T) { { name: "First Txn can start from non-zero revision", operations: []testOperation{ - {req: txnRequest("key", "", "42"), resp: txnResponse(false, 42)}, + {req: compareAndSetRequest("key", "", "42"), resp: compareAndSetResponse(false, 42)}, }, }, { @@ -118,11 +118,11 @@ func TestModelStep(t *testing.T) { // Txn failure {req: getRequest("key"), resp: getResponse("", 1)}, {req: putRequest("key", "1"), resp: failedResponse(errors.New("failed"))}, - {req: txnRequest("key", "2", "3"), resp: txnResponse(false, 1)}, + {req: compareAndSetRequest("key", "2", "3"), resp: compareAndSetResponse(false, 1)}, // Txn success {req: putRequest("key", "2"), resp: putResponse(2)}, {req: putRequest("key", "4"), resp: failedResponse(errors.New("failed"))}, - {req: txnRequest("key", "2", "5"), resp: txnResponse(true, 3)}, + {req: compareAndSetRequest("key", "2", "5"), resp: compareAndSetResponse(true, 3)}, }, }, { @@ -171,11 +171,11 @@ func TestModelStep(t *testing.T) { // Txn success {req: getRequest("key"), resp: getResponse("", 1)}, {req: putRequest("key", "2"), resp: failedResponse(errors.New("failed"))}, - {req: txnRequest("key", "2", ""), resp: txnResponse(true, 2), failure: true}, - {req: txnRequest("key", "2", ""), resp: txnResponse(true, 3)}, + {req: compareAndSetRequest("key", "2", ""), resp: compareAndSetResponse(true, 2), failure: true}, + {req: compareAndSetRequest("key", "2", ""), resp: compareAndSetResponse(true, 3)}, // Txn failure {req: putRequest("key", "4"), resp: putResponse(4)}, - {req: txnRequest("key", "5", ""), resp: txnResponse(false, 4)}, + {req: compareAndSetRequest("key", "5", ""), resp: compareAndSetResponse(false, 4)}, {req: putRequest("key", "5"), resp: failedResponse(errors.New("failed"))}, {req: getRequest("key"), resp: getResponse("5", 5)}, }, @@ -282,21 +282,21 @@ func TestModelStep(t *testing.T) { // Txn success {req: getRequest("key"), resp: getResponse("1", 1)}, {req: deleteRequest("key"), resp: failedResponse(errors.New("failed"))}, - {req: txnRequest("key", "", "3"), resp: txnResponse(true, 3)}, + {req: compareAndSetRequest("key", "", "3"), resp: compareAndSetResponse(true, 3)}, // Txn failure {req: putRequest("key", "4"), resp: putResponse(4)}, {req: deleteRequest("key"), resp: failedResponse(errors.New("failed"))}, - {req: txnRequest("key", "4", "5"), resp: txnResponse(false, 5)}, + {req: compareAndSetRequest("key", "4", "5"), resp: compareAndSetResponse(false, 5)}, }, }, { name: "Txn sets new value if value matches expected", operations: []testOperation{ {req: getRequest("key"), resp: getResponse("1", 1)}, - {req: txnRequest("key", "1", "2"), resp: txnResponse(true, 1), failure: true}, - {req: txnRequest("key", "1", "2"), resp: txnResponse(false, 2), failure: true}, - {req: txnRequest("key", "1", "2"), resp: txnResponse(false, 1), failure: true}, - {req: txnRequest("key", "1", "2"), resp: txnResponse(true, 2)}, + {req: compareAndSetRequest("key", "1", "2"), resp: compareAndSetResponse(true, 1), failure: true}, + {req: compareAndSetRequest("key", "1", "2"), resp: compareAndSetResponse(false, 2), failure: true}, + {req: compareAndSetRequest("key", "1", "2"), resp: compareAndSetResponse(false, 1), failure: true}, + {req: compareAndSetRequest("key", "1", "2"), resp: compareAndSetResponse(true, 2)}, {req: getRequest("key"), resp: getResponse("1", 1), failure: true}, {req: getRequest("key"), resp: getResponse("1", 2), failure: true}, {req: getRequest("key"), resp: getResponse("2", 1), failure: true}, @@ -307,19 +307,19 @@ func TestModelStep(t *testing.T) { name: "Txn can expect on empty key", operations: []testOperation{ {req: getRequest("key1"), resp: getResponse("", 1)}, - {req: txnRequest("key1", "", "2"), resp: txnResponse(true, 2)}, - {req: txnRequest("key2", "", "3"), resp: txnResponse(true, 3)}, - {req: txnRequest("key3", "4", "4"), resp: txnResponse(false, 4), failure: true}, + {req: compareAndSetRequest("key1", "", "2"), resp: compareAndSetResponse(true, 2)}, + {req: compareAndSetRequest("key2", "", "3"), resp: compareAndSetResponse(true, 3)}, + {req: compareAndSetRequest("key3", "4", "4"), resp: compareAndSetResponse(false, 4), failure: true}, }, }, { name: "Txn doesn't do anything if value doesn't match expected", operations: []testOperation{ {req: getRequest("key"), resp: getResponse("1", 1)}, - {req: txnRequest("key", "2", "3"), resp: txnResponse(true, 2), failure: true}, - {req: txnRequest("key", "2", "3"), resp: txnResponse(true, 1), failure: true}, - {req: txnRequest("key", "2", "3"), resp: txnResponse(false, 2), failure: true}, - {req: txnRequest("key", "2", "3"), resp: txnResponse(false, 1)}, + {req: compareAndSetRequest("key", "2", "3"), resp: compareAndSetResponse(true, 2), failure: true}, + {req: compareAndSetRequest("key", "2", "3"), resp: compareAndSetResponse(true, 1), failure: true}, + {req: compareAndSetRequest("key", "2", "3"), resp: compareAndSetResponse(false, 2), failure: true}, + {req: compareAndSetRequest("key", "2", "3"), resp: compareAndSetResponse(false, 1)}, {req: getRequest("key"), resp: getResponse("2", 1), failure: true}, {req: getRequest("key"), resp: getResponse("2", 2), failure: true}, {req: getRequest("key"), resp: getResponse("3", 1), failure: true}, @@ -331,7 +331,7 @@ func TestModelStep(t *testing.T) { name: "Txn can fail and be lost before get", operations: []testOperation{ {req: getRequest("key"), resp: getResponse("1", 1)}, - {req: txnRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))}, {req: getRequest("key"), resp: getResponse("1", 1)}, {req: getRequest("key"), resp: getResponse("2", 2), failure: true}, }, @@ -340,7 +340,7 @@ func TestModelStep(t *testing.T) { name: "Txn can fail and be lost before delete", operations: []testOperation{ {req: getRequest("key"), resp: getResponse("1", 1)}, - {req: txnRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))}, {req: deleteRequest("key"), resp: deleteResponse(1, 2)}, }, }, @@ -348,7 +348,7 @@ func TestModelStep(t *testing.T) { name: "Txn can fail and be lost before put", operations: []testOperation{ {req: getRequest("key"), resp: getResponse("1", 1)}, - {req: txnRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))}, {req: putRequest("key", "3"), resp: putResponse(2)}, }, }, @@ -357,13 +357,13 @@ func TestModelStep(t *testing.T) { operations: []testOperation{ // One failed request, one persisted. {req: getRequest("key"), resp: getResponse("1", 1)}, - {req: txnRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))}, {req: getRequest("key"), resp: getResponse("2", 1), failure: true}, {req: getRequest("key"), resp: getResponse("2", 2)}, // Two failed request, two persisted. {req: putRequest("key", "3"), resp: putResponse(3)}, - {req: txnRequest("key", "3", "4"), resp: failedResponse(errors.New("failed"))}, - {req: txnRequest("key", "4", "5"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", "3", "4"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", "4", "5"), resp: failedResponse(errors.New("failed"))}, {req: getRequest("key"), resp: getResponse("5", 5)}, }, }, @@ -372,12 +372,12 @@ func TestModelStep(t *testing.T) { operations: []testOperation{ // One failed request, one persisted. {req: getRequest("key"), resp: getResponse("1", 1)}, - {req: txnRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))}, {req: putRequest("key", "3"), resp: putResponse(3)}, // Two failed request, two persisted. {req: putRequest("key", "4"), resp: putResponse(4)}, - {req: txnRequest("key", "4", "5"), resp: failedResponse(errors.New("failed"))}, - {req: txnRequest("key", "5", "6"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", "4", "5"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", "5", "6"), resp: failedResponse(errors.New("failed"))}, {req: putRequest("key", "7"), resp: putResponse(7)}, }, }, @@ -386,12 +386,12 @@ func TestModelStep(t *testing.T) { operations: []testOperation{ // One failed request, one persisted. {req: getRequest("key"), resp: getResponse("1", 1)}, - {req: txnRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))}, {req: deleteRequest("key"), resp: deleteResponse(1, 3)}, // Two failed request, two persisted. {req: putRequest("key", "4"), resp: putResponse(4)}, - {req: txnRequest("key", "4", "5"), resp: failedResponse(errors.New("failed"))}, - {req: txnRequest("key", "5", "6"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", "4", "5"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", "5", "6"), resp: failedResponse(errors.New("failed"))}, {req: deleteRequest("key"), resp: deleteResponse(1, 7)}, }, }, @@ -400,17 +400,17 @@ func TestModelStep(t *testing.T) { operations: []testOperation{ // One failed request, one persisted with success. {req: getRequest("key"), resp: getResponse("1", 1)}, - {req: txnRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))}, - {req: txnRequest("key", "2", "3"), resp: txnResponse(true, 3)}, + {req: compareAndSetRequest("key", "1", "2"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", "2", "3"), resp: compareAndSetResponse(true, 3)}, // Two failed request, two persisted with success. {req: putRequest("key", "4"), resp: putResponse(4)}, - {req: txnRequest("key", "4", "5"), resp: failedResponse(errors.New("failed"))}, - {req: txnRequest("key", "5", "6"), resp: failedResponse(errors.New("failed"))}, - {req: txnRequest("key", "6", "7"), resp: txnResponse(true, 7)}, + {req: compareAndSetRequest("key", "4", "5"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", "5", "6"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", "6", "7"), resp: compareAndSetResponse(true, 7)}, // One failed request, one persisted with failure. {req: putRequest("key", "8"), resp: putResponse(8)}, - {req: txnRequest("key", "8", "9"), resp: failedResponse(errors.New("failed"))}, - {req: txnRequest("key", "8", "10"), resp: txnResponse(false, 9)}, + {req: compareAndSetRequest("key", "8", "9"), resp: failedResponse(errors.New("failed"))}, + {req: compareAndSetRequest("key", "8", "10"), resp: compareAndSetResponse(false, 9)}, }, }, { @@ -537,7 +537,7 @@ func TestModelStep(t *testing.T) { {req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)}, {req: putRequest("key", "4"), resp: putResponse(4)}, {req: getRequest("key"), resp: getResponse("4", 4)}, - {req: txnRequest("key", "4", "5"), resp: txnResponse(true, 5)}, + {req: compareAndSetRequest("key", "4", "5"), resp: compareAndSetResponse(true, 5)}, {req: deleteRequest("key"), resp: deleteResponse(1, 6)}, {req: defragmentRequest(), resp: defragmentResponse()}, }, @@ -556,7 +556,7 @@ func TestModelStep(t *testing.T) { {req: defragmentRequest(), resp: defragmentResponse()}, {req: getRequest("key"), resp: getResponse("4", 4)}, {req: defragmentRequest(), resp: defragmentResponse()}, - {req: txnRequest("key", "4", "5"), resp: txnResponse(true, 5)}, + {req: compareAndSetRequest("key", "4", "5"), resp: compareAndSetResponse(true, 5)}, {req: defragmentRequest(), resp: defragmentResponse()}, {req: deleteRequest("key"), resp: deleteResponse(1, 6)}, {req: defragmentRequest(), resp: defragmentResponse()}, @@ -576,7 +576,7 @@ func TestModelStep(t *testing.T) { {req: defragmentRequest(), resp: failedResponse(errors.New("failed"))}, {req: getRequest("key"), resp: getResponse("4", 4)}, {req: defragmentRequest(), resp: failedResponse(errors.New("failed"))}, - {req: txnRequest("key", "4", "5"), resp: txnResponse(true, 5)}, + {req: compareAndSetRequest("key", "4", "5"), resp: compareAndSetResponse(true, 5)}, {req: defragmentRequest(), resp: failedResponse(errors.New("failed"))}, {req: deleteRequest("key"), resp: deleteResponse(1, 6)}, {req: defragmentRequest(), resp: failedResponse(errors.New("failed"))}, @@ -664,20 +664,25 @@ func TestModelDescribe(t *testing.T) { expectDescribe: `delete("key6") -> err: "failed"`, }, { - req: txnRequest("key7", "7", "77"), - resp: txnResponse(false, 7), + req: compareAndSetRequest("key7", "7", "77"), + resp: compareAndSetResponse(false, 7), expectDescribe: `if(key7=="7").then(put("key7", "77", nil)) -> txn failed, rev: 7`, }, { - req: txnRequest("key8", "8", "88"), - resp: txnResponse(true, 8), + req: compareAndSetRequest("key8", "8", "88"), + resp: compareAndSetResponse(true, 8), expectDescribe: `if(key8=="8").then(put("key8", "88", nil)) -> ok, rev: 8`, }, { - req: txnRequest("key9", "9", "99"), + req: compareAndSetRequest("key9", "9", "99"), resp: failedResponse(errors.New("failed")), expectDescribe: `if(key9=="9").then(put("key9", "99", nil)) -> err: "failed"`, }, + { + req: txnRequest(nil, []EtcdOperation{{Type: Get, Key: "10"}, {Type: Put, Key: "11", Value: ValueOrHash{Value: "111"}}, {Type: Delete, Key: "12"}}), + resp: txnResponse([]EtcdOperationResult{{Value: ValueOrHash{Value: "110"}}, {}, {Deleted: 1}}, true, 10), + expectDescribe: `get("10"), put("11", "111", nil), delete("12") -> "110", ok, deleted: 1, rev: 10`, + }, { req: defragmentRequest(), resp: defragmentResponse(), @@ -791,42 +796,42 @@ func TestModelResponseMatch(t *testing.T) { expectMatch: false, }, { - resp1: txnResponse(false, 7), - resp2: txnResponse(false, 7), + resp1: compareAndSetResponse(false, 7), + resp2: compareAndSetResponse(false, 7), expectMatch: true, }, { - resp1: txnResponse(true, 7), - resp2: txnResponse(false, 7), + resp1: compareAndSetResponse(true, 7), + resp2: compareAndSetResponse(false, 7), expectMatch: false, }, { - resp1: txnResponse(false, 7), - resp2: txnResponse(false, 8), + resp1: compareAndSetResponse(false, 7), + resp2: compareAndSetResponse(false, 8), expectMatch: false, }, { - resp1: txnResponse(false, 7), + resp1: compareAndSetResponse(false, 7), resp2: failedResponse(errors.New("failed request")), expectMatch: false, }, { - resp1: txnResponse(true, 7), + resp1: compareAndSetResponse(true, 7), resp2: unknownResponse(7), expectMatch: true, }, { - resp1: txnResponse(false, 7), + resp1: compareAndSetResponse(false, 7), resp2: unknownResponse(7), expectMatch: true, }, { - resp1: txnResponse(true, 7), + resp1: compareAndSetResponse(true, 7), resp2: unknownResponse(0), expectMatch: false, }, { - resp1: txnResponse(false, 7), + resp1: compareAndSetResponse(false, 7), resp2: unknownResponse(0), expectMatch: false, }, diff --git a/tests/linearizability/traffic.go b/tests/linearizability/traffic.go index ed41305c8..ba56834c2 100644 --- a/tests/linearizability/traffic.go +++ b/tests/linearizability/traffic.go @@ -24,12 +24,15 @@ import ( "golang.org/x/time/rate" "go.etcd.io/etcd/api/v3/mvccpb" + clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/linearizability/identity" + "go.etcd.io/etcd/tests/v3/linearizability/model" ) var ( - DefaultLeaseTTL int64 = 7200 - RequestTimeout = 40 * time.Millisecond + DefaultLeaseTTL int64 = 7200 + RequestTimeout = 40 * time.Millisecond + MultiOpTxnOpCount = 4 ) type TrafficRequestType string @@ -39,6 +42,7 @@ const ( Put TrafficRequestType = "put" LargePut TrafficRequestType = "largePut" Delete TrafficRequestType = "delete" + MultiOpTxn TrafficRequestType = "multiOpTxn" PutWithLease TrafficRequestType = "putWithLease" LeaseRevoke TrafficRequestType = "leaseRevoke" CompareAndSet TrafficRequestType = "compareAndSet" @@ -75,8 +79,7 @@ func (t traffic) Run(ctx context.Context, clientId int, c *recordingClient, limi if err != nil { continue } - // Provide each write with unique id to make it easier to validate operation history. - t.Write(ctx, c, limiter, key, fmt.Sprintf("%d", ids.RequestId()), lm, clientId, resp) + t.Write(ctx, c, limiter, key, ids, lm, clientId, resp) } } @@ -90,23 +93,25 @@ func (t traffic) Read(ctx context.Context, c *recordingClient, limiter *rate.Lim return resp, err } -func (t traffic) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, newValue string, lm identity.LeaseIdStorage, cid int, lastValues []*mvccpb.KeyValue) error { +func (t traffic) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage, cid int, lastValues []*mvccpb.KeyValue) error { writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout) var err error switch t.pickWriteRequest() { case Put: - err = c.Put(writeCtx, key, newValue) + err = c.Put(writeCtx, key, fmt.Sprintf("%d", id.RequestId())) case LargePut: err = c.Put(writeCtx, key, randString(t.largePutSize)) case Delete: err = c.Delete(writeCtx, key) + case MultiOpTxn: + err = c.Txn(writeCtx, nil, t.pickMultiTxnOps(id)) case CompareAndSet: var expectValue string if len(lastValues) != 0 { expectValue = string(lastValues[0].Value) } - err = c.CompareAndSet(writeCtx, key, expectValue, newValue) + err = c.CompareAndSet(writeCtx, key, expectValue, fmt.Sprintf("%d", id.RequestId())) case PutWithLease: leaseId := lm.LeaseId(cid) if leaseId == 0 { @@ -118,7 +123,7 @@ func (t traffic) Write(ctx context.Context, c *recordingClient, limiter *rate.Li } if leaseId != 0 { putCtx, putCancel := context.WithTimeout(ctx, RequestTimeout) - err = c.PutWithLease(putCtx, key, newValue, leaseId) + err = c.PutWithLease(putCtx, key, fmt.Sprintf("%d", id.RequestId()), leaseId) putCancel() } case LeaseRevoke: @@ -157,6 +162,50 @@ func (t traffic) pickWriteRequest() TrafficRequestType { panic("unexpected") } +func (t traffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op) { + keys := rand.Perm(t.keyCount) + opTypes := make([]model.OperationType, 4) + + atLeastOnePut := false + for i := 0; i < MultiOpTxnOpCount; i++ { + opTypes[i] = t.pickOperationType() + if opTypes[i] == model.Put { + atLeastOnePut = true + } + } + // Ensure at least one put to make operation unique + if !atLeastOnePut { + opTypes[0] = model.Put + } + + for i, opType := range opTypes { + key := fmt.Sprintf("%d", keys[i]) + switch opType { + case model.Get: + ops = append(ops, clientv3.OpGet(key)) + case model.Put: + value := fmt.Sprintf("%d", ids.RequestId()) + ops = append(ops, clientv3.OpPut(key, value)) + case model.Delete: + ops = append(ops, clientv3.OpDelete(key)) + default: + panic("unsuported operation type") + } + } + return ops +} + +func (t traffic) pickOperationType() model.OperationType { + roll := rand.Int() % 100 + if roll < 10 { + return model.Delete + } + if roll < 50 { + return model.Get + } + return model.Put +} + func randString(size int) string { data := strings.Builder{} data.Grow(size)