From f4141f0f5184db361535fe021b5b74ad82b41f92 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 10 Aug 2016 16:24:29 -0700 Subject: [PATCH 1/3] raft: handle 'MsgTransferLeader' in follower --- raft/raft.go | 12 +++++++----- raft/util.go | 2 +- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index adef48e72..740c832b8 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -590,11 +590,6 @@ func (r *raft) Step(m pb.Message) error { } return nil } - if m.Type == pb.MsgTransferLeader { - if r.state != StateLeader { - r.logger.Debugf("%x [term %d state %v] ignoring MsgTransferLeader to %x", r.id, r.Term, r.state, m.From) - } - } switch { case m.Term == 0: @@ -874,6 +869,13 @@ func stepFollower(r *raft, m pb.Message) { r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term) r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true}) } + case pb.MsgTransferLeader: + if r.lead == None { + r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term) + return + } + m.To = r.lead + r.send(m) case pb.MsgTimeoutNow: r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From) r.campaign(campaignTransfer) diff --git a/raft/util.go b/raft/util.go index c57855a17..0db073003 100644 --- a/raft/util.go +++ b/raft/util.go @@ -48,7 +48,7 @@ func max(a, b uint64) uint64 { func IsLocalMsg(msgt pb.MessageType) bool { return msgt == pb.MsgHup || msgt == pb.MsgBeat || msgt == pb.MsgUnreachable || - msgt == pb.MsgSnapStatus || msgt == pb.MsgCheckQuorum || msgt == pb.MsgTransferLeader + msgt == pb.MsgSnapStatus || msgt == pb.MsgCheckQuorum } func IsResponseMsg(msgt pb.MessageType) bool { From e64ef3f2611decad26b42135031acdb19f1fa460 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 10 Aug 2016 16:25:22 -0700 Subject: [PATCH 2/3] raft: add 'TransferLeadership' to Node interface --- raft/node.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/raft/node.go b/raft/node.go index 4c2a8968f..800fb0737 100644 --- a/raft/node.go +++ b/raft/node.go @@ -144,6 +144,9 @@ type Node interface { // to match MemoryStorage.Compact. ApplyConfChange(cc pb.ConfChange) *pb.ConfState + // TransferLeadership attempts to transfer leadership to the given transferee. + TransferLeadership(ctx context.Context, lead, transferee uint64) + // ReadIndex request a read state. The read state will be set in the ready. // Read state has a read index. Once the application advances further than the read // index, any linearizable read requests issued before the read request can be @@ -485,6 +488,15 @@ func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) { } } +func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) { + select { + // manually set 'from' and 'to', so that leader can voluntarily transfers its leadership + case n.recvc <- pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}: + case <-n.done: + case <-ctx.Done(): + } +} + func (n *node) ReadIndex(ctx context.Context, rctx []byte) error { return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}}) } From a56cb821800aa7b7680079d431a92ba898060c24 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 10 Aug 2016 16:26:11 -0700 Subject: [PATCH 3/3] etcdserver: add TransferLeadership for raft.Node --- etcdserver/server_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 5b319e1c6..11e6ab49e 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -1369,10 +1369,11 @@ func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error { n.Record(testutil.Action{Name: "Step"}) return nil } -func (n *nodeRecorder) Status() raft.Status { return raft.Status{} } -func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil } -func (n *nodeRecorder) ReadIndex(ctx context.Context, rctx []byte) error { return nil } -func (n *nodeRecorder) Advance() {} +func (n *nodeRecorder) Status() raft.Status { return raft.Status{} } +func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil } +func (n *nodeRecorder) TransferLeadership(ctx context.Context, lead, transferee uint64) {} +func (n *nodeRecorder) ReadIndex(ctx context.Context, rctx []byte) error { return nil } +func (n *nodeRecorder) Advance() {} func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState { n.Record(testutil.Action{Name: "ApplyConfChange", Params: []interface{}{conf}}) return &raftpb.ConfState{}