mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raft: support learner
This commit is contained in:
parent
b64c1bfce6
commit
c6f2db2e92
@ -319,7 +319,7 @@ func (n *node) run(r *raft) {
|
||||
r.Step(m)
|
||||
case m := <-n.recvc:
|
||||
// filter out response message from unknown From.
|
||||
if _, ok := r.prs[m.From]; ok || !IsResponseMsg(m.Type) {
|
||||
if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
|
||||
r.Step(m) // raft never returns an error
|
||||
}
|
||||
case cc := <-n.confc:
|
||||
@ -334,6 +334,8 @@ func (n *node) run(r *raft) {
|
||||
switch cc.Type {
|
||||
case pb.ConfChangeAddNode:
|
||||
r.addNode(cc.NodeID)
|
||||
case pb.ConfChangeAddLearnerNode:
|
||||
r.addLearner(cc.NodeID)
|
||||
case pb.ConfChangeRemoveNode:
|
||||
// block incoming proposal when local node is
|
||||
// removed
|
||||
|
@ -48,6 +48,7 @@ type Progress struct {
|
||||
// When in ProgressStateSnapshot, leader should have sent out snapshot
|
||||
// before and stops sending any replication message.
|
||||
State ProgressStateType
|
||||
|
||||
// Paused is used in ProgressStateProbe.
|
||||
// When Paused is true, raft should pause sending replication message to this peer.
|
||||
Paused bool
|
||||
@ -76,6 +77,9 @@ type Progress struct {
|
||||
// be freed by calling inflights.freeTo with the index of the last
|
||||
// received entry.
|
||||
ins *inflights
|
||||
|
||||
// IsLearner is true if this progress is tracked for a learner.
|
||||
IsLearner bool
|
||||
}
|
||||
|
||||
func (pr *Progress) resetState(state ProgressStateType) {
|
||||
|
188
raft/raft.go
188
raft/raft.go
@ -116,6 +116,10 @@ type Config struct {
|
||||
// used for testing right now.
|
||||
peers []uint64
|
||||
|
||||
// learners contains the IDs of all leaner nodes (including self if the local node is a leaner) in the raft cluster.
|
||||
// learners only receives entries from the leader node. It does not vote or promote itself.
|
||||
learners []uint64
|
||||
|
||||
// ElectionTick is the number of Node.Tick invocations that must pass between
|
||||
// elections. That is, if a follower does not receive any message from the
|
||||
// leader of current term before ElectionTick has elapsed, it will become
|
||||
@ -235,9 +239,13 @@ type raft struct {
|
||||
maxInflight int
|
||||
maxMsgSize uint64
|
||||
prs map[uint64]*Progress
|
||||
learnerPrs map[uint64]*Progress
|
||||
|
||||
state StateType
|
||||
|
||||
// isLearner is true if the local raft node is a learner.
|
||||
isLearner bool
|
||||
|
||||
votes map[uint64]bool
|
||||
|
||||
msgs []pb.Message
|
||||
@ -289,22 +297,26 @@ func newRaft(c *Config) *raft {
|
||||
panic(err) // TODO(bdarnell)
|
||||
}
|
||||
peers := c.peers
|
||||
if len(cs.Nodes) > 0 {
|
||||
if len(peers) > 0 {
|
||||
learners := c.learners
|
||||
if len(cs.Nodes) > 0 || len(cs.Learners) > 0 {
|
||||
if len(peers) > 0 || len(learners) > 0 {
|
||||
// TODO(bdarnell): the peers argument is always nil except in
|
||||
// tests; the argument should be removed and these tests should be
|
||||
// updated to specify their nodes through a snapshot.
|
||||
panic("cannot specify both newRaft(peers) and ConfState.Nodes)")
|
||||
panic("cannot specify both newRaft(peers, learners) and ConfState.(Nodes, Learners)")
|
||||
}
|
||||
peers = cs.Nodes
|
||||
learners = cs.Learners
|
||||
}
|
||||
r := &raft{
|
||||
id: c.ID,
|
||||
lead: None,
|
||||
isLearner: false,
|
||||
raftLog: raftlog,
|
||||
maxMsgSize: c.MaxSizePerMsg,
|
||||
maxInflight: c.MaxInflightMsgs,
|
||||
prs: make(map[uint64]*Progress),
|
||||
learnerPrs: make(map[uint64]*Progress),
|
||||
electionTimeout: c.ElectionTick,
|
||||
heartbeatTimeout: c.HeartbeatTick,
|
||||
logger: c.Logger,
|
||||
@ -316,6 +328,16 @@ func newRaft(c *Config) *raft {
|
||||
for _, p := range peers {
|
||||
r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)}
|
||||
}
|
||||
for _, p := range learners {
|
||||
if _, ok := r.prs[p]; ok {
|
||||
panic(fmt.Sprintf("node %x is in both learner and peer list", p))
|
||||
}
|
||||
r.learnerPrs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight), IsLearner: true}
|
||||
if r.id == p {
|
||||
r.isLearner = true
|
||||
}
|
||||
}
|
||||
|
||||
if !isHardStateEqual(hs, emptyState) {
|
||||
r.loadState(hs)
|
||||
}
|
||||
@ -349,10 +371,13 @@ func (r *raft) hardState() pb.HardState {
|
||||
func (r *raft) quorum() int { return len(r.prs)/2 + 1 }
|
||||
|
||||
func (r *raft) nodes() []uint64 {
|
||||
nodes := make([]uint64, 0, len(r.prs))
|
||||
nodes := make([]uint64, 0, len(r.prs)+len(r.learnerPrs))
|
||||
for id := range r.prs {
|
||||
nodes = append(nodes, id)
|
||||
}
|
||||
for id := range r.learnerPrs {
|
||||
nodes = append(nodes, id)
|
||||
}
|
||||
sort.Sort(uint64Slice(nodes))
|
||||
return nodes
|
||||
}
|
||||
@ -391,9 +416,17 @@ func (r *raft) send(m pb.Message) {
|
||||
r.msgs = append(r.msgs, m)
|
||||
}
|
||||
|
||||
func (r *raft) getProgress(id uint64) *Progress {
|
||||
if pr, ok := r.prs[id]; ok {
|
||||
return pr
|
||||
}
|
||||
|
||||
return r.learnerPrs[id]
|
||||
}
|
||||
|
||||
// sendAppend sends RPC, with entries to the given peer.
|
||||
func (r *raft) sendAppend(to uint64) {
|
||||
pr := r.prs[to]
|
||||
pr := r.getProgress(to)
|
||||
if pr.IsPaused() {
|
||||
return
|
||||
}
|
||||
@ -458,7 +491,7 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
|
||||
// or it might not have all the committed entries.
|
||||
// The leader MUST NOT forward the follower's commit to
|
||||
// an unmatched index.
|
||||
commit := min(r.prs[to].Match, r.raftLog.committed)
|
||||
commit := min(r.getProgress(to).Match, r.raftLog.committed)
|
||||
m := pb.Message{
|
||||
To: to,
|
||||
Type: pb.MsgHeartbeat,
|
||||
@ -469,15 +502,26 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
|
||||
r.send(m)
|
||||
}
|
||||
|
||||
func (r *raft) forEachProgress(f func(id uint64, pr *Progress)) {
|
||||
for id, pr := range r.prs {
|
||||
f(id, pr)
|
||||
}
|
||||
|
||||
for id, pr := range r.learnerPrs {
|
||||
f(id, pr)
|
||||
}
|
||||
}
|
||||
|
||||
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
|
||||
// according to the progress recorded in r.prs.
|
||||
func (r *raft) bcastAppend() {
|
||||
for id := range r.prs {
|
||||
r.forEachProgress(func(id uint64, _ *Progress) {
|
||||
if id == r.id {
|
||||
continue
|
||||
return
|
||||
}
|
||||
|
||||
r.sendAppend(id)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// bcastHeartbeat sends RPC, without entries to all the peers.
|
||||
@ -491,12 +535,12 @@ func (r *raft) bcastHeartbeat() {
|
||||
}
|
||||
|
||||
func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
|
||||
for id := range r.prs {
|
||||
r.forEachProgress(func(id uint64, _ *Progress) {
|
||||
if id == r.id {
|
||||
continue
|
||||
return
|
||||
}
|
||||
r.sendHeartbeat(id, ctx)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// maybeCommit attempts to advance the commit index. Returns true if
|
||||
@ -505,8 +549,8 @@ func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
|
||||
func (r *raft) maybeCommit() bool {
|
||||
// TODO(bmizerany): optimize.. Currently naive
|
||||
mis := make(uint64Slice, 0, len(r.prs))
|
||||
for id := range r.prs {
|
||||
mis = append(mis, r.prs[id].Match)
|
||||
for _, p := range r.prs {
|
||||
mis = append(mis, p.Match)
|
||||
}
|
||||
sort.Sort(sort.Reverse(mis))
|
||||
mci := mis[r.quorum()-1]
|
||||
@ -527,12 +571,13 @@ func (r *raft) reset(term uint64) {
|
||||
r.abortLeaderTransfer()
|
||||
|
||||
r.votes = make(map[uint64]bool)
|
||||
for id := range r.prs {
|
||||
r.prs[id] = &Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight)}
|
||||
r.forEachProgress(func(id uint64, pr *Progress) {
|
||||
*pr = Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight), IsLearner: pr.IsLearner}
|
||||
if id == r.id {
|
||||
r.prs[id].Match = r.raftLog.lastIndex()
|
||||
pr.Match = r.raftLog.lastIndex()
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
r.pendingConf = false
|
||||
r.readOnly = newReadOnly(r.readOnly.option)
|
||||
}
|
||||
@ -544,7 +589,7 @@ func (r *raft) appendEntry(es ...pb.Entry) {
|
||||
es[i].Index = li + 1 + uint64(i)
|
||||
}
|
||||
r.raftLog.append(es...)
|
||||
r.prs[r.id].maybeUpdate(r.raftLog.lastIndex())
|
||||
r.getProgress(r.id).maybeUpdate(r.raftLog.lastIndex())
|
||||
// Regardless of maybeCommit's return, our caller will call bcastAppend.
|
||||
r.maybeCommit()
|
||||
}
|
||||
@ -787,6 +832,12 @@ func (r *raft) Step(m pb.Message) error {
|
||||
}
|
||||
|
||||
case pb.MsgVote, pb.MsgPreVote:
|
||||
if r.isLearner {
|
||||
// TODO: learner may need to vote, in case of node down when confchange.
|
||||
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: learner can not vote",
|
||||
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
|
||||
return nil
|
||||
}
|
||||
// The m.Term > r.Term clause is for MsgPreVote. For MsgVote m.Term should
|
||||
// always equal r.Term.
|
||||
if (r.Vote == None || m.Term > r.Term || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
|
||||
@ -890,8 +941,8 @@ func stepLeader(r *raft, m pb.Message) {
|
||||
}
|
||||
|
||||
// All other message types require a progress for m.From (pr).
|
||||
pr, prOk := r.prs[m.From]
|
||||
if !prOk {
|
||||
pr := r.getProgress(m.From)
|
||||
if pr == nil {
|
||||
r.logger.Debugf("%x no progress available for %x", r.id, m.From)
|
||||
return
|
||||
}
|
||||
@ -990,6 +1041,10 @@ func stepLeader(r *raft, m pb.Message) {
|
||||
}
|
||||
r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
|
||||
case pb.MsgTransferLeader:
|
||||
if pr.IsLearner {
|
||||
r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id)
|
||||
return
|
||||
}
|
||||
leadTransferee := m.From
|
||||
lastLeadTransferee := r.leadTransferee
|
||||
if lastLeadTransferee != None {
|
||||
@ -1166,20 +1221,37 @@ func (r *raft) restore(s pb.Snapshot) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// The normal peer can't become learner.
|
||||
if !r.isLearner {
|
||||
for _, id := range s.Metadata.ConfState.Learners {
|
||||
if id == r.id {
|
||||
r.logger.Errorf("%x can't become learner when restores snapshot [index: %d, term: %d]", r.id, s.Metadata.Index, s.Metadata.Term)
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
|
||||
r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
||||
|
||||
r.raftLog.restore(s)
|
||||
r.prs = make(map[uint64]*Progress)
|
||||
for _, n := range s.Metadata.ConfState.Nodes {
|
||||
r.learnerPrs = make(map[uint64]*Progress)
|
||||
r.restoreNode(s.Metadata.ConfState.Nodes, false)
|
||||
r.restoreNode(s.Metadata.ConfState.Learners, true)
|
||||
return true
|
||||
}
|
||||
|
||||
func (r *raft) restoreNode(nodes []uint64, isLearner bool) {
|
||||
for _, n := range nodes {
|
||||
match, next := uint64(0), r.raftLog.lastIndex()+1
|
||||
if n == r.id {
|
||||
match = next - 1
|
||||
r.isLearner = isLearner
|
||||
}
|
||||
r.setProgress(n, match, next)
|
||||
r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs[n])
|
||||
r.setProgress(n, match, next, isLearner)
|
||||
r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.getProgress(n))
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// promotable indicates whether state machine can be promoted to leader,
|
||||
@ -1190,18 +1262,46 @@ func (r *raft) promotable() bool {
|
||||
}
|
||||
|
||||
func (r *raft) addNode(id uint64) {
|
||||
r.addNodeOrLearnerNode(id, false)
|
||||
}
|
||||
|
||||
func (r *raft) addLearner(id uint64) {
|
||||
r.addNodeOrLearnerNode(id, true)
|
||||
}
|
||||
|
||||
func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
|
||||
r.pendingConf = false
|
||||
if _, ok := r.prs[id]; ok {
|
||||
// Ignore any redundant addNode calls (which can happen because the
|
||||
// initial bootstrapping entries are applied twice).
|
||||
return
|
||||
pr := r.getProgress(id)
|
||||
if pr == nil {
|
||||
r.setProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
|
||||
} else {
|
||||
if isLearner && !pr.IsLearner {
|
||||
// can only change Learner to Voter
|
||||
r.logger.Infof("%x ignored addLeaner: do not support changing %x from raft peer to learner.", r.id, id)
|
||||
return
|
||||
}
|
||||
|
||||
if isLearner == pr.IsLearner {
|
||||
// Ignore any redundant addNode calls (which can happen because the
|
||||
// initial bootstrapping entries are applied twice).
|
||||
return
|
||||
}
|
||||
|
||||
// change Learner to Voter, use origin Learner progress
|
||||
delete(r.learnerPrs, id)
|
||||
pr.IsLearner = false
|
||||
r.prs[id] = pr
|
||||
}
|
||||
|
||||
if r.id == id {
|
||||
r.isLearner = isLearner
|
||||
}
|
||||
|
||||
r.setProgress(id, 0, r.raftLog.lastIndex()+1)
|
||||
// When a node is first added, we should mark it as recently active.
|
||||
// Otherwise, CheckQuorum may cause us to step down if it is invoked
|
||||
// before the added node has a chance to communicate with us.
|
||||
r.prs[id].RecentActive = true
|
||||
pr = r.getProgress(id)
|
||||
pr.RecentActive = true
|
||||
}
|
||||
|
||||
func (r *raft) removeNode(id uint64) {
|
||||
@ -1209,7 +1309,7 @@ func (r *raft) removeNode(id uint64) {
|
||||
r.pendingConf = false
|
||||
|
||||
// do not try to commit or abort transferring if there is no nodes in the cluster.
|
||||
if len(r.prs) == 0 {
|
||||
if len(r.prs) == 0 && len(r.learnerPrs) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
@ -1226,12 +1326,22 @@ func (r *raft) removeNode(id uint64) {
|
||||
|
||||
func (r *raft) resetPendingConf() { r.pendingConf = false }
|
||||
|
||||
func (r *raft) setProgress(id, match, next uint64) {
|
||||
r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)}
|
||||
func (r *raft) setProgress(id, match, next uint64, isLearner bool) {
|
||||
if !isLearner {
|
||||
delete(r.learnerPrs, id)
|
||||
r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)}
|
||||
return
|
||||
}
|
||||
|
||||
if _, ok := r.prs[id]; ok {
|
||||
panic(fmt.Sprintf("%x unexpected changing from voter to learner for %x", r.id, id))
|
||||
}
|
||||
r.learnerPrs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight), IsLearner: true}
|
||||
}
|
||||
|
||||
func (r *raft) delProgress(id uint64) {
|
||||
delete(r.prs, id)
|
||||
delete(r.learnerPrs, id)
|
||||
}
|
||||
|
||||
func (r *raft) loadState(state pb.HardState) {
|
||||
@ -1261,18 +1371,18 @@ func (r *raft) resetRandomizedElectionTimeout() {
|
||||
func (r *raft) checkQuorumActive() bool {
|
||||
var act int
|
||||
|
||||
for id := range r.prs {
|
||||
r.forEachProgress(func(id uint64, pr *Progress) {
|
||||
if id == r.id { // self is always active
|
||||
act++
|
||||
continue
|
||||
return
|
||||
}
|
||||
|
||||
if r.prs[id].RecentActive {
|
||||
if pr.RecentActive && !pr.IsLearner {
|
||||
act++
|
||||
}
|
||||
|
||||
r.prs[id].RecentActive = false
|
||||
}
|
||||
pr.RecentActive = false
|
||||
})
|
||||
|
||||
return act >= r.quorum()
|
||||
}
|
||||
|
@ -348,6 +348,91 @@ func testLeaderElection(t *testing.T, preVote bool) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestLearnerElectionTimeout verfies that the leader should not start election even
|
||||
// when times out.
|
||||
func TestLearnerElectionTimeout(t *testing.T) {
|
||||
n1 := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
|
||||
n2 := newTestLearnerRaft(2, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
|
||||
|
||||
n1.becomeFollower(1, None)
|
||||
n2.becomeFollower(1, None)
|
||||
|
||||
// n2 is learner. Learner should not start election even when times out.
|
||||
setRandomizedElectionTimeout(n2, n2.electionTimeout)
|
||||
for i := 0; i < n2.electionTimeout; i++ {
|
||||
n2.tick()
|
||||
}
|
||||
|
||||
if n2.state != StateFollower {
|
||||
t.Errorf("peer 2 state: %s, want %s", n2.state, StateFollower)
|
||||
}
|
||||
}
|
||||
|
||||
// TestLearnerPromotion verifies that the leaner should not election until
|
||||
// it is promoted to a normal peer.
|
||||
func TestLearnerPromotion(t *testing.T) {
|
||||
n1 := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
|
||||
n2 := newTestLearnerRaft(2, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
|
||||
|
||||
n1.becomeFollower(1, None)
|
||||
n2.becomeFollower(1, None)
|
||||
|
||||
nt := newNetwork(n1, n2)
|
||||
|
||||
if n1.state == StateLeader {
|
||||
t.Error("peer 1 state is leader, want not", n1.state)
|
||||
}
|
||||
|
||||
// n1 should become leader
|
||||
setRandomizedElectionTimeout(n1, n1.electionTimeout)
|
||||
for i := 0; i < n1.electionTimeout; i++ {
|
||||
n1.tick()
|
||||
}
|
||||
|
||||
if n1.state != StateLeader {
|
||||
t.Errorf("peer 1 state: %s, want %s", n1.state, StateLeader)
|
||||
}
|
||||
if n2.state != StateFollower {
|
||||
t.Errorf("peer 2 state: %s, want %s", n2.state, StateFollower)
|
||||
}
|
||||
|
||||
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
|
||||
|
||||
n1.addNode(2)
|
||||
n2.addNode(2)
|
||||
if n2.isLearner {
|
||||
t.Error("peer 2 is learner, want not")
|
||||
}
|
||||
|
||||
// n2 start election, should become leader
|
||||
setRandomizedElectionTimeout(n2, n2.electionTimeout)
|
||||
for i := 0; i < n2.electionTimeout; i++ {
|
||||
n2.tick()
|
||||
}
|
||||
|
||||
nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgBeat})
|
||||
|
||||
if n1.state != StateFollower {
|
||||
t.Errorf("peer 1 state: %s, want %s", n1.state, StateFollower)
|
||||
}
|
||||
if n2.state != StateLeader {
|
||||
t.Errorf("peer 2 state: %s, want %s", n2.state, StateLeader)
|
||||
}
|
||||
}
|
||||
|
||||
// TestLearnerCannotVote checks that a learner can't vote even it receives a valid Vote request.
|
||||
func TestLearnerCannotVote(t *testing.T) {
|
||||
n2 := newTestLearnerRaft(2, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
|
||||
|
||||
n2.becomeFollower(1, None)
|
||||
|
||||
n2.Step(pb.Message{From: 1, To: 2, Term: 2, Type: pb.MsgVote, LogTerm: 11, Index: 11})
|
||||
|
||||
if len(n2.msgs) != 0 {
|
||||
t.Errorf("expect learner not to vote, but received %v messages", n2.msgs)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeaderCycle(t *testing.T) {
|
||||
testLeaderCycle(t, false)
|
||||
}
|
||||
@ -600,6 +685,47 @@ func TestLogReplication(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestLearnerLogReplication tests that a learner can receive entries from the leader.
|
||||
func TestLearnerLogReplication(t *testing.T) {
|
||||
n1 := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
|
||||
n2 := newTestLearnerRaft(2, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
|
||||
|
||||
nt := newNetwork(n1, n2)
|
||||
|
||||
n1.becomeFollower(1, None)
|
||||
n2.becomeFollower(1, None)
|
||||
|
||||
setRandomizedElectionTimeout(n1, n1.electionTimeout)
|
||||
for i := 0; i < n1.electionTimeout; i++ {
|
||||
n1.tick()
|
||||
}
|
||||
|
||||
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
|
||||
|
||||
// n1 is leader and n2 is learner
|
||||
if n1.state != StateLeader {
|
||||
t.Errorf("peer 1 state: %s, want %s", n1.state, StateLeader)
|
||||
}
|
||||
if !n2.isLearner {
|
||||
t.Error("peer 2 state: not learner, want yes")
|
||||
}
|
||||
|
||||
nextCommitted := n1.raftLog.committed + 1
|
||||
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
||||
if n1.raftLog.committed != nextCommitted {
|
||||
t.Errorf("peer 1 wants committed to %d, but still %d", nextCommitted, n1.raftLog.committed)
|
||||
}
|
||||
|
||||
if n1.raftLog.committed != n2.raftLog.committed {
|
||||
t.Errorf("peer 2 wants committed to %d, but still %d", n1.raftLog.committed, n2.raftLog.committed)
|
||||
}
|
||||
|
||||
match := n1.getProgress(2).Match
|
||||
if match != n2.raftLog.committed {
|
||||
t.Errorf("progresss 2 of leader 1 wants match %d, but got %d", n2.raftLog.committed, match)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSingleNodeCommit(t *testing.T) {
|
||||
tt := newNetwork(nil)
|
||||
tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
|
||||
@ -1058,7 +1184,7 @@ func TestCommit(t *testing.T) {
|
||||
|
||||
sm := newTestRaft(1, []uint64{1}, 5, 1, storage)
|
||||
for j := 0; j < len(tt.matches); j++ {
|
||||
sm.setProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1)
|
||||
sm.setProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1, false)
|
||||
}
|
||||
sm.maybeCommit()
|
||||
if g := sm.raftLog.committed; g != tt.w {
|
||||
@ -2326,6 +2452,130 @@ func TestRestore(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestRestoreWithLearner restores a snapshot which contains learners.
|
||||
func TestRestoreWithLearner(t *testing.T) {
|
||||
s := pb.Snapshot{
|
||||
Metadata: pb.SnapshotMetadata{
|
||||
Index: 11, // magic number
|
||||
Term: 11, // magic number
|
||||
ConfState: pb.ConfState{Nodes: []uint64{1, 2}, Learners: []uint64{3}},
|
||||
},
|
||||
}
|
||||
|
||||
storage := NewMemoryStorage()
|
||||
sm := newTestLearnerRaft(3, []uint64{1, 2}, []uint64{3}, 10, 1, storage)
|
||||
if ok := sm.restore(s); !ok {
|
||||
t.Error("restore fail, want succeed")
|
||||
}
|
||||
|
||||
if sm.raftLog.lastIndex() != s.Metadata.Index {
|
||||
t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Metadata.Index)
|
||||
}
|
||||
if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term {
|
||||
t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term)
|
||||
}
|
||||
sg := sm.nodes()
|
||||
if len(sg) != len(s.Metadata.ConfState.Nodes)+len(s.Metadata.ConfState.Learners) {
|
||||
t.Errorf("sm.Nodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState)
|
||||
}
|
||||
for _, n := range s.Metadata.ConfState.Nodes {
|
||||
if sm.prs[n].IsLearner {
|
||||
t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs[n], false)
|
||||
}
|
||||
}
|
||||
for _, n := range s.Metadata.ConfState.Learners {
|
||||
if !sm.learnerPrs[n].IsLearner {
|
||||
t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs[n], true)
|
||||
}
|
||||
}
|
||||
|
||||
if ok := sm.restore(s); ok {
|
||||
t.Error("restore succeed, want fail")
|
||||
}
|
||||
}
|
||||
|
||||
// TestRestoreInvalidLearner verfies that a normal peer can't become learner again
|
||||
// when restores snapshot.
|
||||
func TestRestoreInvalidLearner(t *testing.T) {
|
||||
s := pb.Snapshot{
|
||||
Metadata: pb.SnapshotMetadata{
|
||||
Index: 11, // magic number
|
||||
Term: 11, // magic number
|
||||
ConfState: pb.ConfState{Nodes: []uint64{1, 2}, Learners: []uint64{3}},
|
||||
},
|
||||
}
|
||||
|
||||
storage := NewMemoryStorage()
|
||||
sm := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, storage)
|
||||
|
||||
if sm.isLearner {
|
||||
t.Errorf("%x is learner, want not", sm.id)
|
||||
}
|
||||
if ok := sm.restore(s); ok {
|
||||
t.Error("restore succeed, want fail")
|
||||
}
|
||||
}
|
||||
|
||||
// TestRestoreLearnerPromotion checks that a learner can become to a follower after
|
||||
// restoring snapshot.
|
||||
func TestRestoreLearnerPromotion(t *testing.T) {
|
||||
s := pb.Snapshot{
|
||||
Metadata: pb.SnapshotMetadata{
|
||||
Index: 11, // magic number
|
||||
Term: 11, // magic number
|
||||
ConfState: pb.ConfState{Nodes: []uint64{1, 2, 3}},
|
||||
},
|
||||
}
|
||||
|
||||
storage := NewMemoryStorage()
|
||||
sm := newTestLearnerRaft(3, []uint64{1, 2}, []uint64{3}, 10, 1, storage)
|
||||
|
||||
if !sm.isLearner {
|
||||
t.Errorf("%x is not learner, want yes", sm.id)
|
||||
}
|
||||
|
||||
if ok := sm.restore(s); !ok {
|
||||
t.Error("restore fail, want succeed")
|
||||
}
|
||||
|
||||
if sm.isLearner {
|
||||
t.Errorf("%x is learner, want not", sm.id)
|
||||
}
|
||||
}
|
||||
|
||||
// TestLearnerReceiveSnapshot tests that a learner can receive a snpahost from leader
|
||||
func TestLearnerReceiveSnapshot(t *testing.T) {
|
||||
// restore the state machine from a snapshot so it has a compacted log and a snapshot
|
||||
s := pb.Snapshot{
|
||||
Metadata: pb.SnapshotMetadata{
|
||||
Index: 11, // magic number
|
||||
Term: 11, // magic number
|
||||
ConfState: pb.ConfState{Nodes: []uint64{1}, Learners: []uint64{2}},
|
||||
},
|
||||
}
|
||||
|
||||
n1 := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
|
||||
n2 := newTestLearnerRaft(2, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
|
||||
|
||||
n1.restore(s)
|
||||
|
||||
// Force set n1 appplied index.
|
||||
n1.raftLog.appliedTo(n1.raftLog.committed)
|
||||
|
||||
nt := newNetwork(n1, n2)
|
||||
|
||||
setRandomizedElectionTimeout(n1, n1.electionTimeout)
|
||||
for i := 0; i < n1.electionTimeout; i++ {
|
||||
n1.tick()
|
||||
}
|
||||
|
||||
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
|
||||
|
||||
if n2.raftLog.committed != n1.raftLog.committed {
|
||||
t.Errorf("peer 2 must commit to %d, but %d", n1.raftLog.committed, n2.raftLog.committed)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRestoreIgnoreSnapshot(t *testing.T) {
|
||||
previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
|
||||
commit := uint64(1)
|
||||
@ -2569,6 +2819,24 @@ func TestAddNode(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestAddLearner tests that addLearner could update pendingConf and nodes correctly.
|
||||
func TestAddLearner(t *testing.T) {
|
||||
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
||||
r.pendingConf = true
|
||||
r.addLearner(2)
|
||||
if r.pendingConf {
|
||||
t.Errorf("pendingConf = %v, want false", r.pendingConf)
|
||||
}
|
||||
nodes := r.nodes()
|
||||
wnodes := []uint64{1, 2}
|
||||
if !reflect.DeepEqual(nodes, wnodes) {
|
||||
t.Errorf("nodes = %v, want %v", nodes, wnodes)
|
||||
}
|
||||
if !r.learnerPrs[2].IsLearner {
|
||||
t.Errorf("node 2 is learner %t, want %t", r.prs[2].IsLearner, true)
|
||||
}
|
||||
}
|
||||
|
||||
// TestAddNodeCheckQuorum tests that addNode does not trigger a leader election
|
||||
// immediately when checkQuorum is set.
|
||||
func TestAddNodeCheckQuorum(t *testing.T) {
|
||||
@ -2626,6 +2894,27 @@ func TestRemoveNode(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestRemoveLearner tests that removeNode could update pendingConf, nodes and
|
||||
// and removed list correctly.
|
||||
func TestRemoveLearner(t *testing.T) {
|
||||
r := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
|
||||
r.pendingConf = true
|
||||
r.removeNode(2)
|
||||
if r.pendingConf {
|
||||
t.Errorf("pendingConf = %v, want false", r.pendingConf)
|
||||
}
|
||||
w := []uint64{1}
|
||||
if g := r.nodes(); !reflect.DeepEqual(g, w) {
|
||||
t.Errorf("nodes = %v, want %v", g, w)
|
||||
}
|
||||
|
||||
// remove all nodes from cluster
|
||||
r.removeNode(1)
|
||||
w = []uint64{}
|
||||
if g := r.nodes(); !reflect.DeepEqual(g, w) {
|
||||
t.Errorf("nodes = %v, want %v", g, w)
|
||||
}
|
||||
}
|
||||
func TestPromotable(t *testing.T) {
|
||||
id := uint64(1)
|
||||
tests := []struct {
|
||||
@ -3338,10 +3627,19 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw
|
||||
sm := newRaft(cfg)
|
||||
npeers[id] = sm
|
||||
case *raft:
|
||||
learners := make(map[uint64]bool, len(v.learnerPrs))
|
||||
for i := range v.learnerPrs {
|
||||
learners[i] = true
|
||||
}
|
||||
v.id = id
|
||||
v.prs = make(map[uint64]*Progress)
|
||||
v.learnerPrs = make(map[uint64]*Progress)
|
||||
for i := 0; i < size; i++ {
|
||||
v.prs[peerAddrs[i]] = &Progress{}
|
||||
if _, ok := learners[peerAddrs[i]]; ok {
|
||||
v.learnerPrs[peerAddrs[i]] = &Progress{IsLearner: true}
|
||||
} else {
|
||||
v.prs[peerAddrs[i]] = &Progress{}
|
||||
}
|
||||
}
|
||||
v.reset(v.Term)
|
||||
npeers[id] = v
|
||||
@ -3462,3 +3760,9 @@ func newTestConfig(id uint64, peers []uint64, election, heartbeat int, storage S
|
||||
func newTestRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage) *raft {
|
||||
return newRaft(newTestConfig(id, peers, election, heartbeat, storage))
|
||||
}
|
||||
|
||||
func newTestLearnerRaft(id uint64, peers []uint64, learners []uint64, election, heartbeat int, storage Storage) *raft {
|
||||
cfg := newTestConfig(id, peers, election, heartbeat, storage)
|
||||
cfg.learners = learners
|
||||
return newRaft(cfg)
|
||||
}
|
||||
|
@ -162,20 +162,23 @@ func (MessageType) EnumDescriptor() ([]byte, []int) { return fileDescriptorRaft,
|
||||
type ConfChangeType int32
|
||||
|
||||
const (
|
||||
ConfChangeAddNode ConfChangeType = 0
|
||||
ConfChangeRemoveNode ConfChangeType = 1
|
||||
ConfChangeUpdateNode ConfChangeType = 2
|
||||
ConfChangeAddNode ConfChangeType = 0
|
||||
ConfChangeRemoveNode ConfChangeType = 1
|
||||
ConfChangeUpdateNode ConfChangeType = 2
|
||||
ConfChangeAddLearnerNode ConfChangeType = 3
|
||||
)
|
||||
|
||||
var ConfChangeType_name = map[int32]string{
|
||||
0: "ConfChangeAddNode",
|
||||
1: "ConfChangeRemoveNode",
|
||||
2: "ConfChangeUpdateNode",
|
||||
3: "ConfChangeAddLearnerNode",
|
||||
}
|
||||
var ConfChangeType_value = map[string]int32{
|
||||
"ConfChangeAddNode": 0,
|
||||
"ConfChangeRemoveNode": 1,
|
||||
"ConfChangeUpdateNode": 2,
|
||||
"ConfChangeAddNode": 0,
|
||||
"ConfChangeRemoveNode": 1,
|
||||
"ConfChangeUpdateNode": 2,
|
||||
"ConfChangeAddLearnerNode": 3,
|
||||
}
|
||||
|
||||
func (x ConfChangeType) Enum() *ConfChangeType {
|
||||
@ -267,6 +270,7 @@ func (*HardState) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []in
|
||||
|
||||
type ConfState struct {
|
||||
Nodes []uint64 `protobuf:"varint,1,rep,name=nodes" json:"nodes,omitempty"`
|
||||
Learners []uint64 `protobuf:"varint,2,rep,name=learners" json:"learners,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
@ -537,6 +541,13 @@ func (m *ConfState) MarshalTo(dAtA []byte) (int, error) {
|
||||
i = encodeVarintRaft(dAtA, i, uint64(num))
|
||||
}
|
||||
}
|
||||
if len(m.Learners) > 0 {
|
||||
for _, num := range m.Learners {
|
||||
dAtA[i] = 0x10
|
||||
i++
|
||||
i = encodeVarintRaft(dAtA, i, uint64(num))
|
||||
}
|
||||
}
|
||||
if m.XXX_unrecognized != nil {
|
||||
i += copy(dAtA[i:], m.XXX_unrecognized)
|
||||
}
|
||||
@ -700,6 +711,11 @@ func (m *ConfState) Size() (n int) {
|
||||
n += 1 + sovRaft(uint64(e))
|
||||
}
|
||||
}
|
||||
if len(m.Learners) > 0 {
|
||||
for _, e := range m.Learners {
|
||||
n += 1 + sovRaft(uint64(e))
|
||||
}
|
||||
}
|
||||
if m.XXX_unrecognized != nil {
|
||||
n += len(m.XXX_unrecognized)
|
||||
}
|
||||
@ -1619,6 +1635,68 @@ func (m *ConfState) Unmarshal(dAtA []byte) error {
|
||||
} else {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Nodes", wireType)
|
||||
}
|
||||
case 2:
|
||||
if wireType == 0 {
|
||||
var v uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowRaft
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
v |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
m.Learners = append(m.Learners, v)
|
||||
} else if wireType == 2 {
|
||||
var packedLen int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowRaft
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
packedLen |= (int(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if packedLen < 0 {
|
||||
return ErrInvalidLengthRaft
|
||||
}
|
||||
postIndex := iNdEx + packedLen
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
for iNdEx < postIndex {
|
||||
var v uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowRaft
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
v |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
m.Learners = append(m.Learners, v)
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Learners", wireType)
|
||||
}
|
||||
default:
|
||||
iNdEx = preIndex
|
||||
skippy, err := skipRaft(dAtA[iNdEx:])
|
||||
@ -1888,55 +1966,56 @@ var (
|
||||
func init() { proto.RegisterFile("raft.proto", fileDescriptorRaft) }
|
||||
|
||||
var fileDescriptorRaft = []byte{
|
||||
// 790 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x54, 0xcd, 0x6e, 0xdb, 0x46,
|
||||
0x10, 0x16, 0x29, 0xea, 0x6f, 0x28, 0xcb, 0xab, 0xb5, 0x5a, 0x2c, 0x0c, 0x43, 0x55, 0x85, 0x1e,
|
||||
0x04, 0x17, 0x76, 0x5b, 0x1d, 0x7a, 0xe8, 0xcd, 0x96, 0x0a, 0x58, 0x40, 0x65, 0xb8, 0xb2, 0xdc,
|
||||
0x43, 0x83, 0x20, 0x58, 0x8b, 0x2b, 0x4a, 0x89, 0xc9, 0x25, 0x96, 0x2b, 0xc7, 0xbe, 0x04, 0x79,
|
||||
0x80, 0x3c, 0x40, 0x2e, 0x79, 0x1f, 0x1f, 0x0d, 0xe4, 0x1e, 0xc4, 0xce, 0x8b, 0x04, 0xbb, 0x5c,
|
||||
0x4a, 0x94, 0x74, 0xdb, 0xf9, 0xbe, 0xe1, 0xcc, 0x37, 0xdf, 0xce, 0x12, 0x40, 0xd0, 0xa9, 0x3c,
|
||||
0x8e, 0x04, 0x97, 0x1c, 0x17, 0xd5, 0x39, 0xba, 0xde, 0x6f, 0xf8, 0xdc, 0xe7, 0x1a, 0xfa, 0x4d,
|
||||
0x9d, 0x12, 0xb6, 0xfd, 0x0e, 0x0a, 0x7f, 0x87, 0x52, 0xdc, 0xe3, 0x5f, 0xc1, 0x19, 0xdf, 0x47,
|
||||
0x8c, 0x58, 0x2d, 0xab, 0x53, 0xeb, 0xd6, 0x8f, 0x93, 0xaf, 0x8e, 0x35, 0xa9, 0x88, 0x53, 0xe7,
|
||||
0xe1, 0xcb, 0x4f, 0xb9, 0x91, 0x4e, 0xc2, 0x04, 0x9c, 0x31, 0x13, 0x01, 0xb1, 0x5b, 0x56, 0xc7,
|
||||
0x59, 0x32, 0x4c, 0x04, 0x78, 0x1f, 0x0a, 0x83, 0xd0, 0x63, 0x77, 0x24, 0x9f, 0xa1, 0x12, 0x08,
|
||||
0x63, 0x70, 0xfa, 0x54, 0x52, 0xe2, 0xb4, 0xac, 0x4e, 0x75, 0xa4, 0xcf, 0xed, 0xf7, 0x16, 0xa0,
|
||||
0xcb, 0x90, 0x46, 0xf1, 0x8c, 0xcb, 0x21, 0x93, 0xd4, 0xa3, 0x92, 0xe2, 0x3f, 0x01, 0x26, 0x3c,
|
||||
0x9c, 0xbe, 0x8a, 0x25, 0x95, 0x89, 0x22, 0x77, 0xa5, 0xa8, 0xc7, 0xc3, 0xe9, 0xa5, 0x22, 0x4c,
|
||||
0xf1, 0xca, 0x24, 0x05, 0x54, 0xf3, 0xb9, 0x6e, 0x9e, 0xd5, 0x95, 0x40, 0x4a, 0xb2, 0x54, 0x92,
|
||||
0xb3, 0xba, 0x34, 0xd2, 0xfe, 0x1f, 0xca, 0xa9, 0x02, 0x25, 0x51, 0x29, 0xd0, 0x3d, 0xab, 0x23,
|
||||
0x7d, 0xc6, 0x7f, 0x41, 0x39, 0x30, 0xca, 0x74, 0x61, 0xb7, 0x4b, 0x52, 0x2d, 0x9b, 0xca, 0x4d,
|
||||
0xdd, 0x65, 0x7e, 0xfb, 0x53, 0x1e, 0x4a, 0x43, 0x16, 0xc7, 0xd4, 0x67, 0xf8, 0x08, 0x1c, 0xb9,
|
||||
0x72, 0x78, 0x2f, 0xad, 0x61, 0xe8, 0xac, 0xc7, 0x2a, 0x0d, 0x37, 0xc0, 0x96, 0x7c, 0x6d, 0x12,
|
||||
0x5b, 0x72, 0x35, 0xc6, 0x54, 0xf0, 0x8d, 0x31, 0x14, 0xb2, 0x1c, 0xd0, 0xd9, 0x1c, 0x10, 0x37,
|
||||
0xa1, 0x74, 0xc3, 0x7d, 0x7d, 0x61, 0x85, 0x0c, 0x99, 0x82, 0x2b, 0xdb, 0x8a, 0xdb, 0xb6, 0x1d,
|
||||
0x41, 0x89, 0x85, 0x52, 0xcc, 0x59, 0x4c, 0x4a, 0xad, 0x7c, 0xc7, 0xed, 0xee, 0xac, 0x6d, 0x46,
|
||||
0x5a, 0xca, 0xe4, 0xe0, 0x03, 0x28, 0x4e, 0x78, 0x10, 0xcc, 0x25, 0x29, 0x67, 0x6a, 0x19, 0x0c,
|
||||
0x77, 0xa1, 0x1c, 0x1b, 0xc7, 0x48, 0x45, 0x3b, 0x89, 0x36, 0x9d, 0x4c, 0x1d, 0x4c, 0xf3, 0x54,
|
||||
0x45, 0xc1, 0x5e, 0xb3, 0x89, 0x24, 0xd0, 0xb2, 0x3a, 0xe5, 0xb4, 0x62, 0x82, 0xe1, 0x5f, 0x00,
|
||||
0x92, 0xd3, 0xd9, 0x3c, 0x94, 0xc4, 0xcd, 0xf4, 0xcc, 0xe0, 0x98, 0x40, 0x69, 0xc2, 0x43, 0xc9,
|
||||
0xee, 0x24, 0xa9, 0xea, 0x8b, 0x4d, 0xc3, 0xf6, 0x4b, 0xa8, 0x9c, 0x51, 0xe1, 0x25, 0xeb, 0x93,
|
||||
0x3a, 0x68, 0x6d, 0x39, 0x48, 0xc0, 0xb9, 0xe5, 0x92, 0xad, 0xef, 0xbb, 0x42, 0x32, 0x03, 0xe7,
|
||||
0xb7, 0x07, 0x6e, 0xff, 0x0c, 0x95, 0xe5, 0xba, 0xe2, 0x06, 0x14, 0x42, 0xee, 0xb1, 0x98, 0x58,
|
||||
0xad, 0x7c, 0xc7, 0x19, 0x25, 0x41, 0xfb, 0x83, 0x05, 0xa0, 0x72, 0x7a, 0x33, 0x1a, 0xfa, 0xfa,
|
||||
0xd6, 0x07, 0xfd, 0x35, 0x05, 0xf6, 0xa0, 0x8f, 0x7f, 0x37, 0x8f, 0xd3, 0xd6, 0xab, 0xf3, 0x63,
|
||||
0xf6, 0x29, 0x24, 0xdf, 0x6d, 0xbd, 0xd0, 0x03, 0x28, 0x9e, 0x73, 0x8f, 0x0d, 0xfa, 0xeb, 0xba,
|
||||
0x12, 0x4c, 0x19, 0xd2, 0x33, 0x86, 0x24, 0x8f, 0x31, 0x0d, 0x0f, 0xff, 0x80, 0xca, 0xf2, 0xc9,
|
||||
0xe3, 0x5d, 0x70, 0x75, 0x70, 0xce, 0x45, 0x40, 0x6f, 0x50, 0x0e, 0xef, 0xc1, 0xae, 0x06, 0x56,
|
||||
0x8d, 0x91, 0x75, 0xf8, 0xd9, 0x06, 0x37, 0xb3, 0xc4, 0x18, 0xa0, 0x38, 0x8c, 0xfd, 0xb3, 0x45,
|
||||
0x84, 0x72, 0xd8, 0x85, 0xd2, 0x30, 0xf6, 0x4f, 0x19, 0x95, 0xc8, 0x32, 0xc1, 0x85, 0xe0, 0x11,
|
||||
0xb2, 0x4d, 0xd6, 0x49, 0x14, 0xa1, 0x3c, 0xae, 0x01, 0x24, 0xe7, 0x11, 0x8b, 0x23, 0xe4, 0x98,
|
||||
0xc4, 0xff, 0xb8, 0x64, 0xa8, 0xa0, 0x44, 0x98, 0x40, 0xb3, 0x45, 0xc3, 0xaa, 0x85, 0x41, 0x25,
|
||||
0x8c, 0xa0, 0xaa, 0x9a, 0x31, 0x2a, 0xe4, 0xb5, 0xea, 0x52, 0xc6, 0x0d, 0x40, 0x59, 0x44, 0x7f,
|
||||
0x54, 0xc1, 0x18, 0x6a, 0xc3, 0xd8, 0xbf, 0x0a, 0x05, 0xa3, 0x93, 0x19, 0xbd, 0xbe, 0x61, 0x08,
|
||||
0x70, 0x1d, 0x76, 0x4c, 0x21, 0x75, 0x41, 0x8b, 0x18, 0xb9, 0x26, 0xad, 0x37, 0x63, 0x93, 0x37,
|
||||
0xff, 0x2e, 0xb8, 0x58, 0x04, 0xa8, 0x8a, 0x7f, 0x80, 0xfa, 0x30, 0xf6, 0xc7, 0x82, 0x86, 0xf1,
|
||||
0x94, 0x89, 0x7f, 0x18, 0xf5, 0x98, 0x40, 0x3b, 0xe6, 0xeb, 0xf1, 0x3c, 0x60, 0x7c, 0x21, 0xcf,
|
||||
0xf9, 0x5b, 0x54, 0x33, 0x62, 0x46, 0x8c, 0x7a, 0xfa, 0x87, 0x87, 0x76, 0x8d, 0x98, 0x25, 0xa2,
|
||||
0xc5, 0x20, 0x33, 0xef, 0x85, 0x60, 0x7a, 0xc4, 0xba, 0xe9, 0x6a, 0x62, 0x9d, 0x83, 0x0f, 0x5f,
|
||||
0x40, 0x6d, 0xfd, 0x7a, 0x95, 0x8e, 0x15, 0x72, 0xe2, 0x79, 0xea, 0x2e, 0x51, 0x0e, 0x13, 0x68,
|
||||
0xac, 0xe0, 0x11, 0x0b, 0xf8, 0x2d, 0xd3, 0x8c, 0xb5, 0xce, 0x5c, 0x45, 0x1e, 0x95, 0x09, 0x63,
|
||||
0x9f, 0x92, 0x87, 0xa7, 0x66, 0xee, 0xf1, 0xa9, 0x99, 0x7b, 0x78, 0x6e, 0x5a, 0x8f, 0xcf, 0x4d,
|
||||
0xeb, 0xeb, 0x73, 0xd3, 0xfa, 0xf8, 0xad, 0x99, 0xfb, 0x1e, 0x00, 0x00, 0xff, 0xff, 0xcf, 0x30,
|
||||
0x01, 0x41, 0x3a, 0x06, 0x00, 0x00,
|
||||
// 815 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x54, 0xcd, 0x6e, 0x23, 0x45,
|
||||
0x10, 0xf6, 0x8c, 0xc7, 0x7f, 0x35, 0x8e, 0xd3, 0xa9, 0x35, 0xa8, 0x15, 0x45, 0xc6, 0xb2, 0x38,
|
||||
0x58, 0x41, 0x1b, 0x20, 0x07, 0x0e, 0x48, 0x1c, 0x36, 0x09, 0x52, 0x22, 0xad, 0xa3, 0xc5, 0x9b,
|
||||
0xe5, 0x80, 0x84, 0x50, 0xc7, 0x53, 0x9e, 0x18, 0x32, 0xd3, 0xa3, 0x9e, 0xf6, 0xb2, 0xb9, 0x20,
|
||||
0x1e, 0x80, 0x07, 0xe0, 0xc2, 0xfb, 0xe4, 0xb8, 0x12, 0x77, 0xc4, 0x86, 0x17, 0x41, 0xdd, 0xd3,
|
||||
0x63, 0xcf, 0x24, 0xb7, 0xae, 0xef, 0xab, 0xae, 0xfa, 0xea, 0xeb, 0x9a, 0x01, 0x50, 0x62, 0xa9,
|
||||
0x8f, 0x32, 0x25, 0xb5, 0xc4, 0xb6, 0x39, 0x67, 0xd7, 0xfb, 0xc3, 0x58, 0xc6, 0xd2, 0x42, 0x9f,
|
||||
0x9b, 0x53, 0xc1, 0x4e, 0x7e, 0x83, 0xd6, 0xb7, 0xa9, 0x56, 0x77, 0xf8, 0x19, 0x04, 0x57, 0x77,
|
||||
0x19, 0x71, 0x6f, 0xec, 0x4d, 0x07, 0xc7, 0x7b, 0x47, 0xc5, 0xad, 0x23, 0x4b, 0x1a, 0xe2, 0x24,
|
||||
0xb8, 0xff, 0xe7, 0x93, 0xc6, 0xdc, 0x26, 0x21, 0x87, 0xe0, 0x8a, 0x54, 0xc2, 0xfd, 0xb1, 0x37,
|
||||
0x0d, 0x36, 0x0c, 0xa9, 0x04, 0xf7, 0xa1, 0x75, 0x91, 0x46, 0xf4, 0x8e, 0x37, 0x2b, 0x54, 0x01,
|
||||
0x21, 0x42, 0x70, 0x26, 0xb4, 0xe0, 0xc1, 0xd8, 0x9b, 0xf6, 0xe7, 0xf6, 0x3c, 0xf9, 0xdd, 0x03,
|
||||
0xf6, 0x3a, 0x15, 0x59, 0x7e, 0x23, 0xf5, 0x8c, 0xb4, 0x88, 0x84, 0x16, 0xf8, 0x15, 0xc0, 0x42,
|
||||
0xa6, 0xcb, 0x9f, 0x72, 0x2d, 0x74, 0xa1, 0x28, 0xdc, 0x2a, 0x3a, 0x95, 0xe9, 0xf2, 0xb5, 0x21,
|
||||
0x5c, 0xf1, 0xde, 0xa2, 0x04, 0x4c, 0xf3, 0x95, 0x6d, 0x5e, 0xd5, 0x55, 0x40, 0x46, 0xb2, 0x36,
|
||||
0x92, 0xab, 0xba, 0x2c, 0x32, 0xf9, 0x01, 0xba, 0xa5, 0x02, 0x23, 0xd1, 0x28, 0xb0, 0x3d, 0xfb,
|
||||
0x73, 0x7b, 0xc6, 0xaf, 0xa1, 0x9b, 0x38, 0x65, 0xb6, 0x70, 0x78, 0xcc, 0x4b, 0x2d, 0x8f, 0x95,
|
||||
0xbb, 0xba, 0x9b, 0xfc, 0xc9, 0x5f, 0x4d, 0xe8, 0xcc, 0x28, 0xcf, 0x45, 0x4c, 0xf8, 0x1c, 0x02,
|
||||
0xbd, 0x75, 0xf8, 0x59, 0x59, 0xc3, 0xd1, 0x55, 0x8f, 0x4d, 0x1a, 0x0e, 0xc1, 0xd7, 0xb2, 0x36,
|
||||
0x89, 0xaf, 0xa5, 0x19, 0x63, 0xa9, 0xe4, 0xa3, 0x31, 0x0c, 0xb2, 0x19, 0x30, 0x78, 0x3c, 0x20,
|
||||
0x8e, 0xa0, 0x73, 0x2b, 0x63, 0xfb, 0x60, 0xad, 0x0a, 0x59, 0x82, 0x5b, 0xdb, 0xda, 0x4f, 0x6d,
|
||||
0x7b, 0x0e, 0x1d, 0x4a, 0xb5, 0x5a, 0x51, 0xce, 0x3b, 0xe3, 0xe6, 0x34, 0x3c, 0xde, 0xa9, 0x6d,
|
||||
0x46, 0x59, 0xca, 0xe5, 0xe0, 0x01, 0xb4, 0x17, 0x32, 0x49, 0x56, 0x9a, 0x77, 0x2b, 0xb5, 0x1c,
|
||||
0x86, 0xc7, 0xd0, 0xcd, 0x9d, 0x63, 0xbc, 0x67, 0x9d, 0x64, 0x8f, 0x9d, 0x2c, 0x1d, 0x2c, 0xf3,
|
||||
0x4c, 0x45, 0x45, 0x3f, 0xd3, 0x42, 0x73, 0x18, 0x7b, 0xd3, 0x6e, 0x59, 0xb1, 0xc0, 0xf0, 0x53,
|
||||
0x80, 0xe2, 0x74, 0xbe, 0x4a, 0x35, 0x0f, 0x2b, 0x3d, 0x2b, 0x38, 0x72, 0xe8, 0x2c, 0x64, 0xaa,
|
||||
0xe9, 0x9d, 0xe6, 0x7d, 0xfb, 0xb0, 0x65, 0x38, 0xf9, 0x11, 0x7a, 0xe7, 0x42, 0x45, 0xc5, 0xfa,
|
||||
0x94, 0x0e, 0x7a, 0x4f, 0x1c, 0xe4, 0x10, 0xbc, 0x95, 0x9a, 0xea, 0xfb, 0x6e, 0x90, 0xca, 0xc0,
|
||||
0xcd, 0xa7, 0x03, 0x4f, 0xbe, 0x81, 0xde, 0x66, 0x5d, 0x71, 0x08, 0xad, 0x54, 0x46, 0x94, 0x73,
|
||||
0x6f, 0xdc, 0x9c, 0x06, 0xf3, 0x22, 0xc0, 0x7d, 0xe8, 0xde, 0x92, 0x50, 0x29, 0xa9, 0x9c, 0xfb,
|
||||
0x96, 0xd8, 0xc4, 0x93, 0x3f, 0x3c, 0x00, 0x73, 0xff, 0xf4, 0x46, 0xa4, 0xb1, 0xdd, 0x88, 0x8b,
|
||||
0xb3, 0x9a, 0x3a, 0xff, 0xe2, 0x0c, 0xbf, 0x70, 0x1f, 0xae, 0x6f, 0xd7, 0xea, 0xe3, 0xea, 0x67,
|
||||
0x52, 0xdc, 0x7b, 0xf2, 0xf5, 0x1e, 0x40, 0xfb, 0x52, 0x46, 0x74, 0x71, 0x56, 0xd7, 0x5c, 0x60,
|
||||
0xc6, 0xac, 0x53, 0x67, 0x56, 0xf1, 0xa1, 0x96, 0xe1, 0xe1, 0x97, 0xd0, 0xdb, 0xfc, 0x0e, 0x70,
|
||||
0x17, 0x42, 0x1b, 0x5c, 0x4a, 0x95, 0x88, 0x5b, 0xd6, 0xc0, 0x67, 0xb0, 0x6b, 0x81, 0x6d, 0x63,
|
||||
0xe6, 0x1d, 0xfe, 0xed, 0x43, 0x58, 0x59, 0x70, 0x04, 0x68, 0xcf, 0xf2, 0xf8, 0x7c, 0x9d, 0xb1,
|
||||
0x06, 0x86, 0xd0, 0x99, 0xe5, 0xf1, 0x09, 0x09, 0xcd, 0x3c, 0x17, 0xbc, 0x52, 0x32, 0x63, 0xbe,
|
||||
0xcb, 0x7a, 0x91, 0x65, 0xac, 0x89, 0x03, 0x80, 0xe2, 0x3c, 0xa7, 0x3c, 0x63, 0x81, 0x4b, 0xfc,
|
||||
0x5e, 0x6a, 0x62, 0x2d, 0x23, 0xc2, 0x05, 0x96, 0x6d, 0x3b, 0xd6, 0x2c, 0x13, 0xeb, 0x20, 0x83,
|
||||
0xbe, 0x69, 0x46, 0x42, 0xe9, 0x6b, 0xd3, 0xa5, 0x8b, 0x43, 0x60, 0x55, 0xc4, 0x5e, 0xea, 0x21,
|
||||
0xc2, 0x60, 0x96, 0xc7, 0x6f, 0x52, 0x45, 0x62, 0x71, 0x23, 0xae, 0x6f, 0x89, 0x01, 0xee, 0xc1,
|
||||
0x8e, 0x2b, 0x64, 0x1e, 0x6f, 0x9d, 0xb3, 0xd0, 0xa5, 0x9d, 0xde, 0xd0, 0xe2, 0x97, 0xef, 0xd6,
|
||||
0x52, 0xad, 0x13, 0xd6, 0xc7, 0x8f, 0x60, 0x6f, 0x96, 0xc7, 0x57, 0x4a, 0xa4, 0xf9, 0x92, 0xd4,
|
||||
0x4b, 0x12, 0x11, 0x29, 0xb6, 0xe3, 0x6e, 0x5f, 0xad, 0x12, 0x92, 0x6b, 0x7d, 0x29, 0x7f, 0x65,
|
||||
0x03, 0x27, 0x66, 0x4e, 0x22, 0xb2, 0x3f, 0x43, 0xb6, 0xeb, 0xc4, 0x6c, 0x10, 0x2b, 0x86, 0xb9,
|
||||
0x79, 0x5f, 0x29, 0xb2, 0x23, 0xee, 0xb9, 0xae, 0x2e, 0xb6, 0x39, 0x78, 0x78, 0x07, 0x83, 0xfa,
|
||||
0xf3, 0x1a, 0x1d, 0x5b, 0xe4, 0x45, 0x14, 0x99, 0xb7, 0x64, 0x0d, 0xe4, 0x30, 0xdc, 0xc2, 0x73,
|
||||
0x4a, 0xe4, 0x5b, 0xb2, 0x8c, 0x57, 0x67, 0xde, 0x64, 0x91, 0xd0, 0x05, 0xe3, 0xe3, 0x01, 0xf0,
|
||||
0x5a, 0xa9, 0x97, 0xc5, 0x36, 0x5a, 0xb6, 0x79, 0xc2, 0xef, 0x3f, 0x8c, 0x1a, 0xef, 0x3f, 0x8c,
|
||||
0x1a, 0xf7, 0x0f, 0x23, 0xef, 0xfd, 0xc3, 0xc8, 0xfb, 0xf7, 0x61, 0xe4, 0xfd, 0xf9, 0xdf, 0xa8,
|
||||
0xf1, 0x7f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x86, 0x52, 0x5b, 0xe0, 0x74, 0x06, 0x00, 0x00,
|
||||
}
|
||||
|
@ -76,13 +76,15 @@ message HardState {
|
||||
}
|
||||
|
||||
message ConfState {
|
||||
repeated uint64 nodes = 1;
|
||||
repeated uint64 nodes = 1;
|
||||
repeated uint64 learners = 2;
|
||||
}
|
||||
|
||||
enum ConfChangeType {
|
||||
ConfChangeAddNode = 0;
|
||||
ConfChangeRemoveNode = 1;
|
||||
ConfChangeUpdateNode = 2;
|
||||
ConfChangeAddNode = 0;
|
||||
ConfChangeRemoveNode = 1;
|
||||
ConfChangeUpdateNode = 2;
|
||||
ConfChangeAddLearnerNode = 3;
|
||||
}
|
||||
|
||||
message ConfChange {
|
||||
|
@ -175,6 +175,8 @@ func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
|
||||
switch cc.Type {
|
||||
case pb.ConfChangeAddNode:
|
||||
rn.raft.addNode(cc.NodeID)
|
||||
case pb.ConfChangeAddLearnerNode:
|
||||
rn.raft.addLearner(cc.NodeID)
|
||||
case pb.ConfChangeRemoveNode:
|
||||
rn.raft.removeNode(cc.NodeID)
|
||||
case pb.ConfChangeUpdateNode:
|
||||
@ -191,7 +193,7 @@ func (rn *RawNode) Step(m pb.Message) error {
|
||||
if IsLocalMsg(m.Type) {
|
||||
return ErrStepLocalMsg
|
||||
}
|
||||
if _, ok := rn.raft.prs[m.From]; ok || !IsResponseMsg(m.Type) {
|
||||
if pr := rn.raft.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
|
||||
return rn.raft.Step(m)
|
||||
}
|
||||
return ErrStepPeerNotFound
|
||||
|
@ -49,6 +49,10 @@ func getStatus(r *raft) Status {
|
||||
for id, p := range r.prs {
|
||||
s.Progress[id] = *p
|
||||
}
|
||||
|
||||
for id, p := range r.learnerPrs {
|
||||
s.Progress[id] = *p
|
||||
}
|
||||
}
|
||||
|
||||
return s
|
||||
|
Loading…
x
Reference in New Issue
Block a user