From 233617bea2c25ca90903a91c547d84d8b3c6eff9 Mon Sep 17 00:00:00 2001 From: Soheil Hassas Yeganeh Date: Thu, 23 Oct 2014 14:56:17 -0400 Subject: [PATCH 1/3] raft: Make MsgAppResp ack only the last index in MsgApp As explained in #1366, the leader will fail to transmit the missed logs if the leader receives a heartbeat response from a follower that is not yet matched in the leader. In other words, there are append responses that do not explicitly reject an append but imply a gap. This commit is based on @xiangli-cmu's idea. We should only acknowledge up to the index of logs in the append message. This way responses to heartbeats would never interfere with the log synchronization because their log index is always 0. Fixes #1366 --- raft/raft.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/raft/raft.go b/raft/raft.go index 09f91b25f..99586c850 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -380,7 +380,11 @@ func (r *raft) Step(m pb.Message) error { func (r *raft) handleAppendEntries(m pb.Message) { if r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) 
{ - r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()}) + mlastIndex := m.Index + if len(m.Entries) != 0 { + mlastIndex = m.Entries[len(m.Entries)-1].Index + } + r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) } else { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true}) } @@ -428,6 +432,9 @@ func stepLeader(r *raft, m pb.Message) { r.appendEntry(e) r.bcastAppend() case pb.MsgAppResp: + if m.Index == 0 { + return + } if m.Reject { if r.prs[m.From].maybeDecrTo(m.Index) { r.sendAppend(m.From) From 09e9618b025bff82a0b61caabf8e1979fe58f3c5 Mon Sep 17 00:00:00 2001 From: Soheil Hassas Yeganeh Date: Thu, 23 Oct 2014 15:37:13 -0400 Subject: [PATCH 2/3] raft: change raftLog.maybeAppend to return the last new index As per @unihorn's comment on #1366, we change raftLog.maybeAppend to return the last new index of entries in maybeAppend. --- raft/log.go | 10 ++++++---- raft/raft.go | 6 +----- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/raft/log.go b/raft/log.go index b386b9932..032fc10d3 100644 --- a/raft/log.go +++ b/raft/log.go @@ -53,8 +53,10 @@ func (l *raftLog) String() string { return fmt.Sprintf("offset=%d committed=%d applied=%d len(ents)=%d", l.offset, l.committed, l.applied, len(l.ents)) } -func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) bool { - lastnewi := index + uint64(len(ents)) +// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise, +// it returns (last index of entries, true). 
+func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) { + lastnewi = index + uint64(len(ents)) if l.matchTerm(index, logTerm) { from := index + 1 ci := l.findConflict(from, ents) @@ -70,9 +72,9 @@ func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry if l.committed < tocommit { l.committed = tocommit } - return true + return lastnewi, true } - return false + return 0, false } func (l *raftLog) append(after uint64, ents ...pb.Entry) uint64 { diff --git a/raft/raft.go b/raft/raft.go index 99586c850..b2e7e8f9c 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -379,11 +379,7 @@ func (r *raft) Step(m pb.Message) error { } func (r *raft) handleAppendEntries(m pb.Message) { - if r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) { - mlastIndex := m.Index - if len(m.Entries) != 0 { - mlastIndex = m.Entries[len(m.Entries)-1].Index - } + if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) } else { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true}) From 507300130bf9c3e831d72c7a6d402fc4fce1f24f Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 24 Oct 2014 11:50:21 -0700 Subject: [PATCH 3/3] raft: add tests for ignoring heartbeat reply --- raft/raft_test.go | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/raft/raft_test.go b/raft/raft_test.go index 04476a63e..4efe6a1ae 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -778,16 +778,22 @@ func TestAllServerStepdown(t *testing.T) { } func TestLeaderAppResp(t *testing.T) { + // initial progress: match = 0; netx = 3 tests := []struct { - index uint64 - reject bool + index uint64 + reject bool + // progress + wmatch uint64 + wnext uint64 + // message wmsgNum int windex uint64 wcommitted uint64 }{ - {3, true, 0, 0, 0}, // stale resp; no replies - 
{2, true, 1, 1, 0}, // denied resp; leader does not commit; decrese next and send probing msg - {2, false, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index + {3, true, 0, 3, 0, 0, 0}, // stale resp; no replies + {2, true, 0, 2, 1, 1, 0}, // denied resp; leader does not commit; decrese next and send probing msg + {2, false, 2, 3, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index + {0, false, 0, 3, 0, 0, 0}, // ignore heartbeat replies } for i, tt := range tests { @@ -799,6 +805,15 @@ func TestLeaderAppResp(t *testing.T) { sm.becomeLeader() sm.ReadMessages() sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject}) + + p := sm.prs[2] + if p.match != tt.wmatch { + t.Errorf("#%d match = %d, want %d", i, p.match, tt.wmatch) + } + if p.next != tt.wnext { + t.Errorf("#%d next = %d, want %d", i, p.next, tt.wnext) + } + msgs := sm.ReadMessages() if len(msgs) != tt.wmsgNum {