diff --git a/raft/log.go b/raft/log.go index b386b9932..032fc10d3 100644 --- a/raft/log.go +++ b/raft/log.go @@ -53,8 +53,10 @@ func (l *raftLog) String() string { return fmt.Sprintf("offset=%d committed=%d applied=%d len(ents)=%d", l.offset, l.committed, l.applied, len(l.ents)) } -func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) bool { - lastnewi := index + uint64(len(ents)) +// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise, +// it returns (last index of entries, true). +func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) { + lastnewi = index + uint64(len(ents)) if l.matchTerm(index, logTerm) { from := index + 1 ci := l.findConflict(from, ents) @@ -70,9 +72,9 @@ func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry if l.committed < tocommit { l.committed = tocommit } - return true + return lastnewi, true } - return false + return 0, false } func (l *raftLog) append(after uint64, ents ...pb.Entry) uint64 { diff --git a/raft/raft.go b/raft/raft.go index 09f91b25f..b2e7e8f9c 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -379,8 +379,8 @@ func (r *raft) Step(m pb.Message) error { } func (r *raft) handleAppendEntries(m pb.Message) { - if r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) { - r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()}) + if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { + r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) } else { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true}) } @@ -428,6 +428,9 @@ func stepLeader(r *raft, m pb.Message) { r.appendEntry(e) r.bcastAppend() case pb.MsgAppResp: + if m.Index == 0 { + return + } if m.Reject { if r.prs[m.From].maybeDecrTo(m.Index) { r.sendAppend(m.From) diff --git a/raft/raft_test.go b/raft/raft_test.go index 04476a63e..4efe6a1ae 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -778,16 +778,22 @@ func TestAllServerStepdown(t *testing.T) { } func TestLeaderAppResp(t *testing.T) { + // initial progress: match = 0; netx = 3 tests := []struct { - index uint64 - reject bool + index uint64 + reject bool + // progress + wmatch uint64 + wnext uint64 + // message wmsgNum int windex uint64 wcommitted uint64 }{ - {3, true, 0, 0, 0}, // stale resp; no replies - {2, true, 1, 1, 0}, // denied resp; leader does not commit; decrese next and send probing msg - {2, false, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index + {3, true, 0, 3, 0, 0, 0}, // stale resp; no replies + {2, true, 0, 2, 1, 1, 0}, // denied resp; leader does not commit; decrese next and send probing msg + {2, false, 2, 3, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index + {0, false, 0, 3, 0, 0, 0}, // ignore heartbeat replies } for i, tt := range tests { @@ -799,6 +805,15 @@ func TestLeaderAppResp(t *testing.T) { sm.becomeLeader() sm.ReadMessages() sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject}) + + p := sm.prs[2] + if p.match != tt.wmatch { + t.Errorf("#%d match = %d, want %d", i, p.match, tt.wmatch) + } + if p.next != tt.wnext { + t.Errorf("#%d next = %d, want %d", i, p.next, tt.wnext) + } + msgs := sm.ReadMessages() if len(msgs) != tt.wmsgNum {