diff --git a/clientv3/client.go b/clientv3/client.go index aa8ad7f1c..2170b72b7 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -20,7 +20,9 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/codes" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/credentials" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/pkg/transport" ) // Client provides and manages an etcd v3 client session. @@ -36,6 +38,7 @@ type Client struct { conn *grpc.ClientConn cfg Config + creds *credentials.TransportAuthenticator mu sync.RWMutex // protects connection selection and error list errors []error // errors passed to retryConnection } @@ -53,7 +56,8 @@ type Config struct { // DialTimeout is the timeout for failing to establish a connection. DialTimeout time.Duration - // TODO TLS options + // TLS holds the client secure credentials, if any. + TLS *transport.TLSInfo } // New creates a new etcdv3 client from a given configuration. @@ -66,7 +70,7 @@ func New(cfg Config) (*Client, error) { if err != nil { return nil, err } - return newClient(conn, &cfg), nil + return newClient(conn, &cfg) } // NewFromURL creates a new etcdv3 client from a URL. @@ -75,10 +79,10 @@ func NewFromURL(url string) (*Client, error) { } // NewFromConn creates a new etcdv3 client from an established grpc Connection. -func NewFromConn(conn *grpc.ClientConn) *Client { return newClient(conn, nil) } +func NewFromConn(conn *grpc.ClientConn) *Client { return mustNewClient(conn, nil) } // Clone creates a copy of client with the old connection and new API clients. -func (c *Client) Clone() *Client { return newClient(c.conn, &c.cfg) } +func (c *Client) Clone() *Client { return mustNewClient(c.conn, &c.cfg) } // Close shuts down the client's etcd connections. func (c *Client) Close() error { @@ -99,22 +103,43 @@ func (c *Client) Errors() (errs []error) { // Dial establishes a connection for a given endpoint using the client's config func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) { - // TODO: enable grpc.WithTransportCredentials(creds) - conn, err := grpc.Dial( - endpoint, + opts := []grpc.DialOption{ grpc.WithBlock(), grpc.WithTimeout(c.cfg.DialTimeout), - grpc.WithInsecure()) + } + if c.creds != nil { + opts = append(opts, grpc.WithTransportCredentials(*c.creds)) + } else { + opts = append(opts, grpc.WithInsecure()) + } + conn, err := grpc.Dial(endpoint, opts...) if err != nil { return nil, err } return conn, nil } -func newClient(conn *grpc.ClientConn, cfg *Config) *Client { +func mustNewClient(conn *grpc.ClientConn, cfg *Config) *Client { + c, err := newClient(conn, cfg) + if err != nil { + panic("expected no error") + } + return c +} + +func newClient(conn *grpc.ClientConn, cfg *Config) (*Client, error) { if cfg == nil { cfg = &Config{RetryDialer: dialEndpointList} } + var creds *credentials.TransportAuthenticator + if cfg.TLS != nil { + tlscfg, err := cfg.TLS.ClientConfig() + if err != nil { + return nil, err + } + c := credentials.NewTLS(tlscfg) + creds = &c + } return &Client{ KV: pb.NewKVClient(conn), Lease: pb.NewLeaseClient(conn), @@ -122,11 +147,12 @@ func newClient(conn *grpc.ClientConn, cfg *Config) *Client { Cluster: pb.NewClusterClient(conn), conn: conn, cfg: *cfg, - } + creds: creds, + }, nil } // activeConnection returns the current in-use connection -func (c *Client) activeConnection() *grpc.ClientConn { +func (c *Client) ActiveConnection() *grpc.ClientConn { c.mu.RLock() defer c.mu.RUnlock() return c.conn diff --git a/clientv3/kv.go b/clientv3/kv.go index 5c252974a..2f43d0fc8 100644 --- a/clientv3/kv.go +++ b/clientv3/kv.go @@ -70,11 +70,11 @@ type kv struct { } func NewKV(c *Client) KV { - conn := c.activeConnection() + conn := c.ActiveConnection() remote := pb.NewKVClient(conn) return &kv{ - conn: c.activeConnection(), + conn: c.ActiveConnection(), remote: remote, c: c, diff --git a/etcdctlv3/command/global.go b/etcdctlv3/command/global.go index 951adc59b..00e8802de 100644 --- a/etcdctlv3/command/global.go +++ b/etcdctlv3/command/global.go @@ -17,12 +17,14 @@ package command import ( "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/pkg/transport" ) // GlobalFlags are flags that defined globally // and are inherited to all sub-commands. type GlobalFlags struct { Endpoints string + TLS transport.TLSInfo } func mustClient(cmd *cobra.Command) *clientv3.Client { @@ -30,7 +32,25 @@ func mustClient(cmd *cobra.Command) *clientv3.Client { if err != nil { ExitWithError(ExitError, err) } - client, err := clientv3.NewFromURL(endpoint) + + // set tls if any one tls option set + var cfgtls *transport.TLSInfo + tls := transport.TLSInfo{} + if tls.CertFile, err = cmd.Flags().GetString("cert"); err == nil { + cfgtls = &tls + } + if tls.KeyFile, err = cmd.Flags().GetString("key"); err == nil { + cfgtls = &tls + } + if tls.CAFile, err = cmd.Flags().GetString("cacert"); err == nil { + cfgtls = &tls + } + cfg := clientv3.Config{ + Endpoints: []string{endpoint}, + TLS: cfgtls, + } + + client, err := clientv3.New(cfg) if err != nil { ExitWithError(ExitBadConnection, err) } diff --git a/etcdctlv3/main.go b/etcdctlv3/main.go index bfcde6c41..59de85273 100644 --- a/etcdctlv3/main.go +++ b/etcdctlv3/main.go @@ -43,6 +43,10 @@ var ( func init() { rootCmd.PersistentFlags().StringVar(&globalFlags.Endpoints, "endpoint", "127.0.0.1:2378", "gRPC endpoint") + rootCmd.PersistentFlags().StringVar(&globalFlags.TLS.CertFile, "cert", "", "identify HTTPS client using this SSL certificate file") + rootCmd.PersistentFlags().StringVar(&globalFlags.TLS.KeyFile, "key", "", "identify HTTPS client using this SSL key file") + rootCmd.PersistentFlags().StringVar(&globalFlags.TLS.CAFile, "cacert", "", "verify certificates of HTTPS-enabled servers using this CA bundle") + rootCmd.AddCommand( command.NewRangeCommand(), command.NewPutCommand(), diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 4af8bf122..5861488f9 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -32,12 +32,10 @@ import ( systemdutil "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-systemd/util" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus" - "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" "github.com/coreos/etcd/discovery" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/api/v3rpc" "github.com/coreos/etcd/etcdserver/etcdhttp" - "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/cors" "github.com/coreos/etcd/pkg/fileutil" pkgioutil "github.com/coreos/etcd/pkg/ioutil" @@ -330,11 +328,16 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { if cfg.v3demo { // set up v3 demo rpc - grpcServer := grpc.NewServer() - etcdserverpb.RegisterKVServer(grpcServer, v3rpc.NewKVServer(s)) - etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s)) - etcdserverpb.RegisterLeaseServer(grpcServer, v3rpc.NewLeaseServer(s)) - etcdserverpb.RegisterClusterServer(grpcServer, v3rpc.NewClusterServer(s)) + tls := &cfg.clientTLSInfo + if cfg.clientTLSInfo.Empty() { + tls = nil + } + grpcServer, err := v3rpc.Server(s, tls) + if err != nil { + s.Stop() + <-s.StopNotify() + return nil, err + } go func() { plog.Fatal(grpcServer.Serve(v3l)) }() } diff --git a/etcdserver/api/v3rpc/grpc.go b/etcdserver/api/v3rpc/grpc.go new file mode 100644 index 000000000..7ece528e3 --- /dev/null +++ b/etcdserver/api/v3rpc/grpc.go @@ -0,0 +1,40 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package v3rpc + +import ( + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/credentials" + "github.com/coreos/etcd/etcdserver" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/pkg/transport" +) + +func Server(s *etcdserver.EtcdServer, tls *transport.TLSInfo) (*grpc.Server, error) { + var opts []grpc.ServerOption + if tls != nil { + creds, err := credentials.NewServerTLSFromFile(tls.CertFile, tls.KeyFile) + if err != nil { + return nil, err + } + opts = append(opts, grpc.Creds(creds)) + } + + grpcServer := grpc.NewServer(opts...) + pb.RegisterKVServer(grpcServer, NewKVServer(s)) + pb.RegisterWatchServer(grpcServer, NewWatchServer(s)) + pb.RegisterLeaseServer(grpcServer, NewLeaseServer(s)) + pb.RegisterClusterServer(grpcServer, NewClusterServer(s)) + return grpcServer, nil +} diff --git a/integration/cluster.go b/integration/cluster.go index 5a740a733..90503000f 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -31,13 +31,13 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/credentials" "github.com/coreos/etcd/client" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/api/v3rpc" "github.com/coreos/etcd/etcdserver/etcdhttp" - "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" @@ -56,11 +56,19 @@ var ( // integration test uses well-known ports to listen for each running member, // which ensures restarted member could listen on specific port again. nextListenPort int64 = 20000 + + testTLSInfo = transport.TLSInfo{ + KeyFile: "./fixtures/server.key.insecure", + CertFile: "./fixtures/server.crt", + TrustedCAFile: "./fixtures/ca.crt", + ClientCertAuth: true, + } ) type ClusterConfig struct { Size int - UsePeerTLS bool + PeerTLS *transport.TLSInfo + ClientTLS *transport.TLSInfo DiscoveryURL string UseV3 bool UseGRPC bool @@ -80,7 +88,7 @@ func (c *cluster) fillClusterForMembers() error { addrs := make([]string, 0) for _, m := range c.Members { scheme := "http" - if !m.PeerTLSInfo.Empty() { + if m.PeerTLSInfo != nil { scheme = "https" } for _, l := range m.PeerListeners { @@ -159,16 +167,19 @@ func (c *cluster) URLs() []string { func (c *cluster) HTTPMembers() []client.Member { ms := make([]client.Member, len(c.Members)) for i, m := range c.Members { - scheme := "http" - if !m.PeerTLSInfo.Empty() { - scheme = "https" + pScheme, cScheme := "http", "http" + if m.PeerTLSInfo != nil { + pScheme = "https" + } + if m.ClientTLSInfo != nil { + cScheme = "https" } ms[i].Name = m.Name for _, ln := range m.PeerListeners { - ms[i].PeerURLs = append(ms[i].PeerURLs, scheme+"://"+ln.Addr().String()) + ms[i].PeerURLs = append(ms[i].PeerURLs, pScheme+"://"+ln.Addr().String()) } for _, ln := range m.ClientListeners { - ms[i].ClientURLs = append(ms[i].ClientURLs, "http://"+ln.Addr().String()) + ms[i].ClientURLs = append(ms[i].ClientURLs, cScheme+"://"+ln.Addr().String()) } } return ms @@ -176,7 +187,7 @@ func (c *cluster) HTTPMembers() []client.Member { func (c *cluster) mustNewMember(t *testing.T) *member { name := c.name(rand.Int()) - m := mustNewMember(t, name, c.cfg.UsePeerTLS) + m := mustNewMember(t, name, c.cfg.PeerTLS, c.cfg.ClientTLS) m.DiscoveryURL = c.cfg.DiscoveryURL m.V3demo = c.cfg.UseV3 if c.cfg.UseGRPC { @@ -191,12 +202,12 @@ func (c *cluster) addMember(t *testing.T) { m := c.mustNewMember(t) scheme := "http" - if c.cfg.UsePeerTLS { + if c.cfg.PeerTLS != nil { scheme = "https" } // send add request to the cluster - cc := mustNewHTTPClient(t, []string{c.URL(0)}) + cc := mustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS) ma := client.NewMembersAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) peerURL := scheme + "://" + m.PeerListeners[0].Addr().String() @@ -229,7 +240,7 @@ func (c *cluster) AddMember(t *testing.T) { func (c *cluster) RemoveMember(t *testing.T, id uint64) { // send remove request to the cluster - cc := mustNewHTTPClient(t, c.URLs()) + cc := mustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS) ma := client.NewMembersAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) if err := ma.Remove(ctx, types.ID(id).String()); err != nil { @@ -264,7 +275,7 @@ func (c *cluster) Terminate(t *testing.T) { func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) { for _, u := range c.URLs() { - cc := mustNewHTTPClient(t, []string{u}) + cc := mustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS) ma := client.NewMembersAPI(cc) for { ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) @@ -356,8 +367,10 @@ type member struct { etcdserver.ServerConfig PeerListeners, ClientListeners []net.Listener grpcListener net.Listener - // inited PeerTLSInfo implies to enable peer TLS - PeerTLSInfo transport.TLSInfo + // PeerTLSInfo enables peer TLS when set + PeerTLSInfo *transport.TLSInfo + // ClientTLSInfo enables client TLS when set + ClientTLSInfo *transport.TLSInfo raftHandler *testutil.PauseableHandler s *etcdserver.EtcdServer @@ -367,25 +380,19 @@ type member struct { grpcAddr string } -// mustNewMember return an inited member with the given name. If usePeerTLS is -// true, it will set PeerTLSInfo and use https scheme to communicate between -// peers. -func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member { - var ( - testTLSInfo = transport.TLSInfo{ - KeyFile: "./fixtures/server.key.insecure", - CertFile: "./fixtures/server.crt", - TrustedCAFile: "./fixtures/ca.crt", - ClientCertAuth: true, - } - err error - ) +// mustNewMember return an inited member with the given name. If peerTLS is +// set, it will use https scheme to communicate between peers. +func mustNewMember(t *testing.T, name string, peerTLS *transport.TLSInfo, clientTLS *transport.TLSInfo) *member { + var err error m := &member{} - peerScheme := "http" - if usePeerTLS { + peerScheme, clientScheme := "http", "http" + if peerTLS != nil { peerScheme = "https" } + if clientTLS != nil { + clientScheme = "https" + } pln := newLocalListener(t) m.PeerListeners = []net.Listener{pln} @@ -393,16 +400,15 @@ func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member { if err != nil { t.Fatal(err) } - if usePeerTLS { - m.PeerTLSInfo = testTLSInfo - } + m.PeerTLSInfo = peerTLS cln := newLocalListener(t) m.ClientListeners = []net.Listener{cln} - m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()}) + m.ClientURLs, err = types.NewURLs([]string{clientScheme + "://" + cln.Addr().String()}) if err != nil { t.Fatal(err) } + m.ClientTLSInfo = clientTLS m.Name = name @@ -417,7 +423,9 @@ func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member { } m.InitialClusterToken = clusterName m.NewCluster = true - m.ServerConfig.PeerTLSInfo = m.PeerTLSInfo + if m.PeerTLSInfo != nil { + m.ServerConfig.PeerTLSInfo = *m.PeerTLSInfo + } m.ElectionTicks = electionTicks m.TickMs = uint(tickDuration / time.Millisecond) return m @@ -428,7 +436,8 @@ func (m *member) listenGRPC() error { if m.V3demo == false { return fmt.Errorf("starting grpc server without v3 configured") } - m.grpcAddr = m.Name + ".sock" + // prefix with localhost so cert has right domain + m.grpcAddr = "localhost:" + m.Name + ".sock" if err := os.RemoveAll(m.grpcAddr); err != nil { return err } @@ -449,7 +458,21 @@ func NewClientV3(m *member) (*clientv3.Client, error) { return net.Dial("unix", a) } unixdialer := grpc.WithDialer(f) - conn, err := grpc.Dial(m.grpcAddr, grpc.WithInsecure(), unixdialer) + opts := []grpc.DialOption{ + unixdialer, + grpc.WithBlock(), + grpc.WithTimeout(5 * time.Second)} + if m.ClientTLSInfo != nil { + tlscfg, err := m.ClientTLSInfo.ClientConfig() + if err != nil { + return nil, err + } + creds := credentials.NewTLS(tlscfg) + opts = append(opts, grpc.WithTransportCredentials(creds)) + } else { + opts = append(opts, grpc.WithInsecure()) + } + conn, err := grpc.Dial(m.grpcAddr, opts...) if err != nil { return nil, err } @@ -484,6 +507,7 @@ func (m *member) Clone(t *testing.T) *member { mm.InitialClusterToken = m.InitialClusterToken mm.ElectionTicks = m.ElectionTicks mm.PeerTLSInfo = m.PeerTLSInfo + mm.ClientTLSInfo = m.ClientTLSInfo return mm } @@ -504,7 +528,7 @@ func (m *member) Launch() error { Listener: ln, Config: &http.Server{Handler: m.raftHandler}, } - if m.PeerTLSInfo.Empty() { + if m.PeerTLSInfo == nil { hs.Start() } else { hs.TLS, err = m.PeerTLSInfo.ServerConfig() @@ -520,21 +544,26 @@ func (m *member) Launch() error { Listener: ln, Config: &http.Server{Handler: etcdhttp.NewClientHandler(m.s, m.ServerConfig.ReqTimeout())}, } - hs.Start() + if m.ClientTLSInfo == nil { + hs.Start() + } else { + hs.TLS, err = m.ClientTLSInfo.ServerConfig() + if err != nil { + return err + } + hs.StartTLS() + } m.hss = append(m.hss, hs) } if m.grpcListener != nil { - m.grpcServer = grpc.NewServer() - etcdserverpb.RegisterKVServer(m.grpcServer, v3rpc.NewKVServer(m.s)) - etcdserverpb.RegisterWatchServer(m.grpcServer, v3rpc.NewWatchServer(m.s)) - etcdserverpb.RegisterLeaseServer(m.grpcServer, v3rpc.NewLeaseServer(m.s)) + m.grpcServer, err = v3rpc.Server(m.s, m.ClientTLSInfo) go m.grpcServer.Serve(m.grpcListener) } return nil } func (m *member) WaitOK(t *testing.T) { - cc := mustNewHTTPClient(t, []string{m.URL()}) + cc := mustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo) kapi := client.NewKeysAPI(cc) for { ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) @@ -612,8 +641,12 @@ func (m *member) Terminate(t *testing.T) { } } -func mustNewHTTPClient(t *testing.T, eps []string) client.Client { - cfg := client.Config{Transport: mustNewTransport(t, transport.TLSInfo{}), Endpoints: eps} +func mustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client { + cfgtls := transport.TLSInfo{} + if tls != nil { + cfgtls = *tls + } + cfg := client.Config{Transport: mustNewTransport(t, cfgtls), Endpoints: eps} c, err := client.New(cfg) if err != nil { t.Fatal(err) diff --git a/integration/cluster_test.go b/integration/cluster_test.go index a99033ec6..a86222f85 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -51,7 +51,7 @@ func testCluster(t *testing.T, size int) { func TestTLSClusterOf3(t *testing.T) { defer testutil.AfterTest(t) - c := NewClusterByConfig(t, &ClusterConfig{Size: 3, UsePeerTLS: true}) + c := NewClusterByConfig(t, &ClusterConfig{Size: 3, PeerTLS: &testTLSInfo}) c.Launch(t) defer c.Terminate(t) clusterMustProgress(t, c.Members) @@ -66,7 +66,7 @@ func testClusterUsingDiscovery(t *testing.T, size int) { dc.Launch(t) defer dc.Terminate(t) // init discovery token space - dcc := mustNewHTTPClient(t, dc.URLs()) + dcc := mustNewHTTPClient(t, dc.URLs(), nil) dkapi := client.NewKeysAPI(dcc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", size)); err != nil { @@ -89,7 +89,7 @@ func TestTLSClusterOf3UsingDiscovery(t *testing.T) { dc.Launch(t) defer dc.Terminate(t) // init discovery token space - dcc := mustNewHTTPClient(t, dc.URLs()) + dcc := mustNewHTTPClient(t, dc.URLs(), nil) dkapi := client.NewKeysAPI(dcc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", 3)); err != nil { @@ -100,7 +100,7 @@ func TestTLSClusterOf3UsingDiscovery(t *testing.T) { c := NewClusterByConfig(t, &ClusterConfig{ Size: 3, - UsePeerTLS: true, + PeerTLS: &testTLSInfo, DiscoveryURL: dc.URL(0) + "/v2/keys"}, ) c.Launch(t) @@ -125,7 +125,7 @@ func testDoubleClusterSize(t *testing.T, size int) { func TestDoubleTLSClusterSizeOf3(t *testing.T) { defer testutil.AfterTest(t) - c := NewClusterByConfig(t, &ClusterConfig{Size: 3, UsePeerTLS: true}) + c := NewClusterByConfig(t, &ClusterConfig{Size: 3, PeerTLS: &testTLSInfo}) c.Launch(t) defer c.Terminate(t) @@ -156,7 +156,7 @@ func testDecreaseClusterSize(t *testing.T, size int) { func TestForceNewCluster(t *testing.T) { c := NewCluster(t, 3) c.Launch(t) - cc := mustNewHTTPClient(t, []string{c.Members[0].URL()}) + cc := mustNewHTTPClient(t, []string{c.Members[0].URL()}, nil) kapi := client.NewKeysAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) resp, err := kapi.Create(ctx, "/foo", "bar") @@ -183,7 +183,7 @@ func TestForceNewCluster(t *testing.T) { c.waitLeader(t, c.Members[:1]) // use new http client to init new connection - cc = mustNewHTTPClient(t, []string{c.Members[0].URL()}) + cc = mustNewHTTPClient(t, []string{c.Members[0].URL()}, nil) kapi = client.NewKeysAPI(cc) // ensure force restart keep the old data, and new cluster can make progress ctx, cancel = context.WithTimeout(context.Background(), requestTimeout) @@ -267,7 +267,7 @@ func TestIssue2904(t *testing.T) { c.Members[1].Stop(t) // send remove member-1 request to the cluster. - cc := mustNewHTTPClient(t, c.URLs()) + cc := mustNewHTTPClient(t, c.URLs(), nil) ma := client.NewMembersAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) // the proposal is not committed because member 1 is stopped, but the @@ -294,7 +294,7 @@ func TestIssue2904(t *testing.T) { // a random key first, and check the new key could be got from all client urls // of the cluster. func clusterMustProgress(t *testing.T, membs []*member) { - cc := mustNewHTTPClient(t, []string{membs[0].URL()}) + cc := mustNewHTTPClient(t, []string{membs[0].URL()}, nil) kapi := client.NewKeysAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) key := fmt.Sprintf("foo%d", rand.Int()) @@ -306,7 +306,7 @@ func clusterMustProgress(t *testing.T, membs []*member) { for i, m := range membs { u := m.URL() - mcc := mustNewHTTPClient(t, []string{u}) + mcc := mustNewHTTPClient(t, []string{u}, nil) mkapi := client.NewKeysAPI(mcc) mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout) if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(mctx); err != nil { diff --git a/integration/member_test.go b/integration/member_test.go index ae9ab57ac..4c09bdb66 100644 --- a/integration/member_test.go +++ b/integration/member_test.go @@ -83,7 +83,7 @@ func TestLaunchDuplicateMemberShouldFail(t *testing.T) { func TestSnapshotAndRestartMember(t *testing.T) { defer testutil.AfterTest(t) - m := mustNewMember(t, "snapAndRestartTest", false) + m := mustNewMember(t, "snapAndRestartTest", nil, nil) m.SnapCount = 100 m.Launch() defer m.Terminate(t) @@ -92,7 +92,7 @@ func TestSnapshotAndRestartMember(t *testing.T) { resps := make([]*client.Response, 120) var err error for i := 0; i < 120; i++ { - cc := mustNewHTTPClient(t, []string{m.URL()}) + cc := mustNewHTTPClient(t, []string{m.URL()}, nil) kapi := client.NewKeysAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) key := fmt.Sprintf("foo%d", i) @@ -106,7 +106,7 @@ func TestSnapshotAndRestartMember(t *testing.T) { m.Restart(t) for i := 0; i < 120; i++ { - cc := mustNewHTTPClient(t, []string{m.URL()}) + cc := mustNewHTTPClient(t, []string{m.URL()}, nil) kapi := client.NewKeysAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) key := fmt.Sprintf("foo%d", i) diff --git a/integration/migration_test.go b/integration/migration_test.go index 6fcaf05d4..cf3cf1a1f 100644 --- a/integration/migration_test.go +++ b/integration/migration_test.go @@ -23,7 +23,7 @@ import ( func TestUpgradeMember(t *testing.T) { defer testutil.AfterTest(t) - m := mustNewMember(t, "integration046", false) + m := mustNewMember(t, "integration046", nil, nil) cmd := exec.Command("cp", "-r", "testdata/integration046_data/conf", "testdata/integration046_data/log", "testdata/integration046_data/snapshot", m.DataDir) err := cmd.Run() if err != nil { diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index 067a8cc71..bcbc2d9f2 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" "github.com/coreos/etcd/etcdserver/api/v3rpc" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/lease" @@ -1416,3 +1417,99 @@ func testLeaseRemoveLeasedKey(t *testing.T, act func(*ClusterV3, int64) error) { t.Fatalf("lease removed but key remains") } } + +func newClusterV3NoClients(t *testing.T, cfg *ClusterConfig) *ClusterV3 { + cfg.UseV3 = true + cfg.UseGRPC = true + clus := &ClusterV3{cluster: NewClusterByConfig(t, cfg)} + clus.Launch(t) + return clus +} + +// TestTLSGRPCRejectInsecureClient checks that connection is rejected if server is TLS but not client. +func TestTLSGRPCRejectInsecureClient(t *testing.T) { + defer testutil.AfterTest(t) + + cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo} + clus := newClusterV3NoClients(t, &cfg) + defer clus.Terminate(t) + + // nil out TLS field so client will use an insecure connection + clus.Members[0].ClientTLSInfo = nil + client, err := NewClientV3(clus.Members[0]) + if err != nil && err != grpc.ErrClientConnTimeout { + t.Fatalf("unexpected error (%v)", err) + } else if client == nil { + // Ideally, no client would be returned. However, grpc will + // return a connection without trying to handshake first so + // the connection appears OK. + return + } + defer client.Close() + + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + conn := client.ActiveConnection() + st, err := conn.State() + if err != nil { + t.Fatal(err) + } else if st != grpc.Ready { + t.Fatalf("expected Ready, got %v", st) + } + + // rpc will fail to handshake, triggering a connection state change + donec := make(chan error, 1) + go func() { + reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")} + _, perr := client.KV.Put(ctx, reqput) + donec <- perr + }() + + st, err = conn.WaitForStateChange(ctx, st) + if err != nil { + t.Fatalf("unexpected error waiting for change (%v)", err) + } else if st != grpc.Connecting { + t.Fatalf("expected connecting state, got %v", st) + } + + cancel() + if perr := <-donec; perr == nil { + t.Fatalf("expected client error on put") + } +} + +// TestTLSGRPCRejectSecureClient checks that connection is rejected if client is TLS but not server. +func TestTLSGRPCRejectSecureClient(t *testing.T) { + defer testutil.AfterTest(t) + + cfg := ClusterConfig{Size: 3} + clus := newClusterV3NoClients(t, &cfg) + defer clus.Terminate(t) + + clus.Members[0].ClientTLSInfo = &testTLSInfo + client, err := NewClientV3(clus.Members[0]) + if client != nil || err == nil { + t.Fatalf("expected no client") + } else if err != grpc.ErrClientConnTimeout { + t.Fatalf("unexpected error (%v)", err) + } +} + +// TestTLSGRPCAcceptSecureAll checks that connection is accepted if both client and server are TLS +func TestTLSGRPCAcceptSecureAll(t *testing.T) { + defer testutil.AfterTest(t) + + cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo} + clus := newClusterV3NoClients(t, &cfg) + defer clus.Terminate(t) + + client, err := NewClientV3(clus.Members[0]) + if err != nil { + t.Fatalf("expected tls client (%v)", err) + } + defer client.Close() + + reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")} + if _, err := client.KV.Put(context.TODO(), reqput); err != nil { + t.Fatalf("unexpected error on put over tls (%v)", err) + } +} diff --git a/tools/benchmark/cmd/root.go b/tools/benchmark/cmd/root.go index c4a292b76..0b2fc725c 100644 --- a/tools/benchmark/cmd/root.go +++ b/tools/benchmark/cmd/root.go @@ -19,6 +19,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/github.com/cheggaaa/pb" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" + "github.com/coreos/etcd/pkg/transport" ) // This represents the base command when called without any subcommands @@ -40,6 +41,8 @@ var ( results chan result wg sync.WaitGroup + tls transport.TLSInfo + cpuProfPath string memProfPath string ) @@ -48,4 +51,8 @@ func init() { RootCmd.PersistentFlags().StringVar(&endpoints, "endpoint", "127.0.0.1:2378", "comma-separated gRPC endpoints") RootCmd.PersistentFlags().UintVar(&totalConns, "conns", 1, "Total number of gRPC connections") RootCmd.PersistentFlags().UintVar(&totalClients, "clients", 1, "Total number of gRPC clients") + + RootCmd.PersistentFlags().StringVar(&tls.CertFile, "cert", "", "identify HTTPS client using this SSL certificate file") + RootCmd.PersistentFlags().StringVar(&tls.KeyFile, "key", "", "identify HTTPS client using this SSL key file") + RootCmd.PersistentFlags().StringVar(&tls.CAFile, "cacert", "", "verify certificates of HTTPS-enabled servers using this CA bundle") } diff --git a/tools/benchmark/cmd/util.go b/tools/benchmark/cmd/util.go index 13de08fd7..782c1efa1 100644 --- a/tools/benchmark/cmd/util.go +++ b/tools/benchmark/cmd/util.go @@ -33,7 +33,16 @@ func mustCreateConn() *clientv3.Client { eps := strings.Split(endpoints, ",") endpoint := eps[dialTotal%len(eps)] dialTotal++ - client, err := clientv3.NewFromURL(endpoint) + cfgtls := &tls + if cfgtls.Empty() { + cfgtls = nil + } + client, err := clientv3.New( + clientv3.Config{ + Endpoints: []string{endpoint}, + TLS: cfgtls, + }, + ) if err != nil { fmt.Fprintf(os.Stderr, "dial error: %v\n", err) os.Exit(1)