mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #4349 from heyitsanthony/v3-client-conntls
V3 client TLS
This commit is contained in:
commit
f6031b9d11
@ -20,7 +20,9 @@ import (
|
|||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
|
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/codes"
|
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/codes"
|
||||||
|
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/credentials"
|
||||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||||
|
"github.com/coreos/etcd/pkg/transport"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Client provides and manages an etcd v3 client session.
|
// Client provides and manages an etcd v3 client session.
|
||||||
@ -36,6 +38,7 @@ type Client struct {
|
|||||||
|
|
||||||
conn *grpc.ClientConn
|
conn *grpc.ClientConn
|
||||||
cfg Config
|
cfg Config
|
||||||
|
creds *credentials.TransportAuthenticator
|
||||||
mu sync.RWMutex // protects connection selection and error list
|
mu sync.RWMutex // protects connection selection and error list
|
||||||
errors []error // errors passed to retryConnection
|
errors []error // errors passed to retryConnection
|
||||||
}
|
}
|
||||||
@ -53,7 +56,8 @@ type Config struct {
|
|||||||
// DialTimeout is the timeout for failing to establish a connection.
|
// DialTimeout is the timeout for failing to establish a connection.
|
||||||
DialTimeout time.Duration
|
DialTimeout time.Duration
|
||||||
|
|
||||||
// TODO TLS options
|
// TLS holds the client secure credentials, if any.
|
||||||
|
TLS *transport.TLSInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new etcdv3 client from a given configuration.
|
// New creates a new etcdv3 client from a given configuration.
|
||||||
@ -66,7 +70,7 @@ func New(cfg Config) (*Client, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return newClient(conn, &cfg), nil
|
return newClient(conn, &cfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFromURL creates a new etcdv3 client from a URL.
|
// NewFromURL creates a new etcdv3 client from a URL.
|
||||||
@ -75,10 +79,10 @@ func NewFromURL(url string) (*Client, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewFromConn creates a new etcdv3 client from an established grpc Connection.
|
// NewFromConn creates a new etcdv3 client from an established grpc Connection.
|
||||||
func NewFromConn(conn *grpc.ClientConn) *Client { return newClient(conn, nil) }
|
func NewFromConn(conn *grpc.ClientConn) *Client { return mustNewClient(conn, nil) }
|
||||||
|
|
||||||
// Clone creates a copy of client with the old connection and new API clients.
|
// Clone creates a copy of client with the old connection and new API clients.
|
||||||
func (c *Client) Clone() *Client { return newClient(c.conn, &c.cfg) }
|
func (c *Client) Clone() *Client { return mustNewClient(c.conn, &c.cfg) }
|
||||||
|
|
||||||
// Close shuts down the client's etcd connections.
|
// Close shuts down the client's etcd connections.
|
||||||
func (c *Client) Close() error {
|
func (c *Client) Close() error {
|
||||||
@ -99,22 +103,43 @@ func (c *Client) Errors() (errs []error) {
|
|||||||
|
|
||||||
// Dial establishes a connection for a given endpoint using the client's config
|
// Dial establishes a connection for a given endpoint using the client's config
|
||||||
func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) {
|
func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) {
|
||||||
// TODO: enable grpc.WithTransportCredentials(creds)
|
opts := []grpc.DialOption{
|
||||||
conn, err := grpc.Dial(
|
|
||||||
endpoint,
|
|
||||||
grpc.WithBlock(),
|
grpc.WithBlock(),
|
||||||
grpc.WithTimeout(c.cfg.DialTimeout),
|
grpc.WithTimeout(c.cfg.DialTimeout),
|
||||||
grpc.WithInsecure())
|
}
|
||||||
|
if c.creds != nil {
|
||||||
|
opts = append(opts, grpc.WithTransportCredentials(*c.creds))
|
||||||
|
} else {
|
||||||
|
opts = append(opts, grpc.WithInsecure())
|
||||||
|
}
|
||||||
|
conn, err := grpc.Dial(endpoint, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return conn, nil
|
return conn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newClient(conn *grpc.ClientConn, cfg *Config) *Client {
|
func mustNewClient(conn *grpc.ClientConn, cfg *Config) *Client {
|
||||||
|
c, err := newClient(conn, cfg)
|
||||||
|
if err != nil {
|
||||||
|
panic("expected no error")
|
||||||
|
}
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
func newClient(conn *grpc.ClientConn, cfg *Config) (*Client, error) {
|
||||||
if cfg == nil {
|
if cfg == nil {
|
||||||
cfg = &Config{RetryDialer: dialEndpointList}
|
cfg = &Config{RetryDialer: dialEndpointList}
|
||||||
}
|
}
|
||||||
|
var creds *credentials.TransportAuthenticator
|
||||||
|
if cfg.TLS != nil {
|
||||||
|
tlscfg, err := cfg.TLS.ClientConfig()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
c := credentials.NewTLS(tlscfg)
|
||||||
|
creds = &c
|
||||||
|
}
|
||||||
return &Client{
|
return &Client{
|
||||||
KV: pb.NewKVClient(conn),
|
KV: pb.NewKVClient(conn),
|
||||||
Lease: pb.NewLeaseClient(conn),
|
Lease: pb.NewLeaseClient(conn),
|
||||||
@ -122,11 +147,12 @@ func newClient(conn *grpc.ClientConn, cfg *Config) *Client {
|
|||||||
Cluster: pb.NewClusterClient(conn),
|
Cluster: pb.NewClusterClient(conn),
|
||||||
conn: conn,
|
conn: conn,
|
||||||
cfg: *cfg,
|
cfg: *cfg,
|
||||||
}
|
creds: creds,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// activeConnection returns the current in-use connection
|
// activeConnection returns the current in-use connection
|
||||||
func (c *Client) activeConnection() *grpc.ClientConn {
|
func (c *Client) ActiveConnection() *grpc.ClientConn {
|
||||||
c.mu.RLock()
|
c.mu.RLock()
|
||||||
defer c.mu.RUnlock()
|
defer c.mu.RUnlock()
|
||||||
return c.conn
|
return c.conn
|
||||||
|
@ -70,11 +70,11 @@ type kv struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewKV(c *Client) KV {
|
func NewKV(c *Client) KV {
|
||||||
conn := c.activeConnection()
|
conn := c.ActiveConnection()
|
||||||
remote := pb.NewKVClient(conn)
|
remote := pb.NewKVClient(conn)
|
||||||
|
|
||||||
return &kv{
|
return &kv{
|
||||||
conn: c.activeConnection(),
|
conn: c.ActiveConnection(),
|
||||||
remote: remote,
|
remote: remote,
|
||||||
|
|
||||||
c: c,
|
c: c,
|
||||||
|
@ -17,12 +17,14 @@ package command
|
|||||||
import (
|
import (
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
|
||||||
"github.com/coreos/etcd/clientv3"
|
"github.com/coreos/etcd/clientv3"
|
||||||
|
"github.com/coreos/etcd/pkg/transport"
|
||||||
)
|
)
|
||||||
|
|
||||||
// GlobalFlags are flags that defined globally
|
// GlobalFlags are flags that defined globally
|
||||||
// and are inherited to all sub-commands.
|
// and are inherited to all sub-commands.
|
||||||
type GlobalFlags struct {
|
type GlobalFlags struct {
|
||||||
Endpoints string
|
Endpoints string
|
||||||
|
TLS transport.TLSInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
func mustClient(cmd *cobra.Command) *clientv3.Client {
|
func mustClient(cmd *cobra.Command) *clientv3.Client {
|
||||||
@ -30,7 +32,25 @@ func mustClient(cmd *cobra.Command) *clientv3.Client {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
ExitWithError(ExitError, err)
|
ExitWithError(ExitError, err)
|
||||||
}
|
}
|
||||||
client, err := clientv3.NewFromURL(endpoint)
|
|
||||||
|
// set tls if any one tls option set
|
||||||
|
var cfgtls *transport.TLSInfo
|
||||||
|
tls := transport.TLSInfo{}
|
||||||
|
if tls.CertFile, err = cmd.Flags().GetString("cert"); err == nil {
|
||||||
|
cfgtls = &tls
|
||||||
|
}
|
||||||
|
if tls.KeyFile, err = cmd.Flags().GetString("key"); err == nil {
|
||||||
|
cfgtls = &tls
|
||||||
|
}
|
||||||
|
if tls.CAFile, err = cmd.Flags().GetString("cacert"); err == nil {
|
||||||
|
cfgtls = &tls
|
||||||
|
}
|
||||||
|
cfg := clientv3.Config{
|
||||||
|
Endpoints: []string{endpoint},
|
||||||
|
TLS: cfgtls,
|
||||||
|
}
|
||||||
|
|
||||||
|
client, err := clientv3.New(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ExitWithError(ExitBadConnection, err)
|
ExitWithError(ExitBadConnection, err)
|
||||||
}
|
}
|
||||||
|
@ -43,6 +43,10 @@ var (
|
|||||||
func init() {
|
func init() {
|
||||||
rootCmd.PersistentFlags().StringVar(&globalFlags.Endpoints, "endpoint", "127.0.0.1:2378", "gRPC endpoint")
|
rootCmd.PersistentFlags().StringVar(&globalFlags.Endpoints, "endpoint", "127.0.0.1:2378", "gRPC endpoint")
|
||||||
|
|
||||||
|
rootCmd.PersistentFlags().StringVar(&globalFlags.TLS.CertFile, "cert", "", "identify HTTPS client using this SSL certificate file")
|
||||||
|
rootCmd.PersistentFlags().StringVar(&globalFlags.TLS.KeyFile, "key", "", "identify HTTPS client using this SSL key file")
|
||||||
|
rootCmd.PersistentFlags().StringVar(&globalFlags.TLS.CAFile, "cacert", "", "verify certificates of HTTPS-enabled servers using this CA bundle")
|
||||||
|
|
||||||
rootCmd.AddCommand(
|
rootCmd.AddCommand(
|
||||||
command.NewRangeCommand(),
|
command.NewRangeCommand(),
|
||||||
command.NewPutCommand(),
|
command.NewPutCommand(),
|
||||||
|
@ -32,12 +32,10 @@ import (
|
|||||||
systemdutil "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-systemd/util"
|
systemdutil "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-systemd/util"
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
|
|
||||||
"github.com/coreos/etcd/discovery"
|
"github.com/coreos/etcd/discovery"
|
||||||
"github.com/coreos/etcd/etcdserver"
|
"github.com/coreos/etcd/etcdserver"
|
||||||
"github.com/coreos/etcd/etcdserver/api/v3rpc"
|
"github.com/coreos/etcd/etcdserver/api/v3rpc"
|
||||||
"github.com/coreos/etcd/etcdserver/etcdhttp"
|
"github.com/coreos/etcd/etcdserver/etcdhttp"
|
||||||
"github.com/coreos/etcd/etcdserver/etcdserverpb"
|
|
||||||
"github.com/coreos/etcd/pkg/cors"
|
"github.com/coreos/etcd/pkg/cors"
|
||||||
"github.com/coreos/etcd/pkg/fileutil"
|
"github.com/coreos/etcd/pkg/fileutil"
|
||||||
pkgioutil "github.com/coreos/etcd/pkg/ioutil"
|
pkgioutil "github.com/coreos/etcd/pkg/ioutil"
|
||||||
@ -330,11 +328,16 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
|||||||
|
|
||||||
if cfg.v3demo {
|
if cfg.v3demo {
|
||||||
// set up v3 demo rpc
|
// set up v3 demo rpc
|
||||||
grpcServer := grpc.NewServer()
|
tls := &cfg.clientTLSInfo
|
||||||
etcdserverpb.RegisterKVServer(grpcServer, v3rpc.NewKVServer(s))
|
if cfg.clientTLSInfo.Empty() {
|
||||||
etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s))
|
tls = nil
|
||||||
etcdserverpb.RegisterLeaseServer(grpcServer, v3rpc.NewLeaseServer(s))
|
}
|
||||||
etcdserverpb.RegisterClusterServer(grpcServer, v3rpc.NewClusterServer(s))
|
grpcServer, err := v3rpc.Server(s, tls)
|
||||||
|
if err != nil {
|
||||||
|
s.Stop()
|
||||||
|
<-s.StopNotify()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
go func() { plog.Fatal(grpcServer.Serve(v3l)) }()
|
go func() { plog.Fatal(grpcServer.Serve(v3l)) }()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
40
etcdserver/api/v3rpc/grpc.go
Normal file
40
etcdserver/api/v3rpc/grpc.go
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
// Copyright 2016 CoreOS, Inc.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
package v3rpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
|
||||||
|
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/credentials"
|
||||||
|
"github.com/coreos/etcd/etcdserver"
|
||||||
|
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||||
|
"github.com/coreos/etcd/pkg/transport"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Server(s *etcdserver.EtcdServer, tls *transport.TLSInfo) (*grpc.Server, error) {
|
||||||
|
var opts []grpc.ServerOption
|
||||||
|
if tls != nil {
|
||||||
|
creds, err := credentials.NewServerTLSFromFile(tls.CertFile, tls.KeyFile)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
opts = append(opts, grpc.Creds(creds))
|
||||||
|
}
|
||||||
|
|
||||||
|
grpcServer := grpc.NewServer(opts...)
|
||||||
|
pb.RegisterKVServer(grpcServer, NewKVServer(s))
|
||||||
|
pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
|
||||||
|
pb.RegisterLeaseServer(grpcServer, NewLeaseServer(s))
|
||||||
|
pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
|
||||||
|
return grpcServer, nil
|
||||||
|
}
|
@ -31,13 +31,13 @@ import (
|
|||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
|
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
|
||||||
|
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/credentials"
|
||||||
|
|
||||||
"github.com/coreos/etcd/client"
|
"github.com/coreos/etcd/client"
|
||||||
"github.com/coreos/etcd/clientv3"
|
"github.com/coreos/etcd/clientv3"
|
||||||
"github.com/coreos/etcd/etcdserver"
|
"github.com/coreos/etcd/etcdserver"
|
||||||
"github.com/coreos/etcd/etcdserver/api/v3rpc"
|
"github.com/coreos/etcd/etcdserver/api/v3rpc"
|
||||||
"github.com/coreos/etcd/etcdserver/etcdhttp"
|
"github.com/coreos/etcd/etcdserver/etcdhttp"
|
||||||
"github.com/coreos/etcd/etcdserver/etcdserverpb"
|
|
||||||
"github.com/coreos/etcd/pkg/testutil"
|
"github.com/coreos/etcd/pkg/testutil"
|
||||||
"github.com/coreos/etcd/pkg/transport"
|
"github.com/coreos/etcd/pkg/transport"
|
||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
@ -56,11 +56,19 @@ var (
|
|||||||
// integration test uses well-known ports to listen for each running member,
|
// integration test uses well-known ports to listen for each running member,
|
||||||
// which ensures restarted member could listen on specific port again.
|
// which ensures restarted member could listen on specific port again.
|
||||||
nextListenPort int64 = 20000
|
nextListenPort int64 = 20000
|
||||||
|
|
||||||
|
testTLSInfo = transport.TLSInfo{
|
||||||
|
KeyFile: "./fixtures/server.key.insecure",
|
||||||
|
CertFile: "./fixtures/server.crt",
|
||||||
|
TrustedCAFile: "./fixtures/ca.crt",
|
||||||
|
ClientCertAuth: true,
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
type ClusterConfig struct {
|
type ClusterConfig struct {
|
||||||
Size int
|
Size int
|
||||||
UsePeerTLS bool
|
PeerTLS *transport.TLSInfo
|
||||||
|
ClientTLS *transport.TLSInfo
|
||||||
DiscoveryURL string
|
DiscoveryURL string
|
||||||
UseV3 bool
|
UseV3 bool
|
||||||
UseGRPC bool
|
UseGRPC bool
|
||||||
@ -80,7 +88,7 @@ func (c *cluster) fillClusterForMembers() error {
|
|||||||
addrs := make([]string, 0)
|
addrs := make([]string, 0)
|
||||||
for _, m := range c.Members {
|
for _, m := range c.Members {
|
||||||
scheme := "http"
|
scheme := "http"
|
||||||
if !m.PeerTLSInfo.Empty() {
|
if m.PeerTLSInfo != nil {
|
||||||
scheme = "https"
|
scheme = "https"
|
||||||
}
|
}
|
||||||
for _, l := range m.PeerListeners {
|
for _, l := range m.PeerListeners {
|
||||||
@ -159,16 +167,19 @@ func (c *cluster) URLs() []string {
|
|||||||
func (c *cluster) HTTPMembers() []client.Member {
|
func (c *cluster) HTTPMembers() []client.Member {
|
||||||
ms := make([]client.Member, len(c.Members))
|
ms := make([]client.Member, len(c.Members))
|
||||||
for i, m := range c.Members {
|
for i, m := range c.Members {
|
||||||
scheme := "http"
|
pScheme, cScheme := "http", "http"
|
||||||
if !m.PeerTLSInfo.Empty() {
|
if m.PeerTLSInfo != nil {
|
||||||
scheme = "https"
|
pScheme = "https"
|
||||||
|
}
|
||||||
|
if m.ClientTLSInfo != nil {
|
||||||
|
cScheme = "https"
|
||||||
}
|
}
|
||||||
ms[i].Name = m.Name
|
ms[i].Name = m.Name
|
||||||
for _, ln := range m.PeerListeners {
|
for _, ln := range m.PeerListeners {
|
||||||
ms[i].PeerURLs = append(ms[i].PeerURLs, scheme+"://"+ln.Addr().String())
|
ms[i].PeerURLs = append(ms[i].PeerURLs, pScheme+"://"+ln.Addr().String())
|
||||||
}
|
}
|
||||||
for _, ln := range m.ClientListeners {
|
for _, ln := range m.ClientListeners {
|
||||||
ms[i].ClientURLs = append(ms[i].ClientURLs, "http://"+ln.Addr().String())
|
ms[i].ClientURLs = append(ms[i].ClientURLs, cScheme+"://"+ln.Addr().String())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ms
|
return ms
|
||||||
@ -176,7 +187,7 @@ func (c *cluster) HTTPMembers() []client.Member {
|
|||||||
|
|
||||||
func (c *cluster) mustNewMember(t *testing.T) *member {
|
func (c *cluster) mustNewMember(t *testing.T) *member {
|
||||||
name := c.name(rand.Int())
|
name := c.name(rand.Int())
|
||||||
m := mustNewMember(t, name, c.cfg.UsePeerTLS)
|
m := mustNewMember(t, name, c.cfg.PeerTLS, c.cfg.ClientTLS)
|
||||||
m.DiscoveryURL = c.cfg.DiscoveryURL
|
m.DiscoveryURL = c.cfg.DiscoveryURL
|
||||||
m.V3demo = c.cfg.UseV3
|
m.V3demo = c.cfg.UseV3
|
||||||
if c.cfg.UseGRPC {
|
if c.cfg.UseGRPC {
|
||||||
@ -191,12 +202,12 @@ func (c *cluster) addMember(t *testing.T) {
|
|||||||
m := c.mustNewMember(t)
|
m := c.mustNewMember(t)
|
||||||
|
|
||||||
scheme := "http"
|
scheme := "http"
|
||||||
if c.cfg.UsePeerTLS {
|
if c.cfg.PeerTLS != nil {
|
||||||
scheme = "https"
|
scheme = "https"
|
||||||
}
|
}
|
||||||
|
|
||||||
// send add request to the cluster
|
// send add request to the cluster
|
||||||
cc := mustNewHTTPClient(t, []string{c.URL(0)})
|
cc := mustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS)
|
||||||
ma := client.NewMembersAPI(cc)
|
ma := client.NewMembersAPI(cc)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
peerURL := scheme + "://" + m.PeerListeners[0].Addr().String()
|
peerURL := scheme + "://" + m.PeerListeners[0].Addr().String()
|
||||||
@ -229,7 +240,7 @@ func (c *cluster) AddMember(t *testing.T) {
|
|||||||
|
|
||||||
func (c *cluster) RemoveMember(t *testing.T, id uint64) {
|
func (c *cluster) RemoveMember(t *testing.T, id uint64) {
|
||||||
// send remove request to the cluster
|
// send remove request to the cluster
|
||||||
cc := mustNewHTTPClient(t, c.URLs())
|
cc := mustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS)
|
||||||
ma := client.NewMembersAPI(cc)
|
ma := client.NewMembersAPI(cc)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
if err := ma.Remove(ctx, types.ID(id).String()); err != nil {
|
if err := ma.Remove(ctx, types.ID(id).String()); err != nil {
|
||||||
@ -264,7 +275,7 @@ func (c *cluster) Terminate(t *testing.T) {
|
|||||||
|
|
||||||
func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {
|
func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {
|
||||||
for _, u := range c.URLs() {
|
for _, u := range c.URLs() {
|
||||||
cc := mustNewHTTPClient(t, []string{u})
|
cc := mustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS)
|
||||||
ma := client.NewMembersAPI(cc)
|
ma := client.NewMembersAPI(cc)
|
||||||
for {
|
for {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
@ -356,8 +367,10 @@ type member struct {
|
|||||||
etcdserver.ServerConfig
|
etcdserver.ServerConfig
|
||||||
PeerListeners, ClientListeners []net.Listener
|
PeerListeners, ClientListeners []net.Listener
|
||||||
grpcListener net.Listener
|
grpcListener net.Listener
|
||||||
// inited PeerTLSInfo implies to enable peer TLS
|
// PeerTLSInfo enables peer TLS when set
|
||||||
PeerTLSInfo transport.TLSInfo
|
PeerTLSInfo *transport.TLSInfo
|
||||||
|
// ClientTLSInfo enables client TLS when set
|
||||||
|
ClientTLSInfo *transport.TLSInfo
|
||||||
|
|
||||||
raftHandler *testutil.PauseableHandler
|
raftHandler *testutil.PauseableHandler
|
||||||
s *etcdserver.EtcdServer
|
s *etcdserver.EtcdServer
|
||||||
@ -367,25 +380,19 @@ type member struct {
|
|||||||
grpcAddr string
|
grpcAddr string
|
||||||
}
|
}
|
||||||
|
|
||||||
// mustNewMember return an inited member with the given name. If usePeerTLS is
|
// mustNewMember return an inited member with the given name. If peerTLS is
|
||||||
// true, it will set PeerTLSInfo and use https scheme to communicate between
|
// set, it will use https scheme to communicate between peers.
|
||||||
// peers.
|
func mustNewMember(t *testing.T, name string, peerTLS *transport.TLSInfo, clientTLS *transport.TLSInfo) *member {
|
||||||
func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member {
|
var err error
|
||||||
var (
|
|
||||||
testTLSInfo = transport.TLSInfo{
|
|
||||||
KeyFile: "./fixtures/server.key.insecure",
|
|
||||||
CertFile: "./fixtures/server.crt",
|
|
||||||
TrustedCAFile: "./fixtures/ca.crt",
|
|
||||||
ClientCertAuth: true,
|
|
||||||
}
|
|
||||||
err error
|
|
||||||
)
|
|
||||||
m := &member{}
|
m := &member{}
|
||||||
|
|
||||||
peerScheme := "http"
|
peerScheme, clientScheme := "http", "http"
|
||||||
if usePeerTLS {
|
if peerTLS != nil {
|
||||||
peerScheme = "https"
|
peerScheme = "https"
|
||||||
}
|
}
|
||||||
|
if clientTLS != nil {
|
||||||
|
clientScheme = "https"
|
||||||
|
}
|
||||||
|
|
||||||
pln := newLocalListener(t)
|
pln := newLocalListener(t)
|
||||||
m.PeerListeners = []net.Listener{pln}
|
m.PeerListeners = []net.Listener{pln}
|
||||||
@ -393,16 +400,15 @@ func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if usePeerTLS {
|
m.PeerTLSInfo = peerTLS
|
||||||
m.PeerTLSInfo = testTLSInfo
|
|
||||||
}
|
|
||||||
|
|
||||||
cln := newLocalListener(t)
|
cln := newLocalListener(t)
|
||||||
m.ClientListeners = []net.Listener{cln}
|
m.ClientListeners = []net.Listener{cln}
|
||||||
m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()})
|
m.ClientURLs, err = types.NewURLs([]string{clientScheme + "://" + cln.Addr().String()})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
m.ClientTLSInfo = clientTLS
|
||||||
|
|
||||||
m.Name = name
|
m.Name = name
|
||||||
|
|
||||||
@ -417,7 +423,9 @@ func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member {
|
|||||||
}
|
}
|
||||||
m.InitialClusterToken = clusterName
|
m.InitialClusterToken = clusterName
|
||||||
m.NewCluster = true
|
m.NewCluster = true
|
||||||
m.ServerConfig.PeerTLSInfo = m.PeerTLSInfo
|
if m.PeerTLSInfo != nil {
|
||||||
|
m.ServerConfig.PeerTLSInfo = *m.PeerTLSInfo
|
||||||
|
}
|
||||||
m.ElectionTicks = electionTicks
|
m.ElectionTicks = electionTicks
|
||||||
m.TickMs = uint(tickDuration / time.Millisecond)
|
m.TickMs = uint(tickDuration / time.Millisecond)
|
||||||
return m
|
return m
|
||||||
@ -428,7 +436,8 @@ func (m *member) listenGRPC() error {
|
|||||||
if m.V3demo == false {
|
if m.V3demo == false {
|
||||||
return fmt.Errorf("starting grpc server without v3 configured")
|
return fmt.Errorf("starting grpc server without v3 configured")
|
||||||
}
|
}
|
||||||
m.grpcAddr = m.Name + ".sock"
|
// prefix with localhost so cert has right domain
|
||||||
|
m.grpcAddr = "localhost:" + m.Name + ".sock"
|
||||||
if err := os.RemoveAll(m.grpcAddr); err != nil {
|
if err := os.RemoveAll(m.grpcAddr); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -449,7 +458,21 @@ func NewClientV3(m *member) (*clientv3.Client, error) {
|
|||||||
return net.Dial("unix", a)
|
return net.Dial("unix", a)
|
||||||
}
|
}
|
||||||
unixdialer := grpc.WithDialer(f)
|
unixdialer := grpc.WithDialer(f)
|
||||||
conn, err := grpc.Dial(m.grpcAddr, grpc.WithInsecure(), unixdialer)
|
opts := []grpc.DialOption{
|
||||||
|
unixdialer,
|
||||||
|
grpc.WithBlock(),
|
||||||
|
grpc.WithTimeout(5 * time.Second)}
|
||||||
|
if m.ClientTLSInfo != nil {
|
||||||
|
tlscfg, err := m.ClientTLSInfo.ClientConfig()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
creds := credentials.NewTLS(tlscfg)
|
||||||
|
opts = append(opts, grpc.WithTransportCredentials(creds))
|
||||||
|
} else {
|
||||||
|
opts = append(opts, grpc.WithInsecure())
|
||||||
|
}
|
||||||
|
conn, err := grpc.Dial(m.grpcAddr, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -484,6 +507,7 @@ func (m *member) Clone(t *testing.T) *member {
|
|||||||
mm.InitialClusterToken = m.InitialClusterToken
|
mm.InitialClusterToken = m.InitialClusterToken
|
||||||
mm.ElectionTicks = m.ElectionTicks
|
mm.ElectionTicks = m.ElectionTicks
|
||||||
mm.PeerTLSInfo = m.PeerTLSInfo
|
mm.PeerTLSInfo = m.PeerTLSInfo
|
||||||
|
mm.ClientTLSInfo = m.ClientTLSInfo
|
||||||
return mm
|
return mm
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -504,7 +528,7 @@ func (m *member) Launch() error {
|
|||||||
Listener: ln,
|
Listener: ln,
|
||||||
Config: &http.Server{Handler: m.raftHandler},
|
Config: &http.Server{Handler: m.raftHandler},
|
||||||
}
|
}
|
||||||
if m.PeerTLSInfo.Empty() {
|
if m.PeerTLSInfo == nil {
|
||||||
hs.Start()
|
hs.Start()
|
||||||
} else {
|
} else {
|
||||||
hs.TLS, err = m.PeerTLSInfo.ServerConfig()
|
hs.TLS, err = m.PeerTLSInfo.ServerConfig()
|
||||||
@ -520,21 +544,26 @@ func (m *member) Launch() error {
|
|||||||
Listener: ln,
|
Listener: ln,
|
||||||
Config: &http.Server{Handler: etcdhttp.NewClientHandler(m.s, m.ServerConfig.ReqTimeout())},
|
Config: &http.Server{Handler: etcdhttp.NewClientHandler(m.s, m.ServerConfig.ReqTimeout())},
|
||||||
}
|
}
|
||||||
hs.Start()
|
if m.ClientTLSInfo == nil {
|
||||||
|
hs.Start()
|
||||||
|
} else {
|
||||||
|
hs.TLS, err = m.ClientTLSInfo.ServerConfig()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
hs.StartTLS()
|
||||||
|
}
|
||||||
m.hss = append(m.hss, hs)
|
m.hss = append(m.hss, hs)
|
||||||
}
|
}
|
||||||
if m.grpcListener != nil {
|
if m.grpcListener != nil {
|
||||||
m.grpcServer = grpc.NewServer()
|
m.grpcServer, err = v3rpc.Server(m.s, m.ClientTLSInfo)
|
||||||
etcdserverpb.RegisterKVServer(m.grpcServer, v3rpc.NewKVServer(m.s))
|
|
||||||
etcdserverpb.RegisterWatchServer(m.grpcServer, v3rpc.NewWatchServer(m.s))
|
|
||||||
etcdserverpb.RegisterLeaseServer(m.grpcServer, v3rpc.NewLeaseServer(m.s))
|
|
||||||
go m.grpcServer.Serve(m.grpcListener)
|
go m.grpcServer.Serve(m.grpcListener)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *member) WaitOK(t *testing.T) {
|
func (m *member) WaitOK(t *testing.T) {
|
||||||
cc := mustNewHTTPClient(t, []string{m.URL()})
|
cc := mustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo)
|
||||||
kapi := client.NewKeysAPI(cc)
|
kapi := client.NewKeysAPI(cc)
|
||||||
for {
|
for {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
@ -612,8 +641,12 @@ func (m *member) Terminate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func mustNewHTTPClient(t *testing.T, eps []string) client.Client {
|
func mustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client {
|
||||||
cfg := client.Config{Transport: mustNewTransport(t, transport.TLSInfo{}), Endpoints: eps}
|
cfgtls := transport.TLSInfo{}
|
||||||
|
if tls != nil {
|
||||||
|
cfgtls = *tls
|
||||||
|
}
|
||||||
|
cfg := client.Config{Transport: mustNewTransport(t, cfgtls), Endpoints: eps}
|
||||||
c, err := client.New(cfg)
|
c, err := client.New(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -51,7 +51,7 @@ func testCluster(t *testing.T, size int) {
|
|||||||
|
|
||||||
func TestTLSClusterOf3(t *testing.T) {
|
func TestTLSClusterOf3(t *testing.T) {
|
||||||
defer testutil.AfterTest(t)
|
defer testutil.AfterTest(t)
|
||||||
c := NewClusterByConfig(t, &ClusterConfig{Size: 3, UsePeerTLS: true})
|
c := NewClusterByConfig(t, &ClusterConfig{Size: 3, PeerTLS: &testTLSInfo})
|
||||||
c.Launch(t)
|
c.Launch(t)
|
||||||
defer c.Terminate(t)
|
defer c.Terminate(t)
|
||||||
clusterMustProgress(t, c.Members)
|
clusterMustProgress(t, c.Members)
|
||||||
@ -66,7 +66,7 @@ func testClusterUsingDiscovery(t *testing.T, size int) {
|
|||||||
dc.Launch(t)
|
dc.Launch(t)
|
||||||
defer dc.Terminate(t)
|
defer dc.Terminate(t)
|
||||||
// init discovery token space
|
// init discovery token space
|
||||||
dcc := mustNewHTTPClient(t, dc.URLs())
|
dcc := mustNewHTTPClient(t, dc.URLs(), nil)
|
||||||
dkapi := client.NewKeysAPI(dcc)
|
dkapi := client.NewKeysAPI(dcc)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", size)); err != nil {
|
if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", size)); err != nil {
|
||||||
@ -89,7 +89,7 @@ func TestTLSClusterOf3UsingDiscovery(t *testing.T) {
|
|||||||
dc.Launch(t)
|
dc.Launch(t)
|
||||||
defer dc.Terminate(t)
|
defer dc.Terminate(t)
|
||||||
// init discovery token space
|
// init discovery token space
|
||||||
dcc := mustNewHTTPClient(t, dc.URLs())
|
dcc := mustNewHTTPClient(t, dc.URLs(), nil)
|
||||||
dkapi := client.NewKeysAPI(dcc)
|
dkapi := client.NewKeysAPI(dcc)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", 3)); err != nil {
|
if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", 3)); err != nil {
|
||||||
@ -100,7 +100,7 @@ func TestTLSClusterOf3UsingDiscovery(t *testing.T) {
|
|||||||
c := NewClusterByConfig(t,
|
c := NewClusterByConfig(t,
|
||||||
&ClusterConfig{
|
&ClusterConfig{
|
||||||
Size: 3,
|
Size: 3,
|
||||||
UsePeerTLS: true,
|
PeerTLS: &testTLSInfo,
|
||||||
DiscoveryURL: dc.URL(0) + "/v2/keys"},
|
DiscoveryURL: dc.URL(0) + "/v2/keys"},
|
||||||
)
|
)
|
||||||
c.Launch(t)
|
c.Launch(t)
|
||||||
@ -125,7 +125,7 @@ func testDoubleClusterSize(t *testing.T, size int) {
|
|||||||
|
|
||||||
func TestDoubleTLSClusterSizeOf3(t *testing.T) {
|
func TestDoubleTLSClusterSizeOf3(t *testing.T) {
|
||||||
defer testutil.AfterTest(t)
|
defer testutil.AfterTest(t)
|
||||||
c := NewClusterByConfig(t, &ClusterConfig{Size: 3, UsePeerTLS: true})
|
c := NewClusterByConfig(t, &ClusterConfig{Size: 3, PeerTLS: &testTLSInfo})
|
||||||
c.Launch(t)
|
c.Launch(t)
|
||||||
defer c.Terminate(t)
|
defer c.Terminate(t)
|
||||||
|
|
||||||
@ -156,7 +156,7 @@ func testDecreaseClusterSize(t *testing.T, size int) {
|
|||||||
func TestForceNewCluster(t *testing.T) {
|
func TestForceNewCluster(t *testing.T) {
|
||||||
c := NewCluster(t, 3)
|
c := NewCluster(t, 3)
|
||||||
c.Launch(t)
|
c.Launch(t)
|
||||||
cc := mustNewHTTPClient(t, []string{c.Members[0].URL()})
|
cc := mustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
|
||||||
kapi := client.NewKeysAPI(cc)
|
kapi := client.NewKeysAPI(cc)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
resp, err := kapi.Create(ctx, "/foo", "bar")
|
resp, err := kapi.Create(ctx, "/foo", "bar")
|
||||||
@ -183,7 +183,7 @@ func TestForceNewCluster(t *testing.T) {
|
|||||||
c.waitLeader(t, c.Members[:1])
|
c.waitLeader(t, c.Members[:1])
|
||||||
|
|
||||||
// use new http client to init new connection
|
// use new http client to init new connection
|
||||||
cc = mustNewHTTPClient(t, []string{c.Members[0].URL()})
|
cc = mustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
|
||||||
kapi = client.NewKeysAPI(cc)
|
kapi = client.NewKeysAPI(cc)
|
||||||
// ensure force restart keep the old data, and new cluster can make progress
|
// ensure force restart keep the old data, and new cluster can make progress
|
||||||
ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
|
||||||
@ -267,7 +267,7 @@ func TestIssue2904(t *testing.T) {
|
|||||||
c.Members[1].Stop(t)
|
c.Members[1].Stop(t)
|
||||||
|
|
||||||
// send remove member-1 request to the cluster.
|
// send remove member-1 request to the cluster.
|
||||||
cc := mustNewHTTPClient(t, c.URLs())
|
cc := mustNewHTTPClient(t, c.URLs(), nil)
|
||||||
ma := client.NewMembersAPI(cc)
|
ma := client.NewMembersAPI(cc)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
// the proposal is not committed because member 1 is stopped, but the
|
// the proposal is not committed because member 1 is stopped, but the
|
||||||
@ -294,7 +294,7 @@ func TestIssue2904(t *testing.T) {
|
|||||||
// a random key first, and check the new key could be got from all client urls
|
// a random key first, and check the new key could be got from all client urls
|
||||||
// of the cluster.
|
// of the cluster.
|
||||||
func clusterMustProgress(t *testing.T, membs []*member) {
|
func clusterMustProgress(t *testing.T, membs []*member) {
|
||||||
cc := mustNewHTTPClient(t, []string{membs[0].URL()})
|
cc := mustNewHTTPClient(t, []string{membs[0].URL()}, nil)
|
||||||
kapi := client.NewKeysAPI(cc)
|
kapi := client.NewKeysAPI(cc)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
key := fmt.Sprintf("foo%d", rand.Int())
|
key := fmt.Sprintf("foo%d", rand.Int())
|
||||||
@ -306,7 +306,7 @@ func clusterMustProgress(t *testing.T, membs []*member) {
|
|||||||
|
|
||||||
for i, m := range membs {
|
for i, m := range membs {
|
||||||
u := m.URL()
|
u := m.URL()
|
||||||
mcc := mustNewHTTPClient(t, []string{u})
|
mcc := mustNewHTTPClient(t, []string{u}, nil)
|
||||||
mkapi := client.NewKeysAPI(mcc)
|
mkapi := client.NewKeysAPI(mcc)
|
||||||
mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout)
|
mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(mctx); err != nil {
|
if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(mctx); err != nil {
|
||||||
|
@ -83,7 +83,7 @@ func TestLaunchDuplicateMemberShouldFail(t *testing.T) {
|
|||||||
|
|
||||||
func TestSnapshotAndRestartMember(t *testing.T) {
|
func TestSnapshotAndRestartMember(t *testing.T) {
|
||||||
defer testutil.AfterTest(t)
|
defer testutil.AfterTest(t)
|
||||||
m := mustNewMember(t, "snapAndRestartTest", false)
|
m := mustNewMember(t, "snapAndRestartTest", nil, nil)
|
||||||
m.SnapCount = 100
|
m.SnapCount = 100
|
||||||
m.Launch()
|
m.Launch()
|
||||||
defer m.Terminate(t)
|
defer m.Terminate(t)
|
||||||
@ -92,7 +92,7 @@ func TestSnapshotAndRestartMember(t *testing.T) {
|
|||||||
resps := make([]*client.Response, 120)
|
resps := make([]*client.Response, 120)
|
||||||
var err error
|
var err error
|
||||||
for i := 0; i < 120; i++ {
|
for i := 0; i < 120; i++ {
|
||||||
cc := mustNewHTTPClient(t, []string{m.URL()})
|
cc := mustNewHTTPClient(t, []string{m.URL()}, nil)
|
||||||
kapi := client.NewKeysAPI(cc)
|
kapi := client.NewKeysAPI(cc)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
key := fmt.Sprintf("foo%d", i)
|
key := fmt.Sprintf("foo%d", i)
|
||||||
@ -106,7 +106,7 @@ func TestSnapshotAndRestartMember(t *testing.T) {
|
|||||||
m.Restart(t)
|
m.Restart(t)
|
||||||
|
|
||||||
for i := 0; i < 120; i++ {
|
for i := 0; i < 120; i++ {
|
||||||
cc := mustNewHTTPClient(t, []string{m.URL()})
|
cc := mustNewHTTPClient(t, []string{m.URL()}, nil)
|
||||||
kapi := client.NewKeysAPI(cc)
|
kapi := client.NewKeysAPI(cc)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
key := fmt.Sprintf("foo%d", i)
|
key := fmt.Sprintf("foo%d", i)
|
||||||
|
@ -23,7 +23,7 @@ import (
|
|||||||
|
|
||||||
func TestUpgradeMember(t *testing.T) {
|
func TestUpgradeMember(t *testing.T) {
|
||||||
defer testutil.AfterTest(t)
|
defer testutil.AfterTest(t)
|
||||||
m := mustNewMember(t, "integration046", false)
|
m := mustNewMember(t, "integration046", nil, nil)
|
||||||
cmd := exec.Command("cp", "-r", "testdata/integration046_data/conf", "testdata/integration046_data/log", "testdata/integration046_data/snapshot", m.DataDir)
|
cmd := exec.Command("cp", "-r", "testdata/integration046_data/conf", "testdata/integration046_data/log", "testdata/integration046_data/snapshot", m.DataDir)
|
||||||
err := cmd.Run()
|
err := cmd.Run()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -23,6 +23,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
|
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
|
||||||
"github.com/coreos/etcd/etcdserver/api/v3rpc"
|
"github.com/coreos/etcd/etcdserver/api/v3rpc"
|
||||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||||
"github.com/coreos/etcd/lease"
|
"github.com/coreos/etcd/lease"
|
||||||
@ -1416,3 +1417,99 @@ func testLeaseRemoveLeasedKey(t *testing.T, act func(*ClusterV3, int64) error) {
|
|||||||
t.Fatalf("lease removed but key remains")
|
t.Fatalf("lease removed but key remains")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newClusterV3NoClients(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
|
||||||
|
cfg.UseV3 = true
|
||||||
|
cfg.UseGRPC = true
|
||||||
|
clus := &ClusterV3{cluster: NewClusterByConfig(t, cfg)}
|
||||||
|
clus.Launch(t)
|
||||||
|
return clus
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestTLSGRPCRejectInsecureClient checks that connection is rejected if server is TLS but not client.
|
||||||
|
func TestTLSGRPCRejectInsecureClient(t *testing.T) {
|
||||||
|
defer testutil.AfterTest(t)
|
||||||
|
|
||||||
|
cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
|
||||||
|
clus := newClusterV3NoClients(t, &cfg)
|
||||||
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
|
// nil out TLS field so client will use an insecure connection
|
||||||
|
clus.Members[0].ClientTLSInfo = nil
|
||||||
|
client, err := NewClientV3(clus.Members[0])
|
||||||
|
if err != nil && err != grpc.ErrClientConnTimeout {
|
||||||
|
t.Fatalf("unexpected error (%v)", err)
|
||||||
|
} else if client == nil {
|
||||||
|
// Ideally, no client would be returned. However, grpc will
|
||||||
|
// return a connection without trying to handshake first so
|
||||||
|
// the connection appears OK.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
|
||||||
|
conn := client.ActiveConnection()
|
||||||
|
st, err := conn.State()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if st != grpc.Ready {
|
||||||
|
t.Fatalf("expected Ready, got %v", st)
|
||||||
|
}
|
||||||
|
|
||||||
|
// rpc will fail to handshake, triggering a connection state change
|
||||||
|
donec := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
|
||||||
|
_, perr := client.KV.Put(ctx, reqput)
|
||||||
|
donec <- perr
|
||||||
|
}()
|
||||||
|
|
||||||
|
st, err = conn.WaitForStateChange(ctx, st)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for change (%v)", err)
|
||||||
|
} else if st != grpc.Connecting {
|
||||||
|
t.Fatalf("expected connecting state, got %v", st)
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
if perr := <-donec; perr == nil {
|
||||||
|
t.Fatalf("expected client error on put")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestTLSGRPCRejectSecureClient checks that connection is rejected if client is TLS but not server.
|
||||||
|
func TestTLSGRPCRejectSecureClient(t *testing.T) {
|
||||||
|
defer testutil.AfterTest(t)
|
||||||
|
|
||||||
|
cfg := ClusterConfig{Size: 3}
|
||||||
|
clus := newClusterV3NoClients(t, &cfg)
|
||||||
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
|
clus.Members[0].ClientTLSInfo = &testTLSInfo
|
||||||
|
client, err := NewClientV3(clus.Members[0])
|
||||||
|
if client != nil || err == nil {
|
||||||
|
t.Fatalf("expected no client")
|
||||||
|
} else if err != grpc.ErrClientConnTimeout {
|
||||||
|
t.Fatalf("unexpected error (%v)", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestTLSGRPCAcceptSecureAll checks that connection is accepted if both client and server are TLS
|
||||||
|
func TestTLSGRPCAcceptSecureAll(t *testing.T) {
|
||||||
|
defer testutil.AfterTest(t)
|
||||||
|
|
||||||
|
cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
|
||||||
|
clus := newClusterV3NoClients(t, &cfg)
|
||||||
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
|
client, err := NewClientV3(clus.Members[0])
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("expected tls client (%v)", err)
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
|
||||||
|
if _, err := client.KV.Put(context.TODO(), reqput); err != nil {
|
||||||
|
t.Fatalf("unexpected error on put over tls (%v)", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -19,6 +19,7 @@ import (
|
|||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/cheggaaa/pb"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/cheggaaa/pb"
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
|
||||||
|
"github.com/coreos/etcd/pkg/transport"
|
||||||
)
|
)
|
||||||
|
|
||||||
// This represents the base command when called without any subcommands
|
// This represents the base command when called without any subcommands
|
||||||
@ -40,6 +41,8 @@ var (
|
|||||||
results chan result
|
results chan result
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
|
||||||
|
tls transport.TLSInfo
|
||||||
|
|
||||||
cpuProfPath string
|
cpuProfPath string
|
||||||
memProfPath string
|
memProfPath string
|
||||||
)
|
)
|
||||||
@ -48,4 +51,8 @@ func init() {
|
|||||||
RootCmd.PersistentFlags().StringVar(&endpoints, "endpoint", "127.0.0.1:2378", "comma-separated gRPC endpoints")
|
RootCmd.PersistentFlags().StringVar(&endpoints, "endpoint", "127.0.0.1:2378", "comma-separated gRPC endpoints")
|
||||||
RootCmd.PersistentFlags().UintVar(&totalConns, "conns", 1, "Total number of gRPC connections")
|
RootCmd.PersistentFlags().UintVar(&totalConns, "conns", 1, "Total number of gRPC connections")
|
||||||
RootCmd.PersistentFlags().UintVar(&totalClients, "clients", 1, "Total number of gRPC clients")
|
RootCmd.PersistentFlags().UintVar(&totalClients, "clients", 1, "Total number of gRPC clients")
|
||||||
|
|
||||||
|
RootCmd.PersistentFlags().StringVar(&tls.CertFile, "cert", "", "identify HTTPS client using this SSL certificate file")
|
||||||
|
RootCmd.PersistentFlags().StringVar(&tls.KeyFile, "key", "", "identify HTTPS client using this SSL key file")
|
||||||
|
RootCmd.PersistentFlags().StringVar(&tls.CAFile, "cacert", "", "verify certificates of HTTPS-enabled servers using this CA bundle")
|
||||||
}
|
}
|
||||||
|
@ -33,7 +33,16 @@ func mustCreateConn() *clientv3.Client {
|
|||||||
eps := strings.Split(endpoints, ",")
|
eps := strings.Split(endpoints, ",")
|
||||||
endpoint := eps[dialTotal%len(eps)]
|
endpoint := eps[dialTotal%len(eps)]
|
||||||
dialTotal++
|
dialTotal++
|
||||||
client, err := clientv3.NewFromURL(endpoint)
|
cfgtls := &tls
|
||||||
|
if cfgtls.Empty() {
|
||||||
|
cfgtls = nil
|
||||||
|
}
|
||||||
|
client, err := clientv3.New(
|
||||||
|
clientv3.Config{
|
||||||
|
Endpoints: []string{endpoint},
|
||||||
|
TLS: cfgtls,
|
||||||
|
},
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Fprintf(os.Stderr, "dial error: %v\n", err)
|
fmt.Fprintf(os.Stderr, "dial error: %v\n", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user