Merge pull request #15603 from serathius/robustness-finish-with-success

tests: Ensure that operation history finishes with successful request
Marek Siarkowicz 2023-04-04 12:03:36 +02:00 committed by GitHub
commit 523f235c82
2 changed files with 24 additions and 11 deletions


@@ -201,19 +201,17 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2
}
func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, traffic trafficConfig, failpoint FailpointConfig) (operations []porcupine.Operation, responses [][]watchResponse) {
// Run multiple test components (traffic, failpoints, etc) in parallel and use canceling context to propagate stop signal.
g := errgroup.Group{}
-trafficCtx, trafficCancel := context.WithCancel(ctx)
+finishTraffic := make(chan struct{})
g.Go(func() error {
triggerFailpoints(ctx, t, lg, clus, failpoint)
time.Sleep(time.Second)
-trafficCancel()
+close(finishTraffic)
return nil
})
maxRevisionChan := make(chan int64, 1)
g.Go(func() error {
-operations = simulateTraffic(trafficCtx, t, lg, clus, traffic)
-time.Sleep(time.Second)
+operations = simulateTraffic(ctx, t, lg, clus, traffic, finishTraffic)
maxRevisionChan <- operationsMaxRevision(operations)
return nil
})
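
The hunk above swaps the cancellable trafficCtx for a dedicated finishTraffic channel: cancelling a context would make the traffic clients' last in-flight requests fail, while closing a channel only tells them to stop issuing new requests and leaves the parent context valid for the final write added further down. A minimal, self-contained sketch of that shutdown pattern (hypothetical names, not the etcd test code):

package main

import (
    "fmt"
    "sync"
    "time"
)

// worker keeps issuing "requests" until the shared finish channel is closed.
// It never observes a cancelled context, so whatever request it started last
// is allowed to complete and record a proper response.
func worker(id int, finish <-chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; ; i++ {
        select {
        case <-finish:
            fmt.Printf("worker %d stopped after %d requests\n", id, i)
            return
        default:
        }
        time.Sleep(10 * time.Millisecond) // stand-in for one etcd request
    }
}

func main() {
    finish := make(chan struct{})
    wg := sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go worker(i, finish, &wg)
    }
    time.Sleep(100 * time.Millisecond) // stand-in for the failpoint phase
    close(finish)                      // a close is a broadcast: every worker's receive unblocks
    wg.Wait()
    fmt.Println("all workers done; the (never cancelled) context is still usable for a final write")
}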


@@ -54,7 +54,7 @@ const (
Defragment TrafficRequestType = "defragment"
)
-func simulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config trafficConfig) []porcupine.Operation {
+func simulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config trafficConfig, finish <-chan struct{}) []porcupine.Operation {
mux := sync.Mutex{}
endpoints := clus.EndpointsGRPC()
@@ -64,11 +64,15 @@ func simulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200)
startTime := time.Now()
+cc, err := NewClient(endpoints, ids, startTime)
+if err != nil {
+t.Fatal(err)
+}
+defer cc.Close()
wg := sync.WaitGroup{}
for i := 0; i < config.clientCount; i++ {
wg.Add(1)
-endpoints := []string{endpoints[i%len(endpoints)]}
-c, err := NewClient(endpoints, ids, startTime)
+c, err := NewClient([]string{endpoints[i%len(endpoints)]}, ids, startTime)
if err != nil {
t.Fatal(err)
}
@@ -76,7 +80,7 @@ func simulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
defer wg.Done()
defer c.Close()
-config.traffic.Run(ctx, clientId, c, limiter, ids, lm)
+config.traffic.Run(ctx, clientId, c, limiter, ids, lm, finish)
mux.Lock()
h = h.Merge(c.history.History)
mux.Unlock()
@@ -84,6 +88,15 @@ func simulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
}
wg.Wait()
endTime := time.Now()
+// Ensure that last operation succeeds
+time.Sleep(time.Second)
+err = cc.Put(ctx, "tombstone", "true")
+if err != nil {
+t.Error(err)
+}
+h = h.Merge(cc.history.History)
operations := h.Operations()
lg.Info("Recorded operations", zap.Int("count", len(operations)))
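
This tombstone put is the change the commit message refers to: an operation that fails or times out has no observed response, so the linearizability checker cannot tell when, or whether, it took effect, and a history that ends on such operations leaves the final state and maximum revision ambiguous. One write on the dedicated cc client that must succeed gives the history a definite last operation and a definite revision bound. A rough sketch of the idea, assuming a plain clientv3 client rather than the test's recordingClient (hypothetical helper, not part of the change):

package robustness

import (
    "context"

    clientv3 "go.etcd.io/etcd/client/v3"
)

// finishHistory is a hypothetical helper mirroring the tombstone write above:
// once all traffic clients have stopped, perform one write that must succeed so
// the recorded history ends with an operation whose outcome and revision are known.
func finishHistory(ctx context.Context, kv clientv3.KV) (int64, error) {
    resp, err := kv.Put(ctx, "tombstone", "true")
    if err != nil {
        return 0, err // the test treats a failure here as a test error
    }
    return resp.Header.Revision, nil // definite upper bound for revisions produced by traffic
}

In the diff the write instead goes through cc, the recording client created before the worker loop, so the tombstone put is merged into the same history (h = h.Merge(cc.history.History)) that the test later verifies.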
@@ -104,7 +117,7 @@ type trafficConfig struct {
}
type Traffic interface {
-Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage)
+Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{})
}
type traffic struct {
@@ -119,12 +132,14 @@ type requestChance struct {
chance int
}
-func (t traffic) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage) {
+func (t traffic) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) {
for {
select {
case <-ctx.Done():
return
+case <-finish:
+return
default:
}
key := fmt.Sprintf("%d", rand.Int()%t.keyCount)