diff --git a/Documentation/configuration.md b/Documentation/configuration.md index 70ecec7bd..ada1bbcbe 100644 --- a/Documentation/configuration.md +++ b/Documentation/configuration.md @@ -107,6 +107,27 @@ To start etcd automatically using custom settings at startup in Linux, using a [ + Proxy mode setting ("off", "readonly" or "on"). + default: "off" +##### -proxy-failure-wait ++ Time (in milliseconds) an endpoint will be held in a failed state before being reconsidered for proxied requests. ++ default: 5000 + +##### -proxy-refresh-interval ++ Time (in milliseconds) of the endpoints refresh interval. ++ default: 30000 + +##### -proxy-dial-timeout ++ Time (in milliseconds) for a dial to timeout or 0 to disable the timeout ++ default: 1000 + +##### -proxy-write-timeout ++ Time (in milliseconds) for a write to timeout or 0 to disable the timeout. ++ default: 5000 + +##### -proxy-read-timeout ++ Time (in milliseconds) for a read to timeout or 0 to disable the timeout. ++ Don't change this value if you use watches because they are using long polling requests. ++ default: 0 + ### Security Flags The security flags help to [build a secure etcd cluster][security]. diff --git a/etcdmain/config.go b/etcdmain/config.go index f3c28b66d..350806a23 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -92,7 +92,12 @@ type config struct { initialClusterToken string // proxy - proxy *flags.StringsFlag + proxy *flags.StringsFlag + proxyFailureWaitMs uint + proxyRefreshIntervalMs uint + proxyDialTimeoutMs uint + proxyWriteTimeoutMs uint + proxyReadTimeoutMs uint // security clientTLSInfo, peerTLSInfo transport.TLSInfo @@ -172,6 +177,11 @@ func NewConfig() *config { // Should never happen. plog.Panicf("unexpected error setting up proxyFlag: %v", err) } + fs.UintVar(&cfg.proxyFailureWaitMs, "proxy-failure-wait", 5000, "Time (in milliseconds) an endpoint will be held in a failed state.") + fs.UintVar(&cfg.proxyRefreshIntervalMs, "proxy-refresh-interval", 30000, "Time (in milliseconds) of the endpoints refresh interval.") + fs.UintVar(&cfg.proxyDialTimeoutMs, "proxy-dial-timeout", 1000, "Time (in milliseconds) for a dial to timeout.") + fs.UintVar(&cfg.proxyWriteTimeoutMs, "proxy-write-timeout", 5000, "Time (in milliseconds) for a write to timeout.") + fs.UintVar(&cfg.proxyReadTimeoutMs, "proxy-read-timeout", 0, "Time (in milliseconds) for a read to timeout.") // security fs.StringVar(&cfg.clientTLSInfo.CAFile, "ca-file", "", "DEPRECATED: Path to the client server TLS CA file.") diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index c4619492c..e8fcd7dd7 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -248,13 +248,13 @@ func startProxy(cfg *config) error { } } - pt, err := transport.NewTransport(cfg.clientTLSInfo) + pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, time.Duration(cfg.proxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyWriteTimeoutMs)*time.Millisecond) pt.MaxIdleConnsPerHost = proxy.DefaultMaxIdleConnsPerHost if err != nil { return err } - tr, err := transport.NewTransport(cfg.peerTLSInfo) + tr, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, time.Duration(cfg.proxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyWriteTimeoutMs)*time.Millisecond) if err != nil { return err } @@ -323,7 +323,7 @@ func startProxy(cfg *config) error { return clientURLs } - ph := proxy.NewHandler(pt, uf) + ph := proxy.NewHandler(pt, uf, time.Duration(cfg.proxyFailureWaitMs)*time.Millisecond, time.Duration(cfg.proxyRefreshIntervalMs)*time.Millisecond) ph = &cors.CORSHandler{ Handler: ph, Info: cfg.corsInfo, diff --git a/etcdmain/help.go b/etcdmain/help.go index 5e0aa863d..b8a3095d8 100644 --- a/etcdmain/help.go +++ b/etcdmain/help.go @@ -72,6 +72,16 @@ proxy flags: --proxy 'off' proxy mode setting ('off', 'readonly' or 'on'). + --proxy-failure-wait 5000 + time (in milliseconds) an endpoint will be held in a failed state. + --proxy-refresh-interval 30000 + time (in milliseconds) of the endpoints refresh interval. + --proxy-dial-timeout 1000 + time (in milliseconds) for a dial to timeout. + --proxy-write-timeout 5000 + time (in milliseconds) for a write to timeout. + --proxy-read-timeout 0 + time (in milliseconds) for a read to timeout. security flags: diff --git a/proxy/director.go b/proxy/director.go index e795b27fb..0cfe37d89 100644 --- a/proxy/director.go +++ b/proxy/director.go @@ -22,24 +22,16 @@ import ( "time" ) -const ( - // amount of time an endpoint will be held in a failed - // state before being reconsidered for proxied requests - endpointFailureWait = 5 * time.Second - - // how often the proxy will attempt to refresh its set of endpoints - refreshEndpoints = 30 * time.Second -) - -func newDirector(urlsFunc GetProxyURLs) *director { +func newDirector(urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) *director { d := &director{ - uf: urlsFunc, + uf: urlsFunc, + failureWait: failureWait, } d.refresh() go func() { for { select { - case <-time.After(refreshEndpoints): + case <-time.After(refreshInterval): d.refresh() } } @@ -49,8 +41,10 @@ func newDirector(urlsFunc GetProxyURLs) *director { type director struct { sync.Mutex - ep []*endpoint - uf GetProxyURLs + ep []*endpoint + uf GetProxyURLs + failureWait time.Duration + refreshInterval time.Duration } func (d *director) refresh() { @@ -64,7 +58,7 @@ func (d *director) refresh() { log.Printf("proxy: upstream URL invalid: %v", err) continue } - endpoints = append(endpoints, newEndpoint(*uu)) + endpoints = append(endpoints, newEndpoint(*uu, d.failureWait)) } // shuffle array to avoid connections being "stuck" to a single endpoint @@ -89,11 +83,11 @@ func (d *director) endpoints() []*endpoint { return filtered } -func newEndpoint(u url.URL) *endpoint { +func newEndpoint(u url.URL, failureWait time.Duration) *endpoint { ep := endpoint{ URL: u, Available: true, - failFunc: timedUnavailabilityFunc(endpointFailureWait), + failFunc: timedUnavailabilityFunc(failureWait), } return &ep diff --git a/proxy/director_test.go b/proxy/director_test.go index 6287091bf..0d56ea961 100644 --- a/proxy/director_test.go +++ b/proxy/director_test.go @@ -19,6 +19,7 @@ import ( "reflect" "sort" "testing" + "time" ) func TestNewDirectorScheme(t *testing.T) { @@ -52,7 +53,7 @@ func TestNewDirectorScheme(t *testing.T) { uf := func() []string { return tt.urls } - got := newDirector(uf) + got := newDirector(uf, time.Minute, time.Minute) var gep []string for _, ep := range got.ep { diff --git a/proxy/proxy.go b/proxy/proxy.go index 299e9709d..c451cbe0d 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -16,6 +16,7 @@ package proxy import ( "net/http" + "time" ) const ( @@ -38,9 +39,9 @@ type GetProxyURLs func() []string // NewHandler creates a new HTTP handler, listening on the given transport, // which will proxy requests to an etcd cluster. // The handler will periodically update its view of the cluster. -func NewHandler(t *http.Transport, urlsFunc GetProxyURLs) http.Handler { +func NewHandler(t *http.Transport, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) http.Handler { return &reverseProxy{ - director: newDirector(urlsFunc), + director: newDirector(urlsFunc, failureWait, refreshInterval), transport: t, } }