raft: reply with the commit index when receives a smaller append message

Follower should not reject the append message with a smaller index than its commit
index. Or it will trigger the leader's resending logic, which might have a high cost.
This commit is contained in:
Xiang Li 2015-03-10 18:33:58 -07:00
parent 83496c3966
commit c643967a41
2 changed files with 18 additions and 7 deletions

View File

@ -663,6 +663,11 @@ func stepFollower(r *raft, m pb.Message) {
} }
func (r *raft) handleAppendEntries(m pb.Message) { func (r *raft) handleAppendEntries(m pb.Message) {
if m.Index < r.Commit {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.Commit})
return
}
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else { } else {

View File

@ -603,27 +603,33 @@ func TestFollowerCheckMsgApp(t *testing.T) {
tests := []struct { tests := []struct {
term uint64 term uint64
index uint64 index uint64
windex uint64
wreject bool wreject bool
wrejectHint uint64 wrejectHint uint64
}{ }{
{ents[0].Term, ents[0].Index, false, 0}, // match with committed entries
{ents[0].Term, ents[0].Index + 1, true, 2}, {0, 0, 1, false, 0},
{ents[0].Term + 1, ents[0].Index, true, 2}, {ents[0].Term, ents[0].Index, 1, false, 0},
{ents[1].Term, ents[1].Index, false, 0}, // match with uncommitted entries
{3, 3, true, 2}, {ents[1].Term, ents[1].Index, 2, false, 0},
// unmatch with existing entry
{ents[0].Term, ents[1].Index, ents[1].Index, true, 2},
// unexisting entry
{ents[1].Term + 1, ents[1].Index + 1, ents[1].Index + 1, true, 2},
} }
for i, tt := range tests { for i, tt := range tests {
storage := NewMemoryStorage() storage := NewMemoryStorage()
storage.Append(ents) storage.Append(ents)
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage, 0) r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage, 0)
r.loadState(pb.HardState{Commit: 2}) r.loadState(pb.HardState{Commit: 1})
r.becomeFollower(2, 2) r.becomeFollower(2, 2)
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index}) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index})
msgs := r.readMessages() msgs := r.readMessages()
wmsgs := []pb.Message{ wmsgs := []pb.Message{
{From: 1, To: 2, Type: pb.MsgAppResp, Term: 2, Index: tt.index, Reject: tt.wreject, RejectHint: tt.wrejectHint}, {From: 1, To: 2, Type: pb.MsgAppResp, Term: 2, Index: tt.windex, Reject: tt.wreject, RejectHint: tt.wrejectHint},
} }
if !reflect.DeepEqual(msgs, wmsgs) { if !reflect.DeepEqual(msgs, wmsgs) {
t.Errorf("#%d: msgs = %+v, want %+v", i, msgs, wmsgs) t.Errorf("#%d: msgs = %+v, want %+v", i, msgs, wmsgs)