diff --git a/raft/raft.go b/raft/raft.go index f9f38155d..d7603f101 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -437,11 +437,20 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { if pr.IsPaused() { return false } - m := pb.Message{} - m.To = to term, errt := r.raftLog.term(pr.Next - 1) - ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize) + var ents []pb.Entry + var erre error + // In a throttled StateReplicate only send empty MsgApp, to ensure progress. + // Otherwise, if we had a full Inflights and all inflight messages were in + // fact dropped, replication to that follower would stall. Instead, an empty + // MsgApp will eventually reach the follower (heartbeats responses prompt the + // leader to send an append), allowing it to be acked or rejected, both of + // which will clear out Inflights. + if pr.State != tracker.StateReplicate || !pr.Inflights.Full() { + ents, erre = r.raftLog.entries(pr.Next, r.maxMsgSize) + } + if len(ents) == 0 && !sendIfEmpty { return false } @@ -452,7 +461,6 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { return false } - m.Type = pb.MsgSnap snapshot, err := r.raftLog.snapshot() if err != nil { if err == ErrSnapshotTemporarilyUnavailable { @@ -464,33 +472,29 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { if IsEmptySnap(snapshot) { panic("need non-empty snapshot") } - m.Snapshot = snapshot sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]", r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr) pr.BecomeSnapshot(sindex) r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr) - } else { - m.Type = pb.MsgApp - m.Index = pr.Next - 1 - m.LogTerm = term - m.Entries = ents - m.Commit = r.raftLog.committed - if n := len(m.Entries); n != 0 { - switch pr.State { - // optimistically increase the next when in StateReplicate - case tracker.StateReplicate: - last := m.Entries[n-1].Index - pr.OptimisticUpdate(last) - pr.Inflights.Add(last) - case tracker.StateProbe: - pr.ProbeSent = true - default: - r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State) - } - } + + r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: snapshot}) + return true } - r.send(m) + + // Send the actual MsgApp otherwise, and update the progress accordingly. + next := pr.Next // save Next for later, as the progress update can change it + if err := pr.UpdateOnEntriesSend(len(ents), next); err != nil { + r.logger.Panicf("%x: %v", r.id, err) + } + r.send(pb.Message{ + To: to, + Type: pb.MsgApp, + Index: next - 1, + LogTerm: term, + Entries: ents, + Commit: r.raftLog.committed, + }) return true } @@ -1300,12 +1304,12 @@ func stepLeader(r *raft, m pb.Message) error { } case pb.MsgHeartbeatResp: pr.RecentActive = true - pr.ProbeSent = false + pr.MsgAppFlowPaused = false - // free one slot for the full inflights window to allow progress. - if pr.State == tracker.StateReplicate && pr.Inflights.Full() { - pr.Inflights.FreeFirstOne() - } + // NB: if the follower is paused (full Inflights), this will still send an + // empty append, allowing it to recover from situations in which all the + // messages that filled up Inflights in the first place were dropped. Note + // also that the outgoing heartbeat already communicated the commit index. if pr.Match < r.raftLog.lastIndex() { r.sendAppend(m.From) } @@ -1345,7 +1349,7 @@ func stepLeader(r *raft, m pb.Message) error { // If snapshot finish, wait for the MsgAppResp from the remote node before sending // out the next MsgApp. // If snapshot failure, wait for a heartbeat interval before next try - pr.ProbeSent = true + pr.MsgAppFlowPaused = true case pb.MsgUnreachable: // During optimistic replication, if the remote becomes unreachable, // there is huge probability that a MsgApp is lost. diff --git a/raft/raft_flow_control_test.go b/raft/raft_flow_control_test.go index 5430568c3..29dff843f 100644 --- a/raft/raft_flow_control_test.go +++ b/raft/raft_flow_control_test.go @@ -36,14 +36,14 @@ func TestMsgAppFlowControlFull(t *testing.T) { for i := 0; i < r.prs.MaxInflight; i++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) ms := r.readMessages() - if len(ms) != 1 { - t.Fatalf("#%d: len(ms) = %d, want 1", i, len(ms)) + if len(ms) != 1 || ms[0].Type != pb.MsgApp { + t.Fatalf("#%d: len(ms) = %d, want 1 MsgApp", i, len(ms)) } } // ensure 1 - if !pr2.Inflights.Full() { - t.Fatalf("inflights.full = %t, want %t", pr2.Inflights.Full(), true) + if !pr2.IsPaused() { + t.Fatal("paused = false, want true") } // ensure 2 @@ -84,20 +84,20 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) { // fill in the inflights window again r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) ms := r.readMessages() - if len(ms) != 1 { - t.Fatalf("#%d: len(ms) = %d, want 1", tt, len(ms)) + if len(ms) != 1 || ms[0].Type != pb.MsgApp { + t.Fatalf("#%d: len(ms) = %d, want 1 MsgApp", tt, len(ms)) } // ensure 1 - if !pr2.Inflights.Full() { - t.Fatalf("inflights.full = %t, want %t", pr2.Inflights.Full(), true) + if !pr2.IsPaused() { + t.Fatalf("#%d: paused = false, want true", tt) } // ensure 2 for i := 0; i < tt; i++ { r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: uint64(i)}) - if !pr2.Inflights.Full() { - t.Fatalf("#%d: inflights.full = %t, want %t", tt, pr2.Inflights.Full(), true) + if !pr2.IsPaused() { + t.Fatalf("#%d.%d: paused = false, want true", tt, i) } } } @@ -120,32 +120,28 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) { } for tt := 1; tt < 5; tt++ { - if !pr2.Inflights.Full() { - t.Fatalf("#%d: inflights.full = %t, want %t", tt, pr2.Inflights.Full(), true) - } - // recv tt msgHeartbeatResp and expect one free slot for i := 0; i < tt; i++ { + if !pr2.IsPaused() { + t.Fatalf("#%d.%d: paused = false, want true", tt, i) + } + // Unpauses the progress, sends an empty MsgApp, and pauses it again. r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) - r.readMessages() - if pr2.Inflights.Full() { - t.Fatalf("#%d.%d: inflights.full = %t, want %t", tt, i, pr2.Inflights.Full(), false) + ms := r.readMessages() + if len(ms) != 1 || ms[0].Type != pb.MsgApp || len(ms[0].Entries) != 0 { + t.Fatalf("#%d.%d: len(ms) == %d, want 1 empty MsgApp", tt, i, len(ms)) } } - // one slot - r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) - ms := r.readMessages() - if len(ms) != 1 { - t.Fatalf("#%d: free slot = 0, want 1", tt) - } - - // and just one slot + // No more appends are sent if there are no heartbeats. for i := 0; i < 10; i++ { + if !pr2.IsPaused() { + t.Fatalf("#%d.%d: paused = false, want true", tt, i) + } r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) - ms1 := r.readMessages() - if len(ms1) != 0 { - t.Fatalf("#%d.%d: len(ms) = %d, want 0", tt, i, len(ms1)) + ms := r.readMessages() + if len(ms) != 0 { + t.Fatalf("#%d.%d: len(ms) = %d, want 0", tt, i, len(ms)) } } diff --git a/raft/raft_snap_test.go b/raft/raft_snap_test.go index 6b2afeebd..f8ed07eba 100644 --- a/raft/raft_snap_test.go +++ b/raft/raft_snap_test.go @@ -83,8 +83,8 @@ func TestSnapshotFailure(t *testing.T) { if sm.prs.Progress[2].Next != 1 { t.Fatalf("Next = %d, want 1", sm.prs.Progress[2].Next) } - if !sm.prs.Progress[2].ProbeSent { - t.Errorf("ProbeSent = %v, want true", sm.prs.Progress[2].ProbeSent) + if !sm.prs.Progress[2].MsgAppFlowPaused { + t.Errorf("MsgAppFlowPaused = %v, want true", sm.prs.Progress[2].MsgAppFlowPaused) } } @@ -106,8 +106,8 @@ func TestSnapshotSucceed(t *testing.T) { if sm.prs.Progress[2].Next != 12 { t.Fatalf("Next = %d, want 12", sm.prs.Progress[2].Next) } - if !sm.prs.Progress[2].ProbeSent { - t.Errorf("ProbeSent = %v, want true", sm.prs.Progress[2].ProbeSent) + if !sm.prs.Progress[2].MsgAppFlowPaused { + t.Errorf("MsgAppFlowPaused = %v, want true", sm.prs.Progress[2].MsgAppFlowPaused) } } diff --git a/raft/raft_test.go b/raft/raft_test.go index 9ae1ccd58..a04e60bfe 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -94,17 +94,21 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) { r.becomeCandidate() r.becomeLeader() - r.prs.Progress[2].ProbeSent = true + r.prs.Progress[2].MsgAppFlowPaused = true r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - if !r.prs.Progress[2].ProbeSent { - t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent) + if !r.prs.Progress[2].MsgAppFlowPaused { + t.Errorf("paused = %v, want true", r.prs.Progress[2].MsgAppFlowPaused) } r.prs.Progress[2].BecomeReplicate() + if r.prs.Progress[2].MsgAppFlowPaused { + t.Errorf("paused = %v, want false", r.prs.Progress[2].MsgAppFlowPaused) + } + r.prs.Progress[2].MsgAppFlowPaused = true r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) - if r.prs.Progress[2].ProbeSent { - t.Errorf("paused = %v, want false", r.prs.Progress[2].ProbeSent) + if r.prs.Progress[2].MsgAppFlowPaused { + t.Errorf("paused = %v, want false", r.prs.Progress[2].MsgAppFlowPaused) } } @@ -2654,8 +2658,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { } } - if !r.prs.Progress[2].ProbeSent { - t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent) + if !r.prs.Progress[2].MsgAppFlowPaused { + t.Errorf("paused = %v, want true", r.prs.Progress[2].MsgAppFlowPaused) } for j := 0; j < 10; j++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2669,8 +2673,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { for j := 0; j < r.heartbeatTimeout; j++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) } - if !r.prs.Progress[2].ProbeSent { - t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent) + if !r.prs.Progress[2].MsgAppFlowPaused { + t.Errorf("paused = %v, want true", r.prs.Progress[2].MsgAppFlowPaused) } // consume the heartbeat @@ -2692,8 +2696,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { if msg[0].Index != 0 { t.Errorf("index = %d, want %d", msg[0].Index, 0) } - if !r.prs.Progress[2].ProbeSent { - t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent) + if !r.prs.Progress[2].MsgAppFlowPaused { + t.Errorf("paused = %v, want true", r.prs.Progress[2].MsgAppFlowPaused) } } diff --git a/raft/rafttest/interaction_env.go b/raft/rafttest/interaction_env.go index 4a6adc5a5..75c223837 100644 --- a/raft/rafttest/interaction_env.go +++ b/raft/rafttest/interaction_env.go @@ -84,15 +84,13 @@ type Storage interface { Append([]pb.Entry) error } -// defaultRaftConfig sets up a *raft.Config with reasonable testing defaults. -// In particular, no limits are set. -func defaultRaftConfig(id uint64, applied uint64, s raft.Storage) *raft.Config { - return &raft.Config{ - ID: id, - Applied: applied, +// raftConfigStub sets up a raft.Config stub with reasonable testing defaults. +// In particular, no limits are set. It is not a complete config: ID and Storage +// must be set for each node using the stub as a template. +func raftConfigStub() raft.Config { + return raft.Config{ ElectionTick: 3, HeartbeatTick: 1, - Storage: s, MaxSizePerMsg: math.MaxUint64, MaxInflightMsgs: math.MaxInt32, } diff --git a/raft/rafttest/interaction_env_handler_add_nodes.go b/raft/rafttest/interaction_env_handler_add_nodes.go index 517477ef4..b72c96505 100644 --- a/raft/rafttest/interaction_env_handler_add_nodes.go +++ b/raft/rafttest/interaction_env_handler_add_nodes.go @@ -28,6 +28,7 @@ import ( func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) error { n := firstAsInt(t, d) var snap pb.Snapshot + cfg := raftConfigStub() for _, arg := range d.CmdArgs[1:] { for i := range arg.Vals { switch arg.Key { @@ -39,14 +40,17 @@ func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) e var id uint64 arg.Scan(t, i, &id) snap.Metadata.ConfState.Learners = append(snap.Metadata.ConfState.Learners, id) + case "inflight": + arg.Scan(t, i, &cfg.MaxInflightMsgs) case "index": arg.Scan(t, i, &snap.Metadata.Index) + cfg.Applied = snap.Metadata.Index case "content": arg.Scan(t, i, &snap.Data) } } } - return env.AddNodes(n, snap) + return env.AddNodes(n, cfg, snap) } type snapOverrideStorage struct { @@ -63,9 +67,9 @@ func (s snapOverrideStorage) Snapshot() (pb.Snapshot, error) { var _ raft.Storage = snapOverrideStorage{} -// AddNodes adds n new nodes initializes from the given snapshot (which may be -// empty). They will be assigned consecutive IDs. -func (env *InteractionEnv) AddNodes(n int, snap pb.Snapshot) error { +// AddNodes adds n new nodes initialized from the given snapshot (which may be +// empty), and using the cfg as template. They will be assigned consecutive IDs. +func (env *InteractionEnv) AddNodes(n int, cfg raft.Config, snap pb.Snapshot) error { bootstrap := !reflect.DeepEqual(snap, pb.Snapshot{}) for i := 0; i < n; i++ { id := uint64(1 + len(env.Nodes)) @@ -103,9 +107,10 @@ func (env *InteractionEnv) AddNodes(n int, snap pb.Snapshot) error { return fmt.Errorf("failed to establish first index %d; got %d", exp, fi) } } - cfg := defaultRaftConfig(id, snap.Metadata.Index, s) + cfg := cfg // fork the config stub + cfg.ID, cfg.Storage = id, s if env.Options.OnConfig != nil { - env.Options.OnConfig(cfg) + env.Options.OnConfig(&cfg) if cfg.ID != id { // This could be supported but then we need to do more work // translating back and forth -- not worth it. @@ -117,7 +122,7 @@ func (env *InteractionEnv) AddNodes(n int, snap pb.Snapshot) error { } cfg.Logger = env.Output - rn, err := raft.NewRawNode(cfg) + rn, err := raft.NewRawNode(&cfg) if err != nil { return err } @@ -127,7 +132,7 @@ func (env *InteractionEnv) AddNodes(n int, snap pb.Snapshot) error { // TODO(tbg): allow a more general Storage, as long as it also allows // us to apply snapshots, append entries, and update the HardState. Storage: s, - Config: cfg, + Config: &cfg, History: []pb.Snapshot{snap}, } env.Nodes = append(env.Nodes, node) diff --git a/raft/testdata/replicate_pause.txt b/raft/testdata/replicate_pause.txt new file mode 100644 index 000000000..e7333cccb --- /dev/null +++ b/raft/testdata/replicate_pause.txt @@ -0,0 +1,190 @@ +# This test ensures that MsgApp stream to a follower is paused when the +# in-flight state exceeds the configured limits. This is a regression test for +# the issue fixed by https://github.com/etcd-io/etcd/pull/14633. + +# Turn off output during the setup of the test. +log-level none +---- +ok + +# Start with 3 nodes, with a limited in-flight capacity. +add-nodes 3 voters=(1,2,3) index=10 inflight=3 +---- +ok + +campaign 1 +---- +ok + +stabilize +---- +ok (quiet) + +# Propose 3 entries. +propose 1 prop_1_12 +---- +ok + +propose 1 prop_1_13 +---- +ok + +propose 1 prop_1_14 +---- +ok + +# Store entries and send proposals. +process-ready 1 +---- +ok (quiet) + +# Re-enable log messages. +log-level debug +---- +ok + +# Expect that in-flight tracking to nodes 2 and 3 is saturated. +status 1 +---- +1: StateReplicate match=14 next=15 +2: StateReplicate match=11 next=15 paused inflight=3[full] +3: StateReplicate match=11 next=15 paused inflight=3[full] + +log-level none +---- +ok + +# Commit entries between nodes 1 and 2. +stabilize 1 2 +---- +ok (quiet) + +log-level debug +---- +ok + +# Expect that the entries are committed and stored on nodes 1 and 2. +status 1 +---- +1: StateReplicate match=14 next=15 +2: StateReplicate match=14 next=15 +3: StateReplicate match=11 next=15 paused inflight=3[full] + +# Drop append messages to node 3. +deliver-msgs drop=3 +---- +dropped: 1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1_12"] +dropped: 1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_1_13"] +dropped: 1->3 MsgApp Term:1 Log:1/13 Commit:11 Entries:[1/14 EntryNormal "prop_1_14"] + + +# Repeat committing 3 entries. +propose 1 prop_1_15 +---- +ok + +propose 1 prop_1_16 +---- +ok + +propose 1 prop_1_17 +---- +ok + +# In-flight tracking to nodes 2 and 3 is saturated, but node 3 is behind. +status 1 +---- +1: StateReplicate match=14 next=15 +2: StateReplicate match=14 next=18 paused inflight=3[full] +3: StateReplicate match=11 next=15 paused inflight=3[full] + +log-level none +---- +ok + +# Commit entries between nodes 1 and 2 again. +stabilize 1 2 +---- +ok (quiet) + +log-level debug +---- +ok + +# Expect that the entries are committed and stored only on nodes 1 and 2. +status 1 +---- +1: StateReplicate match=17 next=18 +2: StateReplicate match=17 next=18 +3: StateReplicate match=11 next=15 paused inflight=3[full] + +# Make a heartbeat roundtrip. +tick-heartbeat 1 +---- +ok + +stabilize 1 +---- +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->2 MsgHeartbeat Term:1 Log:0/0 Commit:17 + 1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11 + +stabilize 2 3 +---- +> 2 receiving messages + 1->2 MsgHeartbeat Term:1 Log:0/0 Commit:17 +> 3 receiving messages + 1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11 +> 2 handling Ready + Ready MustSync=false: + Messages: + 2->1 MsgHeartbeatResp Term:1 Log:0/0 +> 3 handling Ready + Ready MustSync=false: + Messages: + 3->1 MsgHeartbeatResp Term:1 Log:0/0 + +# After handling heartbeat responses, node 1 sends an empty MsgApp to a +# throttled node 3 because it hasn't yet replied to a single MsgApp, and the +# in-flight tracker is still saturated. +stabilize 1 +---- +> 1 receiving messages + 2->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgHeartbeatResp Term:1 Log:0/0 +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->3 MsgApp Term:1 Log:1/14 Commit:17 + +# Node 3 finally receives a MsgApp, but there was a gap, so it rejects it. +stabilize 3 +---- +> 3 receiving messages + 1->3 MsgApp Term:1 Log:1/14 Commit:17 + DEBUG 3 [logterm: 0, index: 14] rejected MsgApp [logterm: 1, index: 14] from 1 +> 3 handling Ready + Ready MustSync=false: + Messages: + 3->1 MsgAppResp Term:1 Log:1/14 Rejected (Hint: 11) + +log-level none +---- +ok + +stabilize +---- +ok (quiet) + +log-level debug +---- +ok + +# Eventually all nodes catch up on the committed state. +status 1 +---- +1: StateReplicate match=17 next=18 +2: StateReplicate match=17 next=18 +3: StateReplicate match=17 next=18 \ No newline at end of file diff --git a/raft/tracker/inflights.go b/raft/tracker/inflights.go index 1a056341a..242d1cab1 100644 --- a/raft/tracker/inflights.go +++ b/raft/tracker/inflights.go @@ -113,10 +113,6 @@ func (in *Inflights) FreeLE(to uint64) { } } -// FreeFirstOne releases the first inflight. This is a no-op if nothing is -// inflight. -func (in *Inflights) FreeFirstOne() { in.FreeLE(in.buffer[in.start]) } - // Full returns true if no more messages can be sent at the moment. func (in *Inflights) Full() bool { return in.count == in.size diff --git a/raft/tracker/inflights_test.go b/raft/tracker/inflights_test.go index 582a373ba..0a56ced34 100644 --- a/raft/tracker/inflights_test.go +++ b/raft/tracker/inflights_test.go @@ -105,6 +105,20 @@ func TestInflightFreeTo(t *testing.T) { in.Add(uint64(i)) } + in.FreeLE(0) + + wantIn0 := &Inflights{ + start: 1, + count: 9, + size: 10, + // ↓------------------------ + buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + } + + if !reflect.DeepEqual(in, wantIn0) { + t.Fatalf("in = %+v, want %+v", in, wantIn0) + } + in.FreeLE(4) wantIn := &Inflights{ @@ -166,24 +180,3 @@ func TestInflightFreeTo(t *testing.T) { t.Fatalf("in = %+v, want %+v", in, wantIn4) } } - -func TestInflightFreeFirstOne(t *testing.T) { - in := NewInflights(10) - for i := 0; i < 10; i++ { - in.Add(uint64(i)) - } - - in.FreeFirstOne() - - wantIn := &Inflights{ - start: 1, - count: 9, - size: 10, - // ↓------------------------ - buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, - } - - if !reflect.DeepEqual(in, wantIn) { - t.Fatalf("in = %+v, want %+v", in, wantIn) - } -} diff --git a/raft/tracker/progress.go b/raft/tracker/progress.go index e37e4b63f..c6272d22d 100644 --- a/raft/tracker/progress.go +++ b/raft/tracker/progress.go @@ -55,10 +55,13 @@ type Progress struct { // This is always true on the leader. RecentActive bool - // ProbeSent is used while this follower is in StateProbe. When ProbeSent is - // true, raft should pause sending replication message to this peer until - // ProbeSent is reset. See ProbeAcked() and IsPaused(). - ProbeSent bool + // MsgAppFlowPaused is used when the MsgApp flow to a node is throttled. This + // happens in StateProbe, or StateReplicate with saturated Inflights. In both + // cases, we need to continue sending MsgApp once in a while to guarantee + // progress, but we only do so when MsgAppFlowPaused is false (it is reset on + // receiving a heartbeat response), to not overflow the receiver. See + // IsPaused(). + MsgAppFlowPaused bool // Inflights is a sliding window for the inflight messages. // Each inflight message contains one or more log entries. @@ -78,10 +81,10 @@ type Progress struct { IsLearner bool } -// ResetState moves the Progress into the specified State, resetting ProbeSent, +// ResetState moves the Progress into the specified State, resetting MsgAppFlowPaused, // PendingSnapshot, and Inflights. func (pr *Progress) ResetState(state StateType) { - pr.ProbeSent = false + pr.MsgAppFlowPaused = false pr.PendingSnapshot = 0 pr.State = state pr.Inflights.reset() @@ -101,13 +104,6 @@ func min(a, b uint64) uint64 { return a } -// ProbeAcked is called when this peer has accepted an append. It resets -// ProbeSent to signal that additional append messages should be sent without -// further delay. -func (pr *Progress) ProbeAcked() { - pr.ProbeSent = false -} - // BecomeProbe transitions into StateProbe. Next is reset to Match+1 or, // optionally and if larger, the index of the pending snapshot. func (pr *Progress) BecomeProbe() { @@ -137,6 +133,32 @@ func (pr *Progress) BecomeSnapshot(snapshoti uint64) { pr.PendingSnapshot = snapshoti } +// UpdateOnEntriesSend updates the progress on the given number of consecutive +// entries being sent in a MsgApp, appended at and after the given log index. +func (pr *Progress) UpdateOnEntriesSend(entries int, nextIndex uint64) error { + switch pr.State { + case StateReplicate: + if entries > 0 { + last := nextIndex + uint64(entries) - 1 + pr.OptimisticUpdate(last) + pr.Inflights.Add(last) + } + // If this message overflows the in-flights tracker, or it was already full, + // consider this message being a probe, so that the flow is paused. + pr.MsgAppFlowPaused = pr.Inflights.Full() + case StateProbe: + // TODO(pavelkalinnikov): this condition captures the previous behaviour, + // but we should set MsgAppFlowPaused unconditionally for simplicity, because any + // MsgApp in StateProbe is a probe, not only non-empty ones. + if entries > 0 { + pr.MsgAppFlowPaused = true + } + default: + return fmt.Errorf("sending append in unhandled state %s", pr.State) + } + return nil +} + // MaybeUpdate is called when an MsgAppResp arrives from the follower, with the // index acked by it. The method returns false if the given n index comes from // an outdated message. Otherwise it updates the progress and returns true. @@ -145,7 +167,7 @@ func (pr *Progress) MaybeUpdate(n uint64) bool { if pr.Match < n { pr.Match = n updated = true - pr.ProbeAcked() + pr.MsgAppFlowPaused = false } pr.Next = max(pr.Next, n+1) return updated @@ -187,7 +209,7 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool { } pr.Next = max(min(rejected, matchHint+1), 1) - pr.ProbeSent = false + pr.MsgAppFlowPaused = false return true } @@ -200,9 +222,9 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool { func (pr *Progress) IsPaused() bool { switch pr.State { case StateProbe: - return pr.ProbeSent + return pr.MsgAppFlowPaused case StateReplicate: - return pr.Inflights.Full() + return pr.MsgAppFlowPaused case StateSnapshot: return true default: diff --git a/raft/tracker/progress_test.go b/raft/tracker/progress_test.go index 6eb582f04..154366249 100644 --- a/raft/tracker/progress_test.go +++ b/raft/tracker/progress_test.go @@ -22,14 +22,14 @@ func TestProgressString(t *testing.T) { ins := NewInflights(1) ins.Add(123) pr := &Progress{ - Match: 1, - Next: 2, - State: StateSnapshot, - PendingSnapshot: 123, - RecentActive: false, - ProbeSent: true, - IsLearner: true, - Inflights: ins, + Match: 1, + Next: 2, + State: StateSnapshot, + PendingSnapshot: 123, + RecentActive: false, + MsgAppFlowPaused: true, + IsLearner: true, + Inflights: ins, } const exp = `StateSnapshot match=1 next=2 learner paused pendingSnap=123 inactive inflight=1[full]` if act := pr.String(); act != exp { @@ -47,15 +47,15 @@ func TestProgressIsPaused(t *testing.T) { {StateProbe, false, false}, {StateProbe, true, true}, {StateReplicate, false, false}, - {StateReplicate, true, false}, + {StateReplicate, true, true}, {StateSnapshot, false, true}, {StateSnapshot, true, true}, } for i, tt := range tests { p := &Progress{ - State: tt.state, - ProbeSent: tt.paused, - Inflights: NewInflights(256), + State: tt.state, + MsgAppFlowPaused: tt.paused, + Inflights: NewInflights(256), } if g := p.IsPaused(); g != tt.w { t.Errorf("#%d: paused= %t, want %t", i, g, tt.w) @@ -64,20 +64,20 @@ func TestProgressIsPaused(t *testing.T) { } // TestProgressResume ensures that MaybeUpdate and MaybeDecrTo will reset -// ProbeSent. +// MsgAppFlowPaused. func TestProgressResume(t *testing.T) { p := &Progress{ - Next: 2, - ProbeSent: true, + Next: 2, + MsgAppFlowPaused: true, } p.MaybeDecrTo(1, 1) - if p.ProbeSent { - t.Errorf("paused= %v, want false", p.ProbeSent) + if p.MsgAppFlowPaused { + t.Errorf("paused= %v, want false", p.MsgAppFlowPaused) } - p.ProbeSent = true + p.MsgAppFlowPaused = true p.MaybeUpdate(2) - if p.ProbeSent { - t.Errorf("paused= %v, want false", p.ProbeSent) + if p.MsgAppFlowPaused { + t.Errorf("paused= %v, want false", p.MsgAppFlowPaused) } }