mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdmain: add config.go
This commit is contained in:
parent
4f2d35679e
commit
0fa754d90e
248
etcdmain/config.go
Normal file
248
etcdmain/config.go
Normal file
@ -0,0 +1,248 @@
|
||||
/*
|
||||
Copyright 2014 CoreOS, Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package etcdmain
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/pkg/cors"
|
||||
"github.com/coreos/etcd/pkg/flags"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
"github.com/coreos/etcd/version"
|
||||
)
|
||||
|
||||
const (
|
||||
proxyFlagOff = "off"
|
||||
proxyFlagReadonly = "readonly"
|
||||
proxyFlagOn = "on"
|
||||
|
||||
fallbackFlagExit = "exit"
|
||||
fallbackFlagProxy = "proxy"
|
||||
|
||||
clusterStateFlagNew = "new"
|
||||
clusterStateFlagExisting = "existing"
|
||||
)
|
||||
|
||||
var (
|
||||
ignored = []string{
|
||||
"cluster-active-size",
|
||||
"cluster-remove-delay",
|
||||
"cluster-sync-interval",
|
||||
"config",
|
||||
"force",
|
||||
"max-result-buffer",
|
||||
"max-retry-attempts",
|
||||
"peer-heartbeat-interval",
|
||||
"peer-election-timeout",
|
||||
"retry-interval",
|
||||
"snapshot",
|
||||
"v",
|
||||
"vv",
|
||||
}
|
||||
)
|
||||
|
||||
type config struct {
|
||||
*flag.FlagSet
|
||||
|
||||
// member
|
||||
corsInfo *cors.CORSInfo
|
||||
dir string
|
||||
lpurls, lcurls []url.URL
|
||||
maxSnapFiles uint
|
||||
maxWalFiles uint
|
||||
name string
|
||||
snapCount uint64
|
||||
|
||||
// clustering
|
||||
apurls, acurls []url.URL
|
||||
clusterState *flags.StringsFlag
|
||||
dnsCluster string
|
||||
dproxy string
|
||||
durl string
|
||||
fallback *flags.StringsFlag
|
||||
initialCluster string
|
||||
initialClusterToken string
|
||||
|
||||
// proxy
|
||||
proxy *flags.StringsFlag
|
||||
|
||||
// security
|
||||
clientTLSInfo, peerTLSInfo transport.TLSInfo
|
||||
|
||||
// unsafe
|
||||
forceNewCluster bool
|
||||
|
||||
printVersion bool
|
||||
|
||||
ignored []string
|
||||
}
|
||||
|
||||
func NewConfig() *config {
|
||||
cfg := &config{
|
||||
corsInfo: &cors.CORSInfo{},
|
||||
clusterState: flags.NewStringsFlag(
|
||||
clusterStateFlagNew,
|
||||
clusterStateFlagExisting,
|
||||
),
|
||||
fallback: flags.NewStringsFlag(
|
||||
fallbackFlagExit,
|
||||
fallbackFlagProxy,
|
||||
),
|
||||
ignored: ignored,
|
||||
proxy: flags.NewStringsFlag(
|
||||
proxyFlagOff,
|
||||
proxyFlagReadonly,
|
||||
proxyFlagOn,
|
||||
),
|
||||
}
|
||||
|
||||
cfg.FlagSet = flag.NewFlagSet("etcd", flag.ContinueOnError)
|
||||
fs := cfg.FlagSet
|
||||
fs.Usage = func() {
|
||||
fmt.Println(usageline)
|
||||
fmt.Println(flagsline)
|
||||
}
|
||||
|
||||
// member
|
||||
fs.Var(cfg.corsInfo, "cors", "Comma-separated white list of origins for CORS (cross-origin resource sharing).")
|
||||
fs.StringVar(&cfg.dir, "data-dir", "", "Path to the data directory")
|
||||
fs.Var(flags.NewURLsValue("http://localhost:2380,http://localhost:7001"), "listen-peer-urls", "List of URLs to listen on for peer traffic")
|
||||
fs.Var(flags.NewURLsValue("http://localhost:2379,http://localhost:4001"), "listen-client-urls", "List of URLs to listen on for client traffic")
|
||||
fs.UintVar(&cfg.maxSnapFiles, "max-snapshots", defaultMaxSnapshots, "Maximum number of snapshot files to retain (0 is unlimited)")
|
||||
fs.UintVar(&cfg.maxWalFiles, "max-wals", defaultMaxWALs, "Maximum number of wal files to retain (0 is unlimited)")
|
||||
fs.StringVar(&cfg.name, "name", "default", "Unique human-readable name for this node")
|
||||
fs.Uint64Var(&cfg.snapCount, "snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
|
||||
|
||||
// clustering
|
||||
fs.Var(flags.NewURLsValue("http://localhost:2380,http://localhost:7001"), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster")
|
||||
fs.Var(flags.NewURLsValue("http://localhost:2379,http://localhost:4001"), "advertise-client-urls", "List of this member's client URLs to advertise to the rest of the cluster")
|
||||
fs.StringVar(&cfg.durl, "discovery", "", "Discovery service used to bootstrap the cluster")
|
||||
fs.Var(cfg.fallback, "discovery-fallback", fmt.Sprintf("Valid values include %s", strings.Join(cfg.fallback.Values, ", ")))
|
||||
if err := cfg.fallback.Set(fallbackFlagProxy); err != nil {
|
||||
// Should never happen.
|
||||
log.Panicf("unexpected error setting up discovery-fallback flag: %v", err)
|
||||
}
|
||||
fs.StringVar(&cfg.dproxy, "discovery-proxy", "", "HTTP proxy to use for traffic to discovery service")
|
||||
fs.StringVar(&cfg.dnsCluster, "discovery-srv", "", "Bootstrap initial cluster via DNS domain")
|
||||
fs.StringVar(&cfg.initialCluster, "initial-cluster", "default=http://localhost:2380,default=http://localhost:7001", "Initial cluster configuration for bootstrapping")
|
||||
fs.StringVar(&cfg.initialClusterToken, "initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during bootstrap")
|
||||
fs.Var(cfg.clusterState, "initial-cluster-state", "Initial cluster configuration for bootstrapping")
|
||||
if err := cfg.clusterState.Set(clusterStateFlagNew); err != nil {
|
||||
// Should never happen.
|
||||
log.Panicf("unexpected error setting up clusterStateFlag: %v", err)
|
||||
}
|
||||
|
||||
// proxy
|
||||
fs.Var(cfg.proxy, "proxy", fmt.Sprintf("Valid values include %s", strings.Join(cfg.proxy.Values, ", ")))
|
||||
if err := cfg.proxy.Set(proxyFlagOff); err != nil {
|
||||
// Should never happen.
|
||||
log.Panicf("unexpected error setting up proxyFlag: %v", err)
|
||||
}
|
||||
|
||||
// security
|
||||
fs.StringVar(&cfg.clientTLSInfo.CAFile, "ca-file", "", "Path to the client server TLS CA file.")
|
||||
fs.StringVar(&cfg.clientTLSInfo.CertFile, "cert-file", "", "Path to the client server TLS cert file.")
|
||||
fs.StringVar(&cfg.clientTLSInfo.KeyFile, "key-file", "", "Path to the client server TLS key file.")
|
||||
fs.StringVar(&cfg.peerTLSInfo.CAFile, "peer-ca-file", "", "Path to the peer server TLS CA file.")
|
||||
fs.StringVar(&cfg.peerTLSInfo.CertFile, "peer-cert-file", "", "Path to the peer server TLS cert file.")
|
||||
fs.StringVar(&cfg.peerTLSInfo.KeyFile, "peer-key-file", "", "Path to the peer server TLS key file.")
|
||||
|
||||
// unsafe
|
||||
fs.BoolVar(&cfg.forceNewCluster, "force-new-cluster", false, "Force to create a new one member cluster")
|
||||
|
||||
// version
|
||||
fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit")
|
||||
|
||||
// backwards-compatibility with v0.4.6
|
||||
fs.Var(&flags.IPAddressPort{}, "addr", "DEPRECATED: Use -advertise-client-urls instead.")
|
||||
fs.Var(&flags.IPAddressPort{}, "bind-addr", "DEPRECATED: Use -listen-client-urls instead.")
|
||||
fs.Var(&flags.IPAddressPort{}, "peer-addr", "DEPRECATED: Use -initial-advertise-peer-urls instead.")
|
||||
fs.Var(&flags.IPAddressPort{}, "peer-bind-addr", "DEPRECATED: Use -listen-peer-urls instead.")
|
||||
fs.Var(&flags.DeprecatedFlag{Name: "peers"}, "peers", "DEPRECATED: Use -initial-cluster instead")
|
||||
fs.Var(&flags.DeprecatedFlag{Name: "peers-file"}, "peers-file", "DEPRECATED: Use -initial-cluster instead")
|
||||
|
||||
// ignored
|
||||
for _, f := range cfg.ignored {
|
||||
fs.Var(&flags.IgnoredFlag{Name: f}, f, "")
|
||||
}
|
||||
return cfg
|
||||
}
|
||||
|
||||
func (cfg *config) Parse(arguments []string) error {
|
||||
perr := cfg.FlagSet.Parse(os.Args[1:])
|
||||
switch perr {
|
||||
case nil:
|
||||
case flag.ErrHelp:
|
||||
os.Exit(0)
|
||||
default:
|
||||
os.Exit(2)
|
||||
}
|
||||
|
||||
if cfg.printVersion {
|
||||
fmt.Println("etcd version", version.Version)
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
err := flags.SetFlagsFromEnv(cfg.FlagSet)
|
||||
if err != nil {
|
||||
log.Fatalf("etcd: %v", err)
|
||||
}
|
||||
|
||||
set := make(map[string]bool)
|
||||
cfg.FlagSet.Visit(func(f *flag.Flag) {
|
||||
set[f.Name] = true
|
||||
})
|
||||
nSet := 0
|
||||
for _, v := range []bool{set["discovery"], set["inital-cluster"], set["discovery-srv"]} {
|
||||
if v {
|
||||
nSet += 1
|
||||
}
|
||||
}
|
||||
if nSet > 1 {
|
||||
return fmt.Errorf("multiple discovery or bootstrap flags are set. Choose one of \"discovery\", \"initial-cluster\", or \"discovery-srv\"")
|
||||
}
|
||||
|
||||
cfg.lpurls, err = flags.URLsFromFlags(cfg.FlagSet, "listen-peer-urls", "peer-bind-addr", cfg.peerTLSInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cfg.apurls, err = flags.URLsFromFlags(cfg.FlagSet, "initial-advertise-peer-urls", "peer-addr", cfg.peerTLSInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cfg.lcurls, err = flags.URLsFromFlags(cfg.FlagSet, "listen-client-urls", "bind-addr", cfg.clientTLSInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cfg.acurls, err = flags.URLsFromFlags(cfg.FlagSet, "advertise-client-urls", "addr", cfg.clientTLSInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cfg config) isNewCluster() bool { return cfg.clusterState.String() == clusterStateFlagNew }
|
||||
func (cfg config) isProxy() bool { return cfg.proxy.String() != proxyFlagOff }
|
||||
func (cfg config) isReadonlyProxy() bool { return cfg.proxy.String() == proxyFlagReadonly }
|
||||
func (cfg config) shouldFallbackToProxy() bool { return cfg.fallback.String() == fallbackFlagProxy }
|
294
etcdmain/etcd.go
294
etcdmain/etcd.go
@ -17,12 +17,10 @@
|
||||
package etcdmain
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
@ -31,222 +29,86 @@ import (
|
||||
"github.com/coreos/etcd/etcdserver/etcdhttp"
|
||||
"github.com/coreos/etcd/pkg/cors"
|
||||
"github.com/coreos/etcd/pkg/fileutil"
|
||||
"github.com/coreos/etcd/pkg/flags"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/proxy"
|
||||
"github.com/coreos/etcd/rafthttp"
|
||||
"github.com/coreos/etcd/version"
|
||||
)
|
||||
|
||||
const (
|
||||
// the owner can make/remove files inside the directory
|
||||
privateDirMode = 0700
|
||||
|
||||
proxyFlagOff = "off"
|
||||
proxyFlagReadonly = "readonly"
|
||||
proxyFlagOn = "on"
|
||||
|
||||
fallbackFlagExit = "exit"
|
||||
fallbackFlagProxy = "proxy"
|
||||
|
||||
clusterStateFlagNew = "new"
|
||||
clusterStateFlagExisting = "existing"
|
||||
)
|
||||
|
||||
var (
|
||||
fs = flag.NewFlagSet("etcd", flag.ContinueOnError)
|
||||
name = fs.String("name", "default", "Unique human-readable name for this node")
|
||||
dir = fs.String("data-dir", "", "Path to the data directory")
|
||||
durl = fs.String("discovery", "", "Discovery service used to bootstrap the cluster")
|
||||
dnsCluster = fs.String("discovery-srv", "", "Bootstrap initial cluster via DNS domain")
|
||||
dproxy = fs.String("discovery-proxy", "", "HTTP proxy to use for traffic to discovery service")
|
||||
snapCount = fs.Uint64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
|
||||
printVersion = fs.Bool("version", false, "Print the version and exit")
|
||||
forceNewCluster = fs.Bool("force-new-cluster", false, "Force to create a new one member cluster")
|
||||
maxSnapFiles = fs.Uint("max-snapshots", defaultMaxSnapshots, "Maximum number of snapshot files to retain (0 is unlimited)")
|
||||
maxWalFiles = fs.Uint("max-wals", defaultMaxWALs, "Maximum number of wal files to retain (0 is unlimited)")
|
||||
|
||||
initialCluster = fs.String("initial-cluster", "default=http://localhost:2380,default=http://localhost:7001", "Initial cluster configuration for bootstrapping")
|
||||
initialClusterToken = fs.String("initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during bootstrap")
|
||||
|
||||
corsInfo = &cors.CORSInfo{}
|
||||
clientTLSInfo = transport.TLSInfo{}
|
||||
peerTLSInfo = transport.TLSInfo{}
|
||||
|
||||
proxyFlag = flags.NewStringsFlag(
|
||||
proxyFlagOff,
|
||||
proxyFlagReadonly,
|
||||
proxyFlagOn,
|
||||
)
|
||||
fallbackFlag = flags.NewStringsFlag(
|
||||
fallbackFlagExit,
|
||||
fallbackFlagProxy,
|
||||
)
|
||||
clusterStateFlag = flags.NewStringsFlag(
|
||||
clusterStateFlagNew,
|
||||
clusterStateFlagExisting,
|
||||
)
|
||||
|
||||
ignored = []string{
|
||||
"cluster-active-size",
|
||||
"cluster-remove-delay",
|
||||
"cluster-sync-interval",
|
||||
"config",
|
||||
"force",
|
||||
"max-result-buffer",
|
||||
"max-retry-attempts",
|
||||
"peer-heartbeat-interval",
|
||||
"peer-election-timeout",
|
||||
"retry-interval",
|
||||
"snapshot",
|
||||
"v",
|
||||
"vv",
|
||||
}
|
||||
|
||||
// indirection for testing
|
||||
lookupSRV = net.LookupSRV
|
||||
)
|
||||
|
||||
func init() {
|
||||
fs.Usage = func() {
|
||||
fmt.Println(usageline)
|
||||
fmt.Println(flagsline)
|
||||
}
|
||||
|
||||
fs.Var(clusterStateFlag, "initial-cluster-state", "Initial cluster configuration for bootstrapping")
|
||||
if err := clusterStateFlag.Set(clusterStateFlagNew); err != nil {
|
||||
// Should never happen.
|
||||
log.Panicf("unexpected error setting up clusterStateFlag: %v", err)
|
||||
}
|
||||
|
||||
fs.Var(flags.NewURLsValue("http://localhost:2380,http://localhost:7001"), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster")
|
||||
fs.Var(flags.NewURLsValue("http://localhost:2379,http://localhost:4001"), "advertise-client-urls", "List of this member's client URLs to advertise to the rest of the cluster")
|
||||
fs.Var(flags.NewURLsValue("http://localhost:2380,http://localhost:7001"), "listen-peer-urls", "List of URLs to listen on for peer traffic")
|
||||
fs.Var(flags.NewURLsValue("http://localhost:2379,http://localhost:4001"), "listen-client-urls", "List of URLs to listen on for client traffic")
|
||||
|
||||
fs.Var(corsInfo, "cors", "Comma-separated white list of origins for CORS (cross-origin resource sharing).")
|
||||
|
||||
fs.Var(proxyFlag, "proxy", fmt.Sprintf("Valid values include %s", strings.Join(proxyFlag.Values, ", ")))
|
||||
if err := proxyFlag.Set(proxyFlagOff); err != nil {
|
||||
// Should never happen.
|
||||
log.Panicf("unexpected error setting up proxyFlag: %v", err)
|
||||
}
|
||||
fs.Var(fallbackFlag, "discovery-fallback", fmt.Sprintf("Valid values include %s", strings.Join(fallbackFlag.Values, ", ")))
|
||||
if err := fallbackFlag.Set(fallbackFlagProxy); err != nil {
|
||||
// Should never happen.
|
||||
log.Panicf("unexpected error setting up discovery-fallback flag: %v", err)
|
||||
}
|
||||
|
||||
fs.StringVar(&clientTLSInfo.CAFile, "ca-file", "", "Path to the client server TLS CA file.")
|
||||
fs.StringVar(&clientTLSInfo.CertFile, "cert-file", "", "Path to the client server TLS cert file.")
|
||||
fs.StringVar(&clientTLSInfo.KeyFile, "key-file", "", "Path to the client server TLS key file.")
|
||||
|
||||
fs.StringVar(&peerTLSInfo.CAFile, "peer-ca-file", "", "Path to the peer server TLS CA file.")
|
||||
fs.StringVar(&peerTLSInfo.CertFile, "peer-cert-file", "", "Path to the peer server TLS cert file.")
|
||||
fs.StringVar(&peerTLSInfo.KeyFile, "peer-key-file", "", "Path to the peer server TLS key file.")
|
||||
|
||||
// backwards-compatibility with v0.4.6
|
||||
fs.Var(&flags.IPAddressPort{}, "addr", "DEPRECATED: Use -advertise-client-urls instead.")
|
||||
fs.Var(&flags.IPAddressPort{}, "bind-addr", "DEPRECATED: Use -listen-client-urls instead.")
|
||||
fs.Var(&flags.IPAddressPort{}, "peer-addr", "DEPRECATED: Use -initial-advertise-peer-urls instead.")
|
||||
fs.Var(&flags.IPAddressPort{}, "peer-bind-addr", "DEPRECATED: Use -listen-peer-urls instead.")
|
||||
|
||||
for _, f := range ignored {
|
||||
fs.Var(&flags.IgnoredFlag{Name: f}, f, "")
|
||||
}
|
||||
|
||||
fs.Var(&flags.DeprecatedFlag{Name: "peers"}, "peers", "DEPRECATED: Use -initial-cluster instead")
|
||||
fs.Var(&flags.DeprecatedFlag{Name: "peers-file"}, "peers-file", "DEPRECATED: Use -initial-cluster instead")
|
||||
}
|
||||
|
||||
func Main() {
|
||||
perr := fs.Parse(os.Args[1:])
|
||||
switch perr {
|
||||
case nil:
|
||||
case flag.ErrHelp:
|
||||
os.Exit(0)
|
||||
default:
|
||||
cfg := NewConfig()
|
||||
err := cfg.Parse(os.Args[1:])
|
||||
if err != nil {
|
||||
log.Printf("etcd: error verifying flags, %v", err)
|
||||
os.Exit(2)
|
||||
}
|
||||
|
||||
if *printVersion {
|
||||
fmt.Println("etcd version", version.Version)
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
err := flags.SetFlagsFromEnv(fs)
|
||||
if err != nil {
|
||||
log.Fatalf("etcd: %v", err)
|
||||
}
|
||||
|
||||
shouldProxy := proxyFlag.String() != proxyFlagOff
|
||||
var stopped <-chan struct{}
|
||||
|
||||
shouldProxy := cfg.isProxy()
|
||||
if !shouldProxy {
|
||||
stopped, err = startEtcd()
|
||||
if err == discovery.ErrFullCluster && fallbackFlag.String() == fallbackFlagProxy {
|
||||
stopped, err = startEtcd(cfg)
|
||||
if err == discovery.ErrFullCluster && cfg.shouldFallbackToProxy() {
|
||||
log.Printf("etcd: discovery cluster full, falling back to %s", fallbackFlagProxy)
|
||||
shouldProxy = true
|
||||
}
|
||||
}
|
||||
if shouldProxy {
|
||||
err = startProxy()
|
||||
err = startProxy(cfg)
|
||||
}
|
||||
if err != nil {
|
||||
switch err {
|
||||
case discovery.ErrDuplicateID:
|
||||
log.Fatalf("etcd: member %s has previously registered with discovery service (%s), but the data-dir (%s) on disk cannot be found.",
|
||||
*name, *durl, *dir)
|
||||
cfg.name, cfg.durl, cfg.dir)
|
||||
default:
|
||||
log.Fatalf("etcd: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
<-stopped
|
||||
}
|
||||
|
||||
// startEtcd launches the etcd server and HTTP handlers for client/server communication.
|
||||
func startEtcd() (<-chan struct{}, error) {
|
||||
apurls, err := flags.URLsFromFlags(fs, "initial-advertise-peer-urls", "peer-addr", peerTLSInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cls, err := setupCluster(apurls)
|
||||
func startEtcd(cfg *config) (<-chan struct{}, error) {
|
||||
cls, err := setupCluster(cfg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error setting up initial cluster: %v", err)
|
||||
}
|
||||
|
||||
if *dir == "" {
|
||||
*dir = fmt.Sprintf("%v.etcd", *name)
|
||||
log.Printf("no data-dir provided, using default data-dir ./%s", *dir)
|
||||
if cfg.dir == "" {
|
||||
cfg.dir = fmt.Sprintf("%v.etcd", cfg.name)
|
||||
log.Printf("no data-dir provided, using default data-dir ./%s", cfg.dir)
|
||||
}
|
||||
if err := os.MkdirAll(*dir, privateDirMode); err != nil {
|
||||
if err := os.MkdirAll(cfg.dir, privateDirMode); err != nil {
|
||||
return nil, fmt.Errorf("cannot create data directory: %v", err)
|
||||
}
|
||||
if err := fileutil.IsDirWriteable(*dir); err != nil {
|
||||
if err := fileutil.IsDirWriteable(cfg.dir); err != nil {
|
||||
return nil, fmt.Errorf("cannot write to data directory: %v", err)
|
||||
}
|
||||
|
||||
pt, err := transport.NewTimeoutTransport(peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
|
||||
pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
acurls, err := flags.URLsFromFlags(fs, "advertise-client-urls", "addr", clientTLSInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
lpurls, err := flags.URLsFromFlags(fs, "listen-peer-urls", "peer-bind-addr", peerTLSInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !peerTLSInfo.Empty() {
|
||||
log.Printf("etcd: peerTLS: %s", peerTLSInfo)
|
||||
if !cfg.peerTLSInfo.Empty() {
|
||||
log.Printf("etcd: peerTLS: %s", cfg.peerTLSInfo)
|
||||
}
|
||||
plns := make([]net.Listener, 0)
|
||||
for _, u := range lpurls {
|
||||
for _, u := range cfg.lpurls {
|
||||
var l net.Listener
|
||||
l, err = transport.NewTimeoutListener(u.Host, u.Scheme, peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
|
||||
l, err = transport.NewTimeoutListener(u.Host, u.Scheme, cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -262,18 +124,13 @@ func startEtcd() (<-chan struct{}, error) {
|
||||
plns = append(plns, l)
|
||||
}
|
||||
|
||||
lcurls, err := flags.URLsFromFlags(fs, "listen-client-urls", "bind-addr", clientTLSInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !clientTLSInfo.Empty() {
|
||||
log.Printf("etcd: clientTLS: %s", clientTLSInfo)
|
||||
if !cfg.clientTLSInfo.Empty() {
|
||||
log.Printf("etcd: clientTLS: %s", cfg.clientTLSInfo)
|
||||
}
|
||||
clns := make([]net.Listener, 0)
|
||||
for _, u := range lcurls {
|
||||
for _, u := range cfg.lcurls {
|
||||
var l net.Listener
|
||||
l, err = transport.NewListener(u.Host, u.Scheme, clientTLSInfo)
|
||||
l, err = transport.NewListener(u.Host, u.Scheme, cfg.clientTLSInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -289,34 +146,34 @@ func startEtcd() (<-chan struct{}, error) {
|
||||
clns = append(clns, l)
|
||||
}
|
||||
|
||||
cfg := &etcdserver.ServerConfig{
|
||||
Name: *name,
|
||||
ClientURLs: acurls,
|
||||
PeerURLs: apurls,
|
||||
DataDir: *dir,
|
||||
SnapCount: *snapCount,
|
||||
MaxSnapFiles: *maxSnapFiles,
|
||||
MaxWALFiles: *maxWalFiles,
|
||||
srvcfg := &etcdserver.ServerConfig{
|
||||
Name: cfg.name,
|
||||
ClientURLs: cfg.acurls,
|
||||
PeerURLs: cfg.apurls,
|
||||
DataDir: cfg.dir,
|
||||
SnapCount: cfg.snapCount,
|
||||
MaxSnapFiles: cfg.maxSnapFiles,
|
||||
MaxWALFiles: cfg.maxWalFiles,
|
||||
Cluster: cls,
|
||||
DiscoveryURL: *durl,
|
||||
DiscoveryProxy: *dproxy,
|
||||
NewCluster: clusterStateFlag.String() == clusterStateFlagNew,
|
||||
ForceNewCluster: *forceNewCluster,
|
||||
DiscoveryURL: cfg.durl,
|
||||
DiscoveryProxy: cfg.dproxy,
|
||||
NewCluster: cfg.isNewCluster(),
|
||||
ForceNewCluster: cfg.forceNewCluster,
|
||||
Transport: pt,
|
||||
}
|
||||
var s *etcdserver.EtcdServer
|
||||
s, err = etcdserver.NewServer(cfg)
|
||||
s, err = etcdserver.NewServer(srvcfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.Start()
|
||||
|
||||
if corsInfo.String() != "" {
|
||||
log.Printf("etcd: cors = %s", corsInfo)
|
||||
if cfg.corsInfo.String() != "" {
|
||||
log.Printf("etcd: cors = %s", cfg.corsInfo)
|
||||
}
|
||||
ch := &cors.CORSHandler{
|
||||
Handler: etcdhttp.NewClientHandler(s),
|
||||
Info: corsInfo,
|
||||
Info: cfg.corsInfo,
|
||||
}
|
||||
ph := etcdhttp.NewPeerHandler(s)
|
||||
// Start the peer server in a goroutine
|
||||
@ -335,27 +192,23 @@ func startEtcd() (<-chan struct{}, error) {
|
||||
}
|
||||
|
||||
// startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes.
|
||||
func startProxy() error {
|
||||
apurls, err := flags.URLsFromFlags(fs, "initial-advertise-peer-urls", "peer-addr", peerTLSInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cls, err := setupCluster(apurls)
|
||||
func startProxy(cfg *config) error {
|
||||
cls, err := setupCluster(cfg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error setting up initial cluster: %v", err)
|
||||
}
|
||||
|
||||
if *durl != "" {
|
||||
s, err := discovery.GetCluster(*durl, *dproxy)
|
||||
if cfg.durl != "" {
|
||||
s, err := discovery.GetCluster(cfg.durl, cfg.dproxy)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if cls, err = etcdserver.NewClusterFromString(*durl, s); err != nil {
|
||||
if cls, err = etcdserver.NewClusterFromString(cfg.durl, s); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
pt, err := transport.NewTransport(clientTLSInfo)
|
||||
pt, err := transport.NewTransport(cfg.clientTLSInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -374,19 +227,15 @@ func startProxy() error {
|
||||
ph := proxy.NewHandler(pt, uf)
|
||||
ph = &cors.CORSHandler{
|
||||
Handler: ph,
|
||||
Info: corsInfo,
|
||||
Info: cfg.corsInfo,
|
||||
}
|
||||
|
||||
if proxyFlag.String() == proxyFlagReadonly {
|
||||
if cfg.isReadonlyProxy() {
|
||||
ph = proxy.NewReadonlyHandler(ph)
|
||||
}
|
||||
lcurls, err := flags.URLsFromFlags(fs, "listen-client-urls", "bind-addr", clientTLSInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Start a proxy server goroutine for each listen address
|
||||
for _, u := range lcurls {
|
||||
l, err := transport.NewListener(u.Host, u.Scheme, clientTLSInfo)
|
||||
for _, u := range cfg.lcurls {
|
||||
l, err := transport.NewListener(u.Host, u.Scheme, cfg.clientTLSInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -401,39 +250,24 @@ func startProxy() error {
|
||||
}
|
||||
|
||||
// setupCluster sets up an initial cluster definition for bootstrap or discovery.
|
||||
func setupCluster(apurls []url.URL) (*etcdserver.Cluster, error) {
|
||||
set := make(map[string]bool)
|
||||
fs.Visit(func(f *flag.Flag) {
|
||||
set[f.Name] = true
|
||||
})
|
||||
nSet := 0
|
||||
for _, v := range []bool{set["discovery"], set["inital-cluster"], set["discovery-srv"]} {
|
||||
if v {
|
||||
nSet += 1
|
||||
}
|
||||
}
|
||||
if nSet > 1 {
|
||||
return nil, fmt.Errorf("multiple discovery or bootstrap flags are set. Choose one of \"discovery\", \"initial-cluster\", or \"discovery-srv\"")
|
||||
}
|
||||
func setupCluster(cfg *config) (*etcdserver.Cluster, error) {
|
||||
var cls *etcdserver.Cluster
|
||||
var err error
|
||||
switch {
|
||||
case set["discovery"]:
|
||||
case cfg.durl != "":
|
||||
// If using discovery, generate a temporary cluster based on
|
||||
// self's advertised peer URLs
|
||||
clusterStr := genClusterString(*name, apurls)
|
||||
cls, err = etcdserver.NewClusterFromString(*durl, clusterStr)
|
||||
case set["discovery-srv"]:
|
||||
clusterStr, clusterToken, err := genDNSClusterString(*initialClusterToken, apurls)
|
||||
clusterStr := genClusterString(cfg.name, cfg.apurls)
|
||||
cls, err = etcdserver.NewClusterFromString(cfg.durl, clusterStr)
|
||||
case cfg.dnsCluster != "":
|
||||
clusterStr, clusterToken, err := genDNSClusterString(cfg.name, cfg.dnsCluster, cfg.initialClusterToken, cfg.apurls)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cls, err = etcdserver.NewClusterFromString(clusterToken, clusterStr)
|
||||
case set["initial-cluster"]:
|
||||
fallthrough
|
||||
default:
|
||||
// We're statically configured, and cluster has appropriately been set.
|
||||
cls, err = etcdserver.NewClusterFromString(*initialClusterToken, *initialCluster)
|
||||
cls, err = etcdserver.NewClusterFromString(cfg.initialClusterToken, cfg.initialCluster)
|
||||
}
|
||||
return cls, err
|
||||
}
|
||||
@ -449,7 +283,7 @@ func genClusterString(name string, urls types.URLs) string {
|
||||
// TODO(barakmich): Currently ignores priority and weight (as they don't make as much sense for a bootstrap)
|
||||
// Also doesn't do any lookups for the token (though it could)
|
||||
// Also sees each entry as a separate instance.
|
||||
func genDNSClusterString(defaultToken string, apurls types.URLs) (string, string, error) {
|
||||
func genDNSClusterString(name, dns string, defaultToken string, apurls types.URLs) (string, string, error) {
|
||||
stringParts := make([]string, 0)
|
||||
tempName := int(0)
|
||||
tcpAPUrls := make([]string, 0)
|
||||
@ -465,7 +299,7 @@ func genDNSClusterString(defaultToken string, apurls types.URLs) (string, string
|
||||
}
|
||||
|
||||
updateNodeMap := func(service, prefix string) error {
|
||||
_, addrs, err := lookupSRV(service, "tcp", *dnsCluster)
|
||||
_, addrs, err := lookupSRV(service, "tcp", dns)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -479,7 +313,7 @@ func genDNSClusterString(defaultToken string, apurls types.URLs) (string, string
|
||||
n := ""
|
||||
for _, url := range tcpAPUrls {
|
||||
if url == tcpAddr.String() {
|
||||
n = *name
|
||||
n = name
|
||||
}
|
||||
}
|
||||
if n == "" {
|
||||
|
@ -61,9 +61,7 @@ func TestGenClusterString(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGenDNSClusterString(t *testing.T) {
|
||||
oldname := *name
|
||||
*name = "dnsClusterTest"
|
||||
defer func() { *name = oldname }()
|
||||
name := "dnsClusterTest"
|
||||
tests := []struct {
|
||||
withSSL []*net.SRV
|
||||
withoutSSL []*net.SRV
|
||||
@ -124,7 +122,7 @@ func TestGenDNSClusterString(t *testing.T) {
|
||||
}
|
||||
defer func() { lookupSRV = net.LookupSRV }()
|
||||
urls := mustNewURLs(t, tt.urls)
|
||||
str, token, err := genDNSClusterString("token", urls)
|
||||
str, token, err := genDNSClusterString(name, "example.com", "token", urls)
|
||||
if err != nil {
|
||||
t.Fatalf("%d: err: %#v", i, err)
|
||||
}
|
||||
|
@ -45,6 +45,8 @@ clustering flags:
|
||||
expected behavior ('exit' or 'proxy') when discovery services fails.
|
||||
--discovery-proxy ''
|
||||
HTTP proxy to use for traffic to discovery service.
|
||||
--discovery-srv ''
|
||||
dns srv domain name used to bootstrap the cluster.
|
||||
|
||||
|
||||
proxy flags:
|
||||
|
Loading…
x
Reference in New Issue
Block a user