Clarify documentation of probing

- Add a large detailed comment about the use and necessity of
  both the follower and leader probing optimization
- fix the log message in stepLeader that previously mixed up the
  log term for the rejection and the index of the append
- improve the test via subtests
- add some verbiage in findConflictByTerm around first index
This commit is contained in:
Tobias Grieger 2021-02-11 13:30:55 +01:00
parent 6828517965
commit c1e8d3a63f
9 changed files with 198 additions and 98 deletions

View File

@ -141,11 +141,21 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
// entry on a leader/follower during an append) and finds the largest index in
// log l with a term <= `term` and an index <= `index`. If no such index exists
// in the log, the log's first index is returned.
// The index provided MUST be equal to or less than l.lastIndex().
//
// The index provided MUST be equal to or less than l.lastIndex(). Invalid
// inputs log a warning and the input index is returned.
func (l *raftLog) findConflictByTerm(index uint64, term uint64) uint64 {
if index > l.lastIndex() {
l.logger.Panicf("index(%d) is out of range [lastIndex(%d)] in findConflictByTerm",
index, l.lastIndex())
if li := l.lastIndex(); index > li {
// NB: such calls should not exist, but since there is a straightfoward
// way to recover, do it.
//
// It is tempting to also check something about the first index, but
// there is odd behavior with peers that have no log, in which case
// lastIndex will return zero and firstIndex will return one, which
// leads to calls with an index of zero into this method.
l.logger.Warningf("index(%d) is out of range [0, lastIndex(%d)] in findConflictByTerm",
index, li)
return index
}
for {
logTerm, err := l.term(index)

View File

@ -1106,32 +1106,127 @@ func stepLeader(r *raft, m pb.Message) error {
pr.RecentActive = true
if m.Reject {
r.logger.Debugf("%x received MsgAppResp(MsgApp was rejected, hint: %d) from %x for index %d, log term %d",
r.id, m.RejectHint, m.From, m.Index, m.LogTerm)
// The reject hint is the suggested next base entry for appending,
// i.e. hint+1 would be sent as the next append. 'hint' is usually the
// last index of the follower, or the base index (m.Index) from the
// append being responsed to.
hint := m.RejectHint
// RejectHint is the suggested next base entry for appending (i.e.
// we try to append entry RejectHint+1 next), and LogTerm is the
// term that the follower has at index RejectHint. Older versions
// of this library did not populate LogTerm for rejections and it
// is zero for followers with an empty log.
//
// Under normal circumstances, the leader's log is longer than the
// follower's and the follower's log is a prefix of the leader's
// (i.e. there is no divergent uncommitted suffix of the log on the
// follower). In that case, the first probe reveals where the
// follower's log ends (RejectHint=follower's last index) and the
// subsequent probe succeeds.
//
// However, when networks are partitioned or systems overloaded,
// large divergent log tails can occur. The naive attempt, probing
// entry by entry in decreasing order, will be the product of the
// length of the diverging tails and the network round-trip latency,
// which can easily result in hours of time spent probing and can
// even cause outright outages. The probes are thus optimized as
// described below.
r.logger.Debugf("%x received MsgAppResp(rejected, hint: (index %d, term %d)) from %x for index %d",
r.id, m.RejectHint, m.LogTerm, m.From, m.Index)
nextProbeIdx := m.RejectHint
if m.LogTerm > 0 {
// LogTerm is the term that the follower has at index RejectHint. If the
// follower has an uncommitted log tail, we would end up probing one by
// one until we hit the common prefix.
// If the follower has an uncommitted log tail, we would end up
// probing one by one until we hit the common prefix.
//
// For example, if the leader has:
// [...] (idx=1,term=2) (idx=2,term=5)[...](idx=9,term=5)
// and the follower has:
// [...] (idx=1,term=2) (idx=2,term=4)[...](idx=6,term=4)
//
// Then, after sending (idx=9,term=5) we would receive a RejectHint of 6
// and LogTerm of 4. Without the code below, we would try an append at
// 6, 5, 4 ... until hitting idx=1 which succeeds (as 1 matches).
// Instead, the code below skips all indexes at terms >4, and reduces
// 'hint' to 1 in one term (meaning we'll successfully append 2 next
// time).
hint = r.raftLog.findConflictByTerm(hint, m.LogTerm)
// idx 1 2 3 4 5 6 7 8 9
// -----------------
// term (L) 1 3 3 3 5 5 5 5 5
// term (F) 1 1 1 1 2 2
//
// Then, after sending an append anchored at (idx=9,term=5) we
// would receive a RejectHint of 6 and LogTerm of 2. Without the
// code below, we would try an append at index 6, which would
// fail again.
//
// However, looking only at what the leader knows about its own
// log and the rejection hint, it is clear that a probe at index
// 6, 5, 4, 3, and 2 must fail as well:
//
// For all of these indexes, the leader's log term is larger than
// the rejection's log term. If a probe at one of these indexes
// succeeded, its log term at that index would match the leader's,
// i.e. 3 or 5 in this example. But the follower already told the
// leader that it is still at term 2 at index 9, and since the
// log term only ever goes up (within a log), this is a contradiction.
//
// At index 1, however, the leader can draw no such conclusion,
// as its term 1 is not larger than the term 2 from the
// follower's rejection. We thus probe at 1, which will succeed
// in this example. In general, with this approach we probe at
// most once per term found in the leader's log.
//
// There is a similar mechanism on the follower (implemented in
// handleAppendEntries via a call to findConflictByTerm) that is
// useful if the follower has a large divergent uncommitted log
// tail[1], as in this example:
//
// idx 1 2 3 4 5 6 7 8 9
// -----------------
// term (L) 1 3 3 3 3 3 3 3 7
// term (F) 1 3 3 4 4 5 5 5 6
//
// Naively, the leader would probe at idx=9, receive a rejection
// revealing the log term of 6 at the follower. Since the leader's
// term at the previous index is already smaller than 6, the leader-
// side optimization discussed above is ineffective. The leader thus
// probes at index 8 and, naively, receives a rejection for the same
// index and log term 5. Again, the leader optimization does not improve
// over linear probing as term 5 is above the leader's term 3 for that
// and many preceding indexes; the leader would have to probe linearly
// until it would finally hit index 3, where the probe would succeed.
//
// Instead, we apply a similar optimization on the follower. When the
// follower receives the probe at index 8 (log term 3), it concludes
// that all of the leader's log preceding that index has log terms of
// 3 or below. The largest index in the follower's log with a log term
// of 3 or below is index 3. The follower will thus return a rejection
// for index=3, log term=3 instead. The leader's next probe will then
// succeed at that index.
//
// [1]: more precisely, if the log terms in the large uncommitted
// tail on the follower are larger than the leader's. At first,
// it may seem unintuitive that a follower could even have such
// a large tail, but it can happen:
//
// 1. Leader appends (but does not commit) entries 2 and 3, crashes.
// idx 1 2 3 4 5 6 7 8 9
// -----------------
// term (L) 1 2 2 [crashes]
// term (F) 1
// term (F) 1
//
// 2. a follower becomes leader and appends entries at term 3.
// -----------------
// term (x) 1 2 2 [down]
// term (F) 1 3 3 3 3
// term (F) 1
//
// 3. term 3 leader goes down, term 2 leader returns as term 4
// leader. It commits the log & entries at term 4.
//
// -----------------
// term (L) 1 2 2 2
// term (x) 1 3 3 3 3 [down]
// term (F) 1
// -----------------
// term (L) 1 2 2 2 4 4 4
// term (F) 1 3 3 3 3 [gets probed]
// term (F) 1 2 2 2 4 4 4
//
// 4. the leader will now probe the returning follower at index
// 7, the rejection points it at the end of the follower's log
// which is at a higher log term than the actually committed
// log.
nextProbeIdx = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm)
}
if pr.MaybeDecrTo(m.Index, hint) {
if pr.MaybeDecrTo(m.Index, nextProbeIdx) {
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
if pr.State == tracker.StateReplicate {
pr.BecomeProbe()
@ -1392,15 +1487,8 @@ func (r *raft) handleAppendEntries(m pb.Message) {
// skip all indexes in the follower's uncommitted tail with terms
// greater than the MsgApp's LogTerm.
//
// For example, if the leader has:
// [...] (idx=1,term=2)[...](idx=5,term=2)
// and the follower has:
// [...] (idx=1,term=2) (idx=2,term=4)[...](idx=8,term=4)
//
// Then, after receiving a MsgApp (idx=5,term=2), the follower would
// send a rejected MsgAppRsp with a RejectHint of 1 and a LogTerm of 2.
//
// There is similar logic on the leader.
// See the other caller for findConflictByTerm (in stepLeader) for a much
// more detailed explanation of this mechanism.
hintIndex := min(m.Index, r.raftLog.lastIndex())
hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm)
hintTerm, err := r.raftLog.term(hintIndex)

View File

@ -4481,70 +4481,72 @@ func TestFastLogRejection(t *testing.T) {
}
for i, test := range tests {
s1 := NewMemoryStorage()
s1.snapshot.Metadata.ConfState = pb.ConfState{Voters: []uint64{1, 2, 3}}
s1.Append(test.leaderLog)
s2 := NewMemoryStorage()
s2.snapshot.Metadata.ConfState = pb.ConfState{Voters: []uint64{1, 2, 3}}
s2.Append(test.followerLog)
t.Run("", func(t *testing.T) {
s1 := NewMemoryStorage()
s1.snapshot.Metadata.ConfState = pb.ConfState{Voters: []uint64{1, 2, 3}}
s1.Append(test.leaderLog)
s2 := NewMemoryStorage()
s2.snapshot.Metadata.ConfState = pb.ConfState{Voters: []uint64{1, 2, 3}}
s2.Append(test.followerLog)
n1 := newTestRaft(1, 10, 1, s1)
n2 := newTestRaft(2, 10, 1, s2)
n1 := newTestRaft(1, 10, 1, s1)
n2 := newTestRaft(2, 10, 1, s2)
n1.becomeCandidate()
n1.becomeLeader()
n1.becomeCandidate()
n1.becomeLeader()
n2.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHeartbeat})
n2.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHeartbeat})
msgs := n2.readMessages()
if len(msgs) != 1 {
t.Errorf("can't read 1 message from peer 2")
}
if msgs[0].Type != pb.MsgHeartbeatResp {
t.Errorf("can't read heartbeat response from peer 2")
}
if n1.Step(msgs[0]) != nil {
t.Errorf("peer 1 step heartbeat response fail")
}
msgs := n2.readMessages()
if len(msgs) != 1 {
t.Errorf("can't read 1 message from peer 2")
}
if msgs[0].Type != pb.MsgHeartbeatResp {
t.Errorf("can't read heartbeat response from peer 2")
}
if n1.Step(msgs[0]) != nil {
t.Errorf("peer 1 step heartbeat response fail")
}
msgs = n1.readMessages()
if len(msgs) != 1 {
t.Errorf("can't read 1 message from peer 1")
}
if msgs[0].Type != pb.MsgApp {
t.Errorf("can't read append from peer 1")
}
msgs = n1.readMessages()
if len(msgs) != 1 {
t.Errorf("can't read 1 message from peer 1")
}
if msgs[0].Type != pb.MsgApp {
t.Errorf("can't read append from peer 1")
}
if n2.Step(msgs[0]) != nil {
t.Errorf("peer 2 step append fail")
}
msgs = n2.readMessages()
if len(msgs) != 1 {
t.Errorf("can't read 1 message from peer 2")
}
if msgs[0].Type != pb.MsgAppResp {
t.Errorf("can't read append response from peer 2")
}
if !msgs[0].Reject {
t.Errorf("expected rejected append response from peer 2")
}
if msgs[0].LogTerm != test.rejectHintTerm {
t.Fatalf("#%d expected hint log term = %d, but got %d", i, test.rejectHintTerm, msgs[0].LogTerm)
}
if msgs[0].RejectHint != test.rejectHintIndex {
t.Fatalf("#%d expected hint index = %d, but got %d", i, test.rejectHintIndex, msgs[0].RejectHint)
}
if n2.Step(msgs[0]) != nil {
t.Errorf("peer 2 step append fail")
}
msgs = n2.readMessages()
if len(msgs) != 1 {
t.Errorf("can't read 1 message from peer 2")
}
if msgs[0].Type != pb.MsgAppResp {
t.Errorf("can't read append response from peer 2")
}
if !msgs[0].Reject {
t.Errorf("expected rejected append response from peer 2")
}
if msgs[0].LogTerm != test.rejectHintTerm {
t.Fatalf("#%d expected hint log term = %d, but got %d", i, test.rejectHintTerm, msgs[0].LogTerm)
}
if msgs[0].RejectHint != test.rejectHintIndex {
t.Fatalf("#%d expected hint index = %d, but got %d", i, test.rejectHintIndex, msgs[0].RejectHint)
}
if n1.Step(msgs[0]) != nil {
t.Errorf("peer 1 step append fail")
}
msgs = n1.readMessages()
if msgs[0].LogTerm != test.nextAppendTerm {
t.Fatalf("#%d expected log term = %d, but got %d", i, test.nextAppendTerm, msgs[0].LogTerm)
}
if msgs[0].Index != test.nextAppendIndex {
t.Fatalf("#%d expected index = %d, but got %d", i, test.nextAppendIndex, msgs[0].Index)
}
if n1.Step(msgs[0]) != nil {
t.Errorf("peer 1 step append fail")
}
msgs = n1.readMessages()
if msgs[0].LogTerm != test.nextAppendTerm {
t.Fatalf("#%d expected log term = %d, but got %d", i, test.nextAppendTerm, msgs[0].LogTerm)
}
if msgs[0].Index != test.nextAppendIndex {
t.Fatalf("#%d expected index = %d, but got %d", i, test.nextAppendIndex, msgs[0].Index)
}
})
}
}

View File

@ -111,7 +111,7 @@ stabilize 2 3
3->2 MsgAppResp Term:2 Log:1/4 Rejected (Hint: 3)
> 2 receiving messages
3->2 MsgAppResp Term:2 Log:1/4 Rejected (Hint: 3)
DEBUG 2 received MsgAppResp(MsgApp was rejected, hint: 3) from 3 for index 4, log term 1
DEBUG 2 received MsgAppResp(rejected, hint: (index 3, term 1)) from 3 for index 4
DEBUG 2 decreased progress of 3 to [StateProbe match=0 next=4]
> 2 handling Ready
Ready MustSync=false:

View File

@ -60,7 +60,7 @@ stabilize
2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
DEBUG 1 received MsgAppResp(MsgApp was rejected, hint: 0) from 2 for index 3, log term 0
DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3
DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1]
DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4]

