diff --git a/tests/robustness/traffic/kubernetes.go b/tests/robustness/traffic/kubernetes.go index ff0165003..2f065d84e 100644 --- a/tests/robustness/traffic/kubernetes.go +++ b/tests/robustness/traffic/kubernetes.go @@ -63,6 +63,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter s := newStorage() keyPrefix := "/registry/" + t.resource + "/" g := errgroup.Group{} + readLimit := t.averageKeyCount g.Go(func() error { for { @@ -73,7 +74,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter return nil default: } - rev, err := t.Read(ctx, kc, s, limiter, keyPrefix) + rev, err := t.Read(ctx, kc, s, limiter, keyPrefix, readLimit) if err != nil { continue } @@ -92,7 +93,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter } // Avoid multiple failed writes in a row if lastWriteFailed { - _, err := t.Read(ctx, kc, s, limiter, keyPrefix) + _, err := t.Read(ctx, kc, s, limiter, keyPrefix, 0) if err != nil { continue } @@ -107,8 +108,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter g.Wait() } -func (t kubernetesTraffic) Read(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string) (rev int64, err error) { - limit := int64(t.averageKeyCount) +func (t kubernetesTraffic) Read(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string, limit int) (rev int64, err error) { rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix) hasMore := true @@ -118,7 +118,7 @@ func (t kubernetesTraffic) Read(ctx context.Context, kc *kubernetesClient, s *st for hasMore { readCtx, cancel := context.WithTimeout(ctx, RequestTimeout) - resp, err := kc.Range(readCtx, rangeStart, rangeEnd, revision, limit) + resp, err := kc.Range(readCtx, rangeStart, rangeEnd, revision, int64(limit)) cancel() if err != nil { return 0, err