mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
feat(bootstrap): initial working code
This is an initial version of the bootstrap code that seems to work under the normal circumstances. I need to mock out a server that will test out all of the error cases now.
This commit is contained in:
parent
40a8542c22
commit
72514f8ab2
@ -1,82 +0,0 @@
|
||||
# Bootstrap Protocol
|
||||
|
||||
Bootstrapping an etcd cluster can be painful since each node needs to know of another node in the cluster to get started. If you are trying to bring up a cluster all at once, say using a cloud formation, you also need to coordinate who will be the initial cluster leader. The bootstrapping protocol helps you by providing a way to bootstrap an etcd instance using another already running instance.
|
||||
|
||||
To enable use of this protocol you add the command line flag `-bootstrap-url` to your etcd args. In this example we will use `http://example.com/v2/keys/bootstrap/` as the URL prefix.
|
||||
|
||||
## The Protocol
|
||||
|
||||
By convention the etcd bootstrapping protocol uses the key prefix `bootstrap`. A full URL to the keyspace will be `http://example.com/v2/keys/bootstrap`.
|
||||
|
||||
## Creating a New Cluster
|
||||
|
||||
Generate a unique (secret) token that will identify the cluster and create a key called "_state". If you get a `201 Created` back then your key is unused and you can proceed with cluster creation. If the return value is `412 Precondition Failed` then you will need to create a new token.
|
||||
|
||||
```
|
||||
UUID=$(uuidgen)
|
||||
curl -X PUT "http://example.com/v2/keys/bootstrap/${UUID}/_state?prevExist=false?ttl=" -d value=init
|
||||
```
|
||||
|
||||
## Bringing up Machines
|
||||
|
||||
Now that you have your cluster ID you can start bringing up machines. Every machine will follow this protocol internally in etcd if given a `-bootstrap-url`.
|
||||
|
||||
### Registering your Machine
|
||||
|
||||
The first thing etcd must do is register your machine. This is done by using the machine name (from the `-name` arg) and posting it with a long TTL to the given key.
|
||||
|
||||
```
|
||||
curl -X PUT "http://example.com/v2/keys/bootstrap/${UUID}/${etcd_machine_name}?ttl=604800" -d value=${peer_addr}
|
||||
```
|
||||
|
||||
### Figuring out your Peers
|
||||
|
||||
Now that this etcd machine is registered it must figure out its peers.
|
||||
|
||||
But, the tricky bit of bootstrapping is that one machine needs to assume the initial role of leader and will have no peers. To figure out if another machine has taken on this etcd needs to update the `_state` key from "init" to "started":
|
||||
|
||||
```
|
||||
curl -X PUT "http://example.com/v2/keys/bootstrap/${UUID}/_state?prevValue=init" -d value=started
|
||||
```
|
||||
|
||||
If this returns a `200 OK` response then this machine is the initial leader and should start with no peers configured. If, however, this returns a `412 Precondition Failed` then you need to find all of the registered peers:
|
||||
|
||||
```
|
||||
curl -X GET "http://example.com/v2/keys/bootstrap/${UUID}?recursive=true"
|
||||
```
|
||||
|
||||
```
|
||||
{
|
||||
"action": "get",
|
||||
"node": {
|
||||
"createdIndex": 11,
|
||||
"dir": true,
|
||||
"key": "/bootstrap/9D4258A5-A1D3-4074-8837-31C1E091131D",
|
||||
"modifiedIndex": 11,
|
||||
"nodes": [
|
||||
{
|
||||
"createdIndex": 16,
|
||||
"expiration": "2014-02-03T13:19:57.631253589-08:00",
|
||||
"key": "/bootstrap/9D4258A5-A1D3-4074-8837-31C1E091131D/peer1",
|
||||
"modifiedIndex": 16,
|
||||
"ttl": 604765,
|
||||
"value": "127.0.0.1:7001"
|
||||
},
|
||||
{
|
||||
"createdIndex": 17,
|
||||
"expiration": "2014-02-03T13:19:57.631253589-08:00",
|
||||
"key": "/bootstrap/9D4258A5-A1D3-4074-8837-31C1E091131D/peer2",
|
||||
"modifiedIndex": 17,
|
||||
"ttl": 604765,
|
||||
"value": "127.0.0.1:7002"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Using this information you can connect to the rest of the peers in the cluster.
|
||||
|
||||
### Heartbeating
|
||||
|
||||
At this point you will want to heartbeat your registration URL every few hours. This will be done via a Go routine inside of etcd.
|
@ -19,7 +19,7 @@ configuration files.
|
||||
### Optional
|
||||
|
||||
* `-addr` - The advertised public hostname:port for client communication. Defaults to `127.0.0.1:4001`.
|
||||
* `-bootstrap-url` - A URL to use for bootstrapping the peer list. (i.e `"https://bootstrap.etcd.io/unique-key"`).
|
||||
* `-discovery` - A URL to use for discovering the peer list. (i.e `"https://discovery.etcd.io/your-unique-key"`).
|
||||
* `-bind-addr` - The listening hostname for client communication. Defaults to advertised ip.
|
||||
* `-peers` - A comma separated list of peers in the cluster (i.e `"203.0.113.101:7001,203.0.113.102:7001"`).
|
||||
* `-peers-file` - The file path containing a comma separated list of peers in the cluster.
|
||||
|
82
Documentation/discovery-protocol.md
Normal file
82
Documentation/discovery-protocol.md
Normal file
@ -0,0 +1,82 @@
|
||||
# Discovery Protocol
|
||||
|
||||
Starting an etcd cluster initially can be painful since each machine needs to know of at least one live machine in the cluster. If you are trying to bring up a cluster all at once, say using an AWS cloud formation, you also need to coordinate who will be the initial cluster leader. The discovery protocol helps you by providing a way to discover the peers in a new etcd cluster using another already running etcd cluster.
|
||||
|
||||
To use this protocol you add the command line flag `-discovery` to your etcd args. In this example we will use `http://example.com/v2/keys/_etcd/registry` as the URL prefix.
|
||||
|
||||
## The Protocol
|
||||
|
||||
By convention the etcd discovery protocol uses the key prefix `_etcd/registry`. A full URL to the keyspace will be `http://example.com/v2/keys/_etcd/registry`.
|
||||
|
||||
## Creating a New Cluster
|
||||
|
||||
Generate a unique token that will identify the new cluster and create a key called "_state". If you get a `201 Created` back then your key is unused and you can proceed with cluster creation. If the return value is `412 Precondition Failed` then you will need to create a new token.
|
||||
|
||||
```
|
||||
UUID=$(uuidgen)
|
||||
curl -X PUT "http://example.com/v2/keys/_etcd/registry/${UUID}/_state?prevExist=false" -d value=init
|
||||
```
|
||||
|
||||
## Bringing up Machines
|
||||
|
||||
Now that you have your cluster ID you can start bringing up machines. Every machine will follow this protocol internally in etcd if given a `-discovery`.
|
||||
|
||||
### Registering your Machine
|
||||
|
||||
The first thing etcd must do is register your machine. This is done by using the machine name (from the `-name` arg) and posting it with a long TTL to the given key.
|
||||
|
||||
```
|
||||
curl -X PUT "http://example.com/v2/keys/_etcd/registry/${UUID}/${etcd_machine_name}?ttl=604800" -d value=${peer_addr}
|
||||
```
|
||||
|
||||
### Figuring out your Peers
|
||||
|
||||
Now that this etcd machine is registered it must discover its peers.
|
||||
|
||||
But, the tricky bit of starting a new cluster is that one machine needs to assume the initial role of leader and will have no peers. To figure out if another machine has already started the cluster etcd needs to update the `_state` key from "init" to "started":
|
||||
|
||||
```
|
||||
curl -X PUT "http://example.com/v2/keys/_etcd/registry/${UUID}/_state?prevValue=init" -d value=started
|
||||
```
|
||||
|
||||
If this returns a `200 OK` response then this machine is the initial leader and should start with no peers configured. If, however, this returns a `412 Precondition Failed` then you need to find all of the registered peers:
|
||||
|
||||
```
|
||||
curl -X GET "http://example.com/v2/keys/_etcd/registry/${UUID}?recursive=true"
|
||||
```
|
||||
|
||||
```
|
||||
{
|
||||
"action": "get",
|
||||
"node": {
|
||||
"createdIndex": 11,
|
||||
"dir": true,
|
||||
"key": "/_etcd/registry/9D4258A5-A1D3-4074-8837-31C1E091131D",
|
||||
"modifiedIndex": 11,
|
||||
"nodes": [
|
||||
{
|
||||
"createdIndex": 16,
|
||||
"expiration": "2014-02-03T13:19:57.631253589-08:00",
|
||||
"key": "/_etcd/registry/9D4258A5-A1D3-4074-8837-31C1E091131D/peer1",
|
||||
"modifiedIndex": 16,
|
||||
"ttl": 604765,
|
||||
"value": "127.0.0.1:7001"
|
||||
},
|
||||
{
|
||||
"createdIndex": 17,
|
||||
"expiration": "2014-02-03T13:19:57.631253589-08:00",
|
||||
"key": "/_etcd/registry/9D4258A5-A1D3-4074-8837-31C1E091131D/peer2",
|
||||
"modifiedIndex": 17,
|
||||
"ttl": 604765,
|
||||
"value": "127.0.0.1:7002"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Using this information you can connect to the rest of the peers in the cluster.
|
||||
|
||||
### Heartbeating
|
||||
|
||||
At this point you will want to heartbeat your registration URL every few hours. This will be done via a Go routine inside of etcd.
|
@ -1,5 +0,0 @@
|
||||
package bootstrap
|
||||
|
||||
func Do(c string) error {
|
||||
panic(c)
|
||||
}
|
@ -15,10 +15,10 @@ import (
|
||||
|
||||
"github.com/coreos/etcd/third_party/github.com/BurntSushi/toml"
|
||||
|
||||
"github.com/coreos/etcd/bootstrap"
|
||||
"github.com/coreos/etcd/discovery"
|
||||
"github.com/coreos/etcd/log"
|
||||
"github.com/coreos/etcd/server"
|
||||
ustrings "github.com/coreos/etcd/pkg/strings"
|
||||
"github.com/coreos/etcd/server"
|
||||
)
|
||||
|
||||
// The default location for the etcd configuration file.
|
||||
@ -52,12 +52,12 @@ type Config struct {
|
||||
|
||||
Addr string `toml:"addr" env:"ETCD_ADDR"`
|
||||
BindAddr string `toml:"bind_addr" env:"ETCD_BIND_ADDR"`
|
||||
BootstrapURL string `toml:"bootstrap_url" env:"ETCD_BOOTSTRAP_URL"`
|
||||
CAFile string `toml:"ca_file" env:"ETCD_CA_FILE"`
|
||||
CertFile string `toml:"cert_file" env:"ETCD_CERT_FILE"`
|
||||
CPUProfileFile string
|
||||
CorsOrigins []string `toml:"cors" env:"ETCD_CORS"`
|
||||
DataDir string `toml:"data_dir" env:"ETCD_DATA_DIR"`
|
||||
Discovery string `toml:"discovery" env:"ETCD_DISCOVERY"`
|
||||
Force bool
|
||||
KeyFile string `toml:"key_file" env:"ETCD_KEY_FILE"`
|
||||
Peers []string `toml:"peers" env:"ETCD_PEERS"`
|
||||
@ -138,17 +138,30 @@ func (c *Config) Load(arguments []string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if c.BootstrapURL != "" {
|
||||
if err := bootstrap.Do(c.BootstrapURL); err != nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Sanitize all the input fields.
|
||||
if err := c.Sanitize(); err != nil {
|
||||
return fmt.Errorf("sanitize: %v", err)
|
||||
}
|
||||
|
||||
// Attempt cluster discovery
|
||||
if c.Discovery != "" {
|
||||
p, err := discovery.Do(c.Discovery, c.Name, c.Peer.Addr)
|
||||
if err != nil {
|
||||
log.Fatalf("Bootstrapping encountered an unexpected error: %v", err)
|
||||
}
|
||||
|
||||
for i := range p {
|
||||
// Strip the scheme off of the peer if it has one
|
||||
// TODO(bp): clean this up!
|
||||
purl, err := url.Parse(p[i])
|
||||
if err == nil {
|
||||
p[i] = purl.Host
|
||||
}
|
||||
}
|
||||
|
||||
c.Peers = p
|
||||
}
|
||||
|
||||
// Force remove server configuration if specified.
|
||||
if c.Force {
|
||||
c.Reset()
|
||||
@ -236,7 +249,7 @@ func (c *Config) LoadFlags(arguments []string) error {
|
||||
|
||||
f.StringVar(&c.Name, "name", c.Name, "")
|
||||
f.StringVar(&c.Addr, "addr", c.Addr, "")
|
||||
f.StringVar(&c.BootstrapURL, "bootstrap-url", c.BootstrapURL, "")
|
||||
f.StringVar(&c.Discovery, "discovery", c.Discovery, "")
|
||||
f.StringVar(&c.BindAddr, "bind-addr", c.BindAddr, "")
|
||||
f.StringVar(&c.Peer.Addr, "peer-addr", c.Peer.Addr, "")
|
||||
f.StringVar(&c.Peer.BindAddr, "peer-bind-addr", c.Peer.BindAddr, "")
|
||||
|
@ -18,6 +18,7 @@ func TestConfigTOML(t *testing.T) {
|
||||
cors = ["*"]
|
||||
cpu_profile_file = "XXX"
|
||||
data_dir = "/tmp/data"
|
||||
discovery = "http://example.com/foobar"
|
||||
key_file = "/tmp/file.key"
|
||||
bind_addr = "127.0.0.1:4003"
|
||||
peers = ["coreos.com:4001", "coreos.com:4002"]
|
||||
@ -37,7 +38,7 @@ func TestConfigTOML(t *testing.T) {
|
||||
key_file = "/tmp/peer/file.key"
|
||||
bind_addr = "127.0.0.1:7003"
|
||||
`
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
_, err := toml.Decode(content, &c)
|
||||
assert.Nil(t, err, "")
|
||||
assert.Equal(t, c.Addr, "127.0.0.1:4002", "")
|
||||
@ -45,6 +46,7 @@ func TestConfigTOML(t *testing.T) {
|
||||
assert.Equal(t, c.CertFile, "/tmp/file.cert", "")
|
||||
assert.Equal(t, c.CorsOrigins, []string{"*"}, "")
|
||||
assert.Equal(t, c.DataDir, "/tmp/data", "")
|
||||
assert.Equal(t, c.Discovery, "http://example.com/foobar", "")
|
||||
assert.Equal(t, c.KeyFile, "/tmp/file.key", "")
|
||||
assert.Equal(t, c.BindAddr, "127.0.0.1:4003", "")
|
||||
assert.Equal(t, c.Peers, []string{"coreos.com:4001", "coreos.com:4002"}, "")
|
||||
@ -70,6 +72,7 @@ func TestConfigEnv(t *testing.T) {
|
||||
os.Setenv("ETCD_CPU_PROFILE_FILE", "XXX")
|
||||
os.Setenv("ETCD_CORS", "localhost:4001,localhost:4002")
|
||||
os.Setenv("ETCD_DATA_DIR", "/tmp/data")
|
||||
os.Setenv("ETCD_DISCOVERY", "http://example.com/foobar")
|
||||
os.Setenv("ETCD_KEY_FILE", "/tmp/file.key")
|
||||
os.Setenv("ETCD_BIND_ADDR", "127.0.0.1:4003")
|
||||
os.Setenv("ETCD_PEERS", "coreos.com:4001,coreos.com:4002")
|
||||
@ -87,12 +90,13 @@ func TestConfigEnv(t *testing.T) {
|
||||
os.Setenv("ETCD_PEER_KEY_FILE", "/tmp/peer/file.key")
|
||||
os.Setenv("ETCD_PEER_BIND_ADDR", "127.0.0.1:7003")
|
||||
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
c.LoadEnv()
|
||||
assert.Equal(t, c.CAFile, "/tmp/file.ca", "")
|
||||
assert.Equal(t, c.CertFile, "/tmp/file.cert", "")
|
||||
assert.Equal(t, c.CorsOrigins, []string{"localhost:4001", "localhost:4002"}, "")
|
||||
assert.Equal(t, c.DataDir, "/tmp/data", "")
|
||||
assert.Equal(t, c.Discovery, "http://example.com/foobar", "")
|
||||
assert.Equal(t, c.KeyFile, "/tmp/file.key", "")
|
||||
assert.Equal(t, c.BindAddr, "127.0.0.1:4003", "")
|
||||
assert.Equal(t, c.Peers, []string{"coreos.com:4001", "coreos.com:4002"}, "")
|
||||
@ -113,35 +117,35 @@ func TestConfigEnv(t *testing.T) {
|
||||
|
||||
// Ensures that the "help" flag can be parsed.
|
||||
func TestConfigHelpFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-help"}), "")
|
||||
assert.True(t, c.ShowHelp)
|
||||
}
|
||||
|
||||
// Ensures that the abbreviated "help" flag can be parsed.
|
||||
func TestConfigAbbreviatedHelpFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-h"}), "")
|
||||
assert.True(t, c.ShowHelp)
|
||||
}
|
||||
|
||||
// Ensures that the "version" flag can be parsed.
|
||||
func TestConfigVersionFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-version"}), "")
|
||||
assert.True(t, c.ShowVersion)
|
||||
}
|
||||
|
||||
// Ensures that the "force config" flag can be parsed.
|
||||
func TestConfigForceFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-force"}), "")
|
||||
assert.True(t, c.Force)
|
||||
}
|
||||
|
||||
// Ensures that the abbreviated "force config" flag can be parsed.
|
||||
func TestConfigAbbreviatedForceFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-f"}), "")
|
||||
assert.True(t, c.Force)
|
||||
}
|
||||
@ -156,7 +160,7 @@ func TestConfigAddrEnv(t *testing.T) {
|
||||
|
||||
// Ensures that a the advertised flag can be parsed.
|
||||
func TestConfigAddrFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-addr", "127.0.0.1:4002"}), "")
|
||||
assert.Equal(t, c.Addr, "127.0.0.1:4002", "")
|
||||
}
|
||||
@ -171,7 +175,7 @@ func TestConfigCAFileEnv(t *testing.T) {
|
||||
|
||||
// Ensures that a the CA file flag can be parsed.
|
||||
func TestConfigCAFileFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-ca-file", "/tmp/file.ca"}), "")
|
||||
assert.Equal(t, c.CAFile, "/tmp/file.ca", "")
|
||||
}
|
||||
@ -186,7 +190,7 @@ func TestConfigCertFileEnv(t *testing.T) {
|
||||
|
||||
// Ensures that a the Cert file flag can be parsed.
|
||||
func TestConfigCertFileFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-cert-file", "/tmp/file.cert"}), "")
|
||||
assert.Equal(t, c.CertFile, "/tmp/file.cert", "")
|
||||
}
|
||||
@ -201,7 +205,7 @@ func TestConfigKeyFileEnv(t *testing.T) {
|
||||
|
||||
// Ensures that a the Key file flag can be parsed.
|
||||
func TestConfigKeyFileFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-key-file", "/tmp/file.key"}), "")
|
||||
assert.Equal(t, c.KeyFile, "/tmp/file.key", "")
|
||||
}
|
||||
@ -216,14 +220,14 @@ func TestConfigBindAddrEnv(t *testing.T) {
|
||||
|
||||
// Ensures that a the Listen Host file flag can be parsed.
|
||||
func TestConfigBindAddrFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-bind-addr", "127.0.0.1:4003"}), "")
|
||||
assert.Equal(t, c.BindAddr, "127.0.0.1:4003", "")
|
||||
}
|
||||
|
||||
// Ensures that a the Listen Host port overrides the advertised port
|
||||
func TestConfigBindAddrOverride(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-addr", "127.0.0.1:4009", "-bind-addr", "127.0.0.1:4010"}), "")
|
||||
assert.Nil(t, c.Sanitize())
|
||||
assert.Equal(t, c.BindAddr, "127.0.0.1:4010", "")
|
||||
@ -231,7 +235,7 @@ func TestConfigBindAddrOverride(t *testing.T) {
|
||||
|
||||
// Ensures that a the Listen Host inherits its port from the advertised addr
|
||||
func TestConfigBindAddrInheritPort(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-addr", "127.0.0.1:4009", "-bind-addr", "127.0.0.1"}), "")
|
||||
assert.Nil(t, c.Sanitize())
|
||||
assert.Equal(t, c.BindAddr, "127.0.0.1:4009", "")
|
||||
@ -239,7 +243,7 @@ func TestConfigBindAddrInheritPort(t *testing.T) {
|
||||
|
||||
// Ensures that a port only argument errors out
|
||||
func TestConfigBindAddrErrorOnNoHost(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-addr", "127.0.0.1:4009", "-bind-addr", ":4010"}), "")
|
||||
assert.Error(t, c.Sanitize())
|
||||
}
|
||||
@ -254,7 +258,7 @@ func TestConfigPeersEnv(t *testing.T) {
|
||||
|
||||
// Ensures that a the Peers flag can be parsed.
|
||||
func TestConfigPeersFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-peers", "coreos.com:4001,coreos.com:4002"}), "")
|
||||
assert.Equal(t, c.Peers, []string{"coreos.com:4001", "coreos.com:4002"}, "")
|
||||
}
|
||||
@ -269,7 +273,7 @@ func TestConfigPeersFileEnv(t *testing.T) {
|
||||
|
||||
// Ensures that a the Peers File flag can be parsed.
|
||||
func TestConfigPeersFileFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-peers-file", "/tmp/peers"}), "")
|
||||
assert.Equal(t, c.PeersFile, "/tmp/peers", "")
|
||||
}
|
||||
@ -284,7 +288,7 @@ func TestConfigMaxClusterSizeEnv(t *testing.T) {
|
||||
|
||||
// Ensures that a the Max Cluster Size flag can be parsed.
|
||||
func TestConfigMaxClusterSizeFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-max-cluster-size", "5"}), "")
|
||||
assert.Equal(t, c.MaxClusterSize, 5, "")
|
||||
}
|
||||
@ -299,7 +303,7 @@ func TestConfigMaxResultBufferEnv(t *testing.T) {
|
||||
|
||||
// Ensures that a the Max Result Buffer flag can be parsed.
|
||||
func TestConfigMaxResultBufferFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-max-result-buffer", "512"}), "")
|
||||
assert.Equal(t, c.MaxResultBuffer, 512, "")
|
||||
}
|
||||
@ -314,7 +318,7 @@ func TestConfigMaxRetryAttemptsEnv(t *testing.T) {
|
||||
|
||||
// Ensures that a the Max Retry Attempts flag can be parsed.
|
||||
func TestConfigMaxRetryAttemptsFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-max-retry-attempts", "10"}), "")
|
||||
assert.Equal(t, c.MaxRetryAttempts, 10, "")
|
||||
}
|
||||
@ -329,14 +333,14 @@ func TestConfigNameEnv(t *testing.T) {
|
||||
|
||||
// Ensures that a the Name flag can be parsed.
|
||||
func TestConfigNameFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-name", "test-name"}), "")
|
||||
assert.Equal(t, c.Name, "test-name", "")
|
||||
}
|
||||
|
||||
// Ensures that a Name gets guessed if not specified
|
||||
func TestConfigNameGuess(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{}), "")
|
||||
assert.Nil(t, c.Sanitize())
|
||||
name, _ := os.Hostname()
|
||||
@ -345,7 +349,7 @@ func TestConfigNameGuess(t *testing.T) {
|
||||
|
||||
// Ensures that a DataDir gets guessed if not specified
|
||||
func TestConfigDataDirGuess(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{}), "")
|
||||
assert.Nil(t, c.Sanitize())
|
||||
name, _ := os.Hostname()
|
||||
@ -362,7 +366,7 @@ func TestConfigSnapshotEnv(t *testing.T) {
|
||||
|
||||
// Ensures that a the Snapshot flag can be parsed.
|
||||
func TestConfigSnapshotFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-snapshot"}), "")
|
||||
assert.Equal(t, c.Snapshot, true, "")
|
||||
}
|
||||
@ -377,7 +381,7 @@ func TestConfigVerboseEnv(t *testing.T) {
|
||||
|
||||
// Ensures that a the Verbose flag can be parsed.
|
||||
func TestConfigVerboseFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-v"}), "")
|
||||
assert.Equal(t, c.Verbose, true, "")
|
||||
}
|
||||
@ -392,7 +396,7 @@ func TestConfigVeryVerboseEnv(t *testing.T) {
|
||||
|
||||
// Ensures that a the Very Verbose flag can be parsed.
|
||||
func TestConfigVeryVerboseFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-vv"}), "")
|
||||
assert.Equal(t, c.VeryVerbose, true, "")
|
||||
}
|
||||
@ -407,7 +411,7 @@ func TestConfigPeerAddrEnv(t *testing.T) {
|
||||
|
||||
// Ensures that a the Peer Advertised URL flag can be parsed.
|
||||
func TestConfigPeerAddrFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-peer-addr", "localhost:7002"}), "")
|
||||
assert.Equal(t, c.Peer.Addr, "localhost:7002", "")
|
||||
}
|
||||
@ -422,7 +426,7 @@ func TestConfigPeerCAFileEnv(t *testing.T) {
|
||||
|
||||
// Ensures that a the Peer CA file flag can be parsed.
|
||||
func TestConfigPeerCAFileFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-peer-ca-file", "/tmp/peer/file.ca"}), "")
|
||||
assert.Equal(t, c.Peer.CAFile, "/tmp/peer/file.ca", "")
|
||||
}
|
||||
@ -437,7 +441,7 @@ func TestConfigPeerCertFileEnv(t *testing.T) {
|
||||
|
||||
// Ensures that a the Cert file flag can be parsed.
|
||||
func TestConfigPeerCertFileFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-peer-cert-file", "/tmp/peer/file.cert"}), "")
|
||||
assert.Equal(t, c.Peer.CertFile, "/tmp/peer/file.cert", "")
|
||||
}
|
||||
@ -452,7 +456,7 @@ func TestConfigPeerKeyFileEnv(t *testing.T) {
|
||||
|
||||
// Ensures that a the Peer Key file flag can be parsed.
|
||||
func TestConfigPeerKeyFileFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-peer-key-file", "/tmp/peer/file.key"}), "")
|
||||
assert.Equal(t, c.Peer.KeyFile, "/tmp/peer/file.key", "")
|
||||
}
|
||||
@ -467,7 +471,7 @@ func TestConfigPeerBindAddrEnv(t *testing.T) {
|
||||
|
||||
// Ensures that a bad flag returns an error.
|
||||
func TestConfigBadFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
err := c.LoadFlags([]string{"-no-such-flag"})
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, err.Error(), `flag provided but not defined: -no-such-flag`)
|
||||
@ -475,7 +479,7 @@ func TestConfigBadFlag(t *testing.T) {
|
||||
|
||||
// Ensures that a the Peer Listen Host file flag can be parsed.
|
||||
func TestConfigPeerBindAddrFlag(t *testing.T) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
assert.Nil(t, c.LoadFlags([]string{"-peer-bind-addr", "127.0.0.1:4003"}), "")
|
||||
assert.Equal(t, c.Peer.BindAddr, "127.0.0.1:4003", "")
|
||||
}
|
||||
@ -486,7 +490,7 @@ func TestConfigCustomConfigOverrideSystemConfig(t *testing.T) {
|
||||
custom := `addr = "127.0.0.1:6000"`
|
||||
withTempFile(system, func(p1 string) {
|
||||
withTempFile(custom, func(p2 string) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
c.SystemPath = p1
|
||||
assert.Nil(t, c.Load([]string{"-config", p2}), "")
|
||||
assert.Equal(t, c.Addr, "http://127.0.0.1:6000", "")
|
||||
@ -501,7 +505,7 @@ func TestConfigEnvVarOverrideCustomConfig(t *testing.T) {
|
||||
|
||||
custom := `[peer]` + "\n" + `advertised_url = "127.0.0.1:9000"`
|
||||
withTempFile(custom, func(path string) {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
c.SystemPath = ""
|
||||
assert.Nil(t, c.Load([]string{"-config", path}), "")
|
||||
assert.Equal(t, c.Peer.Addr, "http://127.0.0.1:8000", "")
|
||||
@ -513,7 +517,7 @@ func TestConfigCLIArgsOverrideEnvVar(t *testing.T) {
|
||||
os.Setenv("ETCD_ADDR", "127.0.0.1:1000")
|
||||
defer os.Setenv("ETCD_ADDR", "")
|
||||
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
c.SystemPath = ""
|
||||
assert.Nil(t, c.Load([]string{"-addr", "127.0.0.1:2000"}), "")
|
||||
assert.Equal(t, c.Addr, "http://127.0.0.1:2000", "")
|
||||
@ -525,7 +529,7 @@ func TestConfigCLIArgsOverrideEnvVar(t *testing.T) {
|
||||
|
||||
func TestConfigDeprecatedAddrFlag(t *testing.T) {
|
||||
_, stderr := capture(func() {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
err := c.LoadFlags([]string{"-c", "127.0.0.1:4002"})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, c.Addr, "127.0.0.1:4002")
|
||||
@ -535,7 +539,7 @@ func TestConfigDeprecatedAddrFlag(t *testing.T) {
|
||||
|
||||
func TestConfigDeprecatedBindAddrFlag(t *testing.T) {
|
||||
_, stderr := capture(func() {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
err := c.LoadFlags([]string{"-cl", "127.0.0.1:4003"})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, c.BindAddr, "127.0.0.1:4003", "")
|
||||
@ -545,7 +549,7 @@ func TestConfigDeprecatedBindAddrFlag(t *testing.T) {
|
||||
|
||||
func TestConfigDeprecatedCAFileFlag(t *testing.T) {
|
||||
_, stderr := capture(func() {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
err := c.LoadFlags([]string{"-clientCAFile", "/tmp/file.ca"})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, c.CAFile, "/tmp/file.ca", "")
|
||||
@ -555,7 +559,7 @@ func TestConfigDeprecatedCAFileFlag(t *testing.T) {
|
||||
|
||||
func TestConfigDeprecatedCertFileFlag(t *testing.T) {
|
||||
_, stderr := capture(func() {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
err := c.LoadFlags([]string{"-clientCert", "/tmp/file.cert"})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, c.CertFile, "/tmp/file.cert", "")
|
||||
@ -565,7 +569,7 @@ func TestConfigDeprecatedCertFileFlag(t *testing.T) {
|
||||
|
||||
func TestConfigDeprecatedKeyFileFlag(t *testing.T) {
|
||||
_, stderr := capture(func() {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
err := c.LoadFlags([]string{"-clientKey", "/tmp/file.key"})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, c.KeyFile, "/tmp/file.key", "")
|
||||
@ -575,7 +579,7 @@ func TestConfigDeprecatedKeyFileFlag(t *testing.T) {
|
||||
|
||||
func TestConfigDeprecatedPeersFlag(t *testing.T) {
|
||||
_, stderr := capture(func() {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
err := c.LoadFlags([]string{"-C", "coreos.com:4001,coreos.com:4002"})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, c.Peers, []string{"coreos.com:4001", "coreos.com:4002"}, "")
|
||||
@ -585,7 +589,7 @@ func TestConfigDeprecatedPeersFlag(t *testing.T) {
|
||||
|
||||
func TestConfigDeprecatedPeersFileFlag(t *testing.T) {
|
||||
_, stderr := capture(func() {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
err := c.LoadFlags([]string{"-CF", "/tmp/machines"})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, c.PeersFile, "/tmp/machines", "")
|
||||
@ -595,7 +599,7 @@ func TestConfigDeprecatedPeersFileFlag(t *testing.T) {
|
||||
|
||||
func TestConfigDeprecatedMaxClusterSizeFlag(t *testing.T) {
|
||||
_, stderr := capture(func() {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
err := c.LoadFlags([]string{"-maxsize", "5"})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, c.MaxClusterSize, 5, "")
|
||||
@ -605,7 +609,7 @@ func TestConfigDeprecatedMaxClusterSizeFlag(t *testing.T) {
|
||||
|
||||
func TestConfigDeprecatedMaxResultBufferFlag(t *testing.T) {
|
||||
_, stderr := capture(func() {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
err := c.LoadFlags([]string{"-m", "512"})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, c.MaxResultBuffer, 512, "")
|
||||
@ -615,7 +619,7 @@ func TestConfigDeprecatedMaxResultBufferFlag(t *testing.T) {
|
||||
|
||||
func TestConfigDeprecatedMaxRetryAttemptsFlag(t *testing.T) {
|
||||
_, stderr := capture(func() {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
err := c.LoadFlags([]string{"-r", "10"})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, c.MaxRetryAttempts, 10, "")
|
||||
@ -625,7 +629,7 @@ func TestConfigDeprecatedMaxRetryAttemptsFlag(t *testing.T) {
|
||||
|
||||
func TestConfigDeprecatedNameFlag(t *testing.T) {
|
||||
_, stderr := capture(func() {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
err := c.LoadFlags([]string{"-n", "test-name"})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, c.Name, "test-name", "")
|
||||
@ -635,7 +639,7 @@ func TestConfigDeprecatedNameFlag(t *testing.T) {
|
||||
|
||||
func TestConfigDeprecatedPeerAddrFlag(t *testing.T) {
|
||||
_, stderr := capture(func() {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
err := c.LoadFlags([]string{"-s", "localhost:7002"})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, c.Peer.Addr, "localhost:7002", "")
|
||||
@ -645,7 +649,7 @@ func TestConfigDeprecatedPeerAddrFlag(t *testing.T) {
|
||||
|
||||
func TestConfigDeprecatedPeerBindAddrFlag(t *testing.T) {
|
||||
_, stderr := capture(func() {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
err := c.LoadFlags([]string{"-sl", "127.0.0.1:4003"})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, c.Peer.BindAddr, "127.0.0.1:4003", "")
|
||||
@ -655,7 +659,7 @@ func TestConfigDeprecatedPeerBindAddrFlag(t *testing.T) {
|
||||
|
||||
func TestConfigDeprecatedPeerCAFileFlag(t *testing.T) {
|
||||
_, stderr := capture(func() {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
err := c.LoadFlags([]string{"-serverCAFile", "/tmp/peer/file.ca"})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, c.Peer.CAFile, "/tmp/peer/file.ca", "")
|
||||
@ -665,7 +669,7 @@ func TestConfigDeprecatedPeerCAFileFlag(t *testing.T) {
|
||||
|
||||
func TestConfigDeprecatedPeerCertFileFlag(t *testing.T) {
|
||||
_, stderr := capture(func() {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
err := c.LoadFlags([]string{"-serverCert", "/tmp/peer/file.cert"})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, c.Peer.CertFile, "/tmp/peer/file.cert", "")
|
||||
@ -675,7 +679,7 @@ func TestConfigDeprecatedPeerCertFileFlag(t *testing.T) {
|
||||
|
||||
func TestConfigDeprecatedPeerKeyFileFlag(t *testing.T) {
|
||||
_, stderr := capture(func() {
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
err := c.LoadFlags([]string{"-serverKey", "/tmp/peer/file.key"})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, c.Peer.KeyFile, "/tmp/peer/file.key", "")
|
||||
@ -691,7 +695,7 @@ func TestConfigDeprecatedPeerKeyFileFlag(t *testing.T) {
|
||||
func withEnv(key, value string, f func(c *Config)) {
|
||||
os.Setenv(key, value)
|
||||
defer os.Setenv(key, "")
|
||||
c := NewConfig()
|
||||
c := New()
|
||||
f(c)
|
||||
}
|
||||
|
||||
|
139
discovery/discovery.go
Normal file
139
discovery/discovery.go
Normal file
@ -0,0 +1,139 @@
|
||||
package discovery
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/log"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
)
|
||||
|
||||
const (
|
||||
stateKey = "_state"
|
||||
initState = "init"
|
||||
startedState = "started"
|
||||
defaultTTL = 604800 // One week TTL
|
||||
)
|
||||
|
||||
type Discoverer struct {
|
||||
client *etcd.Client
|
||||
name string
|
||||
peer string
|
||||
prefix string
|
||||
discoveryURL string
|
||||
}
|
||||
|
||||
var defaultDiscoverer *Discoverer
|
||||
|
||||
func init() {
|
||||
defaultDiscoverer = &Discoverer{}
|
||||
}
|
||||
|
||||
func (d *Discoverer) Do(discoveryURL string, name string, peer string) (peers []string, err error) {
|
||||
d.name = name
|
||||
d.peer = peer
|
||||
d.discoveryURL = discoveryURL
|
||||
|
||||
u, err := url.Parse(discoveryURL)
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// prefix is appended to all keys
|
||||
d.prefix = strings.TrimPrefix(u.Path, "/v2/keys/")
|
||||
|
||||
// Connect to a scheme://host not a full URL with path
|
||||
u.Path = ""
|
||||
log.Infof("Bootstrapping via %s using prefix %s.", u.String(), d.prefix)
|
||||
d.client = etcd.NewClient([]string{u.String()})
|
||||
|
||||
// Register this machine first and announce that we are a member of
|
||||
// this cluster
|
||||
err = d.heartbeat()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Start the very slow heartbeat to the cluster now in anticipation
|
||||
// that everything is going to go alright now
|
||||
go d.startHeartbeat()
|
||||
|
||||
// Attempt to take the leadership role, if there is no error we are it!
|
||||
resp, err := d.client.CompareAndSwap(path.Join(d.prefix, stateKey), startedState, 0, initState, 0)
|
||||
|
||||
// Bail out on unexpected errors
|
||||
if err != nil {
|
||||
if etcdErr, ok := err.(etcd.EtcdError); !ok || etcdErr.ErrorCode != 101 {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// If we got a response then the CAS was successful, we are leader
|
||||
if resp != nil && resp.Node.Value == startedState {
|
||||
// We are the leader, we have no peers
|
||||
log.Infof("Bootstrapping was in 'init' state this machine is the initial leader.")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Fall through to finding the other discoveryped peers
|
||||
return d.findPeers()
|
||||
}
|
||||
|
||||
func (d *Discoverer) findPeers() (peers []string, err error) {
|
||||
resp, err := d.client.Get(path.Join(d.prefix), false, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
node := resp.Node
|
||||
|
||||
if node == nil {
|
||||
return nil, errors.New(fmt.Sprintf("%s key doesn't exist.", d.prefix))
|
||||
}
|
||||
|
||||
for _, n := range node.Nodes {
|
||||
// Skip our own entry in the list, there is no point
|
||||
if strings.HasSuffix(n.Key, "/"+d.name) {
|
||||
continue
|
||||
}
|
||||
peers = append(peers, n.Value)
|
||||
}
|
||||
|
||||
if len(peers) == 0 {
|
||||
return nil, errors.New("No peers found.")
|
||||
}
|
||||
|
||||
log.Infof("Bootstrap found peers %v", peers)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (d *Discoverer) startHeartbeat() {
|
||||
// In case of errors we should attempt to heartbeat fairly frequently
|
||||
heartbeatInterval := defaultTTL / 8
|
||||
ticker := time.Tick(time.Second * time.Duration(heartbeatInterval))
|
||||
for {
|
||||
select {
|
||||
case <-ticker:
|
||||
err := d.heartbeat()
|
||||
if err != nil {
|
||||
log.Warnf("Bootstrapping heartbeat failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Discoverer) heartbeat() error {
|
||||
_, err := d.client.Set(path.Join(d.prefix, d.name), d.peer, defaultTTL)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func Do(discoveryURL string, name string, peer string) ([]string, error) {
|
||||
return defaultDiscoverer.Do(discoveryURL, name, peer)
|
||||
}
|
@ -209,7 +209,7 @@ func (s *PeerServer) startAsFollower(cluster []string) {
|
||||
if ok {
|
||||
return
|
||||
}
|
||||
log.Warnf("cannot join to cluster via given peers, retry in %d seconds", retryInterval)
|
||||
log.Warnf("Unable to join the cluster using any of the peers %v. Retrying in %d seconds", cluster, retryInterval)
|
||||
time.Sleep(time.Second * retryInterval)
|
||||
}
|
||||
|
||||
@ -266,17 +266,18 @@ func (s *PeerServer) joinCluster(cluster []string) bool {
|
||||
|
||||
err := s.joinByPeer(s.raftServer, peer, s.Config.Scheme)
|
||||
if err == nil {
|
||||
log.Debugf("%s success join to the cluster via peer %s", s.Config.Name, peer)
|
||||
log.Debugf("%s joined the cluster via peer %s", s.Config.Name, peer)
|
||||
return true
|
||||
|
||||
} else {
|
||||
if _, ok := err.(etcdErr.Error); ok {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
log.Debugf("cannot join to cluster via peer %s %s", peer, err)
|
||||
}
|
||||
|
||||
if _, ok := err.(etcdErr.Error); ok {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
log.Warnf("Attempt to join via %s failed: %s", peer, err)
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user