raft: check pending conf change before campaign (#12134)

* raft: check conf change before campaign

Signed-off-by: Jay Lee <BusyJayLee@gmail.com>

* raft: extract hup function

Signed-off-by: Jay Lee <BusyJayLee@gmail.com>

* raft: check pending conf change for transferleader

Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
This commit is contained in:
Jay 2020-07-23 08:04:48 +08:00 committed by GitHub
parent 772dfbfe35
commit d0e4fe56a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 120 additions and 23 deletions

View File

@ -766,6 +766,29 @@ func (r *raft) becomeLeader() {
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}
func (r *raft) hup(t CampaignType) {
if r.state == StateLeader {
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
return
}
if !r.promotable() {
r.logger.Warningf("%x is unpromotable and can not campaign; ignoring MsgHup", r.id)
return
}
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
if err != nil {
r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
}
if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
return
}
r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
r.campaign(t)
}
// campaign transitions the raft instance to candidate state. This must only be
// called after verifying that this is a legitimate transition.
func (r *raft) campaign(t CampaignType) {
@ -907,28 +930,10 @@ func (r *raft) Step(m pb.Message) error {
switch m.Type {
case pb.MsgHup:
if r.state != StateLeader {
if !r.promotable() {
r.logger.Warningf("%x is unpromotable and can not campaign; ignoring MsgHup", r.id)
return nil
}
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
if err != nil {
r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
}
if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
return nil
}
r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
if r.preVote {
r.campaign(campaignPreElection)
} else {
r.campaign(campaignElection)
}
if r.preVote {
r.hup(campaignPreElection)
} else {
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
r.hup(campaignElection)
}
case pb.MsgVote, pb.MsgPreVote:
@ -1349,7 +1354,7 @@ func stepFollower(r *raft, m pb.Message) error {
// Leadership transfers never use pre-vote even if r.preVote is true; we
// know we are not recovering from a partition so there is no need for the
// extra round trip.
r.campaign(campaignTransfer)
r.hup(campaignTransfer)
} else {
r.logger.Infof("%x received MsgTimeoutNow from %x but is not promotable", r.id, m.From)
}
@ -1675,7 +1680,7 @@ func (r *raft) reduceUncommittedSize(ents []pb.Entry) {
func numOfPendingConf(ents []pb.Entry) int {
n := 0
for i := range ents {
if ents[i].Type == pb.EntryConfChange {
if ents[i].Type == pb.EntryConfChange || ents[i].Type == pb.EntryConfChangeV2 {
n++
}
}

View File

@ -4141,6 +4141,98 @@ func TestPreVoteMigrationWithFreeStuckPreCandidate(t *testing.T) {
}
}
func testConfChangeCheckBeforeCampaign(t *testing.T, v2 bool) {
nt := newNetwork(nil, nil, nil)
n1 := nt.peers[1].(*raft)
n2 := nt.peers[2].(*raft)
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
if n1.state != StateLeader {
t.Errorf("node 1 state: %s, want %s", n1.state, StateLeader)
}
// Begin to remove the third node.
cc := pb.ConfChange{
Type: pb.ConfChangeRemoveNode,
NodeID: 2,
}
var ccData []byte
var err error
var ty pb.EntryType
if v2 {
ccv2 := cc.AsV2()
ccData, err = ccv2.Marshal()
ty = pb.EntryConfChangeV2
} else {
ccData, err = cc.Marshal()
ty = pb.EntryConfChange
}
if err != nil {
t.Fatal(err)
}
nt.send(pb.Message{
From: 1,
To: 1,
Type: pb.MsgProp,
Entries: []pb.Entry{
{Type: ty, Data: ccData},
},
})
// Trigger campaign in node 2
for i := 0; i < n2.randomizedElectionTimeout; i++ {
n2.tick()
}
// It's still follower because committed conf change is not applied.
if n2.state != StateFollower {
t.Errorf("node 2 state: %s, want %s", n2.state, StateFollower)
}
// Transfer leadership to peer 2.
nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader})
if n1.state != StateLeader {
t.Errorf("node 1 state: %s, want %s", n1.state, StateLeader)
}
// It's still follower because committed conf change is not applied.
if n2.state != StateFollower {
t.Errorf("node 2 state: %s, want %s", n2.state, StateFollower)
}
// Abort transfer leader
for i := 0; i < n1.electionTimeout; i++ {
n1.tick()
}
// Advance apply
nextEnts(n2, nt.storage[2])
// Transfer leadership to peer 2 again.
nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader})
if n1.state != StateFollower {
t.Errorf("node 1 state: %s, want %s", n1.state, StateFollower)
}
if n2.state != StateLeader {
t.Errorf("node 2 state: %s, want %s", n2.state, StateLeader)
}
nextEnts(n1, nt.storage[1])
// Trigger campaign in node 2
for i := 0; i < n1.randomizedElectionTimeout; i++ {
n1.tick()
}
if n1.state != StateCandidate {
t.Errorf("node 1 state: %s, want %s", n1.state, StateCandidate)
}
}
// Tests if unapplied ConfChange is checked before campaign.
func TestConfChangeCheckBeforeCampaign(t *testing.T) {
testConfChangeCheckBeforeCampaign(t, false)
}
// Tests if unapplied ConfChangeV2 is checked before campaign.
func TestConfChangeV2CheckBeforeCampaign(t *testing.T) {
testConfChangeCheckBeforeCampaign(t, true)
}
func entsWithConfig(configFunc func(*Config), terms ...uint64) *raft {
storage := NewMemoryStorage()
for i, term := range terms {