mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Improve output of the 'functional' tests
- Use the Go testing infrastructure, so that logs are printed only in case of failure.
- Split the test into per-round subtests and per-test-case subtests, so that the tree of execution reflects the logic of the test.
- Use 'development' loggers, which format the output to be human-readable.
This commit is contained in:
parent
71934ff244
commit
ea8b9e6c18
@ -116,15 +116,6 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"project": "github.com/etcd-io/gofail/runtime",
|
||||
"licenses": [
|
||||
{
|
||||
"type": "Apache License 2.0",
|
||||
"confidence": 1
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"project": "github.com/form3tech-oss/jwt-go",
|
||||
"licenses": [
|
||||
|
25
build.sh
25
build.sh
@ -100,26 +100,11 @@ tools_build() {
|
||||
}
|
||||
|
||||
tests_build() {
|
||||
out="bin"
|
||||
if [[ -n "${BINDIR}" ]]; then out="${BINDIR}"; fi
|
||||
tools_path="
|
||||
functional/cmd/etcd-agent
|
||||
functional/cmd/etcd-proxy
|
||||
functional/cmd/etcd-runner
|
||||
functional/cmd/etcd-tester"
|
||||
(
|
||||
cd tests || exit 2
|
||||
for tool in ${tools_path}; do
|
||||
echo "Building" "'${tool}'"...
|
||||
run rm -f "../${out}/${tool}"
|
||||
|
||||
# shellcheck disable=SC2086
|
||||
run env CGO_ENABLED=0 GO_BUILD_FLAGS="${GO_BUILD_FLAGS}" go build ${GO_BUILD_FLAGS} \
|
||||
-installsuffix=cgo \
|
||||
"-ldflags='${GO_LDFLAGS[*]}'" \
|
||||
-o="../${out}/${tool}" "./${tool}" || return 2
|
||||
done
|
||||
) || return 2
|
||||
out=${BINDIR:-./bin}
|
||||
out=$(readlink -m "$out")
|
||||
out="${out}/functional/cmd"
|
||||
mkdir -p "${out}"
|
||||
BINDIR="${out}" run ./tests/functional/build.sh || return 2
|
||||
}
|
||||
|
||||
toggle_failpoints_default
|
||||
|
1
go.sum
1
go.sum
@ -76,7 +76,6 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
|
||||
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
|
||||
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
|
||||
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||
github.com/etcd-io/gofail v0.0.0-20190801230047-ad7f989257ca/go.mod h1:49H/RkXP8pKaZy4h0d+NW16rSLhyVBt4o6VLJbmOqDE=
|
||||
github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
|
||||
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
|
||||
github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c=
|
||||
|
5
test.sh
5
test.sh
@ -142,7 +142,7 @@ function generic_checker {
|
||||
}
|
||||
|
||||
function functional_pass {
|
||||
run ./tests/functional/build
|
||||
run ./tests/functional/build.sh || exit 1
|
||||
|
||||
# Clean up any data and logs from previous runs
|
||||
rm -rf /tmp/etcd-functional-* /tmp/etcd-functional-*.backup
|
||||
@ -162,11 +162,12 @@ function functional_pass {
|
||||
done
|
||||
|
||||
log_callout "functional test START!"
|
||||
run ./bin/etcd-tester --config ./tests/functional/functional.yaml && log_success "'etcd-tester' succeeded"
|
||||
run ./bin/etcd-tester --config ./tests/functional/functional.yaml -test.v && log_success "'etcd-tester' succeeded"
|
||||
local etcd_tester_exit_code=$?
|
||||
|
||||
if [[ "${etcd_tester_exit_code}" -ne "0" ]]; then
|
||||
log_error "ETCD_TESTER_EXIT_CODE:" ${etcd_tester_exit_code}
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# shellcheck disable=SC2206
|
||||
|
@ -95,7 +95,7 @@ func (srv *Server) createEtcdLogFile() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (srv *Server) creatEtcd(fromSnapshot bool, failpoints string) error {
|
||||
func (srv *Server) createEtcd(fromSnapshot bool, failpoints string) error {
|
||||
if !fileutil.Exist(srv.Member.EtcdExec) {
|
||||
return fmt.Errorf("unknown etcd exec path %q does not exist", srv.Member.EtcdExec)
|
||||
}
|
||||
@ -143,6 +143,7 @@ func (srv *Server) runEtcd() error {
|
||||
srv.lg.Info(
|
||||
"started etcd command",
|
||||
zap.String("command-path", srv.etcdCmd.Path),
|
||||
zap.Strings("command-args", srv.etcdCmd.Args),
|
||||
zap.Errors("errors", []error{err, perr}),
|
||||
)
|
||||
if err != nil {
|
||||
@ -488,7 +489,7 @@ func (srv *Server) handle_INITIAL_START_ETCD(req *rpcpb.Request) (*rpcpb.Respons
|
||||
if err = srv.saveTLSAssets(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = srv.creatEtcd(false, req.Member.Failpoints); err != nil {
|
||||
if err = srv.createEtcd(false, req.Member.Failpoints); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = srv.runEtcd(); err != nil {
|
||||
@ -517,7 +518,7 @@ func (srv *Server) handle_RESTART_ETCD(req *rpcpb.Request) (*rpcpb.Response, err
|
||||
if err = srv.saveTLSAssets(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = srv.creatEtcd(false, req.Member.Failpoints); err != nil {
|
||||
if err = srv.createEtcd(false, req.Member.Failpoints); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = srv.runEtcd(); err != nil {
|
||||
@ -619,7 +620,7 @@ func (srv *Server) handle_RESTART_FROM_SNAPSHOT(req *rpcpb.Request) (resp *rpcpb
|
||||
if err = srv.saveTLSAssets(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = srv.creatEtcd(true, req.Member.Failpoints); err != nil {
|
||||
if err = srv.createEtcd(true, req.Member.Failpoints); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = srv.runEtcd(); err != nil {
|
||||
|
@ -5,10 +5,12 @@ if ! [[ "$0" =~ "tests/functional/build" ]]; then
|
||||
exit 255
|
||||
fi
|
||||
|
||||
outdir="${BINDIR:-../bin}"
|
||||
|
||||
(
|
||||
cd ./tests
|
||||
CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s" -o ../bin/etcd-agent ./functional/cmd/etcd-agent
|
||||
CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s" -o ../bin/etcd-proxy ./functional/cmd/etcd-proxy
|
||||
CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s" -o ../bin/etcd-runner ./functional/cmd/etcd-runner
|
||||
CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s" -o ../bin/etcd-tester ./functional/cmd/etcd-tester
|
||||
CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s" -o "${outdir}/etcd-agent" ./functional/cmd/etcd-agent
|
||||
CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s" -o "${outdir}/etcd-proxy" ./functional/cmd/etcd-proxy
|
||||
CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s" -o "${outdir}/etcd-runner" ./functional/cmd/etcd-runner
|
||||
CGO_ENABLED=0 go test -v -installsuffix cgo -ldflags "-s" -c -o "${outdir}/etcd-tester" ./functional/cmd/etcd-tester
|
||||
)
|
@ -19,28 +19,30 @@ import (
|
||||
"flag"
|
||||
|
||||
"go.etcd.io/etcd/tests/v3/functional/agent"
|
||||
"go.uber.org/zap/zapcore"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var logger *zap.Logger
|
||||
|
||||
func init() {
|
||||
var err error
|
||||
logger, err = zap.NewProduction()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
network := flag.String("network", "tcp", "network to serve agent server")
|
||||
address := flag.String("address", "127.0.0.1:9027", "address to serve agent server")
|
||||
flag.Parse()
|
||||
|
||||
lcfg := zap.NewDevelopmentConfig()
|
||||
lcfg.Level = zap.NewAtomicLevelAt(zapcore.InfoLevel)
|
||||
logger, err := lcfg.Build()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
logger = logger.Named("agent").With(zap.String("address", *address))
|
||||
|
||||
defer logger.Sync()
|
||||
|
||||
srv := agent.NewServer(logger, *network, *address)
|
||||
err := srv.StartServe()
|
||||
err = srv.StartServe()
|
||||
logger.Info("agent exiting", zap.Error(err))
|
||||
}
|
||||
|
@ -76,7 +76,15 @@ $ ./bin/etcdctl --endpoints localhost:23790 put foo bar`)
|
||||
To: url.URL{Scheme: "tcp", Host: to},
|
||||
}
|
||||
if verbose {
|
||||
cfg.Logger = zap.NewExample()
|
||||
var err error
|
||||
cfg.Logger, err = zap.NewDevelopment()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
cfg.Logger = cfg.Logger.Named("proxy").With(
|
||||
zap.String("from", from),
|
||||
zap.String("to", to),
|
||||
zap.Int("port", httpPort))
|
||||
}
|
||||
p := proxy.NewServer(cfg)
|
||||
<-p.Ready()
|
||||
|
@ -17,44 +17,38 @@ package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"testing"
|
||||
|
||||
_ "github.com/etcd-io/gofail/runtime"
|
||||
"go.etcd.io/etcd/client/pkg/v3/testutil"
|
||||
"go.etcd.io/etcd/tests/v3/functional/tester"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
var logger *zap.Logger
|
||||
var config = flag.String("config", "../../functional.yaml", "path to tester configuration")
|
||||
|
||||
func init() {
|
||||
var err error
|
||||
logger, err = zap.NewProduction()
|
||||
func TestFunctional(t *testing.T) {
|
||||
testutil.SkipTestIfShortMode(t, "functional tests are skipped in --short mode")
|
||||
|
||||
lg := zaptest.NewLogger(t, zaptest.Level(zapcore.InfoLevel)).Named("tester")
|
||||
|
||||
clus, err := tester.NewCluster(lg, *config)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
config := flag.String("config", "", "path to tester configuration")
|
||||
flag.Parse()
|
||||
|
||||
defer logger.Sync()
|
||||
|
||||
clus, err := tester.NewCluster(logger, *config)
|
||||
if err != nil {
|
||||
logger.Fatal("failed to create a cluster", zap.Error(err))
|
||||
t.Fatalf("failed to create a cluster: %v", err)
|
||||
}
|
||||
|
||||
err = clus.Send_INITIAL_START_ETCD()
|
||||
if err != nil {
|
||||
logger.Fatal("Bootstrap failed", zap.Error(err))
|
||||
t.Fatal("Bootstrap failed", zap.Error(err))
|
||||
}
|
||||
defer clus.Send_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT()
|
||||
|
||||
logger.Info("wait health after bootstrap")
|
||||
t.Log("wait health after bootstrap")
|
||||
err = clus.WaitHealth()
|
||||
if err != nil {
|
||||
logger.Fatal("WaitHealth failed", zap.Error(err))
|
||||
t.Fatal("WaitHealth failed", zap.Error(err))
|
||||
}
|
||||
|
||||
clus.Run()
|
||||
clus.Run(t)
|
||||
}
|
@ -175,7 +175,7 @@ tester-config:
|
||||
case-shuffle: true
|
||||
|
||||
# For full descriptions,
|
||||
# https://godoc.org/github.com/etcd-io/etcd/functional/rpcpb#Case
|
||||
# https://pkg.go.dev/go.etcd.io/etcd/tests/v3/functional/rpcpb#Case
|
||||
cases:
|
||||
- SIGTERM_ONE_FOLLOWER
|
||||
- SIGTERM_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
|
||||
|
@ -70,7 +70,7 @@ var dialOpts = []grpc.DialOption{
|
||||
grpc.WithBlock(),
|
||||
}
|
||||
|
||||
// NewCluster creates a client from a tester configuration.
|
||||
// NewCluster creates a cluster from a tester configuration.
|
||||
func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
|
||||
clus, err := read(lg, fpath)
|
||||
if err != nil {
|
||||
@ -83,22 +83,25 @@ func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
|
||||
clus.agentRequests = make([]*rpcpb.Request, len(clus.Members))
|
||||
clus.cases = make([]Case, 0)
|
||||
|
||||
lg.Info("creating members")
|
||||
for i, ap := range clus.Members {
|
||||
var err error
|
||||
clus.agentConns[i], err = grpc.Dial(ap.AgentAddr, dialOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("cannot dial agent %v: %w", ap.AgentAddr, err)
|
||||
}
|
||||
clus.agentClients[i] = rpcpb.NewTransportClient(clus.agentConns[i])
|
||||
clus.lg.Info("connected", zap.String("agent-address", ap.AgentAddr))
|
||||
lg.Info("connected", zap.String("agent-address", ap.AgentAddr))
|
||||
|
||||
clus.agentStreams[i], err = clus.agentClients[i].Transport(context.Background())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
clus.lg.Info("created stream", zap.String("agent-address", ap.AgentAddr))
|
||||
lg.Info("created stream", zap.String("agent-address", ap.AgentAddr))
|
||||
}
|
||||
|
||||
lg.Info("agents configured.")
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle("/metrics", promhttp.Handler())
|
||||
if clus.Tester.EnablePprof {
|
||||
@ -112,6 +115,7 @@ func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
|
||||
ErrorLog: log.New(ioutil.Discard, "net/http", 0),
|
||||
}
|
||||
go clus.serveTesterServer()
|
||||
lg.Info("tester server started")
|
||||
|
||||
clus.rateLimiter = rate.NewLimiter(
|
||||
rate.Limit(int(clus.Tester.StressQPS)),
|
||||
@ -699,7 +703,7 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
|
||||
"compact error is ignored",
|
||||
zap.String("endpoint", m.EtcdClientEndpoint),
|
||||
zap.Int64("compact-revision", rev),
|
||||
zap.Error(cerr),
|
||||
zap.String("expected-error-msg", cerr.Error()),
|
||||
)
|
||||
} else {
|
||||
clus.lg.Warn(
|
||||
|
@ -17,6 +17,7 @@ package tester
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/client/pkg/v3/fileutil"
|
||||
@ -30,7 +31,7 @@ import (
|
||||
const compactQPS = 50000
|
||||
|
||||
// Run starts tester.
|
||||
func (clus *Cluster) Run() {
|
||||
func (clus *Cluster) Run(t *testing.T) {
|
||||
defer printReport()
|
||||
|
||||
// updateCases must be executed after etcd is started, because the FAILPOINTS case
|
||||
@ -47,63 +48,12 @@ func (clus *Cluster) Run() {
|
||||
|
||||
var preModifiedKey int64
|
||||
for round := 0; round < int(clus.Tester.RoundLimit) || clus.Tester.RoundLimit == -1; round++ {
|
||||
roundTotalCounter.Inc()
|
||||
clus.rd = round
|
||||
t.Run(fmt.Sprintf("round:%v", round), func(t *testing.T) {
|
||||
preModifiedKey = clus.doRoundAndCompact(t, round, preModifiedKey)
|
||||
})
|
||||
|
||||
if err := clus.doRound(); err != nil {
|
||||
clus.lg.Error(
|
||||
"round FAIL",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.Error(err),
|
||||
)
|
||||
if clus.cleanup(err) != nil {
|
||||
return
|
||||
}
|
||||
// reset preModifiedKey after clean up
|
||||
preModifiedKey = 0
|
||||
continue
|
||||
}
|
||||
|
||||
// -1 so that logPrefix doesn't print out 'case'
|
||||
clus.cs = -1
|
||||
|
||||
revToCompact := max(0, clus.currentRevision-10000)
|
||||
currentModifiedKey := clus.stresser.ModifiedKeys()
|
||||
modifiedKey := currentModifiedKey - preModifiedKey
|
||||
preModifiedKey = currentModifiedKey
|
||||
timeout := 10 * time.Second
|
||||
timeout += time.Duration(modifiedKey/compactQPS) * time.Second
|
||||
clus.lg.Info(
|
||||
"compact START",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.Duration("timeout", timeout),
|
||||
)
|
||||
if err := clus.compact(revToCompact, timeout); err != nil {
|
||||
clus.lg.Warn(
|
||||
"compact FAIL",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.Error(err),
|
||||
)
|
||||
if err = clus.cleanup(err); err != nil {
|
||||
clus.lg.Warn(
|
||||
"cleanup FAIL",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.Error(err),
|
||||
)
|
||||
return
|
||||
}
|
||||
// reset preModifiedKey after clean up
|
||||
preModifiedKey = 0
|
||||
}
|
||||
if round > 0 && round%500 == 0 { // every 500 rounds
|
||||
t.Logf("Defragmenting in round: %v", round)
|
||||
if err := clus.defrag(); err != nil {
|
||||
clus.failed(err)
|
||||
return
|
||||
@ -119,7 +69,66 @@ func (clus *Cluster) Run() {
|
||||
)
|
||||
}
|
||||
|
||||
func (clus *Cluster) doRound() error {
|
||||
func (clus *Cluster) doRoundAndCompact(t *testing.T, round int, preModifiedKey int64) (postModifiedKey int64) {
|
||||
roundTotalCounter.Inc()
|
||||
clus.rd = round
|
||||
|
||||
if err := clus.doRound(t); err != nil {
|
||||
clus.lg.Error(
|
||||
"round FAIL",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.Error(err),
|
||||
)
|
||||
if clus.cleanup(err) != nil {
|
||||
return
|
||||
}
|
||||
// reset preModifiedKey after clean up
|
||||
postModifiedKey = 0
|
||||
return
|
||||
}
|
||||
|
||||
// -1 so that logPrefix doesn't print out 'case'
|
||||
clus.cs = -1
|
||||
|
||||
revToCompact := max(0, clus.currentRevision-10000)
|
||||
currentModifiedKey := clus.stresser.ModifiedKeys()
|
||||
modifiedKey := currentModifiedKey - preModifiedKey
|
||||
timeout := 10 * time.Second
|
||||
timeout += time.Duration(modifiedKey/compactQPS) * time.Second
|
||||
clus.lg.Info(
|
||||
"compact START",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.Duration("timeout", timeout),
|
||||
)
|
||||
if err := clus.compact(revToCompact, timeout); err != nil {
|
||||
clus.lg.Warn(
|
||||
"compact FAIL",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.Error(err),
|
||||
)
|
||||
if err = clus.cleanup(err); err != nil {
|
||||
clus.lg.Warn(
|
||||
"cleanup FAIL",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.Error(err),
|
||||
)
|
||||
return
|
||||
}
|
||||
// reset preModifiedKey after clean up
|
||||
return 0
|
||||
}
|
||||
return currentModifiedKey
|
||||
}
|
||||
|
||||
func (clus *Cluster) doRound(t *testing.T) error {
|
||||
if clus.Tester.CaseShuffle {
|
||||
clus.shuffleCases()
|
||||
}
|
||||
@ -128,137 +137,15 @@ func (clus *Cluster) doRound() error {
|
||||
clus.lg.Info(
|
||||
"round START",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.Strings("cases", clus.listCases()),
|
||||
)
|
||||
for i, fa := range clus.cases {
|
||||
clus.cs = i
|
||||
|
||||
caseTotal[fa.Desc()]++
|
||||
caseTotalCounter.WithLabelValues(fa.Desc()).Inc()
|
||||
|
||||
caseNow := time.Now()
|
||||
clus.lg.Info(
|
||||
"case START",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.String("desc", fa.Desc()),
|
||||
)
|
||||
|
||||
clus.lg.Info("wait health before injecting failures")
|
||||
if err := clus.WaitHealth(); err != nil {
|
||||
return fmt.Errorf("wait full health error: %v", err)
|
||||
}
|
||||
|
||||
stressStarted := false
|
||||
fcase := fa.TestCase()
|
||||
if fcase != rpcpb.Case_NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS {
|
||||
clus.lg.Info(
|
||||
"stress START",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.String("desc", fa.Desc()),
|
||||
)
|
||||
if err := clus.stresser.Stress(); err != nil {
|
||||
return fmt.Errorf("start stresser error: %v", err)
|
||||
}
|
||||
stressStarted = true
|
||||
}
|
||||
|
||||
clus.lg.Info(
|
||||
"inject START",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.String("desc", fa.Desc()),
|
||||
)
|
||||
if err := fa.Inject(clus); err != nil {
|
||||
return fmt.Errorf("injection error: %v", err)
|
||||
}
|
||||
|
||||
// if run local, recovering server may conflict
|
||||
// with stressing client ports
|
||||
// TODO: use unix for local tests
|
||||
clus.lg.Info(
|
||||
"recover START",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.String("desc", fa.Desc()),
|
||||
)
|
||||
if err := fa.Recover(clus); err != nil {
|
||||
return fmt.Errorf("recovery error: %v", err)
|
||||
}
|
||||
|
||||
if stressStarted {
|
||||
clus.lg.Info(
|
||||
"stress PAUSE",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.String("desc", fa.Desc()),
|
||||
)
|
||||
ems := clus.stresser.Pause()
|
||||
if fcase == rpcpb.Case_NO_FAIL_WITH_STRESS && len(ems) > 0 {
|
||||
ess := make([]string, 0, len(ems))
|
||||
cnt := 0
|
||||
for k, v := range ems {
|
||||
ess = append(ess, fmt.Sprintf("%s (count: %d)", k, v))
|
||||
cnt += v
|
||||
}
|
||||
clus.lg.Warn(
|
||||
"expected no errors",
|
||||
zap.String("desc", fa.Desc()),
|
||||
zap.Strings("errors", ess),
|
||||
)
|
||||
|
||||
// with network delay, some ongoing requests may fail
|
||||
// only return error, if more than 30% of QPS requests fail
|
||||
if cnt > int(float64(clus.Tester.StressQPS)*0.3) {
|
||||
return fmt.Errorf("expected no error in %q, got %q", fcase.String(), ess)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
clus.lg.Info(
|
||||
"health check START",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.String("desc", fa.Desc()),
|
||||
)
|
||||
if err := clus.WaitHealth(); err != nil {
|
||||
return fmt.Errorf("wait full health error: %v", err)
|
||||
}
|
||||
|
||||
checkerFailExceptions := []rpcpb.Checker{}
|
||||
switch fcase {
|
||||
case rpcpb.Case_SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH:
|
||||
// TODO: restore from snapshot
|
||||
checkerFailExceptions = append(checkerFailExceptions, rpcpb.Checker_LEASE_EXPIRE)
|
||||
}
|
||||
|
||||
clus.lg.Info(
|
||||
"consistency check START",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.String("desc", fa.Desc()),
|
||||
)
|
||||
if err := clus.runCheckers(checkerFailExceptions...); err != nil {
|
||||
return fmt.Errorf("consistency check error (%v)", err)
|
||||
}
|
||||
clus.lg.Info(
|
||||
"consistency check PASS",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.String("desc", fa.Desc()),
|
||||
zap.Duration("took", time.Since(caseNow)),
|
||||
)
|
||||
t.Run(fmt.Sprintf("%v_%s", i, fa.TestCase()),
|
||||
func(t *testing.T) {
|
||||
clus.doTestCase(t, fa)
|
||||
})
|
||||
}
|
||||
|
||||
clus.lg.Info(
|
||||
@ -271,6 +158,133 @@ func (clus *Cluster) doRound() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (clus *Cluster) doTestCase(t *testing.T, fa Case) {
|
||||
caseTotal[fa.Desc()]++
|
||||
caseTotalCounter.WithLabelValues(fa.Desc()).Inc()
|
||||
|
||||
caseNow := time.Now()
|
||||
clus.lg.Info(
|
||||
"case START",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.String("desc", fa.Desc()),
|
||||
)
|
||||
|
||||
clus.lg.Info("wait health before injecting failures")
|
||||
if err := clus.WaitHealth(); err != nil {
|
||||
t.Fatalf("wait full health error: %v", err)
|
||||
}
|
||||
|
||||
stressStarted := false
|
||||
fcase := fa.TestCase()
|
||||
if fcase != rpcpb.Case_NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS {
|
||||
clus.lg.Info(
|
||||
"stress START",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.String("desc", fa.Desc()),
|
||||
)
|
||||
if err := clus.stresser.Stress(); err != nil {
|
||||
t.Fatalf("start stresser error: %v", err)
|
||||
}
|
||||
stressStarted = true
|
||||
}
|
||||
|
||||
clus.lg.Info(
|
||||
"inject START",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.String("desc", fa.Desc()),
|
||||
)
|
||||
if err := fa.Inject(clus); err != nil {
|
||||
t.Fatalf("injection error: %v", err)
|
||||
}
|
||||
|
||||
// if run local, recovering server may conflict
|
||||
// with stressing client ports
|
||||
// TODO: use unix for local tests
|
||||
clus.lg.Info(
|
||||
"recover START",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.String("desc", fa.Desc()),
|
||||
)
|
||||
if err := fa.Recover(clus); err != nil {
|
||||
t.Fatalf("recovery error: %v", err)
|
||||
}
|
||||
|
||||
if stressStarted {
|
||||
clus.lg.Info(
|
||||
"stress PAUSE",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.String("desc", fa.Desc()),
|
||||
)
|
||||
ems := clus.stresser.Pause()
|
||||
if fcase == rpcpb.Case_NO_FAIL_WITH_STRESS && len(ems) > 0 {
|
||||
ess := make([]string, 0, len(ems))
|
||||
cnt := 0
|
||||
for k, v := range ems {
|
||||
ess = append(ess, fmt.Sprintf("%s (count: %d)", k, v))
|
||||
cnt += v
|
||||
}
|
||||
clus.lg.Warn(
|
||||
"expected no errors",
|
||||
zap.String("desc", fa.Desc()),
|
||||
zap.Strings("errors", ess),
|
||||
)
|
||||
|
||||
// with network delay, some ongoing requests may fail
|
||||
// only return error, if more than 30% of QPS requests fail
|
||||
if cnt > int(float64(clus.Tester.StressQPS)*0.3) {
|
||||
t.Fatalf("expected no error in %q, got %q", fcase.String(), ess)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
clus.lg.Info(
|
||||
"health check START",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.String("desc", fa.Desc()),
|
||||
)
|
||||
if err := clus.WaitHealth(); err != nil {
|
||||
t.Fatalf("wait full health error: %v", err)
|
||||
}
|
||||
|
||||
checkerFailExceptions := []rpcpb.Checker{}
|
||||
switch fcase {
|
||||
case rpcpb.Case_SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH:
|
||||
// TODO: restore from snapshot
|
||||
checkerFailExceptions = append(checkerFailExceptions, rpcpb.Checker_LEASE_EXPIRE)
|
||||
}
|
||||
|
||||
clus.lg.Info(
|
||||
"consistency check START",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.String("desc", fa.Desc()),
|
||||
)
|
||||
if err := clus.runCheckers(checkerFailExceptions...); err != nil {
|
||||
t.Fatalf("consistency check error (%v)", err)
|
||||
}
|
||||
clus.lg.Info(
|
||||
"consistency check PASS",
|
||||
zap.Int("round", clus.rd),
|
||||
zap.Int("case", clus.cs),
|
||||
zap.Int("case-total", len(clus.cases)),
|
||||
zap.String("desc", fa.Desc()),
|
||||
zap.Duration("took", time.Since(caseNow)),
|
||||
)
|
||||
}
|
||||
|
||||
func (clus *Cluster) updateRevision() error {
|
||||
revs, _, err := clus.getRevisionHash()
|
||||
for _, rev := range revs {
|
||||
|
@ -249,7 +249,7 @@ func (ls *leaseStresser) createAliveLeasesWithShortTTL() {
|
||||
)
|
||||
return
|
||||
}
|
||||
ls.lg.Warn("createAliveLeasesWithShortTTL", zap.Int64("lease-id", leaseID))
|
||||
ls.lg.Debug("createAliveLeasesWithShortTTL", zap.Int64("lease-id", leaseID))
|
||||
ls.alivedLeasesWithShortTTL.add(leaseID, time.Now())
|
||||
// keep track of all the keep lease alive goroutines
|
||||
ls.aliveWg.Add(1)
|
||||
|
@ -16,7 +16,6 @@ replace (
|
||||
|
||||
require (
|
||||
github.com/dustin/go-humanize v1.0.0
|
||||
github.com/etcd-io/gofail v0.0.0-20190801230047-ad7f989257ca
|
||||
github.com/gogo/protobuf v1.3.2
|
||||
github.com/golang/protobuf v1.5.2
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
|
||||
|
@ -75,8 +75,6 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
|
||||
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
|
||||
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
|
||||
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||
github.com/etcd-io/gofail v0.0.0-20190801230047-ad7f989257ca h1:Y2I0lxOttdUKz+hNaIdG3FtjuQrTmwXun1opRV65IZc=
|
||||
github.com/etcd-io/gofail v0.0.0-20190801230047-ad7f989257ca/go.mod h1:49H/RkXP8pKaZy4h0d+NW16rSLhyVBt4o6VLJbmOqDE=
|
||||
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
|
||||
github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c=
|
||||
github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
|
||||
|
Loading…
x
Reference in New Issue
Block a user