etcdmain, embed: export Config and StartEtcd into embed/

Lets programs embed etcd.

Fixes #5430
This commit is contained in:
Anthony Romano 2016-07-10 11:06:08 -07:00
parent b9f6de9277
commit f4f33ea767
10 changed files with 959 additions and 704 deletions

304
embed/config.go Normal file
View File

@ -0,0 +1,304 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package embed
import (
"fmt"
"io/ioutil"
"net/url"
"strings"
"github.com/coreos/etcd/discovery"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/pkg/cors"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"github.com/ghodss/yaml"
)
const (
ClusterStateFlagNew = "new"
ClusterStateFlagExisting = "existing"
DefaultName = "default"
DefaultInitialAdvertisePeerURLs = "http://localhost:2380"
DefaultAdvertiseClientURLs = "http://localhost:2379"
DefaultListenPeerURLs = "http://localhost:2380"
DefaultListenClientURLs = "http://localhost:2379"
DefaultMaxSnapshots = 5
DefaultMaxWALs = 5
// maxElectionMs specifies the maximum value of election timeout.
// More details are listed in ../Documentation/tuning.md#time-parameters.
maxElectionMs = 50000
)
var (
ErrConflictBootstrapFlags = fmt.Errorf("multiple discovery or bootstrap flags are set. " +
"Choose one of \"initial-cluster\", \"discovery\" or \"discovery-srv\"")
ErrUnsetAdvertiseClientURLsFlag = fmt.Errorf("--advertise-client-urls is required when --listen-client-urls is set explicitly")
)
// Config holds the arguments for configuring an etcd server.
type Config struct {
// member
CorsInfo *cors.CORSInfo
LPUrls, LCUrls []url.URL
Dir string `json:"data-dir"`
WalDir string `json:"wal-dir"`
MaxSnapFiles uint `json:"max-snapshots"`
MaxWalFiles uint `json:"max-wals"`
Name string `json:"name"`
SnapCount uint64 `json:"snapshot-count"`
AutoCompactionRetention int `json:"auto-compaction-retention"`
// TickMs is the number of milliseconds between heartbeat ticks.
// TODO: decouple tickMs and heartbeat tick (current heartbeat tick = 1).
// make ticks a cluster wide configuration.
TickMs uint `json:"heartbeat-interval"`
ElectionMs uint `json:"election-timeout"`
QuotaBackendBytes int64 `json:"quota-backend-bytes"`
// clustering
APUrls, ACUrls []url.URL
ClusterState string `json:"initial-cluster-state"`
DNSCluster string `json:"discovery-srv"`
Dproxy string `json:"discovery-proxy"`
Durl string `json:"discovery"`
InitialCluster string `json:"initial-cluster"`
InitialClusterToken string `json:"initial-cluster-token"`
StrictReconfigCheck bool `json:"strict-reconfig-check"`
// security
ClientTLSInfo transport.TLSInfo
ClientAutoTLS bool
PeerTLSInfo transport.TLSInfo
PeerAutoTLS bool
// debug
Debug bool `json:"debug"`
LogPkgLevels string `json:"log-package-levels"`
EnablePprof bool
// ForceNewCluster starts a new cluster even if previously started; unsafe.
ForceNewCluster bool `json:"force-new-cluster"`
}
// configYAML holds the config suitable for yaml parsing
type configYAML struct {
Config
configJSON
}
// configJSON has file options that are translated into Config options
type configJSON struct {
LPUrlsJSON string `json:"listen-peer-urls"`
LCUrlsJSON string `json:"listen-client-urls"`
CorsJSON string `json:"cors"`
APUrlsJSON string `json:"initial-advertise-peer-urls"`
ACUrlsJSON string `json:"advertise-client-urls"`
ClientSecurityJSON securityConfig `json:"client-transport-security"`
PeerSecurityJSON securityConfig `json:"peer-transport-security"`
}
type securityConfig struct {
CAFile string `json:"ca-file"`
CertFile string `json:"cert-file"`
KeyFile string `json:"key-file"`
CertAuth bool `json:"client-cert-auth"`
TrustedCAFile string `json:"trusted-ca-file"`
AutoTLS bool `json:"auto-tls"`
}
// NewConfig creates a new Config populated with default values.
func NewConfig() *Config {
apurl, _ := url.Parse(DefaultInitialAdvertisePeerURLs)
acurl, _ := url.Parse(DefaultAdvertiseClientURLs)
cfg := &Config{
CorsInfo: &cors.CORSInfo{},
MaxSnapFiles: DefaultMaxSnapshots,
MaxWalFiles: DefaultMaxWALs,
Name: DefaultName,
SnapCount: etcdserver.DefaultSnapCount,
TickMs: 100,
ElectionMs: 1000,
APUrls: []url.URL{*apurl},
ACUrls: []url.URL{*acurl},
ClusterState: ClusterStateFlagNew,
InitialClusterToken: "etcd-cluster",
}
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
return cfg
}
func ConfigFromFile(path string) (*Config, error) {
cfg := &configYAML{}
if err := cfg.configFromFile(path); err != nil {
return nil, err
}
return &cfg.Config, nil
}
func (cfg *configYAML) configFromFile(path string) error {
b, err := ioutil.ReadFile(path)
if err != nil {
return err
}
err = yaml.Unmarshal(b, cfg)
if err != nil {
return err
}
if cfg.LPUrlsJSON != "" {
u, err := types.NewURLs(strings.Split(cfg.LPUrlsJSON, ","))
if err != nil {
plog.Fatalf("unexpected error setting up listen-peer-urls: %v", err)
}
cfg.LPUrls = []url.URL(u)
}
if cfg.LCUrlsJSON != "" {
u, err := types.NewURLs(strings.Split(cfg.LCUrlsJSON, ","))
if err != nil {
plog.Fatalf("unexpected error setting up listen-client-urls: %v", err)
}
cfg.LCUrls = []url.URL(u)
}
if cfg.CorsJSON != "" {
if err := cfg.CorsInfo.Set(cfg.CorsJSON); err != nil {
plog.Panicf("unexpected error setting up cors: %v", err)
}
}
if cfg.APUrlsJSON != "" {
u, err := types.NewURLs(strings.Split(cfg.APUrlsJSON, ","))
if err != nil {
plog.Fatalf("unexpected error setting up initial-advertise-peer-urls: %v", err)
}
cfg.APUrls = []url.URL(u)
}
if cfg.ACUrlsJSON != "" {
u, err := types.NewURLs(strings.Split(cfg.ACUrlsJSON, ","))
if err != nil {
plog.Fatalf("unexpected error setting up advertise-peer-urls: %v", err)
}
cfg.ACUrls = []url.URL(u)
}
if cfg.ClusterState == "" {
cfg.ClusterState = ClusterStateFlagNew
}
copySecurityDetails := func(tls *transport.TLSInfo, ysc *securityConfig) {
tls.CAFile = ysc.CAFile
tls.CertFile = ysc.CertFile
tls.KeyFile = ysc.KeyFile
tls.ClientCertAuth = ysc.CertAuth
tls.TrustedCAFile = ysc.TrustedCAFile
}
copySecurityDetails(&cfg.ClientTLSInfo, &cfg.ClientSecurityJSON)
copySecurityDetails(&cfg.PeerTLSInfo, &cfg.PeerSecurityJSON)
cfg.ClientAutoTLS = cfg.ClientSecurityJSON.AutoTLS
cfg.PeerAutoTLS = cfg.PeerSecurityJSON.AutoTLS
return cfg.Validate()
}
func (cfg *Config) Validate() error {
// Check if conflicting flags are passed.
nSet := 0
for _, v := range []bool{cfg.Durl != "", cfg.InitialCluster != "", cfg.DNSCluster != ""} {
if v {
nSet++
}
}
if cfg.ClusterState != ClusterStateFlagNew && cfg.ClusterState != ClusterStateFlagExisting {
return fmt.Errorf("unexpected clusterState %q", cfg.ClusterState)
}
if nSet > 1 {
return ErrConflictBootstrapFlags
}
if 5*cfg.TickMs > cfg.ElectionMs {
return fmt.Errorf("--election-timeout[%vms] should be at least as 5 times as --heartbeat-interval[%vms]", cfg.ElectionMs, cfg.TickMs)
}
if cfg.ElectionMs > maxElectionMs {
return fmt.Errorf("--election-timeout[%vms] is too long, and should be set less than %vms", cfg.ElectionMs, maxElectionMs)
}
// check this last since proxying in etcdmain may make this OK
if cfg.LCUrls != nil && cfg.ACUrls == nil {
return ErrUnsetAdvertiseClientURLsFlag
}
return nil
}
// PeerURLsMapAndToken sets up an initial peer URLsMap and cluster token for bootstrap or discovery.
func (cfg *Config) PeerURLsMapAndToken(which string) (urlsmap types.URLsMap, token string, err error) {
switch {
case cfg.Durl != "":
urlsmap = types.URLsMap{}
// If using discovery, generate a temporary cluster based on
// self's advertised peer URLs
urlsmap[cfg.Name] = cfg.APUrls
token = cfg.Durl
case cfg.DNSCluster != "":
var clusterStr string
clusterStr, token, err = discovery.SRVGetCluster(cfg.Name, cfg.DNSCluster, cfg.InitialClusterToken, cfg.APUrls)
if err != nil {
return nil, "", err
}
urlsmap, err = types.NewURLsMap(clusterStr)
// only etcd member must belong to the discovered cluster.
// proxy does not need to belong to the discovered cluster.
if which == "etcd" {
if _, ok := urlsmap[cfg.Name]; !ok {
return nil, "", fmt.Errorf("cannot find local etcd member %q in SRV records", cfg.Name)
}
}
default:
// We're statically configured, and cluster has appropriately been set.
urlsmap, err = types.NewURLsMap(cfg.InitialCluster)
token = cfg.InitialClusterToken
}
return urlsmap, token, err
}
func (cfg Config) InitialClusterFromName(name string) (ret string) {
if len(cfg.APUrls) == 0 {
return ""
}
n := name
if name == "" {
n = DefaultName
}
for i := range cfg.APUrls {
ret = ret + "," + n + "=" + cfg.APUrls[i].String()
}
return ret[1:]
}
func (cfg Config) IsNewCluster() bool { return cfg.ClusterState == ClusterStateFlagNew }
func (cfg Config) ElectionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }

83
embed/config_test.go Normal file
View File

