diff --git a/raft/raft.go b/raft/raft.go index 0c6810f31..a77024181 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -49,6 +49,12 @@ var stmap = [...]string{ stateLeader: "stateLeader", } +var stepmap = [...]stepFunc{ + stateFollower: stepFollower, + stateCandidate: stepCandidate, + stateLeader: stepLeader, +} + func (st stateType) String() string { return stmap[int(st)] } @@ -241,8 +247,7 @@ func (sm *stateMachine) Msgs() []Message { } func (sm *stateMachine) Step(m Message) (ok bool) { - switch m.Type { - case msgHup: + if m.Type == msgHup { sm.becomeCandidate() if sm.q() == sm.poll(sm.id, true) { sm.becomeLeader() @@ -256,43 +261,11 @@ func (sm *stateMachine) Step(m Message) (ok bool) { sm.send(Message{To: i, Type: msgVote, Index: lasti, LogTerm: sm.log.term(lasti)}) } return true - case msgBeat: - if sm.state != stateLeader { - return true - } - sm.bcastAppend() - return - case msgProp: - if len(m.Entries) != 1 { - panic("unexpected length(entries) of a msgProp") - } - - switch sm.lead { - case sm.id: - e := m.Entries[0] - if e.isConfig() { - if sm.pendingConf { - return false - } - sm.pendingConf = true - } - e.Term = sm.term - - sm.log.append(sm.log.lastIndex(), e) - sm.ins[sm.id].update(sm.log.lastIndex()) - sm.maybeCommit() - sm.bcastAppend() - case none: - // msgProp given without leader - return false - default: - m.To = sm.lead - sm.send(m) - } - return true } switch { + case m.Term == 0: + // local message case m.Term > sm.term: sm.becomeFollower(m.Term, m.From) case m.Term < sm.term: @@ -300,61 +273,15 @@ func (sm *stateMachine) Step(m Message) (ok bool) { return true } - handleAppendEntries := func() { - if sm.log.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) { - sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.log.lastIndex()}) - } else { - sm.send(Message{To: m.From, Type: msgAppResp, Index: -1}) - } - } + return stepmap[sm.state](sm, m) +} - switch sm.state { - case stateLeader: - switch m.Type { - case msgAppResp: - if m.Index < 0 { - sm.ins[m.From].decr() - sm.sendAppend(m.From) - } else { - sm.ins[m.From].update(m.Index) - if sm.maybeCommit() { - sm.bcastAppend() - } - } - case msgVote: - sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1}) - } - case stateCandidate: - switch m.Type { - case msgApp: - sm.becomeFollower(sm.term, m.From) - handleAppendEntries() - case msgVote: - sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1}) - case msgVoteResp: - gr := sm.poll(m.From, m.Index >= 0) - switch sm.q() { - case gr: - sm.becomeLeader() - sm.bcastAppend() - case len(sm.votes) - gr: - sm.becomeFollower(sm.term, none) - } - } - case stateFollower: - switch m.Type { - case msgApp: - handleAppendEntries() - case msgVote: - if (sm.vote == none || sm.vote == m.From) && sm.log.isUpToDate(m.Index, m.LogTerm) { - sm.vote = m.From - sm.send(Message{To: m.From, Type: msgVoteResp, Index: sm.log.lastIndex()}) - } else { - sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1}) - } - } +func (sm *stateMachine) handleAppendEntries(m Message) { + if sm.log.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) { + sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.log.lastIndex()}) + } else { + sm.send(Message{To: m.From, Type: msgAppResp, Index: -1}) } - return true } func (sm *stateMachine) addNode(id int) { @@ -366,3 +293,85 @@ func (sm *stateMachine) removeNode(id int) { delete(sm.ins, id) sm.pendingConf = false } + +type stepFunc func(sm *stateMachine, m Message) bool + +func stepLeader(sm *stateMachine, m Message) bool { + switch m.Type { + case msgBeat: + sm.bcastAppend() + case msgProp: + if len(m.Entries) != 1 { + panic("unexpected length(entries) of a msgProp") + } + e := m.Entries[0] + if e.isConfig() { + if sm.pendingConf { + return false + } + sm.pendingConf = true + } + e.Term = sm.term + + sm.log.append(sm.log.lastIndex(), e) + sm.ins[sm.id].update(sm.log.lastIndex()) + sm.maybeCommit() + sm.bcastAppend() + case msgAppResp: + if m.Index < 0 { + sm.ins[m.From].decr() + sm.sendAppend(m.From) + } else { + sm.ins[m.From].update(m.Index) + if sm.maybeCommit() { + sm.bcastAppend() + } + } + case msgVote: + sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1}) + } + return true +} + +func stepCandidate(sm *stateMachine, m Message) bool { + switch m.Type { + case msgProp: + return false + case msgApp: + sm.becomeFollower(sm.term, m.From) + sm.handleAppendEntries(m) + case msgVote: + sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1}) + case msgVoteResp: + gr := sm.poll(m.From, m.Index >= 0) + switch sm.q() { + case gr: + sm.becomeLeader() + sm.bcastAppend() + case len(sm.votes) - gr: + sm.becomeFollower(sm.term, none) + } + } + return true +} + +func stepFollower(sm *stateMachine, m Message) bool { + switch m.Type { + case msgProp: + if sm.lead == none { + return false + } + m.To = sm.lead + sm.send(m) + case msgApp: + sm.handleAppendEntries(m) + case msgVote: + if (sm.vote == none || sm.vote == m.From) && sm.log.isUpToDate(m.Index, m.LogTerm) { + sm.vote = m.From + sm.send(Message{To: m.From, Type: msgVoteResp, Index: sm.log.lastIndex()}) + } else { + sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1}) + } + } + return true +}