raft: fix a few problems

This commit is contained in:
swingbach@gmail.com 2016-07-11 14:59:53 +08:00
parent e020b2a228
commit 0d9b6ba0ab
3 changed files with 16 additions and 8 deletions

View File

@ -39,8 +39,10 @@ const (
// Possible values for CampaignType // Possible values for CampaignType
const ( const (
LeaderElection CampaignType = "LeaderElection" // campaignElection represents the type of normal election
LeaderTransfer CampaignType = "LeaderTransfer" campaignElection CampaignType = "CampaignElection"
// campaignTransfer represents the type of leader transfer
campaignTransfer CampaignType = "CampaignTransfer"
) )
// CampaignType represents the type of campaigning // CampaignType represents the type of campaigning
@ -544,7 +546,12 @@ func (r *raft) campaign(t CampaignType) {
} }
r.logger.Infof("%x [logterm: %d, index: %d] sent vote request to %x at term %d", r.logger.Infof("%x [logterm: %d, index: %d] sent vote request to %x at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), id, r.Term) r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), id, r.Term)
r.send(pb.Message{To: id, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Entries: []pb.Entry{{Data: []byte(t)}}})
var entries []pb.Entry
if t == campaignTransfer {
entries = []pb.Entry{{Data: []byte(t)}}
}
r.send(pb.Message{To: id, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Entries: entries})
} }
} }
@ -569,7 +576,7 @@ func (r *raft) Step(m pb.Message) error {
if m.Type == pb.MsgHup { if m.Type == pb.MsgHup {
if r.state != StateLeader { if r.state != StateLeader {
r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term) r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
r.campaign(LeaderElection) r.campaign(campaignElection)
} else { } else {
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id) r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
} }
@ -587,7 +594,7 @@ func (r *raft) Step(m pb.Message) error {
case m.Term > r.Term: case m.Term > r.Term:
lead := m.From lead := m.From
if m.Type == pb.MsgVote { if m.Type == pb.MsgVote {
force := len(m.Entries) == 1 && bytes.Equal(m.Entries[0].Data, []byte(LeaderTransfer)) force := len(m.Entries) == 1 && bytes.Equal(m.Entries[0].Data, []byte(campaignTransfer))
inLease := r.checkQuorum && r.state != StateCandidate && r.electionElapsed < r.electionTimeout inLease := r.checkQuorum && r.state != StateCandidate && r.electionElapsed < r.electionTimeout
if !force && inLease { if !force && inLease {
// If a server receives a RequestVote request within the minimum election timeout // If a server receives a RequestVote request within the minimum election timeout
@ -856,7 +863,7 @@ func stepFollower(r *raft, m pb.Message) {
} }
case pb.MsgTimeoutNow: case pb.MsgTimeoutNow:
r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From) r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
r.campaign(LeaderTransfer) r.campaign(campaignTransfer)
case pb.MsgReadIndex: case pb.MsgReadIndex:
if r.lead == None { if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term) r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)

View File

@ -175,8 +175,8 @@ func testNonleaderStartElection(t *testing.T, state StateType) {
msgs := r.readMessages() msgs := r.readMessages()
sort.Sort(messageSlice(msgs)) sort.Sort(messageSlice(msgs))
wmsgs := []pb.Message{ wmsgs := []pb.Message{
{From: 1, To: 2, Term: 2, Type: pb.MsgVote, Entries: []pb.Entry{{Data: []byte(LeaderElection)}}}, {From: 1, To: 2, Term: 2, Type: pb.MsgVote},
{From: 1, To: 3, Term: 2, Type: pb.MsgVote, Entries: []pb.Entry{{Data: []byte(LeaderElection)}}}, {From: 1, To: 3, Term: 2, Type: pb.MsgVote},
} }
if !reflect.DeepEqual(msgs, wmsgs) { if !reflect.DeepEqual(msgs, wmsgs) {
t.Errorf("msgs = %v, want %v", msgs, wmsgs) t.Errorf("msgs = %v, want %v", msgs, wmsgs)

View File

@ -2217,6 +2217,7 @@ func TestLeaderTransferWithCheckQuorum(t *testing.T) {
r.checkQuorum = true r.checkQuorum = true
} }
// Letting peer 2 electionElapsed reach to timeout so that it can vote for peer 1
f := nt.peers[2].(*raft) f := nt.peers[2].(*raft)
for i := 0; i < f.electionTimeout; i++ { for i := 0; i < f.electionTimeout; i++ {
f.tick() f.tick()