@ -0,0 +1,83 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package embed
import (
"io/ioutil"
"os"
"testing"
"github.com/coreos/etcd/pkg/transport"
"github.com/ghodss/yaml"
)
func TestConfigFileOtherFields(t *testing.T) {
ctls := securityConfig{CAFile: "cca", CertFile: "ccert", KeyFile: "ckey"}
ptls := securityConfig{CAFile: "pca", CertFile: "pcert", KeyFile: "pkey"}
yc := struct {
ClientSecurityCfgFile securityConfig `json:"client-transport-security"`
PeerSecurityCfgFile securityConfig `json:"peer-transport-security"`
ForceNewCluster bool `json:"force-new-cluster"`
}{
ctls,
ptls,
true,
}
b, err := yaml.Marshal(&yc)
if err != nil {
t.Fatal(err)
}
tmpfile := mustCreateCfgFile(t, b)
defer os.Remove(tmpfile.Name())
cfg, err := ConfigFromFile(tmpfile.Name())
if err != nil {
t.Fatal(err)
}
if !cfg.ForceNewCluster {
t.Errorf("ForceNewCluster = %v, want %v", cfg.ForceNewCluster, true)
}
if !ctls.equals(&cfg.ClientTLSInfo) {
t.Errorf("ClientTLS = %v, want %v", cfg.ClientTLSInfo, ctls)
}
if !ptls.equals(&cfg.PeerTLSInfo) {
t.Errorf("PeerTLS = %v, want %v", cfg.PeerTLSInfo, ptls)
}
}
func (s *securityConfig) equals(t *transport.TLSInfo) bool {
return s.CAFile == t.CAFile &&
s.CertFile == t.CertFile &&
s.CertAuth == t.ClientCertAuth &&
s.TrustedCAFile == t.TrustedCAFile
}
func mustCreateCfgFile(t *testing.T, b []byte) *os.File {
tmpfile, err := ioutil.TempFile("", "servercfg")
if err != nil {
t.Fatal(err)
}
if _, err = tmpfile.Write(b); err != nil {
t.Fatal(err)
}
if err = tmpfile.Close(); err != nil {
t.Fatal(err)
}
return tmpfile
}

View File

@ -1,4 +1,4 @@
// Copyright 2015 The etcd Authors
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,16 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !windows,!plan9
/*
Package embed provides bindings for embedding an etcd server in a program.
package etcdmain
Launch an embedded etcd server using the configuration defaults:
import (
// import procfs FIX godeps.
_ "github.com/prometheus/procfs"
)
import (
"log"
const (
defaultMaxSnapshots = 5
defaultMaxWALs = 5
)
"github.com/coreos/etcd/embed"
)
func main() {
cfg := embed.NewConfig()
cfg.Dir = "default.etcd"
e, err := embed.StartEtcd(cfg)
if err != nil {
log.Fatal(err)
}
defer e.Close()
log.Fatal(<-e.Err())
}
*/
package embed

302
embed/etcd.go Normal file
View File

