tests/robustness: Prevent to many concurrent non-unique writes which are causing linearization to timeout

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz 2023-07-02 15:56:48 +02:00
parent 57a583d140
commit 8fca6ebdb2
5 changed files with 170 additions and 26 deletions

View File

@ -29,10 +29,11 @@ import (
var ( var (
LowTraffic = Config{ LowTraffic = Config{
Name: "LowTraffic", Name: "LowTraffic",
minimalQPS: 100, minimalQPS: 100,
maximalQPS: 200, maximalQPS: 200,
clientCount: 8, clientCount: 8,
maxNonUniqueRequestConcurrency: 3,
Traffic: etcdTraffic{ Traffic: etcdTraffic{
keyCount: 10, keyCount: 10,
leaseTTL: DefaultLeaseTTL, leaseTTL: DefaultLeaseTTL,
@ -53,10 +54,11 @@ var (
}, },
} }
HighTraffic = Config{ HighTraffic = Config{
Name: "HighTraffic", Name: "HighTraffic",
minimalQPS: 200, minimalQPS: 200,
maximalQPS: 1000, maximalQPS: 1000,
clientCount: 12, clientCount: 12,
maxNonUniqueRequestConcurrency: 3,
Traffic: etcdTraffic{ Traffic: etcdTraffic{
keyCount: 10, keyCount: 10,
largePutSize: 32769, largePutSize: 32769,
@ -102,7 +104,7 @@ const (
Defragment etcdRequestType = "defragment" Defragment etcdRequestType = "defragment"
) )
func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) { func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {
lastOperationSucceeded := true lastOperationSucceeded := true
var lastRev int64 var lastRev int64
var requestType etcdRequestType var requestType etcdRequestType
@ -124,11 +126,18 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.
} }
// Avoid multiple failed writes in a row // Avoid multiple failed writes in a row
if lastOperationSucceeded { if lastOperationSucceeded {
requestType = pickRandom(t.requests) choices := t.requests
if !nonUniqueWriteLimiter.Take() {
choices = filterOutNonUniqueEtcdWrites(choices)
}
requestType = pickRandom(choices)
} else { } else {
requestType = Get requestType = Get
} }
rev, err := client.Request(ctx, requestType, lastRev) rev, err := client.Request(ctx, requestType, lastRev)
if requestType == Delete || requestType == LeaseRevoke {
nonUniqueWriteLimiter.Return()
}
lastOperationSucceeded = err == nil lastOperationSucceeded = err == nil
if err != nil { if err != nil {
continue continue
@ -140,6 +149,15 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.
} }
} }
func filterOutNonUniqueEtcdWrites(choices []choiceWeight[etcdRequestType]) (resp []choiceWeight[etcdRequestType]) {
for _, choice := range choices {
if choice.choice != Delete && choice.choice != LeaseRevoke {
resp = append(resp, choice)
}
}
return resp
}
type etcdTrafficClient struct { type etcdTrafficClient struct {
etcdTraffic etcdTraffic
keyPrefix string keyPrefix string

View File

@ -32,10 +32,11 @@ import (
var ( var (
KubernetesTraffic = Config{ KubernetesTraffic = Config{
Name: "Kubernetes", Name: "Kubernetes",
minimalQPS: 200, minimalQPS: 200,
maximalQPS: 1000, maximalQPS: 1000,
clientCount: 12, clientCount: 12,
maxNonUniqueRequestConcurrency: 3,
Traffic: kubernetesTraffic{ Traffic: kubernetesTraffic{
averageKeyCount: 10, averageKeyCount: 10,
resource: "pods", resource: "pods",
@ -60,7 +61,7 @@ func (t kubernetesTraffic) ExpectUniqueRevision() bool {
return true return true
} }
func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) { func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {
kc := &kubernetesClient{client: c} kc := &kubernetesClient{client: c}
s := newStorage() s := newStorage()
keyPrefix := "/registry/" + t.resource + "/" keyPrefix := "/registry/" + t.resource + "/"
@ -99,7 +100,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter
continue continue
} }
} }
err := t.Write(ctx, kc, ids, s, limiter) err := t.Write(ctx, kc, ids, s, limiter, nonUniqueWriteLimiter)
lastWriteFailed = err != nil lastWriteFailed = err != nil
if err != nil { if err != nil {
continue continue
@ -140,7 +141,7 @@ func (t kubernetesTraffic) Read(ctx context.Context, kc *kubernetesClient, s *st
return revision, nil return revision, nil
} }
func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids identity.Provider, s *storage, limiter *rate.Limiter) (err error) { func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids identity.Provider, s *storage, limiter *rate.Limiter, nonUniqueWriteLimiter ConcurrencyLimiter) (err error) {
writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout) writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
defer cancel() defer cancel()
count := s.Count() count := s.Count()
@ -151,13 +152,19 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids
if rev == 0 { if rev == 0 {
return errors.New("storage empty") return errors.New("storage empty")
} }
if count > t.averageKeyCount*3/2 { if count > t.averageKeyCount*3/2 && nonUniqueWriteLimiter.Take() {
_, err = kc.OptimisticDelete(writeCtx, key, rev) _, err = kc.OptimisticDelete(writeCtx, key, rev)
nonUniqueWriteLimiter.Return()
} else { } else {
op := pickRandom(t.writeChoices) choices := t.writeChoices
if !nonUniqueWriteLimiter.Take() {
choices = filterOutNonUniqueKuberntesWrites(t.writeChoices)
}
op := pickRandom(choices)
switch op { switch op {
case KubernetesDelete: case KubernetesDelete:
_, err = kc.OptimisticDelete(writeCtx, key, rev) _, err = kc.OptimisticDelete(writeCtx, key, rev)
nonUniqueWriteLimiter.Return()
case KubernetesUpdate: case KubernetesUpdate:
_, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestId()), rev) _, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestId()), rev)
case KubernetesCreate: case KubernetesCreate:
@ -174,6 +181,15 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids
return nil return nil
} }
func filterOutNonUniqueKuberntesWrites(choices []choiceWeight[KubernetesRequestType]) (resp []choiceWeight[KubernetesRequestType]) {
for _, choice := range choices {
if choice.choice != KubernetesDelete {
resp = append(resp, choice)
}
}
return resp
}
func (t kubernetesTraffic) Watch(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string, revision int64) { func (t kubernetesTraffic) Watch(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string, revision int64) {
watchCtx, cancel := context.WithTimeout(ctx, WatchTimeout) watchCtx, cancel := context.WithTimeout(ctx, WatchTimeout)
defer cancel() defer cancel()

View File

@ -0,0 +1,47 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package traffic
func NewConcurrencyLimiter(size int) ConcurrencyLimiter {
return &concurrencyLimiter{
ch: make(chan struct{}, size),
}
}
type ConcurrencyLimiter interface {
Take() bool
Return()
}
type concurrencyLimiter struct {
ch chan struct{}
}
func (c *concurrencyLimiter) Take() bool {
select {
case c.ch <- struct{}{}:
return true
default:
return false
}
}
func (c *concurrencyLimiter) Return() {
select {
case _ = <-c.ch:
default:
panic("Call to Return() without a successful Take")
}
}

View File

@ -0,0 +1,61 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package traffic
import (
"sync/atomic"
"testing"
"github.com/stretchr/testify/assert"
"golang.org/x/sync/errgroup"
)
func TestLimiter(t *testing.T) {
limiter := NewConcurrencyLimiter(3)
counter := &atomic.Int64{}
g := errgroup.Group{}
for i := 0; i < 10; i++ {
g.Go(func() error {
if limiter.Take() {
counter.Add(1)
}
return nil
})
}
g.Wait()
assert.Equal(t, 3, int(counter.Load()))
assert.False(t, limiter.Take())
limiter.Return()
counter.Store(0)
for i := 0; i < 10; i++ {
g.Go(func() error {
if limiter.Take() {
counter.Add(1)
}
return nil
})
}
g.Wait()
assert.Equal(t, 1, int(counter.Load()))
assert.False(t, limiter.Take())
limiter.Return()
limiter.Return()
limiter.Return()
assert.Panics(t, func() {
limiter.Return()
})
}

View File

@ -51,6 +51,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
} }
defer cc.Close() defer cc.Close()
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
nonUniqueWriteLimiter := NewConcurrencyLimiter(config.maxNonUniqueRequestConcurrency)
for i := 0; i < config.clientCount; i++ { for i := 0; i < config.clientCount; i++ {
wg.Add(1) wg.Add(1)
c, err := NewClient([]string{endpoints[i%len(endpoints)]}, ids, baseTime) c, err := NewClient([]string{endpoints[i%len(endpoints)]}, ids, baseTime)
@ -61,7 +62,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
defer wg.Done() defer wg.Done()
defer c.Close() defer c.Close()
config.Traffic.Run(ctx, c, limiter, ids, lm, finish) config.Traffic.Run(ctx, c, limiter, ids, lm, nonUniqueWriteLimiter, finish)
mux.Lock() mux.Lock()
reports = append(reports, c.Report()) reports = append(reports, c.Report())
mux.Unlock() mux.Unlock()
@ -93,14 +94,15 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
} }
type Config struct { type Config struct {
Name string Name string
minimalQPS float64 minimalQPS float64
maximalQPS float64 maximalQPS float64
clientCount int maxNonUniqueRequestConcurrency int
Traffic Traffic clientCount int
Traffic Traffic
} }
type Traffic interface { type Traffic interface {
Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) Run(ctx context.Context, c *RecordingClient, qpsLimiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{})
ExpectUniqueRevision() bool ExpectUniqueRevision() bool
} }