From 0e50d9787abc27e3c118efdfc8f9ec7061413409 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Mon, 27 Jan 2014 14:20:05 -0800 Subject: [PATCH 01/11] feat(*): bootstrap initial commit Setup the flags, and checkin the docs. Lets do this! --- Documentation/boostrap-protocol.md | 82 ++++++++++++++++++++++++++++++ Documentation/configuration.md | 1 + server/config.go | 2 + server/usage.go | 1 + 4 files changed, 86 insertions(+) create mode 100644 Documentation/boostrap-protocol.md diff --git a/Documentation/boostrap-protocol.md b/Documentation/boostrap-protocol.md new file mode 100644 index 000000000..c259f0329 --- /dev/null +++ b/Documentation/boostrap-protocol.md @@ -0,0 +1,82 @@ +# Bootstrap Protocol + +Bootstrapping an etcd cluster can be painful since each node needs to know of another node in the cluster to get started. If you are trying to bring up a cluster all at once, say using a cloud formation, you also need to coordinate who will be the initial cluster leader. The bootstrapping protocol helps you by providing a way to bootstrap an etcd instance using another already running instance. + +To enable use of this protocol you add the command line flag `-bootstrap-url` to your etcd args. In this example we will use `http://example.com/v2/keys/bootstrap/` as the URL prefix. + +## The Protocol + +By convention the etcd bootstrapping protocol uses the key prefix `bootstrap`. A full URL to the keyspace will be `http://example.com/v2/keys/bootstrap`. + +## Creating a New Cluster + +Generate a unique (secret) token that will identify the cluster and create a key called "_state". If you get a `201 Created` back then your key is unused and you can proceed with cluster creation. If the return value is `412 Precondition Failed` then you will need to create a new token. + +``` +UUID=$(uuidgen) +curl -X PUT "http://example.com/v2/keys/bootstrap/${UUID}/_state?prevExist=false?ttl=" -d value=init +``` + +## Bringing up Machines + +Now that you have your cluster ID you can start bringing up machines. Every machine will follow this protocol internally in etcd if given a `-bootstrap-url`. + +### Registering your Machine + +The first thing etcd must do is register your machine. This is done by using the machine name (from the `-name` arg) and posting it with a long TTL to the given key. + +``` +curl -X PUT "http://example.com/v2/keys/bootstrap/${UUID}/${etcd_machine_name}?ttl=604800" -d value=${peer_addr} +``` + +### Figuring out your Peers + +Now that this etcd machine is registered it must figure out its peers. + +But, the tricky bit of bootstrapping is that one machine needs to assume the initial role of leader and will have no peers. To figure out if another machine has taken on this etcd needs to update the `_state` key from "init" to "started": + +``` +curl -X PUT "http://example.com/v2/keys/bootstrap/${UUID}/_state?prevValue=init" -d value=started +``` + +If this returns a `200 OK` response then this machine is the initial leader and should start with no peers configured. If, however, this returns a `412 Precondition Failed` then you need to find all of the registered peers: + +``` +curl -X GET "http://example.com/v2/keys/bootstrap/${UUID}?recursive=true" +``` + +``` +{ + "action": "get", + "node": { + "createdIndex": 11, + "dir": true, + "key": "/bootstrap/9D4258A5-A1D3-4074-8837-31C1E091131D", + "modifiedIndex": 11, + "nodes": [ + { + "createdIndex": 16, + "expiration": "2014-02-03T13:19:57.631253589-08:00", + "key": "/bootstrap/9D4258A5-A1D3-4074-8837-31C1E091131D/peer1", + "modifiedIndex": 16, + "ttl": 604765, + "value": "127.0.0.1:7001" + }, + { + "createdIndex": 17, + "expiration": "2014-02-03T13:19:57.631253589-08:00", + "key": "/bootstrap/9D4258A5-A1D3-4074-8837-31C1E091131D/peer2", + "modifiedIndex": 17, + "ttl": 604765, + "value": "127.0.0.1:7002" + } + ] + } +} +``` + +Using this information you can connect to the rest of the peers in the cluster. + +### Heartbeating + +At this point you will want to heartbeat your registration URL every few hours. This will be done via a Go routine inside of etcd. \ No newline at end of file diff --git a/Documentation/configuration.md b/Documentation/configuration.md index 7142c19ef..894a3a2f3 100644 --- a/Documentation/configuration.md +++ b/Documentation/configuration.md @@ -19,6 +19,7 @@ configuration files. ### Optional * `-addr` - The advertised public hostname:port for client communication. Defaults to `127.0.0.1:4001`. +* `-bootstrap-url` - A URL to use for bootstrapping the peer list. (i.e `"https://bootstrap.etcd.io/unique-key"`). * `-bind-addr` - The listening hostname for client communication. Defaults to advertised ip. * `-peers` - A comma separated list of peers in the cluster (i.e `"203.0.113.101:7001,203.0.113.102:7001"`). * `-peers-file` - The file path containing a comma separated list of peers in the cluster. diff --git a/server/config.go b/server/config.go index bec140d18..da4bd4cf2 100644 --- a/server/config.go +++ b/server/config.go @@ -48,6 +48,7 @@ type Config struct { Addr string `toml:"addr" env:"ETCD_ADDR"` BindAddr string `toml:"bind_addr" env:"ETCD_BIND_ADDR"` + BootstrapURL string `toml:"bootstrap_url" env:"ETCD_BOOTSTRAP_URL"` CAFile string `toml:"ca_file" env:"ETCD_CA_FILE"` CertFile string `toml:"cert_file" env:"ETCD_CERT_FILE"` CPUProfileFile string @@ -225,6 +226,7 @@ func (c *Config) LoadFlags(arguments []string) error { f.StringVar(&c.Name, "name", c.Name, "") f.StringVar(&c.Addr, "addr", c.Addr, "") + f.StringVar(&c.BootstrapURL, "bootstrap-url", c.BootstrapURL, "") f.StringVar(&c.BindAddr, "bind-addr", c.BindAddr, "") f.StringVar(&c.Peer.Addr, "peer-addr", c.Peer.Addr, "") f.StringVar(&c.Peer.BindAddr, "peer-bind-addr", c.Peer.BindAddr, "") diff --git a/server/usage.go b/server/usage.go index 55762f28e..4d9340515 100644 --- a/server/usage.go +++ b/server/usage.go @@ -26,6 +26,7 @@ Options: -vv Enabled very verbose logging. Cluster Configuration Options: + -bootstrap-url= URL to use for bootstrapping the peer list. -peers-file= Path to a file containing the peer list. -peers=, Comma-separated list of peers. The members should match the peer's '-peer-addr' flag. From 69922340f67f2e124d8922b534946595d64bb112 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Mon, 27 Jan 2014 15:00:10 -0800 Subject: [PATCH 02/11] refactor(server): move utilities into pkg like camlistore lets move these utilities into a `pkg` prefix. --- server/util.go => pkg/http/http.go | 18 +++--------------- server/peer_server_handlers.go | 4 +++- server/server.go | 5 +++-- 3 files changed, 9 insertions(+), 18 deletions(-) rename server/util.go => pkg/http/http.go (57%) diff --git a/server/util.go b/pkg/http/http.go similarity index 57% rename from server/util.go rename to pkg/http/http.go index 729b64f65..a849c1dac 100644 --- a/server/util.go +++ b/pkg/http/http.go @@ -1,4 +1,4 @@ -package server +package http import ( "encoding/json" @@ -6,12 +6,11 @@ import ( "io" "net/http" "net/url" - "strings" "github.com/coreos/etcd/log" ) -func decodeJsonRequest(req *http.Request, data interface{}) error { +func DecodeJsonRequest(req *http.Request, data interface{}) error { decoder := json.NewDecoder(req.Body) if err := decoder.Decode(&data); err != nil && err != io.EOF { log.Warnf("Malformed json request: %v", err) @@ -20,7 +19,7 @@ func decodeJsonRequest(req *http.Request, data interface{}) error { return nil } -func redirect(hostname string, w http.ResponseWriter, req *http.Request) { +func Redirect(hostname string, w http.ResponseWriter, req *http.Request) { originalURL := req.URL redirectURL, _ := url.Parse(hostname) @@ -32,14 +31,3 @@ func redirect(hostname string, w http.ResponseWriter, req *http.Request) { log.Debugf("Redirect to %s", redirectURL.String()) http.Redirect(w, req, redirectURL.String(), http.StatusTemporaryRedirect) } - -// trimsplit slices s into all substrings separated by sep and returns a -// slice of the substrings between the separator with all leading and trailing -// white space removed, as defined by Unicode. -func trimsplit(s, sep string) []string { - trimmed := strings.Split(s, sep) - for i := range trimmed { - trimmed[i] = strings.TrimSpace(trimmed[i]) - } - return trimmed -} diff --git a/server/peer_server_handlers.go b/server/peer_server_handlers.go index 92129add5..f97c2470f 100644 --- a/server/peer_server_handlers.go +++ b/server/peer_server_handlers.go @@ -7,8 +7,10 @@ import ( "time" etcdErr "github.com/coreos/etcd/error" + uhttp "github.com/coreos/etcd/pkg/http" "github.com/coreos/etcd/log" "github.com/coreos/etcd/store" + "github.com/coreos/etcd/third_party/github.com/coreos/raft" "github.com/coreos/etcd/third_party/github.com/gorilla/mux" ) @@ -149,7 +151,7 @@ func (ps *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Reques func (ps *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) { command := &JoinCommand{} - err := decodeJsonRequest(req, command) + err := uhttp.DecodeJsonRequest(req, command) if err != nil { w.WriteHeader(http.StatusInternalServerError) return diff --git a/server/server.go b/server/server.go index 60af52c97..eb6539f54 100644 --- a/server/server.go +++ b/server/server.go @@ -15,6 +15,7 @@ import ( "github.com/coreos/etcd/log" "github.com/coreos/etcd/metrics" "github.com/coreos/etcd/mod" + uhttp "github.com/coreos/etcd/pkg/http" "github.com/coreos/etcd/server/v1" "github.com/coreos/etcd/server/v2" "github.com/coreos/etcd/store" @@ -244,7 +245,7 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque default: url, _ = ps.registry.ClientURL(leader) } - redirect(url, w, req) + uhttp.Redirect(url, w, req) return nil } @@ -295,7 +296,7 @@ func (s *Server) GetLeaderStatsHandler(w http.ResponseWriter, req *http.Request) return etcdErr.NewError(300, "", s.Store().Index()) } hostname, _ := s.registry.ClientURL(leader) - redirect(hostname, w, req) + uhttp.Redirect(hostname, w, req) return nil } From f56965b1c03d190bafb70455188803152268acff Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Mon, 27 Jan 2014 15:03:01 -0800 Subject: [PATCH 03/11] refactor(config): make config its own package Refactor config into its own package. Trying to tease the config from the server so that all of the control surfaces are exposed in the Server for easier testing. --- {server => config}/config.go | 32 ++++++++++++++++--------------- {server => config}/config_test.go | 2 +- {server => config}/timeout.go | 2 +- etcd.go | 3 ++- pkg/strings/string.go | 16 ++++++++++++++++ 5 files changed, 37 insertions(+), 18 deletions(-) rename {server => config}/config.go (95%) rename {server => config}/config_test.go (99%) rename {server => config}/timeout.go (93%) create mode 100644 pkg/strings/string.go diff --git a/server/config.go b/config/config.go similarity index 95% rename from server/config.go rename to config/config.go index da4bd4cf2..8aa534317 100644 --- a/server/config.go +++ b/config/config.go @@ -1,4 +1,4 @@ -package server +package config import ( "encoding/json" @@ -15,6 +15,8 @@ import ( "github.com/coreos/etcd/third_party/github.com/BurntSushi/toml" "github.com/coreos/etcd/log" + "github.com/coreos/etcd/server" + ustrings "github.com/coreos/etcd/pkg/strings" ) // The default location for the etcd configuration file. @@ -82,8 +84,8 @@ type Config struct { GraphiteHost string `toml:"graphite_host" env:"ETCD_GRAPHITE_HOST"` } -// NewConfig returns a Config initialized with default values. -func NewConfig() *Config { +// New returns a Config initialized with default values. +func New() *Config { c := new(Config) c.SystemPath = DefaultSystemConfigPath c.Addr = "127.0.0.1:4001" @@ -197,7 +199,7 @@ func (c *Config) loadEnv(target interface{}) error { case reflect.String: value.Field(i).SetString(v) case reflect.Slice: - value.Field(i).Set(reflect.ValueOf(trimsplit(v, ","))) + value.Field(i).Set(reflect.ValueOf(ustrings.TrimSplit(v, ","))) } } return nil @@ -293,10 +295,10 @@ func (c *Config) LoadFlags(arguments []string) error { // Convert some parameters to lists. if peers != "" { - c.Peers = trimsplit(peers, ",") + c.Peers = ustrings.TrimSplit(peers, ",") } if cors != "" { - c.CorsOrigins = trimsplit(cors, ",") + c.CorsOrigins = ustrings.TrimSplit(cors, ",") } return nil @@ -312,7 +314,7 @@ func (c *Config) LoadPeersFile() error { if err != nil { return fmt.Errorf("Peers file error: %s", err) } - c.Peers = trimsplit(string(b), ",") + c.Peers = ustrings.TrimSplit(string(b), ",") return nil } @@ -355,8 +357,8 @@ func (c *Config) Reset() error { } // Reads the info file from the file system or initializes it based on the config. -func (c *Config) Info() (*Info, error) { - info := &Info{} +func (c *Config) Info() (*server.Info, error) { + info := &server.Info{} path := filepath.Join(c.DataDir, "info") // Open info file and read it out. @@ -434,8 +436,8 @@ func (c *Config) Sanitize() error { } // TLSInfo retrieves a TLSInfo object for the client server. -func (c *Config) TLSInfo() TLSInfo { - return TLSInfo{ +func (c *Config) TLSInfo() server.TLSInfo { + return server.TLSInfo{ CAFile: c.CAFile, CertFile: c.CertFile, KeyFile: c.KeyFile, @@ -443,13 +445,13 @@ func (c *Config) TLSInfo() TLSInfo { } // ClientTLSConfig generates the TLS configuration for the client server. -func (c *Config) TLSConfig() (TLSConfig, error) { +func (c *Config) TLSConfig() (server.TLSConfig, error) { return c.TLSInfo().Config() } // PeerTLSInfo retrieves a TLSInfo object for the peer server. -func (c *Config) PeerTLSInfo() TLSInfo { - return TLSInfo{ +func (c *Config) PeerTLSInfo() server.TLSInfo { + return server.TLSInfo{ CAFile: c.Peer.CAFile, CertFile: c.Peer.CertFile, KeyFile: c.Peer.KeyFile, @@ -457,7 +459,7 @@ func (c *Config) PeerTLSInfo() TLSInfo { } // PeerTLSConfig generates the TLS configuration for the peer server. -func (c *Config) PeerTLSConfig() (TLSConfig, error) { +func (c *Config) PeerTLSConfig() (server.TLSConfig, error) { return c.PeerTLSInfo().Config() } diff --git a/server/config_test.go b/config/config_test.go similarity index 99% rename from server/config_test.go rename to config/config_test.go index 0cbee1217..5eb390511 100644 --- a/server/config_test.go +++ b/config/config_test.go @@ -1,4 +1,4 @@ -package server +package config import ( "io/ioutil" diff --git a/server/timeout.go b/config/timeout.go similarity index 93% rename from server/timeout.go rename to config/timeout.go index 9d117f9a2..551635e07 100644 --- a/server/timeout.go +++ b/config/timeout.go @@ -1,4 +1,4 @@ -package server +package config const ( // The amount of time (in ms) to elapse without a heartbeat before becoming a candidate diff --git a/etcd.go b/etcd.go index 8a81ed23a..5edee4c16 100644 --- a/etcd.go +++ b/etcd.go @@ -27,6 +27,7 @@ import ( "github.com/coreos/etcd/third_party/github.com/coreos/raft" ehttp "github.com/coreos/etcd/http" + "github.com/coreos/etcd/config" "github.com/coreos/etcd/log" "github.com/coreos/etcd/metrics" "github.com/coreos/etcd/server" @@ -35,7 +36,7 @@ import ( func main() { // Load configuration. - var config = server.NewConfig() + var config = config.New() if err := config.Load(os.Args[1:]); err != nil { fmt.Println(server.Usage() + "\n") fmt.Println(err.Error() + "\n") diff --git a/pkg/strings/string.go b/pkg/strings/string.go new file mode 100644 index 000000000..5d898e43a --- /dev/null +++ b/pkg/strings/string.go @@ -0,0 +1,16 @@ +package string + +import ( + "strings" +) + +// TrimSplit slices s into all substrings separated by sep and returns a +// slice of the substrings between the separator with all leading and trailing +// white space removed, as defined by Unicode. +func TrimSplit(s, sep string) []string { + trimmed := strings.Split(s, sep) + for i := range trimmed { + trimmed[i] = strings.TrimSpace(trimmed[i]) + } + return trimmed +} From 40a8542c22958540a4991fb8b0e805c6a6633b55 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Mon, 27 Jan 2014 15:06:19 -0800 Subject: [PATCH 04/11] feat(bootstrap): wire up the flag This wires up `-bootstrap-url` to some code (which crashes) :) --- bootstrap/bootstrap.go | 5 +++++ config/config.go | 8 ++++++++ 2 files changed, 13 insertions(+) create mode 100644 bootstrap/bootstrap.go diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go new file mode 100644 index 000000000..6387bd8b5 --- /dev/null +++ b/bootstrap/bootstrap.go @@ -0,0 +1,5 @@ +package bootstrap + +func Do(c string) error { + panic(c) +} diff --git a/config/config.go b/config/config.go index 8aa534317..ea886b3fa 100644 --- a/config/config.go +++ b/config/config.go @@ -14,6 +14,8 @@ import ( "strings" "github.com/coreos/etcd/third_party/github.com/BurntSushi/toml" + + "github.com/coreos/etcd/bootstrap" "github.com/coreos/etcd/log" "github.com/coreos/etcd/server" ustrings "github.com/coreos/etcd/pkg/strings" @@ -136,6 +138,12 @@ func (c *Config) Load(arguments []string) error { return err } + if c.BootstrapURL != "" { + if err := bootstrap.Do(c.BootstrapURL); err != nil { + return nil + } + } + // Sanitize all the input fields. if err := c.Sanitize(); err != nil { return fmt.Errorf("sanitize: %v", err) From 72514f8ab2e6469093682436cf56e8ff2e59e8bf Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Wed, 29 Jan 2014 08:21:23 -0800 Subject: [PATCH 05/11] feat(bootstrap): initial working code This is an initial version of the bootstrap code that seems to work under the normal circumstances. I need to mock out a server that will test out all of the error cases now. --- Documentation/boostrap-protocol.md | 82 ---------------- Documentation/configuration.md | 2 +- Documentation/discovery-protocol.md | 82 ++++++++++++++++ bootstrap/bootstrap.go | 5 - config/config.go | 33 +++++-- config/config_test.go | 108 ++++++++++----------- discovery/discovery.go | 139 ++++++++++++++++++++++++++++ server/peer_server.go | 17 ++-- 8 files changed, 310 insertions(+), 158 deletions(-) delete mode 100644 Documentation/boostrap-protocol.md create mode 100644 Documentation/discovery-protocol.md delete mode 100644 bootstrap/bootstrap.go create mode 100644 discovery/discovery.go diff --git a/Documentation/boostrap-protocol.md b/Documentation/boostrap-protocol.md deleted file mode 100644 index c259f0329..000000000 --- a/Documentation/boostrap-protocol.md +++ /dev/null @@ -1,82 +0,0 @@ -# Bootstrap Protocol - -Bootstrapping an etcd cluster can be painful since each node needs to know of another node in the cluster to get started. If you are trying to bring up a cluster all at once, say using a cloud formation, you also need to coordinate who will be the initial cluster leader. The bootstrapping protocol helps you by providing a way to bootstrap an etcd instance using another already running instance. - -To enable use of this protocol you add the command line flag `-bootstrap-url` to your etcd args. In this example we will use `http://example.com/v2/keys/bootstrap/` as the URL prefix. - -## The Protocol - -By convention the etcd bootstrapping protocol uses the key prefix `bootstrap`. A full URL to the keyspace will be `http://example.com/v2/keys/bootstrap`. - -## Creating a New Cluster - -Generate a unique (secret) token that will identify the cluster and create a key called "_state". If you get a `201 Created` back then your key is unused and you can proceed with cluster creation. If the return value is `412 Precondition Failed` then you will need to create a new token. - -``` -UUID=$(uuidgen) -curl -X PUT "http://example.com/v2/keys/bootstrap/${UUID}/_state?prevExist=false?ttl=" -d value=init -``` - -## Bringing up Machines - -Now that you have your cluster ID you can start bringing up machines. Every machine will follow this protocol internally in etcd if given a `-bootstrap-url`. - -### Registering your Machine - -The first thing etcd must do is register your machine. This is done by using the machine name (from the `-name` arg) and posting it with a long TTL to the given key. - -``` -curl -X PUT "http://example.com/v2/keys/bootstrap/${UUID}/${etcd_machine_name}?ttl=604800" -d value=${peer_addr} -``` - -### Figuring out your Peers - -Now that this etcd machine is registered it must figure out its peers. - -But, the tricky bit of bootstrapping is that one machine needs to assume the initial role of leader and will have no peers. To figure out if another machine has taken on this etcd needs to update the `_state` key from "init" to "started": - -``` -curl -X PUT "http://example.com/v2/keys/bootstrap/${UUID}/_state?prevValue=init" -d value=started -``` - -If this returns a `200 OK` response then this machine is the initial leader and should start with no peers configured. If, however, this returns a `412 Precondition Failed` then you need to find all of the registered peers: - -``` -curl -X GET "http://example.com/v2/keys/bootstrap/${UUID}?recursive=true" -``` - -``` -{ - "action": "get", - "node": { - "createdIndex": 11, - "dir": true, - "key": "/bootstrap/9D4258A5-A1D3-4074-8837-31C1E091131D", - "modifiedIndex": 11, - "nodes": [ - { - "createdIndex": 16, - "expiration": "2014-02-03T13:19:57.631253589-08:00", - "key": "/bootstrap/9D4258A5-A1D3-4074-8837-31C1E091131D/peer1", - "modifiedIndex": 16, - "ttl": 604765, - "value": "127.0.0.1:7001" - }, - { - "createdIndex": 17, - "expiration": "2014-02-03T13:19:57.631253589-08:00", - "key": "/bootstrap/9D4258A5-A1D3-4074-8837-31C1E091131D/peer2", - "modifiedIndex": 17, - "ttl": 604765, - "value": "127.0.0.1:7002" - } - ] - } -} -``` - -Using this information you can connect to the rest of the peers in the cluster. - -### Heartbeating - -At this point you will want to heartbeat your registration URL every few hours. This will be done via a Go routine inside of etcd. \ No newline at end of file diff --git a/Documentation/configuration.md b/Documentation/configuration.md index 894a3a2f3..780769d61 100644 --- a/Documentation/configuration.md +++ b/Documentation/configuration.md @@ -19,7 +19,7 @@ configuration files. ### Optional * `-addr` - The advertised public hostname:port for client communication. Defaults to `127.0.0.1:4001`. -* `-bootstrap-url` - A URL to use for bootstrapping the peer list. (i.e `"https://bootstrap.etcd.io/unique-key"`). +* `-discovery` - A URL to use for discovering the peer list. (i.e `"https://discovery.etcd.io/your-unique-key"`). * `-bind-addr` - The listening hostname for client communication. Defaults to advertised ip. * `-peers` - A comma separated list of peers in the cluster (i.e `"203.0.113.101:7001,203.0.113.102:7001"`). * `-peers-file` - The file path containing a comma separated list of peers in the cluster. diff --git a/Documentation/discovery-protocol.md b/Documentation/discovery-protocol.md new file mode 100644 index 000000000..2d8a2c734 --- /dev/null +++ b/Documentation/discovery-protocol.md @@ -0,0 +1,82 @@ +# Discovery Protocol + +Starting an etcd cluster initially can be painful since each machine needs to know of at least one live machine in the cluster. If you are trying to bring up a cluster all at once, say using an AWS cloud formation, you also need to coordinate who will be the initial cluster leader. The discovery protocol helps you by providing a way to discover the peers in a new etcd cluster using another already running etcd cluster. + +To use this protocol you add the command line flag `-discovery` to your etcd args. In this example we will use `http://example.com/v2/keys/_etcd/registry` as the URL prefix. + +## The Protocol + +By convention the etcd discovery protocol uses the key prefix `_etcd/registry`. A full URL to the keyspace will be `http://example.com/v2/keys/_etcd/registry`. + +## Creating a New Cluster + +Generate a unique token that will identify the new cluster and create a key called "_state". If you get a `201 Created` back then your key is unused and you can proceed with cluster creation. If the return value is `412 Precondition Failed` then you will need to create a new token. + +``` +UUID=$(uuidgen) +curl -X PUT "http://example.com/v2/keys/_etcd/registry/${UUID}/_state?prevExist=false" -d value=init +``` + +## Bringing up Machines + +Now that you have your cluster ID you can start bringing up machines. Every machine will follow this protocol internally in etcd if given a `-discovery`. + +### Registering your Machine + +The first thing etcd must do is register your machine. This is done by using the machine name (from the `-name` arg) and posting it with a long TTL to the given key. + +``` +curl -X PUT "http://example.com/v2/keys/_etcd/registry/${UUID}/${etcd_machine_name}?ttl=604800" -d value=${peer_addr} +``` + +### Figuring out your Peers + +Now that this etcd machine is registered it must discover its peers. + +But, the tricky bit of starting a new cluster is that one machine needs to assume the initial role of leader and will have no peers. To figure out if another machine has already started the cluster etcd needs to update the `_state` key from "init" to "started": + +``` +curl -X PUT "http://example.com/v2/keys/_etcd/registry/${UUID}/_state?prevValue=init" -d value=started +``` + +If this returns a `200 OK` response then this machine is the initial leader and should start with no peers configured. If, however, this returns a `412 Precondition Failed` then you need to find all of the registered peers: + +``` +curl -X GET "http://example.com/v2/keys/_etcd/registry/${UUID}?recursive=true" +``` + +``` +{ + "action": "get", + "node": { + "createdIndex": 11, + "dir": true, + "key": "/_etcd/registry/9D4258A5-A1D3-4074-8837-31C1E091131D", + "modifiedIndex": 11, + "nodes": [ + { + "createdIndex": 16, + "expiration": "2014-02-03T13:19:57.631253589-08:00", + "key": "/_etcd/registry/9D4258A5-A1D3-4074-8837-31C1E091131D/peer1", + "modifiedIndex": 16, + "ttl": 604765, + "value": "127.0.0.1:7001" + }, + { + "createdIndex": 17, + "expiration": "2014-02-03T13:19:57.631253589-08:00", + "key": "/_etcd/registry/9D4258A5-A1D3-4074-8837-31C1E091131D/peer2", + "modifiedIndex": 17, + "ttl": 604765, + "value": "127.0.0.1:7002" + } + ] + } +} +``` + +Using this information you can connect to the rest of the peers in the cluster. + +### Heartbeating + +At this point you will want to heartbeat your registration URL every few hours. This will be done via a Go routine inside of etcd. diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go deleted file mode 100644 index 6387bd8b5..000000000 --- a/bootstrap/bootstrap.go +++ /dev/null @@ -1,5 +0,0 @@ -package bootstrap - -func Do(c string) error { - panic(c) -} diff --git a/config/config.go b/config/config.go index ea886b3fa..ee066b3a2 100644 --- a/config/config.go +++ b/config/config.go @@ -15,10 +15,10 @@ import ( "github.com/coreos/etcd/third_party/github.com/BurntSushi/toml" - "github.com/coreos/etcd/bootstrap" + "github.com/coreos/etcd/discovery" "github.com/coreos/etcd/log" - "github.com/coreos/etcd/server" ustrings "github.com/coreos/etcd/pkg/strings" + "github.com/coreos/etcd/server" ) // The default location for the etcd configuration file. @@ -52,12 +52,12 @@ type Config struct { Addr string `toml:"addr" env:"ETCD_ADDR"` BindAddr string `toml:"bind_addr" env:"ETCD_BIND_ADDR"` - BootstrapURL string `toml:"bootstrap_url" env:"ETCD_BOOTSTRAP_URL"` CAFile string `toml:"ca_file" env:"ETCD_CA_FILE"` CertFile string `toml:"cert_file" env:"ETCD_CERT_FILE"` CPUProfileFile string CorsOrigins []string `toml:"cors" env:"ETCD_CORS"` DataDir string `toml:"data_dir" env:"ETCD_DATA_DIR"` + Discovery string `toml:"discovery" env:"ETCD_DISCOVERY"` Force bool KeyFile string `toml:"key_file" env:"ETCD_KEY_FILE"` Peers []string `toml:"peers" env:"ETCD_PEERS"` @@ -138,17 +138,30 @@ func (c *Config) Load(arguments []string) error { return err } - if c.BootstrapURL != "" { - if err := bootstrap.Do(c.BootstrapURL); err != nil { - return nil - } - } - // Sanitize all the input fields. if err := c.Sanitize(); err != nil { return fmt.Errorf("sanitize: %v", err) } + // Attempt cluster discovery + if c.Discovery != "" { + p, err := discovery.Do(c.Discovery, c.Name, c.Peer.Addr) + if err != nil { + log.Fatalf("Bootstrapping encountered an unexpected error: %v", err) + } + + for i := range p { + // Strip the scheme off of the peer if it has one + // TODO(bp): clean this up! + purl, err := url.Parse(p[i]) + if err == nil { + p[i] = purl.Host + } + } + + c.Peers = p + } + // Force remove server configuration if specified. if c.Force { c.Reset() @@ -236,7 +249,7 @@ func (c *Config) LoadFlags(arguments []string) error { f.StringVar(&c.Name, "name", c.Name, "") f.StringVar(&c.Addr, "addr", c.Addr, "") - f.StringVar(&c.BootstrapURL, "bootstrap-url", c.BootstrapURL, "") + f.StringVar(&c.Discovery, "discovery", c.Discovery, "") f.StringVar(&c.BindAddr, "bind-addr", c.BindAddr, "") f.StringVar(&c.Peer.Addr, "peer-addr", c.Peer.Addr, "") f.StringVar(&c.Peer.BindAddr, "peer-bind-addr", c.Peer.BindAddr, "") diff --git a/config/config_test.go b/config/config_test.go index 5eb390511..96408bab0 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -18,6 +18,7 @@ func TestConfigTOML(t *testing.T) { cors = ["*"] cpu_profile_file = "XXX" data_dir = "/tmp/data" + discovery = "http://example.com/foobar" key_file = "/tmp/file.key" bind_addr = "127.0.0.1:4003" peers = ["coreos.com:4001", "coreos.com:4002"] @@ -37,7 +38,7 @@ func TestConfigTOML(t *testing.T) { key_file = "/tmp/peer/file.key" bind_addr = "127.0.0.1:7003" ` - c := NewConfig() + c := New() _, err := toml.Decode(content, &c) assert.Nil(t, err, "") assert.Equal(t, c.Addr, "127.0.0.1:4002", "") @@ -45,6 +46,7 @@ func TestConfigTOML(t *testing.T) { assert.Equal(t, c.CertFile, "/tmp/file.cert", "") assert.Equal(t, c.CorsOrigins, []string{"*"}, "") assert.Equal(t, c.DataDir, "/tmp/data", "") + assert.Equal(t, c.Discovery, "http://example.com/foobar", "") assert.Equal(t, c.KeyFile, "/tmp/file.key", "") assert.Equal(t, c.BindAddr, "127.0.0.1:4003", "") assert.Equal(t, c.Peers, []string{"coreos.com:4001", "coreos.com:4002"}, "") @@ -70,6 +72,7 @@ func TestConfigEnv(t *testing.T) { os.Setenv("ETCD_CPU_PROFILE_FILE", "XXX") os.Setenv("ETCD_CORS", "localhost:4001,localhost:4002") os.Setenv("ETCD_DATA_DIR", "/tmp/data") + os.Setenv("ETCD_DISCOVERY", "http://example.com/foobar") os.Setenv("ETCD_KEY_FILE", "/tmp/file.key") os.Setenv("ETCD_BIND_ADDR", "127.0.0.1:4003") os.Setenv("ETCD_PEERS", "coreos.com:4001,coreos.com:4002") @@ -87,12 +90,13 @@ func TestConfigEnv(t *testing.T) { os.Setenv("ETCD_PEER_KEY_FILE", "/tmp/peer/file.key") os.Setenv("ETCD_PEER_BIND_ADDR", "127.0.0.1:7003") - c := NewConfig() + c := New() c.LoadEnv() assert.Equal(t, c.CAFile, "/tmp/file.ca", "") assert.Equal(t, c.CertFile, "/tmp/file.cert", "") assert.Equal(t, c.CorsOrigins, []string{"localhost:4001", "localhost:4002"}, "") assert.Equal(t, c.DataDir, "/tmp/data", "") + assert.Equal(t, c.Discovery, "http://example.com/foobar", "") assert.Equal(t, c.KeyFile, "/tmp/file.key", "") assert.Equal(t, c.BindAddr, "127.0.0.1:4003", "") assert.Equal(t, c.Peers, []string{"coreos.com:4001", "coreos.com:4002"}, "") @@ -113,35 +117,35 @@ func TestConfigEnv(t *testing.T) { // Ensures that the "help" flag can be parsed. func TestConfigHelpFlag(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-help"}), "") assert.True(t, c.ShowHelp) } // Ensures that the abbreviated "help" flag can be parsed. func TestConfigAbbreviatedHelpFlag(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-h"}), "") assert.True(t, c.ShowHelp) } // Ensures that the "version" flag can be parsed. func TestConfigVersionFlag(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-version"}), "") assert.True(t, c.ShowVersion) } // Ensures that the "force config" flag can be parsed. func TestConfigForceFlag(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-force"}), "") assert.True(t, c.Force) } // Ensures that the abbreviated "force config" flag can be parsed. func TestConfigAbbreviatedForceFlag(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-f"}), "") assert.True(t, c.Force) } @@ -156,7 +160,7 @@ func TestConfigAddrEnv(t *testing.T) { // Ensures that a the advertised flag can be parsed. func TestConfigAddrFlag(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-addr", "127.0.0.1:4002"}), "") assert.Equal(t, c.Addr, "127.0.0.1:4002", "") } @@ -171,7 +175,7 @@ func TestConfigCAFileEnv(t *testing.T) { // Ensures that a the CA file flag can be parsed. func TestConfigCAFileFlag(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-ca-file", "/tmp/file.ca"}), "") assert.Equal(t, c.CAFile, "/tmp/file.ca", "") } @@ -186,7 +190,7 @@ func TestConfigCertFileEnv(t *testing.T) { // Ensures that a the Cert file flag can be parsed. func TestConfigCertFileFlag(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-cert-file", "/tmp/file.cert"}), "") assert.Equal(t, c.CertFile, "/tmp/file.cert", "") } @@ -201,7 +205,7 @@ func TestConfigKeyFileEnv(t *testing.T) { // Ensures that a the Key file flag can be parsed. func TestConfigKeyFileFlag(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-key-file", "/tmp/file.key"}), "") assert.Equal(t, c.KeyFile, "/tmp/file.key", "") } @@ -216,14 +220,14 @@ func TestConfigBindAddrEnv(t *testing.T) { // Ensures that a the Listen Host file flag can be parsed. func TestConfigBindAddrFlag(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-bind-addr", "127.0.0.1:4003"}), "") assert.Equal(t, c.BindAddr, "127.0.0.1:4003", "") } // Ensures that a the Listen Host port overrides the advertised port func TestConfigBindAddrOverride(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-addr", "127.0.0.1:4009", "-bind-addr", "127.0.0.1:4010"}), "") assert.Nil(t, c.Sanitize()) assert.Equal(t, c.BindAddr, "127.0.0.1:4010", "") @@ -231,7 +235,7 @@ func TestConfigBindAddrOverride(t *testing.T) { // Ensures that a the Listen Host inherits its port from the advertised addr func TestConfigBindAddrInheritPort(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-addr", "127.0.0.1:4009", "-bind-addr", "127.0.0.1"}), "") assert.Nil(t, c.Sanitize()) assert.Equal(t, c.BindAddr, "127.0.0.1:4009", "") @@ -239,7 +243,7 @@ func TestConfigBindAddrInheritPort(t *testing.T) { // Ensures that a port only argument errors out func TestConfigBindAddrErrorOnNoHost(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-addr", "127.0.0.1:4009", "-bind-addr", ":4010"}), "") assert.Error(t, c.Sanitize()) } @@ -254,7 +258,7 @@ func TestConfigPeersEnv(t *testing.T) { // Ensures that a the Peers flag can be parsed. func TestConfigPeersFlag(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-peers", "coreos.com:4001,coreos.com:4002"}), "") assert.Equal(t, c.Peers, []string{"coreos.com:4001", "coreos.com:4002"}, "") } @@ -269,7 +273,7 @@ func TestConfigPeersFileEnv(t *testing.T) { // Ensures that a the Peers File flag can be parsed. func TestConfigPeersFileFlag(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-peers-file", "/tmp/peers"}), "") assert.Equal(t, c.PeersFile, "/tmp/peers", "") } @@ -284,7 +288,7 @@ func TestConfigMaxClusterSizeEnv(t *testing.T) { // Ensures that a the Max Cluster Size flag can be parsed. func TestConfigMaxClusterSizeFlag(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-max-cluster-size", "5"}), "") assert.Equal(t, c.MaxClusterSize, 5, "") } @@ -299,7 +303,7 @@ func TestConfigMaxResultBufferEnv(t *testing.T) { // Ensures that a the Max Result Buffer flag can be parsed. func TestConfigMaxResultBufferFlag(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-max-result-buffer", "512"}), "") assert.Equal(t, c.MaxResultBuffer, 512, "") } @@ -314,7 +318,7 @@ func TestConfigMaxRetryAttemptsEnv(t *testing.T) { // Ensures that a the Max Retry Attempts flag can be parsed. func TestConfigMaxRetryAttemptsFlag(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-max-retry-attempts", "10"}), "") assert.Equal(t, c.MaxRetryAttempts, 10, "") } @@ -329,14 +333,14 @@ func TestConfigNameEnv(t *testing.T) { // Ensures that a the Name flag can be parsed. func TestConfigNameFlag(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-name", "test-name"}), "") assert.Equal(t, c.Name, "test-name", "") } // Ensures that a Name gets guessed if not specified func TestConfigNameGuess(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{}), "") assert.Nil(t, c.Sanitize()) name, _ := os.Hostname() @@ -345,7 +349,7 @@ func TestConfigNameGuess(t *testing.T) { // Ensures that a DataDir gets guessed if not specified func TestConfigDataDirGuess(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{}), "") assert.Nil(t, c.Sanitize()) name, _ := os.Hostname() @@ -362,7 +366,7 @@ func TestConfigSnapshotEnv(t *testing.T) { // Ensures that a the Snapshot flag can be parsed. func TestConfigSnapshotFlag(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-snapshot"}), "") assert.Equal(t, c.Snapshot, true, "") } @@ -377,7 +381,7 @@ func TestConfigVerboseEnv(t *testing.T) { // Ensures that a the Verbose flag can be parsed. func TestConfigVerboseFlag(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-v"}), "") assert.Equal(t, c.Verbose, true, "") } @@ -392,7 +396,7 @@ func TestConfigVeryVerboseEnv(t *testing.T) { // Ensures that a the Very Verbose flag can be parsed. func TestConfigVeryVerboseFlag(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-vv"}), "") assert.Equal(t, c.VeryVerbose, true, "") } @@ -407,7 +411,7 @@ func TestConfigPeerAddrEnv(t *testing.T) { // Ensures that a the Peer Advertised URL flag can be parsed. func TestConfigPeerAddrFlag(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-peer-addr", "localhost:7002"}), "") assert.Equal(t, c.Peer.Addr, "localhost:7002", "") } @@ -422,7 +426,7 @@ func TestConfigPeerCAFileEnv(t *testing.T) { // Ensures that a the Peer CA file flag can be parsed. func TestConfigPeerCAFileFlag(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-peer-ca-file", "/tmp/peer/file.ca"}), "") assert.Equal(t, c.Peer.CAFile, "/tmp/peer/file.ca", "") } @@ -437,7 +441,7 @@ func TestConfigPeerCertFileEnv(t *testing.T) { // Ensures that a the Cert file flag can be parsed. func TestConfigPeerCertFileFlag(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-peer-cert-file", "/tmp/peer/file.cert"}), "") assert.Equal(t, c.Peer.CertFile, "/tmp/peer/file.cert", "") } @@ -452,7 +456,7 @@ func TestConfigPeerKeyFileEnv(t *testing.T) { // Ensures that a the Peer Key file flag can be parsed. func TestConfigPeerKeyFileFlag(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-peer-key-file", "/tmp/peer/file.key"}), "") assert.Equal(t, c.Peer.KeyFile, "/tmp/peer/file.key", "") } @@ -467,7 +471,7 @@ func TestConfigPeerBindAddrEnv(t *testing.T) { // Ensures that a bad flag returns an error. func TestConfigBadFlag(t *testing.T) { - c := NewConfig() + c := New() err := c.LoadFlags([]string{"-no-such-flag"}) assert.Error(t, err) assert.Equal(t, err.Error(), `flag provided but not defined: -no-such-flag`) @@ -475,7 +479,7 @@ func TestConfigBadFlag(t *testing.T) { // Ensures that a the Peer Listen Host file flag can be parsed. func TestConfigPeerBindAddrFlag(t *testing.T) { - c := NewConfig() + c := New() assert.Nil(t, c.LoadFlags([]string{"-peer-bind-addr", "127.0.0.1:4003"}), "") assert.Equal(t, c.Peer.BindAddr, "127.0.0.1:4003", "") } @@ -486,7 +490,7 @@ func TestConfigCustomConfigOverrideSystemConfig(t *testing.T) { custom := `addr = "127.0.0.1:6000"` withTempFile(system, func(p1 string) { withTempFile(custom, func(p2 string) { - c := NewConfig() + c := New() c.SystemPath = p1 assert.Nil(t, c.Load([]string{"-config", p2}), "") assert.Equal(t, c.Addr, "http://127.0.0.1:6000", "") @@ -501,7 +505,7 @@ func TestConfigEnvVarOverrideCustomConfig(t *testing.T) { custom := `[peer]` + "\n" + `advertised_url = "127.0.0.1:9000"` withTempFile(custom, func(path string) { - c := NewConfig() + c := New() c.SystemPath = "" assert.Nil(t, c.Load([]string{"-config", path}), "") assert.Equal(t, c.Peer.Addr, "http://127.0.0.1:8000", "") @@ -513,7 +517,7 @@ func TestConfigCLIArgsOverrideEnvVar(t *testing.T) { os.Setenv("ETCD_ADDR", "127.0.0.1:1000") defer os.Setenv("ETCD_ADDR", "") - c := NewConfig() + c := New() c.SystemPath = "" assert.Nil(t, c.Load([]string{"-addr", "127.0.0.1:2000"}), "") assert.Equal(t, c.Addr, "http://127.0.0.1:2000", "") @@ -525,7 +529,7 @@ func TestConfigCLIArgsOverrideEnvVar(t *testing.T) { func TestConfigDeprecatedAddrFlag(t *testing.T) { _, stderr := capture(func() { - c := NewConfig() + c := New() err := c.LoadFlags([]string{"-c", "127.0.0.1:4002"}) assert.NoError(t, err) assert.Equal(t, c.Addr, "127.0.0.1:4002") @@ -535,7 +539,7 @@ func TestConfigDeprecatedAddrFlag(t *testing.T) { func TestConfigDeprecatedBindAddrFlag(t *testing.T) { _, stderr := capture(func() { - c := NewConfig() + c := New() err := c.LoadFlags([]string{"-cl", "127.0.0.1:4003"}) assert.NoError(t, err) assert.Equal(t, c.BindAddr, "127.0.0.1:4003", "") @@ -545,7 +549,7 @@ func TestConfigDeprecatedBindAddrFlag(t *testing.T) { func TestConfigDeprecatedCAFileFlag(t *testing.T) { _, stderr := capture(func() { - c := NewConfig() + c := New() err := c.LoadFlags([]string{"-clientCAFile", "/tmp/file.ca"}) assert.NoError(t, err) assert.Equal(t, c.CAFile, "/tmp/file.ca", "") @@ -555,7 +559,7 @@ func TestConfigDeprecatedCAFileFlag(t *testing.T) { func TestConfigDeprecatedCertFileFlag(t *testing.T) { _, stderr := capture(func() { - c := NewConfig() + c := New() err := c.LoadFlags([]string{"-clientCert", "/tmp/file.cert"}) assert.NoError(t, err) assert.Equal(t, c.CertFile, "/tmp/file.cert", "") @@ -565,7 +569,7 @@ func TestConfigDeprecatedCertFileFlag(t *testing.T) { func TestConfigDeprecatedKeyFileFlag(t *testing.T) { _, stderr := capture(func() { - c := NewConfig() + c := New() err := c.LoadFlags([]string{"-clientKey", "/tmp/file.key"}) assert.NoError(t, err) assert.Equal(t, c.KeyFile, "/tmp/file.key", "") @@ -575,7 +579,7 @@ func TestConfigDeprecatedKeyFileFlag(t *testing.T) { func TestConfigDeprecatedPeersFlag(t *testing.T) { _, stderr := capture(func() { - c := NewConfig() + c := New() err := c.LoadFlags([]string{"-C", "coreos.com:4001,coreos.com:4002"}) assert.NoError(t, err) assert.Equal(t, c.Peers, []string{"coreos.com:4001", "coreos.com:4002"}, "") @@ -585,7 +589,7 @@ func TestConfigDeprecatedPeersFlag(t *testing.T) { func TestConfigDeprecatedPeersFileFlag(t *testing.T) { _, stderr := capture(func() { - c := NewConfig() + c := New() err := c.LoadFlags([]string{"-CF", "/tmp/machines"}) assert.NoError(t, err) assert.Equal(t, c.PeersFile, "/tmp/machines", "") @@ -595,7 +599,7 @@ func TestConfigDeprecatedPeersFileFlag(t *testing.T) { func TestConfigDeprecatedMaxClusterSizeFlag(t *testing.T) { _, stderr := capture(func() { - c := NewConfig() + c := New() err := c.LoadFlags([]string{"-maxsize", "5"}) assert.NoError(t, err) assert.Equal(t, c.MaxClusterSize, 5, "") @@ -605,7 +609,7 @@ func TestConfigDeprecatedMaxClusterSizeFlag(t *testing.T) { func TestConfigDeprecatedMaxResultBufferFlag(t *testing.T) { _, stderr := capture(func() { - c := NewConfig() + c := New() err := c.LoadFlags([]string{"-m", "512"}) assert.NoError(t, err) assert.Equal(t, c.MaxResultBuffer, 512, "") @@ -615,7 +619,7 @@ func TestConfigDeprecatedMaxResultBufferFlag(t *testing.T) { func TestConfigDeprecatedMaxRetryAttemptsFlag(t *testing.T) { _, stderr := capture(func() { - c := NewConfig() + c := New() err := c.LoadFlags([]string{"-r", "10"}) assert.NoError(t, err) assert.Equal(t, c.MaxRetryAttempts, 10, "") @@ -625,7 +629,7 @@ func TestConfigDeprecatedMaxRetryAttemptsFlag(t *testing.T) { func TestConfigDeprecatedNameFlag(t *testing.T) { _, stderr := capture(func() { - c := NewConfig() + c := New() err := c.LoadFlags([]string{"-n", "test-name"}) assert.NoError(t, err) assert.Equal(t, c.Name, "test-name", "") @@ -635,7 +639,7 @@ func TestConfigDeprecatedNameFlag(t *testing.T) { func TestConfigDeprecatedPeerAddrFlag(t *testing.T) { _, stderr := capture(func() { - c := NewConfig() + c := New() err := c.LoadFlags([]string{"-s", "localhost:7002"}) assert.NoError(t, err) assert.Equal(t, c.Peer.Addr, "localhost:7002", "") @@ -645,7 +649,7 @@ func TestConfigDeprecatedPeerAddrFlag(t *testing.T) { func TestConfigDeprecatedPeerBindAddrFlag(t *testing.T) { _, stderr := capture(func() { - c := NewConfig() + c := New() err := c.LoadFlags([]string{"-sl", "127.0.0.1:4003"}) assert.NoError(t, err) assert.Equal(t, c.Peer.BindAddr, "127.0.0.1:4003", "") @@ -655,7 +659,7 @@ func TestConfigDeprecatedPeerBindAddrFlag(t *testing.T) { func TestConfigDeprecatedPeerCAFileFlag(t *testing.T) { _, stderr := capture(func() { - c := NewConfig() + c := New() err := c.LoadFlags([]string{"-serverCAFile", "/tmp/peer/file.ca"}) assert.NoError(t, err) assert.Equal(t, c.Peer.CAFile, "/tmp/peer/file.ca", "") @@ -665,7 +669,7 @@ func TestConfigDeprecatedPeerCAFileFlag(t *testing.T) { func TestConfigDeprecatedPeerCertFileFlag(t *testing.T) { _, stderr := capture(func() { - c := NewConfig() + c := New() err := c.LoadFlags([]string{"-serverCert", "/tmp/peer/file.cert"}) assert.NoError(t, err) assert.Equal(t, c.Peer.CertFile, "/tmp/peer/file.cert", "") @@ -675,7 +679,7 @@ func TestConfigDeprecatedPeerCertFileFlag(t *testing.T) { func TestConfigDeprecatedPeerKeyFileFlag(t *testing.T) { _, stderr := capture(func() { - c := NewConfig() + c := New() err := c.LoadFlags([]string{"-serverKey", "/tmp/peer/file.key"}) assert.NoError(t, err) assert.Equal(t, c.Peer.KeyFile, "/tmp/peer/file.key", "") @@ -691,7 +695,7 @@ func TestConfigDeprecatedPeerKeyFileFlag(t *testing.T) { func withEnv(key, value string, f func(c *Config)) { os.Setenv(key, value) defer os.Setenv(key, "") - c := NewConfig() + c := New() f(c) } diff --git a/discovery/discovery.go b/discovery/discovery.go new file mode 100644 index 000000000..ea3b56812 --- /dev/null +++ b/discovery/discovery.go @@ -0,0 +1,139 @@ +package discovery + +import ( + "errors" + "fmt" + "net/url" + "path" + "strings" + "time" + + "github.com/coreos/etcd/log" + "github.com/coreos/go-etcd/etcd" +) + +const ( + stateKey = "_state" + initState = "init" + startedState = "started" + defaultTTL = 604800 // One week TTL +) + +type Discoverer struct { + client *etcd.Client + name string + peer string + prefix string + discoveryURL string +} + +var defaultDiscoverer *Discoverer + +func init() { + defaultDiscoverer = &Discoverer{} +} + +func (d *Discoverer) Do(discoveryURL string, name string, peer string) (peers []string, err error) { + d.name = name + d.peer = peer + d.discoveryURL = discoveryURL + + u, err := url.Parse(discoveryURL) + + if err != nil { + return + } + + // prefix is appended to all keys + d.prefix = strings.TrimPrefix(u.Path, "/v2/keys/") + + // Connect to a scheme://host not a full URL with path + u.Path = "" + log.Infof("Bootstrapping via %s using prefix %s.", u.String(), d.prefix) + d.client = etcd.NewClient([]string{u.String()}) + + // Register this machine first and announce that we are a member of + // this cluster + err = d.heartbeat() + if err != nil { + return + } + + // Start the very slow heartbeat to the cluster now in anticipation + // that everything is going to go alright now + go d.startHeartbeat() + + // Attempt to take the leadership role, if there is no error we are it! + resp, err := d.client.CompareAndSwap(path.Join(d.prefix, stateKey), startedState, 0, initState, 0) + + // Bail out on unexpected errors + if err != nil { + if etcdErr, ok := err.(etcd.EtcdError); !ok || etcdErr.ErrorCode != 101 { + return nil, err + } + } + + // If we got a response then the CAS was successful, we are leader + if resp != nil && resp.Node.Value == startedState { + // We are the leader, we have no peers + log.Infof("Bootstrapping was in 'init' state this machine is the initial leader.") + return nil, nil + } + + // Fall through to finding the other discoveryped peers + return d.findPeers() +} + +func (d *Discoverer) findPeers() (peers []string, err error) { + resp, err := d.client.Get(path.Join(d.prefix), false, true) + if err != nil { + return nil, err + } + + node := resp.Node + + if node == nil { + return nil, errors.New(fmt.Sprintf("%s key doesn't exist.", d.prefix)) + } + + for _, n := range node.Nodes { + // Skip our own entry in the list, there is no point + if strings.HasSuffix(n.Key, "/"+d.name) { + continue + } + peers = append(peers, n.Value) + } + + if len(peers) == 0 { + return nil, errors.New("No peers found.") + } + + log.Infof("Bootstrap found peers %v", peers) + + return +} + +func (d *Discoverer) startHeartbeat() { + // In case of errors we should attempt to heartbeat fairly frequently + heartbeatInterval := defaultTTL / 8 + ticker := time.Tick(time.Second * time.Duration(heartbeatInterval)) + for { + select { + case <-ticker: + err := d.heartbeat() + if err != nil { + log.Warnf("Bootstrapping heartbeat failed: %v", err) + } + } + } +} + +func (d *Discoverer) heartbeat() error { + _, err := d.client.Set(path.Join(d.prefix, d.name), d.peer, defaultTTL) + + return err +} + +func Do(discoveryURL string, name string, peer string) ([]string, error) { + return defaultDiscoverer.Do(discoveryURL, name, peer) +} diff --git a/server/peer_server.go b/server/peer_server.go index 6cb16c7de..38b0e1f02 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -209,7 +209,7 @@ func (s *PeerServer) startAsFollower(cluster []string) { if ok { return } - log.Warnf("cannot join to cluster via given peers, retry in %d seconds", retryInterval) + log.Warnf("Unable to join the cluster using any of the peers %v. Retrying in %d seconds", cluster, retryInterval) time.Sleep(time.Second * retryInterval) } @@ -266,17 +266,18 @@ func (s *PeerServer) joinCluster(cluster []string) bool { err := s.joinByPeer(s.raftServer, peer, s.Config.Scheme) if err == nil { - log.Debugf("%s success join to the cluster via peer %s", s.Config.Name, peer) + log.Debugf("%s joined the cluster via peer %s", s.Config.Name, peer) return true - } else { - if _, ok := err.(etcdErr.Error); ok { - log.Fatal(err) - } - - log.Debugf("cannot join to cluster via peer %s %s", peer, err) } + + if _, ok := err.(etcdErr.Error); ok { + log.Fatal(err) + } + + log.Warnf("Attempt to join via %s failed: %s", peer, err) } + return false } From 40021ab72e179982d68d76ae6ee38f7a01e9b5da Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Sun, 2 Feb 2014 17:41:30 -0800 Subject: [PATCH 06/11] bump(github.com/coreos/go-etcd): 526d936ffe75284ca80290ea6386f883f573c232 --- etcd.go | 2 +- .../github.com/coreos/go-etcd/etcd/client.go | 389 +++++++++--------- .../coreos/go-etcd/etcd/client_test.go | 6 +- .../github.com/coreos/go-etcd/etcd/cluster.go | 51 +++ .../coreos/go-etcd/etcd/compare_and_delete.go | 34 ++ .../go-etcd/etcd/compare_and_delete_test.go | 46 +++ .../coreos/go-etcd/etcd/compare_and_swap.go | 15 +- .../coreos/go-etcd/etcd/config.json | 1 + .../github.com/coreos/go-etcd/etcd/debug.go | 65 ++- .../github.com/coreos/go-etcd/etcd/delete.go | 6 +- .../github.com/coreos/go-etcd/etcd/error.go | 10 +- .../coreos/go-etcd/etcd/get_test.go | 82 ++-- .../github.com/coreos/go-etcd/etcd/options.go | 2 + .../coreos/go-etcd/etcd/requests.go | 60 +-- .../coreos/go-etcd/etcd/requests_test.go | 50 +++ .../github.com/coreos/go-etcd/etcd/utils.go | 33 -- 16 files changed, 519 insertions(+), 333 deletions(-) create mode 100644 third_party/github.com/coreos/go-etcd/etcd/cluster.go create mode 100644 third_party/github.com/coreos/go-etcd/etcd/compare_and_delete.go create mode 100644 third_party/github.com/coreos/go-etcd/etcd/compare_and_delete_test.go create mode 100644 third_party/github.com/coreos/go-etcd/etcd/config.json create mode 100644 third_party/github.com/coreos/go-etcd/etcd/requests_test.go delete mode 100644 third_party/github.com/coreos/go-etcd/etcd/utils.go diff --git a/etcd.go b/etcd.go index 5edee4c16..25455c60c 100644 --- a/etcd.go +++ b/etcd.go @@ -26,8 +26,8 @@ import ( "github.com/coreos/etcd/third_party/github.com/coreos/raft" - ehttp "github.com/coreos/etcd/http" "github.com/coreos/etcd/config" + ehttp "github.com/coreos/etcd/http" "github.com/coreos/etcd/log" "github.com/coreos/etcd/metrics" "github.com/coreos/etcd/server" diff --git a/third_party/github.com/coreos/go-etcd/etcd/client.go b/third_party/github.com/coreos/go-etcd/etcd/client.go index caa5fe2be..48ba9b423 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/client.go +++ b/third_party/github.com/coreos/go-etcd/etcd/client.go @@ -12,15 +12,9 @@ import ( "net/url" "os" "path" - "strings" "time" ) -const ( - HTTP = iota - HTTPS -) - // See SetConsistency for how to use these constants. const ( // Using strings rather than iota because the consistency level @@ -34,45 +28,27 @@ const ( defaultBufferSize = 10 ) -type Cluster struct { - Leader string `json:"leader"` - Machines []string `json:"machines"` -} - type Config struct { CertFile string `json:"certFile"` KeyFile string `json:"keyFile"` - CaCertFile string `json:"caCertFile"` - Scheme string `json:"scheme"` + CaCertFile []string `json:"caCertFiles"` Timeout time.Duration `json:"timeout"` Consistency string `json: "consistency"` } type Client struct { - cluster Cluster `json:"cluster"` - config Config `json:"config"` + config Config `json:"config"` + cluster *Cluster `json:"cluster"` httpClient *http.Client persistence io.Writer cURLch chan string + keyPrefix string } // NewClient create a basic client that is configured to be used // with the given machine list. func NewClient(machines []string) *Client { - // if an empty slice was sent in then just assume localhost - if len(machines) == 0 { - machines = []string{"http://127.0.0.1:4001"} - } - - // default leader and machines - cluster := Cluster{ - Leader: machines[0], - Machines: machines, - } - config := Config{ - // default use http - Scheme: "http", // default timeout is one second Timeout: time.Second, // default consistency level is STRONG @@ -80,75 +56,146 @@ func NewClient(machines []string) *Client { } client := &Client{ - cluster: cluster, - config: config, + cluster: NewCluster(machines), + config: config, + keyPrefix: path.Join(version, "keys"), } - err := setupHttpClient(client) - if err != nil { - panic(err) - } + client.initHTTPClient() + client.saveConfig() return client } -// NewClientFile creates a client from a given file path. +// NewTLSClient create a basic client with TLS configuration +func NewTLSClient(machines []string, cert, key, caCert string) (*Client, error) { + // overwrite the default machine to use https + if len(machines) == 0 { + machines = []string{"https://127.0.0.1:4001"} + } + + config := Config{ + // default timeout is one second + Timeout: time.Second, + // default consistency level is STRONG + Consistency: STRONG_CONSISTENCY, + CertFile: cert, + KeyFile: key, + CaCertFile: make([]string, 0), + } + + client := &Client{ + cluster: NewCluster(machines), + config: config, + keyPrefix: path.Join(version, "keys"), + } + + err := client.initHTTPSClient(cert, key) + if err != nil { + return nil, err + } + + err = client.AddRootCA(caCert) + + client.saveConfig() + + return client, nil +} + +// NewClientFromFile creates a client from a given file path. // The given file is expected to use the JSON format. -func NewClientFile(fpath string) (*Client, error) { +func NewClientFromFile(fpath string) (*Client, error) { fi, err := os.Open(fpath) if err != nil { return nil, err } + defer func() { if err := fi.Close(); err != nil { panic(err) } }() - return NewClientReader(fi) + return NewClientFromReader(fi) } -// NewClientReader creates a Client configured from a given reader. -// The config is expected to use the JSON format. -func NewClientReader(reader io.Reader) (*Client, error) { - var client Client +// NewClientFromReader creates a Client configured from a given reader. +// The configuration is expected to use the JSON format. +func NewClientFromReader(reader io.Reader) (*Client, error) { + c := new(Client) b, err := ioutil.ReadAll(reader) if err != nil { return nil, err } - err = json.Unmarshal(b, &client) + err = json.Unmarshal(b, c) + if err != nil { + return nil, err + } + if c.config.CertFile == "" { + c.initHTTPClient() + } else { + err = c.initHTTPSClient(c.config.CertFile, c.config.KeyFile) + } + if err != nil { return nil, err } - err = setupHttpClient(&client) - if err != nil { - return nil, err + for _, caCert := range c.config.CaCertFile { + if err := c.AddRootCA(caCert); err != nil { + return nil, err + } } - return &client, nil + return c, nil } -func setupHttpClient(client *Client) error { - if client.config.CertFile != "" && client.config.KeyFile != "" { - err := client.SetCertAndKey(client.config.CertFile, client.config.KeyFile, client.config.CaCertFile) - if err != nil { - return err - } - } else { - client.config.CertFile = "" - client.config.KeyFile = "" - tr := &http.Transport{ - Dial: dialTimeout, - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, - }, - } - client.httpClient = &http.Client{Transport: tr} +// Override the Client's HTTP Transport object +func (c *Client) SetTransport(tr *http.Transport) { + c.httpClient.Transport = tr +} + +// SetKeyPrefix changes the key prefix from the default `/v2/keys` to whatever +// is set. +func (c *Client) SetKeyPrefix(prefix string) { + c.keyPrefix = prefix +} + +// initHTTPClient initializes a HTTP client for etcd client +func (c *Client) initHTTPClient() { + tr := &http.Transport{ + Dial: dialTimeout, + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + } + c.httpClient = &http.Client{Transport: tr} +} + +// initHTTPClient initializes a HTTPS client for etcd client +func (c *Client) initHTTPSClient(cert, key string) error { + if cert == "" || key == "" { + return errors.New("Require both cert and key path") } + tlsCert, err := tls.LoadX509KeyPair(cert, key) + if err != nil { + return err + } + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{tlsCert}, + InsecureSkipVerify: true, + } + + tr := &http.Transport{ + TLSClientConfig: tlsConfig, + Dial: dialTimeout, + } + + c.httpClient = &http.Client{Transport: tr} return nil } @@ -179,114 +226,45 @@ func (c *Client) SetConsistency(consistency string) error { return nil } -// MarshalJSON implements the Marshaller interface -// as defined by the standard JSON package. -func (c *Client) MarshalJSON() ([]byte, error) { - b, err := json.Marshal(struct { - Config Config `json:"config"` - Cluster Cluster `json:"cluster"` - }{ - Config: c.config, - Cluster: c.cluster, - }) - - if err != nil { - return nil, err +// AddRootCA adds a root CA cert for the etcd client +func (c *Client) AddRootCA(caCert string) error { + if c.httpClient == nil { + return errors.New("Client has not been initialized yet!") } - return b, nil -} - -// UnmarshalJSON implements the Unmarshaller interface -// as defined by the standard JSON package. -func (c *Client) UnmarshalJSON(b []byte) error { - temp := struct { - Config Config `json: "config"` - Cluster Cluster `json: "cluster"` - }{} - err := json.Unmarshal(b, &temp) + certBytes, err := ioutil.ReadFile(caCert) if err != nil { return err } - c.cluster = temp.Cluster - c.config = temp.Config - return nil -} + tr, ok := c.httpClient.Transport.(*http.Transport) -// saveConfig saves the current config using c.persistence. -func (c *Client) saveConfig() error { - if c.persistence != nil { - b, err := json.Marshal(c) - if err != nil { - return err - } - - _, err = c.persistence.Write(b) - if err != nil { - return err - } + if !ok { + panic("AddRootCA(): Transport type assert should not fail") } - return nil + if tr.TLSClientConfig.RootCAs == nil { + caCertPool := x509.NewCertPool() + ok = caCertPool.AppendCertsFromPEM(certBytes) + if ok { + tr.TLSClientConfig.RootCAs = caCertPool + } + tr.TLSClientConfig.InsecureSkipVerify = false + } else { + ok = tr.TLSClientConfig.RootCAs.AppendCertsFromPEM(certBytes) + } + + if !ok { + err = errors.New("Unable to load caCert") + } + + c.config.CaCertFile = append(c.config.CaCertFile, caCert) + c.saveConfig() + + return err } -func (c *Client) SetCertAndKey(cert string, key string, caCert string) error { - if cert != "" && key != "" { - tlsCert, err := tls.LoadX509KeyPair(cert, key) - - if err != nil { - return err - } - - tlsConfig := &tls.Config{ - Certificates: []tls.Certificate{tlsCert}, - } - - if caCert != "" { - caCertPool := x509.NewCertPool() - - certBytes, err := ioutil.ReadFile(caCert) - if err != nil { - return err - } - - if !caCertPool.AppendCertsFromPEM(certBytes) { - return errors.New("Unable to load caCert") - } - - tlsConfig.RootCAs = caCertPool - } else { - tlsConfig.InsecureSkipVerify = true - } - - tr := &http.Transport{ - TLSClientConfig: tlsConfig, - Dial: dialTimeout, - } - - c.httpClient = &http.Client{Transport: tr} - c.saveConfig() - return nil - } - return errors.New("Require both cert and key path") -} - -func (c *Client) SetScheme(scheme int) error { - if scheme == HTTP { - c.config.Scheme = "http" - c.saveConfig() - return nil - } - if scheme == HTTPS { - c.config.Scheme = "https" - c.saveConfig() - return nil - } - return errors.New("Unknown Scheme") -} - -// SetCluster updates config using the given machine list. +// SetCluster updates cluster information using the given machine list. func (c *Client) SetCluster(machines []string) bool { success := c.internalSyncCluster(machines) return success @@ -296,16 +274,15 @@ func (c *Client) GetCluster() []string { return c.cluster.Machines } -// SyncCluster updates config using the internal machine list. +// SyncCluster updates the cluster information using the internal machine list. func (c *Client) SyncCluster() bool { - success := c.internalSyncCluster(c.cluster.Machines) - return success + return c.internalSyncCluster(c.cluster.Machines) } // internalSyncCluster syncs cluster information using the given machine list. func (c *Client) internalSyncCluster(machines []string) bool { for _, machine := range machines { - httpPath := c.createHttpPath(machine, version+"/machines") + httpPath := c.createHttpPath(machine, path.Join(version, "machines")) resp, err := c.httpClient.Get(httpPath) if err != nil { // try another machine in the cluster @@ -319,12 +296,11 @@ func (c *Client) internalSyncCluster(machines []string) bool { } // update Machines List - c.cluster.Machines = strings.Split(string(b), ", ") + c.cluster.updateFromStr(string(b)) // update leader // the first one in the machine list is the leader - logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, c.cluster.Machines[0]) - c.cluster.Leader = c.cluster.Machines[0] + c.cluster.switchLeader(0) logger.Debug("sync.machines ", c.cluster.Machines) c.saveConfig() @@ -337,8 +313,12 @@ func (c *Client) internalSyncCluster(machines []string) bool { // createHttpPath creates a complete HTTP URL. // serverName should contain both the host name and a port number, if any. func (c *Client) createHttpPath(serverName string, _path string) string { - u, _ := url.Parse(serverName) - u.Path = path.Join(u.Path, "/", _path) + u, err := url.Parse(serverName) + if err != nil { + panic(err) + } + + u.Path = path.Join(u.Path, _path) if u.Scheme == "" { u.Scheme = "http" @@ -351,27 +331,6 @@ func dialTimeout(network, addr string) (net.Conn, error) { return net.DialTimeout(network, addr, time.Second) } -func (c *Client) updateLeader(u *url.URL) { - var leader string - if u.Scheme == "" { - leader = "http://" + u.Host - } else { - leader = u.Scheme + "://" + u.Host - } - - logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, leader) - c.cluster.Leader = leader - c.saveConfig() -} - -// switchLeader switch the current leader to machines[num] -func (c *Client) switchLeader(num int) { - logger.Debugf("switch.leader[from %v to %v]", - c.cluster.Leader, c.cluster.Machines[num]) - - c.cluster.Leader = c.cluster.Machines[num] -} - func (c *Client) OpenCURL() { c.cURLch = make(chan string, defaultBufferSize) } @@ -392,3 +351,55 @@ func (c *Client) sendCURL(command string) { func (c *Client) RecvCURL() string { return <-c.cURLch } + +// saveConfig saves the current config using c.persistence. +func (c *Client) saveConfig() error { + if c.persistence != nil { + b, err := json.Marshal(c) + if err != nil { + return err + } + + _, err = c.persistence.Write(b) + if err != nil { + return err + } + } + + return nil +} + +// MarshalJSON implements the Marshaller interface +// as defined by the standard JSON package. +func (c *Client) MarshalJSON() ([]byte, error) { + b, err := json.Marshal(struct { + Config Config `json:"config"` + Cluster *Cluster `json:"cluster"` + }{ + Config: c.config, + Cluster: c.cluster, + }) + + if err != nil { + return nil, err + } + + return b, nil +} + +// UnmarshalJSON implements the Unmarshaller interface +// as defined by the standard JSON package. +func (c *Client) UnmarshalJSON(b []byte) error { + temp := struct { + Config Config `json: "config"` + Cluster *Cluster `json: "cluster"` + }{} + err := json.Unmarshal(b, &temp) + if err != nil { + return err + } + + c.cluster = temp.Cluster + c.config = temp.Config + return nil +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/client_test.go b/third_party/github.com/coreos/go-etcd/etcd/client_test.go index b25611b22..c245e4798 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/client_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/client_test.go @@ -14,7 +14,9 @@ import ( func TestSync(t *testing.T) { fmt.Println("Make sure there are three nodes at 0.0.0.0:4001-4003") - c := NewClient(nil) + // Explicit trailing slash to ensure this doesn't reproduce: + // https://github.com/coreos/go-etcd/issues/82 + c := NewClient([]string{"http://127.0.0.1:4001/"}) success := c.SyncCluster() if !success { @@ -79,7 +81,7 @@ func TestPersistence(t *testing.T) { t.Fatal(err) } - c2, err := NewClientFile("config.json") + c2, err := NewClientFromFile("config.json") if err != nil { t.Fatal(err) } diff --git a/third_party/github.com/coreos/go-etcd/etcd/cluster.go b/third_party/github.com/coreos/go-etcd/etcd/cluster.go new file mode 100644 index 000000000..aaa20546e --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/cluster.go @@ -0,0 +1,51 @@ +package etcd + +import ( + "net/url" + "strings" +) + +type Cluster struct { + Leader string `json:"leader"` + Machines []string `json:"machines"` +} + +func NewCluster(machines []string) *Cluster { + // if an empty slice was sent in then just assume HTTP 4001 on localhost + if len(machines) == 0 { + machines = []string{"http://127.0.0.1:4001"} + } + + // default leader and machines + return &Cluster{ + Leader: machines[0], + Machines: machines, + } +} + +// switchLeader switch the current leader to machines[num] +func (cl *Cluster) switchLeader(num int) { + logger.Debugf("switch.leader[from %v to %v]", + cl.Leader, cl.Machines[num]) + + cl.Leader = cl.Machines[num] +} + +func (cl *Cluster) updateFromStr(machines string) { + cl.Machines = strings.Split(machines, ", ") +} + +func (cl *Cluster) updateLeader(leader string) { + logger.Debugf("update.leader[%s,%s]", cl.Leader, leader) + cl.Leader = leader +} + +func (cl *Cluster) updateLeaderFromURL(u *url.URL) { + var leader string + if u.Scheme == "" { + leader = "http://" + u.Host + } else { + leader = u.Scheme + "://" + u.Host + } + cl.updateLeader(leader) +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/compare_and_delete.go b/third_party/github.com/coreos/go-etcd/etcd/compare_and_delete.go new file mode 100644 index 000000000..924778ddb --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/compare_and_delete.go @@ -0,0 +1,34 @@ +package etcd + +import "fmt" + +func (c *Client) CompareAndDelete(key string, prevValue string, prevIndex uint64) (*Response, error) { + raw, err := c.RawCompareAndDelete(key, prevValue, prevIndex) + if err != nil { + return nil, err + } + + return raw.toResponse() +} + +func (c *Client) RawCompareAndDelete(key string, prevValue string, prevIndex uint64) (*RawResponse, error) { + if prevValue == "" && prevIndex == 0 { + return nil, fmt.Errorf("You must give either prevValue or prevIndex.") + } + + options := options{} + if prevValue != "" { + options["prevValue"] = prevValue + } + if prevIndex != 0 { + options["prevIndex"] = prevIndex + } + + raw, err := c.delete(key, options) + + if err != nil { + return nil, err + } + + return raw, err +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/compare_and_delete_test.go b/third_party/github.com/coreos/go-etcd/etcd/compare_and_delete_test.go new file mode 100644 index 000000000..223e50f29 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/compare_and_delete_test.go @@ -0,0 +1,46 @@ +package etcd + +import ( + "testing" +) + +func TestCompareAndDelete(t *testing.T) { + c := NewClient(nil) + defer func() { + c.Delete("foo", true) + }() + + c.Set("foo", "bar", 5) + + // This should succeed an correct prevValue + resp, err := c.CompareAndDelete("foo", "bar", 0) + if err != nil { + t.Fatal(err) + } + if !(resp.PrevNode.Value == "bar" && resp.PrevNode.Key == "/foo" && resp.PrevNode.TTL == 5) { + t.Fatalf("CompareAndDelete 1 prevNode failed: %#v", resp) + } + + resp, _ = c.Set("foo", "bar", 5) + // This should fail because it gives an incorrect prevValue + _, err = c.CompareAndDelete("foo", "xxx", 0) + if err == nil { + t.Fatalf("CompareAndDelete 2 should have failed. The response is: %#v", resp) + } + + // This should succeed because it gives an correct prevIndex + resp, err = c.CompareAndDelete("foo", "", resp.Node.ModifiedIndex) + if err != nil { + t.Fatal(err) + } + if !(resp.PrevNode.Value == "bar" && resp.PrevNode.Key == "/foo" && resp.PrevNode.TTL == 5) { + t.Fatalf("CompareAndSwap 3 prevNode failed: %#v", resp) + } + + c.Set("foo", "bar", 5) + // This should fail because it gives an incorrect prevIndex + resp, err = c.CompareAndDelete("foo", "", 29817514) + if err == nil { + t.Fatalf("CompareAndDelete 4 should have failed. The response is: %#v", resp) + } +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap.go b/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap.go index 4099d1348..0beaee57f 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap.go +++ b/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap.go @@ -2,7 +2,18 @@ package etcd import "fmt" -func (c *Client) CompareAndSwap(key string, value string, ttl uint64, prevValue string, prevIndex uint64) (*Response, error) { +func (c *Client) CompareAndSwap(key string, value string, ttl uint64, + prevValue string, prevIndex uint64) (*Response, error) { + raw, err := c.RawCompareAndSwap(key, value, ttl, prevValue, prevIndex) + if err != nil { + return nil, err + } + + return raw.toResponse() +} + +func (c *Client) RawCompareAndSwap(key string, value string, ttl uint64, + prevValue string, prevIndex uint64) (*RawResponse, error) { if prevValue == "" && prevIndex == 0 { return nil, fmt.Errorf("You must give either prevValue or prevIndex.") } @@ -21,5 +32,5 @@ func (c *Client) CompareAndSwap(key string, value string, ttl uint64, prevValue return nil, err } - return raw.toResponse() + return raw, err } diff --git a/third_party/github.com/coreos/go-etcd/etcd/config.json b/third_party/github.com/coreos/go-etcd/etcd/config.json new file mode 100644 index 000000000..0e271b184 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/config.json @@ -0,0 +1 @@ +{"config":{"certFile":"","keyFile":"","caCertFiles":null,"timeout":1000000000,"Consistency":"STRONG"},"cluster":{"leader":"http://127.0.0.1:4001","machines":["http://127.0.0.1:4001","http://127.0.0.1:4002"]}} \ No newline at end of file diff --git a/third_party/github.com/coreos/go-etcd/etcd/debug.go b/third_party/github.com/coreos/go-etcd/etcd/debug.go index a4ee8b0d2..fce23f07f 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/debug.go +++ b/third_party/github.com/coreos/go-etcd/etcd/debug.go @@ -1,28 +1,53 @@ package etcd import ( - "os" - - "github.com/coreos/etcd/third_party/github.com/coreos/go-log/log" + "io/ioutil" + "log" + "strings" ) -var logger *log.Logger +type Logger interface { + Debug(args ...interface{}) + Debugf(fmt string, args ...interface{}) + Warning(args ...interface{}) + Warningf(fmt string, args ...interface{}) +} + +var logger Logger + +func SetLogger(log Logger) { + logger = log +} + +func GetLogger() Logger { + return logger +} + +type defaultLogger struct { + log *log.Logger +} + +func (p *defaultLogger) Debug(args ...interface{}) { + p.log.Println(args) +} + +func (p *defaultLogger) Debugf(fmt string, args ...interface{}) { + // Append newline if necessary + if !strings.HasSuffix(fmt, "\n") { + fmt = fmt + "\n" + } + p.log.Printf(fmt, args) +} + +func (p *defaultLogger) Warning(args ...interface{}) { + p.Debug(args) +} + +func (p *defaultLogger) Warningf(fmt string, args ...interface{}) { + p.Debugf(fmt, args) +} func init() { - setLogger(log.PriErr) -} - -func OpenDebug() { - setLogger(log.PriDebug) -} - -func CloseDebug() { - setLogger(log.PriErr) -} - -func setLogger(priority log.Priority) { - logger = log.NewSimple( - log.PriorityFilter( - priority, - log.WriterSink(os.Stdout, log.BasicFormat, log.BasicFields))) + // Default logger uses the go default log. + SetLogger(&defaultLogger{log.New(ioutil.Discard, "go-etcd", log.LstdFlags)}) } diff --git a/third_party/github.com/coreos/go-etcd/etcd/delete.go b/third_party/github.com/coreos/go-etcd/etcd/delete.go index 51d1c8546..6c60e4df3 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/delete.go +++ b/third_party/github.com/coreos/go-etcd/etcd/delete.go @@ -10,7 +10,7 @@ package etcd // then everything under the directory (including all child directories) // will be deleted. func (c *Client) Delete(key string, recursive bool) (*Response, error) { - raw, err := c.DeleteRaw(key, recursive, false) + raw, err := c.RawDelete(key, recursive, false) if err != nil { return nil, err @@ -21,7 +21,7 @@ func (c *Client) Delete(key string, recursive bool) (*Response, error) { // DeleteDir deletes an empty directory or a key value pair func (c *Client) DeleteDir(key string) (*Response, error) { - raw, err := c.DeleteRaw(key, false, true) + raw, err := c.RawDelete(key, false, true) if err != nil { return nil, err @@ -30,7 +30,7 @@ func (c *Client) DeleteDir(key string) (*Response, error) { return raw.toResponse() } -func (c *Client) DeleteRaw(key string, recursive bool, dir bool) (*RawResponse, error) { +func (c *Client) RawDelete(key string, recursive bool, dir bool) (*RawResponse, error) { ops := options{ "recursive": recursive, "dir": dir, diff --git a/third_party/github.com/coreos/go-etcd/etcd/error.go b/third_party/github.com/coreos/go-etcd/etcd/error.go index 9a33d1665..7e6928724 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/error.go +++ b/third_party/github.com/coreos/go-etcd/etcd/error.go @@ -36,9 +36,13 @@ func newError(errorCode int, cause string, index uint64) *EtcdError { } func handleError(b []byte) error { - var err EtcdError + etcdErr := new(EtcdError) - json.Unmarshal(b, &err) + err := json.Unmarshal(b, etcdErr) + if err != nil { + logger.Warningf("cannot unmarshal etcd error: %v", err) + return err + } - return err + return etcdErr } diff --git a/third_party/github.com/coreos/go-etcd/etcd/get_test.go b/third_party/github.com/coreos/go-etcd/etcd/get_test.go index 06755e556..eccae1894 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/get_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/get_test.go @@ -5,6 +5,26 @@ import ( "testing" ) +// cleanNode scrubs Expiration, ModifiedIndex and CreatedIndex of a node. +func cleanNode(n *Node) { + n.Expiration = nil + n.ModifiedIndex = 0 + n.CreatedIndex = 0 +} + +// cleanResult scrubs a result object two levels deep of Expiration, +// ModifiedIndex and CreatedIndex. +func cleanResult(result *Response) { + // TODO(philips): make this recursive. + cleanNode(result.Node) + for i, _ := range result.Node.Nodes { + cleanNode(&result.Node.Nodes[i]) + for j, _ := range result.Node.Nodes[i].Nodes { + cleanNode(&result.Node.Nodes[i].Nodes[j]) + } + } +} + func TestGet(t *testing.T) { c := NewClient(nil) defer func() { @@ -48,25 +68,18 @@ func TestGetAll(t *testing.T) { expected := Nodes{ Node{ - Key: "/fooDir/k0", - Value: "v0", - TTL: 5, - ModifiedIndex: 31, - CreatedIndex: 31, + Key: "/fooDir/k0", + Value: "v0", + TTL: 5, }, Node{ - Key: "/fooDir/k1", - Value: "v1", - TTL: 5, - ModifiedIndex: 32, - CreatedIndex: 32, + Key: "/fooDir/k1", + Value: "v1", + TTL: 5, }, } - // do not check expiration time, too hard to fake - for i, _ := range result.Node.Nodes { - result.Node.Nodes[i].Expiration = nil - } + cleanResult(result) if !reflect.DeepEqual(result.Node.Nodes, expected) { t.Fatalf("(actual) %v != (expected) %v", result.Node.Nodes, expected) @@ -79,16 +92,7 @@ func TestGetAll(t *testing.T) { // Return kv-pairs in sorted order result, err = c.Get("fooDir", true, true) - // do not check expiration time, too hard to fake - result.Node.Expiration = nil - for i, _ := range result.Node.Nodes { - result.Node.Nodes[i].Expiration = nil - if result.Node.Nodes[i].Nodes != nil { - for j, _ := range result.Node.Nodes[i].Nodes { - result.Node.Nodes[i].Nodes[j].Expiration = nil - } - } - } + cleanResult(result) if err != nil { t.Fatal(err) @@ -100,33 +104,27 @@ func TestGetAll(t *testing.T) { Dir: true, Nodes: Nodes{ Node{ - Key: "/fooDir/childDir/k2", - Value: "v2", - TTL: 5, - ModifiedIndex: 34, - CreatedIndex: 34, + Key: "/fooDir/childDir/k2", + Value: "v2", + TTL: 5, }, }, - TTL: 5, - ModifiedIndex: 33, - CreatedIndex: 33, + TTL: 5, }, Node{ - Key: "/fooDir/k0", - Value: "v0", - TTL: 5, - ModifiedIndex: 31, - CreatedIndex: 31, + Key: "/fooDir/k0", + Value: "v0", + TTL: 5, }, Node{ - Key: "/fooDir/k1", - Value: "v1", - TTL: 5, - ModifiedIndex: 32, - CreatedIndex: 32, + Key: "/fooDir/k1", + Value: "v1", + TTL: 5, }, } + cleanResult(result) + if !reflect.DeepEqual(result.Node.Nodes, expected) { t.Fatalf("(actual) %v != (expected) %v", result.Node.Nodes, expected) } diff --git a/third_party/github.com/coreos/go-etcd/etcd/options.go b/third_party/github.com/coreos/go-etcd/etcd/options.go index 93efdca57..335a0c218 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/options.go +++ b/third_party/github.com/coreos/go-etcd/etcd/options.go @@ -36,6 +36,8 @@ var ( VALID_DELETE_OPTIONS = validOptions{ "recursive": reflect.Bool, "dir": reflect.Bool, + "prevValue": reflect.String, + "prevIndex": reflect.Uint64, } ) diff --git a/third_party/github.com/coreos/go-etcd/etcd/requests.go b/third_party/github.com/coreos/go-etcd/etcd/requests.go index 511c51aac..c16f7d464 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/requests.go +++ b/third_party/github.com/coreos/go-etcd/etcd/requests.go @@ -13,9 +13,6 @@ import ( // get issues a GET request func (c *Client) get(key string, options options) (*RawResponse, error) { - logger.Debugf("get %s [%s]", key, c.cluster.Leader) - p := keyToPath(key) - // If consistency level is set to STRONG, append // the `consistent` query string. if c.config.Consistency == STRONG_CONSISTENCY { @@ -26,9 +23,8 @@ func (c *Client) get(key string, options options) (*RawResponse, error) { if err != nil { return nil, err } - p += str - resp, err := c.sendRequest("GET", p, nil) + resp, err := c.sendKeyRequest("GET", key, str, nil) if err != nil { return nil, err @@ -41,16 +37,12 @@ func (c *Client) get(key string, options options) (*RawResponse, error) { func (c *Client) put(key string, value string, ttl uint64, options options) (*RawResponse, error) { - logger.Debugf("put %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader) - p := keyToPath(key) - str, err := options.toParameters(VALID_PUT_OPTIONS) if err != nil { return nil, err } - p += str - resp, err := c.sendRequest("PUT", p, buildValues(value, ttl)) + resp, err := c.sendKeyRequest("PUT", key, str, buildValues(value, ttl)) if err != nil { return nil, err @@ -61,10 +53,7 @@ func (c *Client) put(key string, value string, ttl uint64, // post issues a POST request func (c *Client) post(key string, value string, ttl uint64) (*RawResponse, error) { - logger.Debugf("post %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader) - p := keyToPath(key) - - resp, err := c.sendRequest("POST", p, buildValues(value, ttl)) + resp, err := c.sendKeyRequest("POST", key, "", buildValues(value, ttl)) if err != nil { return nil, err @@ -75,16 +64,12 @@ func (c *Client) post(key string, value string, ttl uint64) (*RawResponse, error // delete issues a DELETE request func (c *Client) delete(key string, options options) (*RawResponse, error) { - logger.Debugf("delete %s [%s]", key, c.cluster.Leader) - p := keyToPath(key) - str, err := options.toParameters(VALID_DELETE_OPTIONS) if err != nil { return nil, err } - p += str - resp, err := c.sendRequest("DELETE", p, nil) + resp, err := c.sendKeyRequest("DELETE", key, str, nil) if err != nil { return nil, err @@ -93,8 +78,8 @@ func (c *Client) delete(key string, options options) (*RawResponse, error) { return resp, nil } -// sendRequest sends a HTTP request and returns a Response as defined by etcd -func (c *Client) sendRequest(method string, relativePath string, +// sendKeyRequest sends a HTTP request and returns a Response as defined by etcd +func (c *Client) sendKeyRequest(method string, key string, params string, values url.Values) (*RawResponse, error) { var req *http.Request @@ -105,6 +90,11 @@ func (c *Client) sendRequest(method string, relativePath string, trial := 0 + logger.Debugf("%s %s %s [%s]", method, key, params, c.cluster.Leader) + + // Build the request path if no prefix exists + relativePath := path.Join(c.keyPrefix, key) + params + // if we connect to a follower, we will retry until we found a leader for { trial++ @@ -146,7 +136,8 @@ func (c *Client) sendRequest(method string, relativePath string, // network error, change a machine! if resp, err = c.httpClient.Do(req); err != nil { - c.switchLeader(trial % len(c.cluster.Machines)) + logger.Debug("network error: ", err.Error()) + c.cluster.switchLeader(trial % len(c.cluster.Machines)) time.Sleep(time.Millisecond * 200) continue } @@ -195,7 +186,7 @@ func (c *Client) handleResp(resp *http.Response) (bool, []byte) { if err != nil { logger.Warning(err) } else { - c.updateLeader(u) + c.cluster.updateLeaderFromURL(u) } return false, nil @@ -219,18 +210,14 @@ func (c *Client) handleResp(resp *http.Response) (bool, []byte) { func (c *Client) getHttpPath(random bool, s ...string) string { var machine string + if random { machine = c.cluster.Machines[rand.Intn(len(c.cluster.Machines))] } else { machine = c.cluster.Leader } - fullPath := machine + "/" + version - for _, seg := range s { - fullPath = fullPath + "/" + seg - } - - return fullPath + return machine + "/" + strings.Join(s, "/") } // buildValues builds a url.Values map according to the given value and ttl @@ -249,17 +236,14 @@ func buildValues(value string, ttl uint64) url.Values { } // convert key string to http path exclude version -// for example: key[foo] -> path[keys/foo] -// key[/] -> path[keys/] +// for example: key[foo] -> path[foo] +// key[] -> path[/] func keyToPath(key string) string { - p := path.Join("keys", key) + clean := path.Clean(key) - // corner case: if key is "/" or "//" ect - // path join will clear the tailing "/" - // we need to add it back - if p == "keys" { - p = "keys/" + if clean == "" || clean == "." { + return "/" } - return p + return clean } diff --git a/third_party/github.com/coreos/go-etcd/etcd/requests_test.go b/third_party/github.com/coreos/go-etcd/etcd/requests_test.go new file mode 100644 index 000000000..a7160c126 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/requests_test.go @@ -0,0 +1,50 @@ +package etcd + +import ( + "path" + "testing" +) + +func testKey(t *testing.T, in, exp string) { + if keyToPath(in) != exp { + t.Errorf("Expected %s got %s", exp, keyToPath(in)) + } +} + +// TestKeyToPath ensures the key cleaning funciton keyToPath works in a number +// of cases. +func TestKeyToPath(t *testing.T) { + testKey(t, "", "/") + testKey(t, "/", "/") + testKey(t, "///", "/") + testKey(t, "hello/world/", "hello/world") + testKey(t, "///hello////world/../", "/hello") +} + +func testPath(t *testing.T, c *Client, in, exp string) { + out := c.getHttpPath(false, in) + + if out != exp { + t.Errorf("Expected %s got %s", exp, out) + } +} + +// TestHttpPath ensures that the URLs generated make sense for the given keys +func TestHttpPath(t *testing.T) { + c := NewClient(nil) + + testPath(t, c, + path.Join(c.keyPrefix, "hello") + "?prevInit=true", + "http://127.0.0.1:4001/v2/keys/hello?prevInit=true") + + testPath(t, c, + path.Join(c.keyPrefix, "///hello///world") + "?prevInit=true", + "http://127.0.0.1:4001/v2/keys/hello/world?prevInit=true") + + c = NewClient([]string{"https://discovery.etcd.io"}) + c.SetKeyPrefix("") + + testPath(t, c, + path.Join(c.keyPrefix, "hello") + "?prevInit=true", + "https://discovery.etcd.io/hello?prevInit=true") +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/utils.go b/third_party/github.com/coreos/go-etcd/etcd/utils.go deleted file mode 100644 index eb2f6046f..000000000 --- a/third_party/github.com/coreos/go-etcd/etcd/utils.go +++ /dev/null @@ -1,33 +0,0 @@ -// Utility functions - -package etcd - -import ( - "fmt" - "net/url" - "reflect" -) - -// Convert options to a string of HTML parameters -func optionsToString(options options, vops validOptions) (string, error) { - p := "?" - v := url.Values{} - for opKey, opVal := range options { - // Check if the given option is valid (that it exists) - kind := vops[opKey] - if kind == reflect.Invalid { - return "", fmt.Errorf("Invalid option: %v", opKey) - } - - // Check if the given option is of the valid type - t := reflect.TypeOf(opVal) - if kind != t.Kind() { - return "", fmt.Errorf("Option %s should be of %v kind, not of %v kind.", - opKey, kind, t.Kind()) - } - - v.Set(opKey, fmt.Sprintf("%v", opVal)) - } - p += v.Encode() - return p, nil -} From 8687dd38020ec86085d6df167eca3eb79648fbab Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Tue, 4 Feb 2014 08:34:27 -0800 Subject: [PATCH 07/11] feat(discovery): fully working discovery now --- Documentation/discovery-protocol.md | 10 +++---- config/config.go | 46 ++++++++++++++++++++--------- discovery/discovery.go | 30 +++++++++++-------- scripts/test-cluster | 13 ++++++-- server/peer_server_handlers.go | 6 ++-- server/usage.go | 2 +- 6 files changed, 70 insertions(+), 37 deletions(-) diff --git a/Documentation/discovery-protocol.md b/Documentation/discovery-protocol.md index 2d8a2c734..24253455e 100644 --- a/Documentation/discovery-protocol.md +++ b/Documentation/discovery-protocol.md @@ -1,14 +1,14 @@ # Discovery Protocol -Starting an etcd cluster initially can be painful since each machine needs to know of at least one live machine in the cluster. If you are trying to bring up a cluster all at once, say using an AWS cloud formation, you also need to coordinate who will be the initial cluster leader. The discovery protocol helps you by providing a way to discover the peers in a new etcd cluster using another already running etcd cluster. +Starting a new etcd cluster can be painful since each machine needs to know of at least one live machine in the cluster. If you are trying to bring up a new cluster all at once, say using an AWS cloud formation, you also need to coordinate who will be the initial cluster leader. The discovery protocol uses an existing running etcd cluster to start a second etcd cluster. -To use this protocol you add the command line flag `-discovery` to your etcd args. In this example we will use `http://example.com/v2/keys/_etcd/registry` as the URL prefix. +To use this feature you add the command line flag `-discovery` to your etcd args. In this example we will use `http://example.com/v2/keys/_etcd/registry` as the URL prefix. ## The Protocol By convention the etcd discovery protocol uses the key prefix `_etcd/registry`. A full URL to the keyspace will be `http://example.com/v2/keys/_etcd/registry`. -## Creating a New Cluster +### Creating a New Cluster Generate a unique token that will identify the new cluster and create a key called "_state". If you get a `201 Created` back then your key is unused and you can proceed with cluster creation. If the return value is `412 Precondition Failed` then you will need to create a new token. @@ -17,7 +17,7 @@ UUID=$(uuidgen) curl -X PUT "http://example.com/v2/keys/_etcd/registry/${UUID}/_state?prevExist=false" -d value=init ``` -## Bringing up Machines +### Bringing up Machines Now that you have your cluster ID you can start bringing up machines. Every machine will follow this protocol internally in etcd if given a `-discovery`. @@ -29,7 +29,7 @@ The first thing etcd must do is register your machine. This is done by using the curl -X PUT "http://example.com/v2/keys/_etcd/registry/${UUID}/${etcd_machine_name}?ttl=604800" -d value=${peer_addr} ``` -### Figuring out your Peers +### Discovering Peers Now that this etcd machine is registered it must discover its peers. diff --git a/config/config.go b/config/config.go index ee066b3a2..831d7ef86 100644 --- a/config/config.go +++ b/config/config.go @@ -145,21 +145,9 @@ func (c *Config) Load(arguments []string) error { // Attempt cluster discovery if c.Discovery != "" { - p, err := discovery.Do(c.Discovery, c.Name, c.Peer.Addr) - if err != nil { - log.Fatalf("Bootstrapping encountered an unexpected error: %v", err) + if err := c.handleDiscovery(); err != nil { + return err } - - for i := range p { - // Strip the scheme off of the peer if it has one - // TODO(bp): clean this up! - purl, err := url.Parse(p[i]) - if err == nil { - p[i] = purl.Host - } - } - - c.Peers = p } // Force remove server configuration if specified. @@ -226,6 +214,36 @@ func (c *Config) loadEnv(target interface{}) error { return nil } +func (c *Config) handleDiscovery() error { + p, err := discovery.Do(c.Discovery, c.Name, c.Peer.Addr) + + // This is fatal, discovery encountered an unexpected error + // and we have no peer list. + if err != nil && len(c.Peers) == 0 { + log.Fatalf("Discovery failed and a backup peer list wasn't provided: %v", err) + return err + } + + // Warn about errors coming from discovery, this isn't fatal + // since the user might have provided a peer list elsewhere. + if err != nil { + log.Warnf("Discovery encountered an error but a backup peer list (%v) was provided: %v", c.Peers, err) + } + + for i := range p { + // Strip the scheme off of the peer if it has one + // TODO(bp): clean this up! + purl, err := url.Parse(p[i]) + if err == nil { + p[i] = purl.Host + } + } + + c.Peers = p + + return nil +} + // Loads configuration from command line flags. func (c *Config) LoadFlags(arguments []string) error { var peers, cors, path string diff --git a/discovery/discovery.go b/discovery/discovery.go index ea3b56812..f87af2366 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -9,7 +9,7 @@ import ( "time" "github.com/coreos/etcd/log" - "github.com/coreos/go-etcd/etcd" + "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd" ) const ( @@ -44,14 +44,21 @@ func (d *Discoverer) Do(discoveryURL string, name string, peer string) (peers [] return } - // prefix is appended to all keys + // prefix is prepended to all keys for this discovery d.prefix = strings.TrimPrefix(u.Path, "/v2/keys/") - // Connect to a scheme://host not a full URL with path + // keep the old path in case we need to set the KeyPrefix below + oldPath := u.Path u.Path = "" - log.Infof("Bootstrapping via %s using prefix %s.", u.String(), d.prefix) + + // Connect to a scheme://host not a full URL with path + log.Infof("Discovery via %s using prefix %s.", u.String(), d.prefix) d.client = etcd.NewClient([]string{u.String()}) + if !strings.HasPrefix(oldPath, "/v2/keys") { + d.client.SetKeyPrefix("") + } + // Register this machine first and announce that we are a member of // this cluster err = d.heartbeat() @@ -68,7 +75,7 @@ func (d *Discoverer) Do(discoveryURL string, name string, peer string) (peers [] // Bail out on unexpected errors if err != nil { - if etcdErr, ok := err.(etcd.EtcdError); !ok || etcdErr.ErrorCode != 101 { + if etcdErr, ok := err.(*etcd.EtcdError); !ok || etcdErr.ErrorCode != 101 { return nil, err } } @@ -76,11 +83,11 @@ func (d *Discoverer) Do(discoveryURL string, name string, peer string) (peers [] // If we got a response then the CAS was successful, we are leader if resp != nil && resp.Node.Value == startedState { // We are the leader, we have no peers - log.Infof("Bootstrapping was in 'init' state this machine is the initial leader.") + log.Infof("Discovery was in the 'init' state this machine is the initial leader.") return nil, nil } - // Fall through to finding the other discoveryped peers + // Fall through to finding the other discovery peers return d.findPeers() } @@ -93,7 +100,7 @@ func (d *Discoverer) findPeers() (peers []string, err error) { node := resp.Node if node == nil { - return nil, errors.New(fmt.Sprintf("%s key doesn't exist.", d.prefix)) + return nil, fmt.Errorf("%s key doesn't exist.", d.prefix) } for _, n := range node.Nodes { @@ -105,10 +112,10 @@ func (d *Discoverer) findPeers() (peers []string, err error) { } if len(peers) == 0 { - return nil, errors.New("No peers found.") + return nil, errors.New("Discovery found an initialized cluster but no peers are registered.") } - log.Infof("Bootstrap found peers %v", peers) + log.Infof("Discovery found peers %v", peers) return } @@ -122,7 +129,7 @@ func (d *Discoverer) startHeartbeat() { case <-ticker: err := d.heartbeat() if err != nil { - log.Warnf("Bootstrapping heartbeat failed: %v", err) + log.Warnf("Discovery heartbeat failed: %v", err) } } } @@ -130,7 +137,6 @@ func (d *Discoverer) startHeartbeat() { func (d *Discoverer) heartbeat() error { _, err := d.client.Set(path.Join(d.prefix, d.name), d.peer, defaultTTL) - return err } diff --git a/scripts/test-cluster b/scripts/test-cluster index af6022534..29510a6bc 100755 --- a/scripts/test-cluster +++ b/scripts/test-cluster @@ -6,16 +6,25 @@ ulimit -n unlimited tmux new-session -d -s $SESSION +peer_args= +if [ -n "${DISCOVERY_URL}" ]; then + peer_args="-discovery ${DISCOVERY_URL}" +fi + # Setup a window for tailing log files tmux new-window -t $SESSION:1 -n 'peers' tmux split-window -h tmux select-pane -t 0 -tmux send-keys "${DIR}/../bin/etcd -peer-addr 127.0.0.1:7001 -addr 127.0.0.1:4001 -data-dir peer1 -name peer1" C-m +tmux send-keys "${DIR}/../bin/etcd -peer-addr 127.0.0.1:7001 -addr 127.0.0.1:4001 -data-dir peer1 -name peer1 ${peer_args}" C-m + +if [ -n "${peer_args}" ]; then + peer_args="-peers 127.0.0.1:7001" +fi for i in 2 3; do tmux select-pane -t 0 tmux split-window -v - tmux send-keys "${DIR}/../bin/etcd -cors='*' -peer-addr 127.0.0.1:700${i} -addr 127.0.0.1:400${i} -peers 127.0.0.1:7001 -data-dir peer${i} -name peer${i}" C-m + tmux send-keys "${DIR}/../bin/etcd -cors='*' -peer-addr 127.0.0.1:700${i} -addr 127.0.0.1:400${i} -data-dir peer${i} -name peer${i} ${peer_args}" C-m done # Attach to session diff --git a/server/peer_server_handlers.go b/server/peer_server_handlers.go index f97c2470f..a32adb3cd 100644 --- a/server/peer_server_handlers.go +++ b/server/peer_server_handlers.go @@ -6,13 +6,13 @@ import ( "strconv" "time" + "github.com/coreos/etcd/third_party/github.com/coreos/raft" + "github.com/coreos/etcd/third_party/github.com/gorilla/mux" + etcdErr "github.com/coreos/etcd/error" uhttp "github.com/coreos/etcd/pkg/http" "github.com/coreos/etcd/log" "github.com/coreos/etcd/store" - - "github.com/coreos/etcd/third_party/github.com/coreos/raft" - "github.com/coreos/etcd/third_party/github.com/gorilla/mux" ) // Get all the current logs diff --git a/server/usage.go b/server/usage.go index 4d9340515..4e47512c5 100644 --- a/server/usage.go +++ b/server/usage.go @@ -26,7 +26,7 @@ Options: -vv Enabled very verbose logging. Cluster Configuration Options: - -bootstrap-url= URL to use for bootstrapping the peer list. + -discovery= Discovery service used to find a peer list. -peers-file= Path to a file containing the peer list. -peers=, Comma-separated list of peers. The members should match the peer's '-peer-addr' flag. From a8b07b1b4840e4b6f21ab9fadc8088edf6d2547c Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Tue, 4 Feb 2014 11:59:23 -0800 Subject: [PATCH 08/11] chore(config): go fmt --- config/config.go | 116 +++++++++++++++++++++++------------------------ 1 file changed, 58 insertions(+), 58 deletions(-) diff --git a/config/config.go b/config/config.go index 831d7ef86..983435a9b 100644 --- a/config/config.go +++ b/config/config.go @@ -26,64 +26,64 @@ const DefaultSystemConfigPath = "/etc/etcd/etcd.conf" // A lookup of deprecated flags to their new flag name. var newFlagNameLookup = map[string]string{ - "C": "peers", - "CF": "peers-file", - "n": "name", - "c": "addr", - "cl": "bind-addr", - "s": "peer-addr", - "sl": "peer-bind-addr", - "d": "data-dir", - "m": "max-result-buffer", - "r": "max-retry-attempts", - "maxsize": "max-cluster-size", - "clientCAFile": "ca-file", - "clientCert": "cert-file", - "clientKey": "key-file", - "serverCAFile": "peer-ca-file", - "serverCert": "peer-cert-file", - "serverKey": "peer-key-file", - "snapshotCount": "snapshot-count", + "C": "peers", + "CF": "peers-file", + "n": "name", + "c": "addr", + "cl": "bind-addr", + "s": "peer-addr", + "sl": "peer-bind-addr", + "d": "data-dir", + "m": "max-result-buffer", + "r": "max-retry-attempts", + "maxsize": "max-cluster-size", + "clientCAFile": "ca-file", + "clientCert": "cert-file", + "clientKey": "key-file", + "serverCAFile": "peer-ca-file", + "serverCert": "peer-cert-file", + "serverKey": "peer-key-file", + "snapshotCount": "snapshot-count", } // Config represents the server configuration. type Config struct { - SystemPath string + SystemPath string - Addr string `toml:"addr" env:"ETCD_ADDR"` - BindAddr string `toml:"bind_addr" env:"ETCD_BIND_ADDR"` - CAFile string `toml:"ca_file" env:"ETCD_CA_FILE"` - CertFile string `toml:"cert_file" env:"ETCD_CERT_FILE"` - CPUProfileFile string - CorsOrigins []string `toml:"cors" env:"ETCD_CORS"` - DataDir string `toml:"data_dir" env:"ETCD_DATA_DIR"` - Discovery string `toml:"discovery" env:"ETCD_DISCOVERY"` - Force bool - KeyFile string `toml:"key_file" env:"ETCD_KEY_FILE"` - Peers []string `toml:"peers" env:"ETCD_PEERS"` - PeersFile string `toml:"peers_file" env:"ETCD_PEERS_FILE"` - MaxClusterSize int `toml:"max_cluster_size" env:"ETCD_MAX_CLUSTER_SIZE"` - MaxResultBuffer int `toml:"max_result_buffer" env:"ETCD_MAX_RESULT_BUFFER"` - MaxRetryAttempts int `toml:"max_retry_attempts" env:"ETCD_MAX_RETRY_ATTEMPTS"` - Name string `toml:"name" env:"ETCD_NAME"` - Snapshot bool `toml:"snapshot" env:"ETCD_SNAPSHOT"` - SnapshotCount int `toml:"snapshot_count" env:"ETCD_SNAPSHOTCOUNT"` - ShowHelp bool - ShowVersion bool - Verbose bool `toml:"verbose" env:"ETCD_VERBOSE"` - VeryVerbose bool `toml:"very_verbose" env:"ETCD_VERY_VERBOSE"` - VeryVeryVerbose bool `toml:"very_very_verbose" env:"ETCD_VERY_VERY_VERBOSE"` - Peer struct { - Addr string `toml:"addr" env:"ETCD_PEER_ADDR"` - BindAddr string `toml:"bind_addr" env:"ETCD_PEER_BIND_ADDR"` - CAFile string `toml:"ca_file" env:"ETCD_PEER_CA_FILE"` - CertFile string `toml:"cert_file" env:"ETCD_PEER_CERT_FILE"` - KeyFile string `toml:"key_file" env:"ETCD_PEER_KEY_FILE"` - HeartbeatTimeout int `toml:"heartbeat_timeout" env:"ETCD_PEER_HEARTBEAT_TIMEOUT"` - ElectionTimeout int `toml:"election_timeout" env:"ETCD_PEER_ELECTION_TIMEOUT"` + Addr string `toml:"addr" env:"ETCD_ADDR"` + BindAddr string `toml:"bind_addr" env:"ETCD_BIND_ADDR"` + CAFile string `toml:"ca_file" env:"ETCD_CA_FILE"` + CertFile string `toml:"cert_file" env:"ETCD_CERT_FILE"` + CPUProfileFile string + CorsOrigins []string `toml:"cors" env:"ETCD_CORS"` + DataDir string `toml:"data_dir" env:"ETCD_DATA_DIR"` + Discovery string `toml:"discovery" env:"ETCD_DISCOVERY"` + Force bool + KeyFile string `toml:"key_file" env:"ETCD_KEY_FILE"` + Peers []string `toml:"peers" env:"ETCD_PEERS"` + PeersFile string `toml:"peers_file" env:"ETCD_PEERS_FILE"` + MaxClusterSize int `toml:"max_cluster_size" env:"ETCD_MAX_CLUSTER_SIZE"` + MaxResultBuffer int `toml:"max_result_buffer" env:"ETCD_MAX_RESULT_BUFFER"` + MaxRetryAttempts int `toml:"max_retry_attempts" env:"ETCD_MAX_RETRY_ATTEMPTS"` + Name string `toml:"name" env:"ETCD_NAME"` + Snapshot bool `toml:"snapshot" env:"ETCD_SNAPSHOT"` + SnapshotCount int `toml:"snapshot_count" env:"ETCD_SNAPSHOTCOUNT"` + ShowHelp bool + ShowVersion bool + Verbose bool `toml:"verbose" env:"ETCD_VERBOSE"` + VeryVerbose bool `toml:"very_verbose" env:"ETCD_VERY_VERBOSE"` + VeryVeryVerbose bool `toml:"very_very_verbose" env:"ETCD_VERY_VERY_VERBOSE"` + Peer struct { + Addr string `toml:"addr" env:"ETCD_PEER_ADDR"` + BindAddr string `toml:"bind_addr" env:"ETCD_PEER_BIND_ADDR"` + CAFile string `toml:"ca_file" env:"ETCD_PEER_CA_FILE"` + CertFile string `toml:"cert_file" env:"ETCD_PEER_CERT_FILE"` + KeyFile string `toml:"key_file" env:"ETCD_PEER_KEY_FILE"` + HeartbeatTimeout int `toml:"heartbeat_timeout" env:"ETCD_PEER_HEARTBEAT_TIMEOUT"` + ElectionTimeout int `toml:"election_timeout" env:"ETCD_PEER_ELECTION_TIMEOUT"` } - strTrace string `toml:"trace" env:"ETCD_TRACE"` - GraphiteHost string `toml:"graphite_host" env:"ETCD_GRAPHITE_HOST"` + strTrace string `toml:"trace" env:"ETCD_TRACE"` + GraphiteHost string `toml:"graphite_host" env:"ETCD_GRAPHITE_HOST"` } // New returns a Config initialized with default values. @@ -477,9 +477,9 @@ func (c *Config) Sanitize() error { // TLSInfo retrieves a TLSInfo object for the client server. func (c *Config) TLSInfo() server.TLSInfo { return server.TLSInfo{ - CAFile: c.CAFile, - CertFile: c.CertFile, - KeyFile: c.KeyFile, + CAFile: c.CAFile, + CertFile: c.CertFile, + KeyFile: c.KeyFile, } } @@ -491,9 +491,9 @@ func (c *Config) TLSConfig() (server.TLSConfig, error) { // PeerTLSInfo retrieves a TLSInfo object for the peer server. func (c *Config) PeerTLSInfo() server.TLSInfo { return server.TLSInfo{ - CAFile: c.Peer.CAFile, - CertFile: c.Peer.CertFile, - KeyFile: c.Peer.KeyFile, + CAFile: c.Peer.CAFile, + CertFile: c.Peer.CertFile, + KeyFile: c.Peer.KeyFile, } } From ff6090836ceb7f1f0371ed337687c4446fc864e9 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Tue, 4 Feb 2014 17:06:56 -0800 Subject: [PATCH 09/11] fix(tests/server_utils): add a metrics bucket This is required to avoid getting nil pointer exceptions if a peer joins this test server. --- tests/server_utils.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/server_utils.go b/tests/server_utils.go index 00f629d00..8c1f360f7 100644 --- a/tests/server_utils.go +++ b/tests/server_utils.go @@ -8,6 +8,7 @@ import ( "github.com/coreos/etcd/third_party/github.com/coreos/raft" + "github.com/coreos/etcd/metrics" "github.com/coreos/etcd/server" "github.com/coreos/etcd/store" ) @@ -39,7 +40,10 @@ func RunServer(f func(*server.Server)) { SnapshotCount: testSnapshotCount, MaxClusterSize: 9, } - ps := server.NewPeerServer(psConfig, registry, store, nil, followersStats, serverStats) + + mb := metrics.NewBucket("") + + ps := server.NewPeerServer(psConfig, registry, store, &mb, followersStats, serverStats) psListener, err := server.NewListener(testRaftURL) if err != nil { panic(err) From 2822b9c5791eeb0a3cf9b2b67c37490a34704e0d Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Tue, 4 Feb 2014 17:07:51 -0800 Subject: [PATCH 10/11] tests(tests/functional): add tests for Discovery This tests a variety of failure cases for the Discovery service including: - Initial leader failures - Discovery service failures - Positive tests for discovery working flawlessly --- tests/functional/discovery_test.go | 245 +++++++++++++++++++++++++++++ 1 file changed, 245 insertions(+) create mode 100644 tests/functional/discovery_test.go diff --git a/tests/functional/discovery_test.go b/tests/functional/discovery_test.go new file mode 100644 index 000000000..0ba391b90 --- /dev/null +++ b/tests/functional/discovery_test.go @@ -0,0 +1,245 @@ +package test + +import ( + "errors" + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" + "time" + + "github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert" + + etcdtest "github.com/coreos/etcd/tests" + "github.com/coreos/etcd/server" + goetcd "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd" +) + +type garbageHandler struct { + t *testing.T + success bool +} + +func (g *garbageHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, "Hello, client") + println("HI") + if r.URL.String() != "/v2/keys/_etcd/registry/1/node1" { + g.t.Fatalf("Unexpected web request") + } + g.success = true +} + +// TestDiscoveryDownNoBackupPeers ensures that etcd stops if it is started with a +// bad discovery URL and no backups. +func TestDiscoveryDownNoBackupPeers(t *testing.T) { + g := garbageHandler{t: t} + ts := httptest.NewServer(&g) + defer ts.Close() + + discover := ts.URL + "/v2/keys/_etcd/registry/1" + proc, err := startServer([]string{"-discovery", discover}) + + if err != nil { + t.Fatal(err.Error()) + } + defer stopServer(proc) + + client := http.Client{} + err = assertServerNotUp(client, "http") + if err != nil { + t.Fatal(err.Error()) + } + + if !g.success { + t.Fatal("Discovery server never called") + } +} + +// TestDiscoveryDownWithBackupPeers ensures that etcd runs if it is started with a +// bad discovery URL and a peer list. +func TestDiscoveryDownWithBackupPeers(t *testing.T) { + etcdtest.RunServer(func(s *server.Server) { + g := garbageHandler{t: t} + ts := httptest.NewServer(&g) + defer ts.Close() + + discover := ts.URL + "/v2/keys/_etcd/registry/1" + u, ok := s.PeerURL("ETCDTEST") + if !ok { + t.Fatalf("Couldn't find the URL") + } + proc, err := startServer([]string{"-discovery", discover, "-peers", u}) + + if err != nil { + t.Fatal(err.Error()) + } + defer stopServer(proc) + + client := http.Client{} + err = assertServerFunctional(client, "http") + if err != nil { + t.Fatal(err.Error()) + } + + if !g.success { + t.Fatal("Discovery server never called") + } + }) +} + +// TestDiscoveryFirstPeer ensures that etcd starts as the leader if it +// registers as the first peer. +func TestDiscoveryFirstPeer(t *testing.T) { + etcdtest.RunServer(func(s *server.Server) { + v := url.Values{} + v.Set("value", "init") + resp, err := etcdtest.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/_etcd/registry/2/_state"), v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) + + proc, err := startServer([]string{"-discovery", s.URL() + "/v2/keys/_etcd/registry/2"}) + if err != nil { + t.Fatal(err.Error()) + } + defer stopServer(proc) + + client := http.Client{} + err = assertServerFunctional(client, "http") + if err != nil { + t.Fatal(err.Error()) + } + }) +} + +// TestDiscoverySecondPeerFirstDown ensures that etcd stops if it is started with a +// correct discovery URL but no active machines are found. +func TestDiscoverySecondPeerFirstDown(t *testing.T) { + etcdtest.RunServer(func(s *server.Server) { + v := url.Values{} + v.Set("value", "started") + resp, err := etcdtest.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/_etcd/registry/2/_state"), v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) + + proc, err := startServer([]string{"-discovery", s.URL() + "/v2/keys/_etcd/registry/2"}) + if err != nil { + t.Fatal(err.Error()) + } + defer stopServer(proc) + + client := http.Client{} + err = assertServerNotUp(client, "http") + if err != nil { + t.Fatal(err.Error()) + } + }) +} + +// TestDiscoverySecondPeerFirstNoResponse ensures that if the first etcd +// machine stops after heartbeating that the second machine fails too. +func TestDiscoverySecondPeerFirstNoResponse(t *testing.T) { + etcdtest.RunServer(func(s *server.Server) { + v := url.Values{} + v.Set("value", "started") + resp, err := etcdtest.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/_etcd/registry/2/_state"), v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) + + v = url.Values{} + v.Set("value", "http://127.0.0.1:49151") + resp, err = etcdtest.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/_etcd/registry/2/ETCDTEST"), v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) + + proc, err := startServer([]string{"-discovery", s.URL() + "/v2/keys/_etcd/registry/2"}) + if err != nil { + t.Fatal(err.Error()) + } + defer stopServer(proc) + + // TODO(bp): etcd will take 30 seconds to shutdown, figure this + // out instead + time.Sleep(35 * time.Second) + + client := http.Client{} + _, err = client.Get("/") + if err != nil && strings.Contains(err.Error(), "connection reset by peer") { + t.Fatal(err.Error()) + } + }) +} + +// TestDiscoverySecondPeerUp ensures that a second peer joining a discovery +// cluster works. +func TestDiscoverySecondPeerUp(t *testing.T) { + etcdtest.RunServer(func(s *server.Server) { + v := url.Values{} + v.Set("value", "started") + resp, err := etcdtest.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/_etcd/registry/3/_state"), v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) + + u, ok := s.PeerURL("ETCDTEST") + if !ok { + t.Fatalf("Couldn't find the URL") + } + + wc := goetcd.NewClient([]string{s.URL()}) + _, err = wc.Set("test", "0", 0) + + if err != nil { + t.Fatalf("Couldn't set a test key on the leader %v", err) + } + + receiver := make(chan *goetcd.Response) + stop := make(chan bool) + + go wc.Watch("_etcd/registry/3/node1", 0, false, receiver, stop) + + v = url.Values{} + v.Set("value", u) + resp, err = etcdtest.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/_etcd/registry/3/ETCDTEST"), v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) + + proc, err := startServer([]string{"-discovery", s.URL() + "/v2/keys/_etcd/registry/3"}) + if err != nil { + t.Fatal(err.Error()) + } + defer stopServer(proc) + + // Test to ensure the machine registered iteslf + watchResp := <-receiver + if watchResp.Node.Value != "http://127.0.0.1:7001" { + t.Fatalf("Second peer didn't register! %s", watchResp.Node.Value) + } + + // TODO(bp): need to have a better way of knowing a machine is up + time.Sleep(1 * time.Second) + + etcdc := goetcd.NewClient(nil) + _, err = etcdc.Set("foobar", "baz", 0) + if err != nil { + t.Fatal(err.Error()) + } + }) +} + +func assertServerNotUp(client http.Client, scheme string) error { + path := fmt.Sprintf("%s://127.0.0.1:4001/v2/keys/foo", scheme) + fields := url.Values(map[string][]string{"value": []string{"bar"}}) + + for i := 0; i < 10; i++ { + time.Sleep(1 * time.Second) + + _, err := client.PostForm(path, fields) + if err == nil { + return errors.New("Expected error during POST, got nil") + } else { + errString := err.Error() + if strings.Contains(errString, "connection refused") { + return nil + } else { + return err + } + } + } + + return nil +} From 2d75ef0c7a58c7c94dd3d96a1e3532f763221d1b Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Tue, 4 Feb 2014 17:13:24 -0800 Subject: [PATCH 11/11] feat(Documentation/discovery-protocol): explain heartbeating Explain more information about how the TTL works and etcds role. --- Documentation/discovery-protocol.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/Documentation/discovery-protocol.md b/Documentation/discovery-protocol.md index 24253455e..177a7186a 100644 --- a/Documentation/discovery-protocol.md +++ b/Documentation/discovery-protocol.md @@ -79,4 +79,10 @@ Using this information you can connect to the rest of the peers in the cluster. ### Heartbeating -At this point you will want to heartbeat your registration URL every few hours. This will be done via a Go routine inside of etcd. +At this point etcd will start heart beating to your registration URL. The +protocol uses a heartbeat so permanently deleted nodes get slowly removed from +the discovery information cluster. + +The heartbeat interval is about once per day and the TTL is one week. This +should give a sufficiently wide window to protect against a discovery service +taking a temporary outage yet provide adequate cleanup.