View File

@ -81,7 +81,7 @@ stabilize 1 2
2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
DEBUG 1 received MsgAppResp(MsgApp was rejected, hint: 0) from 2 for index 3, log term 0
DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3
DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1]
DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4]
@ -155,7 +155,7 @@ stabilize 1 3
3->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
> 1 receiving messages
3->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
DEBUG 1 received MsgAppResp(MsgApp was rejected, hint: 0) from 3 for index 3, log term 0
DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 3 for index 3
DEBUG 1 decreased progress of 3 to [StateProbe match=0 next=1]
DEBUG 1 [firstindex: 3, commit: 5] sent snapshot[index: 5, term: 1] to 3 [StateProbe match=0 next=1]
DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=1 paused pendingSnap=5]

View File

@ -66,7 +66,7 @@ stabilize 1 2
2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
DEBUG 1 received MsgAppResp(MsgApp was rejected, hint: 0) from 2 for index 3, log term 0
DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3
DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1]
DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4]

View File

@ -61,7 +61,7 @@ stabilize
2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
DEBUG 1 received MsgAppResp(MsgApp was rejected, hint: 0) from 2 for index 3, log term 0
DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3
DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1]
DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4]

View File

@ -61,7 +61,7 @@ stabilize 1 2
2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
DEBUG 1 received MsgAppResp(MsgApp was rejected, hint: 0) from 2 for index 3, log term 0
DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3
DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1]
DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4]