diff --git a/embed/config.go b/embed/config.go index 256eb6784..55cee0846 100644 --- a/embed/config.go +++ b/embed/config.go @@ -24,11 +24,14 @@ import ( "os" "path/filepath" "strings" + "sync" + "syscall" "time" "github.com/coreos/etcd/compactor" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/pkg/flags" + "github.com/coreos/etcd/pkg/logutil" "github.com/coreos/etcd/pkg/netutil" "github.com/coreos/etcd/pkg/srv" "github.com/coreos/etcd/pkg/transport" @@ -107,21 +110,12 @@ func init() { // Config holds the arguments for configuring an etcd server. type Config struct { - LPUrls, LCUrls []url.URL - Dir string `json:"data-dir"` - WalDir string `json:"wal-dir"` - MaxSnapFiles uint `json:"max-snapshots"` - MaxWalFiles uint `json:"max-wals"` - Name string `json:"name"` - SnapCount uint64 `json:"snapshot-count"` - - // AutoCompactionMode is either 'periodic' or 'revision'. - AutoCompactionMode string `json:"auto-compaction-mode"` - // AutoCompactionRetention is either duration string with time unit - // (e.g. '5m' for 5-minute), or revision unit (e.g. '5000'). - // If no time unit is provided and compaction mode is 'periodic', - // the unit defaults to hour. For example, '5' translates into 5-hour. - AutoCompactionRetention string `json:"auto-compaction-retention"` + Name string `json:"name"` + Dir string `json:"data-dir"` + WalDir string `json:"wal-dir"` + SnapCount uint64 `json:"snapshot-count"` + MaxSnapFiles uint `json:"max-snapshots"` + MaxWalFiles uint `json:"max-wals"` // TickMs is the number of milliseconds between heartbeat ticks. // TODO: decouple tickMs and heartbeat tick (current heartbeat tick = 1). 
@@ -132,6 +126,31 @@ type Config struct { MaxTxnOps uint `json:"max-txn-ops"` MaxRequestBytes uint `json:"max-request-bytes"` + LPUrls, LCUrls []url.URL + APUrls, ACUrls []url.URL + ClientTLSInfo transport.TLSInfo + ClientAutoTLS bool + PeerTLSInfo transport.TLSInfo + PeerAutoTLS bool + + ClusterState string `json:"initial-cluster-state"` + DNSCluster string `json:"discovery-srv"` + DNSClusterServiceName string `json:"discovery-srv-name"` + Dproxy string `json:"discovery-proxy"` + Durl string `json:"discovery"` + InitialCluster string `json:"initial-cluster"` + InitialClusterToken string `json:"initial-cluster-token"` + StrictReconfigCheck bool `json:"strict-reconfig-check"` + EnableV2 bool `json:"enable-v2"` + + // AutoCompactionMode is either 'periodic' or 'revision'. + AutoCompactionMode string `json:"auto-compaction-mode"` + // AutoCompactionRetention is either duration string with time unit + // (e.g. '5m' for 5-minute), or revision unit (e.g. '5000'). + // If no time unit is provided and compaction mode is 'periodic', + // the unit defaults to hour. For example, '5' translates into 5-hour. + AutoCompactionRetention string `json:"auto-compaction-retention"` + // GRPCKeepAliveMinTime is the minimum interval that a client should // wait before pinging server. When client pings "too fast", server // sends goaway and closes the connection (errors: too_many_pings, @@ -147,17 +166,6 @@ type Config struct { // before closing a non-responsive connection. 0 to disable. 
GRPCKeepAliveTimeout time.Duration `json:"grpc-keepalive-timeout"` - APUrls, ACUrls []url.URL - ClusterState string `json:"initial-cluster-state"` - DNSCluster string `json:"discovery-srv"` - DNSClusterServiceName string `json:"discovery-srv-name"` - Dproxy string `json:"discovery-proxy"` - Durl string `json:"discovery"` - InitialCluster string `json:"initial-cluster"` - InitialClusterToken string `json:"initial-cluster-token"` - StrictReconfigCheck bool `json:"strict-reconfig-check"` - EnableV2 bool `json:"enable-v2"` - // PreVote is true to enable Raft Pre-Vote. // If enabled, Raft runs an additional election phase // to check whether it would get enough votes to win @@ -165,11 +173,6 @@ type Config struct { // TODO: enable by default in 3.5. PreVote bool `json:"pre-vote"` - ClientTLSInfo transport.TLSInfo - ClientAutoTLS bool - PeerTLSInfo transport.TLSInfo - PeerAutoTLS bool - CORS map[string]struct{} // HostWhitelist lists acceptable hostnames from HTTP client requests. @@ -198,21 +201,6 @@ type Config struct { // - https://github.com/coreos/etcd/issues/9353 HostWhitelist map[string]struct{} - // Logger logs server-side operations. - // If nil, all logs are discarded. - // TODO: make it configurable with existing logger. - // Currently, only logs TLS transport. - Logger *zap.Logger - - Debug bool `json:"debug"` - LogPkgLevels string `json:"log-package-levels"` - LogOutput string `json:"log-output"` - - EnablePprof bool `json:"enable-pprof"` - Metrics string `json:"metrics"` - ListenMetricsUrls []url.URL - ListenMetricsUrlsJSON string `json:"listen-metrics-urls"` - // UserHandlers is for registering users handlers and only used for // embedding etcd into other applications. // The map key is the route path for the handler, and @@ -235,6 +223,36 @@ type Config struct { // ForceNewCluster starts a new cluster even if previously started; unsafe. 
ForceNewCluster bool `json:"force-new-cluster"` + + EnablePprof bool `json:"enable-pprof"` + Metrics string `json:"metrics"` + ListenMetricsUrls []url.URL + ListenMetricsUrlsJSON string `json:"listen-metrics-urls"` + + // logger logs server-side operations. The default is nil, + // and "setupLogging" must be called before starting server. + // Do not set logger directly. + loggerMu *sync.RWMutex + logger *zap.Logger + loggerConfig zap.Config + + // Logger is logger options: "zap", "capnslog". + // WARN: "capnslog" is being deprecated in v3.5. + Logger string `json:"logger"` + + // LogOutput is either: + // - "default" as os.Stderr + // - "stderr" as os.Stderr + // - "stdout" as os.Stdout + // - file path to append server logs to + LogOutput string `json:"log-output"` + // Debug is true, to enable debug level logging. + Debug bool `json:"debug"` + + // LogPkgLevels is being deprecated in v3.5. + // Only valid if "logger" option is "capnslog". + // WARN: DO NOT USE THIS! + LogPkgLevels string `json:"log-package-levels"` } // configYAML holds the config suitable for yaml parsing @@ -271,7 +289,6 @@ func NewConfig() *Config { apurl, _ := url.Parse(DefaultInitialAdvertisePeerURLs) lcurl, _ := url.Parse(DefaultListenClientURLs) acurl, _ := url.Parse(DefaultAdvertiseClientURLs) - lg, _ := zap.NewProduction() cfg := &Config{ MaxSnapFiles: DefaultMaxSnapshots, MaxWalFiles: DefaultMaxWALs, @@ -291,14 +308,19 @@ func NewConfig() *Config { ClusterState: ClusterStateFlagNew, InitialClusterToken: "etcd-cluster", StrictReconfigCheck: DefaultStrictReconfigCheck, - Logger: lg, - LogOutput: DefaultLogOutput, Metrics: "basic", EnableV2: DefaultEnableV2, CORS: map[string]struct{}{"*": {}}, HostWhitelist: map[string]struct{}{"*": {}}, AuthToken: "simple", PreVote: false, // TODO: enable by default in v3.5 + + loggerMu: new(sync.RWMutex), + logger: nil, + Logger: "capnslog", + LogOutput: DefaultLogOutput, + Debug: false, + LogPkgLevels: "", } cfg.InitialCluster = 
cfg.InitialClusterFromName(cfg.Name) return cfg @@ -317,45 +339,150 @@ func logTLSHandshakeFailure(conn *tls.Conn, err error) { } } -// SetupLogging initializes etcd logging. -// Must be called after flag parsing. -func (cfg *Config) SetupLogging() { - cfg.ClientTLSInfo.HandshakeFailure = logTLSHandshakeFailure - cfg.PeerTLSInfo.HandshakeFailure = logTLSHandshakeFailure +// GetLogger returns the logger. +func (cfg Config) GetLogger() *zap.Logger { + cfg.loggerMu.RLock() + l := cfg.logger + cfg.loggerMu.RUnlock() + return l +} - capnslog.SetGlobalLogLevel(capnslog.INFO) - if cfg.Debug { - cfg.Logger = zap.NewExample() - capnslog.SetGlobalLogLevel(capnslog.DEBUG) - grpc.EnableTracing = true - // enable info, warning, error - grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr)) - } else { - // only discard info - grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr)) - } - if cfg.LogPkgLevels != "" { - repoLog := capnslog.MustRepoLogger("github.com/coreos/etcd") - settings, err := repoLog.ParseLogLevelConfig(cfg.LogPkgLevels) - if err != nil { - plog.Warningf("couldn't parse log level string: %s, continuing with default levels", err.Error()) - return +// setupLogging initializes etcd logging. +// Must be called after flag parsing or finishing configuring embed.Config. 
+func (cfg *Config) setupLogging() error { + switch cfg.Logger { + case "capnslog": // TODO: deprecate this in v3.5 + capnslog.SetGlobalLogLevel(capnslog.INFO) + cfg.ClientTLSInfo.HandshakeFailure = logTLSHandshakeFailure + cfg.PeerTLSInfo.HandshakeFailure = logTLSHandshakeFailure + + if cfg.Debug { + capnslog.SetGlobalLogLevel(capnslog.DEBUG) + grpc.EnableTracing = true + // enable info, warning, error + grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr)) + } else { + // only discard info + grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr)) } - repoLog.SetLogLevel(settings) + + // TODO: deprecate with "capnslog" + if cfg.LogPkgLevels != "" { + repoLog := capnslog.MustRepoLogger("github.com/coreos/etcd") + settings, err := repoLog.ParseLogLevelConfig(cfg.LogPkgLevels) + if err != nil { + plog.Warningf("couldn't parse log level string: %s, continuing with default levels", err.Error()) + return nil + } + repoLog.SetLogLevel(settings) + } + + // capnslog initially SetFormatter(NewDefaultFormatter(os.Stderr)) + // where NewDefaultFormatter returns NewJournaldFormatter when syscall.Getppid() == 1 + // specify 'stdout' or 'stderr' to skip journald logging even when running under systemd + switch cfg.LogOutput { + case "stdout": + capnslog.SetFormatter(capnslog.NewPrettyFormatter(os.Stdout, cfg.Debug)) + case "stderr": + capnslog.SetFormatter(capnslog.NewPrettyFormatter(os.Stderr, cfg.Debug)) + case DefaultLogOutput: + default: + plog.Panicf(`unknown log-output %q (only supports %q, "stdout", "stderr")`, cfg.LogOutput, DefaultLogOutput) + } + + case "zap": + // TODO: make this more configurable + lcfg := zap.Config{ + Level: zap.NewAtomicLevelAt(zap.InfoLevel), + Development: false, + Sampling: &zap.SamplingConfig{ + Initial: 100, + Thereafter: 100, + }, + Encoding: "json", + EncoderConfig: zap.NewProductionEncoderConfig(), + } + switch cfg.LogOutput { + case DefaultLogOutput: + if syscall.Getppid() == 1 { + // capnslog 
initially SetFormatter(NewDefaultFormatter(os.Stderr)) + // where "NewDefaultFormatter" returns "NewJournaldFormatter" + // when syscall.Getppid() == 1, specify 'stdout' or 'stderr' to + // skip journald logging even when running under systemd + fmt.Println("running under init, which may be systemd!") + // TODO: capnslog.NewJournaldFormatter() + lcfg.OutputPaths = []string{"stderr"} + lcfg.ErrorOutputPaths = []string{"stderr"} + } else { + lcfg.OutputPaths = []string{"stderr"} + lcfg.ErrorOutputPaths = []string{"stderr"} + } + case "stderr": + lcfg.OutputPaths = []string{"stderr"} + lcfg.ErrorOutputPaths = []string{"stderr"} + case "stdout": + lcfg.OutputPaths = []string{"stdout"} + lcfg.ErrorOutputPaths = []string{"stdout"} + default: + lcfg.OutputPaths = []string{cfg.LogOutput} + lcfg.ErrorOutputPaths = []string{cfg.LogOutput} + } + if cfg.Debug { + lcfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel) + grpc.EnableTracing = true + } + + var err error + cfg.logger, err = lcfg.Build() + if err != nil { + return err + } + cfg.loggerConfig = lcfg + + // debug true, enable info, warning, error + // debug false, only discard info + var gl grpclog.LoggerV2 + gl, err = logutil.NewGRPCLoggerV2(lcfg) + if err != nil { + return err + } + grpclog.SetLoggerV2(gl) + + logTLSHandshakeFailure := func(conn *tls.Conn, err error) { + state := conn.ConnectionState() + remoteAddr := conn.RemoteAddr().String() + serverName := state.ServerName + if len(state.PeerCertificates) > 0 { + cert := state.PeerCertificates[0] + ips := make([]string, len(cert.IPAddresses)) // sized by length, not capacity: the loop below assigns by index + for i := range cert.IPAddresses { + ips[i] = cert.IPAddresses[i].String() + } + cfg.logger.Warn( + "rejected connection", + zap.String("remote-addr", remoteAddr), + zap.String("server-name", serverName), + zap.Strings("ip-addresses", ips), + zap.Strings("dns-names", cert.DNSNames), + zap.Error(err), + ) + } else { + cfg.logger.Warn( + "rejected connection", + zap.String("remote-addr", remoteAddr), + zap.String("server-name", 
serverName), + zap.Error(err), + ) + } + } + cfg.ClientTLSInfo.HandshakeFailure = logTLSHandshakeFailure + cfg.PeerTLSInfo.HandshakeFailure = logTLSHandshakeFailure + + default: + return fmt.Errorf("unknown logger option %q", cfg.Logger) } - // capnslog initially SetFormatter(NewDefaultFormatter(os.Stderr)) - // where NewDefaultFormatter returns NewJournaldFormatter when syscall.Getppid() == 1 - // specify 'stdout' or 'stderr' to skip journald logging even when running under systemd - switch cfg.LogOutput { - case "stdout": - capnslog.SetFormatter(capnslog.NewPrettyFormatter(os.Stdout, cfg.Debug)) - case "stderr": - capnslog.SetFormatter(capnslog.NewPrettyFormatter(os.Stderr, cfg.Debug)) - case DefaultLogOutput: - default: - plog.Panicf(`unknown log-output %q (only supports %q, "stdout", "stderr")`, cfg.LogOutput, DefaultLogOutput) - } + return nil } func ConfigFromFile(path string) (*Config, error) { @@ -382,7 +509,8 @@ func (cfg *configYAML) configFromFile(path string) error { if cfg.LPUrlsJSON != "" { u, err := types.NewURLs(strings.Split(cfg.LPUrlsJSON, ",")) if err != nil { - plog.Fatalf("unexpected error setting up listen-peer-urls: %v", err) + fmt.Fprintf(os.Stderr, "unexpected error setting up listen-peer-urls: %v\n", err) + os.Exit(1) } cfg.LPUrls = []url.URL(u) } @@ -390,7 +518,8 @@ func (cfg *configYAML) configFromFile(path string) error { if cfg.LCUrlsJSON != "" { u, err := types.NewURLs(strings.Split(cfg.LCUrlsJSON, ",")) if err != nil { - plog.Fatalf("unexpected error setting up listen-client-urls: %v", err) + fmt.Fprintf(os.Stderr, "unexpected error setting up listen-client-urls: %v\n", err) + os.Exit(1) } cfg.LCUrls = []url.URL(u) } @@ -398,7 +527,8 @@ func (cfg *configYAML) configFromFile(path string) error { if cfg.APUrlsJSON != "" { u, err := types.NewURLs(strings.Split(cfg.APUrlsJSON, ",")) if err != nil { - plog.Fatalf("unexpected error setting up initial-advertise-peer-urls: %v", err) + fmt.Fprintf(os.Stderr, "unexpected error setting up 
initial-advertise-peer-urls: %v\n", err) + os.Exit(1) } cfg.APUrls = []url.URL(u) } @@ -406,7 +536,8 @@ func (cfg *configYAML) configFromFile(path string) error { if cfg.ACUrlsJSON != "" { u, err := types.NewURLs(strings.Split(cfg.ACUrlsJSON, ",")) if err != nil { - plog.Fatalf("unexpected error setting up advertise-peer-urls: %v", err) + fmt.Fprintf(os.Stderr, "unexpected error setting up advertise-client-urls: %v\n", err) + os.Exit(1) } cfg.ACUrls = []url.URL(u) } @@ -414,7 +545,8 @@ func (cfg *configYAML) configFromFile(path string) error { if cfg.ListenMetricsUrlsJSON != "" { u, err := types.NewURLs(strings.Split(cfg.ListenMetricsUrlsJSON, ",")) if err != nil { - plog.Fatalf("unexpected error setting up listen-metrics-urls: %v", err) + fmt.Fprintf(os.Stderr, "unexpected error setting up listen-metrics-urls: %v\n", err) + os.Exit(1) } cfg.ListenMetricsUrls = []url.URL(u) } @@ -453,6 +585,9 @@ func (cfg *configYAML) configFromFile(path string) error { // Validate ensures that '*embed.Config' fields are properly configured. 
func (cfg *Config) Validate() error { + if err := cfg.setupLogging(); err != nil { + return err + } if err := checkBindURLs(cfg.LPUrls); err != nil { return err } @@ -532,13 +667,21 @@ func (cfg *Config) PeerURLsMapAndToken(which string) (urlsmap types.URLsMap, tok token = cfg.Durl case cfg.DNSCluster != "": clusterStrs, cerr := cfg.GetDNSClusterNames() - + lg := cfg.logger if cerr != nil { - plog.Errorf("couldn't resolve during SRV discovery (%v)", cerr) + if lg != nil { + lg.Error("failed to resolve during SRV discovery", zap.Error(cerr)) + } else { + plog.Errorf("couldn't resolve during SRV discovery (%v)", cerr) + } return nil, "", cerr } for _, s := range clusterStrs { - plog.Noticef("got bootstrap from DNS for etcd-server at %s", s) + if lg != nil { + lg.Info("got bootstrap from DNS for etcd-server", zap.String("node", s)) + } else { + plog.Noticef("got bootstrap from DNS for etcd-server at %s", s) + } } clusterStr := strings.Join(clusterStrs, ",") if strings.Contains(clusterStr, "https://") && cfg.PeerTLSInfo.TrustedCAFile == "" { @@ -612,10 +755,14 @@ func (cfg *Config) ClientSelfCert() (err error) { for i, u := range cfg.LCUrls { chosts[i] = u.Host } - cfg.ClientTLSInfo, err = transport.SelfCert(cfg.Logger, filepath.Join(cfg.Dir, "fixtures", "client"), chosts) + cfg.ClientTLSInfo, err = transport.SelfCert(cfg.logger, filepath.Join(cfg.Dir, "fixtures", "client"), chosts) return err } else if cfg.ClientAutoTLS { - plog.Warningf("ignoring client auto TLS since certs given") + if cfg.logger != nil { + cfg.logger.Warn("ignoring client auto TLS since certs given") + } else { + plog.Warningf("ignoring client auto TLS since certs given") + } } return nil } @@ -626,10 +773,14 @@ func (cfg *Config) PeerSelfCert() (err error) { for i, u := range cfg.LPUrls { phosts[i] = u.Host } - cfg.PeerTLSInfo, err = transport.SelfCert(cfg.Logger, filepath.Join(cfg.Dir, "fixtures", "peer"), phosts) + cfg.PeerTLSInfo, err = transport.SelfCert(cfg.logger, filepath.Join(cfg.Dir, 
"fixtures", "peer"), phosts) return err } else if cfg.PeerAutoTLS { - plog.Warningf("ignoring peer auto TLS since certs given") + if cfg.logger != nil { + cfg.logger.Warn("ignoring peer auto TLS since certs given") + } else { + plog.Warningf("ignoring peer auto TLS since certs given") + } } return nil } diff --git a/embed/etcd.go b/embed/etcd.go index 04e743ded..723239f62 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -43,6 +43,7 @@ import ( "github.com/coreos/pkg/capnslog" "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/soheilhy/cmux" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" ) @@ -124,7 +125,6 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { urlsmap types.URLsMap token string ) - memberInitialized := true if !isMemberInitialized(cfg) { memberInitialized = false @@ -173,10 +173,11 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck, CorruptCheckTime: cfg.ExperimentalCorruptCheckTime, PreVote: cfg.PreVote, + Logger: cfg.logger, + LoggerConfig: cfg.loggerConfig, Debug: cfg.Debug, ForceNewCluster: cfg.ForceNewCluster, } - if e.Server, err = etcdserver.NewServer(srvcfg); err != nil { return e, err } @@ -187,7 +188,15 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { ss = append(ss, v) } sort.Strings(ss) - plog.Infof("%s starting with cors %q", e.Server.ID(), ss) + if e.cfg.logger != nil { + e.cfg.logger.Info( + "starting with CORS", + zap.String("server-id", e.Server.ID().String()), + zap.Strings("cors", ss), + ) + } else { + plog.Infof("%s starting with cors %q", e.Server.ID(), ss) + } } if len(e.cfg.HostWhitelist) > 0 { ss := make([]string, 0, len(e.cfg.HostWhitelist)) @@ -195,7 +204,15 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { ss = append(ss, v) } sort.Strings(ss) - plog.Infof("%s starting with host whitelist %q", e.Server.ID(), ss) + if e.cfg.logger != nil { + e.cfg.logger.Info( + "starting with host whitelist", + 
zap.String("server-id", e.Server.ID().String()), + zap.Strings("hosts", ss), + ) + } else { + plog.Infof("%s starting with host whitelist %q", e.Server.ID(), ss) + } } // buffer channel so goroutines on closed connections won't wait forever @@ -321,10 +338,18 @@ func (e *Etcd) Err() <-chan error { return e.errc } func startPeerListeners(cfg *Config) (peers []*peerListener, err error) { if err = cfg.PeerSelfCert(); err != nil { - plog.Fatalf("could not get certs (%v)", err) + if cfg.logger != nil { + cfg.logger.Fatal("failed to get peer self-signed certs", zap.Error(err)) + } else { + plog.Fatalf("could not get certs (%v)", err) + } } if !cfg.PeerTLSInfo.Empty() { - plog.Infof("peerTLS: %s", cfg.PeerTLSInfo) + if cfg.logger != nil { + cfg.logger.Info("starting with peer TLS", zap.String("tls-info", fmt.Sprintf("%+v", cfg.PeerTLSInfo))) + } else { + plog.Infof("peerTLS: %s", cfg.PeerTLSInfo) + } } peers = make([]*peerListener, len(cfg.LPUrls)) @@ -334,7 +359,11 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) { } for i := range peers { if peers[i] != nil && peers[i].close != nil { - plog.Info("stopping listening for peers on ", cfg.LPUrls[i].String()) + if cfg.logger != nil { + cfg.logger.Info("stopping listening for peers", zap.String("address", cfg.LPUrls[i].String())) + } else { + plog.Info("stopping listening for peers on ", cfg.LPUrls[i].String()) + } ctx, cancel := context.WithTimeout(context.Background(), time.Second) peers[i].close(ctx) cancel() @@ -345,10 +374,18 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) { for i, u := range cfg.LPUrls { if u.Scheme == "http" { if !cfg.PeerTLSInfo.Empty() { - plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. 
Ignored peer key/cert files.", u.String()) + if cfg.logger != nil { + cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("peer-url", u.String())) + } else { + plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String()) + } } if cfg.PeerTLSInfo.ClientCertAuth { - plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String()) + if cfg.logger != nil { + cfg.logger.Warn("scheme is HTTP while --peer-client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("peer-url", u.String())) + } else { + plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String()) + } } } peers[i] = &peerListener{close: func(context.Context) error { return nil }} @@ -360,7 +397,11 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) { peers[i].close = func(context.Context) error { return peers[i].Listener.Close() } - plog.Info("listening for peers on ", u.String()) + if cfg.logger != nil { + cfg.logger.Info("listening for peers", zap.String("address", u.String())) + } else { + plog.Info("listening for peers on ", u.String()) + } } return peers, nil } @@ -406,22 +447,38 @@ func (e *Etcd) servePeers() (err error) { func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) { if err = cfg.ClientSelfCert(); err != nil { - plog.Fatalf("could not get certs (%v)", err) + if cfg.logger != nil { + cfg.logger.Fatal("failed to get client self-signed certs", zap.Error(err)) + } else { + plog.Fatalf("could not get certs (%v)", err) + } } if cfg.EnablePprof { - plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf) + if cfg.logger != nil { + cfg.logger.Info("pprof is enabled", zap.String("path", 
debugutil.HTTPPrefixPProf)) + } else { + plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf) + } } sctxs = make(map[string]*serveCtx) for _, u := range cfg.LCUrls { - sctx := newServeCtx() + sctx := newServeCtx(cfg.logger) if u.Scheme == "http" || u.Scheme == "unix" { if !cfg.ClientTLSInfo.Empty() { - plog.Warningf("The scheme of client url %s is HTTP while peer key/cert files are presented. Ignored key/cert files.", u.String()) + if cfg.logger != nil { + cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("client-url", u.String())) + } else { + plog.Warningf("The scheme of client url %s is HTTP while peer key/cert files are presented. Ignored key/cert files.", u.String()) + } } if cfg.ClientTLSInfo.ClientCertAuth { - plog.Warningf("The scheme of client url %s is HTTP while client cert auth (--client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String()) + if cfg.logger != nil { + cfg.logger.Warn("scheme is HTTP while --client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("client-url", u.String())) + } else { + plog.Warningf("The scheme of client url %s is HTTP while client cert auth (--client-cert-auth) is enabled. 
Ignored client cert auth for this url.", u.String()) + } } } if (u.Scheme == "https" || u.Scheme == "unixs") && cfg.ClientTLSInfo.Empty() { @@ -452,7 +509,15 @@ func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) { if fdLimit, fderr := runtimeutil.FDLimit(); fderr == nil { if fdLimit <= reservedInternalFDNum { - plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum) + if cfg.logger != nil { + cfg.logger.Fatal( + "file descriptor limit of etcd process is too low; please set higher", + zap.Uint64("limit", fdLimit), + zap.Int("recommended-limit", reservedInternalFDNum), + ) + } else { + plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum) + } } sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum)) } @@ -463,11 +528,19 @@ func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) { } } - plog.Info("listening for client requests on ", u.Host) + if cfg.logger != nil { + cfg.logger.Info("listening for client requests", zap.String("host", u.Host)) + } else { + plog.Info("listening for client requests on ", u.Host) + } defer func() { if err != nil { sctx.l.Close() - plog.Info("stopping listening for client requests on ", u.Host) + if cfg.logger != nil { + cfg.logger.Info("stopping listening for client requests", zap.String("host", u.Host)) + } else { + plog.Info("stopping listening for client requests on ", u.Host) + } } }() for k := range cfg.UserHandlers { @@ -487,7 +560,11 @@ func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) { func (e *Etcd) serveClients() (err error) { if !e.cfg.ClientTLSInfo.Empty() { - plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo) + if e.cfg.logger != nil { + e.cfg.logger.Info("starting with client TLS", zap.String("tls-info", fmt.Sprintf("%+v", 
e.cfg.ClientTLSInfo))) + } else { + plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo) + } } // Start a client server goroutine for each listen address @@ -549,7 +626,11 @@ func (e *Etcd) serveMetrics() (err error) { } e.metricsListeners = append(e.metricsListeners, ml) go func(u url.URL, ln net.Listener) { - plog.Info("listening for metrics on ", u.String()) + if e.cfg.logger != nil { + e.cfg.logger.Info("listening for metrics", zap.String("url", u.String())) + } else { + plog.Info("listening for metrics on ", u.String()) + } e.errHandler(http.Serve(ln, metricsMux)) }(murl, ml) } @@ -569,6 +650,14 @@ func (e *Etcd) errHandler(err error) { } } +// GetLogger returns the logger. +func (e *Etcd) GetLogger() *zap.Logger { + e.cfg.loggerMu.RLock() + l := e.cfg.logger + e.cfg.loggerMu.RUnlock() + return l +} + func parseCompactionRetention(mode, retention string) (ret time.Duration, err error) { h, err := strconv.Atoi(retention) if err == nil { diff --git a/embed/serve.go b/embed/serve.go index 5f78719a0..13b900188 100644 --- a/embed/serve.go +++ b/embed/serve.go @@ -40,12 +40,14 @@ import ( gw "github.com/grpc-ecosystem/grpc-gateway/runtime" "github.com/soheilhy/cmux" "github.com/tmc/grpc-websocket-proxy/wsproxy" + "go.uber.org/zap" "golang.org/x/net/trace" "google.golang.org/grpc" "google.golang.org/grpc/credentials" ) type serveCtx struct { + lg *zap.Logger l net.Listener addr string secure bool @@ -65,10 +67,14 @@ type servers struct { http *http.Server } -func newServeCtx() *serveCtx { +func newServeCtx(lg *zap.Logger) *serveCtx { ctx, cancel := context.WithCancel(context.Background()) - return &serveCtx{ctx: ctx, cancel: cancel, userHandlers: make(map[string]http.Handler), - serversC: make(chan *servers, 2), // in case sctx.insecure,sctx.secure true + return &serveCtx{ + lg: lg, + ctx: ctx, + cancel: cancel, + userHandlers: make(map[string]http.Handler), + serversC: make(chan *servers, 2), // in case sctx.insecure,sctx.secure true } } @@ -83,7 +89,12 @@ func (sctx 
*serveCtx) serve( gopts ...grpc.ServerOption) (err error) { logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0) <-s.ReadyNotify() - plog.Info("ready to serve client requests") + + if sctx.lg != nil { + sctx.lg.Info("ready to serve client requests") + } else { + plog.Info("ready to serve client requests") + } m := cmux.New(sctx.l) v3c := v3client.New(s) @@ -116,14 +127,21 @@ func (sctx *serveCtx) serve( httpmux := sctx.createMux(gwmux, handler) srvhttp := &http.Server{ - Handler: createAccessController(s, httpmux), + Handler: createAccessController(sctx.lg, s, httpmux), ErrorLog: logger, // do not log user error } httpl := m.Match(cmux.HTTP1()) go func() { errHandler(srvhttp.Serve(httpl)) }() sctx.serversC <- &servers{grpc: gs, http: srvhttp} - plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.l.Addr().String()) + if sctx.lg != nil { + sctx.lg.Info( + "serving insecure client requests; this is strongly discouraged!", + zap.String("address", sctx.l.Addr().String()), + ) + } else { + plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.l.Addr().String()) + } } if sctx.secure { @@ -159,14 +177,21 @@ func (sctx *serveCtx) serve( httpmux := sctx.createMux(gwmux, handler) srv := &http.Server{ - Handler: createAccessController(s, httpmux), + Handler: createAccessController(sctx.lg, s, httpmux), TLSConfig: tlscfg, ErrorLog: logger, // do not log user error } go func() { errHandler(srv.Serve(tlsl)) }() sctx.serversC <- &servers{secure: true, grpc: gs, http: srv} - plog.Infof("serving client requests on %s", sctx.l.Addr().String()) + if sctx.lg != nil { + sctx.lg.Info( + "serving client requests", + zap.String("address", sctx.l.Addr().String()), + ) + } else { + plog.Infof("serving client requests on %s", sctx.l.Addr().String()) + } } close(sctx.serversC) @@ -218,7 +243,15 @@ func (sctx *serveCtx) registerGateway(opts []grpc.DialOption) (*gw.ServeMux, err go func() { <-ctx.Done() if cerr := 
conn.Close(); cerr != nil { - plog.Warningf("failed to close conn to %s: %v", sctx.l.Addr().String(), cerr) + if sctx.lg != nil { + sctx.lg.Warn( + "failed to close connection", + zap.String("address", sctx.l.Addr().String()), + zap.Error(cerr), + ) + } else { + plog.Warningf("failed to close conn to %s: %v", sctx.l.Addr().String(), cerr) + } } }() @@ -254,11 +287,12 @@ func (sctx *serveCtx) createMux(gwmux *gw.ServeMux, handler http.Handler) *http. // - mutate gRPC gateway request paths // - check hostname whitelist // client HTTP requests goes here first -func createAccessController(s *etcdserver.EtcdServer, mux *http.ServeMux) http.Handler { - return &accessController{s: s, mux: mux} +func createAccessController(lg *zap.Logger, s *etcdserver.EtcdServer, mux *http.ServeMux) http.Handler { + return &accessController{lg: lg, s: s, mux: mux} } type accessController struct { + lg *zap.Logger s *etcdserver.EtcdServer mux *http.ServeMux } @@ -272,7 +306,14 @@ func (ac *accessController) ServeHTTP(rw http.ResponseWriter, req *http.Request) if req.TLS == nil { // check origin if client connection is not secure host := httputil.GetHostname(req) if !ac.s.AccessController.IsHostWhitelisted(host) { - plog.Warningf("rejecting HTTP request from %q to prevent DNS rebinding attacks", host) + if ac.lg != nil { + ac.lg.Warn( + "rejecting HTTP request to prevent DNS rebinding attacks", + zap.String("host", host), + ) + } else { + plog.Warningf("rejecting HTTP request from %q to prevent DNS rebinding attacks", host) + } // TODO: use Go's "http.StatusMisdirectedRequest" (421) // https://github.com/golang/go/commit/4b8a7eafef039af1834ef9bfa879257c4a72b7b5 http.Error(rw, errCVE20185702(host), 421) @@ -347,7 +388,11 @@ func (ch *corsHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { func (sctx *serveCtx) registerUserHandler(s string, h http.Handler) { if sctx.userHandlers[s] != nil { - plog.Warningf("path %s already registered by user handler", s) + if sctx.lg != nil { + 
sctx.lg.Warn("path is already registered by user handler", zap.String("path", s)) + } else { + plog.Warningf("path %s already registered by user handler", s) + } return } sctx.userHandlers[s] = h diff --git a/embed/util.go b/embed/util.go index 168e03138..d76b596ee 100644 --- a/embed/util.go +++ b/embed/util.go @@ -25,6 +25,5 @@ func isMemberInitialized(cfg *Config) bool { if waldir == "" { waldir = filepath.Join(cfg.Dir, "member", "wal") } - return wal.Exist(waldir) } diff --git a/etcdmain/config.go b/etcdmain/config.go index 9bb3b8358..13cae74e1 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -20,6 +20,7 @@ import ( "flag" "fmt" "io/ioutil" + "log" "net/url" "os" "runtime" @@ -31,6 +32,7 @@ import ( "github.com/coreos/etcd/version" "github.com/ghodss/yaml" + "go.uber.org/zap" ) var ( @@ -213,9 +215,10 @@ func newConfig() *config { fs.Var(flags.NewUniqueStringsValue("*"), "host-whitelist", "Comma-separated acceptable hostnames from HTTP client requests, if server is not secure (empty means allow all).") // logging - fs.BoolVar(&cfg.ec.Debug, "debug", false, "Enable debug-level logging for etcd.") - fs.StringVar(&cfg.ec.LogPkgLevels, "log-package-levels", "", "Specify a particular log level for each etcd package (eg: 'etcdmain=CRITICAL,etcdserver=DEBUG').") + fs.StringVar(&cfg.ec.Logger, "logger", "capnslog", "Specify 'zap' for structured logging or 'capnslog'.") fs.StringVar(&cfg.ec.LogOutput, "log-output", embed.DefaultLogOutput, "Specify 'stdout' or 'stderr' to skip journald logging even when running under systemd.") + fs.BoolVar(&cfg.ec.Debug, "debug", false, "Enable debug-level logging for etcd.") + fs.StringVar(&cfg.ec.LogPkgLevels, "log-package-levels", "", "(To be deprecated) Specify a particular log level for each etcd package (eg: 'etcdmain=CRITICAL,etcdserver=DEBUG').") // version fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit.") @@ -271,18 +274,26 @@ func (cfg *config) parse(arguments []string) error { var err 
error if cfg.configFile != "" { - plog.Infof("Loading server configuration from %q. Other configuration command line flags and environment variables will be ignored if provided.", cfg.configFile) err = cfg.configFromFile(cfg.configFile) + if lg := cfg.ec.GetLogger(); lg != nil { + lg.Info( + "loaded server configuration, other configuration command line flags and environment variables will be ignored if provided", + zap.String("path", cfg.configFile), + ) + } else { + plog.Infof("Loading server configuration from %q. Other configuration command line flags and environment variables will be ignored if provided.", cfg.configFile) + } } else { err = cfg.configFromCmdLine() } + // now logger is set up return err } func (cfg *config) configFromCmdLine() error { err := flags.SetFlagsFromEnv("ETCD", cfg.cf.flagSet) if err != nil { - plog.Fatalf("%v", err) + return err } cfg.ec.LPUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-peer-urls") @@ -331,21 +342,21 @@ func (cfg *config) configFromFile(path string) error { if cfg.ec.ListenMetricsUrlsJSON != "" { us, err := types.NewURLs(strings.Split(cfg.ec.ListenMetricsUrlsJSON, ",")) if err != nil { - plog.Panicf("unexpected error setting up listen-metrics-urls: %v", err) + log.Fatalf("unexpected error setting up listen-metrics-urls: %v", err) } cfg.ec.ListenMetricsUrls = []url.URL(us) } if cfg.cp.FallbackJSON != "" { if err := cfg.cf.fallback.Set(cfg.cp.FallbackJSON); err != nil { - plog.Panicf("unexpected error setting up discovery-fallback flag: %v", err) + log.Fatalf("unexpected error setting up discovery-fallback flag: %v", err) } cfg.cp.Fallback = cfg.cf.fallback.String() } if cfg.cp.ProxyJSON != "" { if err := cfg.cf.proxy.Set(cfg.cp.ProxyJSON); err != nil { - plog.Panicf("unexpected error setting up proxyFlag: %v", err) + log.Fatalf("unexpected error setting up proxyFlag: %v", err) } cfg.cp.Proxy = cfg.cf.proxy.String() } diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 5aa41aa4e..e19949177 100644 --- 
a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -39,6 +39,7 @@ import ( "github.com/coreos/etcd/version" "github.com/coreos/pkg/capnslog" + "go.uber.org/zap" "google.golang.org/grpc" ) @@ -60,42 +61,86 @@ func startEtcdOrProxyV2() { err := cfg.parse(os.Args[1:]) if err != nil { - plog.Errorf("error verifying flags, %v. See 'etcd --help'.", err) + lg := cfg.ec.GetLogger() + if lg != nil { + lg.Error("failed to verify flags", zap.Error(err)) + } else { + plog.Errorf("error verifying flags, %v. See 'etcd --help'.", err) + } switch err { case embed.ErrUnsetAdvertiseClientURLsFlag: - plog.Errorf("When listening on specific address(es), this etcd process must advertise accessible url(s) to each connected client.") + if lg != nil { + lg.Error("advertise client URLs are not set", zap.Error(err)) + } else { + plog.Errorf("When listening on specific address(es), this etcd process must advertise accessible url(s) to each connected client.") + } } os.Exit(1) } - cfg.ec.SetupLogging() - var stopped <-chan struct{} - var errc <-chan error + maxProcs, cpus := runtime.GOMAXPROCS(0), runtime.NumCPU() - plog.Infof("etcd Version: %s\n", version.Version) - plog.Infof("Git SHA: %s\n", version.GitSHA) - plog.Infof("Go Version: %s\n", runtime.Version()) - plog.Infof("Go OS/Arch: %s/%s\n", runtime.GOOS, runtime.GOARCH) - - GoMaxProcs := runtime.GOMAXPROCS(0) - plog.Infof("setting maximum number of CPUs to %d, total number of available CPUs is %d", GoMaxProcs, runtime.NumCPU()) + lg := cfg.ec.GetLogger() + if lg != nil { + lg.Info( + "starting etcd", + zap.String("etcd-version", version.Version), + zap.String("git-sha", version.GitSHA), + zap.String("go-version", runtime.Version()), + zap.String("go-os", runtime.GOOS), + zap.String("go-arch", runtime.GOARCH), + zap.Int("max-cpu-set", maxProcs), + zap.Int("max-cpu-available", cpus), + ) + } else { + plog.Infof("etcd Version: %s\n", version.Version) + plog.Infof("Git SHA: %s\n", version.GitSHA) + plog.Infof("Go Version: %s\n", 
runtime.Version()) + plog.Infof("Go OS/Arch: %s/%s\n", runtime.GOOS, runtime.GOARCH) + plog.Infof("setting maximum number of CPUs to %d, total number of available CPUs is %d", maxProcs, cpus) + } defaultHost, dhErr := (&cfg.ec).UpdateDefaultClusterFromName(defaultInitialCluster) if defaultHost != "" { - plog.Infof("advertising using detected default host %q", defaultHost) + if lg != nil { + lg.Info( + "detected default host for advertise", + zap.String("host", defaultHost), + ) + } else { + plog.Infof("advertising using detected default host %q", defaultHost) + } } if dhErr != nil { - plog.Noticef("failed to detect default host (%v)", dhErr) + if lg != nil { + lg.Info("failed to detect default host", zap.Error(dhErr)) + } else { + plog.Noticef("failed to detect default host (%v)", dhErr) + } } if cfg.ec.Dir == "" { cfg.ec.Dir = fmt.Sprintf("%v.etcd", cfg.ec.Name) - plog.Warningf("no data-dir provided, using default data-dir ./%s", cfg.ec.Dir) + if lg != nil { + lg.Warn( + "'data-dir' was empty; using default", + zap.String("data-dir", cfg.ec.Dir), + ) + } else { + plog.Warningf("no data-dir provided, using default data-dir ./%s", cfg.ec.Dir) + } } - which := identifyDataDirOrDie(cfg.ec.Dir) + var stopped <-chan struct{} + var errc <-chan error + + which := identifyDataDirOrDie(cfg.ec.GetLogger(), cfg.ec.Dir) if which != dirEmpty { - plog.Noticef("the server is already initialized as %v before, starting as etcd %v...", which, which) + if lg != nil { + + } else { + plog.Noticef("the server is already initialized as %v before, starting as etcd %v...", which, which) + } switch which { case dirMember: stopped, errc, err = startEtcd(&cfg.ec) @@ -110,7 +155,11 @@ func startEtcdOrProxyV2() { stopped, errc, err = startEtcd(&cfg.ec) if derr, ok := err.(*etcdserver.DiscoveryError); ok && derr.Err == discovery.ErrFullCluster { if cfg.shouldFallbackToProxy() { - plog.Noticef("discovery cluster full, falling back to %s", fallbackFlagProxy) + if lg != nil { + + } else { + 
plog.Noticef("discovery cluster full, falling back to %s", fallbackFlagProxy) + } shouldProxy = true } } @@ -124,36 +173,90 @@ func startEtcdOrProxyV2() { if derr, ok := err.(*etcdserver.DiscoveryError); ok { switch derr.Err { case discovery.ErrDuplicateID: - plog.Errorf("member %q has previously registered with discovery service token (%s).", cfg.ec.Name, cfg.ec.Durl) - plog.Errorf("But etcd could not find valid cluster configuration in the given data dir (%s).", cfg.ec.Dir) - plog.Infof("Please check the given data dir path if the previous bootstrap succeeded") - plog.Infof("or use a new discovery token if the previous bootstrap failed.") + if lg != nil { + lg.Error( + "member has been registered with discovery service", + zap.String("name", cfg.ec.Name), + zap.String("discovery-token", cfg.ec.Durl), + zap.Error(derr.Err), + ) + lg.Error( + "but could not find valid cluster configuration", + zap.String("data-dir", cfg.ec.Dir), + ) + lg.Warn("check data dir if previous bootstrap succeeded") + lg.Warn("or use a new discovery token if previous bootstrap failed") + } else { + plog.Errorf("member %q has previously registered with discovery service token (%s).", cfg.ec.Name, cfg.ec.Durl) + plog.Errorf("But etcd could not find valid cluster configuration in the given data dir (%s).", cfg.ec.Dir) + plog.Infof("Please check the given data dir path if the previous bootstrap succeeded") + plog.Infof("or use a new discovery token if the previous bootstrap failed.") + } case discovery.ErrDuplicateName: - plog.Errorf("member with duplicated name has registered with discovery service token(%s).", cfg.ec.Durl) - plog.Errorf("please check (cURL) the discovery token for more information.") - plog.Errorf("please do not reuse the discovery token and generate a new one to bootstrap the cluster.") + if lg != nil { + lg.Error( + "member with duplicated name has already been registered", + zap.String("discovery-token", cfg.ec.Durl), + zap.Error(derr.Err), + ) + lg.Warn("cURL the 
discovery token URL for details") + lg.Warn("do not reuse discovery token; generate a new one to bootstrap a cluster") + } else { + plog.Errorf("member with duplicated name has registered with discovery service token(%s).", cfg.ec.Durl) + plog.Errorf("please check (cURL) the discovery token for more information.") + plog.Errorf("please do not reuse the discovery token and generate a new one to bootstrap the cluster.") + } default: - plog.Errorf("%v", err) - plog.Infof("discovery token %s was used, but failed to bootstrap the cluster.", cfg.ec.Durl) - plog.Infof("please generate a new discovery token and try to bootstrap again.") + if lg != nil { + lg.Error( + "failed to bootstrap; discovery token was already used", + zap.String("discovery-token", cfg.ec.Durl), + zap.Error(err), + ) + lg.Warn("do not reuse discovery token; generate a new one to bootstrap a cluster") + } else { + plog.Errorf("%v", err) + plog.Infof("discovery token %s was used, but failed to bootstrap the cluster.", cfg.ec.Durl) + plog.Infof("please generate a new discovery token and try to bootstrap again.") + } } os.Exit(1) } if strings.Contains(err.Error(), "include") && strings.Contains(err.Error(), "--initial-cluster") { - plog.Infof("%v", err) + if lg != nil { + lg.Error("failed to start", zap.Error(err)) + } else { + plog.Infof("%v", err) + } if cfg.ec.InitialCluster == cfg.ec.InitialClusterFromName(cfg.ec.Name) { - plog.Infof("forgot to set --initial-cluster flag?") + if lg != nil { + lg.Warn("forgot to set --initial-cluster?") + } else { + plog.Infof("forgot to set --initial-cluster flag?") + } } if types.URLs(cfg.ec.APUrls).String() == embed.DefaultInitialAdvertisePeerURLs { - plog.Infof("forgot to set --initial-advertise-peer-urls flag?") + if lg != nil { + lg.Warn("forgot to set --initial-advertise-peer-urls?") + } else { + plog.Infof("forgot to set --initial-advertise-peer-urls flag?") + } } if cfg.ec.InitialCluster == cfg.ec.InitialClusterFromName(cfg.ec.Name) && len(cfg.ec.Durl) == 0 { 
- plog.Infof("if you want to use discovery service, please set --discovery flag.") + if lg != nil { + lg.Warn("--discovery flag is not set") + } else { + plog.Infof("if you want to use discovery service, please set --discovery flag.") + } } os.Exit(1) } - plog.Fatalf("%v", err) + if lg != nil { + lg.Fatal("discovery failed", zap.Error(err)) + } else { + plog.Fatalf("%v", err) + } } osutil.HandleInterrupts() @@ -163,12 +266,16 @@ func startEtcdOrProxyV2() { // for accepting connections. The etcd instance should be // joined with the cluster and ready to serve incoming // connections. - notifySystemd() + notifySystemd(lg) select { case lerr := <-errc: // fatal out on listener errors - plog.Fatal(lerr) + if lg != nil { + lg.Fatal("listener failed", zap.Error(lerr)) + } else { + plog.Fatal(lerr) + } case <-stopped: } @@ -191,7 +298,12 @@ func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) { // startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes. 
func startProxy(cfg *config) error { - plog.Notice("proxy: this proxy supports v2 API only!") + lg := cfg.ec.GetLogger() + if lg != nil { + lg.Info("v2 API proxy starting") + } else { + plog.Notice("proxy: this proxy supports v2 API only!") + } clientTLSInfo := cfg.ec.ClientTLSInfo if clientTLSInfo.Empty() { @@ -209,7 +321,11 @@ func startProxy(cfg *config) error { pt.MaxIdleConnsPerHost = httpproxy.DefaultMaxIdleConnsPerHost if err = cfg.ec.PeerSelfCert(); err != nil { - plog.Fatalf("could not get certs (%v)", err) + if lg != nil { + lg.Fatal("failed to get self-signed certs for peer", zap.Error(err)) + } else { + plog.Fatalf("could not get certs (%v)", err) + } } tr, err := transport.NewTimeoutTransport(cfg.ec.PeerTLSInfo, time.Duration(cfg.cp.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.cp.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.cp.ProxyWriteTimeoutMs)*time.Millisecond) if err != nil { @@ -229,10 +345,24 @@ func startProxy(cfg *config) error { switch { case err == nil: if cfg.ec.Durl != "" { - plog.Warningf("discovery token ignored since the proxy has already been initialized. Valid cluster file found at %q", clusterfile) + if lg != nil { + lg.Warn( + "discovery token ignored since the proxy has already been initialized; valid cluster file found", + zap.String("cluster-file", clusterfile), + ) + } else { + plog.Warningf("discovery token ignored since the proxy has already been initialized. Valid cluster file found at %q", clusterfile) + } } if cfg.ec.DNSCluster != "" { - plog.Warningf("DNS SRV discovery ignored since the proxy has already been initialized. Valid cluster file found at %q", clusterfile) + if lg != nil { + lg.Warn( + "DNS SRV discovery ignored since the proxy has already been initialized; valid cluster file found", + zap.String("cluster-file", clusterfile), + ) + } else { + plog.Warningf("DNS SRV discovery ignored since the proxy has already been initialized. 
Valid cluster file found at %q", clusterfile) + } } urls := struct{ PeerURLs []string }{} err = json.Unmarshal(b, &urls) @@ -240,7 +370,15 @@ func startProxy(cfg *config) error { return err } peerURLs = urls.PeerURLs - plog.Infof("proxy: using peer urls %v from cluster file %q", peerURLs, clusterfile) + if lg != nil { + lg.Info( + "proxy using peer URLS from cluster file", + zap.Strings("peer-urls", peerURLs), + zap.String("cluster-file", clusterfile), + ) + } else { + plog.Infof("proxy: using peer urls %v from cluster file %q", peerURLs, clusterfile) + } case os.IsNotExist(err): var urlsmap types.URLsMap urlsmap, _, err = cfg.ec.PeerURLsMapAndToken("proxy") @@ -259,7 +397,11 @@ func startProxy(cfg *config) error { } } peerURLs = urlsmap.URLs() - plog.Infof("proxy: using peer urls %v ", peerURLs) + if lg != nil { + lg.Info("proxy using peer URLS", zap.Strings("peer-urls", peerURLs)) + } else { + plog.Infof("proxy: using peer urls %v ", peerURLs) + } default: return err } @@ -267,33 +409,63 @@ func startProxy(cfg *config) error { clientURLs := []string{} uf := func() []string { gcls, gerr := etcdserver.GetClusterFromRemotePeers(peerURLs, tr) - if gerr != nil { - plog.Warningf("proxy: %v", gerr) + if lg != nil { + lg.Warn( + "failed to get cluster from remote peers", + zap.Strings("peer-urls", peerURLs), + zap.Error(gerr), + ) + } else { + plog.Warningf("proxy: %v", gerr) + } return []string{} } clientURLs = gcls.ClientURLs() - urls := struct{ PeerURLs []string }{gcls.PeerURLs()} b, jerr := json.Marshal(urls) if jerr != nil { - plog.Warningf("proxy: error on marshal peer urls %s", jerr) + if lg != nil { + lg.Warn("proxy failed to marshal peer URLs", zap.Error(jerr)) + } else { + plog.Warningf("proxy: error on marshal peer urls %s", jerr) + } return clientURLs } err = pkgioutil.WriteAndSyncFile(clusterfile+".bak", b, 0600) if err != nil { - plog.Warningf("proxy: error on writing urls %s", err) + if lg != nil { + lg.Warn("proxy failed to write cluster file", 
zap.Error(err)) + } else { + plog.Warningf("proxy: error on writing urls %s", err) + } return clientURLs } err = os.Rename(clusterfile+".bak", clusterfile) if err != nil { - plog.Warningf("proxy: error on updating clusterfile %s", err) + if lg != nil { + lg.Warn( + "proxy failed to rename cluster file", + zap.String("path", clusterfile), + zap.Error(err), + ) + } else { + plog.Warningf("proxy: error on updating clusterfile %s", err) + } return clientURLs } if !reflect.DeepEqual(gcls.PeerURLs(), peerURLs) { - plog.Noticef("proxy: updated peer urls in cluster file from %v to %v", peerURLs, gcls.PeerURLs()) + if lg != nil { + lg.Info( + "proxy updated peer URLs", + zap.Strings("from", peerURLs), + zap.Strings("to", gcls.PeerURLs()), + ) + } else { + plog.Noticef("proxy: updated peer urls in cluster file from %v to %v", peerURLs, gcls.PeerURLs()) + } } peerURLs = gcls.PeerURLs() @@ -318,9 +490,13 @@ func startProxy(cfg *config) error { } listenerTLS := cfg.ec.ClientTLSInfo if cfg.ec.ClientAutoTLS && cTLS { - listenerTLS, err = transport.SelfCert(cfg.ec.Logger, filepath.Join(cfg.ec.Dir, "clientCerts"), cHosts) + listenerTLS, err = transport.SelfCert(cfg.ec.GetLogger(), filepath.Join(cfg.ec.Dir, "clientCerts"), cHosts) if err != nil { - plog.Fatalf("proxy: could not initialize self-signed client certs (%v)", err) + if lg != nil { + lg.Fatal("failed to initialize self-signed client cert", zap.Error(err)) + } else { + plog.Fatalf("proxy: could not initialize self-signed client certs (%v)", err) + } } } @@ -333,7 +509,11 @@ func startProxy(cfg *config) error { host := u.String() go func() { - plog.Info("proxy: listening for client requests on ", host) + if lg != nil { + lg.Info("proxy started listening on client requests", zap.String("host", host)) + } else { + plog.Info("proxy: listening for client requests on ", host) + } mux := http.NewServeMux() etcdhttp.HandlePrometheus(mux) // v2 proxy just uses the same port mux.Handle("/", ph) @@ -345,13 +525,17 @@ func 
startProxy(cfg *config) error { // identifyDataDirOrDie returns the type of the data dir. // Dies if the datadir is invalid. -func identifyDataDirOrDie(dir string) dirType { +func identifyDataDirOrDie(lg *zap.Logger, dir string) dirType { names, err := fileutil.ReadDir(dir) if err != nil { if os.IsNotExist(err) { return dirEmpty } - plog.Fatalf("error listing data dir: %s", dir) + if lg != nil { + lg.Fatal("failed to list data directory", zap.String("dir", dir), zap.Error(err)) + } else { + plog.Fatalf("error listing data dir: %s", dir) + } } var m, p bool @@ -362,12 +546,24 @@ func identifyDataDirOrDie(dir string) dirType { case dirProxy: p = true default: - plog.Warningf("found invalid file/dir %s under data dir %s (Ignore this if you are upgrading etcd)", name, dir) + if lg != nil { + lg.Warn( + "found invalid file under data directory", + zap.String("filename", name), + zap.String("data-dir", dir), + ) + } else { + plog.Warningf("found invalid file/dir %s under data dir %s (Ignore this if you are upgrading etcd)", name, dir) + } } } if m && p { - plog.Fatal("invalid datadir. Both member and proxy directories exist.") + if lg != nil { + lg.Fatal("invalid datadir; both member and proxy directories exist") + } else { + plog.Fatal("invalid datadir. 
Both member and proxy directories exist.") + } } if m { return dirMember @@ -387,9 +583,10 @@ func checkSupportArch() { // so unset here to not parse through flag defer os.Unsetenv("ETCD_UNSUPPORTED_ARCH") if env, ok := os.LookupEnv("ETCD_UNSUPPORTED_ARCH"); ok && env == runtime.GOARCH { - plog.Warningf("running etcd on unsupported architecture %q since ETCD_UNSUPPORTED_ARCH is set", env) + fmt.Printf("running etcd on unsupported architecture %q since ETCD_UNSUPPORTED_ARCH is set\n", env) return } - plog.Errorf("etcd on unsupported platform without ETCD_UNSUPPORTED_ARCH=%s set.", runtime.GOARCH) + + fmt.Printf("etcd on unsupported platform without ETCD_UNSUPPORTED_ARCH=%s set\n", runtime.GOARCH) os.Exit(1) } diff --git a/etcdmain/gateway.go b/etcdmain/gateway.go index 5487414eb..2c4e4950e 100644 --- a/etcdmain/gateway.go +++ b/etcdmain/gateway.go @@ -21,6 +21,8 @@ import ( "os" "time" + "go.uber.org/zap" + "github.com/coreos/etcd/proxy/tcpproxy" "github.com/spf13/cobra" @@ -79,16 +81,12 @@ func newGatewayStartCommand() *cobra.Command { func stripSchema(eps []string) []string { var endpoints []string - for _, ep := range eps { - if u, err := url.Parse(ep); err == nil && u.Host != "" { ep = u.Host } - endpoints = append(endpoints, ep) } - return endpoints } @@ -104,7 +102,8 @@ func startGateway(cmd *cobra.Command, args []string) { for _, ep := range srvs.Endpoints { h, p, err := net.SplitHostPort(ep) if err != nil { - plog.Fatalf("error parsing endpoint %q", ep) + fmt.Printf("error parsing endpoint %q", ep) + os.Exit(1) } var port uint16 fmt.Sscanf(p, "%d", &port) @@ -113,23 +112,33 @@ func startGateway(cmd *cobra.Command, args []string) { } if len(srvs.Endpoints) == 0 { - plog.Fatalf("no endpoints found") + fmt.Println("no endpoints found") + os.Exit(1) } - l, err := net.Listen("tcp", gatewayListenAddr) + var lg *zap.Logger + lg, err := zap.NewProduction() + if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + + var l net.Listener + l, err = 
net.Listen("tcp", gatewayListenAddr) if err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } tp := tcpproxy.TCPProxy{ + Logger: lg, Listener: l, Endpoints: srvs.SRVs, MonitorInterval: getewayRetryDelay, } // At this point, etcd gateway listener is initialized - notifySystemd() + notifySystemd(lg) tp.Run() } diff --git a/etcdmain/grpc_proxy.go b/etcdmain/grpc_proxy.go index 8e2b2edfc..baa824894 100644 --- a/etcdmain/grpc_proxy.go +++ b/etcdmain/grpc_proxy.go @@ -17,7 +17,7 @@ package etcdmain import ( "context" "fmt" - "io/ioutil" + "log" "math" "net" "net/http" @@ -35,10 +35,10 @@ import ( "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/debugutil" + "github.com/coreos/etcd/pkg/logutil" "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/proxy/grpcproxy" - "github.com/coreos/pkg/capnslog" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/soheilhy/cmux" "github.com/spf13/cobra" @@ -148,61 +148,75 @@ func newGRPCProxyStartCommand() *cobra.Command { func startGRPCProxy(cmd *cobra.Command, args []string) { checkArgs() - capnslog.SetGlobalLogLevel(capnslog.INFO) - if grpcProxyDebug { - capnslog.SetGlobalLogLevel(capnslog.DEBUG) - grpc.EnableTracing = true - // enable info, warning, error - grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr)) - } else { - // only discard info - grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr)) + lcfg := zap.Config{ + Level: zap.NewAtomicLevelAt(zap.InfoLevel), + Development: false, + Sampling: &zap.SamplingConfig{ + Initial: 100, + Thereafter: 100, + }, + Encoding: "json", + EncoderConfig: zap.NewProductionEncoderConfig(), } + if grpcProxyDebug { + lcfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel) + grpc.EnableTracing = true + } + + lg, err := lcfg.Build() + if err != nil { + log.Fatal(err) + } + defer lg.Sync() + + var gl grpclog.LoggerV2 + gl, err = 
logutil.NewGRPCLoggerV2(lcfg) + if err != nil { + log.Fatal(err) + } + grpclog.SetLoggerV2(gl) tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey) if tlsinfo == nil && grpcProxyListenAutoTLS { host := []string{"https://" + grpcProxyListenAddr} dir := filepath.Join(grpcProxyDataDir, "fixtures", "proxy") - lg, _ := zap.NewProduction() - if grpcProxyDebug { - lg = zap.NewExample() - } autoTLS, err := transport.SelfCert(lg, dir, host) if err != nil { - plog.Fatal(err) + log.Fatal(err) } tlsinfo = &autoTLS } if tlsinfo != nil { - plog.Infof("ServerTLS: %s", tlsinfo) + lg.Info("gRPC proxy server TLS", zap.String("tls-info", fmt.Sprintf("%+v", tlsinfo))) } - m := mustListenCMux(tlsinfo) + m := mustListenCMux(lg, tlsinfo) grpcl := m.Match(cmux.HTTP2()) defer func() { grpcl.Close() - plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr) + lg.Info("stopping listening gRPC proxy client requests", zap.String("address", grpcProxyListenAddr)) }() - client := mustNewClient() + client := mustNewClient(lg) - srvhttp, httpl := mustHTTPListener(m, tlsinfo, client) + srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client) errc := make(chan error) - go func() { errc <- newGRPCProxyServer(client).Serve(grpcl) }() + go func() { errc <- newGRPCProxyServer(lg, client).Serve(grpcl) }() go func() { errc <- srvhttp.Serve(httpl) }() go func() { errc <- m.Serve() }() if len(grpcProxyMetricsListenAddr) > 0 { - mhttpl := mustMetricsListener(tlsinfo) + mhttpl := mustMetricsListener(lg, tlsinfo) go func() { mux := http.NewServeMux() etcdhttp.HandlePrometheus(mux) grpcproxy.HandleHealth(mux, client) - plog.Fatal(http.Serve(mhttpl, mux)) + herr := http.Serve(mhttpl, mux) + lg.Fatal("gRPC proxy server serve returned", zap.Error(herr)) }() } // grpc-proxy is initialized, ready to serve - notifySystemd() + notifySystemd(lg) fmt.Fprintln(os.Stderr, <-errc) os.Exit(1) @@ -223,13 +237,13 @@ func checkArgs() { } } -func mustNewClient() 
*clientv3.Client { +func mustNewClient(lg *zap.Logger) *clientv3.Client { srvs := discoverEndpoints(grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery) eps := srvs.Endpoints if len(eps) == 0 { eps = grpcProxyEndpoints } - cfg, err := newClientCfg(eps) + cfg, err := newClientCfg(lg, eps) if err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) @@ -246,7 +260,7 @@ func mustNewClient() *clientv3.Client { return client } -func newClientCfg(eps []string) (*clientv3.Config, error) { +func newClientCfg(lg *zap.Logger, eps []string) (*clientv3.Config, error) { // set tls if any one tls option set cfg := clientv3.Config{ Endpoints: eps, @@ -271,7 +285,7 @@ func newClientCfg(eps []string) (*clientv3.Config, error) { } clientTLS.InsecureSkipVerify = grpcProxyInsecureSkipTLSVerify cfg.TLS = clientTLS - plog.Infof("ClientTLS: %s", tls) + lg.Info("gRPC proxy client TLS", zap.String("tls-info", fmt.Sprintf("%+v", tls))) } return &cfg, nil } @@ -283,7 +297,7 @@ func newTLS(ca, cert, key string) *transport.TLSInfo { return &transport.TLSInfo{TrustedCAFile: ca, CertFile: cert, KeyFile: key} } -func mustListenCMux(tlsinfo *transport.TLSInfo) cmux.CMux { +func mustListenCMux(lg *zap.Logger, tlsinfo *transport.TLSInfo) cmux.CMux { l, err := net.Listen("tcp", grpcProxyListenAddr) if err != nil { fmt.Fprintln(os.Stderr, err) @@ -297,25 +311,25 @@ func mustListenCMux(tlsinfo *transport.TLSInfo) cmux.CMux { if tlsinfo != nil { tlsinfo.CRLFile = grpcProxyListenCRL if l, err = transport.NewTLSListener(l, tlsinfo); err != nil { - plog.Fatal(err) + lg.Fatal("failed to create TLS listener", zap.Error(err)) } } - plog.Infof("listening for grpc-proxy client requests on %s", grpcProxyListenAddr) + lg.Info("listening for gRPC proxy client requests", zap.String("address", grpcProxyListenAddr)) return cmux.New(l) } -func newGRPCProxyServer(client *clientv3.Client) *grpc.Server { +func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server { if grpcProxyEnableOrdering { vf 
:= ordering.NewOrderViolationSwitchEndpointClosure(*client) client.KV = ordering.NewKV(client.KV, vf) - plog.Infof("waiting for linearized read from cluster to recover ordering") + lg.Info("waiting for linearized read from cluster to recover ordering") for { _, err := client.KV.Get(context.TODO(), "_", clientv3.WithKeysOnly()) if err == nil { break } - plog.Warningf("ordering recovery failed, retrying in 1s (%v)", err) + lg.Warn("ordering recovery failed, retrying in 1s", zap.Error(err)) time.Sleep(time.Second) } } @@ -363,7 +377,7 @@ func newGRPCProxyServer(client *clientv3.Client) *grpc.Server { return server } -func mustHTTPListener(m cmux.CMux, tlsinfo *transport.TLSInfo, c *clientv3.Client) (*http.Server, net.Listener) { +func mustHTTPListener(lg *zap.Logger, m cmux.CMux, tlsinfo *transport.TLSInfo, c *clientv3.Client) (*http.Server, net.Listener) { httpmux := http.NewServeMux() httpmux.HandleFunc("/", http.NotFound) etcdhttp.HandlePrometheus(httpmux) @@ -372,7 +386,7 @@ func mustHTTPListener(m cmux.CMux, tlsinfo *transport.TLSInfo, c *clientv3.Clien for p, h := range debugutil.PProfHandlers() { httpmux.Handle(p, h) } - plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf) + lg.Info("gRPC proxy enabled pprof", zap.String("path", debugutil.HTTPPrefixPProf)) } srvhttp := &http.Server{Handler: httpmux} @@ -382,13 +396,13 @@ func mustHTTPListener(m cmux.CMux, tlsinfo *transport.TLSInfo, c *clientv3.Clien srvTLS, err := tlsinfo.ServerConfig() if err != nil { - plog.Fatalf("could not setup TLS (%v)", err) + lg.Fatal("failed to set up TLS", zap.Error(err)) } srvhttp.TLSConfig = srvTLS return srvhttp, m.Match(cmux.Any()) } -func mustMetricsListener(tlsinfo *transport.TLSInfo) net.Listener { +func mustMetricsListener(lg *zap.Logger, tlsinfo *transport.TLSInfo) net.Listener { murl, err := url.Parse(grpcProxyMetricsListenAddr) if err != nil { fmt.Fprintf(os.Stderr, "cannot parse %q", grpcProxyMetricsListenAddr) @@ -399,6 +413,6 @@ func 
mustMetricsListener(tlsinfo *transport.TLSInfo) net.Listener { fmt.Fprintln(os.Stderr, err) os.Exit(1) } - plog.Info("grpc-proxy: listening for metrics on ", murl.String()) + lg.Info("gRPC proxy listening for metrics", zap.String("address", murl.String())) return ml } diff --git a/etcdmain/help.go b/etcdmain/help.go index 72556ddc2..a825628d5 100644 --- a/etcdmain/help.go +++ b/etcdmain/help.go @@ -154,12 +154,16 @@ Profiling: List of URLs to listen on for metrics. Logging: - --debug 'false' - Enable debug-level logging for etcd. - --log-package-levels '' - Specify a particular log level for each etcd package (eg: 'etcdmain=CRITICAL,etcdserver=DEBUG'). + --logger 'capnslog' + Specify 'zap' for structured logging or 'capnslog'. --log-output 'default' Specify 'stdout' or 'stderr' to skip journald logging even when running under systemd. + --debug 'false' + Enable debug-level logging for etcd. + +Logging (to be deprecated in v3.5): + --log-package-levels '' + Specify a particular log level for each etcd package (eg: 'etcdmain=CRITICAL,etcdserver=DEBUG'). 
v2 Proxy (to be deprecated in v4): --proxy 'off' diff --git a/etcdmain/main.go b/etcdmain/main.go index 06bbae56b..b4f91d816 100644 --- a/etcdmain/main.go +++ b/etcdmain/main.go @@ -21,6 +21,7 @@ import ( "github.com/coreos/go-systemd/daemon" systemdutil "github.com/coreos/go-systemd/util" + "go.uber.org/zap" ) func Main() { @@ -46,15 +47,28 @@ func Main() { startEtcdOrProxyV2() } -func notifySystemd() { +func notifySystemd(lg *zap.Logger) { if !systemdutil.IsRunningSystemd() { return } + + if lg != nil { + lg.Info("host was booted with systemd, sends READY=1 message to init daemon") + } sent, err := daemon.SdNotify(false, "READY=1") if err != nil { - plog.Errorf("failed to notify systemd for readiness: %v", err) + if lg != nil { + lg.Error("failed to notify systemd for readiness", zap.Error(err)) + } else { + plog.Errorf("failed to notify systemd for readiness: %v", err) + } } + if !sent { - plog.Errorf("forgot to set Type=notify in systemd service file?") + if lg != nil { + lg.Warn("forgot to set Type=notify in systemd service file?") + } else { + plog.Errorf("forgot to set Type=notify in systemd service file?") + } } } diff --git a/etcdserver/config.go b/etcdserver/config.go index 70dbf944b..b518bcd0d 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -25,6 +25,8 @@ import ( "github.com/coreos/etcd/pkg/netutil" "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" + + "go.uber.org/zap" ) // ServerConfig holds the configuration of etcd as taken from the command line or discovery. @@ -80,6 +82,12 @@ type ServerConfig struct { // PreVote is true to enable Raft Pre-Vote. PreVote bool + // Logger logs server-side operations. + // If not nil, it disables "capnslog" and uses the given logger. + Logger *zap.Logger + // LoggerConfig is server logger configuration for Raft logger. 
+ LoggerConfig zap.Config + Debug bool ForceNewCluster bool @@ -214,28 +222,68 @@ func (c *ServerConfig) PrintWithInitial() { c.print(true) } func (c *ServerConfig) Print() { c.print(false) } func (c *ServerConfig) print(initial bool) { - plog.Infof("name = %s", c.Name) - if c.ForceNewCluster { - plog.Infof("force new cluster") - } - plog.Infof("data dir = %s", c.DataDir) - plog.Infof("member dir = %s", c.MemberDir()) - if c.DedicatedWALDir != "" { - plog.Infof("dedicated WAL dir = %s", c.DedicatedWALDir) - } - plog.Infof("heartbeat = %dms", c.TickMs) - plog.Infof("election = %dms", c.ElectionTicks*int(c.TickMs)) - plog.Infof("snapshot count = %d", c.SnapCount) - if len(c.DiscoveryURL) != 0 { - plog.Infof("discovery URL= %s", c.DiscoveryURL) - if len(c.DiscoveryProxy) != 0 { - plog.Infof("discovery proxy = %s", c.DiscoveryProxy) + // TODO: remove this after dropping "capnslog" + if c.Logger == nil { + plog.Infof("name = %s", c.Name) + if c.ForceNewCluster { + plog.Infof("force new cluster") } - } - plog.Infof("advertise client URLs = %s", c.ClientURLs) - if initial { - plog.Infof("initial advertise peer URLs = %s", c.PeerURLs) - plog.Infof("initial cluster = %s", c.InitialPeerURLsMap) + plog.Infof("data dir = %s", c.DataDir) + plog.Infof("member dir = %s", c.MemberDir()) + if c.DedicatedWALDir != "" { + plog.Infof("dedicated WAL dir = %s", c.DedicatedWALDir) + } + plog.Infof("heartbeat = %dms", c.TickMs) + plog.Infof("election = %dms", c.ElectionTicks*int(c.TickMs)) + plog.Infof("snapshot count = %d", c.SnapCount) + if len(c.DiscoveryURL) != 0 { + plog.Infof("discovery URL= %s", c.DiscoveryURL) + if len(c.DiscoveryProxy) != 0 { + plog.Infof("discovery proxy = %s", c.DiscoveryProxy) + } + } + plog.Infof("advertise client URLs = %s", c.ClientURLs) + if initial { + plog.Infof("initial advertise peer URLs = %s", c.PeerURLs) + plog.Infof("initial cluster = %s", c.InitialPeerURLsMap) + } + } else { + caddrs := make([]string, len(c.ClientURLs)) + for i := range 
c.ClientURLs { + caddrs[i] = c.ClientURLs[i].String() + } + paddrs := make([]string, len(c.PeerURLs)) + for i := range c.PeerURLs { + paddrs[i] = c.PeerURLs[i].String() + } + state := "new" + if !c.NewCluster { + state = "existing" + } + c.Logger.Info( + "server starting", + zap.String("name", c.Name), + zap.String("data-dir", c.DataDir), + zap.String("member-dir", c.MemberDir()), + zap.String("dedicated-wal-dir", c.DedicatedWALDir), + zap.Bool("force-new-cluster", c.ForceNewCluster), + zap.Uint("heartbeat-tick-ms", c.TickMs), + zap.String("heartbeat-interval", fmt.Sprintf("%v", time.Duration(c.TickMs)*time.Millisecond)), + zap.Int("election-tick-ms", c.ElectionTicks), + zap.String("election-timeout", fmt.Sprintf("%v", time.Duration(c.ElectionTicks*int(c.TickMs))*time.Millisecond)), + zap.Uint64("snapshot-count", c.SnapCount), + zap.Strings("advertise-client-urls", caddrs), + zap.Strings("initial-advertise-peer-urls", paddrs), + zap.Bool("initial", initial), + zap.String("initial-cluster", c.InitialPeerURLsMap.String()), + zap.String("initial-cluster-state", state), + zap.String("initial-cluster-token", c.InitialClusterToken), + zap.Bool("pre-vote", c.PreVote), + zap.Bool("initial-corrupt-check", c.InitialCorruptCheck), + zap.Duration("corrupt-check-time", c.CorruptCheckTime), + zap.String("discovery-url", c.DiscoveryURL), + zap.String("discovery-proxy", c.DiscoveryProxy), + ) } } diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 0228d2f39..dce740703 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -17,6 +17,7 @@ package etcdserver import ( "encoding/json" "expvar" + "log" "sort" "sync" "time" @@ -24,6 +25,7 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/pkg/contention" + "github.com/coreos/etcd/pkg/logutil" "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" @@ -408,6 +410,13 @@ func startNode(cfg 
ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id CheckQuorum: true, PreVote: cfg.PreVote, } + if cfg.Logger != nil { + // called after capnslog setting in "init" function + c.Logger, err = logutil.NewRaftLogger(cfg.LoggerConfig) + if err != nil { + log.Fatalf("cannot create raft logger %v", err) + } + } n = raft.StartNode(c, peers) raftStatusMu.Lock() @@ -442,6 +451,14 @@ func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *member CheckQuorum: true, PreVote: cfg.PreVote, } + if cfg.Logger != nil { + // called after capnslog setting in "init" function + var err error + c.Logger, err = logutil.NewRaftLogger(cfg.LoggerConfig) + if err != nil { + log.Fatalf("cannot create raft logger %v", err) + } + } n := raft.RestartNode(c) raftStatusMu.Lock() @@ -498,6 +515,14 @@ func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types CheckQuorum: true, PreVote: cfg.PreVote, } + if cfg.Logger != nil { + // called after capnslog setting in "init" function + c.Logger, err = logutil.NewRaftLogger(cfg.LoggerConfig) + if err != nil { + log.Fatalf("cannot create raft logger %v", err) + } + } + n := raft.RestartNode(c) raftStatus = n.Status return id, cl, n, s, w diff --git a/etcdserver/server.go b/etcdserver/server.go index 02f3a144d..ccab11100 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -259,12 +259,6 @@ type EtcdServer struct { // NewServer creates a new EtcdServer from the supplied configuration. The // configuration is considered static for the lifetime of the EtcdServer. 
func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { - if cfg.PreVote { - plog.Info("Raft Pre-Vote is enabled") - } else { - plog.Info("Raft Pre-Vote is disabled") - } - st := v2store.New(StoreClusterPrefix, StoreKeysPrefix) var ( diff --git a/proxy/tcpproxy/userspace.go b/proxy/tcpproxy/userspace.go index 6dc1d1d6d..381f4b8d3 100644 --- a/proxy/tcpproxy/userspace.go +++ b/proxy/tcpproxy/userspace.go @@ -23,11 +23,10 @@ import ( "time" "github.com/coreos/pkg/capnslog" + "go.uber.org/zap" ) -var ( - plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "proxy/tcpproxy") -) +var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "proxy/tcpproxy") type remote struct { mu sync.Mutex @@ -61,6 +60,7 @@ func (r *remote) isActive() bool { } type TCPProxy struct { + Logger *zap.Logger Listener net.Listener Endpoints []*net.SRV MonitorInterval time.Duration @@ -86,7 +86,11 @@ func (tp *TCPProxy) Run() error { for _, ep := range tp.Endpoints { eps = append(eps, fmt.Sprintf("%s:%d", ep.Target, ep.Port)) } - plog.Printf("ready to proxy client requests to %+v", eps) + if tp.Logger != nil { + tp.Logger.Info("ready to proxy client requests", zap.Strings("endpoints", eps)) + } else { + plog.Printf("ready to proxy client requests to %+v", eps) + } go tp.runMonitor() for { @@ -175,7 +179,11 @@ func (tp *TCPProxy) serve(in net.Conn) { break } remote.inactivate() - plog.Warningf("deactivated endpoint [%s] due to %v for %v", remote.addr, err, tp.MonitorInterval) + if tp.Logger != nil { + tp.Logger.Warn("deactivated endpoint", zap.String("address", remote.addr), zap.Duration("interval", tp.MonitorInterval), zap.Error(err)) + } else { + plog.Warningf("deactivated endpoint [%s] due to %v for %v", remote.addr, err, tp.MonitorInterval) + } } if out == nil { @@ -205,9 +213,17 @@ func (tp *TCPProxy) runMonitor() { } go func(r *remote) { if err := r.tryReactivate(); err != nil { - plog.Warningf("failed to activate endpoint [%s] due to %v (stay inactive for another %v)", 
r.addr, err, tp.MonitorInterval) + if tp.Logger != nil { + tp.Logger.Warn("failed to activate endpoint (stay inactive for another interval)", zap.String("address", r.addr), zap.Duration("interval", tp.MonitorInterval), zap.Error(err)) + } else { + plog.Warningf("failed to activate endpoint [%s] due to %v (stay inactive for another %v)", r.addr, err, tp.MonitorInterval) + } } else { - plog.Printf("activated %s", r.addr) + if tp.Logger != nil { + tp.Logger.Info("activated", zap.String("address", r.addr)) + } else { + plog.Printf("activated %s", r.addr) + } } }(rem) }