raft: add removed

The usage of removed:
1. tell removed node about its removal explicitly using msgDenied
2. prevent removed node disrupt cluster progress by launching leader election

It is set when apply node removal, or receive msgDenied.
This commit is contained in:
Yicheng Qin 2014-09-24 12:26:19 -07:00
parent b1fc0feb72
commit e4a6c9651a
7 changed files with 129 additions and 6 deletions

View File

@ -196,6 +196,10 @@ func (s *EtcdServer) run() {
} else {
syncC = nil
}
if rd.SoftState.ShouldStop {
s.Stop()
return
}
}
case <-syncC:
s.sync(defaultSyncTimeout)

View File

@ -826,6 +826,26 @@ func TestRemoveNode(t *testing.T) {
}
}
// TestServerStopItself tests that if node sends out Ready with ShouldStop,
// server will stop.
func TestServerStopItself(t *testing.T) {
n := newReadyNode()
s := &EtcdServer{
Node: n,
Store: &storeRecorder{},
Send: func(_ []raftpb.Message) {},
Storage: &storageRecorder{},
}
s.Start()
n.readyc <- raft.Ready{SoftState: &raft.SoftState{ShouldStop: true}}
select {
case <-s.done:
case <-time.After(time.Millisecond):
t.Errorf("did not receive from closed done channel as expected")
}
}
// TODO: test wait trigger correctness in multi-server case
func TestPublish(t *testing.T) {

View File

@ -75,5 +75,8 @@ raftpb.EntryConfChange will be returned. You should apply it to node through:
cc.Unmarshal(data)
n.ApplyConfChange(cc)
Note: One ID represents one unique node in a cluster. A given ID MUST be used
only once even if the old node has been removed.
*/
package raft

View File

@ -16,12 +16,13 @@ var (
// SoftState provides state that is useful for logging and debugging.
// The state is volatile and does not need to be persisted to the WAL.
type SoftState struct {
Lead int64
RaftState StateType
Lead int64
RaftState StateType
ShouldStop bool
}
func (a *SoftState) equal(b *SoftState) bool {
return a.Lead == b.Lead && a.RaftState == b.RaftState
return a.Lead == b.Lead && a.RaftState == b.RaftState && a.ShouldStop == b.ShouldStop
}
// Ready encapsulates the entries and messages that are ready to read,

View File

@ -257,7 +257,24 @@ func TestCompact(t *testing.T) {
}
}
func TestIsStateEqual(t *testing.T) {
func TestSoftStateEqual(t *testing.T) {
tests := []struct {
st *SoftState
we bool
}{
{&SoftState{}, true},
{&SoftState{Lead: 1}, false},
{&SoftState{RaftState: StateLeader}, false},
{&SoftState{ShouldStop: true}, false},
}
for i, tt := range tests {
if g := tt.st.equal(&SoftState{}); g != tt.we {
t.Errorf("#%d, equal = %v, want %v", i, g, tt.we)
}
}
}
func TestIsHardStateEqual(t *testing.T) {
tests := []struct {
st raftpb.HardState
we bool

View File

@ -108,6 +108,9 @@ type raft struct {
// New configuration is ignored if there exists unapplied configuration.
pendingConf bool
// TODO: need GC and recovery from snapshot
removed map[int64]bool
elapsed int // number of ticks since the last msg
heartbeatTimeout int
electionTimeout int
@ -124,6 +127,7 @@ func newRaft(id int64, peers []int64, election, heartbeat int) *raft {
lead: None,
raftLog: newLog(),
prs: make(map[int64]*progress),
removed: make(map[int64]bool),
electionTimeout: election,
heartbeatTimeout: heartbeat,
}
@ -136,8 +140,10 @@ func newRaft(id int64, peers []int64, election, heartbeat int) *raft {
func (r *raft) hasLeader() bool { return r.lead != None }
func (r *raft) shouldStop() bool { return r.removed[r.id] }
func (r *raft) softState() *SoftState {
return &SoftState{Lead: r.lead, RaftState: r.state}
return &SoftState{Lead: r.lead, RaftState: r.state, ShouldStop: r.shouldStop()}
}
func (r *raft) String() string {
@ -348,6 +354,19 @@ func (r *raft) Step(m pb.Message) error {
// TODO(bmizerany): this likely allocs - prevent that.
defer func() { r.Commit = r.raftLog.committed }()
if r.removed[m.From] {
if m.From != r.id {
r.send(pb.Message{To: m.From, Type: msgDenied})
}
// TODO: return an error?
return nil
}
if m.Type == msgDenied {
r.removed[r.id] = true
// TODO: return an error?
return nil
}
if m.Type == msgHup {
r.campaign()
}
@ -393,6 +412,7 @@ func (r *raft) addNode(id int64) {
func (r *raft) removeNode(id int64) {
r.delProgress(id)
r.pendingConf = false
r.removed[id] = true
}
type stepFunc func(r *raft, m pb.Message)

View File

@ -1002,7 +1002,8 @@ func TestAddNode(t *testing.T) {
}
}
// TestRemoveNode tests that removeNode could update pendingConf and peer list correctly.
// TestRemoveNode tests that removeNode could update pendingConf, peer list,
// removed correctly.
func TestRemoveNode(t *testing.T) {
r := newRaft(1, []int64{1, 2}, 0, 0)
r.pendingConf = true
@ -1014,6 +1015,63 @@ func TestRemoveNode(t *testing.T) {
if g := r.nodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w)
}
wremoved := map[int64]bool{2: true}
if !reflect.DeepEqual(r.removed, wremoved) {
t.Errorf("rmNodes = %v, want %v", r.removed, wremoved)
}
}
// TestRecvMsgDenied tests that state machine sets removed when handling
// msgDenied, and does not pass it to the actual stepX function.
func TestRecvMsgDenied(t *testing.T) {
called := false
fakeStep := func(r *raft, m pb.Message) {
called = true
}
r := newRaft(1, []int64{1, 2}, 0, 0)
r.step = fakeStep
r.Step(pb.Message{From: 2, Type: msgDenied})
if called != false {
t.Errorf("stepFunc called = %v , want %v", called, false)
}
wremoved := map[int64]bool{1: true}
if !reflect.DeepEqual(r.removed, wremoved) {
t.Errorf("rmNodes = %v, want %v", r.removed, wremoved)
}
}
// TestRecvMsgFromRemovedNode tests that state machine sends correct
// messages out when handling message from removed node, and does not
// pass it to the actual stepX function.
func TestRecvMsgFromRemovedNode(t *testing.T) {
tests := []struct {
from int64
wmsgNum int
}{
{1, 0},
{2, 1},
}
for i, tt := range tests {
called := false
fakeStep := func(r *raft, m pb.Message) {
called = true
}
r := newRaft(1, []int64{1}, 0, 0)
r.step = fakeStep
r.removeNode(tt.from)
r.Step(pb.Message{From: tt.from, Type: msgVote})
if called != false {
t.Errorf("#%d: stepFunc called = %v , want %v", i, called, false)
}
if len(r.msgs) != tt.wmsgNum {
t.Errorf("#%d: len(msgs) = %d, want %d", i, len(r.msgs), tt.wmsgNum)
}
for j, msg := range r.msgs {
if msg.Type != msgDenied {
t.Errorf("#%d.%d: msgType = %d, want %d", i, j, msg.Type, msgDenied)
}
}
}
}
func TestPromotable(t *testing.T) {