mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raft: implement fast log rejection
Signed-off-by: qupeng <qupeng@pingcap.com>
This commit is contained in:
parent
b757e1bc87
commit
6828517965
21
raft/log.go
21
raft/log.go
@ -123,7 +123,6 @@ func (l *raftLog) append(ents ...pb.Entry) uint64 {
|
||||
// entries, the index of the first new entry will be returned.
|
||||
// An entry is considered to be conflicting if it has the same index but
|
||||
// a different term.
|
||||
// The first entry MUST have an index equal to the argument 'from'.
|
||||
// The index of the given entries MUST be continuously increasing.
|
||||
func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
|
||||
for _, ne := range ents {
|
||||
@ -138,6 +137,26 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
// findConflictByTerm takes an (index, term) pair (indicating a conflicting log
|
||||
// entry on a leader/follower during an append) and finds the largest index in
|
||||
// log l with a term <= `term` and an index <= `index`. If no such index exists
|
||||
// in the log, the log's first index is returned.
|
||||
// The index provided MUST be equal to or less than l.lastIndex().
|
||||
func (l *raftLog) findConflictByTerm(index uint64, term uint64) uint64 {
|
||||
if index > l.lastIndex() {
|
||||
l.logger.Panicf("index(%d) is out of range [lastIndex(%d)] in findConflictByTerm",
|
||||
index, l.lastIndex())
|
||||
}
|
||||
for {
|
||||
logTerm, err := l.term(index)
|
||||
if logTerm <= term || err != nil {
|
||||
break
|
||||
}
|
||||
index--
|
||||
}
|
||||
return index
|
||||
}
|
||||
|
||||
func (l *raftLog) unstableEntries() []pb.Entry {
|
||||
if len(l.unstable.entries) == 0 {
|
||||
return nil
|
||||
|
61
raft/raft.go
61
raft/raft.go
@ -1106,9 +1106,32 @@ func stepLeader(r *raft, m pb.Message) error {
|
||||
pr.RecentActive = true
|
||||
|
||||
if m.Reject {
|
||||
r.logger.Debugf("%x received MsgAppResp(MsgApp was rejected, lastindex: %d) from %x for index %d",
|
||||
r.id, m.RejectHint, m.From, m.Index)
|
||||
if pr.MaybeDecrTo(m.Index, m.RejectHint) {
|
||||
r.logger.Debugf("%x received MsgAppResp(MsgApp was rejected, hint: %d) from %x for index %d, log term %d",
|
||||
r.id, m.RejectHint, m.From, m.Index, m.LogTerm)
|
||||
// The reject hint is the suggested next base entry for appending,
|
||||
// i.e. hint+1 would be sent as the next append. 'hint' is usually the
|
||||
// last index of the follower, or the base index (m.Index) from the
|
||||
// append being responsed to.
|
||||
hint := m.RejectHint
|
||||
if m.LogTerm > 0 {
|
||||
// LogTerm is the term that the follower has at index RejectHint. If the
|
||||
// follower has an uncommitted log tail, we would end up probing one by
|
||||
// one until we hit the common prefix.
|
||||
//
|
||||
// For example, if the leader has:
|
||||
// [...] (idx=1,term=2) (idx=2,term=5)[...](idx=9,term=5)
|
||||
// and the follower has:
|
||||
// [...] (idx=1,term=2) (idx=2,term=4)[...](idx=6,term=4)
|
||||
//
|
||||
// Then, after sending (idx=9,term=5) we would receive a RejectHint of 6
|
||||
// and LogTerm of 4. Without the code below, we would try an append at
|
||||
// 6, 5, 4 ... until hitting idx=1 which succeeds (as 1 matches).
|
||||
// Instead, the code below skips all indexes at terms >4, and reduces
|
||||
// 'hint' to 1 in one term (meaning we'll successfully append 2 next
|
||||
// time).
|
||||
hint = r.raftLog.findConflictByTerm(hint, m.LogTerm)
|
||||
}
|
||||
if pr.MaybeDecrTo(m.Index, hint) {
|
||||
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
|
||||
if pr.State == tracker.StateReplicate {
|
||||
pr.BecomeProbe()
|
||||
@ -1361,7 +1384,37 @@ func (r *raft) handleAppendEntries(m pb.Message) {
|
||||
} else {
|
||||
r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
|
||||
r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
|
||||
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
|
||||
|
||||
// Return a hint to the leader about the maximum index and term that the
|
||||
// two logs could be divergent at. Do this by searching through the
|
||||
// follower's log for the maximum (index, term) pair with a term <= the
|
||||
// MsgApp's LogTerm and an index <= the MsgApp's Index. This can help
|
||||
// skip all indexes in the follower's uncommitted tail with terms
|
||||
// greater than the MsgApp's LogTerm.
|
||||
//
|
||||
// For example, if the leader has:
|
||||
// [...] (idx=1,term=2)[...](idx=5,term=2)
|
||||
// and the follower has:
|
||||
// [...] (idx=1,term=2) (idx=2,term=4)[...](idx=8,term=4)
|
||||
//
|
||||
// Then, after receiving a MsgApp (idx=5,term=2), the follower would
|
||||
// send a rejected MsgAppRsp with a RejectHint of 1 and a LogTerm of 2.
|
||||
//
|
||||
// There is similar logic on the leader.
|
||||
hintIndex := min(m.Index, r.raftLog.lastIndex())
|
||||
hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm)
|
||||
hintTerm, err := r.raftLog.term(hintIndex)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("term(%d) must be valid, but got %v", hintIndex, err))
|
||||
}
|
||||
r.send(pb.Message{
|
||||
To: m.From,
|
||||
Type: pb.MsgAppResp,
|
||||
Index: m.Index,
|
||||
Reject: true,
|
||||
RejectHint: hintIndex,
|
||||
LogTerm: hintTerm,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -606,17 +606,18 @@ func TestFollowerCheckMsgApp(t *testing.T) {
|
||||
windex uint64
|
||||
wreject bool
|
||||
wrejectHint uint64
|
||||
wlogterm uint64
|
||||
}{
|
||||
// match with committed entries
|
||||
{0, 0, 1, false, 0},
|
||||
{ents[0].Term, ents[0].Index, 1, false, 0},
|
||||
{0, 0, 1, false, 0, 0},
|
||||
{ents[0].Term, ents[0].Index, 1, false, 0, 0},
|
||||
// match with uncommitted entries
|
||||
{ents[1].Term, ents[1].Index, 2, false, 0},
|
||||
{ents[1].Term, ents[1].Index, 2, false, 0, 0},
|
||||
|
||||
// unmatch with existing entry
|
||||
{ents[0].Term, ents[1].Index, ents[1].Index, true, 2},
|
||||
{ents[0].Term, ents[1].Index, ents[1].Index, true, 1, 1},
|
||||
// unexisting entry
|
||||
{ents[1].Term + 1, ents[1].Index + 1, ents[1].Index + 1, true, 2},
|
||||
{ents[1].Term + 1, ents[1].Index + 1, ents[1].Index + 1, true, 2, 2},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
storage := newTestMemoryStorage(withPeers(1, 2, 3))
|
||||
@ -629,7 +630,7 @@ func TestFollowerCheckMsgApp(t *testing.T) {
|
||||
|
||||
msgs := r.readMessages()
|
||||
wmsgs := []pb.Message{
|
||||
{From: 1, To: 2, Type: pb.MsgAppResp, Term: 2, Index: tt.windex, Reject: tt.wreject, RejectHint: tt.wrejectHint},
|
||||
{From: 1, To: 2, Type: pb.MsgAppResp, Term: 2, Index: tt.windex, Reject: tt.wreject, RejectHint: tt.wrejectHint, LogTerm: tt.wlogterm},
|
||||
}
|
||||
if !reflect.DeepEqual(msgs, wmsgs) {
|
||||
t.Errorf("#%d: msgs = %+v, want %+v", i, msgs, wmsgs)
|
||||
|
@ -4265,6 +4265,289 @@ func TestConfChangeV2CheckBeforeCampaign(t *testing.T) {
|
||||
testConfChangeCheckBeforeCampaign(t, true)
|
||||
}
|
||||
|
||||
func TestFastLogRejection(t *testing.T) {
|
||||
tests := []struct {
|
||||
leaderLog []pb.Entry // Logs on the leader
|
||||
followerLog []pb.Entry // Logs on the follower
|
||||
rejectHintTerm uint64 // Expected term included in rejected MsgAppResp.
|
||||
rejectHintIndex uint64 // Expected index included in rejected MsgAppResp.
|
||||
nextAppendTerm uint64 // Expected term when leader appends after rejected.
|
||||
nextAppendIndex uint64 // Expected index when leader appends after rejected.
|
||||
}{
|
||||
// This case tests that leader can find the conflict index quickly.
|
||||
// Firstly leader appends (type=MsgApp,index=7,logTerm=4, entries=...);
|
||||
// After rejected leader appends (type=MsgApp,index=3,logTerm=2).
|
||||
{
|
||||
leaderLog: []pb.Entry{
|
||||
{Term: 1, Index: 1},
|
||||
{Term: 2, Index: 2},
|
||||
{Term: 2, Index: 3},
|
||||
{Term: 4, Index: 4},
|
||||
{Term: 4, Index: 5},
|
||||
{Term: 4, Index: 6},
|
||||
{Term: 4, Index: 7},
|
||||
},
|
||||
followerLog: []pb.Entry{
|
||||
{Term: 1, Index: 1},
|
||||
{Term: 2, Index: 2},
|
||||
{Term: 2, Index: 3},
|
||||
{Term: 3, Index: 4},
|
||||
{Term: 3, Index: 5},
|
||||
{Term: 3, Index: 6},
|
||||
{Term: 3, Index: 7},
|
||||
{Term: 3, Index: 8},
|
||||
{Term: 3, Index: 9},
|
||||
{Term: 3, Index: 10},
|
||||
{Term: 3, Index: 11},
|
||||
},
|
||||
rejectHintTerm: 3,
|
||||
rejectHintIndex: 7,
|
||||
nextAppendTerm: 2,
|
||||
nextAppendIndex: 3,
|
||||
},
|
||||
// This case tests that leader can find the conflict index quickly.
|
||||
// Firstly leader appends (type=MsgApp,index=8,logTerm=5, entries=...);
|
||||
// After rejected leader appends (type=MsgApp,index=4,logTerm=3).
|
||||
{
|
||||
leaderLog: []pb.Entry{
|
||||
{Term: 1, Index: 1},
|
||||
{Term: 2, Index: 2},
|
||||
{Term: 2, Index: 3},
|
||||
{Term: 3, Index: 4},
|
||||
{Term: 4, Index: 5},
|
||||
{Term: 4, Index: 6},
|
||||
{Term: 4, Index: 7},
|
||||
{Term: 5, Index: 8},
|
||||
},
|
||||
followerLog: []pb.Entry{
|
||||
{Term: 1, Index: 1},
|
||||
{Term: 2, Index: 2},
|
||||
{Term: 2, Index: 3},
|
||||
{Term: 3, Index: 4},
|
||||
{Term: 3, Index: 5},
|
||||
{Term: 3, Index: 6},
|
||||
{Term: 3, Index: 7},
|
||||
{Term: 3, Index: 8},
|
||||
{Term: 3, Index: 9},
|
||||
{Term: 3, Index: 10},
|
||||
{Term: 3, Index: 11},
|
||||
},
|
||||
rejectHintTerm: 3,
|
||||
rejectHintIndex: 8,
|
||||
nextAppendTerm: 3,
|
||||
nextAppendIndex: 4,
|
||||
},
|
||||
// This case tests that follower can find the conflict index quickly.
|
||||
// Firstly leader appends (type=MsgApp,index=4,logTerm=1, entries=...);
|
||||
// After rejected leader appends (type=MsgApp,index=1,logTerm=1).
|
||||
{
|
||||
leaderLog: []pb.Entry{
|
||||
{Term: 1, Index: 1},
|
||||
{Term: 1, Index: 2},
|
||||
{Term: 1, Index: 3},
|
||||
{Term: 1, Index: 4},
|
||||
},
|
||||
followerLog: []pb.Entry{
|
||||
{Term: 1, Index: 1},
|
||||
{Term: 2, Index: 2},
|
||||
{Term: 2, Index: 3},
|
||||
{Term: 4, Index: 4},
|
||||
},
|
||||
rejectHintTerm: 1,
|
||||
rejectHintIndex: 1,
|
||||
nextAppendTerm: 1,
|
||||
nextAppendIndex: 1,
|
||||
},
|
||||
// This case is similar to the previous case. However, this time, the
|
||||
// leader has a longer uncommitted log tail than the follower.
|
||||
// Firstly leader appends (type=MsgApp,index=6,logTerm=1, entries=...);
|
||||
// After rejected leader appends (type=MsgApp,index=1,logTerm=1).
|
||||
{
|
||||
leaderLog: []pb.Entry{
|
||||
{Term: 1, Index: 1},
|
||||
{Term: 1, Index: 2},
|
||||
{Term: 1, Index: 3},
|
||||
{Term: 1, Index: 4},
|
||||
{Term: 1, Index: 5},
|
||||
{Term: 1, Index: 6},
|
||||
},
|
||||
followerLog: []pb.Entry{
|
||||
{Term: 1, Index: 1},
|
||||
{Term: 2, Index: 2},
|
||||
{Term: 2, Index: 3},
|
||||
{Term: 4, Index: 4},
|
||||
},
|
||||
rejectHintTerm: 1,
|
||||
rejectHintIndex: 1,
|
||||
nextAppendTerm: 1,
|
||||
nextAppendIndex: 1,
|
||||
},
|
||||
// This case is similar to the previous case. However, this time, the
|
||||
// follower has a longer uncommitted log tail than the leader.
|
||||
// Firstly leader appends (type=MsgApp,index=4,logTerm=1, entries=...);
|
||||
// After rejected leader appends (type=MsgApp,index=1,logTerm=1).
|
||||
{
|
||||
leaderLog: []pb.Entry{
|
||||
{Term: 1, Index: 1},
|
||||
{Term: 1, Index: 2},
|
||||
{Term: 1, Index: 3},
|
||||
{Term: 1, Index: 4},
|
||||
},
|
||||
followerLog: []pb.Entry{
|
||||
{Term: 1, Index: 1},
|
||||
{Term: 2, Index: 2},
|
||||
{Term: 2, Index: 3},
|
||||
{Term: 4, Index: 4},
|
||||
{Term: 4, Index: 5},
|
||||
{Term: 4, Index: 6},
|
||||
},
|
||||
rejectHintTerm: 1,
|
||||
rejectHintIndex: 1,
|
||||
nextAppendTerm: 1,
|
||||
nextAppendIndex: 1,
|
||||
},
|
||||
// An normal case that there are no log conflicts.
|
||||
// Firstly leader appends (type=MsgApp,index=5,logTerm=5, entries=...);
|
||||
// After rejected leader appends (type=MsgApp,index=4,logTerm=4).
|
||||
{
|
||||
leaderLog: []pb.Entry{
|
||||
{Term: 1, Index: 1},
|
||||
{Term: 1, Index: 2},
|
||||
{Term: 1, Index: 3},
|
||||
{Term: 4, Index: 4},
|
||||
{Term: 5, Index: 5},
|
||||
},
|
||||
followerLog: []pb.Entry{
|
||||
{Term: 1, Index: 1},
|
||||
{Term: 1, Index: 2},
|
||||
{Term: 1, Index: 3},
|
||||
{Term: 4, Index: 4},
|
||||
},
|
||||
rejectHintTerm: 4,
|
||||
rejectHintIndex: 4,
|
||||
nextAppendTerm: 4,
|
||||
nextAppendIndex: 4,
|
||||
},
|
||||
// Test case from example comment in stepLeader (on leader).
|
||||
{
|
||||
leaderLog: []pb.Entry{
|
||||
{Term: 2, Index: 1},
|
||||
{Term: 5, Index: 2},
|
||||
{Term: 5, Index: 3},
|
||||
{Term: 5, Index: 4},
|
||||
{Term: 5, Index: 5},
|
||||
{Term: 5, Index: 6},
|
||||
{Term: 5, Index: 7},
|
||||
{Term: 5, Index: 8},
|
||||
{Term: 5, Index: 9},
|
||||
},
|
||||
followerLog: []pb.Entry{
|
||||
{Term: 2, Index: 1},
|
||||
{Term: 4, Index: 2},
|
||||
{Term: 4, Index: 3},
|
||||
{Term: 4, Index: 4},
|
||||
{Term: 4, Index: 5},
|
||||
{Term: 4, Index: 6},
|
||||
},
|
||||
rejectHintTerm: 4,
|
||||
rejectHintIndex: 6,
|
||||
nextAppendTerm: 2,
|
||||
nextAppendIndex: 1,
|
||||
},
|
||||
// Test case from example comment in handleAppendEntries (on follower).
|
||||
{
|
||||
leaderLog: []pb.Entry{
|
||||
{Term: 2, Index: 1},
|
||||
{Term: 2, Index: 2},
|
||||
{Term: 2, Index: 3},
|
||||
{Term: 2, Index: 4},
|
||||
{Term: 2, Index: 5},
|
||||
},
|
||||
followerLog: []pb.Entry{
|
||||
{Term: 2, Index: 1},
|
||||
{Term: 4, Index: 2},
|
||||
{Term: 4, Index: 3},
|
||||
{Term: 4, Index: 4},
|
||||
{Term: 4, Index: 5},
|
||||
{Term: 4, Index: 6},
|
||||
{Term: 4, Index: 7},
|
||||
{Term: 4, Index: 8},
|
||||
},
|
||||
nextAppendTerm: 2,
|
||||
nextAppendIndex: 1,
|
||||
rejectHintTerm: 2,
|
||||
rejectHintIndex: 1,
|
||||
},
|
||||
}
|
||||
|
||||
for i, test := range tests {
|
||||
s1 := NewMemoryStorage()
|
||||
s1.snapshot.Metadata.ConfState = pb.ConfState{Voters: []uint64{1, 2, 3}}
|
||||
s1.Append(test.leaderLog)
|
||||
s2 := NewMemoryStorage()
|
||||
s2.snapshot.Metadata.ConfState = pb.ConfState{Voters: []uint64{1, 2, 3}}
|
||||
s2.Append(test.followerLog)
|
||||
|
||||
n1 := newTestRaft(1, 10, 1, s1)
|
||||
n2 := newTestRaft(2, 10, 1, s2)
|
||||
|
||||
n1.becomeCandidate()
|
||||
n1.becomeLeader()
|
||||
|
||||
n2.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHeartbeat})
|
||||
|
||||
msgs := n2.readMessages()
|
||||
if len(msgs) != 1 {
|
||||
t.Errorf("can't read 1 message from peer 2")
|
||||
}
|
||||
if msgs[0].Type != pb.MsgHeartbeatResp {
|
||||
t.Errorf("can't read heartbeat response from peer 2")
|
||||
}
|
||||
if n1.Step(msgs[0]) != nil {
|
||||
t.Errorf("peer 1 step heartbeat response fail")
|
||||
}
|
||||
|
||||
msgs = n1.readMessages()
|
||||
if len(msgs) != 1 {
|
||||
t.Errorf("can't read 1 message from peer 1")
|
||||
}
|
||||
if msgs[0].Type != pb.MsgApp {
|
||||
t.Errorf("can't read append from peer 1")
|
||||
}
|
||||
|
||||
if n2.Step(msgs[0]) != nil {
|
||||
t.Errorf("peer 2 step append fail")
|
||||
}
|
||||
msgs = n2.readMessages()
|
||||
if len(msgs) != 1 {
|
||||
t.Errorf("can't read 1 message from peer 2")
|
||||
}
|
||||
if msgs[0].Type != pb.MsgAppResp {
|
||||
t.Errorf("can't read append response from peer 2")
|
||||
}
|
||||
if !msgs[0].Reject {
|
||||
t.Errorf("expected rejected append response from peer 2")
|
||||
}
|
||||
if msgs[0].LogTerm != test.rejectHintTerm {
|
||||
t.Fatalf("#%d expected hint log term = %d, but got %d", i, test.rejectHintTerm, msgs[0].LogTerm)
|
||||
}
|
||||
if msgs[0].RejectHint != test.rejectHintIndex {
|
||||
t.Fatalf("#%d expected hint index = %d, but got %d", i, test.rejectHintIndex, msgs[0].RejectHint)
|
||||
}
|
||||
|
||||
if n1.Step(msgs[0]) != nil {
|
||||
t.Errorf("peer 1 step append fail")
|
||||
}
|
||||
msgs = n1.readMessages()
|
||||
if msgs[0].LogTerm != test.nextAppendTerm {
|
||||
t.Fatalf("#%d expected log term = %d, but got %d", i, test.nextAppendTerm, msgs[0].LogTerm)
|
||||
}
|
||||
if msgs[0].Index != test.nextAppendIndex {
|
||||
t.Fatalf("#%d expected index = %d, but got %d", i, test.nextAppendIndex, msgs[0].Index)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func entsWithConfig(configFunc func(*Config), terms ...uint64) *raft {
|
||||
storage := NewMemoryStorage()
|
||||
for i, term := range terms {
|
||||
|
@ -62,6 +62,11 @@ message Message {
|
||||
optional uint64 to = 2 [(gogoproto.nullable) = false];
|
||||
optional uint64 from = 3 [(gogoproto.nullable) = false];
|
||||
optional uint64 term = 4 [(gogoproto.nullable) = false];
|
||||
// logTerm is generally used for appending Raft logs to followers. For example,
|
||||
// (type=MsgApp,index=100,logTerm=5) means leader appends entries starting at
|
||||
// index=101, and the term of entry at index 100 is 5.
|
||||
// (type=MsgAppResp,reject=true,index=100,logTerm=5) means follower rejects some
|
||||
// entries from its leader as it already has an entry with term 5 at index 100.
|
||||
optional uint64 logTerm = 5 [(gogoproto.nullable) = false];
|
||||
optional uint64 index = 6 [(gogoproto.nullable) = false];
|
||||
repeated Entry entries = 7 [(gogoproto.nullable) = false];
|
||||
|
6
raft/testdata/campaign_learner_must_vote.txt
vendored
6
raft/testdata/campaign_learner_must_vote.txt
vendored
@ -108,10 +108,10 @@ stabilize 2 3
|
||||
Ready MustSync=false:
|
||||
Lead:2 State:StateFollower
|
||||
Messages:
|
||||
3->2 MsgAppResp Term:2 Log:0/4 Rejected (Hint: 3)
|
||||
3->2 MsgAppResp Term:2 Log:1/4 Rejected (Hint: 3)
|
||||
> 2 receiving messages
|
||||
3->2 MsgAppResp Term:2 Log:0/4 Rejected (Hint: 3)
|
||||
DEBUG 2 received MsgAppResp(MsgApp was rejected, lastindex: 3) from 3 for index 4
|
||||
3->2 MsgAppResp Term:2 Log:1/4 Rejected (Hint: 3)
|
||||
DEBUG 2 received MsgAppResp(MsgApp was rejected, hint: 3) from 3 for index 4, log term 1
|
||||
DEBUG 2 decreased progress of 3 to [StateProbe match=0 next=4]
|
||||
> 2 handling Ready
|
||||
Ready MustSync=false:
|
||||
|
2
raft/testdata/confchange_v1_add_single.txt
vendored
2
raft/testdata/confchange_v1_add_single.txt
vendored
@ -60,7 +60,7 @@ stabilize
|
||||
2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
|
||||
> 1 receiving messages
|
||||
2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
|
||||
DEBUG 1 received MsgAppResp(MsgApp was rejected, lastindex: 0) from 2 for index 3
|
||||
DEBUG 1 received MsgAppResp(MsgApp was rejected, hint: 0) from 2 for index 3, log term 0
|
||||
DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1]
|
||||
DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1]
|
||||
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4]
|
||||
|
@ -81,7 +81,7 @@ stabilize 1 2
|
||||
2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
|
||||
> 1 receiving messages
|
||||
2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
|
||||
DEBUG 1 received MsgAppResp(MsgApp was rejected, lastindex: 0) from 2 for index 3
|
||||
DEBUG 1 received MsgAppResp(MsgApp was rejected, hint: 0) from 2 for index 3, log term 0
|
||||
DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1]
|
||||
DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1]
|
||||
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4]
|
||||
@ -155,7 +155,7 @@ stabilize 1 3
|
||||
3->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
|
||||
> 1 receiving messages
|
||||
3->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
|
||||
DEBUG 1 received MsgAppResp(MsgApp was rejected, lastindex: 0) from 3 for index 3
|
||||
DEBUG 1 received MsgAppResp(MsgApp was rejected, hint: 0) from 3 for index 3, log term 0
|
||||
DEBUG 1 decreased progress of 3 to [StateProbe match=0 next=1]
|
||||
DEBUG 1 [firstindex: 3, commit: 5] sent snapshot[index: 5, term: 1] to 3 [StateProbe match=0 next=1]
|
||||
DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=1 paused pendingSnap=5]
|
||||
|
@ -66,7 +66,7 @@ stabilize 1 2
|
||||
2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
|
||||
> 1 receiving messages
|
||||
2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
|
||||
DEBUG 1 received MsgAppResp(MsgApp was rejected, lastindex: 0) from 2 for index 3
|
||||
DEBUG 1 received MsgAppResp(MsgApp was rejected, hint: 0) from 2 for index 3, log term 0
|
||||
DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1]
|
||||
DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1]
|
||||
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4]
|
||||
|
@ -61,7 +61,7 @@ stabilize
|
||||
2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
|
||||
> 1 receiving messages
|
||||
2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
|
||||
DEBUG 1 received MsgAppResp(MsgApp was rejected, lastindex: 0) from 2 for index 3
|
||||
DEBUG 1 received MsgAppResp(MsgApp was rejected, hint: 0) from 2 for index 3, log term 0
|
||||
DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1]
|
||||
DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1]
|
||||
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4]
|
||||
|
@ -61,7 +61,7 @@ stabilize 1 2
|
||||
2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
|
||||
> 1 receiving messages
|
||||
2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
|
||||
DEBUG 1 received MsgAppResp(MsgApp was rejected, lastindex: 0) from 2 for index 3
|
||||
DEBUG 1 received MsgAppResp(MsgApp was rejected, hint: 0) from 2 for index 3, log term 0
|
||||
DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1]
|
||||
DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1]
|
||||
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4]
|
||||
|
128
raft/testdata/probe_and_replicate.txt
vendored
128
raft/testdata/probe_and_replicate.txt
vendored
@ -488,9 +488,9 @@ stabilize 1 2
|
||||
Ready MustSync=false:
|
||||
Lead:1 State:StateFollower
|
||||
Messages:
|
||||
2->1 MsgAppResp Term:8 Log:0/20 Rejected (Hint: 19)
|
||||
2->1 MsgAppResp Term:8 Log:6/20 Rejected (Hint: 19)
|
||||
> 1 receiving messages
|
||||
2->1 MsgAppResp Term:8 Log:0/20 Rejected (Hint: 19)
|
||||
2->1 MsgAppResp Term:8 Log:6/20 Rejected (Hint: 19)
|
||||
> 1 handling Ready
|
||||
Ready MustSync=false:
|
||||
Messages:
|
||||
@ -527,9 +527,9 @@ stabilize 1 3
|
||||
Ready MustSync=false:
|
||||
Lead:1 State:StateFollower
|
||||
Messages:
|
||||
3->1 MsgAppResp Term:8 Log:0/20 Rejected (Hint: 14)
|
||||
3->1 MsgAppResp Term:8 Log:4/20 Rejected (Hint: 14)
|
||||
> 1 receiving messages
|
||||
3->1 MsgAppResp Term:8 Log:0/20 Rejected (Hint: 14)
|
||||
3->1 MsgAppResp Term:8 Log:4/20 Rejected (Hint: 14)
|
||||
> 1 handling Ready
|
||||
Ready MustSync=false:
|
||||
Messages:
|
||||
@ -617,21 +617,9 @@ stabilize 1 5
|
||||
Ready MustSync=false:
|
||||
Lead:1 State:StateFollower
|
||||
Messages:
|
||||
5->1 MsgAppResp Term:8 Log:0/20 Rejected (Hint: 22)
|
||||
5->1 MsgAppResp Term:8 Log:6/20 Rejected (Hint: 18)
|
||||
> 1 receiving messages
|
||||
5->1 MsgAppResp Term:8 Log:0/20 Rejected (Hint: 22)
|
||||
> 1 handling Ready
|
||||
Ready MustSync=false:
|
||||
Messages:
|
||||
1->5 MsgApp Term:8 Log:6/19 Commit:21 Entries:[6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""]
|
||||
> 5 receiving messages
|
||||
1->5 MsgApp Term:8 Log:6/19 Commit:21 Entries:[6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""]
|
||||
> 5 handling Ready
|
||||
Ready MustSync=false:
|
||||
Messages:
|
||||
5->1 MsgAppResp Term:8 Log:0/19 Rejected (Hint: 22)
|
||||
> 1 receiving messages
|
||||
5->1 MsgAppResp Term:8 Log:0/19 Rejected (Hint: 22)
|
||||
5->1 MsgAppResp Term:8 Log:6/20 Rejected (Hint: 18)
|
||||
> 1 handling Ready
|
||||
Ready MustSync=false:
|
||||
Messages:
|
||||
@ -676,33 +664,9 @@ stabilize 1 6
|
||||
Ready MustSync=false:
|
||||
Lead:1 State:StateFollower
|
||||
Messages:
|
||||
6->1 MsgAppResp Term:8 Log:0/20 Rejected (Hint: 17)
|
||||
6->1 MsgAppResp Term:8 Log:4/20 Rejected (Hint: 17)
|
||||
> 1 receiving messages
|
||||
6->1 MsgAppResp Term:8 Log:0/20 Rejected (Hint: 17)
|
||||
> 1 handling Ready
|
||||
Ready MustSync=false:
|
||||
Messages:
|
||||
1->6 MsgApp Term:8 Log:5/17 Commit:21 Entries:[6/18 EntryNormal "", 6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""]
|
||||
> 6 receiving messages
|
||||
1->6 MsgApp Term:8 Log:5/17 Commit:21 Entries:[6/18 EntryNormal "", 6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""]
|
||||
> 6 handling Ready
|
||||
Ready MustSync=false:
|
||||
Messages:
|
||||
6->1 MsgAppResp Term:8 Log:0/17 Rejected (Hint: 17)
|
||||
> 1 receiving messages
|
||||
6->1 MsgAppResp Term:8 Log:0/17 Rejected (Hint: 17)
|
||||
> 1 handling Ready
|
||||
Ready MustSync=false:
|
||||
Messages:
|
||||
1->6 MsgApp Term:8 Log:5/16 Commit:21 Entries:[5/17 EntryNormal "prop_5_17", 6/18 EntryNormal "", 6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""]
|
||||
> 6 receiving messages
|
||||
1->6 MsgApp Term:8 Log:5/16 Commit:21 Entries:[5/17 EntryNormal "prop_5_17", 6/18 EntryNormal "", 6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""]
|
||||
> 6 handling Ready
|
||||
Ready MustSync=false:
|
||||
Messages:
|
||||
6->1 MsgAppResp Term:8 Log:0/16 Rejected (Hint: 17)
|
||||
> 1 receiving messages
|
||||
6->1 MsgAppResp Term:8 Log:0/16 Rejected (Hint: 17)
|
||||
6->1 MsgAppResp Term:8 Log:4/20 Rejected (Hint: 17)
|
||||
> 1 handling Ready
|
||||
Ready MustSync=false:
|
||||
Messages:
|
||||
@ -753,81 +717,9 @@ stabilize 1 7
|
||||
Ready MustSync=false:
|
||||
Lead:1 State:StateFollower
|
||||
Messages:
|
||||
7->1 MsgAppResp Term:8 Log:0/20 Rejected (Hint: 21)
|
||||
7->1 MsgAppResp Term:8 Log:3/20 Rejected (Hint: 20)
|
||||
> 1 receiving messages
|
||||
7->1 MsgAppResp Term:8 Log:0/20 Rejected (Hint: 21)
|
||||
> 1 handling Ready
|
||||
Ready MustSync=false:
|
||||
Messages:
|
||||
1->7 MsgApp Term:8 Log:6/19 Commit:21 Entries:[6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""]
|
||||
> 7 receiving messages
|
||||
1->7 MsgApp Term:8 Log:6/19 Commit:21 Entries:[6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""]
|
||||
> 7 handling Ready
|
||||
Ready MustSync=false:
|
||||
Messages:
|
||||
7->1 MsgAppResp Term:8 Log:0/19 Rejected (Hint: 21)
|
||||
> 1 receiving messages
|
||||
7->1 MsgAppResp Term:8 Log:0/19 Rejected (Hint: 21)
|
||||
> 1 handling Ready
|
||||
Ready MustSync=false:
|
||||
Messages:
|
||||
1->7 MsgApp Term:8 Log:6/18 Commit:21 Entries:[6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""]
|
||||
> 7 receiving messages
|
||||
1->7 MsgApp Term:8 Log:6/18 Commit:21 Entries:[6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""]
|
||||
> 7 handling Ready
|
||||
Ready MustSync=false:
|
||||
Messages:
|
||||
7->1 MsgAppResp Term:8 Log:0/18 Rejected (Hint: 21)
|
||||
> 1 receiving messages
|
||||
7->1 MsgAppResp Term:8 Log:0/18 Rejected (Hint: 21)
|
||||
> 1 handling Ready
|
||||
Ready MustSync=false:
|
||||
Messages:
|
||||
1->7 MsgApp Term:8 Log:5/17 Commit:21 Entries:[6/18 EntryNormal "", 6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""]
|
||||
> 7 receiving messages
|
||||
1->7 MsgApp Term:8 Log:5/17 Commit:21 Entries:[6/18 EntryNormal "", 6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""]
|
||||
> 7 handling Ready
|
||||
Ready MustSync=false:
|
||||
Messages:
|
||||
7->1 MsgAppResp Term:8 Log:0/17 Rejected (Hint: 21)
|
||||
> 1 receiving messages
|
||||
7->1 MsgAppResp Term:8 Log:0/17 Rejected (Hint: 21)
|
||||
> 1 handling Ready
|
||||
Ready MustSync=false:
|
||||
Messages:
|
||||
1->7 MsgApp Term:8 Log:5/16 Commit:21 Entries:[5/17 EntryNormal "prop_5_17", 6/18 EntryNormal "", 6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""]
|
||||
> 7 receiving messages
|
||||
1->7 MsgApp Term:8 Log:5/16 Commit:21 Entries:[5/17 EntryNormal "prop_5_17", 6/18 EntryNormal "", 6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""]
|
||||
> 7 handling Ready
|
||||
Ready MustSync=false:
|
||||
Messages:
|
||||
7->1 MsgAppResp Term:8 Log:0/16 Rejected (Hint: 21)
|
||||
> 1 receiving messages
|
||||
7->1 MsgAppResp Term:8 Log:0/16 Rejected (Hint: 21)
|
||||
> 1 handling Ready
|
||||
Ready MustSync=false:
|
||||
Messages:
|
||||
1->7 MsgApp Term:8 Log:4/15 Commit:21 Entries:[5/16 EntryNormal "", 5/17 EntryNormal "prop_5_17", 6/18 EntryNormal "", 6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""]
|
||||
> 7 receiving messages
|
||||
1->7 MsgApp Term:8 Log:4/15 Commit:21 Entries:[5/16 EntryNormal "", 5/17 EntryNormal "prop_5_17", 6/18 EntryNormal "", 6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""]
|
||||
> 7 handling Ready
|
||||
Ready MustSync=false:
|
||||
Messages:
|
||||
7->1 MsgAppResp Term:8 Log:0/15 Rejected (Hint: 21)
|
||||
> 1 receiving messages
|
||||
7->1 MsgAppResp Term:8 Log:0/15 Rejected (Hint: 21)
|
||||
> 1 handling Ready
|
||||
Ready MustSync=false:
|
||||
Messages:
|
||||
1->7 MsgApp Term:8 Log:4/14 Commit:21 Entries:[4/15 EntryNormal "prop_4_15", 5/16 EntryNormal "", 5/17 EntryNormal "prop_5_17", 6/18 EntryNormal "", 6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""]
|
||||
> 7 receiving messages
|
||||
1->7 MsgApp Term:8 Log:4/14 Commit:21 Entries:[4/15 EntryNormal "prop_4_15", 5/16 EntryNormal "", 5/17 EntryNormal "prop_5_17", 6/18 EntryNormal "", 6/19 EntryNormal "prop_6_19", 6/20 EntryNormal "prop_6_20", 8/21 EntryNormal ""]
|
||||
> 7 handling Ready
|
||||
Ready MustSync=false:
|
||||
Messages:
|
||||
7->1 MsgAppResp Term:8 Log:0/14 Rejected (Hint: 21)
|
||||
> 1 receiving messages
|
||||
7->1 MsgAppResp Term:8 Log:0/14 Rejected (Hint: 21)
|
||||
7->1 MsgAppResp Term:8 Log:3/20 Rejected (Hint: 20)
|
||||
> 1 handling Ready
|
||||
Ready MustSync=false:
|
||||
Messages:
|
||||
|
@ -157,8 +157,8 @@ func (pr *Progress) MaybeUpdate(n uint64) bool {
|
||||
func (pr *Progress) OptimisticUpdate(n uint64) { pr.Next = n + 1 }
|
||||
|
||||
// MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The
|
||||
// arguments are the index the follower rejected to append to its log, and its
|
||||
// last index.
|
||||
// arguments are the index of the append message rejected by the follower, and
|
||||
// the hint that we want to decrease to.
|
||||
//
|
||||
// Rejections can happen spuriously as messages are sent out of order or
|
||||
// duplicated. In such cases, the rejection pertains to an index that the
|
||||
@ -167,7 +167,7 @@ func (pr *Progress) OptimisticUpdate(n uint64) { pr.Next = n + 1 }
|
||||
//
|
||||
// If the rejection is genuine, Next is lowered sensibly, and the Progress is
|
||||
// cleared for sending log entries.
|
||||
func (pr *Progress) MaybeDecrTo(rejected, last uint64) bool {
|
||||
func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool {
|
||||
if pr.State == StateReplicate {
|
||||
// The rejection must be stale if the progress has matched and "rejected"
|
||||
// is smaller than "match".
|
||||
@ -176,7 +176,7 @@ func (pr *Progress) MaybeDecrTo(rejected, last uint64) bool {
|
||||
}
|
||||
// Directly decrease next to match + 1.
|
||||
//
|
||||
// TODO(tbg): why not use last if it's larger?
|
||||
// TODO(tbg): why not use matchHint if it's larger?
|
||||
pr.Next = pr.Match + 1
|
||||
return true
|
||||
}
|
||||
@ -187,7 +187,7 @@ func (pr *Progress) MaybeDecrTo(rejected, last uint64) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
pr.Next = max(min(rejected, last+1), 1)
|
||||
pr.Next = max(min(rejected, matchHint+1), 1)
|
||||
pr.ProbeSent = false
|
||||
return true
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user