From 16ba77767ec45581cf0996b4822813dd8f56d271 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 29 Sep 2014 19:48:49 -0700 Subject: [PATCH 1/2] raft: do not decrease nextIndex and send entries for stale reply --- raft/raft.go | 17 +++++++++++++---- raft/raft_test.go | 7 ++++--- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index b3edc21fd..a223931cd 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -69,10 +69,18 @@ func (pr *progress) update(n int64) { pr.next = n + 1 } -func (pr *progress) decr() { +func (pr *progress) maybeDecrTo(to int64) bool { + // the rejection must be stale if the + // progress has matched with follower + // or "to" does not match next - 1 + if pr.match != 0 || pr.next-1 != to { + return false + } + if pr.next--; pr.next < 1 { pr.next = 1 } + return true } func (pr *progress) String() string { @@ -392,7 +400,7 @@ func (r *raft) handleAppendEntries(m pb.Message) { if r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) { r.send(pb.Message{To: m.From, Type: msgAppResp, Index: r.raftLog.lastIndex()}) } else { - r.send(pb.Message{To: m.From, Type: msgAppResp, Reject: true}) + r.send(pb.Message{To: m.From, Type: msgAppResp, Index: m.Index, Reject: true}) } } @@ -436,8 +444,9 @@ func stepLeader(r *raft, m pb.Message) { r.bcastAppend() case msgAppResp: if m.Reject { - r.prs[m.From].decr() - r.sendAppend(m.From) + if r.prs[m.From].maybeDecrTo(m.Index) { + r.sendAppend(m.From) + } } else { r.prs[m.From].update(m.Index) if r.maybeCommit() { diff --git a/raft/raft_test.go b/raft/raft_test.go index f6ed504c9..6254fcd30 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -690,8 +690,9 @@ func TestLeaderAppResp(t *testing.T) { windex int64 wcommitted int64 }{ - {-1, true, 1, 1, 0}, // bad resp; leader does not commit; reply with log entries - {2, false, 2, 2, 2}, // good resp; leader commits; broadcast with commit index + {3, true, 0, 0, 0}, // stale resp; no replies + {2, true, 1, 1, 0}, // denied resp; leader does not commit; decrese next and send probing msg + {2, false, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index } for i, tt := range tests { @@ -857,7 +858,7 @@ func TestProvideSnap(t *testing.T) { // node 1 needs a snapshot sm.prs[2].next = sm.raftLog.offset - sm.Step(pb.Message{From: 2, To: 1, Type: msgAppResp, Index: -1, Reject: true}) + sm.Step(pb.Message{From: 2, To: 1, Type: msgAppResp, Index: sm.prs[2].next - 1, Reject: true}) msgs := sm.ReadMessages() if len(msgs) != 1 { t.Fatalf("len(msgs) = %d, want 1", len(msgs)) From 70bf464cd609b52a4cccbe5dff1ebe7fb98f580d Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 3 Oct 2014 13:42:34 +0800 Subject: [PATCH 2/2] raft: add comment to decrTo --- raft/raft.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/raft/raft.go b/raft/raft.go index a223931cd..323e596a9 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -69,6 +69,8 @@ func (pr *progress) update(n int64) { pr.next = n + 1 } +// maybeDecrTo returns false if the given to index comes from an out of order message. +// Otherwise it decreases the progress next index and returns true. func (pr *progress) maybeDecrTo(to int64) bool { // the rejection must be stale if the // progress has matched with follower