raft: include commitIndex in heartbeat

This commit is contained in:
Xiang Li 2014-11-17 15:44:57 -08:00
parent e04e4632b3
commit b93d87f17f
2 changed files with 23 additions and 5 deletions

View File

@ -202,9 +202,17 @@ func (r *raft) sendAppend(to uint64) {
// sendHeartbeat sends an empty MsgApp
func (r *raft) sendHeartbeat(to uint64) {
// Attach the commit as min(to.matched, r.committed).
// When the leader sends out heartbeat message,
// the receiver(follower) might not be matched with the leader
// or it might not have all the committed entries.
// The leader MUST NOT forward the follower's commit to
// an unmatched index.
commit := min(r.prs[to].match, r.raftLog.committed)
m := pb.Message{
To: to,
Type: pb.MsgApp,
To: to,
Type: pb.MsgApp,
Commit: commit,
}
r.send(m)
}

View File

@ -911,13 +911,20 @@ func TestBcastBeat(t *testing.T) {
for i := 0; i < 10; i++ {
sm.appendEntry(pb.Entry{})
}
// slow follower
sm.prs[2].match, sm.prs[2].next = 5, 6
// normal follower
sm.prs[3].match, sm.prs[3].next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1
sm.Step(pb.Message{Type: pb.MsgBeat})
msgs := sm.readMessages()
if len(msgs) != 2 {
t.Fatalf("len(msgs) = %v, want 2", len(msgs))
}
tomap := map[uint64]bool{2: true, 3: true}
wantCommitMap := map[uint64]uint64{
2: min(sm.raftLog.committed, sm.prs[2].match),
3: min(sm.raftLog.committed, sm.prs[3].match),
}
for i, m := range msgs {
if m.Type != pb.MsgApp {
t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgApp)
@ -928,10 +935,13 @@ func TestBcastBeat(t *testing.T) {
if m.LogTerm != 0 {
t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, 0)
}
if !tomap[m.To] {
if wantCommitMap[m.To] == 0 {
t.Fatalf("#%d: unexpected to %d", i, m.To)
} else {
delete(tomap, m.To)
if m.Commit != wantCommitMap[m.To] {
t.Fatalf("#%d: commit = %d, want %d", i, m.Commit, wantCommitMap[m.To])
}
delete(wantCommitMap, m.To)
}
if len(m.Entries) != 0 {
t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries))