mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raft: centralize configuration change application
Put all the logic related to applying a configuration change in one place in preparation for adding joint consensus. This inspired various TODOs. I had to rewrite TestSnapshotSucceedViaAppResp since it was relying on a snapshot applied to the leader, which is now prevented.
This commit is contained in:
parent
1f40b6642f
commit
b171e1c78b
41
raft/node.go
41
raft/node.go
@ -208,7 +208,19 @@ func StartNode(c *Config, peers []Peer) Node {
|
||||
if err != nil {
|
||||
panic("unexpected marshal error")
|
||||
}
|
||||
e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
|
||||
// TODO(tbg): this should append the ConfChange for the own node first
|
||||
// and also call applyConfChange below for that node first. Otherwise
|
||||
// we have a Raft group (for a little while) that doesn't have itself
|
||||
// in its config, which is bad.
|
||||
// This whole way of setting things up is rickety. The app should just
|
||||
// populate the initial ConfState appropriately and then all of this
|
||||
// goes away.
|
||||
e := pb.Entry{
|
||||
Type: pb.EntryConfChange,
|
||||
Term: 1,
|
||||
Index: r.raftLog.lastIndex() + 1,
|
||||
Data: d,
|
||||
}
|
||||
r.raftLog.append(e)
|
||||
}
|
||||
// Mark these initial entries as committed.
|
||||
@ -225,7 +237,7 @@ func StartNode(c *Config, peers []Peer) Node {
|
||||
// We do not set raftLog.applied so the application will be able
|
||||
// to observe all conf changes via Ready.CommittedEntries.
|
||||
for _, peer := range peers {
|
||||
r.addNode(peer.ID)
|
||||
r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
|
||||
}
|
||||
|
||||
n := newNode()
|
||||
@ -357,35 +369,16 @@ func (n *node) run(r *raft) {
|
||||
r.Step(m)
|
||||
}
|
||||
case cc := <-n.confc:
|
||||
if cc.NodeID == None {
|
||||
select {
|
||||
case n.confstatec <- pb.ConfState{
|
||||
Nodes: r.prs.VoterNodes(),
|
||||
Learners: r.prs.LearnerNodes()}:
|
||||
case <-n.done:
|
||||
}
|
||||
break
|
||||
}
|
||||
switch cc.Type {
|
||||
case pb.ConfChangeAddNode:
|
||||
r.addNode(cc.NodeID)
|
||||
case pb.ConfChangeAddLearnerNode:
|
||||
r.addLearner(cc.NodeID)
|
||||
case pb.ConfChangeRemoveNode:
|
||||
cs := r.applyConfChange(cc)
|
||||
if _, ok := r.prs.Progress[r.id]; !ok {
|
||||
// block incoming proposal when local node is
|
||||
// removed
|
||||
if cc.NodeID == r.id {
|
||||
propc = nil
|
||||
}
|
||||
r.removeNode(cc.NodeID)
|
||||
case pb.ConfChangeUpdateNode:
|
||||
default:
|
||||
panic("unexpected conf type")
|
||||
}
|
||||
select {
|
||||
case n.confstatec <- pb.ConfState{
|
||||
Nodes: r.prs.VoterNodes(),
|
||||
Learners: r.prs.LearnerNodes()}:
|
||||
case n.confstatec <- cs:
|
||||
case <-n.done:
|
||||
}
|
||||
case <-n.tickc:
|
||||
|
200
raft/raft.go
200
raft/raft.go
@ -1322,11 +1322,51 @@ func (r *raft) handleSnapshot(m pb.Message) {
|
||||
}
|
||||
|
||||
// restore recovers the state machine from a snapshot. It restores the log and the
|
||||
// configuration of state machine.
|
||||
// configuration of state machine. If this method returns false, the snapshot was
|
||||
// ignored, either because it was obsolete or because of an error.
|
||||
func (r *raft) restore(s pb.Snapshot) bool {
|
||||
if s.Metadata.Index <= r.raftLog.committed {
|
||||
return false
|
||||
}
|
||||
if r.state != StateFollower {
|
||||
// This is defense-in-depth: if the leader somehow ended up applying a
|
||||
// snapshot, it could move into a new term without moving into a
|
||||
// follower state. This should never fire, but if it did, we'd have
|
||||
// prevented damage by returning early, so log only a loud warning.
|
||||
//
|
||||
// At the time of writing, the instance is guaranteed to be in follower
|
||||
// state when this method is called.
|
||||
r.logger.Warningf("%x attempted to restore snapshot as leader; should never happen", r.id)
|
||||
r.becomeFollower(r.Term+1, None)
|
||||
return false
|
||||
}
|
||||
|
||||
// More defense-in-depth: throw away snapshot if recipient is not in the
|
||||
// config. This shouuldn't ever happen (at the time of writing) but lots of
|
||||
// code here and there assumes that r.id is in the progress tracker.
|
||||
found := false
|
||||
cs := s.Metadata.ConfState
|
||||
for _, set := range [][]uint64{
|
||||
cs.Nodes,
|
||||
cs.Learners,
|
||||
} {
|
||||
for _, id := range set {
|
||||
if id == r.id {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
r.logger.Warningf(
|
||||
"%x attempted to restore snapshot but it is not in the ConfState %v; should never happen",
|
||||
r.id, cs,
|
||||
)
|
||||
return false
|
||||
}
|
||||
|
||||
// Now go ahead and actually restore.
|
||||
|
||||
if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
|
||||
r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
|
||||
r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
||||
@ -1344,26 +1384,23 @@ func (r *raft) restore(s pb.Snapshot) bool {
|
||||
}
|
||||
}
|
||||
|
||||
r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
|
||||
r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
||||
|
||||
r.raftLog.restore(s)
|
||||
r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
|
||||
r.restoreNode(s.Metadata.ConfState.Nodes, false)
|
||||
r.restoreNode(s.Metadata.ConfState.Learners, true)
|
||||
return true
|
||||
}
|
||||
|
||||
func (r *raft) restoreNode(nodes []uint64, isLearner bool) {
|
||||
for _, n := range nodes {
|
||||
match, next := uint64(0), r.raftLog.lastIndex()+1
|
||||
if n == r.id {
|
||||
match = next - 1
|
||||
r.isLearner = isLearner
|
||||
}
|
||||
r.prs.InitProgress(n, match, next, isLearner)
|
||||
r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs.Progress[n])
|
||||
// Reset the configuration and add the (potentially updated) peers in anew.
|
||||
r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
|
||||
for _, id := range s.Metadata.ConfState.Nodes {
|
||||
r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddNode})
|
||||
}
|
||||
for _, id := range s.Metadata.ConfState.Learners {
|
||||
r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddLearnerNode})
|
||||
}
|
||||
|
||||
pr := r.prs.Progress[r.id]
|
||||
pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded
|
||||
|
||||
r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] restored snapshot [index: %d, term: %d]",
|
||||
r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
||||
return true
|
||||
}
|
||||
|
||||
// promotable indicates whether state machine can be promoted to leader,
|
||||
@ -1373,68 +1410,97 @@ func (r *raft) promotable() bool {
|
||||
return pr != nil && !pr.IsLearner
|
||||
}
|
||||
|
||||
func (r *raft) addNode(id uint64) {
|
||||
r.addNodeOrLearnerNode(id, false)
|
||||
}
|
||||
func (r *raft) applyConfChange(cc pb.ConfChange) pb.ConfState {
|
||||
addNodeOrLearnerNode := func(id uint64, isLearner bool) {
|
||||
// NB: this method is intentionally hidden from view. All mutations of
|
||||
// the conf state must call applyConfChange directly.
|
||||
pr := r.prs.Progress[id]
|
||||
if pr == nil {
|
||||
r.prs.InitProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
|
||||
} else {
|
||||
if isLearner && !pr.IsLearner {
|
||||
// Can only change Learner to Voter.
|
||||
//
|
||||
// TODO(tbg): why?
|
||||
r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id)
|
||||
return
|
||||
}
|
||||
|
||||
func (r *raft) addLearner(id uint64) {
|
||||
r.addNodeOrLearnerNode(id, true)
|
||||
}
|
||||
if isLearner == pr.IsLearner {
|
||||
// Ignore any redundant addNode calls (which can happen because the
|
||||
// initial bootstrapping entries are applied twice).
|
||||
return
|
||||
}
|
||||
|
||||
func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
|
||||
pr := r.prs.Progress[id]
|
||||
if pr == nil {
|
||||
r.prs.InitProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
|
||||
} else {
|
||||
if isLearner && !pr.IsLearner {
|
||||
// Can only change Learner to Voter.
|
||||
r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id)
|
||||
return
|
||||
// Change Learner to Voter, use origin Learner progress.
|
||||
r.prs.RemoveAny(id)
|
||||
r.prs.InitProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */)
|
||||
pr.IsLearner = false
|
||||
*r.prs.Progress[id] = *pr
|
||||
}
|
||||
|
||||
if isLearner == pr.IsLearner {
|
||||
// Ignore any redundant addNode calls (which can happen because the
|
||||
// initial bootstrapping entries are applied twice).
|
||||
return
|
||||
// When a node is first added, we should mark it as recently active.
|
||||
// Otherwise, CheckQuorum may cause us to step down if it is invoked
|
||||
// before the added node has had a chance to communicate with us.
|
||||
r.prs.Progress[id].RecentActive = true
|
||||
}
|
||||
|
||||
var removed int
|
||||
if cc.NodeID != None {
|
||||
switch cc.Type {
|
||||
case pb.ConfChangeAddNode:
|
||||
addNodeOrLearnerNode(cc.NodeID, false /* isLearner */)
|
||||
case pb.ConfChangeAddLearnerNode:
|
||||
addNodeOrLearnerNode(cc.NodeID, true /* isLearner */)
|
||||
case pb.ConfChangeRemoveNode:
|
||||
removed++
|
||||
r.prs.RemoveAny(cc.NodeID)
|
||||
case pb.ConfChangeUpdateNode:
|
||||
default:
|
||||
panic("unexpected conf type")
|
||||
}
|
||||
|
||||
// Change Learner to Voter, use origin Learner progress.
|
||||
r.prs.RemoveAny(id)
|
||||
r.prs.InitProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */)
|
||||
pr.IsLearner = false
|
||||
*r.prs.Progress[id] = *pr
|
||||
}
|
||||
|
||||
if r.id == id {
|
||||
r.isLearner = isLearner
|
||||
// Now that the configuration is updated, handle any side effects.
|
||||
|
||||
cs := pb.ConfState{Nodes: r.prs.VoterNodes(), Learners: r.prs.LearnerNodes()}
|
||||
pr, ok := r.prs.Progress[r.id]
|
||||
|
||||
// Update whether the node itself is a learner, resetting to false when the
|
||||
// node is removed.
|
||||
r.isLearner = ok && pr.IsLearner
|
||||
|
||||
if (!ok || r.isLearner) && r.state == StateLeader {
|
||||
// This node is leader and was removed or demoted. We prevent demotions
|
||||
// at the time writing but hypothetically we handle them the same way as
|
||||
// removing the leader: stepping down into the next Term.
|
||||
//
|
||||
// TODO(tbg): step down (for sanity) and ask follower with largest Match
|
||||
// to TimeoutNow (to avoid interruption). This might still drop some
|
||||
// proposals but it's better than nothing.
|
||||
//
|
||||
// TODO(tbg): test this branch. It is untested at the time of writing.
|
||||
return cs
|
||||
}
|
||||
|
||||
// When a node is first added, we should mark it as recently active.
|
||||
// Otherwise, CheckQuorum may cause us to step down if it is invoked
|
||||
// before the added node has a chance to communicate with us.
|
||||
r.prs.Progress[id].RecentActive = true
|
||||
}
|
||||
|
||||
func (r *raft) removeNode(id uint64) {
|
||||
r.prs.RemoveAny(id)
|
||||
|
||||
// Do not try to commit or abort transferring if the cluster is now empty.
|
||||
if len(r.prs.Voters[0]) == 0 && len(r.prs.Learners) == 0 {
|
||||
return
|
||||
// The remaining steps only make sense if this node is the leader and there
|
||||
// are other nodes.
|
||||
if r.state != StateLeader || len(cs.Nodes) == 0 {
|
||||
return cs
|
||||
}
|
||||
|
||||
// TODO(tbg): won't bad (or at least unfortunate) things happen if the
|
||||
// leader just removed itself?
|
||||
|
||||
// The quorum size is now smaller, so see if any pending entries can
|
||||
// be committed.
|
||||
if r.maybeCommit() {
|
||||
r.bcastAppend()
|
||||
if removed > 0 {
|
||||
// The quorum size may have been reduced (but not to zero), so see if
|
||||
// any pending entries can be committed.
|
||||
if r.maybeCommit() {
|
||||
r.bcastAppend()
|
||||
}
|
||||
}
|
||||
// If the removed node is the leadTransferee, then abort the leadership transferring.
|
||||
if r.state == StateLeader && r.leadTransferee == id {
|
||||
// If the the leadTransferee was removed, abort the leadership transfer.
|
||||
if _, tOK := r.prs.Progress[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
|
||||
r.abortLeaderTransfer()
|
||||
}
|
||||
|
||||
return cs
|
||||
}
|
||||
|
||||
func (r *raft) loadState(state pb.HardState) {
|
||||
|
@ -118,30 +118,38 @@ func TestSnapshotSucceed(t *testing.T) {
|
||||
// in the past left the follower in probing status until the next log entry was
|
||||
// committed.
|
||||
func TestSnapshotSucceedViaAppResp(t *testing.T) {
|
||||
snap := pb.Snapshot{
|
||||
Metadata: pb.SnapshotMetadata{
|
||||
Index: 11, // magic number
|
||||
Term: 11, // magic number
|
||||
ConfState: pb.ConfState{Nodes: []uint64{1, 2, 3}},
|
||||
},
|
||||
}
|
||||
|
||||
s1 := NewMemoryStorage()
|
||||
n1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, s1)
|
||||
|
||||
// Become follower because otherwise the way this test sets things up the
|
||||
// leadership term will be 1 (which is stale). We want it to match the snap-
|
||||
// shot term in this test.
|
||||
n1.becomeFollower(snap.Metadata.Term-1, 2)
|
||||
// Create a single-node leader.
|
||||
n1 := newTestRaft(1, []uint64{1}, 10, 1, s1)
|
||||
n1.becomeCandidate()
|
||||
n1.becomeLeader()
|
||||
// We need to add a second empty entry so that we can truncate the first
|
||||
// one away.
|
||||
n1.Step(pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{}}})
|
||||
|
||||
// Apply a snapshot on the leader. This is a workaround against the fact that
|
||||
// the leader will always append an empty entry, but that empty entry works
|
||||
// against what we're trying to assert in this test, namely that a snapshot
|
||||
// at the latest committed index leaves the follower in probing state.
|
||||
// With the snapshot, the empty entry is fully committed.
|
||||
n1.restore(snap)
|
||||
rd := newReady(n1, &SoftState{}, pb.HardState{})
|
||||
s1.Append(rd.Entries)
|
||||
s1.SetHardState(rd.HardState)
|
||||
|
||||
if exp, ci := s1.lastIndex(), n1.raftLog.committed; ci != exp {
|
||||
t.Fatalf("unexpected committed index %d, wanted %d: %+v", ci, exp, s1)
|
||||
}
|
||||
|
||||
// Force a log truncation.
|
||||
if err := s1.Compact(1); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Add a follower to the group. Do this in a clandestine way for simplicity.
|
||||
// Also set up a snapshot that will be sent to the follower.
|
||||
n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode})
|
||||
s1.snapshot = pb.Snapshot{
|
||||
Metadata: pb.SnapshotMetadata{
|
||||
ConfState: pb.ConfState{Nodes: []uint64{1, 2}},
|
||||
Index: s1.lastIndex(),
|
||||
Term: s1.ents[len(s1.ents)-1].Term,
|
||||
},
|
||||
}
|
||||
|
||||
noMessage := pb.MessageType(-1)
|
||||
mustSend := func(from, to *raft, typ pb.MessageType) pb.Message {
|
||||
@ -151,6 +159,9 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) {
|
||||
continue
|
||||
}
|
||||
t.Log(DescribeMessage(msg, func([]byte) string { return "" }))
|
||||
if len(msg.Entries) > 0 {
|
||||
t.Log(DescribeEntries(msg.Entries, func(b []byte) string { return string(b) }))
|
||||
}
|
||||
if err := to.Step(msg); err != nil {
|
||||
t.Fatalf("%v: %s", msg, err)
|
||||
}
|
||||
@ -169,7 +180,7 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) {
|
||||
|
||||
// Create the follower that will receive the snapshot.
|
||||
s2 := NewMemoryStorage()
|
||||
n2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, s2)
|
||||
n2 := newTestRaft(2, []uint64{1, 2}, 10, 1, s2)
|
||||
|
||||
// Let the leader probe the follower.
|
||||
if !n1.maybeSendAppend(2, true /* sendIfEmpty */) {
|
||||
@ -186,9 +197,9 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) {
|
||||
t.Fatalf("expected a rejection with zero hint, got reject=%t hint=%d", msg.Reject, msg.RejectHint)
|
||||
}
|
||||
|
||||
expIdx := snap.Metadata.Index
|
||||
// Leader sends snapshot due to RejectHint of zero (the storage we use here
|
||||
// has index zero compacted).
|
||||
const expIdx = 2
|
||||
// Leader sends snapshot due to RejectHint of zero (we set up the raft log
|
||||
// to start at index 2).
|
||||
if msg := mustSend(n1, n2, pb.MsgSnap); msg.Snapshot.Metadata.Index != expIdx {
|
||||
t.Fatalf("expected snapshot at index %d, got %d", expIdx, msg.Snapshot.Metadata.Index)
|
||||
}
|
||||
|
@ -356,8 +356,8 @@ func TestLearnerPromotion(t *testing.T) {
|
||||
|
||||
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
|
||||
|
||||
n1.addNode(2)
|
||||
n2.addNode(2)
|
||||
n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode})
|
||||
n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode})
|
||||
if n2.isLearner {
|
||||
t.Error("peer 2 is learner, want not")
|
||||
}
|
||||
@ -3076,7 +3076,7 @@ func TestNewLeaderPendingConfig(t *testing.T) {
|
||||
// TestAddNode tests that addNode could update nodes correctly.
|
||||
func TestAddNode(t *testing.T) {
|
||||
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
||||
r.addNode(2)
|
||||
r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode})
|
||||
nodes := r.prs.VoterNodes()
|
||||
wnodes := []uint64{1, 2}
|
||||
if !reflect.DeepEqual(nodes, wnodes) {
|
||||
@ -3087,7 +3087,7 @@ func TestAddNode(t *testing.T) {
|
||||
// TestAddLearner tests that addLearner could update nodes correctly.
|
||||
func TestAddLearner(t *testing.T) {
|
||||
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
||||
r.addLearner(2)
|
||||
r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode})
|
||||
nodes := r.prs.LearnerNodes()
|
||||
wnodes := []uint64{2}
|
||||
if !reflect.DeepEqual(nodes, wnodes) {
|
||||
@ -3111,7 +3111,7 @@ func TestAddNodeCheckQuorum(t *testing.T) {
|
||||
r.tick()
|
||||
}
|
||||
|
||||
r.addNode(2)
|
||||
r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode})
|
||||
|
||||
// This tick will reach electionTimeout, which triggers a quorum check.
|
||||
r.tick()
|
||||
@ -3136,14 +3136,14 @@ func TestAddNodeCheckQuorum(t *testing.T) {
|
||||
// and removed list correctly.
|
||||
func TestRemoveNode(t *testing.T) {
|
||||
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
||||
r.removeNode(2)
|
||||
r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode})
|
||||
w := []uint64{1}
|
||||
if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) {
|
||||
t.Errorf("nodes = %v, want %v", g, w)
|
||||
}
|
||||
|
||||
// remove all nodes from cluster
|
||||
r.removeNode(1)
|
||||
r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode})
|
||||
w = []uint64{}
|
||||
if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) {
|
||||
t.Errorf("nodes = %v, want %v", g, w)
|
||||
@ -3154,7 +3154,7 @@ func TestRemoveNode(t *testing.T) {
|
||||
// and removed list correctly.
|
||||
func TestRemoveLearner(t *testing.T) {
|
||||
r := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
|
||||
r.removeNode(2)
|
||||
r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode})
|
||||
w := []uint64{1}
|
||||
if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) {
|
||||
t.Errorf("nodes = %v, want %v", g, w)
|
||||
@ -3166,7 +3166,7 @@ func TestRemoveLearner(t *testing.T) {
|
||||
}
|
||||
|
||||
// remove all nodes from cluster
|
||||
r.removeNode(1)
|
||||
r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode})
|
||||
if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) {
|
||||
t.Errorf("nodes = %v, want %v", g, w)
|
||||
}
|
||||
@ -3300,7 +3300,7 @@ func TestCommitAfterRemoveNode(t *testing.T) {
|
||||
|
||||
// Apply the config change. This reduces quorum requirements so the
|
||||
// pending command can now commit.
|
||||
r.removeNode(2)
|
||||
r.applyConfChange(cc)
|
||||
ents = nextEnts(r, s)
|
||||
if len(ents) != 1 || ents[0].Type != pb.EntryNormal ||
|
||||
string(ents[0].Data) != "hello" {
|
||||
@ -3549,7 +3549,7 @@ func TestLeaderTransferRemoveNode(t *testing.T) {
|
||||
t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
|
||||
}
|
||||
|
||||
lead.removeNode(3)
|
||||
lead.applyConfChange(pb.ConfChange{NodeID: 3, Type: pb.ConfChangeRemoveNode})
|
||||
|
||||
checkLeaderTransferState(t, lead, StateLeader, 1)
|
||||
}
|
||||
@ -3875,9 +3875,9 @@ func TestPreVoteWithCheckQuorum(t *testing.T) {
|
||||
// a MsgHup or MsgTimeoutNow.
|
||||
func TestLearnerCampaign(t *testing.T) {
|
||||
n1 := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
||||
n1.addLearner(2)
|
||||
n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode})
|
||||
n2 := newTestRaft(2, []uint64{1}, 10, 1, NewMemoryStorage())
|
||||
n2.addLearner(2)
|
||||
n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode})
|
||||
nt := newNetwork(n1, n2)
|
||||
nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
|
||||
|
||||
|
@ -101,7 +101,7 @@ func NewRawNode(config *Config, peers []Peer) (*RawNode, error) {
|
||||
r.raftLog.append(ents...)
|
||||
r.raftLog.committed = uint64(len(ents))
|
||||
for _, peer := range peers {
|
||||
r.addNode(peer.ID)
|
||||
r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
|
||||
}
|
||||
}
|
||||
|
||||
@ -166,21 +166,8 @@ func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error {
|
||||
|
||||
// ApplyConfChange applies a config change to the local node.
|
||||
func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
|
||||
if cc.NodeID == None {
|
||||
return &pb.ConfState{Nodes: rn.raft.prs.VoterNodes(), Learners: rn.raft.prs.LearnerNodes()}
|
||||
}
|
||||
switch cc.Type {
|
||||
case pb.ConfChangeAddNode:
|
||||
rn.raft.addNode(cc.NodeID)
|
||||
case pb.ConfChangeAddLearnerNode:
|
||||
rn.raft.addLearner(cc.NodeID)
|
||||
case pb.ConfChangeRemoveNode:
|
||||
rn.raft.removeNode(cc.NodeID)
|
||||
case pb.ConfChangeUpdateNode:
|
||||
default:
|
||||
panic("unexpected conf type")
|
||||
}
|
||||
return &pb.ConfState{Nodes: rn.raft.prs.VoterNodes(), Learners: rn.raft.prs.LearnerNodes()}
|
||||
cs := rn.raft.applyConfChange(cc)
|
||||
return &cs
|
||||
}
|
||||
|
||||
// Step advances the state machine using the given message.
|
||||
|
Loading…
x
Reference in New Issue
Block a user