From 071ebb9feb865681a018eacb7754fbebe0a12104 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Sat, 8 Nov 2014 14:03:29 -0800 Subject: [PATCH] integration: wait for cluster to be stable before returning from Launch The wait ensures that the cluster reaches a stable state, which means that a leader has been elected and has started to heartbeat to followers. This ensures that future client requests are always handled in time, so there is no need to retry sending requests. --- integration/cluster_test.go | 50 ++++++++++++++++++++++++++++--------- 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 028aa92a0..f61dfae17 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -31,6 +31,7 @@ import ( "github.com/coreos/etcd/client" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/etcdhttp" + "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" @@ -59,20 +60,11 @@ func testCluster(t *testing.T, size int) { for j, u := range c.Members[i].ClientURLs { cc := mustNewHTTPClient(t, []string{u.String()}) kapi := client.NewKeysAPI(cc) - // TODO: we retry it here because MsgProp may be dropped due to - // sender reaches its max serving. make it reliable that we don't - // need to worry about it. 
- var err error - for k := 0; k < 3; k++ { - ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - if _, err = kapi.Create(ctx, fmt.Sprintf("/%d%d%d", i, j, k), "bar", -1); err == nil { - break - } - cancel() - } - if err != nil { + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + if _, err := kapi.Create(ctx, fmt.Sprintf("/%d%d", i, j), "bar", -1); err != nil { t.Errorf("create on %s error: %v", u.String(), err) } + cancel() } } } @@ -123,6 +115,9 @@ func (c *cluster) Launch(t *testing.T) { m.Launch(t) c.Members = append(c.Members, m) } + + // wait for the cluster to become stable so that it can serve future client requests + c.waitClientURLsPublished(t) } func (c *cluster) URL(i int) string { @@ -135,6 +130,37 @@ func (c *cluster) Terminate(t *testing.T) { } } +func (c *cluster) waitClientURLsPublished(t *testing.T) { + timer := time.AfterFunc(10*time.Second, func() { + t.Fatal("wait too long for client urls publish") + }) + cc := mustNewHTTPClient(t, []string{c.URL(0)}) + ma := client.NewMembersAPI(cc) + for { + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + membs, err := ma.List(ctx) + cancel() + if err == nil && c.checkClientURLsPublished(membs) { + break + } + time.Sleep(tickDuration) + } + timer.Stop() + return +} + +func (c *cluster) checkClientURLsPublished(membs []httptypes.Member) bool { + if len(membs) != len(c.Members) { + return false + } + for _, m := range membs { + if len(m.ClientURLs) == 0 { + return false + } + } + return true +} + func (c *cluster) name(i int) string { return fmt.Sprint("node", i) }