Merge pull request #512 from philips/bootstrap-protocol

feat(discovery): initial working code
This commit is contained in:
Brandon Philips
2014-02-05 09:27:52 -08:00
31 changed files with 1226 additions and 491 deletions

View File

@@ -19,6 +19,7 @@ configuration files.
### Optional
* `-addr` - The advertised public hostname:port for client communication. Defaults to `127.0.0.1:4001`.
* `-discovery` - A URL to use for discovering the peer list. (i.e `"https://discovery.etcd.io/your-unique-key"`).
* `-bind-addr` - The listening hostname for client communication. Defaults to advertised ip.
* `-peers` - A comma separated list of peers in the cluster (i.e `"203.0.113.101:7001,203.0.113.102:7001"`).
* `-peers-file` - The file path containing a comma separated list of peers in the cluster.

View File

@@ -0,0 +1,88 @@
# Discovery Protocol
Starting a new etcd cluster can be painful since each machine needs to know of at least one live machine in the cluster. If you are trying to bring up a new cluster all at once, say using an AWS cloud formation, you also need to coordinate who will be the initial cluster leader. The discovery protocol uses an existing running etcd cluster to start a second etcd cluster.
To use this feature you add the command line flag `-discovery` to your etcd args. In this example we will use `http://example.com/v2/keys/_etcd/registry` as the URL prefix.
## The Protocol
By convention the etcd discovery protocol uses the key prefix `_etcd/registry`. A full URL to the keyspace will be `http://example.com/v2/keys/_etcd/registry`.
### Creating a New Cluster
Generate a unique token that will identify the new cluster and create a key called "_state". If you get a `201 Created` back then your key is unused and you can proceed with cluster creation. If the return value is `412 Precondition Failed` then you will need to create a new token.
```
UUID=$(uuidgen)
curl -X PUT "http://example.com/v2/keys/_etcd/registry/${UUID}/_state?prevExist=false" -d value=init
```
### Bringing up Machines
Now that you have your cluster ID you can start bringing up machines. Every machine will follow this protocol internally in etcd if given a `-discovery`.
### Registering your Machine
The first thing etcd must do is register your machine. This is done by using the machine name (from the `-name` arg) and posting it with a long TTL to the given key.
```
curl -X PUT "http://example.com/v2/keys/_etcd/registry/${UUID}/${etcd_machine_name}?ttl=604800" -d value=${peer_addr}
```
### Discovering Peers
Now that this etcd machine is registered it must discover its peers.
But, the tricky bit of starting a new cluster is that one machine needs to assume the initial role of leader and will have no peers. To figure out if another machine has already started the cluster etcd needs to update the `_state` key from "init" to "started":
```
curl -X PUT "http://example.com/v2/keys/_etcd/registry/${UUID}/_state?prevValue=init" -d value=started
```
If this returns a `200 OK` response then this machine is the initial leader and should start with no peers configured. If, however, this returns a `412 Precondition Failed` then you need to find all of the registered peers:
```
curl -X GET "http://example.com/v2/keys/_etcd/registry/${UUID}?recursive=true"
```
```
{
"action": "get",
"node": {
"createdIndex": 11,
"dir": true,
"key": "/_etcd/registry/9D4258A5-A1D3-4074-8837-31C1E091131D",
"modifiedIndex": 11,
"nodes": [
{
"createdIndex": 16,
"expiration": "2014-02-03T13:19:57.631253589-08:00",
"key": "/_etcd/registry/9D4258A5-A1D3-4074-8837-31C1E091131D/peer1",
"modifiedIndex": 16,
"ttl": 604765,
"value": "127.0.0.1:7001"
},
{
"createdIndex": 17,
"expiration": "2014-02-03T13:19:57.631253589-08:00",
"key": "/_etcd/registry/9D4258A5-A1D3-4074-8837-31C1E091131D/peer2",
"modifiedIndex": 17,
"ttl": 604765,
"value": "127.0.0.1:7002"
}
]
}
}
```
Using this information you can connect to the rest of the peers in the cluster.
### Heartbeating
At this point etcd will start heart beating to your registration URL. The
protocol uses a heartbeat so permanently deleted nodes get slowly removed from
the discovery information cluster.
The heartbeat interval is about once per day and the TTL is one week. This
should give a sufficiently wide window to protect against a discovery service
taking a temporary outage yet provide adequate cleanup.

View File

