diff --git a/server/demote_command.go b/server/demote_command.go deleted file mode 100644 index 56b4dd6b3..000000000 --- a/server/demote_command.go +++ /dev/null @@ -1,70 +0,0 @@ -package server - -import ( - "fmt" - - "github.com/coreos/etcd/log" - "github.com/coreos/etcd/third_party/github.com/goraft/raft" -) - -func init() { - raft.RegisterCommand(&DemoteCommand{}) -} - -// DemoteCommand represents a command to change a peer to a standby. -type DemoteCommand struct { - Name string `json:"name"` -} - -// CommandName returns the name of the command. -func (c *DemoteCommand) CommandName() string { - return "etcd:demote" -} - -// Apply executes the command. -func (c *DemoteCommand) Apply(context raft.Context) (interface{}, error) { - ps, _ := context.Server().Context().(*PeerServer) - - // Ignore this command if there is no peer. - if !ps.registry.PeerExists(c.Name) { - return nil, fmt.Errorf("peer does not exist: %s", c.Name) - } - - // Save URLs. - clientURL, _ := ps.registry.ClientURL(c.Name) - peerURL, _ := ps.registry.PeerURL(c.Name) - - // Remove node from the shared registry. - err := ps.registry.UnregisterPeer(c.Name) - if err != nil { - log.Debugf("Demote peer %s: Error while unregistering (%v)", c.Name, err) - return nil, err - } - - // Delete from stats - delete(ps.followersStats.Followers, c.Name) - - // Remove peer in raft - err = context.Server().RemovePeer(c.Name) - if err != nil { - log.Debugf("Demote peer %s: (%v)", c.Name, err) - return nil, err - } - - // Register node as a standby. - ps.registry.RegisterStandby(c.Name, peerURL, clientURL) - - // Update mode if this change applies to this server. - if c.Name == ps.Config.Name { - log.Infof("Demote peer %s: Set mode to standby with %s", c.Name, ps.server.Leader()) - ps.standbyPeerURL, _ = ps.registry.PeerURL(ps.server.Leader()) - go ps.setMode(StandbyMode) - } - - return nil, nil -} - -// NodeName returns the name of the affected node. 
-func (c *DemoteCommand) NodeName() string { - return c.Name -} diff --git a/server/join_command.go b/server/join_command.go index 4bd1b2148..25eb58927 100644 --- a/server/join_command.go +++ b/server/join_command.go @@ -2,7 +2,6 @@ package server import ( "encoding/binary" - "encoding/json" etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/log" @@ -10,13 +9,12 @@ import ( ) func init() { - raft.RegisterCommand(&JoinCommandV1{}) - raft.RegisterCommand(&JoinCommandV2{}) + raft.RegisterCommand(&JoinCommand{}) } -// JoinCommandV1 represents a request to join the cluster. +// JoinCommand represents a request to join the cluster. // The command returns the join_index (Uvarint). -type JoinCommandV1 struct { +type JoinCommand struct { MinVersion int `json:"minVersion"` MaxVersion int `json:"maxVersion"` Name string `json:"name"` @@ -25,27 +23,30 @@ type JoinCommandV1 struct { } // The name of the join command in the log -func (c *JoinCommandV1) CommandName() string { +func (c *JoinCommand) CommandName() string { return "etcd:join" } -func (c *JoinCommandV1) updatePeerURL(ps *PeerServer) error { - log.Debugf("Update peer URL of %v to %v", c.Name, c.RaftURL) - if err := ps.registry.UpdatePeerURL(c.Name, c.RaftURL); err != nil { - log.Debugf("Error while updating in registry: %s (%v)", c.Name, err) - return err +// Apply attempts to join a machine to the cluster. 
+func (c *JoinCommand) Apply(context raft.Context) (interface{}, error) { + index, err := applyJoin(c, context) + if err != nil { + return nil, err } - // Flush commit index, so raft will replay to here when restarted - ps.raftServer.FlushCommitIndex() - return nil -} - -// Join a server to the cluster -func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) { - ps, _ := context.Server().Context().(*PeerServer) b := make([]byte, 8) - binary.PutUvarint(b, context.CommitIndex()) + binary.PutUvarint(b, index) + return b, nil +} + +func (c *JoinCommand) NodeName() string { + return c.Name +} + +// applyJoin attempts to join a machine to the cluster. +func applyJoin(c *JoinCommand, context raft.Context) (uint64, error) { + ps, _ := context.Server().Context().(*PeerServer) + commitIndex := context.CommitIndex() // Make sure we're not getting a cached value from the registry. ps.registry.Invalidate(c.Name) @@ -56,11 +57,11 @@ func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) { // update its information. if peerURL != c.RaftURL { log.Infof("Rejoin with %v instead of %v from %v", c.RaftURL, peerURL, c.Name) - if err := c.updatePeerURL(ps); err != nil { - return []byte{0}, err + if err := updatePeerURL(c, ps); err != nil { + return 0, err } } - return b, nil + return commitIndex, nil } // Check if the join command adds an instance that collides with existing one on peer URL. 
@@ -68,21 +69,23 @@ func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) { for _, peerURL := range peerURLs { if peerURL == c.RaftURL { log.Warnf("%v tries to join the cluster with existing URL %v", c.Name, c.EtcdURL) - return []byte{0}, etcdErr.NewError(etcdErr.EcodeExistingPeerAddr, c.EtcdURL, context.CommitIndex()) + return 0, etcdErr.NewError(etcdErr.EcodeExistingPeerAddr, c.EtcdURL, context.CommitIndex()) } } // Check peer number in the cluster - if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize { + if ps.registry.Count() >= ps.ClusterConfig().ActiveSize { log.Debug("Reject join request from ", c.Name) - return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMorePeer, "", context.CommitIndex()) + return 0, etcdErr.NewError(etcdErr.EcodeNoMorePeer, "", context.CommitIndex()) } // Add to shared peer registry. - ps.registry.RegisterPeer(c.Name, c.RaftURL, c.EtcdURL) + ps.registry.Register(c.Name, c.RaftURL, c.EtcdURL) // Add peer in raft - err := context.Server().AddPeer(c.Name, "") + if err := context.Server().AddPeer(c.Name, ""); err != nil { + return 0, err + } // Add peer stats if c.Name != ps.RaftServer().Name() { @@ -90,30 +93,12 @@ func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) { ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63 } - return b, err + return commitIndex, nil } -func (c *JoinCommandV1) NodeName() string { - return c.Name -} - -// JoinCommandV2 represents a request to join the cluster. -type JoinCommandV2 struct { - MinVersion int `json:"minVersion"` - MaxVersion int `json:"maxVersion"` - Name string `json:"name"` - PeerURL string `json:"peerURL"` - ClientURL string `json:"clientURL"` -} - -// CommandName returns the name of the command in the Raft log. 
-func (c *JoinCommandV2) CommandName() string { - return "etcd:v2:join" -} - -func (c *JoinCommandV2) updatePeerURL(ps *PeerServer) error { - log.Debugf("Update peer URL of %v to %v", c.Name, c.PeerURL) - if err := ps.registry.UpdatePeerURL(c.Name, c.PeerURL); err != nil { +func updatePeerURL(c *JoinCommand, ps *PeerServer) error { + log.Debugf("Update peer URL of %v to %v", c.Name, c.RaftURL) + if err := ps.registry.UpdatePeerURL(c.Name, c.RaftURL); err != nil { log.Debugf("Error while updating in registry: %s (%v)", c.Name, err) return err } @@ -121,76 +106,3 @@ func (c *JoinCommandV2) updatePeerURL(ps *PeerServer) error { ps.raftServer.FlushCommitIndex() return nil } - -// Apply attempts to join a machine to the cluster. -func (c *JoinCommandV2) Apply(context raft.Context) (interface{}, error) { - ps, _ := context.Server().Context().(*PeerServer) - var msg = joinMessageV2{ - Mode: PeerMode, - CommitIndex: context.CommitIndex(), - } - - // Make sure we're not getting a cached value from the registry. - ps.registry.Invalidate(c.Name) - - // Check if the join command is from a previous peer, who lost all its previous log. - if peerURL, ok := ps.registry.PeerURL(c.Name); ok { - // If previous node restarts with different peer URL, - // update its information. - if peerURL != c.PeerURL { - log.Infof("Rejoin with %v instead of %v from %v", c.PeerURL, peerURL, c.Name) - if err := c.updatePeerURL(ps); err != nil { - return []byte{0}, err - } - } - return json.Marshal(msg) - } - - // Check if the join command adds an instance that collides with existing one on peer URL. - peerURLs := ps.registry.PeerURLs(ps.raftServer.Leader(), c.Name) - for _, peerURL := range peerURLs { - if peerURL == c.PeerURL { - log.Warnf("%v tries to join the cluster with existing URL %v", c.Name, c.PeerURL) - return []byte{0}, etcdErr.NewError(etcdErr.EcodeExistingPeerAddr, c.PeerURL, context.CommitIndex()) - } - } - - // Check peer number in the cluster. 
- if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize { - log.Debug("Join as standby ", c.Name) - ps.registry.RegisterStandby(c.Name, c.PeerURL, c.ClientURL) - msg.Mode = StandbyMode - return json.Marshal(msg) - } - - // Remove it as a standby if it is one. - if ps.registry.StandbyExists(c.Name) { - ps.registry.UnregisterStandby(c.Name) - } - - // Add to shared peer registry. - ps.registry.RegisterPeer(c.Name, c.PeerURL, c.ClientURL) - - // Add peer in raft - if err := context.Server().AddPeer(c.Name, ""); err != nil { - b, _ := json.Marshal(msg) - return b, err - } - - // Add peer stats - if c.Name != ps.RaftServer().Name() { - ps.followersStats.Followers[c.Name] = &raftFollowerStats{} - ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63 - } - - return json.Marshal(msg) -} - -func (c *JoinCommandV2) NodeName() string { - return c.Name -} - -type joinMessageV2 struct { - CommitIndex uint64 `json:"commitIndex"` - Mode Mode `json:"mode"` -} diff --git a/server/peer_server.go b/server/peer_server.go index bb096b2fd..9323e96fc 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -2,6 +2,7 @@ package server import ( "bytes" + "encoding/binary" "encoding/json" "fmt" "io/ioutil" @@ -31,8 +32,8 @@ const ( ThresholdMonitorTimeout = 5 * time.Second // ActiveMonitorTimeout is the time between checks on the active size of - // the cluster. If the active size is different than the actual size then - // etcd attempts to promote/demote to bring it to the correct number. + // the cluster. If the actual size is bigger than the active size then + // etcd attempts to remove a peer to bring it to the correct number.
ActiveMonitorTimeout = 1 * time.Second // PeerActivityMonitorTimeout is the time between checks for dead nodes in @@ -40,11 +41,6 @@ const ( PeerActivityMonitorTimeout = 1 * time.Second ) -const ( - peerModeFlag = 0 - standbyModeFlag = 1 -) - type PeerServerConfig struct { Name string Scheme string @@ -65,14 +61,10 @@ type PeerServer struct { registry *Registry store store.Store snapConf *snapshotConf - mode Mode closeChan chan bool timeoutThresholdChan chan interface{} - standbyPeerURL string - standbyClientURL string - metrics *metrics.Bucket sync.Mutex } @@ -128,29 +120,6 @@ func (s *PeerServer) SetRaftServer(raftServer raft.Server) { s.raftServer = raftServer } -// Mode retrieves the current mode of the server. -func (s *PeerServer) Mode() Mode { - return s.mode -} - -// SetMode updates the current mode of the server. -// Switching to a peer mode will start the Raft server. -// Switching to a standby mode will stop the Raft server. -func (s *PeerServer) setMode(mode Mode) { - s.mode = mode - - switch mode { - case PeerMode: - if !s.raftServer.Running() { - s.raftServer.Start() - } - case StandbyMode: - if s.raftServer.Running() { - s.raftServer.Stop() - } - } -} - // ClusterConfig retrieves the current cluster configuration. 
func (s *PeerServer) ClusterConfig() *ClusterConfig { return s.clusterConfig @@ -326,7 +295,6 @@ func (s *PeerServer) HTTPHandler() http.Handler { router.HandleFunc("/version/{version:[0-9]+}/check", s.VersionCheckHttpHandler) router.HandleFunc("/upgrade", s.UpgradeHttpHandler) router.HandleFunc("/join", s.JoinHttpHandler) - router.HandleFunc("/promote", s.PromoteHttpHandler).Methods("POST") router.HandleFunc("/remove/{name:.+}", s.RemoveHttpHandler) router.HandleFunc("/vote", s.VoteHttpHandler) router.HandleFunc("/log", s.GetLogHttpHandler) @@ -339,8 +307,6 @@ func (s *PeerServer) HTTPHandler() http.Handler { router.HandleFunc("/v2/admin/config", s.setClusterConfigHttpHandler).Methods("PUT") router.HandleFunc("/v2/admin/machines", s.getMachinesHttpHandler).Methods("GET") router.HandleFunc("/v2/admin/machines/{name}", s.getMachineHttpHandler).Methods("GET") - router.HandleFunc("/v2/admin/machines/{name}", s.addMachineHttpHandler).Methods("PUT") - router.HandleFunc("/v2/admin/machines/{name}", s.removeMachineHttpHandler).Methods("DELETE") return router } @@ -359,15 +325,14 @@ func (s *PeerServer) startAsLeader() { s.raftServer.Start() // leader need to join self as a peer for { - c := &JoinCommandV1{ + c := &JoinCommand{ MinVersion: store.MinVersion(), MaxVersion: store.MaxVersion(), Name: s.raftServer.Name(), RaftURL: s.Config.URL, EtcdURL: s.server.URL(), } - _, err := s.raftServer.Do(c) - if err == nil { + if _, err := s.raftServer.Do(c); err == nil { break } } @@ -548,16 +513,16 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) } var b bytes.Buffer - c := &JoinCommandV2{ + c := &JoinCommand{ MinVersion: store.MinVersion(), MaxVersion: store.MaxVersion(), Name: server.Name(), - PeerURL: s.Config.URL, - ClientURL: s.server.URL(), + RaftURL: s.Config.URL, + EtcdURL: s.server.URL(), } json.NewEncoder(&b).Encode(c) - joinURL := url.URL{Host: peer, Scheme: scheme, Path: "/v2/admin/machines/" + server.Name()} + joinURL := url.URL{Host: 
peer, Scheme: scheme, Path: "/join"} log.Infof("Send Join Request to %s", joinURL.String()) req, _ := http.NewRequest("PUT", joinURL.String(), &b) @@ -572,30 +537,19 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) log.Infof("»»»» %d", resp.StatusCode) if resp.StatusCode == http.StatusOK { - var msg joinMessageV2 - if err := json.NewDecoder(resp.Body).Decode(&msg); err != nil { - log.Debugf("Error reading join response: %v", err) - return err - } - s.joinIndex = msg.CommitIndex - s.setMode(msg.Mode) - - if msg.Mode == StandbyMode { - s.standbyClientURL = resp.Header.Get("X-Leader-Client-URL") - s.standbyPeerURL = resp.Header.Get("X-Leader-Peer-URL") - } - + b, _ := ioutil.ReadAll(resp.Body) + s.joinIndex, _ = binary.Uvarint(b) return nil } if resp.StatusCode == http.StatusTemporaryRedirect { address := resp.Header.Get("Location") log.Debugf("Send Join Request to %s", address) - c := &JoinCommandV2{ + c := &JoinCommand{ MinVersion: store.MinVersion(), MaxVersion: store.MaxVersion(), Name: server.Name(), - PeerURL: s.Config.URL, - ClientURL: s.server.URL(), + RaftURL: s.Config.URL, + EtcdURL: s.server.URL(), } json.NewEncoder(&b).Encode(c) resp, _, err = t.Put(address, &b) @@ -755,49 +709,21 @@ func (s *PeerServer) monitorActiveSize(closeChan chan bool) { // Retrieve target active size and actual active size. activeSize := s.ClusterConfig().ActiveSize - peerCount := s.registry.PeerCount() - standbys := s.registry.Standbys() - peers := s.registry.Peers() + peerCount := s.registry.Count() + peers := s.registry.Names() if index := sort.SearchStrings(peers, s.Config.Name); index < len(peers) && peers[index] == s.Config.Name { peers = append(peers[:index], peers[index+1:]...) } - // If we have more active nodes than we should then demote. + // If we have more active nodes than we should then remove. 
if peerCount > activeSize { peer := peers[rand.Intn(len(peers))] - log.Infof("%s: demoting: %v", s.Config.Name, peer) - if _, err := s.raftServer.Do(&DemoteCommand{Name: peer}); err != nil { - log.Infof("%s: warning: demotion error: %v", s.Config.Name, err) + log.Infof("%s: removing: %v", s.Config.Name, peer) + if _, err := s.raftServer.Do(&RemoveCommand{Name: peer}); err != nil { + log.Infof("%s: warning: remove error: %v", s.Config.Name, err) } continue } - - // If we don't have enough active nodes then try to promote a standby. - if peerCount < activeSize && len(standbys) > 0 { - loop: - for _, i := range rand.Perm(len(standbys)) { - standby := standbys[i] - standbyPeerURL, _ := s.registry.StandbyPeerURL(standby) - log.Infof("%s: attempting to promote: %v (%s)", s.Config.Name, standby, standbyPeerURL) - - // Notify standby to promote itself. - client := &http.Client{ - Transport: &http.Transport{ - DisableKeepAlives: false, - ResponseHeaderTimeout: ActiveMonitorTimeout, - }, - } - resp, err := client.Post(fmt.Sprintf("%s/promote", standbyPeerURL), "application/json", nil) - if err != nil { - log.Infof("%s: warning: promotion error: %v", s.Config.Name, err) - continue - } else if resp.StatusCode != http.StatusOK { - log.Infof("%s: warning: promotion failure: %v", s.Config.Name, resp.StatusCode) - continue - } - break loop - } - } } } @@ -823,8 +749,8 @@ func (s *PeerServer) monitorPeerActivity(closeChan chan bool) { // If the last response from the peer is longer than the promote delay // then automatically demote the peer. 
if !peer.LastActivity().IsZero() && now.Sub(peer.LastActivity()) > promoteDelay { - log.Infof("%s: demoting node: %v; last activity %v ago", s.Config.Name, peer.Name, now.Sub(peer.LastActivity())) - if _, err := s.raftServer.Do(&DemoteCommand{Name: peer.Name}); err != nil { + log.Infof("%s: removing node: %v; last activity %v ago", s.Config.Name, peer.Name, now.Sub(peer.LastActivity())) + if _, err := s.raftServer.Do(&RemoveCommand{Name: peer.Name}); err != nil { log.Infof("%s: warning: autodemotion error: %v", s.Config.Name, err) } continue @@ -832,15 +758,3 @@ func (s *PeerServer) monitorPeerActivity(closeChan chan bool) { } } } - -// Mode represents whether the server is an active peer or if the server is -// simply acting as a standby. -type Mode string - -const ( - // PeerMode is when the server is an active node in Raft. - PeerMode = Mode("peer") - - // StandbyMode is when the server is an inactive, request-forwarding node. - StandbyMode = Mode("standby") -) diff --git a/server/peer_server_handlers.go b/server/peer_server_handlers.go index a207879ce..a8219c7a8 100644 --- a/server/peer_server_handlers.go +++ b/server/peer_server_handlers.go @@ -3,7 +3,6 @@ package server import ( "encoding/json" "net/http" - "net/url" "strconv" "time" @@ -150,7 +149,7 @@ func (ps *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Reques // Response to the join request func (ps *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) { - command := &JoinCommandV1{} + command := &JoinCommand{} if err := uhttp.DecodeJsonRequest(req, command); err != nil { w.WriteHeader(http.StatusInternalServerError) return @@ -170,25 +169,6 @@ func (ps *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) } } -// Attempt to rejoin the cluster as a peer. 
-func (ps *PeerServer) PromoteHttpHandler(w http.ResponseWriter, req *http.Request) { - log.Infof("%s attempting to promote in cluster: %s", ps.Config.Name, ps.standbyPeerURL) - url, err := url.Parse(ps.standbyPeerURL) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - - err = ps.joinByPeer(ps.raftServer, url.Host, ps.Config.Scheme) - if err != nil { - log.Infof("%s error while promoting: %v", ps.Config.Name, err) - w.WriteHeader(http.StatusInternalServerError) - return - } - log.Infof("%s promoted in the cluster", ps.Config.Name) - w.WriteHeader(http.StatusOK) -} - // Response to remove request func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request) { if req.Method != "DELETE" { @@ -197,7 +177,7 @@ func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request } vars := mux.Vars(req) - command := &RemoveCommandV1{ + command := &RemoveCommand{ Name: vars["name"], } @@ -243,10 +223,7 @@ func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *ht // Retrieves a list of peers and standbys. 
func (ps *PeerServer) getMachinesHttpHandler(w http.ResponseWriter, req *http.Request) { machines := make([]*machineMessage, 0) - for _, name := range ps.registry.Peers() { - machines = append(machines, ps.getMachineMessage(name)) - } - for _, name := range ps.registry.Standbys() { + for _, name := range ps.registry.Names() { machines = append(machines, ps.getMachineMessage(name)) } json.NewEncoder(w).Encode(&machines) @@ -259,56 +236,17 @@ func (ps *PeerServer) getMachineHttpHandler(w http.ResponseWriter, req *http.Req } func (ps *PeerServer) getMachineMessage(name string) *machineMessage { - if ps.registry.PeerExists(name) { - clientURL, _ := ps.registry.ClientURL(name) - peerURL, _ := ps.registry.PeerURL(name) - return &machineMessage{ - Name: name, - Mode: PeerMode, - ClientURL: clientURL, - PeerURL: peerURL, - } + if !ps.registry.Exists(name) { + return nil } - if ps.registry.StandbyExists(name) { - clientURL, _ := ps.registry.StandbyClientURL(name) - peerURL, _ := ps.registry.StandbyPeerURL(name) - return &machineMessage{ - Name: name, - Mode: StandbyMode, - ClientURL: clientURL, - PeerURL: peerURL, - } + clientURL, _ := ps.registry.ClientURL(name) + peerURL, _ := ps.registry.PeerURL(name) + return &machineMessage{ + Name: name, + ClientURL: clientURL, + PeerURL: peerURL, } - - return nil -} - -// Adds a machine to the cluster. -func (ps *PeerServer) addMachineHttpHandler(w http.ResponseWriter, req *http.Request) { - c := &JoinCommandV2{} - if err := uhttp.DecodeJsonRequest(req, c); err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - - log.Debugf("Receive Join Request (v2) from %s", c.Name) - if err := ps.server.Dispatch(c, w, req); err != nil { - if etcdErr, ok := err.(*etcdErr.Error); ok { - log.Debug("Return error: ", (*etcdErr).Error()) - etcdErr.Write(w) - } else { - http.Error(w, err.Error(), http.StatusInternalServerError) - } - } -} - -// Removes a machine from the cluster. 
-func (ps *PeerServer) removeMachineHttpHandler(w http.ResponseWriter, req *http.Request) { - vars := mux.Vars(req) - c := &RemoveCommandV2{Name: vars["name"]} - log.Debugf("[recv] Remove Request [%s]", c.Name) - ps.server.Dispatch(c, w, req) } // Response to the name request @@ -360,7 +298,6 @@ func (ps *PeerServer) UpgradeHttpHandler(w http.ResponseWriter, req *http.Reques // machineMessage represents information about a peer or standby in the registry. type machineMessage struct { Name string `json:"name"` - Mode Mode `json:"mode"` ClientURL string `json:"clientURL"` PeerURL string `json:"peerURL"` } diff --git a/server/registry.go b/server/registry.go index 813ddef77..459205afe 100644 --- a/server/registry.go +++ b/server/registry.go @@ -14,17 +14,13 @@ import ( ) // The location of the peer URL data. -const RegistryPeerKey = "/_etcd/machines" - -// The location of the standby URL data. -const RegistryStandbyKey = "/_etcd/standbys" +const RegistryKey = "/_etcd/machines" // The Registry stores URL information for nodes. type Registry struct { sync.Mutex - store store.Store - peers map[string]*node - standbys map[string]*node + store store.Store + peers map[string]*node } // The internal storage format of the registry. @@ -37,14 +33,13 @@ type node struct { // Creates a new Registry. func NewRegistry(s store.Store) *Registry { return &Registry{ - store: s, - peers: make(map[string]*node), - standbys: make(map[string]*node), + store: s, + peers: make(map[string]*node), } } -// Peers returns a list of cached peer names. -func (r *Registry) Peers() []string { +// Names returns a list of cached peer names. +func (r *Registry) Names() []string { r.Lock() defer r.Unlock() @@ -56,120 +51,43 @@ func (r *Registry) Peers() []string { return names } -// Standbys returns a list of cached standby names. 
-func (r *Registry) Standbys() []string { - r.Lock() - defer r.Unlock() - - names := make([]string, 0, len(r.standbys)) - for name := range r.standbys { - names = append(names, name) - } - sort.Sort(sort.StringSlice(names)) - return names -} - -// RegisterPeer adds a peer to the registry. -func (r *Registry) RegisterPeer(name string, peerURL string, machURL string) error { - if err := r.register(RegistryPeerKey, name, peerURL, machURL); err != nil { - return err - } - - r.Lock() - defer r.Unlock() - r.peers[name] = r.load(RegistryPeerKey, name) - return nil -} - -// RegisterStandby adds a standby to the registry. -func (r *Registry) RegisterStandby(name string, peerURL string, machURL string) error { - if err := r.register(RegistryStandbyKey, name, peerURL, machURL); err != nil { - return err - } - - r.Lock() - defer r.Unlock() - r.standbys[name] = r.load(RegistryStandbyKey, name) - return nil -} - -func (r *Registry) register(key, name string, peerURL string, machURL string) error { +// Register adds a peer to the registry. +func (r *Registry) Register(name string, peerURL string, machURL string) error { // Write data to store. v := url.Values{} v.Set("raft", peerURL) v.Set("etcd", machURL) - _, err := r.store.Create(path.Join(key, name), false, v.Encode(), false, store.Permanent) log.Debugf("Register: %s", name) - return err -} + if _, err := r.store.Create(path.Join(RegistryKey, name), false, v.Encode(), false, store.Permanent); err != nil { + return err + } -// UpdatePeerURL updates peer URL in registry -func (r *Registry) UpdatePeerURL(name string, peerURL string) error { r.Lock() defer r.Unlock() - - machURL, _ := r.clientURL(RegistryPeerKey, name) - // Write data to store. - key := path.Join(RegistryPeerKey, name) - v := url.Values{} - v.Set("raft", peerURL) - v.Set("etcd", machURL) - _, err := r.store.Update(key, v.Encode(), store.Permanent) - - // Invalidate outdated cache. 
- r.invalidate(name) - log.Debugf("Update PeerURL: %s", name) - return err + r.peers[name] = r.load(RegistryKey, name) + return nil } -// UnregisterPeer removes a peer from the registry. -func (r *Registry) UnregisterPeer(name string) error { - return r.unregister(RegistryPeerKey, name) -} - -// UnregisterStandby removes a standby from the registry. -func (r *Registry) UnregisterStandby(name string) error { - return r.unregister(RegistryStandbyKey, name) -} - -func (r *Registry) unregister(key, name string) error { +// Unregister removes a peer from the registry. +func (r *Registry) Unregister(name string) error { // Remove the key from the store. - _, err := r.store.Delete(path.Join(key, name), false, false) log.Debugf("Unregister: %s", name) + _, err := r.store.Delete(path.Join(RegistryKey, name), false, false) return err } -// PeerCount returns the number of peers in the cluster. -func (r *Registry) PeerCount() int { - return r.count(RegistryPeerKey) -} - -// StandbyCount returns the number of standbys in the cluster. -func (r *Registry) StandbyCount() int { - return r.count(RegistryStandbyKey) -} - -// Returns the number of nodes in the cluster. -func (r *Registry) count(key string) int { - e, err := r.store.Get(key, false, false) +// Count returns the number of peers in the cluster. +func (r *Registry) Count() int { + e, err := r.store.Get(RegistryKey, false, false) if err != nil { return 0 } return len(e.Node.Nodes) } -// PeerExists checks if a peer with the given name exists. -func (r *Registry) PeerExists(name string) bool { - return r.exists(RegistryPeerKey, name) -} - -// StandbyExists checks if a standby with the given name exists. -func (r *Registry) StandbyExists(name string) bool { - return r.exists(RegistryStandbyKey, name) -} - -func (r *Registry) exists(key, name string) bool { - e, err := r.store.Get(path.Join(key, name), false, false) +// Exists checks if a peer with the given name exists. 
+func (r *Registry) Exists(name string) bool { + e, err := r.store.Get(path.Join(RegistryKey, name), false, false) if err != nil { return false } @@ -180,18 +98,18 @@ func (r *Registry) exists(key, name string) bool { func (r *Registry) ClientURL(name string) (string, bool) { r.Lock() defer r.Unlock() - return r.clientURL(RegistryPeerKey, name) + return r.clientURL(RegistryKey, name) } func (r *Registry) clientURL(key, name string) (string, bool) { if r.peers[name] == nil { - if node := r.load(key, name); node != nil { - r.peers[name] = node + if peer := r.load(key, name); peer != nil { + r.peers[name] = peer } } - if node := r.peers[name]; node != nil { - return node.url, true + if peer := r.peers[name]; peer != nil { + return peer.url, true } return "", false @@ -213,69 +131,50 @@ func (r *Registry) PeerHost(name string) (string, bool) { func (r *Registry) PeerURL(name string) (string, bool) { r.Lock() defer r.Unlock() - return r.peerURL(RegistryPeerKey, name) + return r.peerURL(RegistryKey, name) } func (r *Registry) peerURL(key, name string) (string, bool) { if r.peers[name] == nil { - if node := r.load(key, name); node != nil { - r.peers[name] = node + if peer := r.load(key, name); peer != nil { + r.peers[name] = peer } } - if node := r.peers[name]; node != nil { - return node.peerURL, true + if peer := r.peers[name]; peer != nil { + return peer.peerURL, true } return "", false } -// Retrieves the client URL for a given standby by name. -func (r *Registry) StandbyClientURL(name string) (string, bool) { +// UpdatePeerURL updates peer URL in registry +func (r *Registry) UpdatePeerURL(name string, peerURL string) error { + machURL, _ := r.clientURL(RegistryKey, name) + // Write data to store. 
+ v := url.Values{} + v.Set("raft", peerURL) + v.Set("etcd", machURL) + log.Debugf("Update PeerURL: %s", name) + if _, err := r.store.Update(path.Join(RegistryKey, name), v.Encode(), store.Permanent); err != nil { + return err + } + r.Lock() defer r.Unlock() - return r.standbyClientURL(RegistryStandbyKey, name) -} - -func (r *Registry) standbyClientURL(key, name string) (string, bool) { - if r.standbys[name] == nil { - if node := r.load(key, name); node != nil { - r.standbys[name] = node - } - } - if node := r.standbys[name]; node != nil { - return node.url, true - } - return "", false -} - -// Retrieves the peer URL for a given standby by name. -func (r *Registry) StandbyPeerURL(name string) (string, bool) { - r.Lock() - defer r.Unlock() - return r.standbyPeerURL(RegistryStandbyKey, name) -} - -func (r *Registry) standbyPeerURL(key, name string) (string, bool) { - if r.standbys[name] == nil { - if node := r.load(key, name); node != nil { - r.standbys[name] = node - } - } - if node := r.standbys[name]; node != nil { - return node.peerURL, true - } - return "", false + // Invalidate outdated cache. + r.invalidate(name) + return nil } // Retrieves the Client URLs for all nodes. func (r *Registry) ClientURLs(leaderName, selfName string) []string { - return r.urls(RegistryPeerKey, leaderName, selfName, r.clientURL) + return r.urls(RegistryKey, leaderName, selfName, r.clientURL) } // Retrieves the Peer URLs for all nodes. func (r *Registry) PeerURLs(leaderName, selfName string) []string { - return r.urls(RegistryPeerKey, leaderName, selfName, r.peerURL) + return r.urls(RegistryKey, leaderName, selfName, r.peerURL) } // Retrieves the URLs for all nodes using url function. @@ -313,7 +212,6 @@ func (r *Registry) Invalidate(name string) { func (r *Registry) invalidate(name string) { delete(r.peers, name) - delete(r.standbys, name) } // Loads the given node by name from the store into the cache. 
diff --git a/server/remove_command.go b/server/remove_command.go index 521919acb..2663fbf48 100644 --- a/server/remove_command.go +++ b/server/remove_command.go @@ -2,7 +2,6 @@ package server import ( "encoding/binary" - "encoding/json" "os" "github.com/coreos/etcd/log" @@ -10,107 +9,51 @@ import ( ) func init() { - raft.RegisterCommand(&RemoveCommandV1{}) - raft.RegisterCommand(&RemoveCommandV2{}) + raft.RegisterCommand(&RemoveCommand{}) } -// The RemoveCommandV1 removes a server from the cluster. -type RemoveCommandV1 struct { +// The RemoveCommand removes a server from the cluster. +type RemoveCommand struct { Name string `json:"name"` } // The name of the remove command in the log -func (c *RemoveCommandV1) CommandName() string { +func (c *RemoveCommand) CommandName() string { return "etcd:remove" } // Remove a server from the cluster -func (c *RemoveCommandV1) Apply(context raft.Context) (interface{}, error) { - ps, _ := context.Server().Context().(*PeerServer) - - // If this is a standby then remove it and exit. - if ps.registry.StandbyExists(c.Name) { - return []byte{0}, ps.registry.UnregisterStandby(c.Name) - } - - // Remove node from the shared registry. - err := ps.registry.UnregisterPeer(c.Name) - - // Delete from stats - delete(ps.followersStats.Followers, c.Name) - +func (c *RemoveCommand) Apply(context raft.Context) (interface{}, error) { + index, err := applyRemove(c, context) if err != nil { - log.Debugf("Error while unregistering: %s (%v)", c.Name, err) - return []byte{0}, err - } - - // Remove peer in raft - err = context.Server().RemovePeer(c.Name) - if err != nil { - log.Debugf("Unable to remove peer: %s (%v)", c.Name, err) - return []byte{0}, err - } - - if c.Name == context.Server().Name() { - // the removed node is this node - - // if the node is not replaying the previous logs - // and the node has sent out a join request in this - // start. 
It is sure that this node received a new remove - // command and need to be removed - if context.CommitIndex() > ps.joinIndex && ps.joinIndex != 0 { - log.Debugf("server [%s] is removed", context.Server().Name()) - os.Exit(0) - } else { - // else ignore remove - log.Debugf("ignore previous remove command.") - } + return nil, err } b := make([]byte, 8) - binary.PutUvarint(b, context.CommitIndex()) - - return b, err + binary.PutUvarint(b, index) + return b, nil } -// RemoveCommandV2 represents a command to remove a machine from the server. -type RemoveCommandV2 struct { - Name string `json:"name"` -} - -// CommandName returns the name of the command. -func (c *RemoveCommandV2) CommandName() string { - return "etcd:v2:remove" -} - -// Apply removes the given machine from the cluster. -func (c *RemoveCommandV2) Apply(context raft.Context) (interface{}, error) { +// applyRemove removes the given machine from the cluster. +func applyRemove(c *RemoveCommand, context raft.Context) (uint64, error) { ps, _ := context.Server().Context().(*PeerServer) - ret, _ := json.Marshal(removeMessageV2{CommitIndex: context.CommitIndex()}) - - // If this is a standby then remove it and exit. - if ps.registry.StandbyExists(c.Name) { - if err := ps.registry.UnregisterStandby(c.Name); err != nil { - return nil, err - } - return ret, nil - } + commitIndex := context.CommitIndex() // Remove node from the shared registry. 
- err := ps.registry.UnregisterPeer(c.Name) + err := ps.registry.Unregister(c.Name) // Delete from stats delete(ps.followersStats.Followers, c.Name) if err != nil { log.Debugf("Error while unregistering: %s (%v)", c.Name, err) - return nil, err + return 0, err } // Remove peer in raft if err := context.Server().RemovePeer(c.Name); err != nil { log.Debugf("Unable to remove peer: %s (%v)", c.Name, err) - return nil, err + return 0, err } if c.Name == context.Server().Name() { @@ -128,9 +71,5 @@ func (c *RemoveCommandV2) Apply(context raft.Context) (interface{}, error) { log.Debugf("ignore previous remove command.") } } - return ret, nil -} - -type removeMessageV2 struct { - CommitIndex uint64 `json:"commitIndex"` + return commitIndex, nil } diff --git a/server/server.go b/server/server.go index 946a15f67..653420349 100644 --- a/server/server.go +++ b/server/server.go @@ -176,17 +176,6 @@ func (s *Server) handleFunc(r *mux.Router, path string, f func(http.ResponseWrit // Log request. log.Debugf("[recv] %s %s %s [%s]", req.Method, s.URL(), req.URL.Path, req.RemoteAddr) - // Forward request along if the server is a standby. - if s.peerServer.Mode() == StandbyMode { - if s.peerServer.standbyClientURL == "" { - w.Header().Set("Content-Type", "application/json") - etcdErr.NewError(402, "", 0).Write(w) - return - } - uhttp.Redirect(s.peerServer.standbyClientURL, w, req) - return - } - // Execute handler function and return error if necessary. 
if err := f(w, req); err != nil { if etcdErr, ok := err.(*etcdErr.Error); ok { @@ -231,9 +220,6 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque return etcdErr.NewError(300, "Empty result from raft", s.Store().Index()) } - w.Header().Set("X-Leader-Client-URL", s.url) - w.Header().Set("X-Leader-Peer-URL", ps.Config.URL) - // response for raft related commands[join/remove] if b, ok := result.([]byte); ok { w.WriteHeader(http.StatusOK) @@ -276,8 +262,7 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque var url string switch c.(type) { - case *JoinCommandV1, *RemoveCommandV1, - *JoinCommandV2, *RemoveCommandV2, + case *JoinCommand, *RemoveCommand, *SetClusterConfigCommand: url, _ = ps.registry.PeerURL(leader) default: diff --git a/server/v2/get_handler.go b/server/v2/get_handler.go index fe42f2c23..cb87216e4 100644 --- a/server/v2/get_handler.go +++ b/server/v2/get_handler.go @@ -132,11 +132,5 @@ func writeHeaders(w http.ResponseWriter, s Server) { w.Header().Add("X-Etcd-Index", fmt.Sprint(s.Store().Index())) w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex())) w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term())) - if url, ok := s.ClientURL(s.Leader()); ok { - w.Header().Set("X-Leader-Client-URL", url) - } - if url, ok := s.PeerURL(s.Leader()); ok { - w.Header().Set("X-Leader-Peer-URL", url) - } w.WriteHeader(http.StatusOK) } diff --git a/tests/functional/remove_node_test.go b/tests/functional/remove_node_test.go index 5e22ba08e..273577007 100644 --- a/tests/functional/remove_node_test.go +++ b/tests/functional/remove_node_test.go @@ -25,7 +25,7 @@ func TestRemoveNode(t *testing.T) { c.SyncCluster() - rmReq, _ := http.NewRequest("DELETE", "http://127.0.0.1:7001/v2/admin/machines/node3", nil) + rmReq, _ := http.NewRequest("DELETE", "http://127.0.0.1:7001/remove/node3", nil) client := &http.Client{} for i := 0; i < 2; i++ { diff --git a/tests/functional/proxy_test.go 
b/tests/functional/standby_test.go similarity index 98% rename from tests/functional/proxy_test.go rename to tests/functional/standby_test.go index 7d0587c76..bba29b7be 100644 --- a/tests/functional/proxy_test.go +++ b/tests/functional/standby_test.go @@ -15,6 +15,8 @@ import ( // Create a full cluster and then add extra an extra standby node. func TestStandby(t *testing.T) { + t.Skip("functionality unimplemented") + clusterSize := 10 // DefaultActiveSize + 1 _, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false) assert.NoError(t, err) @@ -85,6 +87,8 @@ func TestStandby(t *testing.T) { // Create a full cluster, disconnect a peer, wait for autodemotion, wait for autopromotion. func TestStandbyAutoPromote(t *testing.T) { + t.Skip("functionality unimplemented") + clusterSize := 10 // DefaultActiveSize + 1 _, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false) if err != nil {