mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
tests: Add Txn operation to linearizability tests
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
parent
dc9e422e28
commit
26bf2f81f5
@ -18,6 +18,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
@ -47,15 +48,15 @@ func (c *recordingClient) Close() error {
|
|||||||
return c.client.Close()
|
return c.client.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *recordingClient) Get(ctx context.Context, key string) error {
|
func (c *recordingClient) Get(ctx context.Context, key string) ([]*mvccpb.KeyValue, error) {
|
||||||
callTime := time.Now()
|
callTime := time.Now()
|
||||||
resp, err := c.client.Get(ctx, key)
|
resp, err := c.client.Get(ctx, key)
|
||||||
returnTime := time.Now()
|
returnTime := time.Now()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
c.history.AppendGet(key, callTime, returnTime, resp)
|
c.history.AppendGet(key, callTime, returnTime, resp)
|
||||||
return nil
|
return resp.Kvs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *recordingClient) Put(ctx context.Context, key, value string) error {
|
func (c *recordingClient) Put(ctx context.Context, key, value string) error {
|
||||||
@ -73,3 +74,22 @@ func (c *recordingClient) Delete(ctx context.Context, key string) error {
|
|||||||
c.history.AppendDelete(key, callTime, returnTime, resp, err)
|
c.history.AppendDelete(key, callTime, returnTime, resp, err)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *recordingClient) Txn(ctx context.Context, key, expectedValue, newValue string) error {
|
||||||
|
callTime := time.Now()
|
||||||
|
txn := c.client.Txn(ctx)
|
||||||
|
var cmp clientv3.Cmp
|
||||||
|
if expectedValue == "" {
|
||||||
|
cmp = clientv3.Compare(clientv3.CreateRevision(key), "=", 0)
|
||||||
|
} else {
|
||||||
|
cmp = clientv3.Compare(clientv3.Value(key), "=", expectedValue)
|
||||||
|
}
|
||||||
|
resp, err := txn.If(
|
||||||
|
cmp,
|
||||||
|
).Then(
|
||||||
|
clientv3.OpPut(key, newValue),
|
||||||
|
).Commit()
|
||||||
|
returnTime := time.Now()
|
||||||
|
c.history.AppendTxn(key, expectedValue, newValue, callTime, returnTime, resp, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
@ -94,6 +94,25 @@ func (h *appendableHistory) AppendDelete(key string, start, end time.Time, resp
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *appendableHistory) AppendTxn(key, expectValue, newValue string, start, end time.Time, resp *clientv3.TxnResponse, err error) {
|
||||||
|
request := EtcdRequest{Op: Txn, Key: key, TxnExpectData: expectValue, TxnNewData: newValue}
|
||||||
|
if err != nil {
|
||||||
|
h.appendFailed(request, start, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var revision int64
|
||||||
|
if resp != nil && resp.Header != nil {
|
||||||
|
revision = resp.Header.Revision
|
||||||
|
}
|
||||||
|
h.successful = append(h.successful, porcupine.Operation{
|
||||||
|
ClientId: h.id,
|
||||||
|
Input: request,
|
||||||
|
Call: start.UnixNano(),
|
||||||
|
Output: EtcdResponse{Err: err, Revision: revision, TxnSucceeded: resp.Succeeded},
|
||||||
|
Return: end.UnixNano(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func (h *appendableHistory) appendFailed(request EtcdRequest, start time.Time, err error) {
|
func (h *appendableHistory) appendFailed(request EtcdRequest, start time.Time, err error) {
|
||||||
h.failed = append(h.failed, porcupine.Operation{
|
h.failed = append(h.failed, porcupine.Operation{
|
||||||
ClientId: h.id,
|
ClientId: h.id,
|
||||||
|
@ -27,19 +27,23 @@ const (
|
|||||||
Get Operation = "get"
|
Get Operation = "get"
|
||||||
Put Operation = "put"
|
Put Operation = "put"
|
||||||
Delete Operation = "delete"
|
Delete Operation = "delete"
|
||||||
|
Txn Operation = "txn"
|
||||||
)
|
)
|
||||||
|
|
||||||
type EtcdRequest struct {
|
type EtcdRequest struct {
|
||||||
Op Operation
|
Op Operation
|
||||||
Key string
|
Key string
|
||||||
PutData string
|
PutData string
|
||||||
|
TxnExpectData string
|
||||||
|
TxnNewData string
|
||||||
}
|
}
|
||||||
|
|
||||||
type EtcdResponse struct {
|
type EtcdResponse struct {
|
||||||
GetData string
|
GetData string
|
||||||
Revision int64
|
Revision int64
|
||||||
Deleted int64
|
Deleted int64
|
||||||
Err error
|
TxnSucceeded bool
|
||||||
|
Err error
|
||||||
}
|
}
|
||||||
|
|
||||||
type EtcdState struct {
|
type EtcdState struct {
|
||||||
@ -89,6 +93,12 @@ var etcdModel = porcupine.Model{
|
|||||||
} else {
|
} else {
|
||||||
return fmt.Sprintf("delete(%q) -> ok, rev: %d deleted:%d", request.Key, response.Revision, response.Deleted)
|
return fmt.Sprintf("delete(%q) -> ok, rev: %d deleted:%d", request.Key, response.Revision, response.Deleted)
|
||||||
}
|
}
|
||||||
|
case Txn:
|
||||||
|
if response.Err != nil {
|
||||||
|
return fmt.Sprintf("txn(if(value(%q)=%q).then(put(%q, %q)) -> %s", request.Key, request.TxnExpectData, request.Key, request.TxnNewData, response.Err)
|
||||||
|
} else {
|
||||||
|
return fmt.Sprintf("txn(if(value(%q)=%q).then(put(%q, %q)) -> %v, rev: %d", request.Key, request.TxnExpectData, request.Key, request.TxnNewData, response.TxnSucceeded, response.Revision)
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
return "<invalid>"
|
return "<invalid>"
|
||||||
}
|
}
|
||||||
@ -145,6 +155,14 @@ func initValueRevision(request EtcdRequest, response EtcdResponse) (ok bool, v V
|
|||||||
Value: "",
|
Value: "",
|
||||||
Revision: response.Revision,
|
Revision: response.Revision,
|
||||||
}
|
}
|
||||||
|
case Txn:
|
||||||
|
if response.TxnSucceeded {
|
||||||
|
return true, ValueRevision{
|
||||||
|
Value: request.TxnNewData,
|
||||||
|
Revision: response.Revision,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false, ValueRevision{}
|
||||||
default:
|
default:
|
||||||
panic("Unknown operation")
|
panic("Unknown operation")
|
||||||
}
|
}
|
||||||
@ -164,6 +182,12 @@ func stepValue(v ValueRevision, request EtcdRequest) (ValueRevision, EtcdRespons
|
|||||||
v.Revision += 1
|
v.Revision += 1
|
||||||
resp.Deleted = 1
|
resp.Deleted = 1
|
||||||
}
|
}
|
||||||
|
case Txn:
|
||||||
|
if v.Value == request.TxnExpectData {
|
||||||
|
v.Value = request.TxnNewData
|
||||||
|
v.Revision += 1
|
||||||
|
resp.TxnSucceeded = true
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
panic("unsupported operation")
|
panic("unsupported operation")
|
||||||
}
|
}
|
||||||
|
@ -42,6 +42,12 @@ func TestModel(t *testing.T) {
|
|||||||
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Revision: 42}},
|
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Revision: 42}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "First Txn can start from non-zero revision",
|
||||||
|
operations: []testOperation{
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "", TxnNewData: "42"}, resp: EtcdResponse{Revision: 42}},
|
||||||
|
},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "Get response data should match put",
|
name: "Get response data should match put",
|
||||||
operations: []testOperation{
|
operations: []testOperation{
|
||||||
@ -70,7 +76,7 @@ func TestModel(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Put can fail and be lost",
|
name: "Put can fail and be lost before get",
|
||||||
operations: []testOperation{
|
operations: []testOperation{
|
||||||
{req: EtcdRequest{Op: Put, Key: "key", PutData: "1"}, resp: EtcdResponse{Revision: 1}},
|
{req: EtcdRequest{Op: Put, Key: "key", PutData: "1"}, resp: EtcdResponse{Revision: 1}},
|
||||||
{req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
{req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
@ -81,18 +87,38 @@ func TestModel(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Put can fail but be persisted and increase revision before put",
|
name: "Put can fail and be lost before put",
|
||||||
operations: []testOperation{
|
operations: []testOperation{
|
||||||
// One failed request, one persisted.
|
|
||||||
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1}},
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1}},
|
||||||
{req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
{req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
{req: EtcdRequest{Op: Put, Key: "key", PutData: "3"}, resp: EtcdResponse{Revision: 3}},
|
{req: EtcdRequest{Op: Put, Key: "key", PutData: "3"}, resp: EtcdResponse{Revision: 2}},
|
||||||
// Two failed request, two persisted.
|
|
||||||
{req: EtcdRequest{Op: Put, Key: "key", PutData: "4"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
|
||||||
{req: EtcdRequest{Op: Put, Key: "key", PutData: "5"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
|
||||||
{req: EtcdRequest{Op: Put, Key: "key", PutData: "6"}, resp: EtcdResponse{Revision: 6}},
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "Put can fail and be lost before delete",
|
||||||
|
operations: []testOperation{
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1}},
|
||||||
|
{req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
|
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Revision: 1}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Put can fail and be lost before txn failed",
|
||||||
|
operations: []testOperation{
|
||||||
|
// Txn failure
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1}},
|
||||||
|
{req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2", TxnNewData: "3"}, resp: EtcdResponse{Revision: 1}},
|
||||||
|
// Txn success
|
||||||
|
{req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Revision: 2}},
|
||||||
|
{req: EtcdRequest{Op: Put, Key: "key", PutData: "4"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2", TxnNewData: "3"}, resp: EtcdResponse{TxnSucceeded: true, Revision: 3}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Put can fail and be lost before txn success",
|
||||||
|
operations: []testOperation{},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "Put can fail but be persisted and increase revision before get",
|
name: "Put can fail but be persisted and increase revision before get",
|
||||||
operations: []testOperation{
|
operations: []testOperation{
|
||||||
@ -129,6 +155,21 @@ func TestModel(t *testing.T) {
|
|||||||
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 10}},
|
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 10}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "Put can fail but be persisted before txn",
|
||||||
|
operations: []testOperation{
|
||||||
|
// Txn success
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1}},
|
||||||
|
{req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2"}, resp: EtcdResponse{TxnSucceeded: true, Revision: 2}, failure: true},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2"}, resp: EtcdResponse{TxnSucceeded: true, Revision: 3}},
|
||||||
|
// Txn failure
|
||||||
|
{req: EtcdRequest{Op: Put, Key: "key", PutData: "4"}, resp: EtcdResponse{Revision: 4}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "5"}, resp: EtcdResponse{Revision: 4}},
|
||||||
|
{req: EtcdRequest{Op: Put, Key: "key", PutData: "5"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 5, GetData: "5"}},
|
||||||
|
},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "Delete only increases revision on success",
|
name: "Delete only increases revision on success",
|
||||||
operations: []testOperation{
|
operations: []testOperation{
|
||||||
@ -216,6 +257,141 @@ func TestModel(t *testing.T) {
|
|||||||
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Revision: 4}},
|
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Revision: 4}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "Delete can fail but be persisted before txn",
|
||||||
|
operations: []testOperation{
|
||||||
|
// Txn success
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}},
|
||||||
|
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "", TxnNewData: "1"}, resp: EtcdResponse{TxnSucceeded: true, Revision: 3}},
|
||||||
|
// Txn failure
|
||||||
|
{req: EtcdRequest{Op: Put, Key: "key", PutData: "4"}, resp: EtcdResponse{Revision: 4}},
|
||||||
|
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "4", TxnNewData: "5"}, resp: EtcdResponse{TxnSucceeded: false, Revision: 5}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Txn sets new value if value matches expected",
|
||||||
|
operations: []testOperation{
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Revision: 1, TxnSucceeded: true}, failure: true},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Revision: 2, TxnSucceeded: false}, failure: true},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Revision: 1, TxnSucceeded: false}, failure: true},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Revision: 2, TxnSucceeded: true}},
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}, failure: true},
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 2}, failure: true},
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "2", Revision: 1}, failure: true},
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "2", Revision: 2}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Txn can expect on empty key",
|
||||||
|
operations: []testOperation{
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "", TxnNewData: "2"}, resp: EtcdResponse{Revision: 2, TxnSucceeded: true}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Txn doesn't do anything if value doesn't match expected",
|
||||||
|
operations: []testOperation{
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2", TxnNewData: "3"}, resp: EtcdResponse{Revision: 2, TxnSucceeded: true}, failure: true},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2", TxnNewData: "3"}, resp: EtcdResponse{Revision: 1, TxnSucceeded: true}, failure: true},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2", TxnNewData: "3"}, resp: EtcdResponse{Revision: 2, TxnSucceeded: false}, failure: true},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2", TxnNewData: "3"}, resp: EtcdResponse{Revision: 1, TxnSucceeded: false}},
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "2", Revision: 1}, failure: true},
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "2", Revision: 2}, failure: true},
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "3", Revision: 1}, failure: true},
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "3", Revision: 2}, failure: true},
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Txn can fail and be lost before get",
|
||||||
|
operations: []testOperation{
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}},
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 2, GetData: "2"}, failure: true},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Txn can fail and be lost before delete",
|
||||||
|
operations: []testOperation{
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
|
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 2}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Txn can fail and be lost before put",
|
||||||
|
operations: []testOperation{
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
|
{req: EtcdRequest{Op: Put, Key: "key", PutData: "3"}, resp: EtcdResponse{Revision: 2}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Txn can fail but be persisted before get",
|
||||||
|
operations: []testOperation{
|
||||||
|
// One failed request, one persisted.
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1, GetData: "2"}, failure: true},
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 2, GetData: "2"}},
|
||||||
|
// Two failed request, two persisted.
|
||||||
|
{req: EtcdRequest{Op: Put, Key: "key", PutData: "3"}, resp: EtcdResponse{Revision: 3}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "3", TxnNewData: "4"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "4", TxnNewData: "5"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 5, GetData: "5"}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Txn can fail but be persisted before put",
|
||||||
|
operations: []testOperation{
|
||||||
|
// One failed request, one persisted.
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
|
{req: EtcdRequest{Op: Put, Key: "key", PutData: "3"}, resp: EtcdResponse{Revision: 3}},
|
||||||
|
// Two failed request, two persisted.
|
||||||
|
{req: EtcdRequest{Op: Put, Key: "key", PutData: "4"}, resp: EtcdResponse{Revision: 4}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "4", TxnNewData: "5"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "5", TxnNewData: "6"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
|
{req: EtcdRequest{Op: Put, Key: "key", PutData: "7"}, resp: EtcdResponse{Revision: 7}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Txn can fail but be persisted before delete",
|
||||||
|
operations: []testOperation{
|
||||||
|
// One failed request, one persisted.
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
|
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 3}},
|
||||||
|
// Two failed request, two persisted.
|
||||||
|
{req: EtcdRequest{Op: Put, Key: "key", PutData: "4"}, resp: EtcdResponse{Revision: 4}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "4", TxnNewData: "5"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "5", TxnNewData: "6"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
|
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 7}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Txn can fail but be persisted before txn",
|
||||||
|
operations: []testOperation{
|
||||||
|
// One failed request, one persisted with success.
|
||||||
|
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2", TxnNewData: "3"}, resp: EtcdResponse{Revision: 3, TxnSucceeded: true}},
|
||||||
|
// Two failed request, two persisted with success.
|
||||||
|
{req: EtcdRequest{Op: Put, Key: "key", PutData: "4"}, resp: EtcdResponse{Revision: 4}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "4", TxnNewData: "5"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "5", TxnNewData: "6"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "6", TxnNewData: "7"}, resp: EtcdResponse{Revision: 7, TxnSucceeded: true}},
|
||||||
|
// One failed request, one persisted with failure.
|
||||||
|
{req: EtcdRequest{Op: Put, Key: "key", PutData: "8"}, resp: EtcdResponse{Revision: 8}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "8", TxnNewData: "9"}, resp: EtcdResponse{Err: errors.New("failed")}},
|
||||||
|
{req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "8", TxnNewData: "10"}, resp: EtcdResponse{Revision: 9}},
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
for _, tc := range tcs {
|
for _, tc := range tcs {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
@ -20,11 +20,12 @@ import (
|
|||||||
"math/rand"
|
"math/rand"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
DefaultTraffic Traffic = readWriteSingleKey{key: "key", writes: []opChance{{operation: Put, chance: 90}, {operation: Delete, chance: 10}}}
|
DefaultTraffic Traffic = readWriteSingleKey{key: "key", writes: []opChance{{operation: Put, chance: 90}, {operation: Delete, chance: 5}, {operation: Txn, chance: 5}}}
|
||||||
)
|
)
|
||||||
|
|
||||||
type Traffic interface {
|
type Traffic interface {
|
||||||
@ -50,26 +51,26 @@ func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter
|
|||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
// Execute one read per one write to avoid operation history include too many failed writes when etcd is down.
|
// Execute one read per one write to avoid operation history include too many failed writes when etcd is down.
|
||||||
err := t.Read(ctx, c, limiter)
|
resp, err := t.Read(ctx, c, limiter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Provide each write with unique id to make it easier to validate operation history.
|
// Provide each write with unique id to make it easier to validate operation history.
|
||||||
t.Write(ctx, c, limiter, ids.RequestId())
|
t.Write(ctx, c, limiter, ids.RequestId(), resp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limiter *rate.Limiter) error {
|
func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limiter *rate.Limiter) ([]*mvccpb.KeyValue, error) {
|
||||||
getCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
|
getCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
|
||||||
err := c.Get(getCtx, t.key)
|
resp, err := c.Get(getCtx, t.key)
|
||||||
cancel()
|
cancel()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
limiter.Wait(ctx)
|
limiter.Wait(ctx)
|
||||||
}
|
}
|
||||||
return err
|
return resp, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, id int) error {
|
func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, id int, kvs []*mvccpb.KeyValue) error {
|
||||||
putCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
|
putCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
@ -78,6 +79,12 @@ func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limit
|
|||||||
err = c.Put(putCtx, t.key, fmt.Sprintf("%d", id))
|
err = c.Put(putCtx, t.key, fmt.Sprintf("%d", id))
|
||||||
case Delete:
|
case Delete:
|
||||||
err = c.Delete(putCtx, t.key)
|
err = c.Delete(putCtx, t.key)
|
||||||
|
case Txn:
|
||||||
|
var value string
|
||||||
|
if len(kvs) != 0 {
|
||||||
|
value = string(kvs[0].Value)
|
||||||
|
}
|
||||||
|
err = c.Txn(putCtx, t.key, value, fmt.Sprintf("%d", id))
|
||||||
default:
|
default:
|
||||||
panic("invalid operation")
|
panic("invalid operation")
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user