From 4969aa81ae72c68b0e92d71a467fd7c5214e8c18 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 26 Oct 2022 23:23:34 +0100 Subject: [PATCH] raft: send empty appends when replication is paused When Inflights to a particular node is full, i.e. MaxInflightMsgs for the append messages flow is saturated, it is still necessary to continue sending MsgApp to ensure progress. Currently this is achieved by "forgetting" the first in-flight message in the window, which frees up quota for one new MsgApp. This new message is constructed in such a way that it potentially has multiple entries, or a large entry. The effect of this is that the in-flight limitations can be exceeded arbitrarily, for as long as the flow to this node continues being saturated. In particular, if a follower is stuck, the leader will keep sending entries to it. This commit makes the MsgApp empty when Inflights is saturated, and prevents the described leakage of Entries to slow followers. Signed-off-by: Pavel Kalinnikov --- raft/raft.go | 21 ++++++++++++++++----- raft/raft_flow_control_test.go | 23 ++++++++--------------- raft/raft_test.go | 4 ++++ raft/testdata/replicate_pause.txt | 10 +++++----- raft/tracker/progress.go | 24 ++++++++++++------------ raft/tracker/progress_test.go | 2 +- 6 files changed, 46 insertions(+), 38 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index 1c8995e6b..f2f898138 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -439,7 +439,18 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { } term, errt := r.raftLog.term(pr.Next - 1) - ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize) + var ents []pb.Entry + var erre error + // In a throttled StateReplicate only send empty MsgApp, to ensure progress. + // Otherwise, if we had a full Inflights and all inflight messages were in + // fact dropped, replication to that follower would stall. Instead, an empty + // MsgApp will eventually reach the follower (heartbeats responses prompt the + // leader to send an append), allowing it to be acked or rejected, both of + // which will clear out Inflights. + if pr.State != tracker.StateReplicate || !pr.Inflights.Full() { + ents, erre = r.raftLog.entries(pr.Next, r.maxMsgSize) + } + if len(ents) == 0 && !sendIfEmpty { return false } @@ -1295,10 +1306,10 @@ func stepLeader(r *raft, m pb.Message) error { pr.RecentActive = true pr.ProbeSent = false - // free one slot for the full inflights window to allow progress. - if pr.State == tracker.StateReplicate && pr.Inflights.Full() { - pr.Inflights.FreeFirstOne() - } + // NB: if the follower is paused (full Inflights), this will still send an + // empty append, allowing it to recover from situations in which all the + // messages that filled up Inflights in the first place were dropped. Note + // also that the outgoing heartbeat already communicated the commit index. if pr.Match < r.raftLog.lastIndex() { r.sendAppend(m.From) } diff --git a/raft/raft_flow_control_test.go b/raft/raft_flow_control_test.go index f0cb4aa2f..29dff843f 100644 --- a/raft/raft_flow_control_test.go +++ b/raft/raft_flow_control_test.go @@ -120,31 +120,24 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) { } for tt := 1; tt < 5; tt++ { - if !pr2.IsPaused() { - t.Fatalf("#%d: paused = false, want true", tt) - } - // recv tt msgHeartbeatResp and expect one free slot for i := 0; i < tt; i++ { + if !pr2.IsPaused() { + t.Fatalf("#%d.%d: paused = false, want true", tt, i) + } + // Unpauses the progress, sends an empty MsgApp, and pauses it again. r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) ms := r.readMessages() if len(ms) != 1 || ms[0].Type != pb.MsgApp || len(ms[0].Entries) != 0 { t.Fatalf("#%d.%d: len(ms) == %d, want 1 empty MsgApp", tt, i, len(ms)) } - if pr2.IsPaused() { - t.Fatalf("#%d.%d: paused = true, want false", tt, i) - } } - // one slot - r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) - ms := r.readMessages() - if len(ms) != 1 || ms[0].Type != pb.MsgApp || len(ms[0].Entries) != 1 { - t.Fatalf("#%d: len(ms) == %d, want 1 MsgApp with 1 entry", tt, len(ms)) - } - - // and just one slot + // No more appends are sent if there are no heartbeats. for i := 0; i < 10; i++ { + if !pr2.IsPaused() { + t.Fatalf("#%d.%d: paused = false, want true", tt, i) + } r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) ms := r.readMessages() if len(ms) != 0 { diff --git a/raft/raft_test.go b/raft/raft_test.go index 9ae1ccd58..66cfe72ff 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -102,6 +102,10 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) { } r.prs.Progress[2].BecomeReplicate() + if r.prs.Progress[2].ProbeSent { + t.Errorf("paused = %v, want false", r.prs.Progress[2].ProbeSent) + } + r.prs.Progress[2].ProbeSent = true r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) if r.prs.Progress[2].ProbeSent { t.Errorf("paused = %v, want false", r.prs.Progress[2].ProbeSent) diff --git a/raft/testdata/replicate_pause.txt b/raft/testdata/replicate_pause.txt index f6e30dc76..e7333cccb 100644 --- a/raft/testdata/replicate_pause.txt +++ b/raft/testdata/replicate_pause.txt @@ -146,9 +146,9 @@ stabilize 2 3 Messages: 3->1 MsgHeartbeatResp Term:1 Log:0/0 -# After handling heartbeat responses, node 1 sends a new replication MsgApp to a -# throttled node 3 which hasn't yet replied to a single MsgApp. -# TODO(pavelkalinnikov): this should not happen, send empty MsgApp instead. +# After handling heartbeat responses, node 1 sends an empty MsgApp to a +# throttled node 3 because it hasn't yet replied to a single MsgApp, and the +# in-flight tracker is still saturated. stabilize 1 ---- > 1 receiving messages @@ -157,13 +157,13 @@ stabilize 1 > 1 handling Ready Ready MustSync=false: Messages: - 1->3 MsgApp Term:1 Log:1/14 Commit:17 Entries:[1/15 EntryNormal "prop_1_15", 1/16 EntryNormal "prop_1_16", 1/17 EntryNormal "prop_1_17"] + 1->3 MsgApp Term:1 Log:1/14 Commit:17 # Node 3 finally receives a MsgApp, but there was a gap, so it rejects it. stabilize 3 ---- > 3 receiving messages - 1->3 MsgApp Term:1 Log:1/14 Commit:17 Entries:[1/15 EntryNormal "prop_1_15", 1/16 EntryNormal "prop_1_16", 1/17 EntryNormal "prop_1_17"] + 1->3 MsgApp Term:1 Log:1/14 Commit:17 DEBUG 3 [logterm: 0, index: 14] rejected MsgApp [logterm: 1, index: 14] from 1 > 3 handling Ready Ready MustSync=false: diff --git a/raft/tracker/progress.go b/raft/tracker/progress.go index 7f1bb6260..fae565b9f 100644 --- a/raft/tracker/progress.go +++ b/raft/tracker/progress.go @@ -55,9 +55,13 @@ type Progress struct { // This is always true on the leader. RecentActive bool - // ProbeSent is used while this follower is in StateProbe. When ProbeSent is - // true, raft should pause sending replication message to this peer until - // ProbeSent is reset. See ProbeAcked() and IsPaused(). + // ProbeSent is true when a "probe" MsgApp was sent to this follower recently, + // and we haven't heard from it back yet. Used when the MsgApp flow is + // throttled, i.e. when State is StateProbe, or StateReplicate with saturated + // Inflights. In both cases, we need to continue sending MsgApp once in a + // while to guarantee progress, but we only do so when ProbeSent is false (it + // is reset on receiving a heartbeat response), to not overflow the receiver. + // See IsPaused(). ProbeSent bool // Inflights is a sliding window for the inflight messages. @@ -101,13 +105,6 @@ func min(a, b uint64) uint64 { return a } -// ProbeAcked is called when this peer has accepted an append. It resets -// ProbeSent to signal that additional append messages should be sent without -// further delay. -func (pr *Progress) ProbeAcked() { - pr.ProbeSent = false -} - // BecomeProbe transitions into StateProbe. Next is reset to Match+1 or, // optionally and if larger, the index of the pending snapshot. func (pr *Progress) BecomeProbe() { @@ -147,6 +144,9 @@ func (pr *Progress) UpdateOnEntriesSend(entries int, nextIndex uint64) error { pr.OptimisticUpdate(last) pr.Inflights.Add(last) } + // If this message overflows the in-flights tracker, or it was already full, + // consider this message being a probe, so that the flow is paused. + pr.ProbeSent = pr.Inflights.Full() case StateProbe: // TODO(pavelkalinnikov): this condition captures the previous behaviour, // but we should set ProbeSent unconditionally for simplicity, because any @@ -168,7 +168,7 @@ func (pr *Progress) MaybeUpdate(n uint64) bool { if pr.Match < n { pr.Match = n updated = true - pr.ProbeAcked() + pr.ProbeSent = false } pr.Next = max(pr.Next, n+1) return updated @@ -225,7 +225,7 @@ func (pr *Progress) IsPaused() bool { case StateProbe: return pr.ProbeSent case StateReplicate: - return pr.Inflights.Full() + return pr.ProbeSent case StateSnapshot: return true default: diff --git a/raft/tracker/progress_test.go b/raft/tracker/progress_test.go index 6eb582f04..5fbc1e7cd 100644 --- a/raft/tracker/progress_test.go +++ b/raft/tracker/progress_test.go @@ -47,7 +47,7 @@ func TestProgressIsPaused(t *testing.T) { {StateProbe, false, false}, {StateProbe, true, true}, {StateReplicate, false, false}, - {StateReplicate, true, false}, + {StateReplicate, true, true}, {StateSnapshot, false, true}, {StateSnapshot, true, true}, }