functional: rename to "SIGTERM/SIGQUIT*"

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
Gyuho Lee 2018-04-09 16:18:27 -07:00
parent cd4580b4a9
commit bc1fd92579
16 changed files with 1845 additions and 1805 deletions
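For quick reference, this commit maps the old CamelCase operation names onto signal-based names. An illustrative Go map summarizing the rename pairs visible in the diffs below (this map itself is not part of the commit):

// Old agent operation name -> new signal-based name.
var renamedOps = map[string]string{
	"NotStarted":              "NOT_STARTED",
	"InitialStartEtcd":        "INITIAL_START_ETCD",
	"RestartEtcd":             "RESTART_ETCD",
	"KillEtcd":                "SIGTERM_ETCD",
	"FailArchive":             "SIGQUIT_ETCD_AND_ARCHIVE_DATA",
	"DestroyEtcdAgent":        "SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT",
	"BlackholePeerPortTxRx":   "BLACKHOLE_PEER_PORT_TX_RX",
	"UnblackholePeerPortTxRx": "UNBLACKHOLE_PEER_PORT_TX_RX",
	"DelayPeerPortTxRx":       "DELAY_PEER_PORT_TX_RX",
	"UndelayPeerPortTxRx":     "UNDELAY_PEER_PORT_TX_RX",
}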

View File

@ -122,12 +122,12 @@ tester-config:
failure-delay-ms: 7000
failure-shuffle: true
failure-cases:
- KILL_ONE_FOLLOWER
- KILL_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
- KILL_LEADER
- KILL_LEADER_UNTIL_TRIGGER_SNAPSHOT
- KILL_QUORUM
- KILL_ALL
- SIGTERM_ONE_FOLLOWER
- SIGTERM_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
- SIGTERM_LEADER
- SIGTERM_LEADER_UNTIL_TRIGGER_SNAPSHOT
- SIGTERM_QUORUM
- SIGTERM_ALL
- BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER
- BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
- BLACKHOLE_PEER_PORT_TX_RX_LEADER

View File

@ -47,25 +47,29 @@ func (srv *Server) handleTesterRequest(req *rpcpb.Request) (resp *rpcpb.Response
}
switch req.Operation {
case rpcpb.Operation_InitialStartEtcd:
return srv.handleInitialStartEtcd(req)
case rpcpb.Operation_RestartEtcd:
return srv.handleRestartEtcd()
case rpcpb.Operation_KillEtcd:
return srv.handleKillEtcd()
case rpcpb.Operation_FailArchive:
return srv.handleFailArchive()
case rpcpb.Operation_DestroyEtcdAgent:
return srv.handleDestroyEtcdAgent()
case rpcpb.Operation_INITIAL_START_ETCD:
return srv.handle_INITIAL_START_ETCD(req)
case rpcpb.Operation_RESTART_ETCD:
return srv.handle_RESTART_ETCD()
case rpcpb.Operation_BlackholePeerPortTxRx:
return srv.handleBlackholePeerPortTxRx()
case rpcpb.Operation_UnblackholePeerPortTxRx:
return srv.handleUnblackholePeerPortTxRx()
case rpcpb.Operation_DelayPeerPortTxRx:
return srv.handleDelayPeerPortTxRx()
case rpcpb.Operation_UndelayPeerPortTxRx:
return srv.handleUndelayPeerPortTxRx()
case rpcpb.Operation_SIGTERM_ETCD:
return srv.handle_SIGTERM_ETCD()
case rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA:
return srv.handle_SIGQUIT_ETCD_AND_REMOVE_DATA()
case rpcpb.Operation_SIGQUIT_ETCD_AND_ARCHIVE_DATA:
return srv.handle_SIGQUIT_ETCD_AND_ARCHIVE_DATA()
case rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT:
return srv.handle_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT()
case rpcpb.Operation_BLACKHOLE_PEER_PORT_TX_RX:
return srv.handle_BLACKHOLE_PEER_PORT_TX_RX()
case rpcpb.Operation_UNBLACKHOLE_PEER_PORT_TX_RX:
return srv.handle_UNBLACKHOLE_PEER_PORT_TX_RX()
case rpcpb.Operation_DELAY_PEER_PORT_TX_RX:
return srv.handle_DELAY_PEER_PORT_TX_RX()
case rpcpb.Operation_UNDELAY_PEER_PORT_TX_RX:
return srv.handle_UNDELAY_PEER_PORT_TX_RX()
default:
msg := fmt.Sprintf("operation not found (%v)", req.Operation)
@ -73,11 +77,11 @@ func (srv *Server) handleTesterRequest(req *rpcpb.Request) (resp *rpcpb.Response
}
}
func (srv *Server) handleInitialStartEtcd(req *rpcpb.Request) (*rpcpb.Response, error) {
if srv.last != rpcpb.Operation_NotStarted {
func (srv *Server) handle_INITIAL_START_ETCD(req *rpcpb.Request) (*rpcpb.Response, error) {
if srv.last != rpcpb.Operation_NOT_STARTED {
return &rpcpb.Response{
Success: false,
Status: fmt.Sprintf("%q is not valid; last server operation was %q", rpcpb.Operation_InitialStartEtcd.String(), srv.last.String()),
Status: fmt.Sprintf("%q is not valid; last server operation was %q", rpcpb.Operation_INITIAL_START_ETCD.String(), srv.last.String()),
Member: req.Member,
}, nil
}
@ -403,10 +407,17 @@ func (srv *Server) startEtcdCmd() error {
return srv.etcdCmd.Start()
}
func (srv *Server) handleRestartEtcd() (*rpcpb.Response, error) {
func (srv *Server) handle_RESTART_ETCD() (*rpcpb.Response, error) {
var err error
if !fileutil.Exist(srv.Member.BaseDir) {
err = fileutil.TouchDirAll(srv.Member.BaseDir)
if err != nil {
return nil, err
}
}
srv.creatEtcdCmd()
var err error
if err = srv.saveTLSAssets(); err != nil {
return nil, err
}
@ -434,7 +445,7 @@ func (srv *Server) handleRestartEtcd() (*rpcpb.Response, error) {
}, nil
}
func (srv *Server) handleKillEtcd() (*rpcpb.Response, error) {
func (srv *Server) handle_SIGTERM_ETCD() (*rpcpb.Response, error) {
srv.stopProxy()
err := stopWithSig(srv.etcdCmd, syscall.SIGTERM)
@ -449,7 +460,28 @@ func (srv *Server) handleKillEtcd() (*rpcpb.Response, error) {
}, nil
}
func (srv *Server) handleFailArchive() (*rpcpb.Response, error) {
func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA() (*rpcpb.Response, error) {
srv.stopProxy()
err := stopWithSig(srv.etcdCmd, syscall.SIGQUIT)
if err != nil {
return nil, err
}
srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String()))
err = os.RemoveAll(srv.Member.BaseDir)
if err != nil {
return nil, err
}
srv.lg.Info("removed base directory", zap.String("dir", srv.Member.BaseDir))
return &rpcpb.Response{
Success: true,
Status: "killed etcd and removed base directory",
}, nil
}
func (srv *Server) handle_SIGQUIT_ETCD_AND_ARCHIVE_DATA() (*rpcpb.Response, error) {
srv.stopProxy()
// exit with stacktrace
@ -489,12 +521,14 @@ func (srv *Server) handleFailArchive() (*rpcpb.Response, error) {
}
// stop proxy, etcd, delete data directory
func (srv *Server) handleDestroyEtcdAgent() (*rpcpb.Response, error) {
err := stopWithSig(srv.etcdCmd, syscall.SIGTERM)
func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT() (*rpcpb.Response, error) {
srv.stopProxy()
err := stopWithSig(srv.etcdCmd, syscall.SIGQUIT)
if err != nil {
return nil, err
}
srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGTERM.String()))
srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String()))
err = os.RemoveAll(srv.Member.BaseDir)
if err != nil {
@ -505,22 +539,13 @@ func (srv *Server) handleDestroyEtcdAgent() (*rpcpb.Response, error) {
// stop agent server
srv.Stop()
for port, px := range srv.advertiseClientPortToProxy {
err := px.Close()
srv.lg.Info("closed proxy", zap.Int("client-port", port), zap.Error(err))
}
for port, px := range srv.advertisePeerPortToProxy {
err := px.Close()
srv.lg.Info("closed proxy", zap.Int("peer-port", port), zap.Error(err))
}
return &rpcpb.Response{
Success: true,
Status: "destroyed etcd and agent",
}, nil
}
func (srv *Server) handleBlackholePeerPortTxRx() (*rpcpb.Response, error) {
func (srv *Server) handle_BLACKHOLE_PEER_PORT_TX_RX() (*rpcpb.Response, error) {
for port, px := range srv.advertisePeerPortToProxy {
srv.lg.Info("blackholing", zap.Int("peer-port", port))
px.BlackholeTx()
@ -533,7 +558,7 @@ func (srv *Server) handleBlackholePeerPortTxRx() (*rpcpb.Response, error) {
}, nil
}
func (srv *Server) handleUnblackholePeerPortTxRx() (*rpcpb.Response, error) {
func (srv *Server) handle_UNBLACKHOLE_PEER_PORT_TX_RX() (*rpcpb.Response, error) {
for port, px := range srv.advertisePeerPortToProxy {
srv.lg.Info("unblackholing", zap.Int("peer-port", port))
px.UnblackholeTx()
@ -546,7 +571,7 @@ func (srv *Server) handleUnblackholePeerPortTxRx() (*rpcpb.Response, error) {
}, nil
}
func (srv *Server) handleDelayPeerPortTxRx() (*rpcpb.Response, error) {
func (srv *Server) handle_DELAY_PEER_PORT_TX_RX() (*rpcpb.Response, error) {
lat := time.Duration(srv.Tester.UpdatedDelayLatencyMs) * time.Millisecond
rv := time.Duration(srv.Tester.DelayLatencyMsRv) * time.Millisecond
@ -571,7 +596,7 @@ func (srv *Server) handleDelayPeerPortTxRx() (*rpcpb.Response, error) {
}, nil
}
func (srv *Server) handleUndelayPeerPortTxRx() (*rpcpb.Response, error) {
func (srv *Server) handle_UNDELAY_PEER_PORT_TX_RX() (*rpcpb.Response, error) {
for port, px := range srv.advertisePeerPortToProxy {
srv.lg.Info("undelaying", zap.Int("peer-port", port))
px.UndelayTx()
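Every stop-style operation above boils down to the agent signaling the etcd process it forked: SIGTERM for a plain stop that keeps the data directory, SIGQUIT when the data is then archived or removed (for a Go binary, SIGQUIT also dumps goroutine stack traces before exit). A minimal sketch of that signaling pattern follows; stopWithSig itself is not shown in this diff, so sigStop below is a hypothetical stand-in:

import (
	"os/exec"
	"syscall"
	"time"
)

// sigStop sends sig to a running command and waits briefly for it to exit,
// falling back to SIGKILL. Illustration only, not the agent's stopWithSig.
func sigStop(cmd *exec.Cmd, sig syscall.Signal) error {
	if err := cmd.Process.Signal(sig); err != nil {
		return err
	}
	done := make(chan error, 1)
	go func() { done <- cmd.Wait() }()
	select {
	case err := <-done:
		return err // process exited
	case <-time.After(5 * time.Second):
		return cmd.Process.Kill()
	}
}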

View File

@ -64,7 +64,7 @@ func NewServer(
lg: lg,
network: network,
address: address,
last: rpcpb.Operation_NotStarted,
last: rpcpb.Operation_NOT_STARTED,
advertiseClientPortToProxy: make(map[int]proxy.Server),
advertisePeerPortToProxy: make(map[int]proxy.Server),
}

View File

@ -44,11 +44,11 @@ func main() {
logger.Fatal("failed to create a cluster", zap.Error(err))
}
err = clus.Bootstrap()
err = clus.Send_INITIAL_START_ETCD()
if err != nil {
logger.Fatal("Bootstrap failed", zap.Error(err))
}
defer clus.DestroyEtcdAgents()
defer clus.Send_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT()
logger.Info("wait health after bootstrap")
err = clus.WaitHealth()

File diff suppressed because it is too large

View File

@ -8,77 +8,25 @@ option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
message Request {
Operation Operation = 1;
// Member contains the same Member object from tester configuration.
Member Member = 2;
// Tester contains tester configuration.
Tester Tester = 3;
}
message Response {
bool Success = 1;
string Status = 2;
// Member contains the same Member object from tester request.
Member Member = 3;
}
service Transport {
rpc Transport(stream Request) returns (stream Response) {}
}
enum Operation {
NotStarted = 0;
// InitialStartEtcd is only called to start etcd very first time.
InitialStartEtcd = 1;
// RestartEtcd is sent to restart killed etcd.
RestartEtcd = 2;
// KillEtcd pauses etcd process while keeping data directories
// and previous etcd configurations.
KillEtcd = 3;
// FailArchive is sent when the consistency check fails,
// so the etcd data directories need to be archived.
FailArchive = 4;
// DestroyEtcdAgent destroys etcd process, etcd data, and agent server.
DestroyEtcdAgent = 5;
// BlackholePeerPortTxRx drops all outgoing/incoming packets from/to the
// peer port on target member's peer port.
BlackholePeerPortTxRx = 100;
// UnblackholePeerPortTxRx removes outgoing/incoming packet dropping.
UnblackholePeerPortTxRx = 101;
// DelayPeerPortTxRx delays all outgoing/incoming packets from/to the
// peer port on target member's peer port.
DelayPeerPortTxRx = 102;
// UndelayPeerPortTxRx removes all outgoing/incoming delays.
UndelayPeerPortTxRx = 103;
}
message Etcd {
string Name = 1 [(gogoproto.moretags) = "yaml:\"name\""];
string DataDir = 2 [(gogoproto.moretags) = "yaml:\"data-dir\""];
string WALDir = 3 [(gogoproto.moretags) = "yaml:\"wal-dir\""];
// HeartbeatIntervalMs is the time (in milliseconds) of a heartbeat interval.
// Default value is 100, which is 100ms.
int64 HeartbeatIntervalMs = 11 [(gogoproto.moretags) = "yaml:\"heartbeat-interval\""];
// ElectionTimeoutMs is the time (in milliseconds) for an election to timeout.
// Default value is 1000, which is 1s.
int64 ElectionTimeoutMs = 12 [(gogoproto.moretags) = "yaml:\"election-timeout\""];
repeated string ListenClientURLs = 21 [(gogoproto.moretags) = "yaml:\"listen-client-urls\""];
repeated string AdvertiseClientURLs = 22 [(gogoproto.moretags) = "yaml:\"advertise-client-urls\""];
bool ClientAutoTLS = 23 [(gogoproto.moretags) = "yaml:\"auto-tls\""];
bool ClientCertAuth = 24 [(gogoproto.moretags) = "yaml:\"client-cert-auth\""];
string ClientCertFile = 25 [(gogoproto.moretags) = "yaml:\"cert-file\""];
string ClientKeyFile = 26 [(gogoproto.moretags) = "yaml:\"key-file\""];
string ClientTrustedCAFile = 27 [(gogoproto.moretags) = "yaml:\"trusted-ca-file\""];
repeated string ListenPeerURLs = 31 [(gogoproto.moretags) = "yaml:\"listen-peer-urls\""];
repeated string AdvertisePeerURLs = 32 [(gogoproto.moretags) = "yaml:\"initial-advertise-peer-urls\""];
bool PeerAutoTLS = 33 [(gogoproto.moretags) = "yaml:\"peer-auto-tls\""];
bool PeerClientCertAuth = 34 [(gogoproto.moretags) = "yaml:\"peer-client-cert-auth\""];
string PeerCertFile = 35 [(gogoproto.moretags) = "yaml:\"peer-cert-file\""];
string PeerKeyFile = 36 [(gogoproto.moretags) = "yaml:\"peer-key-file\""];
string PeerTrustedCAFile = 37 [(gogoproto.moretags) = "yaml:\"peer-trusted-ca-file\""];
string InitialCluster = 41 [(gogoproto.moretags) = "yaml:\"initial-cluster\""];
string InitialClusterState = 42 [(gogoproto.moretags) = "yaml:\"initial-cluster-state\""];
string InitialClusterToken = 43 [(gogoproto.moretags) = "yaml:\"initial-cluster-token\""];
int64 SnapshotCount = 51 [(gogoproto.moretags) = "yaml:\"snapshot-count\""];
int64 QuotaBackendBytes = 52 [(gogoproto.moretags) = "yaml:\"quota-backend-bytes\""];
bool PreVote = 63 [(gogoproto.moretags) = "yaml:\"pre-vote\""];
bool InitialCorruptCheck = 64 [(gogoproto.moretags) = "yaml:\"initial-corrupt-check\""];
}
message Member {
// EtcdExecPath is the executable etcd binary path in agent server.
string EtcdExecPath = 1 [(gogoproto.moretags) = "yaml:\"etcd-exec-path\""];
@ -128,18 +76,156 @@ message Member {
string PeerTrustedCAPath = 506 [(gogoproto.moretags) = "yaml:\"peer-trusted-ca-path\""];
}
message Tester {
string DataDir = 1 [(gogoproto.moretags) = "yaml:\"data-dir\""];
string Network = 2 [(gogoproto.moretags) = "yaml:\"network\""];
string Addr = 3 [(gogoproto.moretags) = "yaml:\"addr\""];
// DelayLatencyMs is the delay latency in milliseconds
// to inject into the simulated slow network.
uint32 DelayLatencyMs = 11 [(gogoproto.moretags) = "yaml:\"delay-latency-ms\""];
// DelayLatencyMsRv is the delay latency random variable in milliseconds.
uint32 DelayLatencyMsRv = 12 [(gogoproto.moretags) = "yaml:\"delay-latency-ms-rv\""];
// UpdatedDelayLatencyMs is the updated delay latency in milliseconds
// to inject into the simulated slow network. It's the final latency to apply,
// in case the latency numbers are randomly generated from given delay latency field.
uint32 UpdatedDelayLatencyMs = 13 [(gogoproto.moretags) = "yaml:\"updated-delay-latency-ms\""];
// RoundLimit is the limit of rounds to run failure set (-1 to run without limits).
int32 RoundLimit = 21 [(gogoproto.moretags) = "yaml:\"round-limit\""];
// ExitOnFailure, if true, exits the tester on the first failure.
bool ExitOnFailure = 22 [(gogoproto.moretags) = "yaml:\"exit-on-failure\""];
// ConsistencyCheck is true to check consistency (revision, hash).
bool ConsistencyCheck = 23 [(gogoproto.moretags) = "yaml:\"consistency-check\""];
// EnablePprof is true to enable profiler.
bool EnablePprof = 24 [(gogoproto.moretags) = "yaml:\"enable-pprof\""];
// FailureDelayMs is the delay duration after failure is injected.
// Useful when triggering snapshot or no-op failure cases.
uint32 FailureDelayMs = 31 [(gogoproto.moretags) = "yaml:\"failure-delay-ms\""];
// FailureShuffle is true to randomize failure injecting order.
bool FailureShuffle = 32 [(gogoproto.moretags) = "yaml:\"failure-shuffle\""];
// FailureCases is the selected test cases to schedule.
// If empty, run all failure cases.
repeated string FailureCases = 33 [(gogoproto.moretags) = "yaml:\"failure-cases\""];
// FailpointCommands is the list of "gofail" commands (e.g. panic("etcd-tester"), 1*sleep(1000)).
repeated string FailpointCommands = 34 [(gogoproto.moretags) = "yaml:\"failpoint-commands\""];
// RunnerExecPath is a path of etcd-runner binary.
string RunnerExecPath = 41 [(gogoproto.moretags) = "yaml:\"runner-exec-path\""];
// ExternalExecPath is a path of script for enabling/disabling an external fault injector.
string ExternalExecPath = 42 [(gogoproto.moretags) = "yaml:\"external-exec-path\""];
// StressTypes is the list of stresser names:
// keys, lease, nop, election-runner, watch-runner, lock-racer-runner, lease-runner.
repeated string StressTypes = 101 [(gogoproto.moretags) = "yaml:\"stress-types\""];
// StressKeySize is the size of each small key written into etcd.
int32 StressKeySize = 102 [(gogoproto.moretags) = "yaml:\"stress-key-size\""];
// StressKeySizeLarge is the size of each large key written into etcd.
int32 StressKeySizeLarge = 103 [(gogoproto.moretags) = "yaml:\"stress-key-size-large\""];
// StressKeySuffixRange is the count of key range written into etcd.
// Stress keys are created with "fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange)".
int32 StressKeySuffixRange = 104 [(gogoproto.moretags) = "yaml:\"stress-key-suffix-range\""];
// StressKeySuffixRangeTxn is the count of key range written into etcd txn (max 100).
// Stress keys are created with "fmt.Sprintf("/k%03d", i)".
int32 StressKeySuffixRangeTxn = 105 [(gogoproto.moretags) = "yaml:\"stress-key-suffix-range-txn\""];
// StressKeyTxnOps is the number of operations per transaction (max 64).
int32 StressKeyTxnOps = 106 [(gogoproto.moretags) = "yaml:\"stress-key-txn-ops\""];
// StressClients is the number of concurrent stressing clients
// with "one" shared TCP connection.
int32 StressClients = 201 [(gogoproto.moretags) = "yaml:\"stress-clients\""];
// StressQPS is the maximum number of stresser requests per second.
int32 StressQPS = 202 [(gogoproto.moretags) = "yaml:\"stress-qps\""];
}
message Etcd {
string Name = 1 [(gogoproto.moretags) = "yaml:\"name\""];
string DataDir = 2 [(gogoproto.moretags) = "yaml:\"data-dir\""];
string WALDir = 3 [(gogoproto.moretags) = "yaml:\"wal-dir\""];
// HeartbeatIntervalMs is the time (in milliseconds) of a heartbeat interval.
// Default value is 100, which is 100ms.
int64 HeartbeatIntervalMs = 11 [(gogoproto.moretags) = "yaml:\"heartbeat-interval\""];
// ElectionTimeoutMs is the time (in milliseconds) for an election to timeout.
// Default value is 1000, which is 1s.
int64 ElectionTimeoutMs = 12 [(gogoproto.moretags) = "yaml:\"election-timeout\""];
repeated string ListenClientURLs = 21 [(gogoproto.moretags) = "yaml:\"listen-client-urls\""];
repeated string AdvertiseClientURLs = 22 [(gogoproto.moretags) = "yaml:\"advertise-client-urls\""];
bool ClientAutoTLS = 23 [(gogoproto.moretags) = "yaml:\"auto-tls\""];
bool ClientCertAuth = 24 [(gogoproto.moretags) = "yaml:\"client-cert-auth\""];
string ClientCertFile = 25 [(gogoproto.moretags) = "yaml:\"cert-file\""];
string ClientKeyFile = 26 [(gogoproto.moretags) = "yaml:\"key-file\""];
string ClientTrustedCAFile = 27 [(gogoproto.moretags) = "yaml:\"trusted-ca-file\""];
repeated string ListenPeerURLs = 31 [(gogoproto.moretags) = "yaml:\"listen-peer-urls\""];
repeated string AdvertisePeerURLs = 32 [(gogoproto.moretags) = "yaml:\"initial-advertise-peer-urls\""];
bool PeerAutoTLS = 33 [(gogoproto.moretags) = "yaml:\"peer-auto-tls\""];
bool PeerClientCertAuth = 34 [(gogoproto.moretags) = "yaml:\"peer-client-cert-auth\""];
string PeerCertFile = 35 [(gogoproto.moretags) = "yaml:\"peer-cert-file\""];
string PeerKeyFile = 36 [(gogoproto.moretags) = "yaml:\"peer-key-file\""];
string PeerTrustedCAFile = 37 [(gogoproto.moretags) = "yaml:\"peer-trusted-ca-file\""];
string InitialCluster = 41 [(gogoproto.moretags) = "yaml:\"initial-cluster\""];
string InitialClusterState = 42 [(gogoproto.moretags) = "yaml:\"initial-cluster-state\""];
string InitialClusterToken = 43 [(gogoproto.moretags) = "yaml:\"initial-cluster-token\""];
int64 SnapshotCount = 51 [(gogoproto.moretags) = "yaml:\"snapshot-count\""];
int64 QuotaBackendBytes = 52 [(gogoproto.moretags) = "yaml:\"quota-backend-bytes\""];
bool PreVote = 63 [(gogoproto.moretags) = "yaml:\"pre-vote\""];
bool InitialCorruptCheck = 64 [(gogoproto.moretags) = "yaml:\"initial-corrupt-check\""];
}
enum Operation {
// NOT_STARTED is the agent status before etcd first start.
NOT_STARTED = 0;
// INITIAL_START_ETCD is only called to start etcd for the very first time.
INITIAL_START_ETCD = 10;
// RESTART_ETCD is sent to restart killed etcd.
RESTART_ETCD = 11;
// SIGTERM_ETCD pauses etcd process while keeping data directories
// and previous etcd configurations.
SIGTERM_ETCD = 20;
// SIGQUIT_ETCD_AND_REMOVE_DATA kills etcd process and removes all data
// directories to simulate destroying the whole machine.
SIGQUIT_ETCD_AND_REMOVE_DATA = 21;
// SIGQUIT_ETCD_AND_ARCHIVE_DATA is sent when the consistency check fails,
// so the etcd data directories need to be archived.
SIGQUIT_ETCD_AND_ARCHIVE_DATA = 30;
// SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT destroys etcd process,
// etcd data, and agent server.
SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT = 31;
// BLACKHOLE_PEER_PORT_TX_RX drops all outgoing/incoming packets from/to
// the peer port on target member's peer port.
BLACKHOLE_PEER_PORT_TX_RX = 100;
// UNBLACKHOLE_PEER_PORT_TX_RX removes outgoing/incoming packet dropping.
UNBLACKHOLE_PEER_PORT_TX_RX = 101;
// DELAY_PEER_PORT_TX_RX delays all outgoing/incoming packets from/to
// the peer port on target member's peer port.
DELAY_PEER_PORT_TX_RX = 200;
// UNDELAY_PEER_PORT_TX_RX removes all outgoing/incoming delays.
UNDELAY_PEER_PORT_TX_RX = 201;
}
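The renamed operations encode both the signal the agent delivers and what happens to the member's data afterwards. A hedged helper restating the enum comments above in code (illustration only, not part of this commit):

import (
	"syscall"

	"github.com/coreos/etcd/functional/rpcpb"
)

// signalFor reports which signal an operation implies and whether the
// member's base directory is expected to survive it.
func signalFor(op rpcpb.Operation) (sig syscall.Signal, keepsData bool) {
	switch op {
	case rpcpb.Operation_SIGTERM_ETCD:
		return syscall.SIGTERM, true
	case rpcpb.Operation_SIGQUIT_ETCD_AND_ARCHIVE_DATA:
		return syscall.SIGQUIT, true // data directories are archived, not discarded
	case rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA,
		rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT:
		return syscall.SIGQUIT, false
	}
	return 0, true // start/restart and network-fault operations send no signal
}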
// FailureCase defines various system faults in distributed systems,
// in order to verify correct behavior of etcd servers and clients.
enum FailureCase {
// KILL_ONE_FOLLOWER stops a randomly chosen follower (non-leader)
// SIGTERM_ONE_FOLLOWER stops a randomly chosen follower (non-leader)
// but does not delete its data directories on disk for next restart.
// It waits "failure-delay-ms" before recovering this failure.
// The expected behavior is that the follower comes back online
// and rejoins the cluster, and then each member continues to process
// client requests ('Put' request that requires Raft consensus).
KILL_ONE_FOLLOWER = 0;
SIGTERM_ONE_FOLLOWER = 0;
// KILL_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT stops a randomly chosen
// SIGTERM_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT stops a randomly chosen
// follower but does not delete its data directories on disk for next
// restart. And waits until most up-to-date node (leader) applies the
// snapshot count of entries since the stop operation.
@ -148,9 +234,9 @@ enum FailureCase {
// to the follower to force it to follow the leader's log.
// As always, after recovery, each member must be able to process
// client requests.
KILL_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT = 1;
SIGTERM_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT = 1;
// KILL_LEADER stops the active leader node but does not delete its
// SIGTERM_LEADER stops the active leader node but does not delete its
// data directories on disk for next restart. Then it waits
// "failure-delay-ms" before recovering this failure, in order to
// trigger election timeouts.
@ -158,9 +244,9 @@ enum FailureCase {
// old leader comes back online and rejoins the cluster as a follower.
// As always, after recovery, each member must be able to process
// client requests.
KILL_LEADER = 2;
SIGTERM_LEADER = 2;
// KILL_LEADER_UNTIL_TRIGGER_SNAPSHOT stops the active leader node
// SIGTERM_LEADER_UNTIL_TRIGGER_SNAPSHOT stops the active leader node
// but does not delete its data directories on disk for next restart.
// And waits until most up-to-date node ("new" leader) applies the
// snapshot count of entries since the stop operation.
@ -169,24 +255,24 @@ enum FailureCase {
// And it receives the snapshot from the new leader to overwrite its
// store. As always, after recovery, each member must be able to
// process client requests.
KILL_LEADER_UNTIL_TRIGGER_SNAPSHOT = 3;
SIGTERM_LEADER_UNTIL_TRIGGER_SNAPSHOT = 3;
// KILL_QUORUM stops majority number of nodes to make the whole cluster
// SIGTERM_QUORUM stops majority number of nodes to make the whole cluster
// inoperable but does not delete data directories on stopped nodes
// for next restart. And it waits "failure-delay-ms" before recovering
// this failure.
// The expected behavior is that nodes come back online, thus cluster
// comes back operative as well. As always, after recovery, each member
// must be able to process client requests.
KILL_QUORUM = 4;
SIGTERM_QUORUM = 4;
// KILL_ALL stops the whole cluster but does not delete data directories
// SIGTERM_ALL stops the whole cluster but does not delete data directories
// on disk for next restart. And it waits "failure-delay-ms" before
// recovering this failure.
// The expected behavior is that nodes come back online, thus cluster
// comes back operative as well. As always, after recovery, each member
// must be able to process client requests.
KILL_ALL = 5;
SIGTERM_ALL = 5;
// BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER drops all outgoing/incoming
// packets from/to the peer port on a randomly chosen follower
@ -392,81 +478,3 @@ enum StressType {
LOCK_RACER_RUNNER = 4;
LEASE_RUNNER = 5;
}
message Tester {
string DataDir = 1 [(gogoproto.moretags) = "yaml:\"data-dir\""];
string Network = 2 [(gogoproto.moretags) = "yaml:\"network\""];
string Addr = 3 [(gogoproto.moretags) = "yaml:\"addr\""];
// DelayLatencyMs is the delay latency in milliseconds
// to inject into the simulated slow network.
uint32 DelayLatencyMs = 11 [(gogoproto.moretags) = "yaml:\"delay-latency-ms\""];
// DelayLatencyMsRv is the delay latency random variable in milliseconds.
uint32 DelayLatencyMsRv = 12 [(gogoproto.moretags) = "yaml:\"delay-latency-ms-rv\""];
// UpdatedDelayLatencyMs is the updated delay latency in milliseconds
// to inject into the simulated slow network. It's the final latency to apply,
// in case the latency numbers are randomly generated from given delay latency field.
uint32 UpdatedDelayLatencyMs = 13 [(gogoproto.moretags) = "yaml:\"updated-delay-latency-ms\""];
// RoundLimit is the limit of rounds to run failure set (-1 to run without limits).
int32 RoundLimit = 21 [(gogoproto.moretags) = "yaml:\"round-limit\""];
// ExitOnFailure, if true, exits the tester on the first failure.
bool ExitOnFailure = 22 [(gogoproto.moretags) = "yaml:\"exit-on-failure\""];
// ConsistencyCheck is true to check consistency (revision, hash).
bool ConsistencyCheck = 23 [(gogoproto.moretags) = "yaml:\"consistency-check\""];
// EnablePprof is true to enable profiler.
bool EnablePprof = 24 [(gogoproto.moretags) = "yaml:\"enable-pprof\""];
// FailureDelayMs is the delay duration after failure is injected.
// Useful when triggering snapshot or no-op failure cases.
uint32 FailureDelayMs = 31 [(gogoproto.moretags) = "yaml:\"failure-delay-ms\""];
// FailureShuffle is true to randomize failure injecting order.
bool FailureShuffle = 32 [(gogoproto.moretags) = "yaml:\"failure-shuffle\""];
// FailureCases is the selected test cases to schedule.
// If empty, run all failure cases.
repeated string FailureCases = 33 [(gogoproto.moretags) = "yaml:\"failure-cases\""];
// FailpointCommands is the list of "gofail" commands (e.g. panic("etcd-tester"), 1*sleep(1000)).
repeated string FailpointCommands = 34 [(gogoproto.moretags) = "yaml:\"failpoint-commands\""];
// RunnerExecPath is a path of etcd-runner binary.
string RunnerExecPath = 41 [(gogoproto.moretags) = "yaml:\"runner-exec-path\""];
// ExternalExecPath is a path of script for enabling/disabling an external fault injector.
string ExternalExecPath = 42 [(gogoproto.moretags) = "yaml:\"external-exec-path\""];
// StressTypes is the list of stresser names:
// keys, lease, nop, election-runner, watch-runner, lock-racer-runner, lease-runner.
repeated string StressTypes = 101 [(gogoproto.moretags) = "yaml:\"stress-types\""];
// StressKeySize is the size of each small key written into etcd.
int32 StressKeySize = 102 [(gogoproto.moretags) = "yaml:\"stress-key-size\""];
// StressKeySizeLarge is the size of each large key written into etcd.
int32 StressKeySizeLarge = 103 [(gogoproto.moretags) = "yaml:\"stress-key-size-large\""];
// StressKeySuffixRange is the count of key range written into etcd.
// Stress keys are created with "fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange)".
int32 StressKeySuffixRange = 104 [(gogoproto.moretags) = "yaml:\"stress-key-suffix-range\""];
// StressKeySuffixRangeTxn is the count of key range written into etcd txn (max 100).
// Stress keys are created with "fmt.Sprintf("/k%03d", i)".
int32 StressKeySuffixRangeTxn = 105 [(gogoproto.moretags) = "yaml:\"stress-key-suffix-range-txn\""];
// StressKeyTxnOps is the number of operations per transaction (max 64).
int32 StressKeyTxnOps = 106 [(gogoproto.moretags) = "yaml:\"stress-key-txn-ops\""];
// StressClients is the number of concurrent stressing clients
// with "one" shared TCP connection.
int32 StressClients = 201 [(gogoproto.moretags) = "yaml:\"stress-clients\""];
// StressQPS is the maximum number of stresser requests per second.
int32 StressQPS = 202 [(gogoproto.moretags) = "yaml:\"stress-qps\""];
}
message Request {
Operation Operation = 1;
// Member contains the same Member object from tester configuration.
Member Member = 2;
// Tester contains tester configuration.
Tester Tester = 3;
}
message Response {
bool Success = 1;
string Status = 2;
// Member contains the same Member object from tester request.
Member Member = 3;
}

View File

@ -142,91 +142,91 @@ func (clus *Cluster) serveTesterServer() {
func (clus *Cluster) updateFailures() {
for _, cs := range clus.Tester.FailureCases {
switch cs {
case "KILL_ONE_FOLLOWER":
case "SIGTERM_ONE_FOLLOWER":
clus.failures = append(clus.failures,
newFailureKillOneFollower(clus))
case "KILL_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
new_FailureCase_SIGTERM_ONE_FOLLOWER(clus))
case "SIGTERM_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
clus.failures = append(clus.failures,
newFailureKillOneFollowerUntilTriggerSnapshot(clus))
case "KILL_LEADER":
new_FailureCase_SIGTERM_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT(clus))
case "SIGTERM_LEADER":
clus.failures = append(clus.failures,
newFailureKillLeader(clus))
case "KILL_LEADER_UNTIL_TRIGGER_SNAPSHOT":
new_FailureCase_SIGTERM_LEADER(clus))
case "SIGTERM_LEADER_UNTIL_TRIGGER_SNAPSHOT":
clus.failures = append(clus.failures,
newFailureKillLeaderUntilTriggerSnapshot(clus))
case "KILL_QUORUM":
new_FailureCase_SIGTERM_LEADER_UNTIL_TRIGGER_SNAPSHOT(clus))
case "SIGTERM_QUORUM":
clus.failures = append(clus.failures,
newFailureKillQuorum(clus))
case "KILL_ALL":
new_FailureCase_SIGTERM_QUORUM(clus))
case "SIGTERM_ALL":
clus.failures = append(clus.failures,
newFailureKillAll(clus))
new_FailureCase_SIGTERM_ALL(clus))
case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER":
clus.failures = append(clus.failures,
newFailureBlackholePeerPortTxRxOneFollower(clus))
new_FailureCase_BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER(clus))
case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
clus.failures = append(clus.failures,
newFailureBlackholePeerPortTxRxOneFollowerUntilTriggerSnapshot())
new_FailureCase_BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT())
case "BLACKHOLE_PEER_PORT_TX_RX_LEADER":
clus.failures = append(clus.failures,
newFailureBlackholePeerPortTxRxLeader(clus))
new_FailureCase_BLACKHOLE_PEER_PORT_TX_RX_LEADER(clus))
case "BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
clus.failures = append(clus.failures,
newFailureBlackholePeerPortTxRxLeaderUntilTriggerSnapshot())
new_FailureCase_BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT())
case "BLACKHOLE_PEER_PORT_TX_RX_QUORUM":
clus.failures = append(clus.failures,
newFailureBlackholePeerPortTxRxQuorum(clus))
new_FailureCase_BLACKHOLE_PEER_PORT_TX_RX_QUORUM(clus))
case "BLACKHOLE_PEER_PORT_TX_RX_ALL":
clus.failures = append(clus.failures,
newFailureBlackholePeerPortTxRxAll(clus))
new_FailureCase_BLACKHOLE_PEER_PORT_TX_RX_ALL(clus))
case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER":
clus.failures = append(clus.failures,
newFailureDelayPeerPortTxRxOneFollower(clus, false))
new_FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER(clus, false))
case "RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER":
clus.failures = append(clus.failures,
newFailureDelayPeerPortTxRxOneFollower(clus, true))
new_FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER(clus, true))
case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
clus.failures = append(clus.failures,
newFailureDelayPeerPortTxRxOneFollowerUntilTriggerSnapshot(clus, false))
new_FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT(clus, false))
case "RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
clus.failures = append(clus.failures,
newFailureDelayPeerPortTxRxOneFollowerUntilTriggerSnapshot(clus, true))
new_FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT(clus, true))
case "DELAY_PEER_PORT_TX_RX_LEADER":
clus.failures = append(clus.failures,
newFailureDelayPeerPortTxRxLeader(clus, false))
new_FailureCase_DELAY_PEER_PORT_TX_RX_LEADER(clus, false))
case "RANDOM_DELAY_PEER_PORT_TX_RX_LEADER":
clus.failures = append(clus.failures,
newFailureDelayPeerPortTxRxLeader(clus, true))
new_FailureCase_DELAY_PEER_PORT_TX_RX_LEADER(clus, true))
case "DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
clus.failures = append(clus.failures,
newFailureDelayPeerPortTxRxLeaderUntilTriggerSnapshot(clus, false))
new_FailureCase_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT(clus, false))
case "RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
clus.failures = append(clus.failures,
newFailureDelayPeerPortTxRxLeaderUntilTriggerSnapshot(clus, true))
new_FailureCase_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT(clus, true))
case "DELAY_PEER_PORT_TX_RX_QUORUM":
clus.failures = append(clus.failures,
newFailureDelayPeerPortTxRxQuorum(clus, false))
new_FailureCase_DELAY_PEER_PORT_TX_RX_QUORUM(clus, false))
case "RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM":
clus.failures = append(clus.failures,
newFailureDelayPeerPortTxRxQuorum(clus, true))
new_FailureCase_DELAY_PEER_PORT_TX_RX_QUORUM(clus, true))
case "DELAY_PEER_PORT_TX_RX_ALL":
clus.failures = append(clus.failures,
newFailureDelayPeerPortTxRxAll(clus, false))
new_FailureCase_DELAY_PEER_PORT_TX_RX_ALL(clus, false))
case "RANDOM_DELAY_PEER_PORT_TX_RX_ALL":
clus.failures = append(clus.failures,
newFailureDelayPeerPortTxRxAll(clus, true))
new_FailureCase_DELAY_PEER_PORT_TX_RX_ALL(clus, true))
case "NO_FAIL_WITH_STRESS":
clus.failures = append(clus.failures,
newFailureNoFailWithStress(clus))
new_FailureCase_NO_FAIL_WITH_STRESS(clus))
case "NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS":
clus.failures = append(clus.failures,
newFailureNoFailWithNoStressForLiveness(clus))
new_FailureCase_NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS(clus))
case "EXTERNAL":
clus.failures = append(clus.failures,
newFailureExternal(clus.Tester.ExternalExecPath))
new_FailureCase_EXTERNAL(clus.Tester.ExternalExecPath))
case "FAILPOINTS":
fpFailures, fperr := failpointFailures(clus)
if len(fpFailures) == 0 {
@ -310,24 +310,24 @@ func (clus *Cluster) checkConsistency() (err error) {
return err
}
// Bootstrap bootstraps etcd cluster the very first time.
// Send_INITIAL_START_ETCD bootstraps etcd cluster the very first time.
// After this, just continue to call kill/restart.
func (clus *Cluster) Bootstrap() error {
func (clus *Cluster) Send_INITIAL_START_ETCD() error {
// this is the only time that creates request from scratch
return clus.broadcastOperation(rpcpb.Operation_InitialStartEtcd)
return clus.broadcast(rpcpb.Operation_INITIAL_START_ETCD)
}
// FailArchive sends "FailArchive" operation.
func (clus *Cluster) FailArchive() error {
return clus.broadcastOperation(rpcpb.Operation_FailArchive)
// send_SIGQUIT_ETCD_AND_ARCHIVE_DATA sends "SIGQUIT_ETCD_AND_ARCHIVE_DATA" operation.
func (clus *Cluster) send_SIGQUIT_ETCD_AND_ARCHIVE_DATA() error {
return clus.broadcast(rpcpb.Operation_SIGQUIT_ETCD_AND_ARCHIVE_DATA)
}
// Restart sends "Restart" operation.
func (clus *Cluster) Restart() error {
return clus.broadcastOperation(rpcpb.Operation_RestartEtcd)
// send_RESTART_ETCD sends restart operation.
func (clus *Cluster) send_RESTART_ETCD() error {
return clus.broadcast(rpcpb.Operation_RESTART_ETCD)
}
func (clus *Cluster) broadcastOperation(op rpcpb.Operation) error {
func (clus *Cluster) broadcast(op rpcpb.Operation) error {
var wg sync.WaitGroup
wg.Add(len(clus.agentStreams))
@ -335,7 +335,7 @@ func (clus *Cluster) broadcastOperation(op rpcpb.Operation) error {
for i := range clus.agentStreams {
go func(idx int, o rpcpb.Operation) {
defer wg.Done()
errc <- clus.sendOperation(idx, o)
errc <- clus.sendOp(idx, o)
}(i, op)
}
wg.Wait()
@ -349,7 +349,7 @@ func (clus *Cluster) broadcastOperation(op rpcpb.Operation) error {
if err != nil {
destroyed := false
if op == rpcpb.Operation_DestroyEtcdAgent {
if op == rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT {
if err == io.EOF {
destroyed = true
}
@ -376,8 +376,8 @@ func (clus *Cluster) broadcastOperation(op rpcpb.Operation) error {
return errors.New(strings.Join(errs, ", "))
}
func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
if op == rpcpb.Operation_InitialStartEtcd {
func (clus *Cluster) sendOp(idx int, op rpcpb.Operation) error {
if op == rpcpb.Operation_INITIAL_START_ETCD {
clus.agentRequests[idx] = &rpcpb.Request{
Operation: op,
Member: clus.Members[idx],
@ -436,7 +436,7 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
}
// store TLS assets from agents/servers onto disk
if secure && (op == rpcpb.Operation_InitialStartEtcd || op == rpcpb.Operation_RestartEtcd) {
if secure && (op == rpcpb.Operation_INITIAL_START_ETCD || op == rpcpb.Operation_RESTART_ETCD) {
dirClient := filepath.Join(
clus.Tester.DataDir,
clus.Members[idx].Etcd.Name,
@ -496,9 +496,9 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
return nil
}
// DestroyEtcdAgents terminates all tester connections to agents and etcd servers.
func (clus *Cluster) DestroyEtcdAgents() {
err := clus.broadcastOperation(rpcpb.Operation_DestroyEtcdAgent)
// Send_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT terminates all tester connections to agents and etcd servers.
func (clus *Cluster) Send_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT() {
err := clus.broadcast(rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT)
if err != nil {
clus.lg.Warn("destroying etcd/agents FAIL", zap.Error(err))
} else {

View File

@ -316,7 +316,7 @@ func (clus *Cluster) failed() {
zap.Int("case", clus.cs),
zap.Int("case-total", len(clus.failures)),
)
clus.DestroyEtcdAgents()
clus.Send_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT()
os.Exit(2)
}
@ -341,7 +341,7 @@ func (clus *Cluster) cleanup() error {
)
clus.stresser.Close()
if err := clus.FailArchive(); err != nil {
if err := clus.send_SIGQUIT_ETCD_AND_ARCHIVE_DATA(); err != nil {
clus.lg.Warn(
"cleanup FAIL",
zap.Int("round", clus.rd),
@ -351,7 +351,7 @@ func (clus *Cluster) cleanup() error {
)
return err
}
if err := clus.Restart(); err != nil {
if err := clus.send_RESTART_ETCD(); err != nil {
clus.lg.Warn(
"restart FAIL",
zap.Int("round", clus.rd),

View File

@ -156,12 +156,12 @@ func Test_read(t *testing.T) {
FailureDelayMs: 7000,
FailureShuffle: true,
FailureCases: []string{
"KILL_ONE_FOLLOWER",
"KILL_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT",
"KILL_LEADER",
"KILL_LEADER_UNTIL_TRIGGER_SNAPSHOT",
"KILL_QUORUM",
"KILL_ALL",
"SIGTERM_ONE_FOLLOWER",
"SIGTERM_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT",
"SIGTERM_LEADER",
"SIGTERM_LEADER_UNTIL_TRIGGER_SNAPSHOT",
"SIGTERM_QUORUM",
"SIGTERM_ALL",
"BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER",
"BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT",
"BLACKHOLE_PEER_PORT_TX_RX_LEADER",

View File

@ -46,7 +46,7 @@ func (f *failureExternal) FailureCase() rpcpb.FailureCase {
return f.failureCase
}
func newFailureExternal(scriptPath string) Failure {
func new_FailureCase_EXTERNAL(scriptPath string) Failure {
return &failureExternal{
desc: fmt.Sprintf("external fault injector (script: %q)", scriptPath),
failureCase: rpcpb.FailureCase_EXTERNAL,

View File

@ -145,7 +145,7 @@ func makeRecoverFailpoint(fp string) recoverMemberFunc {
fpStats.mu.Lock()
fpStats.crashes[fp]++
fpStats.mu.Unlock()
return recoverKill(clus, idx)
return recover_SIGTERM_ETCD(clus, idx)
}
}

View File

@ -1,89 +0,0 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
import "github.com/coreos/etcd/functional/rpcpb"
func injectKill(clus *Cluster, idx int) error {
return clus.sendOperation(idx, rpcpb.Operation_KillEtcd)
}
func recoverKill(clus *Cluster, idx int) error {
return clus.sendOperation(idx, rpcpb.Operation_RestartEtcd)
}
func newFailureKillOneFollower(clus *Cluster) Failure {
ff := failureByFunc{
failureCase: rpcpb.FailureCase_KILL_ONE_FOLLOWER,
injectMember: injectKill,
recoverMember: recoverKill,
}
f := &failureFollower{ff, -1, -1}
return &failureDelay{
Failure: f,
delayDuration: clus.GetFailureDelayDuration(),
}
}
func newFailureKillLeader(clus *Cluster) Failure {
ff := failureByFunc{
failureCase: rpcpb.FailureCase_KILL_LEADER,
injectMember: injectKill,
recoverMember: recoverKill,
}
f := &failureLeader{ff, -1, -1}
return &failureDelay{
Failure: f,
delayDuration: clus.GetFailureDelayDuration(),
}
}
func newFailureKillQuorum(clus *Cluster) Failure {
f := &failureQuorum{
failureCase: rpcpb.FailureCase_KILL_QUORUM,
injectMember: injectKill,
recoverMember: recoverKill,
}
return &failureDelay{
Failure: f,
delayDuration: clus.GetFailureDelayDuration(),
}
}
func newFailureKillAll(clus *Cluster) Failure {
f := &failureAll{
failureCase: rpcpb.FailureCase_KILL_ALL,
injectMember: injectKill,
recoverMember: recoverKill,
}
return &failureDelay{
Failure: f,
delayDuration: clus.GetFailureDelayDuration(),
}
}
func newFailureKillOneFollowerUntilTriggerSnapshot(clus *Cluster) Failure {
return &failureUntilSnapshot{
failureCase: rpcpb.FailureCase_KILL_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT,
Failure: newFailureKillOneFollower(clus),
}
}
func newFailureKillLeaderUntilTriggerSnapshot(clus *Cluster) Failure {
return &failureUntilSnapshot{
failureCase: rpcpb.FailureCase_KILL_LEADER_UNTIL_TRIGGER_SNAPSHOT,
Failure: newFailureKillLeader(clus),
}
}

View File

@ -16,19 +16,19 @@ package tester
import "github.com/coreos/etcd/functional/rpcpb"
func injectBlackholePeerPortTxRx(clus *Cluster, idx int) error {
return clus.sendOperation(idx, rpcpb.Operation_BlackholePeerPortTxRx)
func inject_BLACKHOLE_PEER_PORT_TX_RX(clus *Cluster, idx int) error {
return clus.sendOp(idx, rpcpb.Operation_BLACKHOLE_PEER_PORT_TX_RX)
}
func recoverBlackholePeerPortTxRx(clus *Cluster, idx int) error {
return clus.sendOperation(idx, rpcpb.Operation_UnblackholePeerPortTxRx)
func recover_BLACKHOLE_PEER_PORT_TX_RX(clus *Cluster, idx int) error {
return clus.sendOp(idx, rpcpb.Operation_UNBLACKHOLE_PEER_PORT_TX_RX)
}
func newFailureBlackholePeerPortTxRxOneFollower(clus *Cluster) Failure {
func new_FailureCase_BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER(clus *Cluster) Failure {
ff := failureByFunc{
failureCase: rpcpb.FailureCase_BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER,
injectMember: injectBlackholePeerPortTxRx,
recoverMember: recoverBlackholePeerPortTxRx,
injectMember: inject_BLACKHOLE_PEER_PORT_TX_RX,
recoverMember: recover_BLACKHOLE_PEER_PORT_TX_RX,
}
f := &failureFollower{ff, -1, -1}
return &failureDelay{
@ -37,11 +37,11 @@ func newFailureBlackholePeerPortTxRxOneFollower(clus *Cluster) Failure {
}
}
func newFailureBlackholePeerPortTxRxOneFollowerUntilTriggerSnapshot() Failure {
func new_FailureCase_BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT() Failure {
ff := failureByFunc{
failureCase: rpcpb.FailureCase_BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT,
injectMember: injectBlackholePeerPortTxRx,
recoverMember: recoverBlackholePeerPortTxRx,
injectMember: inject_BLACKHOLE_PEER_PORT_TX_RX,
recoverMember: recover_BLACKHOLE_PEER_PORT_TX_RX,
}
f := &failureFollower{ff, -1, -1}
return &failureUntilSnapshot{
@ -50,11 +50,11 @@ func newFailureBlackholePeerPortTxRxOneFollowerUntilTriggerSnapshot() Failure {
}
}
func newFailureBlackholePeerPortTxRxLeader(clus *Cluster) Failure {
func new_FailureCase_BLACKHOLE_PEER_PORT_TX_RX_LEADER(clus *Cluster) Failure {
ff := failureByFunc{
failureCase: rpcpb.FailureCase_BLACKHOLE_PEER_PORT_TX_RX_LEADER,
injectMember: injectBlackholePeerPortTxRx,
recoverMember: recoverBlackholePeerPortTxRx,
injectMember: inject_BLACKHOLE_PEER_PORT_TX_RX,
recoverMember: recover_BLACKHOLE_PEER_PORT_TX_RX,
}
f := &failureLeader{ff, -1, -1}
return &failureDelay{
@ -63,11 +63,11 @@ func newFailureBlackholePeerPortTxRxLeader(clus *Cluster) Failure {
}
}
func newFailureBlackholePeerPortTxRxLeaderUntilTriggerSnapshot() Failure {
func new_FailureCase_BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT() Failure {
ff := failureByFunc{
failureCase: rpcpb.FailureCase_BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT,
injectMember: injectBlackholePeerPortTxRx,
recoverMember: recoverBlackholePeerPortTxRx,
injectMember: inject_BLACKHOLE_PEER_PORT_TX_RX,
recoverMember: recover_BLACKHOLE_PEER_PORT_TX_RX,
}
f := &failureLeader{ff, -1, -1}
return &failureUntilSnapshot{
@ -76,11 +76,11 @@ func newFailureBlackholePeerPortTxRxLeaderUntilTriggerSnapshot() Failure {
}
}
func newFailureBlackholePeerPortTxRxQuorum(clus *Cluster) Failure {
func new_FailureCase_BLACKHOLE_PEER_PORT_TX_RX_QUORUM(clus *Cluster) Failure {
f := &failureQuorum{
failureCase: rpcpb.FailureCase_BLACKHOLE_PEER_PORT_TX_RX_QUORUM,
injectMember: injectBlackholePeerPortTxRx,
recoverMember: recoverBlackholePeerPortTxRx,
injectMember: inject_BLACKHOLE_PEER_PORT_TX_RX,
recoverMember: recover_BLACKHOLE_PEER_PORT_TX_RX,
}
return &failureDelay{
Failure: f,
@ -88,11 +88,11 @@ func newFailureBlackholePeerPortTxRxQuorum(clus *Cluster) Failure {
}
}
func newFailureBlackholePeerPortTxRxAll(clus *Cluster) Failure {
func new_FailureCase_BLACKHOLE_PEER_PORT_TX_RX_ALL(clus *Cluster) Failure {
f := &failureAll{
failureCase: rpcpb.FailureCase_BLACKHOLE_PEER_PORT_TX_RX_ALL,
injectMember: injectBlackholePeerPortTxRx,
recoverMember: recoverBlackholePeerPortTxRx,
injectMember: inject_BLACKHOLE_PEER_PORT_TX_RX,
recoverMember: recover_BLACKHOLE_PEER_PORT_TX_RX,
}
return &failureDelay{
Failure: f,

View File

@ -30,27 +30,27 @@ const (
waitRecover = 5 * time.Second
)
func injectDelayPeerPortTxRx(clus *Cluster, idx int) error {
func inject_DELAY_PEER_PORT_TX_RX(clus *Cluster, idx int) error {
clus.lg.Info(
"injecting delay latency",
zap.Duration("latency", time.Duration(clus.Tester.UpdatedDelayLatencyMs)*time.Millisecond),
zap.Duration("latency-rv", time.Duration(clus.Tester.DelayLatencyMsRv)*time.Millisecond),
zap.String("endpoint", clus.Members[idx].EtcdClientEndpoint),
)
return clus.sendOperation(idx, rpcpb.Operation_DelayPeerPortTxRx)
return clus.sendOp(idx, rpcpb.Operation_DELAY_PEER_PORT_TX_RX)
}
func recoverDelayPeerPortTxRx(clus *Cluster, idx int) error {
err := clus.sendOperation(idx, rpcpb.Operation_UndelayPeerPortTxRx)
func recover_DELAY_PEER_PORT_TX_RX(clus *Cluster, idx int) error {
err := clus.sendOp(idx, rpcpb.Operation_UNDELAY_PEER_PORT_TX_RX)
time.Sleep(waitRecover)
return err
}
func newFailureDelayPeerPortTxRxOneFollower(clus *Cluster, random bool) Failure {
func new_FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER(clus *Cluster, random bool) Failure {
ff := failureByFunc{
failureCase: rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER,
injectMember: injectDelayPeerPortTxRx,
recoverMember: recoverDelayPeerPortTxRx,
injectMember: inject_DELAY_PEER_PORT_TX_RX,
recoverMember: recover_DELAY_PEER_PORT_TX_RX,
}
clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
if random {
@ -64,11 +64,11 @@ func newFailureDelayPeerPortTxRxOneFollower(clus *Cluster, random bool) Failure
}
}
func newFailureDelayPeerPortTxRxOneFollowerUntilTriggerSnapshot(clus *Cluster, random bool) Failure {
func new_FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT(clus *Cluster, random bool) Failure {
ff := failureByFunc{
failureCase: rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT,
injectMember: injectDelayPeerPortTxRx,
recoverMember: recoverDelayPeerPortTxRx,
injectMember: inject_DELAY_PEER_PORT_TX_RX,
recoverMember: recover_DELAY_PEER_PORT_TX_RX,
}
clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
if random {
@ -82,11 +82,11 @@ func newFailureDelayPeerPortTxRxOneFollowerUntilTriggerSnapshot(clus *Cluster, r
}
}
func newFailureDelayPeerPortTxRxLeader(clus *Cluster, random bool) Failure {
func new_FailureCase_DELAY_PEER_PORT_TX_RX_LEADER(clus *Cluster, random bool) Failure {
ff := failureByFunc{
failureCase: rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_LEADER,
injectMember: injectDelayPeerPortTxRx,
recoverMember: recoverDelayPeerPortTxRx,
injectMember: inject_DELAY_PEER_PORT_TX_RX,
recoverMember: recover_DELAY_PEER_PORT_TX_RX,
}
clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
if random {
@ -100,11 +100,11 @@ func newFailureDelayPeerPortTxRxLeader(clus *Cluster, random bool) Failure {
}
}
func newFailureDelayPeerPortTxRxLeaderUntilTriggerSnapshot(clus *Cluster, random bool) Failure {
func new_FailureCase_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT(clus *Cluster, random bool) Failure {
ff := failureByFunc{
failureCase: rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT,
injectMember: injectDelayPeerPortTxRx,
recoverMember: recoverDelayPeerPortTxRx,
injectMember: inject_DELAY_PEER_PORT_TX_RX,
recoverMember: recover_DELAY_PEER_PORT_TX_RX,
}
clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
if random {
@ -118,11 +118,11 @@ func newFailureDelayPeerPortTxRxLeaderUntilTriggerSnapshot(clus *Cluster, random
}
}
func newFailureDelayPeerPortTxRxQuorum(clus *Cluster, random bool) Failure {
func new_FailureCase_DELAY_PEER_PORT_TX_RX_QUORUM(clus *Cluster, random bool) Failure {
f := &failureQuorum{
failureCase: rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_QUORUM,
injectMember: injectDelayPeerPortTxRx,
recoverMember: recoverDelayPeerPortTxRx,
injectMember: inject_DELAY_PEER_PORT_TX_RX,
recoverMember: recover_DELAY_PEER_PORT_TX_RX,
}
clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
if random {
@ -135,11 +135,11 @@ func newFailureDelayPeerPortTxRxQuorum(clus *Cluster, random bool) Failure {
}
}
func newFailureDelayPeerPortTxRxAll(clus *Cluster, random bool) Failure {
func new_FailureCase_DELAY_PEER_PORT_TX_RX_ALL(clus *Cluster, random bool) Failure {
f := &failureAll{
failureCase: rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_ALL,
injectMember: injectDelayPeerPortTxRx,
recoverMember: recoverDelayPeerPortTxRx,
injectMember: inject_DELAY_PEER_PORT_TX_RX,
recoverMember: recover_DELAY_PEER_PORT_TX_RX,
}
clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
if random {

View File

@ -43,7 +43,7 @@ func (f *failureNoFailWithStress) FailureCase() rpcpb.FailureCase {
return f.failureCase
}
func newFailureNoFailWithStress(clus *Cluster) Failure {
func new_FailureCase_NO_FAIL_WITH_STRESS(clus *Cluster) Failure {
f := &failureNoFailWithStress{
failureCase: rpcpb.FailureCase_NO_FAIL_WITH_STRESS,
}
@ -88,7 +88,7 @@ func (f *failureNoFailWithNoStressForLiveness) FailureCase() rpcpb.FailureCase {
return f.failureCase
}
func newFailureNoFailWithNoStressForLiveness(clus *Cluster) Failure {
func new_FailureCase_NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS(clus *Cluster) Failure {
f := &failureNoFailWithNoStressForLiveness{
failureCase: rpcpb.FailureCase_NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS,
}

View File

@ -0,0 +1,89 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
import "github.com/coreos/etcd/functional/rpcpb"
func inject_SIGTERM_ETCD(clus *Cluster, idx int) error {
return clus.sendOp(idx, rpcpb.Operation_SIGTERM_ETCD)
}
func recover_SIGTERM_ETCD(clus *Cluster, idx int) error {
return clus.sendOp(idx, rpcpb.Operation_RESTART_ETCD)
}
func new_FailureCase_SIGTERM_ONE_FOLLOWER(clus *Cluster) Failure {
ff := failureByFunc{
failureCase: rpcpb.FailureCase_SIGTERM_ONE_FOLLOWER,
injectMember: inject_SIGTERM_ETCD,
recoverMember: recover_SIGTERM_ETCD,
}
f := &failureFollower{ff, -1, -1}
return &failureDelay{
Failure: f,
delayDuration: clus.GetFailureDelayDuration(),
}
}
func new_FailureCase_SIGTERM_LEADER(clus *Cluster) Failure {
ff := failureByFunc{
failureCase: rpcpb.FailureCase_SIGTERM_LEADER,
injectMember: inject_SIGTERM_ETCD,
recoverMember: recover_SIGTERM_ETCD,
}
f := &failureLeader{ff, -1, -1}
return &failureDelay{
Failure: f,
delayDuration: clus.GetFailureDelayDuration(),
}
}
func new_FailureCase_SIGTERM_QUORUM(clus *Cluster) Failure {
f := &failureQuorum{
failureCase: rpcpb.FailureCase_SIGTERM_QUORUM,
injectMember: inject_SIGTERM_ETCD,
recoverMember: recover_SIGTERM_ETCD,
}
return &failureDelay{
Failure: f,
delayDuration: clus.GetFailureDelayDuration(),
}
}
func new_FailureCase_SIGTERM_ALL(clus *Cluster) Failure {
f := &failureAll{
failureCase: rpcpb.FailureCase_SIGTERM_ALL,
injectMember: inject_SIGTERM_ETCD,
recoverMember: recover_SIGTERM_ETCD,
}
return &failureDelay{
Failure: f,
delayDuration: clus.GetFailureDelayDuration(),
}
}
func new_FailureCase_SIGTERM_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT(clus *Cluster) Failure {
return &failureUntilSnapshot{
failureCase: rpcpb.FailureCase_SIGTERM_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT,
Failure: new_FailureCase_SIGTERM_ONE_FOLLOWER(clus),
}
}
func new_FailureCase_SIGTERM_LEADER_UNTIL_TRIGGER_SNAPSHOT(clus *Cluster) Failure {
return &failureUntilSnapshot{
failureCase: rpcpb.FailureCase_SIGTERM_LEADER_UNTIL_TRIGGER_SNAPSHOT,
Failure: new_FailureCase_SIGTERM_LEADER(clus),
}
}
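For context, a hedged sketch of how the tester would drive one of these renamed cases: inject the fault (e.g. SIGTERM a follower), then recover it (RESTART_ETCD), wrapped by the failureDelay/failureUntilSnapshot decorators constructed above. It assumes the Failure interface exposes Inject and Recover methods taking the cluster, as the inject_/recover_ helpers suggest; runOnce is not code from this commit:

import "go.uber.org/zap"

// runOnce injects a failure case and then recovers it (illustration only).
func runOnce(clus *Cluster, f Failure) error {
	clus.lg.Info("injecting failure", zap.String("case", f.FailureCase().String()))
	if err := f.Inject(clus); err != nil {
		return err
	}
	clus.lg.Info("recovering failure", zap.String("case", f.FailureCase().String()))
	return f.Recover(clus)
}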