clientv3: Fix dial calls to consistently use endpoint resolver, attempt to deflake alarm test

This commit is contained in:
Joe Betz 2018-04-26 13:22:31 -07:00 committed by Gyuho Lee
parent f84f554301
commit ee2747eba8
4 changed files with 36 additions and 25 deletions

View File

@ -110,15 +110,15 @@ func (r *Resolver) InitialAddrs(addrs []resolver.Address) {
r.Unlock() r.Unlock()
} }
// InitialEndpoints sets the initial endpoints to for the resolver and returns a grpc dial target. // InitialEndpoints sets the initial endpoints to for the resolver.
// This should be called before dialing. The endpoints may be updated after the dial using NewAddress. // This should be called before dialing. The endpoints may be updated after the dial using NewAddress.
// At least one endpoint is required. // At least one endpoint is required.
func (r *Resolver) InitialEndpoints(eps []string) (string, error) { func (r *Resolver) InitialEndpoints(eps []string) error {
if len(eps) < 1 { if len(eps) < 1 {
return "", fmt.Errorf("At least one endpoint is required, but got: %v", eps) return fmt.Errorf("At least one endpoint is required, but got: %v", eps)
} }
r.InitialAddrs(epsToAddrs(eps...)) r.InitialAddrs(epsToAddrs(eps...))
return r.Target(eps[0]), nil return nil
} }
// TODO: use balancer.epsToAddrs // TODO: use balancer.epsToAddrs

View File

