From 1d961b8e56c939dfb4c5b27fa19775a5649f190a Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Tue, 18 Feb 2014 13:29:18 -0700 Subject: [PATCH] Add proxy mode. --- error/error.go | 7 ++ server/cluster_config.go | 33 +++++ server/join_command.go | 22 ++-- server/peer_server.go | 134 +++++++++++++++++++- server/peer_server_handlers.go | 17 +++ server/promote_command.go | 64 ++++++++++ server/registry.go | 180 +++++++++++++++++++++------ server/remove_command.go | 7 +- server/server.go | 49 +++++--- server/set_cluster_config_command.go | 25 ++++ server/v2/get_handler.go | 6 + tests/functional/proxy_test.go | 45 +++++++ tests/functional/util.go | 2 +- 13 files changed, 522 insertions(+), 69 deletions(-) create mode 100644 server/cluster_config.go create mode 100644 server/promote_command.go create mode 100644 server/set_cluster_config_command.go create mode 100644 tests/functional/proxy_test.go diff --git a/error/error.go b/error/error.go index cc86c9117..447a5f7c3 100644 --- a/error/error.go +++ b/error/error.go @@ -51,6 +51,10 @@ const ( EcodeWatcherCleared = 400 EcodeEventIndexCleared = 401 + EcodeProxyInternal = 402 + EcodeInvalidActiveSize = 403 + EcodeInvalidPromoteDelay = 404 + EcodePromoteError = 405 ) func init() { @@ -86,6 +90,9 @@ func init() { // etcd related errors errors[EcodeWatcherCleared] = "watcher is cleared due to etcd recovery" errors[EcodeEventIndexCleared] = "The event in requested index is outdated and cleared" + errors[EcodeProxyInternal] = "Proxy Internal Error" + errors[EcodeInvalidActiveSize] = "Invalid active size" + errors[EcodeInvalidPromoteDelay] = "Proxy promote delay" } diff --git a/server/cluster_config.go b/server/cluster_config.go new file mode 100644 index 000000000..807cfa7dd --- /dev/null +++ b/server/cluster_config.go @@ -0,0 +1,33 @@ +package server + +import ( + "time" +) + +const ( + // DefaultActiveSize is the default number of active followers allowed. + DefaultActiveSize = 9 + + // DefaultPromoteDelay is the default elapsed time before promotion. + DefaultPromoteDelay = int((30 * time.Minute) / time.Second) +) + +// ClusterConfig represents cluster-wide configuration settings. +// These settings can only be changed through Raft. +type ClusterConfig struct { + // ActiveSize is the maximum number of node that can join as Raft followers. + // Nodes that join the cluster after the limit is reached are proxies. + ActiveSize int `json:"activeSize"` + + // PromoteDelay is the amount of time, in seconds, after a node is + // unreachable that it will be swapped out for a proxy node, if available. + PromoteDelay int `json:"PromoteDelay"` +} + +// NewClusterConfig returns a cluster configuration with default settings. +func NewClusterConfig() *ClusterConfig { + return &ClusterConfig{ + ActiveSize: DefaultActiveSize, + PromoteDelay: DefaultPromoteDelay, + } +} diff --git a/server/join_command.go b/server/join_command.go index de65db9eb..567e6e182 100644 --- a/server/join_command.go +++ b/server/join_command.go @@ -1,9 +1,9 @@ package server import ( + "bytes" "encoding/binary" - etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/log" "github.com/coreos/etcd/third_party/github.com/coreos/raft" ) @@ -40,25 +40,30 @@ func (c *JoinCommand) CommandName() string { func (c *JoinCommand) Apply(context raft.Context) (interface{}, error) { ps, _ := context.Server().Context().(*PeerServer) + var buf bytes.Buffer b := make([]byte, 8) - binary.PutUvarint(b, context.CommitIndex()) + n := binary.PutUvarint(b, context.CommitIndex()) + buf.Write(b[:n]) // Make sure we're not getting a cached value from the registry. ps.registry.Invalidate(c.Name) // Check if the join command is from a previous peer, who lost all its previous log. if _, ok := ps.registry.ClientURL(c.Name); ok { - return b, nil + binary.Write(&buf, binary.BigEndian, uint8(0)) // Mark as peer. + return buf.Bytes(), nil } // Check peer number in the cluster - if ps.registry.Count() == ps.Config.MaxClusterSize { - log.Debug("Reject join request from ", c.Name) - return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMorePeer, "", context.CommitIndex()) + if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize { + log.Debug("Join as proxy ", c.Name) + ps.registry.RegisterProxy(c.Name, c.RaftURL, c.EtcdURL) + binary.Write(&buf, binary.BigEndian, uint8(1)) // Mark as proxy. + return buf.Bytes(), nil } // Add to shared peer registry. - ps.registry.Register(c.Name, c.RaftURL, c.EtcdURL) + ps.registry.RegisterPeer(c.Name, c.RaftURL, c.EtcdURL) // Add peer in raft err := context.Server().AddPeer(c.Name, "") @@ -69,7 +74,8 @@ func (c *JoinCommand) Apply(context raft.Context) (interface{}, error) { ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63 } - return b, err + binary.Write(&buf, binary.BigEndian, uint8(0)) // Mark as peer. + return buf.Bytes(), err } func (c *JoinCommand) NodeName() string { diff --git a/server/peer_server.go b/server/peer_server.go index 422733c3b..c0783e9d5 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -2,12 +2,16 @@ package server import ( "bytes" + "bufio" "encoding/binary" "encoding/json" "fmt" + "io" "io/ioutil" + "math/rand" "net/http" "net/url" + "sort" "strconv" "time" @@ -22,19 +26,20 @@ import ( ) const ThresholdMonitorTimeout = 5 * time.Second +const ActiveMonitorTimeout = 5 * time.Second type PeerServerConfig struct { Name string Scheme string URL string SnapshotCount int - MaxClusterSize int RetryTimes int RetryInterval float64 } type PeerServer struct { Config PeerServerConfig + clusterConfig *ClusterConfig raftServer raft.Server server *Server joinIndex uint64 @@ -43,10 +48,14 @@ type PeerServer struct { registry *Registry store store.Store snapConf *snapshotConf + mode Mode closeChan chan bool timeoutThresholdChan chan interface{} + proxyPeerURL string + proxyClientURL string + metrics *metrics.Bucket } @@ -66,6 +75,7 @@ type snapshotConf struct { func NewPeerServer(psConfig PeerServerConfig, registry *Registry, store store.Store, mb *metrics.Bucket, followersStats *raftFollowersStats, serverStats *raftServerStats) *PeerServer { s := &PeerServer{ Config: psConfig, + clusterConfig: NewClusterConfig(), registry: registry, store: store, followersStats: followersStats, @@ -100,6 +110,50 @@ func (s *PeerServer) SetRaftServer(raftServer raft.Server) { s.raftServer = raftServer } +// Mode retrieves the current mode of the server. +func (s *PeerServer) Mode() Mode { + return s.mode +} + +// SetMode updates the current mode of the server. +// Switching to a peer mode will start the Raft server. +// Switching to a proxy mode will stop the Raft server. +func (s *PeerServer) SetMode(mode Mode) { + s.mode = mode + + switch mode { + case PeerMode: + if s.raftServer.Running() { + s.raftServer.Start() + } + case ProxyMode: + if !s.raftServer.Running() { + s.raftServer.Stop() + } + } +} + +// ClusterConfig retrieves the current cluster configuration. +func (s *PeerServer) ClusterConfig() *ClusterConfig { + return s.clusterConfig +} + +// SetClusterConfig updates the current cluster configuration. +// Adjusting the active size will +func (s *PeerServer) SetClusterConfig(c *ClusterConfig) error { + prevActiveSize := s.clusterConfig.ActiveSize + s.clusterConfig = c + + // Validate configuration. + if c.ActiveSize < 1 { + return etcdErr.NewError(etcdErr.EcodeInvalidActiveSize, "Post", 0) + } else if c.PromoteDelay < 0 { + return etcdErr.NewError(etcdErr.EcodeInvalidPromoteDelay, "Post", 0) + } + + return nil +} + // Helper function to do discovery and return results in expected format func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err error) { peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL) @@ -213,6 +267,7 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er go s.monitorSync() go s.monitorTimeoutThreshold(s.closeChan) + go s.monitorActive(s.closeChan) // open the snapshot if snapshot { @@ -240,6 +295,8 @@ func (s *PeerServer) HTTPHandler() http.Handler { router.HandleFunc("/upgrade", s.UpgradeHttpHandler) router.HandleFunc("/join", s.JoinHttpHandler) router.HandleFunc("/remove/{name:.+}", s.RemoveHttpHandler) + router.HandleFunc("/config", s.getClusterConfigHttpHandler).Methods("GET") + router.HandleFunc("/config", s.setClusterConfigHttpHandler).Methods("POST") router.HandleFunc("/vote", s.VoteHttpHandler) router.HandleFunc("/log", s.GetLogHttpHandler) router.HandleFunc("/log/append", s.AppendEntriesHttpHandler) @@ -385,8 +442,30 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) t.CancelWhenTimeout(req) if resp.StatusCode == http.StatusOK { - b, _ := ioutil.ReadAll(resp.Body) - s.joinIndex, _ = binary.Uvarint(b) + r := bufio.NewReader(resp.Body) + s.joinIndex, _ = binary.ReadUvarint(r) + + // Determine whether the server joined as a proxy or peer. + var mode uint64 + if mode, err = binary.ReadUvarint(r); err == io.EOF { + mode = 0 + } else if err != nil { + log.Debugf("Error reading join mode: %v", err) + return err + } + + switch mode { + case 0: + s.SetMode(PeerMode) + case 1: + s.SetMode(ProxyMode) + s.proxyClientURL = resp.Header.Get("X-Leader-Client-URL") + s.proxyPeerURL = resp.Header.Get("X-Leader-Peer-URL") + default: + log.Debugf("Invalid join mode: %v", err) + return fmt.Errorf("Invalid join mode (%d): %v", mode, err) + } + return nil } if resp.StatusCode == http.StatusTemporaryRedirect { @@ -532,3 +611,52 @@ func (s *PeerServer) monitorTimeoutThreshold(closeChan chan bool) { time.Sleep(ThresholdMonitorTimeout) } } + +// monitorActive periodically checks the status of cluster nodes and swaps them +// out for proxies as needed. +func (s *PeerServer) monitorActive(closeChan chan bool) { + for { + select { + case <- time.After(ActiveMonitorTimeout): + case <-closeChan: + return + } + + // Ignore while this peer is not a leader. + if s.raftServer.State() != raft.Leader { + continue + } + + // Retrieve target active size and actual active size. + activeSize := s.ClusterConfig().ActiveSize + peerCount := s.registry.PeerCount() + proxies := s.registry.Proxies() + peers := s.registry.Peers() + if index := sort.SearchStrings(peers, s.Config.Name); index < len(peers) && peers[index] == s.Config.Name { + peers = append(peers[:index], peers[index+1:]...) + } + + // If we have more active nodes than we should then demote. + if peerCount > activeSize { + peer := peers[rand.Intn(len(peers))] + if _, err := s.raftServer.Do(&RemoveCommand{Name: peer}); err != nil { + log.Infof("%s: warning: demotion error: %v", s.Config.Name, err) + } + continue + } + } +} + + +// Mode represents whether the server is an active peer or if the server is +// simply acting as a proxy. +type Mode string + +const ( + // PeerMode is when the server is an active node in Raft. + PeerMode = Mode("peer") + + // ProxyMode is when the server is an inactive, request-forwarding node. + ProxyMode = Mode("proxy") +) + diff --git a/server/peer_server_handlers.go b/server/peer_server_handlers.go index 6b60e2d5e..96abbd0be 100644 --- a/server/peer_server_handlers.go +++ b/server/peer_server_handlers.go @@ -188,6 +188,23 @@ func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request ps.server.Dispatch(command, w, req) } +// Returns a JSON-encoded cluster configuration. +func (ps *PeerServer) getClusterConfigHttpHandler(w http.ResponseWriter, req *http.Request) { + json.NewEncoder(w).Encode(&ps.clusterConfig) +} + +// Updates the cluster configuration. +func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *http.Request) { + c := &SetClusterConfigCommand{Config:&ClusterConfig{}} + if err := json.NewDecoder(req.Body).Decode(&c.Config); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + log.Debugf("[recv] Update Cluster Config Request") + ps.server.Dispatch(c, w, req) +} + // Response to the name request func (ps *PeerServer) NameHttpHandler(w http.ResponseWriter, req *http.Request) { log.Debugf("[recv] Get %s/name/ ", ps.Config.URL) diff --git a/server/promote_command.go b/server/promote_command.go new file mode 100644 index 000000000..9558abcdb --- /dev/null +++ b/server/promote_command.go @@ -0,0 +1,64 @@ +package server + +import ( + "github.com/coreos/etcd/log" + "github.com/coreos/etcd/third_party/github.com/coreos/raft" +) + +func init() { + raft.RegisterCommand(&PromoteCommand{}) +} + +// PromoteCommand represents a Raft command for converting a proxy to a peer. +type PromoteCommand struct { + Name string `json:"name"` +} + +// CommandName returns the name of the command. +func (c *PromoteCommand) CommandName() string { + return "etcd:promote" +} + +// Apply promotes a named proxy to a peer. +func (c *PromoteCommand) Apply(context raft.Context) (interface{}, error) { + ps, _ := context.Server().Context().(*PeerServer) + config := ps.ClusterConfig() + + // If cluster size is larger than max cluster size then return an error. + if ps.registry.PeerCount() >= config.ActiveSize { + return etcdErr.NewError(etcdErr.EcodePromoteError, "", 0) + } + + // If proxy doesn't exist then return an error. + if !ps.registry.ProxyExists(c.Name) { + return etcdErr.NewError(etcdErr.EcodePromoteError, "", 0) + } + + // Retrieve proxy settings. + proxyClientURL := ps.registry.ProxyClientURL() + proxyPeerURL := ps.registry.ProxyPeerURL() + + // Remove from registry as a proxy. + if err := ps.registry.UnregisterProxy(c.Name); err != nil { + log.Info("Cannot remove proxy: ", c.Name) + return nil, err + } + + // Add to shared peer registry. + ps.registry.RegisterPeer(c.Name, c.RaftURL, c.EtcdURL) + + // Add peer in raft + err := context.Server().AddPeer(c.Name, "") + + // Add peer stats + if c.Name != ps.RaftServer().Name() { + ps.followersStats.Followers[c.Name] = &raftFollowerStats{} + ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63 + } + + return nil, err +} + +func (c *JoinCommand) NodeName() string { + return c.Name +} diff --git a/server/registry.go b/server/registry.go index e1e99edee..1d6413cfe 100644 --- a/server/registry.go +++ b/server/registry.go @@ -13,13 +13,17 @@ import ( ) // The location of the peer URL data. -const RegistryKey = "/_etcd/machines" +const RegistryPeerKey = "/_etcd/machines" + +// The location of the proxy URL data. +const RegistryProxyKey = "/_etcd/proxies" // The Registry stores URL information for nodes. type Registry struct { sync.Mutex store store.Store - nodes map[string]*node + peers map[string]*node + proxies map[string]*node } // The internal storage format of the registry. @@ -33,61 +37,126 @@ type node struct { func NewRegistry(s store.Store) *Registry { return &Registry{ store: s, - nodes: make(map[string]*node), + peers: make(map[string]*node), + proxies: make(map[string]*node), } } -// Adds a node to the registry. -func (r *Registry) Register(name string, peerURL string, machURL string) error { +// Peers returns a list of peer names. +func (r *Registry) Peers() []string { + names := make([]string, 0, len(r.peers)) + for name, _ := range r.peers { + names = append(names, name) + } + return names +} + +// Proxies returns a list of proxy names. +func (r *Registry) Proxies() []string { + names := make([]string, 0, len(r.proxies)) + for name, _ := range r.proxies { + names = append(names, name) + } + return names +} + + +// RegisterPeer adds a peer to the registry. +func (r *Registry) RegisterPeer(name string, peerURL string, machURL string) error { + // TODO(benbjohnson): Disallow peers that are already proxies. + return r.register(RegistryPeerKey, name, peerURL, machURL) +} + +// RegisterProxy adds a proxy to the registry. +func (r *Registry) RegisterProxy(name string, peerURL string, machURL string) error { + // TODO(benbjohnson): Disallow proxies that are already peers. + return r.register(RegistryProxyKey, name, peerURL, machURL) +} + +func (r *Registry) register(key, name string, peerURL string, machURL string) error { r.Lock() defer r.Unlock() // Write data to store. - key := path.Join(RegistryKey, name) v := url.Values{} v.Set("raft", peerURL) v.Set("etcd", machURL) - _, err := r.store.Create(key, false, v.Encode(), false, store.Permanent) + _, err := r.store.Create(path.Join(key, name), false, v.Encode(), false, store.Permanent) log.Debugf("Register: %s", name) return err } -// Removes a node from the registry. -func (r *Registry) Unregister(name string) error { +// UnregisterPeer removes a peer from the registry. +func (r *Registry) UnregisterPeer(name string) error { + return r.unregister(RegistryPeerKey, name) +} + +// UnregisterProxy removes a proxy from the registry. +func (r *Registry) UnregisterProxy(name string) error { + return r.unregister(RegistryProxyKey, name) +} + +func (r *Registry) unregister(key, name string) error { r.Lock() defer r.Unlock() - // Remove from cache. - // delete(r.nodes, name) - // Remove the key from the store. - _, err := r.store.Delete(path.Join(RegistryKey, name), false, false) + _, err := r.store.Delete(path.Join(key, name), false, false) log.Debugf("Unregister: %s", name) return err } +// PeerCount returns the number of peers in the cluster. +func (r *Registry) PeerCount() int { + return r.count(RegistryPeerKey) +} + +// ProxyCount returns the number of proxies in the cluster. +func (r *Registry) ProxyCount() int { + return r.count(RegistryProxyKey) +} + // Returns the number of nodes in the cluster. -func (r *Registry) Count() int { - e, err := r.store.Get(RegistryKey, false, false) +func (r *Registry) count(key string) int { + e, err := r.store.Get(key, false, false) if err != nil { return 0 } return len(e.Node.Nodes) } +// PeerExists checks if a peer with the given name exists. +func (r *Registry) PeerExists(name string) bool { + return r.exists(RegistryPeerKey, name) +} + +// ProxyExists checks if a proxy with the given name exists. +func (r *Registry) ProxyExists(name string) bool { + return r.exists(RegistryProxyKey, name) +} + +func (r *Registry) exists(key, name string) bool { + e, err := r.store.Get(path.Join(key, name), false, false) + if err != nil { + return false + } + return (e.Node != nil) +} + + // Retrieves the client URL for a given node by name. func (r *Registry) ClientURL(name string) (string, bool) { r.Lock() defer r.Unlock() - return r.clientURL(name) + return r.clientURL(RegistryPeerKey, name) } -func (r *Registry) clientURL(name string) (string, bool) { - if r.nodes[name] == nil { - r.load(name) +func (r *Registry) clientURL(key, name string) (string, bool) { + if r.peers[name] == nil { + r.peers[name] = r.load(key, name) } - if node := r.nodes[name]; node != nil { + if node := r.peers[name]; node != nil { return node.url, true } @@ -110,73 +179,108 @@ func (r *Registry) PeerHost(name string) (string, bool) { func (r *Registry) PeerURL(name string) (string, bool) { r.Lock() defer r.Unlock() - return r.peerURL(name) + return r.peerURL(RegistryPeerKey,name) } -func (r *Registry) peerURL(name string) (string, bool) { - if r.nodes[name] == nil { - r.load(name) +func (r *Registry) peerURL(key, name string) (string, bool) { + if r.peers[name] == nil { + r.peers[name] = r.load(key, name) } - if node := r.nodes[name]; node != nil { + if node := r.peers[name]; node != nil { return node.peerURL, true } return "", false } +// Retrieves the client URL for a given proxy by name. +func (r *Registry) ProxyClientURL(name string) (string, bool) { + r.Lock() + defer r.Unlock() + return r.proxyClientURL(RegistryProxyKey, name) +} + +func (r *Registry) proxyClientURL(key, name string) (string, bool) { + if r.proxies[name] == nil { + r.proxies[name] = r.load(key, name) + } + if node := r.proxies[name]; node != nil { + return node.url, true + } + return "", false +} + +// Retrieves the peer URL for a given proxy by name. +func (r *Registry) ProxyPeerURL(name string) (string, bool) { + r.Lock() + defer r.Unlock() + return r.proxyPeerURL(RegistryProxyKey,name) +} + +func (r *Registry) proxyPeerURL(key, name string) (string, bool) { + if r.proxies[name] == nil { + r.proxies[name] = r.load(key, name) + } + if node := r.proxies[name]; node != nil { + return node.peerURL, true + } + return "", false +} + // Retrieves the Client URLs for all nodes. func (r *Registry) ClientURLs(leaderName, selfName string) []string { - return r.urls(leaderName, selfName, r.clientURL) + return r.urls(RegistryPeerKey, leaderName, selfName, r.clientURL) } // Retrieves the Peer URLs for all nodes. func (r *Registry) PeerURLs(leaderName, selfName string) []string { - return r.urls(leaderName, selfName, r.peerURL) + return r.urls(RegistryPeerKey, leaderName, selfName, r.peerURL) } // Retrieves the URLs for all nodes using url function. -func (r *Registry) urls(leaderName, selfName string, url func(name string) (string, bool)) []string { +func (r *Registry) urls(key, leaderName, selfName string, url func(key, name string) (string, bool)) []string { r.Lock() defer r.Unlock() // Build list including the leader and self. urls := make([]string, 0) - if url, _ := url(leaderName); len(url) > 0 { + if url, _ := url(key, leaderName); len(url) > 0 { urls = append(urls, url) } // Retrieve a list of all nodes. - if e, _ := r.store.Get(RegistryKey, false, false); e != nil { + if e, _ := r.store.Get(key, false, false); e != nil { // Lookup the URL for each one. for _, pair := range e.Node.Nodes { _, name := filepath.Split(pair.Key) - if url, _ := url(name); len(url) > 0 && name != leaderName { + if url, _ := url(key, name); len(url) > 0 && name != leaderName { urls = append(urls, url) } } } - log.Infof("URLs: %s / %s (%s)", leaderName, selfName, strings.Join(urls, ",")) + log.Infof("URLs: %s / %s (%s)", key, leaderName, selfName, strings.Join(urls, ",")) return urls } // Removes a node from the cache. func (r *Registry) Invalidate(name string) { - delete(r.nodes, name) + delete(r.peers, name) + delete(r.proxies, name) } // Loads the given node by name from the store into the cache. -func (r *Registry) load(name string) { +func (r *Registry) load(key, name string) *node { if name == "" { - return + return nil } // Retrieve from store. - e, err := r.store.Get(path.Join(RegistryKey, name), false, false) + e, err := r.store.Get(path.Join(RegistryPeerKey, name), false, false) if err != nil { - return + return nil } // Parse as a query string. @@ -186,7 +290,7 @@ func (r *Registry) load(name string) { } // Create node. - r.nodes[name] = &node{ + return &node{ url: m["etcd"][0], peerURL: m["raft"][0], } diff --git a/server/remove_command.go b/server/remove_command.go index 4cbf98a25..3019f9d9d 100644 --- a/server/remove_command.go +++ b/server/remove_command.go @@ -26,8 +26,13 @@ func (c *RemoveCommand) CommandName() string { func (c *RemoveCommand) Apply(context raft.Context) (interface{}, error) { ps, _ := context.Server().Context().(*PeerServer) + // If this is a proxy then remove it and exit. + if ps.registry.ProxyExists(c.Name) { + return []byte{0}, ps.registry.UnregisterProxy(c.Name) + } + // Remove node from the shared registry. - err := ps.registry.Unregister(c.Name) + err := ps.registry.UnregisterPeer(c.Name) // Delete from stats delete(ps.followersStats.Followers, c.Name) diff --git a/server/server.go b/server/server.go index 033765164..f51972b85 100644 --- a/server/server.go +++ b/server/server.go @@ -164,6 +164,17 @@ func (s *Server) handleFunc(r *mux.Router, path string, f func(http.ResponseWrit // Log request. log.Debugf("[recv] %s %s %s [%s]", req.Method, s.URL(), req.URL.Path, req.RemoteAddr) + // Forward request along if the server is a proxy. + if s.peerServer.Mode() == ProxyMode { + if s.peerServer.proxyClientURL == "" { + w.Header().Set("Content-Type", "application/json") + etcdErr.NewError(402, "", 0).Write(w) + return + } + uhttp.Redirect(s.peerServer.proxyClientURL, w, req) + return + } + // Execute handler function and return error if necessary. if err := f(w, req); err != nil { if etcdErr, ok := err.(*etcdErr.Error); ok { @@ -206,6 +217,9 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque return etcdErr.NewError(300, "Empty result from raft", s.Store().Index()) } + w.Header().Set("X-Leader-Client-URL", s.url) + w.Header().Set("X-Leader-Peer-URL", ps.Config.URL) + // response for raft related commands[join/remove] if b, ok := result.([]byte); ok { w.WriteHeader(http.StatusOK) @@ -239,25 +253,24 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque return nil - } else { - leader := ps.raftServer.Leader() - - // No leader available. - if leader == "" { - return etcdErr.NewError(300, "", s.Store().Index()) - } - - var url string - switch c.(type) { - case *JoinCommand, *RemoveCommand: - url, _ = ps.registry.PeerURL(leader) - default: - url, _ = ps.registry.ClientURL(leader) - } - uhttp.Redirect(url, w, req) - - return nil } + + leader := ps.raftServer.Leader() + if leader == "" { + return etcdErr.NewError(300, "", s.Store().Index()) + } + + var url string + switch c.(type) { + case *JoinCommand, *RemoveCommand: + url, _ = ps.registry.PeerURL(leader) + default: + url, _ = ps.registry.ClientURL(leader) + } + + uhttp.Redirect(url, w, req) + + return nil } // Handler to return the current version of etcd. diff --git a/server/set_cluster_config_command.go b/server/set_cluster_config_command.go new file mode 100644 index 000000000..e5954a5ec --- /dev/null +++ b/server/set_cluster_config_command.go @@ -0,0 +1,25 @@ +package server + +import ( + "github.com/coreos/etcd/third_party/github.com/coreos/raft" +) + +func init() { + raft.RegisterCommand(&SetClusterConfigCommand{}) +} + +// SetClusterConfigCommand sets the cluster-level configuration. +type SetClusterConfigCommand struct { + Config *ClusterConfig `json:"config"` +} + +// CommandName returns the name of the command. +func (c *SetClusterConfigCommand) CommandName() string { + return "etcd:setClusterConfig" +} + +// Apply updates the cluster configuration. +func (c *SetClusterConfigCommand) Apply(context raft.Context) (interface{}, error) { + ps, _ := context.Server().Context().(*PeerServer) + return nil, ps.SetClusterConfig(c.Config) +} diff --git a/server/v2/get_handler.go b/server/v2/get_handler.go index ee55a4ca6..019e18cdc 100644 --- a/server/v2/get_handler.go +++ b/server/v2/get_handler.go @@ -122,5 +122,11 @@ func writeHeaders(w http.ResponseWriter, s Server) { w.Header().Add("X-Etcd-Index", fmt.Sprint(s.Store().Index())) w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex())) w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term())) + if url, ok := s.ClientURL(s.Leader()); ok { + w.Header().Set("X-Leader-Client-URL", url) + } + if url, ok := s.PeerURL(s.Leader()); ok { + w.Header().Set("X-Leader-Peer-URL", url) + } w.WriteHeader(http.StatusOK) } diff --git a/tests/functional/proxy_test.go b/tests/functional/proxy_test.go new file mode 100644 index 000000000..eaa48439c --- /dev/null +++ b/tests/functional/proxy_test.go @@ -0,0 +1,45 @@ +package test + +import ( + "fmt" + "os" + "testing" + "time" + + "github.com/coreos/etcd/tests" + "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd" + "github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert" +) + +// Create a full cluster and then add extra an extra proxy node. +func TestProxy(t *testing.T) { + clusterSize := 10 // MaxClusterSize + 1 + _, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false) + assert.NoError(t, err) + defer DestroyCluster(etcds) + + if err != nil { + t.Fatal("cannot create cluster") + } + + c := etcd.NewClient(nil) + c.SyncCluster() + + // Set key. + time.Sleep(time.Second) + if _, err := c.Set("foo", "bar", 0); err != nil { + panic(err) + } + time.Sleep(time.Second) + + // Check that all peers and proxies have the value. + for i, _ := range etcds { + resp, err := tests.Get(fmt.Sprintf("http://localhost:%d/v2/keys/foo", 4000 + (i+1))) + if assert.NoError(t, err) { + body := tests.ReadBodyJSON(resp) + if node, _ := body["node"].(map[string]interface{}); assert.NotNil(t, node) { + assert.Equal(t, node["value"], "bar") + } + } + } +} diff --git a/tests/functional/util.go b/tests/functional/util.go index 7544e3fbc..9e5284cc2 100644 --- a/tests/functional/util.go +++ b/tests/functional/util.go @@ -109,7 +109,7 @@ func CreateCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os } } else { strI := strconv.Itoa(i + 1) - argGroup[i] = []string{"etcd", "-name=node" + strI, "-addr=127.0.0.1:400" + strI, "-peer-addr=127.0.0.1:700" + strI, "-data-dir=/tmp/node" + strI, "-peers=127.0.0.1:7001"} + argGroup[i] = []string{"etcd", "-name=node" + strI, fmt.Sprintf("-addr=127.0.0.1:%d", 4001 + i), fmt.Sprintf("-peer-addr=127.0.0.1:%d", 7001 + i), "-data-dir=/tmp/node" + strI, "-peers=127.0.0.1:7001"} if ssl { argGroup[i] = append(argGroup[i], sslServer2...) }