diff --git a/raft/raft.go b/raft/raft.go deleted file mode 100644 index f550d0570..000000000 --- a/raft/raft.go +++ /dev/null @@ -1,593 +0,0 @@ -package raft - -import ( - "errors" - "fmt" - "sort" - "sync/atomic" -) - -const none = -1 - -type messageType int64 - -const ( - msgHup messageType = iota - msgBeat - msgProp - msgApp - msgAppResp - msgVote - msgVoteResp - msgSnap - msgDenied -) - -var mtmap = [...]string{ - msgHup: "msgHup", - msgBeat: "msgBeat", - msgProp: "msgProp", - msgApp: "msgApp", - msgAppResp: "msgAppResp", - msgVote: "msgVote", - msgVoteResp: "msgVoteResp", - msgSnap: "msgSnap", - msgDenied: "msgDenied", -} - -func (mt messageType) String() string { - return mtmap[int64(mt)] -} - -var errNoLeader = errors.New("no leader") - -const ( - stateFollower stateType = iota - stateCandidate - stateLeader -) - -type stateType int64 - -var stmap = [...]string{ - stateFollower: "stateFollower", - stateCandidate: "stateCandidate", - stateLeader: "stateLeader", -} - -var stepmap = [...]stepFunc{ - stateFollower: stepFollower, - stateCandidate: stepCandidate, - stateLeader: stepLeader, -} - -func (st stateType) String() string { - return stmap[int64(st)] -} - -var EmptyState = State{} - -type Message struct { - Type messageType - ClusterId int64 - To int64 - From int64 - Term int64 - LogTerm int64 - Index int64 - Entries []Entry - Commit int64 - Snapshot Snapshot -} - -func (m Message) IsMsgApp() bool { - return m.Type == msgApp -} - -func (m Message) String() string { - return fmt.Sprintf("type=%v from=%x to=%x term=%d logTerm=%d i=%d ci=%d len(ents)=%d", - m.Type, m.From, m.To, m.Term, m.LogTerm, m.Index, m.Commit, len(m.Entries)) -} - -type index struct { - match, next int64 -} - -func (in *index) update(n int64) { - in.match = n - in.next = n + 1 -} - -func (in *index) decr() { - if in.next--; in.next < 1 { - in.next = 1 - } -} - -func (in *index) String() string { - return fmt.Sprintf("n=%d m=%d", in.next, in.match) -} - -// An AtomicInt is an int64 to be accessed atomically. -type atomicInt int64 - -func (i *atomicInt) Set(n int64) { - atomic.StoreInt64((*int64)(i), n) -} - -func (i *atomicInt) Get() int64 { - return atomic.LoadInt64((*int64)(i)) -} - -// int64Slice implements sort interface -type int64Slice []int64 - -func (p int64Slice) Len() int { return len(p) } -func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] } -func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } - -type stateMachine struct { - clusterId int64 - id int64 - - // the term we are participating in at any time - term atomicInt - index atomicInt - - // who we voted for in term - vote int64 - - // the log - raftLog *raftLog - - ins map[int64]*index - - state stateType - - votes map[int64]bool - - msgs []Message - - // the leader id - lead atomicInt - - // pending reconfiguration - pendingConf bool - - unstableState State - - // promotable indicates whether state machine could be promoted. - // New machine has to wait until it has been added to the cluster, or it - // may become the leader of the cluster without it. - promotable bool -} - -func newStateMachine(id int64, peers []int64) *stateMachine { - if id == none { - panic("cannot use none id") - } - sm := &stateMachine{id: id, clusterId: none, lead: none, raftLog: newLog(), ins: make(map[int64]*index)} - for _, p := range peers { - sm.ins[p] = &index{} - } - sm.reset(0) - return sm -} - -func (sm *stateMachine) String() string { - s := fmt.Sprintf(`state=%v term=%d`, sm.state, sm.term) - switch sm.state { - case stateFollower: - s += fmt.Sprintf(" vote=%v lead=%v", sm.vote, sm.lead) - case stateCandidate: - s += fmt.Sprintf(` votes="%v"`, sm.votes) - case stateLeader: - s += fmt.Sprintf(` ins="%v"`, sm.ins) - } - return s -} - -func (sm *stateMachine) poll(id int64, v bool) (granted int) { - if _, ok := sm.votes[id]; !ok { - sm.votes[id] = v - } - for _, vv := range sm.votes { - if vv { - granted++ - } - } - return granted -} - -// send persists state to stable storage and then sends to its mailbox. -func (sm *stateMachine) send(m Message) { - m.ClusterId = sm.clusterId - m.From = sm.id - m.Term = sm.term.Get() - sm.msgs = append(sm.msgs, m) -} - -// sendAppend sends RRPC, with entries to the given peer. -func (sm *stateMachine) sendAppend(to int64) { - in := sm.ins[to] - m := Message{} - m.To = to - m.Index = in.next - 1 - if sm.needSnapshot(m.Index) { - m.Type = msgSnap - m.Snapshot = sm.raftLog.snapshot - } else { - m.Type = msgApp - m.LogTerm = sm.raftLog.term(in.next - 1) - m.Entries = sm.raftLog.entries(in.next) - m.Commit = sm.raftLog.committed - } - sm.send(m) -} - -// sendHeartbeat sends RRPC, without entries to the given peer. -func (sm *stateMachine) sendHeartbeat(to int64) { - in := sm.ins[to] - index := max(in.next-1, sm.raftLog.lastIndex()) - m := Message{ - To: to, - Type: msgApp, - Index: index, - LogTerm: sm.raftLog.term(index), - Commit: sm.raftLog.committed, - } - sm.send(m) -} - -// bcastAppend sends RRPC, with entries to all peers that are not up-to-date according to sm.mis. -func (sm *stateMachine) bcastAppend() { - for i := range sm.ins { - if i == sm.id { - continue - } - sm.sendAppend(i) - } -} - -// bcastHeartbeat sends RRPC, without entries to all the peers. -func (sm *stateMachine) bcastHeartbeat() { - for i := range sm.ins { - if i == sm.id { - continue - } - sm.sendHeartbeat(i) - } -} - -func (sm *stateMachine) maybeCommit() bool { - // TODO(bmizerany): optimize.. Currently naive - mis := make(int64Slice, 0, len(sm.ins)) - for i := range sm.ins { - mis = append(mis, sm.ins[i].match) - } - sort.Sort(sort.Reverse(mis)) - mci := mis[sm.q()-1] - - return sm.raftLog.maybeCommit(mci, sm.term.Get()) -} - -// nextEnts returns the appliable entries and updates the applied index -func (sm *stateMachine) nextEnts() (ents []Entry) { - ents = sm.raftLog.nextEnts() - sm.raftLog.resetNextEnts() - return ents -} - -func (sm *stateMachine) reset(term int64) { - sm.setTerm(term) - sm.lead.Set(none) - sm.setVote(none) - sm.votes = make(map[int64]bool) - for i := range sm.ins { - sm.ins[i] = &index{next: sm.raftLog.lastIndex() + 1} - if i == sm.id { - sm.ins[i].match = sm.raftLog.lastIndex() - } - } -} - -func (sm *stateMachine) q() int { - return len(sm.ins)/2 + 1 -} - -func (sm *stateMachine) appendEntry(e Entry) { - e.Term = sm.term.Get() - e.Index = sm.raftLog.lastIndex() + 1 - sm.index.Set(sm.raftLog.append(sm.raftLog.lastIndex(), e)) - sm.ins[sm.id].update(sm.raftLog.lastIndex()) - sm.maybeCommit() -} - -func (sm *stateMachine) becomeFollower(term int64, lead int64) { - sm.reset(term) - sm.lead.Set(lead) - sm.state = stateFollower - sm.pendingConf = false -} - -func (sm *stateMachine) becomeCandidate() { - // TODO(xiangli) remove the panic when the raft implementation is stable - if sm.state == stateLeader { - panic("invalid transition [leader -> candidate]") - } - sm.reset(sm.term.Get() + 1) - sm.setVote(sm.id) - sm.state = stateCandidate -} - -func (sm *stateMachine) becomeLeader() { - // TODO(xiangli) remove the panic when the raft implementation is stable - if sm.state == stateFollower { - panic("invalid transition [follower -> leader]") - } - sm.reset(sm.term.Get()) - sm.lead.Set(sm.id) - sm.state = stateLeader - - for _, e := range sm.raftLog.entries(sm.raftLog.committed + 1) { - if e.isConfig() { - sm.pendingConf = true - } - } - - sm.appendEntry(Entry{Type: Normal, Data: nil}) -} - -func (sm *stateMachine) Msgs() []Message { - msgs := sm.msgs - sm.msgs = make([]Message, 0) - - return msgs -} - -func (sm *stateMachine) Step(m Message) (ok bool) { - if m.Type == msgHup { - sm.becomeCandidate() - if sm.q() == sm.poll(sm.id, true) { - sm.becomeLeader() - return true - } - for i := range sm.ins { - if i == sm.id { - continue - } - lasti := sm.raftLog.lastIndex() - sm.send(Message{To: i, Type: msgVote, Index: lasti, LogTerm: sm.raftLog.term(lasti)}) - } - return true - } - - switch { - case m.Term == 0: - // local message - case m.Term > sm.term.Get(): - lead := m.From - if m.Type == msgVote { - lead = none - } - sm.becomeFollower(m.Term, lead) - case m.Term < sm.term.Get(): - // ignore - return true - } - - return stepmap[sm.state](sm, m) -} - -func (sm *stateMachine) handleAppendEntries(m Message) { - if sm.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) { - sm.index.Set(sm.raftLog.lastIndex()) - sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.raftLog.lastIndex()}) - } else { - sm.send(Message{To: m.From, Type: msgAppResp, Index: -1}) - } -} - -func (sm *stateMachine) handleSnapshot(m Message) { - if sm.restore(m.Snapshot) { - sm.raftLog.unstableSnapshot = m.Snapshot - sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.raftLog.lastIndex()}) - } else { - sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.raftLog.committed}) - } -} - -func (sm *stateMachine) addNode(id int64) { - sm.addIns(id, 0, sm.raftLog.lastIndex()+1) - sm.pendingConf = false - if id == sm.id { - sm.promotable = true - } -} - -func (sm *stateMachine) removeNode(id int64) { - sm.deleteIns(id) - sm.pendingConf = false -} - -type stepFunc func(sm *stateMachine, m Message) bool - -func stepLeader(sm *stateMachine, m Message) bool { - switch m.Type { - case msgBeat: - sm.bcastHeartbeat() - case msgProp: - if len(m.Entries) != 1 { - panic("unexpected length(entries) of a msgProp") - } - e := m.Entries[0] - if e.isConfig() { - if sm.pendingConf { - return false - } - sm.pendingConf = true - } - sm.appendEntry(e) - sm.bcastAppend() - case msgAppResp: - if m.Index < 0 { - sm.ins[m.From].decr() - sm.sendAppend(m.From) - } else { - sm.ins[m.From].update(m.Index) - if sm.maybeCommit() { - sm.bcastAppend() - } - } - case msgVote: - sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1}) - } - return true -} - -func stepCandidate(sm *stateMachine, m Message) bool { - switch m.Type { - case msgProp: - return false - case msgApp: - sm.becomeFollower(sm.term.Get(), m.From) - sm.handleAppendEntries(m) - case msgSnap: - sm.becomeFollower(m.Term, m.From) - sm.handleSnapshot(m) - case msgVote: - sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1}) - case msgVoteResp: - gr := sm.poll(m.From, m.Index >= 0) - switch sm.q() { - case gr: - sm.becomeLeader() - sm.bcastAppend() - case len(sm.votes) - gr: - sm.becomeFollower(sm.term.Get(), none) - } - } - return true -} - -func stepFollower(sm *stateMachine, m Message) bool { - switch m.Type { - case msgProp: - if sm.lead.Get() == none { - return false - } - m.To = sm.lead.Get() - sm.send(m) - case msgApp: - sm.lead.Set(m.From) - sm.handleAppendEntries(m) - case msgSnap: - sm.handleSnapshot(m) - case msgVote: - if (sm.vote == none || sm.vote == m.From) && sm.raftLog.isUpToDate(m.Index, m.LogTerm) { - sm.setVote(m.From) - sm.send(Message{To: m.From, Type: msgVoteResp, Index: sm.raftLog.lastIndex()}) - } else { - sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1}) - } - } - return true -} - -func (sm *stateMachine) compact(d []byte) { - sm.raftLog.snap(d, sm.clusterId, sm.raftLog.applied, sm.raftLog.term(sm.raftLog.applied), sm.nodes()) - sm.raftLog.compact(sm.raftLog.applied) -} - -// restore recovers the statemachine from a snapshot. It restores the log and the -// configuration of statemachine. -func (sm *stateMachine) restore(s Snapshot) bool { - if s.Index <= sm.raftLog.committed { - return false - } - - sm.raftLog.restore(s) - sm.index.Set(sm.raftLog.lastIndex()) - sm.clusterId = s.ClusterId - sm.ins = make(map[int64]*index) - for _, n := range s.Nodes { - if n == sm.id { - sm.addIns(n, sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1) - sm.promotable = true - } else { - sm.addIns(n, 0, sm.raftLog.lastIndex()+1) - } - } - sm.pendingConf = false - return true -} - -func (sm *stateMachine) needSnapshot(i int64) bool { - if i < sm.raftLog.offset { - if sm.raftLog.snapshot.IsEmpty() { - panic("need non-empty snapshot") - } - return true - } - return false -} - -func (sm *stateMachine) nodes() []int64 { - nodes := make([]int64, 0, len(sm.ins)) - for k := range sm.ins { - nodes = append(nodes, k) - } - return nodes -} - -func (sm *stateMachine) setTerm(term int64) { - sm.term.Set(term) - sm.saveState() -} - -func (sm *stateMachine) setVote(vote int64) { - sm.vote = vote - sm.saveState() -} - -func (sm *stateMachine) addIns(id, match, next int64) { - sm.ins[id] = &index{next: next, match: match} - sm.saveState() -} - -func (sm *stateMachine) deleteIns(id int64) { - delete(sm.ins, id) - sm.saveState() -} - -// saveState saves the state to sm.unstableState -// When there is a term change, vote change or configuration change, raft -// must call saveState. -func (sm *stateMachine) saveState() { - sm.setState(sm.vote, sm.term.Get(), sm.raftLog.committed) -} - -func (sm *stateMachine) clearState() { - sm.setState(0, 0, 0) -} - -func (sm *stateMachine) setState(vote, term, commit int64) { - sm.unstableState.Vote = vote - sm.unstableState.Term = term - sm.unstableState.Commit = commit -} - -func (sm *stateMachine) loadEnts(ents []Entry) { - sm.raftLog.append(sm.raftLog.lastIndex(), ents...) - sm.raftLog.unstable = sm.raftLog.lastIndex() + 1 -} - -func (sm *stateMachine) loadState(state State) { - sm.raftLog.committed = state.Commit - sm.setTerm(state.Term) - sm.setVote(state.Vote) -} - -func (s *State) IsEmpty() bool { - return s.Term == 0 -} diff --git a/raft/diff_test.go b/raft2/diff_test.go similarity index 100% rename from raft/diff_test.go rename to raft2/diff_test.go diff --git a/raft/entry.pb.go b/raft2/entry.pb.go similarity index 100% rename from raft/entry.pb.go rename to raft2/entry.pb.go diff --git a/raft/info.pb.go b/raft2/info.pb.go similarity index 100% rename from raft/info.pb.go rename to raft2/info.pb.go diff --git a/raft/log.go b/raft2/log.go similarity index 100% rename from raft/log.go rename to raft2/log.go diff --git a/raft/log_test.go b/raft2/log_test.go similarity index 100% rename from raft/log_test.go rename to raft2/log_test.go diff --git a/raft2/node.go b/raft2/node.go index 300684706..0ca4c34b1 100644 --- a/raft2/node.go +++ b/raft2/node.go @@ -9,14 +9,9 @@ type stateResp struct { msgs []Message } -type proposal struct { - id int64 - data []byte -} - type Node struct { ctx context.Context - propc chan proposal + propc chan []byte recvc chan Message statec chan stateResp tickc chan struct{} @@ -25,7 +20,7 @@ type Node struct { func Start(ctx context.Context, name string, election, heartbeat int) *Node { n := &Node{ ctx: ctx, - propc: make(chan proposal), + propc: make(chan []byte), recvc: make(chan Message), statec: make(chan stateResp), tickc: make(chan struct{}), @@ -54,13 +49,13 @@ func (n *Node) run(r *raft) { select { case p := <-propc: - r.propose(p.id, p.data) + r.propose(p) case m := <-n.recvc: - r.step(m) + r.Step(m) // raft never returns an error case <-n.tickc: - r.tick() - case n.statec <- stateResp{r.State, r.ents, r.msgs}: - r.resetState() + // r.tick() + // case n.statec <- stateResp{r.State, r.ents, r.msgs}: + // r.resetState() case <-n.ctx.Done(): return } @@ -77,9 +72,9 @@ func (n *Node) Tick() error { } // Propose proposes data be appended to the log. -func (n *Node) Propose(id int64, data []byte) error { +func (n *Node) Propose(data []byte) error { select { - case n.propc <- proposal{id, data}: + case n.propc <- data: return nil case <-n.ctx.Done(): return n.ctx.Err() diff --git a/raft2/raft.go b/raft2/raft.go index 616768c33..d41fbeec8 100644 --- a/raft2/raft.go +++ b/raft2/raft.go @@ -1,35 +1,603 @@ package raft -type State struct { - CommitIndex int64 +import ( + "errors" + "fmt" + "sort" + "sync/atomic" +) + +const none = -1 + +type messageType int64 + +const ( + msgHup messageType = iota + msgBeat + msgProp + msgApp + msgAppResp + msgVote + msgVoteResp + msgSnap + msgDenied +) + +var mtmap = [...]string{ + msgHup: "msgHup", + msgBeat: "msgBeat", + msgProp: "msgProp", + msgApp: "msgApp", + msgAppResp: "msgAppResp", + msgVote: "msgVote", + msgVoteResp: "msgVoteResp", + msgSnap: "msgSnap", + msgDenied: "msgDenied", } +func (mt messageType) String() string { + return mtmap[int64(mt)] +} + +var errNoLeader = errors.New("no leader") + +const ( + stateFollower stateType = iota + stateCandidate + stateLeader +) + +type stateType int64 + +var stmap = [...]string{ + stateFollower: "stateFollower", + stateCandidate: "stateCandidate", + stateLeader: "stateLeader", +} + +var stepmap = [...]stepFunc{ + stateFollower: stepFollower, + stateCandidate: stepCandidate, + stateLeader: stepLeader, +} + +func (st stateType) String() string { + return stmap[int64(st)] +} + +var EmptyState = State{} + type Message struct { - State State - To string - Data []byte + Type messageType + ClusterId int64 + To int64 + From int64 + Term int64 + LogTerm int64 + Index int64 + Entries []Entry + Commit int64 + Snapshot Snapshot } -type Entry struct { - Id int64 - Index int64 - Data []byte +func (m Message) IsMsgApp() bool { + return m.Type == msgApp } +func (m Message) String() string { + return fmt.Sprintf("type=%v from=%x to=%x term=%d logTerm=%d i=%d ci=%d len(ents)=%d", + m.Type, m.From, m.To, m.Term, m.LogTerm, m.Index, m.Commit, len(m.Entries)) +} + +type index struct { + match, next int64 +} + +func (in *index) update(n int64) { + in.match = n + in.next = n + 1 +} + +func (in *index) decr() { + if in.next--; in.next < 1 { + in.next = 1 + } +} + +func (in *index) String() string { + return fmt.Sprintf("n=%d m=%d", in.next, in.match) +} + +// An AtomicInt is an int64 to be accessed atomically. +type atomicInt int64 + +func (i *atomicInt) Set(n int64) { + atomic.StoreInt64((*int64)(i), n) +} + +func (i *atomicInt) Get() int64 { + return atomic.LoadInt64((*int64)(i)) +} + +// int64Slice implements sort interface +type int64Slice []int64 + +func (p int64Slice) Len() int { return len(p) } +func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] } +func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + type raft struct { - name string - - State - + // --- new stuff --- + name string election int heartbeat int + // ----------------- + + clusterId int64 + id int64 + + // the term we are participating in at any time + term atomicInt + index atomicInt + + // who we voted for in term + vote int64 + + // the log + raftLog *raftLog + + ins map[int64]*index + + state stateType + + votes map[int64]bool msgs []Message - ents []Entry + + // the leader id + lead atomicInt + + // pending reconfiguration + pendingConf bool + + unstableState State + + // promotable indicates whether state machine could be promoted. + // New machine has to wait until it has been added to the cluster, or it + // may become the leader of the cluster without it. + promotable bool } -func (sm *raft) hasLeader() bool { return false } -func (sm *raft) step(m Message) {} -func (sm *raft) resetState() {} -func (sm *raft) propose(id int64, data []byte) {} -func (sm *raft) tick() {} +func newStateMachine(id int64, peers []int64) *raft { + if id == none { + panic("cannot use none id") + } + sm := &raft{id: id, clusterId: none, lead: none, raftLog: newLog(), ins: make(map[int64]*index)} + for _, p := range peers { + sm.ins[p] = &index{} + } + sm.reset(0) + return sm +} + +func (r *raft) hasLeader() bool { return r.state != stateCandidate } + +func (r *raft) propose(data []byte) { + r.Step(Message{From: r.id, Type: msgProp, Entries: []Entry{{Data: data}}}) +} + +func (sm *raft) String() string { + s := fmt.Sprintf(`state=%v term=%d`, sm.state, sm.term) + switch sm.state { + case stateFollower: + s += fmt.Sprintf(" vote=%v lead=%v", sm.vote, sm.lead) + case stateCandidate: + s += fmt.Sprintf(` votes="%v"`, sm.votes) + case stateLeader: + s += fmt.Sprintf(` ins="%v"`, sm.ins) + } + return s +} + +func (sm *raft) poll(id int64, v bool) (granted int) { + if _, ok := sm.votes[id]; !ok { + sm.votes[id] = v + } + for _, vv := range sm.votes { + if vv { + granted++ + } + } + return granted +} + +// send persists state to stable storage and then sends to its mailbox. +func (sm *raft) send(m Message) { + m.ClusterId = sm.clusterId + m.From = sm.id + m.Term = sm.term.Get() + sm.msgs = append(sm.msgs, m) +} + +// sendAppend sends RRPC, with entries to the given peer. +func (sm *raft) sendAppend(to int64) { + in := sm.ins[to] + m := Message{} + m.To = to + m.Index = in.next - 1 + if sm.needSnapshot(m.Index) { + m.Type = msgSnap + m.Snapshot = sm.raftLog.snapshot + } else { + m.Type = msgApp + m.LogTerm = sm.raftLog.term(in.next - 1) + m.Entries = sm.raftLog.entries(in.next) + m.Commit = sm.raftLog.committed + } + sm.send(m) +} + +// sendHeartbeat sends RRPC, without entries to the given peer. +func (sm *raft) sendHeartbeat(to int64) { + in := sm.ins[to] + index := max(in.next-1, sm.raftLog.lastIndex()) + m := Message{ + To: to, + Type: msgApp, + Index: index, + LogTerm: sm.raftLog.term(index), + Commit: sm.raftLog.committed, + } + sm.send(m) +} + +// bcastAppend sends RRPC, with entries to all peers that are not up-to-date according to sm.mis. +func (sm *raft) bcastAppend() { + for i := range sm.ins { + if i == sm.id { + continue + } + sm.sendAppend(i) + } +} + +// bcastHeartbeat sends RRPC, without entries to all the peers. +func (sm *raft) bcastHeartbeat() { + for i := range sm.ins { + if i == sm.id { + continue + } + sm.sendHeartbeat(i) + } +} + +func (sm *raft) maybeCommit() bool { + // TODO(bmizerany): optimize.. Currently naive + mis := make(int64Slice, 0, len(sm.ins)) + for i := range sm.ins { + mis = append(mis, sm.ins[i].match) + } + sort.Sort(sort.Reverse(mis)) + mci := mis[sm.q()-1] + + return sm.raftLog.maybeCommit(mci, sm.term.Get()) +} + +// nextEnts returns the appliable entries and updates the applied index +func (sm *raft) nextEnts() (ents []Entry) { + ents = sm.raftLog.nextEnts() + sm.raftLog.resetNextEnts() + return ents +} + +func (sm *raft) reset(term int64) { + sm.setTerm(term) + sm.lead.Set(none) + sm.setVote(none) + sm.votes = make(map[int64]bool) + for i := range sm.ins { + sm.ins[i] = &index{next: sm.raftLog.lastIndex() + 1} + if i == sm.id { + sm.ins[i].match = sm.raftLog.lastIndex() + } + } +} + +func (sm *raft) q() int { + return len(sm.ins)/2 + 1 +} + +func (sm *raft) appendEntry(e Entry) { + e.Term = sm.term.Get() + e.Index = sm.raftLog.lastIndex() + 1 + sm.index.Set(sm.raftLog.append(sm.raftLog.lastIndex(), e)) + sm.ins[sm.id].update(sm.raftLog.lastIndex()) + sm.maybeCommit() +} + +func (sm *raft) becomeFollower(term int64, lead int64) { + sm.reset(term) + sm.lead.Set(lead) + sm.state = stateFollower + sm.pendingConf = false +} + +func (sm *raft) becomeCandidate() { + // TODO(xiangli) remove the panic when the raft implementation is stable + if sm.state == stateLeader { + panic("invalid transition [leader -> candidate]") + } + sm.reset(sm.term.Get() + 1) + sm.setVote(sm.id) + sm.state = stateCandidate +} + +func (sm *raft) becomeLeader() { + // TODO(xiangli) remove the panic when the raft implementation is stable + if sm.state == stateFollower { + panic("invalid transition [follower -> leader]") + } + sm.reset(sm.term.Get()) + sm.lead.Set(sm.id) + sm.state = stateLeader + + for _, e := range sm.raftLog.entries(sm.raftLog.committed + 1) { + if e.isConfig() { + sm.pendingConf = true + } + } + + sm.appendEntry(Entry{Type: Normal, Data: nil}) +} + +func (sm *raft) ReadMessages() []Message { + msgs := sm.msgs + sm.msgs = make([]Message, 0) + + return msgs +} + +func (sm *raft) Step(m Message) error { + if m.Type == msgHup { + sm.becomeCandidate() + if sm.q() == sm.poll(sm.id, true) { + sm.becomeLeader() + } + for i := range sm.ins { + if i == sm.id { + continue + } + lasti := sm.raftLog.lastIndex() + sm.send(Message{To: i, Type: msgVote, Index: lasti, LogTerm: sm.raftLog.term(lasti)}) + } + } + + switch { + case m.Term == 0: + // local message + case m.Term > sm.term.Get(): + lead := m.From + if m.Type == msgVote { + lead = none + } + sm.becomeFollower(m.Term, lead) + case m.Term < sm.term.Get(): + // ignore + } + + stepmap[sm.state](sm, m) + return nil +} + +func (sm *raft) handleAppendEntries(m Message) { + if sm.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) { + sm.index.Set(sm.raftLog.lastIndex()) + sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.raftLog.lastIndex()}) + } else { + sm.send(Message{To: m.From, Type: msgAppResp, Index: -1}) + } +} + +func (sm *raft) handleSnapshot(m Message) { + if sm.restore(m.Snapshot) { + sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.raftLog.lastIndex()}) + } else { + sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.raftLog.committed}) + } +} + +func (sm *raft) addNode(id int64) { + sm.addIns(id, 0, sm.raftLog.lastIndex()+1) + sm.pendingConf = false + if id == sm.id { + sm.promotable = true + } +} + +func (sm *raft) removeNode(id int64) { + sm.deleteIns(id) + sm.pendingConf = false +} + +type stepFunc func(sm *raft, m Message) bool + +func stepLeader(sm *raft, m Message) bool { + switch m.Type { + case msgBeat: + sm.bcastHeartbeat() + case msgProp: + if len(m.Entries) != 1 { + panic("unexpected length(entries) of a msgProp") + } + e := m.Entries[0] + if e.isConfig() { + if sm.pendingConf { + return false + } + sm.pendingConf = true + } + sm.appendEntry(e) + sm.bcastAppend() + case msgAppResp: + if m.Index < 0 { + sm.ins[m.From].decr() + sm.sendAppend(m.From) + } else { + sm.ins[m.From].update(m.Index) + if sm.maybeCommit() { + sm.bcastAppend() + } + } + case msgVote: + sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1}) + } + return true +} + +func stepCandidate(sm *raft, m Message) bool { + switch m.Type { + case msgProp: + return false + case msgApp: + sm.becomeFollower(sm.term.Get(), m.From) + sm.handleAppendEntries(m) + case msgSnap: + sm.becomeFollower(m.Term, m.From) + sm.handleSnapshot(m) + case msgVote: + sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1}) + case msgVoteResp: + gr := sm.poll(m.From, m.Index >= 0) + switch sm.q() { + case gr: + sm.becomeLeader() + sm.bcastAppend() + case len(sm.votes) - gr: + sm.becomeFollower(sm.term.Get(), none) + } + } + return true +} + +func stepFollower(sm *raft, m Message) bool { + switch m.Type { + case msgProp: + if sm.lead.Get() == none { + return false + } + m.To = sm.lead.Get() + sm.send(m) + case msgApp: + sm.lead.Set(m.From) + sm.handleAppendEntries(m) + case msgSnap: + sm.handleSnapshot(m) + case msgVote: + if (sm.vote == none || sm.vote == m.From) && sm.raftLog.isUpToDate(m.Index, m.LogTerm) { + sm.setVote(m.From) + sm.send(Message{To: m.From, Type: msgVoteResp, Index: sm.raftLog.lastIndex()}) + } else { + sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1}) + } + } + return true +} + +func (sm *raft) compact(d []byte) { + sm.raftLog.snap(d, sm.raftLog.applied, sm.raftLog.term(sm.raftLog.applied), sm.nodes()) + sm.raftLog.compact(sm.raftLog.applied) +} + +// restore recovers the statemachine from a snapshot. It restores the log and the +// configuration of statemachine. +func (sm *raft) restore(s Snapshot) bool { + if s.Index <= sm.raftLog.committed { + return false + } + + sm.raftLog.restore(s) + sm.index.Set(sm.raftLog.lastIndex()) + sm.ins = make(map[int64]*index) + for _, n := range s.Nodes { + if n == sm.id { + sm.addIns(n, sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1) + } else { + sm.addIns(n, 0, sm.raftLog.lastIndex()+1) + } + } + sm.pendingConf = false + return true +} + +func (sm *raft) needSnapshot(i int64) bool { + if i < sm.raftLog.offset { + if sm.raftLog.snapshot.IsEmpty() { + panic("need non-empty snapshot") + } + return true + } + return false +} + +func (sm *raft) nodes() []int64 { + nodes := make([]int64, 0, len(sm.ins)) + for k := range sm.ins { + nodes = append(nodes, k) + } + return nodes +} + +func (sm *raft) setTerm(term int64) { + sm.term.Set(term) + sm.saveState() +} + +func (sm *raft) setVote(vote int64) { + sm.vote = vote + sm.saveState() +} + +func (sm *raft) addIns(id, match, next int64) { + sm.ins[id] = &index{next: next, match: match} + sm.saveState() +} + +func (sm *raft) deleteIns(id int64) { + delete(sm.ins, id) + sm.saveState() +} + +// saveState saves the state to sm.unstableState +// When there is a term change, vote change or configuration change, raft +// must call saveState. +func (sm *raft) saveState() { + sm.setState(sm.vote, sm.term.Get(), sm.raftLog.committed) +} + +func (sm *raft) clearState() { + sm.setState(0, 0, 0) +} + +func (sm *raft) setState(vote, term, commit int64) { + sm.unstableState.Vote = vote + sm.unstableState.Term = term + sm.unstableState.Commit = commit +} + +func (sm *raft) loadEnts(ents []Entry) { + if !sm.raftLog.isEmpty() { + panic("cannot load entries when log is not empty") + } + sm.raftLog.append(0, ents...) + sm.raftLog.unstable = sm.raftLog.lastIndex() + 1 +} + +func (sm *raft) loadState(state State) { + sm.raftLog.committed = state.Commit + sm.setTerm(state.Term) + sm.setVote(state.Vote) +} + +func (s *State) IsEmpty() bool { + return s.Term == 0 +} diff --git a/raft/raft_test.go b/raft2/raft_test.go similarity index 95% rename from raft/raft_test.go rename to raft2/raft_test.go index dd628c34d..1a36ded9e 100644 --- a/raft/raft_test.go +++ b/raft2/raft_test.go @@ -8,6 +8,11 @@ import ( "testing" ) +type Interface interface { + Step(m Message) error + ReadMessages() []Message +} + func TestLeaderElection(t *testing.T) { tests := []struct { *network @@ -28,7 +33,7 @@ func TestLeaderElection(t *testing.T) { for i, tt := range tests { tt.send(Message{From: 0, To: 0, Type: msgHup}) - sm := tt.network.peers[0].(*stateMachine) + sm := tt.network.peers[0].(*raft) if sm.state != tt.state { t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state) } @@ -71,7 +76,7 @@ func TestLogReplication(t *testing.T) { } for j, x := range tt.network.peers { - sm := x.(*stateMachine) + sm := x.(*raft) if sm.raftLog.committed != tt.wcommitted { t.Errorf("#%d.%d: committed = %d, want %d", i, j, sm.raftLog.committed, tt.wcommitted) @@ -104,7 +109,7 @@ func TestSingleNodeCommit(t *testing.T) { tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) - sm := tt.peers[0].(*stateMachine) + sm := tt.peers[0].(*raft) if sm.raftLog.committed != 3 { t.Errorf("committed = %d, want %d", sm.raftLog.committed, 3) } @@ -125,7 +130,7 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) { tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) - sm := tt.peers[0].(*stateMachine) + sm := tt.peers[0].(*raft) if sm.raftLog.committed != 1 { t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1) } @@ -139,7 +144,7 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) { tt.send(Message{From: 1, To: 1, Type: msgHup}) // no log entries from previous term should be committed - sm = tt.peers[1].(*stateMachine) + sm = tt.peers[1].(*raft) if sm.raftLog.committed != 1 { t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1) } @@ -177,7 +182,7 @@ func TestCommitWithoutNewTermEntry(t *testing.T) { tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) - sm := tt.peers[0].(*stateMachine) + sm := tt.peers[0].(*raft) if sm.raftLog.committed != 1 { t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1) } @@ -211,7 +216,7 @@ func TestDuelingCandidates(t *testing.T) { wlog := &raftLog{ents: []Entry{{}, Entry{Type: Normal, Data: nil, Term: 1, Index: 1}}, committed: 1} tests := []struct { - sm *stateMachine + sm *raft state stateType term int64 raftLog *raftLog @@ -229,7 +234,7 @@ func TestDuelingCandidates(t *testing.T) { t.Errorf("#%d: term = %d, want %d", i, g, tt.term) } base := ltoa(tt.raftLog) - if sm, ok := nt.peers[int64(i)].(*stateMachine); ok { + if sm, ok := nt.peers[int64(i)].(*raft); ok { l := ltoa(sm.raftLog) if g := diffu(base, l); g != "" { t.Errorf("#%d: diff:\n%s", i, g) @@ -254,7 +259,7 @@ func TestCandidateConcede(t *testing.T) { // send a proposal to 2 to flush out a msgApp to 0 tt.send(Message{From: 2, To: 2, Type: msgProp, Entries: []Entry{{Data: data}}}) - a := tt.peers[0].(*stateMachine) + a := tt.peers[0].(*raft) if g := a.state; g != stateFollower { t.Errorf("state = %s, want %s", g, stateFollower) } @@ -263,7 +268,7 @@ func TestCandidateConcede(t *testing.T) { } wantLog := ltoa(&raftLog{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2}) for i, p := range tt.peers { - if sm, ok := p.(*stateMachine); ok { + if sm, ok := p.(*raft); ok { l := ltoa(sm.raftLog) if g := diffu(wantLog, l); g != "" { t.Errorf("#%d: diff:\n%s", i, g) @@ -278,7 +283,7 @@ func TestSingleNodeCandidate(t *testing.T) { tt := newNetwork(nil) tt.send(Message{From: 0, To: 0, Type: msgHup}) - sm := tt.peers[0].(*stateMachine) + sm := tt.peers[0].(*raft) if sm.state != stateLeader { t.Errorf("state = %d, want %d", sm.state, stateLeader) } @@ -302,7 +307,7 @@ func TestOldMessages(t *testing.T) { } base := ltoa(l) for i, p := range tt.peers { - if sm, ok := p.(*stateMachine); ok { + if sm, ok := p.(*raft); ok { l := ltoa(sm.raftLog) if g := diffu(base, l); g != "" { t.Errorf("#%d: diff:\n%s", i, g) @@ -354,7 +359,7 @@ func TestProposal(t *testing.T) { } base := ltoa(wantLog) for i, p := range tt.peers { - if sm, ok := p.(*stateMachine); ok { + if sm, ok := p.(*raft); ok { l := ltoa(sm.raftLog) if g := diffu(base, l); g != "" { t.Errorf("#%d: diff:\n%s", i, g) @@ -363,7 +368,7 @@ func TestProposal(t *testing.T) { t.Logf("#%d: empty log", i) } } - sm := tt.network.peers[0].(*stateMachine) + sm := tt.network.peers[0].(*raft) if g := sm.term.Get(); g != 1 { t.Errorf("#%d: term = %d, want %d", i, g, 1) } @@ -387,7 +392,7 @@ func TestProposalByProxy(t *testing.T) { wantLog := &raftLog{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}}, committed: 2} base := ltoa(wantLog) for i, p := range tt.peers { - if sm, ok := p.(*stateMachine); ok { + if sm, ok := p.(*raft); ok { l := ltoa(sm.raftLog) if g := diffu(base, l); g != "" { t.Errorf("#%d: diff:\n%s", i, g) @@ -396,7 +401,7 @@ func TestProposalByProxy(t *testing.T) { t.Logf("#%d: empty log", i) } } - sm := tt.peers[0].(*stateMachine) + sm := tt.peers[0].(*raft) if g := sm.term.Get(); g != 1 { t.Errorf("#%d: term = %d, want %d", i, g, 1) } @@ -436,7 +441,7 @@ func TestCommit(t *testing.T) { for j := 0; j < len(tt.matches); j++ { ins[int64(j)] = &index{tt.matches[j], tt.matches[j] + 1} } - sm := &stateMachine{raftLog: &raftLog{ents: tt.logs}, ins: ins, term: atomicInt(tt.smTerm)} + sm := &raft{raftLog: &raftLog{ents: tt.logs}, ins: ins, term: atomicInt(tt.smTerm)} sm.maybeCommit() if g := sm.raftLog.committed; g != tt.w { t.Errorf("#%d: committed = %d, want %d", i, g, tt.w) @@ -473,7 +478,7 @@ func TestHandleMsgApp(t *testing.T) { } for i, tt := range tests { - sm := &stateMachine{ + sm := &raft{ state: stateFollower, term: 2, raftLog: &raftLog{committed: 0, ents: []Entry{{}, {Term: 1}, {Term: 2}}}, @@ -486,7 +491,7 @@ func TestHandleMsgApp(t *testing.T) { if sm.raftLog.committed != tt.wCommit { t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit) } - m := sm.Msgs() + m := sm.ReadMessages() if len(m) != 1 { t.Errorf("#%d: msg = nil, want 1") } @@ -535,7 +540,7 @@ func TestRecvMsgVote(t *testing.T) { } for i, tt := range tests { - sm := &stateMachine{ + sm := &raft{ state: tt.state, vote: tt.voteFor, raftLog: &raftLog{ents: []Entry{{}, {Term: 2}, {Term: 2}}}, @@ -543,7 +548,7 @@ func TestRecvMsgVote(t *testing.T) { sm.Step(Message{Type: msgVote, From: 1, Index: tt.i, LogTerm: tt.term}) - msgs := sm.Msgs() + msgs := sm.ReadMessages() if g := len(msgs); g != 1 { t.Errorf("#%d: len(msgs) = %d, want 1", i, g) continue @@ -724,9 +729,9 @@ func TestLeaderAppResp(t *testing.T) { sm.raftLog = &raftLog{ents: []Entry{{}, {Term: 0}, {Term: 1}}} sm.becomeCandidate() sm.becomeLeader() - sm.Msgs() + sm.ReadMessages() sm.Step(Message{From: 1, Type: msgAppResp, Index: tt.index, Term: sm.term.Get()}) - msgs := sm.Msgs() + msgs := sm.ReadMessages() if len(msgs) != tt.wmsgNum { t.Errorf("#%d msgNum = %d, want %d", i, len(msgs), tt.wmsgNum) @@ -761,7 +766,7 @@ func TestRecvMsgBeat(t *testing.T) { sm.state = tt.state sm.Step(Message{From: 0, To: 0, Type: msgBeat}) - msgs := sm.Msgs() + msgs := sm.ReadMessages() if len(msgs) != tt.wMsg { t.Errorf("%d: len(msgs) = %d, want %d", i, len(msgs), tt.wMsg) } @@ -826,7 +831,7 @@ func TestProvideSnap(t *testing.T) { sm.becomeLeader() sm.Step(Message{From: 0, To: 0, Type: msgBeat}) - msgs := sm.Msgs() + msgs := sm.ReadMessages() if len(msgs) != 1 { t.Errorf("len(msgs) = %d, want 1", len(msgs)) } @@ -840,7 +845,7 @@ func TestProvideSnap(t *testing.T) { sm.ins[1].next = sm.raftLog.offset sm.Step(Message{From: 1, To: 0, Type: msgAppResp, Index: -1}) - msgs = sm.Msgs() + msgs = sm.ReadMessages() if len(msgs) != 1 { t.Errorf("len(msgs) = %d, want 1", len(msgs)) } @@ -874,14 +879,14 @@ func TestSlowNodeRestore(t *testing.T) { for j := 0; j < defaultCompactThreshold+1; j++ { nt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{}}}) } - lead := nt.peers[0].(*stateMachine) + lead := nt.peers[0].(*raft) lead.nextEnts() lead.compact(nil) nt.recover() nt.send(Message{From: 0, To: 0, Type: msgBeat}) - follower := nt.peers[2].(*stateMachine) + follower := nt.peers[2].(*raft) if !reflect.DeepEqual(follower.raftLog.snapshot, lead.raftLog.snapshot) { t.Errorf("follower.snap = %+v, want %+v", follower.raftLog.snapshot, lead.raftLog.snapshot) } @@ -928,13 +933,13 @@ func TestUnstableState(t *testing.T) { sm.clearState() } -func ents(terms ...int64) *stateMachine { +func ents(terms ...int64) *raft { ents := []Entry{{}} for _, term := range terms { ents = append(ents, Entry{Term: term}) } - sm := &stateMachine{raftLog: &raftLog{ents: ents}} + sm := &raft{raftLog: &raftLog{ents: ents}} sm.reset(0) return sm } @@ -964,7 +969,7 @@ func newNetwork(peers ...Interface) *network { case nil: sm := newStateMachine(nid, defaultPeerAddrs) npeers[nid] = sm - case *stateMachine: + case *raft: v.id = nid v.ins = make(map[int64]*index) for i := 0; i < size; i++ { @@ -972,8 +977,6 @@ func newNetwork(peers ...Interface) *network { } v.reset(0) npeers[nid] = v - case *Node: - npeers[v.sm.id] = v default: npeers[nid] = v } @@ -990,7 +993,7 @@ func (nw *network) send(msgs ...Message) { m := msgs[0] p := nw.peers[m.To] p.Step(m) - msgs = append(msgs[1:], nw.filter(p.Msgs())...) + msgs = append(msgs[1:], nw.filter(p.ReadMessages())...) } } @@ -1049,7 +1052,7 @@ type connem struct { type blackHole struct{} -func (blackHole) Step(Message) bool { return true } -func (blackHole) Msgs() []Message { return nil } +func (blackHole) Step(Message) error { return nil } +func (blackHole) ReadMessages() []Message { return nil } var nopStepper = &blackHole{} diff --git a/raft/snapshot.go b/raft2/snapshot.go similarity index 100% rename from raft/snapshot.go rename to raft2/snapshot.go diff --git a/raft/state.pb.go b/raft2/state.pb.go similarity index 100% rename from raft/state.pb.go rename to raft2/state.pb.go diff --git a/raft/state.proto b/raft2/state.proto similarity index 100% rename from raft/state.proto rename to raft2/state.proto