remove v2 http proxy in 3.6

This commit is contained in:
ahrtr 2022-05-12 13:23:40 +08:00
parent 153824be58
commit 1e6163ba27
13 changed files with 19 additions and 1599 deletions

View File

@ -19,7 +19,6 @@ package etcdmain
import ( import (
"flag" "flag"
"fmt" "fmt"
"log"
"os" "os"
"runtime" "runtime"
@ -32,14 +31,9 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
"go.uber.org/zap" "go.uber.org/zap"
"sigs.k8s.io/yaml"
) )
var ( var (
proxyFlagOff = "off"
proxyFlagReadonly = "readonly"
proxyFlagOn = "on"
fallbackFlagExit = "exit" fallbackFlagExit = "exit"
fallbackFlagProxy = "proxy" fallbackFlagProxy = "proxy"
@ -63,22 +57,9 @@ var (
} }
) )
type configProxy struct {
ProxyFailureWaitMs uint `json:"proxy-failure-wait"`
ProxyRefreshIntervalMs uint `json:"proxy-refresh-interval"`
ProxyDialTimeoutMs uint `json:"proxy-dial-timeout"`
ProxyWriteTimeoutMs uint `json:"proxy-write-timeout"`
ProxyReadTimeoutMs uint `json:"proxy-read-timeout"`
Fallback string
Proxy string
ProxyJSON string `json:"proxy"`
FallbackJSON string `json:"discovery-fallback"`
}
// config holds the config for a command line invocation of etcd // config holds the config for a command line invocation of etcd
type config struct { type config struct {
ec embed.Config ec embed.Config
cp configProxy
cf configFlags cf configFlags
configFile string configFile string
printVersion bool printVersion bool
@ -90,20 +71,12 @@ type configFlags struct {
flagSet *flag.FlagSet flagSet *flag.FlagSet
clusterState *flags.SelectiveStringValue clusterState *flags.SelectiveStringValue
fallback *flags.SelectiveStringValue fallback *flags.SelectiveStringValue
proxy *flags.SelectiveStringValue
v2deprecation *flags.SelectiveStringsValue v2deprecation *flags.SelectiveStringsValue
} }
func newConfig() *config { func newConfig() *config {
cfg := &config{ cfg := &config{
ec: *embed.NewConfig(), ec: *embed.NewConfig(),
cp: configProxy{
Proxy: proxyFlagOff,
ProxyFailureWaitMs: 5000,
ProxyRefreshIntervalMs: 30000,
ProxyDialTimeoutMs: 1000,
ProxyWriteTimeoutMs: 5000,
},
ignored: ignored, ignored: ignored,
} }
cfg.cf = configFlags{ cfg.cf = configFlags{
@ -113,13 +86,8 @@ func newConfig() *config {
embed.ClusterStateFlagExisting, embed.ClusterStateFlagExisting,
), ),
fallback: flags.NewSelectiveStringValue( fallback: flags.NewSelectiveStringValue(
fallbackFlagProxy,
fallbackFlagExit, fallbackFlagExit,
), fallbackFlagProxy,
proxy: flags.NewSelectiveStringValue(
proxyFlagOff,
proxyFlagReadonly,
proxyFlagOn,
), ),
v2deprecation: flags.NewSelectiveStringsValue( v2deprecation: flags.NewSelectiveStringsValue(
string(cconfig.V2_DEPR_1_WRITE_ONLY), string(cconfig.V2_DEPR_1_WRITE_ONLY),
@ -218,15 +186,7 @@ func newConfig() *config {
fs.BoolVar(&cfg.ec.PreVote, "pre-vote", cfg.ec.PreVote, "Enable to run an additional Raft election phase.") fs.BoolVar(&cfg.ec.PreVote, "pre-vote", cfg.ec.PreVote, "Enable to run an additional Raft election phase.")
fs.Var(cfg.cf.v2deprecation, "v2-deprecation", fmt.Sprintf("v2store deprecation stage: %q. ", cfg.cf.proxy.Valids())) fs.Var(cfg.cf.v2deprecation, "v2-deprecation", fmt.Sprintf("v2store deprecation stage: %q. ", cfg.cf.v2deprecation.Valids()))
// proxy
fs.Var(cfg.cf.proxy, "proxy", fmt.Sprintf("Valid values include %q", cfg.cf.proxy.Valids()))
fs.UintVar(&cfg.cp.ProxyFailureWaitMs, "proxy-failure-wait", cfg.cp.ProxyFailureWaitMs, "Time (in milliseconds) an endpoint will be held in a failed state.")
fs.UintVar(&cfg.cp.ProxyRefreshIntervalMs, "proxy-refresh-interval", cfg.cp.ProxyRefreshIntervalMs, "Time (in milliseconds) of the endpoints refresh interval.")
fs.UintVar(&cfg.cp.ProxyDialTimeoutMs, "proxy-dial-timeout", cfg.cp.ProxyDialTimeoutMs, "Time (in milliseconds) for a dial to timeout.")
fs.UintVar(&cfg.cp.ProxyWriteTimeoutMs, "proxy-write-timeout", cfg.cp.ProxyWriteTimeoutMs, "Time (in milliseconds) for a write to timeout.")
fs.UintVar(&cfg.cp.ProxyReadTimeoutMs, "proxy-read-timeout", cfg.cp.ProxyReadTimeoutMs, "Time (in milliseconds) for a read to timeout.")
// security // security
fs.StringVar(&cfg.ec.ClientTLSInfo.CertFile, "cert-file", "", "Path to the client server TLS cert file.") fs.StringVar(&cfg.ec.ClientTLSInfo.CertFile, "cert-file", "", "Path to the client server TLS cert file.")
@ -423,14 +383,12 @@ func (cfg *config) configFromCmdLine() error {
cfg.ec.LogOutputs = flags.UniqueStringsFromFlag(cfg.cf.flagSet, "log-outputs") cfg.ec.LogOutputs = flags.UniqueStringsFromFlag(cfg.cf.flagSet, "log-outputs")
cfg.ec.ClusterState = cfg.cf.clusterState.String() cfg.ec.ClusterState = cfg.cf.clusterState.String()
cfg.cp.Fallback = cfg.cf.fallback.String()
cfg.cp.Proxy = cfg.cf.proxy.String()
cfg.ec.V2Deprecation = cconfig.V2DeprecationEnum(cfg.cf.v2deprecation.String()) cfg.ec.V2Deprecation = cconfig.V2DeprecationEnum(cfg.cf.v2deprecation.String())
// disable default advertise-client-urls if lcurls is set // disable default advertise-client-urls if lcurls is set
missingAC := flags.IsSet(cfg.cf.flagSet, "listen-client-urls") && !flags.IsSet(cfg.cf.flagSet, "advertise-client-urls") missingAC := flags.IsSet(cfg.cf.flagSet, "listen-client-urls") && !flags.IsSet(cfg.cf.flagSet, "advertise-client-urls")
if !cfg.mayBeProxy() && missingAC { if missingAC {
cfg.ec.ACUrls = nil cfg.ec.ACUrls = nil
} }
@ -449,45 +407,12 @@ func (cfg *config) configFromFile(path string) error {
} }
cfg.ec = *eCfg cfg.ec = *eCfg
// load extra config information
b, rerr := os.ReadFile(path)
if rerr != nil {
return rerr
}
if yerr := yaml.Unmarshal(b, &cfg.cp); yerr != nil {
return yerr
}
if cfg.cp.FallbackJSON != "" {
if err := cfg.cf.fallback.Set(cfg.cp.FallbackJSON); err != nil {
log.Fatalf("unexpected error setting up discovery-fallback flag: %v", err)
}
cfg.cp.Fallback = cfg.cf.fallback.String()
}
if cfg.cp.ProxyJSON != "" {
if err := cfg.cf.proxy.Set(cfg.cp.ProxyJSON); err != nil {
log.Fatalf("unexpected error setting up proxyFlag: %v", err)
}
cfg.cp.Proxy = cfg.cf.proxy.String()
}
return nil return nil
} }
func (cfg *config) mayBeProxy() bool {
mayFallbackToProxy := cfg.ec.Durl != "" && cfg.cp.Fallback == fallbackFlagProxy
return cfg.cp.Proxy != proxyFlagOff || mayFallbackToProxy
}
func (cfg *config) validate() error { func (cfg *config) validate() error {
err := cfg.ec.Validate() if cfg.cf.fallback.String() == fallbackFlagProxy {
// TODO(yichengq): check this for joining through discovery service case return fmt.Errorf("v2 proxy is deprecated, and --discovery-fallback can't be configured as %q", fallbackFlagProxy)
if err == embed.ErrUnsetAdvertiseClientURLsFlag && cfg.mayBeProxy() {
return nil
} }
return err return cfg.ec.Validate()
} }
func (cfg config) isProxy() bool { return cfg.cf.proxy.String() != proxyFlagOff }
func (cfg config) isReadonlyProxy() bool { return cfg.cf.proxy.String() == proxyFlagReadonly }
func (cfg config) shouldFallbackToProxy() bool { return cfg.cf.fallback.String() == fallbackFlagProxy }

View File

@ -94,7 +94,6 @@ func TestConfigParsingClusteringFlags(t *testing.T) {
"-initial-cluster-token=etcdtest", "-initial-cluster-token=etcdtest",
"-initial-advertise-peer-urls=http://localhost:8000,https://localhost:8001", "-initial-advertise-peer-urls=http://localhost:8000,https://localhost:8001",
"-advertise-client-urls=http://localhost:7000,https://localhost:7001", "-advertise-client-urls=http://localhost:7000,https://localhost:7001",
"-discovery-fallback=exit",
} }
cfg := newConfig() cfg := newConfig()
@ -112,14 +111,12 @@ func TestConfigFileClusteringFields(t *testing.T) {
InitialClusterToken string `json:"initial-cluster-token"` InitialClusterToken string `json:"initial-cluster-token"`
Apurls string `json:"initial-advertise-peer-urls"` Apurls string `json:"initial-advertise-peer-urls"`
Acurls string `json:"advertise-client-urls"` Acurls string `json:"advertise-client-urls"`
Fallback string `json:"discovery-fallback"`
}{ }{
"0=http://localhost:8000", "0=http://localhost:8000",
"existing", "existing",
"etcdtest", "etcdtest",
"http://localhost:8000,https://localhost:8001", "http://localhost:8000,https://localhost:8001",
"http://localhost:7000,https://localhost:7001", "http://localhost:7000,https://localhost:7001",
"exit",
} }
b, err := yaml.Marshal(&yc) b, err := yaml.Marshal(&yc)
@ -193,44 +190,6 @@ func TestConfigFileClusteringFlags(t *testing.T) {
} }
} }
func TestConfigParsingOtherFlags(t *testing.T) {
args := []string{"-proxy=readonly"}
cfg := newConfig()
err := cfg.parse(args)
if err != nil {
t.Fatal(err)
}
validateOtherFlags(t, cfg)
}
func TestConfigFileOtherFields(t *testing.T) {
yc := struct {
ProxyCfgFile string `json:"proxy"`
}{
"readonly",
}
b, err := yaml.Marshal(&yc)
if err != nil {
t.Fatal(err)
}
tmpfile := mustCreateCfgFile(t, b)
defer os.Remove(tmpfile.Name())
args := []string{fmt.Sprintf("--config-file=%s", tmpfile.Name())}
cfg := newConfig()
err = cfg.parse(args)
if err != nil {
t.Fatal(err)
}
validateOtherFlags(t, cfg)
}
func TestConfigParsingConflictClusteringFlags(t *testing.T) { func TestConfigParsingConflictClusteringFlags(t *testing.T) {
conflictArgs := [][]string{ conflictArgs := [][]string{
{ {
@ -336,27 +295,6 @@ func TestConfigParsingMissedAdvertiseClientURLsFlag(t *testing.T) {
}, },
embed.ErrUnsetAdvertiseClientURLsFlag, embed.ErrUnsetAdvertiseClientURLsFlag,
}, },
{
[]string{
"-discovery=http://example.com/abc",
"-listen-client-urls=http://127.0.0.1:2379",
},
nil,
},
{
[]string{
"-proxy=on",
"-listen-client-urls=http://127.0.0.1:2379",
},
nil,
},
{
[]string{
"-proxy=readonly",
"-listen-client-urls=http://127.0.0.1:2379",
},
nil,
},
} }
for i, tt := range tests { for i, tt := range tests {
@ -387,65 +325,6 @@ func TestConfigIsNewCluster(t *testing.T) {
} }
} }
func TestConfigIsProxy(t *testing.T) {
tests := []struct {
proxy string
wIsProxy bool
}{
{proxyFlagOff, false},
{proxyFlagReadonly, true},
{proxyFlagOn, true},
}
for i, tt := range tests {
cfg := newConfig()
if err := cfg.cf.proxy.Set(tt.proxy); err != nil {
t.Fatalf("#%d: unexpected proxy.Set error: %v", i, err)
}
if g := cfg.isProxy(); g != tt.wIsProxy {
t.Errorf("#%d: isProxy = %v, want %v", i, g, tt.wIsProxy)
}
}
}
func TestConfigIsReadonlyProxy(t *testing.T) {
tests := []struct {
proxy string
wIsReadonly bool
}{
{proxyFlagOff, false},
{proxyFlagReadonly, true},
{proxyFlagOn, false},
}
for i, tt := range tests {
cfg := newConfig()
if err := cfg.cf.proxy.Set(tt.proxy); err != nil {
t.Fatalf("#%d: unexpected proxy.Set error: %v", i, err)
}
if g := cfg.isReadonlyProxy(); g != tt.wIsReadonly {
t.Errorf("#%d: isReadonlyProxy = %v, want %v", i, g, tt.wIsReadonly)
}
}
}
func TestConfigShouldFallbackToProxy(t *testing.T) {
tests := []struct {
fallback string
wFallback bool
}{
{fallbackFlagProxy, true},
{fallbackFlagExit, false},
}
for i, tt := range tests {
cfg := newConfig()
if err := cfg.cf.fallback.Set(tt.fallback); err != nil {
t.Fatalf("#%d: unexpected fallback.Set error: %v", i, err)
}
if g := cfg.shouldFallbackToProxy(); g != tt.wFallback {
t.Errorf("#%d: shouldFallbackToProxy = %v, want %v", i, g, tt.wFallback)
}
}
}
func TestConfigFileElectionTimeout(t *testing.T) { func TestConfigFileElectionTimeout(t *testing.T) {
tests := []struct { tests := []struct {
TickMs uint `json:"heartbeat-interval"` TickMs uint `json:"heartbeat-interval"`
@ -549,16 +428,12 @@ func validateClusteringFlags(t *testing.T, cfg *config) {
wcfg.ec.APUrls = []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}} wcfg.ec.APUrls = []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}}
wcfg.ec.ACUrls = []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}} wcfg.ec.ACUrls = []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}}
wcfg.ec.ClusterState = embed.ClusterStateFlagExisting wcfg.ec.ClusterState = embed.ClusterStateFlagExisting
wcfg.cf.fallback.Set(fallbackFlagExit)
wcfg.ec.InitialCluster = "0=http://localhost:8000" wcfg.ec.InitialCluster = "0=http://localhost:8000"
wcfg.ec.InitialClusterToken = "etcdtest" wcfg.ec.InitialClusterToken = "etcdtest"
if cfg.ec.ClusterState != wcfg.ec.ClusterState { if cfg.ec.ClusterState != wcfg.ec.ClusterState {
t.Errorf("clusterState = %v, want %v", cfg.ec.ClusterState, wcfg.ec.ClusterState) t.Errorf("clusterState = %v, want %v", cfg.ec.ClusterState, wcfg.ec.ClusterState)
} }
if cfg.cf.fallback.String() != wcfg.cf.fallback.String() {
t.Errorf("fallback = %v, want %v", cfg.cf.fallback, wcfg.cf.fallback)
}
if cfg.ec.InitialCluster != wcfg.ec.InitialCluster { if cfg.ec.InitialCluster != wcfg.ec.InitialCluster {
t.Errorf("initialCluster = %v, want %v", cfg.ec.InitialCluster, wcfg.ec.InitialCluster) t.Errorf("initialCluster = %v, want %v", cfg.ec.InitialCluster, wcfg.ec.InitialCluster)
} }
@ -572,11 +447,3 @@ func validateClusteringFlags(t *testing.T, cfg *config) {
t.Errorf("advertise-client-urls = %v, want %v", cfg.ec.ACUrls, wcfg.ec.ACUrls) t.Errorf("advertise-client-urls = %v, want %v", cfg.ec.ACUrls, wcfg.ec.ACUrls)
} }
} }
func validateOtherFlags(t *testing.T, cfg *config) {
wcfg := newConfig()
wcfg.cf.proxy.Set(proxyFlagReadonly)
if cfg.cf.proxy.String() != wcfg.cf.proxy.String() {
t.Errorf("proxy = %v, want %v", cfg.cf.proxy, wcfg.cf.proxy)
}
}

View File

@ -15,28 +15,18 @@
package etcdmain package etcdmain
import ( import (
"encoding/json"
"fmt" "fmt"
"net/http"
"os" "os"
"path/filepath"
"reflect"
"runtime" "runtime"
"strings" "strings"
"time"
"go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/client/pkg/v3/logutil" "go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/client/pkg/v3/transport"
"go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/client/pkg/v3/types"
pkgioutil "go.etcd.io/etcd/pkg/v3/ioutil"
"go.etcd.io/etcd/pkg/v3/osutil" "go.etcd.io/etcd/pkg/v3/osutil"
"go.etcd.io/etcd/server/v3/embed" "go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2discovery" "go.etcd.io/etcd/server/v3/etcdserver/api/v2discovery"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3discovery"
"go.etcd.io/etcd/server/v3/proxy/httpproxy"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -122,7 +112,7 @@ func startEtcdOrProxyV2(args []string) {
case dirMember: case dirMember:
stopped, errc, err = startEtcd(&cfg.ec) stopped, errc, err = startEtcd(&cfg.ec)
case dirProxy: case dirProxy:
err = startProxy(cfg) lg.Panic("v2 http proxy has already been deprecated in 3.6", zap.String("dir-type", string(which)))
default: default:
lg.Panic( lg.Panic(
"unknown directory type", "unknown directory type",
@ -130,24 +120,9 @@ func startEtcdOrProxyV2(args []string) {
) )
} }
} else { } else {
shouldProxy := cfg.isProxy() stopped, errc, err = startEtcd(&cfg.ec)
if !shouldProxy { if err != nil {
stopped, errc, err = startEtcd(&cfg.ec) lg.Warn("failed to start etcd", zap.Error(err))
if derr, ok := err.(*etcdserver.DiscoveryError); ok && derr.Err == v2discovery.ErrFullCluster {
if cfg.shouldFallbackToProxy() {
lg.Warn(
"discovery cluster is full, falling back to proxy",
zap.String("fallback-proxy", fallbackFlagProxy),
zap.Error(err),
)
shouldProxy = true
}
} else if err != nil {
lg.Warn("failed to start etcd", zap.Error(err))
}
}
if shouldProxy {
err = startProxy(cfg)
} }
} }
@ -237,199 +212,6 @@ func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
return e.Server.StopNotify(), e.Err(), nil return e.Server.StopNotify(), e.Err(), nil
} }
// startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes.
func startProxy(cfg *config) error {
lg := cfg.ec.GetLogger()
lg.Info("v2 API proxy starting")
clientTLSInfo := cfg.ec.ClientTLSInfo
if clientTLSInfo.Empty() {
// Support old proxy behavior of defaulting to PeerTLSInfo
// for both client and peer connections.
clientTLSInfo = cfg.ec.PeerTLSInfo
}
clientTLSInfo.InsecureSkipVerify = cfg.ec.ClientAutoTLS
cfg.ec.PeerTLSInfo.InsecureSkipVerify = cfg.ec.PeerAutoTLS
pt, err := transport.NewTimeoutTransport(
clientTLSInfo,
time.Duration(cfg.cp.ProxyDialTimeoutMs)*time.Millisecond,
time.Duration(cfg.cp.ProxyReadTimeoutMs)*time.Millisecond,
time.Duration(cfg.cp.ProxyWriteTimeoutMs)*time.Millisecond,
)
if err != nil {
return err
}
pt.MaxIdleConnsPerHost = httpproxy.DefaultMaxIdleConnsPerHost
if err = cfg.ec.PeerSelfCert(); err != nil {
lg.Fatal("failed to get self-signed certs for peer", zap.Error(err))
}
tr, err := transport.NewTimeoutTransport(
cfg.ec.PeerTLSInfo,
time.Duration(cfg.cp.ProxyDialTimeoutMs)*time.Millisecond,
time.Duration(cfg.cp.ProxyReadTimeoutMs)*time.Millisecond,
time.Duration(cfg.cp.ProxyWriteTimeoutMs)*time.Millisecond,
)
if err != nil {
return err
}
cfg.ec.Dir = filepath.Join(cfg.ec.Dir, "proxy")
err = fileutil.TouchDirAll(lg, cfg.ec.Dir)
if err != nil {
return err
}
var peerURLs []string
clusterfile := filepath.Join(cfg.ec.Dir, "cluster")
b, err := os.ReadFile(clusterfile)
switch {
case err == nil:
if cfg.ec.Durl != "" || len(cfg.ec.DiscoveryCfg.Endpoints) > 0 {
lg.Warn(
"discovery token ignored since the proxy has already been initialized; valid cluster file found",
zap.String("cluster-file", clusterfile),
)
}
if cfg.ec.DNSCluster != "" {
lg.Warn(
"DNS SRV discovery ignored since the proxy has already been initialized; valid cluster file found",
zap.String("cluster-file", clusterfile),
)
}
urls := struct{ PeerURLs []string }{}
err = json.Unmarshal(b, &urls)
if err != nil {
return err
}
peerURLs = urls.PeerURLs
lg.Info(
"proxy using peer URLS from cluster file",
zap.Strings("peer-urls", peerURLs),
zap.String("cluster-file", clusterfile),
)
case os.IsNotExist(err):
var urlsmap types.URLsMap
urlsmap, _, err = cfg.ec.PeerURLsMapAndToken("proxy")
if err != nil {
return fmt.Errorf("error setting up initial cluster: %v", err)
}
var s string
if cfg.ec.Durl != "" {
lg.Warn("V2 discovery is deprecated!")
s, err = v2discovery.GetCluster(lg, cfg.ec.Durl, cfg.ec.Dproxy)
} else if len(cfg.ec.DiscoveryCfg.Endpoints) > 0 {
s, err = v3discovery.GetCluster(lg, &cfg.ec.DiscoveryCfg)
}
if err != nil {
return err
}
if s != "" {
if urlsmap, err = types.NewURLsMap(s); err != nil {
return err
}
}
peerURLs = urlsmap.URLs()
lg.Info("proxy using peer URLS", zap.Strings("peer-urls", peerURLs))
default:
return err
}
clientURLs := []string{}
uf := func() []string {
gcls, gerr := etcdserver.GetClusterFromRemotePeers(lg, peerURLs, tr)
if gerr != nil {
lg.Warn(
"failed to get cluster from remote peers",
zap.Strings("peer-urls", peerURLs),
zap.Error(gerr),
)
return []string{}
}
clientURLs = gcls.ClientURLs()
urls := struct{ PeerURLs []string }{gcls.PeerURLs()}
b, jerr := json.Marshal(urls)
if jerr != nil {
lg.Warn("proxy failed to marshal peer URLs", zap.Error(jerr))
return clientURLs
}
err = pkgioutil.WriteAndSyncFile(clusterfile+".bak", b, 0600)
if err != nil {
lg.Warn("proxy failed to write cluster file", zap.Error(err))
return clientURLs
}
err = os.Rename(clusterfile+".bak", clusterfile)
if err != nil {
lg.Warn(
"proxy failed to rename cluster file",
zap.String("path", clusterfile),
zap.Error(err),
)
return clientURLs
}
if !reflect.DeepEqual(gcls.PeerURLs(), peerURLs) {
lg.Info(
"proxy updated peer URLs",
zap.Strings("from", peerURLs),
zap.Strings("to", gcls.PeerURLs()),
)
}
peerURLs = gcls.PeerURLs()
return clientURLs
}
ph := httpproxy.NewHandler(lg, pt, uf, time.Duration(cfg.cp.ProxyFailureWaitMs)*time.Millisecond, time.Duration(cfg.cp.ProxyRefreshIntervalMs)*time.Millisecond)
ph = embed.WrapCORS(cfg.ec.CORS, ph)
if cfg.isReadonlyProxy() {
ph = httpproxy.NewReadonlyHandler(ph)
}
// setup self signed certs when serving https
cHosts, cTLS := []string{}, false
for _, u := range cfg.ec.LCUrls {
cHosts = append(cHosts, u.Host)
cTLS = cTLS || u.Scheme == "https"
}
for _, u := range cfg.ec.ACUrls {
cHosts = append(cHosts, u.Host)
cTLS = cTLS || u.Scheme == "https"
}
listenerTLS := cfg.ec.ClientTLSInfo
if cfg.ec.ClientAutoTLS && cTLS {
listenerTLS, err = transport.SelfCert(cfg.ec.GetLogger(), filepath.Join(cfg.ec.Dir, "clientCerts"), cHosts, cfg.ec.SelfSignedCertValidity)
if err != nil {
lg.Fatal("failed to initialize self-signed client cert", zap.Error(err))
}
}
// Start a proxy server goroutine for each listen address
for _, u := range cfg.ec.LCUrls {
l, err := transport.NewListener(u.Host, u.Scheme, &listenerTLS)
if err != nil {
return err
}
host := u.String()
go func() {
lg.Info("v2 proxy started listening on client requests", zap.String("host", host))
mux := http.NewServeMux()
etcdhttp.HandleMetrics(mux) // v2 proxy just uses the same port
mux.Handle("/", ph)
lg.Fatal("done serving", zap.Error(http.Serve(l, mux)))
}()
}
return nil
}
// identifyDataDirOrDie returns the type of the data dir. // identifyDataDirOrDie returns the type of the data dir.
// Dies if the datadir is invalid. // Dies if the datadir is invalid.
func identifyDataDirOrDie(lg *zap.Logger, dir string) dirType { func identifyDataDirOrDie(lg *zap.Logger, dir string) dirType {

View File

@ -1,4 +1,5 @@
// Copyright 2015 The etcd Authors // Copyright 2015 The etcd Authors
// Copyright 2015 The etcd Authors
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -132,9 +133,8 @@ Clustering:
V3 discovery: username[:password] for authentication (prompt if password is not supplied). V3 discovery: username[:password] for authentication (prompt if password is not supplied).
--discovery-password '' --discovery-password ''
V3 discovery: password for authentication (if this option is used, --user option shouldn't include password). V3 discovery: password for authentication (if this option is used, --user option shouldn't include password).
--discovery-fallback 'proxy' --discovery-fallback 'exit'
Expected behavior ('exit' or 'proxy') when discovery services fails. Expected behavior ('exit') when discovery services fails. Note that v2 proxy is removed.
"proxy" supports v2 API only.
--discovery-proxy '' --discovery-proxy ''
HTTP proxy to use for traffic to discovery service. Will be deprecated in v3.7, and be decommissioned in v3.8. HTTP proxy to use for traffic to discovery service. Will be deprecated in v3.7, and be decommissioned in v3.8.
--discovery-srv '' --discovery-srv ''
@ -239,20 +239,6 @@ Experimental distributed tracing:
--experimental-distributed-tracing-sampling-rate '0' --experimental-distributed-tracing-sampling-rate '0'
Number of samples to collect per million spans for distributed tracing. Disabled by default. Number of samples to collect per million spans for distributed tracing. Disabled by default.
v2 Proxy (to be deprecated in v3.6):
--proxy 'off'
Proxy mode setting ('off', 'readonly' or 'on').
--proxy-failure-wait 5000
Time (in milliseconds) an endpoint will be held in a failed state.
--proxy-refresh-interval 30000
Time (in milliseconds) of the endpoints refresh interval.
--proxy-dial-timeout 1000
Time (in milliseconds) for a dial to timeout.
--proxy-write-timeout 5000
Time (in milliseconds) for a write to timeout.
--proxy-read-timeout 0
Time (in milliseconds) for a read to timeout.
Experimental feature: Experimental feature:
--experimental-initial-corrupt-check 'false' --experimental-initial-corrupt-check 'false'
Enable to check data corruption before serving any client/peer traffic. Enable to check data corruption before serving any client/peer traffic.

View File

@ -1,197 +0,0 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package httpproxy
import (
"math/rand"
"net/url"
"sync"
"time"
"go.uber.org/zap"
)
// defaultRefreshInterval is the default proxyRefreshIntervalMs value
// as in etcdmain/config.go.
const defaultRefreshInterval = 30000 * time.Millisecond
var once sync.Once
func init() {
rand.Seed(time.Now().UnixNano())
}
func newDirector(lg *zap.Logger, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) *director {
if lg == nil {
lg = zap.NewNop()
}
d := &director{
lg: lg,
uf: urlsFunc,
failureWait: failureWait,
stopc: make(chan struct{}),
donec: make(chan struct{}),
}
d.refresh()
go func() {
defer close(d.donec)
// In order to prevent missing proxy endpoints in the first try:
// when given refresh interval of defaultRefreshInterval or greater
// and whenever there is no available proxy endpoints,
// give 1-second refreshInterval.
for {
es := d.endpoints()
ri := refreshInterval
if ri >= defaultRefreshInterval {
if len(es) == 0 {
ri = time.Second
}
}
if len(es) > 0 {
once.Do(func() {
var sl []string
for _, e := range es {
sl = append(sl, e.URL.String())
}
lg.Info("endpoints found", zap.Strings("endpoints", sl))
})
}
select {
case <-time.After(ri):
d.refresh()
case <-d.stopc:
return
}
}
}()
return d
}
type director struct {
sync.Mutex
lg *zap.Logger
ep []*endpoint
uf GetProxyURLs
failureWait time.Duration
stopc chan struct{}
donec chan struct{}
}
func (d *director) refresh() {
urls := d.uf()
d.Lock()
defer d.Unlock()
var endpoints []*endpoint
for _, u := range urls {
uu, err := url.Parse(u)
if err != nil {
d.lg.Info("upstream URL invalid", zap.Error(err))
continue
}
endpoints = append(endpoints, newEndpoint(d.lg, *uu, d.failureWait))
}
// shuffle array to avoid connections being "stuck" to a single endpoint
for i := range endpoints {
j := rand.Intn(i + 1)
endpoints[i], endpoints[j] = endpoints[j], endpoints[i]
}
d.ep = endpoints
}
func (d *director) endpoints() []*endpoint {
d.Lock()
defer d.Unlock()
filtered := make([]*endpoint, 0)
for _, ep := range d.ep {
if ep.Available {
filtered = append(filtered, ep)
}
}
return filtered
}
func (d *director) stop() {
close(d.stopc)
select {
case <-d.donec:
case <-time.After(time.Second):
d.lg.Warn("timed out waiting for director to stop")
}
}
func newEndpoint(lg *zap.Logger, u url.URL, failureWait time.Duration) *endpoint {
ep := endpoint{
lg: lg,
URL: u,
Available: true,
failFunc: timedUnavailabilityFunc(failureWait),
}
return &ep
}
type endpoint struct {
sync.Mutex
lg *zap.Logger
URL url.URL
Available bool
failFunc func(ep *endpoint)
}
func (ep *endpoint) Failed() {
ep.Lock()
if !ep.Available {
ep.Unlock()
return
}
ep.Available = false
ep.Unlock()
if ep.lg != nil {
ep.lg.Info("marked endpoint unavailable", zap.String("endpoint", ep.URL.String()))
}
if ep.failFunc == nil {
if ep.lg != nil {
ep.lg.Info(
"no failFunc defined, endpoint will be unavailable forever",
zap.String("endpoint", ep.URL.String()),
)
}
return
}
ep.failFunc(ep)
}
func timedUnavailabilityFunc(wait time.Duration) func(*endpoint) {
return func(ep *endpoint) {
time.AfterFunc(wait, func() {
ep.Available = true
if ep.lg != nil {
ep.lg.Info(
"marked endpoint available, to retest connectivity",
zap.String("endpoint", ep.URL.String()),
)
}
})
}
}

View File

@ -1,99 +0,0 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package httpproxy
import (
"net/url"
"reflect"
"sort"
"testing"
"time"
"go.uber.org/zap/zaptest"
)
func TestNewDirectorScheme(t *testing.T) {
tests := []struct {
urls []string
want []string
}{
{
urls: []string{"http://192.0.2.8:4002", "http://example.com:8080"},
want: []string{"http://192.0.2.8:4002", "http://example.com:8080"},
},
{
urls: []string{"https://192.0.2.8:4002", "https://example.com:8080"},
want: []string{"https://192.0.2.8:4002", "https://example.com:8080"},
},
// accept urls without a port
{
urls: []string{"http://192.0.2.8"},
want: []string{"http://192.0.2.8"},
},
// accept urls even if they are garbage
{
urls: []string{"http://."},
want: []string{"http://."},
},
}
for i, tt := range tests {
uf := func() []string {
return tt.urls
}
got := newDirector(zaptest.NewLogger(t), uf, time.Minute, time.Minute)
var gep []string
for _, ep := range got.ep {
gep = append(gep, ep.URL.String())
}
sort.Strings(tt.want)
sort.Strings(gep)
if !reflect.DeepEqual(tt.want, gep) {
t.Errorf("#%d: want endpoints = %#v, got = %#v", i, tt.want, gep)
}
got.stop()
}
}
func TestDirectorEndpointsFiltering(t *testing.T) {
d := director{
ep: []*endpoint{
{
URL: url.URL{Scheme: "http", Host: "192.0.2.5:5050"},
Available: false,
},
{
URL: url.URL{Scheme: "http", Host: "192.0.2.4:4000"},
Available: true,
},
},
}
got := d.endpoints()
want := []*endpoint{
{
URL: url.URL{Scheme: "http", Host: "192.0.2.4:4000"},
Available: true,
},
}
if !reflect.DeepEqual(want, got) {
t.Fatalf("directed to incorrect endpoint: want = %#v, got = %#v", want, got)
}
}

View File

@ -1,18 +0,0 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package httpproxy implements etcd httpproxy. The etcd proxy acts as a reverse
// http proxy forwarding client requests to active etcd cluster members, and does
// not participate in consensus.
package httpproxy

View File

@ -1,90 +0,0 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package httpproxy
import (
"net/http"
"strconv"
"time"
"github.com/prometheus/client_golang/prometheus"
)
var (
requestsIncoming = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "proxy",
Name: "requests_total",
Help: "Counter requests incoming by method.",
}, []string{"method"})
requestsHandled = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "proxy",
Name: "handled_total",
Help: "Counter of requests fully handled (by authoratitave servers)",
}, []string{"method", "code"})
requestsDropped = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "proxy",
Name: "dropped_total",
Help: "Counter of requests dropped on the proxy.",
}, []string{"method", "proxying_error"})
requestsHandlingSec = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "proxy",
Name: "handling_duration_seconds",
Help: "Bucketed histogram of handling time of successful events (non-watches), by method (GET/PUT etc.).",
// lowest bucket start of upper bound 0.0005 sec (0.5 ms) with factor 2
// highest bucket start of 0.0005 sec * 2^12 == 2.048 sec
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
}, []string{"method"})
)
type forwardingError string
const (
zeroEndpoints forwardingError = "zero_endpoints"
failedSendingRequest forwardingError = "failed_sending_request"
failedGettingResponse forwardingError = "failed_getting_response"
)
func init() {
prometheus.MustRegister(requestsIncoming)
prometheus.MustRegister(requestsHandled)
prometheus.MustRegister(requestsDropped)
prometheus.MustRegister(requestsHandlingSec)
}
func reportIncomingRequest(request *http.Request) {
requestsIncoming.WithLabelValues(request.Method).Inc()
}
func reportRequestHandled(request *http.Request, response *http.Response, startTime time.Time) {
method := request.Method
requestsHandled.WithLabelValues(method, strconv.Itoa(response.StatusCode)).Inc()
requestsHandlingSec.WithLabelValues(method).Observe(time.Since(startTime).Seconds())
}
func reportRequestDropped(request *http.Request, err forwardingError) {
requestsDropped.WithLabelValues(request.Method, string(err)).Inc()
}

