From 6de105e89b753e851735463844ec41b9967ffe95 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 10 Mar 2023 13:35:18 +0100 Subject: [PATCH 1/7] refactor: Use proper variable names for urls Signed-off-by: Marek Siarkowicz --- clientv3/snapshot/member_test.go | 4 +- clientv3/snapshot/v3_snapshot_test.go | 17 ++-- embed/config.go | 134 +++++++++++++------------- embed/config_test.go | 30 +++--- embed/etcd.go | 38 ++++---- etcdmain/config.go | 10 +- etcdmain/config_test.go | 50 +++++----- etcdmain/etcd.go | 8 +- integration/embed_test.go | 10 +- tools/etcd-dump-metrics/etcd.go | 4 +- 10 files changed, 153 insertions(+), 152 deletions(-) diff --git a/clientv3/snapshot/member_test.go b/clientv3/snapshot/member_test.go index a42066a56..14335304d 100644 --- a/clientv3/snapshot/member_test.go +++ b/clientv3/snapshot/member_test.go @@ -69,8 +69,8 @@ func TestSnapshotV3RestoreMultiMemberAdd(t *testing.T) { cfg.Name = "3" cfg.InitialClusterToken = testClusterTkn cfg.ClusterState = "existing" - cfg.LCUrls, cfg.ACUrls = newCURLs, newCURLs - cfg.LPUrls, cfg.APUrls = newPURLs, newPURLs + cfg.ListenClientUrls, cfg.AdvertiseClientUrls = newCURLs, newCURLs + cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = newPURLs, newPURLs cfg.InitialCluster = "" for i := 0; i < clusterN; i++ { cfg.InitialCluster += fmt.Sprintf(",%d=%s", i, pURLs[i].String()) diff --git a/clientv3/snapshot/v3_snapshot_test.go b/clientv3/snapshot/v3_snapshot_test.go index 0a4c3096c..748e6415a 100644 --- a/clientv3/snapshot/v3_snapshot_test.go +++ b/clientv3/snapshot/v3_snapshot_test.go @@ -51,8 +51,8 @@ func TestSnapshotV3RestoreSingle(t *testing.T) { cfg.Name = "s1" cfg.InitialClusterToken = testClusterTkn cfg.ClusterState = "existing" - cfg.LCUrls, cfg.ACUrls = cURLs, cURLs - cfg.LPUrls, cfg.APUrls = pURLs, pURLs + cfg.ListenClientUrls, cfg.AdvertiseClientUrls = cURLs, cURLs + cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = pURLs, pURLs cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String()) cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond())) @@ -87,7 +87,8 @@ func TestSnapshotV3RestoreSingle(t *testing.T) { } var cli *clientv3.Client - cli, err = clientv3.New(clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}}) + cli, err = clientv3.New(clientv3.Config{Endpoints: []string{cfg.AdvertiseClientUrls[0].String()}}) + if err != nil { t.Fatal(err) } @@ -203,8 +204,8 @@ func createSnapshotFile(t *testing.T, kvs []kv) string { cfg.Debug = false cfg.Name = "default" cfg.ClusterState = "new" - cfg.LCUrls, cfg.ACUrls = cURLs, cURLs - cfg.LPUrls, cfg.APUrls = pURLs, pURLs + cfg.ListenClientUrls, cfg.AdvertiseClientUrls = cURLs, cURLs + cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = pURLs, pURLs cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String()) cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond())) srv, err := embed.StartEtcd(cfg) @@ -221,7 +222,7 @@ func createSnapshotFile(t *testing.T, kvs []kv) string { t.Fatalf("failed to start embed.Etcd for creating snapshots") } - ccfg := clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}} + ccfg := clientv3.Config{Endpoints: []string{cfg.AdvertiseClientUrls[0].String()}} cli, err := clientv3.New(ccfg) if err != nil { t.Fatal(err) @@ -271,8 +272,8 @@ func restoreCluster(t *testing.T, clusterN int, dbPath string) ( cfg.Name = fmt.Sprintf("%d", i) cfg.InitialClusterToken = testClusterTkn cfg.ClusterState = "existing" - cfg.LCUrls, cfg.ACUrls = []url.URL{cURLs[i]}, []url.URL{cURLs[i]} - cfg.LPUrls, cfg.APUrls = []url.URL{pURLs[i]}, []url.URL{pURLs[i]} + cfg.ListenClientUrls, cfg.AdvertiseClientUrls = []url.URL{cURLs[i]}, []url.URL{cURLs[i]} + cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = []url.URL{pURLs[i]}, []url.URL{pURLs[i]} cfg.InitialCluster = ics cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()+i)) diff --git a/embed/config.go b/embed/config.go index 0b7c2f49b..8755bb52e 100644 --- a/embed/config.go +++ b/embed/config.go @@ -183,12 +183,12 @@ type Config struct { // streams that each client can open at a time. MaxConcurrentStreams uint32 `json:"max-concurrent-streams"` - LPUrls, LCUrls []url.URL - APUrls, ACUrls []url.URL - ClientTLSInfo transport.TLSInfo - ClientAutoTLS bool - PeerTLSInfo transport.TLSInfo - PeerAutoTLS bool + ListenPeerUrls, ListenClientUrls []url.URL + AdvertisePeerUrls, AdvertiseClientUrls []url.URL + ClientTLSInfo transport.TLSInfo + ClientAutoTLS bool + PeerTLSInfo transport.TLSInfo + PeerAutoTLS bool // CipherSuites is a list of supported TLS cipher suites between // client/server and peers. If empty, Go auto-populates the list. @@ -373,10 +373,10 @@ type configYAML struct { // configJSON has file options that are translated into Config options type configJSON struct { - LPUrlsJSON string `json:"listen-peer-urls"` - LCUrlsJSON string `json:"listen-client-urls"` - APUrlsJSON string `json:"initial-advertise-peer-urls"` - ACUrlsJSON string `json:"advertise-client-urls"` + ListenPeerUrls string `json:"listen-peer-urls"` + ListenClientUrls string `json:"listen-client-urls"` + AdvertisePeerUrls string `json:"initial-advertise-peer-urls"` + AdvertiseClientUrls string `json:"advertise-client-urls"` CORSJSON string `json:"cors"` HostWhitelistJSON string `json:"host-whitelist"` @@ -421,10 +421,10 @@ func NewConfig() *Config { ElectionMs: 1000, InitialElectionTickAdvance: true, - LPUrls: []url.URL{*lpurl}, - LCUrls: []url.URL{*lcurl}, - APUrls: []url.URL{*apurl}, - ACUrls: []url.URL{*acurl}, + ListenPeerUrls: []url.URL{*lpurl}, + ListenClientUrls: []url.URL{*lcurl}, + AdvertisePeerUrls: []url.URL{*apurl}, + AdvertiseClientUrls: []url.URL{*acurl}, ClusterState: ClusterStateFlagNew, InitialClusterToken: "etcd-cluster", @@ -489,40 +489,40 @@ func (cfg *configYAML) configFromFile(path string) error { return err } - if cfg.LPUrlsJSON != "" { - u, err := types.NewURLs(strings.Split(cfg.LPUrlsJSON, ",")) + if cfg.configJSON.ListenPeerUrls != "" { + u, err := types.NewURLs(strings.Split(cfg.configJSON.ListenPeerUrls, ",")) if err != nil { fmt.Fprintf(os.Stderr, "unexpected error setting up listen-peer-urls: %v\n", err) os.Exit(1) } - cfg.LPUrls = []url.URL(u) + cfg.Config.ListenPeerUrls = u } - if cfg.LCUrlsJSON != "" { - u, err := types.NewURLs(strings.Split(cfg.LCUrlsJSON, ",")) + if cfg.configJSON.ListenClientUrls != "" { + u, err := types.NewURLs(strings.Split(cfg.configJSON.ListenClientUrls, ",")) if err != nil { fmt.Fprintf(os.Stderr, "unexpected error setting up listen-client-urls: %v\n", err) os.Exit(1) } - cfg.LCUrls = []url.URL(u) + cfg.Config.ListenClientUrls = u } - if cfg.APUrlsJSON != "" { - u, err := types.NewURLs(strings.Split(cfg.APUrlsJSON, ",")) + if cfg.configJSON.AdvertisePeerUrls != "" { + u, err := types.NewURLs(strings.Split(cfg.configJSON.AdvertisePeerUrls, ",")) if err != nil { fmt.Fprintf(os.Stderr, "unexpected error setting up initial-advertise-peer-urls: %v\n", err) os.Exit(1) } - cfg.APUrls = []url.URL(u) + cfg.Config.AdvertisePeerUrls = u } - if cfg.ACUrlsJSON != "" { - u, err := types.NewURLs(strings.Split(cfg.ACUrlsJSON, ",")) + if cfg.configJSON.AdvertiseClientUrls != "" { + u, err := types.NewURLs(strings.Split(cfg.configJSON.AdvertiseClientUrls, ",")) if err != nil { fmt.Fprintf(os.Stderr, "unexpected error setting up advertise-peer-urls: %v\n", err) os.Exit(1) } - cfg.ACUrls = []url.URL(u) + cfg.Config.AdvertiseClientUrls = u } if cfg.ListenMetricsUrlsJSON != "" { @@ -596,21 +596,21 @@ func (cfg *Config) Validate() error { if err := cfg.setupLogging(); err != nil { return err } - if err := checkBindURLs(cfg.LPUrls); err != nil { + if err := checkBindURLs(cfg.ListenPeerUrls); err != nil { return err } - if err := checkBindURLs(cfg.LCUrls); err != nil { + if err := checkBindURLs(cfg.ListenClientUrls); err != nil { return err } if err := checkBindURLs(cfg.ListenMetricsUrls); err != nil { return err } - if err := checkHostURLs(cfg.APUrls); err != nil { - addrs := cfg.getAPURLs() + if err := checkHostURLs(cfg.AdvertisePeerUrls); err != nil { + addrs := cfg.getAdvertisePeerUrls() return fmt.Errorf(`--initial-advertise-peer-urls %q must be "host:port" (%v)`, strings.Join(addrs, ","), err) } - if err := checkHostURLs(cfg.ACUrls); err != nil { - addrs := cfg.getACURLs() + if err := checkHostURLs(cfg.AdvertiseClientUrls); err != nil { + addrs := cfg.getAdvertiseClientUrls() return fmt.Errorf(`--advertise-client-urls %q must be "host:port" (%v)`, strings.Join(addrs, ","), err) } // Check if conflicting flags are passed. @@ -643,7 +643,7 @@ func (cfg *Config) Validate() error { } // check this last since proxying in etcdmain may make this OK - if cfg.LCUrls != nil && cfg.ACUrls == nil { + if cfg.ListenClientUrls != nil && cfg.AdvertiseClientUrls == nil { return ErrUnsetAdvertiseClientURLsFlag } @@ -692,7 +692,7 @@ func (cfg *Config) PeerURLsMapAndToken(which string) (urlsmap types.URLsMap, tok urlsmap = types.URLsMap{} // If using discovery, generate a temporary cluster based on // self's advertised peer URLs - urlsmap[cfg.Name] = cfg.APUrls + urlsmap[cfg.Name] = cfg.AdvertisePeerUrls token = cfg.Durl case cfg.DNSCluster != "": @@ -748,7 +748,7 @@ func (cfg *Config) GetDNSClusterNames() ([]string, error) { // Use both etcd-server-ssl and etcd-server for discovery. // Combine the results if both are available. - clusterStrs, cerr = srv.GetCluster("https", "etcd-server-ssl"+serviceNameSuffix, cfg.Name, cfg.DNSCluster, cfg.APUrls) + clusterStrs, cerr = srv.GetCluster("https", "etcd-server-ssl"+serviceNameSuffix, cfg.Name, cfg.DNSCluster, cfg.AdvertisePeerUrls) if cerr != nil { clusterStrs = make([]string, 0) } @@ -759,13 +759,13 @@ func (cfg *Config) GetDNSClusterNames() ([]string, error) { zap.String("service-name", "etcd-server-ssl"+serviceNameSuffix), zap.String("server-name", cfg.Name), zap.String("discovery-srv", cfg.DNSCluster), - zap.Strings("advertise-peer-urls", cfg.getAPURLs()), + zap.Strings("advertise-peer-urls", cfg.getAdvertisePeerUrls()), zap.Strings("found-cluster", clusterStrs), zap.Error(cerr), ) } - defaultHTTPClusterStrs, httpCerr := srv.GetCluster("http", "etcd-server"+serviceNameSuffix, cfg.Name, cfg.DNSCluster, cfg.APUrls) + defaultHTTPClusterStrs, httpCerr := srv.GetCluster("http", "etcd-server"+serviceNameSuffix, cfg.Name, cfg.DNSCluster, cfg.AdvertisePeerUrls) if httpCerr != nil { clusterStrs = append(clusterStrs, defaultHTTPClusterStrs...) } @@ -776,7 +776,7 @@ func (cfg *Config) GetDNSClusterNames() ([]string, error) { zap.String("service-name", "etcd-server"+serviceNameSuffix), zap.String("server-name", cfg.Name), zap.String("discovery-srv", cfg.DNSCluster), - zap.Strings("advertise-peer-urls", cfg.getAPURLs()), + zap.Strings("advertise-peer-urls", cfg.getAdvertisePeerUrls()), zap.Strings("found-cluster", clusterStrs), zap.Error(httpCerr), ) @@ -786,15 +786,15 @@ func (cfg *Config) GetDNSClusterNames() ([]string, error) { } func (cfg Config) InitialClusterFromName(name string) (ret string) { - if len(cfg.APUrls) == 0 { + if len(cfg.AdvertisePeerUrls) == 0 { return "" } n := name if name == "" { n = DefaultName } - for i := range cfg.APUrls { - ret = ret + "," + n + "=" + cfg.APUrls[i].String() + for i := range cfg.AdvertisePeerUrls { + ret = ret + "," + n + "=" + cfg.AdvertisePeerUrls[i].String() } return ret[1:] } @@ -803,11 +803,11 @@ func (cfg Config) IsNewCluster() bool { return cfg.ClusterState == ClusterStateF func (cfg Config) ElectionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) } func (cfg Config) defaultPeerHost() bool { - return len(cfg.APUrls) == 1 && cfg.APUrls[0].String() == DefaultInitialAdvertisePeerURLs + return len(cfg.AdvertisePeerUrls) == 1 && cfg.AdvertisePeerUrls[0].String() == DefaultInitialAdvertisePeerURLs } func (cfg Config) defaultClientHost() bool { - return len(cfg.ACUrls) == 1 && cfg.ACUrls[0].String() == DefaultAdvertiseClientURLs + return len(cfg.AdvertiseClientUrls) == 1 && cfg.AdvertiseClientUrls[0].String() == DefaultAdvertiseClientURLs } func (cfg *Config) ClientSelfCert() (err error) { @@ -822,8 +822,8 @@ func (cfg *Config) ClientSelfCert() (err error) { } return nil } - chosts := make([]string, len(cfg.LCUrls)) - for i, u := range cfg.LCUrls { + chosts := make([]string, len(cfg.ListenClientUrls)) + for i, u := range cfg.ListenClientUrls { chosts[i] = u.Host } cfg.ClientTLSInfo, err = transport.SelfCert(cfg.logger, filepath.Join(cfg.Dir, "fixtures", "client"), chosts) @@ -845,8 +845,8 @@ func (cfg *Config) PeerSelfCert() (err error) { } return nil } - phosts := make([]string, len(cfg.LPUrls)) - for i, u := range cfg.LPUrls { + phosts := make([]string, len(cfg.ListenPeerUrls)) + for i, u := range cfg.ListenPeerUrls { phosts[i] = u.Host } cfg.PeerTLSInfo, err = transport.SelfCert(cfg.logger, filepath.Join(cfg.Dir, "fixtures", "peer"), phosts) @@ -874,9 +874,9 @@ func (cfg *Config) UpdateDefaultClusterFromName(defaultInitialCluster string) (s } used := false - pip, pport := cfg.LPUrls[0].Hostname(), cfg.LPUrls[0].Port() + pip, pport := cfg.ListenPeerUrls[0].Hostname(), cfg.ListenPeerUrls[0].Port() if cfg.defaultPeerHost() && pip == "0.0.0.0" { - cfg.APUrls[0] = url.URL{Scheme: cfg.APUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, pport)} + cfg.AdvertisePeerUrls[0] = url.URL{Scheme: cfg.AdvertisePeerUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, pport)} used = true } // update 'initial-cluster' when only the name is specified (e.g. 'etcd --name=abc') @@ -884,9 +884,9 @@ func (cfg *Config) UpdateDefaultClusterFromName(defaultInitialCluster string) (s cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) } - cip, cport := cfg.LCUrls[0].Hostname(), cfg.LCUrls[0].Port() + cip, cport := cfg.ListenClientUrls[0].Hostname(), cfg.ListenClientUrls[0].Port() if cfg.defaultClientHost() && cip == "0.0.0.0" { - cfg.ACUrls[0] = url.URL{Scheme: cfg.ACUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, cport)} + cfg.AdvertiseClientUrls[0] = url.URL{Scheme: cfg.AdvertiseClientUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, cport)} used = true } dhost := defaultHostname @@ -931,34 +931,34 @@ func checkHostURLs(urls []url.URL) error { return nil } -func (cfg *Config) getAPURLs() (ss []string) { - ss = make([]string, len(cfg.APUrls)) - for i := range cfg.APUrls { - ss[i] = cfg.APUrls[i].String() +func (cfg *Config) getAdvertisePeerUrls() (ss []string) { + ss = make([]string, len(cfg.AdvertisePeerUrls)) + for i := range cfg.AdvertisePeerUrls { + ss[i] = cfg.AdvertisePeerUrls[i].String() } return ss } -func (cfg *Config) getLPURLs() (ss []string) { - ss = make([]string, len(cfg.LPUrls)) - for i := range cfg.LPUrls { - ss[i] = cfg.LPUrls[i].String() +func (cfg *Config) getListenPeerUrls() (ss []string) { + ss = make([]string, len(cfg.ListenPeerUrls)) + for i := range cfg.ListenPeerUrls { + ss[i] = cfg.ListenPeerUrls[i].String() } return ss } -func (cfg *Config) getACURLs() (ss []string) { - ss = make([]string, len(cfg.ACUrls)) - for i := range cfg.ACUrls { - ss[i] = cfg.ACUrls[i].String() +func (cfg *Config) getAdvertiseClientUrls() (ss []string) { + ss = make([]string, len(cfg.AdvertiseClientUrls)) + for i := range cfg.AdvertiseClientUrls { + ss[i] = cfg.AdvertiseClientUrls[i].String() } return ss } -func (cfg *Config) getLCURLs() (ss []string) { - ss = make([]string, len(cfg.LCUrls)) - for i := range cfg.LCUrls { - ss[i] = cfg.LCUrls[i].String() +func (cfg *Config) getListenClientUrls() (ss []string) { + ss = make([]string, len(cfg.ListenClientUrls)) + for i := range cfg.ListenClientUrls { + ss[i] = cfg.ListenClientUrls[i].String() } return ss } diff --git a/embed/config_test.go b/embed/config_test.go index 8a06521b0..dad5a45a8 100644 --- a/embed/config_test.go +++ b/embed/config_test.go @@ -77,12 +77,12 @@ func TestConfigFileOtherFields(t *testing.T) { func TestUpdateDefaultClusterFromName(t *testing.T) { cfg := NewConfig() defaultInitialCluster := cfg.InitialCluster - oldscheme := cfg.APUrls[0].Scheme - origpeer := cfg.APUrls[0].String() - origadvc := cfg.ACUrls[0].String() + oldscheme := cfg.AdvertisePeerUrls[0].Scheme + origpeer := cfg.AdvertisePeerUrls[0].String() + origadvc := cfg.AdvertiseClientUrls[0].String() cfg.Name = "abc" - lpport := cfg.LPUrls[0].Port() + lpport := cfg.ListenPeerUrls[0].Port() // in case of 'etcd --name=abc' exp := fmt.Sprintf("%s=%s://localhost:%s", cfg.Name, oldscheme, lpport) @@ -91,12 +91,12 @@ func TestUpdateDefaultClusterFromName(t *testing.T) { t.Fatalf("initial-cluster expected %q, got %q", exp, cfg.InitialCluster) } // advertise peer URL should not be affected - if origpeer != cfg.APUrls[0].String() { - t.Fatalf("advertise peer url expected %q, got %q", origadvc, cfg.APUrls[0].String()) + if origpeer != cfg.AdvertisePeerUrls[0].String() { + t.Fatalf("advertise peer url expected %q, got %q", origadvc, cfg.AdvertisePeerUrls[0].String()) } // advertise client URL should not be affected - if origadvc != cfg.ACUrls[0].String() { - t.Fatalf("advertise client url expected %q, got %q", origadvc, cfg.ACUrls[0].String()) + if origadvc != cfg.AdvertiseClientUrls[0].String() { + t.Fatalf("advertise client url expected %q, got %q", origadvc, cfg.AdvertiseClientUrls[0].String()) } } @@ -109,17 +109,17 @@ func TestUpdateDefaultClusterFromNameOverwrite(t *testing.T) { cfg := NewConfig() defaultInitialCluster := cfg.InitialCluster - oldscheme := cfg.APUrls[0].Scheme - origadvc := cfg.ACUrls[0].String() + oldscheme := cfg.AdvertisePeerUrls[0].Scheme + origadvc := cfg.AdvertiseClientUrls[0].String() cfg.Name = "abc" - lpport := cfg.LPUrls[0].Port() - cfg.LPUrls[0] = url.URL{Scheme: cfg.LPUrls[0].Scheme, Host: fmt.Sprintf("0.0.0.0:%s", lpport)} + lpport := cfg.ListenPeerUrls[0].Port() + cfg.ListenPeerUrls[0] = url.URL{Scheme: cfg.ListenPeerUrls[0].Scheme, Host: fmt.Sprintf("0.0.0.0:%s", lpport)} dhost, _ := cfg.UpdateDefaultClusterFromName(defaultInitialCluster) if dhost != defaultHostname { t.Fatalf("expected default host %q, got %q", defaultHostname, dhost) } - aphost, apport := cfg.APUrls[0].Hostname(), cfg.APUrls[0].Port() + aphost, apport := cfg.AdvertisePeerUrls[0].Hostname(), cfg.AdvertisePeerUrls[0].Port() if apport != lpport { t.Fatalf("advertise peer url got different port %s, expected %s", apport, lpport) } @@ -132,8 +132,8 @@ func TestUpdateDefaultClusterFromNameOverwrite(t *testing.T) { } // advertise client URL should not be affected - if origadvc != cfg.ACUrls[0].String() { - t.Fatalf("advertise-client-url expected %q, got %q", origadvc, cfg.ACUrls[0].String()) + if origadvc != cfg.AdvertiseClientUrls[0].String() { + t.Fatalf("advertise-client-url expected %q, got %q", origadvc, cfg.AdvertiseClientUrls[0].String()) } } diff --git a/embed/etcd.go b/embed/etcd.go index 5a8638667..16b2d9024 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -116,7 +116,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { if e.cfg.logger != nil { e.cfg.logger.Info( "configuring peer listeners", - zap.Strings("listen-peer-urls", e.cfg.getLPURLs()), + zap.Strings("listen-peer-urls", e.cfg.getListenPeerUrls()), ) } if e.Peers, err = configurePeerListeners(cfg); err != nil { @@ -126,7 +126,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { if e.cfg.logger != nil { e.cfg.logger.Info( "configuring client listeners", - zap.Strings("listen-client-urls", e.cfg.getLCURLs()), + zap.Strings("listen-client-urls", e.cfg.getListenClientUrls()), ) } if e.sctxs, err = configureClientListeners(cfg); err != nil { @@ -163,8 +163,8 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { srvcfg := etcdserver.ServerConfig{ Name: cfg.Name, - ClientURLs: cfg.ACUrls, - PeerURLs: cfg.APUrls, + ClientURLs: cfg.AdvertiseClientUrls, + PeerURLs: cfg.AdvertisePeerUrls, DataDir: cfg.Dir, DedicatedWALDir: cfg.WalDir, SnapshotCount: cfg.SnapshotCount, @@ -247,10 +247,10 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { e.cfg.logger.Info( "now serving peer/client/metrics", zap.String("local-member-id", e.Server.ID().String()), - zap.Strings("initial-advertise-peer-urls", e.cfg.getAPURLs()), - zap.Strings("listen-peer-urls", e.cfg.getLPURLs()), - zap.Strings("advertise-client-urls", e.cfg.getACURLs()), - zap.Strings("listen-client-urls", e.cfg.getLCURLs()), + zap.Strings("initial-advertise-peer-urls", e.cfg.getAdvertisePeerUrls()), + zap.Strings("listen-peer-urls", e.cfg.getListenPeerUrls()), + zap.Strings("advertise-client-urls", e.cfg.getAdvertiseClientUrls()), + zap.Strings("listen-client-urls", e.cfg.getListenClientUrls()), zap.Strings("listen-metrics-urls", e.cfg.getMetricsURLs()), ) } @@ -325,10 +325,10 @@ func print(lg *zap.Logger, ec Config, sc etcdserver.ServerConfig, memberInitiali zap.Uint("max-wals", sc.MaxWALFiles), zap.Uint("max-snapshots", sc.MaxSnapFiles), zap.Uint64("snapshot-catchup-entries", sc.SnapshotCatchUpEntries), - zap.Strings("initial-advertise-peer-urls", ec.getAPURLs()), - zap.Strings("listen-peer-urls", ec.getLPURLs()), - zap.Strings("advertise-client-urls", ec.getACURLs()), - zap.Strings("listen-client-urls", ec.getLCURLs()), + zap.Strings("initial-advertise-peer-urls", ec.getAdvertisePeerUrls()), + zap.Strings("listen-peer-urls", ec.getListenPeerUrls()), + zap.Strings("advertise-client-urls", ec.getAdvertiseClientUrls()), + zap.Strings("listen-client-urls", ec.getListenClientUrls()), zap.Strings("listen-metrics-urls", ec.getMetricsURLs()), zap.Strings("cors", cors), zap.Strings("host-whitelist", hss), @@ -363,8 +363,8 @@ func (e *Etcd) Close() { fields := []zap.Field{ zap.String("name", e.cfg.Name), zap.String("data-dir", e.cfg.Dir), - zap.Strings("advertise-peer-urls", e.cfg.getAPURLs()), - zap.Strings("advertise-client-urls", e.cfg.getACURLs()), + zap.Strings("advertise-peer-urls", e.cfg.getAdvertisePeerUrls()), + zap.Strings("advertise-client-urls", e.cfg.getAdvertiseClientUrls()), } lg := e.GetLogger() if lg != nil { @@ -486,7 +486,7 @@ func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) { } } - peers = make([]*peerListener, len(cfg.LPUrls)) + peers = make([]*peerListener, len(cfg.ListenPeerUrls)) defer func() { if err == nil { return @@ -496,11 +496,11 @@ func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) { if cfg.logger != nil { cfg.logger.Warn( "closing peer listener", - zap.String("address", cfg.LPUrls[i].String()), + zap.String("address", cfg.ListenPeerUrls[i].String()), zap.Error(err), ) } else { - plog.Info("stopping listening for peers on ", cfg.LPUrls[i].String()) + plog.Info("stopping listening for peers on ", cfg.ListenPeerUrls[i].String()) } ctx, cancel := context.WithTimeout(context.Background(), time.Second) peers[i].close(ctx) @@ -509,7 +509,7 @@ func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) { } }() - for i, u := range cfg.LPUrls { + for i, u := range cfg.ListenPeerUrls { if u.Scheme == "http" { if !cfg.PeerTLSInfo.Empty() { if cfg.logger != nil { @@ -623,7 +623,7 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro } sctxs = make(map[string]*serveCtx) - for _, u := range cfg.LCUrls { + for _, u := range cfg.ListenClientUrls { sctx := newServeCtx(cfg.logger) if u.Scheme == "http" || u.Scheme == "unix" { if !cfg.ClientTLSInfo.Empty() { diff --git a/etcdmain/config.go b/etcdmain/config.go index d6f0e18ec..c383b513b 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -332,10 +332,10 @@ func (cfg *config) configFromCmdLine() error { return err } - cfg.ec.LPUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-peer-urls") - cfg.ec.APUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "initial-advertise-peer-urls") - cfg.ec.LCUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-client-urls") - cfg.ec.ACUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "advertise-client-urls") + cfg.ec.ListenPeerUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-peer-urls") + cfg.ec.AdvertisePeerUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "initial-advertise-peer-urls") + cfg.ec.ListenClientUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-client-urls") + cfg.ec.AdvertiseClientUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "advertise-client-urls") cfg.ec.ListenMetricsUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-metrics-urls") cfg.ec.CORS = flags.UniqueURLsMapFromFlag(cfg.cf.flagSet, "cors") @@ -356,7 +356,7 @@ func (cfg *config) configFromCmdLine() error { // disable default advertise-client-urls if lcurls is set missingAC := flags.IsSet(cfg.cf.flagSet, "listen-client-urls") && !flags.IsSet(cfg.cf.flagSet, "advertise-client-urls") if !cfg.mayBeProxy() && missingAC { - cfg.ec.ACUrls = nil + cfg.ec.AdvertiseClientUrls = nil } // disable default initial-cluster if discovery is set diff --git a/etcdmain/config_test.go b/etcdmain/config_test.go index de646e370..6074bc60d 100644 --- a/etcdmain/config_test.go +++ b/etcdmain/config_test.go @@ -51,14 +51,14 @@ func TestConfigParsingMemberFlags(t *testing.T) { func TestConfigFileMemberFields(t *testing.T) { yc := struct { - Dir string `json:"data-dir"` - MaxSnapFiles uint `json:"max-snapshots"` - MaxWalFiles uint `json:"max-wals"` - Name string `json:"name"` - SnapshotCount uint64 `json:"snapshot-count"` - LPUrls string `json:"listen-peer-urls"` - LCUrls string `json:"listen-client-urls"` - AcurlsCfgFile string `json:"advertise-client-urls"` + Dir string `json:"data-dir"` + MaxSnapFiles uint `json:"max-snapshots"` + MaxWalFiles uint `json:"max-wals"` + Name string `json:"name"` + SnapshotCount uint64 `json:"snapshot-count"` + ListenPeerUrls string `json:"listen-peer-urls"` + ListenClientUrls string `json:"listen-client-urls"` + AdvertiseClientUrls string `json:"advertise-client-urls"` }{ "testdir", 10, @@ -513,13 +513,13 @@ func mustCreateCfgFile(t *testing.T, b []byte) *os.File { func validateMemberFlags(t *testing.T, cfg *config) { wcfg := &embed.Config{ - Dir: "testdir", - LPUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}}, - LCUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}}, - MaxSnapFiles: 10, - MaxWalFiles: 10, - Name: "testname", - SnapshotCount: 10, + Dir: "testdir", + ListenPeerUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}}, + ListenClientUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}}, + MaxSnapFiles: 10, + MaxWalFiles: 10, + Name: "testname", + SnapshotCount: 10, } if cfg.ec.Dir != wcfg.Dir { @@ -537,18 +537,18 @@ func validateMemberFlags(t *testing.T, cfg *config) { if cfg.ec.SnapshotCount != wcfg.SnapshotCount { t.Errorf("snapcount = %v, want %v", cfg.ec.SnapshotCount, wcfg.SnapshotCount) } - if !reflect.DeepEqual(cfg.ec.LPUrls, wcfg.LPUrls) { - t.Errorf("listen-peer-urls = %v, want %v", cfg.ec.LPUrls, wcfg.LPUrls) + if !reflect.DeepEqual(cfg.ec.ListenPeerUrls, wcfg.ListenPeerUrls) { + t.Errorf("listen-peer-urls = %v, want %v", cfg.ec.ListenPeerUrls, wcfg.ListenPeerUrls) } - if !reflect.DeepEqual(cfg.ec.LCUrls, wcfg.LCUrls) { - t.Errorf("listen-client-urls = %v, want %v", cfg.ec.LCUrls, wcfg.LCUrls) + if !reflect.DeepEqual(cfg.ec.ListenClientUrls, wcfg.ListenClientUrls) { + t.Errorf("listen-client-urls = %v, want %v", cfg.ec.ListenClientUrls, wcfg.ListenClientUrls) } } func validateClusteringFlags(t *testing.T, cfg *config) { wcfg := newConfig() - wcfg.ec.APUrls = []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}} - wcfg.ec.ACUrls = []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}} + wcfg.ec.AdvertisePeerUrls = []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}} + wcfg.ec.AdvertiseClientUrls = []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}} wcfg.ec.ClusterState = embed.ClusterStateFlagExisting wcfg.cf.fallback.Set(fallbackFlagExit) wcfg.ec.InitialCluster = "0=http://localhost:8000" @@ -566,11 +566,11 @@ func validateClusteringFlags(t *testing.T, cfg *config) { if cfg.ec.InitialClusterToken != wcfg.ec.InitialClusterToken { t.Errorf("initialClusterToken = %v, want %v", cfg.ec.InitialClusterToken, wcfg.ec.InitialClusterToken) } - if !reflect.DeepEqual(cfg.ec.APUrls, wcfg.ec.APUrls) { - t.Errorf("initial-advertise-peer-urls = %v, want %v", cfg.ec.APUrls, wcfg.ec.APUrls) + if !reflect.DeepEqual(cfg.ec.AdvertisePeerUrls, wcfg.ec.AdvertisePeerUrls) { + t.Errorf("initial-advertise-peer-urls = %v, want %v", cfg.ec.AdvertisePeerUrls, wcfg.ec.AdvertisePeerUrls) } - if !reflect.DeepEqual(cfg.ec.ACUrls, wcfg.ec.ACUrls) { - t.Errorf("advertise-client-urls = %v, want %v", cfg.ec.ACUrls, wcfg.ec.ACUrls) + if !reflect.DeepEqual(cfg.ec.AdvertiseClientUrls, wcfg.ec.AdvertiseClientUrls) { + t.Errorf("advertise-client-urls = %v, want %v", cfg.ec.AdvertiseClientUrls, wcfg.ec.AdvertiseClientUrls) } } diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 73328a73d..51696290f 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -251,7 +251,7 @@ func startEtcdOrProxyV2() { plog.Infof("forgot to set --initial-cluster flag?") } } - if types.URLs(cfg.ec.APUrls).String() == embed.DefaultInitialAdvertisePeerURLs { + if types.URLs(cfg.ec.AdvertisePeerUrls).String() == embed.DefaultInitialAdvertisePeerURLs { if lg != nil { lg.Warn("forgot to set --initial-advertise-peer-urls?") } else { @@ -507,11 +507,11 @@ func startProxy(cfg *config) error { // setup self signed certs when serving https cHosts, cTLS := []string{}, false - for _, u := range cfg.ec.LCUrls { + for _, u := range cfg.ec.ListenClientUrls { cHosts = append(cHosts, u.Host) cTLS = cTLS || u.Scheme == "https" } - for _, u := range cfg.ec.ACUrls { + for _, u := range cfg.ec.AdvertiseClientUrls { cHosts = append(cHosts, u.Host) cTLS = cTLS || u.Scheme == "https" } @@ -528,7 +528,7 @@ func startProxy(cfg *config) error { } // Start a proxy server goroutine for each listen address - for _, u := range cfg.ec.LCUrls { + for _, u := range cfg.ec.ListenClientUrls { l, err := transport.NewListener(u.Host, u.Scheme, &listenerTLS) if err != nil { return err diff --git a/integration/embed_test.go b/integration/embed_test.go index 4d483e8fa..14e950e05 100644 --- a/integration/embed_test.go +++ b/integration/embed_test.go @@ -64,7 +64,7 @@ func TestEmbedEtcd(t *testing.T) { tests[0].cfg.Durl = "abc" setupEmbedCfg(&tests[1].cfg, []url.URL{urls[0]}, []url.URL{urls[1]}) - tests[1].cfg.ACUrls = nil + tests[1].cfg.AdvertiseClientUrls = nil tests[2].cfg.TickMs = tests[2].cfg.ElectionMs - 1 tests[3].cfg.ElectionMs = 999999 setupEmbedCfg(&tests[4].cfg, []url.URL{urls[2]}, []url.URL{urls[3]}) @@ -72,8 +72,8 @@ func TestEmbedEtcd(t *testing.T) { setupEmbedCfg(&tests[6].cfg, []url.URL{urls[7], urls[8]}, []url.URL{urls[9]}) dnsURL, _ := url.Parse("http://whatever.test:12345") - tests[7].cfg.LCUrls = []url.URL{*dnsURL} - tests[8].cfg.LPUrls = []url.URL{*dnsURL} + tests[7].cfg.ListenClientUrls = []url.URL{*dnsURL} + tests[8].cfg.ListenPeerUrls = []url.URL{*dnsURL} dir := filepath.Join(os.TempDir(), fmt.Sprintf("embed-etcd")) os.RemoveAll(dir) @@ -188,8 +188,8 @@ func setupEmbedCfg(cfg *embed.Config, curls []url.URL, purls []url.URL) { cfg.Debug = false cfg.ClusterState = "new" - cfg.LCUrls, cfg.ACUrls = curls, curls - cfg.LPUrls, cfg.APUrls = purls, purls + cfg.ListenClientUrls, cfg.AdvertiseClientUrls = curls, curls + cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = purls, purls cfg.InitialCluster = "" for i := range purls { cfg.InitialCluster += ",default=" + purls[i].String() diff --git a/tools/etcd-dump-metrics/etcd.go b/tools/etcd-dump-metrics/etcd.go index ceb089a6c..00708e73c 100644 --- a/tools/etcd-dump-metrics/etcd.go +++ b/tools/etcd-dump-metrics/etcd.go @@ -52,8 +52,8 @@ func setupEmbedCfg(cfg *embed.Config, curls, purls, ics []url.URL) { os.RemoveAll(cfg.Dir) cfg.ClusterState = "new" - cfg.LCUrls, cfg.ACUrls = curls, curls - cfg.LPUrls, cfg.APUrls = purls, purls + cfg.ListenClientUrls, cfg.AdvertiseClientUrls = curls, curls + cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = purls, purls cfg.InitialCluster = "" for i := range ics { From 66704b4c596fa84d0aefdbbbacbbf2d5c744072e Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 10 Mar 2023 14:06:44 +0100 Subject: [PATCH 2/7] server: Separate client listener grouping from serving Signed-off-by: Marek Siarkowicz --- embed/etcd.go | 46 +++++++++++++++++++++++++--------------------- embed/serve.go | 6 ++++-- 2 files changed, 29 insertions(+), 23 deletions(-) diff --git a/embed/etcd.go b/embed/etcd.go index 16b2d9024..6fb020bdf 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -624,7 +624,6 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro sctxs = make(map[string]*serveCtx) for _, u := range cfg.ListenClientUrls { - sctx := newServeCtx(cfg.logger) if u.Scheme == "http" || u.Scheme == "unix" { if !cfg.ClientTLSInfo.Empty() { if cfg.logger != nil { @@ -644,29 +643,35 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro if (u.Scheme == "https" || u.Scheme == "unixs") && cfg.ClientTLSInfo.Empty() { return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPS scheme", u.String()) } + } - network := "tcp" + for _, u := range cfg.ListenClientUrls { addr := u.Host + network := "tcp" if u.Scheme == "unix" || u.Scheme == "unixs" { - network = "unix" addr = u.Host + u.Path + network = "unix" } + secure := u.Scheme == "https" || u.Scheme == "unixs" + insecure := !secure + + sctx := sctxs[addr] + if sctx == nil { + sctx = newServeCtx(cfg.logger) + sctxs[addr] = sctx + } + sctx.secure = sctx.secure || secure + sctx.insecure = sctx.insecure || insecure + sctx.scheme = u.Scheme + sctx.addr = addr sctx.network = network - - sctx.secure = u.Scheme == "https" || u.Scheme == "unixs" - sctx.insecure = !sctx.secure - if oldctx := sctxs[addr]; oldctx != nil { - oldctx.secure = oldctx.secure || sctx.secure - oldctx.insecure = oldctx.insecure || sctx.insecure - continue - } - - if sctx.l, err = net.Listen(network, addr); err != nil { + } + for _, sctx := range sctxs { + if sctx.l, err = net.Listen(sctx.network, sctx.addr); err != nil { return nil, err } // net.Listener will rewrite ipv4 0.0.0.0 to ipv6 [::], breaking // hosts that disable ipv6. So, use the address given by the user. - sctx.addr = addr if fdLimit, fderr := runtimeutil.FDLimit(); fderr == nil { if fdLimit <= reservedInternalFDNum { @@ -683,13 +688,13 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum)) } - if network == "tcp" { - if sctx.l, err = transport.NewKeepAliveListener(sctx.l, network, nil); err != nil { + if sctx.network == "tcp" { + if sctx.l, err = transport.NewKeepAliveListener(sctx.l, sctx.network, nil); err != nil { return nil, err } } - defer func() { + defer func(addr string) { if err == nil { return } @@ -697,13 +702,13 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro if cfg.logger != nil { cfg.logger.Warn( "closing peer listener", - zap.String("address", u.Host), + zap.String("address", addr), zap.Error(err), ) } else { - plog.Info("stopping listening for client requests on ", u.Host) + plog.Info("stopping listening for client requests on ", addr) } - }() + }(sctx.addr) for k := range cfg.UserHandlers { sctx.userHandlers[k] = cfg.UserHandlers[k] } @@ -714,7 +719,6 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro if cfg.Debug { sctx.registerTrace() } - sctxs[addr] = sctx } return sctxs, nil } diff --git a/embed/serve.go b/embed/serve.go index 639a6b2d5..8120ed76b 100644 --- a/embed/serve.go +++ b/embed/serve.go @@ -49,8 +49,10 @@ import ( ) type serveCtx struct { - lg *zap.Logger - l net.Listener + lg *zap.Logger + l net.Listener + + scheme string addr string network string secure bool From a4ac849ec118bd029a12cda0be610a183c88d3ab Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Tue, 28 Mar 2023 11:47:57 +0200 Subject: [PATCH 3/7] server: Extract resolveUrl helper function Signed-off-by: Marek Siarkowicz --- embed/etcd.go | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/embed/etcd.go b/embed/etcd.go index 6fb020bdf..1e19c5929 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -646,22 +646,14 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro } for _, u := range cfg.ListenClientUrls { - addr := u.Host - network := "tcp" - if u.Scheme == "unix" || u.Scheme == "unixs" { - addr = u.Host + u.Path - network = "unix" - } - secure := u.Scheme == "https" || u.Scheme == "unixs" - insecure := !secure - + addr, secure, network := resolveUrl(u) sctx := sctxs[addr] if sctx == nil { sctx = newServeCtx(cfg.logger) sctxs[addr] = sctx } sctx.secure = sctx.secure || secure - sctx.insecure = sctx.insecure || insecure + sctx.insecure = sctx.insecure || !secure sctx.scheme = u.Scheme sctx.addr = addr sctx.network = network @@ -723,6 +715,17 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro return sctxs, nil } +func resolveUrl(u url.URL) (addr string, secure bool, network string) { + addr = u.Host + network = "tcp" + if u.Scheme == "unix" || u.Scheme == "unixs" { + addr = u.Host + u.Path + network = "unix" + } + secure = u.Scheme == "https" || u.Scheme == "unixs" + return addr, secure, network +} + func (e *Etcd) serveClients() (err error) { if !e.cfg.ClientTLSInfo.Empty() { if e.cfg.logger != nil { From dd0bc664788bbb8fee6577ea2f371309d2ab3e54 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 13 Mar 2023 15:46:42 +0100 Subject: [PATCH 4/7] server: Pick one address that all grpc gateways connect to Signed-off-by: Marek Siarkowicz --- embed/etcd.go | 49 +++++++++++++++++++++++++++++++++++++++++++++++- embed/serve.go | 51 +++++++++++++++----------------------------------- 2 files changed, 63 insertions(+), 37 deletions(-) diff --git a/embed/etcd.go b/embed/etcd.go index 1e19c5929..a732ee413 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -20,6 +20,7 @@ import ( "fmt" "io/ioutil" defaultLog "log" + "math" "net" "net/http" "net/url" @@ -29,6 +30,7 @@ import ( "sync" "time" + "go.etcd.io/etcd/clientv3/credentials" "go.etcd.io/etcd/etcdserver" "go.etcd.io/etcd/etcdserver/api/etcdhttp" "go.etcd.io/etcd/etcdserver/api/rafthttp" @@ -772,12 +774,57 @@ func (e *Etcd) serveClients() (err error) { // start client servers in each goroutine for _, sctx := range e.sctxs { go func(s *serveCtx) { - e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...)) + e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, e.grpcGatewayDial(), gopts...)) }(sctx) } return nil } +func (e *Etcd) grpcGatewayDial() (grpcDial func(ctx context.Context) (*grpc.ClientConn, error)) { + if !e.cfg.EnableGRPCGateway { + return nil + } + sctx := e.pickGrpcGatewayServeContext() + addr := sctx.addr + if network := sctx.network; network == "unix" { + // explicitly define unix network for gRPC socket support + addr = fmt.Sprintf("%s://%s", network, addr) + } + + opts := []grpc.DialOption{grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32))} + if sctx.secure { + tlscfg, tlsErr := e.cfg.ClientTLSInfo.ServerConfig() + if tlsErr != nil { + return func(ctx context.Context) (*grpc.ClientConn, error) { + return nil, tlsErr + } + } + dtls := tlscfg.Clone() + // trust local server + dtls.InsecureSkipVerify = true + bundle := credentials.NewBundle(credentials.Config{TLSConfig: dtls}) + opts = append(opts, grpc.WithTransportCredentials(bundle.TransportCredentials())) + } else { + opts = append(opts, grpc.WithInsecure()) + } + + return func(ctx context.Context) (*grpc.ClientConn, error) { + conn, err := grpc.DialContext(ctx, addr, opts...) + if err != nil { + sctx.lg.Error("grpc gateway failed to dial", zap.String("addr", addr), zap.Error(err)) + return nil, err + } + return conn, err + } +} + +func (e *Etcd) pickGrpcGatewayServeContext() *serveCtx { + for _, sctx := range e.sctxs { + return sctx + } + panic("Expect at least one context able to serve grpc") +} + func (e *Etcd) serveMetrics() (err error) { if e.cfg.Metrics == "extensive" { grpc_prometheus.EnableHandlingTimeHistogram() diff --git a/embed/serve.go b/embed/serve.go index 8120ed76b..e3b27144a 100644 --- a/embed/serve.go +++ b/embed/serve.go @@ -19,12 +19,10 @@ import ( "fmt" "io/ioutil" defaultLog "log" - "math" "net" "net/http" "strings" - "go.etcd.io/etcd/clientv3/credentials" "go.etcd.io/etcd/etcdserver" "go.etcd.io/etcd/etcdserver/api/v3client" "go.etcd.io/etcd/etcdserver/api/v3election" @@ -91,6 +89,7 @@ func (sctx *serveCtx) serve( tlsinfo *transport.TLSInfo, handler http.Handler, errHandler func(error), + grpcDialForRestGatewayBackends func(ctx context.Context) (*grpc.ClientConn, error), gopts ...grpc.ServerOption) (err error) { logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0) <-s.ReadyNotify() @@ -104,6 +103,18 @@ func (sctx *serveCtx) serve( servElection := v3election.NewElectionServer(v3c) servLock := v3lock.NewLockServer(v3c) + // Make sure serversC is closed even if we prematurely exit the function. + defer close(sctx.serversC) + var gwmux *gw.ServeMux + if s.Cfg.EnableGRPCGateway { + // GRPC gateway connects to grpc server via connection provided by grpc dial. + gwmux, err = sctx.registerGateway(grpcDialForRestGatewayBackends) + if err != nil { + sctx.lg.Error("registerGateway failed", zap.Error(err)) + return err + } + } + if sctx.insecure { gs := v3rpc.Server(s, nil, gopts...) v3electionpb.RegisterElectionServer(gs, servElection) @@ -137,14 +148,6 @@ func (sctx *serveCtx) serve( errHandler(gs.Serve(grpcLis)) }(gs, grpcl) - var gwmux *gw.ServeMux - if s.Cfg.EnableGRPCGateway { - gwmux, err = sctx.registerGateway([]grpc.DialOption{grpc.WithInsecure()}) - if err != nil { - return err - } - } - httpmux := sctx.createMux(gwmux, handler) srvhttp := &http.Server{ @@ -205,20 +208,6 @@ func (sctx *serveCtx) serve( }(gs) handler = grpcHandlerFunc(gs, handler) - - var gwmux *gw.ServeMux - if s.Cfg.EnableGRPCGateway { - dtls := tlscfg.Clone() - // trust local server - dtls.InsecureSkipVerify = true - bundle := credentials.NewBundle(credentials.Config{TLSConfig: dtls}) - opts := []grpc.DialOption{grpc.WithTransportCredentials(bundle.TransportCredentials())} - gwmux, err = sctx.registerGateway(opts) - if err != nil { - return err - } - } - var tlsl net.Listener tlsl, err = transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo) if err != nil { @@ -284,20 +273,10 @@ func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Ha type registerHandlerFunc func(context.Context, *gw.ServeMux, *grpc.ClientConn) error -func (sctx *serveCtx) registerGateway(opts []grpc.DialOption) (*gw.ServeMux, error) { +func (sctx *serveCtx) registerGateway(dial func(ctx context.Context) (*grpc.ClientConn, error)) (*gw.ServeMux, error) { ctx := sctx.ctx - addr := sctx.addr - if network := sctx.network; network == "unix" { - // explicitly define unix network for gRPC socket support - addr = fmt.Sprintf("%s://%s", network, addr) - } - - opts = append(opts, grpc.WithDefaultCallOptions([]grpc.CallOption{ - grpc.MaxCallRecvMsgSize(math.MaxInt32), - }...)) - - conn, err := grpc.DialContext(ctx, addr, opts...) + conn, err := dial(ctx) if err != nil { return nil, err } From 8dc124417908b2d2ea2039aaf8c5175b5994d8a7 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 10 Mar 2023 17:33:46 +0100 Subject: [PATCH 5/7] server: Add --listen-client-http-urls flag to allow running grpc server separate from http server Difference in load configuration for watch delay tests show how huge the impact is. Even with random write scheduler grpc under http server can only handle 500 KB with 2 seconds delay. On the other hand, separate grpc server easily hits 10, 100 or even 1000 MB within 100 miliseconds. Priority write scheduler that was used in most previous releases is far worse than random one. Tests configured to only 5 MB to avoid flakes and taking too long to fill etcd. Signed-off-by: Marek Siarkowicz --- embed/config.go | 57 +++++++-- embed/etcd.go | 43 +++++-- embed/serve.go | 222 ++++++++++++++++++++-------------- etcdmain/config.go | 7 +- etcdmain/config_test.go | 37 +++--- etcdmain/help.go | 4 +- tests/e2e/cluster_test.go | 6 + tests/e2e/etcdctl.go | 1 + tests/e2e/utils.go | 6 +- tests/e2e/watch_delay_test.go | 69 +++++++---- 10 files changed, 299 insertions(+), 153 deletions(-) diff --git a/embed/config.go b/embed/config.go index 8755bb52e..74dfcee93 100644 --- a/embed/config.go +++ b/embed/config.go @@ -183,12 +183,12 @@ type Config struct { // streams that each client can open at a time. MaxConcurrentStreams uint32 `json:"max-concurrent-streams"` - ListenPeerUrls, ListenClientUrls []url.URL - AdvertisePeerUrls, AdvertiseClientUrls []url.URL - ClientTLSInfo transport.TLSInfo - ClientAutoTLS bool - PeerTLSInfo transport.TLSInfo - PeerAutoTLS bool + ListenPeerUrls, ListenClientUrls, ListenClientHttpUrls []url.URL + AdvertisePeerUrls, AdvertiseClientUrls []url.URL + ClientTLSInfo transport.TLSInfo + ClientAutoTLS bool + PeerTLSInfo transport.TLSInfo + PeerAutoTLS bool // CipherSuites is a list of supported TLS cipher suites between // client/server and peers. If empty, Go auto-populates the list. @@ -373,10 +373,11 @@ type configYAML struct { // configJSON has file options that are translated into Config options type configJSON struct { - ListenPeerUrls string `json:"listen-peer-urls"` - ListenClientUrls string `json:"listen-client-urls"` - AdvertisePeerUrls string `json:"initial-advertise-peer-urls"` - AdvertiseClientUrls string `json:"advertise-client-urls"` + ListenPeerUrls string `json:"listen-peer-urls"` + ListenClientUrls string `json:"listen-client-urls"` + ListenClientHttpUrls string `json:"listen-client-http-urls"` + AdvertisePeerUrls string `json:"initial-advertise-peer-urls"` + AdvertiseClientUrls string `json:"advertise-client-urls"` CORSJSON string `json:"cors"` HostWhitelistJSON string `json:"host-whitelist"` @@ -507,6 +508,15 @@ func (cfg *configYAML) configFromFile(path string) error { cfg.Config.ListenClientUrls = u } + if cfg.configJSON.ListenClientHttpUrls != "" { + u, err := types.NewURLs(strings.Split(cfg.configJSON.ListenClientHttpUrls, ",")) + if err != nil { + fmt.Fprintf(os.Stderr, "unexpected error setting up listen-client-http-urls: %v\n", err) + os.Exit(1) + } + cfg.Config.ListenClientHttpUrls = u + } + if cfg.configJSON.AdvertisePeerUrls != "" { u, err := types.NewURLs(strings.Split(cfg.configJSON.AdvertisePeerUrls, ",")) if err != nil { @@ -602,6 +612,16 @@ func (cfg *Config) Validate() error { if err := checkBindURLs(cfg.ListenClientUrls); err != nil { return err } + if err := checkBindURLs(cfg.ListenClientHttpUrls); err != nil { + return err + } + if len(cfg.ListenClientHttpUrls) == 0 { + if cfg.logger != nil { + cfg.logger.Warn("Running http and grpc server on single port. This is not recommended for production.") + } else { + plog.Warning("Running http and grpc server on single port. This is not recommended for production.") + } + } if err := checkBindURLs(cfg.ListenMetricsUrls); err != nil { return err } @@ -822,9 +842,12 @@ func (cfg *Config) ClientSelfCert() (err error) { } return nil } - chosts := make([]string, len(cfg.ListenClientUrls)) - for i, u := range cfg.ListenClientUrls { - chosts[i] = u.Host + chosts := make([]string, 0, len(cfg.ListenClientUrls)+len(cfg.ListenClientHttpUrls)) + for _, u := range cfg.ListenClientUrls { + chosts = append(chosts, u.Host) + } + for _, u := range cfg.ListenClientHttpUrls { + chosts = append(chosts, u.Host) } cfg.ClientTLSInfo, err = transport.SelfCert(cfg.logger, filepath.Join(cfg.Dir, "fixtures", "client"), chosts) if err != nil { @@ -963,6 +986,14 @@ func (cfg *Config) getListenClientUrls() (ss []string) { return ss } +func (cfg *Config) getListenClientHttpUrls() (ss []string) { + ss = make([]string, len(cfg.ListenClientHttpUrls)) + for i := range cfg.ListenClientHttpUrls { + ss[i] = cfg.ListenClientHttpUrls[i].String() + } + return ss +} + func (cfg *Config) getMetricsURLs() (ss []string) { ss = make([]string, len(cfg.ListenMetricsUrls)) for i := range cfg.ListenMetricsUrls { diff --git a/embed/etcd.go b/embed/etcd.go index a732ee413..528fddbcd 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -434,7 +434,7 @@ func stopServers(ctx context.Context, ss *servers) { // do not grpc.Server.GracefulStop with TLS enabled etcd server // See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531 // and https://github.com/etcd-io/etcd/issues/8916 - if ss.secure { + if ss.secure && ss.http != nil { shutdownNow() return } @@ -625,7 +625,7 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro } sctxs = make(map[string]*serveCtx) - for _, u := range cfg.ListenClientUrls { + for _, u := range append(cfg.ListenClientUrls, cfg.ListenClientHttpUrls...) { if u.Scheme == "http" || u.Scheme == "unix" { if !cfg.ClientTLSInfo.Empty() { if cfg.logger != nil { @@ -660,6 +660,24 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro sctx.addr = addr sctx.network = network } + for _, u := range cfg.ListenClientHttpUrls { + addr, secure, network := resolveUrl(u) + + sctx := sctxs[addr] + if sctx == nil { + sctx = newServeCtx(cfg.logger) + sctxs[addr] = sctx + } else if !sctx.httpOnly { + return nil, fmt.Errorf("cannot bind both --client-listen-urls and --client-listen-http-urls on the same url %s", u.String()) + } + sctx.secure = sctx.secure || secure + sctx.insecure = sctx.insecure || !secure + sctx.scheme = u.Scheme + sctx.addr = addr + sctx.network = network + sctx.httpOnly = true + } + for _, sctx := range sctxs { if sctx.l, err = net.Listen(sctx.network, sctx.addr); err != nil { return nil, err @@ -689,7 +707,7 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro } defer func(addr string) { - if err == nil { + if err == nil || sctx.l == nil { return } sctx.l.Close() @@ -771,20 +789,27 @@ func (e *Etcd) serveClients() (err error) { })) } + splitHttp := false + for _, sctx := range e.sctxs { + if sctx.httpOnly { + splitHttp = true + } + } + // start client servers in each goroutine for _, sctx := range e.sctxs { go func(s *serveCtx) { - e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, e.grpcGatewayDial(), gopts...)) + e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, e.grpcGatewayDial(splitHttp), splitHttp, gopts...)) }(sctx) } return nil } -func (e *Etcd) grpcGatewayDial() (grpcDial func(ctx context.Context) (*grpc.ClientConn, error)) { +func (e *Etcd) grpcGatewayDial(splitHttp bool) (grpcDial func(ctx context.Context) (*grpc.ClientConn, error)) { if !e.cfg.EnableGRPCGateway { return nil } - sctx := e.pickGrpcGatewayServeContext() + sctx := e.pickGrpcGatewayServeContext(splitHttp) addr := sctx.addr if network := sctx.network; network == "unix" { // explicitly define unix network for gRPC socket support @@ -818,9 +843,11 @@ func (e *Etcd) grpcGatewayDial() (grpcDial func(ctx context.Context) (*grpc.Clie } } -func (e *Etcd) pickGrpcGatewayServeContext() *serveCtx { +func (e *Etcd) pickGrpcGatewayServeContext(splitHttp bool) *serveCtx { for _, sctx := range e.sctxs { - return sctx + if !splitHttp || !sctx.httpOnly { + return sctx + } } panic("Expect at least one context able to serve grpc") } diff --git a/embed/serve.go b/embed/serve.go index e3b27144a..f117a856b 100644 --- a/embed/serve.go +++ b/embed/serve.go @@ -55,6 +55,7 @@ type serveCtx struct { network string secure bool insecure bool + httpOnly bool ctx context.Context cancel context.CancelFunc @@ -90,6 +91,7 @@ func (sctx *serveCtx) serve( handler http.Handler, errHandler func(error), grpcDialForRestGatewayBackends func(ctx context.Context) (*grpc.ClientConn, error), + splitHttp bool, gopts ...grpc.ServerOption) (err error) { logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0) <-s.ReadyNotify() @@ -99,6 +101,12 @@ func (sctx *serveCtx) serve( } m := cmux.New(sctx.l) + var server func() error + onlyGRPC := splitHttp && !sctx.httpOnly + onlyHttp := splitHttp && sctx.httpOnly + grpcEnabled := !onlyHttp + httpEnabled := !onlyGRPC + v3c := v3client.New(s) servElection := v3election.NewElectionServer(v3c) servLock := v3lock.NewLockServer(v3c) @@ -114,60 +122,82 @@ func (sctx *serveCtx) serve( return err } } + var traffic string + switch { + case onlyGRPC: + traffic = "grpc" + case onlyHttp: + traffic = "http" + default: + traffic = "grpc+http" + } if sctx.insecure { - gs := v3rpc.Server(s, nil, gopts...) - v3electionpb.RegisterElectionServer(gs, servElection) - v3lockpb.RegisterLockServer(gs, servLock) - if sctx.serviceRegister != nil { - sctx.serviceRegister(gs) + var gs *grpc.Server + var srv *http.Server + if httpEnabled { + httpmux := sctx.createMux(gwmux, handler) + srv = &http.Server{ + Handler: createAccessController(sctx.lg, s, httpmux), + ErrorLog: logger, // do not log user error + } + if err := configureHttpServer(srv, s.Cfg); err != nil { + sctx.lg.Error("Configure http server failed", zap.Error(err)) + return err + } + } + if grpcEnabled { + gs = v3rpc.Server(s, nil, gopts...) + v3electionpb.RegisterElectionServer(gs, servElection) + v3lockpb.RegisterLockServer(gs, servLock) + if sctx.serviceRegister != nil { + sctx.serviceRegister(gs) + } + defer func(gs *grpc.Server) { + if err == nil { + return + } + + if sctx.lg != nil { + sctx.lg.Warn("stopping insecure grpc server due to error", zap.Error(err)) + } else { + plog.Warningf("stopping insecure grpc server due to error: %s", err) + } + + gs.Stop() + + if sctx.lg != nil { + sctx.lg.Warn("stopped insecure grpc server due to error", zap.Error(err)) + } else { + plog.Warningf("stopped insecure grpc server due to error: %s", err) + } + }(gs) + } + if onlyGRPC { + server = func() error { + return gs.Serve(sctx.l) + } + } else { + server = m.Serve + + httpl := m.Match(cmux.HTTP1()) + go func(srvhttp *http.Server, tlsLis net.Listener) { + errHandler(srvhttp.Serve(tlsLis)) + }(srv, httpl) + + if grpcEnabled { + grpcl := m.Match(cmux.HTTP2()) + go func(gs *grpc.Server, l net.Listener) { + errHandler(gs.Serve(l)) + }(gs, grpcl) + } } - defer func(gs *grpc.Server) { - if err == nil { - return - } - - if sctx.lg != nil { - sctx.lg.Warn("stopping insecure grpc server due to error", zap.Error(err)) - } else { - plog.Warningf("stopping insecure grpc server due to error: %s", err) - } - - gs.Stop() - - if sctx.lg != nil { - sctx.lg.Warn("stopped insecure grpc server due to error", zap.Error(err)) - } else { - plog.Warningf("stopped insecure grpc server due to error: %s", err) - } - }(gs) - - grpcl := m.Match(cmux.HTTP2()) - go func(gs *grpc.Server, grpcLis net.Listener) { - errHandler(gs.Serve(grpcLis)) - }(gs, grpcl) - - httpmux := sctx.createMux(gwmux, handler) - - srvhttp := &http.Server{ - Handler: createAccessController(sctx.lg, s, httpmux), - ErrorLog: logger, // do not log user error - } - if err = configureHttpServer(srvhttp, s.Cfg); err != nil { - sctx.lg.Error("Configure http server failed", zap.Error(err)) - return err - } - httpl := m.Match(cmux.HTTP1()) - - go func(srvhttp *http.Server, httpLis net.Listener) { - errHandler(srvhttp.Serve(httpLis)) - }(srvhttp, httpl) - - sctx.serversC <- &servers{grpc: gs, http: srvhttp} + sctx.serversC <- &servers{grpc: gs, http: srv} if sctx.lg != nil { sctx.lg.Info( "serving client traffic insecurely; this is strongly discouraged!", + zap.String("traffic", traffic), zap.String("address", sctx.l.Addr().String()), ) } else { @@ -176,64 +206,77 @@ func (sctx *serveCtx) serve( } if sctx.secure { + var gs *grpc.Server + var srv *http.Server + tlscfg, tlsErr := tlsinfo.ServerConfig() if tlsErr != nil { return tlsErr } - gs := v3rpc.Server(s, tlscfg, gopts...) - v3electionpb.RegisterElectionServer(gs, servElection) - v3lockpb.RegisterLockServer(gs, servLock) - if sctx.serviceRegister != nil { - sctx.serviceRegister(gs) - } - defer func(gs *grpc.Server) { - if err == nil { - return + if grpcEnabled { + gs = v3rpc.Server(s, tlscfg, gopts...) + v3electionpb.RegisterElectionServer(gs, servElection) + v3lockpb.RegisterLockServer(gs, servLock) + if sctx.serviceRegister != nil { + sctx.serviceRegister(gs) } + defer func(gs *grpc.Server) { + if err == nil { + return + } - if sctx.lg != nil { - sctx.lg.Warn("stopping secure grpc server due to error", zap.Error(err)) - } else { - plog.Warningf("stopping secure grpc server due to error: %s", err) + if sctx.lg != nil { + sctx.lg.Warn("stopping secure grpc server due to error", zap.Error(err)) + } else { + plog.Warningf("stopping secure grpc server due to error: %s", err) + } + + gs.Stop() + + if sctx.lg != nil { + sctx.lg.Warn("stopped secure grpc server due to error", zap.Error(err)) + } else { + plog.Warningf("stopped secure grpc server due to error: %s", err) + } + }(gs) + } + if httpEnabled { + if grpcEnabled { + handler = grpcHandlerFunc(gs, handler) } + httpmux := sctx.createMux(gwmux, handler) - gs.Stop() - - if sctx.lg != nil { - sctx.lg.Warn("stopped secure grpc server due to error", zap.Error(err)) - } else { - plog.Warningf("stopped secure grpc server due to error: %s", err) + srv = &http.Server{ + Handler: createAccessController(sctx.lg, s, httpmux), + TLSConfig: tlscfg, + ErrorLog: logger, // do not log user error + } + if err := configureHttpServer(srv, s.Cfg); err != nil { + sctx.lg.Error("Configure https server failed", zap.Error(err)) + return err } - }(gs) - - handler = grpcHandlerFunc(gs, handler) - var tlsl net.Listener - tlsl, err = transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo) - if err != nil { - return err - } - // TODO: add debug flag; enable logging when debug flag is set - httpmux := sctx.createMux(gwmux, handler) - - srv := &http.Server{ - Handler: createAccessController(sctx.lg, s, httpmux), - TLSConfig: tlscfg, - ErrorLog: logger, // do not log user error - } - if err = configureHttpServer(srv, s.Cfg); err != nil { - sctx.lg.Error("Configure https server failed", zap.Error(err)) - return err } - go func(srvhttp *http.Server, tlsLis net.Listener) { - errHandler(srvhttp.Serve(tlsLis)) - }(srv, tlsl) + if onlyGRPC { + server = func() error { return gs.Serve(sctx.l) } + } else { + server = m.Serve + + tlsl, err := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo) + if err != nil { + return err + } + go func(srvhttp *http.Server, tlsl net.Listener) { + errHandler(srvhttp.Serve(tlsl)) + }(srv, tlsl) + } sctx.serversC <- &servers{secure: true, grpc: gs, http: srv} if sctx.lg != nil { sctx.lg.Info( "serving client traffic securely", + zap.String("traffic", traffic), zap.String("address", sctx.l.Addr().String()), ) } else { @@ -241,8 +284,7 @@ func (sctx *serveCtx) serve( } } - close(sctx.serversC) - return m.Serve() + return server() } func configureHttpServer(srv *http.Server, cfg etcdserver.ServerConfig) error { diff --git a/etcdmain/config.go b/etcdmain/config.go index c383b513b..119545f24 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -141,7 +141,11 @@ func newConfig() *config { ) fs.Var( flags.NewUniqueURLsWithExceptions(embed.DefaultListenClientURLs, ""), "listen-client-urls", - "List of URLs to listen on for client traffic.", + "List of URLs to listen on for client grpc traffic and http as long as --listen-client-http-urls is not specified.", + ) + fs.Var( + flags.NewUniqueURLsWithExceptions("", ""), "listen-client-http-urls", + "List of URLs to listen on for http only client traffic. Enabling this flag removes http services from --listen-client-urls.", ) fs.Var( flags.NewUniqueURLsWithExceptions("", ""), @@ -335,6 +339,7 @@ func (cfg *config) configFromCmdLine() error { cfg.ec.ListenPeerUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-peer-urls") cfg.ec.AdvertisePeerUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "initial-advertise-peer-urls") cfg.ec.ListenClientUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-client-urls") + cfg.ec.ListenClientHttpUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-client-http-urls") cfg.ec.AdvertiseClientUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "advertise-client-urls") cfg.ec.ListenMetricsUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-metrics-urls") diff --git a/etcdmain/config_test.go b/etcdmain/config_test.go index 6074bc60d..c83de1f14 100644 --- a/etcdmain/config_test.go +++ b/etcdmain/config_test.go @@ -36,6 +36,7 @@ func TestConfigParsingMemberFlags(t *testing.T) { "-snapshot-count=10", "-listen-peer-urls=http://localhost:8000,https://localhost:8001", "-listen-client-urls=http://localhost:7000,https://localhost:7001", + "-listen-client-http-urls=http://localhost:7002,https://localhost:7003", // it should be set if -listen-client-urls is set "-advertise-client-urls=http://localhost:7000,https://localhost:7001", } @@ -51,14 +52,15 @@ func TestConfigParsingMemberFlags(t *testing.T) { func TestConfigFileMemberFields(t *testing.T) { yc := struct { - Dir string `json:"data-dir"` - MaxSnapFiles uint `json:"max-snapshots"` - MaxWalFiles uint `json:"max-wals"` - Name string `json:"name"` - SnapshotCount uint64 `json:"snapshot-count"` - ListenPeerUrls string `json:"listen-peer-urls"` - ListenClientUrls string `json:"listen-client-urls"` - AdvertiseClientUrls string `json:"advertise-client-urls"` + Dir string `json:"data-dir"` + MaxSnapFiles uint `json:"max-snapshots"` + MaxWalFiles uint `json:"max-wals"` + Name string `json:"name"` + SnapshotCount uint64 `json:"snapshot-count"` + ListenPeerUrls string `json:"listen-peer-urls"` + ListenClientUrls string `json:"listen-client-urls"` + ListenClientHttpUrls string `json:"listen-client-http-urls"` + AdvertiseClientUrls string `json:"advertise-client-urls"` }{ "testdir", 10, @@ -67,6 +69,7 @@ func TestConfigFileMemberFields(t *testing.T) { 10, "http://localhost:8000,https://localhost:8001", "http://localhost:7000,https://localhost:7001", + "http://localhost:7002,https://localhost:7003", "http://localhost:7000,https://localhost:7001", } @@ -513,13 +516,14 @@ func mustCreateCfgFile(t *testing.T, b []byte) *os.File { func validateMemberFlags(t *testing.T, cfg *config) { wcfg := &embed.Config{ - Dir: "testdir", - ListenPeerUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}}, - ListenClientUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}}, - MaxSnapFiles: 10, - MaxWalFiles: 10, - Name: "testname", - SnapshotCount: 10, + Dir: "testdir", + ListenPeerUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}}, + ListenClientUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}}, + ListenClientHttpUrls: []url.URL{{Scheme: "http", Host: "localhost:7002"}, {Scheme: "https", Host: "localhost:7003"}}, + MaxSnapFiles: 10, + MaxWalFiles: 10, + Name: "testname", + SnapshotCount: 10, } if cfg.ec.Dir != wcfg.Dir { @@ -543,6 +547,9 @@ func validateMemberFlags(t *testing.T, cfg *config) { if !reflect.DeepEqual(cfg.ec.ListenClientUrls, wcfg.ListenClientUrls) { t.Errorf("listen-client-urls = %v, want %v", cfg.ec.ListenClientUrls, wcfg.ListenClientUrls) } + if !reflect.DeepEqual(cfg.ec.ListenClientHttpUrls, wcfg.ListenClientHttpUrls) { + t.Errorf("listen-client-http-urls = %v, want %v", cfg.ec.ListenClientHttpUrls, wcfg.ListenClientHttpUrls) + } } func validateClusteringFlags(t *testing.T, cfg *config) { diff --git a/etcdmain/help.go b/etcdmain/help.go index d3bab31e0..c50ddfa16 100644 --- a/etcdmain/help.go +++ b/etcdmain/help.go @@ -62,7 +62,9 @@ Member: --listen-peer-urls 'http://localhost:2380' List of URLs to listen on for peer traffic. --listen-client-urls 'http://localhost:2379' - List of URLs to listen on for client traffic. + List of URLs to listen on for client grpc traffic and http as long as --listen-client-http-urls is not specified. + --listen-client-http-urls '' + List of URLs to listen on for http only client traffic. Enabling this flag removes http services from --listen-client-urls. --max-snapshots '` + strconv.Itoa(embed.DefaultMaxSnapshots) + `' Maximum number of snapshot files to retain (0 is unlimited). --max-wals '` + strconv.Itoa(embed.DefaultMaxWALs) + `' diff --git a/tests/e2e/cluster_test.go b/tests/e2e/cluster_test.go index 871744405..fa97442d2 100644 --- a/tests/e2e/cluster_test.go +++ b/tests/e2e/cluster_test.go @@ -112,6 +112,7 @@ type etcdProcessClusterConfig struct { clientTLS clientConnType clientCertAuthEnabled bool + clientHttpSeparate bool isPeerTLS bool isPeerAutoTLS bool isClientAutoTLS bool @@ -192,6 +193,7 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro var curls []string var curl, curltls string port := cfg.basePort + 5*i + clientHttpPort := port + 4 curlHost := fmt.Sprintf("localhost:%d", port) switch cfg.clientTLS { @@ -226,6 +228,10 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro "--data-dir", dataDirPath, "--snapshot-count", fmt.Sprintf("%d", cfg.snapshotCount), } + if cfg.clientHttpSeparate { + clientHttpUrl := url.URL{Scheme: cfg.clientScheme(), Host: fmt.Sprintf("localhost:%d", clientHttpPort)} + args = append(args, "--listen-client-http-urls", clientHttpUrl.String()) + } args = addV2Args(args) if cfg.forceNewCluster { args = append(args, "--force-new-cluster") diff --git a/tests/e2e/etcdctl.go b/tests/e2e/etcdctl.go index f5330fa3f..2b30fb2fb 100644 --- a/tests/e2e/etcdctl.go +++ b/tests/e2e/etcdctl.go @@ -118,6 +118,7 @@ func (ctl *Etcdctl) cmdArgs(args ...string) []string { func (ctl *Etcdctl) flags() map[string]string { fmap := make(map[string]string) if ctl.v2 { + fmap["no-sync"] = "true" if ctl.connType == clientTLS { fmap["ca-file"] = testTLSInfo.TrustedCAFile fmap["cert-file"] = testTLSInfo.CertFile diff --git a/tests/e2e/utils.go b/tests/e2e/utils.go index 543ed8bba..79baaf7a2 100644 --- a/tests/e2e/utils.go +++ b/tests/e2e/utils.go @@ -109,15 +109,17 @@ func tlsInfo(t testing.TB, connType clientConnType, isAutoTLS bool) (*transport. } } -func fillEtcdWithData(ctx context.Context, c *clientv3.Client, keyCount int, valueSize uint) error { +func fillEtcdWithData(ctx context.Context, c *clientv3.Client, dbSize int) error { g := errgroup.Group{} concurrency := 10 + keyCount := 100 keysPerRoutine := keyCount / concurrency + valueSize := dbSize / keyCount for i := 0; i < concurrency; i++ { i := i g.Go(func() error { for j := 0; j < keysPerRoutine; j++ { - _, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(valueSize)) + _, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(uint(valueSize))) if err != nil { return err } diff --git a/tests/e2e/watch_delay_test.go b/tests/e2e/watch_delay_test.go index 1468128b4..084eb4d77 100644 --- a/tests/e2e/watch_delay_test.go +++ b/tests/e2e/watch_delay_test.go @@ -35,29 +35,48 @@ import ( const ( watchResponsePeriod = 100 * time.Millisecond watchTestDuration = 5 * time.Second - // TODO: Reduce maxWatchDelay when https://github.com/etcd-io/etcd/issues/15402 is addressed. - maxWatchDelay = 2 * time.Second - // Configure enough read load to cause starvation from https://github.com/etcd-io/etcd/issues/15402. - // Tweaked to pass on GitHub runner. For local runs please increase parameters. - // TODO: Increase when https://github.com/etcd-io/etcd/issues/15402 is fully addressed. - numberOfPreexistingKeys = 100 - sizeOfPreexistingValues = 5000 - readLoadConcurrency = 10 + readLoadConcurrency = 10 ) type testCase struct { - name string - config etcdProcessClusterConfig + name string + config etcdProcessClusterConfig + maxWatchDelay time.Duration + dbSizeBytes int } +const ( + Kilo = 1000 + Mega = 1000 * Kilo +) + +// 10 MB is not a bottleneck of grpc server, but filling up etcd with data. +// Keeping it lower so tests don't take too long. +// If we implement reuse of db we could increase the dbSize. var tcs = []testCase{ { - name: "NoTLS", - config: etcdProcessClusterConfig{clusterSize: 1}, + name: "NoTLS", + config: etcdProcessClusterConfig{clusterSize: 1}, + maxWatchDelay: 100 * time.Millisecond, + dbSizeBytes: 5 * Mega, }, { - name: "ClientTLS", - config: etcdProcessClusterConfig{clusterSize: 1, isClientAutoTLS: true, clientTLS: clientTLS}, + name: "TLS", + config: etcdProcessClusterConfig{clusterSize: 1, isClientAutoTLS: true, clientTLS: clientTLS}, + maxWatchDelay: 2 * time.Second, + dbSizeBytes: 500 * Kilo, + }, + { + name: "SeparateHttpNoTLS", + config: etcdProcessClusterConfig{clusterSize: 1, clientHttpSeparate: true}, + maxWatchDelay: 100 * time.Millisecond, + dbSizeBytes: 5 * Mega, + }, + { + name: "SeparateHttpTLS", + config: etcdProcessClusterConfig{clusterSize: 1, isClientAutoTLS: true, clientTLS: clientTLS, clientHttpSeparate: true}, + maxWatchDelay: 100 * time.Millisecond, + dbSizeBytes: 5 * Mega, }, } @@ -71,13 +90,13 @@ func TestWatchDelayForPeriodicProgressNotification(t *testing.T) { require.NoError(t, err) defer clus.Close() c := newClient(t, clus.EndpointsV3(), tc.config.clientTLS, tc.config.isClientAutoTLS) - require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues)) + require.NoError(t, fillEtcdWithData(context.Background(), c, tc.dbSizeBytes)) ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration) defer cancel() g := errgroup.Group{} continuouslyExecuteGetAll(ctx, t, &g, c) - validateWatchDelay(t, c.Watch(ctx, "fake-key", clientv3.WithProgressNotify())) + validateWatchDelay(t, c.Watch(ctx, "fake-key", clientv3.WithProgressNotify()), tc.maxWatchDelay) require.NoError(t, g.Wait()) }) } @@ -91,7 +110,7 @@ func TestWatchDelayForManualProgressNotification(t *testing.T) { require.NoError(t, err) defer clus.Close() c := newClient(t, clus.EndpointsV3(), tc.config.clientTLS, tc.config.isClientAutoTLS) - require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues)) + require.NoError(t, fillEtcdWithData(context.Background(), c, tc.dbSizeBytes)) ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration) defer cancel() @@ -109,7 +128,7 @@ func TestWatchDelayForManualProgressNotification(t *testing.T) { time.Sleep(watchResponsePeriod) } }) - validateWatchDelay(t, c.Watch(ctx, "fake-key")) + validateWatchDelay(t, c.Watch(ctx, "fake-key"), tc.maxWatchDelay) require.NoError(t, g.Wait()) }) } @@ -123,7 +142,7 @@ func TestWatchDelayForEvent(t *testing.T) { require.NoError(t, err) defer clus.Close() c := newClient(t, clus.EndpointsV3(), tc.config.clientTLS, tc.config.isClientAutoTLS) - require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues)) + require.NoError(t, fillEtcdWithData(context.Background(), c, tc.dbSizeBytes)) ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration) defer cancel() @@ -142,13 +161,13 @@ func TestWatchDelayForEvent(t *testing.T) { } }) continuouslyExecuteGetAll(ctx, t, &g, c) - validateWatchDelay(t, c.Watch(ctx, "key")) + validateWatchDelay(t, c.Watch(ctx, "key"), tc.maxWatchDelay) require.NoError(t, g.Wait()) }) } } -func validateWatchDelay(t *testing.T, watch clientv3.WatchChan) { +func validateWatchDelay(t *testing.T, watch clientv3.WatchChan, maxWatchDelay time.Duration) { start := time.Now() var maxDelay time.Duration for range watch { @@ -179,15 +198,19 @@ func continuouslyExecuteGetAll(ctx context.Context, t *testing.T, g *errgroup.Gr for i := 0; i < readLoadConcurrency; i++ { g.Go(func() error { for { - _, err := c.Get(ctx, "", clientv3.WithPrefix()) + resp, err := c.Get(ctx, "", clientv3.WithPrefix()) if err != nil { if strings.Contains(err.Error(), "context deadline exceeded") { return nil } return err } + respSize := 0 + for _, kv := range resp.Kvs { + respSize += kv.Size() + } mux.Lock() - size += numberOfPreexistingKeys * sizeOfPreexistingValues + size += respSize mux.Unlock() } }) From 75675cd4640cc4f9c073a7e9631ffaf66c3132f7 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 27 Mar 2023 13:35:29 +0200 Subject: [PATCH 6/7] tests: Test separate http port connection multiplexing Signed-off-by: Marek Siarkowicz --- tests/e2e/cluster_proxy_test.go | 6 ++-- tests/e2e/cluster_test.go | 54 ++++++++++++++++++++------------- tests/e2e/cmux_test.go | 45 +++++++++++++++++---------- tests/e2e/etcd_process.go | 18 ++++++++--- 4 files changed, 80 insertions(+), 43 deletions(-) diff --git a/tests/e2e/cluster_proxy_test.go b/tests/e2e/cluster_proxy_test.go index e44312365..efb05f71e 100644 --- a/tests/e2e/cluster_proxy_test.go +++ b/tests/e2e/cluster_proxy_test.go @@ -54,8 +54,10 @@ func newProxyEtcdProcess(cfg *etcdServerProcessConfig) (*proxyEtcdProcess, error func (p *proxyEtcdProcess) Config() *etcdServerProcessConfig { return p.etcdProc.Config() } -func (p *proxyEtcdProcess) EndpointsV2() []string { return p.proxyV2.endpoints() } -func (p *proxyEtcdProcess) EndpointsV3() []string { return p.proxyV3.endpoints() } +func (p *proxyEtcdProcess) EndpointsV2() []string { return p.EndpointsHTTP() } +func (p *proxyEtcdProcess) EndpointsV3() []string { return p.EndpointsGRPC() } +func (p *proxyEtcdProcess) EndpointsHTTP() []string { return p.proxyV2.endpoints() } +func (p *proxyEtcdProcess) EndpointsGRPC() []string { return p.proxyV3.endpoints() } func (p *proxyEtcdProcess) EndpointsMetrics() []string { panic("not implemented; proxy doesn't provide health information") } diff --git a/tests/e2e/cluster_test.go b/tests/e2e/cluster_test.go index fa97442d2..dcbc540b3 100644 --- a/tests/e2e/cluster_test.go +++ b/tests/e2e/cluster_test.go @@ -191,19 +191,17 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro initialCluster := make([]string, cfg.clusterSize) for i := 0; i < cfg.clusterSize; i++ { var curls []string - var curl, curltls string + var curl string port := cfg.basePort + 5*i + clientPort := port clientHttpPort := port + 4 - curlHost := fmt.Sprintf("localhost:%d", port) - switch cfg.clientTLS { - case clientNonTLS, clientTLS: - curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String() + if cfg.clientTLS == clientTLSAndNonTLS { + curl = clientURL(clientPort, clientNonTLS) + curls = []string{curl, clientURL(clientPort, clientTLS)} + } else { + curl = clientURL(clientPort, cfg.clientTLS) curls = []string{curl} - case clientTLSAndNonTLS: - curl = (&url.URL{Scheme: "http", Host: curlHost}).String() - curltls = (&url.URL{Scheme: "https", Host: curlHost}).String() - curls = []string{curl, curltls} } purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)} @@ -228,9 +226,10 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro "--data-dir", dataDirPath, "--snapshot-count", fmt.Sprintf("%d", cfg.snapshotCount), } + var clientHttpUrl string if cfg.clientHttpSeparate { - clientHttpUrl := url.URL{Scheme: cfg.clientScheme(), Host: fmt.Sprintf("localhost:%d", clientHttpPort)} - args = append(args, "--listen-client-http-urls", clientHttpUrl.String()) + clientHttpUrl = clientURL(clientHttpPort, cfg.clientTLS) + args = append(args, "--listen-client-http-urls", clientHttpUrl) } args = addV2Args(args) if cfg.forceNewCluster { @@ -274,16 +273,17 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro } etcdCfgs[i] = &etcdServerProcessConfig{ - execPath: cfg.execPath, - args: args, - tlsArgs: cfg.tlsArgs(), - dataDirPath: dataDirPath, - keepDataDir: cfg.keepDataDir, - name: name, - purl: purl, - acurl: curl, - murl: murl, - initialToken: cfg.initialToken, + execPath: cfg.execPath, + args: args, + tlsArgs: cfg.tlsArgs(), + dataDirPath: dataDirPath, + keepDataDir: cfg.keepDataDir, + name: name, + purl: purl, + acurl: curl, + murl: murl, + initialToken: cfg.initialToken, + clientHttpUrl: clientHttpUrl, } } @@ -296,6 +296,18 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro return etcdCfgs } +func clientURL(port int, connType clientConnType) string { + curlHost := fmt.Sprintf("localhost:%d", port) + switch connType { + case clientNonTLS: + return (&url.URL{Scheme: "http", Host: curlHost}).String() + case clientTLS: + return (&url.URL{Scheme: "https", Host: curlHost}).String() + default: + panic(fmt.Sprintf("Unsupported connection type %v", connType)) + } +} + func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) { if cfg.clientTLS != clientNonTLS { if cfg.isClientAutoTLS { diff --git a/tests/e2e/cmux_test.go b/tests/e2e/cmux_test.go index 471c36d06..df9ee3ac1 100644 --- a/tests/e2e/cmux_test.go +++ b/tests/e2e/cmux_test.go @@ -34,8 +34,9 @@ import ( func TestConnectionMultiplexing(t *testing.T) { defer testutil.AfterTest(t) for _, tc := range []struct { - name string - serverTLS clientConnType + name string + serverTLS clientConnType + separateHttpPort bool }{ { name: "ServerTLS", @@ -49,10 +50,20 @@ func TestConnectionMultiplexing(t *testing.T) { name: "ServerTLSAndNonTLS", serverTLS: clientTLSAndNonTLS, }, + { + name: "SeparateHTTP/ServerTLS", + serverTLS: clientTLS, + separateHttpPort: true, + }, + { + name: "SeparateHTTP/ServerNonTLS", + serverTLS: clientNonTLS, + separateHttpPort: true, + }, } { t.Run(tc.name, func(t *testing.T) { ctx := context.Background() - cfg := etcdProcessClusterConfig{clusterSize: 1, clientTLS: tc.serverTLS, enableV2: true} + cfg := etcdProcessClusterConfig{clusterSize: 1, clientTLS: tc.serverTLS, enableV2: true, clientHttpSeparate: tc.separateHttpPort} clus, err := newEtcdProcessCluster(&cfg) require.NoError(t, err) defer clus.Close() @@ -73,43 +84,45 @@ func TestConnectionMultiplexing(t *testing.T) { name = "ClientTLS" } t.Run(name, func(t *testing.T) { - testConnectionMultiplexing(ctx, t, clus.EndpointsV3()[0], connType) + testConnectionMultiplexing(ctx, t, clus.procs[0], connType) }) } }) } - } -func testConnectionMultiplexing(ctx context.Context, t *testing.T, endpoint string, connType clientConnType) { +func testConnectionMultiplexing(ctx context.Context, t *testing.T, member etcdProcess, connType clientConnType) { + httpEndpoint := member.EndpointsHTTP()[0] + grpcEndpoint := member.EndpointsGRPC()[0] switch connType { case clientTLS: - endpoint = toTLS(endpoint) + httpEndpoint = toTLS(httpEndpoint) + grpcEndpoint = toTLS(grpcEndpoint) case clientNonTLS: default: panic(fmt.Sprintf("Unsupported conn type %v", connType)) } t.Run("etcdctl", func(t *testing.T) { t.Run("v2", func(t *testing.T) { - etcdctl := NewEtcdctl([]string{endpoint}, connType, false, true) + etcdctl := NewEtcdctl([]string{httpEndpoint}, connType, false, true) err := etcdctl.Set("a", "1") assert.NoError(t, err) }) t.Run("v3", func(t *testing.T) { - etcdctl := NewEtcdctl([]string{endpoint}, connType, false, false) + etcdctl := NewEtcdctl([]string{grpcEndpoint}, connType, false, false) err := etcdctl.Put("a", "1") assert.NoError(t, err) }) }) t.Run("clientv2", func(t *testing.T) { - c, err := newClientV2(t, []string{endpoint}, connType, false) + c, err := newClientV2(t, []string{httpEndpoint}, connType, false) require.NoError(t, err) kv := clientv2.NewKeysAPI(c) _, err = kv.Set(ctx, "a", "1", nil) assert.NoError(t, err) }) t.Run("clientv3", func(t *testing.T) { - c := newClient(t, []string{endpoint}, connType, false) + c := newClient(t, []string{grpcEndpoint}, connType, false) _, err := c.Get(ctx, "a") assert.NoError(t, err) }) @@ -120,11 +133,11 @@ func testConnectionMultiplexing(ctx context.Context, t *testing.T, endpoint stri tname = "default" } t.Run(tname, func(t *testing.T) { - assert.NoError(t, fetchGrpcGateway(endpoint, httpVersion, connType)) - assert.NoError(t, fetchMetrics(endpoint, httpVersion, connType)) - assert.NoError(t, fetchVersion(endpoint, httpVersion, connType)) - assert.NoError(t, fetchHealth(endpoint, httpVersion, connType)) - assert.NoError(t, fetchDebugVars(endpoint, httpVersion, connType)) + assert.NoError(t, fetchGrpcGateway(httpEndpoint, httpVersion, connType)) + assert.NoError(t, fetchMetrics(httpEndpoint, httpVersion, connType)) + assert.NoError(t, fetchVersion(httpEndpoint, httpVersion, connType)) + assert.NoError(t, fetchHealth(httpEndpoint, httpVersion, connType)) + assert.NoError(t, fetchDebugVars(httpEndpoint, httpVersion, connType)) }) } }) diff --git a/tests/e2e/etcd_process.go b/tests/e2e/etcd_process.go index 3ff7022cf..bdbcc2092 100644 --- a/tests/e2e/etcd_process.go +++ b/tests/e2e/etcd_process.go @@ -33,6 +33,8 @@ var ( type etcdProcess interface { EndpointsV2() []string EndpointsV3() []string + EndpointsGRPC() []string + EndpointsHTTP() []string EndpointsMetrics() []string Start() error @@ -61,8 +63,9 @@ type etcdServerProcessConfig struct { purl url.URL - acurl string - murl string + acurl string + murl string + clientHttpUrl string initialToken string initialCluster string @@ -80,8 +83,15 @@ func newEtcdServerProcess(cfg *etcdServerProcessConfig) (*etcdServerProcess, err return &etcdServerProcess{cfg: cfg, donec: make(chan struct{})}, nil } -func (ep *etcdServerProcess) EndpointsV2() []string { return []string{ep.cfg.acurl} } -func (ep *etcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2() } +func (ep *etcdServerProcess) EndpointsV2() []string { return ep.EndpointsHTTP() } +func (ep *etcdServerProcess) EndpointsV3() []string { return ep.EndpointsGRPC() } +func (ep *etcdServerProcess) EndpointsGRPC() []string { return []string{ep.cfg.acurl} } +func (ep *etcdServerProcess) EndpointsHTTP() []string { + if ep.cfg.clientHttpUrl == "" { + return []string{ep.cfg.acurl} + } + return []string{ep.cfg.clientHttpUrl} +} func (ep *etcdServerProcess) EndpointsMetrics() []string { return []string{ep.cfg.murl} } func (ep *etcdServerProcess) Start() error { From 47d4ff2e3644d2fd68b5626460db19b617d8478a Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 30 Mar 2023 13:37:19 +0200 Subject: [PATCH 7/7] server: Fix defer function closure escape Signed-off-by: Marek Siarkowicz --- embed/etcd.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/embed/etcd.go b/embed/etcd.go index 528fddbcd..a44ba5e28 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -706,7 +706,7 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro } } - defer func(addr string) { + defer func(sctx *serveCtx) { if err == nil || sctx.l == nil { return } @@ -714,13 +714,13 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro if cfg.logger != nil { cfg.logger.Warn( "closing peer listener", - zap.String("address", addr), + zap.String("address", sctx.addr), zap.Error(err), ) } else { - plog.Info("stopping listening for client requests on ", addr) + plog.Info("stopping listening for client requests on ", sctx.addr) } - }(sctx.addr) + }(sctx) for k := range cfg.UserHandlers { sctx.userHandlers[k] = cfg.UserHandlers[k] }