diff --git a/config/config.go b/config/config.go index 633877394..58a97e1c1 100644 --- a/config/config.go +++ b/config/config.go @@ -64,6 +64,7 @@ type Config struct { MaxClusterSize int `toml:"max_cluster_size" env:"ETCD_MAX_CLUSTER_SIZE"` MaxResultBuffer int `toml:"max_result_buffer" env:"ETCD_MAX_RESULT_BUFFER"` MaxRetryAttempts int `toml:"max_retry_attempts" env:"ETCD_MAX_RETRY_ATTEMPTS"` + RetryInterval float64 `toml:"retry_interval" env:"ETCD_RETRY_INTERVAL"` Name string `toml:"name" env:"ETCD_NAME"` Snapshot bool `toml:"snapshot" env:"ETCD_SNAPSHOT"` SnapshotCount int `toml:"snapshot_count" env:"ETCD_SNAPSHOTCOUNT"` @@ -93,6 +94,7 @@ func New() *Config { c.MaxClusterSize = 9 c.MaxResultBuffer = 1024 c.MaxRetryAttempts = 3 + c.RetryInterval = 10.0 c.Snapshot = true c.SnapshotCount = 10000 c.Peer.Addr = "127.0.0.1:7001" @@ -282,6 +284,7 @@ func (c *Config) LoadFlags(arguments []string) error { f.StringVar(&c.DataDir, "data-dir", c.DataDir, "") f.IntVar(&c.MaxResultBuffer, "max-result-buffer", c.MaxResultBuffer, "") f.IntVar(&c.MaxRetryAttempts, "max-retry-attempts", c.MaxRetryAttempts, "") + f.Float64Var(&c.RetryInterval, "retry-interval", c.RetryInterval, "") f.IntVar(&c.MaxClusterSize, "max-cluster-size", c.MaxClusterSize, "") f.IntVar(&c.Peer.HeartbeatTimeout, "peer-heartbeat-timeout", c.Peer.HeartbeatTimeout, "") f.IntVar(&c.Peer.ElectionTimeout, "peer-election-timeout", c.Peer.ElectionTimeout, "") @@ -393,24 +396,16 @@ func (c *Config) Reset() error { // Sanitize cleans the input fields. func (c *Config) Sanitize() error { - tlsConfig, err := c.TLSConfig() - if err != nil { - return err - } - - peerTlsConfig, err := c.PeerTLSConfig() - if err != nil { - return err - } + var err error // Sanitize the URLs first. - if c.Addr, err = sanitizeURL(c.Addr, tlsConfig.Scheme); err != nil { + if c.Addr, err = sanitizeURL(c.Addr, c.EtcdTLSInfo().Scheme()); err != nil { return fmt.Errorf("Advertised URL: %s", err) } if c.BindAddr, err = sanitizeBindAddr(c.BindAddr, c.Addr); err != nil { return fmt.Errorf("Listen Host: %s", err) } - if c.Peer.Addr, err = sanitizeURL(c.Peer.Addr, peerTlsConfig.Scheme); err != nil { + if c.Peer.Addr, err = sanitizeURL(c.Peer.Addr, c.PeerTLSInfo().Scheme()); err != nil { return fmt.Errorf("Peer Advertised URL: %s", err) } if c.Peer.BindAddr, err = sanitizeBindAddr(c.Peer.BindAddr, c.Peer.Addr); err != nil { @@ -430,34 +425,24 @@ func (c *Config) Sanitize() error { return nil } -// TLSInfo retrieves a TLSInfo object for the client server. -func (c *Config) TLSInfo() server.TLSInfo { +// EtcdTLSInfo retrieves a TLSInfo object for the etcd server +func (c *Config) EtcdTLSInfo() server.TLSInfo { return server.TLSInfo{ - CAFile: c.CAFile, - CertFile: c.CertFile, - KeyFile: c.KeyFile, + CAFile: c.CAFile, + CertFile: c.CertFile, + KeyFile: c.KeyFile, } } -// ClientTLSConfig generates the TLS configuration for the client server. -func (c *Config) TLSConfig() (server.TLSConfig, error) { - return c.TLSInfo().Config() -} - -// PeerTLSInfo retrieves a TLSInfo object for the peer server. +// PeerRaftInfo retrieves a TLSInfo object for the peer server. func (c *Config) PeerTLSInfo() server.TLSInfo { return server.TLSInfo{ - CAFile: c.Peer.CAFile, - CertFile: c.Peer.CertFile, - KeyFile: c.Peer.KeyFile, + CAFile: c.Peer.CAFile, + CertFile: c.Peer.CertFile, + KeyFile: c.Peer.KeyFile, } } -// PeerTLSConfig generates the TLS configuration for the peer server. -func (c *Config) PeerTLSConfig() (server.TLSConfig, error) { - return c.PeerTLSInfo().Config() -} - // MetricsBucketName generates the name that should be used for a // corresponding MetricsBucket object func (c *Config) MetricsBucketName() string { diff --git a/config/config_test.go b/config/config_test.go index 96408bab0..d006e4d48 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -113,6 +113,9 @@ func TestConfigEnv(t *testing.T) { assert.Equal(t, c.Peer.CertFile, "/tmp/peer/file.cert", "") assert.Equal(t, c.Peer.KeyFile, "/tmp/peer/file.key", "") assert.Equal(t, c.Peer.BindAddr, "127.0.0.1:7003", "") + + // Clear this as it will mess up other tests + os.Setenv("ETCD_DISCOVERY", "") } // Ensures that the "help" flag can be parsed. diff --git a/etcd.go b/etcd.go index c63f71fb6..c66df13ef 100644 --- a/etcd.go +++ b/etcd.go @@ -79,16 +79,6 @@ func main() { log.Warnf("All cached configuration is now ignored. The file %s can be removed.", info) } - // Retrieve TLS configuration. - tlsConfig, err := config.TLSInfo().Config() - if err != nil { - log.Fatal("Client TLS:", err) - } - peerTLSConfig, err := config.PeerTLSInfo().Config() - if err != nil { - log.Fatal("Peer TLS:", err) - } - var mbName string if config.Trace() { mbName = config.MetricsBucketName() @@ -124,31 +114,44 @@ func main() { dialTimeout := (3 * heartbeatTimeout) + electionTimeout responseHeaderTimeout := (3 * heartbeatTimeout) + electionTimeout - // Create peer server. + // Create peer server psConfig := server.PeerServerConfig{ Name: config.Name, - Scheme: peerTLSConfig.Scheme, + Scheme: config.PeerTLSInfo().Scheme(), URL: config.Peer.Addr, SnapshotCount: config.SnapshotCount, MaxClusterSize: config.MaxClusterSize, RetryTimes: config.MaxRetryAttempts, + RetryInterval: config.RetryInterval, } ps := server.NewPeerServer(psConfig, registry, store, &mb, followersStats, serverStats) var psListener net.Listener if psConfig.Scheme == "https" { - psListener, err = server.NewTLSListener(&tlsConfig.Server, config.Peer.BindAddr, config.PeerTLSInfo().CertFile, config.PeerTLSInfo().KeyFile) + peerServerTLSConfig, err := config.PeerTLSInfo().ServerConfig() + if err != nil { + log.Fatal("peer server TLS error: ", err) + } + + psListener, err = server.NewTLSListener(config.Peer.BindAddr, peerServerTLSConfig) + if err != nil { + log.Fatal("Failed to create peer listener: ", err) + } } else { psListener, err = server.NewListener(config.Peer.BindAddr) - } - if err != nil { - panic(err) + if err != nil { + log.Fatal("Failed to create peer listener: ", err) + } } - // Create Raft transporter and server + // Create raft transporter and server raftTransporter := server.NewTransporter(followersStats, serverStats, registry, heartbeatTimeout, dialTimeout, responseHeaderTimeout) if psConfig.Scheme == "https" { - raftTransporter.SetTLSConfig(peerTLSConfig.Client) + raftClientTLSConfig, err := config.PeerTLSInfo().ClientConfig() + if err != nil { + log.Fatal("raft client TLS error: ", err) + } + raftTransporter.SetTLSConfig(*raftClientTLSConfig) } raftServer, err := raft.NewServer(config.Name, config.DataDir, raftTransporter, store, ps, "") if err != nil { @@ -158,7 +161,7 @@ func main() { raftServer.SetHeartbeatTimeout(heartbeatTimeout) ps.SetRaftServer(raftServer) - // Create client server. + // Create etcd server s := server.New(config.Name, config.Addr, ps, registry, store, &mb) if config.Trace() { @@ -166,22 +169,28 @@ func main() { } var sListener net.Listener - if tlsConfig.Scheme == "https" { - sListener, err = server.NewTLSListener(&tlsConfig.Server, config.BindAddr, config.TLSInfo().CertFile, config.TLSInfo().KeyFile) + if config.EtcdTLSInfo().Scheme() == "https" { + etcdServerTLSConfig, err := config.EtcdTLSInfo().ServerConfig() + if err != nil { + log.Fatal("etcd TLS error: ", err) + } + + sListener, err = server.NewTLSListener(config.BindAddr, etcdServerTLSConfig) + if err != nil { + log.Fatal("Failed to create TLS etcd listener: ", err) + } } else { sListener, err = server.NewListener(config.BindAddr) - } - if err != nil { - panic(err) + if err != nil { + log.Fatal("Failed to create etcd listener: ", err) + } } ps.SetServer(s) - ps.Start(config.Snapshot, config.Peers) - // Run peer server in separate thread while the client server blocks. go func() { - log.Infof("raft server [name %s, listen on %s, advertised url %s]", ps.Config.Name, psListener.Addr(), ps.Config.URL) + log.Infof("peer server [name %s, listen on %s, advertised url %s]", ps.Config.Name, psListener.Addr(), ps.Config.URL) sHTTP := &ehttp.CORSHandler{ps.HTTPHandler(), corsInfo} log.Fatal(http.Serve(psListener, sHTTP)) }() diff --git a/scripts/test-cluster b/scripts/test-cluster index 29510a6bc..7239a2149 100755 --- a/scripts/test-cluster +++ b/scripts/test-cluster @@ -17,7 +17,7 @@ tmux split-window -h tmux select-pane -t 0 tmux send-keys "${DIR}/../bin/etcd -peer-addr 127.0.0.1:7001 -addr 127.0.0.1:4001 -data-dir peer1 -name peer1 ${peer_args}" C-m -if [ -n "${peer_args}" ]; then +if [ -z "${peer_args}" ]; then peer_args="-peers 127.0.0.1:7001" fi diff --git a/server/listener.go b/server/listener.go index f007f0cb3..93527d66c 100644 --- a/server/listener.go +++ b/server/listener.go @@ -16,28 +16,15 @@ func NewListener(addr string) (net.Listener, error) { return l, nil } -func NewTLSListener(config *tls.Config, addr, certFile, keyFile string) (net.Listener, error) { +func NewTLSListener(addr string, cfg *tls.Config) (net.Listener, error) { if addr == "" { addr = ":https" } - if config == nil { - config = &tls.Config{} - } - - config.NextProtos = []string{"http/1.1"} - - var err error - config.Certificates = make([]tls.Certificate, 1) - config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile) - if err != nil { - return nil, err - } - conn, err := net.Listen("tcp", addr) if err != nil { return nil, err } - return tls.NewListener(conn, config), nil + return tls.NewListener(conn, cfg), nil } diff --git a/server/peer_server.go b/server/peer_server.go index 38b0e1f02..1f400b611 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -20,17 +20,16 @@ import ( "github.com/coreos/etcd/store" ) -const retryInterval = 10 - const ThresholdMonitorTimeout = 5 * time.Second type PeerServerConfig struct { - Name string - Scheme string - URL string - SnapshotCount int - MaxClusterSize int - RetryTimes int + Name string + Scheme string + URL string + SnapshotCount int + MaxClusterSize int + RetryTimes int + RetryInterval float64 } type PeerServer struct { @@ -159,6 +158,7 @@ func (s *PeerServer) Stop() { close(s.closeChan) s.closeChan = nil } + s.raftServer.Stop() } func (s *PeerServer) HTTPHandler() http.Handler { @@ -209,8 +209,8 @@ func (s *PeerServer) startAsFollower(cluster []string) { if ok { return } - log.Warnf("Unable to join the cluster using any of the peers %v. Retrying in %d seconds", cluster, retryInterval) - time.Sleep(time.Second * retryInterval) + log.Warnf("Unable to join the cluster using any of the peers %v. Retrying in %.1f seconds", cluster, s.Config.RetryInterval) + time.Sleep(time.Second * time.Duration(s.Config.RetryInterval)) } log.Fatalf("Cannot join the cluster via given peers after %x retries", s.Config.RetryTimes) diff --git a/server/tls_config.go b/server/tls_config.go deleted file mode 100644 index 733f9c082..000000000 --- a/server/tls_config.go +++ /dev/null @@ -1,12 +0,0 @@ -package server - -import ( - "crypto/tls" -) - -// TLSConfig holds the TLS configuration. -type TLSConfig struct { - Scheme string // http or https - Server tls.Config // Used by the Raft or etcd Server transporter. - Client tls.Config // Used by the Raft peer client. -} diff --git a/server/tls_info.go b/server/tls_info.go index 6b16db013..bc2d1099b 100644 --- a/server/tls_info.go +++ b/server/tls_info.go @@ -15,62 +15,88 @@ type TLSInfo struct { CAFile string `json:"CAFile"` } -// Generates a TLS configuration from the given files. -func (info TLSInfo) Config() (TLSConfig, error) { - var t TLSConfig - t.Scheme = "http" - - // If the user do not specify key file, cert file and CA file, the type will be HTTP - if info.KeyFile == "" && info.CertFile == "" && info.CAFile == "" { - return t, nil +func (info TLSInfo) Scheme() string { + if info.KeyFile != "" && info.CertFile != "" { + return "https" + } else { + return "http" } +} +// Generates a tls.Config object for a server from the given files. +func (info TLSInfo) ServerConfig() (*tls.Config, error) { // Both the key and cert must be present. if info.KeyFile == "" || info.CertFile == "" { - return t, fmt.Errorf("KeyFile and CertFile must both be present[key: %v, cert: %v]", info.KeyFile, info.CertFile) + return nil, fmt.Errorf("KeyFile and CertFile must both be present[key: %v, cert: %v]", info.KeyFile, info.CertFile) + } + + var cfg tls.Config + + tlsCert, err := tls.LoadX509KeyPair(info.CertFile, info.KeyFile) + if err != nil { + return nil, err + } + + cfg.Certificates = []tls.Certificate{tlsCert} + + if info.CAFile != "" { + cfg.ClientAuth = tls.RequireAndVerifyClientCert + cp, err := newCertPool(info.CAFile) + if err != nil { + return nil, err + } + + cfg.RootCAs = cp + cfg.ClientCAs = cp + } else { + cfg.ClientAuth = tls.NoClientCert + } + + return &cfg, nil +} + +// Generates a tls.Config object for a client from the given files. +func (info TLSInfo) ClientConfig() (*tls.Config, error) { + var cfg tls.Config + + if info.KeyFile == "" || info.CertFile == "" { + return &cfg, nil } tlsCert, err := tls.LoadX509KeyPair(info.CertFile, info.KeyFile) if err != nil { - return t, err + return nil, err } - t.Scheme = "https" - t.Server.ClientAuth, t.Server.ClientCAs, err = newCertPool(info.CAFile) - if err != nil { - return t, err + cfg.Certificates = []tls.Certificate{tlsCert} + + if info.CAFile != "" { + cp, err := newCertPool(info.CAFile) + if err != nil { + return nil, err + } + + cfg.RootCAs = cp } - // The client should trust the RootCA that the Server uses since - // everyone is a peer in the network. - t.Client.Certificates = []tls.Certificate{tlsCert} - t.Client.RootCAs = t.Server.ClientCAs - - return t, nil + return &cfg, nil } -// newCertPool creates x509 certPool and corresponding Auth Type. -// If the given CAfile is valid, add the cert into the pool and verify the clients' -// certs against the cert in the pool. -// If the given CAfile is empty, do not verify the clients' cert. -// If the given CAfile is not valid, fatal. -func newCertPool(CAFile string) (tls.ClientAuthType, *x509.CertPool, error) { - if CAFile == "" { - return tls.NoClientCert, nil, nil - } +// newCertPool creates x509 certPool with provided CA file +func newCertPool(CAFile string) (*x509.CertPool, error) { pemByte, err := ioutil.ReadFile(CAFile) if err != nil { - return 0, nil, err + return nil, err } block, pemByte := pem.Decode(pemByte) cert, err := x509.ParseCertificate(block.Bytes) if err != nil { - return 0, nil, err + return nil, err } certPool := x509.NewCertPool() certPool.AddCert(cert) - return tls.RequireAndVerifyClientCert, certPool, nil + return certPool, nil } diff --git a/server/usage.go b/server/usage.go index 4e47512c5..bf55dcc67 100644 --- a/server/usage.go +++ b/server/usage.go @@ -52,6 +52,7 @@ Peer Communication Options: Other Options: -max-result-buffer Max size of the result buffer. -max-retry-attempts Number of times a node will try to join a cluster. + -retry-interval Seconds to wait between cluster join retry attempts. -max-cluster-size Maximum number of nodes in the cluster. -snapshot=false Disable log snapshots -snapshot-count Number of transactions before issuing a snapshot. diff --git a/test.sh b/test.sh index 6f5cee203..0dbc2cd38 100755 --- a/test.sh +++ b/test.sh @@ -8,6 +8,9 @@ go test -v ./store go test -i ./server go test -v ./server +go test -i ./config +go test -v ./config + go test -i ./server/v2/tests go test -v ./server/v2/tests diff --git a/tests/functional/discovery_test.go b/tests/functional/discovery_test.go index 9ffa60b2b..2bff288dd 100644 --- a/tests/functional/discovery_test.go +++ b/tests/functional/discovery_test.go @@ -24,7 +24,6 @@ type garbageHandler struct { func (g *garbageHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, "Hello, client") - println("HI") if r.URL.String() != "/v2/keys/_etcd/registry/1/node1" { g.t.Fatalf("Unexpected web request") } @@ -144,7 +143,7 @@ func TestDiscoverySecondPeerFirstNoResponse(t *testing.T) { resp, err = etcdtest.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/_etcd/registry/2/ETCDTEST"), v) assert.Equal(t, resp.StatusCode, http.StatusCreated) - proc, err := startServer([]string{"-discovery", s.URL() + "/v2/keys/_etcd/registry/2"}) + proc, err := startServer([]string{"-retry-interval", "0.2", "-discovery", s.URL() + "/v2/keys/_etcd/registry/2"}) if err != nil { t.Fatal(err.Error()) } @@ -152,7 +151,7 @@ func TestDiscoverySecondPeerFirstNoResponse(t *testing.T) { // TODO(bp): etcd will take 30 seconds to shutdown, figure this // out instead - time.Sleep(35 * time.Second) + time.Sleep(1 * time.Second) client := http.Client{} _, err = client.Get("/") @@ -177,17 +176,12 @@ func TestDiscoverySecondPeerUp(t *testing.T) { } wc := goetcd.NewClient([]string{s.URL()}) - _, err = wc.Set("test", "0", 0) + testResp, err := wc.Set("test", "0", 0) if err != nil { t.Fatalf("Couldn't set a test key on the leader %v", err) } - receiver := make(chan *goetcd.Response) - stop := make(chan bool) - - go wc.Watch("_etcd/registry/3/node1", 0, false, receiver, stop) - v = url.Values{} v.Set("value", u) resp, err = etcdtest.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/_etcd/registry/3/ETCDTEST"), v) @@ -199,10 +193,10 @@ func TestDiscoverySecondPeerUp(t *testing.T) { } defer stopServer(proc) - // Test to ensure the machine registered iteslf - watchResp := <-receiver - if watchResp.Node.Value != "http://127.0.0.1:7001" { - t.Fatalf("Second peer didn't register! %s", watchResp.Node.Value) + watch := fmt.Sprintf("%s%s%d", s.URL(), "/v2/keys/_etcd/registry/3/node1?wait=true&waitIndex=", testResp.EtcdIndex) + resp, err = http.Get(watch) + if err != nil { + t.Fatal(err.Error()) } // TODO(bp): need to have a better way of knowing a machine is up diff --git a/tests/functional/etcd_tls_test.go b/tests/functional/etcd_tls_test.go index 493dd7bd7..2f24cc74a 100644 --- a/tests/functional/etcd_tls_test.go +++ b/tests/functional/etcd_tls_test.go @@ -162,6 +162,8 @@ func startServer(extra []string) (*os.Process, error) { cmd := []string{"etcd", "-f", "-data-dir=/tmp/node1", "-name=node1"} cmd = append(cmd, extra...) + println(strings.Join(cmd, " ")) + return os.StartProcess(EtcdBinPath, cmd, procAttr) } diff --git a/tests/functional/v1_migration_test.go b/tests/functional/v1_migration_test.go index 487af436a..b698fb103 100644 --- a/tests/functional/v1_migration_test.go +++ b/tests/functional/v1_migration_test.go @@ -37,6 +37,7 @@ func TestV1SoloMigration(t *testing.T) { args := []string{"etcd", fmt.Sprintf("-data-dir=%s", nodepath)} args = append(args, "-addr", "127.0.0.1:4001") args = append(args, "-peer-addr", "127.0.0.1:7001") + args = append(args, "-name", "v1") process, err := os.StartProcess(EtcdBinPath, args, procAttr) if err != nil { t.Fatal("start process failed:" + err.Error()) @@ -79,6 +80,7 @@ func TestV1ClusterMigration(t *testing.T) { args := []string{"etcd", fmt.Sprintf("-data-dir=%s", nodepath)} args = append(args, "-addr", fmt.Sprintf("127.0.0.1:%d", 4001+i)) args = append(args, "-peer-addr", fmt.Sprintf("127.0.0.1:%d", 7001+i)) + args = append(args, "-name", node) process, err := os.StartProcess(EtcdBinPath, args, procAttr) if err != nil { t.Fatal("start process failed:" + err.Error()) diff --git a/tests/server_utils.go b/tests/server_utils.go index 8c1f360f7..63dc9a3f0 100644 --- a/tests/server_utils.go +++ b/tests/server_utils.go @@ -4,6 +4,7 @@ import ( "io/ioutil" "net/http" "os" + "sync" "time" "github.com/coreos/etcd/third_party/github.com/coreos/raft" @@ -69,19 +70,23 @@ func RunServer(f func(*server.Server)) { ps.SetServer(s) + w := &sync.WaitGroup{} + // Start up peer server. c := make(chan bool) go func() { c <- true ps.Start(false, []string{}) - http.Serve(psListener, ps.HTTPHandler()) + h := waitHandler{w, ps.HTTPHandler()} + http.Serve(psListener, &h) }() <-c // Start up etcd server. go func() { c <- true - http.Serve(sListener, s.HTTPHandler()) + h := waitHandler{w, s.HTTPHandler()} + http.Serve(sListener, &h) }() <-c @@ -95,4 +100,20 @@ func RunServer(f func(*server.Server)) { ps.Stop() psListener.Close() sListener.Close() + w.Wait() +} + +type waitHandler struct { + wg *sync.WaitGroup + handler http.Handler +} + +func (h *waitHandler) ServeHTTP(w http.ResponseWriter, r *http.Request){ + h.wg.Add(1) + defer h.wg.Done() + h.handler.ServeHTTP(w, r) + + //important to flush before decrementing the wait group. + //we won't get a chance to once main() ends. + w.(http.Flusher).Flush() }