integration: add increase cluster size test

This commit is contained in:
Yicheng Qin
2014-11-06 15:38:59 -08:00
parent fc21f299b1
commit bc9de47a9a

View File

@@ -24,6 +24,8 @@ import (
"net/http"
"net/http/httptest"
"os"
"reflect"
"sort"
"strings"
"testing"
"time"
@@ -82,6 +84,21 @@ func testClusterUsingDiscovery(t *testing.T, size int) {
clusterMustProgress(t, c)
}
func TestDoubleClusterSizeOf1(t *testing.T) { testDoubleClusterSize(t, 1) }
func TestDoubleClusterSizeOf3(t *testing.T) { testDoubleClusterSize(t, 3) }
func testDoubleClusterSize(t *testing.T, size int) {
defer afterTest(t)
c := NewCluster(t, size)
c.Launch(t)
defer c.Terminate(t)
for i := 0; i < size; i++ {
c.AddMember(t)
}
clusterMustProgress(t, c)
}
// clusterMustProgress ensures that cluster can make progress. It creates
// a key first, and check the new key could be got from all client urls of
// the cluster.
@@ -167,7 +184,7 @@ func (c *cluster) Launch(t *testing.T) {
}
}
// wait cluster to be stable to receive future client requests
c.waitClientURLsPublished(t)
c.waitMembersMatch(t, c.HTTPMembers())
}
func (c *cluster) URL(i int) string {
@@ -184,47 +201,94 @@ func (c *cluster) URLs() []string {
return urls
}
func (c *cluster) HTTPMembers() []httptypes.Member {
ms := make([]httptypes.Member, len(c.Members))
for i, m := range c.Members {
ms[i].Name = m.Name
for _, ln := range m.PeerListeners {
ms[i].PeerURLs = append(ms[i].PeerURLs, "http://"+ln.Addr().String())
}
for _, ln := range m.ClientListeners {
ms[i].ClientURLs = append(ms[i].ClientURLs, "http://"+ln.Addr().String())
}
}
return ms
}
func (c *cluster) AddMember(t *testing.T) {
clusterStr := c.Members[0].Cluster.String()
idx := len(c.Members)
m := mustNewMember(t, c.name(idx))
// send add request to the cluster
cc := mustNewHTTPClient(t, []string{c.URL(0)})
ma := client.NewMembersAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
peerURL := "http://" + m.PeerListeners[0].Addr().String()
if _, err := ma.Add(ctx, peerURL); err != nil {
t.Fatalf("add member on %s error: %v", c.URL(0), err)
}
cancel()
// wait for the add node entry applied in the cluster
members := append(c.HTTPMembers(), httptypes.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
c.waitMembersMatch(t, members)
for _, ln := range m.PeerListeners {
clusterStr += fmt.Sprintf(",%s=http://%s", m.Name, ln.Addr().String())
}
var err error
m.Cluster, err = etcdserver.NewClusterFromString(clusterName, clusterStr)
if err != nil {
t.Fatal(err)
}
m.NewCluster = false
if err := m.Launch(); err != nil {
t.Fatal(err)
}
c.Members = append(c.Members, m)
// wait cluster to be stable to receive future client requests
c.waitMembersMatch(t, c.HTTPMembers())
}
func (c *cluster) Terminate(t *testing.T) {
for _, m := range c.Members {
m.Terminate(t)
}
}
func (c *cluster) waitClientURLsPublished(t *testing.T) {
timer := time.AfterFunc(10*time.Second, func() {
t.Fatal("wait too long for client urls publish")
})
cc := mustNewHTTPClient(t, []string{c.URL(0)})
ma := client.NewMembersAPI(cc)
for {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
membs, err := ma.List(ctx)
cancel()
if err == nil && c.checkClientURLsPublished(membs) {
break
func (c *cluster) waitMembersMatch(t *testing.T, membs []httptypes.Member) {
for _, u := range c.URLs() {
cc := mustNewHTTPClient(t, []string{u})
ma := client.NewMembersAPI(cc)
for {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
ms, err := ma.List(ctx)
cancel()
if err == nil && isMembersEqual(ms, membs) {
break
}
time.Sleep(tickDuration)
}
time.Sleep(tickDuration)
}
timer.Stop()
return
}
func (c *cluster) checkClientURLsPublished(membs []httptypes.Member) bool {
if len(membs) != len(c.Members) {
return false
}
for _, m := range membs {
if len(m.ClientURLs) == 0 {
return false
}
}
return true
}
func (c *cluster) name(i int) string {
return fmt.Sprint("node", i)
}
// isMembersEqual checks whether two members equal except ID field.
// The given wmembs should always set ID field to empty string.
func isMembersEqual(membs []httptypes.Member, wmembs []httptypes.Member) bool {
sort.Sort(SortableMemberSliceByPeerURLs(membs))
sort.Sort(SortableMemberSliceByPeerURLs(wmembs))
for i := range membs {
membs[i].ID = ""
}
return reflect.DeepEqual(membs, wmembs)
}
func newLocalListener(t *testing.T) net.Listener {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
@@ -283,7 +347,7 @@ func (m *member) Launch() error {
return fmt.Errorf("failed to initialize the etcd server: %v", err)
}
m.s.Ticker = time.Tick(tickDuration)
m.s.SyncTicker = time.Tick(10 * tickDuration)
m.s.SyncTicker = time.Tick(500 * time.Millisecond)
m.s.Start()
for _, ln := range m.PeerListeners {
@@ -342,3 +406,11 @@ func newTransport() *http.Transport {
tr.Dial = (&net.Dialer{Timeout: 100 * time.Millisecond}).Dial
return tr
}
type SortableMemberSliceByPeerURLs []httptypes.Member
func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) }
func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool {
return p[i].PeerURLs[0] < p[j].PeerURLs[0]
}
func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }