tests/robustness: Add List and StaleList requests to etcd traffic

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz 2023-06-21 10:32:25 +02:00
parent de6415801e
commit 9fc438cb6b

View File

@ -38,8 +38,10 @@ var (
leaseTTL: DefaultLeaseTTL,
largePutSize: 32769,
requests: []choiceWeight[etcdRequestType]{
{choice: Get, weight: 25},
{choice: StaleGet, weight: 25},
{choice: Get, weight: 15},
{choice: List, weight: 15},
{choice: StaleGet, weight: 10},
{choice: StaleList, weight: 10},
{choice: Put, weight: 23},
{choice: LargePut, weight: 2},
{choice: Delete, weight: 5},
@ -60,8 +62,10 @@ var (
largePutSize: 32769,
leaseTTL: DefaultLeaseTTL,
requests: []choiceWeight[etcdRequestType]{
{choice: Get, weight: 25},
{choice: StaleGet, weight: 25},
{choice: Get, weight: 15},
{choice: List, weight: 15},
{choice: StaleGet, weight: 10},
{choice: StaleList, weight: 10},
{choice: Put, weight: 40},
{choice: MultiOpTxn, weight: 5},
{choice: LargePut, weight: 5},
@ -86,6 +90,8 @@ type etcdRequestType string
const (
Get etcdRequestType = "get"
StaleGet etcdRequestType = "staleGet"
List etcdRequestType = "list"
StaleList etcdRequestType = "staleList"
Put etcdRequestType = "put"
LargePut etcdRequestType = "largePut"
Delete etcdRequestType = "delete"
@ -102,6 +108,7 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.
var requestType etcdRequestType
client := etcdTrafficClient{
etcdTraffic: t,
keyPrefix: "key",
client: c,
limiter: limiter,
idProvider: ids,
@ -115,14 +122,13 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.
return
default:
}
key := fmt.Sprintf("%d", rand.Int()%t.keyCount)
// Avoid multiple failed writes in a row
if lastOperationSucceeded {
requestType = pickRandom(t.requests)
} else {
requestType = Get
}
rev, err := client.Request(ctx, requestType, key, lastRev)
rev, err := client.Request(ctx, requestType, lastRev)
lastOperationSucceeded = err == nil
if err != nil {
continue
@ -136,35 +142,49 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.
type etcdTrafficClient struct {
etcdTraffic
keyPrefix string
client *RecordingClient
limiter *rate.Limiter
idProvider identity.Provider
leaseStorage identity.LeaseIdStorage
}
func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType, key string, lastRev int64) (rev int64, err error) {
func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType, lastRev int64) (rev int64, err error) {
opCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
defer cancel()
switch request {
case StaleGet:
_, rev, err = c.client.Get(opCtx, key, lastRev)
_, rev, err = c.client.Get(opCtx, c.randomKey(), lastRev)
case Get:
_, rev, err = c.client.Get(opCtx, key, 0)
_, rev, err = c.client.Get(opCtx, c.randomKey(), 0)
case List:
var resp *clientv3.GetResponse
resp, err = c.client.Range(opCtx, c.keyPrefix, true, 0)
if resp != nil {
rev = resp.Header.Revision
}
case StaleList:
var resp *clientv3.GetResponse
resp, err = c.client.Range(opCtx, c.keyPrefix, true, lastRev)
if resp != nil {
rev = resp.Header.Revision
}
case Put:
var resp *clientv3.PutResponse
resp, err = c.client.Put(opCtx, key, fmt.Sprintf("%d", c.idProvider.NewRequestId()))
resp, err = c.client.Put(opCtx, c.randomKey(), fmt.Sprintf("%d", c.idProvider.NewRequestId()))
if resp != nil {
rev = resp.Header.Revision
}
case LargePut:
var resp *clientv3.PutResponse
resp, err = c.client.Put(opCtx, key, randString(c.largePutSize))
resp, err = c.client.Put(opCtx, c.randomKey(), randString(c.largePutSize))
if resp != nil {
rev = resp.Header.Revision
}
case Delete:
var resp *clientv3.DeleteResponse
resp, err = c.client.Delete(opCtx, key)
resp, err = c.client.Delete(opCtx, c.randomKey())
if resp != nil {
rev = resp.Header.Revision
}
@ -174,9 +194,9 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
if resp != nil {
rev = resp.Header.Revision
}
case CompareAndSet:
var kv *mvccpb.KeyValue
key := c.randomKey()
kv, rev, err = c.client.Get(opCtx, key, 0)
if err == nil {
c.limiter.Wait(ctx)
@ -209,7 +229,7 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
if leaseId != 0 {
putCtx, putCancel := context.WithTimeout(ctx, RequestTimeout)
var resp *clientv3.PutResponse
resp, err = c.client.PutWithLease(putCtx, key, fmt.Sprintf("%d", c.idProvider.NewRequestId()), leaseId)
resp, err = c.client.PutWithLease(putCtx, c.randomKey(), fmt.Sprintf("%d", c.idProvider.NewRequestId()), leaseId)
putCancel()
if resp != nil {
rev = resp.Header.Revision
@ -237,7 +257,6 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
default:
panic("invalid choice")
}
cancel()
return rev, err
}
@ -274,6 +293,10 @@ func (c etcdTrafficClient) pickMultiTxnOps() (ops []clientv3.Op) {
return ops
}
func (c etcdTrafficClient) randomKey() string {
return fmt.Sprintf("%s%d", c.keyPrefix, rand.Int()%c.keyCount)
}
func (t etcdTraffic) pickOperationType() model.OperationType {
roll := rand.Int() % 100
if roll < 10 {