Merge pull request #2967 from webner/feature/proxy-config

proxy: added endpoint refresh and timeout configuration values
This commit is contained in:
Xiang Li 2015-07-03 11:51:15 -07:00
commit cbe00e4415
7 changed files with 61 additions and 24 deletions

View File

@ -107,6 +107,27 @@ To start etcd automatically using custom settings at startup in Linux, using a [
+ Proxy mode setting ("off", "readonly" or "on"). + Proxy mode setting ("off", "readonly" or "on").
+ default: "off" + default: "off"
##### -proxy-failure-wait
+ Time (in milliseconds) an endpoint will be held in a failed state before being reconsidered for proxied requests.
+ default: 5000
##### -proxy-refresh-interval
+ Time (in milliseconds) of the endpoints refresh interval.
+ default: 30000
##### -proxy-dial-timeout
+ Time (in milliseconds) for a dial to timeout or 0 to disable the timeout
+ default: 1000
##### -proxy-write-timeout
+ Time (in milliseconds) for a write to timeout or 0 to disable the timeout.
+ default: 5000
##### -proxy-read-timeout
+ Time (in milliseconds) for a read to timeout or 0 to disable the timeout.
+ Don't change this value if you use watches because they are using long polling requests.
+ default: 0
### Security Flags ### Security Flags
The security flags help to [build a secure etcd cluster][security]. The security flags help to [build a secure etcd cluster][security].

View File

@ -93,6 +93,11 @@ type config struct {
// proxy // proxy
proxy *flags.StringsFlag proxy *flags.StringsFlag
proxyFailureWaitMs uint
proxyRefreshIntervalMs uint
proxyDialTimeoutMs uint
proxyWriteTimeoutMs uint
proxyReadTimeoutMs uint
// security // security
clientTLSInfo, peerTLSInfo transport.TLSInfo clientTLSInfo, peerTLSInfo transport.TLSInfo
@ -172,6 +177,11 @@ func NewConfig() *config {
// Should never happen. // Should never happen.
plog.Panicf("unexpected error setting up proxyFlag: %v", err) plog.Panicf("unexpected error setting up proxyFlag: %v", err)
} }
fs.UintVar(&cfg.proxyFailureWaitMs, "proxy-failure-wait", 5000, "Time (in milliseconds) an endpoint will be held in a failed state.")
fs.UintVar(&cfg.proxyRefreshIntervalMs, "proxy-refresh-interval", 30000, "Time (in milliseconds) of the endpoints refresh interval.")
fs.UintVar(&cfg.proxyDialTimeoutMs, "proxy-dial-timeout", 1000, "Time (in milliseconds) for a dial to timeout.")
fs.UintVar(&cfg.proxyWriteTimeoutMs, "proxy-write-timeout", 5000, "Time (in milliseconds) for a write to timeout.")
fs.UintVar(&cfg.proxyReadTimeoutMs, "proxy-read-timeout", 0, "Time (in milliseconds) for a read to timeout.")
// security // security
fs.StringVar(&cfg.clientTLSInfo.CAFile, "ca-file", "", "DEPRECATED: Path to the client server TLS CA file.") fs.StringVar(&cfg.clientTLSInfo.CAFile, "ca-file", "", "DEPRECATED: Path to the client server TLS CA file.")

View File

@ -248,13 +248,13 @@ func startProxy(cfg *config) error {
} }
} }
pt, err := transport.NewTransport(cfg.clientTLSInfo) pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, time.Duration(cfg.proxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyWriteTimeoutMs)*time.Millisecond)
pt.MaxIdleConnsPerHost = proxy.DefaultMaxIdleConnsPerHost pt.MaxIdleConnsPerHost = proxy.DefaultMaxIdleConnsPerHost
if err != nil { if err != nil {
return err return err
} }
tr, err := transport.NewTransport(cfg.peerTLSInfo) tr, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, time.Duration(cfg.proxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyWriteTimeoutMs)*time.Millisecond)
if err != nil { if err != nil {
return err return err
} }
@ -323,7 +323,7 @@ func startProxy(cfg *config) error {
return clientURLs return clientURLs
} }
ph := proxy.NewHandler(pt, uf) ph := proxy.NewHandler(pt, uf, time.Duration(cfg.proxyFailureWaitMs)*time.Millisecond, time.Duration(cfg.proxyRefreshIntervalMs)*time.Millisecond)
ph = &cors.CORSHandler{ ph = &cors.CORSHandler{
Handler: ph, Handler: ph,
Info: cfg.corsInfo, Info: cfg.corsInfo,

View File

@ -72,6 +72,16 @@ proxy flags:
--proxy 'off' --proxy 'off'
proxy mode setting ('off', 'readonly' or 'on'). proxy mode setting ('off', 'readonly' or 'on').
--proxy-failure-wait 5000
time (in milliseconds) an endpoint will be held in a failed state.
--proxy-refresh-interval 30000
time (in milliseconds) of the endpoints refresh interval.
--proxy-dial-timeout 1000
time (in milliseconds) for a dial to timeout.
--proxy-write-timeout 5000
time (in milliseconds) for a write to timeout.
--proxy-read-timeout 0
time (in milliseconds) for a read to timeout.
security flags: security flags:

View File

@ -22,24 +22,16 @@ import (
"time" "time"
) )
const ( func newDirector(urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) *director {
// amount of time an endpoint will be held in a failed
// state before being reconsidered for proxied requests
endpointFailureWait = 5 * time.Second
// how often the proxy will attempt to refresh its set of endpoints
refreshEndpoints = 30 * time.Second
)
func newDirector(urlsFunc GetProxyURLs) *director {
d := &director{ d := &director{
uf: urlsFunc, uf: urlsFunc,
failureWait: failureWait,
} }
d.refresh() d.refresh()
go func() { go func() {
for { for {
select { select {
case <-time.After(refreshEndpoints): case <-time.After(refreshInterval):
d.refresh() d.refresh()
} }
} }
@ -51,6 +43,8 @@ type director struct {
sync.Mutex sync.Mutex
ep []*endpoint ep []*endpoint
uf GetProxyURLs uf GetProxyURLs
failureWait time.Duration
refreshInterval time.Duration
} }
func (d *director) refresh() { func (d *director) refresh() {
@ -64,7 +58,7 @@ func (d *director) refresh() {
log.Printf("proxy: upstream URL invalid: %v", err) log.Printf("proxy: upstream URL invalid: %v", err)
continue continue
} }
endpoints = append(endpoints, newEndpoint(*uu)) endpoints = append(endpoints, newEndpoint(*uu, d.failureWait))
} }
// shuffle array to avoid connections being "stuck" to a single endpoint // shuffle array to avoid connections being "stuck" to a single endpoint
@ -89,11 +83,11 @@ func (d *director) endpoints() []*endpoint {
return filtered return filtered
} }
func newEndpoint(u url.URL) *endpoint { func newEndpoint(u url.URL, failureWait time.Duration) *endpoint {
ep := endpoint{ ep := endpoint{
URL: u, URL: u,
Available: true, Available: true,
failFunc: timedUnavailabilityFunc(endpointFailureWait), failFunc: timedUnavailabilityFunc(failureWait),
} }
return &ep return &ep

View File

@ -19,6 +19,7 @@ import (
"reflect" "reflect"
"sort" "sort"
"testing" "testing"
"time"
) )
func TestNewDirectorScheme(t *testing.T) { func TestNewDirectorScheme(t *testing.T) {
@ -52,7 +53,7 @@ func TestNewDirectorScheme(t *testing.T) {
uf := func() []string { uf := func() []string {
return tt.urls return tt.urls
} }
got := newDirector(uf) got := newDirector(uf, time.Minute, time.Minute)
var gep []string var gep []string
for _, ep := range got.ep { for _, ep := range got.ep {

View File

@ -16,6 +16,7 @@ package proxy
import ( import (
"net/http" "net/http"
"time"
) )
const ( const (
@ -38,9 +39,9 @@ type GetProxyURLs func() []string
// NewHandler creates a new HTTP handler, listening on the given transport, // NewHandler creates a new HTTP handler, listening on the given transport,
// which will proxy requests to an etcd cluster. // which will proxy requests to an etcd cluster.
// The handler will periodically update its view of the cluster. // The handler will periodically update its view of the cluster.
func NewHandler(t *http.Transport, urlsFunc GetProxyURLs) http.Handler { func NewHandler(t *http.Transport, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) http.Handler {
return &reverseProxy{ return &reverseProxy{
director: newDirector(urlsFunc), director: newDirector(urlsFunc, failureWait, refreshInterval),
transport: t, transport: t,
} }
} }