View File

@ -1,121 +0,0 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package httpproxy
import (
"encoding/json"
"net/http"
"strings"
"time"
"go.uber.org/zap"
"golang.org/x/net/http2"
)
const (
// DefaultMaxIdleConnsPerHost indicates the default maximum idle connection
// count maintained between proxy and each member. We set it to 128 to
// let proxy handle 128 concurrent requests in long term smoothly.
// If the number of concurrent requests is bigger than this value,
// proxy needs to create one new connection when handling each request in
// the delta, which is bad because the creation consumes resource and
// may eat up ephemeral ports.
DefaultMaxIdleConnsPerHost = 128
)
// GetProxyURLs is a function which should return the current set of URLs to
// which client requests should be proxied. This function will be queried
// periodically by the proxy Handler to refresh the set of available
// backends.
type GetProxyURLs func() []string
// NewHandler creates a new HTTP handler, listening on the given transport,
// which will proxy requests to an etcd cluster.
// The handler will periodically update its view of the cluster.
func NewHandler(lg *zap.Logger, t *http.Transport, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) http.Handler {
if lg == nil {
lg = zap.NewNop()
}
if t.TLSClientConfig != nil {
// Enable http2, see Issue 5033.
err := http2.ConfigureTransport(t)
if err != nil {
lg.Info("Error enabling Transport HTTP/2 support", zap.Error(err))
}
}
p := &reverseProxy{
lg: lg,
director: newDirector(lg, urlsFunc, failureWait, refreshInterval),
transport: t,
}
mux := http.NewServeMux()
mux.Handle("/", p)
mux.HandleFunc("/v2/config/local/proxy", p.configHandler)
return mux
}
// NewReadonlyHandler wraps the given HTTP handler to allow only GET requests
func NewReadonlyHandler(hdlr http.Handler) http.Handler {
readonly := readonlyHandlerFunc(hdlr)
return http.HandlerFunc(readonly)
}
func readonlyHandlerFunc(next http.Handler) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, req *http.Request) {
if req.Method != "GET" {
w.WriteHeader(http.StatusNotImplemented)
return
}
next.ServeHTTP(w, req)
}
}
func (p *reverseProxy) configHandler(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r.Method, "GET") {
return
}
eps := p.director.endpoints()
epstr := make([]string, len(eps))
for i, e := range eps {
epstr[i] = e.URL.String()
}
proxyConfig := struct {
Endpoints []string `json:"endpoints"`
}{
Endpoints: epstr,
}
json.NewEncoder(w).Encode(proxyConfig)
}
// allowMethod verifies that the given method is one of the allowed methods,
// and if not, it writes an error to w. A boolean is returned indicating
// whether or not the method is allowed.
func allowMethod(w http.ResponseWriter, m string, ms ...string) bool {
for _, meth := range ms {
if m == meth {
return true
}
}
w.Header().Set("Allow", strings.Join(ms, ","))
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
return false
}

