diff --git a/integration/cluster_test.go b/integration/cluster_test.go index b8863c917..e3f34765e 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -24,6 +24,8 @@ import ( "net/http" "net/http/httptest" "os" + "reflect" + "sort" "strings" "testing" "time" @@ -82,6 +84,21 @@ func testClusterUsingDiscovery(t *testing.T, size int) { clusterMustProgress(t, c) } +func TestDoubleClusterSizeOf1(t *testing.T) { testDoubleClusterSize(t, 1) } +func TestDoubleClusterSizeOf3(t *testing.T) { testDoubleClusterSize(t, 3) } + +func testDoubleClusterSize(t *testing.T, size int) { + defer afterTest(t) + c := NewCluster(t, size) + c.Launch(t) + defer c.Terminate(t) + + for i := 0; i < size; i++ { + c.AddMember(t) + } + clusterMustProgress(t, c) +} + // clusterMustProgress ensures that cluster can make progress. It creates // a key first, and check the new key could be got from all client urls of // the cluster. @@ -167,7 +184,7 @@ func (c *cluster) Launch(t *testing.T) { } } // wait cluster to be stable to receive future client requests - c.waitClientURLsPublished(t) + c.waitMembersMatch(t, c.HTTPMembers()) } func (c *cluster) URL(i int) string { @@ -184,47 +201,94 @@ func (c *cluster) URLs() []string { return urls } +func (c *cluster) HTTPMembers() []httptypes.Member { + ms := make([]httptypes.Member, len(c.Members)) + for i, m := range c.Members { + ms[i].Name = m.Name + for _, ln := range m.PeerListeners { + ms[i].PeerURLs = append(ms[i].PeerURLs, "http://"+ln.Addr().String()) + } + for _, ln := range m.ClientListeners { + ms[i].ClientURLs = append(ms[i].ClientURLs, "http://"+ln.Addr().String()) + } + } + return ms +} + +func (c *cluster) AddMember(t *testing.T) { + clusterStr := c.Members[0].Cluster.String() + idx := len(c.Members) + m := mustNewMember(t, c.name(idx)) + + // send add request to the cluster + cc := mustNewHTTPClient(t, []string{c.URL(0)}) + ma := client.NewMembersAPI(cc) + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + peerURL := "http://" + m.PeerListeners[0].Addr().String() + if _, err := ma.Add(ctx, peerURL); err != nil { + t.Fatalf("add member on %s error: %v", c.URL(0), err) + } + cancel() + + // wait for the add node entry applied in the cluster + members := append(c.HTTPMembers(), httptypes.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}}) + c.waitMembersMatch(t, members) + + for _, ln := range m.PeerListeners { + clusterStr += fmt.Sprintf(",%s=http://%s", m.Name, ln.Addr().String()) + } + var err error + m.Cluster, err = etcdserver.NewClusterFromString(clusterName, clusterStr) + if err != nil { + t.Fatal(err) + } + m.NewCluster = false + if err := m.Launch(); err != nil { + t.Fatal(err) + } + c.Members = append(c.Members, m) + // wait cluster to be stable to receive future client requests + c.waitMembersMatch(t, c.HTTPMembers()) +} + func (c *cluster) Terminate(t *testing.T) { for _, m := range c.Members { m.Terminate(t) } } -func (c *cluster) waitClientURLsPublished(t *testing.T) { - timer := time.AfterFunc(10*time.Second, func() { - t.Fatal("wait too long for client urls publish") - }) - cc := mustNewHTTPClient(t, []string{c.URL(0)}) - ma := client.NewMembersAPI(cc) - for { - ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - membs, err := ma.List(ctx) - cancel() - if err == nil && c.checkClientURLsPublished(membs) { - break +func (c *cluster) waitMembersMatch(t *testing.T, membs []httptypes.Member) { + for _, u := range c.URLs() { + cc := mustNewHTTPClient(t, []string{u}) + ma := client.NewMembersAPI(cc) + for { + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + ms, err := ma.List(ctx) + cancel() + if err == nil && isMembersEqual(ms, membs) { + break + } + time.Sleep(tickDuration) } - time.Sleep(tickDuration) } - timer.Stop() return } -func (c *cluster) checkClientURLsPublished(membs []httptypes.Member) bool { - if len(membs) != len(c.Members) { - return false - } - for _, m := range membs { - if len(m.ClientURLs) == 0 { - return false - } - } - return true -} - func (c *cluster) name(i int) string { return fmt.Sprint("node", i) } +// isMembersEqual checks whether two members equal except ID field. +// The given wmembs should always set ID field to empty string. +func isMembersEqual(membs []httptypes.Member, wmembs []httptypes.Member) bool { + sort.Sort(SortableMemberSliceByPeerURLs(membs)) + sort.Sort(SortableMemberSliceByPeerURLs(wmembs)) + for i := range membs { + membs[i].ID = "" + } + return reflect.DeepEqual(membs, wmembs) +} + func newLocalListener(t *testing.T) net.Listener { l, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { @@ -283,7 +347,7 @@ func (m *member) Launch() error { return fmt.Errorf("failed to initialize the etcd server: %v", err) } m.s.Ticker = time.Tick(tickDuration) - m.s.SyncTicker = time.Tick(10 * tickDuration) + m.s.SyncTicker = time.Tick(500 * time.Millisecond) m.s.Start() for _, ln := range m.PeerListeners { @@ -342,3 +406,11 @@ func newTransport() *http.Transport { tr.Dial = (&net.Dialer{Timeout: 100 * time.Millisecond}).Dial return tr } + +type SortableMemberSliceByPeerURLs []httptypes.Member + +func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) } +func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool { + return p[i].PeerURLs[0] < p[j].PeerURLs[0] +} +func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }