From ce4df96e69ba38e73f646de113ef6e092b712ea9 Mon Sep 17 00:00:00 2001
From: Brian Waldon <bcwaldon@gmail.com>
Date: Thu, 23 Oct 2014 17:11:04 -0700
Subject: [PATCH] client: break apart KeysAPI from httpClient

---
 client/client.go            |  60 ------
 client/http.go              | 259 ++++----------------------
 client/http_test.go         | 329 ---------------------------------
 client/keys.go              | 276 ++++++++++++++++++++++++++++
 client/keys_test.go         | 355 ++++++++++++++++++++++++++++++++++++
 discovery/discovery.go      |   6 +-
 discovery/discovery_test.go |   2 +-
 7 files changed, 666 insertions(+), 621 deletions(-)
 delete mode 100644 client/client.go
 create mode 100644 client/keys.go
 create mode 100644 client/keys_test.go

diff --git a/client/client.go b/client/client.go
deleted file mode 100644
index 75b8c5e56..000000000
--- a/client/client.go
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
-   Copyright 2014 CoreOS, Inc.
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-*/
-
-package client
-
-import (
-	"errors"
-	"fmt"
-	"time"
-)
-
-var (
-	ErrUnavailable = errors.New("client: no available etcd endpoints")
-	ErrNoLeader    = errors.New("client: no leader")
-	ErrKeyNoExist  = errors.New("client: key does not exist")
-	ErrKeyExists   = errors.New("client: key already exists")
-)
-
-type Client interface {
-	Create(key, value string, ttl time.Duration) (*Response, error)
-	Get(key string) (*Response, error)
-	Watch(key string, idx uint64) Watcher
-	RecursiveWatch(key string, idx uint64) Watcher
-}
-
-type Watcher interface {
-	Next() (*Response, error)
-}
-
-type Response struct {
-	Action   string `json:"action"`
-	Node     *Node  `json:"node"`
-	PrevNode *Node  `json:"prevNode"`
-}
-
-type Nodes []*Node
-type Node struct {
-	Key           string `json:"key"`
-	Value         string `json:"value"`
-	Nodes         Nodes  `json:"nodes"`
-	ModifiedIndex uint64 `json:"modifiedIndex"`
-	CreatedIndex  uint64 `json:"createdIndex"`
-}
-
-func (n *Node) String() string {
-	return fmt.Sprintf("{Key: %s, CreatedIndex: %d, ModifiedIndex: %d}", n.Key, n.CreatedIndex, n.ModifiedIndex)
-}
diff --git a/client/http.go b/client/http.go
index 4f94aba12..8e8a402c1 100644
--- a/client/http.go
+++ b/client/http.go
@@ -17,22 +17,16 @@
 package client
 
 import (
-	"encoding/json"
-	"fmt"
 	"io/ioutil"
 	"net/http"
 	"net/url"
-	"path"
-	"strconv"
-	"strings"
 	"time"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
 )
 
 var (
-	DefaultV2KeysPrefix = "/v2/keys"
-	ErrTimeout          = context.DeadlineExceeded
+	ErrTimeout = context.DeadlineExceeded
 )
 
 // transport mimics http.Transport to provide an interface which can be
@@ -43,75 +37,8 @@ type transport interface {
 	CancelRequest(req *http.Request)
 }
 
