Merge pull request #569 from unihorn/5

Ordering and functionality of `-discovery` `-peers` and data dir to find peers
This commit is contained in:
Yicheng Qin 2014-02-17 14:34:53 -08:00
commit 04f21b5976
10 changed files with 266 additions and 70 deletions

View File

@ -4,6 +4,8 @@
Starting an etcd cluster can be painful since each node needs to know of another node in the cluster to get started. If you are trying to bring up a cluster all at once, say using a cloud formation, you also need to coordinate who will be the initial cluster leader. The discovery protocol helps you by providing an automated way to discover other existing peers in a cluster.
Peer discovery for etcd is processed by `-discovery`, `-peers` and lastly log data in `-data-dir`. For more information see the [discovery design][discovery-design].
## Using discovery.etcd.io
### Create a Token
@ -43,3 +45,5 @@ The Discovery API submits the `-peer-addr` of each etcd instance to the configur
## Stale Peers
The discovery API will automatically clean up the address of a stale peer that is no longer part of the cluster. The TTL for this process is a week, which should be long enough to handle any extremely long outage you may encounter. There is no harm in having stale peers in the list until they are cleaned up, since an etcd instance only needs to connect to one valid peer in the cluster to join.
[discovery-design]: https://github.com/coreos/etcd/blob/master/Documentation/design/discovery.md

View File

@ -0,0 +1,45 @@
## Discovery Rule
Peer discovery uses the following sources in this order: `-discovery`, `-peers`, log data in `-data-dir`.
If none of these is set, it will start a new cluster by itself. If any of them is set, it will make
best efforts to find cluster, and panic if none is reachable.
If a discover URL is provided and the discovery process succeeds then it will find peers specified by the discover URL only.
This is because we assume that it has been registered in discover URL and
should not join other clusters.
If a discover URL is provided but the discovery process fails then we will prevent the node from forming
a new cluster. We assume the user doesn't want to start a brand new cluster without noticing discover URL.
## Logical Workflow
Start a etcd machine:
```
If discovery url is given:
Do discovery
If Success:
Join to the cluster discovered
return
If peer list is given:
Try to join as follower via peer list
If Success: return
If log data is given:
Try to join as follower via peers in previous cluster
If Success: return
If log data is given:
Restart the previous cluster which is down
return
If discovery url is given:
Panic
If peer list is given:
Panic
Start as the leader of a new cluster
```

View File

