From ce4df96e69ba38e73f646de113ef6e092b712ea9 Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Thu, 23 Oct 2014 17:11:04 -0700 Subject: [PATCH 1/3] client: break apart KeysAPI from httpClient --- client/client.go | 60 ------ client/http.go | 259 ++++---------------------- client/http_test.go | 329 --------------------------------- client/keys.go | 276 ++++++++++++++++++++++++++++ client/keys_test.go | 355 ++++++++++++++++++++++++++++++++++++ discovery/discovery.go | 6 +- discovery/discovery_test.go | 2 +- 7 files changed, 666 insertions(+), 621 deletions(-) delete mode 100644 client/client.go create mode 100644 client/keys.go create mode 100644 client/keys_test.go diff --git a/client/client.go b/client/client.go deleted file mode 100644 index 75b8c5e56..000000000 --- a/client/client.go +++ /dev/null @@ -1,60 +0,0 @@ -/* - Copyright 2014 CoreOS, Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package client - -import ( - "errors" - "fmt" - "time" -) - -var ( - ErrUnavailable = errors.New("client: no available etcd endpoints") - ErrNoLeader = errors.New("client: no leader") - ErrKeyNoExist = errors.New("client: key does not exist") - ErrKeyExists = errors.New("client: key already exists") -) - -type Client interface { - Create(key, value string, ttl time.Duration) (*Response, error) - Get(key string) (*Response, error) - Watch(key string, idx uint64) Watcher - RecursiveWatch(key string, idx uint64) Watcher -} - -type Watcher interface { - Next() (*Response, error) -} - -type Response struct { - Action string `json:"action"` - Node *Node `json:"node"` - PrevNode *Node `json:"prevNode"` -} - -type Nodes []*Node -type Node struct { - Key string `json:"key"` - Value string `json:"value"` - Nodes Nodes `json:"nodes"` - ModifiedIndex uint64 `json:"modifiedIndex"` - CreatedIndex uint64 `json:"createdIndex"` -} - -func (n *Node) String() string { - return fmt.Sprintf("{Key: %s, CreatedIndex: %d, ModifiedIndex: %d}", n.Key, n.CreatedIndex, n.ModifiedIndex) -} diff --git a/client/http.go b/client/http.go index 4f94aba12..8e8a402c1 100644 --- a/client/http.go +++ b/client/http.go @@ -17,22 +17,16 @@ package client import ( - "encoding/json" - "fmt" "io/ioutil" "net/http" "net/url" - "path" - "strconv" - "strings" "time" "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" ) var ( - DefaultV2KeysPrefix = "/v2/keys" - ErrTimeout = context.DeadlineExceeded + ErrTimeout = context.DeadlineExceeded ) // transport mimics http.Transport to provide an interface which can be @@ -43,75 +37,8 @@ type transport interface { CancelRequest(req *http.Request) } -type httpClient struct { - transport transport - endpoint url.URL - timeout time.Duration - v2KeysPrefix string -} - -func NewHTTPClient(tr *http.Transport, ep string, timeout time.Duration) (*httpClient, error) { - u, err := url.Parse(ep) - if err != nil { - return nil, err - } - - c := &httpClient{ - transport: tr, - endpoint: *u, - timeout: timeout, - v2KeysPrefix: DefaultV2KeysPrefix, - } - - return c, nil -} - -func (c *httpClient) SetPrefix(p string) { - c.v2KeysPrefix = p -} - -func (c *httpClient) Endpoint() url.URL { - ep := c.endpoint - ep.Path = path.Join(ep.Path, c.v2KeysPrefix) - return ep -} - -func (c *httpClient) Create(key, val string, ttl time.Duration) (*Response, error) { - create := &createAction{ - Key: key, - Value: val, - } - if ttl >= 0 { - uttl := uint64(ttl.Seconds()) - create.TTL = &uttl - } - - ctx, cancel := context.WithTimeout(context.Background(), c.timeout) - httpresp, body, err := c.do(ctx, create) - cancel() - - if err != nil { - return nil, err - } - - return unmarshalHTTPResponse(httpresp.StatusCode, body) -} - -func (c *httpClient) Get(key string) (*Response, error) { - get := &getAction{ - Key: key, - Recursive: false, - } - - ctx, cancel := context.WithTimeout(context.Background(), c.timeout) - httpresp, body, err := c.do(ctx, get) - cancel() - - if err != nil { - return nil, err - } - - return unmarshalHTTPResponse(httpresp.StatusCode, body) +type httpAction interface { + httpRequest(url.URL) *http.Request } type roundTripResponse struct { @@ -119,8 +46,35 @@ type roundTripResponse struct { err error } +type httpClient struct { + transport transport + endpoint url.URL + timeout time.Duration +} + +func newHTTPClient(tr *http.Transport, ep string, to time.Duration) (*httpClient, error) { + u, err := url.Parse(ep) + if err != nil { + return nil, err + } + + c := &httpClient{ + transport: tr, + endpoint: *u, + timeout: to, + } + + return c, nil +} + +func (c *httpClient) doWithTimeout(act httpAction) (*http.Response, []byte, error) { + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + return c.do(ctx, act) +} + func (c *httpClient) do(ctx context.Context, act httpAction) (*http.Response, []byte, error) { - req := act.httpRequest(c.Endpoint()) + req := act.httpRequest(c.endpoint) rtchan := make(chan roundTripResponse, 1) go func() { @@ -157,154 +111,3 @@ func (c *httpClient) do(ctx context.Context, act httpAction) (*http.Response, [] body, err := ioutil.ReadAll(resp.Body) return resp, body, err } - -func (c *httpClient) Watch(key string, idx uint64) Watcher { - return &httpWatcher{ - httpClient: *c, - nextWait: waitAction{ - Key: key, - WaitIndex: idx, - Recursive: false, - }, - } -} - -func (c *httpClient) RecursiveWatch(key string, idx uint64) Watcher { - return &httpWatcher{ - httpClient: *c, - nextWait: waitAction{ - Key: key, - WaitIndex: idx, - Recursive: true, - }, - } -} - -type httpWatcher struct { - httpClient - nextWait waitAction -} - -func (hw *httpWatcher) Next() (*Response, error) { - httpresp, body, err := hw.httpClient.do(context.Background(), &hw.nextWait) - if err != nil { - return nil, err - } - - resp, err := unmarshalHTTPResponse(httpresp.StatusCode, body) - if err != nil { - return nil, err - } - - hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1 - return resp, nil -} - -// v2KeysURL forms a URL representing the location of a key. The provided -// endpoint must be the root of the etcd keys API. For example, a valid -// endpoint probably has the path "/v2/keys". -func v2KeysURL(ep url.URL, key string) *url.URL { - ep.Path = path.Join(ep.Path, key) - return &ep -} - -type httpAction interface { - httpRequest(url.URL) *http.Request -} - -type getAction struct { - Key string - Recursive bool -} - -func (g *getAction) httpRequest(ep url.URL) *http.Request { - u := v2KeysURL(ep, g.Key) - - params := u.Query() - params.Set("recursive", strconv.FormatBool(g.Recursive)) - u.RawQuery = params.Encode() - - req, _ := http.NewRequest("GET", u.String(), nil) - return req -} - -type waitAction struct { - Key string - WaitIndex uint64 - Recursive bool -} - -func (w *waitAction) httpRequest(ep url.URL) *http.Request { - u := v2KeysURL(ep, w.Key) - - params := u.Query() - params.Set("wait", "true") - params.Set("waitIndex", strconv.FormatUint(w.WaitIndex, 10)) - params.Set("recursive", strconv.FormatBool(w.Recursive)) - u.RawQuery = params.Encode() - - req, _ := http.NewRequest("GET", u.String(), nil) - return req -} - -type createAction struct { - Key string - Value string - TTL *uint64 -} - -func (c *createAction) httpRequest(ep url.URL) *http.Request { - u := v2KeysURL(ep, c.Key) - - params := u.Query() - params.Set("prevExist", "false") - u.RawQuery = params.Encode() - - form := url.Values{} - form.Add("value", c.Value) - if c.TTL != nil { - form.Add("ttl", strconv.FormatUint(*c.TTL, 10)) - } - body := strings.NewReader(form.Encode()) - - req, _ := http.NewRequest("PUT", u.String(), body) - req.Header.Set("Content-Type", "application/x-www-form-urlencoded") - - return req -} - -func unmarshalHTTPResponse(code int, body []byte) (res *Response, err error) { - switch code { - case http.StatusOK, http.StatusCreated: - res, err = unmarshalSuccessfulResponse(body) - default: - err = unmarshalErrorResponse(code) - } - - return -} - -func unmarshalSuccessfulResponse(body []byte) (*Response, error) { - var res Response - err := json.Unmarshal(body, &res) - if err != nil { - return nil, err - } - - return &res, nil -} - -func unmarshalErrorResponse(code int) error { - switch code { - case http.StatusNotFound: - return ErrKeyNoExist - case http.StatusPreconditionFailed: - return ErrKeyExists - case http.StatusInternalServerError: - // this isn't necessarily true - return ErrNoLeader - default: - } - - return fmt.Errorf("unrecognized HTTP status code %d", code) -} diff --git a/client/http_test.go b/client/http_test.go index 816d1b32b..16115958a 100644 --- a/client/http_test.go +++ b/client/http_test.go @@ -18,7 +18,6 @@ package client import ( "errors" - "fmt" "io/ioutil" "net/http" "net/url" @@ -30,334 +29,6 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" ) -func TestV2URLHelper(t *testing.T) { - tests := []struct { - endpoint url.URL - key string - want url.URL - }{ - // key is empty, no problem - { - endpoint: url.URL{Scheme: "http", Host: "example.com", Path: "/v2/keys"}, - key: "", - want: url.URL{Scheme: "http", Host: "example.com", Path: "/v2/keys"}, - }, - - // key is joined to path - { - endpoint: url.URL{Scheme: "http", Host: "example.com", Path: "/v2/keys"}, - key: "/foo/bar", - want: url.URL{Scheme: "http", Host: "example.com", Path: "/v2/keys/foo/bar"}, - }, - - // key is joined to path when path is empty - { - endpoint: url.URL{Scheme: "http", Host: "example.com", Path: ""}, - key: "/foo/bar", - want: url.URL{Scheme: "http", Host: "example.com", Path: "/foo/bar"}, - }, - - // Host field carries through with port - { - endpoint: url.URL{Scheme: "http", Host: "example.com:8080", Path: "/v2/keys"}, - key: "", - want: url.URL{Scheme: "http", Host: "example.com:8080", Path: "/v2/keys"}, - }, - - // Scheme carries through - { - endpoint: url.URL{Scheme: "https", Host: "example.com", Path: "/v2/keys"}, - key: "", - want: url.URL{Scheme: "https", Host: "example.com", Path: "/v2/keys"}, - }, - } - - for i, tt := range tests { - got := v2KeysURL(tt.endpoint, tt.key) - if tt.want != *got { - t.Errorf("#%d: want=%#v, got=%#v", i, tt.want, *got) - } - } -} - -func TestGetAction(t *testing.T) { - ep := url.URL{Scheme: "http", Host: "example.com/v2/keys"} - wantURL := &url.URL{ - Scheme: "http", - Host: "example.com", - Path: "/v2/keys/foo/bar", - } - wantHeader := http.Header{} - - tests := []struct { - recursive bool - wantQuery string - }{ - { - recursive: false, - wantQuery: "recursive=false", - }, - { - recursive: true, - wantQuery: "recursive=true", - }, - } - - for i, tt := range tests { - f := getAction{ - Key: "/foo/bar", - Recursive: tt.recursive, - } - got := *f.httpRequest(ep) - - wantURL := wantURL - wantURL.RawQuery = tt.wantQuery - - err := assertResponse(got, wantURL, wantHeader, nil) - if err != nil { - t.Errorf("#%d: %v", i, err) - } - } -} - -func TestWaitAction(t *testing.T) { - ep := url.URL{Scheme: "http", Host: "example.com/v2/keys"} - wantURL := &url.URL{ - Scheme: "http", - Host: "example.com", - Path: "/v2/keys/foo/bar", - } - wantHeader := http.Header{} - - tests := []struct { - waitIndex uint64 - recursive bool - wantQuery string - }{ - { - recursive: false, - waitIndex: uint64(0), - wantQuery: "recursive=false&wait=true&waitIndex=0", - }, - { - recursive: false, - waitIndex: uint64(12), - wantQuery: "recursive=false&wait=true&waitIndex=12", - }, - { - recursive: true, - waitIndex: uint64(12), - wantQuery: "recursive=true&wait=true&waitIndex=12", - }, - } - - for i, tt := range tests { - f := waitAction{ - Key: "/foo/bar", - WaitIndex: tt.waitIndex, - Recursive: tt.recursive, - } - got := *f.httpRequest(ep) - - wantURL := wantURL - wantURL.RawQuery = tt.wantQuery - - err := assertResponse(got, wantURL, wantHeader, nil) - if err != nil { - t.Errorf("#%d: %v", i, err) - } - } -} - -func TestCreateAction(t *testing.T) { - ep := url.URL{Scheme: "http", Host: "example.com/v2/keys"} - wantURL := &url.URL{ - Scheme: "http", - Host: "example.com", - Path: "/v2/keys/foo/bar", - RawQuery: "prevExist=false", - } - wantHeader := http.Header(map[string][]string{ - "Content-Type": []string{"application/x-www-form-urlencoded"}, - }) - - ttl12 := uint64(12) - tests := []struct { - value string - ttl *uint64 - wantBody string - }{ - { - value: "baz", - wantBody: "value=baz", - }, - { - value: "baz", - ttl: &ttl12, - wantBody: "ttl=12&value=baz", - }, - } - - for i, tt := range tests { - f := createAction{ - Key: "/foo/bar", - Value: tt.value, - TTL: tt.ttl, - } - got := *f.httpRequest(ep) - - err := assertResponse(got, wantURL, wantHeader, []byte(tt.wantBody)) - if err != nil { - t.Errorf("#%d: %v", i, err) - } - } -} - -func assertResponse(got http.Request, wantURL *url.URL, wantHeader http.Header, wantBody []byte) error { - if !reflect.DeepEqual(wantURL, got.URL) { - return fmt.Errorf("want.URL=%#v got.URL=%#v", wantURL, got.URL) - } - - if !reflect.DeepEqual(wantHeader, got.Header) { - return fmt.Errorf("want.Header=%#v got.Header=%#v", wantHeader, got.Header) - } - - if got.Body == nil { - if wantBody != nil { - return fmt.Errorf("want.Body=%v got.Body=%v", wantBody, got.Body) - } - } else { - if wantBody == nil { - return fmt.Errorf("want.Body=%v got.Body=%v", wantBody, got.Body) - } else { - gotBytes, err := ioutil.ReadAll(got.Body) - if err != nil { - return err - } - - if !reflect.DeepEqual(wantBody, gotBytes) { - return fmt.Errorf("want.Body=%v got.Body=%v", wantBody, gotBytes) - } - } - } - - return nil -} - -func TestUnmarshalSuccessfulResponse(t *testing.T) { - tests := []struct { - body string - res *Response - expectError bool - }{ - // Neither PrevNode or Node - { - `{"action":"delete"}`, - &Response{Action: "delete"}, - false, - }, - - // PrevNode - { - `{"action":"delete", "prevNode": {"key": "/foo", "value": "bar", "modifiedIndex": 12, "createdIndex": 10}}`, - &Response{Action: "delete", PrevNode: &Node{Key: "/foo", Value: "bar", ModifiedIndex: 12, CreatedIndex: 10}}, - false, - }, - - // Node - { - `{"action":"get", "node": {"key": "/foo", "value": "bar", "modifiedIndex": 12, "createdIndex": 10}}`, - &Response{Action: "get", Node: &Node{Key: "/foo", Value: "bar", ModifiedIndex: 12, CreatedIndex: 10}}, - false, - }, - - // PrevNode and Node - { - `{"action":"update", "prevNode": {"key": "/foo", "value": "baz", "modifiedIndex": 10, "createdIndex": 10}, "node": {"key": "/foo", "value": "bar", "modifiedIndex": 12, "createdIndex": 10}}`, - &Response{Action: "update", PrevNode: &Node{Key: "/foo", Value: "baz", ModifiedIndex: 10, CreatedIndex: 10}, Node: &Node{Key: "/foo", Value: "bar", ModifiedIndex: 12, CreatedIndex: 10}}, - false, - }, - - // Garbage in body - { - `garbage`, - nil, - true, - }, - } - - for i, tt := range tests { - res, err := unmarshalSuccessfulResponse([]byte(tt.body)) - if tt.expectError != (err != nil) { - t.Errorf("#%d: expectError=%t, err=%v", i, tt.expectError, err) - } - - if (res == nil) != (tt.res == nil) { - t.Errorf("#%d: received res==%v, but expected res==%v", i, res, tt.res) - continue - } else if tt.res == nil { - // expected and succesfully got nil response - continue - } - - if res.Action != tt.res.Action { - t.Errorf("#%d: Action=%s, expected %s", i, res.Action, tt.res.Action) - } - - if !reflect.DeepEqual(res.Node, tt.res.Node) { - t.Errorf("#%d: Node=%v, expected %v", i, res.Node, tt.res.Node) - } - } -} - -func TestUnmarshalErrorResponse(t *testing.T) { - unrecognized := errors.New("test fixture") - - tests := []struct { - code int - want error - }{ - {http.StatusBadRequest, unrecognized}, - {http.StatusUnauthorized, unrecognized}, - {http.StatusPaymentRequired, unrecognized}, - {http.StatusForbidden, unrecognized}, - {http.StatusNotFound, ErrKeyNoExist}, - {http.StatusMethodNotAllowed, unrecognized}, - {http.StatusNotAcceptable, unrecognized}, - {http.StatusProxyAuthRequired, unrecognized}, - {http.StatusRequestTimeout, unrecognized}, - {http.StatusConflict, unrecognized}, - {http.StatusGone, unrecognized}, - {http.StatusLengthRequired, unrecognized}, - {http.StatusPreconditionFailed, ErrKeyExists}, - {http.StatusRequestEntityTooLarge, unrecognized}, - {http.StatusRequestURITooLong, unrecognized}, - {http.StatusUnsupportedMediaType, unrecognized}, - {http.StatusRequestedRangeNotSatisfiable, unrecognized}, - {http.StatusExpectationFailed, unrecognized}, - {http.StatusTeapot, unrecognized}, - - {http.StatusInternalServerError, ErrNoLeader}, - {http.StatusNotImplemented, unrecognized}, - {http.StatusBadGateway, unrecognized}, - {http.StatusServiceUnavailable, unrecognized}, - {http.StatusGatewayTimeout, unrecognized}, - {http.StatusHTTPVersionNotSupported, unrecognized}, - } - - for i, tt := range tests { - want := tt.want - if reflect.DeepEqual(unrecognized, want) { - want = fmt.Errorf("unrecognized HTTP status code %d", tt.code) - } - - got := unmarshalErrorResponse(tt.code) - if !reflect.DeepEqual(want, got) { - t.Errorf("#%d: want=%v, got=%v", i, want, got) - } - } -} - type fakeTransport struct { respchan chan *http.Response errchan chan error diff --git a/client/keys.go b/client/keys.go new file mode 100644 index 000000000..a95090033 --- /dev/null +++ b/client/keys.go @@ -0,0 +1,276 @@ +/* + Copyright 2014 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package client + +import ( + "encoding/json" + "errors" + "fmt" + "net/http" + "net/url" + "path" + "strconv" + "strings" + "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" +) + +var ( + DefaultV2KeysPrefix = "/v2/keys" +) + +var ( + ErrUnavailable = errors.New("client: no available etcd endpoints") + ErrNoLeader = errors.New("client: no leader") + ErrKeyNoExist = errors.New("client: key does not exist") + ErrKeyExists = errors.New("client: key already exists") +) + +func NewKeysAPI(tr *http.Transport, ep string, to time.Duration) (*HTTPKeysAPI, error) { + c, err := newHTTPClient(tr, ep, to) + if err != nil { + return nil, err + } + + kAPI := HTTPKeysAPI{ + client: c, + } + + return &kAPI, nil +} + +type KeysAPI interface { + Create(key, value string, ttl time.Duration) (*Response, error) + Get(key string) (*Response, error) + Watch(key string, idx uint64) Watcher + RecursiveWatch(key string, idx uint64) Watcher +} + +type Watcher interface { + Next() (*Response, error) +} + +type Response struct { + Action string `json:"action"` + Node *Node `json:"node"` + PrevNode *Node `json:"prevNode"` +} + +type Nodes []*Node +type Node struct { + Key string `json:"key"` + Value string `json:"value"` + Nodes Nodes `json:"nodes"` + ModifiedIndex uint64 `json:"modifiedIndex"` + CreatedIndex uint64 `json:"createdIndex"` +} + +func (n *Node) String() string { + return fmt.Sprintf("{Key: %s, CreatedIndex: %d, ModifiedIndex: %d}", n.Key, n.CreatedIndex, n.ModifiedIndex) +} + +type HTTPKeysAPI struct { + client *httpClient + endpoint url.URL +} + +func (k *HTTPKeysAPI) SetAPIPrefix(p string) { + ep := k.endpoint + ep.Path = path.Join(ep.Path, p) + k.client.endpoint = ep +} + +func (k *HTTPKeysAPI) Create(key, val string, ttl time.Duration) (*Response, error) { + create := &createAction{ + Key: key, + Value: val, + } + if ttl >= 0 { + uttl := uint64(ttl.Seconds()) + create.TTL = &uttl + } + + httpresp, body, err := k.client.doWithTimeout(create) + if err != nil { + return nil, err + } + + return unmarshalHTTPResponse(httpresp.StatusCode, body) +} + +func (k *HTTPKeysAPI) Get(key string) (*Response, error) { + get := &getAction{ + Key: key, + Recursive: false, + } + + httpresp, body, err := k.client.doWithTimeout(get) + if err != nil { + return nil, err + } + + return unmarshalHTTPResponse(httpresp.StatusCode, body) +} + +func (k *HTTPKeysAPI) Watch(key string, idx uint64) Watcher { + return &httpWatcher{ + client: k.client, + nextWait: waitAction{ + Key: key, + WaitIndex: idx, + Recursive: false, + }, + } +} + +func (k *HTTPKeysAPI) RecursiveWatch(key string, idx uint64) Watcher { + return &httpWatcher{ + client: k.client, + nextWait: waitAction{ + Key: key, + WaitIndex: idx, + Recursive: true, + }, + } +} + +type httpWatcher struct { + client *httpClient + nextWait waitAction +} + +func (hw *httpWatcher) Next() (*Response, error) { + //TODO(bcwaldon): This needs to be cancellable by the calling user + httpresp, body, err := hw.client.do(context.Background(), &hw.nextWait) + if err != nil { + return nil, err + } + + resp, err := unmarshalHTTPResponse(httpresp.StatusCode, body) + if err != nil { + return nil, err + } + + hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1 + return resp, nil +} + +// v2KeysURL forms a URL representing the location of a key. The provided +// endpoint must be the root of the etcd keys API. For example, a valid +// endpoint probably has the path "/v2/keys". +func v2KeysURL(ep url.URL, key string) *url.URL { + ep.Path = path.Join(ep.Path, key) + return &ep +} + +type getAction struct { + Key string + Recursive bool +} + +func (g *getAction) httpRequest(ep url.URL) *http.Request { + u := v2KeysURL(ep, g.Key) + + params := u.Query() + params.Set("recursive", strconv.FormatBool(g.Recursive)) + u.RawQuery = params.Encode() + + req, _ := http.NewRequest("GET", u.String(), nil) + return req +} + +type waitAction struct { + Key string + WaitIndex uint64 + Recursive bool +} + +func (w *waitAction) httpRequest(ep url.URL) *http.Request { + u := v2KeysURL(ep, w.Key) + + params := u.Query() + params.Set("wait", "true") + params.Set("waitIndex", strconv.FormatUint(w.WaitIndex, 10)) + params.Set("recursive", strconv.FormatBool(w.Recursive)) + u.RawQuery = params.Encode() + + req, _ := http.NewRequest("GET", u.String(), nil) + return req +} + +type createAction struct { + Key string + Value string + TTL *uint64 +} + +func (c *createAction) httpRequest(ep url.URL) *http.Request { + u := v2KeysURL(ep, c.Key) + + params := u.Query() + params.Set("prevExist", "false") + u.RawQuery = params.Encode() + + form := url.Values{} + form.Add("value", c.Value) + if c.TTL != nil { + form.Add("ttl", strconv.FormatUint(*c.TTL, 10)) + } + body := strings.NewReader(form.Encode()) + + req, _ := http.NewRequest("PUT", u.String(), body) + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + + return req +} + +func unmarshalHTTPResponse(code int, body []byte) (res *Response, err error) { + switch code { + case http.StatusOK, http.StatusCreated: + res, err = unmarshalSuccessfulResponse(body) + default: + err = unmarshalErrorResponse(code) + } + + return +} + +func unmarshalSuccessfulResponse(body []byte) (*Response, error) { + var res Response + err := json.Unmarshal(body, &res) + if err != nil { + return nil, err + } + + return &res, nil +} + +func unmarshalErrorResponse(code int) error { + switch code { + case http.StatusNotFound: + return ErrKeyNoExist + case http.StatusPreconditionFailed: + return ErrKeyExists + case http.StatusInternalServerError: + // this isn't necessarily true + return ErrNoLeader + default: + } + + return fmt.Errorf("unrecognized HTTP status code %d", code) +} diff --git a/client/keys_test.go b/client/keys_test.go new file mode 100644 index 000000000..1ab0937a1 --- /dev/null +++ b/client/keys_test.go @@ -0,0 +1,355 @@ +/* + Copyright 2014 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package client + +import ( + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "reflect" + "testing" +) + +func TestV2KeysURLHelper(t *testing.T) { + tests := []struct { + endpoint url.URL + key string + want url.URL + }{ + // key is empty, no problem + { + endpoint: url.URL{Scheme: "http", Host: "example.com", Path: "/v2/keys"}, + key: "", + want: url.URL{Scheme: "http", Host: "example.com", Path: "/v2/keys"}, + }, + + // key is joined to path + { + endpoint: url.URL{Scheme: "http", Host: "example.com", Path: "/v2/keys"}, + key: "/foo/bar", + want: url.URL{Scheme: "http", Host: "example.com", Path: "/v2/keys/foo/bar"}, + }, + + // key is joined to path when path is empty + { + endpoint: url.URL{Scheme: "http", Host: "example.com", Path: ""}, + key: "/foo/bar", + want: url.URL{Scheme: "http", Host: "example.com", Path: "/foo/bar"}, + }, + + // Host field carries through with port + { + endpoint: url.URL{Scheme: "http", Host: "example.com:8080", Path: "/v2/keys"}, + key: "", + want: url.URL{Scheme: "http", Host: "example.com:8080", Path: "/v2/keys"}, + }, + + // Scheme carries through + { + endpoint: url.URL{Scheme: "https", Host: "example.com", Path: "/v2/keys"}, + key: "", + want: url.URL{Scheme: "https", Host: "example.com", Path: "/v2/keys"}, + }, + } + + for i, tt := range tests { + got := v2KeysURL(tt.endpoint, tt.key) + if tt.want != *got { + t.Errorf("#%d: want=%#v, got=%#v", i, tt.want, *got) + } + } +} + +func TestGetAction(t *testing.T) { + ep := url.URL{Scheme: "http", Host: "example.com/v2/keys"} + wantURL := &url.URL{ + Scheme: "http", + Host: "example.com", + Path: "/v2/keys/foo/bar", + } + wantHeader := http.Header{} + + tests := []struct { + recursive bool + wantQuery string + }{ + { + recursive: false, + wantQuery: "recursive=false", + }, + { + recursive: true, + wantQuery: "recursive=true", + }, + } + + for i, tt := range tests { + f := getAction{ + Key: "/foo/bar", + Recursive: tt.recursive, + } + got := *f.httpRequest(ep) + + wantURL := wantURL + wantURL.RawQuery = tt.wantQuery + + err := assertResponse(got, wantURL, wantHeader, nil) + if err != nil { + t.Errorf("#%d: %v", i, err) + } + } +} + +func TestWaitAction(t *testing.T) { + ep := url.URL{Scheme: "http", Host: "example.com/v2/keys"} + wantURL := &url.URL{ + Scheme: "http", + Host: "example.com", + Path: "/v2/keys/foo/bar", + } + wantHeader := http.Header{} + + tests := []struct { + waitIndex uint64 + recursive bool + wantQuery string + }{ + { + recursive: false, + waitIndex: uint64(0), + wantQuery: "recursive=false&wait=true&waitIndex=0", + }, + { + recursive: false, + waitIndex: uint64(12), + wantQuery: "recursive=false&wait=true&waitIndex=12", + }, + { + recursive: true, + waitIndex: uint64(12), + wantQuery: "recursive=true&wait=true&waitIndex=12", + }, + } + + for i, tt := range tests { + f := waitAction{ + Key: "/foo/bar", + WaitIndex: tt.waitIndex, + Recursive: tt.recursive, + } + got := *f.httpRequest(ep) + + wantURL := wantURL + wantURL.RawQuery = tt.wantQuery + + err := assertResponse(got, wantURL, wantHeader, nil) + if err != nil { + t.Errorf("#%d: %v", i, err) + } + } +} + +func TestCreateAction(t *testing.T) { + ep := url.URL{Scheme: "http", Host: "example.com/v2/keys"} + wantURL := &url.URL{ + Scheme: "http", + Host: "example.com", + Path: "/v2/keys/foo/bar", + RawQuery: "prevExist=false", + } + wantHeader := http.Header(map[string][]string{ + "Content-Type": []string{"application/x-www-form-urlencoded"}, + }) + + ttl12 := uint64(12) + tests := []struct { + value string + ttl *uint64 + wantBody string + }{ + { + value: "baz", + wantBody: "value=baz", + }, + { + value: "baz", + ttl: &ttl12, + wantBody: "ttl=12&value=baz", + }, + } + + for i, tt := range tests { + f := createAction{ + Key: "/foo/bar", + Value: tt.value, + TTL: tt.ttl, + } + got := *f.httpRequest(ep) + + err := assertResponse(got, wantURL, wantHeader, []byte(tt.wantBody)) + if err != nil { + t.Errorf("#%d: %v", i, err) + } + } +} + +func assertResponse(got http.Request, wantURL *url.URL, wantHeader http.Header, wantBody []byte) error { + if !reflect.DeepEqual(wantURL, got.URL) { + return fmt.Errorf("want.URL=%#v got.URL=%#v", wantURL, got.URL) + } + + if !reflect.DeepEqual(wantHeader, got.Header) { + return fmt.Errorf("want.Header=%#v got.Header=%#v", wantHeader, got.Header) + } + + if got.Body == nil { + if wantBody != nil { + return fmt.Errorf("want.Body=%v got.Body=%v", wantBody, got.Body) + } + } else { + if wantBody == nil { + return fmt.Errorf("want.Body=%v got.Body=%v", wantBody, got.Body) + } else { + gotBytes, err := ioutil.ReadAll(got.Body) + if err != nil { + return err + } + + if !reflect.DeepEqual(wantBody, gotBytes) { + return fmt.Errorf("want.Body=%v got.Body=%v", wantBody, gotBytes) + } + } + } + + return nil +} + +func TestUnmarshalSuccessfulResponse(t *testing.T) { + tests := []struct { + body string + res *Response + expectError bool + }{ + // Neither PrevNode or Node + { + `{"action":"delete"}`, + &Response{Action: "delete"}, + false, + }, + + // PrevNode + { + `{"action":"delete", "prevNode": {"key": "/foo", "value": "bar", "modifiedIndex": 12, "createdIndex": 10}}`, + &Response{Action: "delete", PrevNode: &Node{Key: "/foo", Value: "bar", ModifiedIndex: 12, CreatedIndex: 10}}, + false, + }, + + // Node + { + `{"action":"get", "node": {"key": "/foo", "value": "bar", "modifiedIndex": 12, "createdIndex": 10}}`, + &Response{Action: "get", Node: &Node{Key: "/foo", Value: "bar", ModifiedIndex: 12, CreatedIndex: 10}}, + false, + }, + + // PrevNode and Node + { + `{"action":"update", "prevNode": {"key": "/foo", "value": "baz", "modifiedIndex": 10, "createdIndex": 10}, "node": {"key": "/foo", "value": "bar", "modifiedIndex": 12, "createdIndex": 10}}`, + &Response{Action: "update", PrevNode: &Node{Key: "/foo", Value: "baz", ModifiedIndex: 10, CreatedIndex: 10}, Node: &Node{Key: "/foo", Value: "bar", ModifiedIndex: 12, CreatedIndex: 10}}, + false, + }, + + // Garbage in body + { + `garbage`, + nil, + true, + }, + } + + for i, tt := range tests { + res, err := unmarshalSuccessfulResponse([]byte(tt.body)) + if tt.expectError != (err != nil) { + t.Errorf("#%d: expectError=%t, err=%v", i, tt.expectError, err) + } + + if (res == nil) != (tt.res == nil) { + t.Errorf("#%d: received res==%v, but expected res==%v", i, res, tt.res) + continue + } else if tt.res == nil { + // expected and succesfully got nil response + continue + } + + if res.Action != tt.res.Action { + t.Errorf("#%d: Action=%s, expected %s", i, res.Action, tt.res.Action) + } + + if !reflect.DeepEqual(res.Node, tt.res.Node) { + t.Errorf("#%d: Node=%v, expected %v", i, res.Node, tt.res.Node) + } + } +} + +func TestUnmarshalErrorResponse(t *testing.T) { + unrecognized := errors.New("test fixture") + + tests := []struct { + code int + want error + }{ + {http.StatusBadRequest, unrecognized}, + {http.StatusUnauthorized, unrecognized}, + {http.StatusPaymentRequired, unrecognized}, + {http.StatusForbidden, unrecognized}, + {http.StatusNotFound, ErrKeyNoExist}, + {http.StatusMethodNotAllowed, unrecognized}, + {http.StatusNotAcceptable, unrecognized}, + {http.StatusProxyAuthRequired, unrecognized}, + {http.StatusRequestTimeout, unrecognized}, + {http.StatusConflict, unrecognized}, + {http.StatusGone, unrecognized}, + {http.StatusLengthRequired, unrecognized}, + {http.StatusPreconditionFailed, ErrKeyExists}, + {http.StatusRequestEntityTooLarge, unrecognized}, + {http.StatusRequestURITooLong, unrecognized}, + {http.StatusUnsupportedMediaType, unrecognized}, + {http.StatusRequestedRangeNotSatisfiable, unrecognized}, + {http.StatusExpectationFailed, unrecognized}, + {http.StatusTeapot, unrecognized}, + + {http.StatusInternalServerError, ErrNoLeader}, + {http.StatusNotImplemented, unrecognized}, + {http.StatusBadGateway, unrecognized}, + {http.StatusServiceUnavailable, unrecognized}, + {http.StatusGatewayTimeout, unrecognized}, + {http.StatusHTTPVersionNotSupported, unrecognized}, + } + + for i, tt := range tests { + want := tt.want + if reflect.DeepEqual(unrecognized, want) { + want = fmt.Errorf("unrecognized HTTP status code %d", tt.code) + } + + got := unmarshalErrorResponse(tt.code) + if !reflect.DeepEqual(want, got) { + t.Errorf("#%d: want=%v, got=%v", i, want, got) + } + } +} diff --git a/discovery/discovery.go b/discovery/discovery.go index 5cf7bdc42..996394066 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -58,7 +58,7 @@ type discovery struct { cluster string id uint64 config string - c client.Client + c client.KeysAPI retries uint url *url.URL @@ -105,13 +105,13 @@ func New(durl string, id uint64, config string) (Discoverer, error) { if err != nil { return nil, err } - c, err := client.NewHTTPClient(&http.Transport{Proxy: pf}, u.String(), time.Second*5) + c, err := client.NewKeysAPI(&http.Transport{Proxy: pf}, u.String(), time.Second*5) if err != nil { return nil, err } // discovery service redirects /[key] to /v2/keys/[key] // set the prefix of client to "" to handle this - c.SetPrefix("") + c.SetAPIPrefix("") return &discovery{ cluster: token, id: id, diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go index becf164ee..20be66894 100644 --- a/discovery/discovery_test.go +++ b/discovery/discovery_test.go @@ -304,7 +304,7 @@ func TestCreateSelf(t *testing.T) { errwc := &clientWithResp{rs, errw} tests := []struct { - c client.Client + c client.KeysAPI werr error }{ // no error From 45d8fbdcdaa2204d007600d43254cb8ff4444017 Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Fri, 24 Oct 2014 10:45:49 -0700 Subject: [PATCH 2/3] client: move discovery path logic into client pkg --- client/keys.go | 19 ++++++++++--------- discovery/discovery.go | 5 +---- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/client/keys.go b/client/keys.go index a95090033..1bbc7cb10 100644 --- a/client/keys.go +++ b/client/keys.go @@ -41,7 +41,15 @@ var ( ErrKeyExists = errors.New("client: key already exists") ) -func NewKeysAPI(tr *http.Transport, ep string, to time.Duration) (*HTTPKeysAPI, error) { +func NewKeysAPI(tr *http.Transport, ep string, to time.Duration) (KeysAPI, error) { + return newHTTPKeysAPIWithPrefix(tr, ep, to, DefaultV2KeysPrefix) +} + +func NewDiscoveryKeysAPI(tr *http.Transport, ep string, to time.Duration) (KeysAPI, error) { + return newHTTPKeysAPIWithPrefix(tr, ep, to, "") +} + +func newHTTPKeysAPIWithPrefix(tr *http.Transport, ep string, to time.Duration, prefix string) (*HTTPKeysAPI, error) { c, err := newHTTPClient(tr, ep, to) if err != nil { return nil, err @@ -85,14 +93,7 @@ func (n *Node) String() string { } type HTTPKeysAPI struct { - client *httpClient - endpoint url.URL -} - -func (k *HTTPKeysAPI) SetAPIPrefix(p string) { - ep := k.endpoint - ep.Path = path.Join(ep.Path, p) - k.client.endpoint = ep + client *httpClient } func (k *HTTPKeysAPI) Create(key, val string, ttl time.Duration) (*Response, error) { diff --git a/discovery/discovery.go b/discovery/discovery.go index 996394066..bba0b4cfe 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -105,13 +105,10 @@ func New(durl string, id uint64, config string) (Discoverer, error) { if err != nil { return nil, err } - c, err := client.NewKeysAPI(&http.Transport{Proxy: pf}, u.String(), time.Second*5) + c, err := client.NewDiscoveryKeysAPI(&http.Transport{Proxy: pf}, u.String(), time.Second*5) if err != nil { return nil, err } - // discovery service redirects /[key] to /v2/keys/[key] - // set the prefix of client to "" to handle this - c.SetAPIPrefix("") return &discovery{ cluster: token, id: id, From f21d93ba605e6599d6393d4902c4cda5169e320e Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Fri, 24 Oct 2014 10:50:09 -0700 Subject: [PATCH 3/3] client: define DefaultRequestTimeout --- client/http.go | 3 ++- discovery/discovery.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/client/http.go b/client/http.go index 8e8a402c1..aa2a65078 100644 --- a/client/http.go +++ b/client/http.go @@ -26,7 +26,8 @@ import ( ) var ( - ErrTimeout = context.DeadlineExceeded + ErrTimeout = context.DeadlineExceeded + DefaultRequestTimeout = 5 * time.Second ) // transport mimics http.Transport to provide an interface which can be diff --git a/discovery/discovery.go b/discovery/discovery.go index bba0b4cfe..786a0da4c 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -105,7 +105,7 @@ func New(durl string, id uint64, config string) (Discoverer, error) { if err != nil { return nil, err } - c, err := client.NewDiscoveryKeysAPI(&http.Transport{Proxy: pf}, u.String(), time.Second*5) + c, err := client.NewDiscoveryKeysAPI(&http.Transport{Proxy: pf}, u.String(), client.DefaultRequestTimeout) if err != nil { return nil, err }