-type httpClient struct {
-	transport    transport
-	endpoint     url.URL
-	timeout      time.Duration
-	v2KeysPrefix string
-}
-
-func NewHTTPClient(tr *http.Transport, ep string, timeout time.Duration) (*httpClient, error) {
-	u, err := url.Parse(ep)
-	if err != nil {
-		return nil, err
-	}
-
-	c := &httpClient{
-		transport:    tr,
-		endpoint:     *u,
-		timeout:      timeout,
-		v2KeysPrefix: DefaultV2KeysPrefix,
-	}
-
-	return c, nil
-}
-
-func (c *httpClient) SetPrefix(p string) {
-	c.v2KeysPrefix = p
-}
-
-func (c *httpClient) Endpoint() url.URL {
-	ep := c.endpoint
-	ep.Path = path.Join(ep.Path, c.v2KeysPrefix)
-	return ep
-}
-
-func (c *httpClient) Create(key, val string, ttl time.Duration) (*Response, error) {
-	create := &createAction{
-		Key:   key,
-		Value: val,
-	}
-	if ttl >= 0 {
-		uttl := uint64(ttl.Seconds())
-		create.TTL = &uttl
-	}
-
-	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
-	httpresp, body, err := c.do(ctx, create)
-	cancel()
-
-	if err != nil {
-		return nil, err
-	}
-
-	return unmarshalHTTPResponse(httpresp.StatusCode, body)
-}
-
-func (c *httpClient) Get(key string) (*Response, error) {
-	get := &getAction{
-		Key:       key,
-		Recursive: false,
-	}
-
-	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
-	httpresp, body, err := c.do(ctx, get)
-	cancel()
-
-	if err != nil {
-		return nil, err
-	}
-
-	return unmarshalHTTPResponse(httpresp.StatusCode, body)
+type httpAction interface {
+	httpRequest(url.URL) *http.Request
 }
 
 type roundTripResponse struct {
@@ -119,8 +46,35 @@ type roundTripResponse struct {
 	err  error
 }
 
+type httpClient struct {
+	transport transport
+	endpoint  url.URL
+	timeout   time.Duration
+}
+
+func newHTTPClient(tr *http.Transport, ep string, to time.Duration) (*httpClient, error) {
+	u, err := url.Parse(ep)
+	if err != nil {
+		return nil, err
+	}
+
+	c := &httpClient{
+		transport: tr,
+		endpoint:  *u,
+		timeout:   to,
+	}
+
+	return c, nil
+}
+
+func (c *httpClient) doWithTimeout(act httpAction) (*http.Response, []byte, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
+	defer cancel()
+	return c.do(ctx, act)
+}
+
 func (c *httpClient) do(ctx context.Context, act httpAction) (*http.Response, []byte, error) {
-	req := act.httpRequest(c.Endpoint())
+	req := act.httpRequest(c.endpoint)
 
 	rtchan := make(chan roundTripResponse, 1)
 	go func() {
@@ -157,154 +111,3 @@ func (c *httpClient) do(ctx context.Context, act httpAction) (*http.Response, []
 	body, err := ioutil.ReadAll(resp.Body)
 	return resp, body, err
 }
-
-func (c *httpClient) Watch(key string, idx uint64) Watcher {
-	return &httpWatcher{
-		httpClient: *c,
-		nextWait: waitAction{
-			Key:       key,
-			WaitIndex: idx,
-			Recursive: false,
-		},
-	}
-}
-
-func (c *httpClient) RecursiveWatch(key string, idx uint64) Watcher {
-	return &httpWatcher{
-		httpClient: *c,
-		nextWait: waitAction{
-			Key:       key,
-			WaitIndex: idx,
-			Recursive: true,
-		},
-	}
-}
-
-type httpWatcher struct {
-	httpClient
-	nextWait waitAction
-}
-
-func (hw *httpWatcher) Next() (*Response, error) {
-	httpresp, body, err := hw.httpClient.do(context.Background(), &hw.nextWait)
-	if err != nil {
-		return nil, err
-	}
-
-	resp, err := unmarshalHTTPResponse(httpresp.StatusCode, body)
-	if err != nil {
-		return nil, err
-	}
-
-	hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1
-	return resp, nil
-}
-
-// v2KeysURL forms a URL representing the location of a key. The provided
-// endpoint must be the root of the etcd keys API. For example, a valid
-// endpoint probably has the path "/v2/keys".
-func v2KeysURL(ep url.URL, key string) *url.URL {
-	ep.Path = path.Join(ep.Path, key)
-	return &ep
-}
-
-type httpAction interface {
-	httpRequest(url.URL) *http.Request
-}
-
-type getAction struct {
-	Key       string
-	Recursive bool
-}
-
-func (g *getAction) httpRequest(ep url.URL) *http.Request {
-	u := v2KeysURL(ep, g.Key)
-
-	params := u.Query()
-	params.Set("recursive", strconv.FormatBool(g.Recursive))
-	u.RawQuery = params.Encode()
-
-	req, _ := http.NewRequest("GET", u.String(), nil)
-	return req
-}
-
-type waitAction struct {
-	Key       string
-	WaitIndex uint64
-	Recursive bool
-}
-
-func (w *waitAction) httpRequest(ep url.URL) *http.Request {
-	u := v2KeysURL(ep, w.Key)
-
-	params := u.Query()
-	params.Set("wait", "true")
-	params.Set("waitIndex", strconv.FormatUint(w.WaitIndex, 10))
-	params.Set("recursive", strconv.FormatBool(w.Recursive))
-	u.RawQuery = params.Encode()
-
-	req, _ := http.NewRequest("GET", u.String(), nil)
-	return req
-}
-
-type createAction struct {
-	Key   string
-	Value string
-	TTL   *uint64
-}
-
-func (c *createAction) httpRequest(ep url.URL) *http.Request {
-	u := v2KeysURL(ep, c.Key)
-
-	params := u.Query()
-	params.Set("prevExist", "false")
-	u.RawQuery = params.Encode()
-
-	form := url.Values{}
-	form.Add("value", c.Value)
-	if c.TTL != nil {
-		form.Add("ttl", strconv.FormatUint(*c.TTL, 10))
-	}
-	body := strings.NewReader(form.Encode())
-
-	req, _ := http.NewRequest("PUT", u.String(), body)
-	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
-
-	return req
-}
-
-func unmarshalHTTPResponse(code int, body []byte) (res *Response, err error) {
-	switch code {
-	case http.StatusOK, http.StatusCreated:
-		res, err = unmarshalSuccessfulResponse(body)
-	default:
-		err = unmarshalErrorResponse(code)
-	}
-
-	return
-}
-
-func unmarshalSuccessfulResponse(body []byte) (*Response, error) {
-	var res Response
-	err := json.Unmarshal(body, &res)
-	if err != nil {
-		return nil, err
-	}
-
-	return &res, nil
-}
-
-func unmarshalErrorResponse(code int) error {
-	switch code {
-	case http.StatusNotFound:
-		return ErrKeyNoExist
-	case http.StatusPreconditionFailed:
-		return ErrKeyExists
-	case http.StatusInternalServerError:
-		// this isn't necessarily true
-		return ErrNoLeader
-	default:
-	}
-
-	return fmt.Errorf("unrecognized HTTP status code %d", code)
-}
diff --git a/client/http_test.go b/client/http_test.go
index 816d1b32b..16115958a 100644
--- a/client/http_test.go
+++ b/client/http_test.go
@@ -18,7 +18,6 @@ package client
 
 import (
 	"errors"
-	"fmt"
 	"io/ioutil"
 	"net/http"
 	"net/url"
@@ -30,334 +29,6 @@ import (
 	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
 )
 
-func TestV2URLHelper(t *testing.T) {
-	tests := []struct {
-		endpoint url.URL
-		key      string
-		want     url.URL
-	}{
-		// key is empty, no problem
-		{
-			endpoint: url.URL{Scheme: "http", Host: "example.com", Path: "/v2/keys"},
-			key:      "",
-			want:     url.URL{Scheme: "http", Host: "example.com", Path: "/v2/keys"},
-		},
-
-		// key is joined to path
-		{
-			endpoint: url.URL{Scheme: "http", Host: "example.com", Path: "/v2/keys"},
-			key:      "/foo/bar",
-			want:     url.URL{Scheme: "http", Host: "example.com", Path: "/v2/keys/foo/bar"},
-		},
-
-		// key is joined to path when path is empty
-		{
-			endpoint: url.URL{Scheme: "http", Host: "example.com", Path: ""},
-			key:      "/foo/bar",
-			want:     url.URL{Scheme: "http", Host: "example.com", Path: "/foo/bar"},
-		},
-
-		// Host field carries through with port
-		{
-			endpoint: url.URL{Scheme: "http", Host: "example.com:8080", Path: "/v2/keys"},
-			key:      "",
-			want:     url.URL{Scheme: "http", Host: "example.com:8080", Path: "/v2/keys"},
-		},
-
-		// Scheme carries through
-		{
-			endpoint: url.URL{Scheme: "https", Host: "example.com", Path: "/v2/keys"},
-			key:      "",
-			want:     url.URL{Scheme: "https", Host: "example.com", Path: "/v2/keys"},
-		},
-	}
-
-	for i, tt := range tests {
-		got := v2KeysURL(tt.endpoint, tt.key)
-		if tt.want != *got {
-			t.Errorf("#%d: want=%#v, got=%#v", i, tt.want, *got)
-		}
-	}
-}
-
-func TestGetAction(t *testing.T) {
-	ep := url.URL{Scheme: "http", Host: "example.com/v2/keys"}
-	wantURL := &url.URL{
-		Scheme: "http",
-		Host:   "example.com",
-		Path:   "/v2/keys/foo/bar",
-	}
-	wantHeader := http.Header{}
-
-	tests := []struct {
-		recursive bool
-		wantQuery string
-	}{
-		{
-			recursive: false,
-			wantQuery: "recursive=false",
-		},
-		{
-			recursive: true,
-			wantQuery: "recursive=true",
-		},
-	}
-
-	for i, tt := range tests {
-		f := getAction{
-			Key:       "/foo/bar",
-			Recursive: tt.recursive,
-		}
-		got := *f.httpRequest(ep)
-
-		wantURL := wantURL
-		wantURL.RawQuery = tt.wantQuery
-
-		err := assertResponse(got, wantURL, wantHeader, nil)
-		if err != nil {
-			t.Errorf("#%d: %v", i, err)
-		}
-	}
-}
-
-func TestWaitAction(t *testing.T) {
-	ep := url.URL{Scheme: "http", Host: "example.com/v2/keys"}
-	wantURL := &url.URL{
-		Scheme: "http",
-		Host:   "example.com",
-		Path:   "/v2/keys/foo/bar",
-	}
-	wantHeader := http.Header{}
-
-	tests := []struct {
-		waitIndex uint64
-		recursive bool
-		wantQuery string
-	}{
-		{
-			recursive: false,
-			waitIndex: uint64(0),
-			wantQuery: "recursive=false&wait=true&waitIndex=0",
-		},
-		{
-			recursive: false,
-			waitIndex: uint64(12),
-			wantQuery: "recursive=false&wait=true&waitIndex=12",
-		},
-		{
-			recursive: true,
-			waitIndex: uint64(12),
-			wantQuery: "recursive=true&wait=true&waitIndex=12",
-		},
-	}
-
-	for i, tt := range tests {
-		f := waitAction{
-			Key:       "/foo/bar",
-			WaitIndex: tt.waitIndex,
-			Recursive: tt.recursive,
-		}
-		got := *f.httpRequest(ep)
-
-		wantURL := wantURL
-		wantURL.RawQuery = tt.wantQuery
-
-		err := assertResponse(got, wantURL, wantHeader, nil)
-		if err != nil {
-			t.Errorf("#%d: %v", i, err)
-		}
-	}
-}
-
-func TestCreateAction(t *testing.T) {
-	ep := url.URL{Scheme: "http", Host: "example.com/v2/keys"}
-	wantURL := &url.URL{
-		Scheme:   "http",
-		Host:     "example.com",
-		Path:     "/v2/keys/foo/bar",
-		RawQuery: "prevExist=false",
-	}
-	wantHeader := http.Header(map[string][]string{
-		"Content-Type": []string{"application/x-www-form-urlencoded"},
-	})
-
-	ttl12 := uint64(12)
-	tests := []struct {
-		value    string
-		ttl      *uint64
-		wantBody string
-	}{
-		{
-			value:    "baz",
-			wantBody: "value=baz",
-		},
-		{
-			value:    "baz",
-			ttl:      &ttl12,
-			wantBody: "ttl=12&value=baz",
-		},
-	}
-
-	for i, tt := range tests {
-		f := createAction{
-			Key:   "/foo/bar",
-			Value: tt.value,
-			TTL:   tt.ttl,
-		}
-		got := *f.httpRequest(ep)
-
-		err := assertResponse(got, wantURL, wantHeader, []byte(tt.wantBody))
-		if err != nil {
-			t.Errorf("#%d: %v", i, err)
-		}
-	}
-}
-
-func assertResponse(got http.Request, wantURL *url.URL, wantHeader http.Header, wantBody []byte) error {
-	if !reflect.DeepEqual(wantURL, got.URL) {
-		return fmt.Errorf("want.URL=%#v got.URL=%#v", wantURL, got.URL)
-	}
-
-	if !reflect.DeepEqual(wantHeader, got.Header) {
-		return fmt.Errorf("want.Header=%#v got.Header=%#v", wantHeader, got.Header)
-	}
-
-	if got.Body == nil {
-		if wantBody != nil {
-			return fmt.Errorf("want.Body=%v got.Body=%v", wantBody, got.Body)
-		}
-	} else {
-		if wantBody == nil {
-			return fmt.Errorf("want.Body=%v got.Body=%v", wantBody, got.Body)
-		} else {
-			gotBytes, err := ioutil.ReadAll(got.Body)
-			if err != nil {
-				return err
-			}
-
-			if !reflect.DeepEqual(wantBody, gotBytes) {
-				return fmt.Errorf("want.Body=%v got.Body=%v", wantBody, gotBytes)
-			}
-		}
-	}
-
-	return nil
-}
-
-func TestUnmarshalSuccessfulResponse(t *testing.T) {
-	tests := []struct {
-		body        string
-		res         *Response
-		expectError bool
-	}{
-		// Neither PrevNode or Node
-		{
-			`{"action":"delete"}`,
-			&Response{Action: "delete"},
-			false,
-		},
-
-		// PrevNode
-		{
-			`{"action":"delete", "prevNode": {"key": "/foo", "value": "bar", "modifiedIndex": 12, "createdIndex": 10}}`,
-			&Response{Action: "delete", PrevNode: &Node{Key: "/foo", Value: "bar", ModifiedIndex: 12, CreatedIndex: 10}},
-			false,
-		},
-
-		// Node
-		{
-			`{"action":"get", "node": {"key": "/foo", "value": "bar", "modifiedIndex": 12, "createdIndex": 10}}`,
-			&Response{Action: "get", Node: &Node{Key: "/foo", Value: "bar", ModifiedIndex: 12, CreatedIndex: 10}},
-			false,
-		},
-
-		// PrevNode and Node
-		{
-			`{"action":"update", "prevNode": {"key": "/foo", "value": "baz", "modifiedIndex": 10, "createdIndex": 10}, "node": {"key": "/foo", "value": "bar", "modifiedIndex": 12, "createdIndex": 10}}`,
-			&Response{Action: "update", PrevNode: &Node{Key: "/foo", Value: "baz", ModifiedIndex: 10, CreatedIndex: 10}, Node: &Node{Key: "/foo", Value: "bar", ModifiedIndex: 12, CreatedIndex: 10}},
-			false,
-		},
-
-		// Garbage in body
-		{
-			`garbage`,
-			nil,
-			true,
-		},
-	}
-
-	for i, tt := range tests {
-		res, err := unmarshalSuccessfulResponse([]byte(tt.body))
-		if tt.expectError != (err != nil) {
-			t.Errorf("#%d: expectError=%t, err=%v", i, tt.expectError, err)
-		}
-
-		if (res == nil) != (tt.res == nil) {
-			t.Errorf("#%d: received res==%v, but expected res==%v", i, res, tt.res)
-			continue
-		} else if tt.res == nil {
-			// expected and succesfully got nil response
-			continue
-		}
-
-		if res.Action != tt.res.Action {
-			t.Errorf("#%d: Action=%s, expected %s", i, res.Action, tt.res.Action)
-		}
-
-		if !reflect.DeepEqual(res.Node, tt.res.Node) {
-			t.Errorf("#%d: Node=%v, expected %v", i, res.Node, tt.res.Node)
-		}
-	}
-}
-
-func TestUnmarshalErrorResponse(t *testing.T) {
-	unrecognized := errors.New("test fixture")
-
-	tests := []struct {
-		code int
-		want error
-	}{
-		{http.StatusBadRequest, unrecognized},
-		{http.StatusUnauthorized, unrecognized},
-		{http.StatusPaymentRequired, unrecognized},
-		{http.StatusForbidden, unrecognized},
-		{http.StatusNotFound, ErrKeyNoExist},
-		{http.StatusMethodNotAllowed, unrecognized},
-		{http.StatusNotAcceptable, unrecognized},
-		{http.StatusProxyAuthRequired, unrecognized},
-		{http.StatusRequestTimeout, unrecognized},
-		{http.StatusConflict, unrecognized},
-		{http.StatusGone, unrecognized},
-		{http.StatusLengthRequired, unrecognized},
-		{http.StatusPreconditionFailed, ErrKeyExists},
-		{http.StatusRequestEntityTooLarge, unrecognized},
-		{http.StatusRequestURITooLong, unrecognized},
-		{http.StatusUnsupportedMediaType, unrecognized},
-		{http.StatusRequestedRangeNotSatisfiable, unrecognized},
-		{http.StatusExpectationFailed, unrecognized},
-		{http.StatusTeapot, unrecognized},
-
-		{http.StatusInternalServerError, ErrNoLeader},
-		{http.StatusNotImplemented, unrecognized},
-		{http.StatusBadGateway, unrecognized},
-		{http.StatusServiceUnavailable, unrecognized},
-		{http.StatusGatewayTimeout, unrecognized},
-		{http.StatusHTTPVersionNotSupported, unrecognized},
-	}
-
-	for i, tt := range tests {
-		want := tt.want
-		if reflect.DeepEqual(unrecognized, want) {
-			want = fmt.Errorf("unrecognized HTTP status code %d", tt.code)
-		}
-
-		got := unmarshalErrorResponse(tt.code)
-		if !reflect.DeepEqual(want, got) {
-			t.Errorf("#%d: want=%v, got=%v", i, want, got)
-		}
-	}
-}
-
 type fakeTransport struct {
 	respchan     chan *http.Response
 	errchan      chan error
diff --git a/client/keys.go b/client/keys.go
new file mode 100644
index 000000000..a95090033
--- /dev/null
+++ b/client/keys.go
@@ -0,0 +1,276 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package client
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"net/http"
+	"net/url"
+	"path"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
+)
+
+var (
+	DefaultV2KeysPrefix = "/v2/keys"
+)
+
+var (
+	ErrUnavailable = errors.New("client: no available etcd endpoints")
+	ErrNoLeader    = errors.New("client: no leader")
+	ErrKeyNoExist  = errors.New("client: key does not exist")
+	ErrKeyExists   = errors.New("client: key already exists")
+)
+
+func NewKeysAPI(tr *http.Transport, ep string, to time.Duration) (*HTTPKeysAPI, error) {
+	c, err := newHTTPClient(tr, ep, to)
+	if err != nil {
+		return nil, err
+	}
+
+	kAPI := HTTPKeysAPI{
+		client: c,
+	}
+
+	return &kAPI, nil
+}
+
+type KeysAPI interface {
+	Create(key, value string, ttl time.Duration) (*Response, error)
+	Get(key string) (*Response, error)
+	Watch(key string, idx uint64) Watcher
+	RecursiveWatch(key string, idx uint64) Watcher
+}
+
+type Watcher interface {
+	Next() (*Response, error)
+}
+
+type Response struct {
+	Action   string `json:"action"`
+	Node     *Node  `json:"node"`
+	PrevNode *Node  `json:"prevNode"`
+}
+
+type Nodes []*Node
+type Node struct {
+	Key           string `json:"key"`
+	Value         string `json:"value"`
+	Nodes         Nodes  `json:"nodes"`
+	ModifiedIndex uint64 `json:"modifiedIndex"`
+	CreatedIndex  uint64 `json:"createdIndex"`
+}
+
+func (n *Node) String() string {
+	return fmt.Sprintf("{Key: %s, CreatedIndex: %d, ModifiedIndex: %d}", n.Key, n.CreatedIndex, n.ModifiedIndex)
+}
+
+type HTTPKeysAPI struct {
+	client   *httpClient
+	endpoint url.URL
+}
+
+func (k *HTTPKeysAPI) SetAPIPrefix(p string) {
+	ep := k.endpoint
+	ep.Path = path.Join(ep.Path, p)
+	k.client.endpoint = ep
+}
+
+func (k *HTTPKeysAPI) Create(key, val string, ttl time.Duration) (*Response, error) {
+	create := &createAction{
+		Key:   key,
+		Value: val,
+	}
+	if ttl >= 0 {
+		uttl := uint64(ttl.Seconds())
+		create.TTL = &uttl
+	}
+
+	httpresp, body, err := k.client.doWithTimeout(create)
+	if err != nil {
+		return nil, err
+	}
+
+	return unmarshalHTTPResponse(httpresp.StatusCode, body)
+}
+
+func (k *HTTPKeysAPI) Get(key string) (*Response, error) {
+	get := &getAction{
+		Key:       key,
+		Recursive: false,
+	}
+
+	httpresp, body, err := k.client.doWithTimeout(get)
+	if err != nil {
+		return nil, err
+	}
+
+	return unmarshalHTTPResponse(httpresp.StatusCode, body)
+}
+
+func (k *HTTPKeysAPI) Watch(key string, idx uint64) Watcher {
+	return &httpWatcher{
+		client: k.client,
+		nextWait: waitAction{
+			Key:       key,
+			WaitIndex: idx,
+			Recursive: false,
+		},
+	}
+}
+
+func (k *HTTPKeysAPI) RecursiveWatch(key string, idx uint64) Watcher {
+	return &httpWatcher{
+		client: k.client,
+		nextWait: waitAction{
+			Key:       key,
+			WaitIndex: idx,
+			Recursive: true,
+		},
+	}
+}
+
+type httpWatcher struct {
+	client   *httpClient
+	nextWait waitAction
+}
+
+func (hw *httpWatcher) Next() (*Response, error) {
+	//TODO(bcwaldon): This needs to be cancellable by the calling user
+	httpresp, body, err := hw.client.do(context.Background(), &hw.nextWait)
+	if err != nil {
+		return nil, err
+	}
+
+	resp, err := unmarshalHTTPResponse(httpresp.StatusCode, body)
+	if err != nil {
+		return nil, err
+	}
+
+	hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1
+	return resp, nil
+}
+
+// v2KeysURL forms a URL representing the location of a key. The provided
+// endpoint must be the root of the etcd keys API. For example, a valid
+// endpoint probably has the path "/v2/keys".
+func v2KeysURL(ep url.URL, key string) *url.URL {
+	ep.Path = path.Join(ep.Path, key)
+	return &ep
+}
+
+type getAction struct {
+	Key       string
+	Recursive bool
+}
+
+func (g *getAction) httpRequest(ep url.URL) *http.Request {
+	u := v2KeysURL(ep, g.Key)
+
+	params := u.Query()
+	params.Set("recursive", strconv.FormatBool(g.Recursive))
+	u.RawQuery = params.Encode()
+
+	req, _ := http.NewRequest("GET", u.String(), nil)
+	return req
+}
+
+type waitAction struct {
+	Key       string
+	WaitIndex uint64
+	Recursive bool
+}
+
+func (w *waitAction) httpRequest(ep url.URL) *http.Request {
+	u := v2KeysURL(ep, w.Key)
+
+	params := u.Query()
+	params.Set("wait", "true")
+	params.Set("waitIndex", strconv.FormatUint(w.WaitIndex, 10))
+	params.Set("recursive", strconv.FormatBool(w.Recursive))
+	u.RawQuery = params.Encode()
+
+	req, _ := http.NewRequest("GET", u.String(), nil)
+	return req
+}
+
+type createAction struct {
+	Key   string
+	Value string
+	TTL   *uint64
+}
+
+func (c *createAction) httpRequest(ep url.URL) *http.Request {
+	u := v2KeysURL(ep, c.Key)
+
+	params := u.Query()
+	params.Set("prevExist", "false")
+	u.RawQuery = params.Encode()
+
+	form := url.Values{}
+	form.Add("value", c.Value)
+	if c.TTL != nil {
+		form.Add("ttl", strconv.FormatUint(*c.TTL, 10))
+	}
+	body := strings.NewReader(form.Encode())
+
+	req, _ := http.NewRequest("PUT", u.String(), body)
+	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+
+	return req
+}
+
+func unmarshalHTTPResponse(code int, body []byte) (res *Response, err error) {
+	switch code {
+	case http.StatusOK, http.StatusCreated:
+		res, err = unmarshalSuccessfulResponse(body)
+	default:
+		err = unmarshalErrorResponse(code)
+	}
+
+	return
+}
+
+func unmarshalSuccessfulResponse(body []byte) (*Response, error) {
+	var res Response
+	err := json.Unmarshal(body, &res)
+	if err != nil {
+		return nil, err
+	}
+
+	return &res, nil
+}
+
+func unmarshalErrorResponse(code int) error {
+	switch code {
+	case http.StatusNotFound:
+		return ErrKeyNoExist
+	case http.StatusPreconditionFailed:
+		return ErrKeyExists
+	case http.StatusInternalServerError:
+		// this isn't necessarily true
+		return ErrNoLeader
+	default:
+	}
+
+	return fmt.Errorf("unrecognized HTTP status code %d", code)
+}
diff --git a/client/keys_test.go b/client/keys_test.go
new file mode 100644
index 000000000..1ab0937a1
--- /dev/null
+++ b/client/keys_test.go
@@ -0,0 +1,355 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package client
+
+import (
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"reflect"
+	"testing"
+)
+
+func TestV2KeysURLHelper(t *testing.T) {
+	tests := []struct {
+		endpoint url.URL
+		key      string
+		want     url.URL
+	}{
+		// key is empty, no problem
+		{
+			endpoint: url.URL{Scheme: "http", Host: "example.com", Path: "/v2/keys"},
+			key:      "",
+			want:     url.URL{Scheme: "http", Host: "example.com", Path: "/v2/keys"},
+		},
+
+		// key is joined to path
+		{
+			endpoint: url.URL{Scheme: "http", Host: "example.com", Path: "/v2/keys"},
+			key:      "/foo/bar",
+			want:     url.URL{Scheme: "http", Host: "example.com", Path: "/v2/keys/foo/bar"},
+		},
+
+		// key is joined to path when path is empty
+		{
+			endpoint: url.URL{Scheme: "http", Host: "example.com", Path: ""},
+			key:      "/foo/bar",
+			want:     url.URL{Scheme: "http", Host: "example.com", Path: "/foo/bar"},
+		},
+
+		// Host field carries through with port
+		{
+			endpoint: url.URL{Scheme: "http", Host: "example.com:8080", Path: "/v2/keys"},
+			key:      "",
+			want:     url.URL{Scheme: "http", Host: "example.com:8080", Path: "/v2/keys"},
+		},
+
+		// Scheme carries through
+		{
+			endpoint: url.URL{Scheme: "https", Host: "example.com", Path: "/v2/keys"},
+			key:      "",
+			want:     url.URL{Scheme: "https", Host: "example.com", Path: "/v2/keys"},
+		},
+	}
+
+	for i, tt := range tests {
+		got := v2KeysURL(tt.endpoint, tt.key)
+		if tt.want != *got {
+			t.Errorf("#%d: want=%#v, got=%#v", i, tt.want, *got)
+		}
+	}
+}
+
+func TestGetAction(t *testing.T) {
+	ep := url.URL{Scheme: "http", Host: "example.com/v2/keys"}
+	wantURL := &url.URL{
+		Scheme: "http",
+		Host:   "example.com",
+		Path:   "/v2/keys/foo/bar",
+	}
+	wantHeader := http.Header{}
+
+	tests := []struct {
+		recursive bool
+		wantQuery string
+	}{
+		{
+			recursive: false,
+			wantQuery: "recursive=false",
+		},
+		{
+			recursive: true,
+			wantQuery: "recursive=true",
+		},
+	}
+
+	for i, tt := range tests {
+		f := getAction{
+			Key:       "/foo/bar",
+			Recursive: tt.recursive,
+		}
+		got := *f.httpRequest(ep)
+
+		wantURL := wantURL
+		wantURL.RawQuery = tt.wantQuery
+
+		err := assertResponse(got, wantURL, wantHeader, nil)
+		if err != nil {
+			t.Errorf("#%d: %v", i, err)
+		}
+	}
+}
+
+func TestWaitAction(t *testing.T) {
+	ep := url.URL{Scheme: "http", Host: "example.com/v2/keys"}
+	wantURL := &url.URL{
+		Scheme: "http",
+		Host:   "example.com",
+		Path:   "/v2/keys/foo/bar",
+	}
+	wantHeader := http.Header{}
+
+	tests := []struct {
+		waitIndex uint64
+		recursive bool
+		wantQuery string
+	}{
+		{
+			recursive: false,
+			waitIndex: uint64(0),
+			wantQuery: "recursive=false&wait=true&waitIndex=0",
+		},
+		{
+			recursive: false,
+			waitIndex: uint64(12),
+			wantQuery: "recursive=false&wait=true&waitIndex=12",
+		},
+		{
+			recursive: true,
+			waitIndex: uint64(12),
+			wantQuery: "recursive=true&wait=true&waitIndex=12",
+		},
+	}
+
+	for i, tt := range tests {
+		f := waitAction{
+			Key:       "/foo/bar",
+			WaitIndex: tt.waitIndex,
+			Recursive: tt.recursive,
+		}
+		got := *f.httpRequest(ep)
+
+		wantURL := wantURL
+		wantURL.RawQuery = tt.wantQuery
+
+		err := assertResponse(got, wantURL, wantHeader, nil)
+		if err != nil {
+			t.Errorf("#%d: %v", i, err)
+		}
+	}
+}
+
+func TestCreateAction(t *testing.T) {
+	ep := url.URL{Scheme: "http", Host: "example.com/v2/keys"}
+	wantURL := &url.URL{
+		Scheme:   "http",
+		Host:     "example.com",
+		Path:     "/v2/keys/foo/bar",
+		RawQuery: "prevExist=false",
+	}
+	wantHeader := http.Header(map[string][]string{
+		"Content-Type": []string{"application/x-www-form-urlencoded"},
+	})
+
+	ttl12 := uint64(12)
+	tests := []struct {
+		value    string
+		ttl      *uint64
+		wantBody string
+	}{
+		{
+			value:    "baz",
+			wantBody: "value=baz",
+		},
+		{
+			value:    "baz",
+			ttl:      &ttl12,
+			wantBody: "ttl=12&value=baz",
+		},
+	}
+
+	for i, tt := range tests {
+		f := createAction{
+			Key:   "/foo/bar",
+			Value: tt.value,
+			TTL:   tt.ttl,
+		}
+		got := *f.httpRequest(ep)
+
+		err := assertResponse(got, wantURL, wantHeader, []byte(tt.wantBody))
+		if err != nil {
+			t.Errorf("#%d: %v", i, err)
+		}
+	}
+}
+
+func assertResponse(got http.Request, wantURL *url.URL, wantHeader http.Header, wantBody []byte) error {
+	if !reflect.DeepEqual(wantURL, got.URL) {
+		return fmt.Errorf("want.URL=%#v got.URL=%#v", wantURL, got.URL)
+	}
+
+	if !reflect.DeepEqual(wantHeader, got.Header) {
+		return fmt.Errorf("want.Header=%#v got.Header=%#v", wantHeader, got.Header)
+	}
+
+	if got.Body == nil {
+		if wantBody != nil {
+			return fmt.Errorf("want.Body=%v got.Body=%v", wantBody, got.Body)
+		}
+	} else {
+		if wantBody == nil {
+			return fmt.Errorf("want.Body=%v got.Body=%v", wantBody, got.Body)
+		} else {
+			gotBytes, err := ioutil.ReadAll(got.Body)
+			if err != nil {
+				return err
+			}
+
+			if !reflect.DeepEqual(wantBody, gotBytes) {
+				return fmt.Errorf("want.Body=%v got.Body=%v", wantBody, gotBytes)
+			}
+		}
+	}
+
+	return nil
+}
+
+func TestUnmarshalSuccessfulResponse(t *testing.T) {
+	tests := []struct {
+		body        string
+		res         *Response
+		expectError bool
+	}{
+		// Neither PrevNode or Node
+		{
+			`{"action":"delete"}`,
+			&Response{Action: "delete"},
+			false,
+		},
+
+		// PrevNode
+		{
+			`{"action":"delete", "prevNode": {"key": "/foo", "value": "bar", "modifiedIndex": 12, "createdIndex": 10}}`,
+			&Response{Action: "delete", PrevNode: &Node{Key: "/foo", Value: "bar", ModifiedIndex: 12, CreatedIndex: 10}},
+			false,
+		},
+
+		// Node
+		{
+			`{"action":"get", "node": {"key": "/foo", "value": "bar", "modifiedIndex": 12, "createdIndex": 10}}`,
+			&Response{Action: "get", Node: &Node{Key: "/foo", Value: "bar", ModifiedIndex: 12, CreatedIndex: 10}},
+			false,
+		},
+
+		// PrevNode and Node
+		{
+			`{"action":"update", "prevNode": {"key": "/foo", "value": "baz", "modifiedIndex": 10, "createdIndex": 10}, "node": {"key": "/foo", "value": "bar", "modifiedIndex": 12, "createdIndex": 10}}`,
+			&Response{Action: "update", PrevNode: &Node{Key: "/foo", Value: "baz", ModifiedIndex: 10, CreatedIndex: 10}, Node: &Node{Key: "/foo", Value: "bar", ModifiedIndex: 12, CreatedIndex: 10}},
+			false,
+		},
+
+		// Garbage in body
+		{
+			`garbage`,
+			nil,
+			true,
+		},
+	}
+
+	for i, tt := range tests {
+		res, err := unmarshalSuccessfulResponse([]byte(tt.body))
+		if tt.expectError != (err != nil) {
+			t.Errorf("#%d: expectError=%t, err=%v", i, tt.expectError, err)
+		}
+
+		if (res == nil) != (tt.res == nil) {
+			t.Errorf("#%d: received res==%v, but expected res==%v", i, res, tt.res)
+			continue
+		} else if tt.res == nil {
+			// expected and succesfully got nil response
+			continue
+		}
+
+		if res.Action != tt.res.Action {
+			t.Errorf("#%d: Action=%s, expected %s", i, res.Action, tt.res.Action)
+		}
+
+		if !reflect.DeepEqual(res.Node, tt.res.Node) {
+			t.Errorf("#%d: Node=%v, expected %v", i, res.Node, tt.res.Node)
+		}
+	}
+}
+
+func TestUnmarshalErrorResponse(t *testing.T) {
+	unrecognized := errors.New("test fixture")
+
+	tests := []struct {
+		code int
+		want error
+	}{
+		{http.StatusBadRequest, unrecognized},
+		{http.StatusUnauthorized, unrecognized},
+		{http.StatusPaymentRequired, unrecognized},
+		{http.StatusForbidden, unrecognized},
+		{http.StatusNotFound, ErrKeyNoExist},
+		{http.StatusMethodNotAllowed, unrecognized},
+		{http.StatusNotAcceptable, unrecognized},
+		{http.StatusProxyAuthRequired, unrecognized},
+		{http.StatusRequestTimeout, unrecognized},
+		{http.StatusConflict, unrecognized},
+		{http.StatusGone, unrecognized},
+		{http.StatusLengthRequired, unrecognized},
+		{http.StatusPreconditionFailed, ErrKeyExists},
+		{http.StatusRequestEntityTooLarge, unrecognized},
+		{http.StatusRequestURITooLong, unrecognized},
+		{http.StatusUnsupportedMediaType, unrecognized},
+		{http.StatusRequestedRangeNotSatisfiable, unrecognized},
+		{http.StatusExpectationFailed, unrecognized},
+		{http.StatusTeapot, unrecognized},
+
+		{http.StatusInternalServerError, ErrNoLeader},
+		{http.StatusNotImplemented, unrecognized},
+		{http.StatusBadGateway, unrecognized},
+		{http.StatusServiceUnavailable, unrecognized},
+		{http.StatusGatewayTimeout, unrecognized},
+		{http.StatusHTTPVersionNotSupported, unrecognized},
+	}
+
+	for i, tt := range tests {
+		want := tt.want
+		if reflect.DeepEqual(unrecognized, want) {
+			want = fmt.Errorf("unrecognized HTTP status code %d", tt.code)
+		}
+
+		got := unmarshalErrorResponse(tt.code)
+		if !reflect.DeepEqual(want, got) {
+			t.Errorf("#%d: want=%v, got=%v", i, want, got)
+		}
+	}
+}
diff --git a/discovery/discovery.go b/discovery/discovery.go
index 5cf7bdc42..996394066 100644
--- a/discovery/discovery.go
+++ b/discovery/discovery.go
@@ -58,7 +58,7 @@ type discovery struct {
 	cluster string
 	id      uint64
 	config  string
-	c       client.Client
+	c       client.KeysAPI
 	retries uint
 	url     *url.URL
 
@@ -105,13 +105,13 @@ func New(durl string, id uint64, config string) (Discoverer, error) {
 	if err != nil {
 		return nil, err
 	}
-	c, err := client.NewHTTPClient(&http.Transport{Proxy: pf}, u.String(), time.Second*5)
+	c, err := client.NewKeysAPI(&http.Transport{Proxy: pf}, u.String(), time.Second*5)
 	if err != nil {
 		return nil, err
 	}
 	// discovery service redirects /[key] to /v2/keys/[key]
 	// set the prefix of client to "" to handle this
-	c.SetPrefix("")
+	c.SetAPIPrefix("")
 	return &discovery{
 		cluster: token,
 		id:      id,
diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go
index becf164ee..20be66894 100644
--- a/discovery/discovery_test.go
+++ b/discovery/discovery_test.go
@@ -304,7 +304,7 @@ func TestCreateSelf(t *testing.T) {
 	errwc := &clientWithResp{rs, errw}
 
 	tests := []struct {
-		c    client.Client
+		c    client.KeysAPI
 		werr error
 	}{
 		// no error