mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
proxy: added endpoint refresh and timeout configuration values
the default dial timeout was set to 30 seconds this made the proxy a pain to use in failure scenarios. fixes 2862
This commit is contained in:
parent
8e7fa9e201
commit
1264dbe24d
@ -105,6 +105,27 @@ To start etcd automatically using custom settings at startup in Linux, using a [
|
||||
+ Proxy mode setting ("off", "readonly" or "on").
|
||||
+ default: "off"
|
||||
|
||||
##### -proxy-failure-wait
|
||||
+ Time (in milliseconds) an endpoint will be held in a failed state.
|
||||
+ default: 5000
|
||||
|
||||
##### -proxy-refresh-interval
|
||||
+ Time (in milliseconds) of the endpoints refresh interval.
|
||||
+ default: 30000
|
||||
|
||||
##### -proxy-dial-timeout
|
||||
+ Time (in milliseconds) for a dial to timeout.
|
||||
+ default: 1000
|
||||
|
||||
##### -proxy-write-timeout
|
||||
+ Time (in milliseconds) for a write to timeout.
|
||||
+ default: 5000
|
||||
|
||||
##### -proxy-read-timeout
|
||||
+ Time (in milliseconds) for a read to timeout.
|
||||
+ Don't change this value if you use watches because they are using long polling requests.
|
||||
+ default: 0
|
||||
|
||||
### Security Flags
|
||||
|
||||
The security flags help to [build a secure etcd cluster][security].
|
||||
|
@ -92,7 +92,12 @@ type config struct {
|
||||
initialClusterToken string
|
||||
|
||||
// proxy
|
||||
proxy *flags.StringsFlag
|
||||
proxy *flags.StringsFlag
|
||||
proxyFailureWaitMs uint
|
||||
proxyRefreshIntervalMs uint
|
||||
proxyDialTimeoutMs uint
|
||||
proxyWriteTimeoutMs uint
|
||||
proxyReadTimeoutMs uint
|
||||
|
||||
// security
|
||||
clientTLSInfo, peerTLSInfo transport.TLSInfo
|
||||
@ -172,6 +177,11 @@ func NewConfig() *config {
|
||||
// Should never happen.
|
||||
plog.Panicf("unexpected error setting up proxyFlag: %v", err)
|
||||
}
|
||||
fs.UintVar(&cfg.proxyFailureWaitMs, "proxy-failure-wait", 5000, "Time (in milliseconds) an endpoint will be held in a failed state.")
|
||||
fs.UintVar(&cfg.proxyRefreshIntervalMs, "proxy-refresh-interval", 30000, "Time (in milliseconds) of the endpoints refresh interval.")
|
||||
fs.UintVar(&cfg.proxyDialTimeoutMs, "proxy-dial-timeout", 1000, "Time (in milliseconds) for a dial to timeout.")
|
||||
fs.UintVar(&cfg.proxyWriteTimeoutMs, "proxy-write-timeout", 5000, "Time (in milliseconds) for a write to timeout.")
|
||||
fs.UintVar(&cfg.proxyReadTimeoutMs, "proxy-read-timeout", 0, "Time (in milliseconds) for a read to timeout.")
|
||||
|
||||
// security
|
||||
fs.StringVar(&cfg.clientTLSInfo.CAFile, "ca-file", "", "DEPRECATED: Path to the client server TLS CA file.")
|
||||
|
@ -248,13 +248,13 @@ func startProxy(cfg *config) error {
|
||||
}
|
||||
}
|
||||
|
||||
pt, err := transport.NewTransport(cfg.clientTLSInfo)
|
||||
pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, time.Duration(cfg.proxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyWriteTimeoutMs)*time.Millisecond)
|
||||
pt.MaxIdleConnsPerHost = proxy.DefaultMaxIdleConnsPerHost
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tr, err := transport.NewTransport(cfg.peerTLSInfo)
|
||||
tr, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, time.Duration(cfg.proxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyWriteTimeoutMs)*time.Millisecond)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -323,7 +323,7 @@ func startProxy(cfg *config) error {
|
||||
|
||||
return clientURLs
|
||||
}
|
||||
ph := proxy.NewHandler(pt, uf)
|
||||
ph := proxy.NewHandler(pt, uf, time.Duration(cfg.proxyFailureWaitMs)*time.Millisecond, time.Duration(cfg.proxyRefreshIntervalMs)*time.Millisecond)
|
||||
ph = &cors.CORSHandler{
|
||||
Handler: ph,
|
||||
Info: cfg.corsInfo,
|
||||
|
@ -72,6 +72,16 @@ proxy flags:
|
||||
|
||||
--proxy 'off'
|
||||
proxy mode setting ('off', 'readonly' or 'on').
|
||||
--proxy-failure-wait 5000
|
||||
time (in milliseconds) an endpoint will be held in a failed state.
|
||||
--proxy-refresh-interval 30000
|
||||
time (in milliseconds) of the endpoints refresh interval.
|
||||
--proxy-dial-timeout 1000
|
||||
time (in milliseconds) for a dial to timeout.
|
||||
--proxy-write-timeout 5000
|
||||
time (in milliseconds) for a write to timeout.
|
||||
--proxy-read-timeout 0
|
||||
time (in milliseconds) for a read to timeout.
|
||||
|
||||
|
||||
security flags:
|
||||
|
@ -22,24 +22,17 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
// amount of time an endpoint will be held in a failed
|
||||
// state before being reconsidered for proxied requests
|
||||
endpointFailureWait = 5 * time.Second
|
||||
|
||||
// how often the proxy will attempt to refresh its set of endpoints
|
||||
refreshEndpoints = 30 * time.Second
|
||||
)
|
||||
|
||||
func newDirector(urlsFunc GetProxyURLs) *director {
|
||||
func newDirector(urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) *director {
|
||||
d := &director{
|
||||
uf: urlsFunc,
|
||||
uf: urlsFunc,
|
||||
failureWait: failureWait,
|
||||
refreshInterval: refreshInterval,
|
||||
}
|
||||
d.refresh()
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-time.After(refreshEndpoints):
|
||||
case <-time.After(refreshInterval):
|
||||
d.refresh()
|
||||
}
|
||||
}
|
||||
@ -49,8 +42,10 @@ func newDirector(urlsFunc GetProxyURLs) *director {
|
||||
|
||||
type director struct {
|
||||
sync.Mutex
|
||||
ep []*endpoint
|
||||
uf GetProxyURLs
|
||||
ep []*endpoint
|
||||
uf GetProxyURLs
|
||||
failureWait time.Duration
|
||||
refreshInterval time.Duration
|
||||
}
|
||||
|
||||
func (d *director) refresh() {
|
||||
@ -64,7 +59,7 @@ func (d *director) refresh() {
|
||||
log.Printf("proxy: upstream URL invalid: %v", err)
|
||||
continue
|
||||
}
|
||||
endpoints = append(endpoints, newEndpoint(*uu))
|
||||
endpoints = append(endpoints, newEndpoint(*uu, d.failureWait))
|
||||
}
|
||||
|
||||
// shuffle array to avoid connections being "stuck" to a single endpoint
|
||||
@ -89,11 +84,11 @@ func (d *director) endpoints() []*endpoint {
|
||||
return filtered
|
||||
}
|
||||
|
||||
func newEndpoint(u url.URL) *endpoint {
|
||||
func newEndpoint(u url.URL, failureWait time.Duration) *endpoint {
|
||||
ep := endpoint{
|
||||
URL: u,
|
||||
Available: true,
|
||||
failFunc: timedUnavailabilityFunc(endpointFailureWait),
|
||||
failFunc: timedUnavailabilityFunc(failureWait),
|
||||
}
|
||||
|
||||
return &ep
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestNewDirectorScheme(t *testing.T) {
|
||||
@ -52,7 +53,7 @@ func TestNewDirectorScheme(t *testing.T) {
|
||||
uf := func() []string {
|
||||
return tt.urls
|
||||
}
|
||||
got := newDirector(uf)
|
||||
got := newDirector(uf, time.Minute, time.Minute)
|
||||
|
||||
var gep []string
|
||||
for _, ep := range got.ep {
|
||||
|
@ -16,6 +16,7 @@ package proxy
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -38,9 +39,9 @@ type GetProxyURLs func() []string
|
||||
// NewHandler creates a new HTTP handler, listening on the given transport,
|
||||
// which will proxy requests to an etcd cluster.
|
||||
// The handler will periodically update its view of the cluster.
|
||||
func NewHandler(t *http.Transport, urlsFunc GetProxyURLs) http.Handler {
|
||||
func NewHandler(t *http.Transport, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) http.Handler {
|
||||
return &reverseProxy{
|
||||
director: newDirector(urlsFunc),
|
||||
director: newDirector(urlsFunc, failureWait, refreshInterval),
|
||||
transport: t,
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user