From fddbf35df27d3b6f7fceb0a03542b99bfead23b7 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Tue, 25 Feb 2014 10:02:01 -0700 Subject: [PATCH] Add automatic node promotion / demotion. --- server/demote_command.go | 49 ++++++++++++++++++++++++++++++++++ server/peer_server.go | 4 +-- tests/functional/proxy_test.go | 29 +++++++++++++++++--- 3 files changed, 77 insertions(+), 5 deletions(-) create mode 100644 server/demote_command.go diff --git a/server/demote_command.go b/server/demote_command.go new file mode 100644 index 000000000..0e832da11 --- /dev/null +++ b/server/demote_command.go @@ -0,0 +1,49 @@ +package server + +import ( + "github.com/coreos/etcd/log" + "github.com/coreos/etcd/third_party/github.com/coreos/raft" +) + +func init() { + raft.RegisterCommand(&DemoteCommand{}) +} + +// DemoteCommand represents a command to change a peer to a proxy. +type DemoteCommand struct { + Name string `json:"name"` +} + +// CommandName returns the name of the command. +func (c *DemoteCommand) CommandName() string { + return "etcd:demote" +} + +// Apply executes the command. +func (c *DemoteCommand) Apply(context raft.Context) (interface{}, error) { + ps, _ := context.Server().Context().(*PeerServer) + + // Save URLs. + clientURL, _ := ps.registry.ClientURL(c.Name) + peerURL, _ := ps.registry.PeerURL(c.Name) + + // Perform a removal. + (&RemoveCommand{Name: c.Name}).Apply(context) + + // Register node as a proxy. + ps.registry.RegisterProxy(c.Name, peerURL, clientURL) + + // Update mode if this change applies to this server. + if c.Name == ps.Config.Name { + log.Infof("Set mode after demotion: %s", c.Name) + ps.SetMode(ProxyMode) + } + + return nil, nil +} + +// NodeName returns the name of the affected node. +func (c *DemoteCommand) NodeName() string { + return c.Name +} + diff --git a/server/peer_server.go b/server/peer_server.go index ab6eddbe8..c99ff2fba 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -26,7 +26,7 @@ import ( ) const ThresholdMonitorTimeout = 5 * time.Second -const ActiveMonitorTimeout = 5 * time.Second +const ActiveMonitorTimeout = 1 * time.Second type PeerServerConfig struct { Name string @@ -644,7 +644,7 @@ func (s *PeerServer) monitorActive(closeChan chan bool) { if peerCount > activeSize { peer := peers[rand.Intn(len(peers))] fmt.Println("active.demoteĀ»", peer) - if _, err := s.raftServer.Do(&RemoveCommand{Name: peer}); err != nil { + if _, err := s.raftServer.Do(&DemoteCommand{Name: peer}); err != nil { log.Infof("%s: warning: demotion error: %v", s.Config.Name, err) } continue diff --git a/tests/functional/proxy_test.go b/tests/functional/proxy_test.go index 5c162eb93..d7a0b66b6 100644 --- a/tests/functional/proxy_test.go +++ b/tests/functional/proxy_test.go @@ -45,7 +45,10 @@ func TestProxy(t *testing.T) { } } - time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second)) + // Verify that we have one proxy. + result, err := c.Get("_etcd/proxies", false, true) + assert.NoError(t, err) + assert.Equal(t, len(result.Node.Nodes), 1) // Reconfigure with larger active size (10 nodes) and wait for promotion. resp, _ := tests.Put("http://localhost:7001/config", "application/json", bytes.NewBufferString(`{"activeSize":10, "promoteDelay":1800}`)) @@ -56,6 +59,26 @@ func TestProxy(t *testing.T) { time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second)) // Verify that the proxy node is now a peer. - fmt.Println("CHECK!") - time.Sleep(30 * time.Second) + result, err = c.Get("_etcd/proxies", false, true) + assert.NoError(t, err) + assert.Equal(t, len(result.Node.Nodes), 0) + + // Reconfigure with a smaller active size (8 nodes). + resp, _ = tests.Put("http://localhost:7001/config", "application/json", bytes.NewBufferString(`{"activeSize":8, "promoteDelay":1800}`)) + if !assert.Equal(t, resp.StatusCode, 200) { + t.FailNow() + } + + // Wait for two monitor cycles before checking for demotion. + time.Sleep((2 * server.ActiveMonitorTimeout) + (1 * time.Second)) + + // Verify that we now have eight peers. + result, err = c.Get("_etcd/machines", false, true) + assert.NoError(t, err) + assert.Equal(t, len(result.Node.Nodes), 8) + + // Verify that we now have two proxies. + result, err = c.Get("_etcd/proxies", false, true) + assert.NoError(t, err) + assert.Equal(t, len(result.Node.Nodes), 2) }