diff --git a/raft/raft.go b/raft/raft.go index df2191322..9283b3f8c 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -56,9 +56,7 @@ func (st StateType) MarshalJSON() ([]byte, error) { return []byte(fmt.Sprintf("%q", st.String())), nil } -type progress struct { - match, next uint64 -} +type progress struct{ match, next uint64 } func (pr *progress) update(n uint64) { if pr.match < n { @@ -69,9 +67,7 @@ func (pr *progress) update(n uint64) { } } -func (pr *progress) optimisticUpdate(n uint64) { - pr.next = n + 1 -} +func (pr *progress) optimisticUpdate(n uint64) { pr.next = n + 1 } // maybeDecrTo returns false if the given to index comes from an out of order message. // Otherwise it decreases the progress next index and returns true. @@ -183,34 +179,15 @@ func (r *raft) softState() *SoftState { return &SoftState{Lead: r.lead, RaftState: r.state, Nodes: r.nodes()} } -func (r *raft) String() string { - s := fmt.Sprintf(`state=%v term=%d`, r.state, r.Term) - switch r.state { - case StateFollower: - s += fmt.Sprintf(" vote=%v lead=%v", r.Vote, r.lead) - case StateCandidate: - s += fmt.Sprintf(` votes="%v"`, r.votes) - case StateLeader: - s += fmt.Sprintf(` prs="%v"`, r.prs) - } - return s -} +func (r *raft) q() int { return len(r.prs)/2 + 1 } -func (r *raft) poll(id uint64, v bool) (granted int) { - if v { - log.Printf("raft: %x received vote from %x at term %d", r.id, id, r.Term) - } else { - log.Printf("raft: %x received vote rejection from %x at term %d", r.id, id, r.Term) +func (r *raft) nodes() []uint64 { + nodes := make([]uint64, 0, len(r.prs)) + for k := range r.prs { + nodes = append(nodes, k) } - if _, ok := r.votes[id]; !ok { - r.votes[id] = v - } - for _, vv := range r.votes { - if vv { - granted++ - } - } - return granted + sort.Sort(uint64Slice(nodes)) + return nodes } // send persists state to stable storage and then sends to its mailbox. @@ -323,10 +300,6 @@ func (r *raft) reset(term uint64) { r.pendingConf = false } -func (r *raft) q() int { - return len(r.prs)/2 + 1 -} - func (r *raft) appendEntry(e pb.Entry) { e.Term = r.Term e.Index = r.raftLog.lastIndex() + 1 @@ -418,6 +391,23 @@ func (r *raft) campaign() { } } +func (r *raft) poll(id uint64, v bool) (granted int) { + if v { + log.Printf("raft: %x received vote from %x at term %d", r.id, id, r.Term) + } else { + log.Printf("raft: %x received vote rejection from %x at term %d", r.id, id, r.Term) + } + if _, ok := r.votes[id]; !ok { + r.votes[id] = v + } + for _, vv := range r.votes { + if vv { + granted++ + } + } + return granted +} + func (r *raft) Step(m pb.Message) error { // TODO(bmizerany): this likely allocs - prevent that. defer func() { r.Commit = r.raftLog.committed }() @@ -449,51 +439,6 @@ func (r *raft) Step(m pb.Message) error { return nil } -func (r *raft) handleAppendEntries(m pb.Message) { - if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { - r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) - } else { - log.Printf("raft: %x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x", - r.id, r.raftLog.term(m.Index), m.Index, m.LogTerm, m.Index, m.From) - r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true}) - } -} - -func (r *raft) handleHeartbeat(m pb.Message) { - r.raftLog.commitTo(m.Commit) -} - -func (r *raft) handleSnapshot(m pb.Message) { - sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term - if r.restore(m.Snapshot) { - log.Printf("raft: %x [commit: %d] restored snapshot [index: %d, term: %d]", - r.id, r.Commit, sindex, sterm) - r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()}) - } else { - log.Printf("raft: %x [commit: %d] ignored snapshot [index: %d, term: %d]", - r.id, r.Commit, sindex, sterm) - r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed}) - } -} - -func (r *raft) resetPendingConf() { r.pendingConf = false } - -func (r *raft) addNode(id uint64) { - if _, ok := r.prs[id]; ok { - // Ignore any redundant addNode calls (which can happen because the - // initial bootstrapping entries are applied twice). - return - } - - r.setProgress(id, 0, r.raftLog.lastIndex()+1) - r.pendingConf = false -} - -func (r *raft) removeNode(id uint64) { - r.delProgress(id) - r.pendingConf = false -} - type stepFunc func(r *raft, m pb.Message) func stepLeader(r *raft, m pb.Message) { @@ -597,6 +542,33 @@ func stepFollower(r *raft, m pb.Message) { } } +func (r *raft) handleAppendEntries(m pb.Message) { + if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { + r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) + } else { + log.Printf("raft: %x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x", + r.id, r.raftLog.term(m.Index), m.Index, m.LogTerm, m.Index, m.From) + r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true}) + } +} + +func (r *raft) handleHeartbeat(m pb.Message) { + r.raftLog.commitTo(m.Commit) +} + +func (r *raft) handleSnapshot(m pb.Message) { + sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term + if r.restore(m.Snapshot) { + log.Printf("raft: %x [commit: %d] restored snapshot [index: %d, term: %d]", + r.id, r.Commit, sindex, sterm) + r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()}) + } else { + log.Printf("raft: %x [commit: %d] ignored snapshot [index: %d, term: %d]", + r.id, r.Commit, sindex, sterm) + r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed}) + } +} + // restore recovers the statemachine from a snapshot. It restores the log and the // configuration of statemachine. func (r *raft) restore(s pb.Snapshot) bool { @@ -632,15 +604,31 @@ func (r *raft) needSnapshot(i uint64) bool { return i < r.raftLog.firstIndex() } -func (r *raft) nodes() []uint64 { - nodes := make([]uint64, 0, len(r.prs)) - for k := range r.prs { - nodes = append(nodes, k) - } - sort.Sort(uint64Slice(nodes)) - return nodes +// promotable indicates whether state machine can be promoted to leader, +// which is true when its own id is in progress list. +func (r *raft) promotable() bool { + _, ok := r.prs[r.id] + return ok } +func (r *raft) addNode(id uint64) { + if _, ok := r.prs[id]; ok { + // Ignore any redundant addNode calls (which can happen because the + // initial bootstrapping entries are applied twice). + return + } + + r.setProgress(id, 0, r.raftLog.lastIndex()+1) + r.pendingConf = false +} + +func (r *raft) removeNode(id uint64) { + r.delProgress(id) + r.pendingConf = false +} + +func (r *raft) resetPendingConf() { r.pendingConf = false } + func (r *raft) setProgress(id, match, next uint64) { r.prs[id] = &progress{next: next, match: match} } @@ -649,13 +637,6 @@ func (r *raft) delProgress(id uint64) { delete(r.prs, id) } -// promotable indicates whether state machine can be promoted to leader, -// which is true when its own id is in progress list. -func (r *raft) promotable() bool { - _, ok := r.prs[r.id] - return ok -} - func (r *raft) loadState(state pb.HardState) { if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() { log.Panicf("raft: %x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())