Merge pull request #14903 from serathius/linearizability-txn

tests: Add Txn operation to linearizability tests
This commit is contained in:
Marek Siarkowicz 2022-12-09 09:38:08 +01:00 committed by GitHub
commit ebca62cc81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 271 additions and 25 deletions

View File

@ -18,6 +18,7 @@ import (
"context"
"time"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
@ -47,15 +48,15 @@ func (c *recordingClient) Close() error {
return c.client.Close()
}
func (c *recordingClient) Get(ctx context.Context, key string) error {
func (c *recordingClient) Get(ctx context.Context, key string) ([]*mvccpb.KeyValue, error) {
callTime := time.Now()
resp, err := c.client.Get(ctx, key)
returnTime := time.Now()
if err != nil {
return err
return nil, err
}
c.history.AppendGet(key, callTime, returnTime, resp)
return nil
return resp.Kvs, nil
}
func (c *recordingClient) Put(ctx context.Context, key, value string) error {
@ -73,3 +74,22 @@ func (c *recordingClient) Delete(ctx context.Context, key string) error {
c.history.AppendDelete(key, callTime, returnTime, resp, err)
return nil
}
func (c *recordingClient) Txn(ctx context.Context, key, expectedValue, newValue string) error {
callTime := time.Now()
txn := c.client.Txn(ctx)
var cmp clientv3.Cmp
if expectedValue == "" {
cmp = clientv3.Compare(clientv3.CreateRevision(key), "=", 0)
} else {
cmp = clientv3.Compare(clientv3.Value(key), "=", expectedValue)
}
resp, err := txn.If(
cmp,
).Then(
clientv3.OpPut(key, newValue),
).Commit()
returnTime := time.Now()
c.history.AppendTxn(key, expectedValue, newValue, callTime, returnTime, resp, err)
return err
}

View File

@ -94,6 +94,25 @@ func (h *appendableHistory) AppendDelete(key string, start, end time.Time, resp
})
}
func (h *appendableHistory) AppendTxn(key, expectValue, newValue string, start, end time.Time, resp *clientv3.TxnResponse, err error) {
request := EtcdRequest{Op: Txn, Key: key, TxnExpectData: expectValue, TxnNewData: newValue}
if err != nil {
h.appendFailed(request, start, err)
return
}
var revision int64
if resp != nil && resp.Header != nil {
revision = resp.Header.Revision
}
h.successful = append(h.successful, porcupine.Operation{
ClientId: h.id,
Input: request,
Call: start.UnixNano(),
Output: EtcdResponse{Err: err, Revision: revision, TxnSucceeded: resp.Succeeded},
Return: end.UnixNano(),
})
}
func (h *appendableHistory) appendFailed(request EtcdRequest, start time.Time, err error) {
h.failed = append(h.failed, porcupine.Operation{
ClientId: h.id,

View File

@ -27,19 +27,23 @@ const (
Get Operation = "get"
Put Operation = "put"
Delete Operation = "delete"
Txn Operation = "txn"
)
type EtcdRequest struct {
Op Operation
Key string
PutData string
Op Operation
Key string
PutData string
TxnExpectData string
TxnNewData string
}
type EtcdResponse struct {
GetData string
Revision int64
Deleted int64
Err error
GetData string
Revision int64
Deleted int64
TxnSucceeded bool
Err error
}
type EtcdState struct {
@ -89,6 +93,12 @@ var etcdModel = porcupine.Model{
} else {
return fmt.Sprintf("delete(%q) -> ok, rev: %d deleted:%d", request.Key, response.Revision, response.Deleted)
}
case Txn:
if response.Err != nil {
return fmt.Sprintf("txn(if(value(%q)=%q).then(put(%q, %q)) -> %s", request.Key, request.TxnExpectData, request.Key, request.TxnNewData, response.Err)
} else {
return fmt.Sprintf("txn(if(value(%q)=%q).then(put(%q, %q)) -> %v, rev: %d", request.Key, request.TxnExpectData, request.Key, request.TxnNewData, response.TxnSucceeded, response.Revision)
}
default:
return "<invalid>"
}
@ -145,6 +155,14 @@ func initValueRevision(request EtcdRequest, response EtcdResponse) (ok bool, v V
Value: "",
Revision: response.Revision,
}
case Txn:
if response.TxnSucceeded {
return true, ValueRevision{
Value: request.TxnNewData,
Revision: response.Revision,
}
}
return false, ValueRevision{}
default:
panic("Unknown operation")
}
@ -164,6 +182,12 @@ func stepValue(v ValueRevision, request EtcdRequest) (ValueRevision, EtcdRespons
v.Revision += 1
resp.Deleted = 1
}
case Txn:
if v.Value == request.TxnExpectData {
v.Value = request.TxnNewData
v.Revision += 1
resp.TxnSucceeded = true
}
default:
panic("unsupported operation")
}

View File

@ -42,6 +42,12 @@ func TestModel(t *testing.T) {
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Revision: 42}},
},
},
{
name: "First Txn can start from non-zero revision",
operations: []testOperation{
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "", TxnNewData: "42"}, resp: EtcdResponse{Revision: 42}},
},
},
{
name: "Get response data should match put",
operations: []testOperation{
@ -70,7 +76,7 @@ func TestModel(t *testing.T) {
},
},
{
name: "Put can fail and be lost",
name: "Put can fail and be lost before get",
operations: []testOperation{
{req: EtcdRequest{Op: Put, Key: "key", PutData: "1"}, resp: EtcdResponse{Revision: 1}},
{req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
@ -81,18 +87,38 @@ func TestModel(t *testing.T) {
},
},
{
name: "Put can fail but be persisted and increase revision before put",
name: "Put can fail and be lost before put",
operations: []testOperation{
// One failed request, one persisted.
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1}},
{req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Put, Key: "key", PutData: "3"}, resp: EtcdResponse{Revision: 3}},
// Two failed request, two persisted.
{req: EtcdRequest{Op: Put, Key: "key", PutData: "4"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Put, Key: "key", PutData: "5"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Put, Key: "key", PutData: "6"}, resp: EtcdResponse{Revision: 6}},
{req: EtcdRequest{Op: Put, Key: "key", PutData: "3"}, resp: EtcdResponse{Revision: 2}},
},
},
{
name: "Put can fail and be lost before delete",
operations: []testOperation{
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1}},
{req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Revision: 1}},
},
},
{
name: "Put can fail and be lost before txn failed",
operations: []testOperation{
// Txn failure
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1}},
{req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2", TxnNewData: "3"}, resp: EtcdResponse{Revision: 1}},
// Txn success
{req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Revision: 2}},
{req: EtcdRequest{Op: Put, Key: "key", PutData: "4"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2", TxnNewData: "3"}, resp: EtcdResponse{TxnSucceeded: true, Revision: 3}},
},
},
{
name: "Put can fail and be lost before txn success",
operations: []testOperation{},
},
{
name: "Put can fail but be persisted and increase revision before get",
operations: []testOperation{
@ -129,6 +155,21 @@ func TestModel(t *testing.T) {
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 10}},
},
},
{
name: "Put can fail but be persisted before txn",
operations: []testOperation{
// Txn success
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1}},
{req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2"}, resp: EtcdResponse{TxnSucceeded: true, Revision: 2}, failure: true},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2"}, resp: EtcdResponse{TxnSucceeded: true, Revision: 3}},
// Txn failure
{req: EtcdRequest{Op: Put, Key: "key", PutData: "4"}, resp: EtcdResponse{Revision: 4}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "5"}, resp: EtcdResponse{Revision: 4}},
{req: EtcdRequest{Op: Put, Key: "key", PutData: "5"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 5, GetData: "5"}},
},
},
{
name: "Delete only increases revision on success",
operations: []testOperation{
@ -216,6 +257,141 @@ func TestModel(t *testing.T) {
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Revision: 4}},
},
},
{
name: "Delete can fail but be persisted before txn",
operations: []testOperation{
// Txn success
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}},
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "", TxnNewData: "1"}, resp: EtcdResponse{TxnSucceeded: true, Revision: 3}},
// Txn failure
{req: EtcdRequest{Op: Put, Key: "key", PutData: "4"}, resp: EtcdResponse{Revision: 4}},
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "4", TxnNewData: "5"}, resp: EtcdResponse{TxnSucceeded: false, Revision: 5}},
},
},
{
name: "Txn sets new value if value matches expected",
operations: []testOperation{
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Revision: 1, TxnSucceeded: true}, failure: true},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Revision: 2, TxnSucceeded: false}, failure: true},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Revision: 1, TxnSucceeded: false}, failure: true},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Revision: 2, TxnSucceeded: true}},
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}, failure: true},
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 2}, failure: true},
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "2", Revision: 1}, failure: true},
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "2", Revision: 2}},
},
},
{
name: "Txn can expect on empty key",
operations: []testOperation{
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "", TxnNewData: "2"}, resp: EtcdResponse{Revision: 2, TxnSucceeded: true}},
},
},
{
name: "Txn doesn't do anything if value doesn't match expected",
operations: []testOperation{
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2", TxnNewData: "3"}, resp: EtcdResponse{Revision: 2, TxnSucceeded: true}, failure: true},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2", TxnNewData: "3"}, resp: EtcdResponse{Revision: 1, TxnSucceeded: true}, failure: true},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2", TxnNewData: "3"}, resp: EtcdResponse{Revision: 2, TxnSucceeded: false}, failure: true},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2", TxnNewData: "3"}, resp: EtcdResponse{Revision: 1, TxnSucceeded: false}},
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "2", Revision: 1}, failure: true},
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "2", Revision: 2}, failure: true},
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "3", Revision: 1}, failure: true},
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "3", Revision: 2}, failure: true},
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}},
},
},
{
name: "Txn can fail and be lost before get",
operations: []testOperation{
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}},
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 2, GetData: "2"}, failure: true},
},
},
{
name: "Txn can fail and be lost before delete",
operations: []testOperation{
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 2}},
},
},
{
name: "Txn can fail and be lost before put",
operations: []testOperation{
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Put, Key: "key", PutData: "3"}, resp: EtcdResponse{Revision: 2}},
},
},
{
name: "Txn can fail but be persisted before get",
operations: []testOperation{
// One failed request, one persisted.
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1, GetData: "2"}, failure: true},
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 2, GetData: "2"}},
// Two failed request, two persisted.
{req: EtcdRequest{Op: Put, Key: "key", PutData: "3"}, resp: EtcdResponse{Revision: 3}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "3", TxnNewData: "4"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "4", TxnNewData: "5"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 5, GetData: "5"}},
},
},
{
name: "Txn can fail but be persisted before put",
operations: []testOperation{
// One failed request, one persisted.
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Put, Key: "key", PutData: "3"}, resp: EtcdResponse{Revision: 3}},
// Two failed request, two persisted.
{req: EtcdRequest{Op: Put, Key: "key", PutData: "4"}, resp: EtcdResponse{Revision: 4}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "4", TxnNewData: "5"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "5", TxnNewData: "6"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Put, Key: "key", PutData: "7"}, resp: EtcdResponse{Revision: 7}},
},
},
{
name: "Txn can fail but be persisted before delete",
operations: []testOperation{
// One failed request, one persisted.
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 3}},
// Two failed request, two persisted.
{req: EtcdRequest{Op: Put, Key: "key", PutData: "4"}, resp: EtcdResponse{Revision: 4}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "4", TxnNewData: "5"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "5", TxnNewData: "6"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 7}},
},
},
{
name: "Txn can fail but be persisted before txn",
operations: []testOperation{
// One failed request, one persisted with success.
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2", TxnNewData: "3"}, resp: EtcdResponse{Revision: 3, TxnSucceeded: true}},
// Two failed request, two persisted with success.
{req: EtcdRequest{Op: Put, Key: "key", PutData: "4"}, resp: EtcdResponse{Revision: 4}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "4", TxnNewData: "5"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "5", TxnNewData: "6"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "6", TxnNewData: "7"}, resp: EtcdResponse{Revision: 7, TxnSucceeded: true}},
// One failed request, one persisted with failure.
{req: EtcdRequest{Op: Put, Key: "key", PutData: "8"}, resp: EtcdResponse{Revision: 8}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "8", TxnNewData: "9"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "8", TxnNewData: "10"}, resp: EtcdResponse{Revision: 9}},
},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {

View File

@ -20,11 +20,12 @@ import (
"math/rand"
"time"
"go.etcd.io/etcd/api/v3/mvccpb"
"golang.org/x/time/rate"
)
var (
DefaultTraffic Traffic = readWriteSingleKey{key: "key", writes: []opChance{{operation: Put, chance: 90}, {operation: Delete, chance: 10}}}
DefaultTraffic Traffic = readWriteSingleKey{key: "key", writes: []opChance{{operation: Put, chance: 90}, {operation: Delete, chance: 5}, {operation: Txn, chance: 5}}}
)
type Traffic interface {
@ -50,26 +51,26 @@ func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter
default:
}
// Execute one read per one write to avoid operation history include too many failed writes when etcd is down.
err := t.Read(ctx, c, limiter)
resp, err := t.Read(ctx, c, limiter)
if err != nil {
continue
}
// Provide each write with unique id to make it easier to validate operation history.
t.Write(ctx, c, limiter, ids.RequestId())
t.Write(ctx, c, limiter, ids.RequestId(), resp)
}
}
func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limiter *rate.Limiter) error {
func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limiter *rate.Limiter) ([]*mvccpb.KeyValue, error) {
getCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
err := c.Get(getCtx, t.key)
resp, err := c.Get(getCtx, t.key)
cancel()
if err == nil {
limiter.Wait(ctx)
}
return err
return resp, err
}
func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, id int) error {
func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, id int, kvs []*mvccpb.KeyValue) error {
putCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
var err error
@ -78,6 +79,12 @@ func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limit
err = c.Put(putCtx, t.key, fmt.Sprintf("%d", id))
case Delete:
err = c.Delete(putCtx, t.key)
case Txn:
var value string
if len(kvs) != 0 {
value = string(kvs[0].Value)
}
err = c.Txn(putCtx, t.key, value, fmt.Sprintf("%d", id))
default:
panic("invalid operation")
}