diff --git a/client/client.go b/client/client.go new file mode 100644 index 000000000..0fbf0d880 --- /dev/null +++ b/client/client.go @@ -0,0 +1,44 @@ +package client + +import ( + "errors" + "fmt" + "time" +) + +var ( + ErrUnavailable = errors.New("client: no available etcd endpoints") + ErrNoLeader = errors.New("client: no leader") + ErrKeyNoExist = errors.New("client: key does not exist") + ErrKeyExists = errors.New("client: key already exists") +) + +type Client interface { + Create(key, value string, ttl time.Duration) (*Response, error) + Get(key string) (*Response, error) + Watch(key string) Watcher + RecursiveWatch(key string) Watcher +} + +type Watcher interface { + Next() (*Response, error) +} + +type Response struct { + Action string `json:"action"` + Node *Node `json:"node"` + PrevNode *Node `json:"prevNode"` +} + +type Nodes []Node +type Node struct { + Key string `json:"key"` + Value string `json:"value"` + Nodes Nodes `json:"nodes"` + ModifiedIndex uint64 `json:"modifiedIndex"` + CreatedIndex uint64 `json:"createdIndex"` +} + +func (n *Node) String() string { + return fmt.Sprintf("{Key: %s, CreatedIndex: %d, ModifiedIndex: %d}", n.Key, n.CreatedIndex, n.ModifiedIndex) +} diff --git a/client/http.go b/client/http.go new file mode 100644 index 000000000..41ce4b19e --- /dev/null +++ b/client/http.go @@ -0,0 +1,276 @@ +package client + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "path" + "strconv" + "strings" + "time" + + "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context" +) + +const ( + v2Prefix = "/v2/keys" +) + +// transport mimics http.Transport to provide an interface which can be +// substituted for testing (since the RoundTripper interface alone does not +// require the CancelRequest method) +type transport interface { + http.RoundTripper + CancelRequest(req *http.Request) +} + +type httpClient struct { + transport transport + endpoint url.URL + timeout time.Duration +} + +func NewHTTPClient(tr *http.Transport, ep string, timeout time.Duration) (*httpClient, error) { + u, err := url.Parse(ep) + if err != nil { + return nil, err + } + + c := &httpClient{ + transport: tr, + endpoint: *u, + timeout: timeout, + } + + return c, nil +} + +func (c *httpClient) Create(key, val string, ttl time.Duration) (*Response, error) { + uintTTL := uint64(ttl.Seconds()) + create := &createAction{ + Key: key, + Value: val, + TTL: &uintTTL, + } + + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + httpresp, body, err := c.do(ctx, create) + cancel() + + if err != nil { + return nil, err + } + + return unmarshalHTTPResponse(httpresp.StatusCode, body) +} + +func (c *httpClient) Get(key string) (*Response, error) { + get := &getAction{ + Key: key, + Recursive: false, + } + + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + httpresp, body, err := c.do(ctx, get) + cancel() + + if err != nil { + return nil, err + } + + return unmarshalHTTPResponse(httpresp.StatusCode, body) +} + +type roundTripResponse struct { + resp *http.Response + err error +} + +func (c *httpClient) do(ctx context.Context, act httpAction) (*http.Response, []byte, error) { + req := act.httpRequest(c.endpoint) + + rtchan := make(chan roundTripResponse, 1) + go func() { + resp, err := c.transport.RoundTrip(req) + rtchan <- roundTripResponse{resp: resp, err: err} + close(rtchan) + }() + + var resp *http.Response + var err error + + select { + case rtresp := <-rtchan: + resp, err = rtresp.resp, rtresp.err + case <-ctx.Done(): + c.transport.CancelRequest(req) + // wait for request to actually exit before continuing + <-rtchan + err = ctx.Err() + } + + // always check for resp nil-ness to deal with possible + // race conditions between channels above + defer func() { + if resp != nil { + resp.Body.Close() + } + }() + + if err != nil { + return nil, nil, err + } + + body, err := ioutil.ReadAll(resp.Body) + return resp, body, err +} + +func (c *httpClient) Watch(key string, idx uint64) *httpWatcher { + return &httpWatcher{ + httpClient: *c, + nextWait: waitAction{ + Key: key, + WaitIndex: idx, + Recursive: false, + }, + } +} + +func (c *httpClient) RecursiveWatch(key string, idx uint64) *httpWatcher { + return &httpWatcher{ + httpClient: *c, + nextWait: waitAction{ + Key: key, + WaitIndex: idx, + Recursive: true, + }, + } +} + +type httpWatcher struct { + httpClient + nextWait waitAction +} + +func (hw *httpWatcher) Next() (*Response, error) { + httpresp, body, err := hw.httpClient.do(context.Background(), &hw.nextWait) + if err != nil { + return nil, err + } + + resp, err := unmarshalHTTPResponse(httpresp.StatusCode, body) + if err != nil { + return nil, err + } + + hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1 + return resp, nil +} + +func v2URL(ep url.URL, key string) *url.URL { + ep.Path = path.Join(ep.Path, v2Prefix, key) + return &ep +} + +type httpAction interface { + httpRequest(url.URL) *http.Request +} + +type getAction struct { + Key string + Recursive bool +} + +func (g *getAction) httpRequest(ep url.URL) *http.Request { + u := v2URL(ep, g.Key) + + params := u.Query() + params.Set("recursive", strconv.FormatBool(g.Recursive)) + u.RawQuery = params.Encode() + + req, _ := http.NewRequest("GET", u.String(), nil) + return req +} + +type waitAction struct { + Key string + WaitIndex uint64 + Recursive bool +} + +func (w *waitAction) httpRequest(ep url.URL) *http.Request { + u := v2URL(ep, w.Key) + + params := u.Query() + params.Set("wait", "true") + params.Set("waitIndex", strconv.FormatUint(w.WaitIndex, 10)) + params.Set("recursive", strconv.FormatBool(w.Recursive)) + u.RawQuery = params.Encode() + + req, _ := http.NewRequest("GET", u.String(), nil) + return req +} + +type createAction struct { + Key string + Value string + TTL *uint64 +} + +func (c *createAction) httpRequest(ep url.URL) *http.Request { + u := v2URL(ep, c.Key) + + params := u.Query() + params.Set("prevExist", "false") + u.RawQuery = params.Encode() + + form := url.Values{} + form.Add("value", c.Value) + if c.TTL != nil { + form.Add("ttl", strconv.FormatUint(*c.TTL, 10)) + } + body := strings.NewReader(form.Encode()) + + req, _ := http.NewRequest("PUT", u.String(), body) + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + + return req +} + +func unmarshalHTTPResponse(code int, body []byte) (res *Response, err error) { + switch code { + case http.StatusOK, http.StatusCreated: + res, err = unmarshalSuccessfulResponse(body) + default: + err = unmarshalErrorResponse(code) + } + + return +} + +func unmarshalSuccessfulResponse(body []byte) (*Response, error) { + var res Response + err := json.Unmarshal(body, &res) + if err != nil { + return nil, err + } + + return &res, nil +} + +func unmarshalErrorResponse(code int) error { + switch code { + case http.StatusNotFound: + return ErrKeyNoExist + case http.StatusPreconditionFailed: + return ErrKeyExists + case http.StatusInternalServerError: + // this isn't necessarily true + return ErrNoLeader + default: + } + + return fmt.Errorf("unrecognized HTTP status code %d", code) +} diff --git a/client/http_test.go b/client/http_test.go new file mode 100644 index 000000000..05aba9cef --- /dev/null +++ b/client/http_test.go @@ -0,0 +1,464 @@ +package client + +import ( + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "reflect" + "strings" + "testing" + "time" + + "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context" +) + +func TestV2URLHelper(t *testing.T) { + tests := []struct { + endpoint url.URL + key string + want url.URL + }{ + // key is empty, no problem + { + endpoint: url.URL{Scheme: "http", Host: "example.com", Path: ""}, + key: "", + want: url.URL{Scheme: "http", Host: "example.com", Path: "/v2/keys"}, + }, + + // key is joined to path + { + endpoint: url.URL{Scheme: "http", Host: "example.com", Path: ""}, + key: "/foo/bar", + want: url.URL{Scheme: "http", Host: "example.com", Path: "/v2/keys/foo/bar"}, + }, + + // Host field carries through with port + { + endpoint: url.URL{Scheme: "http", Host: "example.com:8080", Path: ""}, + key: "", + want: url.URL{Scheme: "http", Host: "example.com:8080", Path: "/v2/keys"}, + }, + + // Scheme carries through + { + endpoint: url.URL{Scheme: "https", Host: "example.com", Path: ""}, + key: "", + want: url.URL{Scheme: "https", Host: "example.com", Path: "/v2/keys"}, + }, + + // Path on endpoint is not ignored + { + endpoint: url.URL{Scheme: "https", Host: "example.com", Path: "/prefix"}, + key: "/foo", + want: url.URL{Scheme: "https", Host: "example.com", Path: "/prefix/v2/keys/foo"}, + }, + } + + for i, tt := range tests { + got := v2URL(tt.endpoint, tt.key) + if tt.want != *got { + t.Errorf("#%d: want=%#v, got=%#v", i, tt.want, *got) + } + } +} + +func TestGetAction(t *testing.T) { + ep := url.URL{Scheme: "http", Host: "example.com"} + wantURL := &url.URL{ + Scheme: "http", + Host: "example.com", + Path: "/v2/keys/foo/bar", + } + wantHeader := http.Header{} + + tests := []struct { + recursive bool + wantQuery string + }{ + { + recursive: false, + wantQuery: "recursive=false", + }, + { + recursive: true, + wantQuery: "recursive=true", + }, + } + + for i, tt := range tests { + f := getAction{ + Key: "/foo/bar", + Recursive: tt.recursive, + } + got := *f.httpRequest(ep) + + wantURL := wantURL + wantURL.RawQuery = tt.wantQuery + + err := assertResponse(got, wantURL, wantHeader, nil) + if err != nil { + t.Errorf("%#d: %v", i, err) + } + } +} + +func TestWaitAction(t *testing.T) { + ep := url.URL{Scheme: "http", Host: "example.com"} + wantURL := &url.URL{ + Scheme: "http", + Host: "example.com", + Path: "/v2/keys/foo/bar", + } + wantHeader := http.Header{} + + tests := []struct { + waitIndex uint64 + recursive bool + wantQuery string + }{ + { + recursive: false, + waitIndex: uint64(0), + wantQuery: "recursive=false&wait=true&waitIndex=0", + }, + { + recursive: false, + waitIndex: uint64(12), + wantQuery: "recursive=false&wait=true&waitIndex=12", + }, + { + recursive: true, + waitIndex: uint64(12), + wantQuery: "recursive=true&wait=true&waitIndex=12", + }, + } + + for i, tt := range tests { + f := waitAction{ + Key: "/foo/bar", + WaitIndex: tt.waitIndex, + Recursive: tt.recursive, + } + got := *f.httpRequest(ep) + + wantURL := wantURL + wantURL.RawQuery = tt.wantQuery + + err := assertResponse(got, wantURL, wantHeader, nil) + if err != nil { + t.Errorf("%#d: %v", i, err) + } + } +} + +func TestCreateAction(t *testing.T) { + ep := url.URL{Scheme: "http", Host: "example.com"} + wantURL := &url.URL{ + Scheme: "http", + Host: "example.com", + Path: "/v2/keys/foo/bar", + RawQuery: "prevExist=false", + } + wantHeader := http.Header(map[string][]string{ + "Content-Type": []string{"application/x-www-form-urlencoded"}, + }) + + ttl12 := uint64(12) + tests := []struct { + value string + ttl *uint64 + wantBody string + }{ + { + value: "baz", + wantBody: "value=baz", + }, + { + value: "baz", + ttl: &ttl12, + wantBody: "ttl=12&value=baz", + }, + } + + for i, tt := range tests { + f := createAction{ + Key: "/foo/bar", + Value: tt.value, + TTL: tt.ttl, + } + got := *f.httpRequest(ep) + + err := assertResponse(got, wantURL, wantHeader, []byte(tt.wantBody)) + if err != nil { + t.Errorf("%#d: %v", i, err) + } + } +} + +func assertResponse(got http.Request, wantURL *url.URL, wantHeader http.Header, wantBody []byte) error { + if !reflect.DeepEqual(wantURL, got.URL) { + return fmt.Errorf("want.URL=%#v got.URL=%#v", wantURL, got.URL) + } + + if !reflect.DeepEqual(wantHeader, got.Header) { + return fmt.Errorf("want.Header=%#v got.Header=%#v", wantHeader, got.Header) + } + + if got.Body == nil { + if wantBody != nil { + return fmt.Errorf("want.Body=%v got.Body=%v", wantBody, got.Body) + } + } else { + if wantBody == nil { + return fmt.Errorf("want.Body=%v got.Body=%v", wantBody, got.Body) + } else { + gotBytes, err := ioutil.ReadAll(got.Body) + if err != nil { + return err + } + + if !reflect.DeepEqual(wantBody, gotBytes) { + return fmt.Errorf("want.Body=%v got.Body=%v", wantBody, gotBytes) + } + } + } + + return nil +} + +func TestUnmarshalSuccessfulResponse(t *testing.T) { + tests := []struct { + body string + res *Response + expectError bool + }{ + // Neither PrevNode or Node + { + `{"action":"delete"}`, + &Response{Action: "delete"}, + false, + }, + + // PrevNode + { + `{"action":"delete", "prevNode": {"key": "/foo", "value": "bar", "modifiedIndex": 12, "createdIndex": 10}}`, + &Response{Action: "delete", PrevNode: &Node{Key: "/foo", Value: "bar", ModifiedIndex: 12, CreatedIndex: 10}}, + false, + }, + + // Node + { + `{"action":"get", "node": {"key": "/foo", "value": "bar", "modifiedIndex": 12, "createdIndex": 10}}`, + &Response{Action: "get", Node: &Node{Key: "/foo", Value: "bar", ModifiedIndex: 12, CreatedIndex: 10}}, + false, + }, + + // PrevNode and Node + { + `{"action":"update", "prevNode": {"key": "/foo", "value": "baz", "modifiedIndex": 10, "createdIndex": 10}, "node": {"key": "/foo", "value": "bar", "modifiedIndex": 12, "createdIndex": 10}}`, + &Response{Action: "update", PrevNode: &Node{Key: "/foo", Value: "baz", ModifiedIndex: 10, CreatedIndex: 10}, Node: &Node{Key: "/foo", Value: "bar", ModifiedIndex: 12, CreatedIndex: 10}}, + false, + }, + + // Garbage in body + { + `garbage`, + nil, + true, + }, + } + + for i, tt := range tests { + res, err := unmarshalSuccessfulResponse([]byte(tt.body)) + if tt.expectError != (err != nil) { + t.Errorf("#%d: expectError=%t, err=%v", i, tt.expectError, err) + } + + if (res == nil) != (tt.res == nil) { + t.Errorf("#%d: received res==%v, but expected res==%v", i, res, tt.res) + continue + } else if tt.res == nil { + // expected and succesfully got nil response + continue + } + + if res.Action != tt.res.Action { + t.Errorf("#%d: Action=%s, expected %s", i, res.Action, tt.res.Action) + } + + if !reflect.DeepEqual(res.Node, tt.res.Node) { + t.Errorf("#%d: Node=%v, expected %v", i, res.Node, tt.res.Node) + } + } +} + +func TestUnmarshalErrorResponse(t *testing.T) { + unrecognized := errors.New("test fixture") + + tests := []struct { + code int + want error + }{ + {http.StatusBadRequest, unrecognized}, + {http.StatusUnauthorized, unrecognized}, + {http.StatusPaymentRequired, unrecognized}, + {http.StatusForbidden, unrecognized}, + {http.StatusNotFound, ErrKeyNoExist}, + {http.StatusMethodNotAllowed, unrecognized}, + {http.StatusNotAcceptable, unrecognized}, + {http.StatusProxyAuthRequired, unrecognized}, + {http.StatusRequestTimeout, unrecognized}, + {http.StatusConflict, unrecognized}, + {http.StatusGone, unrecognized}, + {http.StatusLengthRequired, unrecognized}, + {http.StatusPreconditionFailed, ErrKeyExists}, + {http.StatusRequestEntityTooLarge, unrecognized}, + {http.StatusRequestURITooLong, unrecognized}, + {http.StatusUnsupportedMediaType, unrecognized}, + {http.StatusRequestedRangeNotSatisfiable, unrecognized}, + {http.StatusExpectationFailed, unrecognized}, + {http.StatusTeapot, unrecognized}, + + {http.StatusInternalServerError, ErrNoLeader}, + {http.StatusNotImplemented, unrecognized}, + {http.StatusBadGateway, unrecognized}, + {http.StatusServiceUnavailable, unrecognized}, + {http.StatusGatewayTimeout, unrecognized}, + {http.StatusHTTPVersionNotSupported, unrecognized}, + } + + for i, tt := range tests { + want := tt.want + if reflect.DeepEqual(unrecognized, want) { + want = fmt.Errorf("unrecognized HTTP status code %d", tt.code) + } + + got := unmarshalErrorResponse(tt.code) + if !reflect.DeepEqual(want, got) { + t.Errorf("#%d: want=%v, got=%v", i, want, got) + } + } +} + +type fakeTransport struct { + respchan chan *http.Response + errchan chan error + startCancel chan struct{} + finishCancel chan struct{} +} + +func newFakeTransport() *fakeTransport { + return &fakeTransport{ + respchan: make(chan *http.Response, 1), + errchan: make(chan error, 1), + startCancel: make(chan struct{}, 1), + finishCancel: make(chan struct{}, 1), + } +} + +func (t *fakeTransport) RoundTrip(*http.Request) (*http.Response, error) { + select { + case resp := <-t.respchan: + return resp, nil + case err := <-t.errchan: + return nil, err + case <-t.startCancel: + // wait on finishCancel to simulate taking some amount of + // time while calling CancelRequest + <-t.finishCancel + return nil, errors.New("cancelled") + } +} + +func (t *fakeTransport) CancelRequest(*http.Request) { + t.startCancel <- struct{}{} +} + +type fakeAction struct{} + +func (a *fakeAction) httpRequest(url.URL) *http.Request { + return &http.Request{} +} + +func TestHTTPClientDoSuccess(t *testing.T) { + tr := newFakeTransport() + c := &httpClient{transport: tr} + + tr.respchan <- &http.Response{ + StatusCode: http.StatusTeapot, + Body: ioutil.NopCloser(strings.NewReader("foo")), + } + + resp, body, err := c.do(context.Background(), &fakeAction{}) + if err != nil { + t.Fatalf("incorrect error value: want=nil got=%v", err) + } + + wantCode := http.StatusTeapot + if wantCode != resp.StatusCode { + t.Fatalf("invalid response code: want=%d got=%d", wantCode, resp.StatusCode) + } + + wantBody := []byte("foo") + if !reflect.DeepEqual(wantBody, body) { + t.Fatalf("invalid response body: want=%q got=%q", wantBody, body) + } +} + +func TestHTTPClientDoError(t *testing.T) { + tr := newFakeTransport() + c := &httpClient{transport: tr} + + tr.errchan <- errors.New("fixture") + + _, _, err := c.do(context.Background(), &fakeAction{}) + if err == nil { + t.Fatalf("expected non-nil error, got nil") + } +} + +func TestHTTPClientDoCancelContext(t *testing.T) { + tr := newFakeTransport() + c := &httpClient{transport: tr} + + tr.startCancel <- struct{}{} + tr.finishCancel <- struct{}{} + + _, _, err := c.do(context.Background(), &fakeAction{}) + if err == nil { + t.Fatalf("expected non-nil error, got nil") + } +} + +func TestHTTPClientDoCancelContextWaitForRoundTrip(t *testing.T) { + tr := newFakeTransport() + c := &httpClient{transport: tr} + + donechan := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + c.do(ctx, &fakeAction{}) + close(donechan) + }() + + // This should call CancelRequest and begin the cancellation process + cancel() + + select { + case <-donechan: + t.Fatalf("httpClient.do should not have exited yet") + default: + } + + tr.finishCancel <- struct{}{} + + select { + case <-donechan: + //expected behavior + return + case <-time.After(time.Second): + t.Fatalf("httpClient.do did not exit within 1s") + } +} diff --git a/test b/test index deb7a95e7..9503e43eb 100755 --- a/test +++ b/test @@ -14,7 +14,7 @@ COVER=${COVER:-"-cover"} source ./build -TESTABLE="etcdserver etcdserver/etcdhttp etcdserver/etcdserverpb functional proxy raft snap store wait wal ./" +TESTABLE="client etcdserver etcdserver/etcdhttp etcdserver/etcdserverpb functional proxy raft snap store wait wal ./" FORMATTABLE="$TESTABLE cors.go" # user has not provided PKG override