diff --git a/tools/functional-tester/tester/checks.go b/tools/functional-tester/tester/checks.go
index 0a5207fe9..9788e8649 100644
--- a/tools/functional-tester/tester/checks.go
+++ b/tools/functional-tester/tester/checks.go
@@ -104,6 +104,16 @@ type leaseChecker struct {
 }

 func (lc *leaseChecker) Check() error {
+    if lc.ls == nil {
+        return nil
+    }
+    if lc.ls != nil &&
+        (lc.ls.revokedLeases == nil ||
+            lc.ls.aliveLeases == nil ||
+            lc.ls.shortLivedLeases == nil) {
+        return nil
+    }
+
     cli, err := lc.m.CreateEtcdClient(grpc.WithBackoffMaxDelay(time.Second))
     if err != nil {
         return fmt.Errorf("%v (%q)", err, lc.m.EtcdClientEndpoint)
@@ -114,6 +124,7 @@ func (lc *leaseChecker) Check() error {
         }
     }()
     lc.cli = cli
+
     if err := lc.check(true, lc.ls.revokedLeases.leases); err != nil {
         return err
     }
diff --git a/tools/functional-tester/tester/cluster.go b/tools/functional-tester/tester/cluster.go
index cc747a800..fd85d0c88 100644
--- a/tools/functional-tester/tester/cluster.go
+++ b/tools/functional-tester/tester/cluster.go
@@ -61,7 +61,6 @@ type Cluster struct {
 }

 func newCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
-    lg.Info("reading configuration file", zap.String("path", fpath))
     bts, err := ioutil.ReadFile(fpath)
     if err != nil {
         return nil, err
@@ -204,7 +203,6 @@ func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
     clus.failures = make([]Failure, 0)

     for i, ap := range clus.Members {
-        clus.lg.Info("connecting", zap.String("agent-address", ap.AgentAddr))
         var err error
         clus.agentConns[i], err = grpc.Dial(ap.AgentAddr, dialOpts...)
         if err != nil {
@@ -213,7 +211,6 @@ func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
         clus.agentClients[i] = rpcpb.NewTransportClient(clus.agentConns[i])
         clus.lg.Info("connected", zap.String("agent-address", ap.AgentAddr))

-        clus.lg.Info("creating stream", zap.String("agent-address", ap.AgentAddr))
         clus.agentStreams[i], err = clus.agentClients[i].Transport(context.Background())
         if err != nil {
             return nil, err
@@ -240,7 +237,9 @@ func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
         rate.Limit(int(clus.Tester.StressQPS)),
         int(clus.Tester.StressQPS),
     )
+
     clus.updateStresserChecker()
+
     return clus, nil
 }

@@ -265,32 +264,48 @@ func (clus *Cluster) updateFailures() {
     switch cs {
     case "KILL_ONE_FOLLOWER":
         clus.failures = append(clus.failures, newFailureKillOneFollower())
+    case "KILL_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
+        clus.failures = append(clus.failures, newFailureKillOneFollowerUntilTriggerSnapshot())
     case "KILL_LEADER":
         clus.failures = append(clus.failures, newFailureKillLeader())
-    case "KILL_ONE_FOLLOWER_FOR_LONG":
-        clus.failures = append(clus.failures, newFailureKillOneFollowerForLongTime())
-    case "KILL_LEADER_FOR_LONG":
-        clus.failures = append(clus.failures, newFailureKillLeaderForLongTime())
+    case "KILL_LEADER_UNTIL_TRIGGER_SNAPSHOT":
+        clus.failures = append(clus.failures, newFailureKillLeaderUntilTriggerSnapshot())
     case "KILL_QUORUM":
         clus.failures = append(clus.failures, newFailureKillQuorum())
     case "KILL_ALL":
         clus.failures = append(clus.failures, newFailureKillAll())
+
     case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER":
         clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOneFollower(clus))
+    case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
+        clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOneFollowerUntilTriggerSnapshot())
     case "BLACKHOLE_PEER_PORT_TX_RX_LEADER":
         clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxLeader(clus))
case "BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT": + clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxLeaderUntilTriggerSnapshot()) + case "BLACKHOLE_PEER_PORT_TX_RX_QUORUM": + clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxQuorum(clus)) case "BLACKHOLE_PEER_PORT_TX_RX_ALL": clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxAll(clus)) + case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER": clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollower(clus)) + case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT": + clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollowerUntilTriggerSnapshot()) case "DELAY_PEER_PORT_TX_RX_LEADER": clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeader(clus)) + case "DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT": + clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeaderUntilTriggerSnapshot()) + case "DELAY_PEER_PORT_TX_RX_QUORUM": + clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxQuorum(clus)) case "DELAY_PEER_PORT_TX_RX_ALL": clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxAll(clus)) + case "NO_FAIL_WITH_STRESS": clus.failures = append(clus.failures, newFailureNoFailWithStress(clus)) case "NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS": clus.failures = append(clus.failures, newFailureNoFailWithNoStressForLiveness(clus)) + case "EXTERNAL": clus.failures = append(clus.failures, newFailureExternal(clus.Tester.ExternalExecPath)) case "FAILPOINTS": @@ -317,7 +332,6 @@ func (clus *Cluster) shuffleFailures() { n := len(clus.failures) cp := coprime(n) - clus.lg.Info("shuffling test failure cases", zap.Int("total", n)) fs := make([]Failure, n) for i := 0; i < n; i++ { fs[i] = clus.failures[(cp*i+offset)%n] @@ -355,12 +369,6 @@ func gcd(x, y int) int { } func (clus *Cluster) updateStresserChecker() { - clus.lg.Info( - "updating stressers", - zap.Int("round", clus.rd), - zap.Int("case", clus.cs), - ) - cs := &compositeStresser{} for _, m := range clus.Members { cs.stressers = append(cs.stressers, newStresser(clus, m)) @@ -397,21 +405,17 @@ func (clus *Cluster) checkConsistency() (err error) { } }() - clus.lg.Info( - "checking consistency and invariant of cluster", - zap.Int("round", clus.rd), - zap.Int("case", clus.cs), - zap.String("desc", clus.failures[clus.cs].Desc()), - ) if err = clus.checker.Check(); err != nil { clus.lg.Warn( - "checker.Check failed", + "consistency check FAIL", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), zap.Error(err), ) return err } clus.lg.Info( - "checked consistency and invariant of cluster", + "consistency check ALL PASS", zap.Int("round", clus.rd), zap.Int("case", clus.cs), zap.String("desc", clus.failures[clus.cs].Desc()), @@ -468,11 +472,6 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error { clus.agentRequests[idx].Operation = op } - clus.lg.Info( - "sending request", - zap.String("operation", op.String()), - zap.String("to", clus.Members[idx].EtcdClientEndpoint), - ) err := clus.agentStreams[idx].Send(clus.agentRequests[idx]) clus.lg.Info( "sent request", @@ -484,11 +483,6 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error { return err } - clus.lg.Info( - "receiving response", - zap.String("operation", op.String()), - zap.String("from", clus.Members[idx].EtcdClientEndpoint), - ) resp, err := clus.agentStreams[idx].Recv() if resp != nil { clus.lg.Info( @@ -519,22 +513,19 @@ func (clus *Cluster) 
@@ -468,11 +472,6 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
         clus.agentRequests[idx].Operation = op
     }

-    clus.lg.Info(
-        "sending request",
-        zap.String("operation", op.String()),
-        zap.String("to", clus.Members[idx].EtcdClientEndpoint),
-    )
     err := clus.agentStreams[idx].Send(clus.agentRequests[idx])
     clus.lg.Info(
         "sent request",
@@ -484,11 +483,6 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
         return err
     }

-    clus.lg.Info(
-        "receiving response",
-        zap.String("operation", op.String()),
-        zap.String("from", clus.Members[idx].EtcdClientEndpoint),
-    )
     resp, err := clus.agentStreams[idx].Recv()
     if resp != nil {
         clus.lg.Info(
@@ -519,22 +513,19 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {

 // DestroyEtcdAgents terminates all tester connections to agents and etcd servers.
 func (clus *Cluster) DestroyEtcdAgents() {
-    clus.lg.Info("destroying etcd servers and agents")
     err := clus.broadcastOperation(rpcpb.Operation_DestroyEtcdAgent)
     if err != nil {
-        clus.lg.Warn("failed to destroy etcd servers and agents", zap.Error(err))
+        clus.lg.Warn("destroying etcd/agents FAIL", zap.Error(err))
     } else {
-        clus.lg.Info("destroyed etcd servers and agents")
+        clus.lg.Info("destroying etcd/agents PASS")
     }

     for i, conn := range clus.agentConns {
-        clus.lg.Info("closing connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr))
         err := conn.Close()
         clus.lg.Info("closed connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr), zap.Error(err))
     }

     if clus.testerHTTPServer != nil {
-        clus.lg.Info("closing tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr))
         ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
         err := clus.testerHTTPServer.Shutdown(ctx)
         cancel()
@@ -552,14 +543,9 @@ func (clus *Cluster) WaitHealth() error {
     // reasonable workload (https://github.com/coreos/etcd/issues/2698)
     for i := 0; i < 60; i++ {
         for _, m := range clus.Members {
-            clus.lg.Info(
-                "writing health key",
-                zap.Int("retries", i),
-                zap.String("endpoint", m.EtcdClientEndpoint),
-            )
             if err = m.WriteHealthKey(); err != nil {
                 clus.lg.Warn(
-                    "writing health key failed",
+                    "health check FAIL",
                     zap.Int("retries", i),
                     zap.String("endpoint", m.EtcdClientEndpoint),
                     zap.Error(err),
@@ -567,15 +553,16 @@ func (clus *Cluster) WaitHealth() error {
                 break
             }
             clus.lg.Info(
-                "wrote health key",
+                "health check PASS",
                 zap.Int("retries", i),
                 zap.String("endpoint", m.EtcdClientEndpoint),
             )
         }
         if err == nil {
             clus.lg.Info(
-                "writing health key success on all members",
-                zap.Int("retries", i),
+                "health check ALL PASS",
+                zap.Int("round", clus.rd),
+                zap.Int("case", clus.cs),
             )
             return nil
         }
@@ -639,7 +626,7 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {

     for i, m := range clus.Members {
         clus.lg.Info(
-            "compacting",
+            "compact START",
             zap.String("endpoint", m.EtcdClientEndpoint),
             zap.Int64("compact-revision", rev),
             zap.Duration("timeout", timeout),
@@ -657,7 +644,7 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
             )
         } else {
             clus.lg.Warn(
-                "compact failed",
+                "compact FAIL",
                 zap.String("endpoint", m.EtcdClientEndpoint),
                 zap.Int64("compact-revision", rev),
                 zap.Error(cerr),
@@ -669,7 +656,7 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {

     if succeed {
         clus.lg.Info(
-            "compacted",
+            "compact PASS",
             zap.String("endpoint", m.EtcdClientEndpoint),
             zap.Int64("compact-revision", rev),
             zap.Duration("timeout", timeout),
@@ -693,24 +680,22 @@ func (clus *Cluster) checkCompact(rev int64) error {
 }

 func (clus *Cluster) defrag() error {
-    clus.lg.Info(
-        "defragmenting",
-        zap.Int("round", clus.rd),
-        zap.Int("case", clus.cs),
-    )
     for _, m := range clus.Members {
         if err := m.Defrag(); err != nil {
             clus.lg.Warn(
-                "defrag failed",
-                zap.Int("round", clus.rd),
-                zap.Int("case", clus.cs),
+                "defrag FAIL",
+                zap.String("endpoint", m.EtcdClientEndpoint),
                 zap.Error(err),
             )
             return err
         }
+        clus.lg.Info(
+            "defrag PASS",
+            zap.String("endpoint", m.EtcdClientEndpoint),
+        )
     }
     clus.lg.Info(
-        "defragmented",
+        "defrag ALL PASS",
         zap.Int("round", clus.rd),
         zap.Int("case", clus.cs),
     )
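Note: WaitHealth above now reports health check PASS/FAIL per endpoint while keeping the 60-retry loop around writing a health key. A rough standalone sketch of that retry pattern with clientv3 (endpoint, key, and budget here are arbitrary; this is not the tester's own code):

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/coreos/etcd/clientv3"
)

func waitHealth(endpoint string) error {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{endpoint},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        return err
    }
    defer cli.Close()

    var lastErr error
    for i := 0; i < 60; i++ { // same budget as WaitHealth: ~60 one-second retries
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        _, lastErr = cli.Put(ctx, "health", "good")
        cancel()
        if lastErr == nil {
            return nil // health check PASS
        }
        time.Sleep(time.Second)
    }
    return fmt.Errorf("health check FAIL: %v", lastErr)
}

func main() {
    if err := waitHealth("127.0.0.1:2379"); err != nil {
        fmt.Println(err)
    }
}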
diff --git a/tools/functional-tester/tester/cluster_test.go b/tools/functional-tester/tester/cluster_test.go
index e0627453e..d8d0f8d1e 100644
--- a/tools/functional-tester/tester/cluster_test.go
+++ b/tools/functional-tester/tester/cluster_test.go
@@ -112,24 +112,30 @@ func Test_newCluster(t *testing.T) {
         Tester: &rpcpb.Tester{
             TesterNetwork:    "tcp",
             TesterAddr:       "127.0.0.1:9028",
-            DelayLatencyMs:   500,
-            DelayLatencyMsRv: 50,
+            DelayLatencyMs:   5000,
+            DelayLatencyMsRv: 150,
             RoundLimit:       1,
             ExitOnFailure:    true,
             ConsistencyCheck: true,
             EnablePprof:      true,
             FailureCases: []string{
                 "KILL_ONE_FOLLOWER",
+                "KILL_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT",
                 "KILL_LEADER",
-                "KILL_ONE_FOLLOWER_FOR_LONG",
-                "KILL_LEADER_FOR_LONG",
+                "KILL_LEADER_UNTIL_TRIGGER_SNAPSHOT",
                 "KILL_QUORUM",
                 "KILL_ALL",
                 "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER",
+                "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT",
                 "BLACKHOLE_PEER_PORT_TX_RX_LEADER",
+                "BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT",
+                "BLACKHOLE_PEER_PORT_TX_RX_QUORUM",
                 "BLACKHOLE_PEER_PORT_TX_RX_ALL",
                 "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER",
+                "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT",
                 "DELAY_PEER_PORT_TX_RX_LEADER",
+                "DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT",
+                "DELAY_PEER_PORT_TX_RX_QUORUM",
                 "DELAY_PEER_PORT_TX_RX_ALL",
                 "NO_FAIL_WITH_STRESS",
                 "NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS",
@@ -146,7 +152,7 @@ func Test_newCluster(t *testing.T) {
             StressKeySuffixRangeTxn: 100,
             StressKeyTxnOps:         10,
             StressClients:           100,
-            StressQPS:               1000,
+            StressQPS:               2000,
         },
     }

diff --git a/tools/functional-tester/tester/cluster_tester.go b/tools/functional-tester/tester/cluster_tester.go
index 466a268a6..95a5b29fa 100644
--- a/tools/functional-tester/tester/cluster_tester.go
+++ b/tools/functional-tester/tester/cluster_tester.go
@@ -39,7 +39,7 @@ func (clus *Cluster) StartTester() {

         if err := clus.doRound(); err != nil {
             clus.lg.Warn(
-                "doRound failed; returning",
+                "round FAIL",
                 zap.Int("round", clus.rd),
                 zap.Int("case", clus.cs),
                 zap.Error(err),
@@ -62,21 +62,21 @@ func (clus *Cluster) StartTester() {
             timeout := 10 * time.Second
             timeout += time.Duration(modifiedKey/compactQPS) * time.Second
             clus.lg.Info(
-                "compacting",
+                "compact START",
                 zap.Int("round", clus.rd),
                 zap.Int("case", clus.cs),
                 zap.Duration("timeout", timeout),
             )
             if err := clus.compact(revToCompact, timeout); err != nil {
                 clus.lg.Warn(
-                    "compact failed",
+                    "compact FAIL",
                     zap.Int("round", clus.rd),
                     zap.Int("case", clus.cs),
                     zap.Error(err),
                 )
                 if err = clus.cleanup(); err != nil {
                     clus.lg.Warn(
-                        "cleanup failed",
+                        "cleanup FAIL",
                         zap.Int("round", clus.rd),
                         zap.Int("case", clus.cs),
                         zap.Error(err),
@@ -88,12 +88,6 @@ func (clus *Cluster) StartTester() {
             }
         }
         if round > 0 && round%500 == 0 { // every 500 rounds
             if err := clus.defrag(); err != nil {
-                clus.lg.Warn(
-                    "defrag failed; returning",
-                    zap.Int("round", clus.rd),
-                    zap.Int("case", clus.cs),
-                    zap.Error(err),
-                )
                 clus.failed()
                 return
             }
@@ -101,7 +95,7 @@ func (clus *Cluster) StartTester() {
     }

     clus.lg.Info(
-        "functional-tester passed",
+        "functional-tester PASS",
         zap.Int("round", clus.rd),
         zap.Int("case", clus.cs),
     )
@@ -112,18 +106,20 @@ func (clus *Cluster) doRound() error {
         clus.shuffleFailures()
     }

+    roundNow := time.Now()
     clus.lg.Info(
-        "starting round",
+        "round START",
         zap.Int("round", clus.rd),
         zap.Strings("failures", clus.failureStrings()),
     )
-
     for i, fa := range clus.failures {
         clus.cs = i

         caseTotalCounter.WithLabelValues(fa.Desc()).Inc()
+
+        caseNow := time.Now()
         clus.lg.Info(
-            "failure case START",
+            "case START",
             zap.Int("round", clus.rd),
             zap.Int("case", clus.cs),
zap.String("desc", fa.Desc()), @@ -138,7 +134,7 @@ func (clus *Cluster) doRound() error { fcase := fa.FailureCase() if fcase != rpcpb.FailureCase_NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS { clus.lg.Info( - "starting stressers before injecting failures", + "stresser START", zap.Int("round", clus.rd), zap.Int("case", clus.cs), zap.String("desc", fa.Desc()), @@ -150,7 +146,7 @@ func (clus *Cluster) doRound() error { } clus.lg.Info( - "injecting", + "inject START", zap.Int("round", clus.rd), zap.Int("case", clus.cs), zap.String("desc", fa.Desc()), @@ -163,7 +159,7 @@ func (clus *Cluster) doRound() error { // with stressing client ports // TODO: use unix for local tests clus.lg.Info( - "recovering", + "recover START", zap.Int("round", clus.rd), zap.Int("case", clus.cs), zap.String("desc", fa.Desc()), @@ -173,13 +169,13 @@ func (clus *Cluster) doRound() error { } if stressStarted { - clus.lg.Info("pausing stresser after failure recovery, before wait health") + clus.lg.Info("stresser PAUSE") ems := clus.stresser.Pause() if fcase == rpcpb.FailureCase_NO_FAIL_WITH_STRESS && len(ems) > 0 { ess := make([]string, 0, len(ems)) cnt := 0 for k, v := range ems { - ess = append(ess, fmt.Sprintf("%s (count %d)", k, v)) + ess = append(ess, fmt.Sprintf("%s (count: %d)", k, v)) cnt += v } clus.lg.Warn( @@ -187,34 +183,40 @@ func (clus *Cluster) doRound() error { zap.String("desc", fa.Desc()), zap.Strings("errors", ess), ) - return fmt.Errorf("expected no error in %q, got %q", fcase.String(), ess) + + // with network delay, some ongoing requests may fail + // only return error, if more than 10% of QPS requests fail + if cnt > int(clus.Tester.StressQPS)/10 { + return fmt.Errorf("expected no error in %q, got %q", fcase.String(), ess) + } } } - clus.lg.Info("wait health after recover") + clus.lg.Info("health check START") if err := clus.WaitHealth(); err != nil { return fmt.Errorf("wait full health error: %v", err) } - clus.lg.Info("check consistency after recover") + clus.lg.Info("consistency check START") if err := clus.checkConsistency(); err != nil { - return fmt.Errorf("tt.checkConsistency error (%v)", err) + return fmt.Errorf("consistency check error (%v)", err) } clus.lg.Info( - "failure case PASS", + "case PASS", zap.Int("round", clus.rd), zap.Int("case", clus.cs), zap.String("desc", fa.Desc()), + zap.Duration("took", time.Since(caseNow)), ) } clus.lg.Info( - "finished round", + "round ALL PASS", zap.Int("round", clus.rd), zap.Strings("failures", clus.failureStrings()), + zap.Duration("took", time.Since(roundNow)), ) - return nil } @@ -233,28 +235,9 @@ func (clus *Cluster) updateRevision() error { } func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) { - clus.lg.Info( - "compacting storage", - zap.Int64("current-revision", clus.currentRevision), - zap.Int64("compact-revision", rev), - ) if err = clus.compactKV(rev, timeout); err != nil { - return err - } - clus.lg.Info( - "compacted storage", - zap.Int64("current-revision", clus.currentRevision), - zap.Int64("compact-revision", rev), - ) - - clus.lg.Info( - "checking compaction", - zap.Int64("current-revision", clus.currentRevision), - zap.Int64("compact-revision", rev), - ) - if err = clus.checkCompact(rev); err != nil { clus.lg.Warn( - "checkCompact failed", + "compact FAIL", zap.Int64("current-revision", clus.currentRevision), zap.Int64("compact-revision", rev), zap.Error(err), @@ -262,7 +245,22 @@ func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) { return err } clus.lg.Info( - "confirmed compaction", + "compact 
DONE", + zap.Int64("current-revision", clus.currentRevision), + zap.Int64("compact-revision", rev), + ) + + if err = clus.checkCompact(rev); err != nil { + clus.lg.Warn( + "check compact FAIL", + zap.Int64("current-revision", clus.currentRevision), + zap.Int64("compact-revision", rev), + zap.Error(err), + ) + return err + } + clus.lg.Info( + "check compact DONE", zap.Int64("current-revision", clus.currentRevision), zap.Int64("compact-revision", rev), ) @@ -276,7 +274,7 @@ func (clus *Cluster) failed() { } clus.lg.Info( - "exiting on failure", + "functional-tester FAIL", zap.Int("round", clus.rd), zap.Int("case", clus.cs), ) @@ -303,7 +301,7 @@ func (clus *Cluster) cleanup() error { if err := clus.FailArchive(); err != nil { clus.lg.Warn( - "cleanup failed", + "cleanup FAIL", zap.Int("round", clus.rd), zap.Int("case", clus.cs), zap.Error(err), @@ -312,7 +310,7 @@ func (clus *Cluster) cleanup() error { } if err := clus.Restart(); err != nil { clus.lg.Warn( - "restart failed", + "restart FAIL", zap.Int("round", clus.rd), zap.Int("case", clus.cs), zap.Error(err), diff --git a/tools/functional-tester/tester/failure.go b/tools/functional-tester/tester/failure.go index f9c205512..30218f38c 100644 --- a/tools/functional-tester/tester/failure.go +++ b/tools/functional-tester/tester/failure.go @@ -20,6 +20,8 @@ import ( "time" "github.com/coreos/etcd/tools/functional-tester/rpcpb" + + "go.uber.org/zap" ) // Failure defines failure injection interface. @@ -43,15 +45,15 @@ type injectMemberFunc func(*Cluster, int) error type recoverMemberFunc func(*Cluster, int) error type failureByFunc struct { - desc + desc string failureCase rpcpb.FailureCase injectMember injectMemberFunc recoverMember recoverMemberFunc } func (f *failureByFunc) Desc() string { - if string(f.desc) != "" { - return string(f.desc) + if f.desc != "" { + return f.desc } return f.failureCase.String() } @@ -100,8 +102,8 @@ func (f *failureFollower) Recover(clus *Cluster) error { } func (f *failureFollower) Desc() string { - if string(f.desc) != "" { - return string(f.desc) + if f.desc != "" { + return f.desc } return f.failureCase.String() } @@ -162,8 +164,8 @@ func (f *failureQuorum) Recover(clus *Cluster) error { } func (f *failureQuorum) Desc() string { - if string(f.desc) != "" { - return string(f.desc) + if f.desc != "" { + return f.desc } return f.failureCase.String() } @@ -172,6 +174,18 @@ func (f *failureQuorum) FailureCase() rpcpb.FailureCase { return f.failureCase } +func killMap(size int, seed int) map[int]bool { + m := make(map[int]bool) + r := rand.New(rand.NewSource(int64(seed))) + majority := size/2 + 1 + for { + m[r.Intn(size)] = true + if len(m) >= majority { + return m + } + } +} + type failureAll failureByFunc func (f *failureAll) Inject(clus *Cluster) error { @@ -193,8 +207,8 @@ func (f *failureAll) Recover(clus *Cluster) error { } func (f *failureAll) Desc() string { - if string(f.desc) != "" { - return string(f.desc) + if f.desc != "" { + return f.desc } return f.failureCase.String() } @@ -205,13 +219,15 @@ func (f *failureAll) FailureCase() rpcpb.FailureCase { // failureUntilSnapshot injects a failure and waits for a snapshot event type failureUntilSnapshot struct { - desc desc + desc string failureCase rpcpb.FailureCase - Failure } -const snapshotCount = 10000 +var slowCases = map[rpcpb.FailureCase]bool{ + rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT: true, + rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT: true, +} func (f *failureUntilSnapshot) Inject(clus 
 func (f *failureUntilSnapshot) Inject(clus *Cluster) error {
     if err := f.Failure.Inject(clus); err != nil {
@@ -220,6 +236,18 @@ func (f *failureUntilSnapshot) Inject(clus *Cluster) error {
     if len(clus.Members) < 3 {
         return nil
     }
+
+    snapshotCount := clus.Members[0].Etcd.SnapshotCount
+
+    now := time.Now()
+    clus.lg.Info(
+        "trigger snapshot START",
+        zap.Int("round", clus.rd),
+        zap.Int("case", clus.cs),
+        zap.String("desc", f.Desc()),
+        zap.Int64("etcd-snapshot-count", snapshotCount),
+    )
+
     // maxRev may fail since failure just injected, retry if failed.
     startRev, err := clus.maxRev()
     for i := 0; i < 10 && startRev == 0; i++ {
@@ -229,44 +257,59 @@ func (f *failureUntilSnapshot) Inject(clus *Cluster) error {
         return err
     }
     lastRev := startRev
-    // Normal healthy cluster could accept 1000req/s at least.
-    // Give it 3-times time to create a new snapshot.
-    retry := snapshotCount / 1000 * 3
-    for j := 0; j < retry; j++ {
+
+    // healthy cluster could accept 1000 req/sec at least.
+    // 3x time to trigger snapshot.
+    retries := int(snapshotCount) / 1000 * 3
+    if v, ok := slowCases[f.FailureCase()]; v && ok {
+        // slow network takes more retries
+        retries *= 4
+    }
+
+    for i := 0; i < retries; i++ {
         lastRev, _ = clus.maxRev()
         // If the number of proposals committed is bigger than snapshot count,
         // a new snapshot should have been created.
-        if lastRev-startRev > snapshotCount {
+        diff := lastRev - startRev
+        if diff > snapshotCount {
+            clus.lg.Info(
+                "trigger snapshot PASS",
+                zap.Int("round", clus.rd),
+                zap.Int("case", clus.cs),
+                zap.Int("retries", i),
+                zap.String("desc", f.Desc()),
+                zap.Int64("committed-entries", diff),
+                zap.Int64("etcd-snapshot-count", snapshotCount),
+                zap.Int64("last-revision", lastRev),
+                zap.Duration("took", time.Since(now)),
+            )
             return nil
         }
+
+        clus.lg.Info(
+            "trigger snapshot PROGRESS",
+            zap.Int("retries", i),
+            zap.Int64("committed-entries", diff),
+            zap.Int64("etcd-snapshot-count", snapshotCount),
+            zap.Int64("last-revision", lastRev),
+            zap.Duration("took", time.Since(now)),
+        )
         time.Sleep(time.Second)
     }
-    return fmt.Errorf("cluster too slow: only commit %d requests in %ds", lastRev-startRev, retry)
+
+    return fmt.Errorf("cluster too slow: only %d commits in %d retries", lastRev-startRev, retries)
 }

 func (f *failureUntilSnapshot) Desc() string {
-    if f.desc.Desc() != "" {
-        return f.desc.Desc()
+    if f.desc != "" {
+        return f.desc
     }
-    return f.failureCase.String()
+    if f.failureCase.String() != "" {
+        return f.failureCase.String()
+    }
+    return f.Failure.Desc()
 }

 func (f *failureUntilSnapshot) FailureCase() rpcpb.FailureCase {
     return f.failureCase
 }
-
-func killMap(size int, seed int) map[int]bool {
-    m := make(map[int]bool)
-    r := rand.New(rand.NewSource(int64(seed)))
-    majority := size/2 + 1
-    for {
-        m[r.Intn(size)] = true
-        if len(m) >= majority {
-            return m
-        }
-    }
-}
-
-type desc string
-
-func (d desc) Desc() string { return string(d) }
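Note: the Inject rewrite above derives its wait budget from the member's configured snapshot count instead of the old hard-coded constant, and gives delay-network cases four times as long. The arithmetic, pulled out as a sketch (assumption: one retry corresponds to one second of waiting, as in the loop above):

// snapshotRetryBudget mirrors the retry math in failureUntilSnapshot.Inject:
// a healthy cluster is assumed to commit at least ~1000 requests/sec, so allow
// 3x that time to accumulate snapshot-count entries, and 4x more under delay.
func snapshotRetryBudget(snapshotCount int64, slowNetwork bool) int {
    retries := int(snapshotCount) / 1000 * 3
    if slowNetwork {
        retries *= 4
    }
    return retries
}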
diff --git a/tools/functional-tester/tester/failure_case_failpoints.go b/tools/functional-tester/tester/failure_case_failpoints.go
index c55e34577..c32e419b0 100644
--- a/tools/functional-tester/tester/failure_case_failpoints.go
+++ b/tools/functional-tester/tester/failure_case_failpoints.go
@@ -51,7 +51,7 @@ func failpointFailures(clus *Cluster) (ret []Failure, err error) {
         if strings.Contains(fp, "Snap") { // hack to trigger snapshot failpoints
             fpFails[i] = &failureUntilSnapshot{
-                desc:        desc(fpf.Desc()),
+                desc:        fpf.Desc(),
                 failureCase: rpcpb.FailureCase_FAILPOINTS,
                 Failure:     fpf,
             }
@@ -95,7 +95,7 @@ func failuresFromFailpoint(fp string, failpointCommands []string) (fs []Failure)
     fs = append(fs, []Failure{
         &failureFollower{
             failureByFunc: failureByFunc{
-                desc:          desc(fmt.Sprintf("failpoint %q (one: %q)", fp, fcmd)),
+                desc:          fmt.Sprintf("failpoint %q (one: %q)", fp, fcmd),
                 failureCase:   rpcpb.FailureCase_FAILPOINTS,
                 injectMember:  inject,
                 recoverMember: recov,
@@ -105,7 +105,7 @@ func failuresFromFailpoint(fp string, failpointCommands []string) (fs []Failure)
         },
         &failureLeader{
             failureByFunc: failureByFunc{
-                desc:          desc(fmt.Sprintf("failpoint %q (leader: %q)", fp, fcmd)),
+                desc:          fmt.Sprintf("failpoint %q (leader: %q)", fp, fcmd),
                 failureCase:   rpcpb.FailureCase_FAILPOINTS,
                 injectMember:  inject,
                 recoverMember: recov,
@@ -114,13 +114,13 @@ func failuresFromFailpoint(fp string, failpointCommands []string) (fs []Failure)
             lead: -1,
         },
         &failureQuorum{
-            desc:          desc(fmt.Sprintf("failpoint %q (quorum: %q)", fp, fcmd)),
+            desc:          fmt.Sprintf("failpoint %q (quorum: %q)", fp, fcmd),
             failureCase:   rpcpb.FailureCase_FAILPOINTS,
             injectMember:  inject,
             recoverMember: recov,
         },
         &failureAll{
-            desc:          desc(fmt.Sprintf("failpoint %q (all: %q)", fp, fcmd)),
+            desc:          fmt.Sprintf("failpoint %q (all: %q)", fp, fcmd),
             failureCase:   rpcpb.FailureCase_FAILPOINTS,
             injectMember:  inject,
             recoverMember: recov,
diff --git a/tools/functional-tester/tester/failure_case_kill.go b/tools/functional-tester/tester/failure_case_kill.go
index 9e88efe03..3af1f9da9 100644
--- a/tools/functional-tester/tester/failure_case_kill.go
+++ b/tools/functional-tester/tester/failure_case_kill.go
@@ -58,16 +58,16 @@ func newFailureKillAll() Failure {
     }
 }

-func newFailureKillOneFollowerForLongTime() Failure {
+func newFailureKillOneFollowerUntilTriggerSnapshot() Failure {
     return &failureUntilSnapshot{
-        failureCase: rpcpb.FailureCase_KILL_ONE_FOLLOWER_FOR_LONG,
+        failureCase: rpcpb.FailureCase_KILL_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT,
         Failure:     newFailureKillOneFollower(),
     }
 }

-func newFailureKillLeaderForLongTime() Failure {
+func newFailureKillLeaderUntilTriggerSnapshot() Failure {
     return &failureUntilSnapshot{
-        failureCase: rpcpb.FailureCase_KILL_LEADER_FOR_LONG,
+        failureCase: rpcpb.FailureCase_KILL_LEADER_UNTIL_TRIGGER_SNAPSHOT,
         Failure:     newFailureKillLeader(),
     }
 }
diff --git a/tools/functional-tester/tester/failure_case_network_blackhole.go b/tools/functional-tester/tester/failure_case_network_blackhole.go
index 0f7581280..469d201bb 100644
--- a/tools/functional-tester/tester/failure_case_network_blackhole.go
+++ b/tools/functional-tester/tester/failure_case_network_blackhole.go
@@ -14,9 +14,7 @@

 package tester

-import (
-    "github.com/coreos/etcd/tools/functional-tester/rpcpb"
-)
+import "github.com/coreos/etcd/tools/functional-tester/rpcpb"

 func injectBlackholePeerPortTxRx(clus *Cluster, idx int) error {
     return clus.sendOperation(idx, rpcpb.Operation_BlackholePeerPortTxRx)
@@ -39,6 +37,19 @@ func newFailureBlackholePeerPortTxRxOneFollower(clus *Cluster) Failure {
     }
 }

+func newFailureBlackholePeerPortTxRxOneFollowerUntilTriggerSnapshot() Failure {
+    ff := failureByFunc{
+        failureCase:   rpcpb.FailureCase_BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT,
+        injectMember:  injectBlackholePeerPortTxRx,
+        recoverMember: recoverBlackholePeerPortTxRx,
+    }
+    f := &failureFollower{ff, -1, -1}
+    return &failureUntilSnapshot{
+        failureCase: rpcpb.FailureCase_BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT,
+        Failure:     f,
+    }
+}
+
 func newFailureBlackholePeerPortTxRxLeader(clus *Cluster) Failure {
     ff := failureByFunc{
         failureCase:   rpcpb.FailureCase_BLACKHOLE_PEER_PORT_TX_RX_LEADER,
@@ -52,6 +63,31 @@ func newFailureBlackholePeerPortTxRxLeader(clus *Cluster) Failure {
     }
 }

+func newFailureBlackholePeerPortTxRxLeaderUntilTriggerSnapshot() Failure {
+    ff := failureByFunc{
+        failureCase:   rpcpb.FailureCase_BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT,
+        injectMember:  injectBlackholePeerPortTxRx,
+        recoverMember: recoverBlackholePeerPortTxRx,
+    }
+    f := &failureLeader{ff, -1, -1}
+    return &failureUntilSnapshot{
+        failureCase: rpcpb.FailureCase_BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT,
+        Failure:     f,
+    }
+}
+
+func newFailureBlackholePeerPortTxRxQuorum(clus *Cluster) Failure {
+    f := &failureQuorum{
+        failureCase:   rpcpb.FailureCase_BLACKHOLE_PEER_PORT_TX_RX_QUORUM,
+        injectMember:  injectBlackholePeerPortTxRx,
+        recoverMember: recoverBlackholePeerPortTxRx,
+    }
+    return &failureDelay{
+        Failure:       f,
+        delayDuration: clus.GetFailureDelayDuration(),
+    }
+}
+
 func newFailureBlackholePeerPortTxRxAll(clus *Cluster) Failure {
     f := &failureAll{
         failureCase: rpcpb.FailureCase_BLACKHOLE_PEER_PORT_TX_RX_ALL,
diff --git a/tools/functional-tester/tester/failure_case_network_slow.go b/tools/functional-tester/tester/failure_case_network_delay.go
similarity index 65%
rename from tools/functional-tester/tester/failure_case_network_slow.go
rename to tools/functional-tester/tester/failure_case_network_delay.go
index 274ba6383..4a3161e2c 100644
--- a/tools/functional-tester/tester/failure_case_network_slow.go
+++ b/tools/functional-tester/tester/failure_case_network_delay.go
@@ -51,6 +51,19 @@ func newFailureDelayPeerPortTxRxOneFollower(clus *Cluster) Failure {
     }
 }

+func newFailureDelayPeerPortTxRxOneFollowerUntilTriggerSnapshot() Failure {
+    ff := failureByFunc{
+        failureCase:   rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT,
+        injectMember:  injectDelayPeerPortTxRx,
+        recoverMember: recoverDelayPeerPortTxRx,
+    }
+    f := &failureFollower{ff, -1, -1}
+    return &failureUntilSnapshot{
+        failureCase: rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT,
+        Failure:     f,
+    }
+}
+
 func newFailureDelayPeerPortTxRxLeader(clus *Cluster) Failure {
     ff := failureByFunc{
         failureCase:   rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_LEADER,
@@ -64,6 +77,31 @@ func newFailureDelayPeerPortTxRxLeader(clus *Cluster) Failure {
     }
 }

+func newFailureDelayPeerPortTxRxLeaderUntilTriggerSnapshot() Failure {
+    ff := failureByFunc{
+        failureCase:   rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT,
+        injectMember:  injectDelayPeerPortTxRx,
+        recoverMember: recoverDelayPeerPortTxRx,
+    }
+    f := &failureLeader{ff, -1, -1}
+    return &failureUntilSnapshot{
+        failureCase: rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT,
+        Failure:     f,
+    }
+}
+
+func newFailureDelayPeerPortTxRxQuorum(clus *Cluster) Failure {
+    f := &failureQuorum{
+        failureCase:   rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_QUORUM,
+        injectMember:  injectDelayPeerPortTxRx,
+        recoverMember: recoverDelayPeerPortTxRx,
+    }
+    return &failureDelay{
+        Failure:       f,
+        delayDuration: clus.GetFailureDelayDuration(),
+    }
+}
+
 func newFailureDelayPeerPortTxRxAll(clus *Cluster) Failure {
     f := &failureAll{
         failureCase: rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_ALL,
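Note: the new QUORUM and UNTIL_TRIGGER_SNAPSHOT constructors in this file and in failure_case_network_blackhole.go all follow one composition pattern: a failureByFunc holds the inject/recover pair, a scope wrapper (failureFollower, failureLeader, failureQuorum, failureAll) picks the targets, and the result is wrapped either in failureUntilSnapshot (hold the fault until a snapshot is triggered) or failureDelay (hold it for a fixed duration before recovery). A hypothetical extra constructor, written only to illustrate that pattern and not part of this patch:

// Illustrative only; the function name is made up.
func newFailureDelayPeerPortTxRxOneFollowerWithDelay(clus *Cluster) Failure {
    ff := failureByFunc{
        failureCase:   rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER,
        injectMember:  injectDelayPeerPortTxRx,
        recoverMember: recoverDelayPeerPortTxRx,
    }
    return &failureDelay{
        Failure:       &failureFollower{ff, -1, -1},
        delayDuration: clus.GetFailureDelayDuration(),
    }
}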
diff --git a/tools/functional-tester/tester/failure_case_no_fail.go b/tools/functional-tester/tester/failure_case_no_fail.go
index 221929960..235d8bbc6 100644
--- a/tools/functional-tester/tester/failure_case_no_fail.go
+++ b/tools/functional-tester/tester/failure_case_no_fail.go
@@ -33,8 +33,8 @@ func (f *failureNoFailWithStress) Recover(clus *Cluster) error {
 }

 func (f *failureNoFailWithStress) Desc() string {
-    if f.desc.Desc() != "" {
-        return f.desc.Desc()
+    if f.desc != "" {
+        return f.desc
     }
     return f.failureCase.String()
 }
@@ -78,8 +78,8 @@ func (f *failureNoFailWithNoStressForLiveness) Recover(clus *Cluster) error {
 }

 func (f *failureNoFailWithNoStressForLiveness) Desc() string {
-    if f.desc.Desc() != "" {
-        return f.desc.Desc()
+    if f.desc != "" {
+        return f.desc
     }
     return f.failureCase.String()
 }
diff --git a/tools/functional-tester/tester/local-test.yaml b/tools/functional-tester/tester/local-test.yaml
index f5068c236..381474eb5 100644
--- a/tools/functional-tester/tester/local-test.yaml
+++ b/tools/functional-tester/tester/local-test.yaml
@@ -76,8 +76,9 @@ tester-config:
   tester-network: tcp
   tester-addr: 127.0.0.1:9028

-  delay-latency-ms: 500
-  delay-latency-ms-rv: 50
+  # slow enough to trigger election
+  delay-latency-ms: 5000
+  delay-latency-ms-rv: 150

   round-limit: 1
   exit-on-failure: true
@@ -86,16 +87,22 @@ tester-config:

   failure-cases:
   - KILL_ONE_FOLLOWER
+  - KILL_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
   - KILL_LEADER
-  - KILL_ONE_FOLLOWER_FOR_LONG
-  - KILL_LEADER_FOR_LONG
+  - KILL_LEADER_UNTIL_TRIGGER_SNAPSHOT
   - KILL_QUORUM
   - KILL_ALL
   - BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER
+  - BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
   - BLACKHOLE_PEER_PORT_TX_RX_LEADER
+  - BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT
+  - BLACKHOLE_PEER_PORT_TX_RX_QUORUM
   - BLACKHOLE_PEER_PORT_TX_RX_ALL
   - DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER
+  - DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
   - DELAY_PEER_PORT_TX_RX_LEADER
+  - DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT
+  - DELAY_PEER_PORT_TX_RX_QUORUM
   - DELAY_PEER_PORT_TX_RX_ALL
   - NO_FAIL_WITH_STRESS
   - NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS
@@ -125,4 +132,4 @@ tester-config:
   stress-key-txn-ops: 10
   stress-clients: 100

-  stress-qps: 1000
+  stress-qps: 2000
diff --git a/tools/functional-tester/tester/stress.go b/tools/functional-tester/tester/stress.go
index 9a0b07b8b..281e20d9e 100644
--- a/tools/functional-tester/tester/stress.go
+++ b/tools/functional-tester/tester/stress.go
@@ -41,7 +41,11 @@ type Stresser interface {

 func newStresser(clus *Cluster, m *rpcpb.Member) Stresser {
     stressers := make([]Stresser, len(clus.Tester.StressTypes))
     for i, stype := range clus.Tester.StressTypes {
-        clus.lg.Info("creating stresser", zap.String("type", stype))
+        clus.lg.Info(
+            "creating stresser",
+            zap.String("type", stype),
+            zap.String("endpoint", m.EtcdClientEndpoint),
+        )
         switch stype {
         case "KV":
diff --git a/tools/functional-tester/tester/stress_key.go b/tools/functional-tester/tester/stress_key.go
index 7b2c62bd4..e8883c145 100644
--- a/tools/functional-tester/tester/stress_key.go
+++ b/tools/functional-tester/tester/stress_key.go
@@ -102,7 +102,7 @@ func (s *keyStresser) Stress() error {
     }

     s.lg.Info(
-        "key stresser started in background",
+        "key stresser START",
         zap.String("endpoint", s.m.EtcdClientEndpoint),
     )
     return nil
@@ -181,16 +181,16 @@ func (s *keyStresser) Close() map[string]int {
     s.cli.Close()
     s.wg.Wait()

-    s.lg.Info(
-        "key stresser is closed",
-        zap.String("endpoint", s.m.EtcdClientEndpoint),
-    )
-
     s.emu.Lock()
     s.paused = true
     ess := s.ems
     s.ems = make(map[string]int, 100)
     s.emu.Unlock()
+
+    s.lg.Info(
+        "key stresser STOP",
+        zap.String("endpoint", s.m.EtcdClientEndpoint),
+    )
     return ess
 }
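Note: keyStresser.Close above now snapshots and resets the error-count map before logging STOP; Pause and Close both hand back a map of error message to occurrence count, which doRound then sums against the QPS-based budget. A tiny helper showing how such maps combine across stressers (the helper name is ours, not the tester's):

// mergeErrorCounts merges per-stresser error counts keyed by error message.
func mergeErrorCounts(maps ...map[string]int) map[string]int {
    total := make(map[string]int)
    for _, m := range maps {
        for msg, n := range m {
            total[msg] += n
        }
    }
    return total
}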
diff --git a/tools/functional-tester/tester/stress_lease.go b/tools/functional-tester/tester/stress_lease.go
index 5d7050fe7..5257213d9 100644
--- a/tools/functional-tester/tester/stress_lease.go
+++ b/tools/functional-tester/tester/stress_lease.go
@@ -121,7 +121,7 @@ func (ls *leaseStresser) setupOnce() error {

 func (ls *leaseStresser) Stress() error {
     ls.lg.Info(
-        "lease stresser is started",
+        "lease stresser START",
         zap.String("endpoint", ls.m.EtcdClientEndpoint),
     )

@@ -452,16 +452,12 @@ func (ls *leaseStresser) Pause() map[string]int {
 }

 func (ls *leaseStresser) Close() map[string]int {
-    ls.lg.Info(
-        "lease stresser is closing",
-        zap.String("endpoint", ls.m.EtcdClientEndpoint),
-    )
     ls.cancel()
     ls.runWg.Wait()
     ls.aliveWg.Wait()
     ls.cli.Close()
     ls.lg.Info(
-        "lease stresser is closed",
+        "lease stresser STOP",
         zap.String("endpoint", ls.m.EtcdClientEndpoint),
     )
     return nil
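Note: leaseStresser.Close above keeps a strict shutdown order: cancel the stress context, wait for the run and keep-alive goroutines, and only then close the client, so no goroutine races a closed connection. The ordering, sketched generically (the types here are illustrative, not the tester's):

type stresserStopper struct {
    cancel  func()       // stops issuing new stress requests
    wait    func()       // waits for in-flight goroutines (e.g. WaitGroup.Wait)
    closeFn func() error // closes the etcd client last
}

func (s *stresserStopper) stop() error {
    s.cancel()
    s.wait()
    return s.closeFn()
}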