mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #15633 from serathius/robustness-enforce-timeout
tests: Enforce timeout on failpoints
This commit is contained in:
commit
05e2910f15
@ -84,6 +84,9 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func triggerFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config FailpointConfig) {
|
func triggerFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config FailpointConfig) {
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, triggerTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
successes := 0
|
successes := 0
|
||||||
failures := 0
|
failures := 0
|
||||||
@ -127,14 +130,12 @@ type killFailpoint struct{}
|
|||||||
func (f killFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
|
func (f killFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
|
||||||
member := clus.Procs[rand.Int()%len(clus.Procs)]
|
member := clus.Procs[rand.Int()%len(clus.Procs)]
|
||||||
|
|
||||||
killCtx, cancel := context.WithTimeout(ctx, triggerTimeout)
|
|
||||||
defer cancel()
|
|
||||||
for member.IsRunning() {
|
for member.IsRunning() {
|
||||||
err := member.Kill()
|
err := member.Kill()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
lg.Info("Sending kill signal failed", zap.Error(err))
|
lg.Info("Sending kill signal failed", zap.Error(err))
|
||||||
}
|
}
|
||||||
err = member.Wait(killCtx)
|
err = member.Wait(ctx)
|
||||||
if err != nil && !strings.Contains(err.Error(), "unexpected exit code") {
|
if err != nil && !strings.Contains(err.Error(), "unexpected exit code") {
|
||||||
lg.Info("Failed to kill the process", zap.Error(err))
|
lg.Info("Failed to kill the process", zap.Error(err))
|
||||||
return fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err)
|
return fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err)
|
||||||
@ -173,12 +174,9 @@ const (
|
|||||||
func (f goPanicFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
|
func (f goPanicFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
|
||||||
member := f.pickMember(t, clus)
|
member := f.pickMember(t, clus)
|
||||||
|
|
||||||
triggerCtx, cancel := context.WithTimeout(ctx, triggerTimeout)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
for member.IsRunning() {
|
for member.IsRunning() {
|
||||||
lg.Info("Setting up gofailpoint", zap.String("failpoint", f.Name()))
|
lg.Info("Setting up gofailpoint", zap.String("failpoint", f.Name()))
|
||||||
err := member.Failpoints().Setup(triggerCtx, f.failpoint, "panic")
|
err := member.Failpoints().Setup(ctx, f.failpoint, "panic")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
lg.Info("goFailpoint setup failed", zap.String("failpoint", f.Name()), zap.Error(err))
|
lg.Info("goFailpoint setup failed", zap.String("failpoint", f.Name()), zap.Error(err))
|
||||||
}
|
}
|
||||||
@ -188,13 +186,13 @@ func (f goPanicFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Log
|
|||||||
}
|
}
|
||||||
if f.trigger != nil {
|
if f.trigger != nil {
|
||||||
lg.Info("Triggering gofailpoint", zap.String("failpoint", f.Name()))
|
lg.Info("Triggering gofailpoint", zap.String("failpoint", f.Name()))
|
||||||
err = f.trigger(t, triggerCtx, member, clus)
|
err = f.trigger(t, ctx, member, clus)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
lg.Info("gofailpoint trigger failed", zap.String("failpoint", f.Name()), zap.Error(err))
|
lg.Info("gofailpoint trigger failed", zap.String("failpoint", f.Name()), zap.Error(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
lg.Info("Waiting for member to exit", zap.String("member", member.Config().Name))
|
lg.Info("Waiting for member to exit", zap.String("member", member.Config().Name))
|
||||||
err = member.Wait(triggerCtx)
|
err = member.Wait(ctx)
|
||||||
if err != nil && !strings.Contains(err.Error(), "unexpected exit code") {
|
if err != nil && !strings.Contains(err.Error(), "unexpected exit code") {
|
||||||
lg.Info("Member didn't exit as expected", zap.String("member", member.Config().Name), zap.Error(err))
|
lg.Info("Member didn't exit as expected", zap.String("member", member.Config().Name), zap.Error(err))
|
||||||
return fmt.Errorf("member didn't exit as expected: %v", err)
|
return fmt.Errorf("member didn't exit as expected: %v", err)
|
||||||
|
@ -203,14 +203,16 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2
|
|||||||
func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, traffic trafficConfig, failpoint FailpointConfig) (operations []porcupine.Operation, responses [][]watchResponse) {
|
func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, traffic trafficConfig, failpoint FailpointConfig) (operations []porcupine.Operation, responses [][]watchResponse) {
|
||||||
g := errgroup.Group{}
|
g := errgroup.Group{}
|
||||||
finishTraffic := make(chan struct{})
|
finishTraffic := make(chan struct{})
|
||||||
|
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
|
defer close(finishTraffic)
|
||||||
triggerFailpoints(ctx, t, lg, clus, failpoint)
|
triggerFailpoints(ctx, t, lg, clus, failpoint)
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
close(finishTraffic)
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
maxRevisionChan := make(chan int64, 1)
|
maxRevisionChan := make(chan int64, 1)
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
|
defer close(maxRevisionChan)
|
||||||
operations = simulateTraffic(ctx, t, lg, clus, traffic, finishTraffic)
|
operations = simulateTraffic(ctx, t, lg, clus, traffic, finishTraffic)
|
||||||
maxRevisionChan <- operationsMaxRevision(operations)
|
maxRevisionChan <- operationsMaxRevision(operations)
|
||||||
return nil
|
return nil
|
||||||
|
@ -70,7 +70,7 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.Etcd
|
|||||||
return memberResponses
|
return memberResponses
|
||||||
}
|
}
|
||||||
|
|
||||||
// watchMember collects all responses until context is cancelled or has observed revision provided via maxRevisionChan.
|
// watchMember collects all responses until context is cancelled, it has observed revision provided via maxRevisionChan or maxRevisionChan was closed.
|
||||||
func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64) (resps []watchResponse) {
|
func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64) (resps []watchResponse) {
|
||||||
var maxRevision int64 = 0
|
var maxRevision int64 = 0
|
||||||
var lastRevision int64 = 0
|
var lastRevision int64 = 0
|
||||||
@ -88,9 +88,17 @@ func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevis
|
|||||||
t.Errorf("Client didn't collect all events, revision got %d, expected: %d", revision, maxRevision)
|
t.Errorf("Client didn't collect all events, revision got %d, expected: %d", revision, maxRevision)
|
||||||
}
|
}
|
||||||
return resps
|
return resps
|
||||||
case maxRevision = <-maxRevisionChan:
|
case revision, ok := <-maxRevisionChan:
|
||||||
if lastRevision >= maxRevision {
|
if ok {
|
||||||
cancel()
|
maxRevision = revision
|
||||||
|
if lastRevision >= maxRevision {
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Only cancel if maxRevision was never set.
|
||||||
|
if maxRevision == 0 {
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
case resp := <-watch:
|
case resp := <-watch:
|
||||||
if resp.Err() == nil {
|
if resp.Err() == nil {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user