raft: send empty appends when replication is paused

When Inflights to a particular node is full, i.e. MaxInflightMsgs for the
append messages flow is saturated, it is still necessary to continue sending
MsgApp to ensure progress. Currently this is achieved by "forgetting" the first
in-flight message in the window, which frees up quota for one new MsgApp.

This new message is constructed in such a way that it potentially has multiple
entries, or a large entry. The effect of this is that the in-flight limitations
can be exceeded arbitrarily, for as long as the flow to this node continues
being saturated. In particular, if a follower is stuck, the leader will keep
sending entries to it.

This commit makes the MsgApp empty when Inflights is saturated, and prevents
the described leakage of Entries to slow followers.

Signed-off-by: Pavel Kalinnikov <pavel@cockroachlabs.com>
This commit is contained in:
Pavel Kalinnikov 2022-10-26 23:23:34 +01:00
parent 3bc3d2071e
commit 4969aa81ae
6 changed files with 46 additions and 38 deletions

View File

@ -439,7 +439,18 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
} }
term, errt := r.raftLog.term(pr.Next - 1) term, errt := r.raftLog.term(pr.Next - 1)
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize) var ents []pb.Entry
var erre error
// In a throttled StateReplicate only send empty MsgApp, to ensure progress.
// Otherwise, if we had a full Inflights and all inflight messages were in
// fact dropped, replication to that follower would stall. Instead, an empty
// MsgApp will eventually reach the follower (heartbeat responses prompt the
// leader to send an append), allowing it to be acked or rejected, both of
// which will clear out Inflights.
if pr.State != tracker.StateReplicate || !pr.Inflights.Full() {
ents, erre = r.raftLog.entries(pr.Next, r.maxMsgSize)
}
if len(ents) == 0 && !sendIfEmpty { if len(ents) == 0 && !sendIfEmpty {
return false return false
} }
@ -1295,10 +1306,10 @@ func stepLeader(r *raft, m pb.Message) error {
pr.RecentActive = true pr.RecentActive = true
pr.ProbeSent = false pr.ProbeSent = false
// free one slot for the full inflights window to allow progress. // NB: if the follower is paused (full Inflights), this will still send an
if pr.State == tracker.StateReplicate && pr.Inflights.Full() { // empty append, allowing it to recover from situations in which all the
pr.Inflights.FreeFirstOne() // messages that filled up Inflights in the first place were dropped. Note
} // also that the outgoing heartbeat already communicated the commit index.
if pr.Match < r.raftLog.lastIndex() { if pr.Match < r.raftLog.lastIndex() {
r.sendAppend(m.From) r.sendAppend(m.From)
} }

View File

@ -120,31 +120,24 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) {
} }
for tt := 1; tt < 5; tt++ { for tt := 1; tt < 5; tt++ {
if !pr2.IsPaused() {
t.Fatalf("#%d: paused = false, want true", tt)
}
// recv tt msgHeartbeatResp and expect one free slot // recv tt msgHeartbeatResp and expect one free slot
for i := 0; i < tt; i++ { for i := 0; i < tt; i++ {
if !pr2.IsPaused() {
t.Fatalf("#%d.%d: paused = false, want true", tt, i)
}
// Unpauses the progress, sends an empty MsgApp, and pauses it again.
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
ms := r.readMessages() ms := r.readMessages()
if len(ms) != 1 || ms[0].Type != pb.MsgApp || len(ms[0].Entries) != 0 { if len(ms) != 1 || ms[0].Type != pb.MsgApp || len(ms[0].Entries) != 0 {
t.Fatalf("#%d.%d: len(ms) == %d, want 1 empty MsgApp", tt, i, len(ms)) t.Fatalf("#%d.%d: len(ms) == %d, want 1 empty MsgApp", tt, i, len(ms))
} }
if pr2.IsPaused() {
t.Fatalf("#%d.%d: paused = true, want false", tt, i)
}
} }
// one slot // No more appends are sent if there are no heartbeats.
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
ms := r.readMessages()
if len(ms) != 1 || ms[0].Type != pb.MsgApp || len(ms[0].Entries) != 1 {
t.Fatalf("#%d: len(ms) == %d, want 1 MsgApp with 1 entry", tt, len(ms))
}
// and just one slot
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
if !pr2.IsPaused() {
t.Fatalf("#%d.%d: paused = false, want true", tt, i)
}
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
ms := r.readMessages() ms := r.readMessages()
if len(ms) != 0 { if len(ms) != 0 {

View File

@ -102,6 +102,10 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) {
} }
r.prs.Progress[2].BecomeReplicate() r.prs.Progress[2].BecomeReplicate()
if r.prs.Progress[2].ProbeSent {
t.Errorf("paused = %v, want false", r.prs.Progress[2].ProbeSent)
}
r.prs.Progress[2].ProbeSent = true
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
if r.prs.Progress[2].ProbeSent { if r.prs.Progress[2].ProbeSent {
t.Errorf("paused = %v, want false", r.prs.Progress[2].ProbeSent) t.Errorf("paused = %v, want false", r.prs.Progress[2].ProbeSent)

View File

@ -146,9 +146,9 @@ stabilize 2 3
Messages: Messages:
3->1 MsgHeartbeatResp Term:1 Log:0/0 3->1 MsgHeartbeatResp Term:1 Log:0/0
# After handling heartbeat responses, node 1 sends a new replication MsgApp to a # After handling heartbeat responses, node 1 sends an empty MsgApp to a
# throttled node 3 which hasn't yet replied to a single MsgApp. # throttled node 3 because it hasn't yet replied to a single MsgApp, and the
# TODO(pavelkalinnikov): this should not happen, send empty MsgApp instead. # in-flight tracker is still saturated.
stabilize 1 stabilize 1
---- ----
> 1 receiving messages > 1 receiving messages
@ -157,13 +157,13 @@ stabilize 1
> 1 handling Ready > 1 handling Ready
Ready MustSync=false: Ready MustSync=false:
Messages: Messages:
1->3 MsgApp Term:1 Log:1/14 Commit:17 Entries:[1/15 EntryNormal "prop_1_15", 1/16 EntryNormal "prop_1_16", 1/17 EntryNormal "prop_1_17"] 1->3 MsgApp Term:1 Log:1/14 Commit:17
# Node 3 finally receives a MsgApp, but there was a gap, so it rejects it. # Node 3 finally receives a MsgApp, but there was a gap, so it rejects it.
stabilize 3 stabilize 3
---- ----
> 3 receiving messages > 3 receiving messages
1->3 MsgApp Term:1 Log:1/14 Commit:17 Entries:[1/15 EntryNormal "prop_1_15", 1/16 EntryNormal "prop_1_16", 1/17 EntryNormal "prop_1_17"] 1->3 MsgApp Term:1 Log:1/14 Commit:17
DEBUG 3 [logterm: 0, index: 14] rejected MsgApp [logterm: 1, index: 14] from 1 DEBUG 3 [logterm: 0, index: 14] rejected MsgApp [logterm: 1, index: 14] from 1
> 3 handling Ready > 3 handling Ready
Ready MustSync=false: Ready MustSync=false:

View File

@ -55,9 +55,13 @@ type Progress struct {
// This is always true on the leader. // This is always true on the leader.
RecentActive bool RecentActive bool
// ProbeSent is used while this follower is in StateProbe. When ProbeSent is // ProbeSent is true when a "probe" MsgApp was sent to this follower recently,
// true, raft should pause sending replication message to this peer until // and we haven't heard back from it yet. Used when the MsgApp flow is
// ProbeSent is reset. See ProbeAcked() and IsPaused(). // throttled, i.e. when State is StateProbe, or StateReplicate with saturated
// Inflights. In both cases, we need to continue sending MsgApp once in a
// while to guarantee progress, but we only do so when ProbeSent is false (it
// is reset on receiving a heartbeat response), to not overflow the receiver.
// See IsPaused().
ProbeSent bool ProbeSent bool
// Inflights is a sliding window for the inflight messages. // Inflights is a sliding window for the inflight messages.
@ -101,13 +105,6 @@ func min(a, b uint64) uint64 {
return a return a
} }
// ProbeAcked is called when this peer has accepted an append. It resets
// ProbeSent to signal that additional append messages should be sent without
// further delay.
func (pr *Progress) ProbeAcked() {
pr.ProbeSent = false
}
// BecomeProbe transitions into StateProbe. Next is reset to Match+1 or, // BecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
// optionally and if larger, the index of the pending snapshot. // optionally and if larger, the index of the pending snapshot.
func (pr *Progress) BecomeProbe() { func (pr *Progress) BecomeProbe() {
@ -147,6 +144,9 @@ func (pr *Progress) UpdateOnEntriesSend(entries int, nextIndex uint64) error {
pr.OptimisticUpdate(last) pr.OptimisticUpdate(last)
pr.Inflights.Add(last) pr.Inflights.Add(last)
} }
// If this message overflows the in-flights tracker, or it was already full,
// consider this message to be a probe, so that the flow is paused.
pr.ProbeSent = pr.Inflights.Full()
case StateProbe: case StateProbe:
// TODO(pavelkalinnikov): this condition captures the previous behaviour, // TODO(pavelkalinnikov): this condition captures the previous behaviour,
// but we should set ProbeSent unconditionally for simplicity, because any // but we should set ProbeSent unconditionally for simplicity, because any
@ -168,7 +168,7 @@ func (pr *Progress) MaybeUpdate(n uint64) bool {
if pr.Match < n { if pr.Match < n {
pr.Match = n pr.Match = n
updated = true updated = true
pr.ProbeAcked() pr.ProbeSent = false
} }
pr.Next = max(pr.Next, n+1) pr.Next = max(pr.Next, n+1)
return updated return updated
@ -225,7 +225,7 @@ func (pr *Progress) IsPaused() bool {
case StateProbe: case StateProbe:
return pr.ProbeSent return pr.ProbeSent
case StateReplicate: case StateReplicate:
return pr.Inflights.Full() return pr.ProbeSent
case StateSnapshot: case StateSnapshot:
return true return true
default: default:

View File

@ -47,7 +47,7 @@ func TestProgressIsPaused(t *testing.T) {
{StateProbe, false, false}, {StateProbe, false, false},
{StateProbe, true, true}, {StateProbe, true, true},
{StateReplicate, false, false}, {StateReplicate, false, false},
{StateReplicate, true, false}, {StateReplicate, true, true},
{StateSnapshot, false, true}, {StateSnapshot, false, true},
{StateSnapshot, true, true}, {StateSnapshot, true, true},
} }