From 0a0f0ae719d3e980e0c0ef297f897c6350e1416a Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Mon, 31 Oct 2022 22:42:25 +0000 Subject: [PATCH 1/7] raft/rafttest: add test for replication pausing This commit adds a data-driven test which simulates conditions under which Raft messages flow to a particular node is throttled while in StateReplicate. The test demonstrates that MsgApp messages with non-empty Entries may "leak" to a paused stream every time there is successful heartbeat exchange. Signed-off-by: Pavel Kalinnikov --- raft/rafttest/interaction_env.go | 12 +- .../interaction_env_handler_add_nodes.go | 21 +- raft/testdata/replicate_pause.txt | 190 ++++++++++++++++++ 3 files changed, 208 insertions(+), 15 deletions(-) create mode 100644 raft/testdata/replicate_pause.txt diff --git a/raft/rafttest/interaction_env.go b/raft/rafttest/interaction_env.go index 4a6adc5a5..75c223837 100644 --- a/raft/rafttest/interaction_env.go +++ b/raft/rafttest/interaction_env.go @@ -84,15 +84,13 @@ type Storage interface { Append([]pb.Entry) error } -// defaultRaftConfig sets up a *raft.Config with reasonable testing defaults. -// In particular, no limits are set. -func defaultRaftConfig(id uint64, applied uint64, s raft.Storage) *raft.Config { - return &raft.Config{ - ID: id, - Applied: applied, +// raftConfigStub sets up a raft.Config stub with reasonable testing defaults. +// In particular, no limits are set. It is not a complete config: ID and Storage +// must be set for each node using the stub as a template. +func raftConfigStub() raft.Config { + return raft.Config{ ElectionTick: 3, HeartbeatTick: 1, - Storage: s, MaxSizePerMsg: math.MaxUint64, MaxInflightMsgs: math.MaxInt32, } diff --git a/raft/rafttest/interaction_env_handler_add_nodes.go b/raft/rafttest/interaction_env_handler_add_nodes.go index 517477ef4..b72c96505 100644 --- a/raft/rafttest/interaction_env_handler_add_nodes.go +++ b/raft/rafttest/interaction_env_handler_add_nodes.go @@ -28,6 +28,7 @@ import ( func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) error { n := firstAsInt(t, d) var snap pb.Snapshot + cfg := raftConfigStub() for _, arg := range d.CmdArgs[1:] { for i := range arg.Vals { switch arg.Key { @@ -39,14 +40,17 @@ func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) e var id uint64 arg.Scan(t, i, &id) snap.Metadata.ConfState.Learners = append(snap.Metadata.ConfState.Learners, id) + case "inflight": + arg.Scan(t, i, &cfg.MaxInflightMsgs) case "index": arg.Scan(t, i, &snap.Metadata.Index) + cfg.Applied = snap.Metadata.Index case "content": arg.Scan(t, i, &snap.Data) } } } - return env.AddNodes(n, snap) + return env.AddNodes(n, cfg, snap) } type snapOverrideStorage struct { @@ -63,9 +67,9 @@ func (s snapOverrideStorage) Snapshot() (pb.Snapshot, error) { var _ raft.Storage = snapOverrideStorage{} -// AddNodes adds n new nodes initializes from the given snapshot (which may be -// empty). They will be assigned consecutive IDs. -func (env *InteractionEnv) AddNodes(n int, snap pb.Snapshot) error { +// AddNodes adds n new nodes initialized from the given snapshot (which may be +// empty), and using the cfg as template. They will be assigned consecutive IDs. +func (env *InteractionEnv) AddNodes(n int, cfg raft.Config, snap pb.Snapshot) error { bootstrap := !reflect.DeepEqual(snap, pb.Snapshot{}) for i := 0; i < n; i++ { id := uint64(1 + len(env.Nodes)) @@ -103,9 +107,10 @@ func (env *InteractionEnv) AddNodes(n int, snap pb.Snapshot) error { return fmt.Errorf("failed to establish first index %d; got %d", exp, fi) } } - cfg := defaultRaftConfig(id, snap.Metadata.Index, s) + cfg := cfg // fork the config stub + cfg.ID, cfg.Storage = id, s if env.Options.OnConfig != nil { - env.Options.OnConfig(cfg) + env.Options.OnConfig(&cfg) if cfg.ID != id { // This could be supported but then we need to do more work // translating back and forth -- not worth it. @@ -117,7 +122,7 @@ func (env *InteractionEnv) AddNodes(n int, snap pb.Snapshot) error { } cfg.Logger = env.Output - rn, err := raft.NewRawNode(cfg) + rn, err := raft.NewRawNode(&cfg) if err != nil { return err } @@ -127,7 +132,7 @@ func (env *InteractionEnv) AddNodes(n int, snap pb.Snapshot) error { // TODO(tbg): allow a more general Storage, as long as it also allows // us to apply snapshots, append entries, and update the HardState. Storage: s, - Config: cfg, + Config: &cfg, History: []pb.Snapshot{snap}, } env.Nodes = append(env.Nodes, node) diff --git a/raft/testdata/replicate_pause.txt b/raft/testdata/replicate_pause.txt new file mode 100644 index 000000000..f6e30dc76 --- /dev/null +++ b/raft/testdata/replicate_pause.txt @@ -0,0 +1,190 @@ +# This test ensures that MsgApp stream to a follower is paused when the +# in-flight state exceeds the configured limits. This is a regression test for +# the issue fixed by https://github.com/etcd-io/etcd/pull/14633. + +# Turn off output during the setup of the test. +log-level none +---- +ok + +# Start with 3 nodes, with a limited in-flight capacity. +add-nodes 3 voters=(1,2,3) index=10 inflight=3 +---- +ok + +campaign 1 +---- +ok + +stabilize +---- +ok (quiet) + +# Propose 3 entries. +propose 1 prop_1_12 +---- +ok + +propose 1 prop_1_13 +---- +ok + +propose 1 prop_1_14 +---- +ok + +# Store entries and send proposals. +process-ready 1 +---- +ok (quiet) + +# Re-enable log messages. +log-level debug +---- +ok + +# Expect that in-flight tracking to nodes 2 and 3 is saturated. +status 1 +---- +1: StateReplicate match=14 next=15 +2: StateReplicate match=11 next=15 paused inflight=3[full] +3: StateReplicate match=11 next=15 paused inflight=3[full] + +log-level none +---- +ok + +# Commit entries between nodes 1 and 2. +stabilize 1 2 +---- +ok (quiet) + +log-level debug +---- +ok + +# Expect that the entries are committed and stored on nodes 1 and 2. +status 1 +---- +1: StateReplicate match=14 next=15 +2: StateReplicate match=14 next=15 +3: StateReplicate match=11 next=15 paused inflight=3[full] + +# Drop append messages to node 3. +deliver-msgs drop=3 +---- +dropped: 1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1_12"] +dropped: 1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_1_13"] +dropped: 1->3 MsgApp Term:1 Log:1/13 Commit:11 Entries:[1/14 EntryNormal "prop_1_14"] + + +# Repeat committing 3 entries. +propose 1 prop_1_15 +---- +ok + +propose 1 prop_1_16 +---- +ok + +propose 1 prop_1_17 +---- +ok + +# In-flight tracking to nodes 2 and 3 is saturated, but node 3 is behind. +status 1 +---- +1: StateReplicate match=14 next=15 +2: StateReplicate match=14 next=18 paused inflight=3[full] +3: StateReplicate match=11 next=15 paused inflight=3[full] + +log-level none +---- +ok + +# Commit entries between nodes 1 and 2 again. +stabilize 1 2 +---- +ok (quiet) + +log-level debug +---- +ok + +# Expect that the entries are committed and stored only on nodes 1 and 2. +status 1 +---- +1: StateReplicate match=17 next=18 +2: StateReplicate match=17 next=18 +3: StateReplicate match=11 next=15 paused inflight=3[full] + +# Make a heartbeat roundtrip. +tick-heartbeat 1 +---- +ok + +stabilize 1 +---- +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->2 MsgHeartbeat Term:1 Log:0/0 Commit:17 + 1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11 + +stabilize 2 3 +---- +> 2 receiving messages + 1->2 MsgHeartbeat Term:1 Log:0/0 Commit:17 +> 3 receiving messages + 1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11 +> 2 handling Ready + Ready MustSync=false: + Messages: + 2->1 MsgHeartbeatResp Term:1 Log:0/0 +> 3 handling Ready + Ready MustSync=false: + Messages: + 3->1 MsgHeartbeatResp Term:1 Log:0/0 + +# After handling heartbeat responses, node 1 sends a new replication MsgApp to a +# throttled node 3 which hasn't yet replied to a single MsgApp. +# TODO(pavelkalinnikov): this should not happen, send empty MsgApp instead. +stabilize 1 +---- +> 1 receiving messages + 2->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgHeartbeatResp Term:1 Log:0/0 +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->3 MsgApp Term:1 Log:1/14 Commit:17 Entries:[1/15 EntryNormal "prop_1_15", 1/16 EntryNormal "prop_1_16", 1/17 EntryNormal "prop_1_17"] + +# Node 3 finally receives a MsgApp, but there was a gap, so it rejects it. +stabilize 3 +---- +> 3 receiving messages + 1->3 MsgApp Term:1 Log:1/14 Commit:17 Entries:[1/15 EntryNormal "prop_1_15", 1/16 EntryNormal "prop_1_16", 1/17 EntryNormal "prop_1_17"] + DEBUG 3 [logterm: 0, index: 14] rejected MsgApp [logterm: 1, index: 14] from 1 +> 3 handling Ready + Ready MustSync=false: + Messages: + 3->1 MsgAppResp Term:1 Log:1/14 Rejected (Hint: 11) + +log-level none +---- +ok + +stabilize +---- +ok (quiet) + +log-level debug +---- +ok + +# Eventually all nodes catch up on the committed state. +status 1 +---- +1: StateReplicate match=17 next=18 +2: StateReplicate match=17 next=18 +3: StateReplicate match=17 next=18 \ No newline at end of file From 5619953f332dd43ac6dab447a19a902ddddefb2f Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 28 Oct 2022 18:54:16 +0100 Subject: [PATCH 2/7] raft: elaborate checks in flow control tests Signed-off-by: Pavel Kalinnikov --- raft/raft_flow_control_test.go | 43 ++++++++++++++++++---------------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/raft/raft_flow_control_test.go b/raft/raft_flow_control_test.go index 5430568c3..f0cb4aa2f 100644 --- a/raft/raft_flow_control_test.go +++ b/raft/raft_flow_control_test.go @@ -36,14 +36,14 @@ func TestMsgAppFlowControlFull(t *testing.T) { for i := 0; i < r.prs.MaxInflight; i++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) ms := r.readMessages() - if len(ms) != 1 { - t.Fatalf("#%d: len(ms) = %d, want 1", i, len(ms)) + if len(ms) != 1 || ms[0].Type != pb.MsgApp { + t.Fatalf("#%d: len(ms) = %d, want 1 MsgApp", i, len(ms)) } } // ensure 1 - if !pr2.Inflights.Full() { - t.Fatalf("inflights.full = %t, want %t", pr2.Inflights.Full(), true) + if !pr2.IsPaused() { + t.Fatal("paused = false, want true") } // ensure 2 @@ -84,20 +84,20 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) { // fill in the inflights window again r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) ms := r.readMessages() - if len(ms) != 1 { - t.Fatalf("#%d: len(ms) = %d, want 1", tt, len(ms)) + if len(ms) != 1 || ms[0].Type != pb.MsgApp { + t.Fatalf("#%d: len(ms) = %d, want 1 MsgApp", tt, len(ms)) } // ensure 1 - if !pr2.Inflights.Full() { - t.Fatalf("inflights.full = %t, want %t", pr2.Inflights.Full(), true) + if !pr2.IsPaused() { + t.Fatalf("#%d: paused = false, want true", tt) } // ensure 2 for i := 0; i < tt; i++ { r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: uint64(i)}) - if !pr2.Inflights.Full() { - t.Fatalf("#%d: inflights.full = %t, want %t", tt, pr2.Inflights.Full(), true) + if !pr2.IsPaused() { + t.Fatalf("#%d.%d: paused = false, want true", tt, i) } } } @@ -120,32 +120,35 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) { } for tt := 1; tt < 5; tt++ { - if !pr2.Inflights.Full() { - t.Fatalf("#%d: inflights.full = %t, want %t", tt, pr2.Inflights.Full(), true) + if !pr2.IsPaused() { + t.Fatalf("#%d: paused = false, want true", tt) } // recv tt msgHeartbeatResp and expect one free slot for i := 0; i < tt; i++ { r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) - r.readMessages() - if pr2.Inflights.Full() { - t.Fatalf("#%d.%d: inflights.full = %t, want %t", tt, i, pr2.Inflights.Full(), false) + ms := r.readMessages() + if len(ms) != 1 || ms[0].Type != pb.MsgApp || len(ms[0].Entries) != 0 { + t.Fatalf("#%d.%d: len(ms) == %d, want 1 empty MsgApp", tt, i, len(ms)) + } + if pr2.IsPaused() { + t.Fatalf("#%d.%d: paused = true, want false", tt, i) } } // one slot r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) ms := r.readMessages() - if len(ms) != 1 { - t.Fatalf("#%d: free slot = 0, want 1", tt) + if len(ms) != 1 || ms[0].Type != pb.MsgApp || len(ms[0].Entries) != 1 { + t.Fatalf("#%d: len(ms) == %d, want 1 MsgApp with 1 entry", tt, len(ms)) } // and just one slot for i := 0; i < 10; i++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) - ms1 := r.readMessages() - if len(ms1) != 0 { - t.Fatalf("#%d.%d: len(ms) = %d, want 0", tt, i, len(ms1)) + ms := r.readMessages() + if len(ms) != 0 { + t.Fatalf("#%d.%d: len(ms) = %d, want 0", tt, i, len(ms)) } } From d5ac7b833f81a4a1cbad5951a620e73a30f9c830 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 1 Nov 2022 21:45:10 +0000 Subject: [PATCH 3/7] raft: cleanup maybeSendAppend method - avoid large indented blocks, leave the main block unindented - declare pb.Message inlined in the sending call Signed-off-by: Pavel Kalinnikov --- raft/raft.go | 50 +++++++++++++++++++++++++++----------------------- 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index f9f38155d..aca9d7ff9 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -437,8 +437,6 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { if pr.IsPaused() { return false } - m := pb.Message{} - m.To = to term, errt := r.raftLog.term(pr.Next - 1) ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize) @@ -452,7 +450,6 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { return false } - m.Type = pb.MsgSnap snapshot, err := r.raftLog.snapshot() if err != nil { if err == ErrSnapshotTemporarilyUnavailable { @@ -464,33 +461,40 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { if IsEmptySnap(snapshot) { panic("need non-empty snapshot") } - m.Snapshot = snapshot sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]", r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr) pr.BecomeSnapshot(sindex) r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr) - } else { - m.Type = pb.MsgApp - m.Index = pr.Next - 1 - m.LogTerm = term - m.Entries = ents - m.Commit = r.raftLog.committed - if n := len(m.Entries); n != 0 { - switch pr.State { - // optimistically increase the next when in StateReplicate - case tracker.StateReplicate: - last := m.Entries[n-1].Index - pr.OptimisticUpdate(last) - pr.Inflights.Add(last) - case tracker.StateProbe: - pr.ProbeSent = true - default: - r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State) - } + + r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: snapshot}) + return true + } + + // Send the actual MsgApp otherwise, and update the progress accordingly. + // TODO(pavelkalinnikov): factor out the Progress update to a method + next := pr.Next // save Next for later, as the progress update can change it + if n := len(ents); n != 0 { + switch pr.State { + // optimistically increase the next when in StateReplicate + case tracker.StateReplicate: + last := ents[n-1].Index + pr.OptimisticUpdate(last) + pr.Inflights.Add(last) + case tracker.StateProbe: + pr.ProbeSent = true + default: + r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State) } } - r.send(m) + r.send(pb.Message{ + To: to, + Type: pb.MsgApp, + Index: next - 1, + LogTerm: term, + Entries: ents, + Commit: r.raftLog.committed, + }) return true } From 3bc3d2071e1c4d822eeb3e07613caf7fed04e1c1 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 1 Nov 2022 21:52:26 +0000 Subject: [PATCH 4/7] raft: extract Progress update on MsgApp to a method Previously, Progress update on MsgApp send was scattered across raft.go and tracker/progress.go. This commit better encapsulates this logic in the Progress type. Signed-off-by: Pavel Kalinnikov --- raft/raft.go | 15 ++------------- raft/tracker/progress.go | 23 +++++++++++++++++++++++ 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index aca9d7ff9..1c8995e6b 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -472,20 +472,9 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { } // Send the actual MsgApp otherwise, and update the progress accordingly. - // TODO(pavelkalinnikov): factor out the Progress update to a method next := pr.Next // save Next for later, as the progress update can change it - if n := len(ents); n != 0 { - switch pr.State { - // optimistically increase the next when in StateReplicate - case tracker.StateReplicate: - last := ents[n-1].Index - pr.OptimisticUpdate(last) - pr.Inflights.Add(last) - case tracker.StateProbe: - pr.ProbeSent = true - default: - r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State) - } + if err := pr.UpdateOnEntriesSend(len(ents), next); err != nil { + r.logger.Panicf("%x: %v", r.id, err) } r.send(pb.Message{ To: to, diff --git a/raft/tracker/progress.go b/raft/tracker/progress.go index e37e4b63f..7f1bb6260 100644 --- a/raft/tracker/progress.go +++ b/raft/tracker/progress.go @@ -137,6 +137,29 @@ func (pr *Progress) BecomeSnapshot(snapshoti uint64) { pr.PendingSnapshot = snapshoti } +// UpdateOnEntriesSend updates the progress on the given number of consecutive +// entries being sent in a MsgApp, appended at and after the given log index. +func (pr *Progress) UpdateOnEntriesSend(entries int, nextIndex uint64) error { + switch pr.State { + case StateReplicate: + if entries > 0 { + last := nextIndex + uint64(entries) - 1 + pr.OptimisticUpdate(last) + pr.Inflights.Add(last) + } + case StateProbe: + // TODO(pavelkalinnikov): this condition captures the previous behaviour, + // but we should set ProbeSent unconditionally for simplicity, because any + // MsgApp in StateProbe is a probe, not only non-empty ones. + if entries > 0 { + pr.ProbeSent = true + } + default: + return fmt.Errorf("sending append in unhandled state %s", pr.State) + } + return nil +} + // MaybeUpdate is called when an MsgAppResp arrives from the follower, with the // index acked by it. The method returns false if the given n index comes from // an outdated message. Otherwise it updates the progress and returns true. From 4969aa81ae72c68b0e92d71a467fd7c5214e8c18 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 26 Oct 2022 23:23:34 +0100 Subject: [PATCH 5/7] raft: send empty appends when replication is paused When Inflights to a particular node is full, i.e. MaxInflightMsgs for the append messages flow is saturated, it is still necessary to continue sending MsgApp to ensure progress. Currently this is achieved by "forgetting" the first in-flight message in the window, which frees up quota for one new MsgApp. This new message is constructed in such a way that it potentially has multiple entries, or a large entry. The effect of this is that the in-flight limitations can be exceeded arbitrarily, for as long as the flow to this node continues being saturated. In particular, if a follower is stuck, the leader will keep sending entries to it. This commit makes the MsgApp empty when Inflights is saturated, and prevents the described leakage of Entries to slow followers. Signed-off-by: Pavel Kalinnikov --- raft/raft.go | 21 ++++++++++++++++----- raft/raft_flow_control_test.go | 23 ++++++++--------------- raft/raft_test.go | 4 ++++ raft/testdata/replicate_pause.txt | 10 +++++----- raft/tracker/progress.go | 24 ++++++++++++------------ raft/tracker/progress_test.go | 2 +- 6 files changed, 46 insertions(+), 38 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index 1c8995e6b..f2f898138 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -439,7 +439,18 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { } term, errt := r.raftLog.term(pr.Next - 1) - ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize) + var ents []pb.Entry + var erre error + // In a throttled StateReplicate only send empty MsgApp, to ensure progress. + // Otherwise, if we had a full Inflights and all inflight messages were in + // fact dropped, replication to that follower would stall. Instead, an empty + // MsgApp will eventually reach the follower (heartbeats responses prompt the + // leader to send an append), allowing it to be acked or rejected, both of + // which will clear out Inflights. + if pr.State != tracker.StateReplicate || !pr.Inflights.Full() { + ents, erre = r.raftLog.entries(pr.Next, r.maxMsgSize) + } + if len(ents) == 0 && !sendIfEmpty { return false } @@ -1295,10 +1306,10 @@ func stepLeader(r *raft, m pb.Message) error { pr.RecentActive = true pr.ProbeSent = false - // free one slot for the full inflights window to allow progress. - if pr.State == tracker.StateReplicate && pr.Inflights.Full() { - pr.Inflights.FreeFirstOne() - } + // NB: if the follower is paused (full Inflights), this will still send an + // empty append, allowing it to recover from situations in which all the + // messages that filled up Inflights in the first place were dropped. Note + // also that the outgoing heartbeat already communicated the commit index. if pr.Match < r.raftLog.lastIndex() { r.sendAppend(m.From) } diff --git a/raft/raft_flow_control_test.go b/raft/raft_flow_control_test.go index f0cb4aa2f..29dff843f 100644 --- a/raft/raft_flow_control_test.go +++ b/raft/raft_flow_control_test.go @@ -120,31 +120,24 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) { } for tt := 1; tt < 5; tt++ { - if !pr2.IsPaused() { - t.Fatalf("#%d: paused = false, want true", tt) - } - // recv tt msgHeartbeatResp and expect one free slot for i := 0; i < tt; i++ { + if !pr2.IsPaused() { + t.Fatalf("#%d.%d: paused = false, want true", tt, i) + } + // Unpauses the progress, sends an empty MsgApp, and pauses it again. r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) ms := r.readMessages() if len(ms) != 1 || ms[0].Type != pb.MsgApp || len(ms[0].Entries) != 0 { t.Fatalf("#%d.%d: len(ms) == %d, want 1 empty MsgApp", tt, i, len(ms)) } - if pr2.IsPaused() { - t.Fatalf("#%d.%d: paused = true, want false", tt, i) - } } - // one slot - r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) - ms := r.readMessages() - if len(ms) != 1 || ms[0].Type != pb.MsgApp || len(ms[0].Entries) != 1 { - t.Fatalf("#%d: len(ms) == %d, want 1 MsgApp with 1 entry", tt, len(ms)) - } - - // and just one slot + // No more appends are sent if there are no heartbeats. for i := 0; i < 10; i++ { + if !pr2.IsPaused() { + t.Fatalf("#%d.%d: paused = false, want true", tt, i) + } r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) ms := r.readMessages() if len(ms) != 0 { diff --git a/raft/raft_test.go b/raft/raft_test.go index 9ae1ccd58..66cfe72ff 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -102,6 +102,10 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) { } r.prs.Progress[2].BecomeReplicate() + if r.prs.Progress[2].ProbeSent { + t.Errorf("paused = %v, want false", r.prs.Progress[2].ProbeSent) + } + r.prs.Progress[2].ProbeSent = true r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) if r.prs.Progress[2].ProbeSent { t.Errorf("paused = %v, want false", r.prs.Progress[2].ProbeSent) diff --git a/raft/testdata/replicate_pause.txt b/raft/testdata/replicate_pause.txt index f6e30dc76..e7333cccb 100644 --- a/raft/testdata/replicate_pause.txt +++ b/raft/testdata/replicate_pause.txt @@ -146,9 +146,9 @@ stabilize 2 3 Messages: 3->1 MsgHeartbeatResp Term:1 Log:0/0 -# After handling heartbeat responses, node 1 sends a new replication MsgApp to a -# throttled node 3 which hasn't yet replied to a single MsgApp. -# TODO(pavelkalinnikov): this should not happen, send empty MsgApp instead. +# After handling heartbeat responses, node 1 sends an empty MsgApp to a +# throttled node 3 because it hasn't yet replied to a single MsgApp, and the +# in-flight tracker is still saturated. stabilize 1 ---- > 1 receiving messages @@ -157,13 +157,13 @@ stabilize 1 > 1 handling Ready Ready MustSync=false: Messages: - 1->3 MsgApp Term:1 Log:1/14 Commit:17 Entries:[1/15 EntryNormal "prop_1_15", 1/16 EntryNormal "prop_1_16", 1/17 EntryNormal "prop_1_17"] + 1->3 MsgApp Term:1 Log:1/14 Commit:17 # Node 3 finally receives a MsgApp, but there was a gap, so it rejects it. stabilize 3 ---- > 3 receiving messages - 1->3 MsgApp Term:1 Log:1/14 Commit:17 Entries:[1/15 EntryNormal "prop_1_15", 1/16 EntryNormal "prop_1_16", 1/17 EntryNormal "prop_1_17"] + 1->3 MsgApp Term:1 Log:1/14 Commit:17 DEBUG 3 [logterm: 0, index: 14] rejected MsgApp [logterm: 1, index: 14] from 1 > 3 handling Ready Ready MustSync=false: diff --git a/raft/tracker/progress.go b/raft/tracker/progress.go index 7f1bb6260..fae565b9f 100644 --- a/raft/tracker/progress.go +++ b/raft/tracker/progress.go @@ -55,9 +55,13 @@ type Progress struct { // This is always true on the leader. RecentActive bool - // ProbeSent is used while this follower is in StateProbe. When ProbeSent is - // true, raft should pause sending replication message to this peer until - // ProbeSent is reset. See ProbeAcked() and IsPaused(). + // ProbeSent is true when a "probe" MsgApp was sent to this follower recently, + // and we haven't heard from it back yet. Used when the MsgApp flow is + // throttled, i.e. when State is StateProbe, or StateReplicate with saturated + // Inflights. In both cases, we need to continue sending MsgApp once in a + // while to guarantee progress, but we only do so when ProbeSent is false (it + // is reset on receiving a heartbeat response), to not overflow the receiver. + // See IsPaused(). ProbeSent bool // Inflights is a sliding window for the inflight messages. @@ -101,13 +105,6 @@ func min(a, b uint64) uint64 { return a } -// ProbeAcked is called when this peer has accepted an append. It resets -// ProbeSent to signal that additional append messages should be sent without -// further delay. -func (pr *Progress) ProbeAcked() { - pr.ProbeSent = false -} - // BecomeProbe transitions into StateProbe. Next is reset to Match+1 or, // optionally and if larger, the index of the pending snapshot. func (pr *Progress) BecomeProbe() { @@ -147,6 +144,9 @@ func (pr *Progress) UpdateOnEntriesSend(entries int, nextIndex uint64) error { pr.OptimisticUpdate(last) pr.Inflights.Add(last) } + // If this message overflows the in-flights tracker, or it was already full, + // consider this message being a probe, so that the flow is paused. + pr.ProbeSent = pr.Inflights.Full() case StateProbe: // TODO(pavelkalinnikov): this condition captures the previous behaviour, // but we should set ProbeSent unconditionally for simplicity, because any @@ -168,7 +168,7 @@ func (pr *Progress) MaybeUpdate(n uint64) bool { if pr.Match < n { pr.Match = n updated = true - pr.ProbeAcked() + pr.ProbeSent = false } pr.Next = max(pr.Next, n+1) return updated @@ -225,7 +225,7 @@ func (pr *Progress) IsPaused() bool { case StateProbe: return pr.ProbeSent case StateReplicate: - return pr.Inflights.Full() + return pr.ProbeSent case StateSnapshot: return true default: diff --git a/raft/tracker/progress_test.go b/raft/tracker/progress_test.go index 6eb582f04..5fbc1e7cd 100644 --- a/raft/tracker/progress_test.go +++ b/raft/tracker/progress_test.go @@ -47,7 +47,7 @@ func TestProgressIsPaused(t *testing.T) { {StateProbe, false, false}, {StateProbe, true, true}, {StateReplicate, false, false}, - {StateReplicate, true, false}, + {StateReplicate, true, true}, {StateSnapshot, false, true}, {StateSnapshot, true, true}, } From 467114ed87c1387ead1138ddfdc32e0e99b9f541 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 27 Oct 2022 00:22:07 +0100 Subject: [PATCH 6/7] raft/tracker: remove unused Inflights.FreeFirstOne Signed-off-by: Pavel Kalinnikov --- raft/tracker/inflights.go | 4 ---- raft/tracker/inflights_test.go | 35 ++++++++++++++-------------------- 2 files changed, 14 insertions(+), 25 deletions(-) diff --git a/raft/tracker/inflights.go b/raft/tracker/inflights.go index 1a056341a..242d1cab1 100644 --- a/raft/tracker/inflights.go +++ b/raft/tracker/inflights.go @@ -113,10 +113,6 @@ func (in *Inflights) FreeLE(to uint64) { } } -// FreeFirstOne releases the first inflight. This is a no-op if nothing is -// inflight. -func (in *Inflights) FreeFirstOne() { in.FreeLE(in.buffer[in.start]) } - // Full returns true if no more messages can be sent at the moment. func (in *Inflights) Full() bool { return in.count == in.size diff --git a/raft/tracker/inflights_test.go b/raft/tracker/inflights_test.go index 582a373ba..0a56ced34 100644 --- a/raft/tracker/inflights_test.go +++ b/raft/tracker/inflights_test.go @@ -105,6 +105,20 @@ func TestInflightFreeTo(t *testing.T) { in.Add(uint64(i)) } + in.FreeLE(0) + + wantIn0 := &Inflights{ + start: 1, + count: 9, + size: 10, + // ↓------------------------ + buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + } + + if !reflect.DeepEqual(in, wantIn0) { + t.Fatalf("in = %+v, want %+v", in, wantIn0) + } + in.FreeLE(4) wantIn := &Inflights{ @@ -166,24 +180,3 @@ func TestInflightFreeTo(t *testing.T) { t.Fatalf("in = %+v, want %+v", in, wantIn4) } } - -func TestInflightFreeFirstOne(t *testing.T) { - in := NewInflights(10) - for i := 0; i < 10; i++ { - in.Add(uint64(i)) - } - - in.FreeFirstOne() - - wantIn := &Inflights{ - start: 1, - count: 9, - size: 10, - // ↓------------------------ - buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, - } - - if !reflect.DeepEqual(in, wantIn) { - t.Fatalf("in = %+v, want %+v", in, wantIn) - } -} From 1ea13494ebbf18c8f26656697dc641cdae69c6a2 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 8 Nov 2022 13:01:11 +0000 Subject: [PATCH 7/7] raft/tracker: rename and comment MsgApp paused field Make the field name and comment clearer on the fact that it's used both in StateProbe and StateReplicate. The old name ProbeSent was slightly confusing, and also triggered thinking that it's used only in StateProbe. Signed-off-by: Pavel Kalinnikov --- raft/raft.go | 4 ++-- raft/raft_snap_test.go | 8 ++++---- raft/raft_test.go | 28 +++++++++++++------------- raft/tracker/progress.go | 33 +++++++++++++++--------------- raft/tracker/progress_test.go | 38 +++++++++++++++++------------------ 5 files changed, 55 insertions(+), 56 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index f2f898138..d7603f101 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -1304,7 +1304,7 @@ func stepLeader(r *raft, m pb.Message) error { } case pb.MsgHeartbeatResp: pr.RecentActive = true - pr.ProbeSent = false + pr.MsgAppFlowPaused = false // NB: if the follower is paused (full Inflights), this will still send an // empty append, allowing it to recover from situations in which all the @@ -1349,7 +1349,7 @@ func stepLeader(r *raft, m pb.Message) error { // If snapshot finish, wait for the MsgAppResp from the remote node before sending // out the next MsgApp. // If snapshot failure, wait for a heartbeat interval before next try - pr.ProbeSent = true + pr.MsgAppFlowPaused = true case pb.MsgUnreachable: // During optimistic replication, if the remote becomes unreachable, // there is huge probability that a MsgApp is lost. diff --git a/raft/raft_snap_test.go b/raft/raft_snap_test.go index 6b2afeebd..f8ed07eba 100644 --- a/raft/raft_snap_test.go +++ b/raft/raft_snap_test.go @@ -83,8 +83,8 @@ func TestSnapshotFailure(t *testing.T) { if sm.prs.Progress[2].Next != 1 { t.Fatalf("Next = %d, want 1", sm.prs.Progress[2].Next) } - if !sm.prs.Progress[2].ProbeSent { - t.Errorf("ProbeSent = %v, want true", sm.prs.Progress[2].ProbeSent) + if !sm.prs.Progress[2].MsgAppFlowPaused { + t.Errorf("MsgAppFlowPaused = %v, want true", sm.prs.Progress[2].MsgAppFlowPaused) } } @@ -106,8 +106,8 @@ func TestSnapshotSucceed(t *testing.T) { if sm.prs.Progress[2].Next != 12 { t.Fatalf("Next = %d, want 12", sm.prs.Progress[2].Next) } - if !sm.prs.Progress[2].ProbeSent { - t.Errorf("ProbeSent = %v, want true", sm.prs.Progress[2].ProbeSent) + if !sm.prs.Progress[2].MsgAppFlowPaused { + t.Errorf("MsgAppFlowPaused = %v, want true", sm.prs.Progress[2].MsgAppFlowPaused) } } diff --git a/raft/raft_test.go b/raft/raft_test.go index 66cfe72ff..a04e60bfe 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -94,21 +94,21 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) { r.becomeCandidate() r.becomeLeader() - r.prs.Progress[2].ProbeSent = true + r.prs.Progress[2].MsgAppFlowPaused = true r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - if !r.prs.Progress[2].ProbeSent { - t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent) + if !r.prs.Progress[2].MsgAppFlowPaused { + t.Errorf("paused = %v, want true", r.prs.Progress[2].MsgAppFlowPaused) } r.prs.Progress[2].BecomeReplicate() - if r.prs.Progress[2].ProbeSent { - t.Errorf("paused = %v, want false", r.prs.Progress[2].ProbeSent) + if r.prs.Progress[2].MsgAppFlowPaused { + t.Errorf("paused = %v, want false", r.prs.Progress[2].MsgAppFlowPaused) } - r.prs.Progress[2].ProbeSent = true + r.prs.Progress[2].MsgAppFlowPaused = true r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) - if r.prs.Progress[2].ProbeSent { - t.Errorf("paused = %v, want false", r.prs.Progress[2].ProbeSent) + if r.prs.Progress[2].MsgAppFlowPaused { + t.Errorf("paused = %v, want false", r.prs.Progress[2].MsgAppFlowPaused) } } @@ -2658,8 +2658,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { } } - if !r.prs.Progress[2].ProbeSent { - t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent) + if !r.prs.Progress[2].MsgAppFlowPaused { + t.Errorf("paused = %v, want true", r.prs.Progress[2].MsgAppFlowPaused) } for j := 0; j < 10; j++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2673,8 +2673,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { for j := 0; j < r.heartbeatTimeout; j++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) } - if !r.prs.Progress[2].ProbeSent { - t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent) + if !r.prs.Progress[2].MsgAppFlowPaused { + t.Errorf("paused = %v, want true", r.prs.Progress[2].MsgAppFlowPaused) } // consume the heartbeat @@ -2696,8 +2696,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { if msg[0].Index != 0 { t.Errorf("index = %d, want %d", msg[0].Index, 0) } - if !r.prs.Progress[2].ProbeSent { - t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent) + if !r.prs.Progress[2].MsgAppFlowPaused { + t.Errorf("paused = %v, want true", r.prs.Progress[2].MsgAppFlowPaused) } } diff --git a/raft/tracker/progress.go b/raft/tracker/progress.go index fae565b9f..c6272d22d 100644 --- a/raft/tracker/progress.go +++ b/raft/tracker/progress.go @@ -55,14 +55,13 @@ type Progress struct { // This is always true on the leader. RecentActive bool - // ProbeSent is true when a "probe" MsgApp was sent to this follower recently, - // and we haven't heard from it back yet. Used when the MsgApp flow is - // throttled, i.e. when State is StateProbe, or StateReplicate with saturated - // Inflights. In both cases, we need to continue sending MsgApp once in a - // while to guarantee progress, but we only do so when ProbeSent is false (it - // is reset on receiving a heartbeat response), to not overflow the receiver. - // See IsPaused(). - ProbeSent bool + // MsgAppFlowPaused is used when the MsgApp flow to a node is throttled. This + // happens in StateProbe, or StateReplicate with saturated Inflights. In both + // cases, we need to continue sending MsgApp once in a while to guarantee + // progress, but we only do so when MsgAppFlowPaused is false (it is reset on + // receiving a heartbeat response), to not overflow the receiver. See + // IsPaused(). + MsgAppFlowPaused bool // Inflights is a sliding window for the inflight messages. // Each inflight message contains one or more log entries. @@ -82,10 +81,10 @@ type Progress struct { IsLearner bool } -// ResetState moves the Progress into the specified State, resetting ProbeSent, +// ResetState moves the Progress into the specified State, resetting MsgAppFlowPaused, // PendingSnapshot, and Inflights. func (pr *Progress) ResetState(state StateType) { - pr.ProbeSent = false + pr.MsgAppFlowPaused = false pr.PendingSnapshot = 0 pr.State = state pr.Inflights.reset() @@ -146,13 +145,13 @@ func (pr *Progress) UpdateOnEntriesSend(entries int, nextIndex uint64) error { } // If this message overflows the in-flights tracker, or it was already full, // consider this message being a probe, so that the flow is paused. - pr.ProbeSent = pr.Inflights.Full() + pr.MsgAppFlowPaused = pr.Inflights.Full() case StateProbe: // TODO(pavelkalinnikov): this condition captures the previous behaviour, - // but we should set ProbeSent unconditionally for simplicity, because any + // but we should set MsgAppFlowPaused unconditionally for simplicity, because any // MsgApp in StateProbe is a probe, not only non-empty ones. if entries > 0 { - pr.ProbeSent = true + pr.MsgAppFlowPaused = true } default: return fmt.Errorf("sending append in unhandled state %s", pr.State) @@ -168,7 +167,7 @@ func (pr *Progress) MaybeUpdate(n uint64) bool { if pr.Match < n { pr.Match = n updated = true - pr.ProbeSent = false + pr.MsgAppFlowPaused = false } pr.Next = max(pr.Next, n+1) return updated @@ -210,7 +209,7 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool { } pr.Next = max(min(rejected, matchHint+1), 1) - pr.ProbeSent = false + pr.MsgAppFlowPaused = false return true } @@ -223,9 +222,9 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool { func (pr *Progress) IsPaused() bool { switch pr.State { case StateProbe: - return pr.ProbeSent + return pr.MsgAppFlowPaused case StateReplicate: - return pr.ProbeSent + return pr.MsgAppFlowPaused case StateSnapshot: return true default: diff --git a/raft/tracker/progress_test.go b/raft/tracker/progress_test.go index 5fbc1e7cd..154366249 100644 --- a/raft/tracker/progress_test.go +++ b/raft/tracker/progress_test.go @@ -22,14 +22,14 @@ func TestProgressString(t *testing.T) { ins := NewInflights(1) ins.Add(123) pr := &Progress{ - Match: 1, - Next: 2, - State: StateSnapshot, - PendingSnapshot: 123, - RecentActive: false, - ProbeSent: true, - IsLearner: true, - Inflights: ins, + Match: 1, + Next: 2, + State: StateSnapshot, + PendingSnapshot: 123, + RecentActive: false, + MsgAppFlowPaused: true, + IsLearner: true, + Inflights: ins, } const exp = `StateSnapshot match=1 next=2 learner paused pendingSnap=123 inactive inflight=1[full]` if act := pr.String(); act != exp { @@ -53,9 +53,9 @@ func TestProgressIsPaused(t *testing.T) { } for i, tt := range tests { p := &Progress{ - State: tt.state, - ProbeSent: tt.paused, - Inflights: NewInflights(256), + State: tt.state, + MsgAppFlowPaused: tt.paused, + Inflights: NewInflights(256), } if g := p.IsPaused(); g != tt.w { t.Errorf("#%d: paused= %t, want %t", i, g, tt.w) @@ -64,20 +64,20 @@ func TestProgressIsPaused(t *testing.T) { } // TestProgressResume ensures that MaybeUpdate and MaybeDecrTo will reset -// ProbeSent. +// MsgAppFlowPaused. func TestProgressResume(t *testing.T) { p := &Progress{ - Next: 2, - ProbeSent: true, + Next: 2, + MsgAppFlowPaused: true, } p.MaybeDecrTo(1, 1) - if p.ProbeSent { - t.Errorf("paused= %v, want false", p.ProbeSent) + if p.MsgAppFlowPaused { + t.Errorf("paused= %v, want false", p.MsgAppFlowPaused) } - p.ProbeSent = true + p.MsgAppFlowPaused = true p.MaybeUpdate(2) - if p.ProbeSent { - t.Errorf("paused= %v, want false", p.ProbeSent) + if p.MsgAppFlowPaused { + t.Errorf("paused= %v, want false", p.MsgAppFlowPaused) } }