Merge pull request #1670 from yichengq/207

integration: add basic discovery tests
This commit is contained in:
Yicheng Qin 2014-11-10 10:30:20 -08:00
commit 2dcd8213e4
2 changed files with 134 additions and 56 deletions

View File

@ -25,6 +25,7 @@ import (
"net/http/httptest"
"os"
"strings"
"sync"
"testing"
"time"
@ -53,69 +54,110 @@ func TestClusterOf3(t *testing.T) { testCluster(t, 3) }
func testCluster(t *testing.T, size int) {
defer afterTest(t)
c := &cluster{Size: size}
c := NewCluster(t, size)
c.Launch(t)
defer c.Terminate(t)
for i := 0; i < size; i++ {
for j, u := range c.Members[i].ClientURLs {
cc := mustNewHTTPClient(t, []string{u.String()})
kapi := client.NewKeysAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
if _, err := kapi.Create(ctx, fmt.Sprintf("/%d%d", i, j), "bar", -1); err != nil {
t.Errorf("create on %s error: %v", u.String(), err)
}
cancel()
for i, u := range c.URLs() {
cc := mustNewHTTPClient(t, []string{u})
kapi := client.NewKeysAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
if _, err := kapi.Create(ctx, fmt.Sprintf("/%d", i), "bar", -1); err != nil {
t.Errorf("create on %s error: %v", u, err)
}
cancel()
}
}
type cluster struct {
Size int
Members []member
func TestClusterOf1UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 1) }
func TestClusterOf3UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 3) }
func testClusterUsingDiscovery(t *testing.T, size int) {
defer afterTest(t)
dc := NewCluster(t, 1)
dc.Launch(t)
defer dc.Terminate(t)
// init discovery token space
dcc := mustNewHTTPClient(t, dc.URLs())
dkapi := client.NewKeysAPI(dcc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", size), -1); err != nil {
t.Fatal(err)
}
cancel()
c := NewClusterByDiscovery(t, size, dc.URL(0)+"/v2/keys")
c.Launch(t)
defer c.Terminate(t)
for i, u := range c.URLs() {
cc := mustNewHTTPClient(t, []string{u})
kapi := client.NewKeysAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
if _, err := kapi.Create(ctx, fmt.Sprintf("/%d", i), "bar", -1); err != nil {
t.Errorf("create on %s error: %v", u, err)
}
cancel()
}
}
// TODO: support TLS
func (c *cluster) Launch(t *testing.T) {
if c.Size <= 0 {
t.Fatalf("cluster size <= 0")
}
type cluster struct {
Members []*member
}
lns := make([]net.Listener, c.Size)
addrs := make([]string, c.Size)
for i := 0; i < c.Size; i++ {
l := newLocalListener(t)
// each member claims only one peer listener
lns[i] = l
addrs[i] = fmt.Sprintf("%v=%v", c.name(i), "http://"+l.Addr().String())
// NewCluster returns an unlaunched cluster of the given size which has been
// set to use static bootstrap.
func NewCluster(t *testing.T, size int) *cluster {
c := &cluster{}
ms := make([]*member, size)
for i := 0; i < size; i++ {
ms[i] = newMember(t, c.name(i))
}
c.Members = ms
addrs := make([]string, 0)
for _, m := range ms {
for _, l := range m.PeerListeners {
addrs = append(addrs, fmt.Sprintf("%s=%s", m.Name, "http://"+l.Addr().String()))
}
}
clusterStr := strings.Join(addrs, ",")
var err error
for i := 0; i < c.Size; i++ {
m := member{}
m.PeerListeners = []net.Listener{lns[i]}
cln := newLocalListener(t)
m.ClientListeners = []net.Listener{cln}
m.Name = c.name(i)
m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()})
if err != nil {
t.Fatal(err)
}
m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd")
if err != nil {
t.Fatal(err)
}
for _, m := range ms {
m.Cluster, err = etcdserver.NewClusterFromString(clusterName, clusterStr)
if err != nil {
t.Fatal(err)
}
m.NewCluster = true
m.Transport = newTransport()
m.Launch(t)
c.Members = append(c.Members, m)
}
return c
}
// NewClusterUsingDiscovery returns an unlaunched cluster of the given size
// which has been set to use the given url as discovery service to bootstrap.
func NewClusterByDiscovery(t *testing.T, size int, url string) *cluster {
c := &cluster{}
ms := make([]*member, size)
for i := 0; i < size; i++ {
ms[i] = newMember(t, c.name(i))
ms[i].DiscoveryURL = url
}
c.Members = ms
return c
}
func (c *cluster) Launch(t *testing.T) {
var wg sync.WaitGroup
for _, m := range c.Members {
wg.Add(1)
// Members are launched in separate goroutines because if they boot
// using discovery url, they have to wait for others to register to continue.
go func(m *member) {
m.Launch(t)
wg.Done()
}(m)
}
wg.Wait()
// wait cluster to be stable to receive future client requests
c.waitClientURLsPublished(t)
}
@ -124,6 +166,16 @@ func (c *cluster) URL(i int) string {
return c.Members[i].ClientURLs[0].String()
}
func (c *cluster) URLs() []string {
urls := make([]string, 0)
for _, m := range c.Members {
for _, u := range m.ClientURLs {
urls = append(urls, u.String())
}
}
return urls
}
func (c *cluster) Terminate(t *testing.T) {
for _, m := range c.Members {
m.Terminate(t)
@ -181,6 +233,32 @@ type member struct {
hss []*httptest.Server
}
func newMember(t *testing.T, name string) *member {
var err error
m := &member{}
pln := newLocalListener(t)
m.PeerListeners = []net.Listener{pln}
cln := newLocalListener(t)
m.ClientListeners = []net.Listener{cln}
m.Name = name
m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()})
if err != nil {
t.Fatal(err)
}
m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd")
if err != nil {
t.Fatal(err)
}
clusterStr := fmt.Sprintf("%s=http://%s", name, pln.Addr().String())
m.Cluster, err = etcdserver.NewClusterFromString(clusterName, clusterStr)
if err != nil {
t.Fatal(err)
}
m.NewCluster = true
m.Transport = newTransport()
return m
}
// Launch starts a member based on ServerConfig, PeerListeners
// and ClientListeners.
func (m *member) Launch(t *testing.T) {

View File

@ -36,7 +36,7 @@ func init() {
}
func TestV2Set(t *testing.T) {
cl := cluster{Size: 1}
cl := NewCluster(t, 1)
cl.Launch(t)
defer cl.Terminate(t)
@ -88,7 +88,7 @@ func TestV2Set(t *testing.T) {
}
func TestV2CreateUpdate(t *testing.T) {
cl := cluster{Size: 1}
cl := NewCluster(t, 1)
cl.Launch(t)
defer cl.Terminate(t)
@ -195,7 +195,7 @@ func TestV2CreateUpdate(t *testing.T) {
}
func TestV2CAS(t *testing.T) {
cl := cluster{Size: 1}
cl := NewCluster(t, 1)
cl.Launch(t)
defer cl.Terminate(t)
@ -320,7 +320,7 @@ func TestV2CAS(t *testing.T) {
}
func TestV2Delete(t *testing.T) {
cl := cluster{Size: 1}
cl := NewCluster(t, 1)
cl.Launch(t)
defer cl.Terminate(t)
@ -416,7 +416,7 @@ func TestV2Delete(t *testing.T) {
}
func TestV2CAD(t *testing.T) {
cl := cluster{Size: 1}
cl := NewCluster(t, 1)
cl.Launch(t)
defer cl.Terminate(t)
@ -512,7 +512,7 @@ func TestV2CAD(t *testing.T) {
}
func TestV2Unique(t *testing.T) {
cl := cluster{Size: 1}
cl := NewCluster(t, 1)
cl.Launch(t)
defer cl.Terminate(t)
@ -575,7 +575,7 @@ func TestV2Unique(t *testing.T) {
}
func TestV2Get(t *testing.T) {
cl := cluster{Size: 1}
cl := NewCluster(t, 1)
cl.Launch(t)
defer cl.Terminate(t)
@ -669,7 +669,7 @@ func TestV2Get(t *testing.T) {
}
func TestV2QuorumGet(t *testing.T) {
cl := cluster{Size: 1}
cl := NewCluster(t, 1)
cl.Launch(t)
defer cl.Terminate(t)
@ -763,7 +763,7 @@ func TestV2QuorumGet(t *testing.T) {
}
func TestV2Watch(t *testing.T) {
cl := cluster{Size: 1}
cl := NewCluster(t, 1)
cl.Launch(t)
defer cl.Terminate(t)
@ -808,7 +808,7 @@ func TestV2Watch(t *testing.T) {
}
func TestV2WatchWithIndex(t *testing.T) {
cl := cluster{Size: 1}
cl := NewCluster(t, 1)
cl.Launch(t)
defer cl.Terminate(t)
@ -865,7 +865,7 @@ func TestV2WatchWithIndex(t *testing.T) {
}
func TestV2WatchKeyInDir(t *testing.T) {
cl := cluster{Size: 1}
cl := NewCluster(t, 1)
cl.Launch(t)
defer cl.Terminate(t)
@ -913,7 +913,7 @@ func TestV2WatchKeyInDir(t *testing.T) {
}
func TestV2Head(t *testing.T) {
cl := cluster{Size: 1}
cl := NewCluster(t, 1)
cl.Launch(t)
defer cl.Terminate(t)