From c3720fac3d5a4ee93838ebc28cafbe3e3842a0bd Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 19 Jun 2023 21:21:03 +0200 Subject: [PATCH] tests/robustness: Implement Kubernetes pagination Signed-off-by: Marek Siarkowicz --- tests/robustness/model/describe.go | 4 +- tests/robustness/model/history.go | 25 +----- tests/robustness/traffic/client.go | 15 ++-- tests/robustness/traffic/etcd.go | 5 +- tests/robustness/traffic/kubernetes.go | 101 +++++++++++++++++-------- 5 files changed, 86 insertions(+), 64 deletions(-) diff --git a/tests/robustness/model/describe.go b/tests/robustness/model/describe.go index 2057f94e2..3e5ccf97f 100644 --- a/tests/robustness/model/describe.go +++ b/tests/robustness/model/describe.go @@ -17,6 +17,8 @@ package model import ( "fmt" "strings" + + clientv3 "go.etcd.io/etcd/client/v3" ) func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) string { @@ -133,7 +135,7 @@ func describeRangeRequest(opts RangeOptions, revision int64) string { switch { case opts.End == "": return fmt.Sprintf("get(%q%s)", opts.Start, kwargsString) - case opts.End == prefixEnd(opts.Start): + case opts.End == clientv3.GetPrefixRangeEnd(opts.Start): return fmt.Sprintf("list(%q%s)", opts.Start, kwargsString) default: return fmt.Sprintf("range(%q..%q%s)", opts.Start, opts.End, kwargsString) diff --git a/tests/robustness/model/history.go b/tests/robustness/model/history.go index 98f4818ed..deb96a01a 100644 --- a/tests/robustness/model/history.go +++ b/tests/robustness/model/history.go @@ -54,18 +54,14 @@ func NewAppendableHistory(ids identity.Provider) *AppendableHistory { } } -func (h *AppendableHistory) AppendRange(key string, withPrefix bool, revision int64, start, end time.Duration, resp *clientv3.GetResponse) { +func (h *AppendableHistory) AppendRange(startKey, endKey string, revision, limit int64, start, end time.Duration, resp *clientv3.GetResponse) { var respRevision int64 if resp != nil && resp.Header != nil { respRevision = resp.Header.Revision } - var keyEnd string - if withPrefix { - keyEnd = prefixEnd(key) - } h.appendSuccessful(porcupine.Operation{ ClientId: h.streamId, - Input: staleRangeRequest(key, keyEnd, 0, revision), + Input: staleRangeRequest(startKey, endKey, limit, revision), Call: start.Nanoseconds(), Output: rangeResponse(resp.Kvs, resp.Count, respRevision), Return: end.Nanoseconds(), @@ -363,22 +359,7 @@ func listRequest(key string, limit int64) EtcdRequest { } func staleListRequest(key string, limit, revision int64) EtcdRequest { - return staleRangeRequest(key, prefixEnd(key), limit, revision) -} - -// prefixEnd gets the range end of the prefix. -// Notice: Keep in sync with /client/v3/op.go getPrefix function. -func prefixEnd(key string) string { - end := make([]byte, len(key)) - copy(end, key) - for i := len(end) - 1; i >= 0; i-- { - if end[i] < 0xff { - end[i] = end[i] + 1 - end = end[:i+1] - return string(end) - } - } - return "\x00" + return staleRangeRequest(key, clientv3.GetPrefixRangeEnd(key), limit, revision) } func staleRangeRequest(start, end string, limit, revision int64) EtcdRequest { diff --git a/tests/robustness/traffic/client.go b/tests/robustness/traffic/client.go index 01973b346..7c9b3e9c4 100644 --- a/tests/robustness/traffic/client.go +++ b/tests/robustness/traffic/client.go @@ -102,7 +102,7 @@ func (r ClientReport) WatchEventCount() int { } func (c *RecordingClient) Get(ctx context.Context, key string, revision int64) (kv *mvccpb.KeyValue, rev int64, err error) { - resp, err := c.Range(ctx, key, false, revision) + resp, err := c.Range(ctx, key, "", revision, 0) if err != nil { return nil, 0, err } @@ -112,23 +112,26 @@ func (c *RecordingClient) Get(ctx context.Context, key string, revision int64) ( return kv, resp.Header.Revision, nil } -func (c *RecordingClient) Range(ctx context.Context, key string, withPrefix bool, revision int64) (*clientv3.GetResponse, error) { +func (c *RecordingClient) Range(ctx context.Context, start, end string, revision, limit int64) (*clientv3.GetResponse, error) { ops := []clientv3.OpOption{} - if withPrefix { - ops = append(ops, clientv3.WithPrefix()) + if end != "" { + ops = append(ops, clientv3.WithRange(end)) } if revision != 0 { ops = append(ops, clientv3.WithRev(revision)) } + if limit != 0 { + ops = append(ops, clientv3.WithLimit(limit)) + } c.opMux.Lock() defer c.opMux.Unlock() callTime := time.Since(c.baseTime) - resp, err := c.client.Get(ctx, key, ops...) + resp, err := c.client.Get(ctx, start, ops...) if err != nil { return nil, err } returnTime := time.Since(c.baseTime) - c.operations.AppendRange(key, withPrefix, revision, callTime, returnTime, resp) + c.operations.AppendRange(start, end, revision, limit, callTime, returnTime, resp) return resp, nil } diff --git a/tests/robustness/traffic/etcd.go b/tests/robustness/traffic/etcd.go index 2754fd80c..66bf876fe 100644 --- a/tests/robustness/traffic/etcd.go +++ b/tests/robustness/traffic/etcd.go @@ -153,6 +153,7 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType, opCtx, cancel := context.WithTimeout(ctx, RequestTimeout) defer cancel() + var limit int64 = 0 switch request { case StaleGet: _, rev, err = c.client.Get(opCtx, c.randomKey(), lastRev) @@ -160,13 +161,13 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType, _, rev, err = c.client.Get(opCtx, c.randomKey(), 0) case List: var resp *clientv3.GetResponse - resp, err = c.client.Range(opCtx, c.keyPrefix, true, 0) + resp, err = c.client.Range(ctx, c.keyPrefix, clientv3.GetPrefixRangeEnd(c.keyPrefix), 0, limit) if resp != nil { rev = resp.Header.Revision } case StaleList: var resp *clientv3.GetResponse - resp, err = c.client.Range(opCtx, c.keyPrefix, true, lastRev) + resp, err = c.client.Range(ctx, c.keyPrefix, clientv3.GetPrefixRangeEnd(c.keyPrefix), lastRev, limit) if resp != nil { rev = resp.Header.Revision } diff --git a/tests/robustness/traffic/kubernetes.go b/tests/robustness/traffic/kubernetes.go index 2539fcab1..4e7319444 100644 --- a/tests/robustness/traffic/kubernetes.go +++ b/tests/robustness/traffic/kubernetes.go @@ -75,19 +75,11 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter return nil default: } - listCtx, cancel := context.WithTimeout(ctx, RequestTimeout) - resp, err := kc.List(listCtx, keyPrefix) - cancel() + rev, err := t.Read(ctx, kc, s, limiter, keyPrefix) if err != nil { continue } - s.Reset(resp) - limiter.Wait(ctx) - watchCtx, cancel := context.WithTimeout(ctx, WatchTimeout) - for e := range c.Watch(watchCtx, keyPrefix, resp.Header.Revision+1, true, true) { - s.Update(e) - } - cancel() + t.Watch(ctx, kc, s, limiter, keyPrefix, rev+1) } }) g.Go(func() error { @@ -102,54 +94,93 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter } // Avoid multiple failed writes in a row if lastWriteFailed { - listCtx, cancel := context.WithTimeout(ctx, RequestTimeout) - resp, err := kc.List(listCtx, keyPrefix) - cancel() + _, err := t.Read(ctx, kc, s, limiter, keyPrefix) if err != nil { continue } - s.Reset(resp) - limiter.Wait(ctx) } - writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout) - err := t.Write(writeCtx, kc, ids, s) - cancel() + err := t.Write(ctx, kc, ids, s, limiter) lastWriteFailed = err != nil if err != nil { continue } - limiter.Wait(ctx) } }) g.Wait() } -func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids identity.Provider, s *storage) (err error) { +func (t kubernetesTraffic) Read(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string) (rev int64, err error) { + limit := int64(t.averageKeyCount) + rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix) + + hasMore := true + rangeStart := keyPrefix + var kvs []*mvccpb.KeyValue + var revision int64 = 0 + + for hasMore { + readCtx, cancel := context.WithTimeout(ctx, RequestTimeout) + resp, err := kc.Range(readCtx, rangeStart, rangeEnd, revision, limit) + cancel() + if err != nil { + return 0, err + } + limiter.Wait(ctx) + + hasMore = resp.More + if len(resp.Kvs) > 0 && hasMore { + rangeStart = string(resp.Kvs[len(resp.Kvs)-1].Key) + "\x00" + } + kvs = append(kvs, resp.Kvs...) + if revision == 0 { + revision = resp.Header.Revision + } + } + s.Reset(revision, kvs) + return revision, nil +} + +func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids identity.Provider, s *storage, limiter *rate.Limiter) (err error) { + writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout) + defer cancel() count := s.Count() if count < t.averageKeyCount/2 { - err = kc.OptimisticCreate(ctx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId())) + err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId())) } else { key, rev := s.PickRandom() if rev == 0 { return errors.New("storage empty") } if count > t.averageKeyCount*3/2 { - _, err = kc.OptimisticDelete(ctx, key, rev) + _, err = kc.OptimisticDelete(writeCtx, key, rev) } else { op := pickRandom(t.writeChoices) switch op { case KubernetesDelete: - _, err = kc.OptimisticDelete(ctx, key, rev) + _, err = kc.OptimisticDelete(writeCtx, key, rev) case KubernetesUpdate: - _, err = kc.OptimisticUpdate(ctx, key, fmt.Sprintf("%d", ids.NewRequestId()), rev) + _, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestId()), rev) case KubernetesCreate: - err = kc.OptimisticCreate(ctx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId())) + err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId())) default: panic(fmt.Sprintf("invalid choice: %q", op)) } } } - return err + if err != nil { + return err + } + limiter.Wait(ctx) + return nil +} + +func (t kubernetesTraffic) Watch(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string, revision int64) { + watchCtx, cancel := context.WithTimeout(ctx, WatchTimeout) + defer cancel() + for e := range kc.client.Watch(watchCtx, keyPrefix, revision, true, true) { + s.Update(e) + } + limiter.Wait(ctx) } func (t kubernetesTraffic) generateKey() string { @@ -168,14 +199,18 @@ type kubernetesClient struct { client *RecordingClient } -func (k kubernetesClient) List(ctx context.Context, key string) (*clientv3.GetResponse, error) { - resp, err := k.client.Range(ctx, key, true, 0) +func (k kubernetesClient) List(ctx context.Context, prefix string, revision, limit int64) (*clientv3.GetResponse, error) { + resp, err := k.client.Range(ctx, prefix, clientv3.GetPrefixRangeEnd(prefix), revision, limit) if err != nil { return nil, err } return resp, err } +func (k kubernetesClient) Range(ctx context.Context, start, end string, revision, limit int64) (*clientv3.GetResponse, error) { + return k.client.Range(ctx, start, end, revision, limit) +} + func (k kubernetesClient) OptimisticDelete(ctx context.Context, key string, expectedRevision int64) (*mvccpb.KeyValue, error) { return k.optimisticOperationOrGet(ctx, key, clientv3.OpDelete(key), expectedRevision) } @@ -237,17 +272,17 @@ func (s *storage) Update(resp clientv3.WatchResponse) { } } -func (s *storage) Reset(resp *clientv3.GetResponse) { +func (s *storage) Reset(revision int64, kvs []*mvccpb.KeyValue) { s.mux.Lock() defer s.mux.Unlock() - if resp.Header.Revision <= s.revision { + if revision <= s.revision { return } - s.keyRevision = make(map[string]int64, len(resp.Kvs)) - for _, kv := range resp.Kvs { + s.keyRevision = make(map[string]int64, len(kvs)) + for _, kv := range kvs { s.keyRevision[string(kv.Key)] = kv.ModRevision } - s.revision = resp.Header.Revision + s.revision = revision } func (s *storage) Count() int {