@ -14,7 +14,6 @@ import (
"github.com/coreos/etcd/third_party/github.com/BurntSushi/toml"
"github.com/coreos/etcd/discovery"
"github.com/coreos/etcd/log"
ustrings "github.com/coreos/etcd/pkg/strings"
"github.com/coreos/etcd/server"
@ -144,13 +143,6 @@ func (c *Config) Load(arguments []string) error {
return fmt.Errorf("sanitize: %v", err)
}
// Attempt cluster discovery
if c.Discovery != "" {
if err := c.handleDiscovery(); err != nil {
return err
}
}
// Force remove server configuration if specified.
if c.Force {
c.Reset()
@ -215,36 +207,6 @@ func (c *Config) loadEnv(target interface{}) error {
return nil
}
func (c *Config) handleDiscovery() error {
p, err := discovery.Do(c.Discovery, c.Name, c.Peer.Addr)
// This is fatal, discovery encountered an unexpected error
// and we have no peer list.
if err != nil && len(c.Peers) == 0 {
log.Fatalf("Discovery failed and a backup peer list wasn't provided: %v", err)
return err
}
// Warn about errors coming from discovery, this isn't fatal
// since the user might have provided a peer list elsewhere.
if err != nil {
log.Warnf("Discovery encountered an error but a backup peer list (%v) was provided: %v", c.Peers, err)
}
for i := range p {
// Strip the scheme off of the peer if it has one
// TODO(bp): clean this up!
purl, err := url.Parse(p[i])
if err == nil {
p[i] = purl.Host
}
}
c.Peers = p
return nil
}
// Loads configuration from command line flags.
func (c *Config) LoadFlags(arguments []string) error {
var peers, cors, path string

View File

@ -187,7 +187,7 @@ func main() {
}
ps.SetServer(s)
ps.Start(config.Snapshot, config.Peers)
ps.Start(config.Snapshot, config.Discovery, config.Peers)
go func() {
log.Infof("peer server [name %s, listen on %s, advertised url %s]", ps.Config.Name, psListener.Addr(), ps.Config.URL)

View File

@ -14,6 +14,7 @@ import (
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/gorilla/mux"
"github.com/coreos/etcd/discovery"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/metrics"
@ -99,8 +100,100 @@ func (s *PeerServer) SetRaftServer(raftServer raft.Server) {
s.raftServer = raftServer
}
// Helper function to do discovery and return results in expected format
func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err error) {
peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL)
// Warn about errors coming from discovery, this isn't fatal
// since the user might have provided a peer list elsewhere,
// or there is some log in data dir.
if err != nil {
log.Warnf("Discovery encountered an error: %v", err)
return
}
for i := range peers {
// Strip the scheme off of the peer if it has one
// TODO(bp): clean this up!
purl, err := url.Parse(peers[i])
if err == nil {
peers[i] = purl.Host
}
}
log.Infof("Discovery fetched back peer list: %v", peers)
return
}
// Try all possible ways to find clusters to join
// Include -discovery, -peers and log data in -data-dir
//
// Peer discovery follows this order:
// 1. -discovery
// 2. -peers
// 3. previous peers in -data-dir
func (s *PeerServer) findCluster(discoverURL string, peers []string) {
// Attempt cluster discovery
toDiscover := discoverURL != ""
if toDiscover {
discoverPeers, discoverErr := s.handleDiscovery(discoverURL)
// It is registered in discover url
if discoverErr == nil {
// start as a leader in a new cluster
if len(discoverPeers) == 0 {
log.Debug("This peer is starting a brand new cluster based on discover URL.")
s.startAsLeader()
} else {
s.startAsFollower(discoverPeers)
}
return
}
}
hasPeerList := len(peers) > 0
// if there is log in data dir, append previous peers to peers in config
// to find cluster
prevPeers := s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name)
for i := 0; i < len(prevPeers); i++ {
u, err := url.Parse(prevPeers[i])
if err != nil {
log.Debug("rejoin cannot parse url: ", err)
}
prevPeers[i] = u.Host
}
peers = append(peers, prevPeers...)
// if there is backup peer lists, use it to find cluster
if len(peers) > 0 {
ok := s.joinCluster(peers)
if !ok {
log.Warn("No living peers are found!")
} else {
log.Debugf("%s restart as a follower based on peers[%v]", s.Config.Name)
return
}
}
if !s.raftServer.IsLogEmpty() {
log.Debug("Entire cluster is down! %v will restart the cluster.", s.Config.Name)
return
}
if toDiscover {
log.Fatalf("Discovery failed, no available peers in backup list, and no log data")
}
if hasPeerList {
log.Fatalf("No available peers in backup list, and no log data")
}
log.Infof("This peer is starting a brand new cluster now.")
s.startAsLeader()
}
// Start the raft server
func (s *PeerServer) Start(snapshot bool, cluster []string) error {
func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) error {
// LoadSnapshot
if snapshot {
err := s.raftServer.LoadSnapshot()
@ -114,31 +207,7 @@ func (s *PeerServer) Start(snapshot bool, cluster []string) error {
s.raftServer.Start()
if s.raftServer.IsLogEmpty() {
// start as a leader in a new cluster
if len(cluster) == 0 {
s.startAsLeader()
} else {
s.startAsFollower(cluster)
}
} else {
// Rejoin the previous cluster
cluster = s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name)
for i := 0; i < len(cluster); i++ {
u, err := url.Parse(cluster[i])
if err != nil {
log.Debug("rejoin cannot parse url: ", err)
}
cluster[i] = u.Host
}
ok := s.joinCluster(cluster)
if !ok {
log.Warn("the entire cluster is down! this peer will restart the cluster.")
}
log.Debugf("%s restart as a follower", s.Config.Name)
}
s.findCluster(discoverURL, peers)
s.closeChan = make(chan bool)
@ -209,7 +278,7 @@ func (s *PeerServer) startAsFollower(cluster []string) {
if ok {
return
}
log.Warnf("Unable to join the cluster using any of the peers %v. Retrying in %.1f seconds", cluster, s.Config.RetryInterval)
log.Warnf("%v is unable to join the cluster using any of the peers %v at %dth time. Retrying in %.1f seconds", s.Config.Name, cluster, i, s.Config.RetryInterval)
time.Sleep(time.Second * time.Duration(s.Config.RetryInterval))
}

View File

@ -94,6 +94,18 @@ func (r *Registry) clientURL(name string) (string, bool) {
return "", false
}
// TODO(yichengq): have all of the code use a full URL with scheme
// and remove this method
// PeerHost retrieves the host part of peer URL for a given node by name.
func (r *Registry) PeerHost(name string) (string, bool) {
rawurl, ok := r.PeerURL(name)
if ok {
u, _ := url.Parse(rawurl)
return u.Host, ok
}
return rawurl, ok
}
// Retrieves the peer URL for a given node by name.
func (r *Registry) PeerURL(name string) (string, bool) {
r.Lock()

View File

@ -79,6 +79,11 @@ func (s *Server) URL() string {
return s.url
}
// PeerHost retrieves the host part of Peer URL for a given node name.
func (s *Server) PeerHost(name string) (string, bool) {
return s.registry.PeerHost(name)
}
// Retrives the Peer URL for a given node name.
func (s *Server) PeerURL(name string) (string, bool) {
return s.registry.PeerURL(name)

View File

@ -65,7 +65,7 @@ func TestDiscoveryDownWithBackupPeers(t *testing.T) {
defer ts.Close()
discover := ts.URL + "/v2/keys/_etcd/registry/1"
u, ok := s.PeerURL("ETCDTEST")
u, ok := s.PeerHost("ETCDTEST")
if !ok {
t.Fatalf("Couldn't find the URL")
}
@ -88,6 +88,82 @@ func TestDiscoveryDownWithBackupPeers(t *testing.T) {
})
}
// TestDiscoveryNoWithBackupPeers ensures that etcd runs if it is started with
// no discovery URL and a peer list.
func TestDiscoveryNoWithBackupPeers(t *testing.T) {
etcdtest.RunServer(func(s *server.Server) {
u, ok := s.PeerHost("ETCDTEST")
if !ok {
t.Fatalf("Couldn't find the URL")
}
proc, err := startServer([]string{"-peers", u})
if err != nil {
t.Fatal(err.Error())
}
defer stopServer(proc)
client := http.Client{}
err = assertServerFunctional(client, "http")
if err != nil {
t.Fatal(err.Error())
}
})
}
// TestDiscoveryDownNoBackupPeersWithDataDir ensures that etcd runs if it is
// started with a bad discovery URL, no backups and valid data dir.
func TestDiscoveryDownNoBackupPeersWithDataDir(t *testing.T) {
etcdtest.RunServer(func(s *server.Server) {
u, ok := s.PeerHost("ETCDTEST")
if !ok {
t.Fatalf("Couldn't find the URL")
}
// run etcd and connect to ETCDTEST server
proc, err := startServer([]string{"-peers", u})
if err != nil {
t.Fatal(err.Error())
}
// check it runs well
client := http.Client{}
err = assertServerFunctional(client, "http")
if err != nil {
t.Fatal(err.Error())
}
// stop etcd, and leave valid data dir for later usage
stopServer(proc)
g := garbageHandler{t: t}
ts := httptest.NewServer(&g)
defer ts.Close()
discover := ts.URL + "/v2/keys/_etcd/registry/1"
// connect to ETCDTEST server again with previous data dir
proc, err = startServerWithDataDir([]string{"-discovery", discover})
if err != nil {
t.Fatal(err.Error())
}
defer stopServer(proc)
// TODO(yichengq): it needs some time to do leader election
// improve to get rid of it
time.Sleep(1 * time.Second)
client = http.Client{}
err = assertServerFunctional(client, "http")
if err != nil {
t.Fatal(err.Error())
}
if !g.success {
t.Fatal("Discovery server never called")
}
})
}
// TestDiscoveryFirstPeer ensures that etcd starts as the leader if it
// registers as the first peer.
func TestDiscoveryFirstPeer(t *testing.T) {

View File

@ -167,6 +167,18 @@ func startServer(extra []string) (*os.Process, error) {
return os.StartProcess(EtcdBinPath, cmd, procAttr)
}
func startServerWithDataDir(extra []string) (*os.Process, error) {
procAttr := new(os.ProcAttr)
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
cmd := []string{"etcd", "-data-dir=/tmp/node1", "-name=node1"}
cmd = append(cmd, extra...)
println(strings.Join(cmd, " "))
return os.StartProcess(EtcdBinPath, cmd, procAttr)
}
func stopServer(proc *os.Process) {
err := proc.Kill()
if err != nil {
@ -183,8 +195,19 @@ func assertServerFunctional(client http.Client, scheme string) error {
time.Sleep(1 * time.Second)
resp, err := client.PostForm(path, fields)
// If the status is Temporary Redirect, we should follow the
// new location, because the request did not go to the leader yet.
// TODO(yichengq): the difference between Temporary Redirect(307)
// and Created(201) could distinguish between leader and followers
for err == nil && resp.StatusCode == http.StatusTemporaryRedirect {
loc, _ := resp.Location()
newPath := loc.String()
resp, err = client.PostForm(newPath, fields)
}
if err == nil {
if resp.StatusCode != 201 {
// Internal error may mean that servers are in leader election
if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusInternalServerError {
return errors.New(fmt.Sprintf("resp.StatusCode == %s", resp.Status))
} else {
return nil
@ -192,7 +215,7 @@ func assertServerFunctional(client http.Client, scheme string) error {
}
}
return errors.New("etcd server was not reachable in time")
return errors.New("etcd server was not reachable in time / had internal error")
}
func assertServerNotFunctional(client http.Client, scheme string) error {

View File

@ -76,7 +76,7 @@ func RunServer(f func(*server.Server)) {
c := make(chan bool)
go func() {
c <- true
ps.Start(false, []string{})
ps.Start(false, "", []string{})
h := waitHandler{w, ps.HTTPHandler()}
http.Serve(psListener, &h)
}()