mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
tests: Enfoce timeout on failpoints
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
parent
523f235c82
commit
6582e349db
@ -84,6 +84,9 @@ var (
|
||||
)
|
||||
|
||||
func triggerFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config FailpointConfig) {
|
||||
ctx, cancel := context.WithTimeout(ctx, triggerTimeout)
|
||||
defer cancel()
|
||||
|
||||
var err error
|
||||
successes := 0
|
||||
failures := 0
|
||||
@ -127,14 +130,12 @@ type killFailpoint struct{}
|
||||
func (f killFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
|
||||
member := clus.Procs[rand.Int()%len(clus.Procs)]
|
||||
|
||||
killCtx, cancel := context.WithTimeout(ctx, triggerTimeout)
|
||||
defer cancel()
|
||||
for member.IsRunning() {
|
||||
err := member.Kill()
|
||||
if err != nil {
|
||||
lg.Info("Sending kill signal failed", zap.Error(err))
|
||||
}
|
||||
err = member.Wait(killCtx)
|
||||
err = member.Wait(ctx)
|
||||
if err != nil && !strings.Contains(err.Error(), "unexpected exit code") {
|
||||
lg.Info("Failed to kill the process", zap.Error(err))
|
||||
return fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err)
|
||||
@ -173,12 +174,9 @@ const (
|
||||
func (f goPanicFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
|
||||
member := f.pickMember(t, clus)
|
||||
|
||||
triggerCtx, cancel := context.WithTimeout(ctx, triggerTimeout)
|
||||
defer cancel()
|
||||
|
||||
for member.IsRunning() {
|
||||
lg.Info("Setting up gofailpoint", zap.String("failpoint", f.Name()))
|
||||
err := member.Failpoints().Setup(triggerCtx, f.failpoint, "panic")
|
||||
err := member.Failpoints().Setup(ctx, f.failpoint, "panic")
|
||||
if err != nil {
|
||||
lg.Info("goFailpoint setup failed", zap.String("failpoint", f.Name()), zap.Error(err))
|
||||
}
|
||||
@ -188,13 +186,13 @@ func (f goPanicFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Log
|
||||
}
|
||||
if f.trigger != nil {
|
||||
lg.Info("Triggering gofailpoint", zap.String("failpoint", f.Name()))
|
||||
err = f.trigger(t, triggerCtx, member, clus)
|
||||
err = f.trigger(t, ctx, member, clus)
|
||||
if err != nil {
|
||||
lg.Info("gofailpoint trigger failed", zap.String("failpoint", f.Name()), zap.Error(err))
|
||||
}
|
||||
}
|
||||
lg.Info("Waiting for member to exit", zap.String("member", member.Config().Name))
|
||||
err = member.Wait(triggerCtx)
|
||||
err = member.Wait(ctx)
|
||||
if err != nil && !strings.Contains(err.Error(), "unexpected exit code") {
|
||||
lg.Info("Member didn't exit as expected", zap.String("member", member.Config().Name), zap.Error(err))
|
||||
return fmt.Errorf("member didn't exit as expected: %v", err)
|
||||
|
@ -203,14 +203,16 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2
|
||||
func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, traffic trafficConfig, failpoint FailpointConfig) (operations []porcupine.Operation, responses [][]watchResponse) {
|
||||
g := errgroup.Group{}
|
||||
finishTraffic := make(chan struct{})
|
||||
|
||||
g.Go(func() error {
|
||||
defer close(finishTraffic)
|
||||
triggerFailpoints(ctx, t, lg, clus, failpoint)
|
||||
time.Sleep(time.Second)
|
||||
close(finishTraffic)
|
||||
return nil
|
||||
})
|
||||
maxRevisionChan := make(chan int64, 1)
|
||||
g.Go(func() error {
|
||||
defer close(maxRevisionChan)
|
||||
operations = simulateTraffic(ctx, t, lg, clus, traffic, finishTraffic)
|
||||
maxRevisionChan <- operationsMaxRevision(operations)
|
||||
return nil
|
||||
|
@ -70,7 +70,7 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.Etcd
|
||||
return memberResponses
|
||||
}
|
||||
|
||||
// watchMember collects all responses until context is cancelled or has observed revision provided via maxRevisionChan.
|
||||
// watchMember collects all responses until context is cancelled, it has observed revision provided via maxRevisionChan or maxRevisionChan was closed.
|
||||
func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64) (resps []watchResponse) {
|
||||
var maxRevision int64 = 0
|
||||
var lastRevision int64 = 0
|
||||
@ -88,9 +88,17 @@ func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevis
|
||||
t.Errorf("Client didn't collect all events, revision got %d, expected: %d", revision, maxRevision)
|
||||
}
|
||||
return resps
|
||||
case maxRevision = <-maxRevisionChan:
|
||||
if lastRevision >= maxRevision {
|
||||
cancel()
|
||||
case revision, ok := <-maxRevisionChan:
|
||||
if ok {
|
||||
maxRevision = revision
|
||||
if lastRevision >= maxRevision {
|
||||
cancel()
|
||||
}
|
||||
} else {
|
||||
// Only cancel if maxRevision was never set.
|
||||
if maxRevision == 0 {
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
case resp := <-watch:
|
||||
if resp.Err() == nil {
|
||||
|
Loading…
x
Reference in New Issue
Block a user