Merge pull request #16859 from ZhouJianMS/zhoujian/raft-io-stall

Add robustness failpoint for IO stall in raft loop
This commit is contained in:
Marek Siarkowicz 2023-11-03 13:57:18 +01:00 committed by GitHub
commit d8284a1a2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 61 additions and 0 deletions

View File

@ -369,6 +369,28 @@ func (f *BinaryFailpoints) SetupHTTP(ctx context.Context, failpoint, payload str
return nil
}
func (f *BinaryFailpoints) DeactivateHTTP(ctx context.Context, failpoint string) error {
host := fmt.Sprintf("127.0.0.1:%d", f.member.Config().GoFailPort)
failpointUrl := url.URL{
Scheme: "http",
Host: host,
Path: failpoint,
}
r, err := http.NewRequestWithContext(ctx, "DELETE", failpointUrl.String(), nil)
if err != nil {
return err
}
resp, err := httpClient.Do(r)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusNoContent {
return fmt.Errorf("bad status code: %d", resp.StatusCode)
}
return nil
}
var httpClient = http.Client{
Timeout: 10 * time.Millisecond,
}

View File

@ -48,6 +48,8 @@ var (
BeforeApplyOneConfChangeSleep,
MemberReplace,
DropPeerNetwork,
RaftBeforeSaveSleep,
RaftAfterSaveSleep,
}
)

View File

@ -54,6 +54,8 @@ var (
RaftBeforeSaveSnapPanic Failpoint = goPanicFailpoint{"raftBeforeSaveSnap", triggerBlackhole{waitTillSnapshot: true}, Follower}
RaftAfterSaveSnapPanic Failpoint = goPanicFailpoint{"raftAfterSaveSnap", triggerBlackhole{waitTillSnapshot: true}, Follower}
BeforeApplyOneConfChangeSleep Failpoint = killAndGofailSleep{"beforeApplyOneConfChange", time.Second}
RaftBeforeSaveSleep Failpoint = gofailSleepAndDeactivate{"raftBeforeSave", time.Second}
RaftAfterSaveSleep Failpoint = gofailSleepAndDeactivate{"raftAfterSave", time.Second}
)
type goPanicFailpoint struct {
@ -189,3 +191,38 @@ func (f killAndGofailSleep) Available(config e2e.EtcdProcessClusterConfig, membe
}
return memberFailpoints.Available(f.failpoint)
}
type gofailSleepAndDeactivate struct {
failpoint string
time time.Duration
}
func (f gofailSleepAndDeactivate) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
member := clus.Procs[rand.Int()%len(clus.Procs)]
lg.Info("Setting up gofailpoint", zap.String("failpoint", f.Name()))
err := member.Failpoints().SetupHTTP(ctx, f.failpoint, fmt.Sprintf(`sleep(%q)`, f.time))
if err != nil {
lg.Info("goFailpoint setup failed", zap.String("failpoint", f.Name()), zap.Error(err))
return fmt.Errorf("goFailpoint %s setup failed, err:%w", f.Name(), err)
}
time.Sleep(f.time)
lg.Info("Deactivating gofailpoint", zap.String("failpoint", f.Name()))
err = member.Failpoints().DeactivateHTTP(ctx, f.failpoint)
if err != nil {
lg.Info("goFailpoint deactivate failed", zap.String("failpoint", f.Name()), zap.Error(err))
return fmt.Errorf("goFailpoint %s deactivate failed, err: %w", f.Name(), err)
}
return nil
}
func (f gofailSleepAndDeactivate) Name() string {
return fmt.Sprintf("%s=sleep(%s)", f.failpoint, f.time)
}
func (f gofailSleepAndDeactivate) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) bool {
memberFailpoints := member.Failpoints()
if memberFailpoints == nil {
return false
}
return memberFailpoints.Available(f.failpoint)
}