Merge pull request #1889 from xiang90/chord_raft

Chord raft
This commit is contained in:
Xiang Li 2014-12-08 16:35:42 -08:00
commit 325e768c7b

View File

@ -56,9 +56,7 @@ func (st StateType) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf("%q", st.String())), nil
}
// progress holds the two log indexes kept for a single node; instances are
// stored per node id in raft.prs (see setProgress).
type progress struct {
	// match is only ever raised by update, never lowered.
	// NOTE(review): presumably the highest index known to be replicated on
	// the node — confirm against the callers of update.
	match uint64
	// next is set to n+1 by optimisticUpdate.
	// NOTE(review): presumably the index of the next entry to send — confirm.
	next uint64
}
// progress holds the two log indexes kept for a single node; instances are
// stored per node id in raft.prs (see setProgress).
type progress struct {
	// match is only ever raised by update, never lowered.
	// NOTE(review): presumably the highest index known to be replicated on
	// the node — confirm against the callers of update.
	match uint64
	// next is set to n+1 by optimisticUpdate.
	// NOTE(review): presumably the index of the next entry to send — confirm.
	next uint64
}
func (pr *progress) update(n uint64) {
if pr.match < n {
@ -69,9 +67,7 @@ func (pr *progress) update(n uint64) {
}
}
// optimisticUpdate moves next to the index immediately after n. It does not
// touch match.
func (pr *progress) optimisticUpdate(n uint64) {
	following := n + 1
	pr.next = following
}
// optimisticUpdate moves next to the index immediately after n. It does not
// touch match.
func (pr *progress) optimisticUpdate(n uint64) {
	following := n + 1
	pr.next = following
}
// maybeDecrTo returns false if the given to index comes from an out of order message.
// Otherwise it decreases the progress next index and returns true.
@ -183,34 +179,15 @@ func (r *raft) softState() *SoftState {
return &SoftState{Lead: r.lead, RaftState: r.state, Nodes: r.nodes()}
}
// String renders a one-line summary of the raft instance: always the state and
// term, followed by state-specific detail (vote and lead for a follower, the
// votes map for a candidate, the prs map for a leader). Any other state gets
// no extra detail.
func (r *raft) String() string {
	summary := fmt.Sprintf(`state=%v term=%d`, r.state, r.Term)

	var detail string
	switch r.state {
	case StateFollower:
		detail = fmt.Sprintf(" vote=%v lead=%v", r.Vote, r.lead)
	case StateCandidate:
		detail = fmt.Sprintf(` votes="%v"`, r.votes)
	case StateLeader:
		detail = fmt.Sprintf(` prs="%v"`, r.prs)
	}
	return summary + detail
}
// q returns the quorum size: one more than half (rounded down) of the number
// of entries in r.prs, i.e. a strict majority.
func (r *raft) q() int {
	members := len(r.prs)
	return members/2 + 1
}
func (r *raft) poll(id uint64, v bool) (granted int) {
if v {
log.Printf("raft: %x received vote from %x at term %d", r.id, id, r.Term)
} else {
log.Printf("raft: %x received vote rejection from %x at term %d", r.id, id, r.Term)
func (r *raft) nodes() []uint64 {
nodes := make([]uint64, 0, len(r.prs))
for k := range r.prs {
nodes = append(nodes, k)
}
if _, ok := r.votes[id]; !ok {
r.votes[id] = v
}
for _, vv := range r.votes {
if vv {
granted++
}
}
return granted
sort.Sort(uint64Slice(nodes))
return nodes
}
// send persists state to stable storage and then sends to its mailbox.
@ -323,10 +300,6 @@ func (r *raft) reset(term uint64) {
r.pendingConf = false
}
// q returns the quorum size: one more than half (rounded down) of the number
// of entries in r.prs, i.e. a strict majority.
func (r *raft) q() int {
	members := len(r.prs)
	return members/2 + 1
}
func (r *raft) appendEntry(e pb.Entry) {
e.Term = r.Term
e.Index = r.raftLog.lastIndex() + 1
@ -418,6 +391,23 @@ func (r *raft) campaign() {
}
}
// poll logs the vote (or rejection) received from id, records it in r.votes
// unless a vote from id is already recorded, and returns how many entries in
// r.votes are currently grants.
func (r *raft) poll(id uint64, v bool) (granted int) {
	if !v {
		log.Printf("raft: %x received vote rejection from %x at term %d", r.id, id, r.Term)
	} else {
		log.Printf("raft: %x received vote from %x at term %d", r.id, id, r.Term)
	}

	// The first answer from a node wins; a repeated answer is logged above
	// but does not overwrite the recorded one.
	if _, recorded := r.votes[id]; !recorded {
		r.votes[id] = v
	}

	yes := 0
	for _, grant := range r.votes {
		if grant {
			yes++
		}
	}
	return yes
}
func (r *raft) Step(m pb.Message) error {
// TODO(bmizerany): this likely allocs - prevent that.
defer func() { r.Commit = r.raftLog.committed }()
@ -449,51 +439,6 @@ func (r *raft) Step(m pb.Message) error {
return nil
}
// handleAppendEntries offers the entries carried by m to the local log via
// raftLog.maybeAppend and answers the sender with a MsgAppResp. On success the
// response carries the index returned by maybeAppend; otherwise the rejection
// is logged and the response echoes m.Index with Reject set.
func (r *raft) handleAppendEntries(m pb.Message) {
	lastIdx, accepted := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...)
	if accepted {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: lastIdx})
		return
	}

	log.Printf("raft: %x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
		r.id, r.raftLog.term(m.Index), m.Index, m.LogTerm, m.Index, m.From)
	r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true})
}
// handleHeartbeat forwards the commit index carried by m to raftLog.commitTo.
// No response is sent from here.
func (r *raft) handleHeartbeat(m pb.Message) {
	commit := m.Commit
	r.raftLog.commitTo(commit)
}
// handleSnapshot tries to restore the snapshot carried by m and replies to the
// sender with a MsgAppResp. When restore reports success the reply carries the
// log's last index; when the snapshot is ignored it carries the log's
// committed index. Either outcome is logged.
func (r *raft) handleSnapshot(m pb.Message) {
	// Read the metadata before calling restore, as the log lines below use it.
	sindex := m.Snapshot.Metadata.Index
	sterm := m.Snapshot.Metadata.Term

	if !r.restore(m.Snapshot) {
		log.Printf("raft: %x [commit: %d] ignored snapshot [index: %d, term: %d]",
			r.id, r.Commit, sindex, sterm)
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
		return
	}

	log.Printf("raft: %x [commit: %d] restored snapshot [index: %d, term: %d]",
		r.id, r.Commit, sindex, sterm)
	r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
}
// resetPendingConf clears the pendingConf flag.
func (r *raft) resetPendingConf() {
	r.pendingConf = false
}
// addNode starts tracking progress for id, with match 0 and next set to one
// past the log's last index, and clears the pendingConf flag. If id is already
// tracked the call does nothing at all (pendingConf is left untouched).
func (r *raft) addNode(id uint64) {
	_, known := r.prs[id]
	if known {
		// Ignore any redundant addNode calls (which can happen because the
		// initial bootstrapping entries are applied twice).
		return
	}

	next := r.raftLog.lastIndex() + 1
	r.setProgress(id, 0, next)
	r.pendingConf = false
}
// removeNode drops the progress entry for id via delProgress and then clears
// the pendingConf flag.
func (r *raft) removeNode(id uint64) {
	r.delProgress(id)

	r.pendingConf = false
}
// stepFunc is the signature shared by the message-handling functions in this
// file, such as stepLeader and stepFollower: each receives the raft instance
// and the message to process.
type stepFunc func(r *raft, m pb.Message)
func stepLeader(r *raft, m pb.Message) {
@ -597,6 +542,33 @@ func stepFollower(r *raft, m pb.Message) {
}
}
// handleAppendEntries offers the entries carried by m to the local log via
// raftLog.maybeAppend and answers the sender with a MsgAppResp. On success the
// response carries the index returned by maybeAppend; otherwise the rejection
// is logged and the response echoes m.Index with Reject set.
func (r *raft) handleAppendEntries(m pb.Message) {
	lastIdx, accepted := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...)
	if accepted {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: lastIdx})
		return
	}

	log.Printf("raft: %x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
		r.id, r.raftLog.term(m.Index), m.Index, m.LogTerm, m.Index, m.From)
	r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true})
}
// handleHeartbeat forwards the commit index carried by m to raftLog.commitTo.
// No response is sent from here.
func (r *raft) handleHeartbeat(m pb.Message) {
	commit := m.Commit
	r.raftLog.commitTo(commit)
}
// handleSnapshot tries to restore the snapshot carried by m and replies to the
// sender with a MsgAppResp. When restore reports success the reply carries the
// log's last index; when the snapshot is ignored it carries the log's
// committed index. Either outcome is logged.
func (r *raft) handleSnapshot(m pb.Message) {
	// Read the metadata before calling restore, as the log lines below use it.
	sindex := m.Snapshot.Metadata.Index
	sterm := m.Snapshot.Metadata.Term

	if !r.restore(m.Snapshot) {
		log.Printf("raft: %x [commit: %d] ignored snapshot [index: %d, term: %d]",
			r.id, r.Commit, sindex, sterm)
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
		return
	}

	log.Printf("raft: %x [commit: %d] restored snapshot [index: %d, term: %d]",
		r.id, r.Commit, sindex, sterm)
	r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
}
// restore recovers the statemachine from a snapshot. It restores the log and the
// configuration of statemachine.
func (r *raft) restore(s pb.Snapshot) bool {
@ -632,15 +604,31 @@ func (r *raft) needSnapshot(i uint64) bool {
return i < r.raftLog.firstIndex()
}
func (r *raft) nodes() []uint64 {
nodes := make([]uint64, 0, len(r.prs))
for k := range r.prs {
nodes = append(nodes, k)
}
sort.Sort(uint64Slice(nodes))
return nodes
// promotable reports whether this state machine may be promoted to leader.
// That is the case exactly when its own id has an entry in the progress map.
func (r *raft) promotable() bool {
	if _, present := r.prs[r.id]; present {
		return true
	}
	return false
}
// addNode starts tracking progress for id, with match 0 and next set to one
// past the log's last index, and clears the pendingConf flag. If id is already
// tracked the call does nothing at all (pendingConf is left untouched).
func (r *raft) addNode(id uint64) {
	_, known := r.prs[id]
	if known {
		// Ignore any redundant addNode calls (which can happen because the
		// initial bootstrapping entries are applied twice).
		return
	}

	next := r.raftLog.lastIndex() + 1
	r.setProgress(id, 0, next)
	r.pendingConf = false
}
// removeNode drops the progress entry for id via delProgress and then clears
// the pendingConf flag.
func (r *raft) removeNode(id uint64) {
	r.delProgress(id)

	r.pendingConf = false
}
// resetPendingConf clears the pendingConf flag.
func (r *raft) resetPendingConf() {
	r.pendingConf = false
}
// setProgress stores a freshly allocated progress with the given match and
// next indexes under id in r.prs, replacing any entry already there.
func (r *raft) setProgress(id, match, next uint64) {
	entry := &progress{match: match, next: next}
	r.prs[id] = entry
}
@ -649,13 +637,6 @@ func (r *raft) delProgress(id uint64) {
delete(r.prs, id)
}
// promotable reports whether this state machine may be promoted to leader.
// That is the case exactly when its own id has an entry in the progress map.
func (r *raft) promotable() bool {
	if _, present := r.prs[r.id]; present {
		return true
	}
	return false
}
func (r *raft) loadState(state pb.HardState) {
if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
log.Panicf("raft: %x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())