From 21d116d3e18e9fa4cda509ebdcebebefa5d0f382 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 15 Sep 2014 09:58:22 -0700 Subject: [PATCH] raft: fix heartbeat --- raft/raft.go | 2 +- raft/raft_test.go | 62 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/raft/raft.go b/raft/raft.go index 2245749f0..a61d9de83 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -189,7 +189,7 @@ func (r *raft) sendAppend(to int64) { // sendHeartbeat sends RRPC, without entries to the given peer. func (r *raft) sendHeartbeat(to int64) { pr := r.prs[to] - index := max(pr.next-1, r.raftLog.lastIndex()) + index := max(pr.next-1, r.raftLog.offset) m := pb.Message{ To: to, Type: msgApp, diff --git a/raft/raft_test.go b/raft/raft_test.go index ddf612aaf..8da8d0661 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -731,6 +731,68 @@ func TestLeaderAppResp(t *testing.T) { } } +// When the leader receives a heartbeat tick, it should +// send a msgApp with m.Index = max(progress.next-1,log.offset) and empty +// entries. +func TestBcastBeat(t *testing.T) { + offset := int64(1000) + // make a state machine with log.offset = 1000 + s := pb.Snapshot{ + Index: offset, + Term: 1, + Nodes: []int64{1, 2}, + } + sm := newRaft(1, []int64{1, 2}, 0, 0) + sm.Term = 1 + sm.restore(s) + + sm.becomeCandidate() + sm.becomeLeader() + for i := 0; i < 10; i++ { + sm.appendEntry(pb.Entry{}) + } + + tests := []struct { + pnext int64 + windex int64 + wterm int64 + wto int64 + }{ + {offset + 1, offset, 1, 2}, + {offset + 2, offset + 1, 2, 2}, + // pr.next -1 < offset + {offset, offset, 1, 2}, + {offset - 1, offset, 1, 2}, + } + + for i, tt := range tests { + sm.prs[2].match = 0 + sm.prs[2].next = tt.pnext + + sm.Step(pb.Message{Type: msgBeat}) + msgs := sm.ReadMessages() + if len(msgs) != 1 { + t.Fatalf("#%d: len(msgs) = %v, want 1", i, len(msgs)) + } + m := msgs[0] + if m.Type != msgApp { + t.Fatalf("#%d: type = %v, want = %v", i, m.Type, msgApp) + } + if m.Index != tt.windex { + t.Fatalf("#%d: prevIndex = %d, want %d", i, m.Index, tt.windex) + } + if m.LogTerm != tt.wterm { + t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, tt.wterm) + } + if m.To != tt.wto { + t.Fatalf("#%d: to = %d, want %d", i, m.To, tt.wto) + } + if len(m.Entries) != 0 { + t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries)) + } + } +} + // tests the output of the statemachine when receiving msgBeat func TestRecvMsgBeat(t *testing.T) { tests := []struct {