diff --git a/discovery/discovery.go b/discovery/discovery.go index 94d35b9f3..f5dc0b8d1 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -67,6 +67,8 @@ type discovery struct { clock clockwork.Clock } +type proxyDiscovery struct{ *discovery } + // proxyFuncFromEnv builds a proxy function if the appropriate environment // variable is set. It performs basic sanitization of the environment variable // and returns any error encountered. @@ -97,6 +99,18 @@ func proxyFuncFromEnv() (func(*http.Request) (*url.URL, error), error) { } func New(durl string, id types.ID, config string) (Discoverer, error) { + return newDiscovery(durl, id, config) +} + +func ProxyNew(durl string) (Discoverer, error) { + d, err := newDiscovery(durl, 0, "") + if err != nil { + return nil, err + } + return &proxyDiscovery{d}, nil +} + +func newDiscovery(durl string, id types.ID, config string) (*discovery, error) { u, err := url.Parse(durl) if err != nil { return nil, err @@ -150,6 +164,22 @@ func (d *discovery) Discover() (string, error) { return nodesToCluster(all), nil } +func (pd *proxyDiscovery) Discover() (string, error) { + nodes, size, err := pd.checkCluster() + if err != nil { + if err == ErrFullCluster { + return nodesToCluster(nodes), nil + } + return "", err + } + + all, err := pd.waitNodes(nodes, size) + if err != nil { + return "", err + } + return nodesToCluster(all), nil +} + func (d *discovery) createSelf() error { ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout) resp, err := d.c.Create(ctx, d.selfKey(), d.config, -1) @@ -210,7 +240,7 @@ func (d *discovery) checkCluster() (client.Nodes, int, error) { break } if i >= size-1 { - return nil, size, ErrFullCluster + return nodes[:size], size, ErrFullCluster } } return nodes, size, nil diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index fa4939b67..9400fee67 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -25,6 +25,7 @@ import ( "os" "strings" + "github.com/coreos/etcd/discovery" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/etcdhttp" "github.com/coreos/etcd/pkg/cors" @@ -46,6 +47,7 @@ var ( name = fs.String("name", "default", "Unique human-readable name for this node") dir = fs.String("data-dir", "", "Path to the data directory") durl = fs.String("discovery", "", "Discovery service used to bootstrap the cluster") + dfallback = new(flags.Fallback) snapCount = fs.Uint64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot") printVersion = fs.Bool("version", false, "Print the version and exit") @@ -95,6 +97,11 @@ func init() { // Should never happen. log.Panicf("unexpected error setting up proxyFlag: %v", err) } + fs.Var(dfallback, "discovery-fallback", fmt.Sprintf("Valid values include %s", strings.Join(flags.FallbackValues, ", "))) + if err := dfallback.Set(flags.FallbackProxy); err != nil { + // Should never happen. + log.Panicf("unexpected error setting up discovery-fallback flag: %v", err) + } fs.StringVar(&clientTLSInfo.CAFile, "ca-file", "", "Path to the client server TLS CA file.") fs.StringVar(&clientTLSInfo.CertFile, "cert-file", "", "Path to the client server TLS cert file.") @@ -137,74 +144,97 @@ func Main() { flags.SetFlagsFromEnv(fs) if string(*proxyFlag) == flags.ProxyValueOff { - startEtcd() - } else { - startProxy() + if err := startEtcd(); err == nil { + // Block indefinitely + <-make(chan struct{}) + } else { + if err == discovery.ErrFullCluster && *dfallback == flags.FallbackProxy { + fmt.Printf("etcd: dicovery cluster full, falling back to %s", flags.FallbackProxy) + } else { + log.Fatalf("etcd: %v", err) + } + } + } + if err = startProxy(); err != nil { + log.Fatalf("proxy: %v", err) } - // Block indefinitely <-make(chan struct{}) } // startEtcd launches the etcd server and HTTP handlers for client/server communication. -func startEtcd() { +func startEtcd() error { cls, err := setupCluster() if err != nil { - log.Fatalf("etcd: error setting up initial cluster: %v", err) + fmt.Errorf("error setting up initial cluster: %v", err) } if *dir == "" { *dir = fmt.Sprintf("%v.etcd", *name) - log.Printf("etcd: no data-dir provided, using default data-dir ./%s", *dir) + fmt.Errorf("no data-dir provided, using default data-dir ./%s", *dir) } if err := os.MkdirAll(*dir, privateDirMode); err != nil { - log.Fatalf("etcd: cannot create data directory: %v", err) + fmt.Errorf("cannot create data directory: %v", err) } if err := fileutil.IsDirWriteable(*dir); err != nil { - log.Fatalf("etcd: cannot write to data directory: %v", err) + fmt.Errorf("cannot write to data directory: %v", err) } pt, err := transport.NewTransport(peerTLSInfo) if err != nil { - log.Fatal(err) + return err } acurls, err := flags.URLsFromFlags(fs, "advertise-client-urls", "addr", clientTLSInfo) if err != nil { - log.Fatal(err.Error()) + return err } lpurls, err := flags.URLsFromFlags(fs, "listen-peer-urls", "peer-bind-addr", peerTLSInfo) if err != nil { - log.Fatal(err.Error()) + return err } plns := make([]net.Listener, 0) for _, u := range lpurls { - l, err := transport.NewListener(u.Host, peerTLSInfo) + var l net.Listener + l, err = transport.NewListener(u.Host, peerTLSInfo) if err != nil { - log.Fatal(err) + return err } urlStr := u.String() log.Print("etcd: listening for peers on ", urlStr) + defer func() { + if err != nil { + l.Close() + log.Print("etcd: stopping listening for peers on ", urlStr) + } + }() plns = append(plns, l) } lcurls, err := flags.URLsFromFlags(fs, "listen-client-urls", "bind-addr", clientTLSInfo) if err != nil { - log.Fatal(err.Error()) + return err } clns := make([]net.Listener, 0) for _, u := range lcurls { - l, err := transport.NewListener(u.Host, clientTLSInfo) + var l net.Listener + l, err = transport.NewListener(u.Host, clientTLSInfo) if err != nil { - log.Fatal(err) + return err } urlStr := u.String() log.Print("etcd: listening for client requests on ", urlStr) + defer func() { + if err != nil { + l.Close() + log.Print("etcd: stopping listening for client requests on ", urlStr) + } + }() clns = append(clns, l) } @@ -218,7 +248,11 @@ func startEtcd() { ClusterState: *clusterState, Transport: pt, } - s := etcdserver.NewServer(cfg) + var s *etcdserver.EtcdServer + s, err = etcdserver.NewServer(cfg) + if err != nil { + return err + } s.Start() ch := &cors.CORSHandler{ @@ -238,18 +272,33 @@ func startEtcd() { log.Fatal(http.Serve(l, ch)) }(l) } + return nil } // startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes. -func startProxy() { +func startProxy() error { cls, err := setupCluster() if err != nil { - log.Fatalf("etcd: error setting up initial cluster: %v", err) + return fmt.Errorf("error setting up initial cluster: %v", err) + } + + if *durl != "" { + d, err := discovery.ProxyNew(*durl) + if err != nil { + return fmt.Errorf("cannot init discovery %v", err) + } + s, err := d.Discover() + if err != nil { + return err + } + if cls, err = etcdserver.NewClusterFromString(*durl, s); err != nil { + return err + } } pt, err := transport.NewTransport(clientTLSInfo) if err != nil { - log.Fatal(err) + return err } // TODO(jonboulle): update peerURLs dynamically (i.e. when updating @@ -258,7 +307,7 @@ func startProxy() { uf := func() []string { cls, err := etcdserver.GetClusterFromPeers(peerURLs) if err != nil { - log.Printf("etcd: %v", err) + log.Printf("proxy: %v", err) return []string{} } return cls.ClientURLs() @@ -272,24 +321,24 @@ func startProxy() { if string(*proxyFlag) == flags.ProxyValueReadonly { ph = proxy.NewReadonlyHandler(ph) } - lcurls, err := flags.URLsFromFlags(fs, "listen-client-urls", "bind-addr", clientTLSInfo) if err != nil { - log.Fatal(err.Error()) + return err } // Start a proxy server goroutine for each listen address for _, u := range lcurls { l, err := transport.NewListener(u.Host, clientTLSInfo) if err != nil { - log.Fatal(err) + return err } host := u.Host go func() { - log.Print("etcd: proxy listening for client requests on ", host) + log.Print("proxy: listening for client requests on ", host) log.Fatal(http.Serve(l, ph)) }() } + return nil } // setupCluster sets up the cluster definition for bootstrap or discovery. diff --git a/etcdserver/server.go b/etcdserver/server.go index eb3cf2a26..f526a2f5a 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -175,9 +175,9 @@ type EtcdServer struct { // NewServer creates a new EtcdServer from the supplied configuration. The // configuration is considered static for the lifetime of the EtcdServer. -func NewServer(cfg *ServerConfig) *EtcdServer { +func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil { - log.Fatalf("etcdserver: cannot create snapshot directory: %v", err) + return nil, fmt.Errorf("cannot create snapshot directory: %v", err) } ss := snap.New(cfg.SnapDir()) st := store.New() @@ -192,27 +192,27 @@ func NewServer(cfg *ServerConfig) *EtcdServer { log.Fatal(err) } if err := cfg.Cluster.ValidateAndAssignIDs(cl.Members()); err != nil { - log.Fatalf("etcdserver: error validating IDs from cluster %s: %v", cl, err) + return nil, fmt.Errorf("error validating IDs from cluster %s: %v", cl, err) } cfg.Cluster.SetID(cl.id) cfg.Cluster.SetStore(st) id, n, w = startNode(cfg, nil) case !haveWAL && cfg.ClusterState == ClusterStateValueNew: if err := cfg.VerifyBootstrapConfig(); err != nil { - log.Fatalf("etcdserver: %v", err) + return nil, err } m := cfg.Cluster.MemberByName(cfg.Name) if cfg.ShouldDiscover() { d, err := discovery.New(cfg.DiscoveryURL, m.ID, cfg.Cluster.String()) if err != nil { - log.Fatalf("etcdserver: cannot init discovery %v", err) + return nil, fmt.Errorf("cannot init discovery %v", err) } s, err := d.Discover() if err != nil { - log.Fatalf("etcdserver: %v", err) + return nil, err } if cfg.Cluster, err = NewClusterFromString(cfg.Cluster.token, s); err != nil { - log.Fatalf("etcdserver: %v", err) + return nil, err } } cfg.Cluster.SetStore(st) @@ -225,7 +225,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer { var index uint64 snapshot, err := ss.Load() if err != nil && err != snap.ErrNoSnapshot { - log.Fatal(err) + return nil, err } if snapshot != nil { log.Printf("etcdserver: recovering from snapshot at index %d", snapshot.Index) @@ -235,7 +235,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer { cfg.Cluster = NewClusterFromStore(cfg.Cluster.token, st) id, n, w = restartNode(cfg, index, snapshot) default: - log.Fatalf("etcdserver: unsupported bootstrap config") + return nil, fmt.Errorf("unsupported bootstrap config") } sstats := &stats.ServerStats{ @@ -261,7 +261,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer { SyncTicker: time.Tick(500 * time.Millisecond), snapCount: cfg.SnapCount, } - return s + return s, nil } // Start prepares and starts server in a new goroutine. It is no longer safe to diff --git a/hack/insta-discovery/Procfile b/hack/insta-discovery/Procfile index 32ff0686c..e4d2bf02a 100644 --- a/hack/insta-discovery/Procfile +++ b/hack/insta-discovery/Procfile @@ -1,4 +1,6 @@ # Use goreman to run `go get github.com/mattn/goreman` +# One of the four etcd members falls back to a proxy etcd1: ../../bin/etcd -name infra1 -listen-client-urls http://127.0.0.1:4001 -advertise-client-urls http://127.0.0.1:4001 -listen-peer-urls http://127.0.0.1:7001 -initial-advertise-peer-urls http://127.0.0.1:7001 etcd2: ../../bin/etcd -name infra2 -listen-client-urls http://127.0.0.1:4002 -advertise-client-urls http://127.0.0.1:4002 -listen-peer-urls http://127.0.0.1:7002 -initial-advertise-peer-urls http://127.0.0.1:7002 etcd3: ../../bin/etcd -name infra3 -listen-client-urls http://127.0.0.1:4003 -advertise-client-urls http://127.0.0.1:4003 -listen-peer-urls http://127.0.0.1:7003 -initial-advertise-peer-urls http://127.0.0.1:7003 +etcd4: ../../bin/etcd -name infra4 -listen-client-urls http://127.0.0.1:4004 -advertise-client-urls http://127.0.0.1:4004 -listen-peer-urls http://127.0.0.1:7004 -initial-advertise-peer-urls http://127.0.0.1:7004 diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 70650b7cb..6cb78d8d0 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -176,7 +176,10 @@ type member struct { // Launch starts a member based on ServerConfig, PeerListeners // and ClientListeners. func (m *member) Launch(t *testing.T) { - m.s = etcdserver.NewServer(&m.ServerConfig) + var err error + if m.s, err = etcdserver.NewServer(&m.ServerConfig); err != nil { + t.Fatalf("failed to initialize the etcd server: %v", err) + } m.s.Ticker = time.Tick(tickDuration) m.s.SyncTicker = time.Tick(tickDuration) m.s.Start() diff --git a/pkg/flags/fallback.go b/pkg/flags/fallback.go new file mode 100644 index 000000000..197c8c651 --- /dev/null +++ b/pkg/flags/fallback.go @@ -0,0 +1,53 @@ +/* + Copyright 2014 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package flags + +import ( + "errors" +) + +const ( + FallbackExit = "exit" + FallbackProxy = "proxy" +) + +var ( + FallbackValues = []string{ + FallbackExit, + FallbackProxy, + } +) + +// FallbackFlag implements the flag.Value interface. +type Fallback string + +// Set verifies the argument to be a valid member of FallbackFlagValues +// before setting the underlying flag value. +func (fb *Fallback) Set(s string) error { + for _, v := range FallbackValues { + if s == v { + *fb = Fallback(s) + return nil + } + } + + return errors.New("invalid value") +} + +func (fb *Fallback) String() string { + return string(*fb) +}