View File

@ -1,103 +0,0 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package httpproxy
import (
"io"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"
"go.uber.org/zap/zaptest"
)
func TestReadonlyHandler(t *testing.T) {
fixture := func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
}
hdlrFunc := readonlyHandlerFunc(http.HandlerFunc(fixture))
tests := []struct {
method string
want int
}{
// GET is only passing method
{"GET", http.StatusOK},
// everything but GET is StatusNotImplemented
{"POST", http.StatusNotImplemented},
{"PUT", http.StatusNotImplemented},
{"PATCH", http.StatusNotImplemented},
{"DELETE", http.StatusNotImplemented},
{"FOO", http.StatusNotImplemented},
}
for i, tt := range tests {
req, _ := http.NewRequest(tt.method, "http://example.com", nil)
rr := httptest.NewRecorder()
hdlrFunc(rr, req)
if tt.want != rr.Code {
t.Errorf("#%d: incorrect HTTP status code: method=%s want=%d got=%d", i, tt.method, tt.want, rr.Code)
}
}
}
func TestConfigHandlerGET(t *testing.T) {
var err error
us := make([]*url.URL, 3)
us[0], err = url.Parse("http://example1.com")
if err != nil {
t.Fatal(err)
}
us[1], err = url.Parse("http://example2.com")
if err != nil {
t.Fatal(err)
}
us[2], err = url.Parse("http://example3.com")
if err != nil {
t.Fatal(err)
}
lg := zaptest.NewLogger(t)
rp := reverseProxy{
lg: lg,
director: &director{
lg: lg,
ep: []*endpoint{
newEndpoint(lg, *us[0], 1*time.Second),
newEndpoint(lg, *us[1], 1*time.Second),
newEndpoint(lg, *us[2], 1*time.Second),
},
},
}
req, _ := http.NewRequest("GET", "http://example.com//v2/config/local/proxy", nil)
rr := httptest.NewRecorder()
rp.configHandler(rr, req)
wbody := "{\"endpoints\":[\"http://example1.com\",\"http://example2.com\",\"http://example3.com\"]}\n"
body, err := io.ReadAll(rr.Body)
if err != nil {
t.Fatal(err)
}
if string(body) != wbody {
t.Errorf("body = %s, want %s", string(body), wbody)
}
}

