diff --git a/raft/log.go b/raft/log.go
index f9ed4dc5d..342d8f4ce 100644
--- a/raft/log.go
+++ b/raft/log.go
@@ -167,6 +167,11 @@ func (l *raftLog) hasNextEnts() bool {
 	return l.committed+1 > off
 }
 
+// hasPendingSnapshot reports whether a non-empty snapshot is waiting to be applied.
+func (l *raftLog) hasPendingSnapshot() bool {
+	return l.unstable.snapshot != nil && !IsEmptySnap(*l.unstable.snapshot)
+}
+
 func (l *raftLog) snapshot() (pb.Snapshot, error) {
 	if l.unstable.snapshot != nil {
 		return *l.unstable.snapshot, nil
diff --git a/raft/raft.go b/raft/raft.go
index 39f4481b1..2c065186b 100644
--- a/raft/raft.go
+++ b/raft/raft.go
@@ -773,7 +773,7 @@ func (r *raft) hup(t CampaignType) {
 	}
 
 	if !r.promotable() {
-		r.logger.Warningf("%x is unpromotable and can not campaign; ignoring MsgHup", r.id)
+		r.logger.Warningf("%x is unpromotable and can not campaign", r.id)
 		return
 	}
 	ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
@@ -1349,15 +1349,11 @@ func stepFollower(r *raft, m pb.Message) error {
 		m.To = r.lead
 		r.send(m)
 	case pb.MsgTimeoutNow:
-		if r.promotable() {
-			r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
-			// Leadership transfers never use pre-vote even if r.preVote is true; we
-			// know we are not recovering from a partition so there is no need for the
-			// extra round trip.
-			r.hup(campaignTransfer)
-		} else {
-			r.logger.Infof("%x received MsgTimeoutNow from %x but is not promotable", r.id, m.From)
-		}
+		r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
+		// Leadership transfers never use pre-vote even if r.preVote is true; we
+		// know we are not recovering from a partition so there is no need for the
+		// extra round trip.
+		r.hup(campaignTransfer)
 	case pb.MsgReadIndex:
 		if r.lead == None {
 			r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
@@ -1494,7 +1490,7 @@ func (r *raft) restore(s pb.Snapshot) bool {
 // which is true when its own id is in progress list.
 func (r *raft) promotable() bool {
 	pr := r.prs.Progress[r.id]
-	return pr != nil && !pr.IsLearner
+	return pr != nil && !pr.IsLearner && !r.raftLog.hasPendingSnapshot()
 }
 
 func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState {
diff --git a/raft/raft_test.go b/raft/raft_test.go
index ad70b6592..7cd849b23 100644
--- a/raft/raft_test.go
+++ b/raft/raft_test.go
@@ -2751,6 +2751,13 @@ func TestRestore(t *testing.T) {
 	if ok := sm.restore(s); ok {
 		t.Fatal("restore succeed, want fail")
 	}
+	// It should not campaign before actually applying data.
+	for i := 0; i < sm.randomizedElectionTimeout; i++ {
+		sm.tick()
+	}
+	if sm.state != StateFollower {
+		t.Errorf("state = %d, want %d", sm.state, StateFollower)
+	}
 }
 
 // TestRestoreWithLearner restores a snapshot which contains learners.
@@ -2865,10 +2872,14 @@ func TestLearnerReceiveSnapshot(t *testing.T) {
 		},
 	}
 
-	n1 := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
+	store := NewMemoryStorage()
+	n1 := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, store)
 	n2 := newTestLearnerRaft(2, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
 
 	n1.restore(s)
+	ready := newReady(n1, &SoftState{}, pb.HardState{})
+	store.ApplySnapshot(ready.Snapshot)
+	n1.advance(ready)
 
 	// Force set n1 appplied index.
 	n1.raftLog.appliedTo(n1.raftLog.committed)
@@ -3495,10 +3506,31 @@ func TestLeaderTransferAfterSnapshot(t *testing.T) {
 		t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.Progress[3].Match, 1)
 	}
 
+	filtered := pb.Message{}
+	// Snapshot needs to be applied before sending MsgAppResp
+	nt.msgHook = func(m pb.Message) bool {
+		if m.Type != pb.MsgAppResp || m.From != 3 || m.Reject {
+			return true
+		}
+		filtered = m
+		return false
+	}
 	// Transfer leadership to 3 when node 3 is lack of snapshot.
 	nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
-	// Send pb.MsgHeartbeatResp to leader to trigger a snapshot for node 3.
-	nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgHeartbeatResp})
+	if lead.state != StateLeader {
+		t.Fatalf("node 1 should still be leader as snapshot is not applied, got %v", lead.state)
+	}
+	if reflect.DeepEqual(filtered, pb.Message{}) {
+		t.Fatalf("Follower should report snapshot progress automatically.")
+	}
+
+	// Apply snapshot and resume progress
+	follower := nt.peers[3].(*raft)
+	ready := newReady(follower, &SoftState{}, pb.HardState{})
+	nt.storage[3].ApplySnapshot(ready.Snapshot)
+	follower.advance(ready)
+	nt.msgHook = nil
+	nt.send(filtered)
 
 	checkLeaderTransferState(t, lead, StateFollower, 3)
 }
diff --git a/raft/rawnode.go b/raft/rawnode.go
index 2dffc066c..a056e7206 100644
--- a/raft/rawnode.go
+++ b/raft/rawnode.go
@@ -157,7 +157,7 @@ func (rn *RawNode) HasReady() bool {
 	if hardSt := r.hardState(); !IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt, rn.prevHardSt) {
 		return true
 	}
-	if r.raftLog.unstable.snapshot != nil && !IsEmptySnap(*r.raftLog.unstable.snapshot) {
+	if r.raftLog.hasPendingSnapshot() {
 		return true
 	}
 	if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts() {