@ -0,0 +1,302 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package embed
import (
"crypto/tls"
"fmt"
"net"
"net/http"
"path"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v2http"
"github.com/coreos/etcd/pkg/cors"
runtimeutil "github.com/coreos/etcd/pkg/runtime"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/pkg/capnslog"
)
var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "embed")
const (
// internal fd usage includes disk usage and transport usage.
// To read/write snapshot, snap pkg needs 1. In normal case, wal pkg needs
// at most 2 to read/lock/write WALs. One case that it needs to 2 is to
// read all logs after some snapshot index, which locates at the end of
// the second last and the head of the last. For purging, it needs to read
// directory, so it needs 1. For fd monitor, it needs 1.
// For transport, rafthttp builds two long-polling connections and at most
// four temporary connections with each member. There are at most 9 members
// in a cluster, so it should reserve 96.
// For the safety, we set the total reserved number to 150.
reservedInternalFDNum = 150
)
// Etcd contains a running etcd server and its listeners.
type Etcd struct {
Peers []net.Listener
Clients []net.Listener
Server *etcdserver.EtcdServer
cfg Config
errc chan error
sctxs map[string]*serveCtx
}
// StartEtcd launches the etcd server and HTTP handlers for client/server communication.
func StartEtcd(inCfg *Config) (e *Etcd, err error) {
if err = inCfg.Validate(); err != nil {
return nil, err
}
e = &Etcd{cfg: *inCfg}
cfg := &e.cfg
defer func() {
if err != nil {
e.Close()
e = nil
}
}()
if e.Peers, err = startPeerListeners(cfg); err != nil {
return
}
if e.sctxs, err = startClientListeners(cfg); err != nil {
return
}
for _, sctx := range e.sctxs {
e.Clients = append(e.Clients, sctx.l)
}
urlsmap, token, uerr := cfg.PeerURLsMapAndToken("etcd")
if uerr != nil {
err = fmt.Errorf("error setting up initial cluster: %v", uerr)
return
}
srvcfg := &etcdserver.ServerConfig{
Name: cfg.Name,
ClientURLs: cfg.ACUrls,
PeerURLs: cfg.APUrls,
DataDir: cfg.Dir,
DedicatedWALDir: cfg.WalDir,
SnapCount: cfg.SnapCount,
MaxSnapFiles: cfg.MaxSnapFiles,
MaxWALFiles: cfg.MaxWalFiles,
InitialPeerURLsMap: urlsmap,
InitialClusterToken: token,
DiscoveryURL: cfg.Durl,
DiscoveryProxy: cfg.Dproxy,
NewCluster: cfg.IsNewCluster(),
ForceNewCluster: cfg.ForceNewCluster,
PeerTLSInfo: cfg.PeerTLSInfo,
TickMs: cfg.TickMs,
ElectionTicks: cfg.ElectionTicks(),
AutoCompactionRetention: cfg.AutoCompactionRetention,
QuotaBackendBytes: cfg.QuotaBackendBytes,
StrictReconfigCheck: cfg.StrictReconfigCheck,
EnablePprof: cfg.EnablePprof,
}
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
return
}
// buffer channel so goroutines on closed connections won't wait forever
e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
e.Server.Start()
e.serve()
<-e.Server.ReadyNotify()
return
}
func (e *Etcd) Close() {
for _, sctx := range e.sctxs {
sctx.cancel()
}
for i := range e.Peers {
if e.Peers[i] != nil {
e.Peers[i].Close()
}
}
for i := range e.Clients {
if e.Clients[i] != nil {
e.Clients[i].Close()
}
}
if e.Server != nil {
e.Server.Stop()
}
}
func (e *Etcd) Err() <-chan error { return e.errc }
func startPeerListeners(cfg *Config) (plns []net.Listener, err error) {
if cfg.PeerAutoTLS && cfg.PeerTLSInfo.Empty() {
phosts := make([]string, len(cfg.LPUrls))
for i, u := range cfg.LPUrls {
phosts[i] = u.Host
}
cfg.PeerTLSInfo, err = transport.SelfCert(path.Join(cfg.Dir, "fixtures/peer"), phosts)
if err != nil {
plog.Fatalf("could not get certs (%v)", err)
}
} else if cfg.PeerAutoTLS {
plog.Warningf("ignoring peer auto TLS since certs given")
}
if !cfg.PeerTLSInfo.Empty() {
plog.Infof("peerTLS: %s", cfg.PeerTLSInfo)
}
plns = make([]net.Listener, len(cfg.LPUrls))
defer func() {
if err == nil {
return
}
for i := range plns {
if plns[i] == nil {
continue
}
plns[i].Close()
plog.Info("stopping listening for peers on ", cfg.LPUrls[i].String())
}
}()
for i, u := range cfg.LPUrls {
var tlscfg *tls.Config
if u.Scheme == "http" {
if !cfg.PeerTLSInfo.Empty() {
plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
}
if cfg.PeerTLSInfo.ClientCertAuth {
plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
}
}
if !cfg.PeerTLSInfo.Empty() {
if tlscfg, err = cfg.PeerTLSInfo.ServerConfig(); err != nil {
return nil, err
}
}
if plns[i], err = rafthttp.NewListener(u, tlscfg); err != nil {
return nil, err
}
plog.Info("listening for peers on ", u.String())
}
return plns, nil
}
func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
if cfg.ClientAutoTLS && cfg.ClientTLSInfo.Empty() {
chosts := make([]string, len(cfg.LCUrls))
for i, u := range cfg.LCUrls {
chosts[i] = u.Host
}
cfg.ClientTLSInfo, err = transport.SelfCert(path.Join(cfg.Dir, "fixtures/client"), chosts)
if err != nil {
plog.Fatalf("could not get certs (%v)", err)
}
} else if cfg.ClientAutoTLS {
plog.Warningf("ignoring client auto TLS since certs given")
}
sctxs = make(map[string]*serveCtx)
for _, u := range cfg.LCUrls {
sctx := newServeCtx()
if u.Scheme == "http" {
if !cfg.ClientTLSInfo.Empty() {
plog.Warningf("The scheme of client url %s is HTTP while peer key/cert files are presented. Ignored key/cert files.", u.String())
}
if cfg.ClientTLSInfo.ClientCertAuth {
plog.Warningf("The scheme of client url %s is HTTP while client cert auth (--client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
}
}
if u.Scheme == "https" && cfg.ClientTLSInfo.Empty() {
return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPs scheme", u.String())
}
sctx.secure = u.Scheme == "https"
sctx.insecure = !sctx.secure
if oldctx := sctxs[u.Host]; oldctx != nil {
oldctx.secure = oldctx.secure || sctx.secure
oldctx.insecure = oldctx.insecure || sctx.insecure
continue
}
if sctx.l, err = net.Listen("tcp", u.Host); err != nil {
return nil, err
}
if fdLimit, fderr := runtimeutil.FDLimit(); fderr == nil {
if fdLimit <= reservedInternalFDNum {
plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
}
sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum))
}
if sctx.l, err = transport.NewKeepAliveListener(sctx.l, "tcp", nil); err != nil {
return nil, err
}
plog.Info("listening for client requests on ", u.Host)
defer func() {
if err != nil {
sctx.l.Close()
plog.Info("stopping listening for client requests on ", u.Host)
}
}()
sctxs[u.Host] = sctx
}
return sctxs, nil
}
func (e *Etcd) serve() (err error) {
var ctlscfg *tls.Config
if !e.cfg.ClientTLSInfo.Empty() {
plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo)
if ctlscfg, err = e.cfg.ClientTLSInfo.ServerConfig(); err != nil {
return err
}
}
if e.cfg.CorsInfo.String() != "" {
plog.Infof("cors = %s", e.cfg.CorsInfo)
}
// Start the peer server in a goroutine
ph := v2http.NewPeerHandler(e.Server)
for _, l := range e.Peers {
go func(l net.Listener) {
e.errc <- servePeerHTTP(l, ph)
}(l)
}
// Start a client server goroutine for each listen address
ch := http.Handler(&cors.CORSHandler{
Handler: v2http.NewClientHandler(e.Server, e.Server.Cfg.ReqTimeout()),
Info: e.cfg.CorsInfo,
})
for _, sctx := range e.sctxs {
// read timeout does not work with http close notify
// TODO: https://github.com/golang/go/issues/9524
go func(s *serveCtx) {
e.errc <- s.serve(e.Server, ctlscfg, ch, e.errc)
}(sctx)
}
return nil
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdmain
package embed
import (
"crypto/tls"
@ -37,17 +37,23 @@ import (
type serveCtx struct {
l net.Listener
host string
secure bool
insecure bool
ctx context.Context
cancel context.CancelFunc
}
func newServeCtx() *serveCtx {
ctx, cancel := context.WithCancel(context.Background())
return &serveCtx{ctx: ctx, cancel: cancel}
}
// serve accepts incoming connections on the listener l,
// creating a new service goroutine for each. The service goroutines
// read requests and then call handler to reply to them.
func serve(sctx *serveCtx, s *etcdserver.EtcdServer, tlscfg *tls.Config, handler http.Handler) error {
func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handler http.Handler, errc chan<- error) error {
logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
<-s.ReadyNotify()
plog.Info("ready to serve client requests")
@ -56,12 +62,12 @@ func serve(sctx *serveCtx, s *etcdserver.EtcdServer, tlscfg *tls.Config, handler
if sctx.insecure {
gs := v3rpc.Server(s, nil)
grpcl := m.Match(cmux.HTTP2())
go func() { plog.Fatal(gs.Serve(grpcl)) }()
go func() { errc <- gs.Serve(grpcl) }()
opts := []grpc.DialOption{
grpc.WithInsecure(),
}
gwmux, err := registerGateway(sctx.l.Addr().String(), opts)
gwmux, err := sctx.registerGateway(opts)
if err != nil {
return err
}
@ -74,8 +80,8 @@ func serve(sctx *serveCtx, s *etcdserver.EtcdServer, tlscfg *tls.Config, handler
ErrorLog: logger, // do not log user error
}
httpl := m.Match(cmux.HTTP1())
go func() { plog.Fatal(srvhttp.Serve(httpl)) }()
plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.host)
go func() { errc <- srvhttp.Serve(httpl) }()
plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.l.Addr().String())
}
if sctx.secure {
@ -87,7 +93,7 @@ func serve(sctx *serveCtx, s *etcdserver.EtcdServer, tlscfg *tls.Config, handler
dtls.InsecureSkipVerify = true
creds := credentials.NewTLS(dtls)
opts := []grpc.DialOption{grpc.WithTransportCredentials(creds)}
gwmux, err := registerGateway(sctx.l.Addr().String(), opts)
gwmux, err := sctx.registerGateway(opts)
if err != nil {
return err
}
@ -102,9 +108,9 @@ func serve(sctx *serveCtx, s *etcdserver.EtcdServer, tlscfg *tls.Config, handler
TLSConfig: tlscfg,
ErrorLog: logger, // do not log user error
}
go func() { plog.Fatal(srv.Serve(tlsl)) }()
go func() { errc <- srv.Serve(tlsl) }()
plog.Infof("serving client requests on %s", sctx.host)
plog.Infof("serving client requests on %s", sctx.l.Addr().String())
}
return m.Serve()
@ -133,30 +139,32 @@ func servePeerHTTP(l net.Listener, handler http.Handler) error {
return srv.Serve(l)
}
func registerGateway(addr string, opts []grpc.DialOption) (*gw.ServeMux, error) {
func (sctx *serveCtx) registerGateway(opts []grpc.DialOption) (*gw.ServeMux, error) {
ctx := sctx.ctx
addr := sctx.l.Addr().String()
gwmux := gw.NewServeMux()
err := pb.RegisterKVHandlerFromEndpoint(context.Background(), gwmux, addr, opts)
err := pb.RegisterKVHandlerFromEndpoint(ctx, gwmux, addr, opts)
if err != nil {
return nil, err
}
err = pb.RegisterWatchHandlerFromEndpoint(context.Background(), gwmux, addr, opts)
err = pb.RegisterWatchHandlerFromEndpoint(ctx, gwmux, addr, opts)
if err != nil {
return nil, err
}
err = pb.RegisterLeaseHandlerFromEndpoint(context.Background(), gwmux, addr, opts)
err = pb.RegisterLeaseHandlerFromEndpoint(ctx, gwmux, addr, opts)
if err != nil {
return nil, err
}
err = pb.RegisterClusterHandlerFromEndpoint(context.Background(), gwmux, addr, opts)
err = pb.RegisterClusterHandlerFromEndpoint(ctx, gwmux, addr, opts)
if err != nil {
return nil, err
}
err = pb.RegisterMaintenanceHandlerFromEndpoint(context.Background(), gwmux, addr, opts)
err = pb.RegisterMaintenanceHandlerFromEndpoint(ctx, gwmux, addr, opts)
if err != nil {
return nil, err
}
err = pb.RegisterAuthHandlerFromEndpoint(context.Background(), gwmux, addr, opts)
err = pb.RegisterAuthHandlerFromEndpoint(ctx, gwmux, addr, opts)
if err != nil {
return nil, err
}

View File

@ -20,21 +20,17 @@ import (
"flag"
"fmt"
"io/ioutil"
"net/url"
"os"
"runtime"
"strings"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/pkg/cors"
"github.com/coreos/etcd/embed"
"github.com/coreos/etcd/pkg/flags"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/version"
"github.com/ghodss/yaml"
)
const (
var (
proxyFlagOff = "off"
proxyFlagReadonly = "readonly"
proxyFlagOn = "on"
@ -42,21 +38,6 @@ const (
fallbackFlagExit = "exit"
fallbackFlagProxy = "proxy"
clusterStateFlagNew = "new"
clusterStateFlagExisting = "existing"
defaultName = "default"
defaultInitialAdvertisePeerURLs = "http://localhost:2380"
defaultAdvertiseClientURLs = "http://localhost:2379"
defaultListenPeerURLs = "http://localhost:2380"
defaultListenClientURLs = "http://localhost:2379"
// maxElectionMs specifies the maximum value of election timeout.
// More details are listed in ../Documentation/tuning.md#time-parameters.
maxElectionMs = 50000
)
var (
ignored = []string{
"cluster-active-size",
"cluster-remove-delay",
@ -72,105 +53,60 @@ var (
"v",
"vv",
}
ErrConflictBootstrapFlags = fmt.Errorf("multiple discovery or bootstrap flags are set. " +
"Choose one of \"initial-cluster\", \"discovery\" or \"discovery-srv\"")
errUnsetAdvertiseClientURLsFlag = fmt.Errorf("--advertise-client-urls is required when --listen-client-urls is set explicitly")
)
type configProxy struct {
ProxyFailureWaitMs uint `json:"proxy-failure-wait"`
ProxyRefreshIntervalMs uint `json:"proxy-refresh-interval"`
ProxyDialTimeoutMs uint `json:"proxy-dial-timeout"`
ProxyWriteTimeoutMs uint `json:"proxy-write-timeout"`
ProxyReadTimeoutMs uint `json:"proxy-read-timeout"`
Fallback string
Proxy string
ProxyJSON string `json:"proxy"`
FallbackJSON string `json:"discovery-fallback"`
}
// config holds the config for a command line invocation of etcd
type config struct {
*flag.FlagSet
// member
corsInfo *cors.CORSInfo
lpurls, lcurls []url.URL
Dir string `json:"data-dir"`
WalDir string `json:"wal-dir"`
MaxSnapFiles uint `json:"max-snapshots"`
MaxWalFiles uint `json:"max-wals"`
Name string `json:"name"`
SnapCount uint64 `json:"snapshot-count"`
LPUrlsCfgFile string `json:"listen-peer-urls"`
LCUrlsCfgFile string `json:"listen-client-urls"`
CorsCfgFile string `json:"cors"`
// TickMs is the number of milliseconds between heartbeat ticks.
// TODO: decouple tickMs and heartbeat tick (current heartbeat tick = 1).
// make ticks a cluster wide configuration.
TickMs uint `json:"heartbeat-interval"`
ElectionMs uint `json:"election-timeout"`
QuotaBackendBytes int64 `json:"quota-backend-bytes"`
// clustering
apurls, acurls []url.URL
clusterState *flags.StringsFlag
DNSCluster string `json:"discovery-srv"`
Dproxy string `json:"discovery-proxy"`
Durl string `json:"discovery"`
fallback *flags.StringsFlag
InitialCluster string `json:"initial-cluster"`
InitialClusterToken string `json:"initial-cluster-token"`
StrictReconfigCheck bool `json:"strict-reconfig-check"`
ApurlsCfgFile string `json:"initial-advertise-peer-urls"`
AcurlsCfgFile string `json:"advertise-client-urls"`
ClusterStateCfgFile string `json:"initial-cluster-state"`
FallbackCfgFile string `json:"discovery-fallback"`
// proxy
proxy *flags.StringsFlag
ProxyFailureWaitMs uint `json:"proxy-failure-wait"`
ProxyRefreshIntervalMs uint `json:"proxy-refresh-interval"`
ProxyDialTimeoutMs uint `json:"proxy-dial-timeout"`
ProxyWriteTimeoutMs uint `json:"proxy-write-timeout"`
ProxyReadTimeoutMs uint `json:"proxy-read-timeout"`
ProxyCfgFile string `json:"proxy"`
// security
clientTLSInfo, peerTLSInfo transport.TLSInfo
ClientAutoTLS bool
PeerAutoTLS bool
ClientSecurityCfgFile securityConfig `json:"client-transport-security"`
PeerSecurityCfgFile securityConfig `json:"peer-transport-security"`
// Debug logging
Debug bool `json:"debug"`
LogPkgLevels string `json:"log-package-levels"`
// ForceNewCluster is unsafe
ForceNewCluster bool `json:"force-new-cluster"`
embed.Config
configProxy
configFlags
configFile string
printVersion bool
autoCompactionRetention int
enablePprof bool
configFile string
ignored []string
ignored []string
}
type securityConfig struct {
CAFile string `json:"ca-file"`
CertFile string `json:"cert-file"`
KeyFile string `json:"key-file"`
CertAuth bool `json:"client-cert-auth"`
TrustedCAFile string `json:"trusted-ca-file"`
AutoTLS bool `json:"auto-tls"`
// configFlags has the set of flags used for command line parsing a Config
type configFlags struct {
*flag.FlagSet
clusterState *flags.StringsFlag
fallback *flags.StringsFlag
proxy *flags.StringsFlag
}
func NewConfig() *config {
func newConfig() *config {
cfg := &config{
corsInfo: &cors.CORSInfo{},
Config: *embed.NewConfig(),
configProxy: configProxy{
Proxy: proxyFlagOff,
ProxyFailureWaitMs: 5000,
ProxyRefreshIntervalMs: 30000,
ProxyDialTimeoutMs: 1000,
ProxyWriteTimeoutMs: 5000,
},
ignored: ignored,
}
cfg.configFlags = configFlags{
FlagSet: flag.NewFlagSet("etcd", flag.ContinueOnError),
clusterState: flags.NewStringsFlag(
clusterStateFlagNew,
clusterStateFlagExisting,
embed.ClusterStateFlagNew,
embed.ClusterStateFlagExisting,
),
fallback: flags.NewStringsFlag(
fallbackFlagExit,
fallbackFlagProxy,
),
ignored: ignored,
proxy: flags.NewStringsFlag(
proxyFlagOff,
proxyFlagReadonly,
@ -178,7 +114,6 @@ func NewConfig() *config {
),
}
cfg.FlagSet = flag.NewFlagSet("etcd", flag.ContinueOnError)
fs := cfg.FlagSet
fs.Usage = func() {
fmt.Println(usageline)
@ -187,38 +122,38 @@ func NewConfig() *config {
fs.StringVar(&cfg.configFile, "config-file", "", "Path to the server configuration file")
// member
fs.Var(cfg.corsInfo, "cors", "Comma-separated white list of origins for CORS (cross-origin resource sharing).")
fs.StringVar(&cfg.Dir, "data-dir", "", "Path to the data directory.")
fs.StringVar(&cfg.WalDir, "wal-dir", "", "Path to the dedicated wal directory.")
fs.Var(flags.NewURLsValue(defaultListenPeerURLs), "listen-peer-urls", "List of URLs to listen on for peer traffic.")
fs.Var(flags.NewURLsValue(defaultListenClientURLs), "listen-client-urls", "List of URLs to listen on for client traffic.")
fs.UintVar(&cfg.MaxSnapFiles, "max-snapshots", defaultMaxSnapshots, "Maximum number of snapshot files to retain (0 is unlimited).")
fs.UintVar(&cfg.MaxWalFiles, "max-wals", defaultMaxWALs, "Maximum number of wal files to retain (0 is unlimited).")
fs.StringVar(&cfg.Name, "name", defaultName, "Human-readable name for this member.")
fs.Uint64Var(&cfg.SnapCount, "snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot to disk.")
fs.UintVar(&cfg.TickMs, "heartbeat-interval", 100, "Time (in milliseconds) of a heartbeat interval.")
fs.UintVar(&cfg.ElectionMs, "election-timeout", 1000, "Time (in milliseconds) for an election to timeout.")
fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", 0, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.")
fs.Var(cfg.CorsInfo, "cors", "Comma-separated white list of origins for CORS (cross-origin resource sharing).")
fs.StringVar(&cfg.Dir, "data-dir", cfg.Dir, "Path to the data directory.")
fs.StringVar(&cfg.WalDir, "wal-dir", cfg.WalDir, "Path to the dedicated wal directory.")
fs.Var(flags.NewURLsValue(embed.DefaultListenPeerURLs), "listen-peer-urls", "List of URLs to listen on for peer traffic.")
fs.Var(flags.NewURLsValue(embed.DefaultListenClientURLs), "listen-client-urls", "List of URLs to listen on for client traffic.")
fs.UintVar(&cfg.MaxSnapFiles, "max-snapshots", cfg.MaxSnapFiles, "Maximum number of snapshot files to retain (0 is unlimited).")
fs.UintVar(&cfg.MaxWalFiles, "max-wals", cfg.MaxWalFiles, "Maximum number of wal files to retain (0 is unlimited).")
fs.StringVar(&cfg.Name, "name", cfg.Name, "Human-readable name for this member.")
fs.Uint64Var(&cfg.SnapCount, "snapshot-count", cfg.SnapCount, "Number of committed transactions to trigger a snapshot to disk.")
fs.UintVar(&cfg.TickMs, "heartbeat-interval", cfg.TickMs, "Time (in milliseconds) of a heartbeat interval.")
fs.UintVar(&cfg.ElectionMs, "election-timeout", cfg.ElectionMs, "Time (in milliseconds) for an election to timeout.")
fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", cfg.QuotaBackendBytes, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.")
// clustering
fs.Var(flags.NewURLsValue(defaultInitialAdvertisePeerURLs), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster.")
fs.Var(flags.NewURLsValue(defaultAdvertiseClientURLs), "advertise-client-urls", "List of this member's client URLs to advertise to the public.")
fs.StringVar(&cfg.Durl, "discovery", "", "Discovery URL used to bootstrap the cluster.")
fs.Var(flags.NewURLsValue(embed.DefaultInitialAdvertisePeerURLs), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster.")
fs.Var(flags.NewURLsValue(embed.DefaultAdvertiseClientURLs), "advertise-client-urls", "List of this member's client URLs to advertise to the public.")
fs.StringVar(&cfg.Durl, "discovery", cfg.Durl, "Discovery URL used to bootstrap the cluster.")
fs.Var(cfg.fallback, "discovery-fallback", fmt.Sprintf("Valid values include %s", strings.Join(cfg.fallback.Values, ", ")))
if err := cfg.fallback.Set(fallbackFlagProxy); err != nil {
// Should never happen.
plog.Panicf("unexpected error setting up discovery-fallback flag: %v", err)
}
fs.StringVar(&cfg.Dproxy, "discovery-proxy", "", "HTTP proxy to use for traffic to discovery service.")
fs.StringVar(&cfg.DNSCluster, "discovery-srv", "", "DNS domain used to bootstrap initial cluster.")
fs.StringVar(&cfg.InitialCluster, "initial-cluster", initialClusterFromName(defaultName), "Initial cluster configuration for bootstrapping.")
fs.StringVar(&cfg.InitialClusterToken, "initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during bootstrap.")
fs.StringVar(&cfg.Dproxy, "discovery-proxy", cfg.Dproxy, "HTTP proxy to use for traffic to discovery service.")
fs.StringVar(&cfg.DNSCluster, "discovery-srv", cfg.DNSCluster, "DNS domain used to bootstrap initial cluster.")
fs.StringVar(&cfg.InitialCluster, "initial-cluster", cfg.InitialCluster, "Initial cluster configuration for bootstrapping.")
fs.StringVar(&cfg.InitialClusterToken, "initial-cluster-token", cfg.InitialClusterToken, "Initial cluster token for the etcd cluster during bootstrap.")
fs.Var(cfg.clusterState, "initial-cluster-state", "Initial cluster state ('new' or 'existing').")
if err := cfg.clusterState.Set(clusterStateFlagNew); err != nil {
if err := cfg.clusterState.Set(embed.ClusterStateFlagNew); err != nil {
// Should never happen.
plog.Panicf("unexpected error setting up clusterStateFlag: %v", err)
}
fs.BoolVar(&cfg.StrictReconfigCheck, "strict-reconfig-check", false, "Reject reconfiguration requests that would cause quorum loss.")
fs.BoolVar(&cfg.StrictReconfigCheck, "strict-reconfig-check", cfg.StrictReconfigCheck, "Reject reconfiguration requests that would cause quorum loss.")
// proxy
fs.Var(cfg.proxy, "proxy", fmt.Sprintf("Valid values include %s", strings.Join(cfg.proxy.Values, ", ")))
@ -226,24 +161,24 @@ func NewConfig() *config {
// Should never happen.
plog.Panicf("unexpected error setting up proxyFlag: %v", err)
}
fs.UintVar(&cfg.ProxyFailureWaitMs, "proxy-failure-wait", 5000, "Time (in milliseconds) an endpoint will be held in a failed state.")
fs.UintVar(&cfg.ProxyRefreshIntervalMs, "proxy-refresh-interval", 30000, "Time (in milliseconds) of the endpoints refresh interval.")
fs.UintVar(&cfg.ProxyDialTimeoutMs, "proxy-dial-timeout", 1000, "Time (in milliseconds) for a dial to timeout.")
fs.UintVar(&cfg.ProxyWriteTimeoutMs, "proxy-write-timeout", 5000, "Time (in milliseconds) for a write to timeout.")
fs.UintVar(&cfg.ProxyReadTimeoutMs, "proxy-read-timeout", 0, "Time (in milliseconds) for a read to timeout.")
fs.UintVar(&cfg.ProxyFailureWaitMs, "proxy-failure-wait", cfg.ProxyFailureWaitMs, "Time (in milliseconds) an endpoint will be held in a failed state.")
fs.UintVar(&cfg.ProxyRefreshIntervalMs, "proxy-refresh-interval", cfg.ProxyRefreshIntervalMs, "Time (in milliseconds) of the endpoints refresh interval.")
fs.UintVar(&cfg.ProxyDialTimeoutMs, "proxy-dial-timeout", cfg.ProxyDialTimeoutMs, "Time (in milliseconds) for a dial to timeout.")
fs.UintVar(&cfg.ProxyWriteTimeoutMs, "proxy-write-timeout", cfg.ProxyWriteTimeoutMs, "Time (in milliseconds) for a write to timeout.")
fs.UintVar(&cfg.ProxyReadTimeoutMs, "proxy-read-timeout", cfg.ProxyReadTimeoutMs, "Time (in milliseconds) for a read to timeout.")
// security
fs.StringVar(&cfg.clientTLSInfo.CAFile, "ca-file", "", "DEPRECATED: Path to the client server TLS CA file.")
fs.StringVar(&cfg.clientTLSInfo.CertFile, "cert-file", "", "Path to the client server TLS cert file.")
fs.StringVar(&cfg.clientTLSInfo.KeyFile, "key-file", "", "Path to the client server TLS key file.")
fs.BoolVar(&cfg.clientTLSInfo.ClientCertAuth, "client-cert-auth", false, "Enable client cert authentication.")
fs.StringVar(&cfg.clientTLSInfo.TrustedCAFile, "trusted-ca-file", "", "Path to the client server TLS trusted CA key file.")
fs.StringVar(&cfg.ClientTLSInfo.CAFile, "ca-file", "", "DEPRECATED: Path to the client server TLS CA file.")
fs.StringVar(&cfg.ClientTLSInfo.CertFile, "cert-file", "", "Path to the client server TLS cert file.")
fs.StringVar(&cfg.ClientTLSInfo.KeyFile, "key-file", "", "Path to the client server TLS key file.")
fs.BoolVar(&cfg.ClientTLSInfo.ClientCertAuth, "client-cert-auth", false, "Enable client cert authentication.")
fs.StringVar(&cfg.ClientTLSInfo.TrustedCAFile, "trusted-ca-file", "", "Path to the client server TLS trusted CA key file.")
fs.BoolVar(&cfg.ClientAutoTLS, "auto-tls", false, "Client TLS using generated certificates")
fs.StringVar(&cfg.peerTLSInfo.CAFile, "peer-ca-file", "", "DEPRECATED: Path to the peer server TLS CA file.")
fs.StringVar(&cfg.peerTLSInfo.CertFile, "peer-cert-file", "", "Path to the peer server TLS cert file.")
fs.StringVar(&cfg.peerTLSInfo.KeyFile, "peer-key-file", "", "Path to the peer server TLS key file.")
fs.BoolVar(&cfg.peerTLSInfo.ClientCertAuth, "peer-client-cert-auth", false, "Enable peer client cert authentication.")
fs.StringVar(&cfg.peerTLSInfo.TrustedCAFile, "peer-trusted-ca-file", "", "Path to the peer server TLS trusted CA file.")
fs.StringVar(&cfg.PeerTLSInfo.CAFile, "peer-ca-file", "", "DEPRECATED: Path to the peer server TLS CA file.")
fs.StringVar(&cfg.PeerTLSInfo.CertFile, "peer-cert-file", "", "Path to the peer server TLS cert file.")
fs.StringVar(&cfg.PeerTLSInfo.KeyFile, "peer-key-file", "", "Path to the peer server TLS key file.")
fs.BoolVar(&cfg.PeerTLSInfo.ClientCertAuth, "peer-client-cert-auth", false, "Enable peer client cert authentication.")
fs.StringVar(&cfg.PeerTLSInfo.TrustedCAFile, "peer-trusted-ca-file", "", "Path to the peer server TLS trusted CA file.")
fs.BoolVar(&cfg.PeerAutoTLS, "peer-auto-tls", false, "Peer TLS using generated certificates")
// logging
@ -256,10 +191,10 @@ func NewConfig() *config {
// version
fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit.")
fs.IntVar(&cfg.autoCompactionRetention, "auto-compaction-retention", 0, "Auto compaction retention for mvcc key value store in hour. 0 means disable auto compaction.")
fs.IntVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", 0, "Auto compaction retention for mvcc key value store in hour. 0 means disable auto compaction.")
// pprof profiler via HTTP
fs.BoolVar(&cfg.enablePprof, "enable-pprof", false, "Enable runtime profiling data via HTTP server. Address is at client URL + \"/debug/pprof/\"")
fs.BoolVar(&cfg.EnablePprof, "enable-pprof", false, "Enable runtime profiling data via HTTP server. Address is at client URL + \"/debug/pprof/\"")
// ignored
for _, f := range cfg.ignored {
@ -268,7 +203,7 @@ func NewConfig() *config {
return cfg
}
func (cfg *config) Parse(arguments []string) error {
func (cfg *config) parse(arguments []string) error {
perr := cfg.FlagSet.Parse(arguments)
switch perr {
case nil:
@ -293,11 +228,10 @@ func (cfg *config) Parse(arguments []string) error {
var err error
if cfg.configFile != "" {
plog.Infof("Loading server configuration from %q", cfg.configFile)
err = cfg.configFromFile()
err = cfg.configFromFile(cfg.configFile)
} else {
err = cfg.configFromCmdLine()
}
return err
}
@ -307,152 +241,72 @@ func (cfg *config) configFromCmdLine() error {
plog.Fatalf("%v", err)
}
cfg.lpurls = flags.URLsFromFlag(cfg.FlagSet, "listen-peer-urls")
cfg.apurls = flags.URLsFromFlag(cfg.FlagSet, "initial-advertise-peer-urls")
cfg.lcurls = flags.URLsFromFlag(cfg.FlagSet, "listen-client-urls")
cfg.acurls = flags.URLsFromFlag(cfg.FlagSet, "advertise-client-urls")
cfg.LPUrls = flags.URLsFromFlag(cfg.FlagSet, "listen-peer-urls")
cfg.APUrls = flags.URLsFromFlag(cfg.FlagSet, "initial-advertise-peer-urls")
cfg.LCUrls = flags.URLsFromFlag(cfg.FlagSet, "listen-client-urls")
cfg.ACUrls = flags.URLsFromFlag(cfg.FlagSet, "advertise-client-urls")
cfg.ClusterState = cfg.clusterState.String()
cfg.Fallback = cfg.fallback.String()
cfg.Proxy = cfg.proxy.String()
return cfg.validateConfig(func(field string) bool {
return flags.IsSet(cfg.FlagSet, field)
})
// disable default advertise-client-urls if lcurls is set
missingAC := flags.IsSet(cfg.FlagSet, "listen-client-urls") && !flags.IsSet(cfg.FlagSet, "advertise-client-urls")
if !cfg.mayBeProxy() && missingAC {
cfg.ACUrls = nil
}
// disable default initial-cluster if discovery is set
if (cfg.Durl != "" || cfg.DNSCluster != "") && !flags.IsSet(cfg.FlagSet, "initial-cluster") {
cfg.InitialCluster = ""
}
return cfg.validate()
}
func (cfg *config) configFromFile() error {
b, err := ioutil.ReadFile(cfg.configFile)
func (cfg *config) configFromFile(path string) error {
eCfg, err := embed.ConfigFromFile(path)
if err != nil {
return err
}
cfg.Config = *eCfg
err = yaml.Unmarshal(b, cfg)
if err != nil {
return err
// load extra config information
b, rerr := ioutil.ReadFile(path)
if rerr != nil {
return rerr
}
if cfg.LPUrlsCfgFile != "" {
u, err := types.NewURLs(strings.Split(cfg.LPUrlsCfgFile, ","))
if err != nil {
plog.Fatalf("unexpected error setting up listen-peer-urls: %v", err)
}
cfg.lpurls = []url.URL(u)
if yerr := yaml.Unmarshal(b, &cfg.configProxy); yerr != nil {
return yerr
}
if cfg.LCUrlsCfgFile != "" {
u, err := types.NewURLs(strings.Split(cfg.LCUrlsCfgFile, ","))
if err != nil {
plog.Fatalf("unexpected error setting up listen-client-urls: %v", err)
}
cfg.lcurls = []url.URL(u)
}
if cfg.CorsCfgFile != "" {
if err := cfg.corsInfo.Set(cfg.CorsCfgFile); err != nil {
plog.Panicf("unexpected error setting up cors: %v", err)
}
}
if cfg.ApurlsCfgFile != "" {
u, err := types.NewURLs(strings.Split(cfg.ApurlsCfgFile, ","))
if err != nil {
plog.Fatalf("unexpected error setting up initial-advertise-peer-urls: %v", err)
}
cfg.apurls = []url.URL(u)
}
if cfg.AcurlsCfgFile != "" {
u, err := types.NewURLs(strings.Split(cfg.AcurlsCfgFile, ","))
if err != nil {
plog.Fatalf("unexpected error setting up advertise-peer-urls: %v", err)
}
cfg.acurls = []url.URL(u)
}
if cfg.ClusterStateCfgFile != "" {
if err := cfg.clusterState.Set(cfg.ClusterStateCfgFile); err != nil {
plog.Panicf("unexpected error setting up clusterStateFlag: %v", err)
}
}
if cfg.FallbackCfgFile != "" {
if err := cfg.fallback.Set(cfg.FallbackCfgFile); err != nil {
if cfg.FallbackJSON != "" {
if err := cfg.fallback.Set(cfg.FallbackJSON); err != nil {
plog.Panicf("unexpected error setting up discovery-fallback flag: %v", err)
}
cfg.Fallback = cfg.fallback.String()
}
if cfg.ProxyCfgFile != "" {
if err := cfg.proxy.Set(cfg.ProxyCfgFile); err != nil {
if cfg.ProxyJSON != "" {
if err := cfg.proxy.Set(cfg.ProxyJSON); err != nil {
plog.Panicf("unexpected error setting up proxyFlag: %v", err)
}
cfg.Proxy = cfg.proxy.String()
}
copySecurityDetails := func(tls *transport.TLSInfo, ysc *securityConfig) {
tls.CAFile = ysc.CAFile
tls.CertFile = ysc.CertFile
tls.KeyFile = ysc.KeyFile
tls.ClientCertAuth = ysc.CertAuth
tls.TrustedCAFile = ysc.TrustedCAFile
}
copySecurityDetails(&cfg.clientTLSInfo, &cfg.ClientSecurityCfgFile)
copySecurityDetails(&cfg.peerTLSInfo, &cfg.PeerSecurityCfgFile)
cfg.ClientAutoTLS = cfg.ClientSecurityCfgFile.AutoTLS
cfg.PeerAutoTLS = cfg.PeerSecurityCfgFile.AutoTLS
fieldsToBeChecked := map[string]bool{
"discovery": (cfg.Durl != ""),
"listen-client-urls": (cfg.LCUrlsCfgFile != ""),
"advertise-client-urls": (cfg.AcurlsCfgFile != ""),
"initial-cluster": (cfg.InitialCluster != ""),
"discovery-srv": (cfg.DNSCluster != ""),
}
return cfg.validateConfig(func(field string) bool {
return fieldsToBeChecked[field]
})
}
func (cfg *config) validateConfig(isSet func(field string) bool) error {
// when etcd runs in member mode user needs to set --advertise-client-urls if --listen-client-urls is set.
// TODO(yichengq): check this for joining through discovery service case
mayFallbackToProxy := isSet("discovery") && cfg.fallback.String() == fallbackFlagProxy
mayBeProxy := cfg.proxy.String() != proxyFlagOff || mayFallbackToProxy
if !mayBeProxy {
if isSet("listen-client-urls") && !isSet("advertise-client-urls") {
return errUnsetAdvertiseClientURLsFlag
}
}
// Check if conflicting flags are passed.
nSet := 0
for _, v := range []bool{isSet("discovery"), isSet("initial-cluster"), isSet("discovery-srv")} {
if v {
nSet++
}
}
if nSet > 1 {
return ErrConflictBootstrapFlags
}
if 5*cfg.TickMs > cfg.ElectionMs {
return fmt.Errorf("--election-timeout[%vms] should be at least as 5 times as --heartbeat-interval[%vms]", cfg.ElectionMs, cfg.TickMs)
}
if cfg.ElectionMs > maxElectionMs {
return fmt.Errorf("--election-timeout[%vms] is too long, and should be set less than %vms", cfg.ElectionMs, maxElectionMs)
}
return nil
}
func initialClusterFromName(name string) string {
n := name
if name == "" {
n = defaultName
}
return fmt.Sprintf("%s=http://localhost:2380", n)
func (cfg *config) mayBeProxy() bool {
mayFallbackToProxy := cfg.Durl != "" && cfg.Fallback == fallbackFlagProxy
return cfg.Proxy != proxyFlagOff || mayFallbackToProxy
}
func (cfg *config) validate() error {
err := cfg.Config.Validate()
// TODO(yichengq): check this for joining through discovery service case
if err == embed.ErrUnsetAdvertiseClientURLsFlag && cfg.mayBeProxy() {
return nil
}
return err
}
func (cfg config) isNewCluster() bool { return cfg.clusterState.String() == clusterStateFlagNew }
func (cfg config) isProxy() bool { return cfg.proxy.String() != proxyFlagOff }
func (cfg config) isReadonlyProxy() bool { return cfg.proxy.String() == proxyFlagReadonly }
func (cfg config) shouldFallbackToProxy() bool { return cfg.fallback.String() == fallbackFlagProxy }
func (cfg config) electionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }

View File

@ -23,6 +23,7 @@ import (
"strings"
"testing"
"github.com/coreos/etcd/embed"
"github.com/ghodss/yaml"
)
@ -39,8 +40,8 @@ func TestConfigParsingMemberFlags(t *testing.T) {
"-advertise-client-urls=http://localhost:7000,https://localhost:7001",
}
cfg := NewConfig()
err := cfg.Parse(args)
cfg := newConfig()
err := cfg.parse(args)
if err != nil {
t.Fatal(err)
}
@ -81,9 +82,8 @@ func TestConfigFileMemberFields(t *testing.T) {
fmt.Sprintf("--config-file=%s", tmpfile.Name()),
}
cfg := NewConfig()
err = cfg.Parse(args)
if err != nil {
cfg := newConfig()
if err = cfg.parse(args); err != nil {
t.Fatal(err)
}
@ -100,9 +100,8 @@ func TestConfigParsingClusteringFlags(t *testing.T) {
"-discovery-fallback=exit",
}
cfg := NewConfig()
err := cfg.Parse(args)
if err != nil {
cfg := newConfig()
if err := cfg.parse(args); err != nil {
t.Fatal(err)
}
@ -137,8 +136,8 @@ func TestConfigFileClusteringFields(t *testing.T) {
args := []string{
fmt.Sprintf("--config-file=%s", tmpfile.Name()),
}
cfg := NewConfig()
err = cfg.Parse(args)
cfg := newConfig()
err = cfg.parse(args)
if err != nil {
t.Fatal(err)
}
@ -147,19 +146,10 @@ func TestConfigFileClusteringFields(t *testing.T) {
}
func TestConfigParsingOtherFlags(t *testing.T) {
args := []string{
"-proxy=readonly",
"-ca-file=cafile",
"-cert-file=certfile",
"-key-file=keyfile",
"-peer-ca-file=peercafile",
"-peer-cert-file=peercertfile",
"-peer-key-file=peerkeyfile",
"-force-new-cluster=true",
}
args := []string{"-proxy=readonly"}
cfg := NewConfig()
err := cfg.Parse(args)
cfg := newConfig()
err := cfg.parse(args)
if err != nil {
t.Fatal(err)
}
@ -169,23 +159,9 @@ func TestConfigParsingOtherFlags(t *testing.T) {
func TestConfigFileOtherFields(t *testing.T) {
yc := struct {
ProxyCfgFile string `json:"proxy"`
ClientSecurityCfgFile securityConfig `json:"client-transport-security"`
PeerSecurityCfgFile securityConfig `json:"peer-transport-security"`
ForceNewCluster bool `json:"force-new-cluster"`
ProxyCfgFile string `json:"proxy"`
}{
"readonly",
securityConfig{
CAFile: "cafile",
CertFile: "certfile",
KeyFile: "keyfile",
},
securityConfig{
CAFile: "peercafile",
CertFile: "peercertfile",
KeyFile: "peerkeyfile",
},
true,
}
b, err := yaml.Marshal(&yc)
@ -200,8 +176,8 @@ func TestConfigFileOtherFields(t *testing.T) {
fmt.Sprintf("--config-file=%s", tmpfile.Name()),
}
cfg := NewConfig()
err = cfg.Parse(args)
cfg := newConfig()
err = cfg.parse(args)
if err != nil {
t.Fatal(err)
}
@ -231,10 +207,9 @@ func TestConfigParsingConflictClusteringFlags(t *testing.T) {
}
for i, tt := range conflictArgs {
cfg := NewConfig()
err := cfg.Parse(tt)
if err != ErrConflictBootstrapFlags {
t.Errorf("%d: err = %v, want %v", i, err, ErrConflictBootstrapFlags)
cfg := newConfig()
if err := cfg.parse(tt); err != embed.ErrConflictBootstrapFlags {
t.Errorf("%d: err = %v, want %v", i, err, embed.ErrConflictBootstrapFlags)
}
}
}
@ -277,10 +252,9 @@ func TestConfigFileConflictClusteringFlags(t *testing.T) {
fmt.Sprintf("--config-file=%s", tmpfile.Name()),
}
cfg := NewConfig()
err = cfg.Parse(args)
if err != ErrConflictBootstrapFlags {
t.Errorf("%d: err = %v, want %v", i, err, ErrConflictBootstrapFlags)
cfg := newConfig()
if err := cfg.parse(args); err != embed.ErrConflictBootstrapFlags {
t.Errorf("%d: err = %v, want %v", i, err, embed.ErrConflictBootstrapFlags)
}
}
}
@ -295,14 +269,14 @@ func TestConfigParsingMissedAdvertiseClientURLsFlag(t *testing.T) {
"-initial-cluster=infra1=http://127.0.0.1:2380",
"-listen-client-urls=http://127.0.0.1:2379",
},
errUnsetAdvertiseClientURLsFlag,
embed.ErrUnsetAdvertiseClientURLsFlag,
},
{
[]string{
"-discovery-srv=example.com",
"-listen-client-urls=http://127.0.0.1:2379",
},
errUnsetAdvertiseClientURLsFlag,
embed.ErrUnsetAdvertiseClientURLsFlag,
},
{
[]string{
@ -310,13 +284,13 @@ func TestConfigParsingMissedAdvertiseClientURLsFlag(t *testing.T) {
"-discovery-fallback=exit",
"-listen-client-urls=http://127.0.0.1:2379",
},
errUnsetAdvertiseClientURLsFlag,
embed.ErrUnsetAdvertiseClientURLsFlag,
},
{
[]string{
"-listen-client-urls=http://127.0.0.1:2379",
},
errUnsetAdvertiseClientURLsFlag,
embed.ErrUnsetAdvertiseClientURLsFlag,
},
{
[]string{
@ -342,9 +316,8 @@ func TestConfigParsingMissedAdvertiseClientURLsFlag(t *testing.T) {
}
for i, tt := range tests {
cfg := NewConfig()
err := cfg.Parse(tt.args)
if err != tt.werr {
cfg := newConfig()
if err := cfg.parse(tt.args); err != tt.werr {
t.Errorf("%d: err = %v, want %v", i, err, tt.werr)
}
}
@ -355,15 +328,16 @@ func TestConfigIsNewCluster(t *testing.T) {
state string
wIsNew bool
}{
{clusterStateFlagExisting, false},
{clusterStateFlagNew, true},
{embed.ClusterStateFlagExisting, false},
{embed.ClusterStateFlagNew, true},
}
for i, tt := range tests {
cfg := NewConfig()
if err := cfg.clusterState.Set(tt.state); err != nil {
cfg := newConfig()
args := []string{"--initial-cluster-state", tests[i].state}
if err := cfg.parse(args); err != nil {
t.Fatalf("#%d: unexpected clusterState.Set error: %v", i, err)
}
if g := cfg.isNewCluster(); g != tt.wIsNew {
if g := cfg.IsNewCluster(); g != tt.wIsNew {
t.Errorf("#%d: isNewCluster = %v, want %v", i, g, tt.wIsNew)
}
}
@ -379,7 +353,7 @@ func TestConfigIsProxy(t *testing.T) {
{proxyFlagOn, true},
}
for i, tt := range tests {
cfg := NewConfig()
cfg := newConfig()
if err := cfg.proxy.Set(tt.proxy); err != nil {
t.Fatalf("#%d: unexpected proxy.Set error: %v", i, err)
}
@ -399,7 +373,7 @@ func TestConfigIsReadonlyProxy(t *testing.T) {
{proxyFlagOn, false},
}
for i, tt := range tests {
cfg := NewConfig()
cfg := newConfig()
if err := cfg.proxy.Set(tt.proxy); err != nil {
t.Fatalf("#%d: unexpected proxy.Set error: %v", i, err)
}
@ -418,7 +392,7 @@ func TestConfigShouldFallbackToProxy(t *testing.T) {
{fallbackFlagExit, false},
}
for i, tt := range tests {
cfg := NewConfig()
cfg := newConfig()
if err := cfg.fallback.Set(tt.fallback); err != nil {
t.Fatalf("#%d: unexpected fallback.Set error: %v", i, err)
}
@ -458,9 +432,8 @@ func TestConfigFileElectionTimeout(t *testing.T) {
fmt.Sprintf("--config-file=%s", tmpfile.Name()),
}
cfg := NewConfig()
err = cfg.Parse(args)
if !strings.Contains(err.Error(), tt.errStr) {
cfg := newConfig()
if err := cfg.parse(args); err == nil || !strings.Contains(err.Error(), tt.errStr) {
t.Errorf("%d: Wrong err = %v", i, err)
}
}
@ -485,10 +458,10 @@ func mustCreateCfgFile(t *testing.T, b []byte) *os.File {
}
func validateMemberFlags(t *testing.T, cfg *config) {
wcfg := &config{
wcfg := &embed.Config{
Dir: "testdir",
lpurls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}},
lcurls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}},
LPUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}},
LCUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}},
MaxSnapFiles: 10,
MaxWalFiles: 10,
Name: "testname",
@ -510,25 +483,25 @@ func validateMemberFlags(t *testing.T, cfg *config) {
if cfg.SnapCount != wcfg.SnapCount {
t.Errorf("snapcount = %v, want %v", cfg.SnapCount, wcfg.SnapCount)
}
if !reflect.DeepEqual(cfg.lpurls, wcfg.lpurls) {
t.Errorf("listen-peer-urls = %v, want %v", cfg.lpurls, wcfg.lpurls)
if !reflect.DeepEqual(cfg.LPUrls, wcfg.LPUrls) {
t.Errorf("listen-peer-urls = %v, want %v", cfg.LPUrls, wcfg.LPUrls)
}
if !reflect.DeepEqual(cfg.lcurls, wcfg.lcurls) {
t.Errorf("listen-client-urls = %v, want %v", cfg.lcurls, wcfg.lcurls)
if !reflect.DeepEqual(cfg.LCUrls, wcfg.LCUrls) {
t.Errorf("listen-client-urls = %v, want %v", cfg.LCUrls, wcfg.LCUrls)
}
}
func validateClusteringFlags(t *testing.T, cfg *config) {
wcfg := NewConfig()
wcfg.apurls = []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}}
wcfg.acurls = []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}}
wcfg.clusterState.Set(clusterStateFlagExisting)
wcfg := newConfig()
wcfg.APUrls = []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}}
wcfg.ACUrls = []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}}
wcfg.ClusterState = embed.ClusterStateFlagExisting
wcfg.fallback.Set(fallbackFlagExit)
wcfg.InitialCluster = "0=http://localhost:8000"
wcfg.InitialClusterToken = "etcdtest"
if cfg.clusterState.String() != wcfg.clusterState.String() {
t.Errorf("clusterState = %v, want %v", cfg.clusterState, wcfg.clusterState)
if cfg.ClusterState != wcfg.ClusterState {
t.Errorf("clusterState = %v, want %v", cfg.ClusterState, wcfg.ClusterState)
}
if cfg.fallback.String() != wcfg.fallback.String() {
t.Errorf("fallback = %v, want %v", cfg.fallback, wcfg.fallback)
@ -539,35 +512,18 @@ func validateClusteringFlags(t *testing.T, cfg *config) {
if cfg.InitialClusterToken != wcfg.InitialClusterToken {
t.Errorf("initialClusterToken = %v, want %v", cfg.InitialClusterToken, wcfg.InitialClusterToken)
}
if !reflect.DeepEqual(cfg.apurls, wcfg.apurls) {
t.Errorf("initial-advertise-peer-urls = %v, want %v", cfg.lpurls, wcfg.lpurls)
if !reflect.DeepEqual(cfg.APUrls, wcfg.APUrls) {
t.Errorf("initial-advertise-peer-urls = %v, want %v", cfg.LPUrls, wcfg.LPUrls)
}
if !reflect.DeepEqual(cfg.acurls, wcfg.acurls) {
t.Errorf("advertise-client-urls = %v, want %v", cfg.lcurls, wcfg.lcurls)
if !reflect.DeepEqual(cfg.ACUrls, wcfg.ACUrls) {
t.Errorf("advertise-client-urls = %v, want %v", cfg.LCUrls, wcfg.LCUrls)
}
}
func validateOtherFlags(t *testing.T, cfg *config) {
wcfg := NewConfig()
wcfg := newConfig()
wcfg.proxy.Set(proxyFlagReadonly)
wcfg.clientTLSInfo.CAFile = "cafile"
wcfg.clientTLSInfo.CertFile = "certfile"
wcfg.clientTLSInfo.KeyFile = "keyfile"
wcfg.peerTLSInfo.CAFile = "peercafile"
wcfg.peerTLSInfo.CertFile = "peercertfile"
wcfg.peerTLSInfo.KeyFile = "peerkeyfile"
wcfg.ForceNewCluster = true
if cfg.proxy.String() != wcfg.proxy.String() {
t.Errorf("proxy = %v, want %v", cfg.proxy, wcfg.proxy)
}
if cfg.clientTLSInfo.String() != wcfg.clientTLSInfo.String() {
t.Errorf("clientTLS = %v, want %v", cfg.clientTLSInfo, wcfg.clientTLSInfo)
}
if cfg.peerTLSInfo.String() != wcfg.peerTLSInfo.String() {
t.Errorf("peerTLS = %v, want %v", cfg.peerTLSInfo, wcfg.peerTLSInfo)
}
if cfg.ForceNewCluster != wcfg.ForceNewCluster {
t.Errorf("forceNewCluster = %t, want %t", cfg.ForceNewCluster, wcfg.ForceNewCluster)
}
}

View File

@ -1,26 +0,0 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build windows
package etcdmain
// TODO(barakmich): So because file locking on Windows is untested, the
// temporary fix is to default to unlimited snapshots and WAL files, with manual
// removal. Perhaps not the most elegant solution, but it's at least safe and
// we'd totally love a PR to fix the story around locking.
const (
defaultMaxSnapshots = 0
defaultMaxWALs = 0
)

View File

@ -21,7 +21,6 @@ import (
"io/ioutil"
"net"
"net/http"
_ "net/http/pprof"
"os"
"path"
"reflect"
@ -30,17 +29,15 @@ import (
"time"
"github.com/coreos/etcd/discovery"
"github.com/coreos/etcd/embed"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v2http"
"github.com/coreos/etcd/pkg/cors"
"github.com/coreos/etcd/pkg/fileutil"
pkgioutil "github.com/coreos/etcd/pkg/ioutil"
"github.com/coreos/etcd/pkg/osutil"
runtimeutil "github.com/coreos/etcd/pkg/runtime"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/proxy/httpproxy"
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/etcd/version"
"github.com/coreos/go-systemd/daemon"
systemdutil "github.com/coreos/go-systemd/util"
@ -52,23 +49,6 @@ type dirType string
var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdmain")
const (
// the owner can make/remove files inside the directory
privateDirMode = 0700
// internal fd usage includes disk usage and transport usage.
// To read/write snapshot, snap pkg needs 1. In normal case, wal pkg needs
// at most 2 to read/lock/write WALs. One case that it needs to 2 is to
// read all logs after some snapshot index, which locates at the end of
// the second last and the head of the last. For purging, it needs to read
// directory, so it needs 1. For fd monitor, it needs 1.
// For transport, rafthttp builds two long-polling connections and at most
// four temporary connections with each member. There are at most 9 members
// in a cluster, so it should reserve 96.
// For the safety, we set the total reserved number to 150.
reservedInternalFDNum = 150
)
var (
dirMember = dirType("member")
dirProxy = dirType("proxy")
@ -76,12 +56,12 @@ var (
)
func startEtcdOrProxyV2() {
cfg := NewConfig()
err := cfg.Parse(os.Args[1:])
cfg := newConfig()
err := cfg.parse(os.Args[1:])
if err != nil {
plog.Errorf("error verifying flags, %v. See 'etcd --help'.", err)
switch err {
case errUnsetAdvertiseClientURLsFlag:
case embed.ErrUnsetAdvertiseClientURLsFlag:
plog.Errorf("When listening on specific address(es), this etcd process must advertise accessible url(s) to each connected client.")
}
os.Exit(1)
@ -89,6 +69,7 @@ func startEtcdOrProxyV2() {
setupLogging(cfg)
var stopped <-chan struct{}
var errc <-chan error
plog.Infof("etcd Version: %s\n", version.Version)
plog.Infof("Git SHA: %s\n", version.GitSHA)
@ -99,8 +80,8 @@ func startEtcdOrProxyV2() {
plog.Infof("setting maximum number of CPUs to %d, total number of available CPUs is %d", GoMaxProcs, runtime.NumCPU())
// TODO: check whether fields are set instead of whether fields have default value
if cfg.Name != defaultName && cfg.InitialCluster == initialClusterFromName(defaultName) {
cfg.InitialCluster = initialClusterFromName(cfg.Name)
if cfg.Name != embed.DefaultName && cfg.InitialCluster == cfg.InitialClusterFromName(embed.DefaultName) {
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
}
if cfg.Dir == "" {
@ -113,7 +94,7 @@ func startEtcdOrProxyV2() {
plog.Noticef("the server is already initialized as %v before, starting as etcd %v...", which, which)
switch which {
case dirMember:
stopped, err = startEtcd(cfg)
stopped, errc, err = startEtcd(&cfg.Config)
case dirProxy:
err = startProxy(cfg)
default:
@ -122,7 +103,7 @@ func startEtcdOrProxyV2() {
} else {
shouldProxy := cfg.isProxy()
if !shouldProxy {
stopped, err = startEtcd(cfg)
stopped, errc, err = startEtcd(&cfg.Config)
if derr, ok := err.(*etcdserver.DiscoveryError); ok && derr.Err == discovery.ErrFullCluster {
if cfg.shouldFallbackToProxy() {
plog.Noticef("discovery cluster full, falling back to %s", fallbackFlagProxy)
@ -157,13 +138,13 @@ func startEtcdOrProxyV2() {
if strings.Contains(err.Error(), "include") && strings.Contains(err.Error(), "--initial-cluster") {
plog.Infof("%v", err)
if cfg.InitialCluster == initialClusterFromName(cfg.Name) {
if cfg.InitialCluster == cfg.InitialClusterFromName(cfg.Name) {
plog.Infof("forgot to set --initial-cluster flag?")
}
if types.URLs(cfg.apurls).String() == defaultInitialAdvertisePeerURLs {
if types.URLs(cfg.APUrls).String() == embed.DefaultInitialAdvertisePeerURLs {
plog.Infof("forgot to set --initial-advertise-peer-urls flag?")
}
if cfg.InitialCluster == initialClusterFromName(cfg.Name) && len(cfg.Durl) == 0 {
if cfg.InitialCluster == cfg.InitialClusterFromName(cfg.Name) && len(cfg.Durl) == 0 {
plog.Infof("if you want to use discovery service, please set --discovery flag.")
}
os.Exit(1)
@ -188,233 +169,43 @@ func startEtcdOrProxyV2() {
}
}
<-stopped
select {
case lerr := <-errc:
// fatal out on listener errors
plog.Fatal(lerr)
case <-stopped:
}
osutil.Exit(0)
}
// startEtcd launches the etcd server and HTTP handlers for client/server communication.
func startEtcd(cfg *config) (<-chan struct{}, error) {
urlsmap, token, err := getPeerURLsMapAndToken(cfg, "etcd")
// startEtcd runs StartEtcd in addition to hooks needed for standalone etcd.
func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
e, err := embed.StartEtcd(cfg)
if err != nil {
return nil, fmt.Errorf("error setting up initial cluster: %v", err)
return nil, nil, err
}
if cfg.PeerAutoTLS && cfg.peerTLSInfo.Empty() {
var phosts []string
for _, u := range cfg.lpurls {
phosts = append(phosts, u.Host)
}
cfg.peerTLSInfo, err = transport.SelfCert(path.Join(cfg.Dir, "fixtures/peer"), phosts)
if err != nil {
plog.Fatalf("could not get certs (%v)", err)
}
} else if cfg.PeerAutoTLS {
plog.Warningf("ignoring peer auto TLS since certs given")
}
if !cfg.peerTLSInfo.Empty() {
plog.Infof("peerTLS: %s", cfg.peerTLSInfo)
}
var plns []net.Listener
for _, u := range cfg.lpurls {
if u.Scheme == "http" {
if !cfg.peerTLSInfo.Empty() {
plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
}
if cfg.peerTLSInfo.ClientCertAuth {
plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
}
}
var (
l net.Listener
tlscfg *tls.Config
)
if !cfg.peerTLSInfo.Empty() {
tlscfg, err = cfg.peerTLSInfo.ServerConfig()
if err != nil {
return nil, err
}
}
l, err = rafthttp.NewListener(u, tlscfg)
if err != nil {
return nil, err
}
urlStr := u.String()
plog.Info("listening for peers on ", urlStr)
defer func() {
if err != nil {
l.Close()
plog.Info("stopping listening for peers on ", urlStr)
}
}()
plns = append(plns, l)
}
if cfg.ClientAutoTLS && cfg.clientTLSInfo.Empty() {
var chosts []string
for _, u := range cfg.lcurls {
chosts = append(chosts, u.Host)
}
cfg.clientTLSInfo, err = transport.SelfCert(path.Join(cfg.Dir, "fixtures/client"), chosts)
if err != nil {
plog.Fatalf("could not get certs (%v)", err)
}
} else if cfg.ClientAutoTLS {
plog.Warningf("ignoring client auto TLS since certs given")
}
var ctlscfg *tls.Config
if !cfg.clientTLSInfo.Empty() {
plog.Infof("clientTLS: %s", cfg.clientTLSInfo)
ctlscfg, err = cfg.clientTLSInfo.ServerConfig()
if err != nil {
return nil, err
}
}
sctxs := make(map[string]*serveCtx)
for _, u := range cfg.lcurls {
if u.Scheme == "http" {
if !cfg.clientTLSInfo.Empty() {
plog.Warningf("The scheme of client url %s is HTTP while peer key/cert files are presented. Ignored key/cert files.", u.String())
}
if cfg.clientTLSInfo.ClientCertAuth {
plog.Warningf("The scheme of client url %s is HTTP while client cert auth (--client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
}
}
if u.Scheme == "https" && ctlscfg == nil {
return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPs scheme", u.String())
}
ctx := &serveCtx{host: u.Host}
if u.Scheme == "https" {
ctx.secure = true
} else {
ctx.insecure = true
}
if sctxs[u.Host] != nil {
if ctx.secure {
sctxs[u.Host].secure = true
}
if ctx.insecure {
sctxs[u.Host].insecure = true
}
continue
}
var l net.Listener
l, err = net.Listen("tcp", u.Host)
if err != nil {
return nil, err
}
var fdLimit uint64
if fdLimit, err = runtimeutil.FDLimit(); err == nil {
if fdLimit <= reservedInternalFDNum {
plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
}
l = transport.LimitListener(l, int(fdLimit-reservedInternalFDNum))
}
l, err = transport.NewKeepAliveListener(l, "tcp", nil)
ctx.l = l
if err != nil {
return nil, err
}
plog.Info("listening for client requests on ", u.Host)
defer func() {
if err != nil {
l.Close()
plog.Info("stopping listening for client requests on ", u.Host)
}
}()
sctxs[u.Host] = ctx
}
srvcfg := &etcdserver.ServerConfig{
Name: cfg.Name,
ClientURLs: cfg.acurls,
PeerURLs: cfg.apurls,
DataDir: cfg.Dir,
DedicatedWALDir: cfg.WalDir,
SnapCount: cfg.SnapCount,
MaxSnapFiles: cfg.MaxSnapFiles,
MaxWALFiles: cfg.MaxWalFiles,
InitialPeerURLsMap: urlsmap,
InitialClusterToken: token,
DiscoveryURL: cfg.Durl,
DiscoveryProxy: cfg.Dproxy,
NewCluster: cfg.isNewCluster(),
ForceNewCluster: cfg.ForceNewCluster,
PeerTLSInfo: cfg.peerTLSInfo,
TickMs: cfg.TickMs,
ElectionTicks: cfg.electionTicks(),
AutoCompactionRetention: cfg.autoCompactionRetention,
QuotaBackendBytes: cfg.QuotaBackendBytes,
StrictReconfigCheck: cfg.StrictReconfigCheck,
EnablePprof: cfg.enablePprof,
}
var s *etcdserver.EtcdServer
s, err = etcdserver.NewServer(srvcfg)
if err != nil {
return nil, err
}
s.Start()
osutil.RegisterInterruptHandler(s.Stop)
if cfg.corsInfo.String() != "" {
plog.Infof("cors = %s", cfg.corsInfo)
}
ch := http.Handler(&cors.CORSHandler{
Handler: v2http.NewClientHandler(s, srvcfg.ReqTimeout()),
Info: cfg.corsInfo,
})
ph := v2http.NewPeerHandler(s)
// Start the peer server in a goroutine
for _, l := range plns {
go func(l net.Listener) {
plog.Fatal(servePeerHTTP(l, ph))
}(l)
}
// Start a client server goroutine for each listen address
for _, sctx := range sctxs {
go func(sctx *serveCtx) {
// read timeout does not work with http close notify
// TODO: https://github.com/golang/go/issues/9524
plog.Fatal(serve(sctx, s, ctlscfg, ch))
}(sctx)
}
<-s.ReadyNotify()
return s.StopNotify(), nil
osutil.RegisterInterruptHandler(e.Server.Stop)
return e.Server.StopNotify(), e.Err(), nil
}
// startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes.
func startProxy(cfg *config) error {
plog.Notice("proxy: this proxy supports v2 API only!")
pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, time.Duration(cfg.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyWriteTimeoutMs)*time.Millisecond)
pt, err := transport.NewTimeoutTransport(cfg.PeerTLSInfo, time.Duration(cfg.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyWriteTimeoutMs)*time.Millisecond)
if err != nil {
return err
}
pt.MaxIdleConnsPerHost = httpproxy.DefaultMaxIdleConnsPerHost
tr, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, time.Duration(cfg.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyWriteTimeoutMs)*time.Millisecond)
tr, err := transport.NewTimeoutTransport(cfg.PeerTLSInfo, time.Duration(cfg.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyWriteTimeoutMs)*time.Millisecond)
if err != nil {
return err
}
cfg.Dir = path.Join(cfg.Dir, "proxy")
err = os.MkdirAll(cfg.Dir, privateDirMode)
err = os.MkdirAll(cfg.Dir, fileutil.PrivateDirMode)
if err != nil {
return err
}
@ -440,7 +231,7 @@ func startProxy(cfg *config) error {
plog.Infof("proxy: using peer urls %v from cluster file %q", peerURLs, clusterfile)
case os.IsNotExist(err):
var urlsmap types.URLsMap
urlsmap, _, err = getPeerURLsMapAndToken(cfg, "proxy")
urlsmap, _, err = cfg.PeerURLsMapAndToken("proxy")
if err != nil {
return fmt.Errorf("error setting up initial cluster: %v", err)
}
@ -502,20 +293,20 @@ func startProxy(cfg *config) error {
ph := httpproxy.NewHandler(pt, uf, time.Duration(cfg.ProxyFailureWaitMs)*time.Millisecond, time.Duration(cfg.ProxyRefreshIntervalMs)*time.Millisecond)
ph = &cors.CORSHandler{
Handler: ph,
Info: cfg.corsInfo,
Info: cfg.CorsInfo,
}
if cfg.isReadonlyProxy() {
ph = httpproxy.NewReadonlyHandler(ph)
}
// Start a proxy server goroutine for each listen address
for _, u := range cfg.lcurls {
for _, u := range cfg.LCUrls {
var (
l net.Listener
tlscfg *tls.Config
)
if !cfg.clientTLSInfo.Empty() {
tlscfg, err = cfg.clientTLSInfo.ServerConfig()
if !cfg.ClientTLSInfo.Empty() {
tlscfg, err = cfg.ClientTLSInfo.ServerConfig()
if err != nil {
return err
}
@ -538,37 +329,6 @@ func startProxy(cfg *config) error {
return nil
}
// getPeerURLsMapAndToken sets up an initial peer URLsMap and cluster token for bootstrap or discovery.
func getPeerURLsMapAndToken(cfg *config, which string) (urlsmap types.URLsMap, token string, err error) {
switch {
case cfg.Durl != "":
urlsmap = types.URLsMap{}
// If using discovery, generate a temporary cluster based on
// self's advertised peer URLs
urlsmap[cfg.Name] = cfg.apurls
token = cfg.Durl
case cfg.DNSCluster != "":
var clusterStr string
clusterStr, token, err = discovery.SRVGetCluster(cfg.Name, cfg.DNSCluster, cfg.InitialClusterToken, cfg.apurls)
if err != nil {
return nil, "", err
}
urlsmap, err = types.NewURLsMap(clusterStr)
// only etcd member must belong to the discovered cluster.
// proxy does not need to belong to the discovered cluster.
if which == "etcd" {
if _, ok := urlsmap[cfg.Name]; !ok {
return nil, "", fmt.Errorf("cannot find local etcd member %q in SRV records", cfg.Name)
}
}
default:
// We're statically configured, and cluster has appropriately been set.
urlsmap, err = types.NewURLsMap(cfg.InitialCluster)
token = cfg.InitialClusterToken
}
return urlsmap, token, err
}
// identifyDataDirOrDie returns the type of the data dir.
// Dies if the datadir is invalid.
func identifyDataDirOrDie(dir string) dirType {

View File

@ -14,7 +14,11 @@
package etcdmain
import "strconv"
import (
"strconv"
"github.com/coreos/etcd/embed"
)
var (
usageline = `usage: etcd [flags]
@ -48,9 +52,9 @@ member flags:
list of URLs to listen on for peer traffic.
--listen-client-urls 'http://localhost:2379'
list of URLs to listen on for client traffic.
--max-snapshots '` + strconv.Itoa(defaultMaxSnapshots) + `'
--max-snapshots '` + strconv.Itoa(embed.DefaultMaxSnapshots) + `'
maximum number of snapshot files to retain (0 is unlimited).
--max-wals '` + strconv.Itoa(defaultMaxWALs) + `'
--max-wals '` + strconv.Itoa(embed.DefaultMaxWALs) + `'
maximum number of wal files to retain (0 is unlimited).
--cors ''
comma-separated whitelist of origins for CORS (cross-origin resource sharing).