View File

@ -1,226 +0,0 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package httpproxy
import (
"bytes"
"context"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strings"
"sync/atomic"
"time"
"go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp/types"
"go.uber.org/zap"
)
var (
// Hop-by-hop headers. These are removed when sent to the backend.
// http://www.w3.org/Protocols/rfc2616/rfc2616-sec13.html
// This list of headers borrowed from stdlib httputil.ReverseProxy
singleHopHeaders = []string{
"Connection",
"Keep-Alive",
"Proxy-Authenticate",
"Proxy-Authorization",
"Te", // canonicalized version of "TE"
"Trailers",
"Transfer-Encoding",
"Upgrade",
}
)
func removeSingleHopHeaders(hdrs *http.Header) {
for _, h := range singleHopHeaders {
hdrs.Del(h)
}
}
type reverseProxy struct {
lg *zap.Logger
director *director
transport http.RoundTripper
}
func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request) {
reportIncomingRequest(clientreq)
proxyreq := new(http.Request)
*proxyreq = *clientreq
startTime := time.Now()
var (
proxybody []byte
err error
)
if clientreq.Body != nil {
proxybody, err = io.ReadAll(clientreq.Body)
if err != nil {
msg := fmt.Sprintf("failed to read request body: %v", err)
p.lg.Info("failed to read request body", zap.Error(err))
e := httptypes.NewHTTPError(http.StatusInternalServerError, "httpproxy: "+msg)
if we := e.WriteTo(rw); we != nil {
p.lg.Debug(
"error writing HTTPError to remote addr",
zap.String("remote-addr", clientreq.RemoteAddr),
zap.Error(we),
)
}
return
}
}
// deep-copy the headers, as these will be modified below
proxyreq.Header = make(http.Header)
copyHeader(proxyreq.Header, clientreq.Header)
normalizeRequest(proxyreq)
removeSingleHopHeaders(&proxyreq.Header)
maybeSetForwardedFor(proxyreq)
endpoints := p.director.endpoints()
if len(endpoints) == 0 {
msg := "zero endpoints currently available"
reportRequestDropped(clientreq, zeroEndpoints)
// TODO: limit the rate of the error logging.
p.lg.Info(msg)
e := httptypes.NewHTTPError(http.StatusServiceUnavailable, "httpproxy: "+msg)
if we := e.WriteTo(rw); we != nil {
p.lg.Debug(
"error writing HTTPError to remote addr",
zap.String("remote-addr", clientreq.RemoteAddr),
zap.Error(we),
)
}
return
}
var requestClosed int32
completeCh := make(chan bool, 1)
closeNotifier, ok := rw.(http.CloseNotifier)
ctx, cancel := context.WithCancel(context.Background())
proxyreq = proxyreq.WithContext(ctx)
defer cancel()
if ok {
closeCh := closeNotifier.CloseNotify()
go func() {
select {
case <-closeCh:
atomic.StoreInt32(&requestClosed, 1)
p.lg.Info(
"client closed request prematurely",
zap.String("remote-addr", clientreq.RemoteAddr),
)
cancel()
case <-completeCh:
}
}()
defer func() {
completeCh <- true
}()
}
var res *http.Response
for _, ep := range endpoints {
if proxybody != nil {
proxyreq.Body = io.NopCloser(bytes.NewBuffer(proxybody))
}
redirectRequest(proxyreq, ep.URL)
res, err = p.transport.RoundTrip(proxyreq)
if atomic.LoadInt32(&requestClosed) == 1 {
return
}
if err != nil {
reportRequestDropped(clientreq, failedSendingRequest)
p.lg.Info(
"failed to direct request",
zap.String("url", ep.URL.String()),
zap.Error(err),
)
ep.Failed()
continue
}
break
}
if res == nil {
// TODO: limit the rate of the error logging.
msg := fmt.Sprintf("unable to get response from %d endpoint(s)", len(endpoints))
reportRequestDropped(clientreq, failedGettingResponse)
p.lg.Info(msg)
e := httptypes.NewHTTPError(http.StatusBadGateway, "httpproxy: "+msg)
if we := e.WriteTo(rw); we != nil {
p.lg.Debug(
"error writing HTTPError to remote addr",
zap.String("remote-addr", clientreq.RemoteAddr),
zap.Error(we),
)
}
return
}
defer res.Body.Close()
reportRequestHandled(clientreq, res, startTime)
removeSingleHopHeaders(&res.Header)
copyHeader(rw.Header(), res.Header)
rw.WriteHeader(res.StatusCode)
io.Copy(rw, res.Body)
}
func copyHeader(dst, src http.Header) {
for k, vv := range src {
for _, v := range vv {
dst.Add(k, v)
}
}
}
func redirectRequest(req *http.Request, loc url.URL) {
req.URL.Scheme = loc.Scheme
req.URL.Host = loc.Host
}
func normalizeRequest(req *http.Request) {
req.Proto = "HTTP/1.1"
req.ProtoMajor = 1
req.ProtoMinor = 1
req.Close = false
}
func maybeSetForwardedFor(req *http.Request) {
clientIP, _, err := net.SplitHostPort(req.RemoteAddr)
if err != nil {
return
}
// If we aren't the first proxy retain prior
// X-Forwarded-For information as a comma+space
// separated list and fold multiple headers into one.
if prior, ok := req.Header["X-Forwarded-For"]; ok {
clientIP = strings.Join(prior, ", ") + ", " + clientIP
}
req.Header.Set("X-Forwarded-For", clientIP)
}

