From e020b2a228aaf488235a7b219a0abf10cb3b9176 Mon Sep 17 00:00:00 2001 From: "swingbach@gmail.com" Date: Wed, 29 Jun 2016 18:24:58 +0800 Subject: [PATCH 1/3] raft: make leader transferring workable when quorum check is on --- raft/raft.go | 24 +++++++++++++++++++----- raft/raft_paper_test.go | 4 ++-- raft/raft_test.go | 35 +++++++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 7 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index 68cf60df9..9d21bf04e 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -15,6 +15,7 @@ package raft import ( + "bytes" "errors" "fmt" "math" @@ -36,6 +37,17 @@ const ( StateLeader ) +// Possible values for CampaignType +const ( + LeaderElection CampaignType = "LeaderElection" + LeaderTransfer CampaignType = "LeaderTransfer" +) + +// CampaignType represents the type of campaigning +// the reason we use the type of string instead of uint64 +// is because it's simpler to compare and fill in raft entries +type CampaignType string + // StateType represents the role of a node in a cluster. 
type StateType uint64 @@ -520,7 +532,7 @@ func (r *raft) becomeLeader() { r.logger.Infof("%x became leader at term %d", r.id, r.Term) } -func (r *raft) campaign() { +func (r *raft) campaign(t CampaignType) { r.becomeCandidate() if r.quorum() == r.poll(r.id, true) { r.becomeLeader() @@ -532,7 +544,7 @@ func (r *raft) campaign() { } r.logger.Infof("%x [logterm: %d, index: %d] sent vote request to %x at term %d", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), id, r.Term) - r.send(pb.Message{To: id, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()}) + r.send(pb.Message{To: id, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Entries: []pb.Entry{{Data: []byte(t)}}}) } } @@ -557,7 +569,7 @@ func (r *raft) Step(m pb.Message) error { if m.Type == pb.MsgHup { if r.state != StateLeader { r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term) - r.campaign() + r.campaign(LeaderElection) } else { r.logger.Debugf("%x ignoring MsgHup because already leader", r.id) } @@ -575,7 +587,9 @@ func (r *raft) Step(m pb.Message) error { case m.Term > r.Term: lead := m.From if m.Type == pb.MsgVote { - if r.checkQuorum && r.state != StateCandidate && r.electionElapsed < r.electionTimeout { + force := len(m.Entries) == 1 && bytes.Equal(m.Entries[0].Data, []byte(LeaderTransfer)) + inLease := r.checkQuorum && r.state != StateCandidate && r.electionElapsed < r.electionTimeout + if !force && inLease { // If a server receives a RequestVote request within the minimum election timeout // of hearing from a current leader, it does not update its term or grant its vote r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored vote from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)", @@ -842,7 +856,7 @@ func stepFollower(r *raft, m pb.Message) { } case pb.MsgTimeoutNow: r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, 
r.Term, m.From) - r.campaign() + r.campaign(LeaderTransfer) case pb.MsgReadIndex: if r.lead == None { r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term) diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index cd5c3e94e..8990f4ef3 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -175,8 +175,8 @@ func testNonleaderStartElection(t *testing.T, state StateType) { msgs := r.readMessages() sort.Sort(messageSlice(msgs)) wmsgs := []pb.Message{ - {From: 1, To: 2, Term: 2, Type: pb.MsgVote}, - {From: 1, To: 3, Term: 2, Type: pb.MsgVote}, + {From: 1, To: 2, Term: 2, Type: pb.MsgVote, Entries: []pb.Entry{{Data: []byte(LeaderElection)}}}, + {From: 1, To: 3, Term: 2, Type: pb.MsgVote, Entries: []pb.Entry{{Data: []byte(LeaderElection)}}}, } if !reflect.DeepEqual(msgs, wmsgs) { t.Errorf("msgs = %v, want %v", msgs, wmsgs) diff --git a/raft/raft_test.go b/raft/raft_test.go index 8318f6f78..791d6ab8e 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -2208,6 +2208,41 @@ func TestLeaderTransferToUpToDateNode(t *testing.T) { checkLeaderTransferState(t, lead, StateLeader, 1) } +// TestLeaderTransferWithCheckQuorum ensures that transferring leadership still works +// even if the current leader is still under its leader lease. +func TestLeaderTransferWithCheckQuorum(t *testing.T) { + nt := newNetwork(nil, nil, nil) + for i := 1; i < 4; i++ { + r := nt.peers[uint64(i)].(*raft) + r.checkQuorum = true + } + + f := nt.peers[2].(*raft) + for i := 0; i < f.electionTimeout; i++ { + f.tick() + } + + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) + + lead := nt.peers[1].(*raft) + + if lead.lead != 1 { + t.Fatalf("after election leader is %x, want 1", lead.lead) + } + + // Transfer leadership to 2. + nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader}) + + checkLeaderTransferState(t, lead, StateFollower, 2) + + // After some log replication, transfer leadership back to 1. 
+ nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) + + nt.send(pb.Message{From: 1, To: 2, Type: pb.MsgTransferLeader}) + + checkLeaderTransferState(t, lead, StateLeader, 1) +} + func TestLeaderTransferToSlowFollower(t *testing.T) { defaultLogger.EnableDebug() nt := newNetwork(nil, nil, nil) From 0d9b6ba0ab6198df7e5f4aa1ac59dedaa484aa4c Mon Sep 17 00:00:00 2001 From: "swingbach@gmail.com" Date: Mon, 11 Jul 2016 14:59:53 +0800 Subject: [PATCH 2/3] raft: fix a few problems --- raft/raft.go | 19 +++++++++++++------ raft/raft_paper_test.go | 4 ++-- raft/raft_test.go | 1 + 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index 9d21bf04e..f6077227b 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -39,8 +39,10 @@ const ( // Possible values for CampaignType const ( - LeaderElection CampaignType = "LeaderElection" - LeaderTransfer CampaignType = "LeaderTransfer" + // campaignElection represents the type of normal election + campaignElection CampaignType = "CampaignElection" + // campaignTransfer represents the type of leader transfer + campaignTransfer CampaignType = "CampaignTransfer" ) // CampaignType represents the type of campaigning @@ -544,7 +546,12 @@ func (r *raft) campaign(t CampaignType) { } r.logger.Infof("%x [logterm: %d, index: %d] sent vote request to %x at term %d", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), id, r.Term) - r.send(pb.Message{To: id, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Entries: []pb.Entry{{Data: []byte(t)}}}) + + var entries []pb.Entry + if t == campaignTransfer { + entries = []pb.Entry{{Data: []byte(t)}} + } + r.send(pb.Message{To: id, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Entries: entries}) } } @@ -569,7 +576,7 @@ func (r *raft) Step(m pb.Message) error { if m.Type == pb.MsgHup { if r.state != StateLeader { r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term) - 
r.campaign(LeaderElection) + r.campaign(campaignElection) } else { r.logger.Debugf("%x ignoring MsgHup because already leader", r.id) } @@ -587,7 +594,7 @@ func (r *raft) Step(m pb.Message) error { case m.Term > r.Term: lead := m.From if m.Type == pb.MsgVote { - force := len(m.Entries) == 1 && bytes.Equal(m.Entries[0].Data, []byte(LeaderTransfer)) + force := len(m.Entries) == 1 && bytes.Equal(m.Entries[0].Data, []byte(campaignTransfer)) inLease := r.checkQuorum && r.state != StateCandidate && r.electionElapsed < r.electionTimeout if !force && inLease { // If a server receives a RequestVote request within the minimum election timeout @@ -856,7 +863,7 @@ func stepFollower(r *raft, m pb.Message) { } case pb.MsgTimeoutNow: r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From) - r.campaign(LeaderTransfer) + r.campaign(campaignTransfer) case pb.MsgReadIndex: if r.lead == None { r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term) diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index 8990f4ef3..cd5c3e94e 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -175,8 +175,8 @@ func testNonleaderStartElection(t *testing.T, state StateType) { msgs := r.readMessages() sort.Sort(messageSlice(msgs)) wmsgs := []pb.Message{ - {From: 1, To: 2, Term: 2, Type: pb.MsgVote, Entries: []pb.Entry{{Data: []byte(LeaderElection)}}}, - {From: 1, To: 3, Term: 2, Type: pb.MsgVote, Entries: []pb.Entry{{Data: []byte(LeaderElection)}}}, + {From: 1, To: 2, Term: 2, Type: pb.MsgVote}, + {From: 1, To: 3, Term: 2, Type: pb.MsgVote}, } if !reflect.DeepEqual(msgs, wmsgs) { t.Errorf("msgs = %v, want %v", msgs, wmsgs) diff --git a/raft/raft_test.go b/raft/raft_test.go index 791d6ab8e..b2e16b339 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -2217,6 +2217,7 @@ func TestLeaderTransferWithCheckQuorum(t *testing.T) { r.checkQuorum = true } + // Letting peer 2 
electionElapsed reach to timeout so that it can vote for peer 1 f := nt.peers[2].(*raft) for i := 0; i < f.electionTimeout; i++ { f.tick() From c36a40ca15dd758ea25662773c51cc4263fad9f6 Mon Sep 17 00:00:00 2001 From: "swingbach@gmail.com" Date: Tue, 12 Jul 2016 16:14:06 +0800 Subject: [PATCH 3/3] raft: introduce top-level context in message struct --- raft/raft.go | 8 ++++---- raft/raftpb/raft.pb.go | 42 ++++++++++++++++++++++++++++++++++++++++++ raft/raftpb/raft.proto | 1 + 3 files changed, 47 insertions(+), 4 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index f6077227b..658d2d02e 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -547,11 +547,11 @@ func (r *raft) campaign(t CampaignType) { r.logger.Infof("%x [logterm: %d, index: %d] sent vote request to %x at term %d", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), id, r.Term) - var entries []pb.Entry + var ctx []byte if t == campaignTransfer { - entries = []pb.Entry{{Data: []byte(t)}} + ctx = []byte(t) } - r.send(pb.Message{To: id, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Entries: entries}) + r.send(pb.Message{To: id, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx}) } } @@ -594,7 +594,7 @@ func (r *raft) Step(m pb.Message) error { case m.Term > r.Term: lead := m.From if m.Type == pb.MsgVote { - force := len(m.Entries) == 1 && bytes.Equal(m.Entries[0].Data, []byte(campaignTransfer)) + force := bytes.Equal(m.Context, []byte(campaignTransfer)) inLease := r.checkQuorum && r.state != StateCandidate && r.electionElapsed < r.electionTimeout if !force && inLease { // If a server receives a RequestVote request within the minimum election timeout diff --git a/raft/raftpb/raft.pb.go b/raft/raftpb/raft.pb.go index 479a1c683..6176c3d2d 100644 --- a/raft/raftpb/raft.pb.go +++ b/raft/raftpb/raft.pb.go @@ -236,6 +236,7 @@ type Message struct { Snapshot Snapshot `protobuf:"bytes,9,opt,name=snapshot" json:"snapshot"` Reject bool 
`protobuf:"varint,10,opt,name=reject" json:"reject"` RejectHint uint64 `protobuf:"varint,11,opt,name=rejectHint" json:"rejectHint"` + Context []byte `protobuf:"bytes,12,opt,name=context" json:"context,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -464,6 +465,12 @@ func (m *Message) MarshalTo(data []byte) (int, error) { data[i] = 0x58 i++ i = encodeVarintRaft(data, i, uint64(m.RejectHint)) + if m.Context != nil { + data[i] = 0x62 + i++ + i = encodeVarintRaft(data, i, uint64(len(m.Context))) + i += copy(data[i:], m.Context) + } if m.XXX_unrecognized != nil { i += copy(data[i:], m.XXX_unrecognized) } @@ -655,6 +662,10 @@ func (m *Message) Size() (n int) { n += 1 + l + sovRaft(uint64(l)) n += 2 n += 1 + sovRaft(uint64(m.RejectHint)) + if m.Context != nil { + l = len(m.Context) + n += 1 + l + sovRaft(uint64(l)) + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -1348,6 +1359,37 @@ func (m *Message) Unmarshal(data []byte) error { break } } + case 12: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Context", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthRaft + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Context = append(m.Context[:0], data[iNdEx:postIndex]...) 
+ if m.Context == nil { + m.Context = []byte{} + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRaft(data[iNdEx:]) diff --git a/raft/raftpb/raft.proto b/raft/raftpb/raft.proto index 1948fc1e4..18f4cefae 100644 --- a/raft/raftpb/raft.proto +++ b/raft/raftpb/raft.proto @@ -64,6 +64,7 @@ message Message { optional Snapshot snapshot = 9 [(gogoproto.nullable) = false]; optional bool reject = 10 [(gogoproto.nullable) = false]; optional uint64 rejectHint = 11 [(gogoproto.nullable) = false]; + optional bytes context = 12; } message HardState {