From c1e8d3a63f3c22f9b4417a7b31c38bb3f3eb9274 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Thu, 11 Feb 2021 13:30:55 +0100 Subject: [PATCH] Clarify documentation of probing - Add a large detailed comment about the use and necessity of both the follower and leader probing optimization - fix the log message in stepLeader that previously mixed up the log term for the rejection and the index of the append - improve the test via subtests - add some verbiage in findConflictByTerm around first index --- raft/log.go | 18 ++- raft/raft.go | 148 ++++++++++++++---- raft/raft_test.go | 116 +++++++------- raft/testdata/campaign_learner_must_vote.txt | 2 +- raft/testdata/confchange_v1_add_single.txt | 2 +- .../confchange_v2_add_double_auto.txt | 4 +- .../confchange_v2_add_double_implicit.txt | 2 +- .../confchange_v2_add_single_auto.txt | 2 +- .../confchange_v2_add_single_explicit.txt | 2 +- 9 files changed, 198 insertions(+), 98 deletions(-) diff --git a/raft/log.go b/raft/log.go index dac141624..c94c41f77 100644 --- a/raft/log.go +++ b/raft/log.go @@ -141,11 +141,21 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 { // entry on a leader/follower during an append) and finds the largest index in // log l with a term <= `term` and an index <= `index`. If no such index exists // in the log, the log's first index is returned. -// The index provided MUST be equal to or less than l.lastIndex(). +// +// The index provided MUST be equal to or less than l.lastIndex(). Invalid +// inputs log a warning and the input index is returned. func (l *raftLog) findConflictByTerm(index uint64, term uint64) uint64 { - if index > l.lastIndex() { - l.logger.Panicf("index(%d) is out of range [lastIndex(%d)] in findConflictByTerm", - index, l.lastIndex()) + if li := l.lastIndex(); index > li { + // NB: such calls should not exist, but since there is a straightforward + // way to recover, do it. 
+ // + // It is tempting to also check something about the first index, but + // there is odd behavior with peers that have no log, in which case + // lastIndex will return zero and firstIndex will return one, which + // leads to calls with an index of zero into this method. + l.logger.Warningf("index(%d) is out of range [0, lastIndex(%d)] in findConflictByTerm", + index, li) + return index } for { logTerm, err := l.term(index) diff --git a/raft/raft.go b/raft/raft.go index 6f9364656..6fc9a1885 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -1106,32 +1106,127 @@ func stepLeader(r *raft, m pb.Message) error { pr.RecentActive = true if m.Reject { - r.logger.Debugf("%x received MsgAppResp(MsgApp was rejected, hint: %d) from %x for index %d, log term %d", - r.id, m.RejectHint, m.From, m.Index, m.LogTerm) - // The reject hint is the suggested next base entry for appending, - // i.e. hint+1 would be sent as the next append. 'hint' is usually the - // last index of the follower, or the base index (m.Index) from the - // append being responsed to. - hint := m.RejectHint + // RejectHint is the suggested next base entry for appending (i.e. + // we try to append entry RejectHint+1 next), and LogTerm is the + // term that the follower has at index RejectHint. Older versions + // of this library did not populate LogTerm for rejections and it + // is zero for followers with an empty log. + // + // Under normal circumstances, the leader's log is longer than the + // follower's and the follower's log is a prefix of the leader's + // (i.e. there is no divergent uncommitted suffix of the log on the + // follower). In that case, the first probe reveals where the + // follower's log ends (RejectHint=follower's last index) and the + // subsequent probe succeeds. + // + // However, when networks are partitioned or systems overloaded, + // large divergent log tails can occur. 
The naive attempt, probing + // entry by entry in decreasing order, will take time equal to the + // product of the length of the diverging tails and the network + // round-trip latency, which can easily result in hours of time spent + // probing and can even cause outright outages. The probes are thus + // optimized as described below. + r.logger.Debugf("%x received MsgAppResp(rejected, hint: (index %d, term %d)) from %x for index %d", + r.id, m.RejectHint, m.LogTerm, m.From, m.Index) + nextProbeIdx := m.RejectHint if m.LogTerm > 0 { - // LogTerm is the term that the follower has at index RejectHint. If the - // follower has an uncommitted log tail, we would end up probing one by - // one until we hit the common prefix. + // If the follower has an uncommitted log tail, we would end up + // probing one by one until we hit the common prefix. // // For example, if the leader has: - // [...] (idx=1,term=2) (idx=2,term=5)[...](idx=9,term=5) - // and the follower has: - // [...] (idx=1,term=2) (idx=2,term=4)[...](idx=6,term=4) // - // Then, after sending (idx=9,term=5) we would receive a RejectHint of 6 - // and LogTerm of 4. Without the code below, we would try an append at - // 6, 5, 4 ... until hitting idx=1 which succeeds (as 1 matches). - // Instead, the code below skips all indexes at terms >4, and reduces - // 'hint' to 1 in one term (meaning we'll successfully append 2 next - // time). - hint = r.raftLog.findConflictByTerm(hint, m.LogTerm) + // idx 1 2 3 4 5 6 7 8 9 + // ----------------- + // term (L) 1 3 3 3 5 5 5 5 5 + // term (F) 1 1 1 1 2 2 + // + // Then, after sending an append anchored at (idx=9,term=5) we + // would receive a RejectHint of 6 and LogTerm of 2. Without the + // code below, we would try an append at index 6, which would + // fail again. 
+ // + // However, looking only at what the leader knows about its own + // log and the rejection hint, it is clear that a probe at index + // 6, 5, 4, 3, and 2 must fail as well: + // + // For all of these indexes, the leader's log term is larger than + // the rejection's log term. If a probe at one of these indexes + // succeeded, its log term at that index would match the leader's, + // i.e. 3 or 5 in this example. But the follower already told the + // leader that it is still at term 2 at index 6, and since the + // log term only ever goes up (within a log), this is a contradiction. + // + // At index 1, however, the leader can draw no such conclusion, + // as its term 1 is not larger than the term 2 from the + // follower's rejection. We thus probe at 1, which will succeed + // in this example. In general, with this approach we probe at + // most once per term found in the leader's log. + // + // There is a similar mechanism on the follower (implemented in + // handleAppendEntries via a call to findConflictByTerm) that is + // useful if the follower has a large divergent uncommitted log + // tail[1], as in this example: + // + // idx 1 2 3 4 5 6 7 8 9 + // ----------------- + // term (L) 1 3 3 3 3 3 3 3 7 + // term (F) 1 3 3 4 4 5 5 5 6 + // + // Naively, the leader would probe at idx=9, receive a rejection + // revealing the log term of 6 at the follower. Since the leader's + // term at the previous index is already smaller than 6, the leader- + // side optimization discussed above is ineffective. The leader thus + // probes at index 8 and, naively, receives a rejection for the same + // index and log term 5. Again, the leader optimization does not improve + // over linear probing as term 5 is above the leader's term 3 for that + // and many preceding indexes; the leader would have to probe linearly + // until it would finally hit index 3, where the probe would succeed. + // + // Instead, we apply a similar optimization on the follower. 
When the + // follower receives the probe at index 8 (log term 3), it concludes + // that all of the leader's log preceding that index has log terms of + // 3 or below. The largest index in the follower's log with a log term + // of 3 or below is index 3. The follower will thus return a rejection + // for index=3, log term=3 instead. The leader's next probe will then + // succeed at that index. + // + // [1]: more precisely, if the log terms in the large uncommitted + // tail on the follower are larger than the leader's. At first, + // it may seem unintuitive that a follower could even have such + // a large tail, but it can happen: + // + // 1. Leader appends (but does not commit) entries 2 and 3, crashes. + // idx 1 2 3 4 5 6 7 8 9 + // ----------------- + // term (L) 1 2 2 [crashes] + // term (F) 1 + // term (F) 1 + // + // 2. a follower becomes leader and appends entries at term 3. + // ----------------- + // term (x) 1 2 2 [down] + // term (F) 1 3 3 3 3 + // term (F) 1 + // + // 3. term 3 leader goes down, term 2 leader returns as term 4 + // leader. It commits the log & entries at term 4. + // + // ----------------- + // term (L) 1 2 2 2 + // term (x) 1 3 3 3 3 [down] + // term (F) 1 + // ----------------- + // term (L) 1 2 2 2 4 4 4 + // term (F) 1 3 3 3 3 [gets probed] + // term (F) 1 2 2 2 4 4 4 + // + // 4. the leader will now probe the returning follower at index + // 7, the rejection points it at the end of the follower's log + // which is at a higher log term than the actually committed + // log. + nextProbeIdx = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm) } - if pr.MaybeDecrTo(m.Index, hint) { + if pr.MaybeDecrTo(m.Index, nextProbeIdx) { r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr) if pr.State == tracker.StateReplicate { pr.BecomeProbe() @@ -1392,15 +1487,8 @@ func (r *raft) handleAppendEntries(m pb.Message) { // skip all indexes in the follower's uncommitted tail with terms // greater than the MsgApp's LogTerm. 
// - // For example, if the leader has: - // [...] (idx=1,term=2)[...](idx=5,term=2) - // and the follower has: - // [...] (idx=1,term=2) (idx=2,term=4)[...](idx=8,term=4) - // - // Then, after receiving a MsgApp (idx=5,term=2), the follower would - // send a rejected MsgAppRsp with a RejectHint of 1 and a LogTerm of 2. - // - // There is similar logic on the leader. + // See the other caller for findConflictByTerm (in stepLeader) for a much + // more detailed explanation of this mechanism. hintIndex := min(m.Index, r.raftLog.lastIndex()) hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm) hintTerm, err := r.raftLog.term(hintIndex) diff --git a/raft/raft_test.go b/raft/raft_test.go index bafbcb78d..d105b0bf0 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -4481,70 +4481,72 @@ func TestFastLogRejection(t *testing.T) { } for i, test := range tests { - s1 := NewMemoryStorage() - s1.snapshot.Metadata.ConfState = pb.ConfState{Voters: []uint64{1, 2, 3}} - s1.Append(test.leaderLog) - s2 := NewMemoryStorage() - s2.snapshot.Metadata.ConfState = pb.ConfState{Voters: []uint64{1, 2, 3}} - s2.Append(test.followerLog) + t.Run("", func(t *testing.T) { + s1 := NewMemoryStorage() + s1.snapshot.Metadata.ConfState = pb.ConfState{Voters: []uint64{1, 2, 3}} + s1.Append(test.leaderLog) + s2 := NewMemoryStorage() + s2.snapshot.Metadata.ConfState = pb.ConfState{Voters: []uint64{1, 2, 3}} + s2.Append(test.followerLog) - n1 := newTestRaft(1, 10, 1, s1) - n2 := newTestRaft(2, 10, 1, s2) + n1 := newTestRaft(1, 10, 1, s1) + n2 := newTestRaft(2, 10, 1, s2) - n1.becomeCandidate() - n1.becomeLeader() + n1.becomeCandidate() + n1.becomeLeader() - n2.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHeartbeat}) + n2.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHeartbeat}) - msgs := n2.readMessages() - if len(msgs) != 1 { - t.Errorf("can't read 1 message from peer 2") - } - if msgs[0].Type != pb.MsgHeartbeatResp { - t.Errorf("can't read heartbeat response from peer 2") - } - if 
n1.Step(msgs[0]) != nil { - t.Errorf("peer 1 step heartbeat response fail") - } + msgs := n2.readMessages() + if len(msgs) != 1 { + t.Errorf("can't read 1 message from peer 2") + } + if msgs[0].Type != pb.MsgHeartbeatResp { + t.Errorf("can't read heartbeat response from peer 2") + } + if n1.Step(msgs[0]) != nil { + t.Errorf("peer 1 step heartbeat response fail") + } - msgs = n1.readMessages() - if len(msgs) != 1 { - t.Errorf("can't read 1 message from peer 1") - } - if msgs[0].Type != pb.MsgApp { - t.Errorf("can't read append from peer 1") - } + msgs = n1.readMessages() + if len(msgs) != 1 { + t.Errorf("can't read 1 message from peer 1") + } + if msgs[0].Type != pb.MsgApp { + t.Errorf("can't read append from peer 1") + } - if n2.Step(msgs[0]) != nil { - t.Errorf("peer 2 step append fail") - } - msgs = n2.readMessages() - if len(msgs) != 1 { - t.Errorf("can't read 1 message from peer 2") - } - if msgs[0].Type != pb.MsgAppResp { - t.Errorf("can't read append response from peer 2") - } - if !msgs[0].Reject { - t.Errorf("expected rejected append response from peer 2") - } - if msgs[0].LogTerm != test.rejectHintTerm { - t.Fatalf("#%d expected hint log term = %d, but got %d", i, test.rejectHintTerm, msgs[0].LogTerm) - } - if msgs[0].RejectHint != test.rejectHintIndex { - t.Fatalf("#%d expected hint index = %d, but got %d", i, test.rejectHintIndex, msgs[0].RejectHint) - } + if n2.Step(msgs[0]) != nil { + t.Errorf("peer 2 step append fail") + } + msgs = n2.readMessages() + if len(msgs) != 1 { + t.Errorf("can't read 1 message from peer 2") + } + if msgs[0].Type != pb.MsgAppResp { + t.Errorf("can't read append response from peer 2") + } + if !msgs[0].Reject { + t.Errorf("expected rejected append response from peer 2") + } + if msgs[0].LogTerm != test.rejectHintTerm { + t.Fatalf("#%d expected hint log term = %d, but got %d", i, test.rejectHintTerm, msgs[0].LogTerm) + } + if msgs[0].RejectHint != test.rejectHintIndex { + t.Fatalf("#%d expected hint index = %d, but got %d", i, 
test.rejectHintIndex, msgs[0].RejectHint) + } - if n1.Step(msgs[0]) != nil { - t.Errorf("peer 1 step append fail") - } - msgs = n1.readMessages() - if msgs[0].LogTerm != test.nextAppendTerm { - t.Fatalf("#%d expected log term = %d, but got %d", i, test.nextAppendTerm, msgs[0].LogTerm) - } - if msgs[0].Index != test.nextAppendIndex { - t.Fatalf("#%d expected index = %d, but got %d", i, test.nextAppendIndex, msgs[0].Index) - } + if n1.Step(msgs[0]) != nil { + t.Errorf("peer 1 step append fail") + } + msgs = n1.readMessages() + if msgs[0].LogTerm != test.nextAppendTerm { + t.Fatalf("#%d expected log term = %d, but got %d", i, test.nextAppendTerm, msgs[0].LogTerm) + } + if msgs[0].Index != test.nextAppendIndex { + t.Fatalf("#%d expected index = %d, but got %d", i, test.nextAppendIndex, msgs[0].Index) + } + }) } } diff --git a/raft/testdata/campaign_learner_must_vote.txt b/raft/testdata/campaign_learner_must_vote.txt index ed5250049..55d42aa43 100644 --- a/raft/testdata/campaign_learner_must_vote.txt +++ b/raft/testdata/campaign_learner_must_vote.txt @@ -111,7 +111,7 @@ stabilize 2 3 3->2 MsgAppResp Term:2 Log:1/4 Rejected (Hint: 3) > 2 receiving messages 3->2 MsgAppResp Term:2 Log:1/4 Rejected (Hint: 3) - DEBUG 2 received MsgAppResp(MsgApp was rejected, hint: 3) from 3 for index 4, log term 1 + DEBUG 2 received MsgAppResp(rejected, hint: (index 3, term 1)) from 3 for index 4 DEBUG 2 decreased progress of 3 to [StateProbe match=0 next=4] > 2 handling Ready Ready MustSync=false: diff --git a/raft/testdata/confchange_v1_add_single.txt b/raft/testdata/confchange_v1_add_single.txt index d8c43f6a8..d9cc1a7b1 100644 --- a/raft/testdata/confchange_v1_add_single.txt +++ b/raft/testdata/confchange_v1_add_single.txt @@ -60,7 +60,7 @@ stabilize 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) - DEBUG 1 received MsgAppResp(MsgApp was rejected, hint: 0) from 2 for index 3, log term 0 + DEBUG 1 received 
MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4] diff --git a/raft/testdata/confchange_v2_add_double_auto.txt b/raft/testdata/confchange_v2_add_double_auto.txt index 5bff3a294..0a5e205bf 100644 --- a/raft/testdata/confchange_v2_add_double_auto.txt +++ b/raft/testdata/confchange_v2_add_double_auto.txt @@ -81,7 +81,7 @@ stabilize 1 2 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) - DEBUG 1 received MsgAppResp(MsgApp was rejected, hint: 0) from 2 for index 3, log term 0 + DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4] @@ -155,7 +155,7 @@ stabilize 1 3 3->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) > 1 receiving messages 3->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) - DEBUG 1 received MsgAppResp(MsgApp was rejected, hint: 0) from 3 for index 3, log term 0 + DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 3 for index 3 DEBUG 1 decreased progress of 3 to [StateProbe match=0 next=1] DEBUG 1 [firstindex: 3, commit: 5] sent snapshot[index: 5, term: 1] to 3 [StateProbe match=0 next=1] DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=1 paused pendingSnap=5] diff --git a/raft/testdata/confchange_v2_add_double_implicit.txt b/raft/testdata/confchange_v2_add_double_implicit.txt index 50e24c72d..a93eb81cb 100644 --- a/raft/testdata/confchange_v2_add_double_implicit.txt +++ 
b/raft/testdata/confchange_v2_add_double_implicit.txt @@ -66,7 +66,7 @@ stabilize 1 2 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) - DEBUG 1 received MsgAppResp(MsgApp was rejected, hint: 0) from 2 for index 3, log term 0 + DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4] diff --git a/raft/testdata/confchange_v2_add_single_auto.txt b/raft/testdata/confchange_v2_add_single_auto.txt index ee1fae891..47c7f10b8 100644 --- a/raft/testdata/confchange_v2_add_single_auto.txt +++ b/raft/testdata/confchange_v2_add_single_auto.txt @@ -61,7 +61,7 @@ stabilize 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) - DEBUG 1 received MsgAppResp(MsgApp was rejected, hint: 0) from 2 for index 3, log term 0 + DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4] diff --git a/raft/testdata/confchange_v2_add_single_explicit.txt b/raft/testdata/confchange_v2_add_single_explicit.txt index 431b614c5..5ce19dcca 100644 --- a/raft/testdata/confchange_v2_add_single_explicit.txt +++ b/raft/testdata/confchange_v2_add_single_explicit.txt @@ -61,7 +61,7 @@ stabilize 1 2 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) - DEBUG 1 received MsgAppResp(MsgApp was rejected, hint: 0) from 2 for index 3, 
log term 0 + DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4]