Remove limit from read requests after a failed write

Limit can cause multiple request due to pagination.
For reads after a failed write we would like to return to normal write
request as soon as possible.

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz 2024-04-12 15:01:09 +02:00
parent bfbfee0afa
commit cadfc407e9

View File

@ -63,6 +63,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter
s := newStorage() s := newStorage()
keyPrefix := "/registry/" + t.resource + "/" keyPrefix := "/registry/" + t.resource + "/"
g := errgroup.Group{} g := errgroup.Group{}
readLimit := t.averageKeyCount
g.Go(func() error { g.Go(func() error {
for { for {
@ -73,7 +74,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter
return nil return nil
default: default:
} }
rev, err := t.Read(ctx, kc, s, limiter, keyPrefix) rev, err := t.Read(ctx, kc, s, limiter, keyPrefix, readLimit)
if err != nil { if err != nil {
continue continue
} }
@ -92,7 +93,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter
} }
// Avoid multiple failed writes in a row // Avoid multiple failed writes in a row
if lastWriteFailed { if lastWriteFailed {
_, err := t.Read(ctx, kc, s, limiter, keyPrefix) _, err := t.Read(ctx, kc, s, limiter, keyPrefix, 0)
if err != nil { if err != nil {
continue continue
} }
@ -107,8 +108,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter
g.Wait() g.Wait()
} }
func (t kubernetesTraffic) Read(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string) (rev int64, err error) { func (t kubernetesTraffic) Read(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string, limit int) (rev int64, err error) {
limit := int64(t.averageKeyCount)
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix) rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
hasMore := true hasMore := true
@ -118,7 +118,7 @@ func (t kubernetesTraffic) Read(ctx context.Context, kc *kubernetesClient, s *st
for hasMore { for hasMore {
readCtx, cancel := context.WithTimeout(ctx, RequestTimeout) readCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
resp, err := kc.Range(readCtx, rangeStart, rangeEnd, revision, limit) resp, err := kc.Range(readCtx, rangeStart, rangeEnd, revision, int64(limit))
cancel() cancel()
if err != nil { if err != nil {
return 0, err return 0, err