diff --git a/client/cluster.go b/client/cluster.go deleted file mode 100644 index b57f57c8d..000000000 --- a/client/cluster.go +++ /dev/null @@ -1,53 +0,0 @@ -/* - Copyright 2014 CoreOS, Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package client - -import ( - "net/http" - "net/url" - - "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" -) - -func newHTTPClusterClient(tr *http.Transport, eps []string) (*httpClusterClient, error) { - c := httpClusterClient{ - endpoints: make([]*httpClient, len(eps)), - } - - for i, ep := range eps { - u, err := url.Parse(ep) - if err != nil { - return nil, err - } - - c.endpoints[i] = &httpClient{ - transport: tr, - endpoint: *u, - } - } - - return &c, nil -} - -type httpClusterClient struct { - endpoints []*httpClient -} - -func (c *httpClusterClient) do(ctx context.Context, act httpAction) (int, []byte, error) { - //TODO(bcwaldon): introduce retry logic so all endpoints are attempted - return c.endpoints[0].do(ctx, act) -} diff --git a/client/http.go b/client/http.go index 5bdf0eebe..a5ae7e56f 100644 --- a/client/http.go +++ b/client/http.go @@ -30,20 +30,80 @@ var ( DefaultRequestTimeout = 5 * time.Second ) -// transport mimics http.Transport to provide an interface which can be +type SyncableHTTPClient interface { + HTTPClient + Sync(context.Context) error +} + +type HTTPClient interface { + Do(context.Context, HTTPAction) (*http.Response, []byte, error) +} + +type HTTPAction interface { + HTTPRequest(url.URL) *http.Request +} + +// CancelableTransport mimics http.Transport to provide an interface which can be // substituted for testing (since the RoundTripper interface alone does not // require the CancelRequest method) -type transport interface { +type CancelableTransport interface { http.RoundTripper CancelRequest(req *http.Request) } -type httpAction interface { - httpRequest(url.URL) *http.Request +func NewHTTPClient(tr CancelableTransport, eps []string) (SyncableHTTPClient, error) { + return newHTTPClusterClient(tr, eps) } -type httpActionDo interface { - do(context.Context, httpAction) (int, []byte, error) +func newHTTPClusterClient(tr CancelableTransport, eps []string) (*httpClusterClient, error) { + c := httpClusterClient{ + transport: tr, + endpoints: make([]HTTPClient, len(eps)), + } + + for i, ep := range eps { + u, err := url.Parse(ep) + if err != nil { + return nil, err + } + + c.endpoints[i] = &httpClient{ + transport: tr, + endpoint: *u, + } + } + + return &c, nil +} + +type httpClusterClient struct { + transport CancelableTransport + endpoints []HTTPClient +} + +func (c *httpClusterClient) Do(ctx context.Context, act HTTPAction) (*http.Response, []byte, error) { + //TODO(bcwaldon): introduce retry logic so all endpoints are attempted + return c.endpoints[0].Do(ctx, act) +} + +func (c *httpClusterClient) Sync(ctx context.Context) error { + mAPI := NewMembersAPI(c) + ms, err := mAPI.List(ctx) + if err != nil { + return err + } + + eps := make([]string, 0) + for _, m := range ms { + eps = append(eps, m.ClientURLs...) + } + nc, err := newHTTPClusterClient(c.transport, eps) + if err != nil { + return err + } + + *c = *nc + return nil } type roundTripResponse struct { @@ -52,13 +112,12 @@ type roundTripResponse struct { } type httpClient struct { - transport transport + transport CancelableTransport endpoint url.URL - timeout time.Duration } -func (c *httpClient) do(ctx context.Context, act httpAction) (int, []byte, error) { - req := act.httpRequest(c.endpoint) +func (c *httpClient) Do(ctx context.Context, act HTTPAction) (*http.Response, []byte, error) { + req := act.HTTPRequest(c.endpoint) rtchan := make(chan roundTripResponse, 1) go func() { @@ -89,9 +148,9 @@ func (c *httpClient) do(ctx context.Context, act httpAction) (int, []byte, error }() if err != nil { - return 0, nil, err + return nil, nil, err } body, err := ioutil.ReadAll(resp.Body) - return resp.StatusCode, body, err + return resp, body, err } diff --git a/client/http_test.go b/client/http_test.go index 3ae6c4d8c..33062b51e 100644 --- a/client/http_test.go +++ b/client/http_test.go @@ -65,7 +65,7 @@ func (t *fakeTransport) CancelRequest(*http.Request) { type fakeAction struct{} -func (a *fakeAction) httpRequest(url.URL) *http.Request { +func (a *fakeAction) HTTPRequest(url.URL) *http.Request { return &http.Request{} } @@ -78,14 +78,14 @@ func TestHTTPClientDoSuccess(t *testing.T) { Body: ioutil.NopCloser(strings.NewReader("foo")), } - code, body, err := c.do(context.Background(), &fakeAction{}) + resp, body, err := c.Do(context.Background(), &fakeAction{}) if err != nil { t.Fatalf("incorrect error value: want=nil got=%v", err) } wantCode := http.StatusTeapot - if wantCode != code { - t.Fatalf("invalid response code: want=%d got=%d", wantCode, code) + if wantCode != resp.StatusCode { + t.Fatalf("invalid response code: want=%d got=%d", wantCode, resp.StatusCode) } wantBody := []byte("foo") @@ -100,7 +100,7 @@ func TestHTTPClientDoError(t *testing.T) { tr.errchan <- errors.New("fixture") - _, _, err := c.do(context.Background(), &fakeAction{}) + _, _, err := c.Do(context.Background(), &fakeAction{}) if err == nil { t.Fatalf("expected non-nil error, got nil") } @@ -113,7 +113,7 @@ func TestHTTPClientDoCancelContext(t *testing.T) { tr.startCancel <- struct{}{} tr.finishCancel <- struct{}{} - _, _, err := c.do(context.Background(), &fakeAction{}) + _, _, err := c.Do(context.Background(), &fakeAction{}) if err == nil { t.Fatalf("expected non-nil error, got nil") } @@ -126,7 +126,7 @@ func TestHTTPClientDoCancelContextWaitForRoundTrip(t *testing.T) { donechan := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background()) go func() { - c.do(ctx, &fakeAction{}) + c.Do(ctx, &fakeAction{}) close(donechan) }() diff --git a/client/keys.go b/client/keys.go index 4235c05cc..252c599cf 100644 --- a/client/keys.go +++ b/client/keys.go @@ -41,38 +41,30 @@ var ( ErrKeyExists = errors.New("client: key already exists") ) -func NewKeysAPI(tr *http.Transport, eps []string, to time.Duration) (KeysAPI, error) { - return newHTTPKeysAPIWithPrefix(tr, eps, to, DefaultV2KeysPrefix) +func NewKeysAPI(c HTTPClient) KeysAPI { + return &httpKeysAPI{ + client: c, + prefix: DefaultV2KeysPrefix, + } } -func NewDiscoveryKeysAPI(tr *http.Transport, eps []string, to time.Duration) (KeysAPI, error) { - return newHTTPKeysAPIWithPrefix(tr, eps, to, "") -} - -func newHTTPKeysAPIWithPrefix(tr *http.Transport, eps []string, to time.Duration, prefix string) (*httpKeysAPI, error) { - c, err := newHTTPClusterClient(tr, eps) - if err != nil { - return nil, err +func NewDiscoveryKeysAPI(c HTTPClient) KeysAPI { + return &httpKeysAPI{ + client: c, + prefix: "", } - - kAPI := httpKeysAPI{ - client: c, - prefix: prefix, - timeout: to, - } - - return &kAPI, nil } type KeysAPI interface { - Create(key, value string, ttl time.Duration) (*Response, error) - Get(key string) (*Response, error) + Create(ctx context.Context, key, value string, ttl time.Duration) (*Response, error) + Get(ctx context.Context, key string) (*Response, error) + Watch(key string, idx uint64) Watcher RecursiveWatch(key string, idx uint64) Watcher } type Watcher interface { - Next() (*Response, error) + Next(context.Context) (*Response, error) } type Response struct { @@ -95,12 +87,11 @@ func (n *Node) String() string { } type httpKeysAPI struct { - client httpActionDo - prefix string - timeout time.Duration + client HTTPClient + prefix string } -func (k *httpKeysAPI) Create(key, val string, ttl time.Duration) (*Response, error) { +func (k *httpKeysAPI) Create(ctx context.Context, key, val string, ttl time.Duration) (*Response, error) { create := &createAction{ Prefix: k.prefix, Key: key, @@ -111,31 +102,27 @@ func (k *httpKeysAPI) Create(key, val string, ttl time.Duration) (*Response, err create.TTL = &uttl } - ctx, cancel := context.WithTimeout(context.Background(), k.timeout) - code, body, err := k.client.do(ctx, create) - cancel() + resp, body, err := k.client.Do(ctx, create) if err != nil { return nil, err } - return unmarshalHTTPResponse(code, body) + return unmarshalHTTPResponse(resp.StatusCode, body) } -func (k *httpKeysAPI) Get(key string) (*Response, error) { +func (k *httpKeysAPI) Get(ctx context.Context, key string) (*Response, error) { get := &getAction{ Prefix: k.prefix, Key: key, Recursive: false, } - ctx, cancel := context.WithTimeout(context.Background(), k.timeout) - code, body, err := k.client.do(ctx, get) - cancel() + resp, body, err := k.client.Do(ctx, get) if err != nil { return nil, err } - return unmarshalHTTPResponse(code, body) + return unmarshalHTTPResponse(resp.StatusCode, body) } func (k *httpKeysAPI) Watch(key string, idx uint64) Watcher { @@ -163,18 +150,17 @@ func (k *httpKeysAPI) RecursiveWatch(key string, idx uint64) Watcher { } type httpWatcher struct { - client httpActionDo + client HTTPClient nextWait waitAction } -func (hw *httpWatcher) Next() (*Response, error) { - //TODO(bcwaldon): This needs to be cancellable by the calling user - code, body, err := hw.client.do(context.Background(), &hw.nextWait) +func (hw *httpWatcher) Next(ctx context.Context) (*Response, error) { + httpresp, body, err := hw.client.Do(ctx, &hw.nextWait) if err != nil { return nil, err } - resp, err := unmarshalHTTPResponse(code, body) + resp, err := unmarshalHTTPResponse(httpresp.StatusCode, body) if err != nil { return nil, err } @@ -199,7 +185,7 @@ type getAction struct { Recursive bool } -func (g *getAction) httpRequest(ep url.URL) *http.Request { +func (g *getAction) HTTPRequest(ep url.URL) *http.Request { u := v2KeysURL(ep, g.Prefix, g.Key) params := u.Query() @@ -217,7 +203,7 @@ type waitAction struct { Recursive bool } -func (w *waitAction) httpRequest(ep url.URL) *http.Request { +func (w *waitAction) HTTPRequest(ep url.URL) *http.Request { u := v2KeysURL(ep, w.Prefix, w.Key) params := u.Query() @@ -237,7 +223,7 @@ type createAction struct { TTL *uint64 } -func (c *createAction) httpRequest(ep url.URL) *http.Request { +func (c *createAction) HTTPRequest(ep url.URL) *http.Request { u := v2KeysURL(ep, c.Prefix, c.Key) params := u.Query() diff --git a/client/keys_test.go b/client/keys_test.go index ad7b4b1c0..1b632e6b5 100644 --- a/client/keys_test.go +++ b/client/keys_test.go @@ -117,7 +117,7 @@ func TestGetAction(t *testing.T) { Key: "/foo/bar", Recursive: tt.recursive, } - got := *f.httpRequest(ep) + got := *f.HTTPRequest(ep) wantURL := wantURL wantURL.RawQuery = tt.wantQuery @@ -166,7 +166,7 @@ func TestWaitAction(t *testing.T) { WaitIndex: tt.waitIndex, Recursive: tt.recursive, } - got := *f.httpRequest(ep) + got := *f.HTTPRequest(ep) wantURL := wantURL wantURL.RawQuery = tt.wantQuery @@ -213,7 +213,7 @@ func TestCreateAction(t *testing.T) { Value: tt.value, TTL: tt.ttl, } - got := *f.httpRequest(ep) + got := *f.HTTPRequest(ep) err := assertResponse(got, wantURL, wantHeader, []byte(tt.wantBody)) if err != nil { diff --git a/client/members.go b/client/members.go index 780bf28cb..3d0507861 100644 --- a/client/members.go +++ b/client/members.go @@ -23,7 +23,6 @@ import ( "net/http" "net/url" "path" - "time" "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes" @@ -34,41 +33,30 @@ var ( DefaultV2MembersPrefix = "/v2/members" ) -func NewMembersAPI(tr *http.Transport, eps []string, to time.Duration) (MembersAPI, error) { - c, err := newHTTPClusterClient(tr, eps) - if err != nil { - return nil, err +func NewMembersAPI(c HTTPClient) MembersAPI { + return &httpMembersAPI{ + client: c, } - - mAPI := httpMembersAPI{ - client: c, - timeout: to, - } - - return &mAPI, nil } type MembersAPI interface { - List() ([]httptypes.Member, error) - Add(peerURL string) (*httptypes.Member, error) - Remove(mID string) error + List(ctx context.Context) ([]httptypes.Member, error) + Add(ctx context.Context, peerURL string) (*httptypes.Member, error) + Remove(ctx context.Context, mID string) error } type httpMembersAPI struct { - client httpActionDo - timeout time.Duration + client HTTPClient } -func (m *httpMembersAPI) List() ([]httptypes.Member, error) { +func (m *httpMembersAPI) List(ctx context.Context) ([]httptypes.Member, error) { req := &membersAPIActionList{} - ctx, cancel := context.WithTimeout(context.Background(), m.timeout) - code, body, err := m.client.do(ctx, req) - cancel() + resp, body, err := m.client.Do(ctx, req) if err != nil { return nil, err } - if err := assertStatusCode(http.StatusOK, code); err != nil { + if err := assertStatusCode(http.StatusOK, resp.StatusCode); err != nil { return nil, err } @@ -80,21 +68,19 @@ func (m *httpMembersAPI) List() ([]httptypes.Member, error) { return []httptypes.Member(mCollection), nil } -func (m *httpMembersAPI) Add(peerURL string) (*httptypes.Member, error) { +func (m *httpMembersAPI) Add(ctx context.Context, peerURL string) (*httptypes.Member, error) { urls, err := types.NewURLs([]string{peerURL}) if err != nil { return nil, err } req := &membersAPIActionAdd{peerURLs: urls} - ctx, cancel := context.WithTimeout(context.Background(), m.timeout) - code, body, err := m.client.do(ctx, req) - cancel() + resp, body, err := m.client.Do(ctx, req) if err != nil { return nil, err } - if err := assertStatusCode(http.StatusCreated, code); err != nil { + if err := assertStatusCode(http.StatusCreated, resp.StatusCode); err != nil { return nil, err } @@ -106,21 +92,19 @@ func (m *httpMembersAPI) Add(peerURL string) (*httptypes.Member, error) { return &memb, nil } -func (m *httpMembersAPI) Remove(memberID string) error { +func (m *httpMembersAPI) Remove(ctx context.Context, memberID string) error { req := &membersAPIActionRemove{memberID: memberID} - ctx, cancel := context.WithTimeout(context.Background(), m.timeout) - code, _, err := m.client.do(ctx, req) - cancel() + resp, _, err := m.client.Do(ctx, req) if err != nil { return err } - return assertStatusCode(http.StatusNoContent, code) + return assertStatusCode(http.StatusNoContent, resp.StatusCode) } type membersAPIActionList struct{} -func (l *membersAPIActionList) httpRequest(ep url.URL) *http.Request { +func (l *membersAPIActionList) HTTPRequest(ep url.URL) *http.Request { u := v2MembersURL(ep) req, _ := http.NewRequest("GET", u.String(), nil) return req @@ -130,7 +114,7 @@ type membersAPIActionRemove struct { memberID string } -func (d *membersAPIActionRemove) httpRequest(ep url.URL) *http.Request { +func (d *membersAPIActionRemove) HTTPRequest(ep url.URL) *http.Request { u := v2MembersURL(ep) u.Path = path.Join(u.Path, d.memberID) req, _ := http.NewRequest("DELETE", u.String(), nil) @@ -141,7 +125,7 @@ type membersAPIActionAdd struct { peerURLs types.URLs } -func (a *membersAPIActionAdd) httpRequest(ep url.URL) *http.Request { +func (a *membersAPIActionAdd) HTTPRequest(ep url.URL) *http.Request { u := v2MembersURL(ep) m := httptypes.MemberCreateRequest{PeerURLs: a.peerURLs} b, _ := json.Marshal(&m) diff --git a/client/members_test.go b/client/members_test.go index 8d1534e49..bbc6c9221 100644 --- a/client/members_test.go +++ b/client/members_test.go @@ -35,7 +35,7 @@ func TestMembersAPIActionList(t *testing.T) { Path: "/v2/members", } - got := *act.httpRequest(ep) + got := *act.HTTPRequest(ep) err := assertResponse(got, wantURL, http.Header{}, nil) if err != nil { t.Error(err.Error()) @@ -61,7 +61,7 @@ func TestMembersAPIActionAdd(t *testing.T) { } wantBody := []byte(`{"peerURLs":["https://127.0.0.1:8081","http://127.0.0.1:8080"]}`) - got := *act.httpRequest(ep) + got := *act.HTTPRequest(ep) err := assertResponse(got, wantURL, wantHeader, wantBody) if err != nil { t.Error(err.Error()) @@ -78,7 +78,7 @@ func TestMembersAPIActionRemove(t *testing.T) { Path: "/v2/members/XXX", } - got := *act.httpRequest(ep) + got := *act.HTTPRequest(ep) err := assertResponse(got, wantURL, http.Header{}, nil) if err != nil { t.Error(err.Error()) diff --git a/discovery/discovery.go b/discovery/discovery.go index b16f38713..94d35b9f3 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -29,6 +29,7 @@ import ( "strings" "time" + "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork" "github.com/coreos/etcd/client" "github.com/coreos/etcd/pkg/types" @@ -106,15 +107,16 @@ func New(durl string, id types.ID, config string) (Discoverer, error) { if err != nil { return nil, err } - c, err := client.NewDiscoveryKeysAPI(&http.Transport{Proxy: pf}, []string{u.String()}, client.DefaultRequestTimeout) + c, err := client.NewHTTPClient(&http.Transport{Proxy: pf}, []string{u.String()}) if err != nil { return nil, err } + dc := client.NewDiscoveryKeysAPI(c) return &discovery{ cluster: token, id: id, config: config, - c: c, + c: dc, url: u, clock: clockwork.NewRealClock(), }, nil @@ -149,21 +151,25 @@ func (d *discovery) Discover() (string, error) { } func (d *discovery) createSelf() error { - resp, err := d.c.Create(d.selfKey(), d.config, -1) + ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout) + resp, err := d.c.Create(ctx, d.selfKey(), d.config, -1) + cancel() if err != nil { return err } // ensure self appears on the server we connected to w := d.c.Watch(d.selfKey(), resp.Node.CreatedIndex) - _, err = w.Next() + _, err = w.Next(context.Background()) return err } func (d *discovery) checkCluster() (client.Nodes, int, error) { configKey := path.Join("/", d.cluster, "_config") + ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout) // find cluster size - resp, err := d.c.Get(path.Join(configKey, "size")) + resp, err := d.c.Get(ctx, path.Join(configKey, "size")) + cancel() if err != nil { if err == client.ErrKeyNoExist { return nil, 0, ErrSizeNotFound @@ -178,7 +184,9 @@ func (d *discovery) checkCluster() (client.Nodes, int, error) { return nil, 0, ErrBadSizeKey } - resp, err = d.c.Get(d.cluster) + ctx, cancel = context.WithTimeout(context.Background(), client.DefaultRequestTimeout) + resp, err = d.c.Get(ctx, d.cluster) + cancel() if err != nil { if err == client.ErrTimeout { return d.checkClusterRetry() @@ -253,7 +261,7 @@ func (d *discovery) waitNodes(nodes client.Nodes, size int) (client.Nodes, error // wait for others for len(all) < size { log.Printf("discovery: found %d peer(s), waiting for %d more", len(all), size-len(all)) - resp, err := w.Next() + resp, err := w.Next(context.Background()) if err != nil { if err == client.ErrTimeout { return d.waitNodesRetry() diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go index 86bd9569a..8501b645e 100644 --- a/discovery/discovery_test.go +++ b/discovery/discovery_test.go @@ -21,13 +21,13 @@ import ( "math/rand" "net/http" "os" + "reflect" "sort" "strconv" - - "reflect" "testing" "time" + "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork" "github.com/coreos/etcd/client" ) @@ -397,7 +397,7 @@ type clientWithResp struct { w client.Watcher } -func (c *clientWithResp) Create(key string, value string, ttl time.Duration) (*client.Response, error) { +func (c *clientWithResp) Create(ctx context.Context, key string, value string, ttl time.Duration) (*client.Response, error) { if len(c.rs) == 0 { return &client.Response{}, nil } @@ -406,7 +406,7 @@ func (c *clientWithResp) Create(key string, value string, ttl time.Duration) (*c return r, nil } -func (c *clientWithResp) Get(key string) (*client.Response, error) { +func (c *clientWithResp) Get(ctx context.Context, key string) (*client.Response, error) { if len(c.rs) == 0 { return &client.Response{}, client.ErrKeyNoExist } @@ -428,11 +428,11 @@ type clientWithErr struct { w client.Watcher } -func (c *clientWithErr) Create(key string, value string, ttl time.Duration) (*client.Response, error) { +func (c *clientWithErr) Create(ctx context.Context, key string, value string, ttl time.Duration) (*client.Response, error) { return &client.Response{}, c.err } -func (c *clientWithErr) Get(key string) (*client.Response, error) { +func (c *clientWithErr) Get(ctx context.Context, key string) (*client.Response, error) { return &client.Response{}, c.err } @@ -448,7 +448,7 @@ type watcherWithResp struct { rs []*client.Response } -func (w *watcherWithResp) Next() (*client.Response, error) { +func (w *watcherWithResp) Next(context.Context) (*client.Response, error) { if len(w.rs) == 0 { return &client.Response{}, nil } @@ -461,7 +461,7 @@ type watcherWithErr struct { err error } -func (w *watcherWithErr) Next() (*client.Response, error) { +func (w *watcherWithErr) Next(context.Context) (*client.Response, error) { return &client.Response{}, w.err } @@ -472,20 +472,20 @@ type clientWithRetry struct { failTimes int } -func (c *clientWithRetry) Create(key string, value string, ttl time.Duration) (*client.Response, error) { +func (c *clientWithRetry) Create(ctx context.Context, key string, value string, ttl time.Duration) (*client.Response, error) { if c.failCount < c.failTimes { c.failCount++ return nil, client.ErrTimeout } - return c.clientWithResp.Create(key, value, ttl) + return c.clientWithResp.Create(ctx, key, value, ttl) } -func (c *clientWithRetry) Get(key string) (*client.Response, error) { +func (c *clientWithRetry) Get(ctx context.Context, key string) (*client.Response, error) { if c.failCount < c.failTimes { c.failCount++ return nil, client.ErrTimeout } - return c.clientWithResp.Get(key) + return c.clientWithResp.Get(ctx, key) } // watcherWithRetry will timeout all requests up to failTimes @@ -495,7 +495,7 @@ type watcherWithRetry struct { failTimes int } -func (w *watcherWithRetry) Next() (*client.Response, error) { +func (w *watcherWithRetry) Next(context.Context) (*client.Response, error) { if w.failCount < w.failTimes { w.failCount++ return nil, client.ErrTimeout diff --git a/etcdctl/command/member_commands.go b/etcdctl/command/member_commands.go index 85a5ccb50..143ae92ce 100644 --- a/etcdctl/command/member_commands.go +++ b/etcdctl/command/member_commands.go @@ -6,6 +6,7 @@ import ( "os" "strings" + "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli" "github.com/coreos/etcd/client" ) @@ -42,13 +43,23 @@ func mustNewMembersAPI(c *cli.Context) client.MembersAPI { } } - mAPI, err := client.NewMembersAPI(&http.Transport{}, peers, client.DefaultRequestTimeout) + hc, err := client.NewHTTPClient(&http.Transport{}, peers) if err != nil { fmt.Fprintln(os.Stderr, err.Error()) os.Exit(1) } - return mAPI + if !c.GlobalBool("no-sync") { + ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout) + err := hc.Sync(ctx) + cancel() + if err != nil { + fmt.Fprintln(os.Stderr, err.Error()) + os.Exit(1) + } + } + + return client.NewMembersAPI(hc) } func actionMemberList(c *cli.Context) { @@ -57,7 +68,9 @@ func actionMemberList(c *cli.Context) { os.Exit(1) } mAPI := mustNewMembersAPI(c) - members, err := mAPI.List() + ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout) + members, err := mAPI.List(ctx) + cancel() if err != nil { fmt.Fprintln(os.Stderr, err.Error()) os.Exit(1) @@ -78,7 +91,9 @@ func actionMemberAdd(c *cli.Context) { mAPI := mustNewMembersAPI(c) url := args[1] - m, err := mAPI.Add(url) + ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout) + m, err := mAPI.Add(ctx, url) + cancel() if err != nil { fmt.Fprintln(os.Stderr, err.Error()) os.Exit(1) @@ -88,7 +103,9 @@ func actionMemberAdd(c *cli.Context) { newName := args[0] fmt.Printf("Added member named %s with ID %s to cluster\n", newName, newID) - members, err := mAPI.List() + ctx, cancel = context.WithTimeout(context.Background(), client.DefaultRequestTimeout) + members, err := mAPI.List(ctx) + cancel() if err != nil { fmt.Fprintln(os.Stderr, err.Error()) os.Exit(1) @@ -120,7 +137,10 @@ func actionMemberRemove(c *cli.Context) { mAPI := mustNewMembersAPI(c) mID := args[0] - if err := mAPI.Remove(mID); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout) + err := mAPI.Remove(ctx, mID) + cancel() + if err != nil { fmt.Fprintln(os.Stderr, err.Error()) os.Exit(1) }