Mirror of https://github.com/etcd-io/etcd.git
tests: Ensure that operation history finishes with successful request
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
parent e11a32366e
commit 6a5d326519
@@ -193,19 +193,17 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2
 }
 
 func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, traffic trafficConfig, failpoint FailpointConfig) (operations []porcupine.Operation, responses [][]watchResponse) {
-	// Run multiple test components (traffic, failpoints, etc) in parallel and use canceling context to propagate stop signal.
 	g := errgroup.Group{}
-	trafficCtx, trafficCancel := context.WithCancel(ctx)
+	finishTraffic := make(chan struct{})
 	g.Go(func() error {
 		triggerFailpoints(ctx, t, lg, clus, failpoint)
 		time.Sleep(time.Second)
-		trafficCancel()
+		close(finishTraffic)
 		return nil
 	})
 	maxRevisionChan := make(chan int64, 1)
 	g.Go(func() error {
-		operations = simulateTraffic(trafficCtx, t, lg, clus, traffic)
-		time.Sleep(time.Second)
+		operations = simulateTraffic(ctx, t, lg, clus, traffic, finishTraffic)
 		maxRevisionChan <- operationsMaxRevision(operations)
 		return nil
 	})
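The hunk above swaps context cancellation for a close()-based finish channel: closing a channel is observed by every listener at once, and, unlike a canceled context, it leaves the traffic clients free to issue one more request afterwards. A minimal, self-contained sketch of that coordination pattern (finishTraffic and maxRevisionChan mirror the diff; everything else is illustrative, not etcd code):

package main

import (
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	g := errgroup.Group{}
	finishTraffic := make(chan struct{})
	maxRevisionChan := make(chan int64, 1) // buffered so the sender never blocks

	g.Go(func() error {
		time.Sleep(100 * time.Millisecond) // stand-in for triggerFailpoints
		close(finishTraffic)               // broadcast "stop" to the traffic goroutine
		return nil
	})
	g.Go(func() error {
		<-finishTraffic // stand-in for simulateTraffic honoring the finish channel
		maxRevisionChan <- 42
		return nil
	})
	if err := g.Wait(); err != nil {
		panic(err)
	}
	fmt.Println("max revision:", <-maxRevisionChan)
}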
@@ -54,7 +54,7 @@ const (
 	Defragment TrafficRequestType = "defragment"
 )
 
-func simulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config trafficConfig) []porcupine.Operation {
+func simulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config trafficConfig, finish <-chan struct{}) []porcupine.Operation {
 	mux := sync.Mutex{}
 	endpoints := clus.EndpointsV3()
 
@@ -64,11 +64,15 @@ func simulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
 	limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200)
 
 	startTime := time.Now()
+	cc, err := NewClient(endpoints, ids, startTime)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer cc.Close()
 	wg := sync.WaitGroup{}
 	for i := 0; i < config.clientCount; i++ {
 		wg.Add(1)
-		endpoints := []string{endpoints[i%len(endpoints)]}
-		c, err := NewClient(endpoints, ids, startTime)
+		c, err := NewClient([]string{endpoints[i%len(endpoints)]}, ids, startTime)
 		if err != nil {
 			t.Fatal(err)
 		}
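This hunk creates a dedicated client cc (connected to all endpoints) before the per-worker clients start, and inlines the one-element endpoint slice into NewClient so the loop no longer shadows the outer endpoints variable. A later hunk merges cc's history only after wg.Wait(), so whatever cc records lands at the very end of the combined history. A toy sketch of that merge ordering (the history type here is invented for illustration):

package main

import (
	"fmt"
	"sync"
)

type history struct{ ops []string }

func (h history) Merge(o history) history {
	return history{ops: append(append([]string{}, h.ops...), o.ops...)}
}

func main() {
	var mux sync.Mutex
	var wg sync.WaitGroup
	h := history{}
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			worker := history{ops: []string{fmt.Sprintf("put-%d", id)}}
			mux.Lock() // worker histories merge concurrently under a mutex
			h = h.Merge(worker)
			mux.Unlock()
		}(i)
	}
	wg.Wait()
	final := history{ops: []string{"put tombstone=true"}} // stand-in for cc's history
	h = h.Merge(final)                                    // merged last, so it ends the history
	fmt.Println(h.ops)
}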
@@ -76,7 +80,7 @@ func simulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
 			defer wg.Done()
 			defer c.Close()
 
-			config.traffic.Run(ctx, clientId, c, limiter, ids, lm)
+			config.traffic.Run(ctx, clientId, c, limiter, ids, lm, finish)
 			mux.Lock()
 			h = h.Merge(c.history.History)
 			mux.Unlock()
@@ -84,6 +88,15 @@ func simulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
 	}
 	wg.Wait()
 	endTime := time.Now()
+
+	// Ensure that last operation succeeds
+	time.Sleep(time.Second)
+	err = cc.Put(ctx, "tombstone", "true")
+	if err != nil {
+		t.Error(err)
+	}
+	h = h.Merge(cc.history.History)
+
 	operations := h.Operations()
 	lg.Info("Recorded operations", zap.Int("count", len(operations)))
 
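This is the heart of the commit: after failpoint-induced chaos, the last operations in the recorded history may have timed out with unknown results, so one final Put of a "tombstone" key (issued after a one-second settle and recorded on the dedicated cc client) guarantees the history ends with a request whose outcome is known and successful. A hedged sketch of such a final write using the public clientv3 API (endpoint and timeouts are assumptions, not values from the diff):

package main

import (
	"context"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	c, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"}, // assumed local endpoint
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	// Give the cluster a moment to recover from the last failpoint,
	// mirroring the time.Sleep before the Put in the diff above.
	time.Sleep(time.Second)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if _, err := c.Put(ctx, "tombstone", "true"); err != nil {
		log.Fatal(err) // the test treats a failed final write as a test error
	}
}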
@@ -104,7 +117,7 @@ type trafficConfig struct {
 }
 
 type Traffic interface {
-	Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage)
+	Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{})
 }
 
 type traffic struct {
@@ -119,12 +132,14 @@ type requestChance struct {
 	chance int
 }
 
-func (t traffic) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage) {
+func (t traffic) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) {
 
 	for {
 		select {
 		case <-ctx.Done():
 			return
+		case <-finish:
+			return
 		default:
 		}
 		key := fmt.Sprintf("%d", rand.Int()%t.keyCount)
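With the finish channel threaded into Run, each loop iteration now polls both stop signals without blocking before issuing the next request, so traffic halts promptly whether the test is canceled or the failpoint goroutine asks for a graceful stop. A compilable sketch of that loop shape (package and function names are illustrative):

package traffic // illustrative package name

import "context"

// run polls both stop signals without blocking between requests.
func run(ctx context.Context, finish <-chan struct{}, doRequest func()) {
	for {
		select {
		case <-ctx.Done():
			return // test-wide cancellation
		case <-finish:
			return // graceful stop from the failpoint goroutine
		default:
		}
		doRequest() // one randomized request per iteration
	}
}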