raft/tracker: rename and comment MsgApp paused field

Make the field name and comment clearer on the fact that it's used both in
StateProbe and StateReplicate. The old name ProbeSent was slightly confusing,
and also triggered thinking that it's used only in StateProbe.

Signed-off-by: Pavel Kalinnikov <pavel@cockroachlabs.com>
This commit is contained in:
Pavel Kalinnikov 2022-11-08 13:01:11 +00:00
parent 467114ed87
commit 1ea13494eb
5 changed files with 55 additions and 56 deletions

View File

@ -1304,7 +1304,7 @@ func stepLeader(r *raft, m pb.Message) error {
}
case pb.MsgHeartbeatResp:
pr.RecentActive = true
pr.ProbeSent = false
pr.MsgAppFlowPaused = false
// NB: if the follower is paused (full Inflights), this will still send an
// empty append, allowing it to recover from situations in which all the
@ -1349,7 +1349,7 @@ func stepLeader(r *raft, m pb.Message) error {
// If snapshot finish, wait for the MsgAppResp from the remote node before sending
// out the next MsgApp.
// If snapshot failure, wait for a heartbeat interval before next try
pr.ProbeSent = true
pr.MsgAppFlowPaused = true
case pb.MsgUnreachable:
// During optimistic replication, if the remote becomes unreachable,
// there is huge probability that a MsgApp is lost.

View File

@ -83,8 +83,8 @@ func TestSnapshotFailure(t *testing.T) {
if sm.prs.Progress[2].Next != 1 {
t.Fatalf("Next = %d, want 1", sm.prs.Progress[2].Next)
}
if !sm.prs.Progress[2].ProbeSent {
t.Errorf("ProbeSent = %v, want true", sm.prs.Progress[2].ProbeSent)
if !sm.prs.Progress[2].MsgAppFlowPaused {
t.Errorf("MsgAppFlowPaused = %v, want true", sm.prs.Progress[2].MsgAppFlowPaused)
}
}
@ -106,8 +106,8 @@ func TestSnapshotSucceed(t *testing.T) {
if sm.prs.Progress[2].Next != 12 {
t.Fatalf("Next = %d, want 12", sm.prs.Progress[2].Next)
}
if !sm.prs.Progress[2].ProbeSent {
t.Errorf("ProbeSent = %v, want true", sm.prs.Progress[2].ProbeSent)
if !sm.prs.Progress[2].MsgAppFlowPaused {
t.Errorf("MsgAppFlowPaused = %v, want true", sm.prs.Progress[2].MsgAppFlowPaused)
}
}

View File

@ -94,21 +94,21 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()
r.prs.Progress[2].ProbeSent = true
r.prs.Progress[2].MsgAppFlowPaused = true
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
if !r.prs.Progress[2].ProbeSent {
t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
if !r.prs.Progress[2].MsgAppFlowPaused {
t.Errorf("paused = %v, want true", r.prs.Progress[2].MsgAppFlowPaused)
}
r.prs.Progress[2].BecomeReplicate()
if r.prs.Progress[2].ProbeSent {
t.Errorf("paused = %v, want false", r.prs.Progress[2].ProbeSent)
if r.prs.Progress[2].MsgAppFlowPaused {
t.Errorf("paused = %v, want false", r.prs.Progress[2].MsgAppFlowPaused)
}
r.prs.Progress[2].ProbeSent = true
r.prs.Progress[2].MsgAppFlowPaused = true
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
if r.prs.Progress[2].ProbeSent {
t.Errorf("paused = %v, want false", r.prs.Progress[2].ProbeSent)
if r.prs.Progress[2].MsgAppFlowPaused {
t.Errorf("paused = %v, want false", r.prs.Progress[2].MsgAppFlowPaused)
}
}
@ -2658,8 +2658,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
}
}
if !r.prs.Progress[2].ProbeSent {
t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
if !r.prs.Progress[2].MsgAppFlowPaused {
t.Errorf("paused = %v, want true", r.prs.Progress[2].MsgAppFlowPaused)
}
for j := 0; j < 10; j++ {
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
@ -2673,8 +2673,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
for j := 0; j < r.heartbeatTimeout; j++ {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
}
if !r.prs.Progress[2].ProbeSent {
t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
if !r.prs.Progress[2].MsgAppFlowPaused {
t.Errorf("paused = %v, want true", r.prs.Progress[2].MsgAppFlowPaused)
}
// consume the heartbeat
@ -2696,8 +2696,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
if msg[0].Index != 0 {
t.Errorf("index = %d, want %d", msg[0].Index, 0)
}
if !r.prs.Progress[2].ProbeSent {
t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
if !r.prs.Progress[2].MsgAppFlowPaused {
t.Errorf("paused = %v, want true", r.prs.Progress[2].MsgAppFlowPaused)
}
}

View File

@ -55,14 +55,13 @@ type Progress struct {
// This is always true on the leader.
RecentActive bool
// ProbeSent is true when a "probe" MsgApp was sent to this follower recently,
// and we haven't heard from it back yet. Used when the MsgApp flow is
// throttled, i.e. when State is StateProbe, or StateReplicate with saturated
// Inflights. In both cases, we need to continue sending MsgApp once in a
// while to guarantee progress, but we only do so when ProbeSent is false (it
// is reset on receiving a heartbeat response), to not overflow the receiver.
// See IsPaused().
ProbeSent bool
// MsgAppFlowPaused is used when the MsgApp flow to a node is throttled. This
// happens in StateProbe, or StateReplicate with saturated Inflights. In both
// cases, we need to continue sending MsgApp once in a while to guarantee
// progress, but we only do so when MsgAppFlowPaused is false (it is reset on
// receiving a heartbeat response), to not overflow the receiver. See
// IsPaused().
MsgAppFlowPaused bool
// Inflights is a sliding window for the inflight messages.
// Each inflight message contains one or more log entries.
@ -82,10 +81,10 @@ type Progress struct {
IsLearner bool
}
// ResetState moves the Progress into the specified State, resetting ProbeSent,
// ResetState moves the Progress into the specified State, resetting MsgAppFlowPaused,
// PendingSnapshot, and Inflights.
func (pr *Progress) ResetState(state StateType) {
pr.ProbeSent = false
pr.MsgAppFlowPaused = false
pr.PendingSnapshot = 0
pr.State = state
pr.Inflights.reset()
@ -146,13 +145,13 @@ func (pr *Progress) UpdateOnEntriesSend(entries int, nextIndex uint64) error {
}
// If this message overflows the in-flights tracker, or it was already full,
// consider this message being a probe, so that the flow is paused.
pr.ProbeSent = pr.Inflights.Full()
pr.MsgAppFlowPaused = pr.Inflights.Full()
case StateProbe:
// TODO(pavelkalinnikov): this condition captures the previous behaviour,
// but we should set ProbeSent unconditionally for simplicity, because any
// but we should set MsgAppFlowPaused unconditionally for simplicity, because any
// MsgApp in StateProbe is a probe, not only non-empty ones.
if entries > 0 {
pr.ProbeSent = true
pr.MsgAppFlowPaused = true
}
default:
return fmt.Errorf("sending append in unhandled state %s", pr.State)
@ -168,7 +167,7 @@ func (pr *Progress) MaybeUpdate(n uint64) bool {
if pr.Match < n {
pr.Match = n
updated = true
pr.ProbeSent = false
pr.MsgAppFlowPaused = false
}
pr.Next = max(pr.Next, n+1)
return updated
@ -210,7 +209,7 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool {
}
pr.Next = max(min(rejected, matchHint+1), 1)
pr.ProbeSent = false
pr.MsgAppFlowPaused = false
return true
}
@ -223,9 +222,9 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool {
func (pr *Progress) IsPaused() bool {
switch pr.State {
case StateProbe:
return pr.ProbeSent
return pr.MsgAppFlowPaused
case StateReplicate:
return pr.ProbeSent
return pr.MsgAppFlowPaused
case StateSnapshot:
return true
default:

View File

@ -22,14 +22,14 @@ func TestProgressString(t *testing.T) {
ins := NewInflights(1)
ins.Add(123)
pr := &Progress{
Match: 1,
Next: 2,
State: StateSnapshot,
PendingSnapshot: 123,
RecentActive: false,
ProbeSent: true,
IsLearner: true,
Inflights: ins,
Match: 1,
Next: 2,
State: StateSnapshot,
PendingSnapshot: 123,
RecentActive: false,
MsgAppFlowPaused: true,
IsLearner: true,
Inflights: ins,
}
const exp = `StateSnapshot match=1 next=2 learner paused pendingSnap=123 inactive inflight=1[full]`
if act := pr.String(); act != exp {
@ -53,9 +53,9 @@ func TestProgressIsPaused(t *testing.T) {
}
for i, tt := range tests {
p := &Progress{
State: tt.state,
ProbeSent: tt.paused,
Inflights: NewInflights(256),
State: tt.state,
MsgAppFlowPaused: tt.paused,
Inflights: NewInflights(256),
}
if g := p.IsPaused(); g != tt.w {
t.Errorf("#%d: paused= %t, want %t", i, g, tt.w)
@ -64,20 +64,20 @@ func TestProgressIsPaused(t *testing.T) {
}
// TestProgressResume ensures that MaybeUpdate and MaybeDecrTo will reset
// ProbeSent.
// MsgAppFlowPaused.
func TestProgressResume(t *testing.T) {
p := &Progress{
Next: 2,
ProbeSent: true,
Next: 2,
MsgAppFlowPaused: true,
}
p.MaybeDecrTo(1, 1)
if p.ProbeSent {
t.Errorf("paused= %v, want false", p.ProbeSent)
if p.MsgAppFlowPaused {
t.Errorf("paused= %v, want false", p.MsgAppFlowPaused)
}
p.ProbeSent = true
p.MsgAppFlowPaused = true
p.MaybeUpdate(2)
if p.ProbeSent {
t.Errorf("paused= %v, want false", p.ProbeSent)
if p.MsgAppFlowPaused {
t.Errorf("paused= %v, want false", p.MsgAppFlowPaused)
}
}