mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
integration: wait cluster to be stable before return launch
The wait ensures that cluster goes into the stable stage, which means that leader has been elected and starts to heartbeat to followers. This makes future client requests always handled in time, and there is no need to retry sending requests.
This commit is contained in:
parent
0b493ac8d4
commit
071ebb9feb
@ -31,6 +31,7 @@ import (
|
||||
"github.com/coreos/etcd/client"
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/etcdhttp"
|
||||
"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
@ -59,20 +60,11 @@ func testCluster(t *testing.T, size int) {
|
||||
for j, u := range c.Members[i].ClientURLs {
|
||||
cc := mustNewHTTPClient(t, []string{u.String()})
|
||||
kapi := client.NewKeysAPI(cc)
|
||||
// TODO: we retry it here because MsgProp may be dropped due to
|
||||
// sender reaches its max serving. make it reliable that we don't
|
||||
// need to worry about it.
|
||||
var err error
|
||||
for k := 0; k < 3; k++ {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
if _, err = kapi.Create(ctx, fmt.Sprintf("/%d%d%d", i, j, k), "bar", -1); err == nil {
|
||||
break
|
||||
}
|
||||
cancel()
|
||||
}
|
||||
if err != nil {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
if _, err := kapi.Create(ctx, fmt.Sprintf("/%d%d", i, j), "bar", -1); err != nil {
|
||||
t.Errorf("create on %s error: %v", u.String(), err)
|
||||
}
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -123,6 +115,9 @@ func (c *cluster) Launch(t *testing.T) {
|
||||
m.Launch(t)
|
||||
c.Members = append(c.Members, m)
|
||||
}
|
||||
|
||||
// wait cluster to be stable to receive future client requests
|
||||
c.waitClientURLsPublished(t)
|
||||
}
|
||||
|
||||
func (c *cluster) URL(i int) string {
|
||||
@ -135,6 +130,37 @@ func (c *cluster) Terminate(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cluster) waitClientURLsPublished(t *testing.T) {
|
||||
timer := time.AfterFunc(10*time.Second, func() {
|
||||
t.Fatal("wait too long for client urls publish")
|
||||
})
|
||||
cc := mustNewHTTPClient(t, []string{c.URL(0)})
|
||||
ma := client.NewMembersAPI(cc)
|
||||
for {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
membs, err := ma.List(ctx)
|
||||
cancel()
|
||||
if err == nil && c.checkClientURLsPublished(membs) {
|
||||
break
|
||||
}
|
||||
time.Sleep(tickDuration)
|
||||
}
|
||||
timer.Stop()
|
||||
return
|
||||
}
|
||||
|
||||
func (c *cluster) checkClientURLsPublished(membs []httptypes.Member) bool {
|
||||
if len(membs) != len(c.Members) {
|
||||
return false
|
||||
}
|
||||
for _, m := range membs {
|
||||
if len(m.ClientURLs) == 0 {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *cluster) name(i int) string {
|
||||
return fmt.Sprint("node", i)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user