mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #5809 from swingbach/master
raft: make leader transferring workable when quorum check is on
This commit is contained in:
commit
7432e9fbe9
31
raft/raft.go
31
raft/raft.go
@ -15,6 +15,7 @@
|
|||||||
package raft
|
package raft
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
@ -36,6 +37,19 @@ const (
|
|||||||
StateLeader
|
StateLeader
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Possible values for CampaignType
|
||||||
|
const (
|
||||||
|
// campaignElection represents the type of normal election
|
||||||
|
campaignElection CampaignType = "CampaignElection"
|
||||||
|
// campaignTransfer represents the type of leader transfer
|
||||||
|
campaignTransfer CampaignType = "CampaignTransfer"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CampaignType represents the type of campaigning
|
||||||
|
// the reason we use the type of string instead of uint64
|
||||||
|
// is because it's simpler to compare and fill in raft entries
|
||||||
|
type CampaignType string
|
||||||
|
|
||||||
// StateType represents the role of a node in a cluster.
|
// StateType represents the role of a node in a cluster.
|
||||||
type StateType uint64
|
type StateType uint64
|
||||||
|
|
||||||
@ -520,7 +534,7 @@ func (r *raft) becomeLeader() {
|
|||||||
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
|
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *raft) campaign() {
|
func (r *raft) campaign(t CampaignType) {
|
||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
if r.quorum() == r.poll(r.id, true) {
|
if r.quorum() == r.poll(r.id, true) {
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
@ -532,7 +546,12 @@ func (r *raft) campaign() {
|
|||||||
}
|
}
|
||||||
r.logger.Infof("%x [logterm: %d, index: %d] sent vote request to %x at term %d",
|
r.logger.Infof("%x [logterm: %d, index: %d] sent vote request to %x at term %d",
|
||||||
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), id, r.Term)
|
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), id, r.Term)
|
||||||
r.send(pb.Message{To: id, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()})
|
|
||||||
|
var ctx []byte
|
||||||
|
if t == campaignTransfer {
|
||||||
|
ctx = []byte(t)
|
||||||
|
}
|
||||||
|
r.send(pb.Message{To: id, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -557,7 +576,7 @@ func (r *raft) Step(m pb.Message) error {
|
|||||||
if m.Type == pb.MsgHup {
|
if m.Type == pb.MsgHup {
|
||||||
if r.state != StateLeader {
|
if r.state != StateLeader {
|
||||||
r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
|
r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
|
||||||
r.campaign()
|
r.campaign(campaignElection)
|
||||||
} else {
|
} else {
|
||||||
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
|
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
|
||||||
}
|
}
|
||||||
@ -575,7 +594,9 @@ func (r *raft) Step(m pb.Message) error {
|
|||||||
case m.Term > r.Term:
|
case m.Term > r.Term:
|
||||||
lead := m.From
|
lead := m.From
|
||||||
if m.Type == pb.MsgVote {
|
if m.Type == pb.MsgVote {
|
||||||
if r.checkQuorum && r.state != StateCandidate && r.electionElapsed < r.electionTimeout {
|
force := bytes.Equal(m.Context, []byte(campaignTransfer))
|
||||||
|
inLease := r.checkQuorum && r.state != StateCandidate && r.electionElapsed < r.electionTimeout
|
||||||
|
if !force && inLease {
|
||||||
// If a server receives a RequestVote request within the minimum election timeout
|
// If a server receives a RequestVote request within the minimum election timeout
|
||||||
// of hearing from a current leader, it does not update its term or grant its vote
|
// of hearing from a current leader, it does not update its term or grant its vote
|
||||||
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored vote from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
|
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored vote from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
|
||||||
@ -843,7 +864,7 @@ func stepFollower(r *raft, m pb.Message) {
|
|||||||
}
|
}
|
||||||
case pb.MsgTimeoutNow:
|
case pb.MsgTimeoutNow:
|
||||||
r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
|
r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
|
||||||
r.campaign()
|
r.campaign(campaignTransfer)
|
||||||
case pb.MsgReadIndex:
|
case pb.MsgReadIndex:
|
||||||
if r.lead == None {
|
if r.lead == None {
|
||||||
r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
|
r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
|
||||||
|
@ -2212,6 +2212,42 @@ func TestLeaderTransferToUpToDateNode(t *testing.T) {
|
|||||||
checkLeaderTransferState(t, lead, StateLeader, 1)
|
checkLeaderTransferState(t, lead, StateLeader, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestLeaderTransferWithCheckQuorum ensures transferring leader still works
|
||||||
|
// even the current leader is still under its leader lease
|
||||||
|
func TestLeaderTransferWithCheckQuorum(t *testing.T) {
|
||||||
|
nt := newNetwork(nil, nil, nil)
|
||||||
|
for i := 1; i < 4; i++ {
|
||||||
|
r := nt.peers[uint64(i)].(*raft)
|
||||||
|
r.checkQuorum = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Letting peer 2 electionElapsed reach to timeout so that it can vote for peer 1
|
||||||
|
f := nt.peers[2].(*raft)
|
||||||
|
for i := 0; i < f.electionTimeout; i++ {
|
||||||
|
f.tick()
|
||||||
|
}
|
||||||
|
|
||||||
|
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
|
||||||
|
|
||||||
|
lead := nt.peers[1].(*raft)
|
||||||
|
|
||||||
|
if lead.lead != 1 {
|
||||||
|
t.Fatalf("after election leader is %x, want 1", lead.lead)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Transfer leadership to 2.
|
||||||
|
nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader})
|
||||||
|
|
||||||
|
checkLeaderTransferState(t, lead, StateFollower, 2)
|
||||||
|
|
||||||
|
// After some log replication, transfer leadership back to 1.
|
||||||
|
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
|
||||||
|
|
||||||
|
nt.send(pb.Message{From: 1, To: 2, Type: pb.MsgTransferLeader})
|
||||||
|
|
||||||
|
checkLeaderTransferState(t, lead, StateLeader, 1)
|
||||||
|
}
|
||||||
|
|
||||||
func TestLeaderTransferToSlowFollower(t *testing.T) {
|
func TestLeaderTransferToSlowFollower(t *testing.T) {
|
||||||
defaultLogger.EnableDebug()
|
defaultLogger.EnableDebug()
|
||||||
nt := newNetwork(nil, nil, nil)
|
nt := newNetwork(nil, nil, nil)
|
||||||
|
@ -236,6 +236,7 @@ type Message struct {
|
|||||||
Snapshot Snapshot `protobuf:"bytes,9,opt,name=snapshot" json:"snapshot"`
|
Snapshot Snapshot `protobuf:"bytes,9,opt,name=snapshot" json:"snapshot"`
|
||||||
Reject bool `protobuf:"varint,10,opt,name=reject" json:"reject"`
|
Reject bool `protobuf:"varint,10,opt,name=reject" json:"reject"`
|
||||||
RejectHint uint64 `protobuf:"varint,11,opt,name=rejectHint" json:"rejectHint"`
|
RejectHint uint64 `protobuf:"varint,11,opt,name=rejectHint" json:"rejectHint"`
|
||||||
|
Context []byte `protobuf:"bytes,12,opt,name=context" json:"context,omitempty"`
|
||||||
XXX_unrecognized []byte `json:"-"`
|
XXX_unrecognized []byte `json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -464,6 +465,12 @@ func (m *Message) MarshalTo(data []byte) (int, error) {
|
|||||||
data[i] = 0x58
|
data[i] = 0x58
|
||||||
i++
|
i++
|
||||||
i = encodeVarintRaft(data, i, uint64(m.RejectHint))
|
i = encodeVarintRaft(data, i, uint64(m.RejectHint))
|
||||||
|
if m.Context != nil {
|
||||||
|
data[i] = 0x62
|
||||||
|
i++
|
||||||
|
i = encodeVarintRaft(data, i, uint64(len(m.Context)))
|
||||||
|
i += copy(data[i:], m.Context)
|
||||||
|
}
|
||||||
if m.XXX_unrecognized != nil {
|
if m.XXX_unrecognized != nil {
|
||||||
i += copy(data[i:], m.XXX_unrecognized)
|
i += copy(data[i:], m.XXX_unrecognized)
|
||||||
}
|
}
|
||||||
@ -655,6 +662,10 @@ func (m *Message) Size() (n int) {
|
|||||||
n += 1 + l + sovRaft(uint64(l))
|
n += 1 + l + sovRaft(uint64(l))
|
||||||
n += 2
|
n += 2
|
||||||
n += 1 + sovRaft(uint64(m.RejectHint))
|
n += 1 + sovRaft(uint64(m.RejectHint))
|
||||||
|
if m.Context != nil {
|
||||||
|
l = len(m.Context)
|
||||||
|
n += 1 + l + sovRaft(uint64(l))
|
||||||
|
}
|
||||||
if m.XXX_unrecognized != nil {
|
if m.XXX_unrecognized != nil {
|
||||||
n += len(m.XXX_unrecognized)
|
n += len(m.XXX_unrecognized)
|
||||||
}
|
}
|
||||||
@ -1348,6 +1359,37 @@ func (m *Message) Unmarshal(data []byte) error {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
case 12:
|
||||||
|
if wireType != 2 {
|
||||||
|
return fmt.Errorf("proto: wrong wireType = %d for field Context", wireType)
|
||||||
|
}
|
||||||
|
var byteLen int
|
||||||
|
for shift := uint(0); ; shift += 7 {
|
||||||
|
if shift >= 64 {
|
||||||
|
return ErrIntOverflowRaft
|
||||||
|
}
|
||||||
|
if iNdEx >= l {
|
||||||
|
return io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
b := data[iNdEx]
|
||||||
|
iNdEx++
|
||||||
|
byteLen |= (int(b) & 0x7F) << shift
|
||||||
|
if b < 0x80 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if byteLen < 0 {
|
||||||
|
return ErrInvalidLengthRaft
|
||||||
|
}
|
||||||
|
postIndex := iNdEx + byteLen
|
||||||
|
if postIndex > l {
|
||||||
|
return io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
m.Context = append(m.Context[:0], data[iNdEx:postIndex]...)
|
||||||
|
if m.Context == nil {
|
||||||
|
m.Context = []byte{}
|
||||||
|
}
|
||||||
|
iNdEx = postIndex
|
||||||
default:
|
default:
|
||||||
iNdEx = preIndex
|
iNdEx = preIndex
|
||||||
skippy, err := skipRaft(data[iNdEx:])
|
skippy, err := skipRaft(data[iNdEx:])
|
||||||
|
@ -64,6 +64,7 @@ message Message {
|
|||||||
optional Snapshot snapshot = 9 [(gogoproto.nullable) = false];
|
optional Snapshot snapshot = 9 [(gogoproto.nullable) = false];
|
||||||
optional bool reject = 10 [(gogoproto.nullable) = false];
|
optional bool reject = 10 [(gogoproto.nullable) = false];
|
||||||
optional uint64 rejectHint = 11 [(gogoproto.nullable) = false];
|
optional uint64 rejectHint = 11 [(gogoproto.nullable) = false];
|
||||||
|
optional bytes context = 12;
|
||||||
}
|
}
|
||||||
|
|
||||||
message HardState {
|
message HardState {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user