diff --git a/tests/framework/e2e/cluster_proxy.go b/tests/framework/e2e/cluster_proxy.go index 9adb4f91c..bc6c4593c 100644 --- a/tests/framework/e2e/cluster_proxy.go +++ b/tests/framework/e2e/cluster_proxy.go @@ -128,6 +128,10 @@ func (p *proxyEtcdProcess) PeerProxy() proxy.Server { return nil } +func (p *proxyEtcdProcess) Failpoints() *BinaryFailpoints { + return p.etcdProc.Failpoints() +} + type proxyProc struct { lg *zap.Logger name string diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index 5a7e062c3..54fbc1d6d 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -15,8 +15,11 @@ package e2e import ( + "bytes" "context" "fmt" + "io" + "net/http" "net/url" "os" "strings" @@ -51,6 +54,7 @@ type EtcdProcess interface { Close() error Config() *EtcdServerProcessConfig PeerProxy() proxy.Server + Failpoints() *BinaryFailpoints Logs() LogsExpect Kill() error } @@ -62,10 +66,11 @@ type LogsExpect interface { } type EtcdServerProcess struct { - cfg *EtcdServerProcessConfig - proc *expect.ExpectProcess - proxy proxy.Server - donec chan struct{} // closed when Interact() terminates + cfg *EtcdServerProcessConfig + proc *expect.ExpectProcess + proxy proxy.Server + failpoints *BinaryFailpoints + donec chan struct{} // closed when Interact() terminates } type EtcdServerProcessConfig struct { @@ -101,7 +106,11 @@ func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, err return nil, err } } - return &EtcdServerProcess{cfg: cfg, donec: make(chan struct{})}, nil + ep := &EtcdServerProcess{cfg: cfg, donec: make(chan struct{})} + if cfg.GoFailPort != 0 { + ep.failpoints = &BinaryFailpoints{member: ep} + } + return ep, nil } func (ep *EtcdServerProcess) EndpointsV2() []string { return []string{ep.cfg.ClientURL} } @@ -269,3 +278,72 @@ func AssertProcessLogs(t *testing.T, ep EtcdProcess, expectLog string) { func (ep *EtcdServerProcess) PeerProxy() proxy.Server { return ep.proxy } + +func (ep *EtcdServerProcess) Failpoints() *BinaryFailpoints { + return ep.failpoints +} + +type BinaryFailpoints struct { + member EtcdProcess + availableCache map[string]struct{} +} + +func (f *BinaryFailpoints) Setup(ctx context.Context, failpoint, payload string) error { + host := fmt.Sprintf("127.0.0.1:%d", f.member.Config().GoFailPort) + failpointUrl := url.URL{ + Scheme: "http", + Host: host, + Path: failpoint, + } + r, err := http.NewRequestWithContext(ctx, "PUT", failpointUrl.String(), bytes.NewBuffer([]byte(payload))) + if err != nil { + return err + } + resp, err := httpClient.Do(r) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusNoContent { + return fmt.Errorf("bad status code: %d", resp.StatusCode) + } + return nil +} + +var httpClient = http.Client{ + Timeout: 10 * time.Millisecond, +} + +func (f *BinaryFailpoints) Available() map[string]struct{} { + if f.availableCache == nil { + fs, err := fetchFailpoints(f.member) + if err != nil { + panic(err) + } + f.availableCache = fs + } + return f.availableCache +} + +func fetchFailpoints(member EtcdProcess) (map[string]struct{}, error) { + address := fmt.Sprintf("127.0.0.1:%d", member.Config().GoFailPort) + failpointUrl := url.URL{ + Scheme: "http", + Host: address, + } + resp, err := http.Get(failpointUrl.String()) + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + text := strings.ReplaceAll(string(body), "=", "") + failpoints := map[string]struct{}{} + for _, f := range strings.Split(text, "\n") { + failpoints[f] = struct{}{} + } + return failpoints, nil +} diff --git a/tests/linearizability/failpoints.go b/tests/linearizability/failpoints.go index ff3090670..46fd50800 100644 --- a/tests/linearizability/failpoints.go +++ b/tests/linearizability/failpoints.go @@ -15,12 +15,9 @@ package linearizability import ( - "bytes" "context" "fmt" "math/rand" - "net/http" - "net/url" "strings" "testing" "time" @@ -83,6 +80,7 @@ var ( type Failpoint interface { Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error Name() string + Available(e2e.EtcdProcess) bool } type killFailpoint struct{} @@ -114,6 +112,10 @@ func (f killFailpoint) Name() string { return "Kill" } +func (f killFailpoint) Available(e2e.EtcdProcess) bool { + return true +} + type goPanicFailpoint struct { failpoint string trigger func(ctx context.Context, member e2e.EtcdProcess) error @@ -129,13 +131,12 @@ const ( func (f goPanicFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error { member := f.pickMember(t, clus) - address := fmt.Sprintf("127.0.0.1:%d", member.Config().GoFailPort) triggerCtx, cancel := context.WithTimeout(ctx, triggerTimeout) defer cancel() for member.IsRunning() { - err := setupGoFailpoint(triggerCtx, address, f.failpoint, "panic") + err := member.Failpoints().Setup(triggerCtx, f.failpoint, "panic") if err != nil { t.Logf("gofailpoint setup failed: %v", err) } @@ -169,25 +170,14 @@ func (f goPanicFailpoint) pickMember(t *testing.T, clus *e2e.EtcdProcessCluster) } } -func setupGoFailpoint(ctx context.Context, host, failpoint, payload string) error { - failpointUrl := url.URL{ - Scheme: "http", - Host: host, - Path: failpoint, +func (f goPanicFailpoint) Available(member e2e.EtcdProcess) bool { + memberFailpoints := member.Failpoints() + if memberFailpoints == nil { + return false } - r, err := http.NewRequestWithContext(ctx, "PUT", failpointUrl.String(), bytes.NewBuffer([]byte(payload))) - if err != nil { - return err - } - resp, err := httpClient.Do(r) - if err != nil { - return err - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusNoContent { - return fmt.Errorf("bad status code: %d", resp.StatusCode) - } - return nil + available := memberFailpoints.Available() + _, found := available[f.failpoint] + return found } func (f goPanicFailpoint) Name() string { @@ -234,16 +224,24 @@ func triggerCompact(ctx context.Context, member e2e.EtcdProcess) error { return nil } -var httpClient = http.Client{ - Timeout: 10 * time.Millisecond, -} - type randomFailpoint struct { failpoints []Failpoint } func (f randomFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error { - failpoint := f.failpoints[rand.Int()%len(f.failpoints)] + availableFailpoints := make([]Failpoint, 0, len(f.failpoints)) + for _, failpoint := range f.failpoints { + count := 0 + for _, proc := range clus.Procs { + if failpoint.Available(proc) { + count++ + } + } + if count == len(clus.Procs) { + availableFailpoints = append(availableFailpoints, failpoint) + } + } + failpoint := availableFailpoints[rand.Int()%len(availableFailpoints)] t.Logf("Triggering %v failpoint\n", failpoint.Name()) return failpoint.Trigger(t, ctx, clus) } @@ -252,6 +250,10 @@ func (f randomFailpoint) Name() string { return "Random" } +func (f randomFailpoint) Available(e2e.EtcdProcess) bool { + return true +} + type blackholePeerNetworkFailpoint struct { duration time.Duration } @@ -274,6 +276,10 @@ func (f blackholePeerNetworkFailpoint) Name() string { return "blackhole" } +func (f blackholePeerNetworkFailpoint) Available(clus e2e.EtcdProcess) bool { + return clus.PeerProxy() != nil +} + type delayPeerNetworkFailpoint struct { duration time.Duration baseLatency time.Duration @@ -297,3 +303,7 @@ func (f delayPeerNetworkFailpoint) Trigger(t *testing.T, ctx context.Context, cl func (f delayPeerNetworkFailpoint) Name() string { return "delay" } + +func (f delayPeerNetworkFailpoint) Available(clus e2e.EtcdProcess) bool { + return clus.PeerProxy() != nil +} diff --git a/tests/linearizability/linearizability_test.go b/tests/linearizability/linearizability_test.go index 405ae21a1..7c7bf75a6 100644 --- a/tests/linearizability/linearizability_test.go +++ b/tests/linearizability/linearizability_test.go @@ -127,6 +127,11 @@ func triggerFailpoints(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessC var err error successes := 0 failures := 0 + for _, proc := range clus.Procs { + if !config.failpoint.Available(proc) { + return fmt.Errorf("failpoint %q not available on %s", config.failpoint.Name(), proc.Config().Name) + } + } for successes < config.count && failures < config.retries { time.Sleep(config.waitBetweenTriggers) err = config.failpoint.Trigger(t, ctx, clus)