@@ -1,4 +1,4 @@
package server
package config
import (
"encoding/json"
@@ -14,7 +14,11 @@ import (
"strings"
"github.com/coreos/etcd/third_party/github.com/BurntSushi/toml"
"github.com/coreos/etcd/discovery"
"github.com/coreos/etcd/log"
ustrings "github.com/coreos/etcd/pkg/strings"
"github.com/coreos/etcd/server"
)
// The default location for the etcd configuration file.
@@ -22,67 +26,68 @@ const DefaultSystemConfigPath = "/etc/etcd/etcd.conf"
// A lookup of deprecated flags to their new flag name.
var newFlagNameLookup = map[string]string{
"C": "peers",
"CF": "peers-file",
"n": "name",
"c": "addr",
"cl": "bind-addr",
"s": "peer-addr",
"sl": "peer-bind-addr",
"d": "data-dir",
"m": "max-result-buffer",
"r": "max-retry-attempts",
"maxsize": "max-cluster-size",
"clientCAFile": "ca-file",
"clientCert": "cert-file",
"clientKey": "key-file",
"serverCAFile": "peer-ca-file",
"serverCert": "peer-cert-file",
"serverKey": "peer-key-file",
"snapshotCount": "snapshot-count",
"C": "peers",
"CF": "peers-file",
"n": "name",
"c": "addr",
"cl": "bind-addr",
"s": "peer-addr",
"sl": "peer-bind-addr",
"d": "data-dir",
"m": "max-result-buffer",
"r": "max-retry-attempts",
"maxsize": "max-cluster-size",
"clientCAFile": "ca-file",
"clientCert": "cert-file",
"clientKey": "key-file",
"serverCAFile": "peer-ca-file",
"serverCert": "peer-cert-file",
"serverKey": "peer-key-file",
"snapshotCount": "snapshot-count",
}
// Config represents the server configuration.
type Config struct {
SystemPath string
SystemPath string
Addr string `toml:"addr" env:"ETCD_ADDR"`
BindAddr string `toml:"bind_addr" env:"ETCD_BIND_ADDR"`
CAFile string `toml:"ca_file" env:"ETCD_CA_FILE"`
CertFile string `toml:"cert_file" env:"ETCD_CERT_FILE"`
CPUProfileFile string
CorsOrigins []string `toml:"cors" env:"ETCD_CORS"`
DataDir string `toml:"data_dir" env:"ETCD_DATA_DIR"`
Force bool
KeyFile string `toml:"key_file" env:"ETCD_KEY_FILE"`
Peers []string `toml:"peers" env:"ETCD_PEERS"`
PeersFile string `toml:"peers_file" env:"ETCD_PEERS_FILE"`
MaxClusterSize int `toml:"max_cluster_size" env:"ETCD_MAX_CLUSTER_SIZE"`
MaxResultBuffer int `toml:"max_result_buffer" env:"ETCD_MAX_RESULT_BUFFER"`
MaxRetryAttempts int `toml:"max_retry_attempts" env:"ETCD_MAX_RETRY_ATTEMPTS"`
Name string `toml:"name" env:"ETCD_NAME"`
Snapshot bool `toml:"snapshot" env:"ETCD_SNAPSHOT"`
SnapshotCount int `toml:"snapshot_count" env:"ETCD_SNAPSHOTCOUNT"`
ShowHelp bool
ShowVersion bool
Verbose bool `toml:"verbose" env:"ETCD_VERBOSE"`
VeryVerbose bool `toml:"very_verbose" env:"ETCD_VERY_VERBOSE"`
VeryVeryVerbose bool `toml:"very_very_verbose" env:"ETCD_VERY_VERY_VERBOSE"`
Peer struct {
Addr string `toml:"addr" env:"ETCD_PEER_ADDR"`
BindAddr string `toml:"bind_addr" env:"ETCD_PEER_BIND_ADDR"`
CAFile string `toml:"ca_file" env:"ETCD_PEER_CA_FILE"`
CertFile string `toml:"cert_file" env:"ETCD_PEER_CERT_FILE"`
KeyFile string `toml:"key_file" env:"ETCD_PEER_KEY_FILE"`
HeartbeatTimeout int `toml:"heartbeat_timeout" env:"ETCD_PEER_HEARTBEAT_TIMEOUT"`
ElectionTimeout int `toml:"election_timeout" env:"ETCD_PEER_ELECTION_TIMEOUT"`
Addr string `toml:"addr" env:"ETCD_ADDR"`
BindAddr string `toml:"bind_addr" env:"ETCD_BIND_ADDR"`
CAFile string `toml:"ca_file" env:"ETCD_CA_FILE"`
CertFile string `toml:"cert_file" env:"ETCD_CERT_FILE"`
CPUProfileFile string
CorsOrigins []string `toml:"cors" env:"ETCD_CORS"`
DataDir string `toml:"data_dir" env:"ETCD_DATA_DIR"`
Discovery string `toml:"discovery" env:"ETCD_DISCOVERY"`
Force bool
KeyFile string `toml:"key_file" env:"ETCD_KEY_FILE"`
Peers []string `toml:"peers" env:"ETCD_PEERS"`
PeersFile string `toml:"peers_file" env:"ETCD_PEERS_FILE"`
MaxClusterSize int `toml:"max_cluster_size" env:"ETCD_MAX_CLUSTER_SIZE"`
MaxResultBuffer int `toml:"max_result_buffer" env:"ETCD_MAX_RESULT_BUFFER"`
MaxRetryAttempts int `toml:"max_retry_attempts" env:"ETCD_MAX_RETRY_ATTEMPTS"`
Name string `toml:"name" env:"ETCD_NAME"`
Snapshot bool `toml:"snapshot" env:"ETCD_SNAPSHOT"`
SnapshotCount int `toml:"snapshot_count" env:"ETCD_SNAPSHOTCOUNT"`
ShowHelp bool
ShowVersion bool
Verbose bool `toml:"verbose" env:"ETCD_VERBOSE"`
VeryVerbose bool `toml:"very_verbose" env:"ETCD_VERY_VERBOSE"`
VeryVeryVerbose bool `toml:"very_very_verbose" env:"ETCD_VERY_VERY_VERBOSE"`
Peer struct {
Addr string `toml:"addr" env:"ETCD_PEER_ADDR"`
BindAddr string `toml:"bind_addr" env:"ETCD_PEER_BIND_ADDR"`
CAFile string `toml:"ca_file" env:"ETCD_PEER_CA_FILE"`
CertFile string `toml:"cert_file" env:"ETCD_PEER_CERT_FILE"`
KeyFile string `toml:"key_file" env:"ETCD_PEER_KEY_FILE"`
HeartbeatTimeout int `toml:"heartbeat_timeout" env:"ETCD_PEER_HEARTBEAT_TIMEOUT"`
ElectionTimeout int `toml:"election_timeout" env:"ETCD_PEER_ELECTION_TIMEOUT"`
}
strTrace string `toml:"trace" env:"ETCD_TRACE"`
GraphiteHost string `toml:"graphite_host" env:"ETCD_GRAPHITE_HOST"`
strTrace string `toml:"trace" env:"ETCD_TRACE"`
GraphiteHost string `toml:"graphite_host" env:"ETCD_GRAPHITE_HOST"`
}
// NewConfig returns a Config initialized with default values.
func NewConfig() *Config {
// New returns a Config initialized with default values.
func New() *Config {
c := new(Config)
c.SystemPath = DefaultSystemConfigPath
c.Addr = "127.0.0.1:4001"
@@ -138,6 +143,13 @@ func (c *Config) Load(arguments []string) error {
return fmt.Errorf("sanitize: %v", err)
}
// Attempt cluster discovery
if c.Discovery != "" {
if err := c.handleDiscovery(); err != nil {
return err
}
}
// Force remove server configuration if specified.
if c.Force {
c.Reset()
@@ -196,12 +208,42 @@ func (c *Config) loadEnv(target interface{}) error {
case reflect.String:
value.Field(i).SetString(v)
case reflect.Slice:
value.Field(i).Set(reflect.ValueOf(trimsplit(v, ",")))
value.Field(i).Set(reflect.ValueOf(ustrings.TrimSplit(v, ",")))
}
}
return nil
}
func (c *Config) handleDiscovery() error {
p, err := discovery.Do(c.Discovery, c.Name, c.Peer.Addr)
// This is fatal, discovery encountered an unexpected error
// and we have no peer list.
if err != nil && len(c.Peers) == 0 {
log.Fatalf("Discovery failed and a backup peer list wasn't provided: %v", err)
return err
}
// Warn about errors coming from discovery, this isn't fatal
// since the user might have provided a peer list elsewhere.
if err != nil {
log.Warnf("Discovery encountered an error but a backup peer list (%v) was provided: %v", c.Peers, err)
}
for i := range p {
// Strip the scheme off of the peer if it has one
// TODO(bp): clean this up!
purl, err := url.Parse(p[i])
if err == nil {
p[i] = purl.Host
}
}
c.Peers = p
return nil
}
// Loads configuration from command line flags.
func (c *Config) LoadFlags(arguments []string) error {
var peers, cors, path string
@@ -225,6 +267,7 @@ func (c *Config) LoadFlags(arguments []string) error {
f.StringVar(&c.Name, "name", c.Name, "")
f.StringVar(&c.Addr, "addr", c.Addr, "")
f.StringVar(&c.Discovery, "discovery", c.Discovery, "")
f.StringVar(&c.BindAddr, "bind-addr", c.BindAddr, "")
f.StringVar(&c.Peer.Addr, "peer-addr", c.Peer.Addr, "")
f.StringVar(&c.Peer.BindAddr, "peer-bind-addr", c.Peer.BindAddr, "")
@@ -291,10 +334,10 @@ func (c *Config) LoadFlags(arguments []string) error {
// Convert some parameters to lists.
if peers != "" {
c.Peers = trimsplit(peers, ",")
c.Peers = ustrings.TrimSplit(peers, ",")
}
if cors != "" {
c.CorsOrigins = trimsplit(cors, ",")
c.CorsOrigins = ustrings.TrimSplit(cors, ",")
}
return nil
@@ -310,7 +353,7 @@ func (c *Config) LoadPeersFile() error {
if err != nil {
return fmt.Errorf("Peers file error: %s", err)
}
c.Peers = trimsplit(string(b), ",")
c.Peers = ustrings.TrimSplit(string(b), ",")
return nil
}
@@ -353,8 +396,8 @@ func (c *Config) Reset() error {
}
// Reads the info file from the file system or initializes it based on the config.
func (c *Config) Info() (*Info, error) {
info := &Info{}
func (c *Config) Info() (*server.Info, error) {
info := &server.Info{}
path := filepath.Join(c.DataDir, "info")
// Open info file and read it out.
@@ -432,30 +475,30 @@ func (c *Config) Sanitize() error {
}
// TLSInfo retrieves a TLSInfo object for the client server.
func (c *Config) TLSInfo() TLSInfo {
return TLSInfo{
CAFile: c.CAFile,
CertFile: c.CertFile,
KeyFile: c.KeyFile,
func (c *Config) TLSInfo() server.TLSInfo {
return server.TLSInfo{
CAFile: c.CAFile,
CertFile: c.CertFile,
KeyFile: c.KeyFile,
}
}
// ClientTLSConfig generates the TLS configuration for the client server.
func (c *Config) TLSConfig() (TLSConfig, error) {
func (c *Config) TLSConfig() (server.TLSConfig, error) {
return c.TLSInfo().Config()
}
// PeerTLSInfo retrieves a TLSInfo object for the peer server.
func (c *Config) PeerTLSInfo() TLSInfo {
return TLSInfo{
CAFile: c.Peer.CAFile,
CertFile: c.Peer.CertFile,
KeyFile: c.Peer.KeyFile,
func (c *Config) PeerTLSInfo() server.TLSInfo {
return server.TLSInfo{
CAFile: c.Peer.CAFile,
CertFile: c.Peer.CertFile,
KeyFile: c.Peer.KeyFile,
}
}
// PeerTLSConfig generates the TLS configuration for the peer server.
func (c *Config) PeerTLSConfig() (TLSConfig, error) {
func (c *Config) PeerTLSConfig() (server.TLSConfig, error) {
return c.PeerTLSInfo().Config()
}

View File

@@ -1,4 +1,4 @@
package server
package config
import (
"io/ioutil"
@@ -18,6 +18,7 @@ func TestConfigTOML(t *testing.T) {
cors = ["*"]
cpu_profile_file = "XXX"
data_dir = "/tmp/data"
discovery = "http://example.com/foobar"
key_file = "/tmp/file.key"
bind_addr = "127.0.0.1:4003"
peers = ["coreos.com:4001", "coreos.com:4002"]
@@ -37,7 +38,7 @@ func TestConfigTOML(t *testing.T) {
key_file = "/tmp/peer/file.key"
bind_addr = "127.0.0.1:7003"
`
c := NewConfig()
c := New()
_, err := toml.Decode(content, &c)
assert.Nil(t, err, "")
assert.Equal(t, c.Addr, "127.0.0.1:4002", "")
@@ -45,6 +46,7 @@ func TestConfigTOML(t *testing.T) {
assert.Equal(t, c.CertFile, "/tmp/file.cert", "")
assert.Equal(t, c.CorsOrigins, []string{"*"}, "")
assert.Equal(t, c.DataDir, "/tmp/data", "")
assert.Equal(t, c.Discovery, "http://example.com/foobar", "")
assert.Equal(t, c.KeyFile, "/tmp/file.key", "")
assert.Equal(t, c.BindAddr, "127.0.0.1:4003", "")
assert.Equal(t, c.Peers, []string{"coreos.com:4001", "coreos.com:4002"}, "")
@@ -70,6 +72,7 @@ func TestConfigEnv(t *testing.T) {
os.Setenv("ETCD_CPU_PROFILE_FILE", "XXX")
os.Setenv("ETCD_CORS", "localhost:4001,localhost:4002")
os.Setenv("ETCD_DATA_DIR", "/tmp/data")
os.Setenv("ETCD_DISCOVERY", "http://example.com/foobar")
os.Setenv("ETCD_KEY_FILE", "/tmp/file.key")
os.Setenv("ETCD_BIND_ADDR", "127.0.0.1:4003")
os.Setenv("ETCD_PEERS", "coreos.com:4001,coreos.com:4002")
@@ -87,12 +90,13 @@ func TestConfigEnv(t *testing.T) {
os.Setenv("ETCD_PEER_KEY_FILE", "/tmp/peer/file.key")
os.Setenv("ETCD_PEER_BIND_ADDR", "127.0.0.1:7003")
c := NewConfig()
c := New()
c.LoadEnv()
assert.Equal(t, c.CAFile, "/tmp/file.ca", "")
assert.Equal(t, c.CertFile, "/tmp/file.cert", "")
assert.Equal(t, c.CorsOrigins, []string{"localhost:4001", "localhost:4002"}, "")
assert.Equal(t, c.DataDir, "/tmp/data", "")
assert.Equal(t, c.Discovery, "http://example.com/foobar", "")
assert.Equal(t, c.KeyFile, "/tmp/file.key", "")
assert.Equal(t, c.BindAddr, "127.0.0.1:4003", "")
assert.Equal(t, c.Peers, []string{"coreos.com:4001", "coreos.com:4002"}, "")
@@ -113,35 +117,35 @@ func TestConfigEnv(t *testing.T) {
// Ensures that the "help" flag can be parsed.
func TestConfigHelpFlag(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-help"}), "")
assert.True(t, c.ShowHelp)
}
// Ensures that the abbreviated "help" flag can be parsed.
func TestConfigAbbreviatedHelpFlag(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-h"}), "")
assert.True(t, c.ShowHelp)
}
// Ensures that the "version" flag can be parsed.
func TestConfigVersionFlag(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-version"}), "")
assert.True(t, c.ShowVersion)
}
// Ensures that the "force config" flag can be parsed.
func TestConfigForceFlag(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-force"}), "")
assert.True(t, c.Force)
}
// Ensures that the abbreviated "force config" flag can be parsed.
func TestConfigAbbreviatedForceFlag(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-f"}), "")
assert.True(t, c.Force)
}
@@ -156,7 +160,7 @@ func TestConfigAddrEnv(t *testing.T) {
// Ensures that a the advertised flag can be parsed.
func TestConfigAddrFlag(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-addr", "127.0.0.1:4002"}), "")
assert.Equal(t, c.Addr, "127.0.0.1:4002", "")
}
@@ -171,7 +175,7 @@ func TestConfigCAFileEnv(t *testing.T) {
// Ensures that a the CA file flag can be parsed.
func TestConfigCAFileFlag(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-ca-file", "/tmp/file.ca"}), "")
assert.Equal(t, c.CAFile, "/tmp/file.ca", "")
}
@@ -186,7 +190,7 @@ func TestConfigCertFileEnv(t *testing.T) {
// Ensures that a the Cert file flag can be parsed.
func TestConfigCertFileFlag(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-cert-file", "/tmp/file.cert"}), "")
assert.Equal(t, c.CertFile, "/tmp/file.cert", "")
}
@@ -201,7 +205,7 @@ func TestConfigKeyFileEnv(t *testing.T) {
// Ensures that a the Key file flag can be parsed.
func TestConfigKeyFileFlag(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-key-file", "/tmp/file.key"}), "")
assert.Equal(t, c.KeyFile, "/tmp/file.key", "")
}
@@ -216,14 +220,14 @@ func TestConfigBindAddrEnv(t *testing.T) {
// Ensures that a the Listen Host file flag can be parsed.
func TestConfigBindAddrFlag(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-bind-addr", "127.0.0.1:4003"}), "")
assert.Equal(t, c.BindAddr, "127.0.0.1:4003", "")
}
// Ensures that a the Listen Host port overrides the advertised port
func TestConfigBindAddrOverride(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-addr", "127.0.0.1:4009", "-bind-addr", "127.0.0.1:4010"}), "")
assert.Nil(t, c.Sanitize())
assert.Equal(t, c.BindAddr, "127.0.0.1:4010", "")
@@ -231,7 +235,7 @@ func TestConfigBindAddrOverride(t *testing.T) {
// Ensures that a the Listen Host inherits its port from the advertised addr
func TestConfigBindAddrInheritPort(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-addr", "127.0.0.1:4009", "-bind-addr", "127.0.0.1"}), "")
assert.Nil(t, c.Sanitize())
assert.Equal(t, c.BindAddr, "127.0.0.1:4009", "")
@@ -239,7 +243,7 @@ func TestConfigBindAddrInheritPort(t *testing.T) {
// Ensures that a port only argument errors out
func TestConfigBindAddrErrorOnNoHost(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-addr", "127.0.0.1:4009", "-bind-addr", ":4010"}), "")
assert.Error(t, c.Sanitize())
}
@@ -254,7 +258,7 @@ func TestConfigPeersEnv(t *testing.T) {
// Ensures that a the Peers flag can be parsed.
func TestConfigPeersFlag(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-peers", "coreos.com:4001,coreos.com:4002"}), "")
assert.Equal(t, c.Peers, []string{"coreos.com:4001", "coreos.com:4002"}, "")
}
@@ -269,7 +273,7 @@ func TestConfigPeersFileEnv(t *testing.T) {
// Ensures that a the Peers File flag can be parsed.
func TestConfigPeersFileFlag(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-peers-file", "/tmp/peers"}), "")
assert.Equal(t, c.PeersFile, "/tmp/peers", "")
}
@@ -284,7 +288,7 @@ func TestConfigMaxClusterSizeEnv(t *testing.T) {
// Ensures that a the Max Cluster Size flag can be parsed.
func TestConfigMaxClusterSizeFlag(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-max-cluster-size", "5"}), "")
assert.Equal(t, c.MaxClusterSize, 5, "")
}
@@ -299,7 +303,7 @@ func TestConfigMaxResultBufferEnv(t *testing.T) {
// Ensures that a the Max Result Buffer flag can be parsed.
func TestConfigMaxResultBufferFlag(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-max-result-buffer", "512"}), "")
assert.Equal(t, c.MaxResultBuffer, 512, "")
}
@@ -314,7 +318,7 @@ func TestConfigMaxRetryAttemptsEnv(t *testing.T) {
// Ensures that a the Max Retry Attempts flag can be parsed.
func TestConfigMaxRetryAttemptsFlag(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-max-retry-attempts", "10"}), "")
assert.Equal(t, c.MaxRetryAttempts, 10, "")
}
@@ -329,14 +333,14 @@ func TestConfigNameEnv(t *testing.T) {
// Ensures that a the Name flag can be parsed.
func TestConfigNameFlag(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-name", "test-name"}), "")
assert.Equal(t, c.Name, "test-name", "")
}
// Ensures that a Name gets guessed if not specified
func TestConfigNameGuess(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{}), "")
assert.Nil(t, c.Sanitize())
name, _ := os.Hostname()
@@ -345,7 +349,7 @@ func TestConfigNameGuess(t *testing.T) {
// Ensures that a DataDir gets guessed if not specified
func TestConfigDataDirGuess(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{}), "")
assert.Nil(t, c.Sanitize())
name, _ := os.Hostname()
@@ -362,7 +366,7 @@ func TestConfigSnapshotEnv(t *testing.T) {
// Ensures that a the Snapshot flag can be parsed.
func TestConfigSnapshotFlag(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-snapshot"}), "")
assert.Equal(t, c.Snapshot, true, "")
}
@@ -377,7 +381,7 @@ func TestConfigVerboseEnv(t *testing.T) {
// Ensures that a the Verbose flag can be parsed.
func TestConfigVerboseFlag(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-v"}), "")
assert.Equal(t, c.Verbose, true, "")
}
@@ -392,7 +396,7 @@ func TestConfigVeryVerboseEnv(t *testing.T) {
// Ensures that a the Very Verbose flag can be parsed.
func TestConfigVeryVerboseFlag(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-vv"}), "")
assert.Equal(t, c.VeryVerbose, true, "")
}
@@ -407,7 +411,7 @@ func TestConfigPeerAddrEnv(t *testing.T) {
// Ensures that a the Peer Advertised URL flag can be parsed.
func TestConfigPeerAddrFlag(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-peer-addr", "localhost:7002"}), "")
assert.Equal(t, c.Peer.Addr, "localhost:7002", "")
}
@@ -422,7 +426,7 @@ func TestConfigPeerCAFileEnv(t *testing.T) {
// Ensures that a the Peer CA file flag can be parsed.
func TestConfigPeerCAFileFlag(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-peer-ca-file", "/tmp/peer/file.ca"}), "")
assert.Equal(t, c.Peer.CAFile, "/tmp/peer/file.ca", "")
}
@@ -437,7 +441,7 @@ func TestConfigPeerCertFileEnv(t *testing.T) {
// Ensures that a the Cert file flag can be parsed.
func TestConfigPeerCertFileFlag(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-peer-cert-file", "/tmp/peer/file.cert"}), "")
assert.Equal(t, c.Peer.CertFile, "/tmp/peer/file.cert", "")
}
@@ -452,7 +456,7 @@ func TestConfigPeerKeyFileEnv(t *testing.T) {
// Ensures that a the Peer Key file flag can be parsed.
func TestConfigPeerKeyFileFlag(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-peer-key-file", "/tmp/peer/file.key"}), "")
assert.Equal(t, c.Peer.KeyFile, "/tmp/peer/file.key", "")
}
@@ -467,7 +471,7 @@ func TestConfigPeerBindAddrEnv(t *testing.T) {
// Ensures that a bad flag returns an error.
func TestConfigBadFlag(t *testing.T) {
c := NewConfig()
c := New()
err := c.LoadFlags([]string{"-no-such-flag"})
assert.Error(t, err)
assert.Equal(t, err.Error(), `flag provided but not defined: -no-such-flag`)
@@ -475,7 +479,7 @@ func TestConfigBadFlag(t *testing.T) {
// Ensures that a the Peer Listen Host file flag can be parsed.
func TestConfigPeerBindAddrFlag(t *testing.T) {
c := NewConfig()
c := New()
assert.Nil(t, c.LoadFlags([]string{"-peer-bind-addr", "127.0.0.1:4003"}), "")
assert.Equal(t, c.Peer.BindAddr, "127.0.0.1:4003", "")
}
@@ -486,7 +490,7 @@ func TestConfigCustomConfigOverrideSystemConfig(t *testing.T) {
custom := `addr = "127.0.0.1:6000"`
withTempFile(system, func(p1 string) {
withTempFile(custom, func(p2 string) {
c := NewConfig()
c := New()
c.SystemPath = p1
assert.Nil(t, c.Load([]string{"-config", p2}), "")
assert.Equal(t, c.Addr, "http://127.0.0.1:6000", "")
@@ -501,7 +505,7 @@ func TestConfigEnvVarOverrideCustomConfig(t *testing.T) {
custom := `[peer]` + "\n" + `advertised_url = "127.0.0.1:9000"`
withTempFile(custom, func(path string) {
c := NewConfig()
c := New()
c.SystemPath = ""
assert.Nil(t, c.Load([]string{"-config", path}), "")
assert.Equal(t, c.Peer.Addr, "http://127.0.0.1:8000", "")
@@ -513,7 +517,7 @@ func TestConfigCLIArgsOverrideEnvVar(t *testing.T) {
os.Setenv("ETCD_ADDR", "127.0.0.1:1000")
defer os.Setenv("ETCD_ADDR", "")
c := NewConfig()
c := New()
c.SystemPath = ""
assert.Nil(t, c.Load([]string{"-addr", "127.0.0.1:2000"}), "")
assert.Equal(t, c.Addr, "http://127.0.0.1:2000", "")
@@ -525,7 +529,7 @@ func TestConfigCLIArgsOverrideEnvVar(t *testing.T) {
func TestConfigDeprecatedAddrFlag(t *testing.T) {
_, stderr := capture(func() {
c := NewConfig()
c := New()
err := c.LoadFlags([]string{"-c", "127.0.0.1:4002"})
assert.NoError(t, err)
assert.Equal(t, c.Addr, "127.0.0.1:4002")
@@ -535,7 +539,7 @@ func TestConfigDeprecatedAddrFlag(t *testing.T) {
func TestConfigDeprecatedBindAddrFlag(t *testing.T) {
_, stderr := capture(func() {
c := NewConfig()
c := New()
err := c.LoadFlags([]string{"-cl", "127.0.0.1:4003"})
assert.NoError(t, err)
assert.Equal(t, c.BindAddr, "127.0.0.1:4003", "")
@@ -545,7 +549,7 @@ func TestConfigDeprecatedBindAddrFlag(t *testing.T) {
func TestConfigDeprecatedCAFileFlag(t *testing.T) {
_, stderr := capture(func() {
c := NewConfig()
c := New()
err := c.LoadFlags([]string{"-clientCAFile", "/tmp/file.ca"})
assert.NoError(t, err)
assert.Equal(t, c.CAFile, "/tmp/file.ca", "")
@@ -555,7 +559,7 @@ func TestConfigDeprecatedCAFileFlag(t *testing.T) {
func TestConfigDeprecatedCertFileFlag(t *testing.T) {
_, stderr := capture(func() {
c := NewConfig()
c := New()
err := c.LoadFlags([]string{"-clientCert", "/tmp/file.cert"})
assert.NoError(t, err)
assert.Equal(t, c.CertFile, "/tmp/file.cert", "")
@@ -565,7 +569,7 @@ func TestConfigDeprecatedCertFileFlag(t *testing.T) {
func TestConfigDeprecatedKeyFileFlag(t *testing.T) {
_, stderr := capture(func() {
c := NewConfig()
c := New()
err := c.LoadFlags([]string{"-clientKey", "/tmp/file.key"})
assert.NoError(t, err)
assert.Equal(t, c.KeyFile, "/tmp/file.key", "")
@@ -575,7 +579,7 @@ func TestConfigDeprecatedKeyFileFlag(t *testing.T) {
func TestConfigDeprecatedPeersFlag(t *testing.T) {
_, stderr := capture(func() {
c := NewConfig()
c := New()
err := c.LoadFlags([]string{"-C", "coreos.com:4001,coreos.com:4002"})
assert.NoError(t, err)
assert.Equal(t, c.Peers, []string{"coreos.com:4001", "coreos.com:4002"}, "")
@@ -585,7 +589,7 @@ func TestConfigDeprecatedPeersFlag(t *testing.T) {
func TestConfigDeprecatedPeersFileFlag(t *testing.T) {
_, stderr := capture(func() {
c := NewConfig()
c := New()
err := c.LoadFlags([]string{"-CF", "/tmp/machines"})
assert.NoError(t, err)
assert.Equal(t, c.PeersFile, "/tmp/machines", "")
@@ -595,7 +599,7 @@ func TestConfigDeprecatedPeersFileFlag(t *testing.T) {
func TestConfigDeprecatedMaxClusterSizeFlag(t *testing.T) {
_, stderr := capture(func() {
c := NewConfig()
c := New()
err := c.LoadFlags([]string{"-maxsize", "5"})
assert.NoError(t, err)
assert.Equal(t, c.MaxClusterSize, 5, "")
@@ -605,7 +609,7 @@ func TestConfigDeprecatedMaxClusterSizeFlag(t *testing.T) {
func TestConfigDeprecatedMaxResultBufferFlag(t *testing.T) {
_, stderr := capture(func() {
c := NewConfig()
c := New()
err := c.LoadFlags([]string{"-m", "512"})
assert.NoError(t, err)
assert.Equal(t, c.MaxResultBuffer, 512, "")
@@ -615,7 +619,7 @@ func TestConfigDeprecatedMaxResultBufferFlag(t *testing.T) {
func TestConfigDeprecatedMaxRetryAttemptsFlag(t *testing.T) {
_, stderr := capture(func() {
c := NewConfig()
c := New()
err := c.LoadFlags([]string{"-r", "10"})
assert.NoError(t, err)
assert.Equal(t, c.MaxRetryAttempts, 10, "")
@@ -625,7 +629,7 @@ func TestConfigDeprecatedMaxRetryAttemptsFlag(t *testing.T) {
func TestConfigDeprecatedNameFlag(t *testing.T) {
_, stderr := capture(func() {
c := NewConfig()
c := New()
err := c.LoadFlags([]string{"-n", "test-name"})
assert.NoError(t, err)
assert.Equal(t, c.Name, "test-name", "")
@@ -635,7 +639,7 @@ func TestConfigDeprecatedNameFlag(t *testing.T) {
func TestConfigDeprecatedPeerAddrFlag(t *testing.T) {
_, stderr := capture(func() {
c := NewConfig()
c := New()
err := c.LoadFlags([]string{"-s", "localhost:7002"})
assert.NoError(t, err)
assert.Equal(t, c.Peer.Addr, "localhost:7002", "")
@@ -645,7 +649,7 @@ func TestConfigDeprecatedPeerAddrFlag(t *testing.T) {
func TestConfigDeprecatedPeerBindAddrFlag(t *testing.T) {
_, stderr := capture(func() {
c := NewConfig()
c := New()
err := c.LoadFlags([]string{"-sl", "127.0.0.1:4003"})
assert.NoError(t, err)
assert.Equal(t, c.Peer.BindAddr, "127.0.0.1:4003", "")
@@ -655,7 +659,7 @@ func TestConfigDeprecatedPeerBindAddrFlag(t *testing.T) {
func TestConfigDeprecatedPeerCAFileFlag(t *testing.T) {
_, stderr := capture(func() {
c := NewConfig()
c := New()
err := c.LoadFlags([]string{"-serverCAFile", "/tmp/peer/file.ca"})
assert.NoError(t, err)
assert.Equal(t, c.Peer.CAFile, "/tmp/peer/file.ca", "")
@@ -665,7 +669,7 @@ func TestConfigDeprecatedPeerCAFileFlag(t *testing.T) {
func TestConfigDeprecatedPeerCertFileFlag(t *testing.T) {
_, stderr := capture(func() {
c := NewConfig()
c := New()
err := c.LoadFlags([]string{"-serverCert", "/tmp/peer/file.cert"})
assert.NoError(t, err)
assert.Equal(t, c.Peer.CertFile, "/tmp/peer/file.cert", "")
@@ -675,7 +679,7 @@ func TestConfigDeprecatedPeerCertFileFlag(t *testing.T) {
func TestConfigDeprecatedPeerKeyFileFlag(t *testing.T) {
_, stderr := capture(func() {
c := NewConfig()
c := New()
err := c.LoadFlags([]string{"-serverKey", "/tmp/peer/file.key"})
assert.NoError(t, err)
assert.Equal(t, c.Peer.KeyFile, "/tmp/peer/file.key", "")
@@ -691,7 +695,7 @@ func TestConfigDeprecatedPeerKeyFileFlag(t *testing.T) {
func withEnv(key, value string, f func(c *Config)) {
os.Setenv(key, value)
defer os.Setenv(key, "")
c := NewConfig()
c := New()
f(c)
}

View File

@@ -1,4 +1,4 @@
package server
package config
const (
// The amount of time (in ms) to elapse without a heartbeat before becoming a candidate

145
discovery/discovery.go Normal file
View File

@@ -0,0 +1,145 @@
package discovery
import (
"errors"
"fmt"
"net/url"
"path"
"strings"
"time"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
)
const (
stateKey = "_state"
initState = "init"
startedState = "started"
defaultTTL = 604800 // One week TTL
)
type Discoverer struct {
client *etcd.Client
name string
peer string
prefix string
discoveryURL string
}
var defaultDiscoverer *Discoverer
func init() {
defaultDiscoverer = &Discoverer{}
}
func (d *Discoverer) Do(discoveryURL string, name string, peer string) (peers []string, err error) {
d.name = name
d.peer = peer
d.discoveryURL = discoveryURL
u, err := url.Parse(discoveryURL)
if err != nil {
return
}
// prefix is prepended to all keys for this discovery
d.prefix = strings.TrimPrefix(u.Path, "/v2/keys/")
// keep the old path in case we need to set the KeyPrefix below
oldPath := u.Path
u.Path = ""
// Connect to a scheme://host not a full URL with path
log.Infof("Discovery via %s using prefix %s.", u.String(), d.prefix)
d.client = etcd.NewClient([]string{u.String()})
if !strings.HasPrefix(oldPath, "/v2/keys") {
d.client.SetKeyPrefix("")
}
// Register this machine first and announce that we are a member of
// this cluster
err = d.heartbeat()
if err != nil {
return
}
// Start the very slow heartbeat to the cluster now in anticipation
// that everything is going to go alright now
go d.startHeartbeat()
// Attempt to take the leadership role, if there is no error we are it!
resp, err := d.client.CompareAndSwap(path.Join(d.prefix, stateKey), startedState, 0, initState, 0)
// Bail out on unexpected errors
if err != nil {
if etcdErr, ok := err.(*etcd.EtcdError); !ok || etcdErr.ErrorCode != 101 {
return nil, err
}
}
// If we got a response then the CAS was successful, we are leader
if resp != nil && resp.Node.Value == startedState {
// We are the leader, we have no peers
log.Infof("Discovery was in the 'init' state this machine is the initial leader.")
return nil, nil
}
// Fall through to finding the other discovery peers
return d.findPeers()
}
func (d *Discoverer) findPeers() (peers []string, err error) {
resp, err := d.client.Get(path.Join(d.prefix), false, true)
if err != nil {
return nil, err
}
node := resp.Node
if node == nil {
return nil, fmt.Errorf("%s key doesn't exist.", d.prefix)
}
for _, n := range node.Nodes {
// Skip our own entry in the list, there is no point
if strings.HasSuffix(n.Key, "/"+d.name) {
continue
}
peers = append(peers, n.Value)
}
if len(peers) == 0 {
return nil, errors.New("Discovery found an initialized cluster but no peers are registered.")
}
log.Infof("Discovery found peers %v", peers)
return
}
func (d *Discoverer) startHeartbeat() {
// In case of errors we should attempt to heartbeat fairly frequently
heartbeatInterval := defaultTTL / 8
ticker := time.Tick(time.Second * time.Duration(heartbeatInterval))
for {
select {
case <-ticker:
err := d.heartbeat()
if err != nil {
log.Warnf("Discovery heartbeat failed: %v", err)
}
}
}
}
func (d *Discoverer) heartbeat() error {
_, err := d.client.Set(path.Join(d.prefix, d.name), d.peer, defaultTTL)
return err
}
func Do(discoveryURL string, name string, peer string) ([]string, error) {
return defaultDiscoverer.Do(discoveryURL, name, peer)
}

View File

@@ -26,6 +26,7 @@ import (
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/config"
ehttp "github.com/coreos/etcd/http"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/metrics"
@@ -35,7 +36,7 @@ import (
func main() {
// Load configuration.
var config = server.NewConfig()
var config = config.New()
if err := config.Load(os.Args[1:]); err != nil {
fmt.Println(server.Usage() + "\n")
fmt.Println(err.Error() + "\n")

View File

@@ -1,4 +1,4 @@
package server
package http
import (
"encoding/json"
@@ -6,12 +6,11 @@ import (
"io"
"net/http"
"net/url"
"strings"
"github.com/coreos/etcd/log"
)
func decodeJsonRequest(req *http.Request, data interface{}) error {
func DecodeJsonRequest(req *http.Request, data interface{}) error {
decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(&data); err != nil && err != io.EOF {
log.Warnf("Malformed json request: %v", err)
@@ -20,7 +19,7 @@ func decodeJsonRequest(req *http.Request, data interface{}) error {
return nil
}
func redirect(hostname string, w http.ResponseWriter, req *http.Request) {
func Redirect(hostname string, w http.ResponseWriter, req *http.Request) {
originalURL := req.URL
redirectURL, _ := url.Parse(hostname)
@@ -32,14 +31,3 @@ func redirect(hostname string, w http.ResponseWriter, req *http.Request) {
log.Debugf("Redirect to %s", redirectURL.String())
http.Redirect(w, req, redirectURL.String(), http.StatusTemporaryRedirect)
}
// trimsplit slices s into all substrings separated by sep and returns a
// slice of the substrings between the separator with all leading and trailing
// white space removed, as defined by Unicode.
func trimsplit(s, sep string) []string {
trimmed := strings.Split(s, sep)
for i := range trimmed {
trimmed[i] = strings.TrimSpace(trimmed[i])
}
return trimmed
}

16
pkg/strings/string.go Normal file
View File

@@ -0,0 +1,16 @@
package string
import (
"strings"
)
// TrimSplit slices s into all substrings separated by sep and returns a
// slice of the substrings between the separator with all leading and trailing
// white space removed, as defined by Unicode.
func TrimSplit(s, sep string) []string {
trimmed := strings.Split(s, sep)
for i := range trimmed {
trimmed[i] = strings.TrimSpace(trimmed[i])
}
return trimmed
}

View File

@@ -6,16 +6,25 @@ ulimit -n unlimited
tmux new-session -d -s $SESSION
peer_args=
if [ -n "${DISCOVERY_URL}" ]; then
peer_args="-discovery ${DISCOVERY_URL}"
fi
# Setup a window for tailing log files
tmux new-window -t $SESSION:1 -n 'peers'
tmux split-window -h
tmux select-pane -t 0
tmux send-keys "${DIR}/../bin/etcd -peer-addr 127.0.0.1:7001 -addr 127.0.0.1:4001 -data-dir peer1 -name peer1" C-m
tmux send-keys "${DIR}/../bin/etcd -peer-addr 127.0.0.1:7001 -addr 127.0.0.1:4001 -data-dir peer1 -name peer1 ${peer_args}" C-m
if [ -n "${peer_args}" ]; then
peer_args="-peers 127.0.0.1:7001"
fi
for i in 2 3; do
tmux select-pane -t 0
tmux split-window -v
tmux send-keys "${DIR}/../bin/etcd -cors='*' -peer-addr 127.0.0.1:700${i} -addr 127.0.0.1:400${i} -peers 127.0.0.1:7001 -data-dir peer${i} -name peer${i}" C-m
tmux send-keys "${DIR}/../bin/etcd -cors='*' -peer-addr 127.0.0.1:700${i} -addr 127.0.0.1:400${i} -data-dir peer${i} -name peer${i} ${peer_args}" C-m
done
# Attach to session

View File

@@ -209,7 +209,7 @@ func (s *PeerServer) startAsFollower(cluster []string) {
if ok {
return
}
log.Warnf("cannot join to cluster via given peers, retry in %d seconds", retryInterval)
log.Warnf("Unable to join the cluster using any of the peers %v. Retrying in %d seconds", cluster, retryInterval)
time.Sleep(time.Second * retryInterval)
}
@@ -266,17 +266,18 @@ func (s *PeerServer) joinCluster(cluster []string) bool {
err := s.joinByPeer(s.raftServer, peer, s.Config.Scheme)
if err == nil {
log.Debugf("%s success join to the cluster via peer %s", s.Config.Name, peer)
log.Debugf("%s joined the cluster via peer %s", s.Config.Name, peer)
return true
} else {
if _, ok := err.(etcdErr.Error); ok {
log.Fatal(err)
}
log.Debugf("cannot join to cluster via peer %s %s", peer, err)
}
if _, ok := err.(etcdErr.Error); ok {
log.Fatal(err)
}
log.Warnf("Attempt to join via %s failed: %s", peer, err)
}
return false
}

View File

@@ -6,11 +6,13 @@ import (
"strconv"
"time"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/gorilla/mux"
etcdErr "github.com/coreos/etcd/error"
uhttp "github.com/coreos/etcd/pkg/http"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/store"
)
// Get all the current logs
@@ -149,7 +151,7 @@ func (ps *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Reques
func (ps *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
command := &JoinCommand{}
err := decodeJsonRequest(req, command)
err := uhttp.DecodeJsonRequest(req, command)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return

View File

@@ -15,6 +15,7 @@ import (
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/metrics"
"github.com/coreos/etcd/mod"
uhttp "github.com/coreos/etcd/pkg/http"
"github.com/coreos/etcd/server/v1"
"github.com/coreos/etcd/server/v2"
"github.com/coreos/etcd/store"
@@ -244,7 +245,7 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque
default:
url, _ = ps.registry.ClientURL(leader)
}
redirect(url, w, req)
uhttp.Redirect(url, w, req)
return nil
}
@@ -295,7 +296,7 @@ func (s *Server) GetLeaderStatsHandler(w http.ResponseWriter, req *http.Request)
return etcdErr.NewError(300, "", s.Store().Index())
}
hostname, _ := s.registry.ClientURL(leader)
redirect(hostname, w, req)
uhttp.Redirect(hostname, w, req)
return nil
}

View File

@@ -26,6 +26,7 @@ Options:
-vv Enabled very verbose logging.
Cluster Configuration Options:
-discovery=<url> Discovery service used to find a peer list.
-peers-file=<path> Path to a file containing the peer list.
-peers=<host:port>,<host:port> Comma-separated list of peers. The members
should match the peer's '-peer-addr' flag.

View File

@@ -0,0 +1,245 @@
package test
import (
"errors"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"
"github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert"
etcdtest "github.com/coreos/etcd/tests"
"github.com/coreos/etcd/server"
goetcd "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
)
type garbageHandler struct {
t *testing.T
success bool
}
func (g *garbageHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Hello, client")
println("HI")
if r.URL.String() != "/v2/keys/_etcd/registry/1/node1" {
g.t.Fatalf("Unexpected web request")
}
g.success = true
}
// TestDiscoveryDownNoBackupPeers ensures that etcd stops if it is started with a
// bad discovery URL and no backups.
func TestDiscoveryDownNoBackupPeers(t *testing.T) {
g := garbageHandler{t: t}
ts := httptest.NewServer(&g)
defer ts.Close()
discover := ts.URL + "/v2/keys/_etcd/registry/1"
proc, err := startServer([]string{"-discovery", discover})
if err != nil {
t.Fatal(err.Error())
}
defer stopServer(proc)
client := http.Client{}
err = assertServerNotUp(client, "http")
if err != nil {
t.Fatal(err.Error())
}
if !g.success {
t.Fatal("Discovery server never called")
}
}
// TestDiscoveryDownWithBackupPeers ensures that etcd runs if it is started with a
// bad discovery URL and a peer list.
func TestDiscoveryDownWithBackupPeers(t *testing.T) {
etcdtest.RunServer(func(s *server.Server) {
g := garbageHandler{t: t}
ts := httptest.NewServer(&g)
defer ts.Close()
discover := ts.URL + "/v2/keys/_etcd/registry/1"
u, ok := s.PeerURL("ETCDTEST")
if !ok {
t.Fatalf("Couldn't find the URL")
}
proc, err := startServer([]string{"-discovery", discover, "-peers", u})
if err != nil {
t.Fatal(err.Error())
}
defer stopServer(proc)
client := http.Client{}
err = assertServerFunctional(client, "http")
if err != nil {
t.Fatal(err.Error())
}
if !g.success {
t.Fatal("Discovery server never called")
}
})
}
// TestDiscoveryFirstPeer ensures that etcd starts as the leader if it
// registers as the first peer.
func TestDiscoveryFirstPeer(t *testing.T) {
etcdtest.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "init")
resp, err := etcdtest.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/_etcd/registry/2/_state"), v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
proc, err := startServer([]string{"-discovery", s.URL() + "/v2/keys/_etcd/registry/2"})
if err != nil {
t.Fatal(err.Error())
}
defer stopServer(proc)
client := http.Client{}
err = assertServerFunctional(client, "http")
if err != nil {
t.Fatal(err.Error())
}
})
}
// TestDiscoverySecondPeerFirstDown ensures that etcd stops if it is started with a
// correct discovery URL but no active machines are found.
func TestDiscoverySecondPeerFirstDown(t *testing.T) {
etcdtest.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "started")
resp, err := etcdtest.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/_etcd/registry/2/_state"), v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
proc, err := startServer([]string{"-discovery", s.URL() + "/v2/keys/_etcd/registry/2"})
if err != nil {
t.Fatal(err.Error())
}
defer stopServer(proc)
client := http.Client{}
err = assertServerNotUp(client, "http")
if err != nil {
t.Fatal(err.Error())
}
})
}
// TestDiscoverySecondPeerFirstNoResponse ensures that if the first etcd
// machine stops after heartbeating that the second machine fails too.
func TestDiscoverySecondPeerFirstNoResponse(t *testing.T) {
etcdtest.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "started")
resp, err := etcdtest.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/_etcd/registry/2/_state"), v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
v = url.Values{}
v.Set("value", "http://127.0.0.1:49151")
resp, err = etcdtest.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/_etcd/registry/2/ETCDTEST"), v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
proc, err := startServer([]string{"-discovery", s.URL() + "/v2/keys/_etcd/registry/2"})
if err != nil {
t.Fatal(err.Error())
}
defer stopServer(proc)
// TODO(bp): etcd will take 30 seconds to shutdown, figure this
// out instead
time.Sleep(35 * time.Second)
client := http.Client{}
_, err = client.Get("/")
if err != nil && strings.Contains(err.Error(), "connection reset by peer") {
t.Fatal(err.Error())
}
})
}
// TestDiscoverySecondPeerUp ensures that a second peer joining a discovery
// cluster works.
func TestDiscoverySecondPeerUp(t *testing.T) {
etcdtest.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "started")
resp, err := etcdtest.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/_etcd/registry/3/_state"), v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
u, ok := s.PeerURL("ETCDTEST")
if !ok {
t.Fatalf("Couldn't find the URL")
}
wc := goetcd.NewClient([]string{s.URL()})
_, err = wc.Set("test", "0", 0)
if err != nil {
t.Fatalf("Couldn't set a test key on the leader %v", err)
}
receiver := make(chan *goetcd.Response)
stop := make(chan bool)
go wc.Watch("_etcd/registry/3/node1", 0, false, receiver, stop)
v = url.Values{}
v.Set("value", u)
resp, err = etcdtest.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/_etcd/registry/3/ETCDTEST"), v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
proc, err := startServer([]string{"-discovery", s.URL() + "/v2/keys/_etcd/registry/3"})
if err != nil {
t.Fatal(err.Error())
}
defer stopServer(proc)
// Test to ensure the machine registered iteslf
watchResp := <-receiver
if watchResp.Node.Value != "http://127.0.0.1:7001" {
t.Fatalf("Second peer didn't register! %s", watchResp.Node.Value)
}
// TODO(bp): need to have a better way of knowing a machine is up
time.Sleep(1 * time.Second)
etcdc := goetcd.NewClient(nil)
_, err = etcdc.Set("foobar", "baz", 0)
if err != nil {
t.Fatal(err.Error())
}
})
}
func assertServerNotUp(client http.Client, scheme string) error {
path := fmt.Sprintf("%s://127.0.0.1:4001/v2/keys/foo", scheme)
fields := url.Values(map[string][]string{"value": []string{"bar"}})
for i := 0; i < 10; i++ {
time.Sleep(1 * time.Second)
_, err := client.PostForm(path, fields)
if err == nil {
return errors.New("Expected error during POST, got nil")
} else {
errString := err.Error()
if strings.Contains(errString, "connection refused") {
return nil
} else {
return err
}
}
}
return nil
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/metrics"
"github.com/coreos/etcd/server"
"github.com/coreos/etcd/store"
)
@@ -39,7 +40,10 @@ func RunServer(f func(*server.Server)) {
SnapshotCount: testSnapshotCount,
MaxClusterSize: 9,
}
ps := server.NewPeerServer(psConfig, registry, store, nil, followersStats, serverStats)
mb := metrics.NewBucket("")
ps := server.NewPeerServer(psConfig, registry, store, &mb, followersStats, serverStats)
psListener, err := server.NewListener(testRaftURL)
if err != nil {
panic(err)

View File

@@ -12,15 +12,9 @@ import (
"net/url"
"os"
"path"
"strings"
"time"
)
const (
HTTP = iota
HTTPS
)
// See SetConsistency for how to use these constants.
const (
// Using strings rather than iota because the consistency level
@@ -34,45 +28,27 @@ const (
defaultBufferSize = 10
)
type Cluster struct {
Leader string `json:"leader"`
Machines []string `json:"machines"`
}
type Config struct {
CertFile string `json:"certFile"`
KeyFile string `json:"keyFile"`
CaCertFile string `json:"caCertFile"`
Scheme string `json:"scheme"`
CaCertFile []string `json:"caCertFiles"`
Timeout time.Duration `json:"timeout"`
Consistency string `json: "consistency"`
}
type Client struct {
cluster Cluster `json:"cluster"`
config Config `json:"config"`
config Config `json:"config"`
cluster *Cluster `json:"cluster"`
httpClient *http.Client
persistence io.Writer
cURLch chan string
keyPrefix string
}
// NewClient create a basic client that is configured to be used
// with the given machine list.
func NewClient(machines []string) *Client {
// if an empty slice was sent in then just assume localhost
if len(machines) == 0 {
machines = []string{"http://127.0.0.1:4001"}
}
// default leader and machines
cluster := Cluster{
Leader: machines[0],
Machines: machines,
}
config := Config{
// default use http
Scheme: "http",
// default timeout is one second
Timeout: time.Second,
// default consistency level is STRONG
@@ -80,75 +56,146 @@ func NewClient(machines []string) *Client {
}
client := &Client{
cluster: cluster,
config: config,
cluster: NewCluster(machines),
config: config,
keyPrefix: path.Join(version, "keys"),
}
err := setupHttpClient(client)
if err != nil {
panic(err)
}
client.initHTTPClient()
client.saveConfig()
return client
}
// NewClientFile creates a client from a given file path.
// NewTLSClient create a basic client with TLS configuration
func NewTLSClient(machines []string, cert, key, caCert string) (*Client, error) {
// overwrite the default machine to use https
if len(machines) == 0 {
machines = []string{"https://127.0.0.1:4001"}
}
config := Config{
// default timeout is one second
Timeout: time.Second,
// default consistency level is STRONG
Consistency: STRONG_CONSISTENCY,
CertFile: cert,
KeyFile: key,
CaCertFile: make([]string, 0),
}
client := &Client{
cluster: NewCluster(machines),
config: config,
keyPrefix: path.Join(version, "keys"),
}
err := client.initHTTPSClient(cert, key)
if err != nil {
return nil, err
}
err = client.AddRootCA(caCert)
client.saveConfig()
return client, nil
}
// NewClientFromFile creates a client from a given file path.
// The given file is expected to use the JSON format.
func NewClientFile(fpath string) (*Client, error) {
func NewClientFromFile(fpath string) (*Client, error) {
fi, err := os.Open(fpath)
if err != nil {
return nil, err
}
defer func() {
if err := fi.Close(); err != nil {
panic(err)
}
}()
return NewClientReader(fi)
return NewClientFromReader(fi)
}
// NewClientReader creates a Client configured from a given reader.
// The config is expected to use the JSON format.
func NewClientReader(reader io.Reader) (*Client, error) {
var client Client
// NewClientFromReader creates a Client configured from a given reader.
// The configuration is expected to use the JSON format.
func NewClientFromReader(reader io.Reader) (*Client, error) {
c := new(Client)
b, err := ioutil.ReadAll(reader)
if err != nil {
return nil, err
}
err = json.Unmarshal(b, &client)
err = json.Unmarshal(b, c)
if err != nil {
return nil, err
}
if c.config.CertFile == "" {
c.initHTTPClient()
} else {
err = c.initHTTPSClient(c.config.CertFile, c.config.KeyFile)
}
if err != nil {
return nil, err
}
err = setupHttpClient(&client)
if err != nil {
return nil, err
for _, caCert := range c.config.CaCertFile {
if err := c.AddRootCA(caCert); err != nil {
return nil, err
}
}
return &client, nil
return c, nil
}
func setupHttpClient(client *Client) error {
if client.config.CertFile != "" && client.config.KeyFile != "" {
err := client.SetCertAndKey(client.config.CertFile, client.config.KeyFile, client.config.CaCertFile)
if err != nil {
return err
}
} else {
client.config.CertFile = ""
client.config.KeyFile = ""
tr := &http.Transport{
Dial: dialTimeout,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
}
client.httpClient = &http.Client{Transport: tr}
// Override the Client's HTTP Transport object
func (c *Client) SetTransport(tr *http.Transport) {
c.httpClient.Transport = tr
}
// SetKeyPrefix changes the key prefix from the default `/v2/keys` to whatever
// is set.
func (c *Client) SetKeyPrefix(prefix string) {
c.keyPrefix = prefix
}
// initHTTPClient initializes a HTTP client for etcd client
func (c *Client) initHTTPClient() {
tr := &http.Transport{
Dial: dialTimeout,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
}
c.httpClient = &http.Client{Transport: tr}
}
// initHTTPClient initializes a HTTPS client for etcd client
func (c *Client) initHTTPSClient(cert, key string) error {
if cert == "" || key == "" {
return errors.New("Require both cert and key path")
}
tlsCert, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
return err
}
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{tlsCert},
InsecureSkipVerify: true,
}
tr := &http.Transport{
TLSClientConfig: tlsConfig,
Dial: dialTimeout,
}
c.httpClient = &http.Client{Transport: tr}
return nil
}
@@ -179,114 +226,45 @@ func (c *Client) SetConsistency(consistency string) error {
return nil
}
// MarshalJSON implements the Marshaller interface
// as defined by the standard JSON package.
func (c *Client) MarshalJSON() ([]byte, error) {
b, err := json.Marshal(struct {
Config Config `json:"config"`
Cluster Cluster `json:"cluster"`
}{
Config: c.config,
Cluster: c.cluster,
})
if err != nil {
return nil, err
// AddRootCA adds a root CA cert for the etcd client
func (c *Client) AddRootCA(caCert string) error {
if c.httpClient == nil {
return errors.New("Client has not been initialized yet!")
}
return b, nil
}
// UnmarshalJSON implements the Unmarshaller interface
// as defined by the standard JSON package.
func (c *Client) UnmarshalJSON(b []byte) error {
temp := struct {
Config Config `json: "config"`
Cluster Cluster `json: "cluster"`
}{}
err := json.Unmarshal(b, &temp)
certBytes, err := ioutil.ReadFile(caCert)
if err != nil {
return err
}
c.cluster = temp.Cluster
c.config = temp.Config
return nil
}
tr, ok := c.httpClient.Transport.(*http.Transport)
// saveConfig saves the current config using c.persistence.
func (c *Client) saveConfig() error {
if c.persistence != nil {
b, err := json.Marshal(c)
if err != nil {
return err
}
_, err = c.persistence.Write(b)
if err != nil {
return err
}
if !ok {
panic("AddRootCA(): Transport type assert should not fail")
}
return nil
if tr.TLSClientConfig.RootCAs == nil {
caCertPool := x509.NewCertPool()
ok = caCertPool.AppendCertsFromPEM(certBytes)
if ok {
tr.TLSClientConfig.RootCAs = caCertPool
}
tr.TLSClientConfig.InsecureSkipVerify = false
} else {
ok = tr.TLSClientConfig.RootCAs.AppendCertsFromPEM(certBytes)
}
if !ok {
err = errors.New("Unable to load caCert")
}
c.config.CaCertFile = append(c.config.CaCertFile, caCert)
c.saveConfig()
return err
}
func (c *Client) SetCertAndKey(cert string, key string, caCert string) error {
if cert != "" && key != "" {
tlsCert, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
return err
}
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{tlsCert},
}
if caCert != "" {
caCertPool := x509.NewCertPool()
certBytes, err := ioutil.ReadFile(caCert)
if err != nil {
return err
}
if !caCertPool.AppendCertsFromPEM(certBytes) {
return errors.New("Unable to load caCert")
}
tlsConfig.RootCAs = caCertPool
} else {
tlsConfig.InsecureSkipVerify = true
}
tr := &http.Transport{
TLSClientConfig: tlsConfig,
Dial: dialTimeout,
}
c.httpClient = &http.Client{Transport: tr}
c.saveConfig()
return nil
}
return errors.New("Require both cert and key path")
}
func (c *Client) SetScheme(scheme int) error {
if scheme == HTTP {
c.config.Scheme = "http"
c.saveConfig()
return nil
}
if scheme == HTTPS {
c.config.Scheme = "https"
c.saveConfig()
return nil
}
return errors.New("Unknown Scheme")
}
// SetCluster updates config using the given machine list.
// SetCluster updates cluster information using the given machine list.
func (c *Client) SetCluster(machines []string) bool {
success := c.internalSyncCluster(machines)
return success
@@ -296,16 +274,15 @@ func (c *Client) GetCluster() []string {
return c.cluster.Machines
}
// SyncCluster updates config using the internal machine list.
// SyncCluster updates the cluster information using the internal machine list.
func (c *Client) SyncCluster() bool {
success := c.internalSyncCluster(c.cluster.Machines)
return success
return c.internalSyncCluster(c.cluster.Machines)
}
// internalSyncCluster syncs cluster information using the given machine list.
func (c *Client) internalSyncCluster(machines []string) bool {
for _, machine := range machines {
httpPath := c.createHttpPath(machine, version+"/machines")
httpPath := c.createHttpPath(machine, path.Join(version, "machines"))
resp, err := c.httpClient.Get(httpPath)
if err != nil {
// try another machine in the cluster
@@ -319,12 +296,11 @@ func (c *Client) internalSyncCluster(machines []string) bool {
}
// update Machines List
c.cluster.Machines = strings.Split(string(b), ", ")
c.cluster.updateFromStr(string(b))
// update leader
// the first one in the machine list is the leader
logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, c.cluster.Machines[0])
c.cluster.Leader = c.cluster.Machines[0]
c.cluster.switchLeader(0)
logger.Debug("sync.machines ", c.cluster.Machines)
c.saveConfig()
@@ -337,8 +313,12 @@ func (c *Client) internalSyncCluster(machines []string) bool {
// createHttpPath creates a complete HTTP URL.
// serverName should contain both the host name and a port number, if any.
func (c *Client) createHttpPath(serverName string, _path string) string {
u, _ := url.Parse(serverName)
u.Path = path.Join(u.Path, "/", _path)
u, err := url.Parse(serverName)
if err != nil {
panic(err)
}
u.Path = path.Join(u.Path, _path)
if u.Scheme == "" {
u.Scheme = "http"
@@ -351,27 +331,6 @@ func dialTimeout(network, addr string) (net.Conn, error) {
return net.DialTimeout(network, addr, time.Second)
}
func (c *Client) updateLeader(u *url.URL) {
var leader string
if u.Scheme == "" {
leader = "http://" + u.Host
} else {
leader = u.Scheme + "://" + u.Host
}
logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, leader)
c.cluster.Leader = leader
c.saveConfig()
}
// switchLeader switch the current leader to machines[num]
func (c *Client) switchLeader(num int) {
logger.Debugf("switch.leader[from %v to %v]",
c.cluster.Leader, c.cluster.Machines[num])
c.cluster.Leader = c.cluster.Machines[num]
}
func (c *Client) OpenCURL() {
c.cURLch = make(chan string, defaultBufferSize)
}
@@ -392,3 +351,55 @@ func (c *Client) sendCURL(command string) {
func (c *Client) RecvCURL() string {
return <-c.cURLch
}
// saveConfig saves the current config using c.persistence.
func (c *Client) saveConfig() error {
if c.persistence != nil {
b, err := json.Marshal(c)
if err != nil {
return err
}
_, err = c.persistence.Write(b)
if err != nil {
return err
}
}
return nil
}
// MarshalJSON implements the Marshaller interface
// as defined by the standard JSON package.
func (c *Client) MarshalJSON() ([]byte, error) {
b, err := json.Marshal(struct {
Config Config `json:"config"`
Cluster *Cluster `json:"cluster"`
}{
Config: c.config,
Cluster: c.cluster,
})
if err != nil {
return nil, err
}
return b, nil
}
// UnmarshalJSON implements the Unmarshaller interface
// as defined by the standard JSON package.
func (c *Client) UnmarshalJSON(b []byte) error {
temp := struct {
Config Config `json: "config"`
Cluster *Cluster `json: "cluster"`
}{}
err := json.Unmarshal(b, &temp)
if err != nil {
return err
}
c.cluster = temp.Cluster
c.config = temp.Config
return nil
}

View File

@@ -14,7 +14,9 @@ import (
func TestSync(t *testing.T) {
fmt.Println("Make sure there are three nodes at 0.0.0.0:4001-4003")
c := NewClient(nil)
// Explicit trailing slash to ensure this doesn't reproduce:
// https://github.com/coreos/go-etcd/issues/82
c := NewClient([]string{"http://127.0.0.1:4001/"})
success := c.SyncCluster()
if !success {
@@ -79,7 +81,7 @@ func TestPersistence(t *testing.T) {
t.Fatal(err)
}
c2, err := NewClientFile("config.json")
c2, err := NewClientFromFile("config.json")
if err != nil {
t.Fatal(err)
}

View File

@@ -0,0 +1,51 @@
package etcd
import (
"net/url"
"strings"
)
type Cluster struct {
Leader string `json:"leader"`
Machines []string `json:"machines"`
}
func NewCluster(machines []string) *Cluster {
// if an empty slice was sent in then just assume HTTP 4001 on localhost
if len(machines) == 0 {
machines = []string{"http://127.0.0.1:4001"}
}
// default leader and machines
return &Cluster{
Leader: machines[0],
Machines: machines,
}
}
// switchLeader switch the current leader to machines[num]
func (cl *Cluster) switchLeader(num int) {
logger.Debugf("switch.leader[from %v to %v]",
cl.Leader, cl.Machines[num])
cl.Leader = cl.Machines[num]
}
func (cl *Cluster) updateFromStr(machines string) {
cl.Machines = strings.Split(machines, ", ")
}
func (cl *Cluster) updateLeader(leader string) {
logger.Debugf("update.leader[%s,%s]", cl.Leader, leader)
cl.Leader = leader
}
func (cl *Cluster) updateLeaderFromURL(u *url.URL) {
var leader string
if u.Scheme == "" {
leader = "http://" + u.Host
} else {
leader = u.Scheme + "://" + u.Host
}
cl.updateLeader(leader)
}

View File

@@ -0,0 +1,34 @@
package etcd
import "fmt"
func (c *Client) CompareAndDelete(key string, prevValue string, prevIndex uint64) (*Response, error) {
raw, err := c.RawCompareAndDelete(key, prevValue, prevIndex)
if err != nil {
return nil, err
}
return raw.toResponse()
}
func (c *Client) RawCompareAndDelete(key string, prevValue string, prevIndex uint64) (*RawResponse, error) {
if prevValue == "" && prevIndex == 0 {
return nil, fmt.Errorf("You must give either prevValue or prevIndex.")
}
options := options{}
if prevValue != "" {
options["prevValue"] = prevValue
}
if prevIndex != 0 {
options["prevIndex"] = prevIndex
}
raw, err := c.delete(key, options)
if err != nil {
return nil, err
}
return raw, err
}

View File

@@ -0,0 +1,46 @@
package etcd
import (
"testing"
)
func TestCompareAndDelete(t *testing.T) {
c := NewClient(nil)
defer func() {
c.Delete("foo", true)
}()
c.Set("foo", "bar", 5)
// This should succeed an correct prevValue
resp, err := c.CompareAndDelete("foo", "bar", 0)
if err != nil {
t.Fatal(err)
}
if !(resp.PrevNode.Value == "bar" && resp.PrevNode.Key == "/foo" && resp.PrevNode.TTL == 5) {
t.Fatalf("CompareAndDelete 1 prevNode failed: %#v", resp)
}
resp, _ = c.Set("foo", "bar", 5)
// This should fail because it gives an incorrect prevValue
_, err = c.CompareAndDelete("foo", "xxx", 0)
if err == nil {
t.Fatalf("CompareAndDelete 2 should have failed. The response is: %#v", resp)
}
// This should succeed because it gives an correct prevIndex
resp, err = c.CompareAndDelete("foo", "", resp.Node.ModifiedIndex)
if err != nil {
t.Fatal(err)
}
if !(resp.PrevNode.Value == "bar" && resp.PrevNode.Key == "/foo" && resp.PrevNode.TTL == 5) {
t.Fatalf("CompareAndSwap 3 prevNode failed: %#v", resp)
}
c.Set("foo", "bar", 5)
// This should fail because it gives an incorrect prevIndex
resp, err = c.CompareAndDelete("foo", "", 29817514)
if err == nil {
t.Fatalf("CompareAndDelete 4 should have failed. The response is: %#v", resp)
}
}

View File

@@ -2,7 +2,18 @@ package etcd
import "fmt"
func (c *Client) CompareAndSwap(key string, value string, ttl uint64, prevValue string, prevIndex uint64) (*Response, error) {
func (c *Client) CompareAndSwap(key string, value string, ttl uint64,
prevValue string, prevIndex uint64) (*Response, error) {
raw, err := c.RawCompareAndSwap(key, value, ttl, prevValue, prevIndex)
if err != nil {
return nil, err
}
return raw.toResponse()
}
func (c *Client) RawCompareAndSwap(key string, value string, ttl uint64,
prevValue string, prevIndex uint64) (*RawResponse, error) {
if prevValue == "" && prevIndex == 0 {
return nil, fmt.Errorf("You must give either prevValue or prevIndex.")
}
@@ -21,5 +32,5 @@ func (c *Client) CompareAndSwap(key string, value string, ttl uint64, prevValue
return nil, err
}
return raw.toResponse()
return raw, err
}

View File

@@ -0,0 +1 @@
{"config":{"certFile":"","keyFile":"","caCertFiles":null,"timeout":1000000000,"Consistency":"STRONG"},"cluster":{"leader":"http://127.0.0.1:4001","machines":["http://127.0.0.1:4001","http://127.0.0.1:4002"]}}

View File

@@ -1,28 +1,53 @@
package etcd
import (
"os"
"github.com/coreos/etcd/third_party/github.com/coreos/go-log/log"
"io/ioutil"
"log"
"strings"
)
var logger *log.Logger
type Logger interface {
Debug(args ...interface{})
Debugf(fmt string, args ...interface{})
Warning(args ...interface{})
Warningf(fmt string, args ...interface{})
}
var logger Logger
func SetLogger(log Logger) {
logger = log
}
func GetLogger() Logger {
return logger
}
type defaultLogger struct {
log *log.Logger
}
func (p *defaultLogger) Debug(args ...interface{}) {
p.log.Println(args)
}
func (p *defaultLogger) Debugf(fmt string, args ...interface{}) {
// Append newline if necessary
if !strings.HasSuffix(fmt, "\n") {
fmt = fmt + "\n"
}
p.log.Printf(fmt, args)
}
func (p *defaultLogger) Warning(args ...interface{}) {
p.Debug(args)
}
func (p *defaultLogger) Warningf(fmt string, args ...interface{}) {
p.Debugf(fmt, args)
}
func init() {
setLogger(log.PriErr)
}
func OpenDebug() {
setLogger(log.PriDebug)
}
func CloseDebug() {
setLogger(log.PriErr)
}
func setLogger(priority log.Priority) {
logger = log.NewSimple(
log.PriorityFilter(
priority,
log.WriterSink(os.Stdout, log.BasicFormat, log.BasicFields)))
// Default logger uses the go default log.
SetLogger(&defaultLogger{log.New(ioutil.Discard, "go-etcd", log.LstdFlags)})
}

View File

@@ -10,7 +10,7 @@ package etcd
// then everything under the directory (including all child directories)
// will be deleted.
func (c *Client) Delete(key string, recursive bool) (*Response, error) {
raw, err := c.DeleteRaw(key, recursive, false)
raw, err := c.RawDelete(key, recursive, false)
if err != nil {
return nil, err
@@ -21,7 +21,7 @@ func (c *Client) Delete(key string, recursive bool) (*Response, error) {
// DeleteDir deletes an empty directory or a key value pair
func (c *Client) DeleteDir(key string) (*Response, error) {
raw, err := c.DeleteRaw(key, false, true)
raw, err := c.RawDelete(key, false, true)
if err != nil {
return nil, err
@@ -30,7 +30,7 @@ func (c *Client) DeleteDir(key string) (*Response, error) {
return raw.toResponse()
}
func (c *Client) DeleteRaw(key string, recursive bool, dir bool) (*RawResponse, error) {
func (c *Client) RawDelete(key string, recursive bool, dir bool) (*RawResponse, error) {
ops := options{
"recursive": recursive,
"dir": dir,

View File

@@ -36,9 +36,13 @@ func newError(errorCode int, cause string, index uint64) *EtcdError {
}
func handleError(b []byte) error {
var err EtcdError
etcdErr := new(EtcdError)
json.Unmarshal(b, &err)
err := json.Unmarshal(b, etcdErr)
if err != nil {
logger.Warningf("cannot unmarshal etcd error: %v", err)
return err
}
return err
return etcdErr
}

View File

@@ -5,6 +5,26 @@ import (
"testing"
)
// cleanNode scrubs Expiration, ModifiedIndex and CreatedIndex of a node.
func cleanNode(n *Node) {
n.Expiration = nil
n.ModifiedIndex = 0
n.CreatedIndex = 0
}
// cleanResult scrubs a result object two levels deep of Expiration,
// ModifiedIndex and CreatedIndex.
func cleanResult(result *Response) {
// TODO(philips): make this recursive.
cleanNode(result.Node)
for i, _ := range result.Node.Nodes {
cleanNode(&result.Node.Nodes[i])
for j, _ := range result.Node.Nodes[i].Nodes {
cleanNode(&result.Node.Nodes[i].Nodes[j])
}
}
}
func TestGet(t *testing.T) {
c := NewClient(nil)
defer func() {
@@ -48,25 +68,18 @@ func TestGetAll(t *testing.T) {
expected := Nodes{
Node{
Key: "/fooDir/k0",
Value: "v0",
TTL: 5,
ModifiedIndex: 31,
CreatedIndex: 31,
Key: "/fooDir/k0",
Value: "v0",
TTL: 5,
},
Node{
Key: "/fooDir/k1",
Value: "v1",
TTL: 5,
ModifiedIndex: 32,
CreatedIndex: 32,
Key: "/fooDir/k1",
Value: "v1",
TTL: 5,
},
}
// do not check expiration time, too hard to fake
for i, _ := range result.Node.Nodes {
result.Node.Nodes[i].Expiration = nil
}
cleanResult(result)
if !reflect.DeepEqual(result.Node.Nodes, expected) {
t.Fatalf("(actual) %v != (expected) %v", result.Node.Nodes, expected)
@@ -79,16 +92,7 @@ func TestGetAll(t *testing.T) {
// Return kv-pairs in sorted order
result, err = c.Get("fooDir", true, true)
// do not check expiration time, too hard to fake
result.Node.Expiration = nil
for i, _ := range result.Node.Nodes {
result.Node.Nodes[i].Expiration = nil
if result.Node.Nodes[i].Nodes != nil {
for j, _ := range result.Node.Nodes[i].Nodes {
result.Node.Nodes[i].Nodes[j].Expiration = nil
}
}
}
cleanResult(result)
if err != nil {
t.Fatal(err)
@@ -100,33 +104,27 @@ func TestGetAll(t *testing.T) {
Dir: true,
Nodes: Nodes{
Node{
Key: "/fooDir/childDir/k2",
Value: "v2",
TTL: 5,
ModifiedIndex: 34,
CreatedIndex: 34,
Key: "/fooDir/childDir/k2",
Value: "v2",
TTL: 5,
},
},
TTL: 5,
ModifiedIndex: 33,
CreatedIndex: 33,
TTL: 5,
},
Node{
Key: "/fooDir/k0",
Value: "v0",
TTL: 5,
ModifiedIndex: 31,
CreatedIndex: 31,
Key: "/fooDir/k0",
Value: "v0",
TTL: 5,
},
Node{
Key: "/fooDir/k1",
Value: "v1",
TTL: 5,
ModifiedIndex: 32,
CreatedIndex: 32,
Key: "/fooDir/k1",
Value: "v1",
TTL: 5,
},
}
cleanResult(result)
if !reflect.DeepEqual(result.Node.Nodes, expected) {
t.Fatalf("(actual) %v != (expected) %v", result.Node.Nodes, expected)
}

View File

@@ -36,6 +36,8 @@ var (
VALID_DELETE_OPTIONS = validOptions{
"recursive": reflect.Bool,
"dir": reflect.Bool,
"prevValue": reflect.String,
"prevIndex": reflect.Uint64,
}
)

View File

@@ -13,9 +13,6 @@ import (
// get issues a GET request
func (c *Client) get(key string, options options) (*RawResponse, error) {
logger.Debugf("get %s [%s]", key, c.cluster.Leader)
p := keyToPath(key)
// If consistency level is set to STRONG, append
// the `consistent` query string.
if c.config.Consistency == STRONG_CONSISTENCY {
@@ -26,9 +23,8 @@ func (c *Client) get(key string, options options) (*RawResponse, error) {
if err != nil {
return nil, err
}
p += str
resp, err := c.sendRequest("GET", p, nil)
resp, err := c.sendKeyRequest("GET", key, str, nil)
if err != nil {
return nil, err
@@ -41,16 +37,12 @@ func (c *Client) get(key string, options options) (*RawResponse, error) {
func (c *Client) put(key string, value string, ttl uint64,
options options) (*RawResponse, error) {
logger.Debugf("put %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader)
p := keyToPath(key)
str, err := options.toParameters(VALID_PUT_OPTIONS)
if err != nil {
return nil, err
}
p += str
resp, err := c.sendRequest("PUT", p, buildValues(value, ttl))
resp, err := c.sendKeyRequest("PUT", key, str, buildValues(value, ttl))
if err != nil {
return nil, err
@@ -61,10 +53,7 @@ func (c *Client) put(key string, value string, ttl uint64,
// post issues a POST request
func (c *Client) post(key string, value string, ttl uint64) (*RawResponse, error) {
logger.Debugf("post %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader)
p := keyToPath(key)
resp, err := c.sendRequest("POST", p, buildValues(value, ttl))
resp, err := c.sendKeyRequest("POST", key, "", buildValues(value, ttl))
if err != nil {
return nil, err
@@ -75,16 +64,12 @@ func (c *Client) post(key string, value string, ttl uint64) (*RawResponse, error
// delete issues a DELETE request
func (c *Client) delete(key string, options options) (*RawResponse, error) {
logger.Debugf("delete %s [%s]", key, c.cluster.Leader)
p := keyToPath(key)
str, err := options.toParameters(VALID_DELETE_OPTIONS)
if err != nil {
return nil, err
}
p += str
resp, err := c.sendRequest("DELETE", p, nil)
resp, err := c.sendKeyRequest("DELETE", key, str, nil)
if err != nil {
return nil, err
@@ -93,8 +78,8 @@ func (c *Client) delete(key string, options options) (*RawResponse, error) {
return resp, nil
}
// sendRequest sends a HTTP request and returns a Response as defined by etcd
func (c *Client) sendRequest(method string, relativePath string,
// sendKeyRequest sends a HTTP request and returns a Response as defined by etcd
func (c *Client) sendKeyRequest(method string, key string, params string,
values url.Values) (*RawResponse, error) {
var req *http.Request
@@ -105,6 +90,11 @@ func (c *Client) sendRequest(method string, relativePath string,
trial := 0
logger.Debugf("%s %s %s [%s]", method, key, params, c.cluster.Leader)
// Build the request path if no prefix exists
relativePath := path.Join(c.keyPrefix, key) + params
// if we connect to a follower, we will retry until we found a leader
for {
trial++
@@ -146,7 +136,8 @@ func (c *Client) sendRequest(method string, relativePath string,
// network error, change a machine!
if resp, err = c.httpClient.Do(req); err != nil {
c.switchLeader(trial % len(c.cluster.Machines))
logger.Debug("network error: ", err.Error())
c.cluster.switchLeader(trial % len(c.cluster.Machines))
time.Sleep(time.Millisecond * 200)
continue
}
@@ -195,7 +186,7 @@ func (c *Client) handleResp(resp *http.Response) (bool, []byte) {
if err != nil {
logger.Warning(err)
} else {
c.updateLeader(u)
c.cluster.updateLeaderFromURL(u)
}
return false, nil
@@ -219,18 +210,14 @@ func (c *Client) handleResp(resp *http.Response) (bool, []byte) {
func (c *Client) getHttpPath(random bool, s ...string) string {
var machine string
if random {
machine = c.cluster.Machines[rand.Intn(len(c.cluster.Machines))]
} else {
machine = c.cluster.Leader
}
fullPath := machine + "/" + version
for _, seg := range s {
fullPath = fullPath + "/" + seg
}
return fullPath
return machine + "/" + strings.Join(s, "/")
}
// buildValues builds a url.Values map according to the given value and ttl
@@ -249,17 +236,14 @@ func buildValues(value string, ttl uint64) url.Values {
}
// convert key string to http path exclude version
// for example: key[foo] -> path[keys/foo]
// key[/] -> path[keys/]
// for example: key[foo] -> path[foo]
// key[] -> path[/]
func keyToPath(key string) string {
p := path.Join("keys", key)
clean := path.Clean(key)
// corner case: if key is "/" or "//" ect
// path join will clear the tailing "/"
// we need to add it back
if p == "keys" {
p = "keys/"
if clean == "" || clean == "." {
return "/"
}
return p
return clean
}

View File

@@ -0,0 +1,50 @@
package etcd
import (
"path"
"testing"
)
func testKey(t *testing.T, in, exp string) {
if keyToPath(in) != exp {
t.Errorf("Expected %s got %s", exp, keyToPath(in))
}
}
// TestKeyToPath ensures the key cleaning funciton keyToPath works in a number
// of cases.
func TestKeyToPath(t *testing.T) {
testKey(t, "", "/")
testKey(t, "/", "/")
testKey(t, "///", "/")
testKey(t, "hello/world/", "hello/world")
testKey(t, "///hello////world/../", "/hello")
}
func testPath(t *testing.T, c *Client, in, exp string) {
out := c.getHttpPath(false, in)
if out != exp {
t.Errorf("Expected %s got %s", exp, out)
}
}
// TestHttpPath ensures that the URLs generated make sense for the given keys
func TestHttpPath(t *testing.T) {
c := NewClient(nil)
testPath(t, c,
path.Join(c.keyPrefix, "hello") + "?prevInit=true",
"http://127.0.0.1:4001/v2/keys/hello?prevInit=true")
testPath(t, c,
path.Join(c.keyPrefix, "///hello///world") + "?prevInit=true",
"http://127.0.0.1:4001/v2/keys/hello/world?prevInit=true")
c = NewClient([]string{"https://discovery.etcd.io"})
c.SetKeyPrefix("")
testPath(t, c,
path.Join(c.keyPrefix, "hello") + "?prevInit=true",
"https://discovery.etcd.io/hello?prevInit=true")
}

View File

@@ -1,33 +0,0 @@
// Utility functions
package etcd
import (
"fmt"
"net/url"
"reflect"
)
// Convert options to a string of HTML parameters
func optionsToString(options options, vops validOptions) (string, error) {
p := "?"
v := url.Values{}
for opKey, opVal := range options {
// Check if the given option is valid (that it exists)
kind := vops[opKey]
if kind == reflect.Invalid {
return "", fmt.Errorf("Invalid option: %v", opKey)
}
// Check if the given option is of the valid type
t := reflect.TypeOf(opVal)
if kind != t.Kind() {
return "", fmt.Errorf("Option %s should be of %v kind, not of %v kind.",
opKey, kind, t.Kind())
}
v.Set(opKey, fmt.Sprintf("%v", opVal))
}
p += v.Encode()
return p, nil
}