From 60c037f1c35f30a4aafcd42ea81ac349277df885 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Fri, 29 Jan 2016 11:31:25 -0800 Subject: [PATCH] integration: add client tls support --- integration/cluster.go | 121 ++++++++++++++++++++++------------ integration/cluster_test.go | 20 +++--- integration/member_test.go | 6 +- integration/migration_test.go | 2 +- 4 files changed, 93 insertions(+), 56 deletions(-) diff --git a/integration/cluster.go b/integration/cluster.go index f6bfafcf6..90503000f 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -31,6 +31,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/credentials" "github.com/coreos/etcd/client" "github.com/coreos/etcd/clientv3" @@ -55,11 +56,19 @@ var ( // integration test uses well-known ports to listen for each running member, // which ensures restarted member could listen on specific port again. nextListenPort int64 = 20000 + + testTLSInfo = transport.TLSInfo{ + KeyFile: "./fixtures/server.key.insecure", + CertFile: "./fixtures/server.crt", + TrustedCAFile: "./fixtures/ca.crt", + ClientCertAuth: true, + } ) type ClusterConfig struct { Size int - UsePeerTLS bool + PeerTLS *transport.TLSInfo + ClientTLS *transport.TLSInfo DiscoveryURL string UseV3 bool UseGRPC bool @@ -79,7 +88,7 @@ func (c *cluster) fillClusterForMembers() error { addrs := make([]string, 0) for _, m := range c.Members { scheme := "http" - if !m.PeerTLSInfo.Empty() { + if m.PeerTLSInfo != nil { scheme = "https" } for _, l := range m.PeerListeners { @@ -158,16 +167,19 @@ func (c *cluster) URLs() []string { func (c *cluster) HTTPMembers() []client.Member { ms := make([]client.Member, len(c.Members)) for i, m := range c.Members { - scheme := "http" - if !m.PeerTLSInfo.Empty() { - scheme = "https" + pScheme, cScheme := "http", "http" + if m.PeerTLSInfo != nil { + pScheme = "https" + } + if m.ClientTLSInfo != nil { + cScheme = "https" } ms[i].Name = m.Name for _, ln := range m.PeerListeners { - ms[i].PeerURLs = append(ms[i].PeerURLs, scheme+"://"+ln.Addr().String()) + ms[i].PeerURLs = append(ms[i].PeerURLs, pScheme+"://"+ln.Addr().String()) } for _, ln := range m.ClientListeners { - ms[i].ClientURLs = append(ms[i].ClientURLs, "http://"+ln.Addr().String()) + ms[i].ClientURLs = append(ms[i].ClientURLs, cScheme+"://"+ln.Addr().String()) } } return ms @@ -175,7 +187,7 @@ func (c *cluster) HTTPMembers() []client.Member { func (c *cluster) mustNewMember(t *testing.T) *member { name := c.name(rand.Int()) - m := mustNewMember(t, name, c.cfg.UsePeerTLS) + m := mustNewMember(t, name, c.cfg.PeerTLS, c.cfg.ClientTLS) m.DiscoveryURL = c.cfg.DiscoveryURL m.V3demo = c.cfg.UseV3 if c.cfg.UseGRPC { @@ -190,12 +202,12 @@ func (c *cluster) addMember(t *testing.T) { m := c.mustNewMember(t) scheme := "http" - if c.cfg.UsePeerTLS { + if c.cfg.PeerTLS != nil { scheme = "https" } // send add request to the cluster - cc := mustNewHTTPClient(t, []string{c.URL(0)}) + cc := mustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS) ma := client.NewMembersAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) peerURL := scheme + "://" + m.PeerListeners[0].Addr().String() @@ -228,7 +240,7 @@ func (c *cluster) AddMember(t *testing.T) { func (c *cluster) RemoveMember(t *testing.T, id uint64) { // send remove request to the cluster - cc := mustNewHTTPClient(t, c.URLs()) + cc := mustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS) ma := client.NewMembersAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) if err := ma.Remove(ctx, types.ID(id).String()); err != nil { @@ -263,7 +275,7 @@ func (c *cluster) Terminate(t *testing.T) { func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) { for _, u := range c.URLs() { - cc := mustNewHTTPClient(t, []string{u}) + cc := mustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS) ma := client.NewMembersAPI(cc) for { ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) @@ -355,8 +367,10 @@ type member struct { etcdserver.ServerConfig PeerListeners, ClientListeners []net.Listener grpcListener net.Listener - // inited PeerTLSInfo implies to enable peer TLS - PeerTLSInfo transport.TLSInfo + // PeerTLSInfo enables peer TLS when set + PeerTLSInfo *transport.TLSInfo + // ClientTLSInfo enables client TLS when set + ClientTLSInfo *transport.TLSInfo raftHandler *testutil.PauseableHandler s *etcdserver.EtcdServer @@ -366,25 +380,19 @@ type member struct { grpcAddr string } -// mustNewMember return an inited member with the given name. If usePeerTLS is -// true, it will set PeerTLSInfo and use https scheme to communicate between -// peers. -func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member { - var ( - testTLSInfo = transport.TLSInfo{ - KeyFile: "./fixtures/server.key.insecure", - CertFile: "./fixtures/server.crt", - TrustedCAFile: "./fixtures/ca.crt", - ClientCertAuth: true, - } - err error - ) +// mustNewMember return an inited member with the given name. If peerTLS is +// set, it will use https scheme to communicate between peers. +func mustNewMember(t *testing.T, name string, peerTLS *transport.TLSInfo, clientTLS *transport.TLSInfo) *member { + var err error m := &member{} - peerScheme := "http" - if usePeerTLS { + peerScheme, clientScheme := "http", "http" + if peerTLS != nil { peerScheme = "https" } + if clientTLS != nil { + clientScheme = "https" + } pln := newLocalListener(t) m.PeerListeners = []net.Listener{pln} @@ -392,16 +400,15 @@ func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member { if err != nil { t.Fatal(err) } - if usePeerTLS { - m.PeerTLSInfo = testTLSInfo - } + m.PeerTLSInfo = peerTLS cln := newLocalListener(t) m.ClientListeners = []net.Listener{cln} - m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()}) + m.ClientURLs, err = types.NewURLs([]string{clientScheme + "://" + cln.Addr().String()}) if err != nil { t.Fatal(err) } + m.ClientTLSInfo = clientTLS m.Name = name @@ -416,7 +423,9 @@ func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member { } m.InitialClusterToken = clusterName m.NewCluster = true - m.ServerConfig.PeerTLSInfo = m.PeerTLSInfo + if m.PeerTLSInfo != nil { + m.ServerConfig.PeerTLSInfo = *m.PeerTLSInfo + } m.ElectionTicks = electionTicks m.TickMs = uint(tickDuration / time.Millisecond) return m @@ -427,7 +436,8 @@ func (m *member) listenGRPC() error { if m.V3demo == false { return fmt.Errorf("starting grpc server without v3 configured") } - m.grpcAddr = m.Name + ".sock" + // prefix with localhost so cert has right domain + m.grpcAddr = "localhost:" + m.Name + ".sock" if err := os.RemoveAll(m.grpcAddr); err != nil { return err } @@ -448,7 +458,21 @@ func NewClientV3(m *member) (*clientv3.Client, error) { return net.Dial("unix", a) } unixdialer := grpc.WithDialer(f) - conn, err := grpc.Dial(m.grpcAddr, grpc.WithInsecure(), unixdialer) + opts := []grpc.DialOption{ + unixdialer, + grpc.WithBlock(), + grpc.WithTimeout(5 * time.Second)} + if m.ClientTLSInfo != nil { + tlscfg, err := m.ClientTLSInfo.ClientConfig() + if err != nil { + return nil, err + } + creds := credentials.NewTLS(tlscfg) + opts = append(opts, grpc.WithTransportCredentials(creds)) + } else { + opts = append(opts, grpc.WithInsecure()) + } + conn, err := grpc.Dial(m.grpcAddr, opts...) if err != nil { return nil, err } @@ -483,6 +507,7 @@ func (m *member) Clone(t *testing.T) *member { mm.InitialClusterToken = m.InitialClusterToken mm.ElectionTicks = m.ElectionTicks mm.PeerTLSInfo = m.PeerTLSInfo + mm.ClientTLSInfo = m.ClientTLSInfo return mm } @@ -503,7 +528,7 @@ func (m *member) Launch() error { Listener: ln, Config: &http.Server{Handler: m.raftHandler}, } - if m.PeerTLSInfo.Empty() { + if m.PeerTLSInfo == nil { hs.Start() } else { hs.TLS, err = m.PeerTLSInfo.ServerConfig() @@ -519,18 +544,26 @@ func (m *member) Launch() error { Listener: ln, Config: &http.Server{Handler: etcdhttp.NewClientHandler(m.s, m.ServerConfig.ReqTimeout())}, } - hs.Start() + if m.ClientTLSInfo == nil { + hs.Start() + } else { + hs.TLS, err = m.ClientTLSInfo.ServerConfig() + if err != nil { + return err + } + hs.StartTLS() + } m.hss = append(m.hss, hs) } if m.grpcListener != nil { - m.grpcServer, err = v3rpc.Server(m.s, nil) + m.grpcServer, err = v3rpc.Server(m.s, m.ClientTLSInfo) go m.grpcServer.Serve(m.grpcListener) } return nil } func (m *member) WaitOK(t *testing.T) { - cc := mustNewHTTPClient(t, []string{m.URL()}) + cc := mustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo) kapi := client.NewKeysAPI(cc) for { ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) @@ -608,8 +641,12 @@ func (m *member) Terminate(t *testing.T) { } } -func mustNewHTTPClient(t *testing.T, eps []string) client.Client { - cfg := client.Config{Transport: mustNewTransport(t, transport.TLSInfo{}), Endpoints: eps} +func mustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client { + cfgtls := transport.TLSInfo{} + if tls != nil { + cfgtls = *tls + } + cfg := client.Config{Transport: mustNewTransport(t, cfgtls), Endpoints: eps} c, err := client.New(cfg) if err != nil { t.Fatal(err) diff --git a/integration/cluster_test.go b/integration/cluster_test.go index a99033ec6..a86222f85 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -51,7 +51,7 @@ func testCluster(t *testing.T, size int) { func TestTLSClusterOf3(t *testing.T) { defer testutil.AfterTest(t) - c := NewClusterByConfig(t, &ClusterConfig{Size: 3, UsePeerTLS: true}) + c := NewClusterByConfig(t, &ClusterConfig{Size: 3, PeerTLS: &testTLSInfo}) c.Launch(t) defer c.Terminate(t) clusterMustProgress(t, c.Members) @@ -66,7 +66,7 @@ func testClusterUsingDiscovery(t *testing.T, size int) { dc.Launch(t) defer dc.Terminate(t) // init discovery token space - dcc := mustNewHTTPClient(t, dc.URLs()) + dcc := mustNewHTTPClient(t, dc.URLs(), nil) dkapi := client.NewKeysAPI(dcc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", size)); err != nil { @@ -89,7 +89,7 @@ func TestTLSClusterOf3UsingDiscovery(t *testing.T) { dc.Launch(t) defer dc.Terminate(t) // init discovery token space - dcc := mustNewHTTPClient(t, dc.URLs()) + dcc := mustNewHTTPClient(t, dc.URLs(), nil) dkapi := client.NewKeysAPI(dcc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", 3)); err != nil { @@ -100,7 +100,7 @@ func TestTLSClusterOf3UsingDiscovery(t *testing.T) { c := NewClusterByConfig(t, &ClusterConfig{ Size: 3, - UsePeerTLS: true, + PeerTLS: &testTLSInfo, DiscoveryURL: dc.URL(0) + "/v2/keys"}, ) c.Launch(t) @@ -125,7 +125,7 @@ func testDoubleClusterSize(t *testing.T, size int) { func TestDoubleTLSClusterSizeOf3(t *testing.T) { defer testutil.AfterTest(t) - c := NewClusterByConfig(t, &ClusterConfig{Size: 3, UsePeerTLS: true}) + c := NewClusterByConfig(t, &ClusterConfig{Size: 3, PeerTLS: &testTLSInfo}) c.Launch(t) defer c.Terminate(t) @@ -156,7 +156,7 @@ func testDecreaseClusterSize(t *testing.T, size int) { func TestForceNewCluster(t *testing.T) { c := NewCluster(t, 3) c.Launch(t) - cc := mustNewHTTPClient(t, []string{c.Members[0].URL()}) + cc := mustNewHTTPClient(t, []string{c.Members[0].URL()}, nil) kapi := client.NewKeysAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) resp, err := kapi.Create(ctx, "/foo", "bar") @@ -183,7 +183,7 @@ func TestForceNewCluster(t *testing.T) { c.waitLeader(t, c.Members[:1]) // use new http client to init new connection - cc = mustNewHTTPClient(t, []string{c.Members[0].URL()}) + cc = mustNewHTTPClient(t, []string{c.Members[0].URL()}, nil) kapi = client.NewKeysAPI(cc) // ensure force restart keep the old data, and new cluster can make progress ctx, cancel = context.WithTimeout(context.Background(), requestTimeout) @@ -267,7 +267,7 @@ func TestIssue2904(t *testing.T) { c.Members[1].Stop(t) // send remove member-1 request to the cluster. - cc := mustNewHTTPClient(t, c.URLs()) + cc := mustNewHTTPClient(t, c.URLs(), nil) ma := client.NewMembersAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) // the proposal is not committed because member 1 is stopped, but the @@ -294,7 +294,7 @@ func TestIssue2904(t *testing.T) { // a random key first, and check the new key could be got from all client urls // of the cluster. func clusterMustProgress(t *testing.T, membs []*member) { - cc := mustNewHTTPClient(t, []string{membs[0].URL()}) + cc := mustNewHTTPClient(t, []string{membs[0].URL()}, nil) kapi := client.NewKeysAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) key := fmt.Sprintf("foo%d", rand.Int()) @@ -306,7 +306,7 @@ func clusterMustProgress(t *testing.T, membs []*member) { for i, m := range membs { u := m.URL() - mcc := mustNewHTTPClient(t, []string{u}) + mcc := mustNewHTTPClient(t, []string{u}, nil) mkapi := client.NewKeysAPI(mcc) mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout) if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(mctx); err != nil { diff --git a/integration/member_test.go b/integration/member_test.go index ae9ab57ac..4c09bdb66 100644 --- a/integration/member_test.go +++ b/integration/member_test.go @@ -83,7 +83,7 @@ func TestLaunchDuplicateMemberShouldFail(t *testing.T) { func TestSnapshotAndRestartMember(t *testing.T) { defer testutil.AfterTest(t) - m := mustNewMember(t, "snapAndRestartTest", false) + m := mustNewMember(t, "snapAndRestartTest", nil, nil) m.SnapCount = 100 m.Launch() defer m.Terminate(t) @@ -92,7 +92,7 @@ func TestSnapshotAndRestartMember(t *testing.T) { resps := make([]*client.Response, 120) var err error for i := 0; i < 120; i++ { - cc := mustNewHTTPClient(t, []string{m.URL()}) + cc := mustNewHTTPClient(t, []string{m.URL()}, nil) kapi := client.NewKeysAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) key := fmt.Sprintf("foo%d", i) @@ -106,7 +106,7 @@ func TestSnapshotAndRestartMember(t *testing.T) { m.Restart(t) for i := 0; i < 120; i++ { - cc := mustNewHTTPClient(t, []string{m.URL()}) + cc := mustNewHTTPClient(t, []string{m.URL()}, nil) kapi := client.NewKeysAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) key := fmt.Sprintf("foo%d", i) diff --git a/integration/migration_test.go b/integration/migration_test.go index 6fcaf05d4..cf3cf1a1f 100644 --- a/integration/migration_test.go +++ b/integration/migration_test.go @@ -23,7 +23,7 @@ import ( func TestUpgradeMember(t *testing.T) { defer testutil.AfterTest(t) - m := mustNewMember(t, "integration046", false) + m := mustNewMember(t, "integration046", nil, nil) cmd := exec.Command("cp", "-r", "testdata/integration046_data/conf", "testdata/integration046_data/log", "testdata/integration046_data/snapshot", m.DataDir) err := cmd.Run() if err != nil {