etcd/client/http.go
2015-02-28 10:22:50 -08:00

269 lines
5.6 KiB
Go

// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"sync"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
)
// Sentinel errors and package-level defaults used by the HTTP client.
var (
// ErrTimeout is an alias for context.DeadlineExceeded.
ErrTimeout = context.DeadlineExceeded
// ErrCanceled is an alias for context.Canceled.
ErrCanceled = context.Canceled
// ErrNoEndpoints is returned when a cluster client has no endpoints to use.
ErrNoEndpoints = errors.New("no endpoints available")
// ErrTooManyRedirects is returned when a request is still being
// redirected after the configured number of attempts.
ErrTooManyRedirects = errors.New("too many redirects")
// DefaultRequestTimeout is not referenced in this file; presumably it is
// the default per-request timeout applied by callers — TODO confirm.
DefaultRequestTimeout = 5 * time.Second
// DefaultMaxRedirects is the redirect limit given to clients built by
// defaultHTTPClientFactory.
DefaultMaxRedirects = 10
)
// defaultHTTPClientFactory builds the HTTPClient used for a single endpoint:
// a plain httpClient bound to tr and ep, wrapped so that redirects are
// followed up to DefaultMaxRedirects times.
func defaultHTTPClientFactory(tr CancelableTransport, ep url.URL) HTTPClient {
	base := &httpClient{transport: tr, endpoint: ep}
	follower := &redirectFollowingHTTPClient{
		client: base,
		max:    DefaultMaxRedirects,
	}
	return follower
}
// ClientConfig carries the settings consumed by New.
type ClientConfig struct {
// Endpoints lists the endpoint URLs as strings. New fails with
// ErrNoEndpoints when the list is empty, and with the url.Parse error
// when an entry cannot be parsed.
Endpoints []string
// Transport performs the HTTP round trips and cancels in-flight requests.
Transport CancelableTransport
}
// New builds a cluster-aware client from cfg. It returns an error when
// cfg.Endpoints is empty (ErrNoEndpoints) or contains an entry that cannot
// be parsed as a URL.
func New(cfg ClientConfig) (SyncableHTTPClient, error) {
	cluster := new(httpClusterClient)
	cluster.clientFactory = defaultHTTPClientFactory

	err := cluster.reset(cfg.Transport, cfg.Endpoints)
	if err != nil {
		return nil, err
	}
	return cluster, nil
}
// SyncableHTTPClient is an HTTPClient that also exposes its endpoint list
// and can refresh it. It is the type returned by New.
type SyncableHTTPClient interface {
HTTPClient
// Sync refreshes the set of endpoints used by the client.
Sync(context.Context) error
// Endpoints returns the endpoints currently in use, as strings.
Endpoints() []string
}
// HTTPClient executes an HTTPAction and reports the outcome.
type HTTPClient interface {
// Do performs the action and returns the response, the response body
// bytes, and any error encountered.
Do(context.Context, HTTPAction) (*http.Response, []byte, error)
}
// httpClientFactory builds an HTTPClient for one endpoint using the given
// transport. httpClusterClient calls it once per endpoint attempt.
type httpClientFactory func(CancelableTransport, url.URL) HTTPClient
// HTTPAction describes an operation that can be rendered as an HTTP request.
type HTTPAction interface {
// HTTPRequest builds the request for the action against the given
// endpoint URL.
HTTPRequest(url.URL) *http.Request
}
// CancelableTransport mimics http.Transport to provide an interface which
// can be substituted for testing (the RoundTripper interface alone does not
// require the CancelRequest method).
type CancelableTransport interface {
http.RoundTripper
// CancelRequest is called by httpClient.Do to abort an in-flight request
// when the context is done.
CancelRequest(req *http.Request)
}
// httpClusterClient is an HTTPClient backed by a list of endpoints; Do tries
// them in order until one produces a usable response.
type httpClusterClient struct {
// clientFactory builds the per-endpoint HTTPClient used for each attempt.
clientFactory httpClientFactory
// transport is passed to clientFactory for every per-endpoint client.
transport CancelableTransport
// endpoints holds the parsed endpoint URLs; reset replaces it wholesale.
endpoints []url.URL
// The embedded RWMutex is taken by Do, Endpoints and Sync around access
// to the fields above.
sync.RWMutex
}
// reset replaces the client's transport and endpoint list. Every entry of
// eps is parsed with url.Parse; if eps is empty (ErrNoEndpoints) or any entry
// fails to parse, the error is returned and the client is left unmodified.
// reset does no locking of its own.
func (c *httpClusterClient) reset(tr CancelableTransport, eps []string) error {
	if len(eps) == 0 {
		return ErrNoEndpoints
	}

	parsed := make([]url.URL, 0, len(eps))
	for _, raw := range eps {
		u, err := url.Parse(raw)
		if err != nil {
			return err
		}
		parsed = append(parsed, *u)
	}

	// Commit only after every endpoint parsed successfully.
	c.endpoints, c.transport = parsed, tr
	return nil
}
// Do performs act against the cluster, trying each endpoint in stored order
// until one yields a response whose status is not 5xx.
//
// It returns ErrNoEndpoints when the client has no endpoints. An attempt
// that fails with ErrTimeout or ErrCanceled aborts immediately with that
// error. Any other error, or a 5xx response, moves on to the next endpoint;
// if every endpoint is exhausted this way, the result of the last attempt is
// returned as-is.
func (c *httpClusterClient) Do(ctx context.Context, act HTTPAction) (*http.Response, []byte, error) {
	// Snapshot the endpoints and transport under the read lock so the
	// requests themselves run without holding it.
	c.RLock()
	eps := make([]url.URL, len(c.endpoints))
	copy(eps, c.endpoints)
	tr := c.transport
	c.RUnlock()

	if len(eps) == 0 {
		return nil, nil, ErrNoEndpoints
	}

	var (
		resp *http.Response
		body []byte
		err  error
	)
	for _, ep := range eps {
		resp, body, err = c.clientFactory(tr, ep).Do(ctx, act)
		if err != nil {
			// The caller's context is done; trying further endpoints
			// cannot succeed.
			if err == ErrTimeout || err == ErrCanceled {
				return nil, nil, err
			}
			continue
		}
		// A server-side failure: give the next endpoint a chance.
		if resp.StatusCode/100 == 5 {
			continue
		}
		break
	}
	return resp, body, err
}
// Endpoints returns the string form of every endpoint currently configured,
// in stored order. The read lock is held while the list is built.
func (c *httpClusterClient) Endpoints() []string {
	c.RLock()
	defer c.RUnlock()

	out := make([]string, 0, len(c.endpoints))
	for i := range c.endpoints {
		out = append(out, c.endpoints[i].String())
	}
	return out
}
// Sync replaces the client's endpoints with the client URLs of the members
// returned by the members API. It returns the List error unchanged, or
// ErrNoEndpoints / a URL parse error from reset, in which case the existing
// endpoints are kept.
func (c *httpClusterClient) Sync(ctx context.Context) error {
	// The member listing must run WITHOUT c's lock held: the members API is
	// built on top of c, so its request is expected to go through c.Do,
	// which takes c.RLock(). sync.RWMutex is not reentrant, so holding the
	// write lock across that call would deadlock.
	ms, err := NewMembersAPI(c).List(ctx)
	if err != nil {
		return err
	}

	var eps []string
	for _, m := range ms {
		eps = append(eps, m.ClientURLs...)
	}

	// Only the swap of the endpoint list needs exclusive access.
	c.Lock()
	defer c.Unlock()
	return c.reset(c.transport, eps)
}
// roundTripResponse bundles the two results of a RoundTrip call so they can
// be sent over a channel from the request goroutine in httpClient.Do.
type roundTripResponse struct {
resp *http.Response
err error
}
// httpClient is an HTTPClient that sends every action to one fixed endpoint.
type httpClient struct {
// transport performs the round trip and cancels it when the context ends.
transport CancelableTransport
// endpoint is the URL handed to HTTPAction.HTTPRequest to build requests.
endpoint url.URL
}
// Do builds the request for act against c.endpoint, runs it on c.transport
// and returns the response together with its fully-read body. The response
// body is always closed before Do returns.
//
// If ctx finishes before the round trip does, the request is cancelled via
// the transport, Do waits for the round trip to exit, and ctx.Err() is
// returned. A round-trip error is returned as-is. An error while reading the
// body is returned alongside the response and the bytes read so far.
func (c *httpClient) Do(ctx context.Context, act HTTPAction) (*http.Response, []byte, error) {
	req := act.HTTPRequest(c.endpoint)

	// Buffered with capacity 1 so the goroutine can always deliver its
	// result and exit, whichever select branch is taken below.
	rtchan := make(chan roundTripResponse, 1)
	go func() {
		resp, err := c.transport.RoundTrip(req)
		rtchan <- roundTripResponse{resp: resp, err: err}
		close(rtchan)
	}()

	var resp *http.Response
	var err error

	select {
	case rtresp := <-rtchan:
		resp, err = rtresp.resp, rtresp.err
	case <-ctx.Done():
		c.transport.CancelRequest(req)
		// Wait for the request to actually exit before continuing. The round
		// trip may have completed successfully just as the context ended, so
		// keep any response it produced: the deferred close below must see
		// it, otherwise its body would leak.
		rtresp := <-rtchan
		resp = rtresp.resp
		err = ctx.Err()
	}

	// always check for resp nil-ness to deal with possible
	// race conditions between channels above
	defer func() {
		if resp != nil {
			resp.Body.Close()
		}
	}()

	if err != nil {
		return nil, nil, err
	}

	body, err := ioutil.ReadAll(resp.Body)
	return resp, body, err
}
// redirectFollowingHTTPClient wraps another HTTPClient and follows 3xx
// responses by re-issuing the action against the Location header.
type redirectFollowingHTTPClient struct {
// client performs each individual request.
client HTTPClient
// max bounds the loop in Do: the action is attempted at most max+1 times
// before ErrTooManyRedirects is returned.
max int
}
// Do runs act through the wrapped client and follows 3xx responses by
// repeating the action against the URL in the Location header. The first
// non-3xx response is returned. Errors from the wrapped client are returned
// unchanged; a missing or unparseable Location header is an error; and
// ErrTooManyRedirects is returned once max+1 attempts have all redirected.
func (r *redirectFollowingHTTPClient) Do(ctx context.Context, act HTTPAction) (*http.Response, []byte, error) {
	for attempt := 0; attempt <= r.max; attempt++ {
		resp, body, err := r.client.Do(ctx, act)
		if err != nil {
			return nil, nil, err
		}

		// Anything other than a redirect is the final answer.
		if resp.StatusCode/100 != 3 {
			return resp, body, nil
		}

		target := resp.Header.Get("Location")
		if target == "" {
			return nil, nil, fmt.Errorf("Location header not set")
		}
		loc, perr := url.Parse(target)
		if perr != nil {
			return nil, nil, fmt.Errorf("Location header not valid URL: %s", target)
		}

		// Wrap the current action so the next attempt goes to the new URL.
		act = &redirectedHTTPAction{action: act, location: *loc}
	}
	return nil, nil, ErrTooManyRedirects
}
// redirectedHTTPAction decorates an HTTPAction so that the request it builds
// is sent to a redirect target instead of the original URL.
type redirectedHTTPAction struct {
// action is the wrapped action that builds the underlying request.
action HTTPAction
// location is the redirect target that overrides the request URL.
location url.URL
}
// HTTPRequest asks the wrapped action to build its request for ep, then
// points that request's URL at the stored redirect location. Only the URL is
// overridden; the rest of the request is whatever the wrapped action built.
func (r *redirectedHTTPAction) HTTPRequest(ep url.URL) *http.Request {
	req := r.action.HTTPRequest(ep)
	target := &r.location
	req.URL = target
	return req
}