From 0a0f0ae719d3e980e0c0ef297f897c6350e1416a Mon Sep 17 00:00:00 2001
From: Pavel Kalinnikov
Date: Mon, 31 Oct 2022 22:42:25 +0000
Subject: [PATCH] raft/rafttest: add test for replication pausing

This commit adds a data-driven test which simulates conditions under which
the flow of Raft messages to a particular node is throttled while the node
is in StateReplicate. The test demonstrates that MsgApp messages with
non-empty Entries may "leak" to a paused stream every time there is a
successful heartbeat exchange.

Signed-off-by: Pavel Kalinnikov
---
 raft/rafttest/interaction_env.go              |  12 +-
 .../interaction_env_handler_add_nodes.go      |  21 +-
 raft/testdata/replicate_pause.txt             | 190 ++++++++++++++++++
 3 files changed, 208 insertions(+), 15 deletions(-)
 create mode 100644 raft/testdata/replicate_pause.txt

diff --git a/raft/rafttest/interaction_env.go b/raft/rafttest/interaction_env.go
index 4a6adc5a5..75c223837 100644
--- a/raft/rafttest/interaction_env.go
+++ b/raft/rafttest/interaction_env.go
@@ -84,15 +84,13 @@ type Storage interface {
 	Append([]pb.Entry) error
 }
 
-// defaultRaftConfig sets up a *raft.Config with reasonable testing defaults.
-// In particular, no limits are set.
-func defaultRaftConfig(id uint64, applied uint64, s raft.Storage) *raft.Config {
-	return &raft.Config{
-		ID:              id,
-		Applied:         applied,
+// raftConfigStub sets up a raft.Config stub with reasonable testing defaults.
+// In particular, no limits are set. It is not a complete config: ID and Storage
+// must be set for each node using the stub as a template.
+func raftConfigStub() raft.Config {
+	return raft.Config{
 		ElectionTick:    3,
 		HeartbeatTick:   1,
-		Storage:         s,
 		MaxSizePerMsg:   math.MaxUint64,
 		MaxInflightMsgs: math.MaxInt32,
 	}
diff --git a/raft/rafttest/interaction_env_handler_add_nodes.go b/raft/rafttest/interaction_env_handler_add_nodes.go
index 517477ef4..b72c96505 100644
--- a/raft/rafttest/interaction_env_handler_add_nodes.go
+++ b/raft/rafttest/interaction_env_handler_add_nodes.go
@@ -28,6 +28,7 @@ import (
 func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) error {
 	n := firstAsInt(t, d)
 	var snap pb.Snapshot
+	cfg := raftConfigStub()
 	for _, arg := range d.CmdArgs[1:] {
 		for i := range arg.Vals {
 			switch arg.Key {
@@ -39,14 +40,17 @@ func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) e
 				var id uint64
 				arg.Scan(t, i, &id)
 				snap.Metadata.ConfState.Learners = append(snap.Metadata.ConfState.Learners, id)
+			case "inflight":
+				arg.Scan(t, i, &cfg.MaxInflightMsgs)
 			case "index":
 				arg.Scan(t, i, &snap.Metadata.Index)
+				cfg.Applied = snap.Metadata.Index
 			case "content":
 				arg.Scan(t, i, &snap.Data)
 			}
 		}
 	}
-	return env.AddNodes(n, snap)
+	return env.AddNodes(n, cfg, snap)
 }
 
 type snapOverrideStorage struct {
@@ -63,9 +67,9 @@ func (s snapOverrideStorage) Snapshot() (pb.Snapshot, error) {
 
 var _ raft.Storage = snapOverrideStorage{}
 
-// AddNodes adds n new nodes initializes from the given snapshot (which may be
-// empty). They will be assigned consecutive IDs.
-func (env *InteractionEnv) AddNodes(n int, snap pb.Snapshot) error {
+// AddNodes adds n new nodes initialized from the given snapshot (which may be
+// empty), using cfg as a template. They will be assigned consecutive IDs.
+func (env *InteractionEnv) AddNodes(n int, cfg raft.Config, snap pb.Snapshot) error {
 	bootstrap := !reflect.DeepEqual(snap, pb.Snapshot{})
 	for i := 0; i < n; i++ {
 		id := uint64(1 + len(env.Nodes))
@@ -103,9 +107,10 @@ func (env *InteractionEnv) AddNodes(n int, snap pb.Snapshot) error {
 				return fmt.Errorf("failed to establish first index %d; got %d", exp, fi)
 			}
 		}
-		cfg := defaultRaftConfig(id, snap.Metadata.Index, s)
+		cfg := cfg // fork the config stub
+		cfg.ID, cfg.Storage = id, s
 		if env.Options.OnConfig != nil {
-			env.Options.OnConfig(cfg)
+			env.Options.OnConfig(&cfg)
 			if cfg.ID != id {
 				// This could be supported but then we need to do more work
 				// translating back and forth -- not worth it.
@@ -117,7 +122,7 @@ func (env *InteractionEnv) AddNodes(n int, snap pb.Snapshot) error {
 		}
 		cfg.Logger = env.Output
 
-		rn, err := raft.NewRawNode(cfg)
+		rn, err := raft.NewRawNode(&cfg)
 		if err != nil {
 			return err
 		}
@@ -127,7 +132,7 @@ func (env *InteractionEnv) AddNodes(n int, snap pb.Snapshot) error {
 			// TODO(tbg): allow a more general Storage, as long as it also allows
 			// us to apply snapshots, append entries, and update the HardState.
 			Storage: s,
-			Config:  cfg,
+			Config:  &cfg,
 			History: []pb.Snapshot{snap},
 		}
 		env.Nodes = append(env.Nodes, node)
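A note on the refactoring above: raft.Config is now handed around by value,
so each node forks its own copy of the stub and fills in the per-node fields.
The following self-contained sketch shows the same pattern in isolation. It is
illustrative only, not part of the patch, and assumes the
go.etcd.io/etcd/raft/v3 module and its NewMemoryStorage helper:

    package main

    import (
        "fmt"
        "math"

        "go.etcd.io/etcd/raft/v3"
    )

    // configStub mirrors raftConfigStub above: testing defaults only, with
    // ID and Storage deliberately left unset.
    func configStub() raft.Config {
        return raft.Config{
            ElectionTick:    3,
            HeartbeatTick:   1,
            MaxSizePerMsg:   math.MaxUint64,
            MaxInflightMsgs: math.MaxInt32,
        }
    }

    func main() {
        // Fork the stub per node, as AddNodes now does. raft.Config is a
        // struct, so plain assignment copies it; mutating the copy cannot
        // affect the template or previously created nodes.
        for id := uint64(1); id <= 3; id++ {
            cfg := configStub()
            cfg.ID = id
            cfg.Storage = raft.NewMemoryStorage()
            fmt.Printf("node %d: inflight limit %d\n", cfg.ID, cfg.MaxInflightMsgs)
        }
    }

Passing the config by value is what makes the `cfg := cfg` fork in AddNodes
safe: the OnConfig hook may mutate the copy freely without leaking changes
into the template used for the next node.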
diff --git a/raft/testdata/replicate_pause.txt b/raft/testdata/replicate_pause.txt
new file mode 100644
index 000000000..f6e30dc76
--- /dev/null
+++ b/raft/testdata/replicate_pause.txt
@@ -0,0 +1,190 @@
+# This test ensures that the MsgApp stream to a follower is paused when the
+# in-flight state exceeds the configured limits. This is a regression test for
+# the issue fixed by https://github.com/etcd-io/etcd/pull/14633.
+
+# Turn off output during the setup of the test.
+log-level none
+----
+ok
+
+# Start with 3 nodes, with a limited in-flight capacity.
+add-nodes 3 voters=(1,2,3) index=10 inflight=3
+----
+ok
+
+campaign 1
+----
+ok
+
+stabilize
+----
+ok (quiet)
+
+# Propose 3 entries.
+propose 1 prop_1_12
+----
+ok
+
+propose 1 prop_1_13
+----
+ok
+
+propose 1 prop_1_14
+----
+ok
+
+# Store entries and send proposals.
+process-ready 1
+----
+ok (quiet)
+
+# Re-enable log messages.
+log-level debug
+----
+ok
+
+# Expect that in-flight tracking to nodes 2 and 3 is saturated.
+status 1
+----
+1: StateReplicate match=14 next=15
+2: StateReplicate match=11 next=15 paused inflight=3[full]
+3: StateReplicate match=11 next=15 paused inflight=3[full]
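The `paused inflight=3[full]` output above comes from raft's per-follower
in-flight tracking (the tracker.Inflights type), whose capacity is the
MaxInflightMsgs knob that the new `inflight=` directive sets. The sketch below
is a deliberately simplified model of that bookkeeping, written for
illustration rather than taken from the raft source:

    package main

    import "fmt"

    // inflights is a toy model of raft's per-follower in-flight window:
    // a FIFO of the last entry index carried by each unacknowledged MsgApp.
    type inflights struct {
        buffer []uint64 // last indexes of in-flight appends, oldest first
        size   int      // capacity, analogous to Config.MaxInflightMsgs
    }

    // full reports whether the window is saturated; a full window is what
    // the status output renders as "paused inflight=3[full]".
    func (in *inflights) full() bool { return len(in.buffer) >= in.size }

    // add records one more in-flight MsgApp ending at the given index.
    // Callers must check full() first; the leader pauses the stream
    // instead of calling add on a full window.
    func (in *inflights) add(index uint64) {
        if in.full() {
            panic("add on full inflights")
        }
        in.buffer = append(in.buffer, index)
    }

    // freeLE releases everything acknowledged up to and including index,
    // e.g. when a MsgAppResp for that index arrives.
    func (in *inflights) freeLE(index uint64) {
        i := 0
        for i < len(in.buffer) && in.buffer[i] <= index {
            i++
        }
        in.buffer = in.buffer[i:]
    }

    func main() {
        in := &inflights{size: 3}
        for _, idx := range []uint64{12, 13, 14} {
            in.add(idx) // the three proposals above
        }
        fmt.Println("full:", in.full()) // full: true -> stream paused
        in.freeLE(12)                   // follower acks index 12
        fmt.Println("full:", in.full()) // full: false -> may send again
    }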
+
+log-level none
+----
+ok
+
+# Commit entries between nodes 1 and 2.
+stabilize 1 2
+----
+ok (quiet)
+
+log-level debug
+----
+ok
+
+# Expect that the entries are committed and stored on nodes 1 and 2.
+status 1
+----
+1: StateReplicate match=14 next=15
+2: StateReplicate match=14 next=15
+3: StateReplicate match=11 next=15 paused inflight=3[full]
+
+# Drop append messages to node 3.
+deliver-msgs drop=3
+----
+dropped: 1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1_12"]
+dropped: 1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_1_13"]
+dropped: 1->3 MsgApp Term:1 Log:1/13 Commit:11 Entries:[1/14 EntryNormal "prop_1_14"]
+
+
+# Repeat committing 3 entries.
+propose 1 prop_1_15
+----
+ok
+
+propose 1 prop_1_16
+----
+ok
+
+propose 1 prop_1_17
+----
+ok
+
+# In-flight tracking to nodes 2 and 3 is saturated, but node 3 is behind.
+status 1
+----
+1: StateReplicate match=14 next=15
+2: StateReplicate match=14 next=18 paused inflight=3[full]
+3: StateReplicate match=11 next=15 paused inflight=3[full]
+
+log-level none
+----
+ok
+
+# Commit entries between nodes 1 and 2 again.
+stabilize 1 2
+----
+ok (quiet)
+
+log-level debug
+----
+ok
+
+# Expect that the entries are committed and stored only on nodes 1 and 2.
+status 1
+----
+1: StateReplicate match=17 next=18
+2: StateReplicate match=17 next=18
+3: StateReplicate match=11 next=15 paused inflight=3[full]
+
+# Make a heartbeat roundtrip.
+tick-heartbeat 1
+----
+ok
+
+stabilize 1
+----
+> 1 handling Ready
+  Ready MustSync=false:
+  Messages:
+  1->2 MsgHeartbeat Term:1 Log:0/0 Commit:17
+  1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11
+
+stabilize 2 3
+----
+> 2 receiving messages
+  1->2 MsgHeartbeat Term:1 Log:0/0 Commit:17
+> 3 receiving messages
+  1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11
+> 2 handling Ready
+  Ready MustSync=false:
+  Messages:
+  2->1 MsgHeartbeatResp Term:1 Log:0/0
+> 3 handling Ready
+  Ready MustSync=false:
+  Messages:
+  3->1 MsgHeartbeatResp Term:1 Log:0/0
+
+# After handling heartbeat responses, node 1 sends a new replication MsgApp
+# to the throttled node 3, which hasn't yet replied to a single MsgApp.
+# TODO(pavelkalinnikov): this should not happen, send an empty MsgApp instead.
+stabilize 1
+----
+> 1 receiving messages
+  2->1 MsgHeartbeatResp Term:1 Log:0/0
+  3->1 MsgHeartbeatResp Term:1 Log:0/0
+> 1 handling Ready
+  Ready MustSync=false:
+  Messages:
+  1->3 MsgApp Term:1 Log:1/14 Commit:17 Entries:[1/15 EntryNormal "prop_1_15", 1/16 EntryNormal "prop_1_16", 1/17 EntryNormal "prop_1_17"]
+
+# Node 3 finally receives a MsgApp, but there was a gap, so it rejects it.
+stabilize 3
+----
+> 3 receiving messages
+  1->3 MsgApp Term:1 Log:1/14 Commit:17 Entries:[1/15 EntryNormal "prop_1_15", 1/16 EntryNormal "prop_1_16", 1/17 EntryNormal "prop_1_17"]
+  DEBUG 3 [logterm: 0, index: 14] rejected MsgApp [logterm: 1, index: 14] from 1
+> 3 handling Ready
+  Ready MustSync=false:
+  Messages:
+  3->1 MsgAppResp Term:1 Log:1/14 Rejected (Hint: 11)
+
+log-level none
+----
+ok
+
+stabilize
+----
+ok (quiet)
+
+log-level debug
+----
+ok
+
+# Eventually all nodes catch up on the committed state.
+status 1
+----
+1: StateReplicate match=17 next=18
+2: StateReplicate match=17 next=18
+3: StateReplicate match=17 next=18
\ No newline at end of file
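For context on the TODO above: the leak happens because, at this commit, the
leader reacts to a MsgHeartbeatResp from a lagging follower by immediately
retrying replication, without consulting the in-flight pause. The toy model
below captures only that decision; it is an illustration of the observed
behavior, not the actual raft step function:

    package main

    import "fmt"

    // progress is a toy stand-in for the leader's per-follower Progress.
    type progress struct {
        match        uint64 // highest index known to be replicated
        inflightFull bool   // true when the MsgApp window is saturated
    }

    // onHeartbeatResp models the behavior this test pins down: after a
    // heartbeat response from a follower that is behind, the leader sends
    // a MsgApp carrying entries without checking inflightFull, so the
    // message "leaks" past the paused stream.
    func onHeartbeatResp(pr progress, lastIndex uint64) (sendMsgApp bool) {
        return pr.match < lastIndex // note: pr.inflightFull is ignored
    }

    func main() {
        pr := progress{match: 11, inflightFull: true} // node 3 above
        fmt.Println("sends MsgApp despite pause:", onHeartbeatResp(pr, 17))
    }

As the TODO suggests, a fix would send an empty MsgApp (one with no Entries)
in this situation, refreshing the follower's commit index without bypassing
the in-flight limit.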