mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: only send snapshot when the member is active
This commit is contained in:
parent
9df46f9d6f
commit
cc6d98bf89
@ -275,6 +275,11 @@ func (r *raft) sendAppend(to uint64) {
|
|||||||
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
|
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
|
||||||
|
|
||||||
if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
|
if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
|
||||||
|
if !pr.RecentActive {
|
||||||
|
r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
m.Type = pb.MsgSnap
|
m.Type = pb.MsgSnap
|
||||||
snapshot, err := r.raftLog.snapshot()
|
snapshot, err := r.raftLog.snapshot()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1563,8 +1563,7 @@ func TestRestoreIgnoreSnapshot(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestProvideSnap(t *testing.T) {
|
func TestProvideSnap(t *testing.T) {
|
||||||
// restore the statemachin from a snapshot
|
// restore the state machine from a snapshot so it has a compacted log and a snapshot
|
||||||
// so it has a compacted log and a snapshot
|
|
||||||
s := pb.Snapshot{
|
s := pb.Snapshot{
|
||||||
Metadata: pb.SnapshotMetadata{
|
Metadata: pb.SnapshotMetadata{
|
||||||
Index: 11, // magic number
|
Index: 11, // magic number
|
||||||
@ -1579,11 +1578,10 @@ func TestProvideSnap(t *testing.T) {
|
|||||||
sm.becomeCandidate()
|
sm.becomeCandidate()
|
||||||
sm.becomeLeader()
|
sm.becomeLeader()
|
||||||
|
|
||||||
// force set the next of node 1, so that
|
// force set the next of node 2, so that node 2 needs a snapshot
|
||||||
// node 1 needs a snapshot
|
|
||||||
sm.prs[2].Next = sm.raftLog.firstIndex()
|
sm.prs[2].Next = sm.raftLog.firstIndex()
|
||||||
|
|
||||||
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs[2].Next - 1, Reject: true})
|
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs[2].Next - 1, Reject: true})
|
||||||
|
|
||||||
msgs := sm.readMessages()
|
msgs := sm.readMessages()
|
||||||
if len(msgs) != 1 {
|
if len(msgs) != 1 {
|
||||||
t.Fatalf("len(msgs) = %d, want 1", len(msgs))
|
t.Fatalf("len(msgs) = %d, want 1", len(msgs))
|
||||||
@ -1594,6 +1592,35 @@ func TestProvideSnap(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestIgnoreProvidingSnap(t *testing.T) {
|
||||||
|
// restore the state machine from a snapshot so it has a compacted log and a snapshot
|
||||||
|
s := pb.Snapshot{
|
||||||
|
Metadata: pb.SnapshotMetadata{
|
||||||
|
Index: 11, // magic number
|
||||||
|
Term: 11, // magic number
|
||||||
|
ConfState: pb.ConfState{Nodes: []uint64{1, 2}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
storage := NewMemoryStorage()
|
||||||
|
sm := newTestRaft(1, []uint64{1}, 10, 1, storage)
|
||||||
|
sm.restore(s)
|
||||||
|
|
||||||
|
sm.becomeCandidate()
|
||||||
|
sm.becomeLeader()
|
||||||
|
|
||||||
|
// force set the next of node 2, so that node 2 needs a snapshot
|
||||||
|
// change node 2 to be inactive, expect node 1 ignore sending snapshot to 2
|
||||||
|
sm.prs[2].Next = sm.raftLog.firstIndex() - 1
|
||||||
|
sm.prs[2].RecentActive = false
|
||||||
|
|
||||||
|
sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
||||||
|
|
||||||
|
msgs := sm.readMessages()
|
||||||
|
if len(msgs) != 0 {
|
||||||
|
t.Errorf("len(msgs) = %d, want 0", len(msgs))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestRestoreFromSnapMsg(t *testing.T) {
|
func TestRestoreFromSnapMsg(t *testing.T) {
|
||||||
s := pb.Snapshot{
|
s := pb.Snapshot{
|
||||||
Metadata: pb.SnapshotMetadata{
|
Metadata: pb.SnapshotMetadata{
|
||||||
@ -1624,8 +1651,18 @@ func TestSlowNodeRestore(t *testing.T) {
|
|||||||
nt.storage[1].Compact(lead.raftLog.applied)
|
nt.storage[1].Compact(lead.raftLog.applied)
|
||||||
|
|
||||||
nt.recover()
|
nt.recover()
|
||||||
|
// send heartbeats so that the leader can learn everyone is active.
|
||||||
|
// node 3 will only be considered as active when node 1 receives a reply from it.
|
||||||
|
for {
|
||||||
|
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
|
||||||
|
if lead.prs[3].RecentActive {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// trigger a snapshot
|
// trigger a snapshot
|
||||||
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
|
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
|
||||||
|
|
||||||
follower := nt.peers[3].(*raft)
|
follower := nt.peers[3].(*raft)
|
||||||
|
|
||||||
// trigger a commit
|
// trigger a commit
|
||||||
|
Loading…
x
Reference in New Issue
Block a user