mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raft: don't campaign with pending snapshot (#12163)
Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
This commit is contained in:
parent
c9a5889915
commit
26b89fd418
@ -167,6 +167,11 @@ func (l *raftLog) hasNextEnts() bool {
|
||||
return l.committed+1 > off
|
||||
}
|
||||
|
||||
// hasPendingSnapshot reports whether there is a pending snapshot waiting to be applied.
|
||||
func (l *raftLog) hasPendingSnapshot() bool {
|
||||
return l.unstable.snapshot != nil && !IsEmptySnap(*l.unstable.snapshot)
|
||||
}
|
||||
|
||||
func (l *raftLog) snapshot() (pb.Snapshot, error) {
|
||||
if l.unstable.snapshot != nil {
|
||||
return *l.unstable.snapshot, nil
|
||||
|
18
raft/raft.go
18
raft/raft.go
@ -773,7 +773,7 @@ func (r *raft) hup(t CampaignType) {
|
||||
}
|
||||
|
||||
if !r.promotable() {
|
||||
r.logger.Warningf("%x is unpromotable and can not campaign; ignoring MsgHup", r.id)
|
||||
r.logger.Warningf("%x is unpromotable and can not campaign", r.id)
|
||||
return
|
||||
}
|
||||
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
|
||||
@ -1349,15 +1349,11 @@ func stepFollower(r *raft, m pb.Message) error {
|
||||
m.To = r.lead
|
||||
r.send(m)
|
||||
case pb.MsgTimeoutNow:
|
||||
if r.promotable() {
|
||||
r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
|
||||
// Leadership transfers never use pre-vote even if r.preVote is true; we
|
||||
// know we are not recovering from a partition so there is no need for the
|
||||
// extra round trip.
|
||||
r.hup(campaignTransfer)
|
||||
} else {
|
||||
r.logger.Infof("%x received MsgTimeoutNow from %x but is not promotable", r.id, m.From)
|
||||
}
|
||||
r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
|
||||
// Leadership transfers never use pre-vote even if r.preVote is true; we
|
||||
// know we are not recovering from a partition so there is no need for the
|
||||
// extra round trip.
|
||||
r.hup(campaignTransfer)
|
||||
case pb.MsgReadIndex:
|
||||
if r.lead == None {
|
||||
r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
|
||||
@ -1494,7 +1490,7 @@ func (r *raft) restore(s pb.Snapshot) bool {
|
||||
// which is true when its own id is in progress list.
|
||||
func (r *raft) promotable() bool {
|
||||
pr := r.prs.Progress[r.id]
|
||||
return pr != nil && !pr.IsLearner
|
||||
return pr != nil && !pr.IsLearner && !r.raftLog.hasPendingSnapshot()
|
||||
}
|
||||
|
||||
func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState {
|
||||
|
@ -2751,6 +2751,13 @@ func TestRestore(t *testing.T) {
|
||||
if ok := sm.restore(s); ok {
|
||||
t.Fatal("restore succeed, want fail")
|
||||
}
|
||||
// It should not campaign before actually applying data.
|
||||
for i := 0; i < sm.randomizedElectionTimeout; i++ {
|
||||
sm.tick()
|
||||
}
|
||||
if sm.state != StateFollower {
|
||||
t.Errorf("state = %d, want %d", sm.state, StateFollower)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRestoreWithLearner restores a snapshot which contains learners.
|
||||
@ -2865,10 +2872,14 @@ func TestLearnerReceiveSnapshot(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
n1 := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
|
||||
store := NewMemoryStorage()
|
||||
n1 := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, store)
|
||||
n2 := newTestLearnerRaft(2, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
|
||||
|
||||
n1.restore(s)
|
||||
ready := newReady(n1, &SoftState{}, pb.HardState{})
|
||||
store.ApplySnapshot(ready.Snapshot)
|
||||
n1.advance(ready)
|
||||
|
||||
// Force-set n1's applied index.
|
||||
n1.raftLog.appliedTo(n1.raftLog.committed)
|
||||
@ -3495,10 +3506,31 @@ func TestLeaderTransferAfterSnapshot(t *testing.T) {
|
||||
t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.Progress[3].Match, 1)
|
||||
}
|
||||
|
||||
filtered := pb.Message{}
|
||||
// Snapshot needs to be applied before sending MsgAppResp
|
||||
nt.msgHook = func(m pb.Message) bool {
|
||||
if m.Type != pb.MsgAppResp || m.From != 3 || m.Reject {
|
||||
return true
|
||||
}
|
||||
filtered = m
|
||||
return false
|
||||
}
|
||||
// Transfer leadership to 3 while node 3 is still lacking the snapshot.
|
||||
nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
|
||||
// Send pb.MsgHeartbeatResp to leader to trigger a snapshot for node 3.
|
||||
nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgHeartbeatResp})
|
||||
if lead.state != StateLeader {
|
||||
t.Fatalf("node 1 should still be leader as snapshot is not applied, got %x", lead.state)
|
||||
}
|
||||
if reflect.DeepEqual(filtered, pb.Message{}) {
|
||||
t.Fatalf("Follower should report snapshot progress automatically.")
|
||||
}
|
||||
|
||||
// Apply snapshot and resume progress
|
||||
follower := nt.peers[3].(*raft)
|
||||
ready := newReady(follower, &SoftState{}, pb.HardState{})
|
||||
nt.storage[3].ApplySnapshot(ready.Snapshot)
|
||||
follower.advance(ready)
|
||||
nt.msgHook = nil
|
||||
nt.send(filtered)
|
||||
|
||||
checkLeaderTransferState(t, lead, StateFollower, 3)
|
||||
}
|
||||
|
@ -157,7 +157,7 @@ func (rn *RawNode) HasReady() bool {
|
||||
if hardSt := r.hardState(); !IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt, rn.prevHardSt) {
|
||||
return true
|
||||
}
|
||||
if r.raftLog.unstable.snapshot != nil && !IsEmptySnap(*r.raftLog.unstable.snapshot) {
|
||||
if r.raftLog.hasPendingSnapshot() {
|
||||
return true
|
||||
}
|
||||
if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts() {
|
||||
|
Loading…
x
Reference in New Issue
Block a user