From 189fece683cf95027976919a10581381a122d879 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 20 May 2014 10:11:04 -0700 Subject: [PATCH 1/2] hack(server): notify removed peers when they try to become candidates A peer might be removed during a network partition. When it comes back it will not have received any of the log entries that would have notified it of its removal and will go on to propose a vote. This will disrupt the cluster and the cluster should give the machine feedback that it is no longer a member. The term of a denied vote is MaxUint64. The notification of the removal is a raft event. These two modifications are quick hacks. In reaction to this notification the machine should shut down. In this case the shutdown just moves it towards becoming a standby server. --- server/peer_server.go | 10 ++++++++++ third_party/github.com/goraft/raft/event.go | 3 ++- third_party/github.com/goraft/raft/server.go | 11 +++++++++++ 3 files changed, 23 insertions(+), 1 deletion(-) diff --git a/server/peer_server.go b/server/peer_server.go index 0a317c769..03e49f77c 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -122,6 +122,8 @@ func (s *PeerServer) SetRaftServer(raftServer raft.Server, snapshot bool) { raftServer.AddEventListener(raft.HeartbeatEventType, s.recordMetricEvent) + raftServer.AddEventListener(raft.RemovedEventType, s.removedEvent) + s.raftServer = raftServer s.removedInLog = false @@ -644,6 +646,14 @@ func (s *PeerServer) PeerStats() []byte { return nil } +// removedEvent handles the case where a machine has been removed from the +// cluster and is notified when it tries to become a candidate. +func (s *PeerServer) removedEvent(event raft.Event) { + // HACK(philips): we need to find a better notification for this. + log.Infof("removed during cluster re-configuration") + s.asyncRemove() +} + // raftEventLogger converts events from the Raft server into log messages. 
func (s *PeerServer) raftEventLogger(event raft.Event) { value := event.Value() diff --git a/third_party/github.com/goraft/raft/event.go b/third_party/github.com/goraft/raft/event.go index f2e40e6bd..b103c2281 100644 --- a/third_party/github.com/goraft/raft/event.go +++ b/third_party/github.com/goraft/raft/event.go @@ -4,9 +4,10 @@ const ( StateChangeEventType = "stateChange" LeaderChangeEventType = "leaderChange" TermChangeEventType = "termChange" - CommitEventType = "commit" + CommitEventType = "commit" AddPeerEventType = "addPeer" RemovePeerEventType = "removePeer" + RemovedEventType = "removed" HeartbeatIntervalEventType = "heartbeatInterval" ElectionTimeoutThresholdEventType = "electionTimeoutThreshold" diff --git a/third_party/github.com/goraft/raft/server.go b/third_party/github.com/goraft/raft/server.go index 8a9d05c15..5ce978ab7 100644 --- a/third_party/github.com/goraft/raft/server.go +++ b/third_party/github.com/goraft/raft/server.go @@ -6,6 +6,7 @@ import ( "fmt" "hash/crc32" "io/ioutil" + "math" "os" "path" "sort" @@ -1040,6 +1041,11 @@ func (s *server) processVoteResponse(resp *RequestVoteResponse) bool { return true } + if resp.Term == math.MaxUint64 { + s.debugln("got a removal notification, stopping") + s.DispatchEvent(newEvent(RemovedEventType, nil, nil)) + } + if resp.Term > s.currentTerm { s.debugln("server.candidate.vote.failed") s.updateCurrentTerm(resp.Term, "") @@ -1064,6 +1070,11 @@ func (s *server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse { // Processes a "request vote" request. func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) { + // Deny the vote quest if the candidate is not in the current cluster + if _, ok := s.peers[req.CandidateName]; !ok { + s.debugln("server.rv.deny.vote: unknown peer ", req.CandidateName) + return newRequestVoteResponse(math.MaxUint64, false), false + } // If the request is coming from an old term then reject it. 
if req.Term < s.Term() { From 9e5b12f5912a3cc232ae7ffa999c00fc58b4f5a1 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 20 May 2014 10:35:43 -0700 Subject: [PATCH 2/2] tests(remove_node): add TestRemovePausedNode --- tests/functional/remove_node_test.go | 79 ++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/tests/functional/remove_node_test.go b/tests/functional/remove_node_test.go index 67e8e455b..ba7f3bf98 100644 --- a/tests/functional/remove_node_test.go +++ b/tests/functional/remove_node_test.go @@ -3,8 +3,10 @@ package test import ( "bytes" "fmt" + "math/rand" "net/http" "os" + "syscall" "testing" "time" @@ -148,3 +150,80 @@ func TestRemoveNode(t *testing.T) { } } } + +func TestRemovePausedNode(t *testing.T) { + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + clusterSize := 4 + _, etcds, _ := CreateCluster(clusterSize, procAttr, false) + defer DestroyCluster(etcds) + + time.Sleep(time.Second) + + c := etcd.NewClient(nil) + + c.SyncCluster() + + r, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3, "removeDelay":1, "syncInterval":1}`)) + if !assert.Equal(t, r.StatusCode, 200) { + t.FailNow() + } + time.Sleep(2 * time.Second) + + resp, err := c.Get("_etcd/machines", false, false) + if err != nil { + panic(err) + } + if len(resp.Node.Nodes) != 3 { + t.Fatal("cannot remove peer") + } + + for i := 0; i < clusterSize; i++ { + // first pause the node, then remove it, then resume it + idx := rand.Int() % clusterSize + + etcds[idx].Signal(syscall.SIGSTOP) + fmt.Printf("pause node%d and let standby node take its place\n", idx+1) + time.Sleep(4 * time.Second) + + resp, err := c.Get("_etcd/machines", false, false) + if err != nil { + panic(err) + } + if len(resp.Node.Nodes) != 3 { + t.Fatal("cannot remove peer") + } + for i := 0; i < 3; i++ { + if resp.Node.Nodes[i].Key == fmt.Sprintf("node%d", idx+1) { + t.Fatal("node should be 
removed") + } + } + + etcds[idx].Signal(syscall.SIGCONT) + // let it change its state to candidate at least + time.Sleep(time.Second) + + stop := make(chan bool) + leaderChan := make(chan string, 1) + all := make(chan bool, 1) + + go Monitor(clusterSize, clusterSize, leaderChan, all, stop) + <-all + <-leaderChan + stop <- true + + resp, err = c.Get("_etcd/machines", false, false) + if err != nil { + panic(err) + } + if len(resp.Node.Nodes) != 3 { + t.Fatalf("add peer fails (%d != 3)", len(resp.Node.Nodes)) + } + for i := 0; i < 3; i++ { + if resp.Node.Nodes[i].Key == fmt.Sprintf("node%d", idx+1) { + t.Fatal("node should be removed") + } + } + } +}