From d21d2e6624bc5400e52baa45804668f5f9b6a161 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Mon, 7 Mar 2016 13:42:52 -0800 Subject: [PATCH] clientv3: don't deadlock on Close with broken connection Fixes #4679 --- clientv3/client.go | 30 +++++++++++++++++++++++------- integration/v3_grpc_test.go | 5 ++--- 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/clientv3/client.go b/clientv3/client.go index 4ec061db3..99ea7269a 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -87,12 +87,13 @@ func NewFromURL(url string) (*Client, error) { // Close shuts down the client's etcd connections. func (c *Client) Close() error { c.mu.Lock() - defer c.mu.Unlock() if c.cancel == nil { + c.mu.Unlock() return nil } c.cancel() c.cancel = nil + c.mu.Unlock() c.Watcher.Close() c.Lease.Close() return c.conn.Close() @@ -126,14 +127,22 @@ func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) { } else { opts = append(opts, grpc.WithInsecure()) } + + proto := "tcp" if url, uerr := url.Parse(endpoint); uerr == nil && url.Scheme == "unix" { - f := func(a string, t time.Duration) (net.Conn, error) { - return net.DialTimeout("unix", a, t) - } + proto = "unix" // strip unix:// prefix so certs work endpoint = url.Host - opts = append(opts, grpc.WithDialer(f)) } + f := func(a string, t time.Duration) (net.Conn, error) { + select { + case <-c.ctx.Done(): + return nil, c.ctx.Err() + default: + } + return net.DialTimeout(proto, a, t) + } + opts = append(opts, grpc.WithDialer(f)) conn, err := grpc.Dial(endpoint, opts...) if err != nil { @@ -156,11 +165,11 @@ func newClient(cfg *Config) (*Client, error) { creds = &c } // use a temporary skeleton client to bootstrap first connection - conn, err := cfg.RetryDialer(&Client{cfg: *cfg, creds: creds}) + ctx, cancel := context.WithCancel(context.TODO()) + conn, err := cfg.RetryDialer(&Client{cfg: *cfg, creds: creds, ctx: ctx}) if err != nil { return nil, err } - ctx, cancel := context.WithCancel(context.TODO()) client := &Client{ conn: conn, cfg: *cfg, @@ -198,6 +207,13 @@ func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) (*grpc.Cli // conn has already been updated return c.conn, nil } + + oldConn.Close() + if st, _ := oldConn.State(); st != grpc.Shutdown { + // wait for shutdown so grpc doesn't leak sleeping goroutines + oldConn.WaitForStateChange(c.ctx, st) + } + conn, dialErr := c.cfg.RetryDialer(c) if dialErr != nil { c.errors = append(c.errors, dialErr) diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index a05b08e2a..5cb502573 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -675,10 +675,9 @@ func TestTLSGRPCRejectInsecureClient(t *testing.T) { st, err = conn.WaitForStateChange(ctx, st) if err != nil { t.Fatalf("unexpected error waiting for change (%v)", err) - } else if st != grpc.Connecting && st != grpc.TransientFailure { - t.Fatalf("expected connecting or transient failure state, got %v", st) + } else if st == grpc.Ready { + t.Fatalf("expected failure state, got %v", st) } - cancel() if perr := <-donec; perr == nil { t.Fatalf("expected client error on put")