*: support discovery fallback

This commit is contained in:
Xiang Li 2014-11-04 13:09:24 -08:00
parent ac49e1d50f
commit 5cb13fd071
6 changed files with 173 additions and 38 deletions

View File

@ -56,6 +56,10 @@ type Discoverer interface {
Discover() (string, error) Discover() (string, error)
} }
type ProxyDiscovery interface {
ProxyDiscover() (string, error)
}
type discovery struct { type discovery struct {
cluster string cluster string
id types.ID id types.ID
@ -97,6 +101,14 @@ func proxyFuncFromEnv() (func(*http.Request) (*url.URL, error), error) {
} }
func New(durl string, id types.ID, config string) (Discoverer, error) { func New(durl string, id types.ID, config string) (Discoverer, error) {
return new(durl, id, config)
}
func ProxyNew(durl string) (ProxyDiscovery, error) {
return new(durl, 0, "")
}
func new(durl string, id types.ID, config string) (*discovery, error) {
u, err := url.Parse(durl) u, err := url.Parse(durl)
if err != nil { if err != nil {
return nil, err return nil, err
@ -150,6 +162,22 @@ func (d *discovery) Discover() (string, error) {
return nodesToCluster(all), nil return nodesToCluster(all), nil
} }
func (d *discovery) ProxyDiscover() (string, error) {
nodes, size, err := d.checkCluster()
if err != nil {
if err == ErrFullCluster {
return nodesToCluster(nodes), nil
}
return "", err
}
all, err := d.waitNodes(nodes, size)
if err != nil {
return "", err
}
return nodesToCluster(all), nil
}
func (d *discovery) createSelf() error { func (d *discovery) createSelf() error {
ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout) ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
resp, err := d.c.Create(ctx, d.selfKey(), d.config, -1) resp, err := d.c.Create(ctx, d.selfKey(), d.config, -1)
@ -210,7 +238,7 @@ func (d *discovery) checkCluster() (client.Nodes, int, error) {
break break
} }
if i >= size-1 { if i >= size-1 {
return nil, size, ErrFullCluster return nodes[:size], size, ErrFullCluster
} }
} }
return nodes, size, nil return nodes, size, nil

View File

@ -25,6 +25,7 @@ import (
"os" "os"
"strings" "strings"
"github.com/coreos/etcd/discovery"
"github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/etcdhttp" "github.com/coreos/etcd/etcdserver/etcdhttp"
"github.com/coreos/etcd/pkg/cors" "github.com/coreos/etcd/pkg/cors"
@ -46,6 +47,7 @@ var (
name = fs.String("name", "default", "Unique human-readable name for this node") name = fs.String("name", "default", "Unique human-readable name for this node")
dir = fs.String("data-dir", "", "Path to the data directory") dir = fs.String("data-dir", "", "Path to the data directory")
durl = fs.String("discovery", "", "Discovery service used to bootstrap the cluster") durl = fs.String("discovery", "", "Discovery service used to bootstrap the cluster")
dfallback = new(flags.Fallback)
snapCount = fs.Uint64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot") snapCount = fs.Uint64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
printVersion = fs.Bool("version", false, "Print the version and exit") printVersion = fs.Bool("version", false, "Print the version and exit")
@ -95,6 +97,11 @@ func init() {
// Should never happen. // Should never happen.
log.Panicf("unexpected error setting up proxyFlag: %v", err) log.Panicf("unexpected error setting up proxyFlag: %v", err)
} }
fs.Var(dfallback, "discovery-fallback", fmt.Sprintf("Valid values include %s", strings.Join(flags.FallbackValues, ", ")))
if err := dfallback.Set(flags.FallbackProxy); err != nil {
// Should never happen.
log.Panicf("unexpected error setting up discovery-fallback flag: %v", err)
}
fs.StringVar(&clientTLSInfo.CAFile, "ca-file", "", "Path to the client server TLS CA file.") fs.StringVar(&clientTLSInfo.CAFile, "ca-file", "", "Path to the client server TLS CA file.")
fs.StringVar(&clientTLSInfo.CertFile, "cert-file", "", "Path to the client server TLS cert file.") fs.StringVar(&clientTLSInfo.CertFile, "cert-file", "", "Path to the client server TLS cert file.")
@ -137,74 +144,97 @@ func Main() {
flags.SetFlagsFromEnv(fs) flags.SetFlagsFromEnv(fs)
if string(*proxyFlag) == flags.ProxyValueOff { if string(*proxyFlag) == flags.ProxyValueOff {
startEtcd() if err := startEtcd(); err == nil {
} else { // Block indefinitely
startProxy() <-make(chan struct{})
} else {
if err == discovery.ErrFullCluster && *dfallback == flags.FallbackProxy {
fmt.Printf("etcd: dicovery cluster is full, falls back to %s", flags.FallbackProxy)
} else {
log.Fatalf("etcd: %v", err)
}
}
}
if err = startProxy(); err != nil {
log.Fatalf("proxy: %v", err)
} }
// Block indefinitely // Block indefinitely
<-make(chan struct{}) <-make(chan struct{})
} }
// startEtcd launches the etcd server and HTTP handlers for client/server communication. // startEtcd launches the etcd server and HTTP handlers for client/server communication.
func startEtcd() { func startEtcd() error {
cls, err := setupCluster() cls, err := setupCluster()
if err != nil { if err != nil {
log.Fatalf("etcd: error setting up initial cluster: %v", err) fmt.Errorf("error setting up initial cluster: %v", err)
} }
if *dir == "" { if *dir == "" {
*dir = fmt.Sprintf("%v.etcd", *name) *dir = fmt.Sprintf("%v.etcd", *name)
log.Printf("etcd: no data-dir provided, using default data-dir ./%s", *dir) fmt.Errorf("no data-dir provided, using default data-dir ./%s", *dir)
} }
if err := os.MkdirAll(*dir, privateDirMode); err != nil { if err := os.MkdirAll(*dir, privateDirMode); err != nil {
log.Fatalf("etcd: cannot create data directory: %v", err) fmt.Errorf("cannot create data directory: %v", err)
} }
if err := fileutil.IsDirWriteable(*dir); err != nil { if err := fileutil.IsDirWriteable(*dir); err != nil {
log.Fatalf("etcd: cannot write to data directory: %v", err) fmt.Errorf("cannot write to data directory: %v", err)
} }
pt, err := transport.NewTransport(peerTLSInfo) pt, err := transport.NewTransport(peerTLSInfo)
if err != nil { if err != nil {
log.Fatal(err) return err
} }
acurls, err := flags.URLsFromFlags(fs, "advertise-client-urls", "addr", clientTLSInfo) acurls, err := flags.URLsFromFlags(fs, "advertise-client-urls", "addr", clientTLSInfo)
if err != nil { if err != nil {
log.Fatal(err.Error()) return err
} }
lpurls, err := flags.URLsFromFlags(fs, "listen-peer-urls", "peer-bind-addr", peerTLSInfo) lpurls, err := flags.URLsFromFlags(fs, "listen-peer-urls", "peer-bind-addr", peerTLSInfo)
if err != nil { if err != nil {
log.Fatal(err.Error()) return err
} }
plns := make([]net.Listener, 0) plns := make([]net.Listener, 0)
for _, u := range lpurls { for _, u := range lpurls {
l, err := transport.NewListener(u.Host, peerTLSInfo) var l net.Listener
l, err = transport.NewListener(u.Host, peerTLSInfo)
if err != nil { if err != nil {
log.Fatal(err) return err
} }
urlStr := u.String() urlStr := u.String()
log.Print("etcd: listening for peers on ", urlStr) log.Print("etcd: listening for peers on ", urlStr)
defer func() {
if err != nil {
l.Close()
log.Print("etcd: stopping listening for peers on ", urlStr)
}
}()
plns = append(plns, l) plns = append(plns, l)
} }
lcurls, err := flags.URLsFromFlags(fs, "listen-client-urls", "bind-addr", clientTLSInfo) lcurls, err := flags.URLsFromFlags(fs, "listen-client-urls", "bind-addr", clientTLSInfo)
if err != nil { if err != nil {
log.Fatal(err.Error()) return err
} }
clns := make([]net.Listener, 0) clns := make([]net.Listener, 0)
for _, u := range lcurls { for _, u := range lcurls {
l, err := transport.NewListener(u.Host, clientTLSInfo) var l net.Listener
l, err = transport.NewListener(u.Host, clientTLSInfo)
if err != nil { if err != nil {
log.Fatal(err) return err
} }
urlStr := u.String() urlStr := u.String()
log.Print("etcd: listening for client requests on ", urlStr) log.Print("etcd: listening for client requests on ", urlStr)
defer func() {
if err != nil {
l.Close()
log.Print("etcd: stopping listening for client requests on ", urlStr)
}
}()
clns = append(clns, l) clns = append(clns, l)
} }
@ -218,7 +248,11 @@ func startEtcd() {
ClusterState: *clusterState, ClusterState: *clusterState,
Transport: pt, Transport: pt,
} }
s := etcdserver.NewServer(cfg) var s *etcdserver.EtcdServer
s, err = etcdserver.NewServer(cfg)
if err != nil {
return err
}
s.Start() s.Start()
ch := &cors.CORSHandler{ ch := &cors.CORSHandler{
@ -238,18 +272,33 @@ func startEtcd() {
log.Fatal(http.Serve(l, ch)) log.Fatal(http.Serve(l, ch))
}(l) }(l)
} }
return nil
} }
// startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes. // startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes.
func startProxy() { func startProxy() error {
cls, err := setupCluster() cls, err := setupCluster()
if err != nil { if err != nil {
log.Fatalf("etcd: error setting up initial cluster: %v", err) return fmt.Errorf("error setting up initial cluster: %v", err)
}
if *durl != "" {
d, err := discovery.ProxyNew(*durl)
if err != nil {
return fmt.Errorf("cannot init discovery %v", err)
}
s, err := d.ProxyDiscover()
if err != nil {
return err
}
if cls, err = etcdserver.NewClusterFromString(*durl, s); err != nil {
return err
}
} }
pt, err := transport.NewTransport(clientTLSInfo) pt, err := transport.NewTransport(clientTLSInfo)
if err != nil { if err != nil {
log.Fatal(err) return err
} }
// TODO(jonboulle): update peerURLs dynamically (i.e. when updating // TODO(jonboulle): update peerURLs dynamically (i.e. when updating
@ -258,7 +307,7 @@ func startProxy() {
uf := func() []string { uf := func() []string {
cls, err := etcdserver.GetClusterFromPeers(peerURLs) cls, err := etcdserver.GetClusterFromPeers(peerURLs)
if err != nil { if err != nil {
log.Printf("etcd: %v", err) log.Printf("proxy: %v", err)
return []string{} return []string{}
} }
return cls.ClientURLs() return cls.ClientURLs()
@ -272,24 +321,24 @@ func startProxy() {
if string(*proxyFlag) == flags.ProxyValueReadonly { if string(*proxyFlag) == flags.ProxyValueReadonly {
ph = proxy.NewReadonlyHandler(ph) ph = proxy.NewReadonlyHandler(ph)
} }
lcurls, err := flags.URLsFromFlags(fs, "listen-client-urls", "bind-addr", clientTLSInfo) lcurls, err := flags.URLsFromFlags(fs, "listen-client-urls", "bind-addr", clientTLSInfo)
if err != nil { if err != nil {
log.Fatal(err.Error()) return err
} }
// Start a proxy server goroutine for each listen address // Start a proxy server goroutine for each listen address
for _, u := range lcurls { for _, u := range lcurls {
l, err := transport.NewListener(u.Host, clientTLSInfo) l, err := transport.NewListener(u.Host, clientTLSInfo)
if err != nil { if err != nil {
log.Fatal(err) return err
} }
host := u.Host host := u.Host
go func() { go func() {
log.Print("etcd: proxy listening for client requests on ", host) log.Print("proxy: listening for client requests on ", host)
log.Fatal(http.Serve(l, ph)) log.Fatal(http.Serve(l, ph))
}() }()
} }
return nil
} }
// setupCluster sets up the cluster definition for bootstrap or discovery. // setupCluster sets up the cluster definition for bootstrap or discovery.

View File

@ -175,9 +175,9 @@ type EtcdServer struct {
// NewServer creates a new EtcdServer from the supplied configuration. The // NewServer creates a new EtcdServer from the supplied configuration. The
// configuration is considered static for the lifetime of the EtcdServer. // configuration is considered static for the lifetime of the EtcdServer.
func NewServer(cfg *ServerConfig) *EtcdServer { func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil { if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil {
log.Fatalf("etcdserver: cannot create snapshot directory: %v", err) return nil, fmt.Errorf("cannot create snapshot directory: %v", err)
} }
ss := snap.New(cfg.SnapDir()) ss := snap.New(cfg.SnapDir())
st := store.New() st := store.New()
@ -192,27 +192,27 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
log.Fatal(err) log.Fatal(err)
} }
if err := cfg.Cluster.ValidateAndAssignIDs(cl.Members()); err != nil { if err := cfg.Cluster.ValidateAndAssignIDs(cl.Members()); err != nil {
log.Fatalf("etcdserver: error validating IDs from cluster %s: %v", cl, err) return nil, fmt.Errorf("error validating IDs from cluster %s: %v", cl, err)
} }
cfg.Cluster.SetID(cl.id) cfg.Cluster.SetID(cl.id)
cfg.Cluster.SetStore(st) cfg.Cluster.SetStore(st)
id, n, w = startNode(cfg, nil) id, n, w = startNode(cfg, nil)
case !haveWAL && cfg.ClusterState == ClusterStateValueNew: case !haveWAL && cfg.ClusterState == ClusterStateValueNew:
if err := cfg.VerifyBootstrapConfig(); err != nil { if err := cfg.VerifyBootstrapConfig(); err != nil {
log.Fatalf("etcdserver: %v", err) return nil, err
} }
m := cfg.Cluster.MemberByName(cfg.Name) m := cfg.Cluster.MemberByName(cfg.Name)
if cfg.ShouldDiscover() { if cfg.ShouldDiscover() {
d, err := discovery.New(cfg.DiscoveryURL, m.ID, cfg.Cluster.String()) d, err := discovery.New(cfg.DiscoveryURL, m.ID, cfg.Cluster.String())
if err != nil { if err != nil {
log.Fatalf("etcdserver: cannot init discovery %v", err) return nil, fmt.Errorf("cannot init discovery %v", err)
} }
s, err := d.Discover() s, err := d.Discover()
if err != nil { if err != nil {
log.Fatalf("etcdserver: %v", err) return nil, err
} }
if cfg.Cluster, err = NewClusterFromString(cfg.Cluster.token, s); err != nil { if cfg.Cluster, err = NewClusterFromString(cfg.Cluster.token, s); err != nil {
log.Fatalf("etcdserver: %v", err) return nil, err
} }
} }
cfg.Cluster.SetStore(st) cfg.Cluster.SetStore(st)
@ -225,7 +225,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
var index uint64 var index uint64
snapshot, err := ss.Load() snapshot, err := ss.Load()
if err != nil && err != snap.ErrNoSnapshot { if err != nil && err != snap.ErrNoSnapshot {
log.Fatal(err) return nil, err
} }
if snapshot != nil { if snapshot != nil {
log.Printf("etcdserver: recovering from snapshot at index %d", snapshot.Index) log.Printf("etcdserver: recovering from snapshot at index %d", snapshot.Index)
@ -235,7 +235,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
cfg.Cluster = NewClusterFromStore(cfg.Cluster.token, st) cfg.Cluster = NewClusterFromStore(cfg.Cluster.token, st)
id, n, w = restartNode(cfg, index, snapshot) id, n, w = restartNode(cfg, index, snapshot)
default: default:
log.Fatalf("etcdserver: unsupported bootstrap config") return nil, fmt.Errorf("unsupported bootstrap config")
} }
sstats := &stats.ServerStats{ sstats := &stats.ServerStats{
@ -261,7 +261,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
SyncTicker: time.Tick(500 * time.Millisecond), SyncTicker: time.Tick(500 * time.Millisecond),
snapCount: cfg.SnapCount, snapCount: cfg.SnapCount,
} }
return s return s, nil
} }
// Start prepares and starts server in a new goroutine. It is no longer safe to // Start prepares and starts server in a new goroutine. It is no longer safe to

View File

@ -1,4 +1,6 @@
# Use goreman to run `go get github.com/mattn/goreman` # Use goreman to run `go get github.com/mattn/goreman`
# One of the four etcd members falls back to a proxy
etcd1: ../../bin/etcd -name infra1 -listen-client-urls http://127.0.0.1:4001 -advertise-client-urls http://127.0.0.1:4001 -listen-peer-urls http://127.0.0.1:7001 -initial-advertise-peer-urls http://127.0.0.1:7001 etcd1: ../../bin/etcd -name infra1 -listen-client-urls http://127.0.0.1:4001 -advertise-client-urls http://127.0.0.1:4001 -listen-peer-urls http://127.0.0.1:7001 -initial-advertise-peer-urls http://127.0.0.1:7001
etcd2: ../../bin/etcd -name infra2 -listen-client-urls http://127.0.0.1:4002 -advertise-client-urls http://127.0.0.1:4002 -listen-peer-urls http://127.0.0.1:7002 -initial-advertise-peer-urls http://127.0.0.1:7002 etcd2: ../../bin/etcd -name infra2 -listen-client-urls http://127.0.0.1:4002 -advertise-client-urls http://127.0.0.1:4002 -listen-peer-urls http://127.0.0.1:7002 -initial-advertise-peer-urls http://127.0.0.1:7002
etcd3: ../../bin/etcd -name infra3 -listen-client-urls http://127.0.0.1:4003 -advertise-client-urls http://127.0.0.1:4003 -listen-peer-urls http://127.0.0.1:7003 -initial-advertise-peer-urls http://127.0.0.1:7003 etcd3: ../../bin/etcd -name infra3 -listen-client-urls http://127.0.0.1:4003 -advertise-client-urls http://127.0.0.1:4003 -listen-peer-urls http://127.0.0.1:7003 -initial-advertise-peer-urls http://127.0.0.1:7003
etcd4: ../../bin/etcd -name infra4 -listen-client-urls http://127.0.0.1:4004 -advertise-client-urls http://127.0.0.1:4004 -listen-peer-urls http://127.0.0.1:7004 -initial-advertise-peer-urls http://127.0.0.1:7004

View File

@ -176,7 +176,10 @@ type member struct {
// Launch starts a member based on ServerConfig, PeerListeners // Launch starts a member based on ServerConfig, PeerListeners
// and ClientListeners. // and ClientListeners.
func (m *member) Launch(t *testing.T) { func (m *member) Launch(t *testing.T) {
m.s = etcdserver.NewServer(&m.ServerConfig) var err error
if m.s, err = etcdserver.NewServer(&m.ServerConfig); err != nil {
t.Fatalf("failed to initialize the etcd server: %v", err)
}
m.s.Ticker = time.Tick(tickDuration) m.s.Ticker = time.Tick(tickDuration)
m.s.SyncTicker = time.Tick(tickDuration) m.s.SyncTicker = time.Tick(tickDuration)
m.s.Start() m.s.Start()

53
pkg/flags/fallback.go Normal file
View File

@ -0,0 +1,53 @@
/*
Copyright 2014 CoreOS, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flags
import (
"errors"
)
const (
FallbackExit = "exit"
FallbackProxy = "proxy"
)
var (
FallbackValues = []string{
FallbackExit,
FallbackProxy,
}
)
// FallbackFlag implements the flag.Value interface.
type Fallback string
// Set verifies the argument to be a valid member of FallbackFlagValues
// before setting the underlying flag value.
func (fb *Fallback) Set(s string) error {
for _, v := range FallbackValues {
if s == v {
*fb = Fallback(s)
return nil
}
}
return errors.New("invalid value")
}
func (fb *Fallback) String() string {
return string(*fb)
}