From b93d87f17fcf7c900ad7abfc23a04146f4847e06 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 17 Nov 2014 15:44:57 -0800 Subject: [PATCH] raft: include commitIndex in heartbeat --- raft/raft.go | 12 ++++++++++-- raft/raft_test.go | 16 +++++++++++++--- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index 376f98468..67152968b 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -202,9 +202,17 @@ func (r *raft) sendAppend(to uint64) { // sendHeartbeat sends an empty MsgApp func (r *raft) sendHeartbeat(to uint64) { + // Attach the commit as min(to.matched, r.committed). + // When the leader sends out heartbeat message, + // the receiver(follower) might not be matched with the leader + // or it might not have all the committed entries. + // The leader MUST NOT forward the follower's commit to + // an unmatched index. + commit := min(r.prs[to].match, r.raftLog.committed) m := pb.Message{ - To: to, - Type: pb.MsgApp, + To: to, + Type: pb.MsgApp, + Commit: commit, } r.send(m) } diff --git a/raft/raft_test.go b/raft/raft_test.go index 5677e36dc..8e1265e93 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -911,13 +911,20 @@ func TestBcastBeat(t *testing.T) { for i := 0; i < 10; i++ { sm.appendEntry(pb.Entry{}) } + // slow follower + sm.prs[2].match, sm.prs[2].next = 5, 6 + // normal follower + sm.prs[3].match, sm.prs[3].next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1 sm.Step(pb.Message{Type: pb.MsgBeat}) msgs := sm.readMessages() if len(msgs) != 2 { t.Fatalf("len(msgs) = %v, want 2", len(msgs)) } - tomap := map[uint64]bool{2: true, 3: true} + wantCommitMap := map[uint64]uint64{ + 2: min(sm.raftLog.committed, sm.prs[2].match), + 3: min(sm.raftLog.committed, sm.prs[3].match), + } for i, m := range msgs { if m.Type != pb.MsgApp { t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgApp) @@ -928,10 +935,13 @@ func TestBcastBeat(t *testing.T) { if m.LogTerm != 0 { t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, 0) } - if !tomap[m.To] { + if wantCommitMap[m.To] == 0 { t.Fatalf("#%d: unexpected to %d", i, m.To) } else { - delete(tomap, m.To) + if m.Commit != wantCommitMap[m.To] { + t.Fatalf("#%d: commit = %d, want %d", i, m.Commit, wantCommitMap[m.To]) + } + delete(wantCommitMap, m.To) } if len(m.Entries) != 0 { t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries))