diff --git a/etcdserver/server.go b/etcdserver/server.go index ed979bf53..7c043077f 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -196,6 +196,10 @@ func (s *EtcdServer) run() { } else { syncC = nil } + if rd.SoftState.ShouldStop { + s.Stop() + return + } } case <-syncC: s.sync(defaultSyncTimeout) diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index a91bda504..a5de36451 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -826,6 +826,26 @@ func TestRemoveNode(t *testing.T) { } } +// TestServerStopItself tests that if node sends out Ready with ShouldStop, +// server will stop. +func TestServerStopItself(t *testing.T) { + n := newReadyNode() + s := &EtcdServer{ + Node: n, + Store: &storeRecorder{}, + Send: func(_ []raftpb.Message) {}, + Storage: &storageRecorder{}, + } + s.start() + n.readyc <- raft.Ready{SoftState: &raft.SoftState{ShouldStop: true}} + + select { + case <-s.done: + case <-time.After(time.Millisecond): + t.Errorf("did not receive from closed done channel as expected") + } +} + // TODO: test wait trigger correctness in multi-server case func TestPublish(t *testing.T) { diff --git a/raft/doc.go b/raft/doc.go index 3a0b22278..c6cc931d5 100644 --- a/raft/doc.go +++ b/raft/doc.go @@ -75,5 +75,8 @@ raftpb.EntryConfChange will be returned. You should apply it to node through: cc.Unmarshal(data) n.ApplyConfChange(cc) +Note: An ID represents a unique node in a cluster. A given ID MUST be used +only once even if the old node has been removed. + */ package raft diff --git a/raft/node.go b/raft/node.go index ed9b01a39..e853758ff 100644 --- a/raft/node.go +++ b/raft/node.go @@ -16,12 +16,13 @@ var ( // SoftState provides state that is useful for logging and debugging. // The state is volatile and does not need to be persisted to the WAL. 
type SoftState struct { - Lead int64 - RaftState StateType + Lead int64 + RaftState StateType + ShouldStop bool } func (a *SoftState) equal(b *SoftState) bool { - return a.Lead == b.Lead && a.RaftState == b.RaftState + return a.Lead == b.Lead && a.RaftState == b.RaftState && a.ShouldStop == b.ShouldStop } // Ready encapsulates the entries and messages that are ready to read, diff --git a/raft/node_test.go b/raft/node_test.go index 41917e3e1..4843f1ccb 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -257,7 +257,24 @@ func TestCompact(t *testing.T) { } } -func TestIsStateEqual(t *testing.T) { +func TestSoftStateEqual(t *testing.T) { + tests := []struct { + st *SoftState + we bool + }{ + {&SoftState{}, true}, + {&SoftState{Lead: 1}, false}, + {&SoftState{RaftState: StateLeader}, false}, + {&SoftState{ShouldStop: true}, false}, + } + for i, tt := range tests { + if g := tt.st.equal(&SoftState{}); g != tt.we { + t.Errorf("#%d, equal = %v, want %v", i, g, tt.we) + } + } +} + +func TestIsHardStateEqual(t *testing.T) { tests := []struct { st raftpb.HardState we bool diff --git a/raft/raft.go b/raft/raft.go index b4da55f54..691993c3b 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -108,6 +108,9 @@ type raft struct { // New configuration is ignored if there exists unapplied configuration. 
pendingConf bool + // TODO: need GC and recovery from snapshot + removed map[int64]bool + elapsed int // number of ticks since the last msg heartbeatTimeout int electionTimeout int @@ -124,6 +127,7 @@ func newRaft(id int64, peers []int64, election, heartbeat int) *raft { lead: None, raftLog: newLog(), prs: make(map[int64]*progress), + removed: make(map[int64]bool), electionTimeout: election, heartbeatTimeout: heartbeat, } @@ -136,8 +140,10 @@ func newRaft(id int64, peers []int64, election, heartbeat int) *raft { func (r *raft) hasLeader() bool { return r.lead != None } +func (r *raft) shouldStop() bool { return r.removed[r.id] } + func (r *raft) softState() *SoftState { - return &SoftState{Lead: r.lead, RaftState: r.state} + return &SoftState{Lead: r.lead, RaftState: r.state, ShouldStop: r.shouldStop()} } func (r *raft) String() string { @@ -348,6 +354,19 @@ func (r *raft) Step(m pb.Message) error { // TODO(bmizerany): this likely allocs - prevent that. defer func() { r.Commit = r.raftLog.committed }() + if r.removed[m.From] { + if m.From != r.id { + r.send(pb.Message{To: m.From, Type: msgDenied}) + } + // TODO: return an error? + return nil + } + if m.Type == msgDenied { + r.removed[r.id] = true + // TODO: return an error? + return nil + } + if m.Type == msgHup { r.campaign() } @@ -393,6 +412,7 @@ func (r *raft) addNode(id int64) { func (r *raft) removeNode(id int64) { r.delProgress(id) r.pendingConf = false + r.removed[id] = true } type stepFunc func(r *raft, m pb.Message) diff --git a/raft/raft_test.go b/raft/raft_test.go index d68f3068c..ef09f556b 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -986,7 +986,7 @@ func TestRecoverDoublePendingConfig(t *testing.T) { }() } -// TestAddNode tests that addNode could update pendingConf and peer list correctly. +// TestAddNode tests that addNode could update pendingConf and nodes correctly. 
func TestAddNode(t *testing.T) { r := newRaft(1, []int64{1}, 0, 0) r.pendingConf = true @@ -1002,7 +1002,8 @@ func TestAddNode(t *testing.T) { } } -// TestRemoveNode tests that removeNode could update pendingConf and peer list correctly. +// TestRemoveNode tests that removeNode could update pendingConf, nodes and +// removed list correctly. func TestRemoveNode(t *testing.T) { r := newRaft(1, []int64{1, 2}, 0, 0) r.pendingConf = true @@ -1014,6 +1015,63 @@ func TestRemoveNode(t *testing.T) { if g := r.nodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } + wremoved := map[int64]bool{2: true} + if !reflect.DeepEqual(r.removed, wremoved) { + t.Errorf("rmNodes = %v, want %v", r.removed, wremoved) + } +} + +// TestRecvMsgDenied tests that state machine sets the removed list when +// handling msgDenied, and does not pass it to the actual stepX function. +func TestRecvMsgDenied(t *testing.T) { + called := false + fakeStep := func(r *raft, m pb.Message) { + called = true + } + r := newRaft(1, []int64{1, 2}, 0, 0) + r.step = fakeStep + r.Step(pb.Message{From: 2, Type: msgDenied}) + if called != false { + t.Errorf("stepFunc called = %v , want %v", called, false) + } + wremoved := map[int64]bool{1: true} + if !reflect.DeepEqual(r.removed, wremoved) { + t.Errorf("rmNodes = %v, want %v", r.removed, wremoved) + } +} + +// TestRecvMsgFromRemovedNode tests that state machine sends correct +// messages out when handling message from removed node, and does not +// pass it to the actual stepX function. 
+func TestRecvMsgFromRemovedNode(t *testing.T) { + tests := []struct { + from int64 + wmsgNum int + }{ + {1, 0}, + {2, 1}, + } + for i, tt := range tests { + called := false + fakeStep := func(r *raft, m pb.Message) { + called = true + } + r := newRaft(1, []int64{1}, 0, 0) + r.step = fakeStep + r.removeNode(tt.from) + r.Step(pb.Message{From: tt.from, Type: msgVote}) + if called != false { + t.Errorf("#%d: stepFunc called = %v , want %v", i, called, false) + } + if len(r.msgs) != tt.wmsgNum { + t.Errorf("#%d: len(msgs) = %d, want %d", i, len(r.msgs), tt.wmsgNum) + } + for j, msg := range r.msgs { + if msg.Type != msgDenied { + t.Errorf("#%d.%d: msgType = %d, want %d", i, j, msg.Type, msgDenied) + } + } + } } func TestPromotable(t *testing.T) {