From 6d824722750e7d7775697828e1866fa7ae8375ae Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Wed, 28 Jan 2015 11:44:38 -0800 Subject: [PATCH] client: move http.go into client.go --- client/client.go | 236 ++++++++++++++++++++++ client/{http_test.go => client_test.go} | 0 client/http.go | 257 ------------------------ 3 files changed, 236 insertions(+), 257 deletions(-) rename client/{http_test.go => client_test.go} (100%) delete mode 100644 client/http.go diff --git a/client/client.go b/client/client.go index c42003958..59826b097 100644 --- a/client/client.go +++ b/client/client.go @@ -16,6 +16,11 @@ package client import ( "errors" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "sync" "time" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" @@ -35,3 +40,234 @@ var ( DefaultRequestTimeout = 5 * time.Second DefaultMaxRedirects = 10 ) + +type Config struct { + Endpoints []string + Transport CancelableTransport +} + +// CancelableTransport mimics http.Transport to provide an interface which can be +// substituted for testing (since the RoundTripper interface alone does not +// require the CancelRequest method) +type CancelableTransport interface { + http.RoundTripper + CancelRequest(req *http.Request) +} + +type Client interface { + Sync(context.Context) error + Endpoints() []string + + httpClient +} + +func New(cfg Config) (Client, error) { + c := &httpClusterClient{clientFactory: newHTTPClientFactory(cfg.Transport)} + if err := c.reset(cfg.Endpoints); err != nil { + return nil, err + } + return c, nil +} + +type httpClient interface { + Do(context.Context, httpAction) (*http.Response, []byte, error) +} + +func newHTTPClientFactory(tr CancelableTransport) httpClientFactory { + return func(ep url.URL) httpClient { + return &redirectFollowingHTTPClient{ + max: DefaultMaxRedirects, + client: &simpleHTTPClient{ + transport: tr, + endpoint: ep, + }, + } + } +} + +type httpClientFactory func(url.URL) httpClient + +type httpAction interface { + HTTPRequest(url.URL) *http.Request +} + +type httpClusterClient struct { + clientFactory httpClientFactory + endpoints []url.URL + sync.RWMutex +} + +func (c *httpClusterClient) reset(eps []string) error { + if len(eps) == 0 { + return ErrNoEndpoints + } + + neps := make([]url.URL, len(eps)) + for i, ep := range eps { + u, err := url.Parse(ep) + if err != nil { + return err + } + neps[i] = *u + } + + c.endpoints = neps + + return nil +} + +func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (resp *http.Response, body []byte, err error) { + c.RLock() + leps := len(c.endpoints) + eps := make([]url.URL, leps) + n := copy(eps, c.endpoints) + c.RUnlock() + + if leps == 0 { + err = ErrNoEndpoints + return + } + + if leps != n { + err = errors.New("unable to pick endpoint: copy failed") + return + } + + for _, ep := range eps { + hc := c.clientFactory(ep) + resp, body, err = hc.Do(ctx, act) + if err != nil { + if err == ErrTimeout || err == ErrCanceled { + return nil, nil, err + } + continue + } + if resp.StatusCode/100 == 5 { + continue + } + break + } + + return +} + +func (c *httpClusterClient) Endpoints() []string { + c.RLock() + defer c.RUnlock() + + eps := make([]string, len(c.endpoints)) + for i, ep := range c.endpoints { + eps[i] = ep.String() + } + + return eps +} + +func (c *httpClusterClient) Sync(ctx context.Context) error { + c.Lock() + defer c.Unlock() + + mAPI := NewMembersAPI(c) + ms, err := mAPI.List(ctx) + if err != nil { + return err + } + + eps := make([]string, 0) + for _, m := range ms { + eps = append(eps, m.ClientURLs...) + } + + return c.reset(eps) +} + +type roundTripResponse struct { + resp *http.Response + err error +} + +type simpleHTTPClient struct { + transport CancelableTransport + endpoint url.URL +} + +func (c *simpleHTTPClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) { + req := act.HTTPRequest(c.endpoint) + + rtchan := make(chan roundTripResponse, 1) + go func() { + resp, err := c.transport.RoundTrip(req) + rtchan <- roundTripResponse{resp: resp, err: err} + close(rtchan) + }() + + var resp *http.Response + var err error + + select { + case rtresp := <-rtchan: + resp, err = rtresp.resp, rtresp.err + case <-ctx.Done(): + c.transport.CancelRequest(req) + // wait for request to actually exit before continuing + <-rtchan + err = ctx.Err() + } + + // always check for resp nil-ness to deal with possible + // race conditions between channels above + defer func() { + if resp != nil { + resp.Body.Close() + } + }() + + if err != nil { + return nil, nil, err + } + + body, err := ioutil.ReadAll(resp.Body) + return resp, body, err +} + +type redirectFollowingHTTPClient struct { + client httpClient + max int +} + +func (r *redirectFollowingHTTPClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) { + for i := 0; i <= r.max; i++ { + resp, body, err := r.client.Do(ctx, act) + if err != nil { + return nil, nil, err + } + if resp.StatusCode/100 == 3 { + hdr := resp.Header.Get("Location") + if hdr == "" { + return nil, nil, fmt.Errorf("Location header not set") + } + loc, err := url.Parse(hdr) + if err != nil { + return nil, nil, fmt.Errorf("Location header not valid URL: %s", hdr) + } + act = &redirectedHTTPAction{ + action: act, + location: *loc, + } + continue + } + return resp, body, nil + } + return nil, nil, ErrTooManyRedirects +} + +type redirectedHTTPAction struct { + action httpAction + location url.URL +} + +func (r *redirectedHTTPAction) HTTPRequest(ep url.URL) *http.Request { + orig := r.action.HTTPRequest(ep) + orig.URL = &r.location + return orig +} diff --git a/client/http_test.go b/client/client_test.go similarity index 100% rename from client/http_test.go rename to client/client_test.go diff --git a/client/http.go b/client/http.go deleted file mode 100644 index 49a40b6be..000000000 --- a/client/http.go +++ /dev/null @@ -1,257 +0,0 @@ -// Copyright 2015 CoreOS, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package client - -import ( - "errors" - "fmt" - "io/ioutil" - "net/http" - "net/url" - "sync" - - "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" -) - -func newHTTPClientFactory(tr CancelableTransport) httpClientFactory { - return func(ep url.URL) httpClient { - return &redirectFollowingHTTPClient{ - max: DefaultMaxRedirects, - client: &simpleHTTPClient{ - transport: tr, - endpoint: ep, - }, - } - } -} - -type Config struct { - Endpoints []string - Transport CancelableTransport -} - -func New(cfg Config) (Client, error) { - c := &httpClusterClient{clientFactory: newHTTPClientFactory(cfg.Transport)} - if err := c.reset(cfg.Endpoints); err != nil { - return nil, err - } - return c, nil -} - -type Client interface { - Sync(context.Context) error - Endpoints() []string - - httpClient -} - -type httpClient interface { - Do(context.Context, httpAction) (*http.Response, []byte, error) -} - -type httpClientFactory func(url.URL) httpClient - -type httpAction interface { - HTTPRequest(url.URL) *http.Request -} - -// CancelableTransport mimics http.Transport to provide an interface which can be -// substituted for testing (since the RoundTripper interface alone does not -// require the CancelRequest method) -type CancelableTransport interface { - http.RoundTripper - CancelRequest(req *http.Request) -} - -type httpClusterClient struct { - clientFactory httpClientFactory - endpoints []url.URL - sync.RWMutex -} - -func (c *httpClusterClient) reset(eps []string) error { - if len(eps) == 0 { - return ErrNoEndpoints - } - - neps := make([]url.URL, len(eps)) - for i, ep := range eps { - u, err := url.Parse(ep) - if err != nil { - return err - } - neps[i] = *u - } - - c.endpoints = neps - - return nil -} - -func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (resp *http.Response, body []byte, err error) { - c.RLock() - leps := len(c.endpoints) - eps := make([]url.URL, leps) - n := copy(eps, c.endpoints) - c.RUnlock() - - if leps == 0 { - err = ErrNoEndpoints - return - } - - if leps != n { - err = errors.New("unable to pick endpoint: copy failed") - return - } - - for _, ep := range eps { - hc := c.clientFactory(ep) - resp, body, err = hc.Do(ctx, act) - if err != nil { - if err == ErrTimeout || err == ErrCanceled { - return nil, nil, err - } - continue - } - if resp.StatusCode/100 == 5 { - continue - } - break - } - - return -} - -func (c *httpClusterClient) Endpoints() []string { - c.RLock() - defer c.RUnlock() - - eps := make([]string, len(c.endpoints)) - for i, ep := range c.endpoints { - eps[i] = ep.String() - } - - return eps -} - -func (c *httpClusterClient) Sync(ctx context.Context) error { - c.Lock() - defer c.Unlock() - - mAPI := NewMembersAPI(c) - ms, err := mAPI.List(ctx) - if err != nil { - return err - } - - eps := make([]string, 0) - for _, m := range ms { - eps = append(eps, m.ClientURLs...) - } - - return c.reset(eps) -} - -type roundTripResponse struct { - resp *http.Response - err error -} - -type simpleHTTPClient struct { - transport CancelableTransport - endpoint url.URL -} - -func (c *simpleHTTPClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) { - req := act.HTTPRequest(c.endpoint) - - rtchan := make(chan roundTripResponse, 1) - go func() { - resp, err := c.transport.RoundTrip(req) - rtchan <- roundTripResponse{resp: resp, err: err} - close(rtchan) - }() - - var resp *http.Response - var err error - - select { - case rtresp := <-rtchan: - resp, err = rtresp.resp, rtresp.err - case <-ctx.Done(): - c.transport.CancelRequest(req) - // wait for request to actually exit before continuing - <-rtchan - err = ctx.Err() - } - - // always check for resp nil-ness to deal with possible - // race conditions between channels above - defer func() { - if resp != nil { - resp.Body.Close() - } - }() - - if err != nil { - return nil, nil, err - } - - body, err := ioutil.ReadAll(resp.Body) - return resp, body, err -} - -type redirectFollowingHTTPClient struct { - client httpClient - max int -} - -func (r *redirectFollowingHTTPClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) { - for i := 0; i <= r.max; i++ { - resp, body, err := r.client.Do(ctx, act) - if err != nil { - return nil, nil, err - } - if resp.StatusCode/100 == 3 { - hdr := resp.Header.Get("Location") - if hdr == "" { - return nil, nil, fmt.Errorf("Location header not set") - } - loc, err := url.Parse(hdr) - if err != nil { - return nil, nil, fmt.Errorf("Location header not valid URL: %s", hdr) - } - act = &redirectedHTTPAction{ - action: act, - location: *loc, - } - continue - } - return resp, body, nil - } - return nil, nil, ErrTooManyRedirects -} - -type redirectedHTTPAction struct { - action httpAction - location url.URL -} - -func (r *redirectedHTTPAction) HTTPRequest(ep url.URL) *http.Request { - orig := r.action.HTTPRequest(ep) - orig.URL = &r.location - return orig -}