From 682851796524be86f958d1526ad579938b5cdd50 Mon Sep 17 00:00:00 2001 From: qupeng Date: Tue, 2 Jun 2020 14:26:58 +0800 Subject: [PATCH] raft: implement fast log rejection Signed-off-by: qupeng --- raft/log.go | 21 +- raft/raft.go | 61 +++- raft/raft_paper_test.go | 13 +- raft/raft_test.go | 283 ++++++++++++++++++ raft/raftpb/raft.proto | 5 + raft/testdata/campaign_learner_must_vote.txt | 6 +- raft/testdata/confchange_v1_add_single.txt | 2 +- .../confchange_v2_add_double_auto.txt | 4 +- .../confchange_v2_add_double_implicit.txt | 2 +- .../confchange_v2_add_single_auto.txt | 2 +- .../confchange_v2_add_single_explicit.txt | 2 +- raft/testdata/probe_and_replicate.txt | 128 +------- raft/tracker/progress.go | 10 +- 13 files changed, 396 insertions(+), 143 deletions(-) diff --git a/raft/log.go b/raft/log.go index b786e7d4a..dac141624 100644 --- a/raft/log.go +++ b/raft/log.go @@ -123,7 +123,6 @@ func (l *raftLog) append(ents ...pb.Entry) uint64 { // entries, the index of the first new entry will be returned. // An entry is considered to be conflicting if it has the same index but // a different term. -// The first entry MUST have an index equal to the argument 'from'. // The index of the given entries MUST be continuously increasing. func (l *raftLog) findConflict(ents []pb.Entry) uint64 { for _, ne := range ents { @@ -138,6 +137,26 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 { return 0 } +// findConflictByTerm takes an (index, term) pair (indicating a conflicting log +// entry on a leader/follower during an append) and finds the largest index in +// log l with a term <= `term` and an index <= `index`. If no such index exists +// in the log, the log's first index is returned. +// The index provided MUST be equal to or less than l.lastIndex(). 
+func (l *raftLog) findConflictByTerm(index uint64, term uint64) uint64 { + if index > l.lastIndex() { + l.logger.Panicf("index(%d) is out of range [lastIndex(%d)] in findConflictByTerm", + index, l.lastIndex()) + } + for { + logTerm, err := l.term(index) + if logTerm <= term || err != nil { + break + } + index-- + } + return index +} + func (l *raftLog) unstableEntries() []pb.Entry { if len(l.unstable.entries) == 0 { return nil diff --git a/raft/raft.go b/raft/raft.go index e048534a4..6f9364656 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -1106,9 +1106,32 @@ func stepLeader(r *raft, m pb.Message) error { pr.RecentActive = true if m.Reject { - r.logger.Debugf("%x received MsgAppResp(MsgApp was rejected, lastindex: %d) from %x for index %d", - r.id, m.RejectHint, m.From, m.Index) - if pr.MaybeDecrTo(m.Index, m.RejectHint) { + r.logger.Debugf("%x received MsgAppResp(MsgApp was rejected, hint: %d) from %x for index %d, log term %d", + r.id, m.RejectHint, m.From, m.Index, m.LogTerm) + // The reject hint is the suggested next base entry for appending, + // i.e. hint+1 would be sent as the next append. 'hint' is usually the + // last index of the follower, or the base index (m.Index) from the + // append being responded to. + hint := m.RejectHint + if m.LogTerm > 0 { + // LogTerm is the term that the follower has at index RejectHint. If the + // follower has an uncommitted log tail, we would end up probing one by + // one until we hit the common prefix. + // + // For example, if the leader has: + // [...] (idx=1,term=2) (idx=2,term=5)[...](idx=9,term=5) + // and the follower has: + // [...] (idx=1,term=2) (idx=2,term=4)[...](idx=6,term=4) + // + // Then, after sending (idx=9,term=5) we would receive a RejectHint of 6 + // and LogTerm of 4. Without the code below, we would try an append at + // 6, 5, 4 ... until hitting idx=1 which succeeds (as 1 matches). 
+ // Instead, the code below skips all indexes at terms >4, and reduces + // 'hint' to 1 in one step (meaning we'll successfully append 2 next + // time). + hint = r.raftLog.findConflictByTerm(hint, m.LogTerm) + } + if pr.MaybeDecrTo(m.Index, hint) { r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr) if pr.State == tracker.StateReplicate { pr.BecomeProbe() @@ -1361,7 +1384,37 @@ func (r *raft) handleAppendEntries(m pb.Message) { } else { r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x", r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From) - r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()}) + + // Return a hint to the leader about the maximum index and term that the + // two logs could be divergent at. Do this by searching through the + // follower's log for the maximum (index, term) pair with a term <= the + // MsgApp's LogTerm and an index <= the MsgApp's Index. This can help + // skip all indexes in the follower's uncommitted tail with terms + // greater than the MsgApp's LogTerm. + // + // For example, if the leader has: + // [...] (idx=1,term=2)[...](idx=5,term=2) + // and the follower has: + // [...] (idx=1,term=2) (idx=2,term=4)[...](idx=8,term=4) + // + // Then, after receiving a MsgApp (idx=5,term=2), the follower would + // send a rejected MsgAppResp with a RejectHint of 1 and a LogTerm of 2. + // + // There is similar logic on the leader. 
+ hintIndex := min(m.Index, r.raftLog.lastIndex()) + hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm) + hintTerm, err := r.raftLog.term(hintIndex) + if err != nil { + panic(fmt.Sprintf("term(%d) must be valid, but got %v", hintIndex, err)) + } + r.send(pb.Message{ + To: m.From, + Type: pb.MsgAppResp, + Index: m.Index, + Reject: true, + RejectHint: hintIndex, + LogTerm: hintTerm, + }) } } diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index 05b0ae027..9c71ebaa2 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -606,17 +606,18 @@ func TestFollowerCheckMsgApp(t *testing.T) { windex uint64 wreject bool wrejectHint uint64 + wlogterm uint64 }{ // match with committed entries - {0, 0, 1, false, 0}, - {ents[0].Term, ents[0].Index, 1, false, 0}, + {0, 0, 1, false, 0, 0}, + {ents[0].Term, ents[0].Index, 1, false, 0, 0}, // match with uncommitted entries - {ents[1].Term, ents[1].Index, 2, false, 0}, + {ents[1].Term, ents[1].Index, 2, false, 0, 0}, // unmatch with existing entry - {ents[0].Term, ents[1].Index, ents[1].Index, true, 2}, + {ents[0].Term, ents[1].Index, ents[1].Index, true, 1, 1}, // unexisting entry - {ents[1].Term + 1, ents[1].Index + 1, ents[1].Index + 1, true, 2}, + {ents[1].Term + 1, ents[1].Index + 1, ents[1].Index + 1, true, 2, 2}, } for i, tt := range tests { storage := newTestMemoryStorage(withPeers(1, 2, 3)) @@ -629,7 +630,7 @@ func TestFollowerCheckMsgApp(t *testing.T) { msgs := r.readMessages() wmsgs := []pb.Message{ - {From: 1, To: 2, Type: pb.MsgAppResp, Term: 2, Index: tt.windex, Reject: tt.wreject, RejectHint: tt.wrejectHint}, + {From: 1, To: 2, Type: pb.MsgAppResp, Term: 2, Index: tt.windex, Reject: tt.wreject, RejectHint: tt.wrejectHint, LogTerm: tt.wlogterm}, } if !reflect.DeepEqual(msgs, wmsgs) { t.Errorf("#%d: msgs = %+v, want %+v", i, msgs, wmsgs) diff --git a/raft/raft_test.go b/raft/raft_test.go index 59d47b097..bafbcb78d 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -4265,6 
+4265,289 @@ func TestConfChangeV2CheckBeforeCampaign(t *testing.T) { testConfChangeCheckBeforeCampaign(t, true) } +func TestFastLogRejection(t *testing.T) { + tests := []struct { + leaderLog []pb.Entry // Logs on the leader + followerLog []pb.Entry // Logs on the follower + rejectHintTerm uint64 // Expected term included in rejected MsgAppResp. + rejectHintIndex uint64 // Expected index included in rejected MsgAppResp. + nextAppendTerm uint64 // Expected term when leader appends after rejected. + nextAppendIndex uint64 // Expected index when leader appends after rejected. + }{ + // This case tests that leader can find the conflict index quickly. + // Firstly leader appends (type=MsgApp,index=7,logTerm=4, entries=...); + // After rejected leader appends (type=MsgApp,index=3,logTerm=2). + { + leaderLog: []pb.Entry{ + {Term: 1, Index: 1}, + {Term: 2, Index: 2}, + {Term: 2, Index: 3}, + {Term: 4, Index: 4}, + {Term: 4, Index: 5}, + {Term: 4, Index: 6}, + {Term: 4, Index: 7}, + }, + followerLog: []pb.Entry{ + {Term: 1, Index: 1}, + {Term: 2, Index: 2}, + {Term: 2, Index: 3}, + {Term: 3, Index: 4}, + {Term: 3, Index: 5}, + {Term: 3, Index: 6}, + {Term: 3, Index: 7}, + {Term: 3, Index: 8}, + {Term: 3, Index: 9}, + {Term: 3, Index: 10}, + {Term: 3, Index: 11}, + }, + rejectHintTerm: 3, + rejectHintIndex: 7, + nextAppendTerm: 2, + nextAppendIndex: 3, + }, + // This case tests that leader can find the conflict index quickly. + // Firstly leader appends (type=MsgApp,index=8,logTerm=5, entries=...); + // After rejected leader appends (type=MsgApp,index=4,logTerm=3). 
+ { + leaderLog: []pb.Entry{ + {Term: 1, Index: 1}, + {Term: 2, Index: 2}, + {Term: 2, Index: 3}, + {Term: 3, Index: 4}, + {Term: 4, Index: 5}, + {Term: 4, Index: 6}, + {Term: 4, Index: 7}, + {Term: 5, Index: 8}, + }, + followerLog: []pb.Entry{ + {Term: 1, Index: 1}, + {Term: 2, Index: 2}, + {Term: 2, Index: 3}, + {Term: 3, Index: 4}, + {Term: 3, Index: 5}, + {Term: 3, Index: 6}, + {Term: 3, Index: 7}, + {Term: 3, Index: 8}, + {Term: 3, Index: 9}, + {Term: 3, Index: 10}, + {Term: 3, Index: 11}, + }, + rejectHintTerm: 3, + rejectHintIndex: 8, + nextAppendTerm: 3, + nextAppendIndex: 4, + }, + // This case tests that follower can find the conflict index quickly. + // Firstly leader appends (type=MsgApp,index=4,logTerm=1, entries=...); + // After rejected leader appends (type=MsgApp,index=1,logTerm=1). + { + leaderLog: []pb.Entry{ + {Term: 1, Index: 1}, + {Term: 1, Index: 2}, + {Term: 1, Index: 3}, + {Term: 1, Index: 4}, + }, + followerLog: []pb.Entry{ + {Term: 1, Index: 1}, + {Term: 2, Index: 2}, + {Term: 2, Index: 3}, + {Term: 4, Index: 4}, + }, + rejectHintTerm: 1, + rejectHintIndex: 1, + nextAppendTerm: 1, + nextAppendIndex: 1, + }, + // This case is similar to the previous case. However, this time, the + // leader has a longer uncommitted log tail than the follower. + // Firstly leader appends (type=MsgApp,index=6,logTerm=1, entries=...); + // After rejected leader appends (type=MsgApp,index=1,logTerm=1). + { + leaderLog: []pb.Entry{ + {Term: 1, Index: 1}, + {Term: 1, Index: 2}, + {Term: 1, Index: 3}, + {Term: 1, Index: 4}, + {Term: 1, Index: 5}, + {Term: 1, Index: 6}, + }, + followerLog: []pb.Entry{ + {Term: 1, Index: 1}, + {Term: 2, Index: 2}, + {Term: 2, Index: 3}, + {Term: 4, Index: 4}, + }, + rejectHintTerm: 1, + rejectHintIndex: 1, + nextAppendTerm: 1, + nextAppendIndex: 1, + }, + // This case is similar to the previous case. However, this time, the + // follower has a longer uncommitted log tail than the leader. 
+ // Firstly leader appends (type=MsgApp,index=4,logTerm=1, entries=...); + // After rejected leader appends (type=MsgApp,index=1,logTerm=1). + { + leaderLog: []pb.Entry{ + {Term: 1, Index: 1}, + {Term: 1, Index: 2}, + {Term: 1, Index: 3}, + {Term: 1, Index: 4}, + }, + followerLog: []pb.Entry{ + {Term: 1, Index: 1}, + {Term: 2, Index: 2}, + {Term: 2, Index: 3}, + {Term: 4, Index: 4}, + {Term: 4, Index: 5}, + {Term: 4, Index: 6}, + }, + rejectHintTerm: 1, + rejectHintIndex: 1, + nextAppendTerm: 1, + nextAppendIndex: 1, + }, + // A normal case where there are no log conflicts. + // Firstly leader appends (type=MsgApp,index=5,logTerm=5, entries=...); + // After rejected leader appends (type=MsgApp,index=4,logTerm=4). + { + leaderLog: []pb.Entry{ + {Term: 1, Index: 1}, + {Term: 1, Index: 2}, + {Term: 1, Index: 3}, + {Term: 4, Index: 4}, + {Term: 5, Index: 5}, + }, + followerLog: []pb.Entry{ + {Term: 1, Index: 1}, + {Term: 1, Index: 2}, + {Term: 1, Index: 3}, + {Term: 4, Index: 4}, + }, + rejectHintTerm: 4, + rejectHintIndex: 4, + nextAppendTerm: 4, + nextAppendIndex: 4, + }, + // Test case from example comment in stepLeader (on leader). + { + leaderLog: []pb.Entry{ + {Term: 2, Index: 1}, + {Term: 5, Index: 2}, + {Term: 5, Index: 3}, + {Term: 5, Index: 4}, + {Term: 5, Index: 5}, + {Term: 5, Index: 6}, + {Term: 5, Index: 7}, + {Term: 5, Index: 8}, + {Term: 5, Index: 9}, + }, + followerLog: []pb.Entry{ + {Term: 2, Index: 1}, + {Term: 4, Index: 2}, + {Term: 4, Index: 3}, + {Term: 4, Index: 4}, + {Term: 4, Index: 5}, + {Term: 4, Index: 6}, + }, + rejectHintTerm: 4, + rejectHintIndex: 6, + nextAppendTerm: 2, + nextAppendIndex: 1, + }, + // Test case from example comment in handleAppendEntries (on follower). 
+ { + leaderLog: []pb.Entry{ + {Term: 2, Index: 1}, + {Term: 2, Index: 2}, + {Term: 2, Index: 3}, + {Term: 2, Index: 4}, + {Term: 2, Index: 5}, + }, + followerLog: []pb.Entry{ + {Term: 2, Index: 1}, + {Term: 4, Index: 2}, + {Term: 4, Index: 3}, + {Term: 4, Index: 4}, + {Term: 4, Index: 5}, + {Term: 4, Index: 6}, + {Term: 4, Index: 7}, + {Term: 4, Index: 8}, + }, + nextAppendTerm: 2, + nextAppendIndex: 1, + rejectHintTerm: 2, + rejectHintIndex: 1, + }, + } + + for i, test := range tests { + s1 := NewMemoryStorage() + s1.snapshot.Metadata.ConfState = pb.ConfState{Voters: []uint64{1, 2, 3}} + s1.Append(test.leaderLog) + s2 := NewMemoryStorage() + s2.snapshot.Metadata.ConfState = pb.ConfState{Voters: []uint64{1, 2, 3}} + s2.Append(test.followerLog) + + n1 := newTestRaft(1, 10, 1, s1) + n2 := newTestRaft(2, 10, 1, s2) + + n1.becomeCandidate() + n1.becomeLeader() + + n2.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHeartbeat}) + + msgs := n2.readMessages() + if len(msgs) != 1 { + t.Errorf("can't read 1 message from peer 2") + } + if msgs[0].Type != pb.MsgHeartbeatResp { + t.Errorf("can't read heartbeat response from peer 2") + } + if n1.Step(msgs[0]) != nil { + t.Errorf("peer 1 step heartbeat response fail") + } + + msgs = n1.readMessages() + if len(msgs) != 1 { + t.Errorf("can't read 1 message from peer 1") + } + if msgs[0].Type != pb.MsgApp { + t.Errorf("can't read append from peer 1") + } + + if n2.Step(msgs[0]) != nil { + t.Errorf("peer 2 step append fail") + } + msgs = n2.readMessages() + if len(msgs) != 1 { + t.Errorf("can't read 1 message from peer 2") + } + if msgs[0].Type != pb.MsgAppResp { + t.Errorf("can't read append response from peer 2") + } + if !msgs[0].Reject { + t.Errorf("expected rejected append response from peer 2") + } + if msgs[0].LogTerm != test.rejectHintTerm { + t.Fatalf("#%d expected hint log term = %d, but got %d", i, test.rejectHintTerm, msgs[0].LogTerm) + } + if msgs[0].RejectHint != test.rejectHintIndex { + t.Fatalf("#%d expected hint index = 
%d, but got %d", i, test.rejectHintIndex, msgs[0].RejectHint) + } + + if n1.Step(msgs[0]) != nil { + t.Errorf("peer 1 step append fail") + } + msgs = n1.readMessages() + if msgs[0].LogTerm != test.nextAppendTerm { + t.Fatalf("#%d expected log term = %d, but got %d", i, test.nextAppendTerm, msgs[0].LogTerm) + } + if msgs[0].Index != test.nextAppendIndex { + t.Fatalf("#%d expected index = %d, but got %d", i, test.nextAppendIndex, msgs[0].Index) + } + } +} + func entsWithConfig(configFunc func(*Config), terms ...uint64) *raft { storage := NewMemoryStorage() for i, term := range terms { diff --git a/raft/raftpb/raft.proto b/raft/raftpb/raft.proto index ed4e45151..5136e5108 100644 --- a/raft/raftpb/raft.proto +++ b/raft/raftpb/raft.proto @@ -62,6 +62,11 @@ message Message { optional uint64 to = 2 [(gogoproto.nullable) = false]; optional uint64 from = 3 [(gogoproto.nullable) = false]; optional uint64 term = 4 [(gogoproto.nullable) = false]; + // logTerm is generally used for appending Raft logs to followers. For example, + // (type=MsgApp,index=100,logTerm=5) means leader appends entries starting at + // index=101, and the term of entry at index 100 is 5. + // (type=MsgAppResp,reject=true,index=100,logTerm=5) means follower rejects some + // entries from its leader as it already has an entry with term 5 at index 100. 
optional uint64 logTerm = 5 [(gogoproto.nullable) = false]; optional uint64 index = 6 [(gogoproto.nullable) = false]; repeated Entry entries = 7 [(gogoproto.nullable) = false]; diff --git a/raft/testdata/campaign_learner_must_vote.txt b/raft/testdata/campaign_learner_must_vote.txt index c9a95ad3e..ed5250049 100644 --- a/raft/testdata/campaign_learner_must_vote.txt +++ b/raft/testdata/campaign_learner_must_vote.txt @@ -108,10 +108,10 @@ stabilize 2 3 Ready MustSync=false: Lead:2 State:StateFollower Messages: - 3->2 MsgAppResp Term:2 Log:0/4 Rejected (Hint: 3) + 3->2 MsgAppResp Term:2 Log:1/4 Rejected (Hint: 3) > 2 receiving messages - 3->2 MsgAppResp Term:2 Log:0/4 Rejected (Hint: 3) - DEBUG 2 received MsgAppResp(MsgApp was rejected, lastindex: 3) from 3 for index 4 + 3->2 MsgAppResp Term:2 Log:1/4 Rejected (Hint: 3) + DEBUG 2 received MsgAppResp(MsgApp was rejected, hint: 3) from 3 for index 4, log term 1 DEBUG 2 decreased progress of 3 to [StateProbe match=0 next=4] > 2 handling Ready Ready MustSync=false: diff --git a/raft/testdata/confchange_v1_add_single.txt b/raft/testdata/confchange_v1_add_single.txt index 4f93e4f17..d8c43f6a8 100644 --- a/raft/testdata/confchange_v1_add_single.txt +++ b/raft/testdata/confchange_v1_add_single.txt @@ -60,7 +60,7 @@ stabilize 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) - DEBUG 1 received MsgAppResp(MsgApp was rejected, lastindex: 0) from 2 for index 3 + DEBUG 1 received MsgAppResp(MsgApp was rejected, hint: 0) from 2 for index 3, log term 0 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4] diff --git a/raft/testdata/confchange_v2_add_double_auto.txt b/raft/testdata/confchange_v2_add_double_auto.txt index 4a58e5f0a..5bff3a294 100644 --- 
a/raft/testdata/confchange_v2_add_double_auto.txt +++ b/raft/testdata/confchange_v2_add_double_auto.txt @@ -81,7 +81,7 @@ stabilize 1 2 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) - DEBUG 1 received MsgAppResp(MsgApp was rejected, lastindex: 0) from 2 for index 3 + DEBUG 1 received MsgAppResp(MsgApp was rejected, hint: 0) from 2 for index 3, log term 0 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4] @@ -155,7 +155,7 @@ stabilize 1 3 3->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) > 1 receiving messages 3->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) - DEBUG 1 received MsgAppResp(MsgApp was rejected, lastindex: 0) from 3 for index 3 + DEBUG 1 received MsgAppResp(MsgApp was rejected, hint: 0) from 3 for index 3, log term 0 DEBUG 1 decreased progress of 3 to [StateProbe match=0 next=1] DEBUG 1 [firstindex: 3, commit: 5] sent snapshot[index: 5, term: 1] to 3 [StateProbe match=0 next=1] DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=1 paused pendingSnap=5] diff --git a/raft/testdata/confchange_v2_add_double_implicit.txt b/raft/testdata/confchange_v2_add_double_implicit.txt index 827e63b0e..50e24c72d 100644 --- a/raft/testdata/confchange_v2_add_double_implicit.txt +++ b/raft/testdata/confchange_v2_add_double_implicit.txt @@ -66,7 +66,7 @@ stabilize 1 2 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) - DEBUG 1 received MsgAppResp(MsgApp was rejected, lastindex: 0) from 2 for index 3 + DEBUG 1 received MsgAppResp(MsgApp was rejected, hint: 0) from 2 for index 3, log term 0 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] DEBUG 1 [firstindex: 3, commit: 4] sent 
snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4] diff --git a/raft/testdata/confchange_v2_add_single_auto.txt b/raft/testdata/confchange_v2_add_single_auto.txt index 223a29b7c..ee1fae891 100644 --- a/raft/testdata/confchange_v2_add_single_auto.txt +++ b/raft/testdata/confchange_v2_add_single_auto.txt @@ -61,7 +61,7 @@ stabilize 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) - DEBUG 1 received MsgAppResp(MsgApp was rejected, lastindex: 0) from 2 for index 3 + DEBUG 1 received MsgAppResp(MsgApp was rejected, hint: 0) from 2 for index 3, log term 0 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4] diff --git a/raft/testdata/confchange_v2_add_single_explicit.txt b/raft/testdata/confchange_v2_add_single_explicit.txt index 0042aac51..431b614c5 100644 --- a/raft/testdata/confchange_v2_add_single_explicit.txt +++ b/raft/testdata/confchange_v2_add_single_explicit.txt @@ -61,7 +61,7 @@ stabilize 1 2 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) - DEBUG 1 received MsgAppResp(MsgApp was rejected, lastindex: 0) from 2 for index 3 + DEBUG 1 received MsgAppResp(MsgApp was rejected, hint: 0) from 2 for index 3, log term 0 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4] diff --git a/raft/testdata/probe_and_replicate.txt b/raft/testdata/probe_and_replicate.txt index b8995ef90..bebae6ef9 100644 --- 
a/raft/testdata/probe_and_replicate.txt +++ b/raft/testdata/probe_and_replicate.txt @@ -488,9 +488,9 @@ stabilize 1 2 Ready MustSync=false: Lead:1 State:StateFollower Messages: - 2->1 MsgAppResp Term:8 Log:0/20 Rejected (Hint: 19) + 2->1 MsgAppResp Term:8 Log:6/20 Rejected (Hint: 19) > 1 receiving messages - 2->1 MsgAppResp Term:8 Log:0/20 Rejected (Hint: 19) + 2->1 MsgAppResp Term:8 Log:6/20 Rejected (Hint: 19) > 1 handling Ready Ready MustSync=false: Messages: @@ -527,9 +527,9 @@ stabilize 1 3 Ready MustSync=false: Lead:1 State:StateFollower Messages: - 3->1 MsgAppResp Term:8 Log:0/20 Rejected (Hint: 14) + 3->1 MsgAppResp Term:8 Log:4/20 Rejected (Hint: 14) > 1 receiving messages - 3->1 MsgAppResp Term:8 Log:0/20 Rejected (Hint: 14) + 3->1 MsgAppResp Term:8 Log:4/20 Rejected (Hint: 14) > 1 handling Ready Ready MustSync=false: Messages: @@ -617,21 +617,9 @@ stabilize 1 5 Ready MustSync=false: Lead:1 State:StateFollower Messages: - 5->1 MsgAppResp Term:8 Log:0/20 Rejected (Hint: 22) + 5->1 MsgAppResp Term:8 Log:6/20 Rejected (Hint: 18) > 1 receiving messages - 5->1 MsgAppResp Term:8 Log:0/20 Rejected (Hint: 22) -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->5 MsgApp Term:8 Log:6/19 Commit:21 Entries:[6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""] -> 5 receiving messages - 1->5 MsgApp Term:8 Log:6/19 Commit:21 Entries:[6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""] -> 5 handling Ready - Ready MustSync=false: - Messages: - 5->1 MsgAppResp Term:8 Log:0/19 Rejected (Hint: 22) -> 1 receiving messages - 5->1 MsgAppResp Term:8 Log:0/19 Rejected (Hint: 22) + 5->1 MsgAppResp Term:8 Log:6/20 Rejected (Hint: 18) > 1 handling Ready Ready MustSync=false: Messages: @@ -676,33 +664,9 @@ stabilize 1 6 Ready MustSync=false: Lead:1 State:StateFollower Messages: - 6->1 MsgAppResp Term:8 Log:0/20 Rejected (Hint: 17) + 6->1 MsgAppResp Term:8 Log:4/20 Rejected (Hint: 17) > 1 receiving messages - 6->1 MsgAppResp Term:8 Log:0/20 Rejected (Hint: 17) -> 1 handling 
Ready - Ready MustSync=false: - Messages: - 1->6 MsgApp Term:8 Log:5/17 Commit:21 Entries:[6/18 EntryNormal "", 6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""] -> 6 receiving messages - 1->6 MsgApp Term:8 Log:5/17 Commit:21 Entries:[6/18 EntryNormal "", 6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""] -> 6 handling Ready - Ready MustSync=false: - Messages: - 6->1 MsgAppResp Term:8 Log:0/17 Rejected (Hint: 17) -> 1 receiving messages - 6->1 MsgAppResp Term:8 Log:0/17 Rejected (Hint: 17) -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->6 MsgApp Term:8 Log:5/16 Commit:21 Entries:[5/17 EntryNormal "prop_5_17", 6/18 EntryNormal "", 6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""] -> 6 receiving messages - 1->6 MsgApp Term:8 Log:5/16 Commit:21 Entries:[5/17 EntryNormal "prop_5_17", 6/18 EntryNormal "", 6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""] -> 6 handling Ready - Ready MustSync=false: - Messages: - 6->1 MsgAppResp Term:8 Log:0/16 Rejected (Hint: 17) -> 1 receiving messages - 6->1 MsgAppResp Term:8 Log:0/16 Rejected (Hint: 17) + 6->1 MsgAppResp Term:8 Log:4/20 Rejected (Hint: 17) > 1 handling Ready Ready MustSync=false: Messages: @@ -753,81 +717,9 @@ stabilize 1 7 Ready MustSync=false: Lead:1 State:StateFollower Messages: - 7->1 MsgAppResp Term:8 Log:0/20 Rejected (Hint: 21) + 7->1 MsgAppResp Term:8 Log:3/20 Rejected (Hint: 20) > 1 receiving messages - 7->1 MsgAppResp Term:8 Log:0/20 Rejected (Hint: 21) -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->7 MsgApp Term:8 Log:6/19 Commit:21 Entries:[6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""] -> 7 receiving messages - 1->7 MsgApp Term:8 Log:6/19 Commit:21 Entries:[6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""] -> 7 handling Ready - Ready MustSync=false: - Messages: - 7->1 MsgAppResp Term:8 Log:0/19 Rejected (Hint: 21) -> 1 receiving messages - 7->1 
MsgAppResp Term:8 Log:0/19 Rejected (Hint: 21) -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->7 MsgApp Term:8 Log:6/18 Commit:21 Entries:[6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""] -> 7 receiving messages - 1->7 MsgApp Term:8 Log:6/18 Commit:21 Entries:[6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""] -> 7 handling Ready - Ready MustSync=false: - Messages: - 7->1 MsgAppResp Term:8 Log:0/18 Rejected (Hint: 21) -> 1 receiving messages - 7->1 MsgAppResp Term:8 Log:0/18 Rejected (Hint: 21) -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->7 MsgApp Term:8 Log:5/17 Commit:21 Entries:[6/18 EntryNormal "", 6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""] -> 7 receiving messages - 1->7 MsgApp Term:8 Log:5/17 Commit:21 Entries:[6/18 EntryNormal "", 6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""] -> 7 handling Ready - Ready MustSync=false: - Messages: - 7->1 MsgAppResp Term:8 Log:0/17 Rejected (Hint: 21) -> 1 receiving messages - 7->1 MsgAppResp Term:8 Log:0/17 Rejected (Hint: 21) -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->7 MsgApp Term:8 Log:5/16 Commit:21 Entries:[5/17 EntryNormal "prop_5_17", 6/18 EntryNormal "", 6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""] -> 7 receiving messages - 1->7 MsgApp Term:8 Log:5/16 Commit:21 Entries:[5/17 EntryNormal "prop_5_17", 6/18 EntryNormal "", 6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""] -> 7 handling Ready - Ready MustSync=false: - Messages: - 7->1 MsgAppResp Term:8 Log:0/16 Rejected (Hint: 21) -> 1 receiving messages - 7->1 MsgAppResp Term:8 Log:0/16 Rejected (Hint: 21) -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->7 MsgApp Term:8 Log:4/15 Commit:21 Entries:[5/16 EntryNormal "", 5/17 EntryNormal "prop_5_17", 6/18 EntryNormal "", 6/19 EntryNormal "prop_6_19", 6/20 EntryNormal 
"prop_6_20", 8/21 EntryNormal ""] -> 7 receiving messages - 1->7 MsgApp Term:8 Log:4/15 Commit:21 Entries:[5/16 EntryNormal "", 5/17 EntryNormal "prop_5_17", 6/18 EntryNormal "", 6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""] -> 7 handling Ready - Ready MustSync=false: - Messages: - 7->1 MsgAppResp Term:8 Log:0/15 Rejected (Hint: 21) -> 1 receiving messages - 7->1 MsgAppResp Term:8 Log:0/15 Rejected (Hint: 21) -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->7 MsgApp Term:8 Log:4/14 Commit:21 Entries:[4/15 EntryNormal "prop_4_15", 5/16 EntryNormal "", 5/17 EntryNormal "prop_5_17", 6/18 EntryNormal "", 6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""] -> 7 receiving messages - 1->7 MsgApp Term:8 Log:4/14 Commit:21 Entries:[4/15 EntryNormal "prop_4_15", 5/16 EntryNormal "", 5/17 EntryNormal "prop_5_17", 6/18 EntryNormal "", 6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""] -> 7 handling Ready - Ready MustSync=false: - Messages: - 7->1 MsgAppResp Term:8 Log:0/14 Rejected (Hint: 21) -> 1 receiving messages - 7->1 MsgAppResp Term:8 Log:0/14 Rejected (Hint: 21) + 7->1 MsgAppResp Term:8 Log:3/20 Rejected (Hint: 20) > 1 handling Ready Ready MustSync=false: Messages: diff --git a/raft/tracker/progress.go b/raft/tracker/progress.go index cdb8168bb..a36e5261a 100644 --- a/raft/tracker/progress.go +++ b/raft/tracker/progress.go @@ -157,8 +157,8 @@ func (pr *Progress) MaybeUpdate(n uint64) bool { func (pr *Progress) OptimisticUpdate(n uint64) { pr.Next = n + 1 } // MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The -// arguments are the index the follower rejected to append to its log, and its -// last index. +// arguments are the index of the append message rejected by the follower, and +// the hint that we want to decrease to. // // Rejections can happen spuriously as messages are sent out of order or // duplicated. 
In such cases, the rejection pertains to an index that the @@ -167,7 +167,7 @@ func (pr *Progress) OptimisticUpdate(n uint64) { pr.Next = n + 1 } // // If the rejection is genuine, Next is lowered sensibly, and the Progress is // cleared for sending log entries. -func (pr *Progress) MaybeDecrTo(rejected, last uint64) bool { +func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool { if pr.State == StateReplicate { // The rejection must be stale if the progress has matched and "rejected" // is smaller than "match". @@ -176,7 +176,7 @@ func (pr *Progress) MaybeDecrTo(rejected, last uint64) bool { } // Directly decrease next to match + 1. // - // TODO(tbg): why not use last if it's larger? + // TODO(tbg): why not use matchHint if it's larger? pr.Next = pr.Match + 1 return true } @@ -187,7 +187,7 @@ func (pr *Progress) MaybeDecrTo(rejected, last uint64) bool { return false } - pr.Next = max(min(rejected, last+1), 1) + pr.Next = max(min(rejected, matchHint+1), 1) pr.ProbeSent = false return true }