From 3a4df1612ce0cd69822a243ce48e8f9197dfb23b Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 13 Feb 2014 12:30:52 -0800 Subject: [PATCH] feat(discovery): adjust boot order to find peers The boot order for finding peers is: -discovery, then -peers, then log data, and finally forming a new cluster by itself. Special rules: 1. If discovery succeeds, etcd uses only the peers returned by the discovery URL. 2. Etcd fails when -discovery is bad and there are neither usable -peers nor log data. Add TestDiscoveryDownNoBackupPeersWithDataDir as the test. --- config/config.go | 38 --------- etcd.go | 2 +- server/peer_server.go | 123 ++++++++++++++++++++++------- server/registry.go | 4 +- server/server.go | 2 +- tests/functional/discovery_test.go | 53 +++++++++++++ tests/functional/etcd_tls_test.go | 17 +++- tests/server_utils.go | 2 +- 8 files changed, 170 insertions(+), 71 deletions(-) diff --git a/config/config.go b/config/config.go index 58a97e1c1..14baaf6f9 100644 --- a/config/config.go +++ b/config/config.go @@ -14,7 +14,6 @@ import ( "github.com/coreos/etcd/third_party/github.com/BurntSushi/toml" - "github.com/coreos/etcd/discovery" "github.com/coreos/etcd/log" ustrings "github.com/coreos/etcd/pkg/strings" "github.com/coreos/etcd/server" @@ -144,13 +143,6 @@ func (c *Config) Load(arguments []string) error { return fmt.Errorf("sanitize: %v", err) } - // Attempt cluster discovery - if c.Discovery != "" { - if err := c.handleDiscovery(); err != nil { - return err - } - } - // Force remove server configuration if specified. if c.Force { c.Reset() @@ -215,36 +207,6 @@ func (c *Config) loadEnv(target interface{}) error { return nil } -func (c *Config) handleDiscovery() error { - p, err := discovery.Do(c.Discovery, c.Name, c.Peer.Addr) - - // This is fatal, discovery encountered an unexpected error - // and we have no peer list. 
- if err != nil && len(c.Peers) == 0 { - log.Fatalf("Discovery failed and a backup peer list wasn't provided: %v", err) - return err - } - - // Warn about errors coming from discovery, this isn't fatal - // since the user might have provided a peer list elsewhere. - if err != nil { - log.Warnf("Discovery encountered an error but a backup peer list (%v) was provided: %v", c.Peers, err) - } - - for i := range p { - // Strip the scheme off of the peer if it has one - // TODO(bp): clean this up! - purl, err := url.Parse(p[i]) - if err == nil { - p[i] = purl.Host - } - } - - c.Peers = p - - return nil -} - // Loads configuration from command line flags. func (c *Config) LoadFlags(arguments []string) error { var peers, cors, path string diff --git a/etcd.go b/etcd.go index 35b0f644c..8f4db1f49 100644 --- a/etcd.go +++ b/etcd.go @@ -187,7 +187,7 @@ func main() { } ps.SetServer(s) - ps.Start(config.Snapshot, config.Peers) + ps.Start(config.Snapshot, config.Discovery, config.Peers) go func() { log.Infof("peer server [name %s, listen on %s, advertised url %s]", ps.Config.Name, psListener.Addr(), ps.Config.URL) diff --git a/server/peer_server.go b/server/peer_server.go index 85fabd3b3..422733c3b 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -14,6 +14,7 @@ import ( "github.com/coreos/etcd/third_party/github.com/coreos/raft" "github.com/coreos/etcd/third_party/github.com/gorilla/mux" + "github.com/coreos/etcd/discovery" etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/log" "github.com/coreos/etcd/metrics" @@ -99,8 +100,100 @@ func (s *PeerServer) SetRaftServer(raftServer raft.Server) { s.raftServer = raftServer } +// Helper function to do discovery and return results in expected format +func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err error) { + peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL) + + // Warn about errors coming from discovery, this isn't fatal + // since the user might have provided 
a peer list elsewhere, + // or there is some log in data dir. + if err != nil { + log.Warnf("Discovery encountered an error: %v", err) + return + } + + for i := range peers { + // Strip the scheme off of the peer if it has one + // TODO(bp): clean this up! + purl, err := url.Parse(peers[i]) + if err == nil { + peers[i] = purl.Host + } + } + + log.Infof("Discovery fetched back peer list: %v", peers) + + return +} + +// Try all possible ways to find clusters to join +// Include -discovery, -peers and log data in -data-dir +// +// Peer discovery follows this order: +// 1. -discovery +// 2. -peers +// 3. previous peers in -data-dir +func (s *PeerServer) findCluster(discoverURL string, peers []string) { + // Attempt cluster discovery + toDiscover := discoverURL != "" + if toDiscover { + discoverPeers, discoverErr := s.handleDiscovery(discoverURL) + // It is registered in discover url + if discoverErr == nil { + // start as a leader in a new cluster + if len(discoverPeers) == 0 { + log.Debug("This peer is starting a brand new cluster based on discover URL.") + s.startAsLeader() + } else { + s.startAsFollower(discoverPeers) + } + return + } + } + + hasPeerList := len(peers) > 0 + // if there is log in data dir, append previous peers to peers in config + // to find cluster + prevPeers := s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name) + for i := 0; i < len(prevPeers); i++ { + u, err := url.Parse(prevPeers[i]) + if err != nil { + log.Debug("rejoin cannot parse url: ", err) + } + prevPeers[i] = u.Host + } + peers = append(peers, prevPeers...) + + // if there is backup peer lists, use it to find cluster + if len(peers) > 0 { + ok := s.joinCluster(peers) + if !ok { + log.Warn("No living peers are found!") + } else { + log.Debugf("%s restart as a follower based on peers[%v]", s.Config.Name) + return + } + } + + if !s.raftServer.IsLogEmpty() { + log.Debug("Entire cluster is down! 
%v will restart the cluster.", s.Config.Name) + return + } + + if toDiscover { + log.Fatalf("Discovery failed, no available peers in backup list, and no log data") + } + + if hasPeerList { + log.Fatalf("No available peers in backup list, and no log data") + } + + log.Infof("This peer is starting a brand new cluster now.") + s.startAsLeader() +} + // Start the raft server -func (s *PeerServer) Start(snapshot bool, cluster []string) error { +func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) error { // LoadSnapshot if snapshot { err := s.raftServer.LoadSnapshot() @@ -114,31 +207,7 @@ func (s *PeerServer) Start(snapshot bool, cluster []string) error { s.raftServer.Start() - if s.raftServer.IsLogEmpty() { - // start as a leader in a new cluster - if len(cluster) == 0 { - s.startAsLeader() - } else { - s.startAsFollower(cluster) - } - - } else { - // Rejoin the previous cluster - cluster = s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name) - for i := 0; i < len(cluster); i++ { - u, err := url.Parse(cluster[i]) - if err != nil { - log.Debug("rejoin cannot parse url: ", err) - } - cluster[i] = u.Host - } - ok := s.joinCluster(cluster) - if !ok { - log.Warn("the entire cluster is down! this peer will restart the cluster.") - } - - log.Debugf("%s restart as a follower", s.Config.Name) - } + s.findCluster(discoverURL, peers) s.closeChan = make(chan bool) @@ -209,7 +278,7 @@ func (s *PeerServer) startAsFollower(cluster []string) { if ok { return } - log.Warnf("Unable to join the cluster using any of the peers %v. Retrying in %.1f seconds", cluster, s.Config.RetryInterval) + log.Warnf("%v is unable to join the cluster using any of the peers %v at %dth time. 
Retrying in %.1f seconds", s.Config.Name, cluster, i, s.Config.RetryInterval) time.Sleep(time.Second * time.Duration(s.Config.RetryInterval)) } diff --git a/server/registry.go b/server/registry.go index 669cb00c5..0e1a5daf2 100644 --- a/server/registry.go +++ b/server/registry.go @@ -94,7 +94,9 @@ func (r *Registry) clientURL(name string) (string, bool) { return "", false } -// Retrieves the host part of peer URL for a given node by name. +// TODO(yichengq): have all of the code use a full URL with scheme +// and remove this method +// PeerHost retrieves the host part of peer URL for a given node by name. func (r *Registry) PeerHost(name string) (string, bool) { rawurl, ok := r.PeerURL(name) if ok { diff --git a/server/server.go b/server/server.go index 66d4680d0..033765164 100644 --- a/server/server.go +++ b/server/server.go @@ -79,7 +79,7 @@ func (s *Server) URL() string { return s.url } -// Returns the host part of Peer URL for a given node name. +// PeerHost retrieves the host part of Peer URL for a given node name. func (s *Server) PeerHost(name string) (string, bool) { return s.registry.PeerHost(name) } diff --git a/tests/functional/discovery_test.go b/tests/functional/discovery_test.go index b8e2c97d6..16f3d2331 100644 --- a/tests/functional/discovery_test.go +++ b/tests/functional/discovery_test.go @@ -111,6 +111,59 @@ func TestDiscoveryNoWithBackupPeers(t *testing.T) { }) } +// TestDiscoveryDownNoBackupPeersWithDataDir ensures that etcd runs if it is +// started with a bad discovery URL, no backups and valid data dir. 
+func TestDiscoveryDownNoBackupPeersWithDataDir(t *testing.T) { + etcdtest.RunServer(func(s *server.Server) { + u, ok := s.PeerHost("ETCDTEST") + if !ok { + t.Fatalf("Couldn't find the URL") + } + + // run etcd and connect to ETCDTEST server + proc, err := startServer([]string{"-peers", u}) + if err != nil { + t.Fatal(err.Error()) + } + + // check it runs well + client := http.Client{} + err = assertServerFunctional(client, "http") + if err != nil { + t.Fatal(err.Error()) + } + + // stop etcd, and leave valid data dir for later usage + stopServer(proc) + + g := garbageHandler{t: t} + ts := httptest.NewServer(&g) + defer ts.Close() + + discover := ts.URL + "/v2/keys/_etcd/registry/1" + // connect to ETCDTEST server again with previous data dir + proc, err = startServerWithDataDir([]string{"-discovery", discover}) + if err != nil { + t.Fatal(err.Error()) + } + defer stopServer(proc) + + // TODO(yichengq): it needs some time to do leader election + // improve to get rid of it + time.Sleep(1 * time.Second) + + client = http.Client{} + err = assertServerFunctional(client, "http") + if err != nil { + t.Fatal(err.Error()) + } + + if !g.success { + t.Fatal("Discovery server never called") + } + }) +} + // TestDiscoveryFirstPeer ensures that etcd starts as the leader if it // registers as the first peer. func TestDiscoveryFirstPeer(t *testing.T) { diff --git a/tests/functional/etcd_tls_test.go b/tests/functional/etcd_tls_test.go index f2332eea2..d6299f061 100644 --- a/tests/functional/etcd_tls_test.go +++ b/tests/functional/etcd_tls_test.go @@ -167,6 +167,18 @@ func startServer(extra []string) (*os.Process, error) { return os.StartProcess(EtcdBinPath, cmd, procAttr) } +func startServerWithDataDir(extra []string) (*os.Process, error) { + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + cmd := []string{"etcd", "-data-dir=/tmp/node1", "-name=node1"} + cmd = append(cmd, extra...) 
+ + println(strings.Join(cmd, " ")) + + return os.StartProcess(EtcdBinPath, cmd, procAttr) +} + func stopServer(proc *os.Process) { err := proc.Kill() if err != nil { @@ -194,7 +206,8 @@ func assertServerFunctional(client http.Client, scheme string) error { } if err == nil { - if resp.StatusCode != 201 { + // Internal error may mean that servers are in leader election + if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusInternalServerError { return errors.New(fmt.Sprintf("resp.StatusCode == %s", resp.Status)) } else { return nil @@ -202,7 +215,7 @@ func assertServerFunctional(client http.Client, scheme string) error { } } - return errors.New("etcd server was not reachable in time") + return errors.New("etcd server was not reachable in time / had internal error") } func assertServerNotFunctional(client http.Client, scheme string) error { diff --git a/tests/server_utils.go b/tests/server_utils.go index e8b510423..eefe1782d 100644 --- a/tests/server_utils.go +++ b/tests/server_utils.go @@ -76,7 +76,7 @@ func RunServer(f func(*server.Server)) { c := make(chan bool) go func() { c <- true - ps.Start(false, []string{}) + ps.Start(false, "", []string{}) h := waitHandler{w, ps.HTTPHandler()} http.Serve(psListener, &h) }()