Merge pull request #9945 from jpbetz/balancer-maintanance-api-fix

clientv3: Fix maintenance APIs to directly dial grpc endpoints directly
This commit is contained in:
Joe Betz 2018-07-27 10:15:25 -07:00 committed by GitHub
commit 1c382a4868
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 118 additions and 41 deletions

1
.words
View File

@ -30,6 +30,7 @@ gRPC
goroutine
goroutines
healthcheck
hostname
iff
inflight
keepalive

View File

@ -227,3 +227,14 @@ func ParseTarget(target string) (string, string, error) {
}
return parts[0], parts[1], nil
}
// ParseHostPort splits a "<host>:<port>" string into the host and port parts.
// The port part is optional.
func ParseHostPort(hostPort string) (host string, port string) {
parts := strings.SplitN(hostPort, ":", 2)
host = parts[0]
if len(parts) > 1 {
port = parts[1]
}
return host, port
}

View File

@ -229,13 +229,8 @@ func (c *Client) processCreds(scheme string) (creds *credentials.TransportCreden
return creds
}
// dialSetupOpts gives the dial opts prior to any authentication
func (c *Client) dialSetupOpts(target string, dopts ...grpc.DialOption) (opts []grpc.DialOption, err error) {
_, ep, err := endpoint.ParseTarget(target)
if err != nil {
return nil, fmt.Errorf("unable to parse target: %v", err)
}
// dialSetupOpts gives the dial opts prior to any authentication.
func (c *Client) dialSetupOpts(creds *credentials.TransportCredentials, dopts ...grpc.DialOption) (opts []grpc.DialOption, err error) {
if c.cfg.DialKeepAliveTime > 0 {
params := keepalive.ClientParameters{
Time: c.cfg.DialKeepAliveTime,
@ -245,16 +240,9 @@ func (c *Client) dialSetupOpts(target string, dopts ...grpc.DialOption) (opts []
}
opts = append(opts, dopts...)
// Provide a net dialer that supports cancelation and timeout.
f := func(dialEp string, t time.Duration) (net.Conn, error) {
proto, host, _ := endpoint.ParseEndpoint(dialEp)
if host == "" && ep != "" {
// dialing an endpoint not in the balancer; use
// endpoint passed into dial
proto, host, _ = endpoint.ParseEndpoint(ep)
}
if proto == "" {
return nil, fmt.Errorf("unknown scheme for %q", host)
}
select {
case <-c.ctx.Done():
return nil, c.ctx.Err()
@ -265,10 +253,6 @@ func (c *Client) dialSetupOpts(target string, dopts ...grpc.DialOption) (opts []
}
opts = append(opts, grpc.WithDialer(f))
creds := c.creds
if _, _, scheme := endpoint.ParseEndpoint(ep); len(scheme) != 0 {
creds = c.processCreds(scheme)
}
if creds != nil {
opts = append(opts, grpc.WithTransportCredentials(*creds))
} else {
@ -291,8 +275,13 @@ func (c *Client) dialSetupOpts(target string, dopts ...grpc.DialOption) (opts []
}
// Dial connects to a single endpoint using the client's config.
func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) {
return c.dial(endpoint)
func (c *Client) Dial(ep string) (*grpc.ClientConn, error) {
creds := c.directDialCreds(ep)
// Use the grpc passthrough resolver to directly dial a single endpoint.
// This resolver passes through the 'unix' and 'unixs' endpoints schemes used
// by etcd without modification, allowing us to directly dial endpoints and
// using the same dial functions that we use for load balancer dialing.
return c.dial(fmt.Sprintf("passthrough:///%s", ep), creds)
}
func (c *Client) getToken(ctx context.Context) error {
@ -305,7 +294,8 @@ func (c *Client) getToken(ctx context.Context) error {
var dOpts []grpc.DialOption
_, host, _ := endpoint.ParseEndpoint(ep)
target := c.resolverGroup.Target(host)
dOpts, err = c.dialSetupOpts(target, c.cfg.DialOptions...)
creds := c.dialWithBalancerCreds(ep)
dOpts, err = c.dialSetupOpts(creds, c.cfg.DialOptions...)
if err != nil {
err = fmt.Errorf("failed to configure auth dialer: %v", err)
continue
@ -333,13 +323,18 @@ func (c *Client) getToken(ctx context.Context) error {
return err
}
func (c *Client) dial(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
// We pass a target to DialContext of the form: endpoint://<clusterName>/<host-part> that
// does not include scheme (http/https/unix/unixs) or path parts.
// dialWithBalancer dials the client's current load balanced resolver group. The scheme of the host
// of the provided endpoint determines the scheme used for all endpoints of the client connection.
func (c *Client) dialWithBalancer(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
_, host, _ := endpoint.ParseEndpoint(ep)
target := c.resolverGroup.Target(host)
creds := c.dialWithBalancerCreds(ep)
return c.dial(target, creds, dopts...)
}
opts, err := c.dialSetupOpts(target, dopts...)
// dial configures and dials any grpc balancer target.
func (c *Client) dial(target string, creds *credentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
opts, err := c.dialSetupOpts(creds, dopts...)
if err != nil {
return nil, fmt.Errorf("failed to configure dialer: %v", err)
}
@ -385,6 +380,34 @@ func (c *Client) dial(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, er
return conn, nil
}
func (c *Client) directDialCreds(ep string) *credentials.TransportCredentials {
_, hostPort, scheme := endpoint.ParseEndpoint(ep)
creds := c.creds
if len(scheme) != 0 {
creds = c.processCreds(scheme)
if creds != nil {
c := *creds
clone := c.Clone()
// Set the server name must to the endpoint hostname without port since grpc
// otherwise attempts to check if x509 cert is valid for the full endpoint
// including the scheme and port, which fails.
host, _ := endpoint.ParseHostPort(hostPort)
clone.OverrideServerName(host)
creds = &clone
}
}
return creds
}
func (c *Client) dialWithBalancerCreds(ep string) *credentials.TransportCredentials {
_, _, scheme := endpoint.ParseEndpoint(ep)
creds := c.creds
if len(scheme) != 0 {
creds = c.processCreds(scheme)
}
return creds
}
// WithRequireLeader requires client requests to only succeed
// when the cluster has a leader.
func WithRequireLeader(ctx context.Context) context.Context {
@ -467,7 +490,7 @@ func newClient(cfg *Config) (*Client, error) {
// Use an provided endpoint target so that for https:// without any tls config given, then
// grpc will assume the certificate server name is the endpoint host.
conn, err := client.dial(dialEndpoint, grpc.WithBalancerName(roundRobinBalancerName))
conn, err := client.dialWithBalancer(dialEndpoint, grpc.WithBalancerName(roundRobinBalancerName))
if err != nil {
client.cancel()
client.resolverGroup.Close()

View File

@ -79,17 +79,11 @@ func TestDialTLSNoConfig(t *testing.T) {
DialTimeout: time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
})
if err != nil {
t.Fatal(err)
}
defer c.Close()
// TODO: this should not be required when we set grpc.WithBlock()
if c != nil {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
_, err = c.KV.Get(ctx, "/")
cancel()
}
defer func() {
if c != nil {
c.Close()
}
}()
if !isClientTimeout(err) {
t.Fatalf("expected dial timeout error, got %v", err)
}

View File

@ -25,7 +25,9 @@ import (
"time"
"go.uber.org/zap"
"google.golang.org/grpc"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/lease"
@ -193,3 +195,47 @@ func TestMaintenanceSnapshotErrorInflight(t *testing.T) {
t.Errorf("expected client timeout, got %v", err)
}
}
func TestMaintenanceStatus(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
clus.WaitLeader(t)
eps := make([]string, 3)
for i := 0; i < 3; i++ {
eps[i] = clus.Members[i].GRPCAddr()
}
cli, err := clientv3.New(clientv3.Config{Endpoints: eps, DialOptions: []grpc.DialOption{grpc.WithBlock()}})
if err != nil {
t.Fatal(err)
}
defer cli.Close()
prevID, leaderFound := uint64(0), false
for i := 0; i < 3; i++ {
resp, err := cli.Status(context.TODO(), eps[i])
if err != nil {
t.Fatal(err)
}
if prevID == 0 {
prevID, leaderFound = resp.Header.MemberId, resp.Header.MemberId == resp.Leader
continue
}
if prevID == resp.Header.MemberId {
t.Errorf("#%d: status returned duplicate member ID with %016x", i, prevID)
}
if leaderFound && resp.Header.MemberId == resp.Leader {
t.Errorf("#%d: leader already found, but found another %016x", i, resp.Header.MemberId)
}
if !leaderFound {
leaderFound = resp.Header.MemberId == resp.Leader
}
}
if !leaderFound {
t.Fatal("no leader found")
}
}

View File

@ -76,7 +76,7 @@ type maintenance struct {
func NewMaintenance(c *Client) Maintenance {
api := &maintenance{
dial: func(endpoint string) (pb.MaintenanceClient, func(), error) {
conn, err := c.dial(endpoint)
conn, err := c.Dial(endpoint)
if err != nil {
return nil, nil, fmt.Errorf("failed to dial endpoint %s with maintenance client: %v", endpoint, err)
}

View File

@ -72,10 +72,12 @@ func testCtlV3MoveLeader(t *testing.T, cfg etcdProcessClusterConfig) {
if err != nil {
t.Fatal(err)
}
resp, err := cli.Status(context.Background(), ep)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
resp, err := cli.Status(ctx, ep)
if err != nil {
t.Fatal(err)
t.Fatalf("failed to get status from endpoint %s: %v", ep, err)
}
cancel()
cli.Close()
if resp.Header.GetMemberId() == resp.Leader {