functional: add back, travis

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
This commit is contained in:
Gyuho Lee 2019-08-14 04:31:12 -07:00
parent 0e96b34d9f
commit e16b21be7b
58 changed files with 1598 additions and 2920 deletions

.travis.yml

@ -19,6 +19,7 @@ env:
matrix:
- TARGET=linux-amd64-integration-1-cpu
- TARGET=linux-amd64-integration-4-cpu
- TARGET=linux-amd64-functional
- TARGET=linux-amd64-unit
- TARGET=linux-amd64-e2e
- TARGET=all-build
@ -43,6 +44,9 @@ script:
linux-amd64-integration-4-cpu)
GOARCH=amd64 CPU=4 PASSES='integration' ./test
;;
linux-amd64-functional)
GOARCH=amd64 PASSES='functional' ./test
;;
linux-amd64-unit)
GOARCH=amd64 PASSES='unit' ./test
;;

functional.yaml (65 changes) Normal file → Executable file

@ -1,8 +1,9 @@
agent-configs:
- etcd-exec: ./bin/etcd
- etcd-exec-path: ./bin/etcd
agent-addr: 127.0.0.1:19027
failpoint-http-addr: http://127.0.0.1:7381
base-dir: /tmp/etcd-functional-1
etcd-log-path: /tmp/etcd-functional-1/etcd.log
etcd-client-proxy: false
etcd-peer-proxy: true
etcd-client-endpoint: 127.0.0.1:1379
@ -29,7 +30,7 @@ agent-configs:
initial-cluster: s1=https://127.0.0.1:1381,s2=https://127.0.0.1:2381,s3=https://127.0.0.1:3381
initial-cluster-state: new
initial-cluster-token: tkn
snapshot-count: 2000
snapshot-count: 10000
quota-backend-bytes: 10740000000 # 10 GiB
pre-vote: true
initial-corrupt-check: true
@ -47,10 +48,11 @@ agent-configs:
peer-trusted-ca-path: ""
snapshot-path: /tmp/etcd-functional-1.snapshot.db
- etcd-exec: ./bin/etcd
- etcd-exec-path: ./bin/etcd
agent-addr: 127.0.0.1:29027
failpoint-http-addr: http://127.0.0.1:7382
base-dir: /tmp/etcd-functional-2
etcd-log-path: /tmp/etcd-functional-2/etcd.log
etcd-client-proxy: false
etcd-peer-proxy: true
etcd-client-endpoint: 127.0.0.1:2379
@ -77,7 +79,7 @@ agent-configs:
initial-cluster: s1=https://127.0.0.1:1381,s2=https://127.0.0.1:2381,s3=https://127.0.0.1:3381
initial-cluster-state: new
initial-cluster-token: tkn
snapshot-count: 2000
snapshot-count: 10000
quota-backend-bytes: 10740000000 # 10 GiB
pre-vote: true
initial-corrupt-check: true
@ -95,10 +97,11 @@ agent-configs:
peer-trusted-ca-path: ""
snapshot-path: /tmp/etcd-functional-2.snapshot.db
- etcd-exec: ./bin/etcd
- etcd-exec-path: ./bin/etcd
agent-addr: 127.0.0.1:39027
failpoint-http-addr: http://127.0.0.1:7383
base-dir: /tmp/etcd-functional-3
etcd-log-path: /tmp/etcd-functional-3/etcd.log
etcd-client-proxy: false
etcd-peer-proxy: true
etcd-client-endpoint: 127.0.0.1:3379
@ -125,7 +128,7 @@ agent-configs:
initial-cluster: s1=https://127.0.0.1:1381,s2=https://127.0.0.1:2381,s3=https://127.0.0.1:3381
initial-cluster-state: new
initial-cluster-token: tkn
snapshot-count: 2000
snapshot-count: 10000
quota-backend-bytes: 10740000000 # 10 GiB
pre-vote: true
initial-corrupt-check: true
@ -160,7 +163,7 @@ tester-config:
case-shuffle: true
# For full descriptions,
# https://godoc.org/github.com/etcd-io/etcd/functional/rpcpb#Case
# https://godoc.org/github.com/coreos/etcd/functional/rpcpb#Case
cases:
- SIGTERM_ONE_FOLLOWER
- SIGTERM_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
@ -170,62 +173,24 @@ tester-config:
- SIGTERM_ALL
- SIGQUIT_AND_REMOVE_ONE_FOLLOWER
- SIGQUIT_AND_REMOVE_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
- BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER
- BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
- BLACKHOLE_PEER_PORT_TX_RX_LEADER
- BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT
- BLACKHOLE_PEER_PORT_TX_RX_QUORUM
- BLACKHOLE_PEER_PORT_TX_RX_ALL
- DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER
- DELAY_PEER_PORT_TX_RX_LEADER
- RANDOM_DELAY_PEER_PORT_TX_RX_LEADER
- DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT
- RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT
- DELAY_PEER_PORT_TX_RX_QUORUM
- RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM
- DELAY_PEER_PORT_TX_RX_ALL
- RANDOM_DELAY_PEER_PORT_TX_RX_ALL
- NO_FAIL_WITH_STRESS
- NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS
# TODO: use iptables for discarding outbound rafthttp traffic to peer port
# - BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER
# - BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
# - DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER
# - RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER
# - DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
# - RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
# - SIGQUIT_AND_REMOVE_LEADER
# - SIGQUIT_AND_REMOVE_LEADER_UNTIL_TRIGGER_SNAPSHOT
# - SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH
failpoint-commands:
- panic("etcd-tester")
# - panic("etcd-tester"),1*sleep(1000)
runner-exec-path: ./bin/etcd-runner
external-exec-path: ""
# make up ±70% of workloads with writes
stressers:
- type: KV_WRITE_SMALL
weight: 0.35
- type: KV_WRITE_LARGE
weight: 0.002
- type: KV_READ_ONE_KEY
weight: 0.07
- type: KV_READ_RANGE
weight: 0.07
- type: KV_DELETE_ONE_KEY
weight: 0.07
- type: KV_DELETE_RANGE
weight: 0.07
- type: KV_TXN_WRITE_DELETE
weight: 0.35
- type: LEASE
weight: 0.0
# - ELECTION_RUNNER
# - WATCH_RUNNER
# - LOCK_RACER_RUNNER
# - LEASE_RUNNER
- KV
- LEASE
checkers:
- KV_HASH
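For context, this file is decoded straight into the rpcpb structs via their yaml tags; note that stressers are plain strings again rather than type/weight pairs. A minimal decoding sketch, assuming gopkg.in/yaml.v2 and a trimmed-down mirror of the schema (the real types live in functional/rpcpb):

```go
package main

import (
	"fmt"
	"io/ioutil"
	"log"

	yaml "gopkg.in/yaml.v2"
)

// Trimmed-down mirror of the config schema above.
type agentConfig struct {
	EtcdExecPath string `yaml:"etcd-exec-path"`
	AgentAddr    string `yaml:"agent-addr"`
	BaseDir      string `yaml:"base-dir"`
	EtcdLogPath  string `yaml:"etcd-log-path"`
}

type testerConfig struct {
	Cases     []string `yaml:"cases"`
	Stressers []string `yaml:"stressers"` // plain strings again, e.g. "KV", "LEASE"
	Checkers  []string `yaml:"checkers"`
}

type cluster struct {
	Members []agentConfig `yaml:"agent-configs"`
	Tester  testerConfig  `yaml:"tester-config"`
}

func main() {
	data, err := ioutil.ReadFile("functional.yaml")
	if err != nil {
		log.Fatal(err)
	}
	var c cluster
	if err := yaml.Unmarshal(data, &c); err != nil {
		log.Fatal(err)
	}
	fmt.Println(len(c.Members), c.Tester.Stressers, c.Tester.Checkers)
}
```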

functional/Dockerfile Normal file → Executable file (mode change only)

functional/Procfile-proxy Normal file → Executable file (mode change only)

functional/README.md (12 changes) Normal file → Executable file

@ -4,7 +4,7 @@
See [`rpcpb.Case`](https://godoc.org/github.com/coreos/etcd/functional/rpcpb#Case) for all failure cases.
See [functional.yaml](https://github.com/etcd-io/etcd/blob/master/functional.yaml) for an example configuration.
See [functional.yaml](https://github.com/coreos/etcd/blob/master/functional.yaml) for an example configuration.
### Run locally
@ -16,7 +16,7 @@ PASSES=functional ./test
```bash
pushd ..
make build-docker-functional push-docker-functional pull-docker-functional
make build-docker-functional
popd
```
@ -24,12 +24,12 @@ And run [example scripts](./scripts).
```bash
# run 3 agents for 3-node local etcd cluster
./functional/scripts/docker-local-agent.sh 1
./functional/scripts/docker-local-agent.sh 2
./functional/scripts/docker-local-agent.sh 3
./scripts/docker-local-agent.sh 1
./scripts/docker-local-agent.sh 2
./scripts/docker-local-agent.sh 3
# to run only 1 tester round
./functional/scripts/docker-local-tester.sh
./scripts/docker-local-tester.sh
```
## etcd Proxy

functional/agent/doc.go Normal file → Executable file (mode change only)

functional/agent/handler.go (337 changes) Normal file → Executable file

@ -70,13 +70,13 @@ func (srv *Server) handleTesterRequest(req *rpcpb.Request) (resp *rpcpb.Response
return srv.handle_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT()
case rpcpb.Operation_BLACKHOLE_PEER_PORT_TX_RX:
return srv.handle_BLACKHOLE_PEER_PORT_TX_RX(), nil
return srv.handle_BLACKHOLE_PEER_PORT_TX_RX()
case rpcpb.Operation_UNBLACKHOLE_PEER_PORT_TX_RX:
return srv.handle_UNBLACKHOLE_PEER_PORT_TX_RX(), nil
return srv.handle_UNBLACKHOLE_PEER_PORT_TX_RX()
case rpcpb.Operation_DELAY_PEER_PORT_TX_RX:
return srv.handle_DELAY_PEER_PORT_TX_RX(), nil
return srv.handle_DELAY_PEER_PORT_TX_RX()
case rpcpb.Operation_UNDELAY_PEER_PORT_TX_RX:
return srv.handle_UNDELAY_PEER_PORT_TX_RX(), nil
return srv.handle_UNDELAY_PEER_PORT_TX_RX()
default:
msg := fmt.Sprintf("operation not found (%v)", req.Operation)
@ -84,125 +84,50 @@ func (srv *Server) handleTesterRequest(req *rpcpb.Request) (resp *rpcpb.Response
}
}
// just archive the first file
func (srv *Server) createEtcdLogFile() error {
var err error
srv.etcdLogFile, err = os.Create(srv.Member.Etcd.LogOutputs[0])
func (srv *Server) handle_INITIAL_START_ETCD(req *rpcpb.Request) (*rpcpb.Response, error) {
if srv.last != rpcpb.Operation_NOT_STARTED {
return &rpcpb.Response{
Success: false,
Status: fmt.Sprintf("%q is not valid; last server operation was %q", rpcpb.Operation_INITIAL_START_ETCD.String(), srv.last.String()),
Member: req.Member,
}, nil
}
err := fileutil.TouchDirAll(srv.Member.BaseDir)
if err != nil {
return err
return nil, err
}
srv.lg.Info("created etcd log file", zap.String("path", srv.Member.Etcd.LogOutputs[0]))
return nil
}
srv.lg.Info("created base directory", zap.String("path", srv.Member.BaseDir))
func (srv *Server) creatEtcd(fromSnapshot bool) error {
if !fileutil.Exist(srv.Member.EtcdExec) {
return fmt.Errorf("unknown etcd exec path %q does not exist", srv.Member.EtcdExec)
if err = srv.createEtcdLogFile(); err != nil {
return nil, err
}
etcdPath, etcdFlags := srv.Member.EtcdExec, srv.Member.Etcd.Flags()
if fromSnapshot {
etcdFlags = srv.Member.EtcdOnSnapshotRestore.Flags()
srv.creatEtcdCmd(false)
if err = srv.saveTLSAssets(); err != nil {
return nil, err
}
u, _ := url.Parse(srv.Member.FailpointHTTPAddr)
srv.lg.Info(
"creating etcd command",
zap.String("etcd-exec", etcdPath),
zap.Strings("etcd-flags", etcdFlags),
zap.String("failpoint-http-addr", srv.Member.FailpointHTTPAddr),
zap.String("failpoint-addr", u.Host),
)
srv.etcdCmd = exec.Command(etcdPath, etcdFlags...)
srv.etcdCmd.Env = []string{"GOFAIL_HTTP=" + u.Host}
srv.etcdCmd.Stdout = srv.etcdLogFile
srv.etcdCmd.Stderr = srv.etcdLogFile
return nil
}
// start but do not wait for it to complete
func (srv *Server) runEtcd() error {
errc := make(chan error)
go func() {
time.Sleep(5 * time.Second)
// the server's advertised client/peer listeners must start
// before the proxy listener is set up
errc <- srv.startProxy()
}()
if srv.etcdCmd != nil {
srv.lg.Info(
"starting etcd command",
zap.String("command-path", srv.etcdCmd.Path),
)
err := srv.etcdCmd.Start()
perr := <-errc
srv.lg.Info(
"started etcd command",
zap.String("command-path", srv.etcdCmd.Path),
zap.Errors("errors", []error{err, perr}),
)
if err != nil {
return err
}
return perr
if err = srv.startEtcdCmd(); err != nil {
return nil, err
}
srv.lg.Info("started etcd", zap.String("command-path", srv.etcdCmd.Path))
if err = srv.loadAutoTLSAssets(); err != nil {
return nil, err
}
select {
case <-srv.etcdServer.Server.ReadyNotify():
srv.lg.Info("embedded etcd is ready")
case <-time.After(time.Minute):
srv.etcdServer.Close()
return fmt.Errorf("took too long to start %v", <-srv.etcdServer.Err())
}
return <-errc
}
// SIGQUIT to exit with a stack trace
func (srv *Server) stopEtcd(sig os.Signal) error {
srv.stopProxy()
if srv.etcdCmd != nil {
srv.lg.Info(
"stopping etcd command",
zap.String("command-path", srv.etcdCmd.Path),
zap.String("signal", sig.String()),
)
err := srv.etcdCmd.Process.Signal(sig)
if err != nil {
return err
}
errc := make(chan error)
go func() {
_, ew := srv.etcdCmd.Process.Wait()
errc <- ew
close(errc)
}()
select {
case <-time.After(5 * time.Second):
srv.etcdCmd.Process.Kill()
case e := <-errc:
return e
}
err = <-errc
srv.lg.Info(
"stopped etcd command",
zap.String("command-path", srv.etcdCmd.Path),
zap.String("signal", sig.String()),
zap.Error(err),
)
return err
// wait some time for the etcd listener to start
// before setting up the proxy
time.Sleep(time.Second)
if err = srv.startProxy(); err != nil {
return nil, err
}
srv.lg.Info("stopping embedded etcd")
srv.etcdServer.Server.HardStop()
srv.etcdServer.Close()
srv.lg.Info("stopped embedded etcd")
return nil
return &rpcpb.Response{
Success: true,
Status: "start etcd PASS",
Member: srv.Member,
}, nil
}
func (srv *Server) startProxy() error {
@ -216,7 +141,6 @@ func (srv *Server) startProxy() error {
return err
}
srv.lg.Info("starting proxy on client traffic", zap.String("url", advertiseClientURL.String()))
srv.advertiseClientPortToProxy[advertiseClientURLPort] = proxy.NewServer(proxy.ServerConfig{
Logger: srv.lg,
From: *advertiseClientURL,
@ -240,7 +164,6 @@ func (srv *Server) startProxy() error {
return err
}
srv.lg.Info("starting proxy on peer traffic", zap.String("url", advertisePeerURL.String()))
srv.advertisePeerPortToProxy[advertisePeerURLPort] = proxy.NewServer(proxy.ServerConfig{
Logger: srv.lg,
From: *advertisePeerURL,
@ -299,6 +222,34 @@ func (srv *Server) stopProxy() {
}
}
func (srv *Server) createEtcdLogFile() error {
var err error
srv.etcdLogFile, err = os.Create(srv.Member.EtcdLogPath)
if err != nil {
return err
}
srv.lg.Info("created etcd log file", zap.String("path", srv.Member.EtcdLogPath))
return nil
}
func (srv *Server) creatEtcdCmd(fromSnapshot bool) {
etcdPath, etcdFlags := srv.Member.EtcdExecPath, srv.Member.Etcd.Flags()
if fromSnapshot {
etcdFlags = srv.Member.EtcdOnSnapshotRestore.Flags()
}
u, _ := url.Parse(srv.Member.FailpointHTTPAddr)
srv.lg.Info("creating etcd command",
zap.String("etcd-exec-path", etcdPath),
zap.Strings("etcd-flags", etcdFlags),
zap.String("failpoint-http-addr", srv.Member.FailpointHTTPAddr),
zap.String("failpoint-addr", u.Host),
)
srv.etcdCmd = exec.Command(etcdPath, etcdFlags...)
srv.etcdCmd.Env = []string{"GOFAIL_HTTP=" + u.Host}
srv.etcdCmd.Stdout = srv.etcdLogFile
srv.etcdCmd.Stderr = srv.etcdLogFile
}
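creatEtcdCmd sets up ordinary child-process supervision: create the log file first, point both Stdout and Stderr at it, and hand the failpoint endpoint to the child through GOFAIL_HTTP. The same shape as a stand-alone sketch (binary and paths are placeholders taken from the example config):

```go
package main

import (
	"log"
	"os"
	"os/exec"
)

func main() {
	// pre-open the log file so the child writes into it from the start
	logFile, err := os.Create("/tmp/etcd-functional-1/etcd.log")
	if err != nil {
		log.Fatal(err)
	}
	defer logFile.Close()

	cmd := exec.Command("./bin/etcd", "--name=s1")
	cmd.Env = []string{"GOFAIL_HTTP=127.0.0.1:7381"} // replaces the inherited environment entirely, as the handler does
	cmd.Stdout = logFile
	cmd.Stderr = logFile

	if err := cmd.Start(); err != nil { // Start returns immediately; Wait reaps
		log.Fatal(err)
	}
	if err := cmd.Wait(); err != nil {
		log.Fatal(err)
	}
}
```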
// if started with manual TLS, stores TLS assets
// from tester/client to disk before starting etcd process
func (srv *Server) saveTLSAssets() error {
@ -371,6 +322,7 @@ func (srv *Server) saveTLSAssets() error {
zap.String("client-trusted-ca", srv.Member.ClientTrustedCAPath),
)
}
return nil
}
@ -460,45 +412,9 @@ func (srv *Server) loadAutoTLSAssets() error {
return nil
}
func (srv *Server) handle_INITIAL_START_ETCD(req *rpcpb.Request) (*rpcpb.Response, error) {
if srv.last != rpcpb.Operation_NOT_STARTED {
return &rpcpb.Response{
Success: false,
Status: fmt.Sprintf("%q is not valid; last server operation was %q", rpcpb.Operation_INITIAL_START_ETCD.String(), srv.last.String()),
Member: req.Member,
}, nil
}
err := fileutil.TouchDirAll(srv.Member.BaseDir)
if err != nil {
return nil, err
}
srv.lg.Info("created base directory", zap.String("path", srv.Member.BaseDir))
if srv.etcdServer == nil {
if err = srv.createEtcdLogFile(); err != nil {
return nil, err
}
}
if err = srv.saveTLSAssets(); err != nil {
return nil, err
}
if err = srv.creatEtcd(false); err != nil {
return nil, err
}
if err = srv.runEtcd(); err != nil {
return nil, err
}
if err = srv.loadAutoTLSAssets(); err != nil {
return nil, err
}
return &rpcpb.Response{
Success: true,
Status: "start etcd PASS",
Member: srv.Member,
}, nil
// start but do not wait for it to complete
func (srv *Server) startEtcdCmd() error {
return srv.etcdCmd.Start()
}
func (srv *Server) handle_RESTART_ETCD() (*rpcpb.Response, error) {
@ -510,19 +426,28 @@ func (srv *Server) handle_RESTART_ETCD() (*rpcpb.Response, error) {
}
}
srv.creatEtcdCmd(false)
if err = srv.saveTLSAssets(); err != nil {
return nil, err
}
if err = srv.creatEtcd(false); err != nil {
return nil, err
}
if err = srv.runEtcd(); err != nil {
if err = srv.startEtcdCmd(); err != nil {
return nil, err
}
srv.lg.Info("restarted etcd", zap.String("command-path", srv.etcdCmd.Path))
if err = srv.loadAutoTLSAssets(); err != nil {
return nil, err
}
// wait some time for the etcd listener to start
// before setting up the proxy
// TODO: local tests should handle port conflicts
// with clients on restart
time.Sleep(time.Second)
if err = srv.startProxy(); err != nil {
return nil, err
}
return &rpcpb.Response{
Success: true,
Status: "restart etcd PASS",
@ -531,15 +456,13 @@ func (srv *Server) handle_RESTART_ETCD() (*rpcpb.Response, error) {
}
func (srv *Server) handle_SIGTERM_ETCD() (*rpcpb.Response, error) {
if err := srv.stopEtcd(syscall.SIGTERM); err != nil {
srv.stopProxy()
err := stopWithSig(srv.etcdCmd, syscall.SIGTERM)
if err != nil {
return nil, err
}
if srv.etcdServer != nil {
// srv.etcdServer.GetLogger().Sync()
} else {
srv.etcdLogFile.Sync()
}
srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGTERM.String()))
return &rpcpb.Response{
Success: true,
@ -548,17 +471,16 @@ func (srv *Server) handle_SIGTERM_ETCD() (*rpcpb.Response, error) {
}
func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA() (*rpcpb.Response, error) {
err := srv.stopEtcd(syscall.SIGQUIT)
srv.stopProxy()
err := stopWithSig(srv.etcdCmd, syscall.SIGQUIT)
if err != nil {
return nil, err
}
srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String()))
if srv.etcdServer != nil {
// srv.etcdServer.GetLogger().Sync()
} else {
srv.etcdLogFile.Sync()
srv.etcdLogFile.Close()
}
srv.etcdLogFile.Sync()
srv.etcdLogFile.Close()
// for debugging purposes, rename instead of removing
if err = os.RemoveAll(srv.Member.BaseDir + ".backup"); err != nil {
@ -580,6 +502,9 @@ func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA() (*rpcpb.Response, error
return nil, err
}
}
if err = srv.createEtcdLogFile(); err != nil {
return nil, err
}
return &rpcpb.Response{
Success: true,
@ -612,19 +537,28 @@ func (srv *Server) handle_RESTORE_RESTART_FROM_SNAPSHOT() (resp *rpcpb.Response,
}
func (srv *Server) handle_RESTART_FROM_SNAPSHOT() (resp *rpcpb.Response, err error) {
srv.creatEtcdCmd(true)
if err = srv.saveTLSAssets(); err != nil {
return nil, err
}
if err = srv.creatEtcd(true); err != nil {
return nil, err
}
if err = srv.runEtcd(); err != nil {
if err = srv.startEtcdCmd(); err != nil {
return nil, err
}
srv.lg.Info("restarted etcd", zap.String("command-path", srv.etcdCmd.Path))
if err = srv.loadAutoTLSAssets(); err != nil {
return nil, err
}
// wait some time for the etcd listener to start
// before setting up the proxy
// TODO: local tests should handle port conflicts
// with clients on restart
time.Sleep(time.Second)
if err = srv.startProxy(); err != nil {
return nil, err
}
return &rpcpb.Response{
Success: true,
Status: "restarted etcd from snapshot",
@ -633,32 +567,30 @@ func (srv *Server) handle_RESTART_FROM_SNAPSHOT() (resp *rpcpb.Response, err err
}
func (srv *Server) handle_SIGQUIT_ETCD_AND_ARCHIVE_DATA() (*rpcpb.Response, error) {
err := srv.stopEtcd(syscall.SIGQUIT)
srv.stopProxy()
// exit with a stack trace
err := stopWithSig(srv.etcdCmd, syscall.SIGQUIT)
if err != nil {
return nil, err
}
srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String()))
if srv.etcdServer != nil {
// srv.etcdServer.GetLogger().Sync()
} else {
srv.etcdLogFile.Sync()
srv.etcdLogFile.Close()
}
srv.etcdLogFile.Sync()
srv.etcdLogFile.Close()
// TODO: support separate WAL directory
if err = archive(
srv.Member.BaseDir,
srv.Member.Etcd.LogOutputs[0],
srv.Member.EtcdLogPath,
srv.Member.Etcd.DataDir,
); err != nil {
return nil, err
}
srv.lg.Info("archived data", zap.String("base-dir", srv.Member.BaseDir))
if srv.etcdServer == nil {
if err = srv.createEtcdLogFile(); err != nil {
return nil, err
}
if err = srv.createEtcdLogFile(); err != nil {
return nil, err
}
srv.lg.Info("cleaning up page cache")
@ -675,17 +607,16 @@ func (srv *Server) handle_SIGQUIT_ETCD_AND_ARCHIVE_DATA() (*rpcpb.Response, erro
// stop proxy, etcd, delete data directory
func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT() (*rpcpb.Response, error) {
err := srv.stopEtcd(syscall.SIGQUIT)
srv.stopProxy()
err := stopWithSig(srv.etcdCmd, syscall.SIGQUIT)
if err != nil {
return nil, err
}
srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String()))
if srv.etcdServer != nil {
// srv.etcdServer.GetLogger().Sync()
} else {
srv.etcdLogFile.Sync()
srv.etcdLogFile.Close()
}
srv.etcdLogFile.Sync()
srv.etcdLogFile.Close()
err = os.RemoveAll(srv.Member.BaseDir)
if err != nil {
@ -702,7 +633,7 @@ func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT() (*rpcpb.
}, nil
}
func (srv *Server) handle_BLACKHOLE_PEER_PORT_TX_RX() *rpcpb.Response {
func (srv *Server) handle_BLACKHOLE_PEER_PORT_TX_RX() (*rpcpb.Response, error) {
for port, px := range srv.advertisePeerPortToProxy {
srv.lg.Info("blackholing", zap.Int("peer-port", port))
px.BlackholeTx()
@ -712,10 +643,10 @@ func (srv *Server) handle_BLACKHOLE_PEER_PORT_TX_RX() *rpcpb.Response {
return &rpcpb.Response{
Success: true,
Status: "blackholed peer port tx/rx",
}
}, nil
}
func (srv *Server) handle_UNBLACKHOLE_PEER_PORT_TX_RX() *rpcpb.Response {
func (srv *Server) handle_UNBLACKHOLE_PEER_PORT_TX_RX() (*rpcpb.Response, error) {
for port, px := range srv.advertisePeerPortToProxy {
srv.lg.Info("unblackholing", zap.Int("peer-port", port))
px.UnblackholeTx()
@ -725,10 +656,10 @@ func (srv *Server) handle_UNBLACKHOLE_PEER_PORT_TX_RX() *rpcpb.Response {
return &rpcpb.Response{
Success: true,
Status: "unblackholed peer port tx/rx",
}
}, nil
}
func (srv *Server) handle_DELAY_PEER_PORT_TX_RX() *rpcpb.Response {
func (srv *Server) handle_DELAY_PEER_PORT_TX_RX() (*rpcpb.Response, error) {
lat := time.Duration(srv.Tester.UpdatedDelayLatencyMs) * time.Millisecond
rv := time.Duration(srv.Tester.DelayLatencyMsRv) * time.Millisecond
@ -750,10 +681,10 @@ func (srv *Server) handle_DELAY_PEER_PORT_TX_RX() *rpcpb.Response {
return &rpcpb.Response{
Success: true,
Status: "delayed peer port tx/rx",
}
}, nil
}
func (srv *Server) handle_UNDELAY_PEER_PORT_TX_RX() *rpcpb.Response {
func (srv *Server) handle_UNDELAY_PEER_PORT_TX_RX() (*rpcpb.Response, error) {
for port, px := range srv.advertisePeerPortToProxy {
srv.lg.Info("undelaying", zap.Int("peer-port", port))
px.UndelayTx()
@ -763,5 +694,5 @@ func (srv *Server) handle_UNDELAY_PEER_PORT_TX_RX() *rpcpb.Response {
return &rpcpb.Response{
Success: true,
Status: "undelayed peer port tx/rx",
}
}, nil
}
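The four proxy operations above just flip switches on the proxy servers sitting in front of the peer ports. The real pkg/proxy API is richer (independent Tx/Rx control, latency with a random variable), but the essence of blackholing is a forwarder that discards bytes while a flag is set. An illustrative stand-alone sketch, not the pkg/proxy implementation (ports taken from the example config):

```go
package main

import (
	"log"
	"net"
	"sync/atomic"
)

// blackholeProxy forwards TCP bytes between two addresses and silently
// discards them while the flag is set, roughly what BlackholeTx /
// UnblackholeTx toggle on the real proxy servers.
type blackholeProxy struct {
	blackholed int32 // accessed atomically
}

func (p *blackholeProxy) Blackhole()   { atomic.StoreInt32(&p.blackholed, 1) }
func (p *blackholeProxy) Unblackhole() { atomic.StoreInt32(&p.blackholed, 0) }

func (p *blackholeProxy) serve(from, to string) error {
	ln, err := net.Listen("tcp", from)
	if err != nil {
		return err
	}
	for {
		src, err := ln.Accept()
		if err != nil {
			return err
		}
		dst, err := net.Dial("tcp", to)
		if err != nil {
			src.Close()
			continue
		}
		go p.pipe(src, dst)
		go p.pipe(dst, src)
	}
}

func (p *blackholeProxy) pipe(src, dst net.Conn) {
	defer src.Close()
	defer dst.Close()
	buf := make([]byte, 32*1024)
	for {
		n, err := src.Read(buf)
		if err != nil {
			return
		}
		if atomic.LoadInt32(&p.blackholed) == 1 {
			continue // drop instead of forwarding
		}
		if _, err := dst.Write(buf[:n]); err != nil {
			return
		}
	}
}

func main() {
	p := &blackholeProxy{}
	// listen peer port -> advertise peer port, as in the example config
	log.Fatal(p.serve("127.0.0.1:1380", "127.0.0.1:1381"))
}
```

Unblackhole then simply clears the flag so forwarding resumes on live connections.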

functional/agent/server.go (24 changes) Normal file → Executable file

@ -21,7 +21,6 @@ import (
"os/exec"
"strings"
"github.com/coreos/etcd/embed"
"github.com/coreos/etcd/functional/rpcpb"
"github.com/coreos/etcd/pkg/proxy"
@ -34,9 +33,8 @@ import (
// no need to lock fields since request operations are
// serialized on the tester side
type Server struct {
lg *zap.Logger
grpcServer *grpc.Server
lg *zap.Logger
network string
address string
@ -48,7 +46,6 @@ type Server struct {
*rpcpb.Member
*rpcpb.Tester
etcdServer *embed.Etcd
etcdCmd *exec.Cmd
etcdLogFile *os.File
@ -64,10 +61,10 @@ func NewServer(
address string,
) *Server {
return &Server{
lg: lg,
network: network,
address: address,
last: rpcpb.Operation_NOT_STARTED,
lg: lg,
network: network,
address: address,
last: rpcpb.Operation_NOT_STARTED,
advertiseClientPortToProxy: make(map[int]proxy.Server),
advertisePeerPortToProxy: make(map[int]proxy.Server),
}
@ -126,12 +123,11 @@ func (srv *Server) Stop() {
}
// Transport communicates with etcd tester.
func (srv *Server) Transport(stream rpcpb.Transport_TransportServer) (reterr error) {
errc := make(chan error, 1)
func (srv *Server) Transport(stream rpcpb.Transport_TransportServer) (err error) {
errc := make(chan error)
go func() {
for {
var req *rpcpb.Request
var err error
req, err = stream.Recv()
if err != nil {
errc <- err
@ -162,9 +158,9 @@ func (srv *Server) Transport(stream rpcpb.Transport_TransportServer) (reterr err
}()
select {
case reterr = <-errc:
case err = <-errc:
case <-stream.Context().Done():
reterr = stream.Context().Err()
err = stream.Context().Err()
}
return reterr
return err
}
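Transport follows a common shape for gRPC bidirectional streams: a goroutine pumps Recv into an error channel while the handler waits on either that channel or stream-context cancellation. A reduced, compilable sketch with a fake stream standing in for the generated rpcpb.Transport_TransportServer:

```go
package main

import (
	"context"
	"fmt"
	"io"
)

// recvStream abstracts the two methods the loop uses; the real type is
// the generated rpcpb.Transport_TransportServer.
type recvStream interface {
	Recv() (string, error)
	Context() context.Context
}

func transport(stream recvStream) (err error) {
	errc := make(chan error, 1) // cap 1: the goroutine can exit even if nobody reads
	go func() {
		for {
			req, rerr := stream.Recv()
			if rerr != nil {
				errc <- rerr
				return
			}
			fmt.Println("handling", req)
		}
	}()
	select {
	case err = <-errc:
	case <-stream.Context().Done():
		err = stream.Context().Err()
	}
	return err
}

type fakeStream struct{ msgs []string }

func (f *fakeStream) Recv() (string, error) {
	if len(f.msgs) == 0 {
		return "", io.EOF
	}
	m := f.msgs[0]
	f.msgs = f.msgs[1:]
	return m, nil
}
func (f *fakeStream) Context() context.Context { return context.Background() }

func main() {
	err := transport(&fakeStream{msgs: []string{"INITIAL_START_ETCD", "SIGTERM_ETCD"}})
	fmt.Println(err) // EOF
}
```

One subtlety: with the unbuffered errc this hunk switches to, the receive goroutine blocks forever on its send if the context path wins the select; the cap-1 buffer used in the sketch (and on the other side of the hunk) lets it exit either way.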

functional/agent/utils.go (30 changes) Normal file → Executable file

@ -15,7 +15,6 @@
package agent
import (
"io"
"net"
"net/url"
"os"
@ -37,8 +36,7 @@ func archive(baseDir, etcdLogPath, dataDir string) error {
return err
}
dst := filepath.Join(dir, "etcd.log")
if err := copyFile(etcdLogPath, dst); err != nil {
if err := os.Rename(etcdLogPath, filepath.Join(dir, "etcd.log")); err != nil {
if !os.IsNotExist(err) {
return err
}
@ -81,23 +79,27 @@ func getURLAndPort(addr string) (urlAddr *url.URL, port int, err error) {
return urlAddr, port, err
}
func copyFile(src, dst string) error {
f, err := os.Open(src)
func stopWithSig(cmd *exec.Cmd, sig os.Signal) error {
err := cmd.Process.Signal(sig)
if err != nil {
return err
}
defer f.Close()
w, err := os.Create(dst)
if err != nil {
return err
}
defer w.Close()
errc := make(chan error)
go func() {
_, ew := cmd.Process.Wait()
errc <- ew
close(errc)
}()
if _, err = io.Copy(w, f); err != nil {
return err
select {
case <-time.After(5 * time.Second):
cmd.Process.Kill()
case e := <-errc:
return e
}
return w.Sync()
err = <-errc
return err
}
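stopWithSig is the usual graceful-then-forceful pattern: deliver the signal, give the process five seconds to exit, then SIGKILL and reap. A small usage sketch (POSIX-only, command invented, assumes the stopWithSig above is in scope):

```go
package main

import (
	"log"
	"os/exec"
	"syscall"
)

func main() {
	cmd := exec.Command("sleep", "60")
	if err := cmd.Start(); err != nil {
		log.Fatal(err)
	}
	// ask politely first; stopWithSig escalates to Kill after 5s
	if err := stopWithSig(cmd, syscall.SIGTERM); err != nil {
		log.Fatal(err)
	}
	log.Println("child stopped")
}
```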
func cleanPageCache() error {

functional/agent/utils_test.go Normal file → Executable file (mode change only)

functional/cmd/etcd-agent/main.go Normal file → Executable file (mode change only)

functional/cmd/etcd-proxy/main.go (11 changes) Normal file → Executable file

@ -19,8 +19,6 @@ import (
"context"
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
@ -64,8 +62,8 @@ $ make build-etcd-proxy
$ ./bin/etcd-proxy --help
$ ./bin/etcd-proxy --from localhost:23790 --to localhost:2379 --http-port 2378 --verbose
$ ./bin/etcdctl --endpoints localhost:2379 put foo bar
$ ./bin/etcdctl --endpoints localhost:23790 put foo bar`)
$ ETCDCTL_API=3 ./bin/etcdctl --endpoints localhost:2379 put foo bar
$ ETCDCTL_API=3 ./bin/etcdctl --endpoints localhost:23790 put foo bar`)
flag.PrintDefaults()
}
@ -193,9 +191,8 @@ $ ./bin/etcdctl --endpoints localhost:23790 put foo bar`)
}
})
srv := &http.Server{
Addr: fmt.Sprintf(":%d", httpPort),
Handler: mux,
ErrorLog: log.New(ioutil.Discard, "net/http", 0),
Addr: fmt.Sprintf(":%d", httpPort),
Handler: mux,
}
defer srv.Close()

functional/cmd/etcd-runner/main.go Normal file → Executable file (mode change only)

functional/cmd/etcd-tester/main.go Normal file → Executable file (mode change only)

functional/rpcpb/etcd_config.go (14 changes) Normal file → Executable file

@ -50,12 +50,15 @@ var etcdFields = []string{
"SnapshotCount",
"QuotaBackendBytes",
// "PreVote",
// "InitialCorruptCheck",
}
// Flags returns etcd flags in string slice.
func (e *Etcd) Flags() (fs []string) {
tp := reflect.TypeOf(*e)
vo := reflect.ValueOf(*e)
func (cfg *Etcd) Flags() (fs []string) {
tp := reflect.TypeOf(*cfg)
vo := reflect.ValueOf(*cfg)
for _, name := range etcdFields {
field, ok := tp.FieldByName(name)
if !ok {
@ -83,6 +86,11 @@ func (e *Etcd) Flags() (fs []string) {
fname := field.Tag.Get("yaml")
// not supported in old etcd
if fname == "pre-vote" || fname == "initial-corrupt-check" {
continue
}
if sv != "" {
fs = append(fs, fmt.Sprintf("--%s=%s", fname, sv))
}
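Flags() turns the struct into CLI arguments by walking its fields with reflect and emitting --<yaml-tag>=<value> for every non-empty value. The mechanism in miniature, with an invented three-field config standing in for rpcpb.Etcd:

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// config is an invented stand-in for rpcpb.Etcd.
type config struct {
	Name          string   `yaml:"name"`
	SnapshotCount int64    `yaml:"snapshot-count"`
	ListenURLs    []string `yaml:"listen-client-urls"`
}

// flags renders every non-zero field as --<yaml-tag>=<value>,
// the same reflect-over-struct-tags walk Flags() performs.
func flags(c config) (fs []string) {
	tp, vo := reflect.TypeOf(c), reflect.ValueOf(c)
	for i := 0; i < tp.NumField(); i++ {
		var sv string
		switch v := vo.Field(i).Interface().(type) {
		case string:
			sv = v
		case int64:
			if v != 0 {
				sv = fmt.Sprintf("%d", v)
			}
		case []string:
			sv = strings.Join(v, ",")
		}
		if sv != "" {
			fs = append(fs, fmt.Sprintf("--%s=%s", tp.Field(i).Tag.Get("yaml"), sv))
		}
	}
	return fs
}

func main() {
	fmt.Println(flags(config{Name: "s1", SnapshotCount: 10000,
		ListenURLs: []string{"https://127.0.0.1:1379"}}))
	// [--name=s1 --snapshot-count=10000 --listen-client-urls=https://127.0.0.1:1379]
}
```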

functional/rpcpb/etcd_config_test.go (27 changes) Normal file → Executable file

@ -19,11 +19,11 @@ import (
"testing"
)
func TestEtcd(t *testing.T) {
e := &Etcd{
func TestEtcdFlags(t *testing.T) {
cfg := &Etcd{
Name: "s1",
DataDir: "/tmp/etcd-functionl-1/etcd.data",
WALDir: "/tmp/etcd-functionl-1/etcd.data/member/wal",
DataDir: "/tmp/etcd-agent-data-1/etcd.data",
WALDir: "/tmp/etcd-agent-data-1/etcd.data/member/wal",
HeartbeatIntervalMs: 100,
ElectionTimeoutMs: 1000,
@ -53,16 +53,12 @@ func TestEtcd(t *testing.T) {
PreVote: true,
InitialCorruptCheck: true,
Logger: "zap",
LogOutputs: []string{"/tmp/etcd-functional-1/etcd.log"},
LogLevel: "info",
}
exps := []string{
exp := []string{
"--name=s1",
"--data-dir=/tmp/etcd-functionl-1/etcd.data",
"--wal-dir=/tmp/etcd-functionl-1/etcd.data/member/wal",
"--data-dir=/tmp/etcd-agent-data-1/etcd.data",
"--wal-dir=/tmp/etcd-agent-data-1/etcd.data/member/wal",
"--heartbeat-interval=100",
"--election-timeout=1000",
"--listen-client-urls=https://127.0.0.1:1379",
@ -80,12 +76,9 @@ func TestEtcd(t *testing.T) {
"--quota-backend-bytes=10740000000",
"--pre-vote=true",
"--experimental-initial-corrupt-check=true",
"--logger=zap",
"--log-outputs=/tmp/etcd-functional-1/etcd.log",
"--log-level=info",
}
fs := e.Flags()
if !reflect.DeepEqual(exps, fs) {
t.Fatalf("expected %q, got %q", exps, fs)
fs := cfg.Flags()
if !reflect.DeepEqual(exp, fs) {
t.Fatalf("expected %q, got %q", exp, fs)
}
}

functional/rpcpb/member.go (12 changes) Normal file → Executable file

@ -23,10 +23,9 @@ import (
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/snapshot"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/logutil"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/snapshot"
"github.com/dustin/go-humanize"
"go.uber.org/zap"
@ -95,19 +94,10 @@ func (m *Member) CreateEtcdClientConfig(opts ...grpc.DialOption) (cfg *clientv3.
}
}
// TODO: make this configurable
level := "error"
if os.Getenv("ETCD_CLIENT_DEBUG") != "" {
level = "debug"
}
lcfg := logutil.DefaultZapLoggerConfig
lcfg.Level = zap.NewAtomicLevelAt(logutil.ConvertToZapLevel(level))
cfg = &clientv3.Config{
Endpoints: []string{m.EtcdClientEndpoint},
DialTimeout: 10 * time.Second,
DialOptions: opts,
LogConfig: &lcfg,
}
if secure {
// assume save TLS assets are already stord on disk

functional/rpcpb/rpc.pb.go (3217 changes) Normal file → Executable file (diff suppressed because it is too large)

functional/rpcpb/rpc.proto (54 changes) Normal file → Executable file

@ -45,8 +45,9 @@ service Transport {
}
message Member {
// EtcdExec is the executable etcd binary path in agent server.
string EtcdExec = 1 [(gogoproto.moretags) = "yaml:\"etcd-exec\""];
// EtcdExecPath is the executable etcd binary path in agent server.
string EtcdExecPath = 1 [(gogoproto.moretags) = "yaml:\"etcd-exec-path\""];
// TODO: support embedded etcd
// AgentAddr is the agent HTTP server address.
string AgentAddr = 11 [(gogoproto.moretags) = "yaml:\"agent-addr\""];
@ -55,6 +56,8 @@ message Member {
// BaseDir is the base directory where all logs and etcd data are stored.
string BaseDir = 101 [(gogoproto.moretags) = "yaml:\"base-dir\""];
// EtcdLogPath is the log file to store current etcd server logs.
string EtcdLogPath = 102 [(gogoproto.moretags) = "yaml:\"etcd-log-path\""];
// EtcdClientProxy is true when client traffic needs to be proxied.
// If true, listen client URL port must be different than advertise client URL port.
@ -138,7 +141,7 @@ message Tester {
// Stressers is the list of stresser types:
// KV, LEASE, ELECTION_RUNNER, WATCH_RUNNER, LOCK_RACER_RUNNER, LEASE_RUNNER.
repeated Stresser Stressers = 101 [(gogoproto.moretags) = "yaml:\"stressers\""];
repeated string Stressers = 101 [(gogoproto.moretags) = "yaml:\"stressers\""];
// Checkers is the list of consistency checker types:
// KV_HASH, LEASE_EXPIRE, NO_CHECK, RUNNER.
// Leave empty to skip consistency checks.
@ -164,35 +167,6 @@ message Tester {
int32 StressQPS = 302 [(gogoproto.moretags) = "yaml:\"stress-qps\""];
}
enum StresserType {
KV_WRITE_SMALL = 0;
KV_WRITE_LARGE = 1;
KV_READ_ONE_KEY = 2;
KV_READ_RANGE = 3;
KV_DELETE_ONE_KEY = 4;
KV_DELETE_RANGE = 5;
KV_TXN_WRITE_DELETE = 6;
LEASE = 10;
ELECTION_RUNNER = 20;
WATCH_RUNNER = 31;
LOCK_RACER_RUNNER = 41;
LEASE_RUNNER = 51;
}
message Stresser {
string Type = 1 [(gogoproto.moretags) = "yaml:\"type\""];
double Weight = 2 [(gogoproto.moretags) = "yaml:\"weight\""];
}
enum Checker {
KV_HASH = 0;
LEASE_EXPIRE = 1;
RUNNER = 2;
NO_CHECK = 3;
}
message Etcd {
string Name = 1 [(gogoproto.moretags) = "yaml:\"name\""];
string DataDir = 2 [(gogoproto.moretags) = "yaml:\"data-dir\""];
@ -620,3 +594,19 @@ enum Case {
// EXTERNAL runs external failure injection scripts.
EXTERNAL = 500;
}
enum Stresser {
KV = 0;
LEASE = 1;
ELECTION_RUNNER = 2;
WATCH_RUNNER = 3;
LOCK_RACER_RUNNER = 4;
LEASE_RUNNER = 5;
}
enum Checker {
KV_HASH = 0;
LEASE_EXPIRE = 1;
RUNNER = 2;
NO_CHECK = 3;
}
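Turning Stresser into an enum (and the Stressers field back into plain strings) lets the tester validate configuration against the generated name-to-value map, the same way it already validates Cases and Checkers. A sketch of that idiom; the map below reproduces the enum values above in the shape protoc-generated Go code exposes:

```go
package main

import "fmt"

// Stresser_value mirrors the name->value map protoc generates
// for the enum above.
var Stresser_value = map[string]int32{
	"KV": 0, "LEASE": 1, "ELECTION_RUNNER": 2,
	"WATCH_RUNNER": 3, "LOCK_RACER_RUNNER": 4, "LEASE_RUNNER": 5,
}

// validateStressers is the check cluster_read_config.go performs
// on the YAML strings.
func validateStressers(ss []string) error {
	for _, s := range ss {
		if _, ok := Stresser_value[s]; !ok {
			return fmt.Errorf("Stresser is unknown; got %q", s)
		}
	}
	return nil
}

func main() {
	fmt.Println(validateStressers([]string{"KV", "LEASE"})) // <nil>
	fmt.Println(validateStressers([]string{"KV_HASH"}))     // error: a checker, not a stresser
}
```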

functional/runner/election_command.go Normal file → Executable file (mode change only)

functional/runner/error.go Normal file → Executable file (mode change only)

functional/runner/global.go (2 changes) Normal file → Executable file

@ -47,7 +47,7 @@ type roundClient struct {
func newClient(eps []string, timeout time.Duration) *clientv3.Client {
c, err := clientv3.New(clientv3.Config{
Endpoints: eps,
DialTimeout: timeout * time.Second,
DialTimeout: time.Duration(timeout) * time.Second,
})
if err != nil {
log.Fatal(err)
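A note on this fix: whether the conversion changes anything depends on the parameter type, which the hunk does not show. If timeout is already a time.Duration, both expressions multiply two Durations and inflate the value by a factor of one billion; the conversion reads correctly only if the parameter is an integer count of seconds. The pitfall in isolation:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	var timeout time.Duration = 5 * time.Second
	fmt.Println(timeout * time.Second) // Durations multiply as raw nanoseconds: ~5e9 seconds

	var secs int64 = 5
	fmt.Println(time.Duration(secs) * time.Second) // 5s: the intended reading of an integer count
}
```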

functional/runner/help.go Normal file → Executable file (mode change only)

functional/runner/lease_renewer_command.go Normal file → Executable file (mode change only)

functional/runner/lock_racer_command.go Normal file → Executable file (mode change only)

functional/runner/root.go Normal file → Executable file (mode change only)

functional/runner/watch_command.go Normal file → Executable file (mode change only)

functional/scripts/docker-local-agent.sh

@ -13,7 +13,7 @@ if ! [[ "${0}" =~ "scripts/docker-local-agent.sh" ]]; then
fi
if [[ -z "${GO_VERSION}" ]]; then
GO_VERSION=1.12.8
GO_VERSION=1.10.1
fi
echo "Running with GO_VERSION:" ${GO_VERSION}
@ -38,5 +38,5 @@ docker run \
--rm \
--net=host \
--name ${AGENT_NAME} \
gcr.io/etcd-development/etcd-functional:go${GO_VERSION} \
gcr.io/etcd-development/etcd-functional-tester:go${GO_VERSION} \
/bin/bash -c "./bin/etcd-agent ${AGENT_ADDR_FLAG}"

functional/scripts/docker-local-tester.sh

@ -6,7 +6,7 @@ if ! [[ "${0}" =~ "scripts/docker-local-tester.sh" ]]; then
fi
if [[ -z "${GO_VERSION}" ]]; then
GO_VERSION=1.12.8
GO_VERSION=1.10.1
fi
echo "Running with GO_VERSION:" ${GO_VERSION}
@ -14,5 +14,5 @@ docker run \
--rm \
--net=host \
--name tester \
gcr.io/etcd-development/etcd-functional:go${GO_VERSION} \
gcr.io/etcd-development/etcd-functional-tester:go${GO_VERSION} \
/bin/bash -c "./bin/etcd-tester --config ./functional.yaml"

scripts/genproto.sh

@ -7,8 +7,8 @@ if ! [[ "$0" =~ "scripts/genproto.sh" ]]; then
fi
# for now, be conservative about what version of protoc we expect
if ! [[ $(protoc --version) =~ "3.7.1" ]]; then
echo "could not find protoc 3.7.1, is it installed + in PATH?"
if ! [[ $(protoc --version) =~ "3.5.1" ]]; then
echo "could not find protoc 3.5.1, is it installed + in PATH?"
exit 255
fi

functional/tester/case.go (23 changes) Normal file → Executable file

@ -275,18 +275,6 @@ func (c *caseUntilSnapshot) Inject(clus *Cluster) error {
for i := 0; i < retries; i++ {
lastRev, err = clus.maxRev()
if lastRev == 0 {
clus.lg.Info(
"trigger snapshot RETRY",
zap.Int("retries", i),
zap.Int64("etcd-snapshot-count", snapshotCount),
zap.Int64("start-revision", startRev),
zap.Error(err),
)
time.Sleep(3 * time.Second)
continue
}
// If the number of proposals committed is bigger than snapshot count,
// a new snapshot should have been created.
diff := lastRev - startRev
@ -304,8 +292,12 @@ func (c *caseUntilSnapshot) Inject(clus *Cluster) error {
return nil
}
dur := time.Second
if diff < 0 || err != nil {
dur = 3 * time.Second
}
clus.lg.Info(
"trigger snapshot RETRY",
"trigger snapshot PROGRESS",
zap.Int("retries", i),
zap.Int64("committed-entries", diff),
zap.Int64("etcd-snapshot-count", snapshotCount),
@ -314,10 +306,7 @@ func (c *caseUntilSnapshot) Inject(clus *Cluster) error {
zap.Duration("took", time.Since(now)),
zap.Error(err),
)
time.Sleep(time.Second)
if err != nil {
time.Sleep(2 * time.Second)
}
time.Sleep(dur)
}
return fmt.Errorf("cluster too slow: only %d commits in %d retries", lastRev-startRev, retries)
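The rewritten loop folds the old zero-revision retry branch into one adaptive delay: poll every second while progress looks healthy, back off to three seconds when the probe errored or went backwards. The shape of it, distilled (probe, target, and retries invented for the sketch):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// pollUntil retries a progress check, sleeping longer whenever the
// last probe errored or regressed, as in the snapshot trigger loop.
func pollUntil(probe func() (int64, error), target int64, retries int) error {
	for i := 0; i < retries; i++ {
		diff, err := probe()
		if err == nil && diff > target {
			return nil
		}
		dur := time.Second
		if diff < 0 || err != nil {
			dur = 3 * time.Second // slower retry while unhealthy
		}
		time.Sleep(dur)
	}
	return errors.New("cluster too slow")
}

func main() {
	n := int64(0)
	fmt.Println(pollUntil(func() (int64, error) { n += 600; return n, nil }, 2000, 10))
}
```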

functional/tester/case_delay.go Normal file → Executable file (mode change only)

functional/tester/case_external.go Normal file → Executable file (mode change only)

functional/tester/case_failpoints.go Normal file → Executable file (mode change only)

functional/tester/case_network_blackhole.go Normal file → Executable file (mode change only)

functional/tester/case_network_delay.go (2 changes) Normal file → Executable file

@ -26,7 +26,7 @@ const (
// Wait more when it recovers from slow network, because network layer
// needs extra time to propagate traffic control (tc command) change.
// Otherwise, we get different hash values from the previous revision.
// For more detail, please see https://github.com/etcd-io/etcd/issues/5121.
// For more detail, please see https://github.com/coreos/etcd/issues/5121.
waitRecover = 5 * time.Second
)

functional/tester/case_no_fail.go Normal file → Executable file (mode change only)

functional/tester/case_sigquit_remove.go Normal file → Executable file (mode change only)

functional/tester/case_sigquit_remove_quorum.go Normal file → Executable file (mode change only)

functional/tester/case_sigterm.go Normal file → Executable file (mode change only)

functional/tester/checker.go Normal file → Executable file (mode change only)

functional/tester/checker_kv_hash.go Normal file → Executable file (mode change only)

functional/tester/checker_lease_expire.go Normal file → Executable file (mode change only)

functional/tester/checker_no_check.go Normal file → Executable file (mode change only)

functional/tester/checker_runner.go Normal file → Executable file (mode change only)

functional/tester/cluster.go (14 changes) Normal file → Executable file

@ -20,7 +20,6 @@ import (
"fmt"
"io"
"io/ioutil"
"log"
"math/rand"
"net/http"
"net/url"
@ -107,9 +106,8 @@ func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
}
}
clus.testerHTTPServer = &http.Server{
Addr: clus.Tester.Addr,
Handler: mux,
ErrorLog: log.New(ioutil.Discard, "net/http", 0),
Addr: clus.Tester.Addr,
Handler: mux,
}
go clus.serveTesterServer()
@ -493,9 +491,9 @@ func (clus *Cluster) sendOpWithResp(idx int, op rpcpb.Operation) (*rpcpb.Respons
m, secure := clus.Members[idx], false
for _, cu := range m.Etcd.AdvertiseClientURLs {
u, perr := url.Parse(cu)
if perr != nil {
return nil, perr
u, err := url.Parse(cu)
if err != nil {
return nil, err
}
if u.Scheme == "https" { // TODO: handle unix
secure = true
@ -593,7 +591,7 @@ func (clus *Cluster) WaitHealth() error {
// wait 60s to check cluster health.
// TODO: set it to a reasonable value. It is set that high because
// a follower may take a long time to catch up with the leader when rebooted under
// reasonable workload (https://github.com/etcd-io/etcd/issues/2698)
// reasonable workload (https://github.com/coreos/etcd/issues/2698)
for i := 0; i < 60; i++ {
for _, m := range clus.Members {
if err = m.WriteHealthKey(); err != nil {

functional/tester/cluster_read_config.go (104 changes) Normal file → Executable file

@ -44,56 +44,14 @@ func read(lg *zap.Logger, fpath string) (*Cluster, error) {
return nil, fmt.Errorf("len(clus.Members) expects at least 3, got %d", len(clus.Members))
}
failpointsEnabled := false
for _, c := range clus.Tester.Cases {
if c == rpcpb.Case_FAILPOINTS.String() {
failpointsEnabled = true
break
}
}
if len(clus.Tester.Cases) == 0 {
return nil, errors.New("cases not found")
}
if clus.Tester.DelayLatencyMs <= clus.Tester.DelayLatencyMsRv*5 {
return nil, fmt.Errorf("delay latency %d ms must be greater than 5x of delay latency random variable %d ms", clus.Tester.DelayLatencyMs, clus.Tester.DelayLatencyMsRv)
}
if clus.Tester.UpdatedDelayLatencyMs == 0 {
clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
}
for _, v := range clus.Tester.Cases {
if _, ok := rpcpb.Case_value[v]; !ok {
return nil, fmt.Errorf("%q is not defined in 'rpcpb.Case_value'", v)
}
}
for _, s := range clus.Tester.Stressers {
if _, ok := rpcpb.StresserType_value[s.Type]; !ok {
return nil, fmt.Errorf("unknown 'StresserType' %+v", s)
}
}
for _, v := range clus.Tester.Checkers {
if _, ok := rpcpb.Checker_value[v]; !ok {
return nil, fmt.Errorf("Checker is unknown; got %q", v)
}
}
if clus.Tester.StressKeySuffixRangeTxn > 100 {
return nil, fmt.Errorf("StressKeySuffixRangeTxn maximum value is 100, got %v", clus.Tester.StressKeySuffixRangeTxn)
}
if clus.Tester.StressKeyTxnOps > 64 {
return nil, fmt.Errorf("StressKeyTxnOps maximum value is 64, got %v", clus.Tester.StressKeyTxnOps)
}
for i, mem := range clus.Members {
if mem.EtcdExec == "embed" && failpointsEnabled {
return nil, errors.New("EtcdExec 'embed' cannot be run with failpoints enabled")
}
if mem.BaseDir == "" {
return nil, fmt.Errorf("BaseDir cannot be empty (got %q)", mem.BaseDir)
}
if mem.EtcdLogPath == "" {
return nil, fmt.Errorf("EtcdLogPath cannot be empty (got %q)", mem.EtcdLogPath)
}
if mem.Etcd.Name == "" {
return nil, fmt.Errorf("'--name' cannot be empty (got %+v)", mem)
}
@ -174,6 +132,9 @@ func read(lg *zap.Logger, fpath string) (*Cluster, error) {
}
}
if !strings.HasPrefix(mem.EtcdLogPath, mem.BaseDir) {
return nil, fmt.Errorf("EtcdLogPath must be prefixed with BaseDir (got %q)", mem.EtcdLogPath)
}
if !strings.HasPrefix(mem.Etcd.DataDir, mem.BaseDir) {
return nil, fmt.Errorf("Etcd.DataDir must be prefixed with BaseDir (got %q)", mem.Etcd.DataDir)
}
@ -227,7 +188,7 @@ func read(lg *zap.Logger, fpath string) (*Cluster, error) {
return nil, fmt.Errorf("Etcd.PeerClientCertAuth and Etcd.PeerAutoTLS cannot be both 'true'")
}
if (mem.Etcd.PeerCertFile == "") != (mem.Etcd.PeerKeyFile == "") {
return nil, fmt.Errorf("both Etcd.PeerCertFile %q and Etcd.PeerKeyFile %q must be either empty or non-empty", mem.Etcd.PeerCertFile, mem.Etcd.PeerKeyFile)
return nil, fmt.Errorf("Both Etcd.PeerCertFile %q and Etcd.PeerKeyFile %q must be either empty or non-empty", mem.Etcd.PeerCertFile, mem.Etcd.PeerKeyFile)
}
if mem.Etcd.ClientCertAuth && mem.Etcd.ClientAutoTLS {
return nil, fmt.Errorf("Etcd.ClientCertAuth and Etcd.ClientAutoTLS cannot be both 'true'")
@ -251,7 +212,7 @@ func read(lg *zap.Logger, fpath string) (*Cluster, error) {
return nil, fmt.Errorf("Etcd.ClientCertAuth 'false', but Etcd.ClientTrustedCAFile is %q", mem.Etcd.PeerCertFile)
}
if (mem.Etcd.ClientCertFile == "") != (mem.Etcd.ClientKeyFile == "") {
return nil, fmt.Errorf("both Etcd.ClientCertFile %q and Etcd.ClientKeyFile %q must be either empty or non-empty", mem.Etcd.ClientCertFile, mem.Etcd.ClientKeyFile)
return nil, fmt.Errorf("Both Etcd.ClientCertFile %q and Etcd.ClientKeyFile %q must be either empty or non-empty", mem.Etcd.ClientCertFile, mem.Etcd.ClientKeyFile)
}
peerTLS := mem.Etcd.PeerAutoTLS ||
@ -356,21 +317,42 @@ func read(lg *zap.Logger, fpath string) (*Cluster, error) {
}
clus.Members[i].ClientCertData = string(data)
}
if len(mem.Etcd.LogOutputs) == 0 {
return nil, fmt.Errorf("mem.Etcd.LogOutputs cannot be empty")
}
for _, v := range mem.Etcd.LogOutputs {
switch v {
case "stderr", "stdout", "/dev/null", "default":
default:
if !strings.HasPrefix(v, mem.BaseDir) {
return nil, fmt.Errorf("LogOutput %q must be prefixed with BaseDir %q", v, mem.BaseDir)
}
}
}
}
}
if len(clus.Tester.Cases) == 0 {
return nil, errors.New("Cases not found")
}
if clus.Tester.DelayLatencyMs <= clus.Tester.DelayLatencyMsRv*5 {
return nil, fmt.Errorf("delay latency %d ms must be greater than 5x of delay latency random variable %d ms", clus.Tester.DelayLatencyMs, clus.Tester.DelayLatencyMsRv)
}
if clus.Tester.UpdatedDelayLatencyMs == 0 {
clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
}
for _, v := range clus.Tester.Cases {
if _, ok := rpcpb.Case_value[v]; !ok {
return nil, fmt.Errorf("%q is not defined in 'rpcpb.Case_value'", v)
}
}
for _, v := range clus.Tester.Stressers {
if _, ok := rpcpb.Stresser_value[v]; !ok {
return nil, fmt.Errorf("Stresser is unknown; got %q", v)
}
}
for _, v := range clus.Tester.Checkers {
if _, ok := rpcpb.Checker_value[v]; !ok {
return nil, fmt.Errorf("Checker is unknown; got %q", v)
}
}
if clus.Tester.StressKeySuffixRangeTxn > 100 {
return nil, fmt.Errorf("StressKeySuffixRangeTxn maximum value is 100, got %v", clus.Tester.StressKeySuffixRangeTxn)
}
if clus.Tester.StressKeyTxnOps > 64 {
return nil, fmt.Errorf("StressKeyTxnOps maximum value is 64, got %v", clus.Tester.StressKeyTxnOps)
}
return clus, err
}

functional/tester/cluster_run.go (4 changes) Normal file → Executable file

@ -212,8 +212,8 @@ func (clus *Cluster) doRound() error {
)
// with network delay, some ongoing requests may fail
// only return error, if more than 30% of QPS requests fail
if cnt > int(float64(clus.Tester.StressQPS)*0.3) {
// only return error, if more than 10% of QPS requests fail
if cnt > int(clus.Tester.StressQPS)/10 {
return fmt.Errorf("expected no error in %q, got %q", fcase.String(), ess)
}
}
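For scale: with the stress-qps of 2000 used in the example tester config, the per-round failure budget drops from 600 tolerated errors (30%) to 200, since int(2000)/10 = 200.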

functional/tester/cluster_shuffle.go Normal file → Executable file (mode change only)


@ -1,304 +0,0 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
import (
"reflect"
"sort"
"testing"
"github.com/coreos/etcd/functional/rpcpb"
"go.uber.org/zap"
)
func Test_read(t *testing.T) {
exp := &Cluster{
Members: []*rpcpb.Member{
{
EtcdExec: "./bin/etcd",
AgentAddr: "127.0.0.1:19027",
FailpointHTTPAddr: "http://127.0.0.1:7381",
BaseDir: "/tmp/etcd-functional-1",
EtcdClientProxy: false,
EtcdPeerProxy: true,
EtcdClientEndpoint: "127.0.0.1:1379",
Etcd: &rpcpb.Etcd{
Name: "s1",
DataDir: "/tmp/etcd-functional-1/etcd.data",
WALDir: "/tmp/etcd-functional-1/etcd.data/member/wal",
HeartbeatIntervalMs: 100,
ElectionTimeoutMs: 1000,
ListenClientURLs: []string{"https://127.0.0.1:1379"},
AdvertiseClientURLs: []string{"https://127.0.0.1:1379"},
ClientAutoTLS: true,
ClientCertAuth: false,
ClientCertFile: "",
ClientKeyFile: "",
ClientTrustedCAFile: "",
ListenPeerURLs: []string{"https://127.0.0.1:1380"},
AdvertisePeerURLs: []string{"https://127.0.0.1:1381"},
PeerAutoTLS: true,
PeerClientCertAuth: false,
PeerCertFile: "",
PeerKeyFile: "",
PeerTrustedCAFile: "",
InitialCluster: "s1=https://127.0.0.1:1381,s2=https://127.0.0.1:2381,s3=https://127.0.0.1:3381",
InitialClusterState: "new",
InitialClusterToken: "tkn",
SnapshotCount: 10000,
QuotaBackendBytes: 10740000000,
PreVote: true,
InitialCorruptCheck: true,
Logger: "zap",
LogOutputs: []string{"/tmp/etcd-functional-1/etcd.log"},
Debug: true,
},
ClientCertData: "",
ClientCertPath: "",
ClientKeyData: "",
ClientKeyPath: "",
ClientTrustedCAData: "",
ClientTrustedCAPath: "",
PeerCertData: "",
PeerCertPath: "",
PeerKeyData: "",
PeerKeyPath: "",
PeerTrustedCAData: "",
PeerTrustedCAPath: "",
SnapshotPath: "/tmp/etcd-functional-1.snapshot.db",
},
{
EtcdExec: "./bin/etcd",
AgentAddr: "127.0.0.1:29027",
FailpointHTTPAddr: "http://127.0.0.1:7382",
BaseDir: "/tmp/etcd-functional-2",
EtcdClientProxy: false,
EtcdPeerProxy: true,
EtcdClientEndpoint: "127.0.0.1:2379",
Etcd: &rpcpb.Etcd{
Name: "s2",
DataDir: "/tmp/etcd-functional-2/etcd.data",
WALDir: "/tmp/etcd-functional-2/etcd.data/member/wal",
HeartbeatIntervalMs: 100,
ElectionTimeoutMs: 1000,
ListenClientURLs: []string{"https://127.0.0.1:2379"},
AdvertiseClientURLs: []string{"https://127.0.0.1:2379"},
ClientAutoTLS: true,
ClientCertAuth: false,
ClientCertFile: "",
ClientKeyFile: "",
ClientTrustedCAFile: "",
ListenPeerURLs: []string{"https://127.0.0.1:2380"},
AdvertisePeerURLs: []string{"https://127.0.0.1:2381"},
PeerAutoTLS: true,
PeerClientCertAuth: false,
PeerCertFile: "",
PeerKeyFile: "",
PeerTrustedCAFile: "",
InitialCluster: "s1=https://127.0.0.1:1381,s2=https://127.0.0.1:2381,s3=https://127.0.0.1:3381",
InitialClusterState: "new",
InitialClusterToken: "tkn",
SnapshotCount: 10000,
QuotaBackendBytes: 10740000000,
PreVote: true,
InitialCorruptCheck: true,
Logger: "zap",
LogOutputs: []string{"/tmp/etcd-functional-2/etcd.log"},
Debug: true,
},
ClientCertData: "",
ClientCertPath: "",
ClientKeyData: "",
ClientKeyPath: "",
ClientTrustedCAData: "",
ClientTrustedCAPath: "",
PeerCertData: "",
PeerCertPath: "",
PeerKeyData: "",
PeerKeyPath: "",
PeerTrustedCAData: "",
PeerTrustedCAPath: "",
SnapshotPath: "/tmp/etcd-functional-2.snapshot.db",
},
{
EtcdExec: "./bin/etcd",
AgentAddr: "127.0.0.1:39027",
FailpointHTTPAddr: "http://127.0.0.1:7383",
BaseDir: "/tmp/etcd-functional-3",
EtcdClientProxy: false,
EtcdPeerProxy: true,
EtcdClientEndpoint: "127.0.0.1:3379",
Etcd: &rpcpb.Etcd{
Name: "s3",
DataDir: "/tmp/etcd-functional-3/etcd.data",
WALDir: "/tmp/etcd-functional-3/etcd.data/member/wal",
HeartbeatIntervalMs: 100,
ElectionTimeoutMs: 1000,
ListenClientURLs: []string{"https://127.0.0.1:3379"},
AdvertiseClientURLs: []string{"https://127.0.0.1:3379"},
ClientAutoTLS: true,
ClientCertAuth: false,
ClientCertFile: "",
ClientKeyFile: "",
ClientTrustedCAFile: "",
ListenPeerURLs: []string{"https://127.0.0.1:3380"},
AdvertisePeerURLs: []string{"https://127.0.0.1:3381"},
PeerAutoTLS: true,
PeerClientCertAuth: false,
PeerCertFile: "",
PeerKeyFile: "",
PeerTrustedCAFile: "",
InitialCluster: "s1=https://127.0.0.1:1381,s2=https://127.0.0.1:2381,s3=https://127.0.0.1:3381",
InitialClusterState: "new",
InitialClusterToken: "tkn",
SnapshotCount: 10000,
QuotaBackendBytes: 10740000000,
PreVote: true,
InitialCorruptCheck: true,
Logger: "zap",
LogOutputs: []string{"/tmp/etcd-functional-3/etcd.log"},
Debug: true,
},
ClientCertData: "",
ClientCertPath: "",
ClientKeyData: "",
ClientKeyPath: "",
ClientTrustedCAData: "",
ClientTrustedCAPath: "",
PeerCertData: "",
PeerCertPath: "",
PeerKeyData: "",
PeerKeyPath: "",
PeerTrustedCAData: "",
PeerTrustedCAPath: "",
SnapshotPath: "/tmp/etcd-functional-3.snapshot.db",
},
},
Tester: &rpcpb.Tester{
DataDir: "/tmp/etcd-tester-data",
Network: "tcp",
Addr: "127.0.0.1:9028",
DelayLatencyMs: 5000,
DelayLatencyMsRv: 500,
UpdatedDelayLatencyMs: 5000,
RoundLimit: 1,
ExitOnCaseFail: true,
EnablePprof: true,
CaseDelayMs: 7000,
CaseShuffle: true,
Cases: []string{
"SIGTERM_ONE_FOLLOWER",
"SIGTERM_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT",
"SIGTERM_LEADER",
"SIGTERM_LEADER_UNTIL_TRIGGER_SNAPSHOT",
"SIGTERM_QUORUM",
"SIGTERM_ALL",
"SIGQUIT_AND_REMOVE_ONE_FOLLOWER",
"SIGQUIT_AND_REMOVE_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT",
// "SIGQUIT_AND_REMOVE_LEADER",
// "SIGQUIT_AND_REMOVE_LEADER_UNTIL_TRIGGER_SNAPSHOT",
// "SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH",
// "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER",
// "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT",
"BLACKHOLE_PEER_PORT_TX_RX_LEADER",
"BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT",
"BLACKHOLE_PEER_PORT_TX_RX_QUORUM",
"BLACKHOLE_PEER_PORT_TX_RX_ALL",
// "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER",
// "RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER",
// "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT",
// "RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT",
"DELAY_PEER_PORT_TX_RX_LEADER",
"RANDOM_DELAY_PEER_PORT_TX_RX_LEADER",
"DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT",
"RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT",
"DELAY_PEER_PORT_TX_RX_QUORUM",
"RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM",
"DELAY_PEER_PORT_TX_RX_ALL",
"RANDOM_DELAY_PEER_PORT_TX_RX_ALL",
"NO_FAIL_WITH_STRESS",
"NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS",
},
FailpointCommands: []string{`panic("etcd-tester")`},
RunnerExecPath: "./bin/etcd-runner",
ExternalExecPath: "",
Stressers: []*rpcpb.Stresser{
{Type: "KV_WRITE_SMALL", Weight: 0.35},
{Type: "KV_WRITE_LARGE", Weight: 0.002},
{Type: "KV_READ_ONE_KEY", Weight: 0.07},
{Type: "KV_READ_RANGE", Weight: 0.07},
{Type: "KV_DELETE_ONE_KEY", Weight: 0.07},
{Type: "KV_DELETE_RANGE", Weight: 0.07},
{Type: "KV_TXN_WRITE_DELETE", Weight: 0.35},
{Type: "LEASE", Weight: 0.0},
},
Checkers: []string{"KV_HASH", "LEASE_EXPIRE"},
StressKeySize: 100,
StressKeySizeLarge: 32769,
StressKeySuffixRange: 250000,
StressKeySuffixRangeTxn: 100,
StressKeyTxnOps: 10,
StressClients: 100,
StressQPS: 2000,
},
}
logger, err := zap.NewProduction()
if err != nil {
t.Fatal(err)
}
defer logger.Sync()
cfg, err := read(logger, "../../functional.yaml")
if err != nil {
t.Fatal(err)
}
cfg.lg = nil
if !reflect.DeepEqual(exp, cfg) {
t.Fatalf("expected %+v, got %+v", exp, cfg)
}
cfg.lg = logger
cfg.updateCases()
fs1 := cfg.listCases()
cfg.shuffleCases()
fs2 := cfg.listCases()
if reflect.DeepEqual(fs1, fs2) {
t.Fatalf("expected shuffled failure cases, got %q", fs2)
}
cfg.shuffleCases()
fs3 := cfg.listCases()
if reflect.DeepEqual(fs2, fs3) {
t.Fatalf("expected reshuffled failure cases from %q, got %q", fs2, fs3)
}
// shuffle visits every case exactly once,
// so when sorted, failure cases must be equal
sort.Strings(fs1)
sort.Strings(fs2)
sort.Strings(fs3)
if !reflect.DeepEqual(fs1, fs2) {
t.Fatalf("expected %q, got %q", fs1, fs2)
}
if !reflect.DeepEqual(fs2, fs3) {
t.Fatalf("expected %q, got %q", fs2, fs3)
}
}

functional/tester/doc.go Normal file → Executable file (mode change only)

functional/tester/metrics_report.go Normal file → Executable file (mode change only)

functional/tester/stresser.go (94 changes) Normal file → Executable file

@ -37,60 +37,40 @@ type Stresser interface {
// newStresser creates stresser from a comma separated list of stresser types.
func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) {
// TODO: Too intensive stressing clients can panic etcd member with
// 'out of memory' error. Put rate limits in server side.
ks := &keyStresser{
lg: clus.lg,
m: m,
keySize: int(clus.Tester.StressKeySize),
keyLargeSize: int(clus.Tester.StressKeySizeLarge),
keySuffixRange: int(clus.Tester.StressKeySuffixRange),
keyTxnSuffixRange: int(clus.Tester.StressKeySuffixRangeTxn),
keyTxnOps: int(clus.Tester.StressKeyTxnOps),
clientsN: int(clus.Tester.StressClients),
rateLimiter: clus.rateLimiter,
}
ksExist := false
for _, s := range clus.Tester.Stressers {
stressers = make([]Stresser, len(clus.Tester.Stressers))
for i, stype := range clus.Tester.Stressers {
clus.lg.Info(
"creating stresser",
zap.String("type", s.Type),
zap.Float64("weight", s.Weight),
zap.String("type", stype),
zap.String("endpoint", m.EtcdClientEndpoint),
)
switch s.Type {
case "KV_WRITE_SMALL":
ksExist = true
ks.weightKVWriteSmall = s.Weight
case "KV_WRITE_LARGE":
ksExist = true
ks.weightKVWriteLarge = s.Weight
case "KV_READ_ONE_KEY":
ksExist = true
ks.weightKVReadOneKey = s.Weight
case "KV_READ_RANGE":
ksExist = true
ks.weightKVReadRange = s.Weight
case "KV_DELETE_ONE_KEY":
ksExist = true
ks.weightKVDeleteOneKey = s.Weight
case "KV_DELETE_RANGE":
ksExist = true
ks.weightKVDeleteRange = s.Weight
case "KV_TXN_WRITE_DELETE":
ksExist = true
ks.weightKVTxnWriteDelete = s.Weight
switch stype {
case "KV":
// TODO: Too intensive stressing clients can panic etcd member with
// 'out of memory' error. Put rate limits in server side.
stressers[i] = &keyStresser{
stype: rpcpb.Stresser_KV,
lg: clus.lg,
m: m,
keySize: int(clus.Tester.StressKeySize),
keyLargeSize: int(clus.Tester.StressKeySizeLarge),
keySuffixRange: int(clus.Tester.StressKeySuffixRange),
keyTxnSuffixRange: int(clus.Tester.StressKeySuffixRangeTxn),
keyTxnOps: int(clus.Tester.StressKeyTxnOps),
clientsN: int(clus.Tester.StressClients),
rateLimiter: clus.rateLimiter,
}
case "LEASE":
stressers = append(stressers, &leaseStresser{
stype: rpcpb.StresserType_LEASE,
stressers[i] = &leaseStresser{
stype: rpcpb.Stresser_LEASE,
lg: clus.lg,
m: m,
numLeases: 10, // TODO: configurable
keysPerLease: 10, // TODO: configurable
rateLimiter: clus.rateLimiter,
})
}
case "ELECTION_RUNNER":
reqRate := 100
@ -103,15 +83,15 @@ func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) {
"--rounds=0", // runs forever
"--req-rate", fmt.Sprintf("%v", reqRate),
}
stressers = append(stressers, newRunnerStresser(
rpcpb.StresserType_ELECTION_RUNNER,
stressers[i] = newRunnerStresser(
rpcpb.Stresser_ELECTION_RUNNER,
m.EtcdClientEndpoint,
clus.lg,
clus.Tester.RunnerExecPath,
args,
clus.rateLimiter,
reqRate,
))
)
case "WATCH_RUNNER":
reqRate := 100
@ -125,15 +105,15 @@ func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) {
"--rounds=0", // runs forever
"--req-rate", fmt.Sprintf("%v", reqRate),
}
stressers = append(stressers, newRunnerStresser(
rpcpb.StresserType_WATCH_RUNNER,
stressers[i] = newRunnerStresser(
rpcpb.Stresser_WATCH_RUNNER,
m.EtcdClientEndpoint,
clus.lg,
clus.Tester.RunnerExecPath,
args,
clus.rateLimiter,
reqRate,
))
)
case "LOCK_RACER_RUNNER":
reqRate := 100
@ -145,15 +125,15 @@ func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) {
"--rounds=0", // runs forever
"--req-rate", fmt.Sprintf("%v", reqRate),
}
stressers = append(stressers, newRunnerStresser(
rpcpb.StresserType_LOCK_RACER_RUNNER,
stressers[i] = newRunnerStresser(
rpcpb.Stresser_LOCK_RACER_RUNNER,
m.EtcdClientEndpoint,
clus.lg,
clus.Tester.RunnerExecPath,
args,
clus.rateLimiter,
reqRate,
))
)
case "LEASE_RUNNER":
args := []string{
@ -161,20 +141,16 @@ func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) {
"--ttl=30",
"--endpoints", m.EtcdClientEndpoint,
}
stressers = append(stressers, newRunnerStresser(
rpcpb.StresserType_LEASE_RUNNER,
stressers[i] = newRunnerStresser(
rpcpb.Stresser_LEASE_RUNNER,
m.EtcdClientEndpoint,
clus.lg,
clus.Tester.RunnerExecPath,
args,
clus.rateLimiter,
0,
))
)
}
}
if ksExist {
return append(stressers, ks)
}
return stressers
}

functional/tester/stresser_composite.go Normal file → Executable file (mode change only)

functional/tester/stresser_key.go (143 changes) Normal file → Executable file

@ -31,23 +31,14 @@ import (
"go.uber.org/zap"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type keyStresser struct {
lg *zap.Logger
stype rpcpb.Stresser
lg *zap.Logger
m *rpcpb.Member
weightKVWriteSmall float64
weightKVWriteLarge float64
weightKVReadOneKey float64
weightKVReadRange float64
weightKVDeleteOneKey float64
weightKVDeleteRange float64
weightKVTxnWriteDelete float64
keySize int
keyLargeSize int
keySuffixRange int
@ -82,16 +73,26 @@ func (s *keyStresser) Stress() error {
s.ctx, s.cancel = context.WithCancel(context.Background())
s.wg.Add(s.clientsN)
s.stressTable = createStressTable([]stressEntry{
{weight: s.weightKVWriteSmall, f: newStressPut(s.cli, s.keySuffixRange, s.keySize)},
{weight: s.weightKVWriteLarge, f: newStressPut(s.cli, s.keySuffixRange, s.keyLargeSize)},
{weight: s.weightKVReadOneKey, f: newStressRange(s.cli, s.keySuffixRange)},
{weight: s.weightKVReadRange, f: newStressRangeInterval(s.cli, s.keySuffixRange)},
{weight: s.weightKVDeleteOneKey, f: newStressDelete(s.cli, s.keySuffixRange)},
{weight: s.weightKVDeleteRange, f: newStressDeleteInterval(s.cli, s.keySuffixRange)},
{weight: s.weightKVTxnWriteDelete, f: newStressTxn(s.cli, s.keyTxnSuffixRange, s.keyTxnOps)},
})
var stressEntries = []stressEntry{
{weight: 0.7, f: newStressPut(s.cli, s.keySuffixRange, s.keySize)},
{
weight: 0.7 * float32(s.keySize) / float32(s.keyLargeSize),
f: newStressPut(s.cli, s.keySuffixRange, s.keyLargeSize),
},
{weight: 0.07, f: newStressRange(s.cli, s.keySuffixRange)},
{weight: 0.07, f: newStressRangeInterval(s.cli, s.keySuffixRange)},
{weight: 0.07, f: newStressDelete(s.cli, s.keySuffixRange)},
{weight: 0.07, f: newStressDeleteInterval(s.cli, s.keySuffixRange)},
}
if s.keyTxnSuffixRange > 0 {
// adjust to make up ±70% of workloads with writes
stressEntries[0].weight = 0.35
stressEntries = append(stressEntries, stressEntry{
weight: 0.35,
f: newStressTxn(s.cli, s.keyTxnSuffixRange, s.keyTxnOps),
})
}
s.stressTable = createStressTable(stressEntries)
s.emu.Lock()
s.paused = false
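A note on the derived weight above: 0.7 * keySize / keyLargeSize makes large writes contribute roughly the same byte volume per pick as the small writes at weight 0.7, since weight times value size comes out to 0.7 * keySize in both cases. With the sizes in the example config (stress-key-size 100, stress-key-size-large 32769) it evaluates to about 0.0021, close to the fixed 0.002 the deleted KV_WRITE_LARGE YAML entry carried.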
@ -103,7 +104,7 @@ func (s *keyStresser) Stress() error {
s.lg.Info(
"stress START",
zap.String("stress-type", "KV"),
zap.String("stress-type", s.stype.String()),
zap.String("endpoint", s.m.EtcdClientEndpoint),
)
return nil
@ -128,7 +129,41 @@ func (s *keyStresser) run() {
continue
}
if !s.isRetryableError(err) {
switch rpctypes.ErrorDesc(err) {
case context.DeadlineExceeded.Error():
// This retries when request is triggered at the same time as
// leader failure. When we terminate the leader, the request to
// that leader cannot be processed, and times out. Also requests
// to followers cannot be forwarded to the old leader, so timing out
// as well. We want to keep stressing until the cluster elects a
// new leader and start processing requests again.
case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
// This retries when request is triggered at the same time as
// leader failure and follower nodes receive time out errors
// from losing their leader. Followers should retry to connect
// to the new leader.
case etcdserver.ErrStopped.Error():
// one of the etcd nodes stopped from failure injection
// case transport.ErrConnClosing.Desc:
// // server closed the transport (failure injected node)
case rpctypes.ErrNotCapable.Error():
// capability check has not been done (in the beginning)
case rpctypes.ErrTooManyRequests.Error():
// hitting the recovering member.
case context.Canceled.Error():
// from stresser.Cancel method:
return
case grpc.ErrClientConnClosing.Error():
// from stresser.Cancel method:
return
default:
s.lg.Warn(
"stress run exiting",
zap.String("stress-type", s.stype.String()),
zap.String("endpoint", s.m.EtcdClientEndpoint),
zap.String("error-type", reflect.TypeOf(err).String()),
zap.Error(err),
)
return
}
@ -141,58 +176,6 @@ func (s *keyStresser) run() {
}
}
func (s *keyStresser) isRetryableError(err error) bool {
switch rpctypes.ErrorDesc(err) {
// retryable
case context.DeadlineExceeded.Error():
// This retries when request is triggered at the same time as
// leader failure. When we terminate the leader, the request to
// that leader cannot be processed, and times out. Also requests
// to followers cannot be forwarded to the old leader, so timing out
// as well. We want to keep stressing until the cluster elects a
// new leader and start processing requests again.
return true
case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
// This retries when request is triggered at the same time as
// leader failure and follower nodes receive time out errors
// from losing their leader. Followers should retry to connect
// to the new leader.
return true
case etcdserver.ErrStopped.Error():
// one of the etcd nodes stopped from failure injection
return true
case rpctypes.ErrNotCapable.Error():
// capability check has not been done (in the beginning)
return true
case rpctypes.ErrTooManyRequests.Error():
// hitting the recovering member.
return true
// case raft.ErrProposalDropped.Error():
// // removed member, or leadership has changed (old leader got raftpb.MsgProp)
// return true
// not retryable.
case context.Canceled.Error():
// from stresser.Cancel method:
return false
}
if status.Convert(err).Code() == codes.Unavailable {
// gRPC connection errors are translated to status.Unavailable
return true
}
s.lg.Warn(
"stress run exiting",
zap.String("stress-type", "KV"),
zap.String("endpoint", s.m.EtcdClientEndpoint),
zap.String("error-type", reflect.TypeOf(err).String()),
zap.String("error-desc", rpctypes.ErrorDesc(err)),
zap.Error(err),
)
return false
}
func (s *keyStresser) Pause() map[string]int {
return s.Close()
}
@ -210,7 +193,7 @@ func (s *keyStresser) Close() map[string]int {
s.lg.Info(
"stress STOP",
zap.String("stress-type", "KV"),
zap.String("stress-type", s.stype.String()),
zap.String("endpoint", s.m.EtcdClientEndpoint),
)
return ess
@ -223,13 +206,13 @@ func (s *keyStresser) ModifiedKeys() int64 {
type stressFunc func(ctx context.Context) (err error, modifiedKeys int64)
type stressEntry struct {
weight float64
weight float32
f stressFunc
}
type stressTable struct {
entries []stressEntry
sumWeights float64
sumWeights float32
}
func createStressTable(entries []stressEntry) *stressTable {
@ -241,8 +224,8 @@ func createStressTable(entries []stressEntry) *stressTable {
}
func (st *stressTable) choose() stressFunc {
v := rand.Float64() * st.sumWeights
var sum float64
v := rand.Float32() * st.sumWeights
var sum float32
var idx int
for i := range st.entries {
sum += st.entries[i].weight
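createStressTable and choose implement weighted random selection by cumulative sum: draw v uniformly in [0, sumWeights) and walk the entries until the running sum passes v. A self-contained sketch of the same algorithm (entry names invented):

```go
package main

import (
	"fmt"
	"math/rand"
)

type entry struct {
	weight float32
	name   string
}

// choose picks an entry with probability proportional to its weight,
// the same cumulative-sum walk stressTable.choose performs.
func choose(entries []entry) string {
	var sumWeights float32
	for _, e := range entries {
		sumWeights += e.weight
	}
	v := rand.Float32() * sumWeights
	var sum float32
	idx := 0
	for i := range entries {
		sum += entries[i].weight
		if v <= sum {
			idx = i
			break
		}
	}
	return entries[idx].name
}

func main() {
	es := []entry{{0.7, "put-small"}, {0.07, "range"}, {0.07, "delete"}}
	counts := map[string]int{}
	for i := 0; i < 100000; i++ {
		counts[choose(es)]++
	}
	fmt.Println(counts) // roughly 83% / 8% / 8%
}
```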

functional/tester/stresser_lease.go (2 changes) Normal file → Executable file

@ -38,7 +38,7 @@ const (
)
type leaseStresser struct {
stype rpcpb.StresserType
stype rpcpb.Stresser
lg *zap.Logger
m *rpcpb.Member

functional/tester/stresser_runner.go (5 changes) Normal file → Executable file

@ -27,7 +27,7 @@ import (
)
type runnerStresser struct {
stype rpcpb.StresserType
stype rpcpb.Stresser
etcdClientEndpoint string
lg *zap.Logger
@ -42,7 +42,7 @@ type runnerStresser struct {
}
func newRunnerStresser(
stype rpcpb.StresserType,
stype rpcpb.Stresser,
ep string,
lg *zap.Logger,
cmdStr string,
@ -54,7 +54,6 @@ func newRunnerStresser(
return &runnerStresser{
stype: stype,
etcdClientEndpoint: ep,
lg: lg,
cmdStr: cmdStr,
args: args,
rl: rl,

functional/tester/utils.go Normal file → Executable file (mode change only)