mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #5830 from heyitsanthony/functest-failpoints
functional-tester: failpoint support
This commit is contained in:
commit
9405583745
@ -68,6 +68,7 @@ func newAgent(etcd, logDir string) (*Agent, error) {
|
||||
// start starts a new etcd process with the given args.
|
||||
func (a *Agent) start(args ...string) error {
|
||||
a.cmd = exec.Command(a.cmd.Path, args...)
|
||||
a.cmd.Env = []string{"GOFAIL_HTTP=:2381"}
|
||||
a.cmd.Stdout = a.logfile
|
||||
a.cmd.Stderr = a.logfile
|
||||
err := a.cmd.Start()
|
||||
@ -119,16 +120,7 @@ func stopWithSig(cmd *exec.Cmd, sig os.Signal) error {
|
||||
|
||||
// restart restarts the stopped etcd process.
|
||||
func (a *Agent) restart() error {
|
||||
a.cmd = exec.Command(a.cmd.Path, a.cmd.Args[1:]...)
|
||||
a.cmd.Stdout = a.logfile
|
||||
a.cmd.Stderr = a.logfile
|
||||
err := a.cmd.Start()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
a.state = stateStarted
|
||||
return nil
|
||||
return a.start(a.cmd.Args[1:]...)
|
||||
}
|
||||
|
||||
func (a *Agent) cleanup() error {
|
||||
|
@ -27,7 +27,10 @@ import (
|
||||
"github.com/coreos/etcd/tools/functional-tester/etcd-agent/client"
|
||||
)
|
||||
|
||||
const peerURLPort = 2380
|
||||
const (
|
||||
peerURLPort = 2380
|
||||
failpointPort = 2381
|
||||
)
|
||||
|
||||
type cluster struct {
|
||||
v2Only bool // to be deprecated
|
||||
@ -75,11 +78,12 @@ func (c *cluster) bootstrap(agentEndpoints []string) error {
|
||||
return err
|
||||
}
|
||||
members[i] = &member{
|
||||
Agent: agent,
|
||||
Endpoint: u,
|
||||
Name: fmt.Sprintf("etcd-%d", i),
|
||||
ClientURL: fmt.Sprintf("http://%s:2379", host),
|
||||
PeerURL: fmt.Sprintf("http://%s:%d", host, peerURLPort),
|
||||
Agent: agent,
|
||||
Endpoint: u,
|
||||
Name: fmt.Sprintf("etcd-%d", i),
|
||||
ClientURL: fmt.Sprintf("http://%s:2379", host),
|
||||
PeerURL: fmt.Sprintf("http://%s:%d", host, peerURLPort),
|
||||
FailpointURL: fmt.Sprintf("http://%s:%d", host, failpointPort),
|
||||
}
|
||||
memberNameURLs[i] = members[i].ClusterEntry()
|
||||
}
|
||||
|
155
tools/functional-tester/etcd-tester/failpoint.go
Normal file
155
tools/functional-tester/etcd-tester/failpoint.go
Normal file
@ -0,0 +1,155 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// failpointStats keeps per-failpoint crash counters.
type failpointStats struct {
	// mu guards crashes.
	mu sync.Mutex
	// crashes holds, for each failpoint name, how many crashes were counted.
	crashes map[string]int
}

// fpStats is the package-level crash counter shared by the failpoint
// recovery functions.
var fpStats failpointStats
|
||||
|
||||
func failpointFailures(c *cluster) (ret []failure, err error) {
|
||||
var fps []string
|
||||
fps, err = failpointPaths(c.Members[0].FailpointURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// create failure objects for all failpoints
|
||||
for _, fp := range fps {
|
||||
if len(fp) == 0 {
|
||||
continue
|
||||
}
|
||||
fpFails := failuresFromFailpoint(fp)
|
||||
// wrap in delays so failpoint has time to trigger
|
||||
for i, fpf := range fpFails {
|
||||
if strings.Contains(fp, "Snap") {
|
||||
// hack to trigger snapshot failpoints
|
||||
fpFails[i] = &failureUntilSnapshot{fpf}
|
||||
} else {
|
||||
fpFails[i] = &failureDelay{fpf, 3 * time.Second}
|
||||
}
|
||||
}
|
||||
ret = append(ret, fpFails...)
|
||||
}
|
||||
fpStats.crashes = make(map[string]int)
|
||||
return ret, err
|
||||
}
|
||||
|
||||
// failpointPaths fetches the failpoint listing served at endpoint and returns
// the failpoint names found in it.
//
// The response body is treated as newline-separated entries; for each entry
// the text before the first "=" is taken as the failpoint name. Surrounding
// whitespace is trimmed and blank entries (such as the one produced by a
// trailing newline) are dropped.
//
// An error is returned if the request fails, if the server answers with a
// non-2xx status, or if the body cannot be read. Rejecting non-2xx responses
// keeps an error page from being parsed as a list of failpoint names.
func failpointPaths(endpoint string) ([]string, error) {
	resp, err := http.Get(endpoint)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode/100 != 2 {
		return nil, fmt.Errorf("failed to GET failpoints at %s (%v)", endpoint, resp.Status)
	}

	body, rerr := ioutil.ReadAll(resp.Body)
	if rerr != nil {
		return nil, rerr
	}

	var fps []string
	for _, l := range strings.Split(string(body), "\n") {
		// only the part before the first "=" names the failpoint
		fp := strings.TrimSpace(strings.SplitN(l, "=", 2)[0])
		if fp == "" {
			continue
		}
		fps = append(fps, fp)
	}
	return fps, nil
}
|
||||
|
||||
func failuresFromFailpoint(fp string) []failure {
|
||||
inject := makeInjectFailpoint(fp, `panic("etcd-tester")`)
|
||||
recov := makeRecoverFailpoint(fp)
|
||||
return []failure{
|
||||
&failureOne{
|
||||
description: description("failpoint " + fp + " panic one"),
|
||||
injectMember: inject,
|
||||
recoverMember: recov,
|
||||
},
|
||||
&failureAll{
|
||||
description: description("failpoint " + fp + " panic all"),
|
||||
injectMember: inject,
|
||||
recoverMember: recov,
|
||||
},
|
||||
&failureMajority{
|
||||
description: description("failpoint " + fp + " panic majority"),
|
||||
injectMember: inject,
|
||||
recoverMember: recov,
|
||||
},
|
||||
&failureLeader{
|
||||
failureByFunc{
|
||||
description: description("failpoint " + fp + " panic leader"),
|
||||
injectMember: inject,
|
||||
recoverMember: recov,
|
||||
},
|
||||
0,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func makeInjectFailpoint(fp, val string) injectMemberFunc {
|
||||
return func(m *member) (err error) {
|
||||
return putFailpoint(m.FailpointURL, fp, val)
|
||||
}
|
||||
}
|
||||
|
||||
func makeRecoverFailpoint(fp string) recoverMemberFunc {
|
||||
return func(m *member) error {
|
||||
if err := delFailpoint(m.FailpointURL, fp); err == nil {
|
||||
return nil
|
||||
}
|
||||
// node not responding, likely dead from fp panic; restart
|
||||
fpStats.mu.Lock()
|
||||
fpStats.crashes[fp]++
|
||||
fpStats.mu.Unlock()
|
||||
return recoverStop(m)
|
||||
}
|
||||
}
|
||||
|
||||
// putFailpoint sets failpoint fp to val by sending an HTTP PUT with val as
// the request body to ep + "/" + fp.
//
// It returns an error if the request cannot be built (for example, ep is not
// a valid URL), if the request fails, or if the server answers with a non-2xx
// status.
func putFailpoint(ep, fp, val string) error {
	req, err := http.NewRequest(http.MethodPut, ep+"/"+fp, strings.NewReader(val))
	if err != nil {
		// a nil request must never reach Do; it would panic
		return err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode/100 != 2 {
		return fmt.Errorf("failed to PUT %s=%s at %s (%v)", fp, val, ep, resp.Status)
	}
	return nil
}
|
||||
|
||||
// delFailpoint removes failpoint fp by sending an HTTP DELETE to
// ep + "/" + fp.
//
// It returns an error if the request cannot be built (for example, ep is not
// a valid URL), if the request fails, or if the server answers with a non-2xx
// status.
func delFailpoint(ep, fp string) error {
	req, err := http.NewRequest(http.MethodDelete, ep+"/"+fp, nil)
	if err != nil {
		// a nil request must never reach Do; it would panic
		return err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode/100 != 2 {
		return fmt.Errorf("failed to DELETE %s at %s (%v)", fp, ep, resp.Status)
	}
	return nil
}
|
@ -52,8 +52,13 @@ type failureLeader struct {
|
||||
idx int
|
||||
}
|
||||
|
||||
// failureDelay injects a failure and then sleeps for delayDuration
|
||||
type failureDelay struct{ failure }
|
||||
type failureDelay struct {
|
||||
failure
|
||||
delayDuration time.Duration
|
||||
}
|
||||
|
||||
// failureUntilSnapshot injects a failure and waits for a snapshot event
|
||||
type failureUntilSnapshot struct{ failure }
|
||||
|
||||
func (f *failureOne) Inject(c *cluster, round int) error {
|
||||
return f.injectMember(c.Members[round%c.Size])
|
||||
@ -122,6 +127,14 @@ func (f *failureDelay) Inject(c *cluster, round int) error {
|
||||
if err := f.failure.Inject(c, round); err != nil {
|
||||
return err
|
||||
}
|
||||
time.Sleep(f.delayDuration)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *failureUntilSnapshot) Inject(c *cluster, round int) error {
|
||||
if err := f.failure.Inject(c, round); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if c.Size < 3 {
|
||||
return nil
|
||||
@ -144,7 +157,7 @@ func (f *failureDelay) Inject(c *cluster, round int) error {
|
||||
return fmt.Errorf("cluster too slow: only commit %d requests in %ds", end-start, retry)
|
||||
}
|
||||
|
||||
func (f *failureDelay) Desc() string {
|
||||
func (f *failureUntilSnapshot) Desc() string {
|
||||
return f.failure.Desc() + " for a long time and expect it to recover from an incoming snapshot"
|
||||
}
|
||||
|
||||
|
@ -71,11 +71,11 @@ func newFailureKillLeader() failure {
|
||||
}
|
||||
|
||||
func newFailureKillOneForLongTime() failure {
|
||||
return &failureDelay{newFailureKillOne()}
|
||||
return &failureUntilSnapshot{newFailureKillOne()}
|
||||
}
|
||||
|
||||
func newFailureKillLeaderForLongTime() failure {
|
||||
return &failureDelay{newFailureKillLeader()}
|
||||
return &failureUntilSnapshot{newFailureKillLeader()}
|
||||
}
|
||||
|
||||
func injectDropPort(m *member) error { return m.Agent.DropPort(peerURLPort) }
|
||||
|
@ -58,6 +58,14 @@ func main() {
|
||||
newFailureSlowNetworkAll(),
|
||||
}
|
||||
|
||||
// ensure cluster is fully booted to know failpoints are available
|
||||
c.WaitHealth()
|
||||
fpFailures, fperr := failpointFailures(c)
|
||||
if len(fpFailures) == 0 {
|
||||
plog.Infof("no failpoints found (%v)", fperr)
|
||||
}
|
||||
failures = append(failures, fpFailures...)
|
||||
|
||||
schedule := failures
|
||||
if schedCases != nil && *schedCases != "" {
|
||||
cases := strings.Split(*schedCases, " ")
|
||||
|
@ -29,11 +29,12 @@ import (
|
||||
)
|
||||
|
||||
type member struct {
|
||||
Agent client.Agent
|
||||
Endpoint string
|
||||
Name string
|
||||
ClientURL string
|
||||
PeerURL string
|
||||
Agent client.Agent
|
||||
Endpoint string
|
||||
Name string
|
||||
ClientURL string
|
||||
PeerURL string
|
||||
FailpointURL string
|
||||
}
|
||||
|
||||
func (m *member) ClusterEntry() string { return m.Name + "=" + m.PeerURL }
|
||||
|
Loading…
x
Reference in New Issue
Block a user