raft: group step funcs

This commit is contained in:
Xiang Li 2014-12-08 15:29:54 -08:00
parent 099f4f10ea
commit ba45637ba3

View File

@ -439,33 +439,6 @@ func (r *raft) Step(m pb.Message) error {
return nil
}
func (r *raft) handleAppendEntries(m pb.Message) {
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
log.Printf("raft: %x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.term(m.Index), m.Index, m.LogTerm, m.Index, m.From)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true})
}
}
func (r *raft) handleHeartbeat(m pb.Message) {
r.raftLog.commitTo(m.Commit)
}
func (r *raft) handleSnapshot(m pb.Message) {
sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
if r.restore(m.Snapshot) {
log.Printf("raft: %x [commit: %d] restored snapshot [index: %d, term: %d]",
r.id, r.Commit, sindex, sterm)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
} else {
log.Printf("raft: %x [commit: %d] ignored snapshot [index: %d, term: %d]",
r.id, r.Commit, sindex, sterm)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
}
}
type stepFunc func(r *raft, m pb.Message)
func stepLeader(r *raft, m pb.Message) {
@ -569,6 +542,33 @@ func stepFollower(r *raft, m pb.Message) {
}
}
func (r *raft) handleAppendEntries(m pb.Message) {
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
log.Printf("raft: %x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.term(m.Index), m.Index, m.LogTerm, m.Index, m.From)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true})
}
}
func (r *raft) handleHeartbeat(m pb.Message) {
r.raftLog.commitTo(m.Commit)
}
func (r *raft) handleSnapshot(m pb.Message) {
sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
if r.restore(m.Snapshot) {
log.Printf("raft: %x [commit: %d] restored snapshot [index: %d, term: %d]",
r.id, r.Commit, sindex, sterm)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
} else {
log.Printf("raft: %x [commit: %d] ignored snapshot [index: %d, term: %d]",
r.id, r.Commit, sindex, sterm)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
}
}
// restore recovers the statemachine from a snapshot. It restores the log and the
// configuration of statemachine.
func (r *raft) restore(s pb.Snapshot) bool {