From f5698d3566419b75c98eb015ed089d2fde76ee7f Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 24 Feb 2014 17:01:04 -0700 Subject: [PATCH] Proxy promotion. --- config/config.go | 4 -- config/config_test.go | 37 -------------- etcd.go | 1 - server/cluster_config.go | 2 +- server/join_command.go | 5 ++ server/peer_server.go | 31 +++++++++++- server/peer_server_handlers.go | 20 ++++++++ server/promote_command.go | 64 ------------------------- server/registry.go | 29 ++++++++--- tests/functional/cluster_config_test.go | 29 +++++++++++ tests/functional/proxy_test.go | 18 ++++++- tests/server_utils.go | 1 - 12 files changed, 123 insertions(+), 118 deletions(-) delete mode 100644 server/promote_command.go create mode 100644 tests/functional/cluster_config_test.go diff --git a/config/config.go b/config/config.go index 14baaf6f9..de9889f6c 100644 --- a/config/config.go +++ b/config/config.go @@ -60,7 +60,6 @@ type Config struct { KeyFile string `toml:"key_file" env:"ETCD_KEY_FILE"` Peers []string `toml:"peers" env:"ETCD_PEERS"` PeersFile string `toml:"peers_file" env:"ETCD_PEERS_FILE"` - MaxClusterSize int `toml:"max_cluster_size" env:"ETCD_MAX_CLUSTER_SIZE"` MaxResultBuffer int `toml:"max_result_buffer" env:"ETCD_MAX_RESULT_BUFFER"` MaxRetryAttempts int `toml:"max_retry_attempts" env:"ETCD_MAX_RETRY_ATTEMPTS"` RetryInterval float64 `toml:"retry_interval" env:"ETCD_RETRY_INTERVAL"` @@ -90,7 +89,6 @@ func New() *Config { c := new(Config) c.SystemPath = DefaultSystemConfigPath c.Addr = "127.0.0.1:4001" - c.MaxClusterSize = 9 c.MaxResultBuffer = 1024 c.MaxRetryAttempts = 3 c.RetryInterval = 10.0 @@ -247,7 +245,6 @@ func (c *Config) LoadFlags(arguments []string) error { f.IntVar(&c.MaxResultBuffer, "max-result-buffer", c.MaxResultBuffer, "") f.IntVar(&c.MaxRetryAttempts, "max-retry-attempts", c.MaxRetryAttempts, "") f.Float64Var(&c.RetryInterval, "retry-interval", c.RetryInterval, "") - f.IntVar(&c.MaxClusterSize, "max-cluster-size", c.MaxClusterSize, "") f.IntVar(&c.Peer.HeartbeatTimeout, "peer-heartbeat-timeout", c.Peer.HeartbeatTimeout, "") f.IntVar(&c.Peer.ElectionTimeout, "peer-election-timeout", c.Peer.ElectionTimeout, "") @@ -281,7 +278,6 @@ func (c *Config) LoadFlags(arguments []string) error { f.StringVar(&c.DataDir, "d", c.DataDir, "(deprecated)") f.IntVar(&c.MaxResultBuffer, "m", c.MaxResultBuffer, "(deprecated)") f.IntVar(&c.MaxRetryAttempts, "r", c.MaxRetryAttempts, "(deprecated)") - f.IntVar(&c.MaxClusterSize, "maxsize", c.MaxClusterSize, "(deprecated)") f.IntVar(&c.SnapshotCount, "snapshotCount", c.SnapshotCount, "(deprecated)") // END DEPRECATED FLAGS diff --git a/config/config_test.go b/config/config_test.go index d006e4d48..bc33cd212 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -51,7 +51,6 @@ func TestConfigTOML(t *testing.T) { assert.Equal(t, c.BindAddr, "127.0.0.1:4003", "") assert.Equal(t, c.Peers, []string{"coreos.com:4001", "coreos.com:4002"}, "") assert.Equal(t, c.PeersFile, "/tmp/peers", "") - assert.Equal(t, c.MaxClusterSize, 10, "") assert.Equal(t, c.MaxResultBuffer, 512, "") assert.Equal(t, c.MaxRetryAttempts, 5, "") assert.Equal(t, c.Name, "test-name", "") @@ -101,7 +100,6 @@ func TestConfigEnv(t *testing.T) { assert.Equal(t, c.BindAddr, "127.0.0.1:4003", "") assert.Equal(t, c.Peers, []string{"coreos.com:4001", "coreos.com:4002"}, "") assert.Equal(t, c.PeersFile, "/tmp/peers", "") - assert.Equal(t, c.MaxClusterSize, 10, "") assert.Equal(t, c.MaxResultBuffer, 512, "") assert.Equal(t, c.MaxRetryAttempts, 5, "") assert.Equal(t, c.Name, "test-name", "") @@ -281,21 +279,6 @@ func TestConfigPeersFileFlag(t *testing.T) { assert.Equal(t, c.PeersFile, "/tmp/peers", "") } -// Ensures that the Max Cluster Size can be parsed from the environment. -func TestConfigMaxClusterSizeEnv(t *testing.T) { - withEnv("ETCD_MAX_CLUSTER_SIZE", "5", func(c *Config) { - assert.Nil(t, c.LoadEnv(), "") - assert.Equal(t, c.MaxClusterSize, 5, "") - }) -} - -// Ensures that a the Max Cluster Size flag can be parsed. -func TestConfigMaxClusterSizeFlag(t *testing.T) { - c := New() - assert.Nil(t, c.LoadFlags([]string{"-max-cluster-size", "5"}), "") - assert.Equal(t, c.MaxClusterSize, 5, "") -} - // Ensures that the Max Result Buffer can be parsed from the environment. func TestConfigMaxResultBufferEnv(t *testing.T) { withEnv("ETCD_MAX_RESULT_BUFFER", "512", func(c *Config) { @@ -600,26 +583,6 @@ func TestConfigDeprecatedPeersFileFlag(t *testing.T) { assert.Equal(t, stderr, "[deprecated] use -peers-file, not -CF\n", "") } -func TestConfigDeprecatedMaxClusterSizeFlag(t *testing.T) { - _, stderr := capture(func() { - c := New() - err := c.LoadFlags([]string{"-maxsize", "5"}) - assert.NoError(t, err) - assert.Equal(t, c.MaxClusterSize, 5, "") - }) - assert.Equal(t, stderr, "[deprecated] use -max-cluster-size, not -maxsize\n", "") -} - -func TestConfigDeprecatedMaxResultBufferFlag(t *testing.T) { - _, stderr := capture(func() { - c := New() - err := c.LoadFlags([]string{"-m", "512"}) - assert.NoError(t, err) - assert.Equal(t, c.MaxResultBuffer, 512, "") - }) - assert.Equal(t, stderr, "[deprecated] use -max-result-buffer, not -m\n", "") -} - func TestConfigDeprecatedMaxRetryAttemptsFlag(t *testing.T) { _, stderr := capture(func() { c := New() diff --git a/etcd.go b/etcd.go index 6984b6320..c06e14313 100644 --- a/etcd.go +++ b/etcd.go @@ -120,7 +120,6 @@ func main() { Scheme: config.PeerTLSInfo().Scheme(), URL: config.Peer.Addr, SnapshotCount: config.SnapshotCount, - MaxClusterSize: config.MaxClusterSize, RetryTimes: config.MaxRetryAttempts, RetryInterval: config.RetryInterval, } diff --git a/server/cluster_config.go b/server/cluster_config.go index 807cfa7dd..bdb1ff231 100644 --- a/server/cluster_config.go +++ b/server/cluster_config.go @@ -21,7 +21,7 @@ type ClusterConfig struct { // PromoteDelay is the amount of time, in seconds, after a node is // unreachable that it will be swapped out for a proxy node, if available. - PromoteDelay int `json:"PromoteDelay"` + PromoteDelay int `json:"promoteDelay"` } // NewClusterConfig returns a cluster configuration with default settings. diff --git a/server/join_command.go b/server/join_command.go index 567e6e182..e247efa3f 100644 --- a/server/join_command.go +++ b/server/join_command.go @@ -62,6 +62,11 @@ func (c *JoinCommand) Apply(context raft.Context) (interface{}, error) { return buf.Bytes(), nil } + // Remove it as a proxy if it is one. + if ps.registry.ProxyExists(c.Name) { + ps.registry.UnregisterProxy(c.Name) + } + // Add to shared peer registry. ps.registry.RegisterPeer(c.Name, c.RaftURL, c.EtcdURL) diff --git a/server/peer_server.go b/server/peer_server.go index c0783e9d5..ab6eddbe8 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -141,7 +141,6 @@ func (s *PeerServer) ClusterConfig() *ClusterConfig { // SetClusterConfig updates the current cluster configuration. // Adjusting the active size will func (s *PeerServer) SetClusterConfig(c *ClusterConfig) error { - prevActiveSize := s.clusterConfig.ActiveSize s.clusterConfig = c // Validate configuration. @@ -294,9 +293,10 @@ func (s *PeerServer) HTTPHandler() http.Handler { router.HandleFunc("/version/{version:[0-9]+}/check", s.VersionCheckHttpHandler) router.HandleFunc("/upgrade", s.UpgradeHttpHandler) router.HandleFunc("/join", s.JoinHttpHandler) + router.HandleFunc("/promote", s.PromoteHttpHandler).Methods("POST") router.HandleFunc("/remove/{name:.+}", s.RemoveHttpHandler) router.HandleFunc("/config", s.getClusterConfigHttpHandler).Methods("GET") - router.HandleFunc("/config", s.setClusterConfigHttpHandler).Methods("POST") + router.HandleFunc("/config", s.setClusterConfigHttpHandler).Methods("PUT") router.HandleFunc("/vote", s.VoteHttpHandler) router.HandleFunc("/log", s.GetLogHttpHandler) router.HandleFunc("/log/append", s.AppendEntriesHttpHandler) @@ -632,18 +632,45 @@ func (s *PeerServer) monitorActive(closeChan chan bool) { peerCount := s.registry.PeerCount() proxies := s.registry.Proxies() peers := s.registry.Peers() + fmt.Println("active.3»", peers) if index := sort.SearchStrings(peers, s.Config.Name); index < len(peers) && peers[index] == s.Config.Name { peers = append(peers[:index], peers[index+1:]...) } + fmt.Println("active.1»", activeSize, peerCount) + fmt.Println("active.2»", proxies) + // If we have more active nodes than we should then demote. if peerCount > activeSize { peer := peers[rand.Intn(len(peers))] + fmt.Println("active.demote»", peer) if _, err := s.raftServer.Do(&RemoveCommand{Name: peer}); err != nil { log.Infof("%s: warning: demotion error: %v", s.Config.Name, err) } continue } + + // If we don't have enough active nodes then try to promote a proxy. + if peerCount < activeSize && len(proxies) > 0 { + proxy := proxies[rand.Intn(len(proxies))] + proxyPeerURL, _ := s.registry.ProxyPeerURL(proxy) + log.Infof("%s: promoting: %v (%s)", s.Config.Name, proxy, proxyPeerURL) + + // Notify proxy to promote itself. + client := &http.Client{ + Transport: &http.Transport{ + DisableKeepAlives: false, + ResponseHeaderTimeout: ActiveMonitorTimeout, + }, + } + resp, err := client.Post(fmt.Sprintf("%s/promote", proxyPeerURL), "application/json", nil) + if err != nil { + log.Infof("%s: warning: promotion error: %v", s.Config.Name, err) + } else if resp.StatusCode != http.StatusOK { + log.Infof("%s: warning: promotion failure: %v", s.Config.Name, resp.StatusCode) + } + continue + } } } diff --git a/server/peer_server_handlers.go b/server/peer_server_handlers.go index 96abbd0be..50603cee2 100644 --- a/server/peer_server_handlers.go +++ b/server/peer_server_handlers.go @@ -3,6 +3,7 @@ package server import ( "encoding/json" "net/http" + "net/url" "strconv" "time" @@ -171,6 +172,25 @@ func (ps *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) } } +// Attempt to rejoin the cluster as a peer. +func (ps *PeerServer) PromoteHttpHandler(w http.ResponseWriter, req *http.Request) { + log.Infof("%s attempting to promote in cluster: %s", ps.Config.Name, ps.proxyPeerURL) + url, err := url.Parse(ps.proxyPeerURL) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + err = ps.joinByPeer(ps.raftServer, url.Host, ps.Config.Scheme) + if err != nil { + log.Infof("%s error while promoting: %v", ps.Config.Name, err) + w.WriteHeader(http.StatusInternalServerError) + return + } + log.Infof("%s promoted in the cluster", ps.Config.Name) + w.WriteHeader(http.StatusOK) +} + // Response to remove request func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request) { if req.Method != "DELETE" { diff --git a/server/promote_command.go b/server/promote_command.go deleted file mode 100644 index 9558abcdb..000000000 --- a/server/promote_command.go +++ /dev/null @@ -1,64 +0,0 @@ -package server - -import ( - "github.com/coreos/etcd/log" - "github.com/coreos/etcd/third_party/github.com/coreos/raft" -) - -func init() { - raft.RegisterCommand(&PromoteCommand{}) -} - -// PromoteCommand represents a Raft command for converting a proxy to a peer. -type PromoteCommand struct { - Name string `json:"name"` -} - -// CommandName returns the name of the command. -func (c *PromoteCommand) CommandName() string { - return "etcd:promote" -} - -// Apply promotes a named proxy to a peer. -func (c *PromoteCommand) Apply(context raft.Context) (interface{}, error) { - ps, _ := context.Server().Context().(*PeerServer) - config := ps.ClusterConfig() - - // If cluster size is larger than max cluster size then return an error. - if ps.registry.PeerCount() >= config.ActiveSize { - return etcdErr.NewError(etcdErr.EcodePromoteError, "", 0) - } - - // If proxy doesn't exist then return an error. - if !ps.registry.ProxyExists(c.Name) { - return etcdErr.NewError(etcdErr.EcodePromoteError, "", 0) - } - - // Retrieve proxy settings. - proxyClientURL := ps.registry.ProxyClientURL() - proxyPeerURL := ps.registry.ProxyPeerURL() - - // Remove from registry as a proxy. - if err := ps.registry.UnregisterProxy(c.Name); err != nil { - log.Info("Cannot remove proxy: ", c.Name) - return nil, err - } - - // Add to shared peer registry. - ps.registry.RegisterPeer(c.Name, c.RaftURL, c.EtcdURL) - - // Add peer in raft - err := context.Server().AddPeer(c.Name, "") - - // Add peer stats - if c.Name != ps.RaftServer().Name() { - ps.followersStats.Followers[c.Name] = &raftFollowerStats{} - ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63 - } - - return nil, err -} - -func (c *JoinCommand) NodeName() string { - return c.Name -} diff --git a/server/registry.go b/server/registry.go index 1d6413cfe..3a2015309 100644 --- a/server/registry.go +++ b/server/registry.go @@ -5,6 +5,7 @@ import ( "net/url" "path" "path/filepath" + "sort" "strings" "sync" @@ -48,6 +49,7 @@ func (r *Registry) Peers() []string { for name, _ := range r.peers { names = append(names, name) } + sort.Sort(sort.StringSlice(names)) return names } @@ -57,6 +59,7 @@ func (r *Registry) Proxies() []string { for name, _ := range r.proxies { names = append(names, name) } + sort.Sort(sort.StringSlice(names)) return names } @@ -70,7 +73,11 @@ func (r *Registry) RegisterPeer(name string, peerURL string, machURL string) err // RegisterProxy adds a proxy to the registry. func (r *Registry) RegisterProxy(name string, peerURL string, machURL string) error { // TODO(benbjohnson): Disallow proxies that are already peers. - return r.register(RegistryProxyKey, name, peerURL, machURL) + if err := r.register(RegistryProxyKey, name, peerURL, machURL); err != nil { + return err + } + r.proxies[name] = r.load(RegistryProxyKey, name) + return nil } func (r *Registry) register(key, name string, peerURL string, machURL string) error { @@ -153,7 +160,9 @@ func (r *Registry) ClientURL(name string) (string, bool) { func (r *Registry) clientURL(key, name string) (string, bool) { if r.peers[name] == nil { - r.peers[name] = r.load(key, name) + if node := r.load(key, name); node != nil { + r.peers[name] = node + } } if node := r.peers[name]; node != nil { @@ -184,7 +193,9 @@ func (r *Registry) PeerURL(name string) (string, bool) { func (r *Registry) peerURL(key, name string) (string, bool) { if r.peers[name] == nil { - r.peers[name] = r.load(key, name) + if node := r.load(key, name); node != nil { + r.peers[name] = node + } } if node := r.peers[name]; node != nil { @@ -203,7 +214,9 @@ func (r *Registry) ProxyClientURL(name string) (string, bool) { func (r *Registry) proxyClientURL(key, name string) (string, bool) { if r.proxies[name] == nil { - r.proxies[name] = r.load(key, name) + if node := r.load(key, name); node != nil { + r.proxies[name] = node + } } if node := r.proxies[name]; node != nil { return node.url, true @@ -215,12 +228,14 @@ func (r *Registry) proxyClientURL(key, name string) (string, bool) { func (r *Registry) ProxyPeerURL(name string) (string, bool) { r.Lock() defer r.Unlock() - return r.proxyPeerURL(RegistryProxyKey,name) + return r.proxyPeerURL(RegistryProxyKey, name) } func (r *Registry) proxyPeerURL(key, name string) (string, bool) { if r.proxies[name] == nil { - r.proxies[name] = r.load(key, name) + if node := r.load(key, name); node != nil { + r.proxies[name] = node + } } if node := r.proxies[name]; node != nil { return node.peerURL, true @@ -278,7 +293,7 @@ func (r *Registry) load(key, name string) *node { } // Retrieve from store. - e, err := r.store.Get(path.Join(RegistryPeerKey, name), false, false) + e, err := r.store.Get(path.Join(key, name), false, false) if err != nil { return nil } diff --git a/tests/functional/cluster_config_test.go b/tests/functional/cluster_config_test.go new file mode 100644 index 000000000..8f8b667f0 --- /dev/null +++ b/tests/functional/cluster_config_test.go @@ -0,0 +1,29 @@ +package test + +import ( + "bytes" + "os" + "testing" + "time" + + "github.com/coreos/etcd/tests" + "github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert" +) + +// Ensure that the cluster configuration can be updated. +func TestClusterConfig(t *testing.T) { + _, etcds, err := CreateCluster(3, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false) + assert.NoError(t, err) + defer DestroyCluster(etcds) + + resp, _ := tests.Put("http://localhost:7001/config", "application/json", bytes.NewBufferString(`{"activeSize":3, "promoteDelay":60}`)) + assert.Equal(t, resp.StatusCode, 200) + + time.Sleep(1 * time.Second) + + resp, _ = tests.Get("http://localhost:7002/config") + body := tests.ReadBodyJSON(resp) + assert.Equal(t, resp.StatusCode, 200) + assert.Equal(t, body["activeSize"], 3) + assert.Equal(t, body["promoteDelay"], 60) +} diff --git a/tests/functional/proxy_test.go b/tests/functional/proxy_test.go index eaa48439c..5c162eb93 100644 --- a/tests/functional/proxy_test.go +++ b/tests/functional/proxy_test.go @@ -1,11 +1,13 @@ package test import ( + "bytes" "fmt" "os" "testing" "time" + "github.com/coreos/etcd/server" "github.com/coreos/etcd/tests" "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd" "github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert" @@ -13,7 +15,7 @@ import ( // Create a full cluster and then add extra an extra proxy node. func TestProxy(t *testing.T) { - clusterSize := 10 // MaxClusterSize + 1 + clusterSize := 10 // DefaultActiveSize + 1 _, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false) assert.NoError(t, err) defer DestroyCluster(etcds) @@ -42,4 +44,18 @@ func TestProxy(t *testing.T) { } } } + + time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second)) + + // Reconfigure with larger active size (10 nodes) and wait for promotion. + resp, _ := tests.Put("http://localhost:7001/config", "application/json", bytes.NewBufferString(`{"activeSize":10, "promoteDelay":1800}`)) + if !assert.Equal(t, resp.StatusCode, 200) { + t.FailNow() + } + + time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second)) + + // Verify that the proxy node is now a peer. + fmt.Println("CHECK!") + time.Sleep(30 * time.Second) } diff --git a/tests/server_utils.go b/tests/server_utils.go index eefe1782d..1a86170c4 100644 --- a/tests/server_utils.go +++ b/tests/server_utils.go @@ -39,7 +39,6 @@ func RunServer(f func(*server.Server)) { URL: "http://" + testRaftURL, Scheme: "http", SnapshotCount: testSnapshotCount, - MaxClusterSize: 9, } mb := metrics.NewBucket("")