diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 9f4e0cb33..028aa92a0 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -23,21 +23,23 @@ import ( "net" "net/http" "net/http/httptest" - "net/url" "os" "strings" "testing" "time" + "github.com/coreos/etcd/client" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/etcdhttp" - "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" + + "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" ) const ( - tickDuration = 10 * time.Millisecond - clusterName = "etcd" + tickDuration = 10 * time.Millisecond + clusterName = "etcd" + requestTimeout = 2 * time.Second ) func init() { @@ -52,41 +54,27 @@ func testCluster(t *testing.T, size int) { defer afterTest(t) c := &cluster{Size: size} c.Launch(t) + defer c.Terminate(t) for i := 0; i < size; i++ { - for _, u := range c.Members[i].ClientURLs { + for j, u := range c.Members[i].ClientURLs { + cc := mustNewHTTPClient(t, []string{u.String()}) + kapi := client.NewKeysAPI(cc) + // TODO: we retry it here because MsgProp may be dropped due to + // sender reaches its max serving. make it reliable that we don't + // need to worry about it. var err error - for j := 0; j < 3; j++ { - if err = setKey(u, "/foo", "bar"); err == nil { + for k := 0; k < 3; k++ { + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + if _, err = kapi.Create(ctx, fmt.Sprintf("/%d%d%d", i, j, k), "bar", -1); err == nil { break } + cancel() } if err != nil { - t.Errorf("setKey on %v error: %v", u.String(), err) + t.Errorf("create on %s error: %v", u.String(), err) } } } - c.Terminate(t) -} - -// TODO: use etcd client -func setKey(u url.URL, key string, value string) error { - u.Path = "/v2/keys" + key - v := url.Values{"value": []string{value}} - req, err := http.NewRequest("PUT", u.String(), strings.NewReader(v.Encode())) - if err != nil { - return err - } - req.Header.Set("Content-Type", "application/x-www-form-urlencoded") - resp, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - ioutil.ReadAll(resp.Body) - resp.Body.Close() - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { - return fmt.Errorf("statusCode = %d, want %d or %d", resp.StatusCode, http.StatusOK, http.StatusCreated) - } - return nil } type cluster struct { @@ -130,13 +118,7 @@ func (c *cluster) Launch(t *testing.T) { t.Fatal(err) } m.NewCluster = true - m.Transport, err = transport.NewTransport(transport.TLSInfo{}) - if err != nil { - t.Fatal(err) - } - // TODO: need the support of graceful stop in Sender to remove this - m.Transport.DisableKeepAlives = true - m.Transport.Dial = (&net.Dialer{Timeout: 100 * time.Millisecond}).Dial + m.Transport = newTransport() m.Launch(t) c.Members = append(c.Members, m) @@ -223,3 +205,19 @@ func (m *member) Terminate(t *testing.T) { t.Fatal(err) } } + +func mustNewHTTPClient(t *testing.T, eps []string) client.HTTPClient { + cc, err := client.NewHTTPClient(newTransport(), eps) + if err != nil { + t.Fatal(err) + } + return cc +} + +func newTransport() *http.Transport { + tr := &http.Transport{} + // TODO: need the support of graceful stop in Sender to remove this + tr.DisableKeepAlives = true + tr.Dial = (&net.Dialer{Timeout: 100 * time.Millisecond}).Dial + return tr +}