From 152676f43afe6981f69771a3bb295e34ba396a31 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 23 Dec 2014 15:47:41 -0800 Subject: [PATCH] *: support removing the leader from a 2 members cluster --- etcdserver/server.go | 18 +++++++++++++++--- integration/cluster_test.go | 2 +- raft/node.go | 5 +++++ 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 6748db446..1e055dbc7 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -435,7 +435,9 @@ func (s *EtcdServer) run() { } if len(ents) > 0 { if appliedi, shouldstop = s.apply(ents, &confState); shouldstop { - return + m1 := fmt.Sprintf("etcdserver: removed local member %s from cluster %s", s.ID(), s.Cluster.ID()) + m2 := fmt.Sprint("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID") + go s.stopWithDelay(10*100*time.Millisecond, m1, m2) } } } @@ -460,14 +462,26 @@ func (s *EtcdServer) run() { // Stop stops the server gracefully, and shuts down the running goroutine. // Stop should be called after a Start(s), otherwise it will block forever. func (s *EtcdServer) Stop() { + s.stopWithMessages() +} + +func (s *EtcdServer) stopWithMessages(msgs ...string) { select { case s.stop <- struct{}{}: + for _, msg := range msgs { + log.Println(msg) + } case <-s.done: return } <-s.done } +func (s *EtcdServer) stopWithDelay(d time.Duration, msgs ...string) { + time.Sleep(d) + s.stopWithMessages(msgs...) +} + // StopNotify returns a channel that receives a empty struct // when the server is stopped. func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done } @@ -784,8 +798,6 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con id := types.ID(cc.NodeID) s.Cluster.RemoveMember(id) if id == s.id { - log.Printf("etcdserver: removed local member %s from cluster %s", id, s.Cluster.ID()) - log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID") return true, nil } else { s.sendhub.Remove(id) diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 056e140b7..c64d9392b 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -113,7 +113,7 @@ func testDecreaseClusterSize(t *testing.T, size int) { defer c.Terminate(t) // TODO: remove the last but one member - for i := 0; i < size-2; i++ { + for i := 0; i < size-1; i++ { id := c.Members[len(c.Members)-1].s.ID() c.RemoveMember(t, uint64(id)) c.waitLeader(t, c.Members) diff --git a/raft/node.go b/raft/node.go index 9980c4307..b2d2672ad 100644 --- a/raft/node.go +++ b/raft/node.go @@ -284,6 +284,11 @@ func (n *node) run(r *raft) { case pb.ConfChangeAddNode: r.addNode(cc.NodeID) case pb.ConfChangeRemoveNode: + // block incoming proposal when local node is + // removed + if cc.NodeID == r.id { + n.propc = nil + } r.removeNode(cc.NodeID) case pb.ConfChangeUpdateNode: r.resetPendingConf()