From e200d2a8e2e54c38d66312ec3bba4c3247f2e4fc Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 20 Oct 2014 15:07:20 -0700 Subject: [PATCH] etcdserver/raft: remove msgDenied, removedNodes, shouldStop The future plan is to do all these in etcdserver level. --- etcdserver/server.go | 22 ++++------ etcdserver/server_test.go | 38 ++---------------- raft/log.go | 11 +++-- raft/node.go | 8 ++-- raft/node_test.go | 13 +++--- raft/raft.go | 39 +----------------- raft/raft_test.go | 84 +++------------------------------------ raft/raftpb/raft.pb.go | 36 ----------------- raft/raftpb/raft.proto | 10 ++--- 9 files changed, 35 insertions(+), 226 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 68b21c3cc..08cea9a05 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -261,7 +261,7 @@ func (s *EtcdServer) run() { var syncC <-chan time.Time // snapi indicates the index of the last submitted snapshot request var snapi, appliedi uint64 - var nodes, removedNodes []uint64 + var nodes []uint64 for { select { case <-s.ticker: @@ -269,16 +269,11 @@ func (s *EtcdServer) run() { case rd := <-s.node.Ready(): if rd.SoftState != nil { nodes = rd.SoftState.Nodes - removedNodes = rd.SoftState.RemovedNodes if rd.RaftState == raft.StateLeader { syncC = s.syncTicker } else { syncC = nil } - if rd.SoftState.ShouldStop { - s.Stop() - return - } } s.storage.Save(rd.HardState, rd.Entries) @@ -290,7 +285,7 @@ func (s *EtcdServer) run() { // race them. // TODO: apply configuration change into ClusterStore. if len(rd.CommittedEntries) != 0 { - appliedi = s.apply(rd.CommittedEntries, nodes, removedNodes) + appliedi = s.apply(rd.CommittedEntries, nodes) } if rd.Snapshot.Index > snapi { @@ -512,7 +507,7 @@ func getExpirationTime(r *pb.Request) time.Time { return t } -func (s *EtcdServer) apply(es []raftpb.Entry, nodes, removedNodes []uint64) uint64 { +func (s *EtcdServer) apply(es []raftpb.Entry, nodes []uint64) uint64 { var applied uint64 for i := range es { e := es[i] @@ -524,7 +519,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry, nodes, removedNodes []uint64) uint case raftpb.EntryConfChange: var cc raftpb.ConfChange pbutil.MustUnmarshal(&cc, e.Data) - s.w.Trigger(cc.ID, s.applyConfChange(cc, nodes, removedNodes)) + s.w.Trigger(cc.ID, s.applyConfChange(cc, nodes)) default: panic("unexpected entry type") } @@ -576,8 +571,8 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { } } -func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes, removedNodes []uint64) error { - if err := checkConfChange(cc, nodes, removedNodes); err != nil { +func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes []uint64) error { + if err := checkConfChange(cc, nodes); err != nil { cc.NodeID = raft.None s.node.ApplyConfChange(cc) return err @@ -599,10 +594,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes, removedNodes [ return nil } -func checkConfChange(cc raftpb.ConfChange, nodes, removedNodes []uint64) error { - if containsUint64(removedNodes, cc.NodeID) { - return ErrIDRemoved - } +func checkConfChange(cc raftpb.ConfChange, nodes []uint64) error { switch cc.Type { case raftpb.ConfChangeAddNode: if containsUint64(nodes, cc.NodeID) { diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index e33b4bb30..ffb0f7102 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -383,9 +383,9 @@ func TestApplyRequest(t *testing.T) { } } +// TODO: test ErrIDRemoved func TestApplyConfChangeError(t *testing.T) { nodes := []uint64{1, 2, 3} - removedNodes := []uint64{4} tests := []struct { cc raftpb.ConfChange werr error @@ -397,20 +397,6 @@ func TestApplyConfChangeError(t *testing.T) { }, ErrIDExists, }, - { - raftpb.ConfChange{ - Type: raftpb.ConfChangeAddNode, - NodeID: 4, - }, - ErrIDRemoved, - }, - { - raftpb.ConfChange{ - Type: raftpb.ConfChangeRemoveNode, - NodeID: 4, - }, - ErrIDRemoved, - }, { raftpb.ConfChange{ Type: raftpb.ConfChangeRemoveNode, @@ -424,7 +410,7 @@ func TestApplyConfChangeError(t *testing.T) { srv := &EtcdServer{ node: n, } - err := srv.applyConfChange(tt.cc, nodes, removedNodes) + err := srv.applyConfChange(tt.cc, nodes) if err != tt.werr { t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr) } @@ -934,25 +920,7 @@ func TestRemoveMember(t *testing.T) { } } -// TestServerStopItself tests that if node sends out Ready with ShouldStop, -// server will stop. -func TestServerStopItself(t *testing.T) { - n := newReadyNode() - s := &EtcdServer{ - node: n, - store: &storeRecorder{}, - send: func(_ []raftpb.Message) {}, - storage: &storageRecorder{}, - } - s.start() - n.readyc <- raft.Ready{SoftState: &raft.SoftState{ShouldStop: true}} - - select { - case <-s.done: - case <-time.After(time.Millisecond): - t.Errorf("did not receive from closed done channel as expected") - } -} +// TODO: test server could stop itself when being removed // TODO: test wait trigger correctness in multi-server case diff --git a/raft/log.go b/raft/log.go index 2481d18cb..b386b9932 100644 --- a/raft/log.go +++ b/raft/log.go @@ -175,13 +175,12 @@ func (l *raftLog) compact(i uint64) uint64 { return uint64(len(l.ents)) } -func (l *raftLog) snap(d []byte, index, term uint64, nodes []uint64, removed []uint64) { +func (l *raftLog) snap(d []byte, index, term uint64, nodes []uint64) { l.snapshot = pb.Snapshot{ - Data: d, - Nodes: nodes, - Index: index, - Term: term, - RemovedNodes: removed, + Data: d, + Nodes: nodes, + Index: index, + Term: term, } } diff --git a/raft/node.go b/raft/node.go index f825e0350..06da88fc5 100644 --- a/raft/node.go +++ b/raft/node.go @@ -35,11 +35,9 @@ var ( // SoftState provides state that is useful for logging and debugging. // The state is volatile and does not need to be persisted to the WAL. type SoftState struct { - Lead uint64 - RaftState StateType - Nodes []uint64 - RemovedNodes []uint64 - ShouldStop bool + Lead uint64 + RaftState StateType + Nodes []uint64 } func (a *SoftState) equal(b *SoftState) bool { diff --git a/raft/node_test.go b/raft/node_test.go index cde984588..fd63abce7 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -172,7 +172,7 @@ func TestNode(t *testing.T) { } wants := []Ready{ { - SoftState: &SoftState{Lead: 1, Nodes: []uint64{1}, RemovedNodes: []uint64{}, RaftState: StateLeader}, + SoftState: &SoftState{Lead: 1, Nodes: []uint64{1}, RaftState: StateLeader}, HardState: raftpb.HardState{Term: 1, Commit: 2}, Entries: []raftpb.Entry{ {}, @@ -248,11 +248,10 @@ func TestNodeCompact(t *testing.T) { n.Propose(ctx, []byte("foo")) w := raftpb.Snapshot{ - Term: 1, - Index: 2, // one nop + one proposal - Data: []byte("a snapshot"), - Nodes: []uint64{1}, - RemovedNodes: []uint64{}, + Term: 1, + Index: 2, // one nop + one proposal + Data: []byte("a snapshot"), + Nodes: []uint64{1}, } pkg.ForceGosched() @@ -295,9 +294,7 @@ func TestSoftStateEqual(t *testing.T) { {&SoftState{}, true}, {&SoftState{Lead: 1}, false}, {&SoftState{RaftState: StateLeader}, false}, - {&SoftState{ShouldStop: true}, false}, {&SoftState{Nodes: []uint64{1, 2}}, false}, - {&SoftState{RemovedNodes: []uint64{1, 2}}, false}, } for i, tt := range tests { if g := tt.st.equal(&SoftState{}); g != tt.we { diff --git a/raft/raft.go b/raft/raft.go index 50dcc977c..09f91b25f 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -112,9 +112,6 @@ type raft struct { // New configuration is ignored if there exists unapplied configuration. pendingConf bool - // TODO: need GC and recovery from snapshot - removed map[uint64]bool - elapsed int // number of ticks since the last msg heartbeatTimeout int electionTimeout int @@ -132,7 +129,6 @@ func newRaft(id uint64, peers []uint64, election, heartbeat int) *raft { lead: None, raftLog: newLog(), prs: make(map[uint64]*progress), - removed: make(map[uint64]bool), electionTimeout: election, heartbeatTimeout: heartbeat, } @@ -145,10 +141,8 @@ func newRaft(id uint64, peers []uint64, election, heartbeat int) *raft { func (r *raft) hasLeader() bool { return r.lead != None } -func (r *raft) shouldStop() bool { return r.removed[r.id] } - func (r *raft) softState() *SoftState { - return &SoftState{Lead: r.lead, RaftState: r.state, Nodes: r.nodes(), RemovedNodes: r.removedNodes(), ShouldStop: r.shouldStop()} + return &SoftState{Lead: r.lead, RaftState: r.state, Nodes: r.nodes()} } func (r *raft) String() string { @@ -363,19 +357,6 @@ func (r *raft) Step(m pb.Message) error { // TODO(bmizerany): this likely allocs - prevent that. defer func() { r.Commit = r.raftLog.committed }() - if r.removed[m.From] { - if m.From != r.id { - r.send(pb.Message{To: m.From, Type: pb.MsgDenied}) - } - // TODO: return an error? - return nil - } - if m.Type == pb.MsgDenied { - r.removed[r.id] = true - // TODO: return an error? - return nil - } - if m.Type == pb.MsgHup { r.campaign() } @@ -425,7 +406,6 @@ func (r *raft) addNode(id uint64) { func (r *raft) removeNode(id uint64) { r.delProgress(id) r.pendingConf = false - r.removed[id] = true } type stepFunc func(r *raft, m pb.Message) @@ -517,10 +497,7 @@ func (r *raft) compact(index uint64, nodes []uint64, d []byte) { if index > r.raftLog.applied { panic(fmt.Sprintf("raft: compact index (%d) exceeds applied index (%d)", index, r.raftLog.applied)) } - // We do not get the removed nodes at the given index. - // We get the removed nodes at current index. So a state machine might - // have a newer verison of removed nodes after recovery. It is OK. - r.raftLog.snap(d, index, r.raftLog.term(index), nodes, r.removedNodes()) + r.raftLog.snap(d, index, r.raftLog.term(index), nodes) r.raftLog.compact(index) } @@ -540,10 +517,6 @@ func (r *raft) restore(s pb.Snapshot) bool { r.setProgress(n, 0, r.raftLog.lastIndex()+1) } } - r.removed = make(map[uint64]bool) - for _, n := range s.RemovedNodes { - r.removed[n] = true - } return true } @@ -565,14 +538,6 @@ func (r *raft) nodes() []uint64 { return nodes } -func (r *raft) removedNodes() []uint64 { - removed := make([]uint64, 0, len(r.removed)) - for k := range r.removed { - removed = append(removed, k) - } - return removed -} - func (r *raft) setProgress(id, match, next uint64) { r.prs[id] = &progress{next: next, match: match} } diff --git a/raft/raft_test.go b/raft/raft_test.go index 45d73787c..04476a63e 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -429,13 +429,12 @@ func TestCompact(t *testing.T) { tests := []struct { compacti uint64 nodes []uint64 - removed []uint64 snapd []byte wpanic bool }{ - {1, []uint64{1, 2, 3}, []uint64{4, 5}, []byte("some data"), false}, - {2, []uint64{1, 2, 3}, []uint64{4, 5}, []byte("some data"), false}, - {4, []uint64{1, 2, 3}, []uint64{4, 5}, []byte("some data"), true}, // compact out of range + {1, []uint64{1, 2, 3}, []byte("some data"), false}, + {2, []uint64{1, 2, 3}, []byte("some data"), false}, + {4, []uint64{1, 2, 3}, []byte("some data"), true}, // compact out of range } for i, tt := range tests { @@ -454,14 +453,9 @@ func TestCompact(t *testing.T) { applied: 2, ents: []pb.Entry{{}, {Term: 1}, {Term: 1}, {Term: 1}}, }, - removed: make(map[uint64]bool), - } - for _, r := range tt.removed { - sm.removeNode(r) } sm.compact(tt.compacti, tt.nodes, tt.snapd) sort.Sort(uint64Slice(sm.raftLog.snapshot.Nodes)) - sort.Sort(uint64Slice(sm.raftLog.snapshot.RemovedNodes)) if sm.raftLog.offset != tt.compacti { t.Errorf("%d: log.offset = %d, want %d", i, sm.raftLog.offset, tt.compacti) } @@ -471,9 +465,6 @@ func TestCompact(t *testing.T) { if !reflect.DeepEqual(sm.raftLog.snapshot.Data, tt.snapd) { t.Errorf("%d: snap.data = %v, want %v", i, sm.raftLog.snapshot.Data, tt.snapd) } - if !reflect.DeepEqual(sm.raftLog.snapshot.RemovedNodes, tt.removed) { - t.Errorf("%d: snap.removedNodes = %v, want %v", i, sm.raftLog.snapshot.RemovedNodes, tt.removed) - } }() } } @@ -912,10 +903,9 @@ func TestRecvMsgBeat(t *testing.T) { func TestRestore(t *testing.T) { s := pb.Snapshot{ - Index: 11, // magic number - Term: 11, // magic number - Nodes: []uint64{1, 2, 3}, - RemovedNodes: []uint64{4, 5}, + Index: 11, // magic number + Term: 11, // magic number + Nodes: []uint64{1, 2, 3}, } sm := newRaft(1, []uint64{1, 2}, 10, 1) @@ -930,15 +920,10 @@ func TestRestore(t *testing.T) { t.Errorf("log.lastTerm = %d, want %d", sm.raftLog.term(s.Index), s.Term) } sg := sm.nodes() - srn := sm.removedNodes() sort.Sort(uint64Slice(sg)) - sort.Sort(uint64Slice(srn)) if !reflect.DeepEqual(sg, s.Nodes) { t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Nodes) } - if !reflect.DeepEqual(s.RemovedNodes, srn) { - t.Errorf("sm.RemovedNodes = %+v, want %+v", s.RemovedNodes, srn) - } if !reflect.DeepEqual(sm.raftLog.snapshot, s) { t.Errorf("snapshot = %+v, want %+v", sm.raftLog.snapshot, s) } @@ -1124,63 +1109,6 @@ func TestRemoveNode(t *testing.T) { if g := r.nodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } - wremoved := map[uint64]bool{2: true} - if !reflect.DeepEqual(r.removed, wremoved) { - t.Errorf("rmNodes = %v, want %v", r.removed, wremoved) - } -} - -// TestRecvMsgDenied tests that state machine sets the removed list when -// handling msgDenied, and does not pass it to the actual stepX function. -func TestRecvMsgDenied(t *testing.T) { - called := false - fakeStep := func(r *raft, m pb.Message) { - called = true - } - r := newRaft(1, []uint64{1, 2}, 10, 1) - r.step = fakeStep - r.Step(pb.Message{From: 2, Type: pb.MsgDenied}) - if called != false { - t.Errorf("stepFunc called = %v , want %v", called, false) - } - wremoved := map[uint64]bool{1: true} - if !reflect.DeepEqual(r.removed, wremoved) { - t.Errorf("rmNodes = %v, want %v", r.removed, wremoved) - } -} - -// TestRecvMsgFromRemovedNode tests that state machine sends correct -// messages out when handling message from removed node, and does not -// pass it to the actual stepX function. -func TestRecvMsgFromRemovedNode(t *testing.T) { - tests := []struct { - from uint64 - wmsgNum int - }{ - {1, 0}, - {2, 1}, - } - for i, tt := range tests { - called := false - fakeStep := func(r *raft, m pb.Message) { - called = true - } - r := newRaft(1, []uint64{1}, 10, 1) - r.step = fakeStep - r.removeNode(tt.from) - r.Step(pb.Message{From: tt.from, Type: pb.MsgVote}) - if called != false { - t.Errorf("#%d: stepFunc called = %v , want %v", i, called, false) - } - if len(r.msgs) != tt.wmsgNum { - t.Errorf("#%d: len(msgs) = %d, want %d", i, len(r.msgs), tt.wmsgNum) - } - for j, msg := range r.msgs { - if msg.Type != pb.MsgDenied { - t.Errorf("#%d.%d: msgType = %d, want %d", i, j, msg.Type, pb.MsgDenied) - } - } - } } func TestPromotable(t *testing.T) { diff --git a/raft/raftpb/raft.pb.go b/raft/raftpb/raft.pb.go index d82ef6164..898df719c 100644 --- a/raft/raftpb/raft.pb.go +++ b/raft/raftpb/raft.pb.go @@ -168,7 +168,6 @@ type Snapshot struct { Nodes []uint64 `protobuf:"varint,2,rep,name=nodes" json:"nodes"` Index uint64 `protobuf:"varint,3,req,name=index" json:"index"` Term uint64 `protobuf:"varint,4,req,name=term" json:"term"` - RemovedNodes []uint64 `protobuf:"varint,5,rep,name=removed_nodes" json:"removed_nodes"` XXX_unrecognized []byte `json:"-"` } @@ -419,23 +418,6 @@ func (m *Snapshot) Unmarshal(data []byte) error { break } } - case 5: - if wireType != 0 { - return code_google_com_p_gogoprotobuf_proto.ErrWrongType - } - var v uint64 - for shift := uint(0); ; shift += 7 { - if index >= l { - return io.ErrUnexpectedEOF - } - b := data[index] - index++ - v |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - m.RemovedNodes = append(m.RemovedNodes, v) default: var sizeOfWire int for { @@ -891,11 +873,6 @@ func (m *Snapshot) Size() (n int) { } n += 1 + sovRaft(uint64(m.Index)) n += 1 + sovRaft(uint64(m.Term)) - if len(m.RemovedNodes) > 0 { - for _, e := range m.RemovedNodes { - n += 1 + sovRaft(uint64(e)) - } - } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -1034,19 +1011,6 @@ func (m *Snapshot) MarshalTo(data []byte) (n int, err error) { data[i] = 0x20 i++ i = encodeVarintRaft(data, i, uint64(m.Term)) - if len(m.RemovedNodes) > 0 { - for _, num := range m.RemovedNodes { - data[i] = 0x28 - i++ - for num >= 1<<7 { - data[i] = uint8(uint64(num)&0x7f | 0x80) - num >>= 7 - i++ - } - data[i] = uint8(num) - i++ - } - } if m.XXX_unrecognized != nil { i += copy(data[i:], m.XXX_unrecognized) } diff --git a/raft/raftpb/raft.proto b/raft/raftpb/raft.proto index 52809fa79..47c9ec2ef 100644 --- a/raft/raftpb/raft.proto +++ b/raft/raftpb/raft.proto @@ -21,11 +21,10 @@ message Entry { } message Snapshot { - required bytes data = 1 [(gogoproto.nullable) = false]; - repeated uint64 nodes = 2 [(gogoproto.nullable) = false]; - required uint64 index = 3 [(gogoproto.nullable) = false]; - required uint64 term = 4 [(gogoproto.nullable) = false]; - repeated uint64 removed_nodes = 5 [(gogoproto.nullable) = false]; + required bytes data = 1 [(gogoproto.nullable) = false]; + repeated uint64 nodes = 2 [(gogoproto.nullable) = false]; + required uint64 index = 3 [(gogoproto.nullable) = false]; + required uint64 term = 4 [(gogoproto.nullable) = false]; } enum MessageType { @@ -37,7 +36,6 @@ enum MessageType { MsgVote = 5; MsgVoteResp = 6; MsgSnap = 7; - MsgDenied = 8; } message Message {