mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
@@ -64,6 +64,7 @@ type Config struct {
|
||||
MaxClusterSize int `toml:"max_cluster_size" env:"ETCD_MAX_CLUSTER_SIZE"`
|
||||
MaxResultBuffer int `toml:"max_result_buffer" env:"ETCD_MAX_RESULT_BUFFER"`
|
||||
MaxRetryAttempts int `toml:"max_retry_attempts" env:"ETCD_MAX_RETRY_ATTEMPTS"`
|
||||
RetryInterval float64 `toml:"retry_interval" env:"ETCD_RETRY_INTERVAL"`
|
||||
Name string `toml:"name" env:"ETCD_NAME"`
|
||||
Snapshot bool `toml:"snapshot" env:"ETCD_SNAPSHOT"`
|
||||
SnapshotCount int `toml:"snapshot_count" env:"ETCD_SNAPSHOTCOUNT"`
|
||||
@@ -93,6 +94,7 @@ func New() *Config {
|
||||
c.MaxClusterSize = 9
|
||||
c.MaxResultBuffer = 1024
|
||||
c.MaxRetryAttempts = 3
|
||||
c.RetryInterval = 10.0
|
||||
c.Snapshot = true
|
||||
c.SnapshotCount = 10000
|
||||
c.Peer.Addr = "127.0.0.1:7001"
|
||||
@@ -282,6 +284,7 @@ func (c *Config) LoadFlags(arguments []string) error {
|
||||
f.StringVar(&c.DataDir, "data-dir", c.DataDir, "")
|
||||
f.IntVar(&c.MaxResultBuffer, "max-result-buffer", c.MaxResultBuffer, "")
|
||||
f.IntVar(&c.MaxRetryAttempts, "max-retry-attempts", c.MaxRetryAttempts, "")
|
||||
f.Float64Var(&c.RetryInterval, "retry-interval", c.RetryInterval, "")
|
||||
f.IntVar(&c.MaxClusterSize, "max-cluster-size", c.MaxClusterSize, "")
|
||||
f.IntVar(&c.Peer.HeartbeatTimeout, "peer-heartbeat-timeout", c.Peer.HeartbeatTimeout, "")
|
||||
f.IntVar(&c.Peer.ElectionTimeout, "peer-election-timeout", c.Peer.ElectionTimeout, "")
|
||||
@@ -393,24 +396,16 @@ func (c *Config) Reset() error {
|
||||
|
||||
// Sanitize cleans the input fields.
|
||||
func (c *Config) Sanitize() error {
|
||||
tlsConfig, err := c.TLSConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
peerTlsConfig, err := c.PeerTLSConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var err error
|
||||
|
||||
// Sanitize the URLs first.
|
||||
if c.Addr, err = sanitizeURL(c.Addr, tlsConfig.Scheme); err != nil {
|
||||
if c.Addr, err = sanitizeURL(c.Addr, c.EtcdTLSInfo().Scheme()); err != nil {
|
||||
return fmt.Errorf("Advertised URL: %s", err)
|
||||
}
|
||||
if c.BindAddr, err = sanitizeBindAddr(c.BindAddr, c.Addr); err != nil {
|
||||
return fmt.Errorf("Listen Host: %s", err)
|
||||
}
|
||||
if c.Peer.Addr, err = sanitizeURL(c.Peer.Addr, peerTlsConfig.Scheme); err != nil {
|
||||
if c.Peer.Addr, err = sanitizeURL(c.Peer.Addr, c.PeerTLSInfo().Scheme()); err != nil {
|
||||
return fmt.Errorf("Peer Advertised URL: %s", err)
|
||||
}
|
||||
if c.Peer.BindAddr, err = sanitizeBindAddr(c.Peer.BindAddr, c.Peer.Addr); err != nil {
|
||||
@@ -430,34 +425,24 @@ func (c *Config) Sanitize() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// TLSInfo retrieves a TLSInfo object for the client server.
|
||||
func (c *Config) TLSInfo() server.TLSInfo {
|
||||
// EtcdTLSInfo retrieves a TLSInfo object for the etcd server
|
||||
func (c *Config) EtcdTLSInfo() server.TLSInfo {
|
||||
return server.TLSInfo{
|
||||
CAFile: c.CAFile,
|
||||
CertFile: c.CertFile,
|
||||
KeyFile: c.KeyFile,
|
||||
CAFile: c.CAFile,
|
||||
CertFile: c.CertFile,
|
||||
KeyFile: c.KeyFile,
|
||||
}
|
||||
}
|
||||
|
||||
// ClientTLSConfig generates the TLS configuration for the client server.
|
||||
func (c *Config) TLSConfig() (server.TLSConfig, error) {
|
||||
return c.TLSInfo().Config()
|
||||
}
|
||||
|
||||
// PeerTLSInfo retrieves a TLSInfo object for the peer server.
|
||||
// PeerRaftInfo retrieves a TLSInfo object for the peer server.
|
||||
func (c *Config) PeerTLSInfo() server.TLSInfo {
|
||||
return server.TLSInfo{
|
||||
CAFile: c.Peer.CAFile,
|
||||
CertFile: c.Peer.CertFile,
|
||||
KeyFile: c.Peer.KeyFile,
|
||||
CAFile: c.Peer.CAFile,
|
||||
CertFile: c.Peer.CertFile,
|
||||
KeyFile: c.Peer.KeyFile,
|
||||
}
|
||||
}
|
||||
|
||||
// PeerTLSConfig generates the TLS configuration for the peer server.
|
||||
func (c *Config) PeerTLSConfig() (server.TLSConfig, error) {
|
||||
return c.PeerTLSInfo().Config()
|
||||
}
|
||||
|
||||
// MetricsBucketName generates the name that should be used for a
|
||||
// corresponding MetricsBucket object
|
||||
func (c *Config) MetricsBucketName() string {
|
||||
|
||||
@@ -113,6 +113,9 @@ func TestConfigEnv(t *testing.T) {
|
||||
assert.Equal(t, c.Peer.CertFile, "/tmp/peer/file.cert", "")
|
||||
assert.Equal(t, c.Peer.KeyFile, "/tmp/peer/file.key", "")
|
||||
assert.Equal(t, c.Peer.BindAddr, "127.0.0.1:7003", "")
|
||||
|
||||
// Clear this as it will mess up other tests
|
||||
os.Setenv("ETCD_DISCOVERY", "")
|
||||
}
|
||||
|
||||
// Ensures that the "help" flag can be parsed.
|
||||
|
||||
63
etcd.go
63
etcd.go
@@ -79,16 +79,6 @@ func main() {
|
||||
log.Warnf("All cached configuration is now ignored. The file %s can be removed.", info)
|
||||
}
|
||||
|
||||
// Retrieve TLS configuration.
|
||||
tlsConfig, err := config.TLSInfo().Config()
|
||||
if err != nil {
|
||||
log.Fatal("Client TLS:", err)
|
||||
}
|
||||
peerTLSConfig, err := config.PeerTLSInfo().Config()
|
||||
if err != nil {
|
||||
log.Fatal("Peer TLS:", err)
|
||||
}
|
||||
|
||||
var mbName string
|
||||
if config.Trace() {
|
||||
mbName = config.MetricsBucketName()
|
||||
@@ -124,31 +114,44 @@ func main() {
|
||||
dialTimeout := (3 * heartbeatTimeout) + electionTimeout
|
||||
responseHeaderTimeout := (3 * heartbeatTimeout) + electionTimeout
|
||||
|
||||
// Create peer server.
|
||||
// Create peer server
|
||||
psConfig := server.PeerServerConfig{
|
||||
Name: config.Name,
|
||||
Scheme: peerTLSConfig.Scheme,
|
||||
Scheme: config.PeerTLSInfo().Scheme(),
|
||||
URL: config.Peer.Addr,
|
||||
SnapshotCount: config.SnapshotCount,
|
||||
MaxClusterSize: config.MaxClusterSize,
|
||||
RetryTimes: config.MaxRetryAttempts,
|
||||
RetryInterval: config.RetryInterval,
|
||||
}
|
||||
ps := server.NewPeerServer(psConfig, registry, store, &mb, followersStats, serverStats)
|
||||
|
||||
var psListener net.Listener
|
||||
if psConfig.Scheme == "https" {
|
||||
psListener, err = server.NewTLSListener(&tlsConfig.Server, config.Peer.BindAddr, config.PeerTLSInfo().CertFile, config.PeerTLSInfo().KeyFile)
|
||||
peerServerTLSConfig, err := config.PeerTLSInfo().ServerConfig()
|
||||
if err != nil {
|
||||
log.Fatal("peer server TLS error: ", err)
|
||||
}
|
||||
|
||||
psListener, err = server.NewTLSListener(config.Peer.BindAddr, peerServerTLSConfig)
|
||||
if err != nil {
|
||||
log.Fatal("Failed to create peer listener: ", err)
|
||||
}
|
||||
} else {
|
||||
psListener, err = server.NewListener(config.Peer.BindAddr)
|
||||
}
|
||||
if err != nil {
|
||||
panic(err)
|
||||
if err != nil {
|
||||
log.Fatal("Failed to create peer listener: ", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Create Raft transporter and server
|
||||
// Create raft transporter and server
|
||||
raftTransporter := server.NewTransporter(followersStats, serverStats, registry, heartbeatTimeout, dialTimeout, responseHeaderTimeout)
|
||||
if psConfig.Scheme == "https" {
|
||||
raftTransporter.SetTLSConfig(peerTLSConfig.Client)
|
||||
raftClientTLSConfig, err := config.PeerTLSInfo().ClientConfig()
|
||||
if err != nil {
|
||||
log.Fatal("raft client TLS error: ", err)
|
||||
}
|
||||
raftTransporter.SetTLSConfig(*raftClientTLSConfig)
|
||||
}
|
||||
raftServer, err := raft.NewServer(config.Name, config.DataDir, raftTransporter, store, ps, "")
|
||||
if err != nil {
|
||||
@@ -158,7 +161,7 @@ func main() {
|
||||
raftServer.SetHeartbeatTimeout(heartbeatTimeout)
|
||||
ps.SetRaftServer(raftServer)
|
||||
|
||||
// Create client server.
|
||||
// Create etcd server
|
||||
s := server.New(config.Name, config.Addr, ps, registry, store, &mb)
|
||||
|
||||
if config.Trace() {
|
||||
@@ -166,22 +169,28 @@ func main() {
|
||||
}
|
||||
|
||||
var sListener net.Listener
|
||||
if tlsConfig.Scheme == "https" {
|
||||
sListener, err = server.NewTLSListener(&tlsConfig.Server, config.BindAddr, config.TLSInfo().CertFile, config.TLSInfo().KeyFile)
|
||||
if config.EtcdTLSInfo().Scheme() == "https" {
|
||||
etcdServerTLSConfig, err := config.EtcdTLSInfo().ServerConfig()
|
||||
if err != nil {
|
||||
log.Fatal("etcd TLS error: ", err)
|
||||
}
|
||||
|
||||
sListener, err = server.NewTLSListener(config.BindAddr, etcdServerTLSConfig)
|
||||
if err != nil {
|
||||
log.Fatal("Failed to create TLS etcd listener: ", err)
|
||||
}
|
||||
} else {
|
||||
sListener, err = server.NewListener(config.BindAddr)
|
||||
}
|
||||
if err != nil {
|
||||
panic(err)
|
||||
if err != nil {
|
||||
log.Fatal("Failed to create etcd listener: ", err)
|
||||
}
|
||||
}
|
||||
|
||||
ps.SetServer(s)
|
||||
|
||||
ps.Start(config.Snapshot, config.Peers)
|
||||
|
||||
// Run peer server in separate thread while the client server blocks.
|
||||
go func() {
|
||||
log.Infof("raft server [name %s, listen on %s, advertised url %s]", ps.Config.Name, psListener.Addr(), ps.Config.URL)
|
||||
log.Infof("peer server [name %s, listen on %s, advertised url %s]", ps.Config.Name, psListener.Addr(), ps.Config.URL)
|
||||
sHTTP := &ehttp.CORSHandler{ps.HTTPHandler(), corsInfo}
|
||||
log.Fatal(http.Serve(psListener, sHTTP))
|
||||
}()
|
||||
|
||||
@@ -17,7 +17,7 @@ tmux split-window -h
|
||||
tmux select-pane -t 0
|
||||
tmux send-keys "${DIR}/../bin/etcd -peer-addr 127.0.0.1:7001 -addr 127.0.0.1:4001 -data-dir peer1 -name peer1 ${peer_args}" C-m
|
||||
|
||||
if [ -n "${peer_args}" ]; then
|
||||
if [ -z "${peer_args}" ]; then
|
||||
peer_args="-peers 127.0.0.1:7001"
|
||||
fi
|
||||
|
||||
|
||||
@@ -16,28 +16,15 @@ func NewListener(addr string) (net.Listener, error) {
|
||||
return l, nil
|
||||
}
|
||||
|
||||
func NewTLSListener(config *tls.Config, addr, certFile, keyFile string) (net.Listener, error) {
|
||||
func NewTLSListener(addr string, cfg *tls.Config) (net.Listener, error) {
|
||||
if addr == "" {
|
||||
addr = ":https"
|
||||
}
|
||||
|
||||
if config == nil {
|
||||
config = &tls.Config{}
|
||||
}
|
||||
|
||||
config.NextProtos = []string{"http/1.1"}
|
||||
|
||||
var err error
|
||||
config.Certificates = make([]tls.Certificate, 1)
|
||||
config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conn, err := net.Listen("tcp", addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return tls.NewListener(conn, config), nil
|
||||
return tls.NewListener(conn, cfg), nil
|
||||
}
|
||||
|
||||
@@ -20,17 +20,16 @@ import (
|
||||
"github.com/coreos/etcd/store"
|
||||
)
|
||||
|
||||
const retryInterval = 10
|
||||
|
||||
const ThresholdMonitorTimeout = 5 * time.Second
|
||||
|
||||
type PeerServerConfig struct {
|
||||
Name string
|
||||
Scheme string
|
||||
URL string
|
||||
SnapshotCount int
|
||||
MaxClusterSize int
|
||||
RetryTimes int
|
||||
Name string
|
||||
Scheme string
|
||||
URL string
|
||||
SnapshotCount int
|
||||
MaxClusterSize int
|
||||
RetryTimes int
|
||||
RetryInterval float64
|
||||
}
|
||||
|
||||
type PeerServer struct {
|
||||
@@ -159,6 +158,7 @@ func (s *PeerServer) Stop() {
|
||||
close(s.closeChan)
|
||||
s.closeChan = nil
|
||||
}
|
||||
s.raftServer.Stop()
|
||||
}
|
||||
|
||||
func (s *PeerServer) HTTPHandler() http.Handler {
|
||||
@@ -209,8 +209,8 @@ func (s *PeerServer) startAsFollower(cluster []string) {
|
||||
if ok {
|
||||
return
|
||||
}
|
||||
log.Warnf("Unable to join the cluster using any of the peers %v. Retrying in %d seconds", cluster, retryInterval)
|
||||
time.Sleep(time.Second * retryInterval)
|
||||
log.Warnf("Unable to join the cluster using any of the peers %v. Retrying in %.1f seconds", cluster, s.Config.RetryInterval)
|
||||
time.Sleep(time.Second * time.Duration(s.Config.RetryInterval))
|
||||
}
|
||||
|
||||
log.Fatalf("Cannot join the cluster via given peers after %x retries", s.Config.RetryTimes)
|
||||
|
||||
@@ -1,12 +0,0 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
)
|
||||
|
||||
// TLSConfig holds the TLS configuration.
|
||||
type TLSConfig struct {
|
||||
Scheme string // http or https
|
||||
Server tls.Config // Used by the Raft or etcd Server transporter.
|
||||
Client tls.Config // Used by the Raft peer client.
|
||||
}
|
||||
@@ -15,62 +15,88 @@ type TLSInfo struct {
|
||||
CAFile string `json:"CAFile"`
|
||||
}
|
||||
|
||||
// Generates a TLS configuration from the given files.
|
||||
func (info TLSInfo) Config() (TLSConfig, error) {
|
||||
var t TLSConfig
|
||||
t.Scheme = "http"
|
||||
|
||||
// If the user do not specify key file, cert file and CA file, the type will be HTTP
|
||||
if info.KeyFile == "" && info.CertFile == "" && info.CAFile == "" {
|
||||
return t, nil
|
||||
func (info TLSInfo) Scheme() string {
|
||||
if info.KeyFile != "" && info.CertFile != "" {
|
||||
return "https"
|
||||
} else {
|
||||
return "http"
|
||||
}
|
||||
}
|
||||
|
||||
// Generates a tls.Config object for a server from the given files.
|
||||
func (info TLSInfo) ServerConfig() (*tls.Config, error) {
|
||||
// Both the key and cert must be present.
|
||||
if info.KeyFile == "" || info.CertFile == "" {
|
||||
return t, fmt.Errorf("KeyFile and CertFile must both be present[key: %v, cert: %v]", info.KeyFile, info.CertFile)
|
||||
return nil, fmt.Errorf("KeyFile and CertFile must both be present[key: %v, cert: %v]", info.KeyFile, info.CertFile)
|
||||
}
|
||||
|
||||
var cfg tls.Config
|
||||
|
||||
tlsCert, err := tls.LoadX509KeyPair(info.CertFile, info.KeyFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cfg.Certificates = []tls.Certificate{tlsCert}
|
||||
|
||||
if info.CAFile != "" {
|
||||
cfg.ClientAuth = tls.RequireAndVerifyClientCert
|
||||
cp, err := newCertPool(info.CAFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cfg.RootCAs = cp
|
||||
cfg.ClientCAs = cp
|
||||
} else {
|
||||
cfg.ClientAuth = tls.NoClientCert
|
||||
}
|
||||
|
||||
return &cfg, nil
|
||||
}
|
||||
|
||||
// Generates a tls.Config object for a client from the given files.
|
||||
func (info TLSInfo) ClientConfig() (*tls.Config, error) {
|
||||
var cfg tls.Config
|
||||
|
||||
if info.KeyFile == "" || info.CertFile == "" {
|
||||
return &cfg, nil
|
||||
}
|
||||
|
||||
tlsCert, err := tls.LoadX509KeyPair(info.CertFile, info.KeyFile)
|
||||
if err != nil {
|
||||
return t, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
t.Scheme = "https"
|
||||
t.Server.ClientAuth, t.Server.ClientCAs, err = newCertPool(info.CAFile)
|
||||
if err != nil {
|
||||
return t, err
|
||||
cfg.Certificates = []tls.Certificate{tlsCert}
|
||||
|
||||
if info.CAFile != "" {
|
||||
cp, err := newCertPool(info.CAFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cfg.RootCAs = cp
|
||||
}
|
||||
|
||||
// The client should trust the RootCA that the Server uses since
|
||||
// everyone is a peer in the network.
|
||||
t.Client.Certificates = []tls.Certificate{tlsCert}
|
||||
t.Client.RootCAs = t.Server.ClientCAs
|
||||
|
||||
return t, nil
|
||||
return &cfg, nil
|
||||
}
|
||||
|
||||
// newCertPool creates x509 certPool and corresponding Auth Type.
|
||||
// If the given CAfile is valid, add the cert into the pool and verify the clients'
|
||||
// certs against the cert in the pool.
|
||||
// If the given CAfile is empty, do not verify the clients' cert.
|
||||
// If the given CAfile is not valid, fatal.
|
||||
func newCertPool(CAFile string) (tls.ClientAuthType, *x509.CertPool, error) {
|
||||
if CAFile == "" {
|
||||
return tls.NoClientCert, nil, nil
|
||||
}
|
||||
// newCertPool creates x509 certPool with provided CA file
|
||||
func newCertPool(CAFile string) (*x509.CertPool, error) {
|
||||
pemByte, err := ioutil.ReadFile(CAFile)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
block, pemByte := pem.Decode(pemByte)
|
||||
cert, err := x509.ParseCertificate(block.Bytes)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
certPool := x509.NewCertPool()
|
||||
certPool.AddCert(cert)
|
||||
|
||||
return tls.RequireAndVerifyClientCert, certPool, nil
|
||||
return certPool, nil
|
||||
}
|
||||
|
||||
@@ -52,6 +52,7 @@ Peer Communication Options:
|
||||
Other Options:
|
||||
-max-result-buffer Max size of the result buffer.
|
||||
-max-retry-attempts Number of times a node will try to join a cluster.
|
||||
-retry-interval Seconds to wait between cluster join retry attempts.
|
||||
-max-cluster-size Maximum number of nodes in the cluster.
|
||||
-snapshot=false Disable log snapshots
|
||||
-snapshot-count Number of transactions before issuing a snapshot.
|
||||
|
||||
3
test.sh
3
test.sh
@@ -8,6 +8,9 @@ go test -v ./store
|
||||
go test -i ./server
|
||||
go test -v ./server
|
||||
|
||||
go test -i ./config
|
||||
go test -v ./config
|
||||
|
||||
go test -i ./server/v2/tests
|
||||
go test -v ./server/v2/tests
|
||||
|
||||
|
||||
@@ -24,7 +24,6 @@ type garbageHandler struct {
|
||||
|
||||
func (g *garbageHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
fmt.Fprintln(w, "Hello, client")
|
||||
println("HI")
|
||||
if r.URL.String() != "/v2/keys/_etcd/registry/1/node1" {
|
||||
g.t.Fatalf("Unexpected web request")
|
||||
}
|
||||
@@ -144,7 +143,7 @@ func TestDiscoverySecondPeerFirstNoResponse(t *testing.T) {
|
||||
resp, err = etcdtest.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/_etcd/registry/2/ETCDTEST"), v)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusCreated)
|
||||
|
||||
proc, err := startServer([]string{"-discovery", s.URL() + "/v2/keys/_etcd/registry/2"})
|
||||
proc, err := startServer([]string{"-retry-interval", "0.2", "-discovery", s.URL() + "/v2/keys/_etcd/registry/2"})
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
@@ -152,7 +151,7 @@ func TestDiscoverySecondPeerFirstNoResponse(t *testing.T) {
|
||||
|
||||
// TODO(bp): etcd will take 30 seconds to shutdown, figure this
|
||||
// out instead
|
||||
time.Sleep(35 * time.Second)
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
client := http.Client{}
|
||||
_, err = client.Get("/")
|
||||
@@ -177,17 +176,12 @@ func TestDiscoverySecondPeerUp(t *testing.T) {
|
||||
}
|
||||
|
||||
wc := goetcd.NewClient([]string{s.URL()})
|
||||
_, err = wc.Set("test", "0", 0)
|
||||
testResp, err := wc.Set("test", "0", 0)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't set a test key on the leader %v", err)
|
||||
}
|
||||
|
||||
receiver := make(chan *goetcd.Response)
|
||||
stop := make(chan bool)
|
||||
|
||||
go wc.Watch("_etcd/registry/3/node1", 0, false, receiver, stop)
|
||||
|
||||
v = url.Values{}
|
||||
v.Set("value", u)
|
||||
resp, err = etcdtest.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/_etcd/registry/3/ETCDTEST"), v)
|
||||
@@ -199,10 +193,10 @@ func TestDiscoverySecondPeerUp(t *testing.T) {
|
||||
}
|
||||
defer stopServer(proc)
|
||||
|
||||
// Test to ensure the machine registered iteslf
|
||||
watchResp := <-receiver
|
||||
if watchResp.Node.Value != "http://127.0.0.1:7001" {
|
||||
t.Fatalf("Second peer didn't register! %s", watchResp.Node.Value)
|
||||
watch := fmt.Sprintf("%s%s%d", s.URL(), "/v2/keys/_etcd/registry/3/node1?wait=true&waitIndex=", testResp.EtcdIndex)
|
||||
resp, err = http.Get(watch)
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
// TODO(bp): need to have a better way of knowing a machine is up
|
||||
|
||||
@@ -162,6 +162,8 @@ func startServer(extra []string) (*os.Process, error) {
|
||||
cmd := []string{"etcd", "-f", "-data-dir=/tmp/node1", "-name=node1"}
|
||||
cmd = append(cmd, extra...)
|
||||
|
||||
println(strings.Join(cmd, " "))
|
||||
|
||||
return os.StartProcess(EtcdBinPath, cmd, procAttr)
|
||||
}
|
||||
|
||||
|
||||
@@ -37,6 +37,7 @@ func TestV1SoloMigration(t *testing.T) {
|
||||
args := []string{"etcd", fmt.Sprintf("-data-dir=%s", nodepath)}
|
||||
args = append(args, "-addr", "127.0.0.1:4001")
|
||||
args = append(args, "-peer-addr", "127.0.0.1:7001")
|
||||
args = append(args, "-name", "v1")
|
||||
process, err := os.StartProcess(EtcdBinPath, args, procAttr)
|
||||
if err != nil {
|
||||
t.Fatal("start process failed:" + err.Error())
|
||||
@@ -79,6 +80,7 @@ func TestV1ClusterMigration(t *testing.T) {
|
||||
args := []string{"etcd", fmt.Sprintf("-data-dir=%s", nodepath)}
|
||||
args = append(args, "-addr", fmt.Sprintf("127.0.0.1:%d", 4001+i))
|
||||
args = append(args, "-peer-addr", fmt.Sprintf("127.0.0.1:%d", 7001+i))
|
||||
args = append(args, "-name", node)
|
||||
process, err := os.StartProcess(EtcdBinPath, args, procAttr)
|
||||
if err != nil {
|
||||
t.Fatal("start process failed:" + err.Error())
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
|
||||
@@ -69,19 +70,23 @@ func RunServer(f func(*server.Server)) {
|
||||
|
||||
ps.SetServer(s)
|
||||
|
||||
w := &sync.WaitGroup{}
|
||||
|
||||
// Start up peer server.
|
||||
c := make(chan bool)
|
||||
go func() {
|
||||
c <- true
|
||||
ps.Start(false, []string{})
|
||||
http.Serve(psListener, ps.HTTPHandler())
|
||||
h := waitHandler{w, ps.HTTPHandler()}
|
||||
http.Serve(psListener, &h)
|
||||
}()
|
||||
<-c
|
||||
|
||||
// Start up etcd server.
|
||||
go func() {
|
||||
c <- true
|
||||
http.Serve(sListener, s.HTTPHandler())
|
||||
h := waitHandler{w, s.HTTPHandler()}
|
||||
http.Serve(sListener, &h)
|
||||
}()
|
||||
<-c
|
||||
|
||||
@@ -95,4 +100,20 @@ func RunServer(f func(*server.Server)) {
|
||||
ps.Stop()
|
||||
psListener.Close()
|
||||
sListener.Close()
|
||||
w.Wait()
|
||||
}
|
||||
|
||||
type waitHandler struct {
|
||||
wg *sync.WaitGroup
|
||||
handler http.Handler
|
||||
}
|
||||
|
||||
func (h *waitHandler) ServeHTTP(w http.ResponseWriter, r *http.Request){
|
||||
h.wg.Add(1)
|
||||
defer h.wg.Done()
|
||||
h.handler.ServeHTTP(w, r)
|
||||
|
||||
//important to flush before decrementing the wait group.
|
||||
//we won't get a chance to once main() ends.
|
||||
w.(http.Flusher).Flush()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user