*: configure server logger

- Add and document the "logger" option to support structured logging.
  - This makes functional tests easier to run, since the zap logger
    provides built-in log redirection to files.
  - Use "etcd --logger=zap" to enable structured logging (see the
    sketch below).
- The current "capnslog" logger remains the default.
  - We may switch the default or deprecate "capnslog" in v3.5.
  - Either way, the change will be clearly documented.

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee
2018-04-14 22:52:39 -07:00
parent 82e84a09e1
commit 041b9069a2
14 changed files with 899 additions and 283 deletions
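
For illustration only (not part of this commit): a minimal sketch of enabling the new logger from an embedding application, assuming the embed.Config fields introduced below (Logger, LogOutput) and the existing embed.StartEtcd API; the file paths are hypothetical. On the command line, the equivalent is "etcd --logger=zap --log-output=/tmp/etcd.log".

package main

import (
	"log"

	"github.com/coreos/etcd/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "/tmp/etcd-data"      // hypothetical data directory
	cfg.Logger = "zap"              // structured logging; the default remains "capnslog"
	cfg.LogOutput = "/tmp/etcd.log" // append server logs to this file ("stdout"/"stderr" also work)

	// StartEtcd calls Validate, which now calls setupLogging before the server starts.
	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()

	<-e.Server.ReadyNotify()
	if lg := e.GetLogger(); lg != nil { // non-nil only when the "zap" logger is selected
		lg.Info("embedded etcd is ready")
	}
	log.Fatal(<-e.Err())
}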


@@ -24,11 +24,14 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"sync"
"syscall"
"time" "time"
"github.com/coreos/etcd/compactor" "github.com/coreos/etcd/compactor"
"github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/pkg/flags" "github.com/coreos/etcd/pkg/flags"
"github.com/coreos/etcd/pkg/logutil"
"github.com/coreos/etcd/pkg/netutil" "github.com/coreos/etcd/pkg/netutil"
"github.com/coreos/etcd/pkg/srv" "github.com/coreos/etcd/pkg/srv"
"github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/transport"
@@ -107,21 +110,12 @@ func init() {
// Config holds the arguments for configuring an etcd server. // Config holds the arguments for configuring an etcd server.
type Config struct { type Config struct {
LPUrls, LCUrls []url.URL Name string `json:"name"`
Dir string `json:"data-dir"` Dir string `json:"data-dir"`
WalDir string `json:"wal-dir"` WalDir string `json:"wal-dir"`
SnapCount uint64 `json:"snapshot-count"`
MaxSnapFiles uint `json:"max-snapshots"` MaxSnapFiles uint `json:"max-snapshots"`
MaxWalFiles uint `json:"max-wals"` MaxWalFiles uint `json:"max-wals"`
Name string `json:"name"`
SnapCount uint64 `json:"snapshot-count"`
// AutoCompactionMode is either 'periodic' or 'revision'.
AutoCompactionMode string `json:"auto-compaction-mode"`
// AutoCompactionRetention is either duration string with time unit
// (e.g. '5m' for 5-minute), or revision unit (e.g. '5000').
// If no time unit is provided and compaction mode is 'periodic',
// the unit defaults to hour. For example, '5' translates into 5-hour.
AutoCompactionRetention string `json:"auto-compaction-retention"`
// TickMs is the number of milliseconds between heartbeat ticks. // TickMs is the number of milliseconds between heartbeat ticks.
// TODO: decouple tickMs and heartbeat tick (current heartbeat tick = 1). // TODO: decouple tickMs and heartbeat tick (current heartbeat tick = 1).
@@ -132,6 +126,31 @@ type Config struct {
MaxTxnOps uint `json:"max-txn-ops"` MaxTxnOps uint `json:"max-txn-ops"`
MaxRequestBytes uint `json:"max-request-bytes"` MaxRequestBytes uint `json:"max-request-bytes"`
LPUrls, LCUrls []url.URL
APUrls, ACUrls []url.URL
ClientTLSInfo transport.TLSInfo
ClientAutoTLS bool
PeerTLSInfo transport.TLSInfo
PeerAutoTLS bool
ClusterState string `json:"initial-cluster-state"`
DNSCluster string `json:"discovery-srv"`
DNSClusterServiceName string `json:"discovery-srv-name"`
Dproxy string `json:"discovery-proxy"`
Durl string `json:"discovery"`
InitialCluster string `json:"initial-cluster"`
InitialClusterToken string `json:"initial-cluster-token"`
StrictReconfigCheck bool `json:"strict-reconfig-check"`
EnableV2 bool `json:"enable-v2"`
// AutoCompactionMode is either 'periodic' or 'revision'.
AutoCompactionMode string `json:"auto-compaction-mode"`
// AutoCompactionRetention is either duration string with time unit
// (e.g. '5m' for 5-minute), or revision unit (e.g. '5000').
// If no time unit is provided and compaction mode is 'periodic',
// the unit defaults to hour. For example, '5' translates into 5-hour.
AutoCompactionRetention string `json:"auto-compaction-retention"`
// GRPCKeepAliveMinTime is the minimum interval that a client should // GRPCKeepAliveMinTime is the minimum interval that a client should
// wait before pinging server. When client pings "too fast", server // wait before pinging server. When client pings "too fast", server
// sends goaway and closes the connection (errors: too_many_pings, // sends goaway and closes the connection (errors: too_many_pings,
@@ -147,17 +166,6 @@ type Config struct {
// before closing a non-responsive connection. 0 to disable. // before closing a non-responsive connection. 0 to disable.
GRPCKeepAliveTimeout time.Duration `json:"grpc-keepalive-timeout"` GRPCKeepAliveTimeout time.Duration `json:"grpc-keepalive-timeout"`
APUrls, ACUrls []url.URL
ClusterState string `json:"initial-cluster-state"`
DNSCluster string `json:"discovery-srv"`
DNSClusterServiceName string `json:"discovery-srv-name"`
Dproxy string `json:"discovery-proxy"`
Durl string `json:"discovery"`
InitialCluster string `json:"initial-cluster"`
InitialClusterToken string `json:"initial-cluster-token"`
StrictReconfigCheck bool `json:"strict-reconfig-check"`
EnableV2 bool `json:"enable-v2"`
// PreVote is true to enable Raft Pre-Vote. // PreVote is true to enable Raft Pre-Vote.
// If enabled, Raft runs an additional election phase // If enabled, Raft runs an additional election phase
// to check whether it would get enough votes to win // to check whether it would get enough votes to win
@@ -165,11 +173,6 @@ type Config struct {
// TODO: enable by default in 3.5. // TODO: enable by default in 3.5.
PreVote bool `json:"pre-vote"` PreVote bool `json:"pre-vote"`
ClientTLSInfo transport.TLSInfo
ClientAutoTLS bool
PeerTLSInfo transport.TLSInfo
PeerAutoTLS bool
CORS map[string]struct{} CORS map[string]struct{}
// HostWhitelist lists acceptable hostnames from HTTP client requests. // HostWhitelist lists acceptable hostnames from HTTP client requests.
@@ -198,21 +201,6 @@ type Config struct {
// - https://github.com/coreos/etcd/issues/9353 // - https://github.com/coreos/etcd/issues/9353
HostWhitelist map[string]struct{} HostWhitelist map[string]struct{}
// Logger logs server-side operations.
// If nil, all logs are discarded.
// TODO: make it configurable with existing logger.
// Currently, only logs TLS transport.
Logger *zap.Logger
Debug bool `json:"debug"`
LogPkgLevels string `json:"log-package-levels"`
LogOutput string `json:"log-output"`
EnablePprof bool `json:"enable-pprof"`
Metrics string `json:"metrics"`
ListenMetricsUrls []url.URL
ListenMetricsUrlsJSON string `json:"listen-metrics-urls"`
// UserHandlers is for registering users handlers and only used for // UserHandlers is for registering users handlers and only used for
// embedding etcd into other applications. // embedding etcd into other applications.
// The map key is the route path for the handler, and // The map key is the route path for the handler, and
@@ -235,6 +223,36 @@ type Config struct {
// ForceNewCluster starts a new cluster even if previously started; unsafe. // ForceNewCluster starts a new cluster even if previously started; unsafe.
ForceNewCluster bool `json:"force-new-cluster"` ForceNewCluster bool `json:"force-new-cluster"`
EnablePprof bool `json:"enable-pprof"`
Metrics string `json:"metrics"`
ListenMetricsUrls []url.URL
ListenMetricsUrlsJSON string `json:"listen-metrics-urls"`
// logger logs server-side operations. The default is nil,
// and "setupLogging" must be called before starting server.
// Do not set logger directly.
loggerMu *sync.RWMutex
logger *zap.Logger
loggerConfig zap.Config
// Logger is the logger option: "zap" or "capnslog".
// WARN: "capnslog" is being deprecated in v3.5.
Logger string `json:"logger"`
// LogOutput is either:
// - "default" as os.Stderr
// - "stderr" as os.Stderr
// - "stdout" as os.Stdout
// - file path to append server logs to
LogOutput string `json:"log-output"`
// Debug is true, to enable debug level logging.
Debug bool `json:"debug"`
// LogPkgLevels is being deprecated in v3.5.
// Only valid if "logger" option is "capnslog".
// WARN: DO NOT USE THIS!
LogPkgLevels string `json:"log-package-levels"`
} }
// configYAML holds the config suitable for yaml parsing // configYAML holds the config suitable for yaml parsing
@@ -271,7 +289,6 @@ func NewConfig() *Config {
apurl, _ := url.Parse(DefaultInitialAdvertisePeerURLs) apurl, _ := url.Parse(DefaultInitialAdvertisePeerURLs)
lcurl, _ := url.Parse(DefaultListenClientURLs) lcurl, _ := url.Parse(DefaultListenClientURLs)
acurl, _ := url.Parse(DefaultAdvertiseClientURLs) acurl, _ := url.Parse(DefaultAdvertiseClientURLs)
lg, _ := zap.NewProduction()
cfg := &Config{ cfg := &Config{
MaxSnapFiles: DefaultMaxSnapshots, MaxSnapFiles: DefaultMaxSnapshots,
MaxWalFiles: DefaultMaxWALs, MaxWalFiles: DefaultMaxWALs,
@@ -291,14 +308,19 @@ func NewConfig() *Config {
ClusterState: ClusterStateFlagNew, ClusterState: ClusterStateFlagNew,
InitialClusterToken: "etcd-cluster", InitialClusterToken: "etcd-cluster",
StrictReconfigCheck: DefaultStrictReconfigCheck, StrictReconfigCheck: DefaultStrictReconfigCheck,
Logger: lg,
LogOutput: DefaultLogOutput,
Metrics: "basic", Metrics: "basic",
EnableV2: DefaultEnableV2, EnableV2: DefaultEnableV2,
CORS: map[string]struct{}{"*": {}}, CORS: map[string]struct{}{"*": {}},
HostWhitelist: map[string]struct{}{"*": {}}, HostWhitelist: map[string]struct{}{"*": {}},
AuthToken: "simple", AuthToken: "simple",
PreVote: false, // TODO: enable by default in v3.5 PreVote: false, // TODO: enable by default in v3.5
loggerMu: new(sync.RWMutex),
logger: nil,
Logger: "capnslog",
LogOutput: DefaultLogOutput,
Debug: false,
LogPkgLevels: "",
} }
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
return cfg return cfg
@@ -317,15 +339,24 @@ func logTLSHandshakeFailure(conn *tls.Conn, err error) {
} }
} }
// SetupLogging initializes etcd logging. // GetLogger returns the logger.
// Must be called after flag parsing. func (cfg Config) GetLogger() *zap.Logger {
func (cfg *Config) SetupLogging() { cfg.loggerMu.RLock()
l := cfg.logger
cfg.loggerMu.RUnlock()
return l
}
// setupLogging initializes etcd logging.
// Must be called after flag parsing or finishing configuring embed.Config.
func (cfg *Config) setupLogging() error {
switch cfg.Logger {
case "capnslog": // TODO: deprecate this in v3.5
capnslog.SetGlobalLogLevel(capnslog.INFO)
cfg.ClientTLSInfo.HandshakeFailure = logTLSHandshakeFailure cfg.ClientTLSInfo.HandshakeFailure = logTLSHandshakeFailure
cfg.PeerTLSInfo.HandshakeFailure = logTLSHandshakeFailure cfg.PeerTLSInfo.HandshakeFailure = logTLSHandshakeFailure
capnslog.SetGlobalLogLevel(capnslog.INFO)
if cfg.Debug { if cfg.Debug {
cfg.Logger = zap.NewExample()
capnslog.SetGlobalLogLevel(capnslog.DEBUG) capnslog.SetGlobalLogLevel(capnslog.DEBUG)
grpc.EnableTracing = true grpc.EnableTracing = true
// enable info, warning, error // enable info, warning, error
@@ -334,12 +365,14 @@ func (cfg *Config) SetupLogging() {
// only discard info // only discard info
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr)) grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
} }
// TODO: deprecate with "capnslog"
if cfg.LogPkgLevels != "" { if cfg.LogPkgLevels != "" {
repoLog := capnslog.MustRepoLogger("github.com/coreos/etcd") repoLog := capnslog.MustRepoLogger("github.com/coreos/etcd")
settings, err := repoLog.ParseLogLevelConfig(cfg.LogPkgLevels) settings, err := repoLog.ParseLogLevelConfig(cfg.LogPkgLevels)
if err != nil { if err != nil {
plog.Warningf("couldn't parse log level string: %s, continuing with default levels", err.Error()) plog.Warningf("couldn't parse log level string: %s, continuing with default levels", err.Error())
return return nil
} }
repoLog.SetLogLevel(settings) repoLog.SetLogLevel(settings)
} }
@@ -356,6 +389,100 @@ func (cfg *Config) SetupLogging() {
default: default:
plog.Panicf(`unknown log-output %q (only supports %q, "stdout", "stderr")`, cfg.LogOutput, DefaultLogOutput) plog.Panicf(`unknown log-output %q (only supports %q, "stdout", "stderr")`, cfg.LogOutput, DefaultLogOutput)
} }
case "zap":
// TODO: make this more configurable
lcfg := zap.Config{
Level: zap.NewAtomicLevelAt(zap.InfoLevel),
Development: false,
Sampling: &zap.SamplingConfig{
Initial: 100,
Thereafter: 100,
},
Encoding: "json",
EncoderConfig: zap.NewProductionEncoderConfig(),
}
switch cfg.LogOutput {
case DefaultLogOutput:
if syscall.Getppid() == 1 {
// capnslog initially SetFormatter(NewDefaultFormatter(os.Stderr))
// where "NewDefaultFormatter" returns "NewJournaldFormatter"
// when syscall.Getppid() == 1, specify 'stdout' or 'stderr' to
// skip journald logging even when running under systemd
fmt.Println("running under init, which may be systemd!")
// TODO: capnlog.NewJournaldFormatter()
lcfg.OutputPaths = []string{"stderr"}
lcfg.ErrorOutputPaths = []string{"stderr"}
} else {
lcfg.OutputPaths = []string{"stderr"}
lcfg.ErrorOutputPaths = []string{"stderr"}
}
case "stderr":
lcfg.OutputPaths = []string{"stderr"}
lcfg.ErrorOutputPaths = []string{"stderr"}
case "stdout":
lcfg.OutputPaths = []string{"stdout"}
lcfg.ErrorOutputPaths = []string{"stdout"}
default:
lcfg.OutputPaths = []string{cfg.LogOutput}
lcfg.ErrorOutputPaths = []string{cfg.LogOutput}
}
if cfg.Debug {
lcfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
grpc.EnableTracing = true
}
var err error
cfg.logger, err = lcfg.Build()
if err != nil {
return err
}
cfg.loggerConfig = lcfg
// debug true, enable info, warning, error
// debug false, only discard info
var gl grpclog.LoggerV2
gl, err = logutil.NewGRPCLoggerV2(lcfg)
if err != nil {
return err
}
grpclog.SetLoggerV2(gl)
logTLSHandshakeFailure := func(conn *tls.Conn, err error) {
state := conn.ConnectionState()
remoteAddr := conn.RemoteAddr().String()
serverName := state.ServerName
if len(state.PeerCertificates) > 0 {
cert := state.PeerCertificates[0]
ips := make([]string, len(cert.IPAddresses))
for i := range cert.IPAddresses {
ips[i] = cert.IPAddresses[i].String()
}
cfg.logger.Warn(
"rejected connection",
zap.String("remote-addr", remoteAddr),
zap.String("server-name", serverName),
zap.Strings("ip-addresses", ips),
zap.Strings("dns-names", cert.DNSNames),
zap.Error(err),
)
} else {
cfg.logger.Warn(
"rejected connection",
zap.String("remote-addr", remoteAddr),
zap.String("server-name", serverName),
zap.Error(err),
)
}
}
cfg.ClientTLSInfo.HandshakeFailure = logTLSHandshakeFailure
cfg.PeerTLSInfo.HandshakeFailure = logTLSHandshakeFailure
default:
return fmt.Errorf("unknown logger option %q", cfg.Logger)
}
return nil
} }
func ConfigFromFile(path string) (*Config, error) { func ConfigFromFile(path string) (*Config, error) {
@@ -382,7 +509,8 @@ func (cfg *configYAML) configFromFile(path string) error {
if cfg.LPUrlsJSON != "" { if cfg.LPUrlsJSON != "" {
u, err := types.NewURLs(strings.Split(cfg.LPUrlsJSON, ",")) u, err := types.NewURLs(strings.Split(cfg.LPUrlsJSON, ","))
if err != nil { if err != nil {
plog.Fatalf("unexpected error setting up listen-peer-urls: %v", err) fmt.Fprintf(os.Stderr, "unexpected error setting up listen-peer-urls: %v\n", err)
os.Exit(1)
} }
cfg.LPUrls = []url.URL(u) cfg.LPUrls = []url.URL(u)
} }
@@ -390,7 +518,8 @@ func (cfg *configYAML) configFromFile(path string) error {
if cfg.LCUrlsJSON != "" { if cfg.LCUrlsJSON != "" {
u, err := types.NewURLs(strings.Split(cfg.LCUrlsJSON, ",")) u, err := types.NewURLs(strings.Split(cfg.LCUrlsJSON, ","))
if err != nil { if err != nil {
plog.Fatalf("unexpected error setting up listen-client-urls: %v", err) fmt.Fprintf(os.Stderr, "unexpected error setting up listen-client-urls: %v\n", err)
os.Exit(1)
} }
cfg.LCUrls = []url.URL(u) cfg.LCUrls = []url.URL(u)
} }
@@ -398,7 +527,8 @@ func (cfg *configYAML) configFromFile(path string) error {
if cfg.APUrlsJSON != "" { if cfg.APUrlsJSON != "" {
u, err := types.NewURLs(strings.Split(cfg.APUrlsJSON, ",")) u, err := types.NewURLs(strings.Split(cfg.APUrlsJSON, ","))
if err != nil { if err != nil {
plog.Fatalf("unexpected error setting up initial-advertise-peer-urls: %v", err) fmt.Fprintf(os.Stderr, "unexpected error setting up initial-advertise-peer-urls: %v\n", err)
os.Exit(1)
} }
cfg.APUrls = []url.URL(u) cfg.APUrls = []url.URL(u)
} }
@@ -406,7 +536,8 @@ func (cfg *configYAML) configFromFile(path string) error {
if cfg.ACUrlsJSON != "" { if cfg.ACUrlsJSON != "" {
u, err := types.NewURLs(strings.Split(cfg.ACUrlsJSON, ",")) u, err := types.NewURLs(strings.Split(cfg.ACUrlsJSON, ","))
if err != nil { if err != nil {
plog.Fatalf("unexpected error setting up advertise-peer-urls: %v", err) fmt.Fprintf(os.Stderr, "unexpected error setting up advertise-peer-urls: %v\n", err)
os.Exit(1)
} }
cfg.ACUrls = []url.URL(u) cfg.ACUrls = []url.URL(u)
} }
@@ -414,7 +545,8 @@ func (cfg *configYAML) configFromFile(path string) error {
if cfg.ListenMetricsUrlsJSON != "" { if cfg.ListenMetricsUrlsJSON != "" {
u, err := types.NewURLs(strings.Split(cfg.ListenMetricsUrlsJSON, ",")) u, err := types.NewURLs(strings.Split(cfg.ListenMetricsUrlsJSON, ","))
if err != nil { if err != nil {
plog.Fatalf("unexpected error setting up listen-metrics-urls: %v", err) fmt.Fprintf(os.Stderr, "unexpected error setting up listen-metrics-urls: %v\n", err)
os.Exit(1)
} }
cfg.ListenMetricsUrls = []url.URL(u) cfg.ListenMetricsUrls = []url.URL(u)
} }
@@ -453,6 +585,9 @@ func (cfg *configYAML) configFromFile(path string) error {
// Validate ensures that '*embed.Config' fields are properly configured. // Validate ensures that '*embed.Config' fields are properly configured.
func (cfg *Config) Validate() error { func (cfg *Config) Validate() error {
if err := cfg.setupLogging(); err != nil {
return err
}
if err := checkBindURLs(cfg.LPUrls); err != nil { if err := checkBindURLs(cfg.LPUrls); err != nil {
return err return err
} }
@@ -532,14 +667,22 @@ func (cfg *Config) PeerURLsMapAndToken(which string) (urlsmap types.URLsMap, tok
token = cfg.Durl token = cfg.Durl
case cfg.DNSCluster != "": case cfg.DNSCluster != "":
clusterStrs, cerr := cfg.GetDNSClusterNames() clusterStrs, cerr := cfg.GetDNSClusterNames()
lg := cfg.logger
if cerr != nil { if cerr != nil {
if lg != nil {
lg.Error("failed to resolve during SRV discovery", zap.Error(cerr))
} else {
plog.Errorf("couldn't resolve during SRV discovery (%v)", cerr) plog.Errorf("couldn't resolve during SRV discovery (%v)", cerr)
}
return nil, "", cerr return nil, "", cerr
} }
for _, s := range clusterStrs { for _, s := range clusterStrs {
if lg != nil {
lg.Info("got bootstrap from DNS for etcd-server", zap.String("node", s))
} else {
plog.Noticef("got bootstrap from DNS for etcd-server at %s", s) plog.Noticef("got bootstrap from DNS for etcd-server at %s", s)
} }
}
clusterStr := strings.Join(clusterStrs, ",") clusterStr := strings.Join(clusterStrs, ",")
if strings.Contains(clusterStr, "https://") && cfg.PeerTLSInfo.TrustedCAFile == "" { if strings.Contains(clusterStr, "https://") && cfg.PeerTLSInfo.TrustedCAFile == "" {
cfg.PeerTLSInfo.ServerName = cfg.DNSCluster cfg.PeerTLSInfo.ServerName = cfg.DNSCluster
@@ -612,11 +755,15 @@ func (cfg *Config) ClientSelfCert() (err error) {
for i, u := range cfg.LCUrls { for i, u := range cfg.LCUrls {
chosts[i] = u.Host chosts[i] = u.Host
} }
cfg.ClientTLSInfo, err = transport.SelfCert(cfg.Logger, filepath.Join(cfg.Dir, "fixtures", "client"), chosts) cfg.ClientTLSInfo, err = transport.SelfCert(cfg.logger, filepath.Join(cfg.Dir, "fixtures", "client"), chosts)
return err return err
} else if cfg.ClientAutoTLS { } else if cfg.ClientAutoTLS {
if cfg.logger != nil {
cfg.logger.Warn("ignoring client auto TLS since certs given")
} else {
plog.Warningf("ignoring client auto TLS since certs given") plog.Warningf("ignoring client auto TLS since certs given")
} }
}
return nil return nil
} }
@@ -626,11 +773,15 @@ func (cfg *Config) PeerSelfCert() (err error) {
for i, u := range cfg.LPUrls { for i, u := range cfg.LPUrls {
phosts[i] = u.Host phosts[i] = u.Host
} }
cfg.PeerTLSInfo, err = transport.SelfCert(cfg.Logger, filepath.Join(cfg.Dir, "fixtures", "peer"), phosts) cfg.PeerTLSInfo, err = transport.SelfCert(cfg.logger, filepath.Join(cfg.Dir, "fixtures", "peer"), phosts)
return err return err
} else if cfg.PeerAutoTLS { } else if cfg.PeerAutoTLS {
if cfg.logger != nil {
cfg.logger.Warn("ignoring peer auto TLS since certs given")
} else {
plog.Warningf("ignoring peer auto TLS since certs given") plog.Warningf("ignoring peer auto TLS since certs given")
} }
}
return nil return nil
} }


@@ -43,6 +43,7 @@ import (
"github.com/coreos/pkg/capnslog" "github.com/coreos/pkg/capnslog"
"github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/soheilhy/cmux" "github.com/soheilhy/cmux"
"go.uber.org/zap"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
) )
@@ -124,7 +125,6 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
urlsmap types.URLsMap urlsmap types.URLsMap
token string token string
) )
memberInitialized := true memberInitialized := true
if !isMemberInitialized(cfg) { if !isMemberInitialized(cfg) {
memberInitialized = false memberInitialized = false
@@ -173,10 +173,11 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck, InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime, CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
PreVote: cfg.PreVote, PreVote: cfg.PreVote,
Logger: cfg.logger,
LoggerConfig: cfg.loggerConfig,
Debug: cfg.Debug, Debug: cfg.Debug,
ForceNewCluster: cfg.ForceNewCluster, ForceNewCluster: cfg.ForceNewCluster,
} }
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil { if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
return e, err return e, err
} }
@@ -187,16 +188,32 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
ss = append(ss, v) ss = append(ss, v)
} }
sort.Strings(ss) sort.Strings(ss)
if e.cfg.logger != nil {
e.cfg.logger.Info(
"starting with CORS",
zap.String("server-id", e.Server.ID().String()),
zap.Strings("cors", ss),
)
} else {
plog.Infof("%s starting with cors %q", e.Server.ID(), ss) plog.Infof("%s starting with cors %q", e.Server.ID(), ss)
} }
}
if len(e.cfg.HostWhitelist) > 0 { if len(e.cfg.HostWhitelist) > 0 {
ss := make([]string, 0, len(e.cfg.HostWhitelist)) ss := make([]string, 0, len(e.cfg.HostWhitelist))
for v := range e.cfg.HostWhitelist { for v := range e.cfg.HostWhitelist {
ss = append(ss, v) ss = append(ss, v)
} }
sort.Strings(ss) sort.Strings(ss)
if e.cfg.logger != nil {
e.cfg.logger.Info(
"starting with host whitelist",
zap.String("server-id", e.Server.ID().String()),
zap.Strings("hosts", ss),
)
} else {
plog.Infof("%s starting with host whitelist %q", e.Server.ID(), ss) plog.Infof("%s starting with host whitelist %q", e.Server.ID(), ss)
} }
}
// buffer channel so goroutines on closed connections won't wait forever // buffer channel so goroutines on closed connections won't wait forever
e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs)) e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
@@ -321,11 +338,19 @@ func (e *Etcd) Err() <-chan error { return e.errc }
func startPeerListeners(cfg *Config) (peers []*peerListener, err error) { func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
if err = cfg.PeerSelfCert(); err != nil { if err = cfg.PeerSelfCert(); err != nil {
if cfg.logger != nil {
cfg.logger.Fatal("failed to get peer self-signed certs", zap.Error(err))
} else {
plog.Fatalf("could not get certs (%v)", err) plog.Fatalf("could not get certs (%v)", err)
} }
}
if !cfg.PeerTLSInfo.Empty() { if !cfg.PeerTLSInfo.Empty() {
if cfg.logger != nil {
cfg.logger.Info("starting with peer TLS", zap.String("tls-info", fmt.Sprintf("%+v", cfg.PeerTLSInfo)))
} else {
plog.Infof("peerTLS: %s", cfg.PeerTLSInfo) plog.Infof("peerTLS: %s", cfg.PeerTLSInfo)
} }
}
peers = make([]*peerListener, len(cfg.LPUrls)) peers = make([]*peerListener, len(cfg.LPUrls))
defer func() { defer func() {
@@ -334,7 +359,11 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
} }
for i := range peers { for i := range peers {
if peers[i] != nil && peers[i].close != nil { if peers[i] != nil && peers[i].close != nil {
if cfg.logger != nil {
cfg.logger.Info("stopping listening for peers", zap.String("address", cfg.LPUrls[i].String()))
} else {
plog.Info("stopping listening for peers on ", cfg.LPUrls[i].String()) plog.Info("stopping listening for peers on ", cfg.LPUrls[i].String())
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Second)
peers[i].close(ctx) peers[i].close(ctx)
cancel() cancel()
@@ -345,12 +374,20 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
for i, u := range cfg.LPUrls { for i, u := range cfg.LPUrls {
if u.Scheme == "http" { if u.Scheme == "http" {
if !cfg.PeerTLSInfo.Empty() { if !cfg.PeerTLSInfo.Empty() {
if cfg.logger != nil {
cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("peer-url", u.String()))
} else {
plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String()) plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
} }
}
if cfg.PeerTLSInfo.ClientCertAuth { if cfg.PeerTLSInfo.ClientCertAuth {
if cfg.logger != nil {
cfg.logger.Warn("scheme is HTTP while --peer-client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("peer-url", u.String()))
} else {
plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String()) plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
} }
} }
}
peers[i] = &peerListener{close: func(context.Context) error { return nil }} peers[i] = &peerListener{close: func(context.Context) error { return nil }}
peers[i].Listener, err = rafthttp.NewListener(u, &cfg.PeerTLSInfo) peers[i].Listener, err = rafthttp.NewListener(u, &cfg.PeerTLSInfo)
if err != nil { if err != nil {
@@ -360,8 +397,12 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
peers[i].close = func(context.Context) error { peers[i].close = func(context.Context) error {
return peers[i].Listener.Close() return peers[i].Listener.Close()
} }
if cfg.logger != nil {
cfg.logger.Info("listening for peers", zap.String("address", u.String()))
} else {
plog.Info("listening for peers on ", u.String()) plog.Info("listening for peers on ", u.String())
} }
}
return peers, nil return peers, nil
} }
@@ -406,24 +447,40 @@ func (e *Etcd) servePeers() (err error) {
func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) { func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
if err = cfg.ClientSelfCert(); err != nil { if err = cfg.ClientSelfCert(); err != nil {
if cfg.logger != nil {
cfg.logger.Fatal("failed to get client self-signed certs", zap.Error(err))
} else {
plog.Fatalf("could not get certs (%v)", err) plog.Fatalf("could not get certs (%v)", err)
} }
}
if cfg.EnablePprof { if cfg.EnablePprof {
if cfg.logger != nil {
cfg.logger.Info("pprof is enabled", zap.String("path", debugutil.HTTPPrefixPProf))
} else {
plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf) plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf)
} }
}
sctxs = make(map[string]*serveCtx) sctxs = make(map[string]*serveCtx)
for _, u := range cfg.LCUrls { for _, u := range cfg.LCUrls {
sctx := newServeCtx() sctx := newServeCtx(cfg.logger)
if u.Scheme == "http" || u.Scheme == "unix" { if u.Scheme == "http" || u.Scheme == "unix" {
if !cfg.ClientTLSInfo.Empty() { if !cfg.ClientTLSInfo.Empty() {
if cfg.logger != nil {
cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("client-url", u.String()))
} else {
plog.Warningf("The scheme of client url %s is HTTP while peer key/cert files are presented. Ignored key/cert files.", u.String()) plog.Warningf("The scheme of client url %s is HTTP while peer key/cert files are presented. Ignored key/cert files.", u.String())
} }
}
if cfg.ClientTLSInfo.ClientCertAuth { if cfg.ClientTLSInfo.ClientCertAuth {
if cfg.logger != nil {
cfg.logger.Warn("scheme is HTTP while --client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("client-url", u.String()))
} else {
plog.Warningf("The scheme of client url %s is HTTP while client cert auth (--client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String()) plog.Warningf("The scheme of client url %s is HTTP while client cert auth (--client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
} }
} }
}
if (u.Scheme == "https" || u.Scheme == "unixs") && cfg.ClientTLSInfo.Empty() { if (u.Scheme == "https" || u.Scheme == "unixs") && cfg.ClientTLSInfo.Empty() {
return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPs scheme", u.String()) return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPs scheme", u.String())
} }
@@ -452,8 +509,16 @@ func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
if fdLimit, fderr := runtimeutil.FDLimit(); fderr == nil { if fdLimit, fderr := runtimeutil.FDLimit(); fderr == nil {
if fdLimit <= reservedInternalFDNum { if fdLimit <= reservedInternalFDNum {
if cfg.logger != nil {
cfg.logger.Fatal(
"file descriptor limit of etcd process is too low; please set higher",
zap.Uint64("limit", fdLimit),
zap.Int("recommended-limit", reservedInternalFDNum),
)
} else {
plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum) plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
} }
}
sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum)) sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum))
} }
@@ -463,12 +528,20 @@ func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
} }
} }
if cfg.logger != nil {
cfg.logger.Info("listening for client requests", zap.String("host", u.Host))
} else {
plog.Info("listening for client requests on ", u.Host) plog.Info("listening for client requests on ", u.Host)
}
defer func() { defer func() {
if err != nil { if err != nil {
sctx.l.Close() sctx.l.Close()
if cfg.logger != nil {
cfg.logger.Info("stopping listening for client requests", zap.String("host", u.Host))
} else {
plog.Info("stopping listening for client requests on ", u.Host) plog.Info("stopping listening for client requests on ", u.Host)
} }
}
}() }()
for k := range cfg.UserHandlers { for k := range cfg.UserHandlers {
sctx.userHandlers[k] = cfg.UserHandlers[k] sctx.userHandlers[k] = cfg.UserHandlers[k]
@@ -487,8 +560,12 @@ func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
func (e *Etcd) serveClients() (err error) { func (e *Etcd) serveClients() (err error) {
if !e.cfg.ClientTLSInfo.Empty() { if !e.cfg.ClientTLSInfo.Empty() {
if e.cfg.logger != nil {
e.cfg.logger.Info("starting with client TLS", zap.String("tls-info", fmt.Sprintf("%+v", e.cfg.ClientTLSInfo)))
} else {
plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo) plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo)
} }
}
// Start a client server goroutine for each listen address // Start a client server goroutine for each listen address
var h http.Handler var h http.Handler
@@ -549,7 +626,11 @@ func (e *Etcd) serveMetrics() (err error) {
} }
e.metricsListeners = append(e.metricsListeners, ml) e.metricsListeners = append(e.metricsListeners, ml)
go func(u url.URL, ln net.Listener) { go func(u url.URL, ln net.Listener) {
if e.cfg.logger != nil {
e.cfg.logger.Info("listening for metrics", zap.String("url", u.String()))
} else {
plog.Info("listening for metrics on ", u.String()) plog.Info("listening for metrics on ", u.String())
}
e.errHandler(http.Serve(ln, metricsMux)) e.errHandler(http.Serve(ln, metricsMux))
}(murl, ml) }(murl, ml)
} }
@@ -569,6 +650,14 @@ func (e *Etcd) errHandler(err error) {
} }
} }
// GetLogger returns the logger.
func (e *Etcd) GetLogger() *zap.Logger {
e.cfg.loggerMu.RLock()
l := e.cfg.logger
e.cfg.loggerMu.RUnlock()
return l
}
func parseCompactionRetention(mode, retention string) (ret time.Duration, err error) { func parseCompactionRetention(mode, retention string) (ret time.Duration, err error) {
h, err := strconv.Atoi(retention) h, err := strconv.Atoi(retention)
if err == nil { if err == nil {


@@ -40,12 +40,14 @@ import (
gw "github.com/grpc-ecosystem/grpc-gateway/runtime" gw "github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/soheilhy/cmux" "github.com/soheilhy/cmux"
"github.com/tmc/grpc-websocket-proxy/wsproxy" "github.com/tmc/grpc-websocket-proxy/wsproxy"
"go.uber.org/zap"
"golang.org/x/net/trace" "golang.org/x/net/trace"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
) )
type serveCtx struct { type serveCtx struct {
lg *zap.Logger
l net.Listener l net.Listener
addr string addr string
secure bool secure bool
@@ -65,9 +67,13 @@ type servers struct {
http *http.Server http *http.Server
} }
func newServeCtx() *serveCtx { func newServeCtx(lg *zap.Logger) *serveCtx {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
return &serveCtx{ctx: ctx, cancel: cancel, userHandlers: make(map[string]http.Handler), return &serveCtx{
lg: lg,
ctx: ctx,
cancel: cancel,
userHandlers: make(map[string]http.Handler),
serversC: make(chan *servers, 2), // in case sctx.insecure,sctx.secure true serversC: make(chan *servers, 2), // in case sctx.insecure,sctx.secure true
} }
} }
@@ -83,7 +89,12 @@ func (sctx *serveCtx) serve(
gopts ...grpc.ServerOption) (err error) { gopts ...grpc.ServerOption) (err error) {
logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0) logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
<-s.ReadyNotify() <-s.ReadyNotify()
if sctx.lg != nil {
sctx.lg.Info("ready to server client requests")
} else {
plog.Info("ready to serve client requests") plog.Info("ready to serve client requests")
}
m := cmux.New(sctx.l) m := cmux.New(sctx.l)
v3c := v3client.New(s) v3c := v3client.New(s)
@@ -116,15 +127,22 @@ func (sctx *serveCtx) serve(
httpmux := sctx.createMux(gwmux, handler) httpmux := sctx.createMux(gwmux, handler)
srvhttp := &http.Server{ srvhttp := &http.Server{
Handler: createAccessController(s, httpmux), Handler: createAccessController(sctx.lg, s, httpmux),
ErrorLog: logger, // do not log user error ErrorLog: logger, // do not log user error
} }
httpl := m.Match(cmux.HTTP1()) httpl := m.Match(cmux.HTTP1())
go func() { errHandler(srvhttp.Serve(httpl)) }() go func() { errHandler(srvhttp.Serve(httpl)) }()
sctx.serversC <- &servers{grpc: gs, http: srvhttp} sctx.serversC <- &servers{grpc: gs, http: srvhttp}
if sctx.lg != nil {
sctx.lg.Info(
"serving insecure client requests; this is strongly discouraged!",
zap.String("address", sctx.l.Addr().String()),
)
} else {
plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.l.Addr().String()) plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.l.Addr().String())
} }
}
if sctx.secure { if sctx.secure {
tlscfg, tlsErr := tlsinfo.ServerConfig() tlscfg, tlsErr := tlsinfo.ServerConfig()
@@ -159,15 +177,22 @@ func (sctx *serveCtx) serve(
httpmux := sctx.createMux(gwmux, handler) httpmux := sctx.createMux(gwmux, handler)
srv := &http.Server{ srv := &http.Server{
Handler: createAccessController(s, httpmux), Handler: createAccessController(sctx.lg, s, httpmux),
TLSConfig: tlscfg, TLSConfig: tlscfg,
ErrorLog: logger, // do not log user error ErrorLog: logger, // do not log user error
} }
go func() { errHandler(srv.Serve(tlsl)) }() go func() { errHandler(srv.Serve(tlsl)) }()
sctx.serversC <- &servers{secure: true, grpc: gs, http: srv} sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
if sctx.lg != nil {
sctx.lg.Info(
"serving client requests",
zap.String("address", sctx.l.Addr().String()),
)
} else {
plog.Infof("serving client requests on %s", sctx.l.Addr().String()) plog.Infof("serving client requests on %s", sctx.l.Addr().String())
} }
}
close(sctx.serversC) close(sctx.serversC)
return m.Serve() return m.Serve()
@@ -218,8 +243,16 @@ func (sctx *serveCtx) registerGateway(opts []grpc.DialOption) (*gw.ServeMux, err
go func() { go func() {
<-ctx.Done() <-ctx.Done()
if cerr := conn.Close(); cerr != nil { if cerr := conn.Close(); cerr != nil {
if sctx.lg != nil {
sctx.lg.Warn(
"failed to close connection",
zap.String("address", sctx.l.Addr().String()),
zap.Error(cerr),
)
} else {
plog.Warningf("failed to close conn to %s: %v", sctx.l.Addr().String(), cerr) plog.Warningf("failed to close conn to %s: %v", sctx.l.Addr().String(), cerr)
} }
}
}() }()
return gwmux, nil return gwmux, nil
@@ -254,11 +287,12 @@ func (sctx *serveCtx) createMux(gwmux *gw.ServeMux, handler http.Handler) *http.
// - mutate gRPC gateway request paths // - mutate gRPC gateway request paths
// - check hostname whitelist // - check hostname whitelist
// client HTTP requests goes here first // client HTTP requests goes here first
func createAccessController(s *etcdserver.EtcdServer, mux *http.ServeMux) http.Handler { func createAccessController(lg *zap.Logger, s *etcdserver.EtcdServer, mux *http.ServeMux) http.Handler {
return &accessController{s: s, mux: mux} return &accessController{lg: lg, s: s, mux: mux}
} }
type accessController struct { type accessController struct {
lg *zap.Logger
s *etcdserver.EtcdServer s *etcdserver.EtcdServer
mux *http.ServeMux mux *http.ServeMux
} }
@@ -272,7 +306,14 @@ func (ac *accessController) ServeHTTP(rw http.ResponseWriter, req *http.Request)
if req.TLS == nil { // check origin if client connection is not secure if req.TLS == nil { // check origin if client connection is not secure
host := httputil.GetHostname(req) host := httputil.GetHostname(req)
if !ac.s.AccessController.IsHostWhitelisted(host) { if !ac.s.AccessController.IsHostWhitelisted(host) {
if ac.lg != nil {
ac.lg.Warn(
"rejecting HTTP request to prevent DNS rebinding attacks",
zap.String("host", host),
)
} else {
plog.Warningf("rejecting HTTP request from %q to prevent DNS rebinding attacks", host) plog.Warningf("rejecting HTTP request from %q to prevent DNS rebinding attacks", host)
}
// TODO: use Go's "http.StatusMisdirectedRequest" (421) // TODO: use Go's "http.StatusMisdirectedRequest" (421)
// https://github.com/golang/go/commit/4b8a7eafef039af1834ef9bfa879257c4a72b7b5 // https://github.com/golang/go/commit/4b8a7eafef039af1834ef9bfa879257c4a72b7b5
http.Error(rw, errCVE20185702(host), 421) http.Error(rw, errCVE20185702(host), 421)
@@ -347,7 +388,11 @@ func (ch *corsHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
func (sctx *serveCtx) registerUserHandler(s string, h http.Handler) { func (sctx *serveCtx) registerUserHandler(s string, h http.Handler) {
if sctx.userHandlers[s] != nil { if sctx.userHandlers[s] != nil {
if sctx.lg != nil {
sctx.lg.Warn("path is already registered by user handler", zap.String("path", s))
} else {
plog.Warningf("path %s already registered by user handler", s) plog.Warningf("path %s already registered by user handler", s)
}
return return
} }
sctx.userHandlers[s] = h sctx.userHandlers[s] = h

View File

@@ -25,6 +25,5 @@ func isMemberInitialized(cfg *Config) bool {
if waldir == "" { if waldir == "" {
waldir = filepath.Join(cfg.Dir, "member", "wal") waldir = filepath.Join(cfg.Dir, "member", "wal")
} }
return wal.Exist(waldir) return wal.Exist(waldir)
} }


@@ -20,6 +20,7 @@ import (
"flag" "flag"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log"
"net/url" "net/url"
"os" "os"
"runtime" "runtime"
@@ -31,6 +32,7 @@ import (
"github.com/coreos/etcd/version" "github.com/coreos/etcd/version"
"github.com/ghodss/yaml" "github.com/ghodss/yaml"
"go.uber.org/zap"
) )
var ( var (
@@ -213,9 +215,10 @@ func newConfig() *config {
fs.Var(flags.NewUniqueStringsValue("*"), "host-whitelist", "Comma-separated acceptable hostnames from HTTP client requests, if server is not secure (empty means allow all).") fs.Var(flags.NewUniqueStringsValue("*"), "host-whitelist", "Comma-separated acceptable hostnames from HTTP client requests, if server is not secure (empty means allow all).")
// logging // logging
fs.BoolVar(&cfg.ec.Debug, "debug", false, "Enable debug-level logging for etcd.") fs.StringVar(&cfg.ec.Logger, "logger", "capnslog", "Specify 'zap' for structured logging or 'capnslog'.")
fs.StringVar(&cfg.ec.LogPkgLevels, "log-package-levels", "", "Specify a particular log level for each etcd package (eg: 'etcdmain=CRITICAL,etcdserver=DEBUG').")
fs.StringVar(&cfg.ec.LogOutput, "log-output", embed.DefaultLogOutput, "Specify 'stdout' or 'stderr' to skip journald logging even when running under systemd.") fs.StringVar(&cfg.ec.LogOutput, "log-output", embed.DefaultLogOutput, "Specify 'stdout' or 'stderr' to skip journald logging even when running under systemd.")
fs.BoolVar(&cfg.ec.Debug, "debug", false, "Enable debug-level logging for etcd.")
fs.StringVar(&cfg.ec.LogPkgLevels, "log-package-levels", "", "(To be deprecated) Specify a particular log level for each etcd package (eg: 'etcdmain=CRITICAL,etcdserver=DEBUG').")
// version // version
fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit.") fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit.")
@@ -271,18 +274,26 @@ func (cfg *config) parse(arguments []string) error {
var err error var err error
if cfg.configFile != "" { if cfg.configFile != "" {
plog.Infof("Loading server configuration from %q. Other configuration command line flags and environment variables will be ignored if provided.", cfg.configFile)
err = cfg.configFromFile(cfg.configFile) err = cfg.configFromFile(cfg.configFile)
if lg := cfg.ec.GetLogger(); lg != nil {
lg.Info(
"loaded server configuraionl, other configuration command line flags and environment variables will be ignored if provided",
zap.String("path", cfg.configFile),
)
} else {
plog.Infof("Loading server configuration from %q. Other configuration command line flags and environment variables will be ignored if provided.", cfg.configFile)
}
} else { } else {
err = cfg.configFromCmdLine() err = cfg.configFromCmdLine()
} }
// now logger is set up
return err return err
} }
func (cfg *config) configFromCmdLine() error { func (cfg *config) configFromCmdLine() error {
err := flags.SetFlagsFromEnv("ETCD", cfg.cf.flagSet) err := flags.SetFlagsFromEnv("ETCD", cfg.cf.flagSet)
if err != nil { if err != nil {
plog.Fatalf("%v", err) return err
} }
cfg.ec.LPUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-peer-urls") cfg.ec.LPUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-peer-urls")
@@ -331,21 +342,21 @@ func (cfg *config) configFromFile(path string) error {
if cfg.ec.ListenMetricsUrlsJSON != "" { if cfg.ec.ListenMetricsUrlsJSON != "" {
us, err := types.NewURLs(strings.Split(cfg.ec.ListenMetricsUrlsJSON, ",")) us, err := types.NewURLs(strings.Split(cfg.ec.ListenMetricsUrlsJSON, ","))
if err != nil { if err != nil {
plog.Panicf("unexpected error setting up listen-metrics-urls: %v", err) log.Fatalf("unexpected error setting up listen-metrics-urls: %v", err)
} }
cfg.ec.ListenMetricsUrls = []url.URL(us) cfg.ec.ListenMetricsUrls = []url.URL(us)
} }
if cfg.cp.FallbackJSON != "" { if cfg.cp.FallbackJSON != "" {
if err := cfg.cf.fallback.Set(cfg.cp.FallbackJSON); err != nil { if err := cfg.cf.fallback.Set(cfg.cp.FallbackJSON); err != nil {
plog.Panicf("unexpected error setting up discovery-fallback flag: %v", err) log.Fatalf("unexpected error setting up discovery-fallback flag: %v", err)
} }
cfg.cp.Fallback = cfg.cf.fallback.String() cfg.cp.Fallback = cfg.cf.fallback.String()
} }
if cfg.cp.ProxyJSON != "" { if cfg.cp.ProxyJSON != "" {
if err := cfg.cf.proxy.Set(cfg.cp.ProxyJSON); err != nil { if err := cfg.cf.proxy.Set(cfg.cp.ProxyJSON); err != nil {
plog.Panicf("unexpected error setting up proxyFlag: %v", err) log.Fatalf("unexpected error setting up proxyFlag: %v", err)
} }
cfg.cp.Proxy = cfg.cf.proxy.String() cfg.cp.Proxy = cfg.cf.proxy.String()
} }


@@ -39,6 +39,7 @@ import (
"github.com/coreos/etcd/version" "github.com/coreos/etcd/version"
"github.com/coreos/pkg/capnslog" "github.com/coreos/pkg/capnslog"
"go.uber.org/zap"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@@ -60,42 +61,86 @@ func startEtcdOrProxyV2() {
err := cfg.parse(os.Args[1:]) err := cfg.parse(os.Args[1:])
if err != nil { if err != nil {
lg := cfg.ec.GetLogger()
if lg != nil {
lg.Error("failed to verify flags", zap.Error(err))
} else {
plog.Errorf("error verifying flags, %v. See 'etcd --help'.", err) plog.Errorf("error verifying flags, %v. See 'etcd --help'.", err)
}
switch err { switch err {
case embed.ErrUnsetAdvertiseClientURLsFlag: case embed.ErrUnsetAdvertiseClientURLsFlag:
if lg != nil {
lg.Error("advertise client URLs are not set", zap.Error(err))
} else {
plog.Errorf("When listening on specific address(es), this etcd process must advertise accessible url(s) to each connected client.") plog.Errorf("When listening on specific address(es), this etcd process must advertise accessible url(s) to each connected client.")
} }
}
os.Exit(1) os.Exit(1)
} }
cfg.ec.SetupLogging()
var stopped <-chan struct{} maxProcs, cpus := runtime.GOMAXPROCS(0), runtime.NumCPU()
var errc <-chan error
lg := cfg.ec.GetLogger()
if lg != nil {
lg.Info(
"starting etcd",
zap.String("etcd-version", version.Version),
zap.String("git-sha", version.GitSHA),
zap.String("go-version", runtime.Version()),
zap.String("go-os", runtime.GOOS),
zap.String("go-arch", runtime.GOARCH),
zap.Int("max-cpu-set", maxProcs),
zap.Int("max-cpu-available", cpus),
)
} else {
plog.Infof("etcd Version: %s\n", version.Version) plog.Infof("etcd Version: %s\n", version.Version)
plog.Infof("Git SHA: %s\n", version.GitSHA) plog.Infof("Git SHA: %s\n", version.GitSHA)
plog.Infof("Go Version: %s\n", runtime.Version()) plog.Infof("Go Version: %s\n", runtime.Version())
plog.Infof("Go OS/Arch: %s/%s\n", runtime.GOOS, runtime.GOARCH) plog.Infof("Go OS/Arch: %s/%s\n", runtime.GOOS, runtime.GOARCH)
plog.Infof("setting maximum number of CPUs to %d, total number of available CPUs is %d", maxProcs, cpus)
GoMaxProcs := runtime.GOMAXPROCS(0) }
plog.Infof("setting maximum number of CPUs to %d, total number of available CPUs is %d", GoMaxProcs, runtime.NumCPU())
defaultHost, dhErr := (&cfg.ec).UpdateDefaultClusterFromName(defaultInitialCluster) defaultHost, dhErr := (&cfg.ec).UpdateDefaultClusterFromName(defaultInitialCluster)
if defaultHost != "" { if defaultHost != "" {
if lg != nil {
lg.Info(
"detected default host for advertise",
zap.String("host", defaultHost),
)
} else {
plog.Infof("advertising using detected default host %q", defaultHost) plog.Infof("advertising using detected default host %q", defaultHost)
} }
}
if dhErr != nil { if dhErr != nil {
if lg != nil {
lg.Info("failed to detect default host", zap.Error(dhErr))
} else {
plog.Noticef("failed to detect default host (%v)", dhErr) plog.Noticef("failed to detect default host (%v)", dhErr)
} }
}
if cfg.ec.Dir == "" { if cfg.ec.Dir == "" {
cfg.ec.Dir = fmt.Sprintf("%v.etcd", cfg.ec.Name) cfg.ec.Dir = fmt.Sprintf("%v.etcd", cfg.ec.Name)
if lg != nil {
lg.Warn(
"'data-dir' was empty; using default",
zap.String("data-dir", cfg.ec.Dir),
)
} else {
plog.Warningf("no data-dir provided, using default data-dir ./%s", cfg.ec.Dir) plog.Warningf("no data-dir provided, using default data-dir ./%s", cfg.ec.Dir)
} }
}
which := identifyDataDirOrDie(cfg.ec.Dir) var stopped <-chan struct{}
var errc <-chan error
which := identifyDataDirOrDie(cfg.ec.GetLogger(), cfg.ec.Dir)
if which != dirEmpty { if which != dirEmpty {
if lg != nil {
} else {
plog.Noticef("the server is already initialized as %v before, starting as etcd %v...", which, which) plog.Noticef("the server is already initialized as %v before, starting as etcd %v...", which, which)
}
switch which { switch which {
case dirMember: case dirMember:
stopped, errc, err = startEtcd(&cfg.ec) stopped, errc, err = startEtcd(&cfg.ec)
@@ -110,7 +155,11 @@ func startEtcdOrProxyV2() {
stopped, errc, err = startEtcd(&cfg.ec) stopped, errc, err = startEtcd(&cfg.ec)
if derr, ok := err.(*etcdserver.DiscoveryError); ok && derr.Err == discovery.ErrFullCluster { if derr, ok := err.(*etcdserver.DiscoveryError); ok && derr.Err == discovery.ErrFullCluster {
if cfg.shouldFallbackToProxy() { if cfg.shouldFallbackToProxy() {
if lg != nil {
} else {
plog.Noticef("discovery cluster full, falling back to %s", fallbackFlagProxy) plog.Noticef("discovery cluster full, falling back to %s", fallbackFlagProxy)
}
shouldProxy = true shouldProxy = true
} }
} }
@@ -124,37 +173,91 @@ func startEtcdOrProxyV2() {
if derr, ok := err.(*etcdserver.DiscoveryError); ok { if derr, ok := err.(*etcdserver.DiscoveryError); ok {
switch derr.Err { switch derr.Err {
case discovery.ErrDuplicateID: case discovery.ErrDuplicateID:
if lg != nil {
lg.Error(
"member has been registered with discovery service",
zap.String("name", cfg.ec.Name),
zap.String("discovery-token", cfg.ec.Durl),
zap.Error(derr.Err),
)
lg.Error(
"but could not find valid cluster configuration",
zap.String("data-dir", cfg.ec.Dir),
)
lg.Warn("check data dir if previous bootstrap succeeded")
lg.Warn("or use a new discovery token if previous bootstrap failed")
} else {
plog.Errorf("member %q has previously registered with discovery service token (%s).", cfg.ec.Name, cfg.ec.Durl) plog.Errorf("member %q has previously registered with discovery service token (%s).", cfg.ec.Name, cfg.ec.Durl)
plog.Errorf("But etcd could not find valid cluster configuration in the given data dir (%s).", cfg.ec.Dir) plog.Errorf("But etcd could not find valid cluster configuration in the given data dir (%s).", cfg.ec.Dir)
plog.Infof("Please check the given data dir path if the previous bootstrap succeeded") plog.Infof("Please check the given data dir path if the previous bootstrap succeeded")
plog.Infof("or use a new discovery token if the previous bootstrap failed.") plog.Infof("or use a new discovery token if the previous bootstrap failed.")
}
case discovery.ErrDuplicateName: case discovery.ErrDuplicateName:
if lg != nil {
lg.Error(
"member with duplicated name has already been registered",
zap.String("discovery-token", cfg.ec.Durl),
zap.Error(derr.Err),
)
lg.Warn("cURL the discovery token URL for details")
lg.Warn("do not reuse discovery token; generate a new one to bootstrap a cluster")
} else {
plog.Errorf("member with duplicated name has registered with discovery service token(%s).", cfg.ec.Durl) plog.Errorf("member with duplicated name has registered with discovery service token(%s).", cfg.ec.Durl)
plog.Errorf("please check (cURL) the discovery token for more information.") plog.Errorf("please check (cURL) the discovery token for more information.")
plog.Errorf("please do not reuse the discovery token and generate a new one to bootstrap the cluster.") plog.Errorf("please do not reuse the discovery token and generate a new one to bootstrap the cluster.")
}
default: default:
if lg != nil {
lg.Error(
"failed to bootstrap; discovery token was already used",
zap.String("discovery-token", cfg.ec.Durl),
zap.Error(err),
)
lg.Warn("do not reuse discovery token; generate a new one to bootstrap a cluster")
} else {
plog.Errorf("%v", err) plog.Errorf("%v", err)
plog.Infof("discovery token %s was used, but failed to bootstrap the cluster.", cfg.ec.Durl) plog.Infof("discovery token %s was used, but failed to bootstrap the cluster.", cfg.ec.Durl)
plog.Infof("please generate a new discovery token and try to bootstrap again.") plog.Infof("please generate a new discovery token and try to bootstrap again.")
} }
}
os.Exit(1) os.Exit(1)
} }
if strings.Contains(err.Error(), "include") && strings.Contains(err.Error(), "--initial-cluster") { if strings.Contains(err.Error(), "include") && strings.Contains(err.Error(), "--initial-cluster") {
if lg != nil {
lg.Error("failed to start", zap.Error(err))
} else {
plog.Infof("%v", err) plog.Infof("%v", err)
}
if cfg.ec.InitialCluster == cfg.ec.InitialClusterFromName(cfg.ec.Name) { if cfg.ec.InitialCluster == cfg.ec.InitialClusterFromName(cfg.ec.Name) {
if lg != nil {
lg.Warn("forgot to set --initial-cluster?")
} else {
plog.Infof("forgot to set --initial-cluster flag?") plog.Infof("forgot to set --initial-cluster flag?")
} }
}
if types.URLs(cfg.ec.APUrls).String() == embed.DefaultInitialAdvertisePeerURLs { if types.URLs(cfg.ec.APUrls).String() == embed.DefaultInitialAdvertisePeerURLs {
if lg != nil {
lg.Warn("forgot to set --initial-advertise-peer-urls?")
} else {
plog.Infof("forgot to set --initial-advertise-peer-urls flag?") plog.Infof("forgot to set --initial-advertise-peer-urls flag?")
} }
}
if cfg.ec.InitialCluster == cfg.ec.InitialClusterFromName(cfg.ec.Name) && len(cfg.ec.Durl) == 0 { if cfg.ec.InitialCluster == cfg.ec.InitialClusterFromName(cfg.ec.Name) && len(cfg.ec.Durl) == 0 {
if lg != nil {
lg.Warn("--discovery flag is not set")
} else {
plog.Infof("if you want to use discovery service, please set --discovery flag.") plog.Infof("if you want to use discovery service, please set --discovery flag.")
} }
}
os.Exit(1) os.Exit(1)
} }
if lg != nil {
lg.Fatal("discovery failed", zap.Error(err))
} else {
plog.Fatalf("%v", err) plog.Fatalf("%v", err)
} }
}
osutil.HandleInterrupts() osutil.HandleInterrupts()
@@ -163,12 +266,16 @@ func startEtcdOrProxyV2() {
// for accepting connections. The etcd instance should be // for accepting connections. The etcd instance should be
// joined with the cluster and ready to serve incoming // joined with the cluster and ready to serve incoming
// connections. // connections.
notifySystemd() notifySystemd(lg)
select { select {
case lerr := <-errc: case lerr := <-errc:
// fatal out on listener errors // fatal out on listener errors
if lg != nil {
lg.Fatal("listener failed", zap.Error(err))
} else {
plog.Fatal(lerr) plog.Fatal(lerr)
}
case <-stopped: case <-stopped:
} }
@@ -191,7 +298,12 @@ func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
// startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes. // startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes.
func startProxy(cfg *config) error { func startProxy(cfg *config) error {
lg := cfg.ec.GetLogger()
if lg != nil {
lg.Info("v2 API proxy starting")
} else {
plog.Notice("proxy: this proxy supports v2 API only!") plog.Notice("proxy: this proxy supports v2 API only!")
}
clientTLSInfo := cfg.ec.ClientTLSInfo clientTLSInfo := cfg.ec.ClientTLSInfo
if clientTLSInfo.Empty() { if clientTLSInfo.Empty() {
@@ -209,8 +321,12 @@ func startProxy(cfg *config) error {
pt.MaxIdleConnsPerHost = httpproxy.DefaultMaxIdleConnsPerHost pt.MaxIdleConnsPerHost = httpproxy.DefaultMaxIdleConnsPerHost
if err = cfg.ec.PeerSelfCert(); err != nil { if err = cfg.ec.PeerSelfCert(); err != nil {
if lg != nil {
lg.Fatal("failed to get self-signed certs for peer", zap.Error(err))
} else {
plog.Fatalf("could not get certs (%v)", err) plog.Fatalf("could not get certs (%v)", err)
} }
}
tr, err := transport.NewTimeoutTransport(cfg.ec.PeerTLSInfo, time.Duration(cfg.cp.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.cp.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.cp.ProxyWriteTimeoutMs)*time.Millisecond) tr, err := transport.NewTimeoutTransport(cfg.ec.PeerTLSInfo, time.Duration(cfg.cp.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.cp.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.cp.ProxyWriteTimeoutMs)*time.Millisecond)
if err != nil { if err != nil {
return err return err
@@ -229,18 +345,40 @@ func startProxy(cfg *config) error {
switch {
case err == nil:
if cfg.ec.Durl != "" {
if lg != nil {
lg.Warn(
"discovery token ignored since the proxy has already been initialized; valid cluster file found",
zap.String("cluster-file", clusterfile),
)
} else {
plog.Warningf("discovery token ignored since the proxy has already been initialized. Valid cluster file found at %q", clusterfile)
}
}
if cfg.ec.DNSCluster != "" {
if lg != nil {
lg.Warn(
"DNS SRV discovery ignored since the proxy has already been initialized; valid cluster file found",
zap.String("cluster-file", clusterfile),
)
} else {
plog.Warningf("DNS SRV discovery ignored since the proxy has already been initialized. Valid cluster file found at %q", clusterfile)
}
}
urls := struct{ PeerURLs []string }{}
err = json.Unmarshal(b, &urls)
if err != nil {
return err
}
peerURLs = urls.PeerURLs
if lg != nil {
lg.Info(
"proxy using peer URLS from cluster file",
zap.Strings("peer-urls", peerURLs),
zap.String("cluster-file", clusterfile),
)
} else {
plog.Infof("proxy: using peer urls %v from cluster file %q", peerURLs, clusterfile)
}
case os.IsNotExist(err):
var urlsmap types.URLsMap
urlsmap, _, err = cfg.ec.PeerURLsMapAndToken("proxy")
@@ -259,7 +397,11 @@ func startProxy(cfg *config) error {
}
}
peerURLs = urlsmap.URLs()
if lg != nil {
lg.Info("proxy using peer URLS", zap.Strings("peer-urls", peerURLs))
} else {
plog.Infof("proxy: using peer urls %v ", peerURLs)
}
default:
return err
}
@@ -267,34 +409,64 @@ func startProxy(cfg *config) error {
clientURLs := []string{}
uf := func() []string {
gcls, gerr := etcdserver.GetClusterFromRemotePeers(peerURLs, tr)
if gerr != nil {
if lg != nil {
lg.Warn(
"failed to get cluster from remote peers",
zap.Strings("peer-urls", peerURLs),
zap.Error(gerr),
)
} else {
plog.Warningf("proxy: %v", gerr)
}
return []string{}
}
clientURLs = gcls.ClientURLs()
urls := struct{ PeerURLs []string }{gcls.PeerURLs()}
b, jerr := json.Marshal(urls)
if jerr != nil {
if lg != nil {
lg.Warn("proxy failed to marshal peer URLs", zap.Error(jerr))
} else {
plog.Warningf("proxy: error on marshal peer urls %s", jerr)
}
return clientURLs
}
err = pkgioutil.WriteAndSyncFile(clusterfile+".bak", b, 0600)
if err != nil {
if lg != nil {
lg.Warn("proxy failed to write cluster file", zap.Error(err))
} else {
plog.Warningf("proxy: error on writing urls %s", err)
}
return clientURLs
}
err = os.Rename(clusterfile+".bak", clusterfile)
if err != nil {
if lg != nil {
lg.Warn(
"proxy failed to rename cluster file",
zap.String("path", clusterfile),
zap.Error(err),
)
} else {
plog.Warningf("proxy: error on updating clusterfile %s", err)
}
return clientURLs
}
if !reflect.DeepEqual(gcls.PeerURLs(), peerURLs) {
if lg != nil {
lg.Info(
"proxy updated peer URLs",
zap.Strings("from", peerURLs),
zap.Strings("to", gcls.PeerURLs()),
)
} else {
plog.Noticef("proxy: updated peer urls in cluster file from %v to %v", peerURLs, gcls.PeerURLs())
}
}
peerURLs = gcls.PeerURLs()
return clientURLs
@@ -318,11 +490,15 @@ func startProxy(cfg *config) error {
}
listenerTLS := cfg.ec.ClientTLSInfo
if cfg.ec.ClientAutoTLS && cTLS {
listenerTLS, err = transport.SelfCert(cfg.ec.GetLogger(), filepath.Join(cfg.ec.Dir, "clientCerts"), cHosts)
if err != nil {
if lg != nil {
lg.Fatal("failed to initialize self-signed client cert", zap.Error(err))
} else {
plog.Fatalf("proxy: could not initialize self-signed client certs (%v)", err)
}
}
}
// Start a proxy server goroutine for each listen address
for _, u := range cfg.ec.LCUrls {
@@ -333,7 +509,11 @@ func startProxy(cfg *config) error {
host := u.String()
go func() {
if lg != nil {
lg.Info("proxy started listening on client requests", zap.String("host", host))
} else {
plog.Info("proxy: listening for client requests on ", host)
}
mux := http.NewServeMux()
etcdhttp.HandlePrometheus(mux) // v2 proxy just uses the same port
mux.Handle("/", ph)
@@ -345,14 +525,18 @@ func startProxy(cfg *config) error {
// identifyDataDirOrDie returns the type of the data dir.
// Dies if the datadir is invalid.
func identifyDataDirOrDie(lg *zap.Logger, dir string) dirType {
names, err := fileutil.ReadDir(dir)
if err != nil {
if os.IsNotExist(err) {
return dirEmpty
}
if lg != nil {
lg.Fatal("failed to list data directory", zap.String("dir", dir), zap.Error(err))
} else {
plog.Fatalf("error listing data dir: %s", dir)
}
}
var m, p bool
for _, name := range names {
@@ -362,13 +546,25 @@ func identifyDataDirOrDie(dir string) dirType {
case dirProxy:
p = true
default:
if lg != nil {
lg.Warn(
"found invalid file under data directory",
zap.String("filename", name),
zap.String("data-dir", dir),
)
} else {
plog.Warningf("found invalid file/dir %s under data dir %s (Ignore this if you are upgrading etcd)", name, dir)
}
}
}
if m && p {
if lg != nil {
lg.Fatal("invalid datadir; both member and proxy directories exist")
} else {
plog.Fatal("invalid datadir. Both member and proxy directories exist.")
}
}
if m {
return dirMember
}
@@ -387,9 +583,10 @@ func checkSupportArch() {
// so unset here to not parse through flag
defer os.Unsetenv("ETCD_UNSUPPORTED_ARCH")
if env, ok := os.LookupEnv("ETCD_UNSUPPORTED_ARCH"); ok && env == runtime.GOARCH {
fmt.Printf("running etcd on unsupported architecture %q since ETCD_UNSUPPORTED_ARCH is set\n", env)
return
}
fmt.Printf("etcd on unsupported platform without ETCD_UNSUPPORTED_ARCH=%s set\n", runtime.GOARCH)
os.Exit(1)
}

View File

@@ -21,6 +21,8 @@ import (
"os" "os"
"time" "time"
"go.uber.org/zap"
"github.com/coreos/etcd/proxy/tcpproxy" "github.com/coreos/etcd/proxy/tcpproxy"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@@ -79,16 +81,12 @@ func newGatewayStartCommand() *cobra.Command {
func stripSchema(eps []string) []string {
var endpoints []string
for _, ep := range eps {
if u, err := url.Parse(ep); err == nil && u.Host != "" {
ep = u.Host
}
endpoints = append(endpoints, ep)
}
return endpoints
}
@@ -104,7 +102,8 @@ func startGateway(cmd *cobra.Command, args []string) {
for _, ep := range srvs.Endpoints {
h, p, err := net.SplitHostPort(ep)
if err != nil {
fmt.Printf("error parsing endpoint %q", ep)
os.Exit(1)
}
var port uint16
fmt.Sscanf(p, "%d", &port)
@@ -113,23 +112,33 @@ func startGateway(cmd *cobra.Command, args []string) {
}
if len(srvs.Endpoints) == 0 {
fmt.Println("no endpoints found")
os.Exit(1)
}
var lg *zap.Logger
lg, err := zap.NewProduction()
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
var l net.Listener
l, err = net.Listen("tcp", gatewayListenAddr)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
tp := tcpproxy.TCPProxy{
Logger: lg,
Listener: l,
Endpoints: srvs.SRVs,
MonitorInterval: getewayRetryDelay,
}
// At this point, etcd gateway listener is initialized
notifySystemd(lg)
tp.Run()
}

View File

@@ -17,7 +17,7 @@ package etcdmain
import (
"context"
"fmt"
"log"
"math"
"net"
"net/http"
@@ -35,10 +35,10 @@ import (
"github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb" "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/debugutil" "github.com/coreos/etcd/pkg/debugutil"
"github.com/coreos/etcd/pkg/logutil"
"github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/proxy/grpcproxy" "github.com/coreos/etcd/proxy/grpcproxy"
"github.com/coreos/pkg/capnslog"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/soheilhy/cmux" "github.com/soheilhy/cmux"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@@ -148,61 +148,75 @@ func newGRPCProxyStartCommand() *cobra.Command {
func startGRPCProxy(cmd *cobra.Command, args []string) {
checkArgs()
lcfg := zap.Config{
Level: zap.NewAtomicLevelAt(zap.InfoLevel),
Development: false,
Sampling: &zap.SamplingConfig{
Initial: 100,
Thereafter: 100,
},
Encoding: "json",
EncoderConfig: zap.NewProductionEncoderConfig(),
}
if grpcProxyDebug {
lcfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
grpc.EnableTracing = true
}
lg, err := lcfg.Build()
if err != nil {
log.Fatal(err)
}
defer lg.Sync()
var gl grpclog.LoggerV2
gl, err = logutil.NewGRPCLoggerV2(lcfg)
if err != nil {
log.Fatal(err)
}
grpclog.SetLoggerV2(gl)
tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey)
if tlsinfo == nil && grpcProxyListenAutoTLS {
host := []string{"https://" + grpcProxyListenAddr}
dir := filepath.Join(grpcProxyDataDir, "fixtures", "proxy")
autoTLS, err := transport.SelfCert(lg, dir, host)
if err != nil {
log.Fatal(err)
}
tlsinfo = &autoTLS
}
if tlsinfo != nil {
lg.Info("gRPC proxy server TLS", zap.String("tls-info", fmt.Sprintf("%+v", tlsinfo)))
}
m := mustListenCMux(lg, tlsinfo)
grpcl := m.Match(cmux.HTTP2())
defer func() {
grpcl.Close()
lg.Info("stopping listening gRPC proxy client requests", zap.String("address", grpcProxyListenAddr))
}()
client := mustNewClient(lg)
srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client)
errc := make(chan error)
go func() { errc <- newGRPCProxyServer(lg, client).Serve(grpcl) }()
go func() { errc <- srvhttp.Serve(httpl) }()
go func() { errc <- m.Serve() }()
if len(grpcProxyMetricsListenAddr) > 0 {
mhttpl := mustMetricsListener(lg, tlsinfo)
go func() {
mux := http.NewServeMux()
etcdhttp.HandlePrometheus(mux)
grpcproxy.HandleHealth(mux, client)
herr := http.Serve(mhttpl, mux)
lg.Fatal("gRPC proxy server serve returned", zap.Error(herr))
}()
}
// grpc-proxy is initialized, ready to serve
notifySystemd(lg)
fmt.Fprintln(os.Stderr, <-errc)
os.Exit(1)
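
For reference, a minimal standalone sketch (stock zap API only, not etcd code) of how a config like lcfg above behaves: the zap.AtomicLevel kept in the config keeps controlling the built logger, so raising it later has the same effect as the grpcProxyDebug branch picking the debug level before Build.

package main

import (
	"log"

	"go.uber.org/zap"
)

func main() {
	// Same shape as lcfg above: production encoder, info level by default.
	lcfg := zap.NewProductionConfig()
	lcfg.Level = zap.NewAtomicLevelAt(zap.InfoLevel)

	lg, err := lcfg.Build()
	if err != nil {
		log.Fatal(err)
	}
	defer lg.Sync()

	lg.Debug("dropped while the level is info")
	lcfg.Level.SetLevel(zap.DebugLevel) // the AtomicLevel is shared with the built logger
	lg.Debug("emitted after raising verbosity")
}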
@@ -223,13 +237,13 @@ func checkArgs() {
}
}
func mustNewClient(lg *zap.Logger) *clientv3.Client {
srvs := discoverEndpoints(grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery)
eps := srvs.Endpoints
if len(eps) == 0 {
eps = grpcProxyEndpoints
}
cfg, err := newClientCfg(lg, eps)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
@@ -246,7 +260,7 @@ func mustNewClient() *clientv3.Client {
return client
}
func newClientCfg(lg *zap.Logger, eps []string) (*clientv3.Config, error) {
// set tls if any one tls option set
cfg := clientv3.Config{
Endpoints: eps,
@@ -271,7 +285,7 @@ func newClientCfg(eps []string) (*clientv3.Config, error) {
}
clientTLS.InsecureSkipVerify = grpcProxyInsecureSkipTLSVerify
cfg.TLS = clientTLS
lg.Info("gRPC proxy client TLS", zap.String("tls-info", fmt.Sprintf("%+v", tls)))
}
return &cfg, nil
}
@@ -283,7 +297,7 @@ func newTLS(ca, cert, key string) *transport.TLSInfo {
return &transport.TLSInfo{TrustedCAFile: ca, CertFile: cert, KeyFile: key}
}
func mustListenCMux(lg *zap.Logger, tlsinfo *transport.TLSInfo) cmux.CMux {
l, err := net.Listen("tcp", grpcProxyListenAddr)
if err != nil {
fmt.Fprintln(os.Stderr, err)
@@ -297,25 +311,25 @@ func mustListenCMux(tlsinfo *transport.TLSInfo) cmux.CMux {
if tlsinfo != nil {
tlsinfo.CRLFile = grpcProxyListenCRL
if l, err = transport.NewTLSListener(l, tlsinfo); err != nil {
lg.Fatal("failed to create TLS listener", zap.Error(err))
}
}
lg.Info("listening for gRPC proxy client requests", zap.String("address", grpcProxyListenAddr))
return cmux.New(l)
}
func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
if grpcProxyEnableOrdering {
vf := ordering.NewOrderViolationSwitchEndpointClosure(*client)
client.KV = ordering.NewKV(client.KV, vf)
lg.Info("waiting for linearized read from cluster to recover ordering")
for {
_, err := client.KV.Get(context.TODO(), "_", clientv3.WithKeysOnly())
if err == nil {
break
}
lg.Warn("ordering recovery failed, retrying in 1s", zap.Error(err))
time.Sleep(time.Second)
}
}
@@ -363,7 +377,7 @@ func newGRPCProxyServer(client *clientv3.Client) *grpc.Server {
return server
}
func mustHTTPListener(lg *zap.Logger, m cmux.CMux, tlsinfo *transport.TLSInfo, c *clientv3.Client) (*http.Server, net.Listener) {
httpmux := http.NewServeMux()
httpmux.HandleFunc("/", http.NotFound)
etcdhttp.HandlePrometheus(httpmux)
@@ -372,7 +386,7 @@ func mustHTTPListener(m cmux.CMux, tlsinfo *transport.TLSInfo, c *clientv3.Clien
for p, h := range debugutil.PProfHandlers() {
httpmux.Handle(p, h)
}
lg.Info("gRPC proxy enabled pprof", zap.String("path", debugutil.HTTPPrefixPProf))
}
srvhttp := &http.Server{Handler: httpmux}
@@ -382,13 +396,13 @@ func mustHTTPListener(m cmux.CMux, tlsinfo *transport.TLSInfo, c *clientv3.Clien
srvTLS, err := tlsinfo.ServerConfig()
if err != nil {
lg.Fatal("failed to set up TLS", zap.Error(err))
}
srvhttp.TLSConfig = srvTLS
return srvhttp, m.Match(cmux.Any())
}
func mustMetricsListener(lg *zap.Logger, tlsinfo *transport.TLSInfo) net.Listener {
murl, err := url.Parse(grpcProxyMetricsListenAddr)
if err != nil {
fmt.Fprintf(os.Stderr, "cannot parse %q", grpcProxyMetricsListenAddr)
@@ -399,6 +413,6 @@ func mustMetricsListener(tlsinfo *transport.TLSInfo) net.Listener {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
lg.Info("gRPC proxy listening for metrics", zap.String("address", murl.String()))
return ml
}

View File

@@ -154,12 +154,16 @@ Profiling:
List of URLs to listen on for metrics.
Logging:
--logger 'capnslog'
Specify 'zap' for structured logging or 'capnslog'.
--log-output 'default'
Specify 'stdout' or 'stderr' to skip journald logging even when running under systemd.
--debug 'false'
Enable debug-level logging for etcd.
Logging (to be deprecated in v3.5):
--log-package-levels ''
Specify a particular log level for each etcd package (eg: 'etcdmain=CRITICAL,etcdserver=DEBUG').
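
As an illustration of what the 'zap' choice means in practice, here is a sketch using the stock zap production logger (the address value is a placeholder; the exact fields etcd emits are not part of this help text):

package main

import "go.uber.org/zap"

func main() {
	lg, _ := zap.NewProduction() // JSON-encoded structured output on stderr
	defer lg.Sync()
	lg.Info("serving client requests", zap.String("address", "127.0.0.1:2379"))
	// emits roughly: {"level":"info","ts":...,"msg":"serving client requests","address":"127.0.0.1:2379"}
}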
v2 Proxy (to be deprecated in v4):
--proxy 'off'

View File

@@ -21,6 +21,7 @@ import (
"github.com/coreos/go-systemd/daemon" "github.com/coreos/go-systemd/daemon"
systemdutil "github.com/coreos/go-systemd/util" systemdutil "github.com/coreos/go-systemd/util"
"go.uber.org/zap"
) )
func Main() {
@@ -46,15 +47,28 @@ func Main() {
startEtcdOrProxyV2()
}
func notifySystemd(lg *zap.Logger) {
if !systemdutil.IsRunningSystemd() {
return
}
if lg != nil {
lg.Info("host was booted with systemd, sends READY=1 message to init daemon")
}
sent, err := daemon.SdNotify(false, "READY=1")
if err != nil {
if lg != nil {
lg.Error("failed to notify systemd for readiness", zap.Error(err))
} else {
plog.Errorf("failed to notify systemd for readiness: %v", err)
}
}
if !sent {
if lg != nil {
lg.Warn("forgot to set Type=notify in systemd service file?")
} else {
plog.Errorf("forgot to set Type=notify in systemd service file?")
}
}
}

View File

@@ -25,6 +25,8 @@ import (
"github.com/coreos/etcd/pkg/netutil" "github.com/coreos/etcd/pkg/netutil"
"github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/pkg/types"
"go.uber.org/zap"
) )
// ServerConfig holds the configuration of etcd as taken from the command line or discovery. // ServerConfig holds the configuration of etcd as taken from the command line or discovery.
@@ -80,6 +82,12 @@ type ServerConfig struct {
// PreVote is true to enable Raft Pre-Vote.
PreVote bool
// Logger logs server-side operations.
// If not nil, it disables "capnslog" and uses the given logger.
Logger *zap.Logger
// LoggerConfig is server logger configuration for Raft logger.
LoggerConfig zap.Config
Debug bool
ForceNewCluster bool
@@ -214,6 +222,8 @@ func (c *ServerConfig) PrintWithInitial() { c.print(true) }
func (c *ServerConfig) Print() { c.print(false) }
func (c *ServerConfig) print(initial bool) {
// TODO: remove this after dropping "capnslog"
if c.Logger == nil {
plog.Infof("name = %s", c.Name)
if c.ForceNewCluster {
plog.Infof("force new cluster")
@@ -237,6 +247,44 @@ func (c *ServerConfig) print(initial bool) {
plog.Infof("initial advertise peer URLs = %s", c.PeerURLs) plog.Infof("initial advertise peer URLs = %s", c.PeerURLs)
plog.Infof("initial cluster = %s", c.InitialPeerURLsMap) plog.Infof("initial cluster = %s", c.InitialPeerURLsMap)
} }
} else {
caddrs := make([]string, len(c.ClientURLs))
for i := range c.ClientURLs {
caddrs[i] = c.ClientURLs[i].String()
}
paddrs := make([]string, len(c.PeerURLs))
for i := range c.PeerURLs {
paddrs[i] = c.PeerURLs[i].String()
}
state := "new"
if !c.NewCluster {
state = "existing"
}
c.Logger.Info(
"server starting",
zap.String("name", c.Name),
zap.String("data-dir", c.DataDir),
zap.String("member-dir", c.MemberDir()),
zap.String("dedicated-wal-dir", c.DedicatedWALDir),
zap.Bool("force-new-cluster", c.ForceNewCluster),
zap.Uint("heartbeat-tick-ms", c.TickMs),
zap.String("heartbeat-interval", fmt.Sprintf("%v", time.Duration(c.TickMs)*time.Millisecond)),
zap.Int("election-tick-ms", c.ElectionTicks),
zap.String("election-timeout", fmt.Sprintf("%v", time.Duration(c.ElectionTicks*int(c.TickMs))*time.Millisecond)),
zap.Uint64("snapshot-count", c.SnapCount),
zap.Strings("advertise-client-urls", caddrs),
zap.Strings("initial-advertise-peer-urls", paddrs),
zap.Bool("initial", initial),
zap.String("initial-cluster", c.InitialPeerURLsMap.String()),
zap.String("initial-cluster-state", state),
zap.String("initial-cluster-token", c.InitialClusterToken),
zap.Bool("pre-vote", c.PreVote),
zap.Bool("initial-corrupt-check", c.InitialCorruptCheck),
zap.Duration("corrupt-check-time", c.CorruptCheckTime),
zap.String("discovery-url", c.DiscoveryURL),
zap.String("discovery-proxy", c.DiscoveryProxy),
)
}
}
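
A hypothetical caller-side sketch (not part of this commit) showing how the new Logger and LoggerConfig fields could be populated so that print() above takes the structured-logging branch; the name and data directory are placeholders:

package main

import (
	"log"

	"github.com/coreos/etcd/etcdserver"
	"go.uber.org/zap"
)

func main() {
	lcfg := zap.NewProductionConfig()
	lg, err := lcfg.Build()
	if err != nil {
		log.Fatal(err)
	}
	srvcfg := etcdserver.ServerConfig{
		Name:         "infra1",           // placeholder
		DataDir:      "/tmp/infra1.etcd", // placeholder
		Logger:       lg,
		LoggerConfig: lcfg,
	}
	srvcfg.Print() // logs the configuration through the zap branch
}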
func checkDuplicateURL(urlsmap types.URLsMap) bool {

View File

@@ -17,6 +17,7 @@ package etcdserver
import (
"encoding/json"
"expvar"
"log"
"sort"
"sync"
"time"
@@ -24,6 +25,7 @@ import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/contention" "github.com/coreos/etcd/pkg/contention"
"github.com/coreos/etcd/pkg/logutil"
"github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft"
@@ -408,6 +410,13 @@ func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id
CheckQuorum: true,
PreVote: cfg.PreVote,
}
if cfg.Logger != nil {
// called after capnslog setting in "init" function
c.Logger, err = logutil.NewRaftLogger(cfg.LoggerConfig)
if err != nil {
log.Fatalf("cannot create raft logger %v", err)
}
}
n = raft.StartNode(c, peers)
raftStatusMu.Lock()
@@ -442,6 +451,14 @@ func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *member
CheckQuorum: true,
PreVote: cfg.PreVote,
}
if cfg.Logger != nil {
// called after capnslog setting in "init" function
var err error
c.Logger, err = logutil.NewRaftLogger(cfg.LoggerConfig)
if err != nil {
log.Fatalf("cannot create raft logger %v", err)
}
}
n := raft.RestartNode(c)
raftStatusMu.Lock()
@@ -498,6 +515,14 @@ func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types
CheckQuorum: true,
PreVote: cfg.PreVote,
}
if cfg.Logger != nil {
// called after capnslog setting in "init" function
c.Logger, err = logutil.NewRaftLogger(cfg.LoggerConfig)
if err != nil {
log.Fatalf("cannot create raft logger %v", err)
}
}
n := raft.RestartNode(c)
raftStatus = n.Status
return id, cl, n, s, w

View File

@@ -259,12 +259,6 @@ type EtcdServer struct {
// NewServer creates a new EtcdServer from the supplied configuration. The
// configuration is considered static for the lifetime of the EtcdServer.
func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
if cfg.PreVote {
plog.Info("Raft Pre-Vote is enabled")
} else {
plog.Info("Raft Pre-Vote is disabled")
}
st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
var (

View File

@@ -23,11 +23,10 @@ import (
"time" "time"
"github.com/coreos/pkg/capnslog" "github.com/coreos/pkg/capnslog"
"go.uber.org/zap"
) )
var ( var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "proxy/tcpproxy")
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "proxy/tcpproxy")
)
type remote struct { type remote struct {
mu sync.Mutex mu sync.Mutex
@@ -61,6 +60,7 @@ func (r *remote) isActive() bool {
}
type TCPProxy struct {
Logger *zap.Logger
Listener net.Listener
Endpoints []*net.SRV
MonitorInterval time.Duration
@@ -86,7 +86,11 @@ func (tp *TCPProxy) Run() error {
for _, ep := range tp.Endpoints {
eps = append(eps, fmt.Sprintf("%s:%d", ep.Target, ep.Port))
}
if tp.Logger != nil {
tp.Logger.Info("ready to proxy client requests", zap.Strings("endpoints", eps))
} else {
plog.Printf("ready to proxy client requests to %+v", eps) plog.Printf("ready to proxy client requests to %+v", eps)
}
go tp.runMonitor()
for {
@@ -175,8 +179,12 @@ func (tp *TCPProxy) serve(in net.Conn) {
break
}
remote.inactivate()
if tp.Logger != nil {
tp.Logger.Warn("deactivated endpoint", zap.String("address", remote.addr), zap.Duration("interval", tp.MonitorInterval), zap.Error(err))
} else {
plog.Warningf("deactivated endpoint [%s] due to %v for %v", remote.addr, err, tp.MonitorInterval) plog.Warningf("deactivated endpoint [%s] due to %v for %v", remote.addr, err, tp.MonitorInterval)
} }
}
if out == nil {
in.Close()
@@ -205,10 +213,18 @@ func (tp *TCPProxy) runMonitor() {
}
go func(r *remote) {
if err := r.tryReactivate(); err != nil {
if tp.Logger != nil {
tp.Logger.Warn("failed to activate endpoint (stay inactive for another interval)", zap.String("address", r.addr), zap.Duration("interval", tp.MonitorInterval), zap.Error(err))
} else {
plog.Warningf("failed to activate endpoint [%s] due to %v (stay inactive for another %v)", r.addr, err, tp.MonitorInterval) plog.Warningf("failed to activate endpoint [%s] due to %v (stay inactive for another %v)", r.addr, err, tp.MonitorInterval)
}
} else {
if tp.Logger != nil {
tp.Logger.Info("activated", zap.String("address", r.addr))
} else {
plog.Printf("activated %s", r.addr)
}
}
}(rem)
}
tp.mu.Unlock()