View File

@ -1,249 +0,0 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package httpproxy
import (
"bytes"
"errors"
"io"
"net/http"
"net/http/httptest"
"net/url"
"reflect"
"testing"
"go.uber.org/zap/zaptest"
)
type staticRoundTripper struct {
res *http.Response
err error
}
func (srt *staticRoundTripper) RoundTrip(*http.Request) (*http.Response, error) {
return srt.res, srt.err
}
func TestReverseProxyServe(t *testing.T) {
u := url.URL{Scheme: "http", Host: "192.0.2.3:4040"}
lg := zaptest.NewLogger(t)
tests := []struct {
eps []*endpoint
rt http.RoundTripper
want int
}{
// no endpoints available so no requests are even made
{
eps: []*endpoint{},
rt: &staticRoundTripper{
res: &http.Response{
StatusCode: http.StatusCreated,
Body: io.NopCloser(&bytes.Reader{}),
},
},
want: http.StatusServiceUnavailable,
},
// error is returned from one endpoint that should be available
{
eps: []*endpoint{{URL: u, Available: true}},
rt: &staticRoundTripper{err: errors.New("what a bad trip")},
want: http.StatusBadGateway,
},
// endpoint is available and returns success
{
eps: []*endpoint{{URL: u, Available: true}},
rt: &staticRoundTripper{
res: &http.Response{
StatusCode: http.StatusCreated,
Body: io.NopCloser(&bytes.Reader{}),
Header: map[string][]string{"Content-Type": {"application/json"}},
},
},
want: http.StatusCreated,
},
}
for i, tt := range tests {
rp := reverseProxy{
lg: lg,
director: &director{lg: lg, ep: tt.eps},
transport: tt.rt,
}
req, _ := http.NewRequest("GET", "http://192.0.2.2:2379", nil)
rr := httptest.NewRecorder()
rp.ServeHTTP(rr, req)
if rr.Code != tt.want {
t.Errorf("#%d: unexpected HTTP status code: want = %d, got = %d", i, tt.want, rr.Code)
}
if gct := rr.Header().Get("Content-Type"); gct != "application/json" {
t.Errorf("#%d: Content-Type = %s, want %s", i, gct, "application/json")
}
}
}
func TestRedirectRequest(t *testing.T) {
loc := url.URL{
Scheme: "http",
Host: "bar.example.com",
}
req := &http.Request{
Method: "GET",
Host: "foo.example.com",
URL: &url.URL{
Host: "foo.example.com",
Path: "/v2/keys/baz",
},
}
redirectRequest(req, loc)
want := &http.Request{
Method: "GET",
// this field must not change
Host: "foo.example.com",
URL: &url.URL{
// the Scheme field is updated to that of the provided URL
Scheme: "http",
// the Host field is updated to that of the provided URL
Host: "bar.example.com",
Path: "/v2/keys/baz",
},
}
if !reflect.DeepEqual(want, req) {
t.Fatalf("HTTP request does not match expected criteria: want=%#v got=%#v", want, req)
}
}
func TestMaybeSetForwardedFor(t *testing.T) {
tests := []struct {
raddr string
fwdFor string
want string
}{
{"192.0.2.3:8002", "", "192.0.2.3"},
{"192.0.2.3:8002", "192.0.2.2", "192.0.2.2, 192.0.2.3"},
{"192.0.2.3:8002", "192.0.2.1, 192.0.2.2", "192.0.2.1, 192.0.2.2, 192.0.2.3"},
{"example.com:8002", "", "example.com"},
// While these cases look valid, golang net/http will not let it happen
// The RemoteAddr field will always be a valid host:port
{":8002", "", ""},
{"192.0.2.3", "", ""},
// blatantly invalid host w/o a port
{"12", "", ""},
{"12", "192.0.2.3", "192.0.2.3"},
}
for i, tt := range tests {
req := &http.Request{
RemoteAddr: tt.raddr,
Header: make(http.Header),
}
if tt.fwdFor != "" {
req.Header.Set("X-Forwarded-For", tt.fwdFor)
}
maybeSetForwardedFor(req)
got := req.Header.Get("X-Forwarded-For")
if tt.want != got {
t.Errorf("#%d: incorrect header: want = %q, got = %q", i, tt.want, got)
}
}
}
func TestRemoveSingleHopHeaders(t *testing.T) {
hdr := http.Header(map[string][]string{
// single-hop headers that should be removed
"Connection": {"close"},
"Keep-Alive": {"foo"},
"Proxy-Authenticate": {"Basic realm=example.com"},
"Proxy-Authorization": {"foo"},
"Te": {"deflate,gzip"},
"Trailers": {"ETag"},
"Transfer-Encoding": {"chunked"},
"Upgrade": {"WebSocket"},
// headers that should persist
"Accept": {"application/json"},
"X-Foo": {"Bar"},
})
removeSingleHopHeaders(&hdr)
want := http.Header(map[string][]string{
"Accept": {"application/json"},
"X-Foo": {"Bar"},
})
if !reflect.DeepEqual(want, hdr) {
t.Fatalf("unexpected result: want = %#v, got = %#v", want, hdr)
}
}
func TestCopyHeader(t *testing.T) {
tests := []struct {
src http.Header
dst http.Header
want http.Header
}{
{
src: http.Header(map[string][]string{
"Foo": {"bar", "baz"},
}),
dst: http.Header(map[string][]string{}),
want: http.Header(map[string][]string{
"Foo": {"bar", "baz"},
}),
},
{
src: http.Header(map[string][]string{
"Foo": {"bar"},
"Ping": {"pong"},
}),
dst: http.Header(map[string][]string{}),
want: http.Header(map[string][]string{
"Foo": {"bar"},
"Ping": {"pong"},
}),
},
{
src: http.Header(map[string][]string{
"Foo": {"bar", "baz"},
}),
dst: http.Header(map[string][]string{
"Foo": {"qux"},
}),
want: http.Header(map[string][]string{
"Foo": {"qux", "bar", "baz"},
}),
},
}
for i, tt := range tests {
copyHeader(tt.dst, tt.src)
if !reflect.DeepEqual(tt.dst, tt.want) {
t.Errorf("#%d: unexpected headers: want = %v, got = %v", i, tt.want, tt.dst)
}
}
}

