From 527aa1a4996d686898e15803a4deda7ae0b903ba Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Thu, 12 May 2016 12:56:33 -0700 Subject: [PATCH] clientv3: fix Close after failed Put Was crashing on a nil connection. Reworked the shutdown path a little so there's only one connection close site. --- clientv3/client.go | 22 ++++++++++++++-------- clientv3/integration/kv_test.go | 15 +++++++++++++++ 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/clientv3/client.go b/clientv3/client.go index 5a1049ede..6ea77ca20 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -89,19 +89,23 @@ func NewFromConfigFile(path string) (*Client, error) { // Close shuts down the client's etcd connections. func (c *Client) Close() error { c.mu.Lock() + defer c.mu.Unlock() if c.cancel == nil { - c.mu.Unlock() return nil } c.cancel() c.cancel = nil - err := c.conn.Close() connc := c.newconnc c.mu.Unlock() + c.connStartRetry(nil) c.Watcher.Close() c.Lease.Close() <-connc - return err + c.mu.Lock() + if c.lastConnErr != c.ctx.Err() { + return c.lastConnErr + } + return nil } // Ctx is a context for "out of band" messages (e.g., for sending @@ -213,16 +217,17 @@ func (c *Client) retryConnection(err error) (newConn *grpc.ClientConn, dialErr e if err != nil { c.errors = append(c.errors, err) } - if c.cancel == nil { - return nil, c.ctx.Err() - } if c.conn != nil { c.conn.Close() if st, _ := c.conn.State(); st != grpc.Shutdown { // wait so grpc doesn't leak sleeping goroutines - c.conn.WaitForStateChange(c.ctx, st) + c.conn.WaitForStateChange(context.Background(), st) } } + if c.cancel == nil { + // client has called Close() so don't try to dial out + return nil, c.ctx.Err() + } c.conn, dialErr = c.cfg.RetryDialer(c) if dialErr != nil { @@ -262,8 +267,9 @@ func (c *Client) connMonitor() { select { case err = <-c.reconnc: case <-c.ctx.Done(): + _, err = c.retryConnection(c.ctx.Err()) c.mu.Lock() - c.lastConnErr = c.ctx.Err() + c.lastConnErr = err close(c.newconnc) c.mu.Unlock() return diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go index cfcb47aa3..b449e0de0 100644 --- a/clientv3/integration/kv_test.go +++ b/clientv3/integration/kv_test.go @@ -547,3 +547,18 @@ func TestKVGetCancel(t *testing.T) { t.Fatalf("cancel on get broke client connection") } } + +// TestKVPutStoppedServerAndClose ensures closing after a failed Put works. +func TestKVPutStoppedServerAndClose(t *testing.T) { + t.Parallel() + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + clus.Members[0].Stop(t) + // this Put fails and triggers an asynchronous connection retry + _, err := clus.Client(0).Put(context.TODO(), "abc", "123") + if err == nil || !strings.Contains(err.Error(), "connection is closing") { + t.Fatal(err) + } + // cluster will terminate and close the client with the retry in-flight +}