mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #1333 from unihorn/172
etcdserver/raft: remove msgDenied, removedNodes, shouldStop
This commit is contained in:
commit
92230cee63
@ -263,7 +263,7 @@ func (s *EtcdServer) run() {
|
|||||||
var syncC <-chan time.Time
|
var syncC <-chan time.Time
|
||||||
// snapi indicates the index of the last submitted snapshot request
|
// snapi indicates the index of the last submitted snapshot request
|
||||||
var snapi, appliedi uint64
|
var snapi, appliedi uint64
|
||||||
var nodes, removedNodes []uint64
|
var nodes []uint64
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-s.Ticker:
|
case <-s.Ticker:
|
||||||
@ -271,16 +271,11 @@ func (s *EtcdServer) run() {
|
|||||||
case rd := <-s.node.Ready():
|
case rd := <-s.node.Ready():
|
||||||
if rd.SoftState != nil {
|
if rd.SoftState != nil {
|
||||||
nodes = rd.SoftState.Nodes
|
nodes = rd.SoftState.Nodes
|
||||||
removedNodes = rd.SoftState.RemovedNodes
|
|
||||||
if rd.RaftState == raft.StateLeader {
|
if rd.RaftState == raft.StateLeader {
|
||||||
syncC = s.SyncTicker
|
syncC = s.SyncTicker
|
||||||
} else {
|
} else {
|
||||||
syncC = nil
|
syncC = nil
|
||||||
}
|
}
|
||||||
if rd.SoftState.ShouldStop {
|
|
||||||
s.Stop()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
s.storage.Save(rd.HardState, rd.Entries)
|
s.storage.Save(rd.HardState, rd.Entries)
|
||||||
@ -292,7 +287,7 @@ func (s *EtcdServer) run() {
|
|||||||
// race them.
|
// race them.
|
||||||
// TODO: apply configuration change into ClusterStore.
|
// TODO: apply configuration change into ClusterStore.
|
||||||
if len(rd.CommittedEntries) != 0 {
|
if len(rd.CommittedEntries) != 0 {
|
||||||
appliedi = s.apply(rd.CommittedEntries, nodes, removedNodes)
|
appliedi = s.apply(rd.CommittedEntries, nodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
if rd.Snapshot.Index > snapi {
|
if rd.Snapshot.Index > snapi {
|
||||||
@ -516,7 +511,7 @@ func getExpirationTime(r *pb.Request) time.Time {
|
|||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *EtcdServer) apply(es []raftpb.Entry, nodes, removedNodes []uint64) uint64 {
|
func (s *EtcdServer) apply(es []raftpb.Entry, nodes []uint64) uint64 {
|
||||||
var applied uint64
|
var applied uint64
|
||||||
for i := range es {
|
for i := range es {
|
||||||
e := es[i]
|
e := es[i]
|
||||||
@ -528,7 +523,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry, nodes, removedNodes []uint64) uint
|
|||||||
case raftpb.EntryConfChange:
|
case raftpb.EntryConfChange:
|
||||||
var cc raftpb.ConfChange
|
var cc raftpb.ConfChange
|
||||||
pbutil.MustUnmarshal(&cc, e.Data)
|
pbutil.MustUnmarshal(&cc, e.Data)
|
||||||
s.w.Trigger(cc.ID, s.applyConfChange(cc, nodes, removedNodes))
|
s.w.Trigger(cc.ID, s.applyConfChange(cc, nodes))
|
||||||
default:
|
default:
|
||||||
panic("unexpected entry type")
|
panic("unexpected entry type")
|
||||||
}
|
}
|
||||||
@ -580,8 +575,8 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes, removedNodes []uint64) error {
|
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes []uint64) error {
|
||||||
if err := checkConfChange(cc, nodes, removedNodes); err != nil {
|
if err := checkConfChange(cc, nodes); err != nil {
|
||||||
cc.NodeID = raft.None
|
cc.NodeID = raft.None
|
||||||
s.node.ApplyConfChange(cc)
|
s.node.ApplyConfChange(cc)
|
||||||
return err
|
return err
|
||||||
@ -603,10 +598,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes, removedNodes [
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkConfChange(cc raftpb.ConfChange, nodes, removedNodes []uint64) error {
|
func checkConfChange(cc raftpb.ConfChange, nodes []uint64) error {
|
||||||
if containsUint64(removedNodes, cc.NodeID) {
|
|
||||||
return ErrIDRemoved
|
|
||||||
}
|
|
||||||
switch cc.Type {
|
switch cc.Type {
|
||||||
case raftpb.ConfChangeAddNode:
|
case raftpb.ConfChangeAddNode:
|
||||||
if containsUint64(nodes, cc.NodeID) {
|
if containsUint64(nodes, cc.NodeID) {
|
||||||
|
@ -383,9 +383,9 @@ func TestApplyRequest(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: test ErrIDRemoved
|
||||||
func TestApplyConfChangeError(t *testing.T) {
|
func TestApplyConfChangeError(t *testing.T) {
|
||||||
nodes := []uint64{1, 2, 3}
|
nodes := []uint64{1, 2, 3}
|
||||||
removedNodes := []uint64{4}
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
cc raftpb.ConfChange
|
cc raftpb.ConfChange
|
||||||
werr error
|
werr error
|
||||||
@ -397,20 +397,6 @@ func TestApplyConfChangeError(t *testing.T) {
|
|||||||
},
|
},
|
||||||
ErrIDExists,
|
ErrIDExists,
|
||||||
},
|
},
|
||||||
{
|
|
||||||
raftpb.ConfChange{
|
|
||||||
Type: raftpb.ConfChangeAddNode,
|
|
||||||
NodeID: 4,
|
|
||||||
},
|
|
||||||
ErrIDRemoved,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
raftpb.ConfChange{
|
|
||||||
Type: raftpb.ConfChangeRemoveNode,
|
|
||||||
NodeID: 4,
|
|
||||||
},
|
|
||||||
ErrIDRemoved,
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
raftpb.ConfChange{
|
raftpb.ConfChange{
|
||||||
Type: raftpb.ConfChangeRemoveNode,
|
Type: raftpb.ConfChangeRemoveNode,
|
||||||
@ -424,7 +410,7 @@ func TestApplyConfChangeError(t *testing.T) {
|
|||||||
srv := &EtcdServer{
|
srv := &EtcdServer{
|
||||||
node: n,
|
node: n,
|
||||||
}
|
}
|
||||||
err := srv.applyConfChange(tt.cc, nodes, removedNodes)
|
err := srv.applyConfChange(tt.cc, nodes)
|
||||||
if err != tt.werr {
|
if err != tt.werr {
|
||||||
t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
|
t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
|
||||||
}
|
}
|
||||||
@ -934,25 +920,7 @@ func TestRemoveMember(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestServerStopItself tests that if node sends out Ready with ShouldStop,
|
// TODO: test server could stop itself when being removed
|
||||||
// server will stop.
|
|
||||||
func TestServerStopItself(t *testing.T) {
|
|
||||||
n := newReadyNode()
|
|
||||||
s := &EtcdServer{
|
|
||||||
node: n,
|
|
||||||
store: &storeRecorder{},
|
|
||||||
send: func(_ []raftpb.Message) {},
|
|
||||||
storage: &storageRecorder{},
|
|
||||||
}
|
|
||||||
s.start()
|
|
||||||
n.readyc <- raft.Ready{SoftState: &raft.SoftState{ShouldStop: true}}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-s.done:
|
|
||||||
case <-time.After(time.Millisecond):
|
|
||||||
t.Errorf("did not receive from closed done channel as expected")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: test wait trigger correctness in multi-server case
|
// TODO: test wait trigger correctness in multi-server case
|
||||||
|
|
||||||
|
11
raft/log.go
11
raft/log.go
@ -175,13 +175,12 @@ func (l *raftLog) compact(i uint64) uint64 {
|
|||||||
return uint64(len(l.ents))
|
return uint64(len(l.ents))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *raftLog) snap(d []byte, index, term uint64, nodes []uint64, removed []uint64) {
|
func (l *raftLog) snap(d []byte, index, term uint64, nodes []uint64) {
|
||||||
l.snapshot = pb.Snapshot{
|
l.snapshot = pb.Snapshot{
|
||||||
Data: d,
|
Data: d,
|
||||||
Nodes: nodes,
|
Nodes: nodes,
|
||||||
Index: index,
|
Index: index,
|
||||||
Term: term,
|
Term: term,
|
||||||
RemovedNodes: removed,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -35,11 +35,9 @@ var (
|
|||||||
// SoftState provides state that is useful for logging and debugging.
|
// SoftState provides state that is useful for logging and debugging.
|
||||||
// The state is volatile and does not need to be persisted to the WAL.
|
// The state is volatile and does not need to be persisted to the WAL.
|
||||||
type SoftState struct {
|
type SoftState struct {
|
||||||
Lead uint64
|
Lead uint64
|
||||||
RaftState StateType
|
RaftState StateType
|
||||||
Nodes []uint64
|
Nodes []uint64
|
||||||
RemovedNodes []uint64
|
|
||||||
ShouldStop bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *SoftState) equal(b *SoftState) bool {
|
func (a *SoftState) equal(b *SoftState) bool {
|
||||||
|
@ -172,7 +172,7 @@ func TestNode(t *testing.T) {
|
|||||||
}
|
}
|
||||||
wants := []Ready{
|
wants := []Ready{
|
||||||
{
|
{
|
||||||
SoftState: &SoftState{Lead: 1, Nodes: []uint64{1}, RemovedNodes: []uint64{}, RaftState: StateLeader},
|
SoftState: &SoftState{Lead: 1, Nodes: []uint64{1}, RaftState: StateLeader},
|
||||||
HardState: raftpb.HardState{Term: 1, Commit: 2},
|
HardState: raftpb.HardState{Term: 1, Commit: 2},
|
||||||
Entries: []raftpb.Entry{
|
Entries: []raftpb.Entry{
|
||||||
{},
|
{},
|
||||||
@ -248,11 +248,10 @@ func TestNodeCompact(t *testing.T) {
|
|||||||
n.Propose(ctx, []byte("foo"))
|
n.Propose(ctx, []byte("foo"))
|
||||||
|
|
||||||
w := raftpb.Snapshot{
|
w := raftpb.Snapshot{
|
||||||
Term: 1,
|
Term: 1,
|
||||||
Index: 2, // one nop + one proposal
|
Index: 2, // one nop + one proposal
|
||||||
Data: []byte("a snapshot"),
|
Data: []byte("a snapshot"),
|
||||||
Nodes: []uint64{1},
|
Nodes: []uint64{1},
|
||||||
RemovedNodes: []uint64{},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pkg.ForceGosched()
|
pkg.ForceGosched()
|
||||||
@ -295,9 +294,7 @@ func TestSoftStateEqual(t *testing.T) {
|
|||||||
{&SoftState{}, true},
|
{&SoftState{}, true},
|
||||||
{&SoftState{Lead: 1}, false},
|
{&SoftState{Lead: 1}, false},
|
||||||
{&SoftState{RaftState: StateLeader}, false},
|
{&SoftState{RaftState: StateLeader}, false},
|
||||||
{&SoftState{ShouldStop: true}, false},
|
|
||||||
{&SoftState{Nodes: []uint64{1, 2}}, false},
|
{&SoftState{Nodes: []uint64{1, 2}}, false},
|
||||||
{&SoftState{RemovedNodes: []uint64{1, 2}}, false},
|
|
||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
if g := tt.st.equal(&SoftState{}); g != tt.we {
|
if g := tt.st.equal(&SoftState{}); g != tt.we {
|
||||||
|
39
raft/raft.go
39
raft/raft.go
@ -112,9 +112,6 @@ type raft struct {
|
|||||||
// New configuration is ignored if there exists unapplied configuration.
|
// New configuration is ignored if there exists unapplied configuration.
|
||||||
pendingConf bool
|
pendingConf bool
|
||||||
|
|
||||||
// TODO: need GC and recovery from snapshot
|
|
||||||
removed map[uint64]bool
|
|
||||||
|
|
||||||
elapsed int // number of ticks since the last msg
|
elapsed int // number of ticks since the last msg
|
||||||
heartbeatTimeout int
|
heartbeatTimeout int
|
||||||
electionTimeout int
|
electionTimeout int
|
||||||
@ -132,7 +129,6 @@ func newRaft(id uint64, peers []uint64, election, heartbeat int) *raft {
|
|||||||
lead: None,
|
lead: None,
|
||||||
raftLog: newLog(),
|
raftLog: newLog(),
|
||||||
prs: make(map[uint64]*progress),
|
prs: make(map[uint64]*progress),
|
||||||
removed: make(map[uint64]bool),
|
|
||||||
electionTimeout: election,
|
electionTimeout: election,
|
||||||
heartbeatTimeout: heartbeat,
|
heartbeatTimeout: heartbeat,
|
||||||
}
|
}
|
||||||
@ -145,10 +141,8 @@ func newRaft(id uint64, peers []uint64, election, heartbeat int) *raft {
|
|||||||
|
|
||||||
func (r *raft) hasLeader() bool { return r.lead != None }
|
func (r *raft) hasLeader() bool { return r.lead != None }
|
||||||
|
|
||||||
func (r *raft) shouldStop() bool { return r.removed[r.id] }
|
|
||||||
|
|
||||||
func (r *raft) softState() *SoftState {
|
func (r *raft) softState() *SoftState {
|
||||||
return &SoftState{Lead: r.lead, RaftState: r.state, Nodes: r.nodes(), RemovedNodes: r.removedNodes(), ShouldStop: r.shouldStop()}
|
return &SoftState{Lead: r.lead, RaftState: r.state, Nodes: r.nodes()}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *raft) String() string {
|
func (r *raft) String() string {
|
||||||
@ -363,19 +357,6 @@ func (r *raft) Step(m pb.Message) error {
|
|||||||
// TODO(bmizerany): this likely allocs - prevent that.
|
// TODO(bmizerany): this likely allocs - prevent that.
|
||||||
defer func() { r.Commit = r.raftLog.committed }()
|
defer func() { r.Commit = r.raftLog.committed }()
|
||||||
|
|
||||||
if r.removed[m.From] {
|
|
||||||
if m.From != r.id {
|
|
||||||
r.send(pb.Message{To: m.From, Type: pb.MsgDenied})
|
|
||||||
}
|
|
||||||
// TODO: return an error?
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if m.Type == pb.MsgDenied {
|
|
||||||
r.removed[r.id] = true
|
|
||||||
// TODO: return an error?
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if m.Type == pb.MsgHup {
|
if m.Type == pb.MsgHup {
|
||||||
r.campaign()
|
r.campaign()
|
||||||
}
|
}
|
||||||
@ -425,7 +406,6 @@ func (r *raft) addNode(id uint64) {
|
|||||||
func (r *raft) removeNode(id uint64) {
|
func (r *raft) removeNode(id uint64) {
|
||||||
r.delProgress(id)
|
r.delProgress(id)
|
||||||
r.pendingConf = false
|
r.pendingConf = false
|
||||||
r.removed[id] = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type stepFunc func(r *raft, m pb.Message)
|
type stepFunc func(r *raft, m pb.Message)
|
||||||
@ -517,10 +497,7 @@ func (r *raft) compact(index uint64, nodes []uint64, d []byte) {
|
|||||||
if index > r.raftLog.applied {
|
if index > r.raftLog.applied {
|
||||||
panic(fmt.Sprintf("raft: compact index (%d) exceeds applied index (%d)", index, r.raftLog.applied))
|
panic(fmt.Sprintf("raft: compact index (%d) exceeds applied index (%d)", index, r.raftLog.applied))
|
||||||
}
|
}
|
||||||
// We do not get the removed nodes at the given index.
|
r.raftLog.snap(d, index, r.raftLog.term(index), nodes)
|
||||||
// We get the removed nodes at current index. So a state machine might
|
|
||||||
// have a newer verison of removed nodes after recovery. It is OK.
|
|
||||||
r.raftLog.snap(d, index, r.raftLog.term(index), nodes, r.removedNodes())
|
|
||||||
r.raftLog.compact(index)
|
r.raftLog.compact(index)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -540,10 +517,6 @@ func (r *raft) restore(s pb.Snapshot) bool {
|
|||||||
r.setProgress(n, 0, r.raftLog.lastIndex()+1)
|
r.setProgress(n, 0, r.raftLog.lastIndex()+1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
r.removed = make(map[uint64]bool)
|
|
||||||
for _, n := range s.RemovedNodes {
|
|
||||||
r.removed[n] = true
|
|
||||||
}
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -565,14 +538,6 @@ func (r *raft) nodes() []uint64 {
|
|||||||
return nodes
|
return nodes
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *raft) removedNodes() []uint64 {
|
|
||||||
removed := make([]uint64, 0, len(r.removed))
|
|
||||||
for k := range r.removed {
|
|
||||||
removed = append(removed, k)
|
|
||||||
}
|
|
||||||
return removed
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *raft) setProgress(id, match, next uint64) {
|
func (r *raft) setProgress(id, match, next uint64) {
|
||||||
r.prs[id] = &progress{next: next, match: match}
|
r.prs[id] = &progress{next: next, match: match}
|
||||||
}
|
}
|
||||||
|
@ -429,13 +429,12 @@ func TestCompact(t *testing.T) {
|
|||||||
tests := []struct {
|
tests := []struct {
|
||||||
compacti uint64
|
compacti uint64
|
||||||
nodes []uint64
|
nodes []uint64
|
||||||
removed []uint64
|
|
||||||
snapd []byte
|
snapd []byte
|
||||||
wpanic bool
|
wpanic bool
|
||||||
}{
|
}{
|
||||||
{1, []uint64{1, 2, 3}, []uint64{4, 5}, []byte("some data"), false},
|
{1, []uint64{1, 2, 3}, []byte("some data"), false},
|
||||||
{2, []uint64{1, 2, 3}, []uint64{4, 5}, []byte("some data"), false},
|
{2, []uint64{1, 2, 3}, []byte("some data"), false},
|
||||||
{4, []uint64{1, 2, 3}, []uint64{4, 5}, []byte("some data"), true}, // compact out of range
|
{4, []uint64{1, 2, 3}, []byte("some data"), true}, // compact out of range
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
@ -454,14 +453,9 @@ func TestCompact(t *testing.T) {
|
|||||||
applied: 2,
|
applied: 2,
|
||||||
ents: []pb.Entry{{}, {Term: 1}, {Term: 1}, {Term: 1}},
|
ents: []pb.Entry{{}, {Term: 1}, {Term: 1}, {Term: 1}},
|
||||||
},
|
},
|
||||||
removed: make(map[uint64]bool),
|
|
||||||
}
|
|
||||||
for _, r := range tt.removed {
|
|
||||||
sm.removeNode(r)
|
|
||||||
}
|
}
|
||||||
sm.compact(tt.compacti, tt.nodes, tt.snapd)
|
sm.compact(tt.compacti, tt.nodes, tt.snapd)
|
||||||
sort.Sort(uint64Slice(sm.raftLog.snapshot.Nodes))
|
sort.Sort(uint64Slice(sm.raftLog.snapshot.Nodes))
|
||||||
sort.Sort(uint64Slice(sm.raftLog.snapshot.RemovedNodes))
|
|
||||||
if sm.raftLog.offset != tt.compacti {
|
if sm.raftLog.offset != tt.compacti {
|
||||||
t.Errorf("%d: log.offset = %d, want %d", i, sm.raftLog.offset, tt.compacti)
|
t.Errorf("%d: log.offset = %d, want %d", i, sm.raftLog.offset, tt.compacti)
|
||||||
}
|
}
|
||||||
@ -471,9 +465,6 @@ func TestCompact(t *testing.T) {
|
|||||||
if !reflect.DeepEqual(sm.raftLog.snapshot.Data, tt.snapd) {
|
if !reflect.DeepEqual(sm.raftLog.snapshot.Data, tt.snapd) {
|
||||||
t.Errorf("%d: snap.data = %v, want %v", i, sm.raftLog.snapshot.Data, tt.snapd)
|
t.Errorf("%d: snap.data = %v, want %v", i, sm.raftLog.snapshot.Data, tt.snapd)
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(sm.raftLog.snapshot.RemovedNodes, tt.removed) {
|
|
||||||
t.Errorf("%d: snap.removedNodes = %v, want %v", i, sm.raftLog.snapshot.RemovedNodes, tt.removed)
|
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -912,10 +903,9 @@ func TestRecvMsgBeat(t *testing.T) {
|
|||||||
|
|
||||||
func TestRestore(t *testing.T) {
|
func TestRestore(t *testing.T) {
|
||||||
s := pb.Snapshot{
|
s := pb.Snapshot{
|
||||||
Index: 11, // magic number
|
Index: 11, // magic number
|
||||||
Term: 11, // magic number
|
Term: 11, // magic number
|
||||||
Nodes: []uint64{1, 2, 3},
|
Nodes: []uint64{1, 2, 3},
|
||||||
RemovedNodes: []uint64{4, 5},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
sm := newRaft(1, []uint64{1, 2}, 10, 1)
|
sm := newRaft(1, []uint64{1, 2}, 10, 1)
|
||||||
@ -930,15 +920,10 @@ func TestRestore(t *testing.T) {
|
|||||||
t.Errorf("log.lastTerm = %d, want %d", sm.raftLog.term(s.Index), s.Term)
|
t.Errorf("log.lastTerm = %d, want %d", sm.raftLog.term(s.Index), s.Term)
|
||||||
}
|
}
|
||||||
sg := sm.nodes()
|
sg := sm.nodes()
|
||||||
srn := sm.removedNodes()
|
|
||||||
sort.Sort(uint64Slice(sg))
|
sort.Sort(uint64Slice(sg))
|
||||||
sort.Sort(uint64Slice(srn))
|
|
||||||
if !reflect.DeepEqual(sg, s.Nodes) {
|
if !reflect.DeepEqual(sg, s.Nodes) {
|
||||||
t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Nodes)
|
t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Nodes)
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(s.RemovedNodes, srn) {
|
|
||||||
t.Errorf("sm.RemovedNodes = %+v, want %+v", s.RemovedNodes, srn)
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(sm.raftLog.snapshot, s) {
|
if !reflect.DeepEqual(sm.raftLog.snapshot, s) {
|
||||||
t.Errorf("snapshot = %+v, want %+v", sm.raftLog.snapshot, s)
|
t.Errorf("snapshot = %+v, want %+v", sm.raftLog.snapshot, s)
|
||||||
}
|
}
|
||||||
@ -1124,63 +1109,6 @@ func TestRemoveNode(t *testing.T) {
|
|||||||
if g := r.nodes(); !reflect.DeepEqual(g, w) {
|
if g := r.nodes(); !reflect.DeepEqual(g, w) {
|
||||||
t.Errorf("nodes = %v, want %v", g, w)
|
t.Errorf("nodes = %v, want %v", g, w)
|
||||||
}
|
}
|
||||||
wremoved := map[uint64]bool{2: true}
|
|
||||||
if !reflect.DeepEqual(r.removed, wremoved) {
|
|
||||||
t.Errorf("rmNodes = %v, want %v", r.removed, wremoved)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestRecvMsgDenied tests that state machine sets the removed list when
|
|
||||||
// handling msgDenied, and does not pass it to the actual stepX function.
|
|
||||||
func TestRecvMsgDenied(t *testing.T) {
|
|
||||||
called := false
|
|
||||||
fakeStep := func(r *raft, m pb.Message) {
|
|
||||||
called = true
|
|
||||||
}
|
|
||||||
r := newRaft(1, []uint64{1, 2}, 10, 1)
|
|
||||||
r.step = fakeStep
|
|
||||||
r.Step(pb.Message{From: 2, Type: pb.MsgDenied})
|
|
||||||
if called != false {
|
|
||||||
t.Errorf("stepFunc called = %v , want %v", called, false)
|
|
||||||
}
|
|
||||||
wremoved := map[uint64]bool{1: true}
|
|
||||||
if !reflect.DeepEqual(r.removed, wremoved) {
|
|
||||||
t.Errorf("rmNodes = %v, want %v", r.removed, wremoved)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestRecvMsgFromRemovedNode tests that state machine sends correct
|
|
||||||
// messages out when handling message from removed node, and does not
|
|
||||||
// pass it to the actual stepX function.
|
|
||||||
func TestRecvMsgFromRemovedNode(t *testing.T) {
|
|
||||||
tests := []struct {
|
|
||||||
from uint64
|
|
||||||
wmsgNum int
|
|
||||||
}{
|
|
||||||
{1, 0},
|
|
||||||
{2, 1},
|
|
||||||
}
|
|
||||||
for i, tt := range tests {
|
|
||||||
called := false
|
|
||||||
fakeStep := func(r *raft, m pb.Message) {
|
|
||||||
called = true
|
|
||||||
}
|
|
||||||
r := newRaft(1, []uint64{1}, 10, 1)
|
|
||||||
r.step = fakeStep
|
|
||||||
r.removeNode(tt.from)
|
|
||||||
r.Step(pb.Message{From: tt.from, Type: pb.MsgVote})
|
|
||||||
if called != false {
|
|
||||||
t.Errorf("#%d: stepFunc called = %v , want %v", i, called, false)
|
|
||||||
}
|
|
||||||
if len(r.msgs) != tt.wmsgNum {
|
|
||||||
t.Errorf("#%d: len(msgs) = %d, want %d", i, len(r.msgs), tt.wmsgNum)
|
|
||||||
}
|
|
||||||
for j, msg := range r.msgs {
|
|
||||||
if msg.Type != pb.MsgDenied {
|
|
||||||
t.Errorf("#%d.%d: msgType = %d, want %d", i, j, msg.Type, pb.MsgDenied)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPromotable(t *testing.T) {
|
func TestPromotable(t *testing.T) {
|
||||||
|
@ -168,7 +168,6 @@ type Snapshot struct {
|
|||||||
Nodes []uint64 `protobuf:"varint,2,rep,name=nodes" json:"nodes"`
|
Nodes []uint64 `protobuf:"varint,2,rep,name=nodes" json:"nodes"`
|
||||||
Index uint64 `protobuf:"varint,3,req,name=index" json:"index"`
|
Index uint64 `protobuf:"varint,3,req,name=index" json:"index"`
|
||||||
Term uint64 `protobuf:"varint,4,req,name=term" json:"term"`
|
Term uint64 `protobuf:"varint,4,req,name=term" json:"term"`
|
||||||
RemovedNodes []uint64 `protobuf:"varint,5,rep,name=removed_nodes" json:"removed_nodes"`
|
|
||||||
XXX_unrecognized []byte `json:"-"`
|
XXX_unrecognized []byte `json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -419,23 +418,6 @@ func (m *Snapshot) Unmarshal(data []byte) error {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case 5:
|
|
||||||
if wireType != 0 {
|
|
||||||
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
|
|
||||||
}
|
|
||||||
var v uint64
|
|
||||||
for shift := uint(0); ; shift += 7 {
|
|
||||||
if index >= l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
b := data[index]
|
|
||||||
index++
|
|
||||||
v |= (uint64(b) & 0x7F) << shift
|
|
||||||
if b < 0x80 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
m.RemovedNodes = append(m.RemovedNodes, v)
|
|
||||||
default:
|
default:
|
||||||
var sizeOfWire int
|
var sizeOfWire int
|
||||||
for {
|
for {
|
||||||
@ -891,11 +873,6 @@ func (m *Snapshot) Size() (n int) {
|
|||||||
}
|
}
|
||||||
n += 1 + sovRaft(uint64(m.Index))
|
n += 1 + sovRaft(uint64(m.Index))
|
||||||
n += 1 + sovRaft(uint64(m.Term))
|
n += 1 + sovRaft(uint64(m.Term))
|
||||||
if len(m.RemovedNodes) > 0 {
|
|
||||||
for _, e := range m.RemovedNodes {
|
|
||||||
n += 1 + sovRaft(uint64(e))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if m.XXX_unrecognized != nil {
|
if m.XXX_unrecognized != nil {
|
||||||
n += len(m.XXX_unrecognized)
|
n += len(m.XXX_unrecognized)
|
||||||
}
|
}
|
||||||
@ -1034,19 +1011,6 @@ func (m *Snapshot) MarshalTo(data []byte) (n int, err error) {
|
|||||||
data[i] = 0x20
|
data[i] = 0x20
|
||||||
i++
|
i++
|
||||||
i = encodeVarintRaft(data, i, uint64(m.Term))
|
i = encodeVarintRaft(data, i, uint64(m.Term))
|
||||||
if len(m.RemovedNodes) > 0 {
|
|
||||||
for _, num := range m.RemovedNodes {
|
|
||||||
data[i] = 0x28
|
|
||||||
i++
|
|
||||||
for num >= 1<<7 {
|
|
||||||
data[i] = uint8(uint64(num)&0x7f | 0x80)
|
|
||||||
num >>= 7
|
|
||||||
i++
|
|
||||||
}
|
|
||||||
data[i] = uint8(num)
|
|
||||||
i++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if m.XXX_unrecognized != nil {
|
if m.XXX_unrecognized != nil {
|
||||||
i += copy(data[i:], m.XXX_unrecognized)
|
i += copy(data[i:], m.XXX_unrecognized)
|
||||||
}
|
}
|
||||||
|
@ -21,11 +21,10 @@ message Entry {
|
|||||||
}
|
}
|
||||||
|
|
||||||
message Snapshot {
|
message Snapshot {
|
||||||
required bytes data = 1 [(gogoproto.nullable) = false];
|
required bytes data = 1 [(gogoproto.nullable) = false];
|
||||||
repeated uint64 nodes = 2 [(gogoproto.nullable) = false];
|
repeated uint64 nodes = 2 [(gogoproto.nullable) = false];
|
||||||
required uint64 index = 3 [(gogoproto.nullable) = false];
|
required uint64 index = 3 [(gogoproto.nullable) = false];
|
||||||
required uint64 term = 4 [(gogoproto.nullable) = false];
|
required uint64 term = 4 [(gogoproto.nullable) = false];
|
||||||
repeated uint64 removed_nodes = 5 [(gogoproto.nullable) = false];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
enum MessageType {
|
enum MessageType {
|
||||||
@ -37,7 +36,6 @@ enum MessageType {
|
|||||||
MsgVote = 5;
|
MsgVote = 5;
|
||||||
MsgVoteResp = 6;
|
MsgVoteResp = 6;
|
||||||
MsgSnap = 7;
|
MsgSnap = 7;
|
||||||
MsgDenied = 8;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message Message {
|
message Message {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user