diff --git a/integration/bridge.go b/integration/bridge.go index 4c596dc90..e46397ca8 100644 --- a/integration/bridge.go +++ b/integration/bridge.go @@ -18,8 +18,9 @@ import ( "fmt" "io" "net" - "os" "sync" + + "github.com/coreos/etcd/pkg/transport" ) // bridge creates a unix socket bridge to another unix socket, making it possible @@ -43,10 +44,7 @@ func newBridge(addr string) (*bridge, error) { conns: make(map[*bridgeConn]struct{}), stopc: make(chan struct{}, 1), } - if err := os.RemoveAll(b.inaddr); err != nil { - return nil, err - } - l, err := net.Listen("unix", b.inaddr) + l, err := transport.NewUnixListener(b.inaddr) if err != nil { return nil, fmt.Errorf("listen failed on socket %s (%v)", addr, err) } @@ -79,7 +77,6 @@ func (b *bridge) Reset() { func (b *bridge) serveListen() { defer func() { b.l.Close() - os.RemoveAll(b.inaddr) b.mu.Lock() for bc := range b.conns { bc.Close() diff --git a/integration/cluster.go b/integration/cluster.go index 17f5e5dca..8f15f42cb 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -25,7 +25,6 @@ import ( "os" "reflect" "sort" - "strconv" "strings" "sync" "sync/atomic" @@ -53,14 +52,18 @@ const ( tickDuration = 10 * time.Millisecond clusterName = "etcd" requestTimeout = 20 * time.Second + + basePort = 21000 + urlScheme = "unix" + urlSchemeTLS = "unixs" ) var ( electionTicks = 10 - // integration test uses well-known ports to listen for each running member, - // which ensures restarted member could listen on specific port again. - nextListenPort int64 = 21000 + // integration test uses unique ports, counting up, to listen for each + // member, ensuring restarted members can listen on the same port again. + localListenCount int64 = 0 testTLSInfo = transport.TLSInfo{ KeyFile: "./fixtures/server.key.insecure", @@ -91,6 +94,13 @@ func init() { api.EnableCapability(api.V3rpcCapability) } +func schemeFromTLSInfo(tls *transport.TLSInfo) string { + if tls == nil { + return urlScheme + } + return urlSchemeTLS +} + func (c *cluster) fillClusterForMembers() error { if c.cfg.DiscoveryURL != "" { // cluster will be discovered @@ -99,10 +109,7 @@ func (c *cluster) fillClusterForMembers() error { addrs := make([]string, 0) for _, m := range c.Members { - scheme := "http" - if m.PeerTLSInfo != nil { - scheme = "https" - } + scheme := schemeFromTLSInfo(m.PeerTLSInfo) for _, l := range m.PeerListeners { addrs = append(addrs, fmt.Sprintf("%s=%s://%s", m.Name, scheme, l.Addr().String())) } @@ -186,13 +193,8 @@ func (c *cluster) URLs() []string { func (c *cluster) HTTPMembers() []client.Member { ms := []client.Member{} for _, m := range c.Members { - pScheme, cScheme := "http", "http" - if m.PeerTLSInfo != nil { - pScheme = "https" - } - if m.ClientTLSInfo != nil { - cScheme = "https" - } + pScheme := schemeFromTLSInfo(m.PeerTLSInfo) + cScheme := schemeFromTLSInfo(m.ClientTLSInfo) cm := client.Member{Name: m.Name} for _, ln := range m.PeerListeners { cm.PeerURLs = append(cm.PeerURLs, pScheme+"://"+ln.Addr().String()) @@ -225,10 +227,7 @@ func (c *cluster) mustNewMember(t *testing.T) *member { func (c *cluster) addMember(t *testing.T) { m := c.mustNewMember(t) - scheme := "http" - if c.cfg.PeerTLS != nil { - scheme = "https" - } + scheme := schemeFromTLSInfo(c.cfg.PeerTLS) // send add request to the cluster var err error @@ -390,26 +389,13 @@ func isMembersEqual(membs []client.Member, wmembs []client.Member) bool { } func newLocalListener(t *testing.T) net.Listener { - port := atomic.AddInt64(&nextListenPort, 1) - l, err := net.Listen("tcp", "127.0.0.1:"+strconv.FormatInt(port, 10)) - if err != nil { - t.Fatal(err) - } - return l + c := atomic.AddInt64(&localListenCount, 1) + addr := fmt.Sprintf("127.0.0.1:%d.%d.sock", c+basePort, os.Getpid()) + return newListenerWithAddr(t, addr) } func newListenerWithAddr(t *testing.T, addr string) net.Listener { - var err error - var l net.Listener - // TODO: we want to reuse a previous closed port immediately. - // a better way is to set SO_REUSExx instead of doing retry. - for i := 0; i < 5; i++ { - l, err = net.Listen("tcp", addr) - if err == nil { - break - } - time.Sleep(500 * time.Millisecond) - } + l, err := transport.NewUnixListener(addr) if err != nil { t.Fatal(err) } @@ -449,13 +435,8 @@ func mustNewMember(t *testing.T, mcfg memberConfig) *member { var err error m := &member{} - peerScheme, clientScheme := "http", "http" - if mcfg.peerTLS != nil { - peerScheme = "https" - } - if mcfg.clientTLS != nil { - clientScheme = "https" - } + peerScheme := schemeFromTLSInfo(mcfg.peerTLS) + clientScheme := schemeFromTLSInfo(mcfg.clientTLS) pln := newLocalListener(t) m.PeerListeners = []net.Listener{pln} @@ -500,10 +481,7 @@ func mustNewMember(t *testing.T, mcfg memberConfig) *member { func (m *member) listenGRPC() error { // prefix with localhost so cert has right domain m.grpcAddr = "localhost:" + m.Name + ".sock" - if err := os.RemoveAll(m.grpcAddr); err != nil { - return err - } - l, err := net.Listen("unix", m.grpcAddr) + l, err := transport.NewUnixListener(m.grpcAddr) if err != nil { return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err) } diff --git a/integration/v2_http_kv_test.go b/integration/v2_http_kv_test.go index c50a02857..03e34f29c 100644 --- a/integration/v2_http_kv_test.go +++ b/integration/v2_http_kv_test.go @@ -19,7 +19,6 @@ import ( "fmt" "io" "io/ioutil" - "net" "net/http" "net/url" "reflect" @@ -28,6 +27,7 @@ import ( "time" "github.com/coreos/etcd/pkg/testutil" + "github.com/coreos/etcd/pkg/transport" "github.com/coreos/pkg/capnslog" ) @@ -1038,10 +1038,8 @@ type testHttpClient struct { // Creates a new HTTP client with KeepAlive disabled. func NewTestClient() *testHttpClient { - tr := &http.Transport{ - Dial: (&net.Dialer{Timeout: time.Second}).Dial, - DisableKeepAlives: true, - } + tr, _ := transport.NewTransport(transport.TLSInfo{}, time.Second) + tr.DisableKeepAlives = true return &testHttpClient{&http.Client{Transport: tr}} }