From 1e6163ba2718b052da8209ccebbe355885e989e7 Mon Sep 17 00:00:00 2001 From: ahrtr Date: Thu, 12 May 2022 13:23:40 +0800 Subject: [PATCH] remove v2 http proxy in 3.6 --- server/etcdmain/config.go | 89 +-------- server/etcdmain/config_test.go | 133 ------------- server/etcdmain/etcd.go | 226 +-------------------- server/etcdmain/help.go | 20 +- server/proxy/httpproxy/director.go | 197 ------------------- server/proxy/httpproxy/director_test.go | 99 ---------- server/proxy/httpproxy/doc.go | 18 -- server/proxy/httpproxy/metrics.go | 90 --------- server/proxy/httpproxy/proxy.go | 121 ------------ server/proxy/httpproxy/proxy_test.go | 103 ---------- server/proxy/httpproxy/reverse.go | 226 --------------------- server/proxy/httpproxy/reverse_test.go | 249 ------------------------ tests/framework/e2e/cluster_proxy.go | 47 +---- 13 files changed, 19 insertions(+), 1599 deletions(-) delete mode 100644 server/proxy/httpproxy/director.go delete mode 100644 server/proxy/httpproxy/director_test.go delete mode 100644 server/proxy/httpproxy/doc.go delete mode 100644 server/proxy/httpproxy/metrics.go delete mode 100644 server/proxy/httpproxy/proxy.go delete mode 100644 server/proxy/httpproxy/proxy_test.go delete mode 100644 server/proxy/httpproxy/reverse.go delete mode 100644 server/proxy/httpproxy/reverse_test.go diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 2206ce001..8091589df 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -19,7 +19,6 @@ package etcdmain import ( "flag" "fmt" - "log" "os" "runtime" @@ -32,14 +31,9 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" "go.uber.org/zap" - "sigs.k8s.io/yaml" ) var ( - proxyFlagOff = "off" - proxyFlagReadonly = "readonly" - proxyFlagOn = "on" - fallbackFlagExit = "exit" fallbackFlagProxy = "proxy" @@ -63,22 +57,9 @@ var ( } ) -type configProxy struct { - ProxyFailureWaitMs uint `json:"proxy-failure-wait"` - ProxyRefreshIntervalMs uint `json:"proxy-refresh-interval"` - ProxyDialTimeoutMs uint `json:"proxy-dial-timeout"` - ProxyWriteTimeoutMs uint `json:"proxy-write-timeout"` - ProxyReadTimeoutMs uint `json:"proxy-read-timeout"` - Fallback string - Proxy string - ProxyJSON string `json:"proxy"` - FallbackJSON string `json:"discovery-fallback"` -} - // config holds the config for a command line invocation of etcd type config struct { ec embed.Config - cp configProxy cf configFlags configFile string printVersion bool @@ -90,20 +71,12 @@ type configFlags struct { flagSet *flag.FlagSet clusterState *flags.SelectiveStringValue fallback *flags.SelectiveStringValue - proxy *flags.SelectiveStringValue v2deprecation *flags.SelectiveStringsValue } func newConfig() *config { cfg := &config{ - ec: *embed.NewConfig(), - cp: configProxy{ - Proxy: proxyFlagOff, - ProxyFailureWaitMs: 5000, - ProxyRefreshIntervalMs: 30000, - ProxyDialTimeoutMs: 1000, - ProxyWriteTimeoutMs: 5000, - }, + ec: *embed.NewConfig(), ignored: ignored, } cfg.cf = configFlags{ @@ -113,13 +86,8 @@ func newConfig() *config { embed.ClusterStateFlagExisting, ), fallback: flags.NewSelectiveStringValue( - fallbackFlagProxy, fallbackFlagExit, - ), - proxy: flags.NewSelectiveStringValue( - proxyFlagOff, - proxyFlagReadonly, - proxyFlagOn, + fallbackFlagProxy, ), v2deprecation: flags.NewSelectiveStringsValue( string(cconfig.V2_DEPR_1_WRITE_ONLY), @@ -218,15 +186,7 @@ func newConfig() *config { fs.BoolVar(&cfg.ec.PreVote, "pre-vote", cfg.ec.PreVote, "Enable to run an additional Raft election phase.") - fs.Var(cfg.cf.v2deprecation, "v2-deprecation", fmt.Sprintf("v2store deprecation stage: %q. ", cfg.cf.proxy.Valids())) - - // proxy - fs.Var(cfg.cf.proxy, "proxy", fmt.Sprintf("Valid values include %q", cfg.cf.proxy.Valids())) - fs.UintVar(&cfg.cp.ProxyFailureWaitMs, "proxy-failure-wait", cfg.cp.ProxyFailureWaitMs, "Time (in milliseconds) an endpoint will be held in a failed state.") - fs.UintVar(&cfg.cp.ProxyRefreshIntervalMs, "proxy-refresh-interval", cfg.cp.ProxyRefreshIntervalMs, "Time (in milliseconds) of the endpoints refresh interval.") - fs.UintVar(&cfg.cp.ProxyDialTimeoutMs, "proxy-dial-timeout", cfg.cp.ProxyDialTimeoutMs, "Time (in milliseconds) for a dial to timeout.") - fs.UintVar(&cfg.cp.ProxyWriteTimeoutMs, "proxy-write-timeout", cfg.cp.ProxyWriteTimeoutMs, "Time (in milliseconds) for a write to timeout.") - fs.UintVar(&cfg.cp.ProxyReadTimeoutMs, "proxy-read-timeout", cfg.cp.ProxyReadTimeoutMs, "Time (in milliseconds) for a read to timeout.") + fs.Var(cfg.cf.v2deprecation, "v2-deprecation", fmt.Sprintf("v2store deprecation stage: %q. ", cfg.cf.v2deprecation.Valids())) // security fs.StringVar(&cfg.ec.ClientTLSInfo.CertFile, "cert-file", "", "Path to the client server TLS cert file.") @@ -423,14 +383,12 @@ func (cfg *config) configFromCmdLine() error { cfg.ec.LogOutputs = flags.UniqueStringsFromFlag(cfg.cf.flagSet, "log-outputs") cfg.ec.ClusterState = cfg.cf.clusterState.String() - cfg.cp.Fallback = cfg.cf.fallback.String() - cfg.cp.Proxy = cfg.cf.proxy.String() cfg.ec.V2Deprecation = cconfig.V2DeprecationEnum(cfg.cf.v2deprecation.String()) // disable default advertise-client-urls if lcurls is set missingAC := flags.IsSet(cfg.cf.flagSet, "listen-client-urls") && !flags.IsSet(cfg.cf.flagSet, "advertise-client-urls") - if !cfg.mayBeProxy() && missingAC { + if missingAC { cfg.ec.ACUrls = nil } @@ -449,45 +407,12 @@ func (cfg *config) configFromFile(path string) error { } cfg.ec = *eCfg - // load extra config information - b, rerr := os.ReadFile(path) - if rerr != nil { - return rerr - } - if yerr := yaml.Unmarshal(b, &cfg.cp); yerr != nil { - return yerr - } - - if cfg.cp.FallbackJSON != "" { - if err := cfg.cf.fallback.Set(cfg.cp.FallbackJSON); err != nil { - log.Fatalf("unexpected error setting up discovery-fallback flag: %v", err) - } - cfg.cp.Fallback = cfg.cf.fallback.String() - } - - if cfg.cp.ProxyJSON != "" { - if err := cfg.cf.proxy.Set(cfg.cp.ProxyJSON); err != nil { - log.Fatalf("unexpected error setting up proxyFlag: %v", err) - } - cfg.cp.Proxy = cfg.cf.proxy.String() - } return nil } -func (cfg *config) mayBeProxy() bool { - mayFallbackToProxy := cfg.ec.Durl != "" && cfg.cp.Fallback == fallbackFlagProxy - return cfg.cp.Proxy != proxyFlagOff || mayFallbackToProxy -} - func (cfg *config) validate() error { - err := cfg.ec.Validate() - // TODO(yichengq): check this for joining through discovery service case - if err == embed.ErrUnsetAdvertiseClientURLsFlag && cfg.mayBeProxy() { - return nil + if cfg.cf.fallback.String() == fallbackFlagProxy { + return fmt.Errorf("v2 proxy is deprecated, and --discovery-fallback can't be configured as %q", fallbackFlagProxy) } - return err + return cfg.ec.Validate() } - -func (cfg config) isProxy() bool { return cfg.cf.proxy.String() != proxyFlagOff } -func (cfg config) isReadonlyProxy() bool { return cfg.cf.proxy.String() == proxyFlagReadonly } -func (cfg config) shouldFallbackToProxy() bool { return cfg.cf.fallback.String() == fallbackFlagProxy } diff --git a/server/etcdmain/config_test.go b/server/etcdmain/config_test.go index 61fac94ad..f06fc4cb6 100644 --- a/server/etcdmain/config_test.go +++ b/server/etcdmain/config_test.go @@ -94,7 +94,6 @@ func TestConfigParsingClusteringFlags(t *testing.T) { "-initial-cluster-token=etcdtest", "-initial-advertise-peer-urls=http://localhost:8000,https://localhost:8001", "-advertise-client-urls=http://localhost:7000,https://localhost:7001", - "-discovery-fallback=exit", } cfg := newConfig() @@ -112,14 +111,12 @@ func TestConfigFileClusteringFields(t *testing.T) { InitialClusterToken string `json:"initial-cluster-token"` Apurls string `json:"initial-advertise-peer-urls"` Acurls string `json:"advertise-client-urls"` - Fallback string `json:"discovery-fallback"` }{ "0=http://localhost:8000", "existing", "etcdtest", "http://localhost:8000,https://localhost:8001", "http://localhost:7000,https://localhost:7001", - "exit", } b, err := yaml.Marshal(&yc) @@ -193,44 +190,6 @@ func TestConfigFileClusteringFlags(t *testing.T) { } } -func TestConfigParsingOtherFlags(t *testing.T) { - args := []string{"-proxy=readonly"} - - cfg := newConfig() - err := cfg.parse(args) - if err != nil { - t.Fatal(err) - } - - validateOtherFlags(t, cfg) -} - -func TestConfigFileOtherFields(t *testing.T) { - yc := struct { - ProxyCfgFile string `json:"proxy"` - }{ - "readonly", - } - - b, err := yaml.Marshal(&yc) - if err != nil { - t.Fatal(err) - } - - tmpfile := mustCreateCfgFile(t, b) - defer os.Remove(tmpfile.Name()) - - args := []string{fmt.Sprintf("--config-file=%s", tmpfile.Name())} - - cfg := newConfig() - err = cfg.parse(args) - if err != nil { - t.Fatal(err) - } - - validateOtherFlags(t, cfg) -} - func TestConfigParsingConflictClusteringFlags(t *testing.T) { conflictArgs := [][]string{ { @@ -336,27 +295,6 @@ func TestConfigParsingMissedAdvertiseClientURLsFlag(t *testing.T) { }, embed.ErrUnsetAdvertiseClientURLsFlag, }, - { - []string{ - "-discovery=http://example.com/abc", - "-listen-client-urls=http://127.0.0.1:2379", - }, - nil, - }, - { - []string{ - "-proxy=on", - "-listen-client-urls=http://127.0.0.1:2379", - }, - nil, - }, - { - []string{ - "-proxy=readonly", - "-listen-client-urls=http://127.0.0.1:2379", - }, - nil, - }, } for i, tt := range tests { @@ -387,65 +325,6 @@ func TestConfigIsNewCluster(t *testing.T) { } } -func TestConfigIsProxy(t *testing.T) { - tests := []struct { - proxy string - wIsProxy bool - }{ - {proxyFlagOff, false}, - {proxyFlagReadonly, true}, - {proxyFlagOn, true}, - } - for i, tt := range tests { - cfg := newConfig() - if err := cfg.cf.proxy.Set(tt.proxy); err != nil { - t.Fatalf("#%d: unexpected proxy.Set error: %v", i, err) - } - if g := cfg.isProxy(); g != tt.wIsProxy { - t.Errorf("#%d: isProxy = %v, want %v", i, g, tt.wIsProxy) - } - } -} - -func TestConfigIsReadonlyProxy(t *testing.T) { - tests := []struct { - proxy string - wIsReadonly bool - }{ - {proxyFlagOff, false}, - {proxyFlagReadonly, true}, - {proxyFlagOn, false}, - } - for i, tt := range tests { - cfg := newConfig() - if err := cfg.cf.proxy.Set(tt.proxy); err != nil { - t.Fatalf("#%d: unexpected proxy.Set error: %v", i, err) - } - if g := cfg.isReadonlyProxy(); g != tt.wIsReadonly { - t.Errorf("#%d: isReadonlyProxy = %v, want %v", i, g, tt.wIsReadonly) - } - } -} - -func TestConfigShouldFallbackToProxy(t *testing.T) { - tests := []struct { - fallback string - wFallback bool - }{ - {fallbackFlagProxy, true}, - {fallbackFlagExit, false}, - } - for i, tt := range tests { - cfg := newConfig() - if err := cfg.cf.fallback.Set(tt.fallback); err != nil { - t.Fatalf("#%d: unexpected fallback.Set error: %v", i, err) - } - if g := cfg.shouldFallbackToProxy(); g != tt.wFallback { - t.Errorf("#%d: shouldFallbackToProxy = %v, want %v", i, g, tt.wFallback) - } - } -} - func TestConfigFileElectionTimeout(t *testing.T) { tests := []struct { TickMs uint `json:"heartbeat-interval"` @@ -549,16 +428,12 @@ func validateClusteringFlags(t *testing.T, cfg *config) { wcfg.ec.APUrls = []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}} wcfg.ec.ACUrls = []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}} wcfg.ec.ClusterState = embed.ClusterStateFlagExisting - wcfg.cf.fallback.Set(fallbackFlagExit) wcfg.ec.InitialCluster = "0=http://localhost:8000" wcfg.ec.InitialClusterToken = "etcdtest" if cfg.ec.ClusterState != wcfg.ec.ClusterState { t.Errorf("clusterState = %v, want %v", cfg.ec.ClusterState, wcfg.ec.ClusterState) } - if cfg.cf.fallback.String() != wcfg.cf.fallback.String() { - t.Errorf("fallback = %v, want %v", cfg.cf.fallback, wcfg.cf.fallback) - } if cfg.ec.InitialCluster != wcfg.ec.InitialCluster { t.Errorf("initialCluster = %v, want %v", cfg.ec.InitialCluster, wcfg.ec.InitialCluster) } @@ -572,11 +447,3 @@ func validateClusteringFlags(t *testing.T, cfg *config) { t.Errorf("advertise-client-urls = %v, want %v", cfg.ec.ACUrls, wcfg.ec.ACUrls) } } - -func validateOtherFlags(t *testing.T, cfg *config) { - wcfg := newConfig() - wcfg.cf.proxy.Set(proxyFlagReadonly) - if cfg.cf.proxy.String() != wcfg.cf.proxy.String() { - t.Errorf("proxy = %v, want %v", cfg.cf.proxy, wcfg.cf.proxy) - } -} diff --git a/server/etcdmain/etcd.go b/server/etcdmain/etcd.go index eb50558ff..40be3188d 100644 --- a/server/etcdmain/etcd.go +++ b/server/etcdmain/etcd.go @@ -15,28 +15,18 @@ package etcdmain import ( - "encoding/json" "fmt" - "net/http" "os" - "path/filepath" - "reflect" "runtime" "strings" - "time" "go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/client/pkg/v3/logutil" - "go.etcd.io/etcd/client/pkg/v3/transport" "go.etcd.io/etcd/client/pkg/v3/types" - pkgioutil "go.etcd.io/etcd/pkg/v3/ioutil" "go.etcd.io/etcd/pkg/v3/osutil" "go.etcd.io/etcd/server/v3/embed" "go.etcd.io/etcd/server/v3/etcdserver" - "go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp" "go.etcd.io/etcd/server/v3/etcdserver/api/v2discovery" - "go.etcd.io/etcd/server/v3/etcdserver/api/v3discovery" - "go.etcd.io/etcd/server/v3/proxy/httpproxy" "go.uber.org/zap" "google.golang.org/grpc" @@ -122,7 +112,7 @@ func startEtcdOrProxyV2(args []string) { case dirMember: stopped, errc, err = startEtcd(&cfg.ec) case dirProxy: - err = startProxy(cfg) + lg.Panic("v2 http proxy has already been deprecated in 3.6", zap.String("dir-type", string(which))) default: lg.Panic( "unknown directory type", @@ -130,24 +120,9 @@ func startEtcdOrProxyV2(args []string) { ) } } else { - shouldProxy := cfg.isProxy() - if !shouldProxy { - stopped, errc, err = startEtcd(&cfg.ec) - if derr, ok := err.(*etcdserver.DiscoveryError); ok && derr.Err == v2discovery.ErrFullCluster { - if cfg.shouldFallbackToProxy() { - lg.Warn( - "discovery cluster is full, falling back to proxy", - zap.String("fallback-proxy", fallbackFlagProxy), - zap.Error(err), - ) - shouldProxy = true - } - } else if err != nil { - lg.Warn("failed to start etcd", zap.Error(err)) - } - } - if shouldProxy { - err = startProxy(cfg) + stopped, errc, err = startEtcd(&cfg.ec) + if err != nil { + lg.Warn("failed to start etcd", zap.Error(err)) } } @@ -237,199 +212,6 @@ func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) { return e.Server.StopNotify(), e.Err(), nil } -// startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes. -func startProxy(cfg *config) error { - lg := cfg.ec.GetLogger() - lg.Info("v2 API proxy starting") - - clientTLSInfo := cfg.ec.ClientTLSInfo - if clientTLSInfo.Empty() { - // Support old proxy behavior of defaulting to PeerTLSInfo - // for both client and peer connections. - clientTLSInfo = cfg.ec.PeerTLSInfo - } - clientTLSInfo.InsecureSkipVerify = cfg.ec.ClientAutoTLS - cfg.ec.PeerTLSInfo.InsecureSkipVerify = cfg.ec.PeerAutoTLS - - pt, err := transport.NewTimeoutTransport( - clientTLSInfo, - time.Duration(cfg.cp.ProxyDialTimeoutMs)*time.Millisecond, - time.Duration(cfg.cp.ProxyReadTimeoutMs)*time.Millisecond, - time.Duration(cfg.cp.ProxyWriteTimeoutMs)*time.Millisecond, - ) - if err != nil { - return err - } - pt.MaxIdleConnsPerHost = httpproxy.DefaultMaxIdleConnsPerHost - - if err = cfg.ec.PeerSelfCert(); err != nil { - lg.Fatal("failed to get self-signed certs for peer", zap.Error(err)) - } - tr, err := transport.NewTimeoutTransport( - cfg.ec.PeerTLSInfo, - time.Duration(cfg.cp.ProxyDialTimeoutMs)*time.Millisecond, - time.Duration(cfg.cp.ProxyReadTimeoutMs)*time.Millisecond, - time.Duration(cfg.cp.ProxyWriteTimeoutMs)*time.Millisecond, - ) - if err != nil { - return err - } - - cfg.ec.Dir = filepath.Join(cfg.ec.Dir, "proxy") - err = fileutil.TouchDirAll(lg, cfg.ec.Dir) - if err != nil { - return err - } - - var peerURLs []string - clusterfile := filepath.Join(cfg.ec.Dir, "cluster") - - b, err := os.ReadFile(clusterfile) - switch { - case err == nil: - if cfg.ec.Durl != "" || len(cfg.ec.DiscoveryCfg.Endpoints) > 0 { - lg.Warn( - "discovery token ignored since the proxy has already been initialized; valid cluster file found", - zap.String("cluster-file", clusterfile), - ) - } - if cfg.ec.DNSCluster != "" { - lg.Warn( - "DNS SRV discovery ignored since the proxy has already been initialized; valid cluster file found", - zap.String("cluster-file", clusterfile), - ) - } - urls := struct{ PeerURLs []string }{} - err = json.Unmarshal(b, &urls) - if err != nil { - return err - } - peerURLs = urls.PeerURLs - lg.Info( - "proxy using peer URLS from cluster file", - zap.Strings("peer-urls", peerURLs), - zap.String("cluster-file", clusterfile), - ) - - case os.IsNotExist(err): - var urlsmap types.URLsMap - urlsmap, _, err = cfg.ec.PeerURLsMapAndToken("proxy") - if err != nil { - return fmt.Errorf("error setting up initial cluster: %v", err) - } - - var s string - if cfg.ec.Durl != "" { - lg.Warn("V2 discovery is deprecated!") - s, err = v2discovery.GetCluster(lg, cfg.ec.Durl, cfg.ec.Dproxy) - } else if len(cfg.ec.DiscoveryCfg.Endpoints) > 0 { - s, err = v3discovery.GetCluster(lg, &cfg.ec.DiscoveryCfg) - } - if err != nil { - return err - } - if s != "" { - if urlsmap, err = types.NewURLsMap(s); err != nil { - return err - } - } - - peerURLs = urlsmap.URLs() - lg.Info("proxy using peer URLS", zap.Strings("peer-urls", peerURLs)) - - default: - return err - } - - clientURLs := []string{} - uf := func() []string { - gcls, gerr := etcdserver.GetClusterFromRemotePeers(lg, peerURLs, tr) - if gerr != nil { - lg.Warn( - "failed to get cluster from remote peers", - zap.Strings("peer-urls", peerURLs), - zap.Error(gerr), - ) - return []string{} - } - - clientURLs = gcls.ClientURLs() - urls := struct{ PeerURLs []string }{gcls.PeerURLs()} - b, jerr := json.Marshal(urls) - if jerr != nil { - lg.Warn("proxy failed to marshal peer URLs", zap.Error(jerr)) - return clientURLs - } - - err = pkgioutil.WriteAndSyncFile(clusterfile+".bak", b, 0600) - if err != nil { - lg.Warn("proxy failed to write cluster file", zap.Error(err)) - return clientURLs - } - err = os.Rename(clusterfile+".bak", clusterfile) - if err != nil { - lg.Warn( - "proxy failed to rename cluster file", - zap.String("path", clusterfile), - zap.Error(err), - ) - return clientURLs - } - if !reflect.DeepEqual(gcls.PeerURLs(), peerURLs) { - lg.Info( - "proxy updated peer URLs", - zap.Strings("from", peerURLs), - zap.Strings("to", gcls.PeerURLs()), - ) - } - peerURLs = gcls.PeerURLs() - - return clientURLs - } - ph := httpproxy.NewHandler(lg, pt, uf, time.Duration(cfg.cp.ProxyFailureWaitMs)*time.Millisecond, time.Duration(cfg.cp.ProxyRefreshIntervalMs)*time.Millisecond) - ph = embed.WrapCORS(cfg.ec.CORS, ph) - - if cfg.isReadonlyProxy() { - ph = httpproxy.NewReadonlyHandler(ph) - } - - // setup self signed certs when serving https - cHosts, cTLS := []string{}, false - for _, u := range cfg.ec.LCUrls { - cHosts = append(cHosts, u.Host) - cTLS = cTLS || u.Scheme == "https" - } - for _, u := range cfg.ec.ACUrls { - cHosts = append(cHosts, u.Host) - cTLS = cTLS || u.Scheme == "https" - } - listenerTLS := cfg.ec.ClientTLSInfo - if cfg.ec.ClientAutoTLS && cTLS { - listenerTLS, err = transport.SelfCert(cfg.ec.GetLogger(), filepath.Join(cfg.ec.Dir, "clientCerts"), cHosts, cfg.ec.SelfSignedCertValidity) - if err != nil { - lg.Fatal("failed to initialize self-signed client cert", zap.Error(err)) - } - } - - // Start a proxy server goroutine for each listen address - for _, u := range cfg.ec.LCUrls { - l, err := transport.NewListener(u.Host, u.Scheme, &listenerTLS) - if err != nil { - return err - } - - host := u.String() - go func() { - lg.Info("v2 proxy started listening on client requests", zap.String("host", host)) - mux := http.NewServeMux() - etcdhttp.HandleMetrics(mux) // v2 proxy just uses the same port - mux.Handle("/", ph) - lg.Fatal("done serving", zap.Error(http.Serve(l, mux))) - }() - } - return nil -} - // identifyDataDirOrDie returns the type of the data dir. // Dies if the datadir is invalid. func identifyDataDirOrDie(lg *zap.Logger, dir string) dirType { diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go index 6600a4a42..efa2a77e8 100644 --- a/server/etcdmain/help.go +++ b/server/etcdmain/help.go @@ -1,4 +1,5 @@ // Copyright 2015 The etcd Authors +// Copyright 2015 The etcd Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -132,9 +133,8 @@ Clustering: V3 discovery: username[:password] for authentication (prompt if password is not supplied). --discovery-password '' V3 discovery: password for authentication (if this option is used, --user option shouldn't include password). - --discovery-fallback 'proxy' - Expected behavior ('exit' or 'proxy') when discovery services fails. - "proxy" supports v2 API only. + --discovery-fallback 'exit' + Expected behavior ('exit') when discovery services fails. Note that v2 proxy is removed. --discovery-proxy '' HTTP proxy to use for traffic to discovery service. Will be deprecated in v3.7, and be decommissioned in v3.8. --discovery-srv '' @@ -239,20 +239,6 @@ Experimental distributed tracing: --experimental-distributed-tracing-sampling-rate '0' Number of samples to collect per million spans for distributed tracing. Disabled by default. -v2 Proxy (to be deprecated in v3.6): - --proxy 'off' - Proxy mode setting ('off', 'readonly' or 'on'). - --proxy-failure-wait 5000 - Time (in milliseconds) an endpoint will be held in a failed state. - --proxy-refresh-interval 30000 - Time (in milliseconds) of the endpoints refresh interval. - --proxy-dial-timeout 1000 - Time (in milliseconds) for a dial to timeout. - --proxy-write-timeout 5000 - Time (in milliseconds) for a write to timeout. - --proxy-read-timeout 0 - Time (in milliseconds) for a read to timeout. - Experimental feature: --experimental-initial-corrupt-check 'false' Enable to check data corruption before serving any client/peer traffic. diff --git a/server/proxy/httpproxy/director.go b/server/proxy/httpproxy/director.go deleted file mode 100644 index 43db09a8f..000000000 --- a/server/proxy/httpproxy/director.go +++ /dev/null @@ -1,197 +0,0 @@ -// Copyright 2015 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package httpproxy - -import ( - "math/rand" - "net/url" - "sync" - "time" - - "go.uber.org/zap" -) - -// defaultRefreshInterval is the default proxyRefreshIntervalMs value -// as in etcdmain/config.go. -const defaultRefreshInterval = 30000 * time.Millisecond - -var once sync.Once - -func init() { - rand.Seed(time.Now().UnixNano()) -} - -func newDirector(lg *zap.Logger, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) *director { - if lg == nil { - lg = zap.NewNop() - } - d := &director{ - lg: lg, - uf: urlsFunc, - failureWait: failureWait, - stopc: make(chan struct{}), - donec: make(chan struct{}), - } - d.refresh() - go func() { - defer close(d.donec) - // In order to prevent missing proxy endpoints in the first try: - // when given refresh interval of defaultRefreshInterval or greater - // and whenever there is no available proxy endpoints, - // give 1-second refreshInterval. - for { - es := d.endpoints() - ri := refreshInterval - if ri >= defaultRefreshInterval { - if len(es) == 0 { - ri = time.Second - } - } - if len(es) > 0 { - once.Do(func() { - var sl []string - for _, e := range es { - sl = append(sl, e.URL.String()) - } - lg.Info("endpoints found", zap.Strings("endpoints", sl)) - }) - } - select { - case <-time.After(ri): - d.refresh() - case <-d.stopc: - return - } - } - }() - return d -} - -type director struct { - sync.Mutex - lg *zap.Logger - ep []*endpoint - uf GetProxyURLs - failureWait time.Duration - stopc chan struct{} - donec chan struct{} -} - -func (d *director) refresh() { - urls := d.uf() - d.Lock() - defer d.Unlock() - var endpoints []*endpoint - for _, u := range urls { - uu, err := url.Parse(u) - if err != nil { - d.lg.Info("upstream URL invalid", zap.Error(err)) - continue - } - endpoints = append(endpoints, newEndpoint(d.lg, *uu, d.failureWait)) - } - - // shuffle array to avoid connections being "stuck" to a single endpoint - for i := range endpoints { - j := rand.Intn(i + 1) - endpoints[i], endpoints[j] = endpoints[j], endpoints[i] - } - - d.ep = endpoints -} - -func (d *director) endpoints() []*endpoint { - d.Lock() - defer d.Unlock() - filtered := make([]*endpoint, 0) - for _, ep := range d.ep { - if ep.Available { - filtered = append(filtered, ep) - } - } - - return filtered -} - -func (d *director) stop() { - close(d.stopc) - select { - case <-d.donec: - case <-time.After(time.Second): - d.lg.Warn("timed out waiting for director to stop") - } -} - -func newEndpoint(lg *zap.Logger, u url.URL, failureWait time.Duration) *endpoint { - ep := endpoint{ - lg: lg, - URL: u, - Available: true, - failFunc: timedUnavailabilityFunc(failureWait), - } - - return &ep -} - -type endpoint struct { - sync.Mutex - - lg *zap.Logger - URL url.URL - Available bool - - failFunc func(ep *endpoint) -} - -func (ep *endpoint) Failed() { - ep.Lock() - if !ep.Available { - ep.Unlock() - return - } - - ep.Available = false - ep.Unlock() - - if ep.lg != nil { - ep.lg.Info("marked endpoint unavailable", zap.String("endpoint", ep.URL.String())) - } - - if ep.failFunc == nil { - if ep.lg != nil { - ep.lg.Info( - "no failFunc defined, endpoint will be unavailable forever", - zap.String("endpoint", ep.URL.String()), - ) - } - return - } - - ep.failFunc(ep) -} - -func timedUnavailabilityFunc(wait time.Duration) func(*endpoint) { - return func(ep *endpoint) { - time.AfterFunc(wait, func() { - ep.Available = true - if ep.lg != nil { - ep.lg.Info( - "marked endpoint available, to retest connectivity", - zap.String("endpoint", ep.URL.String()), - ) - } - }) - } -} diff --git a/server/proxy/httpproxy/director_test.go b/server/proxy/httpproxy/director_test.go deleted file mode 100644 index 7c6716fe3..000000000 --- a/server/proxy/httpproxy/director_test.go +++ /dev/null @@ -1,99 +0,0 @@ -// Copyright 2015 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package httpproxy - -import ( - "net/url" - "reflect" - "sort" - "testing" - "time" - - "go.uber.org/zap/zaptest" -) - -func TestNewDirectorScheme(t *testing.T) { - tests := []struct { - urls []string - want []string - }{ - { - urls: []string{"http://192.0.2.8:4002", "http://example.com:8080"}, - want: []string{"http://192.0.2.8:4002", "http://example.com:8080"}, - }, - { - urls: []string{"https://192.0.2.8:4002", "https://example.com:8080"}, - want: []string{"https://192.0.2.8:4002", "https://example.com:8080"}, - }, - - // accept urls without a port - { - urls: []string{"http://192.0.2.8"}, - want: []string{"http://192.0.2.8"}, - }, - - // accept urls even if they are garbage - { - urls: []string{"http://."}, - want: []string{"http://."}, - }, - } - - for i, tt := range tests { - uf := func() []string { - return tt.urls - } - got := newDirector(zaptest.NewLogger(t), uf, time.Minute, time.Minute) - - var gep []string - for _, ep := range got.ep { - gep = append(gep, ep.URL.String()) - } - sort.Strings(tt.want) - sort.Strings(gep) - if !reflect.DeepEqual(tt.want, gep) { - t.Errorf("#%d: want endpoints = %#v, got = %#v", i, tt.want, gep) - } - - got.stop() - } -} - -func TestDirectorEndpointsFiltering(t *testing.T) { - d := director{ - ep: []*endpoint{ - { - URL: url.URL{Scheme: "http", Host: "192.0.2.5:5050"}, - Available: false, - }, - { - URL: url.URL{Scheme: "http", Host: "192.0.2.4:4000"}, - Available: true, - }, - }, - } - - got := d.endpoints() - want := []*endpoint{ - { - URL: url.URL{Scheme: "http", Host: "192.0.2.4:4000"}, - Available: true, - }, - } - - if !reflect.DeepEqual(want, got) { - t.Fatalf("directed to incorrect endpoint: want = %#v, got = %#v", want, got) - } -} diff --git a/server/proxy/httpproxy/doc.go b/server/proxy/httpproxy/doc.go deleted file mode 100644 index 7a4509912..000000000 --- a/server/proxy/httpproxy/doc.go +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright 2015 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package httpproxy implements etcd httpproxy. The etcd proxy acts as a reverse -// http proxy forwarding client requests to active etcd cluster members, and does -// not participate in consensus. -package httpproxy diff --git a/server/proxy/httpproxy/metrics.go b/server/proxy/httpproxy/metrics.go deleted file mode 100644 index fcbedc28a..000000000 --- a/server/proxy/httpproxy/metrics.go +++ /dev/null @@ -1,90 +0,0 @@ -// Copyright 2015 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package httpproxy - -import ( - "net/http" - "strconv" - "time" - - "github.com/prometheus/client_golang/prometheus" -) - -var ( - requestsIncoming = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "etcd", - Subsystem: "proxy", - Name: "requests_total", - Help: "Counter requests incoming by method.", - }, []string{"method"}) - - requestsHandled = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "etcd", - Subsystem: "proxy", - Name: "handled_total", - Help: "Counter of requests fully handled (by authoratitave servers)", - }, []string{"method", "code"}) - - requestsDropped = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "etcd", - Subsystem: "proxy", - Name: "dropped_total", - Help: "Counter of requests dropped on the proxy.", - }, []string{"method", "proxying_error"}) - - requestsHandlingSec = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "etcd", - Subsystem: "proxy", - Name: "handling_duration_seconds", - Help: "Bucketed histogram of handling time of successful events (non-watches), by method (GET/PUT etc.).", - - // lowest bucket start of upper bound 0.0005 sec (0.5 ms) with factor 2 - // highest bucket start of 0.0005 sec * 2^12 == 2.048 sec - Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13), - }, []string{"method"}) -) - -type forwardingError string - -const ( - zeroEndpoints forwardingError = "zero_endpoints" - failedSendingRequest forwardingError = "failed_sending_request" - failedGettingResponse forwardingError = "failed_getting_response" -) - -func init() { - prometheus.MustRegister(requestsIncoming) - prometheus.MustRegister(requestsHandled) - prometheus.MustRegister(requestsDropped) - prometheus.MustRegister(requestsHandlingSec) -} - -func reportIncomingRequest(request *http.Request) { - requestsIncoming.WithLabelValues(request.Method).Inc() -} - -func reportRequestHandled(request *http.Request, response *http.Response, startTime time.Time) { - method := request.Method - requestsHandled.WithLabelValues(method, strconv.Itoa(response.StatusCode)).Inc() - requestsHandlingSec.WithLabelValues(method).Observe(time.Since(startTime).Seconds()) -} - -func reportRequestDropped(request *http.Request, err forwardingError) { - requestsDropped.WithLabelValues(request.Method, string(err)).Inc() -} diff --git a/server/proxy/httpproxy/proxy.go b/server/proxy/httpproxy/proxy.go deleted file mode 100644 index c8f27bf01..000000000 --- a/server/proxy/httpproxy/proxy.go +++ /dev/null @@ -1,121 +0,0 @@ -// Copyright 2015 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package httpproxy - -import ( - "encoding/json" - "net/http" - "strings" - "time" - - "go.uber.org/zap" - "golang.org/x/net/http2" -) - -const ( - // DefaultMaxIdleConnsPerHost indicates the default maximum idle connection - // count maintained between proxy and each member. We set it to 128 to - // let proxy handle 128 concurrent requests in long term smoothly. - // If the number of concurrent requests is bigger than this value, - // proxy needs to create one new connection when handling each request in - // the delta, which is bad because the creation consumes resource and - // may eat up ephemeral ports. - DefaultMaxIdleConnsPerHost = 128 -) - -// GetProxyURLs is a function which should return the current set of URLs to -// which client requests should be proxied. This function will be queried -// periodically by the proxy Handler to refresh the set of available -// backends. -type GetProxyURLs func() []string - -// NewHandler creates a new HTTP handler, listening on the given transport, -// which will proxy requests to an etcd cluster. -// The handler will periodically update its view of the cluster. -func NewHandler(lg *zap.Logger, t *http.Transport, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) http.Handler { - if lg == nil { - lg = zap.NewNop() - } - if t.TLSClientConfig != nil { - // Enable http2, see Issue 5033. - err := http2.ConfigureTransport(t) - if err != nil { - lg.Info("Error enabling Transport HTTP/2 support", zap.Error(err)) - } - } - - p := &reverseProxy{ - lg: lg, - director: newDirector(lg, urlsFunc, failureWait, refreshInterval), - transport: t, - } - - mux := http.NewServeMux() - mux.Handle("/", p) - mux.HandleFunc("/v2/config/local/proxy", p.configHandler) - - return mux -} - -// NewReadonlyHandler wraps the given HTTP handler to allow only GET requests -func NewReadonlyHandler(hdlr http.Handler) http.Handler { - readonly := readonlyHandlerFunc(hdlr) - return http.HandlerFunc(readonly) -} - -func readonlyHandlerFunc(next http.Handler) func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, req *http.Request) { - if req.Method != "GET" { - w.WriteHeader(http.StatusNotImplemented) - return - } - - next.ServeHTTP(w, req) - } -} - -func (p *reverseProxy) configHandler(w http.ResponseWriter, r *http.Request) { - if !allowMethod(w, r.Method, "GET") { - return - } - - eps := p.director.endpoints() - epstr := make([]string, len(eps)) - for i, e := range eps { - epstr[i] = e.URL.String() - } - - proxyConfig := struct { - Endpoints []string `json:"endpoints"` - }{ - Endpoints: epstr, - } - - json.NewEncoder(w).Encode(proxyConfig) -} - -// allowMethod verifies that the given method is one of the allowed methods, -// and if not, it writes an error to w. A boolean is returned indicating -// whether or not the method is allowed. -func allowMethod(w http.ResponseWriter, m string, ms ...string) bool { - for _, meth := range ms { - if m == meth { - return true - } - } - w.Header().Set("Allow", strings.Join(ms, ",")) - http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) - return false -} diff --git a/server/proxy/httpproxy/proxy_test.go b/server/proxy/httpproxy/proxy_test.go deleted file mode 100644 index 5506e0b3c..000000000 --- a/server/proxy/httpproxy/proxy_test.go +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright 2015 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package httpproxy - -import ( - "io" - "net/http" - "net/http/httptest" - "net/url" - "testing" - "time" - - "go.uber.org/zap/zaptest" -) - -func TestReadonlyHandler(t *testing.T) { - fixture := func(w http.ResponseWriter, req *http.Request) { - w.WriteHeader(http.StatusOK) - } - hdlrFunc := readonlyHandlerFunc(http.HandlerFunc(fixture)) - - tests := []struct { - method string - want int - }{ - // GET is only passing method - {"GET", http.StatusOK}, - - // everything but GET is StatusNotImplemented - {"POST", http.StatusNotImplemented}, - {"PUT", http.StatusNotImplemented}, - {"PATCH", http.StatusNotImplemented}, - {"DELETE", http.StatusNotImplemented}, - {"FOO", http.StatusNotImplemented}, - } - - for i, tt := range tests { - req, _ := http.NewRequest(tt.method, "http://example.com", nil) - rr := httptest.NewRecorder() - hdlrFunc(rr, req) - - if tt.want != rr.Code { - t.Errorf("#%d: incorrect HTTP status code: method=%s want=%d got=%d", i, tt.method, tt.want, rr.Code) - } - } -} - -func TestConfigHandlerGET(t *testing.T) { - var err error - us := make([]*url.URL, 3) - us[0], err = url.Parse("http://example1.com") - if err != nil { - t.Fatal(err) - } - us[1], err = url.Parse("http://example2.com") - if err != nil { - t.Fatal(err) - } - us[2], err = url.Parse("http://example3.com") - if err != nil { - t.Fatal(err) - } - - lg := zaptest.NewLogger(t) - rp := reverseProxy{ - lg: lg, - director: &director{ - lg: lg, - ep: []*endpoint{ - newEndpoint(lg, *us[0], 1*time.Second), - newEndpoint(lg, *us[1], 1*time.Second), - newEndpoint(lg, *us[2], 1*time.Second), - }, - }, - } - - req, _ := http.NewRequest("GET", "http://example.com//v2/config/local/proxy", nil) - rr := httptest.NewRecorder() - rp.configHandler(rr, req) - - wbody := "{\"endpoints\":[\"http://example1.com\",\"http://example2.com\",\"http://example3.com\"]}\n" - - body, err := io.ReadAll(rr.Body) - if err != nil { - t.Fatal(err) - } - - if string(body) != wbody { - t.Errorf("body = %s, want %s", string(body), wbody) - } -} diff --git a/server/proxy/httpproxy/reverse.go b/server/proxy/httpproxy/reverse.go deleted file mode 100644 index 0db63f938..000000000 --- a/server/proxy/httpproxy/reverse.go +++ /dev/null @@ -1,226 +0,0 @@ -// Copyright 2015 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package httpproxy - -import ( - "bytes" - "context" - "fmt" - "io" - "net" - "net/http" - "net/url" - "strings" - "sync/atomic" - "time" - - "go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp/types" - - "go.uber.org/zap" -) - -var ( - // Hop-by-hop headers. These are removed when sent to the backend. - // http://www.w3.org/Protocols/rfc2616/rfc2616-sec13.html - // This list of headers borrowed from stdlib httputil.ReverseProxy - singleHopHeaders = []string{ - "Connection", - "Keep-Alive", - "Proxy-Authenticate", - "Proxy-Authorization", - "Te", // canonicalized version of "TE" - "Trailers", - "Transfer-Encoding", - "Upgrade", - } -) - -func removeSingleHopHeaders(hdrs *http.Header) { - for _, h := range singleHopHeaders { - hdrs.Del(h) - } -} - -type reverseProxy struct { - lg *zap.Logger - director *director - transport http.RoundTripper -} - -func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request) { - reportIncomingRequest(clientreq) - proxyreq := new(http.Request) - *proxyreq = *clientreq - startTime := time.Now() - - var ( - proxybody []byte - err error - ) - - if clientreq.Body != nil { - proxybody, err = io.ReadAll(clientreq.Body) - if err != nil { - msg := fmt.Sprintf("failed to read request body: %v", err) - p.lg.Info("failed to read request body", zap.Error(err)) - e := httptypes.NewHTTPError(http.StatusInternalServerError, "httpproxy: "+msg) - if we := e.WriteTo(rw); we != nil { - p.lg.Debug( - "error writing HTTPError to remote addr", - zap.String("remote-addr", clientreq.RemoteAddr), - zap.Error(we), - ) - } - return - } - } - - // deep-copy the headers, as these will be modified below - proxyreq.Header = make(http.Header) - copyHeader(proxyreq.Header, clientreq.Header) - - normalizeRequest(proxyreq) - removeSingleHopHeaders(&proxyreq.Header) - maybeSetForwardedFor(proxyreq) - - endpoints := p.director.endpoints() - if len(endpoints) == 0 { - msg := "zero endpoints currently available" - reportRequestDropped(clientreq, zeroEndpoints) - - // TODO: limit the rate of the error logging. - p.lg.Info(msg) - e := httptypes.NewHTTPError(http.StatusServiceUnavailable, "httpproxy: "+msg) - if we := e.WriteTo(rw); we != nil { - p.lg.Debug( - "error writing HTTPError to remote addr", - zap.String("remote-addr", clientreq.RemoteAddr), - zap.Error(we), - ) - } - return - } - - var requestClosed int32 - completeCh := make(chan bool, 1) - closeNotifier, ok := rw.(http.CloseNotifier) - ctx, cancel := context.WithCancel(context.Background()) - proxyreq = proxyreq.WithContext(ctx) - defer cancel() - if ok { - closeCh := closeNotifier.CloseNotify() - go func() { - select { - case <-closeCh: - atomic.StoreInt32(&requestClosed, 1) - p.lg.Info( - "client closed request prematurely", - zap.String("remote-addr", clientreq.RemoteAddr), - ) - cancel() - case <-completeCh: - } - }() - - defer func() { - completeCh <- true - }() - } - - var res *http.Response - - for _, ep := range endpoints { - if proxybody != nil { - proxyreq.Body = io.NopCloser(bytes.NewBuffer(proxybody)) - } - redirectRequest(proxyreq, ep.URL) - - res, err = p.transport.RoundTrip(proxyreq) - if atomic.LoadInt32(&requestClosed) == 1 { - return - } - if err != nil { - reportRequestDropped(clientreq, failedSendingRequest) - p.lg.Info( - "failed to direct request", - zap.String("url", ep.URL.String()), - zap.Error(err), - ) - ep.Failed() - continue - } - - break - } - - if res == nil { - // TODO: limit the rate of the error logging. - msg := fmt.Sprintf("unable to get response from %d endpoint(s)", len(endpoints)) - reportRequestDropped(clientreq, failedGettingResponse) - p.lg.Info(msg) - e := httptypes.NewHTTPError(http.StatusBadGateway, "httpproxy: "+msg) - if we := e.WriteTo(rw); we != nil { - p.lg.Debug( - "error writing HTTPError to remote addr", - zap.String("remote-addr", clientreq.RemoteAddr), - zap.Error(we), - ) - } - return - } - - defer res.Body.Close() - reportRequestHandled(clientreq, res, startTime) - removeSingleHopHeaders(&res.Header) - copyHeader(rw.Header(), res.Header) - - rw.WriteHeader(res.StatusCode) - io.Copy(rw, res.Body) -} - -func copyHeader(dst, src http.Header) { - for k, vv := range src { - for _, v := range vv { - dst.Add(k, v) - } - } -} - -func redirectRequest(req *http.Request, loc url.URL) { - req.URL.Scheme = loc.Scheme - req.URL.Host = loc.Host -} - -func normalizeRequest(req *http.Request) { - req.Proto = "HTTP/1.1" - req.ProtoMajor = 1 - req.ProtoMinor = 1 - req.Close = false -} - -func maybeSetForwardedFor(req *http.Request) { - clientIP, _, err := net.SplitHostPort(req.RemoteAddr) - if err != nil { - return - } - - // If we aren't the first proxy retain prior - // X-Forwarded-For information as a comma+space - // separated list and fold multiple headers into one. - if prior, ok := req.Header["X-Forwarded-For"]; ok { - clientIP = strings.Join(prior, ", ") + ", " + clientIP - } - req.Header.Set("X-Forwarded-For", clientIP) -} diff --git a/server/proxy/httpproxy/reverse_test.go b/server/proxy/httpproxy/reverse_test.go deleted file mode 100644 index 6e12edae5..000000000 --- a/server/proxy/httpproxy/reverse_test.go +++ /dev/null @@ -1,249 +0,0 @@ -// Copyright 2015 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package httpproxy - -import ( - "bytes" - "errors" - "io" - "net/http" - "net/http/httptest" - "net/url" - "reflect" - "testing" - - "go.uber.org/zap/zaptest" -) - -type staticRoundTripper struct { - res *http.Response - err error -} - -func (srt *staticRoundTripper) RoundTrip(*http.Request) (*http.Response, error) { - return srt.res, srt.err -} - -func TestReverseProxyServe(t *testing.T) { - u := url.URL{Scheme: "http", Host: "192.0.2.3:4040"} - lg := zaptest.NewLogger(t) - - tests := []struct { - eps []*endpoint - rt http.RoundTripper - want int - }{ - // no endpoints available so no requests are even made - { - eps: []*endpoint{}, - rt: &staticRoundTripper{ - res: &http.Response{ - StatusCode: http.StatusCreated, - Body: io.NopCloser(&bytes.Reader{}), - }, - }, - want: http.StatusServiceUnavailable, - }, - - // error is returned from one endpoint that should be available - { - eps: []*endpoint{{URL: u, Available: true}}, - rt: &staticRoundTripper{err: errors.New("what a bad trip")}, - want: http.StatusBadGateway, - }, - - // endpoint is available and returns success - { - eps: []*endpoint{{URL: u, Available: true}}, - rt: &staticRoundTripper{ - res: &http.Response{ - StatusCode: http.StatusCreated, - Body: io.NopCloser(&bytes.Reader{}), - Header: map[string][]string{"Content-Type": {"application/json"}}, - }, - }, - want: http.StatusCreated, - }, - } - - for i, tt := range tests { - rp := reverseProxy{ - lg: lg, - director: &director{lg: lg, ep: tt.eps}, - transport: tt.rt, - } - - req, _ := http.NewRequest("GET", "http://192.0.2.2:2379", nil) - rr := httptest.NewRecorder() - rp.ServeHTTP(rr, req) - - if rr.Code != tt.want { - t.Errorf("#%d: unexpected HTTP status code: want = %d, got = %d", i, tt.want, rr.Code) - } - if gct := rr.Header().Get("Content-Type"); gct != "application/json" { - t.Errorf("#%d: Content-Type = %s, want %s", i, gct, "application/json") - } - } -} - -func TestRedirectRequest(t *testing.T) { - loc := url.URL{ - Scheme: "http", - Host: "bar.example.com", - } - - req := &http.Request{ - Method: "GET", - Host: "foo.example.com", - URL: &url.URL{ - Host: "foo.example.com", - Path: "/v2/keys/baz", - }, - } - - redirectRequest(req, loc) - - want := &http.Request{ - Method: "GET", - // this field must not change - Host: "foo.example.com", - URL: &url.URL{ - // the Scheme field is updated to that of the provided URL - Scheme: "http", - // the Host field is updated to that of the provided URL - Host: "bar.example.com", - Path: "/v2/keys/baz", - }, - } - - if !reflect.DeepEqual(want, req) { - t.Fatalf("HTTP request does not match expected criteria: want=%#v got=%#v", want, req) - } -} - -func TestMaybeSetForwardedFor(t *testing.T) { - tests := []struct { - raddr string - fwdFor string - want string - }{ - {"192.0.2.3:8002", "", "192.0.2.3"}, - {"192.0.2.3:8002", "192.0.2.2", "192.0.2.2, 192.0.2.3"}, - {"192.0.2.3:8002", "192.0.2.1, 192.0.2.2", "192.0.2.1, 192.0.2.2, 192.0.2.3"}, - {"example.com:8002", "", "example.com"}, - - // While these cases look valid, golang net/http will not let it happen - // The RemoteAddr field will always be a valid host:port - {":8002", "", ""}, - {"192.0.2.3", "", ""}, - - // blatantly invalid host w/o a port - {"12", "", ""}, - {"12", "192.0.2.3", "192.0.2.3"}, - } - - for i, tt := range tests { - req := &http.Request{ - RemoteAddr: tt.raddr, - Header: make(http.Header), - } - - if tt.fwdFor != "" { - req.Header.Set("X-Forwarded-For", tt.fwdFor) - } - - maybeSetForwardedFor(req) - got := req.Header.Get("X-Forwarded-For") - if tt.want != got { - t.Errorf("#%d: incorrect header: want = %q, got = %q", i, tt.want, got) - } - } -} - -func TestRemoveSingleHopHeaders(t *testing.T) { - hdr := http.Header(map[string][]string{ - // single-hop headers that should be removed - "Connection": {"close"}, - "Keep-Alive": {"foo"}, - "Proxy-Authenticate": {"Basic realm=example.com"}, - "Proxy-Authorization": {"foo"}, - "Te": {"deflate,gzip"}, - "Trailers": {"ETag"}, - "Transfer-Encoding": {"chunked"}, - "Upgrade": {"WebSocket"}, - - // headers that should persist - "Accept": {"application/json"}, - "X-Foo": {"Bar"}, - }) - - removeSingleHopHeaders(&hdr) - - want := http.Header(map[string][]string{ - "Accept": {"application/json"}, - "X-Foo": {"Bar"}, - }) - - if !reflect.DeepEqual(want, hdr) { - t.Fatalf("unexpected result: want = %#v, got = %#v", want, hdr) - } -} - -func TestCopyHeader(t *testing.T) { - tests := []struct { - src http.Header - dst http.Header - want http.Header - }{ - { - src: http.Header(map[string][]string{ - "Foo": {"bar", "baz"}, - }), - dst: http.Header(map[string][]string{}), - want: http.Header(map[string][]string{ - "Foo": {"bar", "baz"}, - }), - }, - { - src: http.Header(map[string][]string{ - "Foo": {"bar"}, - "Ping": {"pong"}, - }), - dst: http.Header(map[string][]string{}), - want: http.Header(map[string][]string{ - "Foo": {"bar"}, - "Ping": {"pong"}, - }), - }, - { - src: http.Header(map[string][]string{ - "Foo": {"bar", "baz"}, - }), - dst: http.Header(map[string][]string{ - "Foo": {"qux"}, - }), - want: http.Header(map[string][]string{ - "Foo": {"qux", "bar", "baz"}, - }), - }, - } - - for i, tt := range tests { - copyHeader(tt.dst, tt.src) - if !reflect.DeepEqual(tt.dst, tt.want) { - t.Errorf("#%d: unexpected headers: want = %v, got = %v", i, tt.want, tt.dst) - } - } -} diff --git a/tests/framework/e2e/cluster_proxy.go b/tests/framework/e2e/cluster_proxy.go index 20242e1b0..26094eefc 100644 --- a/tests/framework/e2e/cluster_proxy.go +++ b/tests/framework/e2e/cluster_proxy.go @@ -32,8 +32,9 @@ import ( type proxyEtcdProcess struct { etcdProc EtcdProcess - proxyV2 *proxyV2Proc - proxyV3 *proxyV3Proc + // TODO(ahrtr): We need to remove `proxyV2` and v2discovery when the v2client is removed. + proxyV2 *proxyV2Proc + proxyV3 *proxyV3Proc } func NewEtcdProcess(cfg *EtcdServerProcessConfig) (EtcdProcess, error) { @@ -65,9 +66,6 @@ func (p *proxyEtcdProcess) Start() error { if err := p.etcdProc.Start(); err != nil { return err } - if err := p.proxyV2.Start(); err != nil { - return err - } return p.proxyV3.Start() } @@ -75,17 +73,11 @@ func (p *proxyEtcdProcess) Restart() error { if err := p.etcdProc.Restart(); err != nil { return err } - if err := p.proxyV2.Restart(); err != nil { - return err - } return p.proxyV3.Restart() } func (p *proxyEtcdProcess) Stop() error { - err := p.proxyV2.Stop() - if v3err := p.proxyV3.Stop(); err == nil { - err = v3err - } + err := p.proxyV3.Stop() if eerr := p.etcdProc.Stop(); eerr != nil && err == nil { // fails on go-grpc issue #1384 if !strings.Contains(eerr.Error(), "exit status 2") { @@ -96,10 +88,7 @@ func (p *proxyEtcdProcess) Stop() error { } func (p *proxyEtcdProcess) Close() error { - err := p.proxyV2.Close() - if v3err := p.proxyV3.Close(); err == nil { - err = v3err - } + err := p.proxyV3.Close() if eerr := p.etcdProc.Close(); eerr != nil && err == nil { // fails on go-grpc issue #1384 if !strings.Contains(eerr.Error(), "exit status 2") { @@ -110,7 +99,6 @@ func (p *proxyEtcdProcess) Close() error { } func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal { - p.proxyV2.WithStopSignal(sig) p.proxyV3.WithStopSignal(sig) return p.etcdProc.WithStopSignal(sig) } @@ -210,31 +198,6 @@ func newProxyV2Proc(cfg *EtcdServerProcessConfig) *proxyV2Proc { } } -func (v2p *proxyV2Proc) Start() error { - os.RemoveAll(v2p.dataDir) - if err := v2p.start(); err != nil { - return err - } - // The full line we are expecting in the logs: - // "caller":"httpproxy/director.go:65","msg":"endpoints found","endpoints":["http://localhost:20000"]} - return v2p.waitReady("endpoints found") -} - -func (v2p *proxyV2Proc) Restart() error { - if err := v2p.Stop(); err != nil { - return err - } - return v2p.Start() -} - -func (v2p *proxyV2Proc) Stop() error { - if err := v2p.proxyProc.Stop(); err != nil { - return err - } - // v2 proxy caches members; avoid reuse of directory - return os.RemoveAll(v2p.dataDir) -} - type proxyV3Proc struct { proxyProc }