tests/robustness: Implement Kubernetes pagination

Author: Marek Siarkowicz <siarkowicz@google.com>
Date: 2023-06-19 21:21:03 +02:00
commit c3720fac3d (parent f985890ac3)
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>

5 changed files with 86 additions and 64 deletions
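The core of this commit is making the Kubernetes traffic generator read its key prefix the way the Kubernetes apiserver does: in limit-sized pages pinned to a single revision, rather than in one unbounded Range call. Before the per-file diffs, here is a self-contained sketch of that pattern against a plain etcd client; the endpoint, prefix, and page size are illustrative and not taken from the commit:

package main

import (
    "context"
    "fmt"

    "go.etcd.io/etcd/api/v3/mvccpb"
    clientv3 "go.etcd.io/etcd/client/v3"
)

// paginatedList pages through every key under prefix. All pages after the
// first are pinned to the first response's revision, so the combined result
// is a consistent snapshot, and each page starts just past the last key seen.
func paginatedList(ctx context.Context, c *clientv3.Client, prefix string, limit int64) (int64, []*mvccpb.KeyValue, error) {
    rangeEnd := clientv3.GetPrefixRangeEnd(prefix)
    start := prefix
    var kvs []*mvccpb.KeyValue
    var rev int64
    for {
        opts := []clientv3.OpOption{clientv3.WithRange(rangeEnd), clientv3.WithLimit(limit)}
        if rev != 0 {
            opts = append(opts, clientv3.WithRev(rev)) // later pages reuse the first page's revision
        }
        resp, err := c.Get(ctx, start, opts...)
        if err != nil {
            return 0, nil, err
        }
        if rev == 0 {
            rev = resp.Header.Revision
        }
        kvs = append(kvs, resp.Kvs...)
        if !resp.More || len(resp.Kvs) == 0 {
            return rev, kvs, nil
        }
        // "\x00" is the smallest possible suffix, so the next page starts
        // strictly after the last key returned.
        start = string(resp.Kvs[len(resp.Kvs)-1].Key) + "\x00"
    }
}

func main() {
    c, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}}) // illustrative endpoint
    if err != nil {
        panic(err)
    }
    defer c.Close()
    rev, kvs, err := paginatedList(context.Background(), c, "/registry/pods/", 500)
    if err != nil {
        panic(err)
    }
    fmt.Printf("listed %d keys at revision %d\n", len(kvs), rev)
}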

tests/robustness/model/describe.go

@@ -17,6 +17,8 @@ package model
 import (
     "fmt"
     "strings"
+
+    clientv3 "go.etcd.io/etcd/client/v3"
 )
 
 func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) string {
@@ -133,7 +135,7 @@ func describeRangeRequest(opts RangeOptions, revision int64) string {
     switch {
     case opts.End == "":
         return fmt.Sprintf("get(%q%s)", opts.Start, kwargsString)
-    case opts.End == prefixEnd(opts.Start):
+    case opts.End == clientv3.GetPrefixRangeEnd(opts.Start):
         return fmt.Sprintf("list(%q%s)", opts.Start, kwargsString)
     default:
         return fmt.Sprintf("range(%q..%q%s)", opts.Start, opts.End, kwargsString)

tests/robustness/model/history.go

@@ -54,18 +54,14 @@ func NewAppendableHistory(ids identity.Provider) *AppendableHistory {
     }
 }
 
-func (h *AppendableHistory) AppendRange(key string, withPrefix bool, revision int64, start, end time.Duration, resp *clientv3.GetResponse) {
+func (h *AppendableHistory) AppendRange(startKey, endKey string, revision, limit int64, start, end time.Duration, resp *clientv3.GetResponse) {
     var respRevision int64
     if resp != nil && resp.Header != nil {
         respRevision = resp.Header.Revision
     }
-    var keyEnd string
-    if withPrefix {
-        keyEnd = prefixEnd(key)
-    }
     h.appendSuccessful(porcupine.Operation{
         ClientId: h.streamId,
-        Input:    staleRangeRequest(key, keyEnd, 0, revision),
+        Input:    staleRangeRequest(startKey, endKey, limit, revision),
         Call:     start.Nanoseconds(),
         Output:   rangeResponse(resp.Kvs, resp.Count, respRevision),
         Return:   end.Nanoseconds(),
@@ -363,22 +359,7 @@ func listRequest(key string, limit int64) EtcdRequest {
 }
 
 func staleListRequest(key string, limit, revision int64) EtcdRequest {
-    return staleRangeRequest(key, prefixEnd(key), limit, revision)
-}
-
-// prefixEnd gets the range end of the prefix.
-// Notice: Keep in sync with /client/v3/op.go getPrefix function.
-func prefixEnd(key string) string {
-    end := make([]byte, len(key))
-    copy(end, key)
-    for i := len(end) - 1; i >= 0; i-- {
-        if end[i] < 0xff {
-            end[i] = end[i] + 1
-            end = end[:i+1]
-            return string(end)
-        }
-    }
-    return "\x00"
+    return staleRangeRequest(key, clientv3.GetPrefixRangeEnd(key), limit, revision)
 }
 
 func staleRangeRequest(start, end string, limit, revision int64) EtcdRequest {

tests/robustness/traffic/client.go

@@ -102,7 +102,7 @@ func (r ClientReport) WatchEventCount() int {
 }
 
 func (c *RecordingClient) Get(ctx context.Context, key string, revision int64) (kv *mvccpb.KeyValue, rev int64, err error) {
-    resp, err := c.Range(ctx, key, false, revision)
+    resp, err := c.Range(ctx, key, "", revision, 0)
     if err != nil {
         return nil, 0, err
     }
@@ -112,23 +112,26 @@ func (c *RecordingClient) Get(ctx context.Context, key string, revision int64) (
     return kv, resp.Header.Revision, nil
 }
 
-func (c *RecordingClient) Range(ctx context.Context, key string, withPrefix bool, revision int64) (*clientv3.GetResponse, error) {
+func (c *RecordingClient) Range(ctx context.Context, start, end string, revision, limit int64) (*clientv3.GetResponse, error) {
     ops := []clientv3.OpOption{}
-    if withPrefix {
-        ops = append(ops, clientv3.WithPrefix())
+    if end != "" {
+        ops = append(ops, clientv3.WithRange(end))
     }
     if revision != 0 {
         ops = append(ops, clientv3.WithRev(revision))
     }
+    if limit != 0 {
+        ops = append(ops, clientv3.WithLimit(limit))
+    }
     c.opMux.Lock()
     defer c.opMux.Unlock()
     callTime := time.Since(c.baseTime)
-    resp, err := c.client.Get(ctx, key, ops...)
+    resp, err := c.client.Get(ctx, start, ops...)
     if err != nil {
         return nil, err
     }
     returnTime := time.Since(c.baseTime)
-    c.operations.AppendRange(key, withPrefix, revision, callTime, returnTime, resp)
+    c.operations.AppendRange(start, end, revision, limit, callTime, returnTime, resp)
     return resp, nil
 }
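The reworked Range drops the withPrefix flag in favour of an explicit end key plus an optional limit, and only appends the clientv3 options that are actually set. A hypothetical helper showing how the old calling modes map onto the new signature (keys, revision, and limit values are made up):

func exampleRanges(ctx context.Context, c *RecordingClient, rev int64) error {
    // exact key, latest revision, no limit
    if _, err := c.Range(ctx, "/key", "", 0, 0); err != nil {
        return err
    }
    // whole prefix at a historical revision
    if _, err := c.Range(ctx, "/prefix/", clientv3.GetPrefixRangeEnd("/prefix/"), rev, 0); err != nil {
        return err
    }
    // one page of at most 100 keys from the prefix
    _, err := c.Range(ctx, "/prefix/", clientv3.GetPrefixRangeEnd("/prefix/"), 0, 100)
    return err
}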

tests/robustness/traffic/etcd.go

@@ -153,6 +153,7 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
     opCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
     defer cancel()
+    var limit int64 = 0
     switch request {
     case StaleGet:
         _, rev, err = c.client.Get(opCtx, c.randomKey(), lastRev)
@@ -160,13 +161,13 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
         _, rev, err = c.client.Get(opCtx, c.randomKey(), 0)
     case List:
         var resp *clientv3.GetResponse
-        resp, err = c.client.Range(opCtx, c.keyPrefix, true, 0)
+        resp, err = c.client.Range(ctx, c.keyPrefix, clientv3.GetPrefixRangeEnd(c.keyPrefix), 0, limit)
         if resp != nil {
             rev = resp.Header.Revision
         }
     case StaleList:
         var resp *clientv3.GetResponse
-        resp, err = c.client.Range(opCtx, c.keyPrefix, true, lastRev)
+        resp, err = c.client.Range(ctx, c.keyPrefix, clientv3.GetPrefixRangeEnd(c.keyPrefix), lastRev, limit)
         if resp != nil {
             rev = resp.Header.Revision
         }
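Note that limit is declared here but left at zero, and RecordingClient.Range only appends clientv3.WithLimit when limit is non-zero, so List and StaleList in the generic etcd traffic still fetch the whole prefix in a single response; only the Kubernetes traffic below actually paginates.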

tests/robustness/traffic/kubernetes.go

@@ -75,19 +75,11 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter
                 return nil
             default:
             }
-            listCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
-            resp, err := kc.List(listCtx, keyPrefix)
-            cancel()
+            rev, err := t.Read(ctx, kc, s, limiter, keyPrefix)
             if err != nil {
                 continue
             }
-            s.Reset(resp)
-            limiter.Wait(ctx)
-            watchCtx, cancel := context.WithTimeout(ctx, WatchTimeout)
-            for e := range c.Watch(watchCtx, keyPrefix, resp.Header.Revision+1, true, true) {
-                s.Update(e)
-            }
-            cancel()
+            t.Watch(ctx, kc, s, limiter, keyPrefix, rev+1)
         }
     })
     g.Go(func() error {
@@ -102,54 +94,93 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter
         }
         // Avoid multiple failed writes in a row
         if lastWriteFailed {
-            listCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
-            resp, err := kc.List(listCtx, keyPrefix)
-            cancel()
+            _, err := t.Read(ctx, kc, s, limiter, keyPrefix)
             if err != nil {
                 continue
             }
-            s.Reset(resp)
-            limiter.Wait(ctx)
         }
-        writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
-        err := t.Write(writeCtx, kc, ids, s)
-        cancel()
+        err := t.Write(ctx, kc, ids, s, limiter)
         lastWriteFailed = err != nil
         if err != nil {
             continue
         }
-        limiter.Wait(ctx)
     }
 })
 g.Wait()
 }
 
-func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids identity.Provider, s *storage) (err error) {
+func (t kubernetesTraffic) Read(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string) (rev int64, err error) {
+    limit := int64(t.averageKeyCount)
+    rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
+    hasMore := true
+    rangeStart := keyPrefix
+    var kvs []*mvccpb.KeyValue
+    var revision int64 = 0
+    for hasMore {
+        readCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
+        resp, err := kc.Range(readCtx, rangeStart, rangeEnd, revision, limit)
+        cancel()
+        if err != nil {
+            return 0, err
+        }
+        limiter.Wait(ctx)
+        hasMore = resp.More
+        if len(resp.Kvs) > 0 && hasMore {
+            rangeStart = string(resp.Kvs[len(resp.Kvs)-1].Key) + "\x00"
+        }
+        kvs = append(kvs, resp.Kvs...)
+        if revision == 0 {
+            revision = resp.Header.Revision
+        }
+    }
+    s.Reset(revision, kvs)
+    return revision, nil
+}
+
+func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids identity.Provider, s *storage, limiter *rate.Limiter) (err error) {
+    writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
+    defer cancel()
     count := s.Count()
     if count < t.averageKeyCount/2 {
-        err = kc.OptimisticCreate(ctx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId()))
+        err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId()))
     } else {
         key, rev := s.PickRandom()
         if rev == 0 {
             return errors.New("storage empty")
         }
         if count > t.averageKeyCount*3/2 {
-            _, err = kc.OptimisticDelete(ctx, key, rev)
+            _, err = kc.OptimisticDelete(writeCtx, key, rev)
         } else {
             op := pickRandom(t.writeChoices)
             switch op {
             case KubernetesDelete:
-                _, err = kc.OptimisticDelete(ctx, key, rev)
+                _, err = kc.OptimisticDelete(writeCtx, key, rev)
             case KubernetesUpdate:
-                _, err = kc.OptimisticUpdate(ctx, key, fmt.Sprintf("%d", ids.NewRequestId()), rev)
+                _, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestId()), rev)
             case KubernetesCreate:
-                err = kc.OptimisticCreate(ctx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId()))
+                err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId()))
             default:
                 panic(fmt.Sprintf("invalid choice: %q", op))
             }
         }
     }
-    return err
+    if err != nil {
+        return err
+    }
+    limiter.Wait(ctx)
+    return nil
 }
+
+func (t kubernetesTraffic) Watch(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string, revision int64) {
+    watchCtx, cancel := context.WithTimeout(ctx, WatchTimeout)
+    defer cancel()
+    for e := range kc.client.Watch(watchCtx, keyPrefix, revision, true, true) {
+        s.Update(e)
+    }
+    limiter.Wait(ctx)
+}
 
 func (t kubernetesTraffic) generateKey() string {
@@ -168,14 +199,18 @@ type kubernetesClient struct {
     client *RecordingClient
 }
 
-func (k kubernetesClient) List(ctx context.Context, key string) (*clientv3.GetResponse, error) {
-    resp, err := k.client.Range(ctx, key, true, 0)
+func (k kubernetesClient) List(ctx context.Context, prefix string, revision, limit int64) (*clientv3.GetResponse, error) {
+    resp, err := k.client.Range(ctx, prefix, clientv3.GetPrefixRangeEnd(prefix), revision, limit)
     if err != nil {
         return nil, err
     }
     return resp, err
 }
 
+func (k kubernetesClient) Range(ctx context.Context, start, end string, revision, limit int64) (*clientv3.GetResponse, error) {
+    return k.client.Range(ctx, start, end, revision, limit)
+}
+
 func (k kubernetesClient) OptimisticDelete(ctx context.Context, key string, expectedRevision int64) (*mvccpb.KeyValue, error) {
     return k.optimisticOperationOrGet(ctx, key, clientv3.OpDelete(key), expectedRevision)
 }
@@ -237,17 +272,17 @@ func (s *storage) Update(resp clientv3.WatchResponse) {
     }
 }
 
-func (s *storage) Reset(resp *clientv3.GetResponse) {
+func (s *storage) Reset(revision int64, kvs []*mvccpb.KeyValue) {
     s.mux.Lock()
     defer s.mux.Unlock()
-    if resp.Header.Revision <= s.revision {
+    if revision <= s.revision {
         return
     }
-    s.keyRevision = make(map[string]int64, len(resp.Kvs))
-    for _, kv := range resp.Kvs {
+    s.keyRevision = make(map[string]int64, len(kvs))
+    for _, kv := range kvs {
         s.keyRevision[string(kv.Key)] = kv.ModRevision
     }
-    s.revision = resp.Header.Revision
+    s.revision = revision
 }
 
 func (s *storage) Count() int {
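storage.Reset now takes the revision and key-value slice directly because a paginated read assembles its snapshot across several responses instead of handing over a single GetResponse; the guard against stale snapshots is unchanged. A minimal standalone sketch of just that guard (the type and values are invented for illustration):

package main

import "fmt"

// snapshot mirrors the rule storage.Reset enforces: an incoming snapshot
// only replaces the current one if it was taken at a strictly newer revision.
type snapshot struct {
    revision    int64
    keyRevision map[string]int64
}

func (s *snapshot) reset(revision int64, keyRevision map[string]int64) {
    if revision <= s.revision {
        return // stale or duplicate snapshot: keep the current state
    }
    s.revision = revision
    s.keyRevision = keyRevision
}

func main() {
    s := &snapshot{}
    s.reset(10, map[string]int64{"/a": 9})
    s.reset(5, map[string]int64{"/b": 4}) // ignored: revision 5 < 10
    fmt.Println(s.revision, len(s.keyRevision)) // 10 1
}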