fix(peer_server): recover from outage with discovery

This patch also contains the refactor of find cluster process.
It is changed based on @xiangli-cmu 's commits in 627 issue.
This commit is contained in:
Yicheng Qin 2014-04-10 02:17:01 -07:00
parent 8c0c427870
commit 03839ca806
4 changed files with 196 additions and 88 deletions

View File

@ -167,113 +167,91 @@ func (s *PeerServer) SetClusterConfig(c *ClusterConfig) {
s.clusterConfig = c
}
// Helper function to do discovery and return results in expected format
func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err error) {
peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL)
// Try all possible ways to find clusters to join
// Include log data in -data-dir, -discovery and -peers
//
// Peer discovery follows this order:
// 1. previous peers in -data-dir
// 2. -discovery
// 3. -peers
//
// TODO(yichengq): RaftServer should be started as late as possible.
// Current implementation to start it is not that good,
// and should be refactored later.
func (s *PeerServer) findCluster(discoverURL string, peers []string) {
name := s.Config.Name
isNewNode := s.raftServer.IsLogEmpty()
// Warn about errors coming from discovery, this isn't fatal
// since the user might have provided a peer list elsewhere,
// or there is some log in data dir.
if err != nil {
log.Warnf("Discovery encountered an error: %v", err)
// Try its best to find possible peers, and connect with them.
if !isNewNode {
// Take old nodes into account.
allPeers := s.getKnownPeers()
// Discover registered peers.
// TODO(yichengq): It may mess up discoverURL if this is
// set wrong by mistake. This may need to refactor discovery
// module. Fix it later.
if discoverURL != "" {
discoverPeers, _ := s.handleDiscovery(discoverURL)
allPeers = append(allPeers, discoverPeers...)
}
allPeers = append(allPeers, peers...)
allPeers = s.removeSelfFromList(allPeers)
// If there is possible peer list, use it to find cluster.
if len(allPeers) > 0 {
// TODO(yichengq): joinCluster may fail if there's no leader for
// current cluster. It should wait if the cluster is under
// leader election, or the node with changed IP cannot join
// the cluster then.
if err := s.startAsFollower(allPeers, 1); err == nil {
log.Debugf("%s joins to the previous cluster %v", name, allPeers)
return
}
log.Warnf("%s cannot connect to previous cluster %v", name, allPeers)
}
// TODO(yichengq): Think about the action that should be done
// if it cannot connect any of the previous known node.
s.raftServer.Start()
log.Debugf("%s is restarting the cluster %v", name, allPeers)
return
}
for i := range peers {
// Strip the scheme off of the peer if it has one
// TODO(bp): clean this up!
purl, err := url.Parse(peers[i])
if err == nil {
peers[i] = purl.Host
}
}
log.Infof("Discovery fetched back peer list: %v", peers)
return
}
// Try all possible ways to find clusters to join
// Include -discovery, -peers and log data in -data-dir
//
// Peer discovery follows this order:
// 1. -discovery
// 2. -peers
// 3. previous peers in -data-dir
// RaftServer should be started as late as possible. Current implementation
// to start it is not that good, and will be refactored in #627.
func (s *PeerServer) findCluster(discoverURL string, peers []string) {
// Attempt cluster discovery
toDiscover := discoverURL != ""
if toDiscover {
if discoverURL != "" {
discoverPeers, discoverErr := s.handleDiscovery(discoverURL)
// It is registered in discover url
if discoverErr == nil {
// start as a leader in a new cluster
if len(discoverPeers) == 0 {
log.Debug("This peer is starting a brand new cluster based on discover URL.")
log.Debugf("%s is starting a new cluster via discover service", name)
s.startAsLeader()
} else {
s.startAsFollower(discoverPeers)
log.Debugf("%s is joining a cluster %v via discover service", name, discoverPeers)
if err := s.startAsFollower(discoverPeers, s.Config.RetryTimes); err != nil {
log.Fatal(err)
}
}
return
}
}
log.Warnf("%s failed to connect discovery service[%v]: %v", name, discoverURL, discoverErr)
hasPeerList := len(peers) > 0
// if there is log in data dir, append previous peers to peers in config
// to find cluster
prevPeers := s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name)
for i := 0; i < len(prevPeers); i++ {
u, err := url.Parse(prevPeers[i])
if err != nil {
log.Debug("rejoin cannot parse url: ", err)
}
prevPeers[i] = u.Host
}
peers = append(peers, prevPeers...)
// Remove its own peer address from the peer list to join
u, err := url.Parse(s.Config.URL)
if err != nil {
log.Fatalf("cannot parse peer address %v: %v", s.Config.URL, err)
}
filteredPeers := make([]string, 0)
for _, v := range peers {
if v != u.Host {
filteredPeers = append(filteredPeers, v)
if len(peers) == 0 {
log.Fatalf("%s, the new leader, must register itself to discovery service as required", name)
}
}
peers = filteredPeers
// if there is backup peer lists, use it to find cluster
if len(peers) > 0 {
ok := s.joinCluster(peers)
if !ok {
log.Warn("No living peers are found!")
} else {
s.raftServer.Start()
log.Debugf("%s restart as a follower based on peers[%v]", s.Config.Name)
return
if err := s.startAsFollower(peers, s.Config.RetryTimes); err != nil {
log.Fatalf("%s cannot connect to existing cluster %v", name, peers)
}
}
if !s.raftServer.IsLogEmpty() {
log.Debug("Entire cluster is down! %v will restart the cluster.", s.Config.Name)
s.raftServer.Start()
return
}
if toDiscover {
log.Fatalf("Discovery failed, no available peers in backup list, and no log data")
}
if hasPeerList {
log.Fatalf("No available peers in backup list, and no log data")
}
log.Infof("This peer is starting a brand new cluster now.")
log.Infof("%s is starting a new cluster.", s.Config.Name)
s.startAsLeader()
return
}
// Start the raft server
@ -373,19 +351,22 @@ func (s *PeerServer) startAsLeader() {
log.Debugf("%s start as a leader", s.Config.Name)
}
func (s *PeerServer) startAsFollower(cluster []string) {
func (s *PeerServer) startAsFollower(cluster []string, retryTimes int) error {
// start as a follower in a existing cluster
for i := 0; i < s.Config.RetryTimes; i++ {
for i := 0; ; i++ {
ok := s.joinCluster(cluster)
if ok {
s.raftServer.Start()
return
break
}
if i == retryTimes - 1 {
return fmt.Errorf("Cannot join the cluster via given peers after %x retries", s.Config.RetryTimes)
}
log.Warnf("%v is unable to join the cluster using any of the peers %v at %dth time. Retrying in %.1f seconds", s.Config.Name, cluster, i, s.Config.RetryInterval)
time.Sleep(time.Second * time.Duration(s.Config.RetryInterval))
}
log.Fatalf("Cannot join the cluster via given peers after %x retries", s.Config.RetryTimes)
s.raftServer.Start()
return nil
}
// getVersion fetches the peer version of a cluster.
@ -429,6 +410,61 @@ func (s *PeerServer) Upgradable() error {
return nil
}
// Helper function to do discovery and return results in expected format
func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err error) {
peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL)
// Warn about errors coming from discovery, this isn't fatal
// since the user might have provided a peer list elsewhere,
// or there is some log in data dir.
if err != nil {
log.Warnf("Discovery encountered an error: %v", err)
return
}
for i := range peers {
// Strip the scheme off of the peer if it has one
// TODO(bp): clean this up!
purl, err := url.Parse(peers[i])
if err == nil {
peers[i] = purl.Host
}
}
log.Infof("Discovery fetched back peer list: %v", peers)
return
}
// getKnownPeers gets the previous peers from log
func (s *PeerServer) getKnownPeers() []string {
peers := s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name)
for i := range peers {
u, err := url.Parse(peers[i])
if err != nil {
log.Debug("getPrevPeers cannot parse url %v", peers[i])
}
peers[i] = u.Host
}
return peers
}
// removeSelfFromList removes url of the peerServer from the peer list
func (s *PeerServer) removeSelfFromList(peers []string) []string {
// Remove its own peer address from the peer list to join
u, err := url.Parse(s.Config.URL)
if err != nil {
log.Fatalf("removeSelfFromList cannot parse peer address %v", s.Config.URL)
}
newPeers := make([]string, 0)
for _, v := range peers {
if v != u.Host {
newPeers = append(newPeers, v)
}
}
return newPeers
}
func (s *PeerServer) joinCluster(cluster []string) bool {
for _, peer := range cluster {
if len(peer) == 0 {

View File

@ -292,6 +292,53 @@ func TestDiscoverySecondPeerUp(t *testing.T) {
})
}
// TestDiscoveryRestart ensures that a discovery cluster could be restarted.
func TestDiscoveryRestart(t *testing.T) {
etcdtest.RunServer(func(s *server.Server) {
proc, err := startServer([]string{"-discovery", s.URL() + "/v2/keys/_etcd/registry/4"})
if err != nil {
t.Fatal(err.Error())
}
client := http.Client{}
err = assertServerFunctional(client, "http")
if err != nil {
t.Fatal(err.Error())
}
proc2, err := startServer2([]string{"-discovery", s.URL() + "/v2/keys/_etcd/registry/4", "-addr", "127.0.0.1:4002", "-peer-addr", "127.0.0.1:7002"})
if err != nil {
t.Fatal(err.Error())
}
err = assertServerFunctional(client, "http")
if err != nil {
t.Fatal(err.Error())
}
stopServer(proc)
stopServer(proc2)
proc, err = startServerWithDataDir([]string{"-discovery", s.URL() + "/v2/keys/_etcd/registry/4"})
if err != nil {
t.Fatal(err.Error())
}
proc2, err = startServer2WithDataDir([]string{"-discovery", s.URL() + "/v2/keys/_etcd/registry/4", "-addr", "127.0.0.1:4002", "-peer-addr", "127.0.0.1:7002"})
if err != nil {
t.Fatal(err.Error())
}
err = assertServerFunctional(client, "http")
if err != nil {
t.Fatal(err.Error())
}
stopServer(proc)
stopServer(proc2)
})
}
func assertServerNotUp(client http.Client, scheme string) error {
path := fmt.Sprintf("%s://127.0.0.1:4001/v2/keys/foo", scheme)
fields := url.Values(map[string][]string{"value": {"bar"}})

View File

@ -166,6 +166,19 @@ func startServer(extra []string) (*os.Process, error) {
return os.StartProcess(EtcdBinPath, cmd, procAttr)
}
// TODO(yichengq): refactor these helper functions in #645
func startServer2(extra []string) (*os.Process, error) {
procAttr := new(os.ProcAttr)
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
cmd := []string{"etcd", "-f", "-data-dir=/tmp/node2", "-name=node2"}
cmd = append(cmd, extra...)
fmt.Println(strings.Join(cmd, " "))
return os.StartProcess(EtcdBinPath, cmd, procAttr)
}
func startServerWithDataDir(extra []string) (*os.Process, error) {
procAttr := new(os.ProcAttr)
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
@ -173,6 +186,18 @@ func startServerWithDataDir(extra []string) (*os.Process, error) {
cmd := []string{"etcd", "-data-dir=/tmp/node1", "-name=node1"}
cmd = append(cmd, extra...)
fmt.Println(strings.Join(cmd, " "))
return os.StartProcess(EtcdBinPath, cmd, procAttr)
}
func startServer2WithDataDir(extra []string) (*os.Process, error) {
procAttr := new(os.ProcAttr)
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
cmd := []string{"etcd", "-data-dir=/tmp/node2", "-name=node2"}
cmd = append(cmd, extra...)
println(strings.Join(cmd, " "))
return os.StartProcess(EtcdBinPath, cmd, procAttr)

View File

@ -95,7 +95,7 @@ func TestV1ClusterMigration(t *testing.T) {
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, resp.StatusCode, http.StatusNotFound)
assert.Equal(t, string(body), `{"errorCode":100,"message":"Key not found","cause":"/message","index":11}`+"\n")
assert.Equal(t, string(body), `{"errorCode":100,"message":"Key not found","cause":"/message","index":10}`+"\n")
// Ensure TTL'd message is removed.
resp, err = tests.Get("http://localhost:4001/v2/keys/foo")