etcdserver/raft: remove msgDenied, removedNodes, shouldStop

The future plan is to handle all of these at the etcdserver level.
This commit is contained in:
Yicheng Qin 2014-10-20 15:07:20 -07:00
parent 8fa3834d69
commit e200d2a8e2
9 changed files with 35 additions and 226 deletions

View File

@ -261,7 +261,7 @@ func (s *EtcdServer) run() {
var syncC <-chan time.Time
// snapi indicates the index of the last submitted snapshot request
var snapi, appliedi uint64
var nodes, removedNodes []uint64
var nodes []uint64
for {
select {
case <-s.ticker:
@ -269,16 +269,11 @@ func (s *EtcdServer) run() {
case rd := <-s.node.Ready():
if rd.SoftState != nil {
nodes = rd.SoftState.Nodes
removedNodes = rd.SoftState.RemovedNodes
if rd.RaftState == raft.StateLeader {
syncC = s.syncTicker
} else {
syncC = nil
}
if rd.SoftState.ShouldStop {
s.Stop()
return
}
}
s.storage.Save(rd.HardState, rd.Entries)
@ -290,7 +285,7 @@ func (s *EtcdServer) run() {
// race them.
// TODO: apply configuration change into ClusterStore.
if len(rd.CommittedEntries) != 0 {
appliedi = s.apply(rd.CommittedEntries, nodes, removedNodes)
appliedi = s.apply(rd.CommittedEntries, nodes)
}
if rd.Snapshot.Index > snapi {
@ -512,7 +507,7 @@ func getExpirationTime(r *pb.Request) time.Time {
return t
}
func (s *EtcdServer) apply(es []raftpb.Entry, nodes, removedNodes []uint64) uint64 {
func (s *EtcdServer) apply(es []raftpb.Entry, nodes []uint64) uint64 {
var applied uint64
for i := range es {
e := es[i]
@ -524,7 +519,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry, nodes, removedNodes []uint64) uint
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, e.Data)
s.w.Trigger(cc.ID, s.applyConfChange(cc, nodes, removedNodes))
s.w.Trigger(cc.ID, s.applyConfChange(cc, nodes))
default:
panic("unexpected entry type")
}
@ -576,8 +571,8 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
}
}
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes, removedNodes []uint64) error {
if err := checkConfChange(cc, nodes, removedNodes); err != nil {
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes []uint64) error {
if err := checkConfChange(cc, nodes); err != nil {
cc.NodeID = raft.None
s.node.ApplyConfChange(cc)
return err
@ -599,10 +594,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes, removedNodes [
return nil
}
func checkConfChange(cc raftpb.ConfChange, nodes, removedNodes []uint64) error {
if containsUint64(removedNodes, cc.NodeID) {
return ErrIDRemoved
}
func checkConfChange(cc raftpb.ConfChange, nodes []uint64) error {
switch cc.Type {
case raftpb.ConfChangeAddNode:
if containsUint64(nodes, cc.NodeID) {

View File

@ -383,9 +383,9 @@ func TestApplyRequest(t *testing.T) {
}
}
// TODO: test ErrIDRemoved
func TestApplyConfChangeError(t *testing.T) {
nodes := []uint64{1, 2, 3}
removedNodes := []uint64{4}
tests := []struct {
cc raftpb.ConfChange
werr error
@ -397,20 +397,6 @@ func TestApplyConfChangeError(t *testing.T) {
},
ErrIDExists,
},
{
raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: 4,
},
ErrIDRemoved,
},
{
raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
NodeID: 4,
},
ErrIDRemoved,
},
{
raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
@ -424,7 +410,7 @@ func TestApplyConfChangeError(t *testing.T) {
srv := &EtcdServer{
node: n,
}
err := srv.applyConfChange(tt.cc, nodes, removedNodes)
err := srv.applyConfChange(tt.cc, nodes)
if err != tt.werr {
t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
}
@ -934,25 +920,7 @@ func TestRemoveMember(t *testing.T) {
}
}
// TestServerStopItself tests that if node sends out Ready with ShouldStop,
// server will stop.
func TestServerStopItself(t *testing.T) {
n := newReadyNode()
s := &EtcdServer{
node: n,
store: &storeRecorder{},
send: func(_ []raftpb.Message) {},
storage: &storageRecorder{},
}
s.start()
n.readyc <- raft.Ready{SoftState: &raft.SoftState{ShouldStop: true}}
select {
case <-s.done:
case <-time.After(time.Millisecond):
t.Errorf("did not receive from closed done channel as expected")
}
}
// TODO: test server could stop itself when being removed
// TODO: test wait trigger correctness in multi-server case

View File

@ -175,13 +175,12 @@ func (l *raftLog) compact(i uint64) uint64 {
return uint64(len(l.ents))
}
func (l *raftLog) snap(d []byte, index, term uint64, nodes []uint64, removed []uint64) {
func (l *raftLog) snap(d []byte, index, term uint64, nodes []uint64) {
l.snapshot = pb.Snapshot{
Data: d,
Nodes: nodes,
Index: index,
Term: term,
RemovedNodes: removed,
Data: d,
Nodes: nodes,
Index: index,
Term: term,
}
}

View File

@ -35,11 +35,9 @@ var (
// SoftState provides state that is useful for logging and debugging.
// The state is volatile and does not need to be persisted to the WAL.
type SoftState struct {
Lead uint64
RaftState StateType
Nodes []uint64
RemovedNodes []uint64
ShouldStop bool
Lead uint64
RaftState StateType
Nodes []uint64
}
func (a *SoftState) equal(b *SoftState) bool {

View File

@ -172,7 +172,7 @@ func TestNode(t *testing.T) {
}
wants := []Ready{
{
SoftState: &SoftState{Lead: 1, Nodes: []uint64{1}, RemovedNodes: []uint64{}, RaftState: StateLeader},
SoftState: &SoftState{Lead: 1, Nodes: []uint64{1}, RaftState: StateLeader},
HardState: raftpb.HardState{Term: 1, Commit: 2},
Entries: []raftpb.Entry{
{},
@ -248,11 +248,10 @@ func TestNodeCompact(t *testing.T) {
n.Propose(ctx, []byte("foo"))
w := raftpb.Snapshot{
Term: 1,
Index: 2, // one nop + one proposal
Data: []byte("a snapshot"),
Nodes: []uint64{1},
RemovedNodes: []uint64{},
Term: 1,
Index: 2, // one nop + one proposal
Data: []byte("a snapshot"),
Nodes: []uint64{1},
}
pkg.ForceGosched()
@ -295,9 +294,7 @@ func TestSoftStateEqual(t *testing.T) {
{&SoftState{}, true},
{&SoftState{Lead: 1}, false},
{&SoftState{RaftState: StateLeader}, false},
{&SoftState{ShouldStop: true}, false},
{&SoftState{Nodes: []uint64{1, 2}}, false},
{&SoftState{RemovedNodes: []uint64{1, 2}}, false},
}
for i, tt := range tests {
if g := tt.st.equal(&SoftState{}); g != tt.we {

View File

@ -112,9 +112,6 @@ type raft struct {
// New configuration is ignored if there exists unapplied configuration.
pendingConf bool
// TODO: need GC and recovery from snapshot
removed map[uint64]bool
elapsed int // number of ticks since the last msg
heartbeatTimeout int
electionTimeout int
@ -132,7 +129,6 @@ func newRaft(id uint64, peers []uint64, election, heartbeat int) *raft {
lead: None,
raftLog: newLog(),
prs: make(map[uint64]*progress),
removed: make(map[uint64]bool),
electionTimeout: election,
heartbeatTimeout: heartbeat,
}
@ -145,10 +141,8 @@ func newRaft(id uint64, peers []uint64, election, heartbeat int) *raft {
func (r *raft) hasLeader() bool { return r.lead != None }
func (r *raft) shouldStop() bool { return r.removed[r.id] }
func (r *raft) softState() *SoftState {
return &SoftState{Lead: r.lead, RaftState: r.state, Nodes: r.nodes(), RemovedNodes: r.removedNodes(), ShouldStop: r.shouldStop()}
return &SoftState{Lead: r.lead, RaftState: r.state, Nodes: r.nodes()}
}
func (r *raft) String() string {
@ -363,19 +357,6 @@ func (r *raft) Step(m pb.Message) error {
// TODO(bmizerany): this likely allocs - prevent that.
defer func() { r.Commit = r.raftLog.committed }()
if r.removed[m.From] {
if m.From != r.id {
r.send(pb.Message{To: m.From, Type: pb.MsgDenied})
}
// TODO: return an error?
return nil
}
if m.Type == pb.MsgDenied {
r.removed[r.id] = true
// TODO: return an error?
return nil
}
if m.Type == pb.MsgHup {
r.campaign()
}
@ -425,7 +406,6 @@ func (r *raft) addNode(id uint64) {
func (r *raft) removeNode(id uint64) {
r.delProgress(id)
r.pendingConf = false
r.removed[id] = true
}
type stepFunc func(r *raft, m pb.Message)
@ -517,10 +497,7 @@ func (r *raft) compact(index uint64, nodes []uint64, d []byte) {
if index > r.raftLog.applied {
panic(fmt.Sprintf("raft: compact index (%d) exceeds applied index (%d)", index, r.raftLog.applied))
}
// We do not get the removed nodes at the given index.
// We get the removed nodes at current index. So a state machine might
// have a newer version of removed nodes after recovery. It is OK.
r.raftLog.snap(d, index, r.raftLog.term(index), nodes, r.removedNodes())
r.raftLog.snap(d, index, r.raftLog.term(index), nodes)
r.raftLog.compact(index)
}
@ -540,10 +517,6 @@ func (r *raft) restore(s pb.Snapshot) bool {
r.setProgress(n, 0, r.raftLog.lastIndex()+1)
}
}
r.removed = make(map[uint64]bool)
for _, n := range s.RemovedNodes {
r.removed[n] = true
}
return true
}
@ -565,14 +538,6 @@ func (r *raft) nodes() []uint64 {
return nodes
}
func (r *raft) removedNodes() []uint64 {
removed := make([]uint64, 0, len(r.removed))
for k := range r.removed {
removed = append(removed, k)
}
return removed
}
func (r *raft) setProgress(id, match, next uint64) {
r.prs[id] = &progress{next: next, match: match}
}

View File

@ -429,13 +429,12 @@ func TestCompact(t *testing.T) {
tests := []struct {
compacti uint64
nodes []uint64
removed []uint64
snapd []byte
wpanic bool
}{
{1, []uint64{1, 2, 3}, []uint64{4, 5}, []byte("some data"), false},
{2, []uint64{1, 2, 3}, []uint64{4, 5}, []byte("some data"), false},
{4, []uint64{1, 2, 3}, []uint64{4, 5}, []byte("some data"), true}, // compact out of range
{1, []uint64{1, 2, 3}, []byte("some data"), false},
{2, []uint64{1, 2, 3}, []byte("some data"), false},
{4, []uint64{1, 2, 3}, []byte("some data"), true}, // compact out of range
}
for i, tt := range tests {
@ -454,14 +453,9 @@ func TestCompact(t *testing.T) {
applied: 2,
ents: []pb.Entry{{}, {Term: 1}, {Term: 1}, {Term: 1}},
},
removed: make(map[uint64]bool),
}
for _, r := range tt.removed {
sm.removeNode(r)
}
sm.compact(tt.compacti, tt.nodes, tt.snapd)
sort.Sort(uint64Slice(sm.raftLog.snapshot.Nodes))
sort.Sort(uint64Slice(sm.raftLog.snapshot.RemovedNodes))
if sm.raftLog.offset != tt.compacti {
t.Errorf("%d: log.offset = %d, want %d", i, sm.raftLog.offset, tt.compacti)
}
@ -471,9 +465,6 @@ func TestCompact(t *testing.T) {
if !reflect.DeepEqual(sm.raftLog.snapshot.Data, tt.snapd) {
t.Errorf("%d: snap.data = %v, want %v", i, sm.raftLog.snapshot.Data, tt.snapd)
}
if !reflect.DeepEqual(sm.raftLog.snapshot.RemovedNodes, tt.removed) {
t.Errorf("%d: snap.removedNodes = %v, want %v", i, sm.raftLog.snapshot.RemovedNodes, tt.removed)
}
}()
}
}
@ -912,10 +903,9 @@ func TestRecvMsgBeat(t *testing.T) {
func TestRestore(t *testing.T) {
s := pb.Snapshot{
Index: 11, // magic number
Term: 11, // magic number
Nodes: []uint64{1, 2, 3},
RemovedNodes: []uint64{4, 5},
Index: 11, // magic number
Term: 11, // magic number
Nodes: []uint64{1, 2, 3},
}
sm := newRaft(1, []uint64{1, 2}, 10, 1)
@ -930,15 +920,10 @@ func TestRestore(t *testing.T) {
t.Errorf("log.lastTerm = %d, want %d", sm.raftLog.term(s.Index), s.Term)
}
sg := sm.nodes()
srn := sm.removedNodes()
sort.Sort(uint64Slice(sg))
sort.Sort(uint64Slice(srn))
if !reflect.DeepEqual(sg, s.Nodes) {
t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Nodes)
}
if !reflect.DeepEqual(s.RemovedNodes, srn) {
t.Errorf("sm.RemovedNodes = %+v, want %+v", s.RemovedNodes, srn)
}
if !reflect.DeepEqual(sm.raftLog.snapshot, s) {
t.Errorf("snapshot = %+v, want %+v", sm.raftLog.snapshot, s)
}
@ -1124,63 +1109,6 @@ func TestRemoveNode(t *testing.T) {
if g := r.nodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w)
}
wremoved := map[uint64]bool{2: true}
if !reflect.DeepEqual(r.removed, wremoved) {
t.Errorf("rmNodes = %v, want %v", r.removed, wremoved)
}
}
// TestRecvMsgDenied tests that state machine sets the removed list when
// handling msgDenied, and does not pass it to the actual stepX function.
func TestRecvMsgDenied(t *testing.T) {
called := false
fakeStep := func(r *raft, m pb.Message) {
called = true
}
r := newRaft(1, []uint64{1, 2}, 10, 1)
r.step = fakeStep
r.Step(pb.Message{From: 2, Type: pb.MsgDenied})
if called != false {
t.Errorf("stepFunc called = %v , want %v", called, false)
}
wremoved := map[uint64]bool{1: true}
if !reflect.DeepEqual(r.removed, wremoved) {
t.Errorf("rmNodes = %v, want %v", r.removed, wremoved)
}
}
// TestRecvMsgFromRemovedNode tests that state machine sends correct
// messages out when handling message from removed node, and does not
// pass it to the actual stepX function.
func TestRecvMsgFromRemovedNode(t *testing.T) {
tests := []struct {
from uint64
wmsgNum int
}{
{1, 0},
{2, 1},
}
for i, tt := range tests {
called := false
fakeStep := func(r *raft, m pb.Message) {
called = true
}
r := newRaft(1, []uint64{1}, 10, 1)
r.step = fakeStep
r.removeNode(tt.from)
r.Step(pb.Message{From: tt.from, Type: pb.MsgVote})
if called != false {
t.Errorf("#%d: stepFunc called = %v , want %v", i, called, false)
}
if len(r.msgs) != tt.wmsgNum {
t.Errorf("#%d: len(msgs) = %d, want %d", i, len(r.msgs), tt.wmsgNum)
}
for j, msg := range r.msgs {
if msg.Type != pb.MsgDenied {
t.Errorf("#%d.%d: msgType = %d, want %d", i, j, msg.Type, pb.MsgDenied)
}
}
}
}
func TestPromotable(t *testing.T) {

View File

@ -168,7 +168,6 @@ type Snapshot struct {
Nodes []uint64 `protobuf:"varint,2,rep,name=nodes" json:"nodes"`
Index uint64 `protobuf:"varint,3,req,name=index" json:"index"`
Term uint64 `protobuf:"varint,4,req,name=term" json:"term"`
RemovedNodes []uint64 `protobuf:"varint,5,rep,name=removed_nodes" json:"removed_nodes"`
XXX_unrecognized []byte `json:"-"`
}
@ -419,23 +418,6 @@ func (m *Snapshot) Unmarshal(data []byte) error {
break
}
}
case 5:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
var v uint64
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
v |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
m.RemovedNodes = append(m.RemovedNodes, v)
default:
var sizeOfWire int
for {
@ -891,11 +873,6 @@ func (m *Snapshot) Size() (n int) {
}
n += 1 + sovRaft(uint64(m.Index))
n += 1 + sovRaft(uint64(m.Term))
if len(m.RemovedNodes) > 0 {
for _, e := range m.RemovedNodes {
n += 1 + sovRaft(uint64(e))
}
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
@ -1034,19 +1011,6 @@ func (m *Snapshot) MarshalTo(data []byte) (n int, err error) {
data[i] = 0x20
i++
i = encodeVarintRaft(data, i, uint64(m.Term))
if len(m.RemovedNodes) > 0 {
for _, num := range m.RemovedNodes {
data[i] = 0x28
i++
for num >= 1<<7 {
data[i] = uint8(uint64(num)&0x7f | 0x80)
num >>= 7
i++
}
data[i] = uint8(num)
i++
}
}
if m.XXX_unrecognized != nil {
i += copy(data[i:], m.XXX_unrecognized)
}

View File

@ -21,11 +21,10 @@ message Entry {
}
message Snapshot {
required bytes data = 1 [(gogoproto.nullable) = false];
repeated uint64 nodes = 2 [(gogoproto.nullable) = false];
required uint64 index = 3 [(gogoproto.nullable) = false];
required uint64 term = 4 [(gogoproto.nullable) = false];
repeated uint64 removed_nodes = 5 [(gogoproto.nullable) = false];
required bytes data = 1 [(gogoproto.nullable) = false];
repeated uint64 nodes = 2 [(gogoproto.nullable) = false];
required uint64 index = 3 [(gogoproto.nullable) = false];
required uint64 term = 4 [(gogoproto.nullable) = false];
}
enum MessageType {
@ -37,7 +36,6 @@ enum MessageType {
MsgVote = 5;
MsgVoteResp = 6;
MsgSnap = 7;
MsgDenied = 8;
}
message Message {