From 64e6d5475808c2243f4ad4e32362ec75968a1797 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 19 Aug 2013 12:10:11 -0700 Subject: [PATCH 1/5] add remove peer --- command.go | 76 ++++++++++++++++++++++++++++++++++++++++-- etcd_handlers.go | 2 +- machines.go | 14 ++++++-- raft_handlers.go | 18 ++++++++++ raft_server.go | 87 ++++++++++++++++++++++++++++++++---------------- 5 files changed, 161 insertions(+), 36 deletions(-) diff --git a/command.go b/command.go index c9afd0779..8ede579ba 100644 --- a/command.go +++ b/command.go @@ -1,11 +1,13 @@ package main import ( + "encoding/binary" "encoding/json" "fmt" etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/store" "github.com/coreos/go-raft" + "os" "path" "time" ) @@ -140,18 +142,25 @@ func (c *JoinCommand) CommandName() string { // Join a server to the cluster func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { + if c.Name == r.Name() { + r.pendingJoin = false + } + // check if the join command is from a previous machine, who lost all its previous log. response, _ := etcdStore.RawGet(path.Join("_etcd/machines", c.Name)) + b := make([]byte, 8) + binary.PutUvarint(b, raftServer.CommitIndex()) + if response != nil { - return []byte("join success"), nil + return b, nil } // check machine number in the cluster num := machineNum() if num == maxClusterSize { debug("Reject join request from ", c.Name) - return []byte("join fail"), etcdErr.NewError(103, "") + return []byte{0}, etcdErr.NewError(103, "") } addNameToURL(c.Name, c.RaftVersion, c.RaftURL, c.EtcdURL) @@ -164,9 +173,70 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion) etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex()) - return []byte("join success"), err + return b, err } func (c *JoinCommand) NodeName() string { return c.Name } + +// RemoveCommand +type RemoveCommand struct { + Name string `json:"name"` +} + +// The name of the remove command in the log +func (c *RemoveCommand) CommandName() string { + return "etcd:remove" +} + +// Remove a server from the cluster +func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) { + + // remove machine in etcd storage + key := path.Join("_etcd/machines", c.Name) + + _, err := etcdStore.Delete(key, raftServer.CommitIndex()) + + if err != nil { + return []byte{0}, err + } + + // remove peer in raft + err = raftServer.RemovePeer(c.Name) + + if err != nil { + return []byte{0}, err + } + + if c.Name == raftServer.Name() { + // the removed node is this node + + // if the node is not replying the previous logs + // and the node has sent out a join request in this + // start. It is sure that this node received a new remove + // command and need to be removed + if raftServer.CommitIndex() > r.joinIndex && r.joinIndex != 0 { + debugf("server [%s] is removed", raftServer.Name()) + os.Exit(0) + } else { + // the node is replying previous logs and there is a join command + // afterwards, we should not exit + + if r.joinIndex == 0 { + // if the node has not sent a join command in this start + // it will need to send a join command after reply the logs + r.pendingJoin = true + } else { + // else ignore remove + debugf("ignore previous remove command.") + } + + } + } + + b := make([]byte, 8) + binary.PutUvarint(b, raftServer.CommitIndex()) + + return b, err +} diff --git a/etcd_handlers.go b/etcd_handlers.go index 1a606bf55..c7d228e33 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -189,7 +189,7 @@ func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error { // Handler to return all the known machines in the current cluster func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error { - machines := getMachines() + machines := getMachines(true) w.WriteHeader(http.StatusOK) w.Write([]byte(strings.Join(machines, ", "))) diff --git a/machines.go b/machines.go index 136d8842a..76c827c34 100644 --- a/machines.go +++ b/machines.go @@ -8,13 +8,21 @@ func machineNum() int { } // getMachines gets the current machines in the cluster -func getMachines() []string { +func getMachines(etcd bool) []string { peers := r.Peers() machines := make([]string, len(peers)+1) - leader, ok := nameToEtcdURL(r.Leader()) + var toURL func(string) (string, bool) + + if etcd { + toURL = nameToEtcdURL + } else { + toURL = nameToRaftURL + } + + leader, ok := toURL(r.Leader()) self := e.url i := 1 @@ -30,7 +38,7 @@ func getMachines() []string { // Add all peers to the slice for peerName, _ := range peers { - if machine, ok := nameToEtcdURL(peerName); ok { + if machine, ok := toURL(peerName); ok { // do not add leader twice if machine != leader { machines[i] = machine diff --git a/raft_handlers.go b/raft_handlers.go index d06141200..8ae9d2f87 100644 --- a/raft_handlers.go +++ b/raft_handlers.go @@ -107,6 +107,24 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) error { } } +// Response to remove request +func RemoveHttpHandler(w http.ResponseWriter, req *http.Request) { + if req.Method != "DELETE" { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + nodeName := req.URL.Path[len("/remove/"):] + command := &RemoveCommand{ + Name: nodeName, + } + + debugf("[recv] Remove Request [%s]", command.Name) + + dispatch(command, w, req, false) + +} + // Response to the name request func NameHttpHandler(w http.ResponseWriter, req *http.Request) { debugf("[recv] Get %s/name/ ", r.url) diff --git a/raft_server.go b/raft_server.go index 148dafcfb..5c1689d9e 100644 --- a/raft_server.go +++ b/raft_server.go @@ -3,6 +3,7 @@ package main import ( "bytes" "crypto/tls" + "encoding/binary" "encoding/json" "fmt" etcdErr "github.com/coreos/etcd/error" @@ -15,11 +16,13 @@ import ( type raftServer struct { *raft.Server - version string - name string - url string - tlsConf *TLSConfig - tlsInfo *TLSInfo + version string + joinIndex uint64 + pendingJoin bool + name string + url string + tlsConf *TLSConfig + tlsInfo *TLSInfo } var r *raftServer @@ -77,6 +80,22 @@ func (r *raftServer) ListenAndServe() { } } else { + + if r.pendingJoin { + cluster = getMachines(false) + for i := 0; i < len(cluster); i++ { + u, err := url.Parse(cluster[i]) + if err != nil { + debug("rejoin cannot parse url: ", err) + } + cluster[i] = u.Host + } + ok := joinCluster(cluster) + if !ok { + fatal("cannot rejoin to the cluster") + } + } + // rejoin the previous cluster debugf("%s restart as a follower", r.name) } @@ -105,26 +124,10 @@ func startAsLeader() { func startAsFollower() { // start as a follower in a existing cluster for i := 0; i < retryTimes; i++ { - - for _, machine := range cluster { - - if len(machine) == 0 { - continue - } - - err := joinCluster(r.Server, machine, r.tlsConf.Scheme) - if err == nil { - debugf("%s success join to the cluster via machine %s", r.name, machine) - return - - } else { - if _, ok := err.(etcdErr.Error); ok { - fatal(err) - } - debugf("cannot join to cluster via machine %s %s", machine, err) - } + ok := joinCluster(cluster) + if ok { + return } - warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval) time.Sleep(time.Second * RetryInterval) } @@ -149,6 +152,7 @@ func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) { raftMux.HandleFunc("/name", NameHttpHandler) raftMux.HandleFunc("/version", RaftVersionHttpHandler) raftMux.Handle("/join", errorHandler(JoinHttpHandler)) + raftMux.HandleFunc("/remove/", RemoveHttpHandler) raftMux.HandleFunc("/vote", VoteHttpHandler) raftMux.HandleFunc("/log", GetLogHttpHandler) raftMux.HandleFunc("/log/append", AppendEntriesHttpHandler) @@ -180,15 +184,37 @@ func getVersion(t transporter, versionURL url.URL) (string, error) { return string(body), nil } -// Send join requests to the leader. -func joinCluster(s *raft.Server, raftURL string, scheme string) error { +func joinCluster(cluster []string) bool { + for _, machine := range cluster { + + if len(machine) == 0 { + continue + } + + err := joinByMachine(r.Server, machine, r.tlsConf.Scheme) + if err == nil { + debugf("%s success join to the cluster via machine %s", r.name, machine) + return true + + } else { + if _, ok := err.(etcdErr.Error); ok { + fatal(err) + } + debugf("cannot join to cluster via machine %s %s", machine, err) + } + } + return false +} + +// Send join requests to machine. +func joinByMachine(s *raft.Server, machine string, scheme string) error { var b bytes.Buffer // t must be ok t, _ := r.Transporter().(transporter) // Our version must match the leaders version - versionURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/version"} + versionURL := url.URL{Host: machine, Scheme: scheme, Path: "/version"} version, err := getVersion(t, versionURL) if err != nil { return fmt.Errorf("Unable to join: %v", err) @@ -202,9 +228,9 @@ func joinCluster(s *raft.Server, raftURL string, scheme string) error { json.NewEncoder(&b).Encode(newJoinCommand()) - joinURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/join"} + joinURL := url.URL{Host: machine, Scheme: scheme, Path: "/join"} - debugf("Send Join Request to %s", raftURL) + debugf("Send Join Request to %s", joinURL.String()) resp, err := t.Post(joinURL.String(), &b) @@ -215,6 +241,8 @@ func joinCluster(s *raft.Server, raftURL string, scheme string) error { if resp != nil { defer resp.Body.Close() if resp.StatusCode == http.StatusOK { + b, _ := ioutil.ReadAll(resp.Body) + r.joinIndex, _ = binary.Uvarint(b) return nil } if resp.StatusCode == http.StatusTemporaryRedirect { @@ -244,6 +272,7 @@ func joinCluster(s *raft.Server, raftURL string, scheme string) error { // Register commands to raft server func registerCommands() { raft.RegisterCommand(&JoinCommand{}) + raft.RegisterCommand(&RemoveCommand{}) raft.RegisterCommand(&SetCommand{}) raft.RegisterCommand(&GetCommand{}) raft.RegisterCommand(&DeleteCommand{}) From 49c160b50c6335b31565a2402b9a16df035dd3c3 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 19 Aug 2013 13:42:00 -0700 Subject: [PATCH 2/5] change getMachines --- command.go | 4 ++-- etcd_handlers.go | 2 +- machines.go | 12 ++---------- raft_server.go | 2 +- 4 files changed, 6 insertions(+), 14 deletions(-) diff --git a/command.go b/command.go index 8ede579ba..b5b74ade9 100644 --- a/command.go +++ b/command.go @@ -212,7 +212,7 @@ func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) { if c.Name == raftServer.Name() { // the removed node is this node - // if the node is not replying the previous logs + // if the node is not replaying the previous logs // and the node has sent out a join request in this // start. It is sure that this node received a new remove // command and need to be removed @@ -220,7 +220,7 @@ func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) { debugf("server [%s] is removed", raftServer.Name()) os.Exit(0) } else { - // the node is replying previous logs and there is a join command + // the node is replaying previous logs and there is a join command // afterwards, we should not exit if r.joinIndex == 0 { diff --git a/etcd_handlers.go b/etcd_handlers.go index c7d228e33..4c4673983 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -189,7 +189,7 @@ func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error { // Handler to return all the known machines in the current cluster func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error { - machines := getMachines(true) + machines := getMachines(nameToEtcdURL) w.WriteHeader(http.StatusOK) w.Write([]byte(strings.Join(machines, ", "))) diff --git a/machines.go b/machines.go index 76c827c34..fbaa48d6f 100644 --- a/machines.go +++ b/machines.go @@ -8,22 +8,14 @@ func machineNum() int { } // getMachines gets the current machines in the cluster -func getMachines(etcd bool) []string { +func getMachines(toURL func(string) (string, bool)) []string { peers := r.Peers() machines := make([]string, len(peers)+1) - var toURL func(string) (string, bool) - - if etcd { - toURL = nameToEtcdURL - } else { - toURL = nameToRaftURL - } - leader, ok := toURL(r.Leader()) - self := e.url + self, _ := toURL(r.Name()) i := 1 if ok { diff --git a/raft_server.go b/raft_server.go index 5c1689d9e..fcabd6339 100644 --- a/raft_server.go +++ b/raft_server.go @@ -82,7 +82,7 @@ func (r *raftServer) ListenAndServe() { } else { if r.pendingJoin { - cluster = getMachines(false) + cluster = getMachines(nameToRaftURL) for i := 0; i < len(cluster); i++ { u, err := url.Parse(cluster[i]) if err != nil { From fb9f09d240ed1a9843a136f66978ba22b03893ac Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 19 Aug 2013 13:43:12 -0700 Subject: [PATCH 3/5] use commandname --- command.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/command.go b/command.go index b5b74ade9..b606ff571 100644 --- a/command.go +++ b/command.go @@ -187,7 +187,7 @@ type RemoveCommand struct { // The name of the remove command in the log func (c *RemoveCommand) CommandName() string { - return "etcd:remove" + return commandName("remove") } // Remove a server from the cluster @@ -225,7 +225,7 @@ func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) { if r.joinIndex == 0 { // if the node has not sent a join command in this start - // it will need to send a join command after reply the logs + // it will need to send a join command after replay the logs r.pendingJoin = true } else { // else ignore remove From 798d52e695b8ddb95b6a3c11fb3f714926c8a368 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 19 Aug 2013 15:28:01 -0700 Subject: [PATCH 4/5] simplify remove/join process; add tests --- command.go | 18 +-------- etcd_test.go | 104 +++++++++++++++++++++++++++++++++++++++++++++++++ raft_server.go | 37 ++++++++---------- 3 files changed, 123 insertions(+), 36 deletions(-) diff --git a/command.go b/command.go index b606ff571..b9c3a83f6 100644 --- a/command.go +++ b/command.go @@ -142,10 +142,6 @@ func (c *JoinCommand) CommandName() string { // Join a server to the cluster func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { - if c.Name == r.Name() { - r.pendingJoin = false - } - // check if the join command is from a previous machine, who lost all its previous log. response, _ := etcdStore.RawGet(path.Join("_etcd/machines", c.Name)) @@ -220,18 +216,8 @@ func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) { debugf("server [%s] is removed", raftServer.Name()) os.Exit(0) } else { - // the node is replaying previous logs and there is a join command - // afterwards, we should not exit - - if r.joinIndex == 0 { - // if the node has not sent a join command in this start - // it will need to send a join command after replay the logs - r.pendingJoin = true - } else { - // else ignore remove - debugf("ignore previous remove command.") - } - + // else ignore remove + debugf("ignore previous remove command.") } } diff --git a/etcd_test.go b/etcd_test.go index ab6a1e89c..e61e7e4a8 100644 --- a/etcd_test.go +++ b/etcd_test.go @@ -444,6 +444,110 @@ func TestKillRandom(t *testing.T) { stop <- true } +// remove the node and node rejoin with previous log +func TestRemoveNode(t *testing.T) { + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + clusterSize := 3 + argGroup, etcds, _ := test.CreateCluster(clusterSize, procAttr, false) + + time.Sleep(time.Second) + + c := etcd.NewClient() + + c.SyncCluster() + + rmReq, _ := http.NewRequest("DELETE", "http://127.0.0.1:7001/remove/node3", nil) + + client := &http.Client{} + for i := 0; i < 2; i++ { + for i := 0; i < 2; i++ { + client.Do(rmReq) + + etcds[2].Wait() + + resp, err := c.Get("_etcd/machines") + + if err != nil { + panic(err) + } + + if len(resp) != 2 { + t.Fatal("cannot remove machine") + } + + if i == 1 { + // rejoin with log + etcds[2], err = os.StartProcess("etcd", argGroup[2], procAttr) + } else { + // rejoin without log + etcds[2], err = os.StartProcess("etcd", append(argGroup[2], "-f"), procAttr) + } + + if err != nil { + panic(err) + } + + time.Sleep(time.Second) + + resp, err = c.Get("_etcd/machines") + + if err != nil { + panic(err) + } + + if len(resp) != 3 { + t.Fatal("add machine fails") + } + } + + // first kill the node, then remove it, then add it back + for i := 0; i < 2; i++ { + etcds[2].Kill() + etcds[2].Wait() + + client.Do(rmReq) + + resp, err := c.Get("_etcd/machines") + + if err != nil { + panic(err) + } + + if len(resp) != 2 { + t.Fatal("cannot remove machine") + } + + if i == 1 { + // rejoin with log + etcds[2], err = os.StartProcess("etcd", append(argGroup[2]), procAttr) + } else { + // rejoin without log + etcds[2], err = os.StartProcess("etcd", append(argGroup[2], "-f"), procAttr) + } + + if err != nil { + panic(err) + } + + time.Sleep(time.Second) + + resp, err = c.Get("_etcd/machines") + + if err != nil { + panic(err) + } + + if len(resp) != 3 { + t.Fatal("add machine fails") + } + } + } + test.DestroyCluster(etcds) + +} + func templateBenchmarkEtcdDirectCall(b *testing.B, tls bool) { procAttr := new(os.ProcAttr) procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} diff --git a/raft_server.go b/raft_server.go index fcabd6339..d9136de3c 100644 --- a/raft_server.go +++ b/raft_server.go @@ -16,13 +16,12 @@ import ( type raftServer struct { *raft.Server - version string - joinIndex uint64 - pendingJoin bool - name string - url string - tlsConf *TLSConfig - tlsInfo *TLSInfo + version string + joinIndex uint64 + name string + url string + tlsConf *TLSConfig + tlsInfo *TLSInfo } var r *raftServer @@ -81,22 +80,20 @@ func (r *raftServer) ListenAndServe() { } else { - if r.pendingJoin { - cluster = getMachines(nameToRaftURL) - for i := 0; i < len(cluster); i++ { - u, err := url.Parse(cluster[i]) - if err != nil { - debug("rejoin cannot parse url: ", err) - } - cluster[i] = u.Host - } - ok := joinCluster(cluster) - if !ok { - fatal("cannot rejoin to the cluster") + // rejoin the previous cluster + cluster = getMachines(nameToRaftURL) + for i := 0; i < len(cluster); i++ { + u, err := url.Parse(cluster[i]) + if err != nil { + debug("rejoin cannot parse url: ", err) } + cluster[i] = u.Host + } + ok := joinCluster(cluster) + if !ok { + fatal("cannot rejoin to the cluster") } - // rejoin the previous cluster debugf("%s restart as a follower", r.name) } From dd2f856d63fc1c9d0850c3f18e04fb9b7443d143 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 19 Aug 2013 16:26:09 -0700 Subject: [PATCH 5/5] if the whole cluster dies, should not panic --- raft_server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raft_server.go b/raft_server.go index d9136de3c..a90c65f93 100644 --- a/raft_server.go +++ b/raft_server.go @@ -91,7 +91,7 @@ func (r *raftServer) ListenAndServe() { } ok := joinCluster(cluster) if !ok { - fatal("cannot rejoin to the cluster") + warn("the whole cluster dies! restart the cluster") } debugf("%s restart as a follower", r.name)