// Copyright 2015 CoreOS, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package client import ( "errors" "fmt" "io/ioutil" "net/http" "net/url" "sync" "time" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" ) var ( ErrTimeout = context.DeadlineExceeded ErrCanceled = context.Canceled ErrNoEndpoints = errors.New("no endpoints available") ErrTooManyRedirects = errors.New("too many redirects") DefaultRequestTimeout = 5 * time.Second DefaultMaxRedirects = 10 ) func newHTTPClientFactory(tr CancelableTransport) httpClientFactory { return func(ep url.URL) HTTPClient { return &redirectFollowingHTTPClient{ max: DefaultMaxRedirects, client: &simpleHTTPClient{ transport: tr, endpoint: ep, }, } } } type Config struct { Endpoints []string Transport CancelableTransport } func New(cfg Config) (SyncableHTTPClient, error) { c := &httpClusterClient{clientFactory: newHTTPClientFactory(cfg.Transport)} if err := c.reset(cfg.Endpoints); err != nil { return nil, err } return c, nil } type SyncableHTTPClient interface { HTTPClient Sync(context.Context) error Endpoints() []string } type HTTPClient interface { Do(context.Context, httpAction) (*http.Response, []byte, error) } type httpClientFactory func(url.URL) HTTPClient type httpAction interface { HTTPRequest(url.URL) *http.Request } // CancelableTransport mimics http.Transport to provide an interface which can be // substituted for testing (since the RoundTripper interface alone does not // require the CancelRequest method) type CancelableTransport interface { http.RoundTripper CancelRequest(req *http.Request) } type httpClusterClient struct { clientFactory httpClientFactory endpoints []url.URL sync.RWMutex } func (c *httpClusterClient) reset(eps []string) error { if len(eps) == 0 { return ErrNoEndpoints } neps := make([]url.URL, len(eps)) for i, ep := range eps { u, err := url.Parse(ep) if err != nil { return err } neps[i] = *u } c.endpoints = neps return nil } func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (resp *http.Response, body []byte, err error) { c.RLock() leps := len(c.endpoints) eps := make([]url.URL, leps) n := copy(eps, c.endpoints) c.RUnlock() if leps == 0 { err = ErrNoEndpoints return } if leps != n { err = errors.New("unable to pick endpoint: copy failed") return } for _, ep := range eps { hc := c.clientFactory(ep) resp, body, err = hc.Do(ctx, act) if err != nil { if err == ErrTimeout || err == ErrCanceled { return nil, nil, err } continue } if resp.StatusCode/100 == 5 { continue } break } return } func (c *httpClusterClient) Endpoints() []string { c.RLock() defer c.RUnlock() eps := make([]string, len(c.endpoints)) for i, ep := range c.endpoints { eps[i] = ep.String() } return eps } func (c *httpClusterClient) Sync(ctx context.Context) error { c.Lock() defer c.Unlock() mAPI := NewMembersAPI(c) ms, err := mAPI.List(ctx) if err != nil { return err } eps := make([]string, 0) for _, m := range ms { eps = append(eps, m.ClientURLs...) } return c.reset(eps) } type roundTripResponse struct { resp *http.Response err error } type simpleHTTPClient struct { transport CancelableTransport endpoint url.URL } func (c *simpleHTTPClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) { req := act.HTTPRequest(c.endpoint) rtchan := make(chan roundTripResponse, 1) go func() { resp, err := c.transport.RoundTrip(req) rtchan <- roundTripResponse{resp: resp, err: err} close(rtchan) }() var resp *http.Response var err error select { case rtresp := <-rtchan: resp, err = rtresp.resp, rtresp.err case <-ctx.Done(): c.transport.CancelRequest(req) // wait for request to actually exit before continuing <-rtchan err = ctx.Err() } // always check for resp nil-ness to deal with possible // race conditions between channels above defer func() { if resp != nil { resp.Body.Close() } }() if err != nil { return nil, nil, err } body, err := ioutil.ReadAll(resp.Body) return resp, body, err } type redirectFollowingHTTPClient struct { client HTTPClient max int } func (r *redirectFollowingHTTPClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) { for i := 0; i <= r.max; i++ { resp, body, err := r.client.Do(ctx, act) if err != nil { return nil, nil, err } if resp.StatusCode/100 == 3 { hdr := resp.Header.Get("Location") if hdr == "" { return nil, nil, fmt.Errorf("Location header not set") } loc, err := url.Parse(hdr) if err != nil { return nil, nil, fmt.Errorf("Location header not valid URL: %s", hdr) } act = &redirectedHTTPAction{ action: act, location: *loc, } continue } return resp, body, nil } return nil, nil, ErrTooManyRedirects } type redirectedHTTPAction struct { action httpAction location url.URL } func (r *redirectedHTTPAction) HTTPRequest(ep url.URL) *http.Request { orig := r.action.HTTPRequest(ep) orig.URL = &r.location return orig }