raft: Refactor vote handling

Move all vote handling from the per-state step functions to the
top-level Step(). This wasn't necessary before because MsgVote would
cause us to become a follower, but MsgPreVote needs to be handled
without changing the node's current state.
This commit is contained in:
Ben Darnell 2016-10-19 11:07:43 +08:00
parent 73cae7abd0
commit cf93a74aa8
2 changed files with 121 additions and 45 deletions

View File

@ -38,6 +38,7 @@ const (
StateCandidate
StateLeader
StatePreCandidate
numStates
)
type ReadOnlyOption int
@ -676,29 +677,7 @@ func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int) {
}
func (r *raft) Step(m pb.Message) error {
if m.Type == pb.MsgHup {
if r.state != StateLeader {
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
if err != nil {
r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
}
if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
return nil
}
r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
if r.preVote {
r.campaign(campaignPreElection)
} else {
r.campaign(campaignElection)
}
} else {
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
}
return nil
}
// Handle the message term, which may result in our stepping down to a follower.
switch {
case m.Term == 0:
// local message
@ -730,6 +709,7 @@ func (r *raft) Step(m pb.Message) error {
r.id, r.Term, m.Type, m.From, m.Term)
r.becomeFollower(m.Term, lead)
}
case m.Term < r.Term:
if r.checkQuorum && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
// We have received messages from a leader at a lower term. It is possible
@ -753,7 +733,48 @@ func (r *raft) Step(m pb.Message) error {
}
return nil
}
r.step(r, m)
switch m.Type {
case pb.MsgHup:
if r.state != StateLeader {
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
if err != nil {
r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
}
if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
return nil
}
r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
if r.preVote {
r.campaign(campaignPreElection)
} else {
r.campaign(campaignElection)
}
} else {
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
}
case pb.MsgVote, pb.MsgPreVote:
if (r.Vote == None || m.Term > r.Term || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Type: voteRespMsgType(m.Type)})
if m.Type == pb.MsgVote {
// Only record real votes.
r.electionElapsed = 0
r.Vote = m.From
}
} else {
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Type: voteRespMsgType(m.Type), Reject: true})
}
default:
r.step(r, m)
}
return nil
}
@ -797,11 +818,6 @@ func stepLeader(r *raft, m pb.Message) {
r.appendEntry(m.Entries...)
r.bcastAppend()
return
case pb.MsgVote, pb.MsgPreVote:
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Type: voteRespMsgType(m.Type), Reject: true})
return
case pb.MsgReadIndex:
if r.quorum() > 1 {
// thinking: use an interally defined context instead of the user given context.
@ -983,10 +999,6 @@ func stepCandidate(r *raft, m pb.Message) {
case pb.MsgSnap:
r.becomeFollower(m.Term, m.From)
r.handleSnapshot(m)
case pb.MsgVote, pb.MsgPreVote:
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Type: voteRespMsgType(m.Type), Reject: true})
case myVoteRespType:
gr := r.poll(m.From, m.Type, !m.Reject)
r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr)
@ -1028,18 +1040,6 @@ func stepFollower(r *raft, m pb.Message) {
r.electionElapsed = 0
r.lead = m.From
r.handleSnapshot(m)
case pb.MsgVote, pb.MsgPreVote:
if (r.Vote == None || m.Term > r.Term || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
r.electionElapsed = 0
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
r.Vote = m.From
r.send(pb.Message{To: m.From, Type: voteRespMsgType(m.Type)})
} else {
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Type: voteRespMsgType(m.Type), Reject: true})
}
case pb.MsgTransferLeader:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)

View File

@ -366,6 +366,82 @@ func TestLeaderCycle(t *testing.T) {
}
}
func TestVoteFromAnyState(t *testing.T) {
for _, vt := range []pb.MessageType{pb.MsgVote, pb.MsgPreVote} {
for st := StateType(0); st < numStates; st++ {
r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
r.Term = 1
switch st {
case StateFollower:
r.becomeFollower(r.Term, 3)
case StatePreCandidate:
r.becomePreCandidate()
case StateCandidate:
r.becomeCandidate()
case StateLeader:
r.becomeCandidate()
r.becomeLeader()
}
// Note that setting our state above may have advanced r.Term
// past its initial value.
origTerm := r.Term
newTerm := r.Term + 1
msg := pb.Message{
From: 2,
To: 1,
Type: vt,
Term: newTerm,
LogTerm: newTerm,
Index: 42,
}
if err := r.Step(msg); err != nil {
t.Errorf("%s,%s: Step failed: %s", vt, st, err)
}
if len(r.msgs) != 1 {
t.Errorf("%s,%s: %d response messages, want 1: %+v", vt, st, len(r.msgs), r.msgs)
} else {
resp := r.msgs[0]
if resp.Type != voteRespMsgType(vt) {
t.Errorf("%s,%s: response message is %s, want %s",
vt, st, resp.Type, voteRespMsgType(vt))
}
if resp.Reject {
t.Errorf("%s,%s: unexpected rejection", vt, st)
}
}
// If this was a real vote, we reset our state and term.
if vt == pb.MsgVote {
if r.state != StateFollower {
t.Errorf("%s,%s: state %s, want %s", vt, StateFollower, r.state, st)
}
if r.Term != newTerm {
t.Errorf("%s,%s: term %d, want %d", vt, st, r.Term, newTerm)
}
if r.Vote != 2 {
t.Errorf("%s,%s: vote %d, want 2", vt, st, r.Vote)
}
} else {
// In a prevote, nothing changes.
if r.state != st {
t.Errorf("%s,%s: state %s, want %s", vt, st, r.state, st)
}
if r.Term != origTerm {
t.Errorf("%s,%s: term %d, want %d", vt, st, r.Term, origTerm)
}
// if st == StateFollower or StatePreCandidate, r hasn't voted yet.
// In StateCandidate or StateLeader, it's voted for itself.
if r.Vote != None && r.Vote != 1 {
t.Errorf("%s,%s: vote %d, want %d or 1", vt, st, r.Vote, None)
}
}
}
}
}
func TestLogReplication(t *testing.T) {
tests := []struct {
*network