Merge pull request #8751 from siddontang/siddontang/raft_learner

raft: add raft learner
This commit is contained in:
Xiang Li 2017-11-11 18:43:10 -08:00 committed by GitHub
commit a8fde603b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 611 additions and 104 deletions

View File

@ -319,7 +319,7 @@ func (n *node) run(r *raft) {
r.Step(m) r.Step(m)
case m := <-n.recvc: case m := <-n.recvc:
// filter out response message from unknown From. // filter out response message from unknown From.
if _, ok := r.prs[m.From]; ok || !IsResponseMsg(m.Type) { if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
r.Step(m) // raft never returns an error r.Step(m) // raft never returns an error
} }
case cc := <-n.confc: case cc := <-n.confc:
@ -334,6 +334,8 @@ func (n *node) run(r *raft) {
switch cc.Type { switch cc.Type {
case pb.ConfChangeAddNode: case pb.ConfChangeAddNode:
r.addNode(cc.NodeID) r.addNode(cc.NodeID)
case pb.ConfChangeAddLearnerNode:
r.addLearner(cc.NodeID)
case pb.ConfChangeRemoveNode: case pb.ConfChangeRemoveNode:
// block incoming proposal when local node is // block incoming proposal when local node is
// removed // removed

View File

@ -48,6 +48,7 @@ type Progress struct {
// When in ProgressStateSnapshot, leader should have sent out snapshot // When in ProgressStateSnapshot, leader should have sent out snapshot
// before and stops sending any replication message. // before and stops sending any replication message.
State ProgressStateType State ProgressStateType
// Paused is used in ProgressStateProbe. // Paused is used in ProgressStateProbe.
// When Paused is true, raft should pause sending replication message to this peer. // When Paused is true, raft should pause sending replication message to this peer.
Paused bool Paused bool
@ -76,6 +77,9 @@ type Progress struct {
// be freed by calling inflights.freeTo with the index of the last // be freed by calling inflights.freeTo with the index of the last
// received entry. // received entry.
ins *inflights ins *inflights
// IsLearner is true if this progress is tracked for a learner.
IsLearner bool
} }
func (pr *Progress) resetState(state ProgressStateType) { func (pr *Progress) resetState(state ProgressStateType) {

View File

@ -116,6 +116,10 @@ type Config struct {
// used for testing right now. // used for testing right now.
peers []uint64 peers []uint64
// learners contains the IDs of all leaner nodes (including self if the local node is a leaner) in the raft cluster.
// learners only receives entries from the leader node. It does not vote or promote itself.
learners []uint64
// ElectionTick is the number of Node.Tick invocations that must pass between // ElectionTick is the number of Node.Tick invocations that must pass between
// elections. That is, if a follower does not receive any message from the // elections. That is, if a follower does not receive any message from the
// leader of current term before ElectionTick has elapsed, it will become // leader of current term before ElectionTick has elapsed, it will become
@ -235,9 +239,13 @@ type raft struct {
maxInflight int maxInflight int
maxMsgSize uint64 maxMsgSize uint64
prs map[uint64]*Progress prs map[uint64]*Progress
learnerPrs map[uint64]*Progress
state StateType state StateType
// isLearner is true if the local raft node is a learner.
isLearner bool
votes map[uint64]bool votes map[uint64]bool
msgs []pb.Message msgs []pb.Message
@ -289,22 +297,26 @@ func newRaft(c *Config) *raft {
panic(err) // TODO(bdarnell) panic(err) // TODO(bdarnell)
} }
peers := c.peers peers := c.peers
if len(cs.Nodes) > 0 { learners := c.learners
if len(peers) > 0 { if len(cs.Nodes) > 0 || len(cs.Learners) > 0 {
if len(peers) > 0 || len(learners) > 0 {
// TODO(bdarnell): the peers argument is always nil except in // TODO(bdarnell): the peers argument is always nil except in
// tests; the argument should be removed and these tests should be // tests; the argument should be removed and these tests should be
// updated to specify their nodes through a snapshot. // updated to specify their nodes through a snapshot.
panic("cannot specify both newRaft(peers) and ConfState.Nodes)") panic("cannot specify both newRaft(peers, learners) and ConfState.(Nodes, Learners)")
} }
peers = cs.Nodes peers = cs.Nodes
learners = cs.Learners
} }
r := &raft{ r := &raft{
id: c.ID, id: c.ID,
lead: None, lead: None,
isLearner: false,
raftLog: raftlog, raftLog: raftlog,
maxMsgSize: c.MaxSizePerMsg, maxMsgSize: c.MaxSizePerMsg,
maxInflight: c.MaxInflightMsgs, maxInflight: c.MaxInflightMsgs,
prs: make(map[uint64]*Progress), prs: make(map[uint64]*Progress),
learnerPrs: make(map[uint64]*Progress),
electionTimeout: c.ElectionTick, electionTimeout: c.ElectionTick,
heartbeatTimeout: c.HeartbeatTick, heartbeatTimeout: c.HeartbeatTick,
logger: c.Logger, logger: c.Logger,
@ -316,6 +328,16 @@ func newRaft(c *Config) *raft {
for _, p := range peers { for _, p := range peers {
r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)} r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)}
} }
for _, p := range learners {
if _, ok := r.prs[p]; ok {
panic(fmt.Sprintf("node %x is in both learner and peer list", p))
}
r.learnerPrs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight), IsLearner: true}
if r.id == p {
r.isLearner = true
}
}
if !isHardStateEqual(hs, emptyState) { if !isHardStateEqual(hs, emptyState) {
r.loadState(hs) r.loadState(hs)
} }
@ -349,10 +371,13 @@ func (r *raft) hardState() pb.HardState {
func (r *raft) quorum() int { return len(r.prs)/2 + 1 } func (r *raft) quorum() int { return len(r.prs)/2 + 1 }
func (r *raft) nodes() []uint64 { func (r *raft) nodes() []uint64 {
nodes := make([]uint64, 0, len(r.prs)) nodes := make([]uint64, 0, len(r.prs)+len(r.learnerPrs))
for id := range r.prs { for id := range r.prs {
nodes = append(nodes, id) nodes = append(nodes, id)
} }
for id := range r.learnerPrs {
nodes = append(nodes, id)
}
sort.Sort(uint64Slice(nodes)) sort.Sort(uint64Slice(nodes))
return nodes return nodes
} }
@ -391,9 +416,17 @@ func (r *raft) send(m pb.Message) {
r.msgs = append(r.msgs, m) r.msgs = append(r.msgs, m)
} }
func (r *raft) getProgress(id uint64) *Progress {
if pr, ok := r.prs[id]; ok {
return pr
}
return r.learnerPrs[id]
}
// sendAppend sends RPC, with entries to the given peer. // sendAppend sends RPC, with entries to the given peer.
func (r *raft) sendAppend(to uint64) { func (r *raft) sendAppend(to uint64) {
pr := r.prs[to] pr := r.getProgress(to)
if pr.IsPaused() { if pr.IsPaused() {
return return
} }
@ -458,7 +491,7 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
// or it might not have all the committed entries. // or it might not have all the committed entries.
// The leader MUST NOT forward the follower's commit to // The leader MUST NOT forward the follower's commit to
// an unmatched index. // an unmatched index.
commit := min(r.prs[to].Match, r.raftLog.committed) commit := min(r.getProgress(to).Match, r.raftLog.committed)
m := pb.Message{ m := pb.Message{
To: to, To: to,
Type: pb.MsgHeartbeat, Type: pb.MsgHeartbeat,
@ -469,15 +502,26 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
r.send(m) r.send(m)
} }
func (r *raft) forEachProgress(f func(id uint64, pr *Progress)) {
for id, pr := range r.prs {
f(id, pr)
}
for id, pr := range r.learnerPrs {
f(id, pr)
}
}
// bcastAppend sends RPC, with entries to all peers that are not up-to-date // bcastAppend sends RPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.prs. // according to the progress recorded in r.prs.
func (r *raft) bcastAppend() { func (r *raft) bcastAppend() {
for id := range r.prs { r.forEachProgress(func(id uint64, _ *Progress) {
if id == r.id { if id == r.id {
continue return
} }
r.sendAppend(id) r.sendAppend(id)
} })
} }
// bcastHeartbeat sends RPC, without entries to all the peers. // bcastHeartbeat sends RPC, without entries to all the peers.
@ -491,12 +535,12 @@ func (r *raft) bcastHeartbeat() {
} }
func (r *raft) bcastHeartbeatWithCtx(ctx []byte) { func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
for id := range r.prs { r.forEachProgress(func(id uint64, _ *Progress) {
if id == r.id { if id == r.id {
continue return
} }
r.sendHeartbeat(id, ctx) r.sendHeartbeat(id, ctx)
} })
} }
// maybeCommit attempts to advance the commit index. Returns true if // maybeCommit attempts to advance the commit index. Returns true if
@ -505,8 +549,8 @@ func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
func (r *raft) maybeCommit() bool { func (r *raft) maybeCommit() bool {
// TODO(bmizerany): optimize.. Currently naive // TODO(bmizerany): optimize.. Currently naive
mis := make(uint64Slice, 0, len(r.prs)) mis := make(uint64Slice, 0, len(r.prs))
for id := range r.prs { for _, p := range r.prs {
mis = append(mis, r.prs[id].Match) mis = append(mis, p.Match)
} }
sort.Sort(sort.Reverse(mis)) sort.Sort(sort.Reverse(mis))
mci := mis[r.quorum()-1] mci := mis[r.quorum()-1]
@ -527,12 +571,13 @@ func (r *raft) reset(term uint64) {
r.abortLeaderTransfer() r.abortLeaderTransfer()
r.votes = make(map[uint64]bool) r.votes = make(map[uint64]bool)
for id := range r.prs { r.forEachProgress(func(id uint64, pr *Progress) {
r.prs[id] = &Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight)} *pr = Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight), IsLearner: pr.IsLearner}
if id == r.id { if id == r.id {
r.prs[id].Match = r.raftLog.lastIndex() pr.Match = r.raftLog.lastIndex()
} }
} })
r.pendingConf = false r.pendingConf = false
r.readOnly = newReadOnly(r.readOnly.option) r.readOnly = newReadOnly(r.readOnly.option)
} }
@ -544,7 +589,7 @@ func (r *raft) appendEntry(es ...pb.Entry) {
es[i].Index = li + 1 + uint64(i) es[i].Index = li + 1 + uint64(i)
} }
r.raftLog.append(es...) r.raftLog.append(es...)
r.prs[r.id].maybeUpdate(r.raftLog.lastIndex()) r.getProgress(r.id).maybeUpdate(r.raftLog.lastIndex())
// Regardless of maybeCommit's return, our caller will call bcastAppend. // Regardless of maybeCommit's return, our caller will call bcastAppend.
r.maybeCommit() r.maybeCommit()
} }
@ -787,6 +832,12 @@ func (r *raft) Step(m pb.Message) error {
} }
case pb.MsgVote, pb.MsgPreVote: case pb.MsgVote, pb.MsgPreVote:
if r.isLearner {
// TODO: learner may need to vote, in case of node down when confchange.
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: learner can not vote",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
return nil
}
// The m.Term > r.Term clause is for MsgPreVote. For MsgVote m.Term should // The m.Term > r.Term clause is for MsgPreVote. For MsgVote m.Term should
// always equal r.Term. // always equal r.Term.
if (r.Vote == None || m.Term > r.Term || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) { if (r.Vote == None || m.Term > r.Term || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
@ -890,8 +941,8 @@ func stepLeader(r *raft, m pb.Message) {
} }
// All other message types require a progress for m.From (pr). // All other message types require a progress for m.From (pr).
pr, prOk := r.prs[m.From] pr := r.getProgress(m.From)
if !prOk { if pr == nil {
r.logger.Debugf("%x no progress available for %x", r.id, m.From) r.logger.Debugf("%x no progress available for %x", r.id, m.From)
return return
} }
@ -990,6 +1041,10 @@ func stepLeader(r *raft, m pb.Message) {
} }
r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr) r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
case pb.MsgTransferLeader: case pb.MsgTransferLeader:
if pr.IsLearner {
r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id)
return
}
leadTransferee := m.From leadTransferee := m.From
lastLeadTransferee := r.leadTransferee lastLeadTransferee := r.leadTransferee
if lastLeadTransferee != None { if lastLeadTransferee != None {
@ -1166,20 +1221,37 @@ func (r *raft) restore(s pb.Snapshot) bool {
return false return false
} }
// The normal peer can't become learner.
if !r.isLearner {
for _, id := range s.Metadata.ConfState.Learners {
if id == r.id {
r.logger.Errorf("%x can't become learner when restores snapshot [index: %d, term: %d]", r.id, s.Metadata.Index, s.Metadata.Term)
return false
}
}
}
r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]", r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
r.raftLog.restore(s) r.raftLog.restore(s)
r.prs = make(map[uint64]*Progress) r.prs = make(map[uint64]*Progress)
for _, n := range s.Metadata.ConfState.Nodes { r.learnerPrs = make(map[uint64]*Progress)
r.restoreNode(s.Metadata.ConfState.Nodes, false)
r.restoreNode(s.Metadata.ConfState.Learners, true)
return true
}
func (r *raft) restoreNode(nodes []uint64, isLearner bool) {
for _, n := range nodes {
match, next := uint64(0), r.raftLog.lastIndex()+1 match, next := uint64(0), r.raftLog.lastIndex()+1
if n == r.id { if n == r.id {
match = next - 1 match = next - 1
r.isLearner = isLearner
} }
r.setProgress(n, match, next) r.setProgress(n, match, next, isLearner)
r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs[n]) r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.getProgress(n))
} }
return true
} }
// promotable indicates whether state machine can be promoted to leader, // promotable indicates whether state machine can be promoted to leader,
@ -1190,18 +1262,46 @@ func (r *raft) promotable() bool {
} }
func (r *raft) addNode(id uint64) { func (r *raft) addNode(id uint64) {
r.addNodeOrLearnerNode(id, false)
}
func (r *raft) addLearner(id uint64) {
r.addNodeOrLearnerNode(id, true)
}
func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
r.pendingConf = false r.pendingConf = false
if _, ok := r.prs[id]; ok { pr := r.getProgress(id)
// Ignore any redundant addNode calls (which can happen because the if pr == nil {
// initial bootstrapping entries are applied twice). r.setProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
return } else {
if isLearner && !pr.IsLearner {
// can only change Learner to Voter
r.logger.Infof("%x ignored addLeaner: do not support changing %x from raft peer to learner.", r.id, id)
return
}
if isLearner == pr.IsLearner {
// Ignore any redundant addNode calls (which can happen because the
// initial bootstrapping entries are applied twice).
return
}
// change Learner to Voter, use origin Learner progress
delete(r.learnerPrs, id)
pr.IsLearner = false
r.prs[id] = pr
}
if r.id == id {
r.isLearner = isLearner
} }
r.setProgress(id, 0, r.raftLog.lastIndex()+1)
// When a node is first added, we should mark it as recently active. // When a node is first added, we should mark it as recently active.
// Otherwise, CheckQuorum may cause us to step down if it is invoked // Otherwise, CheckQuorum may cause us to step down if it is invoked
// before the added node has a chance to communicate with us. // before the added node has a chance to communicate with us.
r.prs[id].RecentActive = true pr = r.getProgress(id)
pr.RecentActive = true
} }
func (r *raft) removeNode(id uint64) { func (r *raft) removeNode(id uint64) {
@ -1209,7 +1309,7 @@ func (r *raft) removeNode(id uint64) {
r.pendingConf = false r.pendingConf = false
// do not try to commit or abort transferring if there is no nodes in the cluster. // do not try to commit or abort transferring if there is no nodes in the cluster.
if len(r.prs) == 0 { if len(r.prs) == 0 && len(r.learnerPrs) == 0 {
return return
} }
@ -1226,12 +1326,22 @@ func (r *raft) removeNode(id uint64) {
func (r *raft) resetPendingConf() { r.pendingConf = false } func (r *raft) resetPendingConf() { r.pendingConf = false }
func (r *raft) setProgress(id, match, next uint64) { func (r *raft) setProgress(id, match, next uint64, isLearner bool) {
r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)} if !isLearner {
delete(r.learnerPrs, id)
r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)}
return
}
if _, ok := r.prs[id]; ok {
panic(fmt.Sprintf("%x unexpected changing from voter to learner for %x", r.id, id))
}
r.learnerPrs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight), IsLearner: true}
} }
func (r *raft) delProgress(id uint64) { func (r *raft) delProgress(id uint64) {
delete(r.prs, id) delete(r.prs, id)
delete(r.learnerPrs, id)
} }
func (r *raft) loadState(state pb.HardState) { func (r *raft) loadState(state pb.HardState) {
@ -1261,18 +1371,18 @@ func (r *raft) resetRandomizedElectionTimeout() {
func (r *raft) checkQuorumActive() bool { func (r *raft) checkQuorumActive() bool {
var act int var act int
for id := range r.prs { r.forEachProgress(func(id uint64, pr *Progress) {
if id == r.id { // self is always active if id == r.id { // self is always active
act++ act++
continue return
} }
if r.prs[id].RecentActive { if pr.RecentActive && !pr.IsLearner {
act++ act++
} }
r.prs[id].RecentActive = false pr.RecentActive = false
} })
return act >= r.quorum() return act >= r.quorum()
} }

View File

@ -348,6 +348,91 @@ func testLeaderElection(t *testing.T, preVote bool) {
} }
} }
// TestLearnerElectionTimeout verfies that the leader should not start election even
// when times out.
func TestLearnerElectionTimeout(t *testing.T) {
n1 := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
n2 := newTestLearnerRaft(2, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
n1.becomeFollower(1, None)
n2.becomeFollower(1, None)
// n2 is learner. Learner should not start election even when times out.
setRandomizedElectionTimeout(n2, n2.electionTimeout)
for i := 0; i < n2.electionTimeout; i++ {
n2.tick()
}
if n2.state != StateFollower {
t.Errorf("peer 2 state: %s, want %s", n2.state, StateFollower)
}
}
// TestLearnerPromotion verifies that the leaner should not election until
// it is promoted to a normal peer.
func TestLearnerPromotion(t *testing.T) {
n1 := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
n2 := newTestLearnerRaft(2, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
n1.becomeFollower(1, None)
n2.becomeFollower(1, None)
nt := newNetwork(n1, n2)
if n1.state == StateLeader {
t.Error("peer 1 state is leader, want not", n1.state)
}
// n1 should become leader
setRandomizedElectionTimeout(n1, n1.electionTimeout)
for i := 0; i < n1.electionTimeout; i++ {
n1.tick()
}
if n1.state != StateLeader {
t.Errorf("peer 1 state: %s, want %s", n1.state, StateLeader)
}
if n2.state != StateFollower {
t.Errorf("peer 2 state: %s, want %s", n2.state, StateFollower)
}
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
n1.addNode(2)
n2.addNode(2)
if n2.isLearner {
t.Error("peer 2 is learner, want not")
}
// n2 start election, should become leader
setRandomizedElectionTimeout(n2, n2.electionTimeout)
for i := 0; i < n2.electionTimeout; i++ {
n2.tick()
}
nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgBeat})
if n1.state != StateFollower {
t.Errorf("peer 1 state: %s, want %s", n1.state, StateFollower)
}
if n2.state != StateLeader {
t.Errorf("peer 2 state: %s, want %s", n2.state, StateLeader)
}
}
// TestLearnerCannotVote checks that a learner can't vote even it receives a valid Vote request.
func TestLearnerCannotVote(t *testing.T) {
n2 := newTestLearnerRaft(2, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
n2.becomeFollower(1, None)
n2.Step(pb.Message{From: 1, To: 2, Term: 2, Type: pb.MsgVote, LogTerm: 11, Index: 11})
if len(n2.msgs) != 0 {
t.Errorf("expect learner not to vote, but received %v messages", n2.msgs)
}
}
func TestLeaderCycle(t *testing.T) { func TestLeaderCycle(t *testing.T) {
testLeaderCycle(t, false) testLeaderCycle(t, false)
} }
@ -600,6 +685,47 @@ func TestLogReplication(t *testing.T) {
} }
} }
// TestLearnerLogReplication tests that a learner can receive entries from the leader.
func TestLearnerLogReplication(t *testing.T) {
n1 := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
n2 := newTestLearnerRaft(2, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
nt := newNetwork(n1, n2)
n1.becomeFollower(1, None)
n2.becomeFollower(1, None)
setRandomizedElectionTimeout(n1, n1.electionTimeout)
for i := 0; i < n1.electionTimeout; i++ {
n1.tick()
}
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
// n1 is leader and n2 is learner
if n1.state != StateLeader {
t.Errorf("peer 1 state: %s, want %s", n1.state, StateLeader)
}
if !n2.isLearner {
t.Error("peer 2 state: not learner, want yes")
}
nextCommitted := n1.raftLog.committed + 1
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
if n1.raftLog.committed != nextCommitted {
t.Errorf("peer 1 wants committed to %d, but still %d", nextCommitted, n1.raftLog.committed)
}
if n1.raftLog.committed != n2.raftLog.committed {
t.Errorf("peer 2 wants committed to %d, but still %d", n1.raftLog.committed, n2.raftLog.committed)
}
match := n1.getProgress(2).Match
if match != n2.raftLog.committed {
t.Errorf("progresss 2 of leader 1 wants match %d, but got %d", n2.raftLog.committed, match)
}
}
func TestSingleNodeCommit(t *testing.T) { func TestSingleNodeCommit(t *testing.T) {
tt := newNetwork(nil) tt := newNetwork(nil)
tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
@ -1058,7 +1184,7 @@ func TestCommit(t *testing.T) {
sm := newTestRaft(1, []uint64{1}, 5, 1, storage) sm := newTestRaft(1, []uint64{1}, 5, 1, storage)
for j := 0; j < len(tt.matches); j++ { for j := 0; j < len(tt.matches); j++ {
sm.setProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1) sm.setProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1, false)
} }
sm.maybeCommit() sm.maybeCommit()
if g := sm.raftLog.committed; g != tt.w { if g := sm.raftLog.committed; g != tt.w {
@ -2326,6 +2452,130 @@ func TestRestore(t *testing.T) {
} }
} }
// TestRestoreWithLearner restores a snapshot which contains learners.
func TestRestoreWithLearner(t *testing.T) {
s := pb.Snapshot{
Metadata: pb.SnapshotMetadata{
Index: 11, // magic number
Term: 11, // magic number
ConfState: pb.ConfState{Nodes: []uint64{1, 2}, Learners: []uint64{3}},
},
}
storage := NewMemoryStorage()
sm := newTestLearnerRaft(3, []uint64{1, 2}, []uint64{3}, 10, 1, storage)
if ok := sm.restore(s); !ok {
t.Error("restore fail, want succeed")
}
if sm.raftLog.lastIndex() != s.Metadata.Index {
t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Metadata.Index)
}
if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term {
t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term)
}
sg := sm.nodes()
if len(sg) != len(s.Metadata.ConfState.Nodes)+len(s.Metadata.ConfState.Learners) {
t.Errorf("sm.Nodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState)
}
for _, n := range s.Metadata.ConfState.Nodes {
if sm.prs[n].IsLearner {
t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs[n], false)
}
}
for _, n := range s.Metadata.ConfState.Learners {
if !sm.learnerPrs[n].IsLearner {
t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs[n], true)
}
}
if ok := sm.restore(s); ok {
t.Error("restore succeed, want fail")
}
}
// TestRestoreInvalidLearner verfies that a normal peer can't become learner again
// when restores snapshot.
func TestRestoreInvalidLearner(t *testing.T) {
s := pb.Snapshot{
Metadata: pb.SnapshotMetadata{
Index: 11, // magic number
Term: 11, // magic number
ConfState: pb.ConfState{Nodes: []uint64{1, 2}, Learners: []uint64{3}},
},
}
storage := NewMemoryStorage()
sm := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, storage)
if sm.isLearner {
t.Errorf("%x is learner, want not", sm.id)
}
if ok := sm.restore(s); ok {
t.Error("restore succeed, want fail")
}
}
// TestRestoreLearnerPromotion checks that a learner can become to a follower after
// restoring snapshot.
func TestRestoreLearnerPromotion(t *testing.T) {
s := pb.Snapshot{
Metadata: pb.SnapshotMetadata{
Index: 11, // magic number
Term: 11, // magic number
ConfState: pb.ConfState{Nodes: []uint64{1, 2, 3}},
},
}
storage := NewMemoryStorage()
sm := newTestLearnerRaft(3, []uint64{1, 2}, []uint64{3}, 10, 1, storage)
if !sm.isLearner {
t.Errorf("%x is not learner, want yes", sm.id)
}
if ok := sm.restore(s); !ok {
t.Error("restore fail, want succeed")
}
if sm.isLearner {
t.Errorf("%x is learner, want not", sm.id)
}
}
// TestLearnerReceiveSnapshot tests that a learner can receive a snpahost from leader
func TestLearnerReceiveSnapshot(t *testing.T) {
// restore the state machine from a snapshot so it has a compacted log and a snapshot
s := pb.Snapshot{
Metadata: pb.SnapshotMetadata{
Index: 11, // magic number
Term: 11, // magic number
ConfState: pb.ConfState{Nodes: []uint64{1}, Learners: []uint64{2}},
},
}
n1 := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
n2 := newTestLearnerRaft(2, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
n1.restore(s)
// Force set n1 appplied index.
n1.raftLog.appliedTo(n1.raftLog.committed)
nt := newNetwork(n1, n2)
setRandomizedElectionTimeout(n1, n1.electionTimeout)
for i := 0; i < n1.electionTimeout; i++ {
n1.tick()
}
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
if n2.raftLog.committed != n1.raftLog.committed {
t.Errorf("peer 2 must commit to %d, but %d", n1.raftLog.committed, n2.raftLog.committed)
}
}
func TestRestoreIgnoreSnapshot(t *testing.T) { func TestRestoreIgnoreSnapshot(t *testing.T) {
previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}} previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
commit := uint64(1) commit := uint64(1)
@ -2569,6 +2819,24 @@ func TestAddNode(t *testing.T) {
} }
} }
// TestAddLearner tests that addLearner could update pendingConf and nodes correctly.
func TestAddLearner(t *testing.T) {
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
r.pendingConf = true
r.addLearner(2)
if r.pendingConf {
t.Errorf("pendingConf = %v, want false", r.pendingConf)
}
nodes := r.nodes()
wnodes := []uint64{1, 2}
if !reflect.DeepEqual(nodes, wnodes) {
t.Errorf("nodes = %v, want %v", nodes, wnodes)
}
if !r.learnerPrs[2].IsLearner {
t.Errorf("node 2 is learner %t, want %t", r.prs[2].IsLearner, true)
}
}
// TestAddNodeCheckQuorum tests that addNode does not trigger a leader election // TestAddNodeCheckQuorum tests that addNode does not trigger a leader election
// immediately when checkQuorum is set. // immediately when checkQuorum is set.
func TestAddNodeCheckQuorum(t *testing.T) { func TestAddNodeCheckQuorum(t *testing.T) {
@ -2626,6 +2894,27 @@ func TestRemoveNode(t *testing.T) {
} }
} }
// TestRemoveLearner tests that removeNode could update pendingConf, nodes and
// and removed list correctly.
func TestRemoveLearner(t *testing.T) {
r := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
r.pendingConf = true
r.removeNode(2)
if r.pendingConf {
t.Errorf("pendingConf = %v, want false", r.pendingConf)
}
w := []uint64{1}
if g := r.nodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w)
}
// remove all nodes from cluster
r.removeNode(1)
w = []uint64{}
if g := r.nodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w)
}
}
func TestPromotable(t *testing.T) { func TestPromotable(t *testing.T) {
id := uint64(1) id := uint64(1)
tests := []struct { tests := []struct {
@ -3338,10 +3627,19 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw
sm := newRaft(cfg) sm := newRaft(cfg)
npeers[id] = sm npeers[id] = sm
case *raft: case *raft:
learners := make(map[uint64]bool, len(v.learnerPrs))
for i := range v.learnerPrs {
learners[i] = true
}
v.id = id v.id = id
v.prs = make(map[uint64]*Progress) v.prs = make(map[uint64]*Progress)
v.learnerPrs = make(map[uint64]*Progress)
for i := 0; i < size; i++ { for i := 0; i < size; i++ {
v.prs[peerAddrs[i]] = &Progress{} if _, ok := learners[peerAddrs[i]]; ok {
v.learnerPrs[peerAddrs[i]] = &Progress{IsLearner: true}
} else {
v.prs[peerAddrs[i]] = &Progress{}
}
} }
v.reset(v.Term) v.reset(v.Term)
npeers[id] = v npeers[id] = v
@ -3462,3 +3760,9 @@ func newTestConfig(id uint64, peers []uint64, election, heartbeat int, storage S
func newTestRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage) *raft { func newTestRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage) *raft {
return newRaft(newTestConfig(id, peers, election, heartbeat, storage)) return newRaft(newTestConfig(id, peers, election, heartbeat, storage))
} }
func newTestLearnerRaft(id uint64, peers []uint64, learners []uint64, election, heartbeat int, storage Storage) *raft {
cfg := newTestConfig(id, peers, election, heartbeat, storage)
cfg.learners = learners
return newRaft(cfg)
}

View File

@ -162,20 +162,23 @@ func (MessageType) EnumDescriptor() ([]byte, []int) { return fileDescriptorRaft,
type ConfChangeType int32 type ConfChangeType int32
const ( const (
ConfChangeAddNode ConfChangeType = 0 ConfChangeAddNode ConfChangeType = 0
ConfChangeRemoveNode ConfChangeType = 1 ConfChangeRemoveNode ConfChangeType = 1
ConfChangeUpdateNode ConfChangeType = 2 ConfChangeUpdateNode ConfChangeType = 2
ConfChangeAddLearnerNode ConfChangeType = 3
) )
var ConfChangeType_name = map[int32]string{ var ConfChangeType_name = map[int32]string{
0: "ConfChangeAddNode", 0: "ConfChangeAddNode",
1: "ConfChangeRemoveNode", 1: "ConfChangeRemoveNode",
2: "ConfChangeUpdateNode", 2: "ConfChangeUpdateNode",
3: "ConfChangeAddLearnerNode",
} }
var ConfChangeType_value = map[string]int32{ var ConfChangeType_value = map[string]int32{
"ConfChangeAddNode": 0, "ConfChangeAddNode": 0,
"ConfChangeRemoveNode": 1, "ConfChangeRemoveNode": 1,
"ConfChangeUpdateNode": 2, "ConfChangeUpdateNode": 2,
"ConfChangeAddLearnerNode": 3,
} }
func (x ConfChangeType) Enum() *ConfChangeType { func (x ConfChangeType) Enum() *ConfChangeType {
@ -267,6 +270,7 @@ func (*HardState) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []in
type ConfState struct { type ConfState struct {
Nodes []uint64 `protobuf:"varint,1,rep,name=nodes" json:"nodes,omitempty"` Nodes []uint64 `protobuf:"varint,1,rep,name=nodes" json:"nodes,omitempty"`
Learners []uint64 `protobuf:"varint,2,rep,name=learners" json:"learners,omitempty"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
} }
@ -537,6 +541,13 @@ func (m *ConfState) MarshalTo(dAtA []byte) (int, error) {
i = encodeVarintRaft(dAtA, i, uint64(num)) i = encodeVarintRaft(dAtA, i, uint64(num))
} }
} }
if len(m.Learners) > 0 {
for _, num := range m.Learners {
dAtA[i] = 0x10
i++
i = encodeVarintRaft(dAtA, i, uint64(num))
}
}
if m.XXX_unrecognized != nil { if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized) i += copy(dAtA[i:], m.XXX_unrecognized)
} }
@ -700,6 +711,11 @@ func (m *ConfState) Size() (n int) {
n += 1 + sovRaft(uint64(e)) n += 1 + sovRaft(uint64(e))
} }
} }
if len(m.Learners) > 0 {
for _, e := range m.Learners {
n += 1 + sovRaft(uint64(e))
}
}
if m.XXX_unrecognized != nil { if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized) n += len(m.XXX_unrecognized)
} }
@ -1619,6 +1635,68 @@ func (m *ConfState) Unmarshal(dAtA []byte) error {
} else { } else {
return fmt.Errorf("proto: wrong wireType = %d for field Nodes", wireType) return fmt.Errorf("proto: wrong wireType = %d for field Nodes", wireType)
} }
case 2:
if wireType == 0 {
var v uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRaft
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
m.Learners = append(m.Learners, v)
} else if wireType == 2 {
var packedLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRaft
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
packedLen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if packedLen < 0 {
return ErrInvalidLengthRaft
}
postIndex := iNdEx + packedLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
for iNdEx < postIndex {
var v uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRaft
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
m.Learners = append(m.Learners, v)
}
} else {
return fmt.Errorf("proto: wrong wireType = %d for field Learners", wireType)
}
default: default:
iNdEx = preIndex iNdEx = preIndex
skippy, err := skipRaft(dAtA[iNdEx:]) skippy, err := skipRaft(dAtA[iNdEx:])
@ -1888,55 +1966,56 @@ var (
func init() { proto.RegisterFile("raft.proto", fileDescriptorRaft) } func init() { proto.RegisterFile("raft.proto", fileDescriptorRaft) }
var fileDescriptorRaft = []byte{ var fileDescriptorRaft = []byte{
// 790 bytes of a gzipped FileDescriptorProto // 815 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x54, 0xcd, 0x6e, 0xdb, 0x46, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x54, 0xcd, 0x6e, 0x23, 0x45,
0x10, 0x16, 0x29, 0xea, 0x6f, 0x28, 0xcb, 0xab, 0xb5, 0x5a, 0x2c, 0x0c, 0x43, 0x55, 0x85, 0x1e, 0x10, 0xf6, 0x8c, 0xc7, 0x7f, 0x35, 0x8e, 0xd3, 0xa9, 0x35, 0xa8, 0x15, 0x45, 0xc6, 0xb2, 0x38,
0x04, 0x17, 0x76, 0x5b, 0x1d, 0x7a, 0xe8, 0xcd, 0x96, 0x0a, 0x58, 0x40, 0x65, 0xb8, 0xb2, 0xdc, 0x58, 0x41, 0x1b, 0x20, 0x07, 0x0e, 0x48, 0x1c, 0x36, 0x09, 0x52, 0x22, 0xad, 0xa3, 0xc5, 0x9b,
0x43, 0x83, 0x20, 0x58, 0x8b, 0x2b, 0x4a, 0x89, 0xc9, 0x25, 0x96, 0x2b, 0xc7, 0xbe, 0x04, 0x79, 0xe5, 0x80, 0x84, 0x50, 0xc7, 0x53, 0x9e, 0x18, 0x32, 0xd3, 0xa3, 0x9e, 0xf6, 0xb2, 0xb9, 0x20,
0x80, 0x3c, 0x40, 0x2e, 0x79, 0x1f, 0x1f, 0x0d, 0xe4, 0x1e, 0xc4, 0xce, 0x8b, 0x04, 0xbb, 0x5c, 0x1e, 0x80, 0x07, 0xe0, 0xc2, 0xfb, 0xe4, 0xb8, 0x12, 0x77, 0xc4, 0x86, 0x17, 0x41, 0xdd, 0xd3,
0x4a, 0x94, 0x74, 0xdb, 0xf9, 0xbe, 0xe1, 0xcc, 0x37, 0xdf, 0xce, 0x12, 0x40, 0xd0, 0xa9, 0x3c, 0x63, 0xcf, 0x24, 0xb7, 0xae, 0xef, 0xab, 0xae, 0xfa, 0xea, 0xeb, 0x9a, 0x01, 0x50, 0x62, 0xa9,
0x8e, 0x04, 0x97, 0x1c, 0x17, 0xd5, 0x39, 0xba, 0xde, 0x6f, 0xf8, 0xdc, 0xe7, 0x1a, 0xfa, 0x4d, 0x8f, 0x32, 0x25, 0xb5, 0xc4, 0xb6, 0x39, 0x67, 0xd7, 0xfb, 0xc3, 0x58, 0xc6, 0xd2, 0x42, 0x9f,
0x9d, 0x12, 0xb6, 0xfd, 0x0e, 0x0a, 0x7f, 0x87, 0x52, 0xdc, 0xe3, 0x5f, 0xc1, 0x19, 0xdf, 0x47, 0x9b, 0x53, 0xc1, 0x4e, 0x7e, 0x83, 0xd6, 0xb7, 0xa9, 0x56, 0x77, 0xf8, 0x19, 0x04, 0x57, 0x77,
0x8c, 0x58, 0x2d, 0xab, 0x53, 0xeb, 0xd6, 0x8f, 0x93, 0xaf, 0x8e, 0x35, 0xa9, 0x88, 0x53, 0xe7, 0x19, 0x71, 0x6f, 0xec, 0x4d, 0x07, 0xc7, 0x7b, 0x47, 0xc5, 0xad, 0x23, 0x4b, 0x1a, 0xe2, 0x24,
0xe1, 0xcb, 0x4f, 0xb9, 0x91, 0x4e, 0xc2, 0x04, 0x9c, 0x31, 0x13, 0x01, 0xb1, 0x5b, 0x56, 0xc7, 0xb8, 0xff, 0xe7, 0x93, 0xc6, 0xdc, 0x26, 0x21, 0x87, 0xe0, 0x8a, 0x54, 0xc2, 0xfd, 0xb1, 0x37,
0x59, 0x32, 0x4c, 0x04, 0x78, 0x1f, 0x0a, 0x83, 0xd0, 0x63, 0x77, 0x24, 0x9f, 0xa1, 0x12, 0x08, 0x0d, 0x36, 0x0c, 0xa9, 0x04, 0xf7, 0xa1, 0x75, 0x91, 0x46, 0xf4, 0x8e, 0x37, 0x2b, 0x54, 0x01,
0x63, 0x70, 0xfa, 0x54, 0x52, 0xe2, 0xb4, 0xac, 0x4e, 0x75, 0xa4, 0xcf, 0xed, 0xf7, 0x16, 0xa0, 0x21, 0x42, 0x70, 0x26, 0xb4, 0xe0, 0xc1, 0xd8, 0x9b, 0xf6, 0xe7, 0xf6, 0x3c, 0xf9, 0xdd, 0x03,
0xcb, 0x90, 0x46, 0xf1, 0x8c, 0xcb, 0x21, 0x93, 0xd4, 0xa3, 0x92, 0xe2, 0x3f, 0x01, 0x26, 0x3c, 0xf6, 0x3a, 0x15, 0x59, 0x7e, 0x23, 0xf5, 0x8c, 0xb4, 0x88, 0x84, 0x16, 0xf8, 0x15, 0xc0, 0x42,
0x9c, 0xbe, 0x8a, 0x25, 0x95, 0x89, 0x22, 0x77, 0xa5, 0xa8, 0xc7, 0xc3, 0xe9, 0xa5, 0x22, 0x4c, 0xa6, 0xcb, 0x9f, 0x72, 0x2d, 0x74, 0xa1, 0x28, 0xdc, 0x2a, 0x3a, 0x95, 0xe9, 0xf2, 0xb5, 0x21,
0xf1, 0xca, 0x24, 0x05, 0x54, 0xf3, 0xb9, 0x6e, 0x9e, 0xd5, 0x95, 0x40, 0x4a, 0xb2, 0x54, 0x92, 0x5c, 0xf1, 0xde, 0xa2, 0x04, 0x4c, 0xf3, 0x95, 0x6d, 0x5e, 0xd5, 0x55, 0x40, 0x46, 0xb2, 0x36,
0xb3, 0xba, 0x34, 0xd2, 0xfe, 0x1f, 0xca, 0xa9, 0x02, 0x25, 0x51, 0x29, 0xd0, 0x3d, 0xab, 0x23, 0x92, 0xab, 0xba, 0x2c, 0x32, 0xf9, 0x01, 0xba, 0xa5, 0x02, 0x23, 0xd1, 0x28, 0xb0, 0x3d, 0xfb,
0x7d, 0xc6, 0x7f, 0x41, 0x39, 0x30, 0xca, 0x74, 0x61, 0xb7, 0x4b, 0x52, 0x2d, 0x9b, 0xca, 0x4d, 0x73, 0x7b, 0xc6, 0xaf, 0xa1, 0x9b, 0x38, 0x65, 0xb6, 0x70, 0x78, 0xcc, 0x4b, 0x2d, 0x8f, 0x95,
0xdd, 0x65, 0x7e, 0xfb, 0x53, 0x1e, 0x4a, 0x43, 0x16, 0xc7, 0xd4, 0x67, 0xf8, 0x08, 0x1c, 0xb9, 0xbb, 0xba, 0x9b, 0xfc, 0xc9, 0x5f, 0x4d, 0xe8, 0xcc, 0x28, 0xcf, 0x45, 0x4c, 0xf8, 0x1c, 0x02,
0x72, 0x78, 0x2f, 0xad, 0x61, 0xe8, 0xac, 0xc7, 0x2a, 0x0d, 0x37, 0xc0, 0x96, 0x7c, 0x6d, 0x12, 0xbd, 0x75, 0xf8, 0x59, 0x59, 0xc3, 0xd1, 0x55, 0x8f, 0x4d, 0x1a, 0x0e, 0xc1, 0xd7, 0xb2, 0x36,
0x5b, 0x72, 0x35, 0xc6, 0x54, 0xf0, 0x8d, 0x31, 0x14, 0xb2, 0x1c, 0xd0, 0xd9, 0x1c, 0x10, 0x37, 0x89, 0xaf, 0xa5, 0x19, 0x63, 0xa9, 0xe4, 0xa3, 0x31, 0x0c, 0xb2, 0x19, 0x30, 0x78, 0x3c, 0x20,
0xa1, 0x74, 0xc3, 0x7d, 0x7d, 0x61, 0x85, 0x0c, 0x99, 0x82, 0x2b, 0xdb, 0x8a, 0xdb, 0xb6, 0x1d, 0x8e, 0xa0, 0x73, 0x2b, 0x63, 0xfb, 0x60, 0xad, 0x0a, 0x59, 0x82, 0x5b, 0xdb, 0xda, 0x4f, 0x6d,
0x41, 0x89, 0x85, 0x52, 0xcc, 0x59, 0x4c, 0x4a, 0xad, 0x7c, 0xc7, 0xed, 0xee, 0xac, 0x6d, 0x46, 0x7b, 0x0e, 0x1d, 0x4a, 0xb5, 0x5a, 0x51, 0xce, 0x3b, 0xe3, 0xe6, 0x34, 0x3c, 0xde, 0xa9, 0x6d,
0x5a, 0xca, 0xe4, 0xe0, 0x03, 0x28, 0x4e, 0x78, 0x10, 0xcc, 0x25, 0x29, 0x67, 0x6a, 0x19, 0x0c, 0x46, 0x59, 0xca, 0xe5, 0xe0, 0x01, 0xb4, 0x17, 0x32, 0x49, 0x56, 0x9a, 0x77, 0x2b, 0xb5, 0x1c,
0x77, 0xa1, 0x1c, 0x1b, 0xc7, 0x48, 0x45, 0x3b, 0x89, 0x36, 0x9d, 0x4c, 0x1d, 0x4c, 0xf3, 0x54, 0x86, 0xc7, 0xd0, 0xcd, 0x9d, 0x63, 0xbc, 0x67, 0x9d, 0x64, 0x8f, 0x9d, 0x2c, 0x1d, 0x2c, 0xf3,
0x45, 0xc1, 0x5e, 0xb3, 0x89, 0x24, 0xd0, 0xb2, 0x3a, 0xe5, 0xb4, 0x62, 0x82, 0xe1, 0x5f, 0x00, 0x4c, 0x45, 0x45, 0x3f, 0xd3, 0x42, 0x73, 0x18, 0x7b, 0xd3, 0x6e, 0x59, 0xb1, 0xc0, 0xf0, 0x53,
0x92, 0xd3, 0xd9, 0x3c, 0x94, 0xc4, 0xcd, 0xf4, 0xcc, 0xe0, 0x98, 0x40, 0x69, 0xc2, 0x43, 0xc9, 0x80, 0xe2, 0x74, 0xbe, 0x4a, 0x35, 0x0f, 0x2b, 0x3d, 0x2b, 0x38, 0x72, 0xe8, 0x2c, 0x64, 0xaa,
0xee, 0x24, 0xa9, 0xea, 0x8b, 0x4d, 0xc3, 0xf6, 0x4b, 0xa8, 0x9c, 0x51, 0xe1, 0x25, 0xeb, 0x93, 0xe9, 0x9d, 0xe6, 0x7d, 0xfb, 0xb0, 0x65, 0x38, 0xf9, 0x11, 0x7a, 0xe7, 0x42, 0x45, 0xc5, 0xfa,
0x3a, 0x68, 0x6d, 0x39, 0x48, 0xc0, 0xb9, 0xe5, 0x92, 0xad, 0xef, 0xbb, 0x42, 0x32, 0x03, 0xe7, 0x94, 0x0e, 0x7a, 0x4f, 0x1c, 0xe4, 0x10, 0xbc, 0x95, 0x9a, 0xea, 0xfb, 0x6e, 0x90, 0xca, 0xc0,
0xb7, 0x07, 0x6e, 0xff, 0x0c, 0x95, 0xe5, 0xba, 0xe2, 0x06, 0x14, 0x42, 0xee, 0xb1, 0x98, 0x58, 0xcd, 0xa7, 0x03, 0x4f, 0xbe, 0x81, 0xde, 0x66, 0x5d, 0x71, 0x08, 0xad, 0x54, 0x46, 0x94, 0x73,
0xad, 0x7c, 0xc7, 0x19, 0x25, 0x41, 0xfb, 0x83, 0x05, 0xa0, 0x72, 0x7a, 0x33, 0x1a, 0xfa, 0xfa, 0x6f, 0xdc, 0x9c, 0x06, 0xf3, 0x22, 0xc0, 0x7d, 0xe8, 0xde, 0x92, 0x50, 0x29, 0xa9, 0x9c, 0xfb,
0xd6, 0x07, 0xfd, 0x35, 0x05, 0xf6, 0xa0, 0x8f, 0x7f, 0x37, 0x8f, 0xd3, 0xd6, 0xab, 0xf3, 0x63, 0x96, 0xd8, 0xc4, 0x93, 0x3f, 0x3c, 0x00, 0x73, 0xff, 0xf4, 0x46, 0xa4, 0xb1, 0xdd, 0x88, 0x8b,
0xf6, 0x29, 0x24, 0xdf, 0x6d, 0xbd, 0xd0, 0x03, 0x28, 0x9e, 0x73, 0x8f, 0x0d, 0xfa, 0xeb, 0xba, 0xb3, 0x9a, 0x3a, 0xff, 0xe2, 0x0c, 0xbf, 0x70, 0x1f, 0xae, 0x6f, 0xd7, 0xea, 0xe3, 0xea, 0x67,
0x12, 0x4c, 0x19, 0xd2, 0x33, 0x86, 0x24, 0x8f, 0x31, 0x0d, 0x0f, 0xff, 0x80, 0xca, 0xf2, 0xc9, 0x52, 0xdc, 0x7b, 0xf2, 0xf5, 0x1e, 0x40, 0xfb, 0x52, 0x46, 0x74, 0x71, 0x56, 0xd7, 0x5c, 0x60,
0xe3, 0x5d, 0x70, 0x75, 0x70, 0xce, 0x45, 0x40, 0x6f, 0x50, 0x0e, 0xef, 0xc1, 0xae, 0x06, 0x56, 0xc6, 0xac, 0x53, 0x67, 0x56, 0xf1, 0xa1, 0x96, 0xe1, 0xe1, 0x97, 0xd0, 0xdb, 0xfc, 0x0e, 0x70,
0x8d, 0x91, 0x75, 0xf8, 0xd9, 0x06, 0x37, 0xb3, 0xc4, 0x18, 0xa0, 0x38, 0x8c, 0xfd, 0xb3, 0x45, 0x17, 0x42, 0x1b, 0x5c, 0x4a, 0x95, 0x88, 0x5b, 0xd6, 0xc0, 0x67, 0xb0, 0x6b, 0x81, 0x6d, 0x63,
0x84, 0x72, 0xd8, 0x85, 0xd2, 0x30, 0xf6, 0x4f, 0x19, 0x95, 0xc8, 0x32, 0xc1, 0x85, 0xe0, 0x11, 0xe6, 0x1d, 0xfe, 0xed, 0x43, 0x58, 0x59, 0x70, 0x04, 0x68, 0xcf, 0xf2, 0xf8, 0x7c, 0x9d, 0xb1,
0xb2, 0x4d, 0xd6, 0x49, 0x14, 0xa1, 0x3c, 0xae, 0x01, 0x24, 0xe7, 0x11, 0x8b, 0x23, 0xe4, 0x98, 0x06, 0x86, 0xd0, 0x99, 0xe5, 0xf1, 0x09, 0x09, 0xcd, 0x3c, 0x17, 0xbc, 0x52, 0x32, 0x63, 0xbe,
0xc4, 0xff, 0xb8, 0x64, 0xa8, 0xa0, 0x44, 0x98, 0x40, 0xb3, 0x45, 0xc3, 0xaa, 0x85, 0x41, 0x25, 0xcb, 0x7a, 0x91, 0x65, 0xac, 0x89, 0x03, 0x80, 0xe2, 0x3c, 0xa7, 0x3c, 0x63, 0x81, 0x4b, 0xfc,
0x8c, 0xa0, 0xaa, 0x9a, 0x31, 0x2a, 0xe4, 0xb5, 0xea, 0x52, 0xc6, 0x0d, 0x40, 0x59, 0x44, 0x7f, 0x5e, 0x6a, 0x62, 0x2d, 0x23, 0xc2, 0x05, 0x96, 0x6d, 0x3b, 0xd6, 0x2c, 0x13, 0xeb, 0x20, 0x83,
0x54, 0xc1, 0x18, 0x6a, 0xc3, 0xd8, 0xbf, 0x0a, 0x05, 0xa3, 0x93, 0x19, 0xbd, 0xbe, 0x61, 0x08, 0xbe, 0x69, 0x46, 0x42, 0xe9, 0x6b, 0xd3, 0xa5, 0x8b, 0x43, 0x60, 0x55, 0xc4, 0x5e, 0xea, 0x21,
0x70, 0x1d, 0x76, 0x4c, 0x21, 0x75, 0x41, 0x8b, 0x18, 0xb9, 0x26, 0xad, 0x37, 0x63, 0x93, 0x37, 0xc2, 0x60, 0x96, 0xc7, 0x6f, 0x52, 0x45, 0x62, 0x71, 0x23, 0xae, 0x6f, 0x89, 0x01, 0xee, 0xc1,
0xff, 0x2e, 0xb8, 0x58, 0x04, 0xa8, 0x8a, 0x7f, 0x80, 0xfa, 0x30, 0xf6, 0xc7, 0x82, 0x86, 0xf1, 0x8e, 0x2b, 0x64, 0x1e, 0x6f, 0x9d, 0xb3, 0xd0, 0xa5, 0x9d, 0xde, 0xd0, 0xe2, 0x97, 0xef, 0xd6,
0x94, 0x89, 0x7f, 0x18, 0xf5, 0x98, 0x40, 0x3b, 0xe6, 0xeb, 0xf1, 0x3c, 0x60, 0x7c, 0x21, 0xcf, 0x52, 0xad, 0x13, 0xd6, 0xc7, 0x8f, 0x60, 0x6f, 0x96, 0xc7, 0x57, 0x4a, 0xa4, 0xf9, 0x92, 0xd4,
0xf9, 0x5b, 0x54, 0x33, 0x62, 0x46, 0x8c, 0x7a, 0xfa, 0x87, 0x87, 0x76, 0x8d, 0x98, 0x25, 0xa2, 0x4b, 0x12, 0x11, 0x29, 0xb6, 0xe3, 0x6e, 0x5f, 0xad, 0x12, 0x92, 0x6b, 0x7d, 0x29, 0x7f, 0x65,
0xc5, 0x20, 0x33, 0xef, 0x85, 0x60, 0x7a, 0xc4, 0xba, 0xe9, 0x6a, 0x62, 0x9d, 0x83, 0x0f, 0x5f, 0x03, 0x27, 0x66, 0x4e, 0x22, 0xb2, 0x3f, 0x43, 0xb6, 0xeb, 0xc4, 0x6c, 0x10, 0x2b, 0x86, 0xb9,
0x40, 0x6d, 0xfd, 0x7a, 0x95, 0x8e, 0x15, 0x72, 0xe2, 0x79, 0xea, 0x2e, 0x51, 0x0e, 0x13, 0x68, 0x79, 0x5f, 0x29, 0xb2, 0x23, 0xee, 0xb9, 0xae, 0x2e, 0xb6, 0x39, 0x78, 0x78, 0x07, 0x83, 0xfa,
0xac, 0xe0, 0x11, 0x0b, 0xf8, 0x2d, 0xd3, 0x8c, 0xb5, 0xce, 0x5c, 0x45, 0x1e, 0x95, 0x09, 0x63, 0xf3, 0x1a, 0x1d, 0x5b, 0xe4, 0x45, 0x14, 0x99, 0xb7, 0x64, 0x0d, 0xe4, 0x30, 0xdc, 0xc2, 0x73,
0x9f, 0x92, 0x87, 0xa7, 0x66, 0xee, 0xf1, 0xa9, 0x99, 0x7b, 0x78, 0x6e, 0x5a, 0x8f, 0xcf, 0x4d, 0x4a, 0xe4, 0x5b, 0xb2, 0x8c, 0x57, 0x67, 0xde, 0x64, 0x91, 0xd0, 0x05, 0xe3, 0xe3, 0x01, 0xf0,
0xeb, 0xeb, 0x73, 0xd3, 0xfa, 0xf8, 0xad, 0x99, 0xfb, 0x1e, 0x00, 0x00, 0xff, 0xff, 0xcf, 0x30, 0x5a, 0xa9, 0x97, 0xc5, 0x36, 0x5a, 0xb6, 0x79, 0xc2, 0xef, 0x3f, 0x8c, 0x1a, 0xef, 0x3f, 0x8c,
0x01, 0x41, 0x3a, 0x06, 0x00, 0x00, 0x1a, 0xf7, 0x0f, 0x23, 0xef, 0xfd, 0xc3, 0xc8, 0xfb, 0xf7, 0x61, 0xe4, 0xfd, 0xf9, 0xdf, 0xa8,
0xf1, 0x7f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x86, 0x52, 0x5b, 0xe0, 0x74, 0x06, 0x00, 0x00,
} }

View File

@ -76,13 +76,15 @@ message HardState {
} }
message ConfState { message ConfState {
repeated uint64 nodes = 1; repeated uint64 nodes = 1;
repeated uint64 learners = 2;
} }
enum ConfChangeType { enum ConfChangeType {
ConfChangeAddNode = 0; ConfChangeAddNode = 0;
ConfChangeRemoveNode = 1; ConfChangeRemoveNode = 1;
ConfChangeUpdateNode = 2; ConfChangeUpdateNode = 2;
ConfChangeAddLearnerNode = 3;
} }
message ConfChange { message ConfChange {

View File

@ -175,6 +175,8 @@ func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
switch cc.Type { switch cc.Type {
case pb.ConfChangeAddNode: case pb.ConfChangeAddNode:
rn.raft.addNode(cc.NodeID) rn.raft.addNode(cc.NodeID)
case pb.ConfChangeAddLearnerNode:
rn.raft.addLearner(cc.NodeID)
case pb.ConfChangeRemoveNode: case pb.ConfChangeRemoveNode:
rn.raft.removeNode(cc.NodeID) rn.raft.removeNode(cc.NodeID)
case pb.ConfChangeUpdateNode: case pb.ConfChangeUpdateNode:
@ -191,7 +193,7 @@ func (rn *RawNode) Step(m pb.Message) error {
if IsLocalMsg(m.Type) { if IsLocalMsg(m.Type) {
return ErrStepLocalMsg return ErrStepLocalMsg
} }
if _, ok := rn.raft.prs[m.From]; ok || !IsResponseMsg(m.Type) { if pr := rn.raft.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
return rn.raft.Step(m) return rn.raft.Step(m)
} }
return ErrStepPeerNotFound return ErrStepPeerNotFound

View File

@ -49,6 +49,10 @@ func getStatus(r *raft) Status {
for id, p := range r.prs { for id, p := range r.prs {
s.Progress[id] = *p s.Progress[id] = *p
} }
for id, p := range r.learnerPrs {
s.Progress[id] = *p
}
} }
return s return s