mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #17781 from serathius/robustness-read-limit
Remove limit from read requests after a failed write
This commit is contained in:
commit
452445e2d8
@ -63,6 +63,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter
|
|||||||
s := newStorage()
|
s := newStorage()
|
||||||
keyPrefix := "/registry/" + t.resource + "/"
|
keyPrefix := "/registry/" + t.resource + "/"
|
||||||
g := errgroup.Group{}
|
g := errgroup.Group{}
|
||||||
|
readLimit := t.averageKeyCount
|
||||||
|
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
for {
|
for {
|
||||||
@ -73,7 +74,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter
|
|||||||
return nil
|
return nil
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
rev, err := t.Read(ctx, kc, s, limiter, keyPrefix)
|
rev, err := t.Read(ctx, kc, s, limiter, keyPrefix, readLimit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -92,7 +93,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter
|
|||||||
}
|
}
|
||||||
// Avoid multiple failed writes in a row
|
// Avoid multiple failed writes in a row
|
||||||
if lastWriteFailed {
|
if lastWriteFailed {
|
||||||
_, err := t.Read(ctx, kc, s, limiter, keyPrefix)
|
_, err := t.Read(ctx, kc, s, limiter, keyPrefix, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -107,8 +108,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter
|
|||||||
g.Wait()
|
g.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t kubernetesTraffic) Read(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string) (rev int64, err error) {
|
func (t kubernetesTraffic) Read(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string, limit int) (rev int64, err error) {
|
||||||
limit := int64(t.averageKeyCount)
|
|
||||||
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
|
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
|
||||||
|
|
||||||
hasMore := true
|
hasMore := true
|
||||||
@ -118,7 +118,7 @@ func (t kubernetesTraffic) Read(ctx context.Context, kc *kubernetesClient, s *st
|
|||||||
|
|
||||||
for hasMore {
|
for hasMore {
|
||||||
readCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
|
readCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
|
||||||
resp, err := kc.Range(readCtx, rangeStart, rangeEnd, revision, limit)
|
resp, err := kc.Range(readCtx, rangeStart, rangeEnd, revision, int64(limit))
|
||||||
cancel()
|
cancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
|
Loading…
x
Reference in New Issue
Block a user