From 03839ca80631939178c0bda30cc685dfc9f51e57 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 10 Apr 2014 02:17:01 -0700 Subject: [PATCH] fix(peer_server): recover from outage with discovery This patch also refactors the find-cluster process. It is based on @xiangli-cmu's commits in issue #627. --- server/peer_server.go | 210 +++++++++++++++----------- tests/functional/discovery_test.go | 47 ++++++ tests/functional/etcd_tls_test.go | 25 +++ tests/functional/v1_migration_test.go | 2 +- 4 files changed, 196 insertions(+), 88 deletions(-) diff --git a/server/peer_server.go b/server/peer_server.go index e7acdc000..c85f98fa5 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -167,113 +167,91 @@ func (s *PeerServer) SetClusterConfig(c *ClusterConfig) { s.clusterConfig = c } -// Helper function to do discovery and return results in expected format -func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err error) { - peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL) +// Try all possible ways to find clusters to join +// Include log data in -data-dir, -discovery and -peers +// +// Peer discovery follows this order: +// 1. previous peers in -data-dir +// 2. -discovery +// 3. -peers +// +// TODO(yichengq): RaftServer should be started as late as possible. +// Current implementation to start it is not that good, +// and should be refactored later. +func (s *PeerServer) findCluster(discoverURL string, peers []string) { + name := s.Config.Name + isNewNode := s.raftServer.IsLogEmpty() - // Warn about errors coming from discovery, this isn't fatal - // since the user might have provided a peer list elsewhere, - // or there is some log in data dir. - if err != nil { - log.Warnf("Discovery encountered an error: %v", err) + // Try its best to find possible peers, and connect with them. + if !isNewNode { + // Take old nodes into account. + allPeers := s.getKnownPeers() + // Discover registered peers. 
+ // TODO(yichengq): It may mess up discoverURL if this is + // set wrong by mistake. This may need to refactor discovery + // module. Fix it later. + if discoverURL != "" { + discoverPeers, _ := s.handleDiscovery(discoverURL) + allPeers = append(allPeers, discoverPeers...) + } + allPeers = append(allPeers, peers...) + allPeers = s.removeSelfFromList(allPeers) + + // If there is possible peer list, use it to find cluster. + if len(allPeers) > 0 { + // TODO(yichengq): joinCluster may fail if there's no leader for + // current cluster. It should wait if the cluster is under + // leader election, or the node with changed IP cannot join + // the cluster then. + if err := s.startAsFollower(allPeers, 1); err == nil { + log.Debugf("%s joins to the previous cluster %v", name, allPeers) + return + } + + log.Warnf("%s cannot connect to previous cluster %v", name, allPeers) + } + + // TODO(yichengq): Think about the action that should be done + // if it cannot connect any of the previous known node. + s.raftServer.Start() + log.Debugf("%s is restarting the cluster %v", name, allPeers) return } - for i := range peers { - // Strip the scheme off of the peer if it has one - // TODO(bp): clean this up! - purl, err := url.Parse(peers[i]) - if err == nil { - peers[i] = purl.Host - } - } - - log.Infof("Discovery fetched back peer list: %v", peers) - - return -} - -// Try all possible ways to find clusters to join -// Include -discovery, -peers and log data in -data-dir -// -// Peer discovery follows this order: -// 1. -discovery -// 2. -peers -// 3. previous peers in -data-dir -// RaftServer should be started as late as possible. Current implementation -// to start it is not that good, and will be refactored in #627. 
-func (s *PeerServer) findCluster(discoverURL string, peers []string) { // Attempt cluster discovery - toDiscover := discoverURL != "" - if toDiscover { + if discoverURL != "" { discoverPeers, discoverErr := s.handleDiscovery(discoverURL) // It is registered in discover url if discoverErr == nil { // start as a leader in a new cluster if len(discoverPeers) == 0 { - log.Debug("This peer is starting a brand new cluster based on discover URL.") + log.Debugf("%s is starting a new cluster via discover service", name) s.startAsLeader() } else { - s.startAsFollower(discoverPeers) + log.Debugf("%s is joining a cluster %v via discover service", name, discoverPeers) + if err := s.startAsFollower(discoverPeers, s.Config.RetryTimes); err != nil { + log.Fatal(err) + } } return } - } + log.Warnf("%s failed to connect discovery service[%v]: %v", name, discoverURL, discoverErr) - hasPeerList := len(peers) > 0 - // if there is log in data dir, append previous peers to peers in config - // to find cluster - prevPeers := s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name) - for i := 0; i < len(prevPeers); i++ { - u, err := url.Parse(prevPeers[i]) - if err != nil { - log.Debug("rejoin cannot parse url: ", err) - } - prevPeers[i] = u.Host - } - peers = append(peers, prevPeers...) 
- - // Remove its own peer address from the peer list to join - u, err := url.Parse(s.Config.URL) - if err != nil { - log.Fatalf("cannot parse peer address %v: %v", s.Config.URL, err) - } - filteredPeers := make([]string, 0) - for _, v := range peers { - if v != u.Host { - filteredPeers = append(filteredPeers, v) + if len(peers) == 0 { + log.Fatalf("%s, the new leader, must register itself to discovery service as required", name) } } - peers = filteredPeers - // if there is backup peer lists, use it to find cluster if len(peers) > 0 { - ok := s.joinCluster(peers) - if !ok { - log.Warn("No living peers are found!") - } else { - s.raftServer.Start() - log.Debugf("%s restart as a follower based on peers[%v]", s.Config.Name) - return + if err := s.startAsFollower(peers, s.Config.RetryTimes); err != nil { + log.Fatalf("%s cannot connect to existing cluster %v", name, peers) } - } - - if !s.raftServer.IsLogEmpty() { - log.Debug("Entire cluster is down! %v will restart the cluster.", s.Config.Name) - s.raftServer.Start() return } - if toDiscover { - log.Fatalf("Discovery failed, no available peers in backup list, and no log data") - } - - if hasPeerList { - log.Fatalf("No available peers in backup list, and no log data") - } - - log.Infof("This peer is starting a brand new cluster now.") + log.Infof("%s is starting a new cluster.", s.Config.Name) s.startAsLeader() + return } // Start the raft server @@ -373,19 +351,22 @@ func (s *PeerServer) startAsLeader() { log.Debugf("%s start as a leader", s.Config.Name) } -func (s *PeerServer) startAsFollower(cluster []string) { +func (s *PeerServer) startAsFollower(cluster []string, retryTimes int) error { // start as a follower in a existing cluster - for i := 0; i < s.Config.RetryTimes; i++ { + for i := 0; ; i++ { ok := s.joinCluster(cluster) if ok { - s.raftServer.Start() - return + break + } + if i == retryTimes - 1 { + return fmt.Errorf("Cannot join the cluster via given peers after %x retries", s.Config.RetryTimes) } 
log.Warnf("%v is unable to join the cluster using any of the peers %v at %dth time. Retrying in %.1f seconds", s.Config.Name, cluster, i, s.Config.RetryInterval) time.Sleep(time.Second * time.Duration(s.Config.RetryInterval)) } - log.Fatalf("Cannot join the cluster via given peers after %x retries", s.Config.RetryTimes) + s.raftServer.Start() + return nil } // getVersion fetches the peer version of a cluster. @@ -429,6 +410,61 @@ func (s *PeerServer) Upgradable() error { return nil } +// Helper function to do discovery and return results in expected format +func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err error) { + peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL) + + // Warn about errors coming from discovery, this isn't fatal + // since the user might have provided a peer list elsewhere, + // or there is some log in data dir. + if err != nil { + log.Warnf("Discovery encountered an error: %v", err) + return + } + + for i := range peers { + // Strip the scheme off of the peer if it has one + // TODO(bp): clean this up! 
+ purl, err := url.Parse(peers[i]) + if err == nil { + peers[i] = purl.Host + } + } + + log.Infof("Discovery fetched back peer list: %v", peers) + + return +} + +// getKnownPeers gets the previous peers from log +func (s *PeerServer) getKnownPeers() []string { + peers := s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name) + for i := range peers { + u, err := url.Parse(peers[i]) + if err != nil { + log.Debug("getPrevPeers cannot parse url %v", peers[i]) + } + peers[i] = u.Host + } + return peers +} + +// removeSelfFromList removes url of the peerServer from the peer list +func (s *PeerServer) removeSelfFromList(peers []string) []string { + // Remove its own peer address from the peer list to join + u, err := url.Parse(s.Config.URL) + if err != nil { + log.Fatalf("removeSelfFromList cannot parse peer address %v", s.Config.URL) + } + newPeers := make([]string, 0) + for _, v := range peers { + if v != u.Host { + newPeers = append(newPeers, v) + } + } + return newPeers +} + func (s *PeerServer) joinCluster(cluster []string) bool { for _, peer := range cluster { if len(peer) == 0 { diff --git a/tests/functional/discovery_test.go b/tests/functional/discovery_test.go index 9a8c2ea3a..9a7e2bb64 100644 --- a/tests/functional/discovery_test.go +++ b/tests/functional/discovery_test.go @@ -292,6 +292,53 @@ func TestDiscoverySecondPeerUp(t *testing.T) { }) } +// TestDiscoveryRestart ensures that a discovery cluster could be restarted. 
+func TestDiscoveryRestart(t *testing.T) { + etcdtest.RunServer(func(s *server.Server) { + proc, err := startServer([]string{"-discovery", s.URL() + "/v2/keys/_etcd/registry/4"}) + if err != nil { + t.Fatal(err.Error()) + } + + client := http.Client{} + err = assertServerFunctional(client, "http") + if err != nil { + t.Fatal(err.Error()) + } + + proc2, err := startServer2([]string{"-discovery", s.URL() + "/v2/keys/_etcd/registry/4", "-addr", "127.0.0.1:4002", "-peer-addr", "127.0.0.1:7002"}) + if err != nil { + t.Fatal(err.Error()) + } + + err = assertServerFunctional(client, "http") + if err != nil { + t.Fatal(err.Error()) + } + + stopServer(proc) + stopServer(proc2) + + proc, err = startServerWithDataDir([]string{"-discovery", s.URL() + "/v2/keys/_etcd/registry/4"}) + if err != nil { + t.Fatal(err.Error()) + } + proc2, err = startServer2WithDataDir([]string{"-discovery", s.URL() + "/v2/keys/_etcd/registry/4", "-addr", "127.0.0.1:4002", "-peer-addr", "127.0.0.1:7002"}) + if err != nil { + t.Fatal(err.Error()) + } + + err = assertServerFunctional(client, "http") + if err != nil { + t.Fatal(err.Error()) + } + + stopServer(proc) + stopServer(proc2) + }) +} + + func assertServerNotUp(client http.Client, scheme string) error { path := fmt.Sprintf("%s://127.0.0.1:4001/v2/keys/foo", scheme) fields := url.Values(map[string][]string{"value": {"bar"}}) diff --git a/tests/functional/etcd_tls_test.go b/tests/functional/etcd_tls_test.go index 1089dcd04..367700816 100644 --- a/tests/functional/etcd_tls_test.go +++ b/tests/functional/etcd_tls_test.go @@ -166,6 +166,19 @@ func startServer(extra []string) (*os.Process, error) { return os.StartProcess(EtcdBinPath, cmd, procAttr) } +// TODO(yichengq): refactor these helper functions in #645 +func startServer2(extra []string) (*os.Process, error) { + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + cmd := []string{"etcd", "-f", "-data-dir=/tmp/node2", "-name=node2"} + cmd = append(cmd, 
extra...) + + fmt.Println(strings.Join(cmd, " ")) + + return os.StartProcess(EtcdBinPath, cmd, procAttr) +} + func startServerWithDataDir(extra []string) (*os.Process, error) { procAttr := new(os.ProcAttr) procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} @@ -173,6 +186,18 @@ func startServerWithDataDir(extra []string) (*os.Process, error) { cmd := []string{"etcd", "-data-dir=/tmp/node1", "-name=node1"} cmd = append(cmd, extra...) + fmt.Println(strings.Join(cmd, " ")) + + return os.StartProcess(EtcdBinPath, cmd, procAttr) +} + +func startServer2WithDataDir(extra []string) (*os.Process, error) { + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + cmd := []string{"etcd", "-data-dir=/tmp/node2", "-name=node2"} + cmd = append(cmd, extra...) + println(strings.Join(cmd, " ")) return os.StartProcess(EtcdBinPath, cmd, procAttr) diff --git a/tests/functional/v1_migration_test.go b/tests/functional/v1_migration_test.go index 75aaebb3f..0e457a780 100644 --- a/tests/functional/v1_migration_test.go +++ b/tests/functional/v1_migration_test.go @@ -95,7 +95,7 @@ func TestV1ClusterMigration(t *testing.T) { body := tests.ReadBody(resp) assert.Nil(t, err, "") assert.Equal(t, resp.StatusCode, http.StatusNotFound) - assert.Equal(t, string(body), `{"errorCode":100,"message":"Key not found","cause":"/message","index":11}`+"\n") + assert.Equal(t, string(body), `{"errorCode":100,"message":"Key not found","cause":"/message","index":10}`+"\n") // Ensure TTL'd message is removed. resp, err = tests.Get("http://localhost:4001/v2/keys/foo")