From 53960374502709d6da36eb378d768e3e6af931ec Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 7 Nov 2014 10:01:52 -0800 Subject: [PATCH] integration: add basic discovery tests --- integration/cluster_test.go | 166 ++++++++++++++++++++++++--------- integration/v2_http_kv_test.go | 24 ++--- 2 files changed, 134 insertions(+), 56 deletions(-) diff --git a/integration/cluster_test.go b/integration/cluster_test.go index f61dfae17..f14f61ecd 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -25,6 +25,7 @@ import ( "net/http/httptest" "os" "strings" + "sync" "testing" "time" @@ -53,69 +54,110 @@ func TestClusterOf3(t *testing.T) { testCluster(t, 3) } func testCluster(t *testing.T, size int) { defer afterTest(t) - c := &cluster{Size: size} + c := NewCluster(t, size) c.Launch(t) defer c.Terminate(t) - for i := 0; i < size; i++ { - for j, u := range c.Members[i].ClientURLs { - cc := mustNewHTTPClient(t, []string{u.String()}) - kapi := client.NewKeysAPI(cc) - ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - if _, err := kapi.Create(ctx, fmt.Sprintf("/%d%d", i, j), "bar", -1); err != nil { - t.Errorf("create on %s error: %v", u.String(), err) - } - cancel() + for i, u := range c.URLs() { + cc := mustNewHTTPClient(t, []string{u}) + kapi := client.NewKeysAPI(cc) + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + if _, err := kapi.Create(ctx, fmt.Sprintf("/%d", i), "bar", -1); err != nil { + t.Errorf("create on %s error: %v", u, err) } + cancel() } } -type cluster struct { - Size int - Members []member +func TestClusterOf1UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 1) } +func TestClusterOf3UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 3) } + +func testClusterUsingDiscovery(t *testing.T, size int) { + defer afterTest(t) + dc := NewCluster(t, 1) + dc.Launch(t) + defer dc.Terminate(t) + // init discovery token space + dcc := mustNewHTTPClient(t, dc.URLs()) + dkapi := client.NewKeysAPI(dcc) + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", size), -1); err != nil { + t.Fatal(err) + } + cancel() + + c := NewClusterByDiscovery(t, size, dc.URL(0)+"/v2/keys") + c.Launch(t) + defer c.Terminate(t) + + for i, u := range c.URLs() { + cc := mustNewHTTPClient(t, []string{u}) + kapi := client.NewKeysAPI(cc) + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + if _, err := kapi.Create(ctx, fmt.Sprintf("/%d", i), "bar", -1); err != nil { + t.Errorf("create on %s error: %v", u, err) + } + cancel() + } } // TODO: support TLS -func (c *cluster) Launch(t *testing.T) { - if c.Size <= 0 { - t.Fatalf("cluster size <= 0") - } +type cluster struct { + Members []*member +} - lns := make([]net.Listener, c.Size) - addrs := make([]string, c.Size) - for i := 0; i < c.Size; i++ { - l := newLocalListener(t) - // each member claims only one peer listener - lns[i] = l - addrs[i] = fmt.Sprintf("%v=%v", c.name(i), "http://"+l.Addr().String()) +// NewCluster returns an unlaunched cluster of the given size which has been +// set to use static bootstrap. +func NewCluster(t *testing.T, size int) *cluster { + c := &cluster{} + ms := make([]*member, size) + for i := 0; i < size; i++ { + ms[i] = newMember(t, c.name(i)) + } + c.Members = ms + + addrs := make([]string, 0) + for _, m := range ms { + for _, l := range m.PeerListeners { + addrs = append(addrs, fmt.Sprintf("%s=%s", m.Name, "http://"+l.Addr().String())) + } } clusterStr := strings.Join(addrs, ",") - var err error - for i := 0; i < c.Size; i++ { - m := member{} - m.PeerListeners = []net.Listener{lns[i]} - cln := newLocalListener(t) - m.ClientListeners = []net.Listener{cln} - m.Name = c.name(i) - m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()}) - if err != nil { - t.Fatal(err) - } - m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd") - if err != nil { - t.Fatal(err) - } + for _, m := range ms { m.Cluster, err = etcdserver.NewClusterFromString(clusterName, clusterStr) if err != nil { t.Fatal(err) } - m.NewCluster = true - m.Transport = newTransport() - - m.Launch(t) - c.Members = append(c.Members, m) } + return c +} + +// NewClusterUsingDiscovery returns an unlaunched cluster of the given size +// which has been set to use the given url as discovery service to bootstrap. +func NewClusterByDiscovery(t *testing.T, size int, url string) *cluster { + c := &cluster{} + ms := make([]*member, size) + for i := 0; i < size; i++ { + ms[i] = newMember(t, c.name(i)) + ms[i].DiscoveryURL = url + } + c.Members = ms + return c +} + +func (c *cluster) Launch(t *testing.T) { + var wg sync.WaitGroup + for _, m := range c.Members { + wg.Add(1) + // Members are launched in separate goroutines because if they boot + // using discovery url, they have to wait for others to register to continue. + go func(m *member) { + m.Launch(t) + wg.Done() + }(m) + } + wg.Wait() // wait cluster to be stable to receive future client requests c.waitClientURLsPublished(t) } @@ -124,6 +166,16 @@ func (c *cluster) URL(i int) string { return c.Members[i].ClientURLs[0].String() } +func (c *cluster) URLs() []string { + urls := make([]string, 0) + for _, m := range c.Members { + for _, u := range m.ClientURLs { + urls = append(urls, u.String()) + } + } + return urls +} + func (c *cluster) Terminate(t *testing.T) { for _, m := range c.Members { m.Terminate(t) @@ -181,6 +233,32 @@ type member struct { hss []*httptest.Server } +func newMember(t *testing.T, name string) *member { + var err error + m := &member{} + pln := newLocalListener(t) + m.PeerListeners = []net.Listener{pln} + cln := newLocalListener(t) + m.ClientListeners = []net.Listener{cln} + m.Name = name + m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()}) + if err != nil { + t.Fatal(err) + } + m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd") + if err != nil { + t.Fatal(err) + } + clusterStr := fmt.Sprintf("%s=http://%s", name, pln.Addr().String()) + m.Cluster, err = etcdserver.NewClusterFromString(clusterName, clusterStr) + if err != nil { + t.Fatal(err) + } + m.NewCluster = true + m.Transport = newTransport() + return m +} + // Launch starts a member based on ServerConfig, PeerListeners // and ClientListeners. func (m *member) Launch(t *testing.T) { diff --git a/integration/v2_http_kv_test.go b/integration/v2_http_kv_test.go index 09d4be5cb..6047187b7 100644 --- a/integration/v2_http_kv_test.go +++ b/integration/v2_http_kv_test.go @@ -36,7 +36,7 @@ func init() { } func TestV2Set(t *testing.T) { - cl := cluster{Size: 1} + cl := NewCluster(t, 1) cl.Launch(t) defer cl.Terminate(t) @@ -88,7 +88,7 @@ func TestV2Set(t *testing.T) { } func TestV2CreateUpdate(t *testing.T) { - cl := cluster{Size: 1} + cl := NewCluster(t, 1) cl.Launch(t) defer cl.Terminate(t) @@ -195,7 +195,7 @@ func TestV2CreateUpdate(t *testing.T) { } func TestV2CAS(t *testing.T) { - cl := cluster{Size: 1} + cl := NewCluster(t, 1) cl.Launch(t) defer cl.Terminate(t) @@ -320,7 +320,7 @@ func TestV2CAS(t *testing.T) { } func TestV2Delete(t *testing.T) { - cl := cluster{Size: 1} + cl := NewCluster(t, 1) cl.Launch(t) defer cl.Terminate(t) @@ -416,7 +416,7 @@ func TestV2Delete(t *testing.T) { } func TestV2CAD(t *testing.T) { - cl := cluster{Size: 1} + cl := NewCluster(t, 1) cl.Launch(t) defer cl.Terminate(t) @@ -512,7 +512,7 @@ func TestV2CAD(t *testing.T) { } func TestV2Unique(t *testing.T) { - cl := cluster{Size: 1} + cl := NewCluster(t, 1) cl.Launch(t) defer cl.Terminate(t) @@ -575,7 +575,7 @@ func TestV2Unique(t *testing.T) { } func TestV2Get(t *testing.T) { - cl := cluster{Size: 1} + cl := NewCluster(t, 1) cl.Launch(t) defer cl.Terminate(t) @@ -669,7 +669,7 @@ func TestV2Get(t *testing.T) { } func TestV2QuorumGet(t *testing.T) { - cl := cluster{Size: 1} + cl := NewCluster(t, 1) cl.Launch(t) defer cl.Terminate(t) @@ -763,7 +763,7 @@ func TestV2QuorumGet(t *testing.T) { } func TestV2Watch(t *testing.T) { - cl := cluster{Size: 1} + cl := NewCluster(t, 1) cl.Launch(t) defer cl.Terminate(t) @@ -808,7 +808,7 @@ func TestV2Watch(t *testing.T) { } func TestV2WatchWithIndex(t *testing.T) { - cl := cluster{Size: 1} + cl := NewCluster(t, 1) cl.Launch(t) defer cl.Terminate(t) @@ -865,7 +865,7 @@ func TestV2WatchWithIndex(t *testing.T) { } func TestV2WatchKeyInDir(t *testing.T) { - cl := cluster{Size: 1} + cl := NewCluster(t, 1) cl.Launch(t) defer cl.Terminate(t) @@ -913,7 +913,7 @@ func TestV2WatchKeyInDir(t *testing.T) { } func TestV2Head(t *testing.T) { - cl := cluster{Size: 1} + cl := NewCluster(t, 1) cl.Launch(t) defer cl.Terminate(t)