From cc6d98bf89ae24dee0b3da2f01a8b704f4e3a0f0 Mon Sep 17 00:00:00 2001
From: Xiang Li
Date: Thu, 10 Dec 2015 12:17:30 -0800
Subject: [PATCH] etcdserver: only send snapshot when the member is active

---
 raft/raft.go      |  5 +++++
 raft/raft_test.go | 47 ++++++++++++++++++++++++++++++++++++++++++-----
 2 files changed, 47 insertions(+), 5 deletions(-)

diff --git a/raft/raft.go b/raft/raft.go
index 0fc874be4..5a4f52319 100644
--- a/raft/raft.go
+++ b/raft/raft.go
@@ -275,6 +275,11 @@ func (r *raft) sendAppend(to uint64) {
 	ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
 
 	if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
+		if !pr.RecentActive {
+			r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
+			return
+		}
+
 		m.Type = pb.MsgSnap
 		snapshot, err := r.raftLog.snapshot()
 		if err != nil {
diff --git a/raft/raft_test.go b/raft/raft_test.go
index 01c72d975..52752445d 100644
--- a/raft/raft_test.go
+++ b/raft/raft_test.go
@@ -1563,8 +1563,7 @@ func TestRestoreIgnoreSnapshot(t *testing.T) {
 }
 
 func TestProvideSnap(t *testing.T) {
-	// restore the statemachin from a snapshot
-	// so it has a compacted log and a snapshot
+	// restore the state machine from a snapshot so it has a compacted log and a snapshot
 	s := pb.Snapshot{
 		Metadata: pb.SnapshotMetadata{
 			Index:     11, // magic number
@@ -1579,11 +1578,10 @@ func TestProvideSnap(t *testing.T) {
 	sm.becomeCandidate()
 	sm.becomeLeader()
 
-	// force set the next of node 1, so that
-	// node 1 needs a snapshot
+	// force set the next of node 2, so that node 2 needs a snapshot
 	sm.prs[2].Next = sm.raftLog.firstIndex()
-
 	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs[2].Next - 1, Reject: true})
+
 	msgs := sm.readMessages()
 	if len(msgs) != 1 {
 		t.Fatalf("len(msgs) = %d, want 1", len(msgs))
@@ -1594,6 +1592,35 @@ func TestProvideSnap(t *testing.T) {
 	}
 }
 
+func TestIgnoreProvidingSnap(t *testing.T) {
+	// restore the state machine from a snapshot so it has a compacted log and a snapshot
+	s := pb.Snapshot{
+		Metadata: pb.SnapshotMetadata{
+			Index:     11, // magic number
+			Term:      11, // magic number
+			ConfState: pb.ConfState{Nodes: []uint64{1, 2}},
+		},
+	}
+	storage := NewMemoryStorage()
+	sm := newTestRaft(1, []uint64{1}, 10, 1, storage)
+	sm.restore(s)
+
+	sm.becomeCandidate()
+	sm.becomeLeader()
+
+	// force set the next of node 2, so that node 2 needs a snapshot
+	// change node 2 to be inactive, expect node 1 ignore sending snapshot to 2
+	sm.prs[2].Next = sm.raftLog.firstIndex() - 1
+	sm.prs[2].RecentActive = false
+
+	sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
+
+	msgs := sm.readMessages()
+	if len(msgs) != 0 {
+		t.Errorf("len(msgs) = %d, want 0", len(msgs))
+	}
+}
+
 func TestRestoreFromSnapMsg(t *testing.T) {
 	s := pb.Snapshot{
 		Metadata: pb.SnapshotMetadata{
@@ -1624,8 +1651,18 @@ func TestSlowNodeRestore(t *testing.T) {
 	nt.storage[1].Compact(lead.raftLog.applied)
 	nt.recover()
 
+	// send heartbeats so that the leader can learn everyone is active.
+	// node 3 will only be considered as active when node 1 receives a reply from it.
+	for {
+		nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
+		if lead.prs[3].RecentActive {
+			break
+		}
+	}
+
 	// trigger a snapshot
 	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
+
 	follower := nt.peers[3].(*raft)
 
 	// trigger a commit