diff --git a/Documentation/design/proxies.md b/Documentation/design/proxies.md new file mode 100644 index 000000000..46b52bab8 --- /dev/null +++ b/Documentation/design/proxies.md @@ -0,0 +1,74 @@ +## Proxies + +Adding peers in an etcd cluster adds network, CPU, and disk overhead to the leader since each one requires replication. +Peers primarily provide resiliency in the event of a leader failure but the benefit of more failover nodes decreases as the cluster size increases. +A lightweight alternative is the proxy. + +Proxies are a way for an etcd node to forward requests along to the cluster but the proxies are not part of the Raft cluster themselves. +This provides an easier API for local applications while reducing the overhead required by a regular peer node. +Proxies also act as standby nodes in the event that a peer node in the cluster has not recovered after a long duration. + + +## Configuration Parameters + +Proxies require two additional configuration parameters: active size & promotion delay. +The active size specifies a target size for the number of peers in the cluster. +If there are not enough peers to meet the active size then proxies are promoted to peers until the peer count is equal to the active size. +If there are more peers than the target active size then peers are demoted to proxies. + +The promotion delay specifies how long the cluster should wait before removing a dead peer and promoting a proxy. +By default this is 30 minutes. +If a peer is inactive for 30 minutes then the peer is removed and a live proxy is found to take its place. + + +## Logical Workflow + +Start a etcd machine and join the cluster: + +``` +If peer count less than active size: + If machine already exists as a proxy: + Remove machine from proxy list + Join as peer + +If peer count greater than or equal to active size: + Join as proxy +``` + +Remove an existing etcd machine from the cluster: + +``` +If machine exists in peer list: + Remove from peer list + +If machine exists in proxy list: + Remove from proxy list +``` + +Leader's active size monitor: + +``` +Loop: + Sleep 5 seconds + + If peer count less than active size: + If proxy count greater than zero: + Request a random proxy to rejoin + Goto Loop + + If peer count greater than active size: + Demote randomly selected peer + Goto Loop +``` + +Leader's peer activity monitor: + +``` +Loop: + Sleep 5 seconds + + For each peer: + If peer last activity time greater than promote delay: + Demote peer + Goto Loop +``` diff --git a/scripts/test-cluster b/scripts/test-cluster index f2c11fcf1..a0cd26935 100755 --- a/scripts/test-cluster +++ b/scripts/test-cluster @@ -31,7 +31,7 @@ done tmux new-window -t $SESSION:2 -n 'proxy' tmux split-window -h tmux select-pane -t 0 -tmux send-keys "curl -XPUT -H \"Content-Type: application/json\" -d '{\"activeSize\":3, \"promoteDelay\":30}' http://127.0.0.1:7001/config" C-m +tmux send-keys "curl -XPUT -H \"Content-Type: application/json\" -d '{\"activeSize\":3, \"promoteDelay\":30}' http://127.0.0.1:7001/v2/admin/config" C-m for i in 4 5 6; do tmux select-pane -t 0 diff --git a/server/demote_command.go b/server/demote_command.go index c5e9add1e..d13b4dc11 100644 --- a/server/demote_command.go +++ b/server/demote_command.go @@ -34,16 +34,31 @@ func (c *DemoteCommand) Apply(context raft.Context) (interface{}, error) { clientURL, _ := ps.registry.ClientURL(c.Name) peerURL, _ := ps.registry.PeerURL(c.Name) - // Perform a removal. - (&RemoveCommand{Name: c.Name}).Apply(context) + // Remove node from the shared registry. + err := ps.registry.UnregisterPeer(c.Name) + if err != nil { + log.Debugf("Demote peer %s: Error while unregistering (%v)", c.Name, err) + return nil, err + } + + // Delete from stats + delete(ps.followersStats.Followers, c.Name) + + // Remove peer in raft + err = context.Server().RemovePeer(c.Name) + if err != nil { + log.Debugf("Demote peer %s: (%v)", c.Name, err) + return nil, err + } // Register node as a proxy. ps.registry.RegisterProxy(c.Name, peerURL, clientURL) // Update mode if this change applies to this server. if c.Name == ps.Config.Name { - log.Infof("Set mode after demotion: %s", c.Name) - ps.setMode(ProxyMode) + log.Infof("Demote peer %s: Set mode to proxy with %s", c.Name, ps.server.Leader()) + ps.proxyPeerURL, _ = ps.registry.PeerURL(ps.server.Leader()) + go ps.setMode(ProxyMode) } return nil, nil diff --git a/server/join_command.go b/server/join_command.go index 44ffc23ac..37be0d132 100644 --- a/server/join_command.go +++ b/server/join_command.go @@ -1,27 +1,22 @@ package server import ( - "bytes" "encoding/binary" + "encoding/json" + etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/log" "github.com/coreos/etcd/third_party/github.com/coreos/raft" ) func init() { - raft.RegisterCommand(&JoinCommand{}) + raft.RegisterCommand(&JoinCommandV1{}) + raft.RegisterCommand(&JoinCommandV2{}) } -// The JoinCommand adds a node to the cluster. -// -// The command returns the join_index (Uvarint) and peer flag (peer=0, proxy=1) -// in following binary format: -// -// 8 bytes | 1 byte -// join_index | join_mode -// -// This binary protocol is for backward compatibility. -type JoinCommand struct { +// JoinCommandV1 represents a request to join the cluster. +// The command returns the join_index (Uvarint). +type JoinCommandV1 struct { MinVersion int `json:"minVersion"` MaxVersion int `json:"maxVersion"` Name string `json:"name"` @@ -29,50 +24,30 @@ type JoinCommand struct { EtcdURL string `json:"etcdURL"` } -func NewJoinCommand(minVersion int, maxVersion int, name, raftUrl, etcdUrl string) *JoinCommand { - return &JoinCommand{ - MinVersion: minVersion, - MaxVersion: maxVersion, - Name: name, - RaftURL: raftUrl, - EtcdURL: etcdUrl, - } -} - // The name of the join command in the log -func (c *JoinCommand) CommandName() string { +func (c *JoinCommandV1) CommandName() string { return "etcd:join" } // Join a server to the cluster -func (c *JoinCommand) Apply(context raft.Context) (interface{}, error) { +func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) { ps, _ := context.Server().Context().(*PeerServer) - var buf bytes.Buffer b := make([]byte, 8) - n := binary.PutUvarint(b, context.CommitIndex()) - buf.Write(b[:n]) + binary.PutUvarint(b, context.CommitIndex()) // Make sure we're not getting a cached value from the registry. ps.registry.Invalidate(c.Name) // Check if the join command is from a previous peer, who lost all its previous log. if _, ok := ps.registry.ClientURL(c.Name); ok { - binary.Write(&buf, binary.BigEndian, uint8(peerModeFlag)) // Mark as peer. - return buf.Bytes(), nil + return b, nil } // Check peer number in the cluster if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize { - log.Debug("Join as proxy ", c.Name) - ps.registry.RegisterProxy(c.Name, c.RaftURL, c.EtcdURL) - binary.Write(&buf, binary.BigEndian, uint8(proxyModeFlag)) // Mark as proxy. - return buf.Bytes(), nil - } - - // Remove it as a proxy if it is one. - if ps.registry.ProxyExists(c.Name) { - ps.registry.UnregisterProxy(c.Name) + log.Debug("Reject join request from ", c.Name) + return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMorePeer, "", context.CommitIndex()) } // Add to shared peer registry. @@ -87,10 +62,79 @@ func (c *JoinCommand) Apply(context raft.Context) (interface{}, error) { ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63 } - binary.Write(&buf, binary.BigEndian, uint8(peerModeFlag)) // Mark as peer. - return buf.Bytes(), err + return b, err } -func (c *JoinCommand) NodeName() string { +func (c *JoinCommandV1) NodeName() string { return c.Name } + +// JoinCommandV2 represents a request to join the cluster. +type JoinCommandV2 struct { + MinVersion int `json:"minVersion"` + MaxVersion int `json:"maxVersion"` + Name string `json:"name"` + PeerURL string `json:"peerURL"` + ClientURL string `json:"clientURL"` +} + +// CommandName returns the name of the command in the Raft log. +func (c *JoinCommandV2) CommandName() string { + return "etcd:v2:join" +} + +// Apply attempts to join a machine to the cluster. +func (c *JoinCommandV2) Apply(context raft.Context) (interface{}, error) { + ps, _ := context.Server().Context().(*PeerServer) + var msg = joinMessageV2{ + Mode: PeerMode, + CommitIndex: context.CommitIndex(), + } + + // Make sure we're not getting a cached value from the registry. + ps.registry.Invalidate(c.Name) + + // Check if the join command is from a previous peer, who lost all its previous log. + if _, ok := ps.registry.ClientURL(c.Name); ok { + return json.Marshal(msg) + } + + // Check peer number in the cluster. + if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize { + log.Debug("Join as proxy ", c.Name) + ps.registry.RegisterProxy(c.Name, c.PeerURL, c.ClientURL) + msg.Mode = ProxyMode + return json.Marshal(msg) + } + + // Remove it as a proxy if it is one. + if ps.registry.ProxyExists(c.Name) { + ps.registry.UnregisterProxy(c.Name) + } + + // Add to shared peer registry. + ps.registry.RegisterPeer(c.Name, c.PeerURL, c.ClientURL) + + // Add peer in raft + if err := context.Server().AddPeer(c.Name, ""); err != nil { + b, _ := json.Marshal(msg) + return b, err + } + + // Add peer stats + if c.Name != ps.RaftServer().Name() { + ps.followersStats.Followers[c.Name] = &raftFollowerStats{} + ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63 + } + + return json.Marshal(msg) +} + +func (c *JoinCommandV2) NodeName() string { + return c.Name +} + +type joinMessageV2 struct { + CommitIndex uint64 `json:"commitIndex"` + Mode Mode `json:"mode"` +} diff --git a/server/peer_server.go b/server/peer_server.go index 82e39defa..449ca52fc 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -1,12 +1,9 @@ package server import ( - "bufio" "bytes" - "encoding/binary" "encoding/json" "fmt" - "io" "io/ioutil" "math/rand" "net/http" @@ -313,10 +310,6 @@ func (s *PeerServer) HTTPHandler() http.Handler { router.HandleFunc("/join", s.JoinHttpHandler) router.HandleFunc("/promote", s.PromoteHttpHandler).Methods("POST") router.HandleFunc("/remove/{name:.+}", s.RemoveHttpHandler) - router.HandleFunc("/config", s.getClusterConfigHttpHandler).Methods("GET") - router.HandleFunc("/config", s.setClusterConfigHttpHandler).Methods("PUT") - router.HandleFunc("/machines", s.getMachinesHttpHandler).Methods("GET") - router.HandleFunc("/machines/{name}", s.getMachineHttpHandler).Methods("GET") router.HandleFunc("/vote", s.VoteHttpHandler) router.HandleFunc("/log", s.GetLogHttpHandler) router.HandleFunc("/log/append", s.AppendEntriesHttpHandler) @@ -324,6 +317,13 @@ func (s *PeerServer) HTTPHandler() http.Handler { router.HandleFunc("/snapshotRecovery", s.SnapshotRecoveryHttpHandler) router.HandleFunc("/etcdURL", s.EtcdURLHttpHandler) + router.HandleFunc("/v2/admin/config", s.getClusterConfigHttpHandler).Methods("GET") + router.HandleFunc("/v2/admin/config", s.setClusterConfigHttpHandler).Methods("PUT") + router.HandleFunc("/v2/admin/machines", s.getMachinesHttpHandler).Methods("GET") + router.HandleFunc("/v2/admin/machines/{name}", s.getMachineHttpHandler).Methods("GET") + router.HandleFunc("/v2/admin/machines/{name}", s.addMachineHttpHandler).Methods("PUT") + router.HandleFunc("/v2/admin/machines/{name}", s.removeMachineHttpHandler).Methods("DELETE") + return router } @@ -340,7 +340,14 @@ func (s *PeerServer) SetServer(server *Server) { func (s *PeerServer) startAsLeader() { // leader need to join self as a peer for { - _, err := s.raftServer.Do(NewJoinCommand(store.MinVersion(), store.MaxVersion(), s.raftServer.Name(), s.Config.URL, s.server.URL())) + c := &JoinCommandV1{ + MinVersion: store.MinVersion(), + MaxVersion: store.MaxVersion(), + Name: s.raftServer.Name(), + RaftURL: s.Config.URL, + EtcdURL: s.server.URL(), + } + _, err := s.raftServer.Do(c) if err == nil { break } @@ -429,8 +436,6 @@ func (s *PeerServer) joinCluster(cluster []string) bool { // Send join requests to peer. func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) error { - var b bytes.Buffer - // t must be ok t, _ := server.Transporter().(*transporter) @@ -444,14 +449,21 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) return fmt.Errorf("Unable to join: cluster version is %d; version compatibility is %d - %d", version, store.MinVersion(), store.MaxVersion()) } - json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.Config.URL, s.server.URL())) - - joinURL := url.URL{Host: peer, Scheme: scheme, Path: "/join"} + var b bytes.Buffer + c := &JoinCommandV2{ + MinVersion: store.MinVersion(), + MaxVersion: store.MaxVersion(), + Name: server.Name(), + PeerURL: s.Config.URL, + ClientURL: s.server.URL(), + } + json.NewEncoder(&b).Encode(c) + joinURL := url.URL{Host: peer, Scheme: scheme, Path: "/v2/admin/machines/" + server.Name()} log.Debugf("Send Join Request to %s", joinURL.String()) - resp, req, err := t.Post(joinURL.String(), &b) - + req, _ := http.NewRequest("PUT", joinURL.String(), &b) + resp, err := t.client.Do(req) for { if err != nil { return fmt.Errorf("Unable to join: %v", err) @@ -462,28 +474,17 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) t.CancelWhenTimeout(req) if resp.StatusCode == http.StatusOK { - r := bufio.NewReader(resp.Body) - s.joinIndex, _ = binary.ReadUvarint(r) - - // Determine whether the server joined as a proxy or peer. - var mode uint64 - if mode, err = binary.ReadUvarint(r); err == io.EOF { - mode = peerModeFlag - } else if err != nil { - log.Debugf("Error reading join mode: %v", err) + var msg joinMessageV2 + if err := json.NewDecoder(resp.Body).Decode(&msg); err != nil { + log.Debugf("Error reading join response: %v", err) return err } + s.joinIndex = msg.CommitIndex + s.setMode(msg.Mode) - switch mode { - case peerModeFlag: - s.setMode(PeerMode) - case proxyModeFlag: - s.setMode(ProxyMode) + if msg.Mode == ProxyMode { s.proxyClientURL = resp.Header.Get("X-Leader-Client-URL") s.proxyPeerURL = resp.Header.Get("X-Leader-Peer-URL") - default: - log.Debugf("Invalid join mode: %v", err) - return fmt.Errorf("Invalid join mode (%d): %v", mode, err) } return nil @@ -491,7 +492,14 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) if resp.StatusCode == http.StatusTemporaryRedirect { address := resp.Header.Get("Location") log.Debugf("Send Join Request to %s", address) - json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.Config.URL, s.server.URL())) + c := &JoinCommandV1{ + MinVersion: store.MinVersion(), + MaxVersion: store.MaxVersion(), + Name: server.Name(), + RaftURL: s.Config.URL, + EtcdURL: s.server.URL(), + } + json.NewEncoder(&b).Encode(c) resp, req, err = t.Post(address, &b) } else if resp.StatusCode == http.StatusBadRequest { diff --git a/server/peer_server_handlers.go b/server/peer_server_handlers.go index bde6f50fd..7932a1a83 100644 --- a/server/peer_server_handlers.go +++ b/server/peer_server_handlers.go @@ -150,16 +150,14 @@ func (ps *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Reques // Response to the join request func (ps *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) { - command := &JoinCommand{} - - err := uhttp.DecodeJsonRequest(req, command) - if err != nil { + command := &JoinCommandV1{} + if err := uhttp.DecodeJsonRequest(req, command); err != nil { w.WriteHeader(http.StatusInternalServerError) return } log.Debugf("Receive Join Request from %s", command.Name) - err = ps.server.Dispatch(command, w, req) + err := ps.server.Dispatch(command, w, req) // Return status. if err != nil { @@ -199,7 +197,7 @@ func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request } vars := mux.Vars(req) - command := &RemoveCommand{ + command := &RemoveCommandV1{ Name: vars["name"], } @@ -286,6 +284,33 @@ func (ps *PeerServer) getMachineMessage(name string) *machineMessage { return nil } +// Adds a machine to the cluster. +func (ps *PeerServer) addMachineHttpHandler(w http.ResponseWriter, req *http.Request) { + c := &JoinCommandV2{} + if err := uhttp.DecodeJsonRequest(req, c); err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + log.Debugf("Receive Join Request (v2) from %s", c.Name) + if err := ps.server.Dispatch(c, w, req); err != nil { + if etcdErr, ok := err.(*etcdErr.Error); ok { + log.Debug("Return error: ", (*etcdErr).Error()) + etcdErr.Write(w) + } else { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + } +} + +// Removes a machine from the cluster. +func (ps *PeerServer) removeMachineHttpHandler(w http.ResponseWriter, req *http.Request) { + vars := mux.Vars(req) + c := &RemoveCommandV2{Name: vars["name"]} + log.Debugf("[recv] Remove Request [%s]", c.Name) + ps.server.Dispatch(c, w, req) +} + // Response to the name request func (ps *PeerServer) NameHttpHandler(w http.ResponseWriter, req *http.Request) { log.Debugf("[recv] Get %s/name/ ", ps.Config.URL) diff --git a/server/registry.go b/server/registry.go index 9c5f02570..54fd884f7 100644 --- a/server/registry.go +++ b/server/registry.go @@ -65,13 +65,15 @@ func (r *Registry) Proxies() []string { // RegisterPeer adds a peer to the registry. func (r *Registry) RegisterPeer(name string, peerURL string, machURL string) error { - // TODO(benbjohnson): Disallow peers that are already proxies. - return r.register(RegistryPeerKey, name, peerURL, machURL) + if err := r.register(RegistryPeerKey, name, peerURL, machURL); err != nil { + return err + } + r.peers[name] = r.load(RegistryPeerKey, name) + return nil } // RegisterProxy adds a proxy to the registry. func (r *Registry) RegisterProxy(name string, peerURL string, machURL string) error { - // TODO(benbjohnson): Disallow proxies that are already peers. if err := r.register(RegistryProxyKey, name, peerURL, machURL); err != nil { return err } diff --git a/server/remove_command.go b/server/remove_command.go index 3019f9d9d..6f4ecea5a 100644 --- a/server/remove_command.go +++ b/server/remove_command.go @@ -2,6 +2,7 @@ package server import ( "encoding/binary" + "encoding/json" "os" "github.com/coreos/etcd/log" @@ -9,21 +10,22 @@ import ( ) func init() { - raft.RegisterCommand(&RemoveCommand{}) + raft.RegisterCommand(&RemoveCommandV1{}) + raft.RegisterCommand(&RemoveCommandV2{}) } -// The RemoveCommand removes a server from the cluster. -type RemoveCommand struct { +// The RemoveCommandV1 removes a server from the cluster. +type RemoveCommandV1 struct { Name string `json:"name"` } // The name of the remove command in the log -func (c *RemoveCommand) CommandName() string { +func (c *RemoveCommandV1) CommandName() string { return "etcd:remove" } // Remove a server from the cluster -func (c *RemoveCommand) Apply(context raft.Context) (interface{}, error) { +func (c *RemoveCommandV1) Apply(context raft.Context) (interface{}, error) { ps, _ := context.Server().Context().(*PeerServer) // If this is a proxy then remove it and exit. @@ -70,3 +72,65 @@ func (c *RemoveCommand) Apply(context raft.Context) (interface{}, error) { return b, err } + +// RemoveCommandV2 represents a command to remove a machine from the server. +type RemoveCommandV2 struct { + Name string `json:"name"` +} + +// CommandName returns the name of the command. +func (c *RemoveCommandV2) CommandName() string { + return "etcd:v2:remove" +} + +// Apply removes the given machine from the cluster. +func (c *RemoveCommandV2) Apply(context raft.Context) (interface{}, error) { + ps, _ := context.Server().Context().(*PeerServer) + ret, _ := json.Marshal(removeMessageV2{CommitIndex: context.CommitIndex()}) + + // If this is a proxy then remove it and exit. + if ps.registry.ProxyExists(c.Name) { + if err := ps.registry.UnregisterProxy(c.Name); err != nil { + return nil, err + } + return ret, nil + } + + // Remove node from the shared registry. + err := ps.registry.UnregisterPeer(c.Name) + + // Delete from stats + delete(ps.followersStats.Followers, c.Name) + + if err != nil { + log.Debugf("Error while unregistering: %s (%v)", c.Name, err) + return nil, err + } + + // Remove peer in raft + if err := context.Server().RemovePeer(c.Name); err != nil { + log.Debugf("Unable to remove peer: %s (%v)", c.Name, err) + return nil, err + } + + if c.Name == context.Server().Name() { + // the removed node is this node + + // if the node is not replaying the previous logs + // and the node has sent out a join request in this + // start. It is sure that this node received a new remove + // command and need to be removed + if context.CommitIndex() > ps.joinIndex && ps.joinIndex != 0 { + log.Debugf("server [%s] is removed", context.Server().Name()) + os.Exit(0) + } else { + // else ignore remove + log.Debugf("ignore previous remove command.") + } + } + return ret, nil +} + +type removeMessageV2 struct { + CommitIndex uint64 `json:"commitIndex"` +} diff --git a/server/server.go b/server/server.go index 51c0a17a0..83c133bad 100644 --- a/server/server.go +++ b/server/server.go @@ -262,7 +262,9 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque var url string switch c.(type) { - case *JoinCommand, *RemoveCommand: + case *JoinCommandV1, *RemoveCommandV1: + url, _ = ps.registry.PeerURL(leader) + case *JoinCommandV2, *RemoveCommandV2: url, _ = ps.registry.PeerURL(leader) default: url, _ = ps.registry.ClientURL(leader) diff --git a/tests/functional/cluster_config_test.go b/tests/functional/cluster_config_test.go index 8f8b667f0..c75ce1de4 100644 --- a/tests/functional/cluster_config_test.go +++ b/tests/functional/cluster_config_test.go @@ -16,12 +16,12 @@ func TestClusterConfig(t *testing.T) { assert.NoError(t, err) defer DestroyCluster(etcds) - resp, _ := tests.Put("http://localhost:7001/config", "application/json", bytes.NewBufferString(`{"activeSize":3, "promoteDelay":60}`)) + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3, "promoteDelay":60}`)) assert.Equal(t, resp.StatusCode, 200) time.Sleep(1 * time.Second) - resp, _ = tests.Get("http://localhost:7002/config") + resp, _ = tests.Get("http://localhost:7002/v2/admin/config") body := tests.ReadBodyJSON(resp) assert.Equal(t, resp.StatusCode, 200) assert.Equal(t, body["activeSize"], 3) diff --git a/tests/functional/proxy_test.go b/tests/functional/proxy_test.go index 6c9d52e0b..665862aad 100644 --- a/tests/functional/proxy_test.go +++ b/tests/functional/proxy_test.go @@ -51,7 +51,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, len(result.Node.Nodes), 1) // Reconfigure with larger active size (10 nodes) and wait for promotion. - resp, _ := tests.Put("http://localhost:7001/config", "application/json", bytes.NewBufferString(`{"activeSize":10, "promoteDelay":1800}`)) + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":10, "promoteDelay":1800}`)) if !assert.Equal(t, resp.StatusCode, 200) { t.FailNow() } @@ -64,7 +64,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, len(result.Node.Nodes), 0) // Reconfigure with a smaller active size (8 nodes). - resp, _ = tests.Put("http://localhost:7001/config", "application/json", bytes.NewBufferString(`{"activeSize":8, "promoteDelay":1800}`)) + resp, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":8, "promoteDelay":1800}`)) if !assert.Equal(t, resp.StatusCode, 200) { t.FailNow() } @@ -107,7 +107,7 @@ func TestProxyAutoPromote(t *testing.T) { assert.Equal(t, len(result.Node.Nodes), 1) // Reconfigure with a short promote delay (2 second). - resp, _ := tests.Put("http://localhost:7001/config", "application/json", bytes.NewBufferString(`{"activeSize":9, "promoteDelay":2}`)) + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":9, "promoteDelay":2}`)) if !assert.Equal(t, resp.StatusCode, 200) { t.FailNow() } diff --git a/tests/functional/remove_node_test.go b/tests/functional/remove_node_test.go index 273577007..5e22ba08e 100644 --- a/tests/functional/remove_node_test.go +++ b/tests/functional/remove_node_test.go @@ -25,7 +25,7 @@ func TestRemoveNode(t *testing.T) { c.SyncCluster() - rmReq, _ := http.NewRequest("DELETE", "http://127.0.0.1:7001/remove/node3", nil) + rmReq, _ := http.NewRequest("DELETE", "http://127.0.0.1:7001/v2/admin/machines/node3", nil) client := &http.Client{} for i := 0; i < 2; i++ { diff --git a/third_party/github.com/coreos/raft/server.go b/third_party/github.com/coreos/raft/server.go index 5fc3205d9..8cef1341a 100644 --- a/third_party/github.com/coreos/raft/server.go +++ b/third_party/github.com/coreos/raft/server.go @@ -477,6 +477,7 @@ func (s *server) Stop() { // make sure the server has stopped before we close the log <-s.stopped s.log.close() + s.state = Stopped } // Checks if the server is currently running.