@ -302,6 +302,7 @@ func (c *Client) getToken(ctx context.Context) error {
var dOpts []grpc.DialOption var dOpts []grpc.DialOption
dOpts, err = c.dialSetupOpts(c.resolver.Target(endpoint), c.cfg.DialOptions...) dOpts, err = c.dialSetupOpts(c.resolver.Target(endpoint), c.cfg.DialOptions...)
if err != nil { if err != nil {
err = fmt.Errorf("failed to configure auth dialer: %v", err)
continue continue
} }
auth, err = newAuthenticator(ctx, endpoint, dOpts, c) auth, err = newAuthenticator(ctx, endpoint, dOpts, c)
@ -327,9 +328,14 @@ func (c *Client) getToken(ctx context.Context) error {
} }
func (c *Client) dial(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { func (c *Client) dial(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
opts, err := c.dialSetupOpts(ep, dopts...) // We pass a target to DialContext of the form: endpoint://<clusterName>/<host-part> that
// does not include scheme (http/https/unix/unixs) or path parts.
_, host, _ := endpoint.ParseEndpoint(ep)
target := c.resolver.Target(host)
opts, err := c.dialSetupOpts(target, dopts...)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("failed to configure dialer: %v", err)
} }
if c.Username != "" && c.Password != "" { if c.Username != "" && c.Password != "" {
@ -366,18 +372,7 @@ func (c *Client) dial(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, er
defer cancel() defer cancel()
} }
// We pass a target to DialContext of the form: endpoint://<clusterName>/<host-part> that conn, err := grpc.DialContext(dctx, target, opts...)
// does not include scheme (http/https/unix/unixs) or path parts.
if endpoint.IsTarget(ep) {
clusterName, tep, err := endpoint.ParseTarget(ep)
if err != nil {
return nil, fmt.Errorf("failed to parse endpoint target '%s': %v", ep, err)
}
_, host, _ := endpoint.ParseEndpoint(tep)
ep = endpoint.Target(clusterName, host)
}
conn, err := grpc.DialContext(dctx, ep, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -443,20 +438,25 @@ func newClient(cfg *Config) (*Client, error) {
// Prepare a 'endpoint://<unique-client-id>/' resolver for the client and create a endpoint target to pass // Prepare a 'endpoint://<unique-client-id>/' resolver for the client and create a endpoint target to pass
// to dial so the client knows to use this resolver. // to dial so the client knows to use this resolver.
client.resolver = endpoint.EndpointResolver(fmt.Sprintf("client-%s", strconv.FormatInt(time.Now().UnixNano(), 36))) client.resolver = endpoint.EndpointResolver(fmt.Sprintf("client-%s", strconv.FormatInt(time.Now().UnixNano(), 36)))
target, err := client.resolver.InitialEndpoints(cfg.Endpoints) err := client.resolver.InitialEndpoints(cfg.Endpoints)
if err != nil { if err != nil {
client.cancel() client.cancel()
client.resolver.Close() client.resolver.Close()
return nil, err return nil, err
} }
if len(cfg.Endpoints) < 1 {
return nil, fmt.Errorf("at least one Endpoint must is required in client config")
}
dialEndpoint := cfg.Endpoints[0]
// Use an provided endpoint target so that for https:// without any tls config given, then // Use an provided endpoint target so that for https:// without any tls config given, then
// grpc will assume the certificate server name is the endpoint host. // grpc will assume the certificate server name is the endpoint host.
conn, err := client.dial(target, grpc.WithBalancerName(roundRobinBalancerName)) conn, err := client.dial(dialEndpoint, grpc.WithBalancerName(roundRobinBalancerName))
if err != nil { if err != nil {
client.cancel() client.cancel()
client.resolver.Close() client.resolver.Close()
return nil, err return nil, fmt.Errorf("failed to dial initial client connection: %v", err)
} }
// TODO: With the old grpc balancer interface, we waited until the dial timeout // TODO: With the old grpc balancer interface, we waited until the dial timeout
// for the balancer to be ready. Is there an equivalent wait we should do with the new grpc balancer interface? // for the balancer to be ready. Is there an equivalent wait we should do with the new grpc balancer interface?

View File

@ -16,6 +16,7 @@ package clientv3
import ( import (
"context" "context"
"fmt"
"io" "io"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
@ -77,7 +78,7 @@ func NewMaintenance(c *Client) Maintenance {
dial: func(endpoint string) (pb.MaintenanceClient, func(), error) { dial: func(endpoint string) (pb.MaintenanceClient, func(), error) {
conn, err := c.dial(endpoint) conn, err := c.dial(endpoint)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, fmt.Errorf("failed to dial endpoint %s with maintenance client: %v", endpoint, err)
} }
cancel := func() { conn.Close() } cancel := func() { conn.Close() }
return RetryMaintenanceClient(c, conn), cancel, nil return RetryMaintenanceClient(c, conn), cancel, nil
@ -175,6 +176,7 @@ func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusRespo
func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error) { func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error) {
remote, cancel, err := m.dial(endpoint) remote, cancel, err := m.dial(endpoint)
if err != nil { if err != nil {
return nil, toErr(ctx, err) return nil, toErr(ctx, err)
} }
defer cancel() defer cancel()

View File

@ -175,12 +175,20 @@ func TestV3CorruptAlarm(t *testing.T) {
s.Close() s.Close()
be.Close() be.Close()
clus.Members[1].WaitOK(t)
clus.Members[2].WaitOK(t)
time.Sleep(time.Second * 2)
// Wait for cluster so Puts succeed in case member 0 was the leader. // Wait for cluster so Puts succeed in case member 0 was the leader.
if _, err := clus.Client(1).Get(context.TODO(), "k"); err != nil { if _, err := clus.Client(1).Get(context.TODO(), "k"); err != nil {
t.Fatal(err) t.Fatal(err)
} }
clus.Client(1).Put(context.TODO(), "xyz", "321") if _, err := clus.Client(1).Put(context.TODO(), "xyz", "321"); err != nil {
clus.Client(1).Put(context.TODO(), "abc", "fed") t.Fatal(err)
}
if _, err := clus.Client(1).Put(context.TODO(), "abc", "fed"); err != nil {
t.Fatal(err)
}
// Restart with corruption checking enabled. // Restart with corruption checking enabled.
clus.Members[1].Stop(t) clus.Members[1].Stop(t)
@ -189,7 +197,8 @@ func TestV3CorruptAlarm(t *testing.T) {
m.CorruptCheckTime = time.Second m.CorruptCheckTime = time.Second
m.Restart(t) m.Restart(t)
} }
// Member 0 restarts into split brain. clus.WaitLeader(t)
time.Sleep(time.Second * 2)
clus.Members[0].WaitStarted(t) clus.Members[0].WaitStarted(t)
resp0, err0 := clus.Client(0).Get(context.TODO(), "abc") resp0, err0 := clus.Client(0).Get(context.TODO(), "abc")