View File

@ -32,8 +32,9 @@ import (
type proxyEtcdProcess struct { type proxyEtcdProcess struct {
etcdProc EtcdProcess etcdProc EtcdProcess
proxyV2 *proxyV2Proc // TODO(ahrtr): We need to remove `proxyV2` and v2discovery when the v2client is removed.
proxyV3 *proxyV3Proc proxyV2 *proxyV2Proc
proxyV3 *proxyV3Proc
} }
func NewEtcdProcess(cfg *EtcdServerProcessConfig) (EtcdProcess, error) { func NewEtcdProcess(cfg *EtcdServerProcessConfig) (EtcdProcess, error) {
@ -65,9 +66,6 @@ func (p *proxyEtcdProcess) Start() error {
if err := p.etcdProc.Start(); err != nil { if err := p.etcdProc.Start(); err != nil {
return err return err
} }
if err := p.proxyV2.Start(); err != nil {
return err
}
return p.proxyV3.Start() return p.proxyV3.Start()
} }
@ -75,17 +73,11 @@ func (p *proxyEtcdProcess) Restart() error {
if err := p.etcdProc.Restart(); err != nil { if err := p.etcdProc.Restart(); err != nil {
return err return err
} }
if err := p.proxyV2.Restart(); err != nil {
return err
}
return p.proxyV3.Restart() return p.proxyV3.Restart()
} }
func (p *proxyEtcdProcess) Stop() error { func (p *proxyEtcdProcess) Stop() error {
err := p.proxyV2.Stop() err := p.proxyV3.Stop()
if v3err := p.proxyV3.Stop(); err == nil {
err = v3err
}
if eerr := p.etcdProc.Stop(); eerr != nil && err == nil { if eerr := p.etcdProc.Stop(); eerr != nil && err == nil {
// fails on go-grpc issue #1384 // fails on go-grpc issue #1384
if !strings.Contains(eerr.Error(), "exit status 2") { if !strings.Contains(eerr.Error(), "exit status 2") {
@ -96,10 +88,7 @@ func (p *proxyEtcdProcess) Stop() error {
} }
func (p *proxyEtcdProcess) Close() error { func (p *proxyEtcdProcess) Close() error {
err := p.proxyV2.Close() err := p.proxyV3.Close()
if v3err := p.proxyV3.Close(); err == nil {
err = v3err
}
if eerr := p.etcdProc.Close(); eerr != nil && err == nil { if eerr := p.etcdProc.Close(); eerr != nil && err == nil {
// fails on go-grpc issue #1384 // fails on go-grpc issue #1384
if !strings.Contains(eerr.Error(), "exit status 2") { if !strings.Contains(eerr.Error(), "exit status 2") {
@ -110,7 +99,6 @@ func (p *proxyEtcdProcess) Close() error {
} }
func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal { func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal {
p.proxyV2.WithStopSignal(sig)
p.proxyV3.WithStopSignal(sig) p.proxyV3.WithStopSignal(sig)
return p.etcdProc.WithStopSignal(sig) return p.etcdProc.WithStopSignal(sig)
} }
@ -210,31 +198,6 @@ func newProxyV2Proc(cfg *EtcdServerProcessConfig) *proxyV2Proc {
} }
} }
func (v2p *proxyV2Proc) Start() error {
os.RemoveAll(v2p.dataDir)
if err := v2p.start(); err != nil {
return err
}
// The full line we are expecting in the logs:
// "caller":"httpproxy/director.go:65","msg":"endpoints found","endpoints":["http://localhost:20000"]}
return v2p.waitReady("endpoints found")
}
func (v2p *proxyV2Proc) Restart() error {
if err := v2p.Stop(); err != nil {
return err
}
return v2p.Start()
}
func (v2p *proxyV2Proc) Stop() error {
if err := v2p.proxyProc.Stop(); err != nil {
return err
}
// v2 proxy caches members; avoid reuse of directory
return os.RemoveAll(v2p.dataDir)
}
type proxyV3Proc struct { type proxyV3Proc struct {
proxyProc proxyProc
} }