From 5f5a203e27386f0f9fd323f54a54da36c45d712c Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Thu, 2 Jun 2016 10:54:01 -0700 Subject: [PATCH 1/3] clientv3: don't hold client lock while dialing Causes async reconnect to block while the client is dialing. This was also causing problems with the Close error message, so now Close() will return the last dial error (if any) instead of clearing it out with a cancel(). Fixes #5416 --- clientv3/client.go | 85 ++++++++++++++++++++------------ clientv3/integration/kv_test.go | 20 +++++++- clientv3/integration/txn_test.go | 2 +- clientv3/remote_client.go | 4 +- 4 files changed, 76 insertions(+), 35 deletions(-) diff --git a/clientv3/client.go b/clientv3/client.go index 71492b422..8ad7f0cf9 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -101,25 +101,39 @@ func NewFromConfigFile(path string) (*Client, error) { } // Close shuts down the client's etcd connections. -func (c *Client) Close() error { +func (c *Client) Close() (err error) { c.mu.Lock() defer c.mu.Unlock() + + // acquire the cancel if c.cancel == nil { - return nil + // already canceled + if c.lastConnErr != c.ctx.Err() { + err = c.lastConnErr + } + return } - c.cancel() + cancel := c.cancel c.cancel = nil - connc := c.newconnc c.mu.Unlock() - c.connStartRetry(nil) + + // close watcher and lease before terminating connection + // so they don't retry on a closed client c.Watcher.Close() c.Lease.Close() + + // cancel reconnection loop + cancel() + c.mu.Lock() + connc := c.newconnc + c.mu.Unlock() + // connc on cancel() is left closed <-connc c.mu.Lock() if c.lastConnErr != c.ctx.Err() { - return c.lastConnErr + err = c.lastConnErr } - return nil + return } // Ctx is a context for "out of band" messages (e.g., for sending @@ -282,30 +296,41 @@ func (c *Client) ActiveConnection() *grpc.ClientConn { } // retryConnection establishes a new connection -func (c *Client) retryConnection(err error) (newConn *grpc.ClientConn, dialErr error) { +func (c *Client) retryConnection(err error) { + oldconn := c.conn + + // return holding lock so old connection can be cleaned up in this defer + defer func() { + if oldconn != nil { + oldconn.Close() + if st, _ := oldconn.State(); st != grpc.Shutdown { + // wait so grpc doesn't leak sleeping goroutines + oldconn.WaitForStateChange(context.Background(), st) + } + } + c.mu.Unlock() + }() + c.mu.Lock() - defer c.mu.Unlock() if err != nil { c.errors = append(c.errors, err) } - if c.conn != nil { - c.conn.Close() - if st, _ := c.conn.State(); st != grpc.Shutdown { - // wait so grpc doesn't leak sleeping goroutines - c.conn.WaitForStateChange(context.Background(), st) - } - c.conn = nil - } if c.cancel == nil { // client has called Close() so don't try to dial out - return nil, c.ctx.Err() + return } + c.mu.Unlock() - c.conn, dialErr = c.cfg.retryDialer(c) + nc, dialErr := c.cfg.retryDialer(c) + + c.mu.Lock() + if nc != nil { + c.conn = nc + } if dialErr != nil { c.errors = append(c.errors, dialErr) } - return c.conn, dialErr + c.lastConnErr = dialErr } // connStartRetry schedules a reconnect if one is not already running @@ -321,17 +346,20 @@ func (c *Client) connStartRetry(err error) { // connWait waits for a reconnect to be processed func (c *Client) connWait(ctx context.Context, err error) (*grpc.ClientConn, error) { - c.mu.Lock() + c.mu.RLock() ch := c.newconnc - c.mu.Unlock() + c.mu.RUnlock() c.connStartRetry(err) select { case <-ctx.Done(): return nil, ctx.Err() case <-ch: } - c.mu.Lock() - defer c.mu.Unlock() + c.mu.RLock() + defer c.mu.RUnlock() 
+ if c.cancel == nil { + return c.conn, rpctypes.ErrConnClosed + } return c.conn, c.lastConnErr } @@ -340,11 +368,8 @@ func (c *Client) connMonitor() { var err error defer func() { - _, err = c.retryConnection(c.ctx.Err()) - c.mu.Lock() - c.lastConnErr = err + c.retryConnection(c.ctx.Err()) close(c.newconnc) - c.mu.Unlock() }() limiter := rate.NewLimiter(rate.Every(minConnRetryWait), 1) @@ -354,10 +379,8 @@ func (c *Client) connMonitor() { case <-c.ctx.Done(): return } - conn, connErr := c.retryConnection(err) + c.retryConnection(err) c.mu.Lock() - c.lastConnErr = connErr - c.conn = conn close(c.newconnc) c.newconnc = make(chan struct{}) c.reconnc = make(chan error, 1) diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go index 24252c428..e9b320756 100644 --- a/clientv3/integration/kv_test.go +++ b/clientv3/integration/kv_test.go @@ -131,6 +131,13 @@ func TestKVPutWithRequireLeader(t *testing.T) { if err != rpctypes.ErrNoLeader { t.Fatal(err) } + + // clients may give timeout errors since the members are stopped; take + // the clients so that terminating the cluster won't complain + clus.Client(1).Close() + clus.Client(2).Close() + clus.TakeClient(1) + clus.TakeClient(2) } func TestKVRange(t *testing.T) { @@ -633,13 +640,22 @@ func TestKVPutStoppedServerAndClose(t *testing.T) { defer testutil.AfterTest(t) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) + cli := clus.Client(0) clus.Members[0].Stop(t) // this Put fails and triggers an asynchronous connection retry - _, err := clus.Client(0).Put(context.TODO(), "abc", "123") + _, err := cli.Put(context.TODO(), "abc", "123") if err == nil || (!strings.Contains(err.Error(), "connection is closing") && !strings.Contains(err.Error(), "transport is closing")) { t.Fatal(err) } - // cluster will terminate and close the client with the retry in-flight + + // wait some so the client closes with the retry in-flight + time.Sleep(time.Second) + + // get the timeout + clus.TakeClient(0) + if err := cli.Close(); err == nil || !strings.Contains(err.Error(), "timed out") { + t.Fatal(err) + } } diff --git a/clientv3/integration/txn_test.go b/clientv3/integration/txn_test.go index f381ccfad..68633e6fa 100644 --- a/clientv3/integration/txn_test.go +++ b/clientv3/integration/txn_test.go @@ -74,7 +74,7 @@ func TestTxnWriteFail(t *testing.T) { dialTimeout := 5 * time.Second select { - case <-time.After(2*dialTimeout + time.Second): + case <-time.After(dialTimeout + time.Second): t.Fatalf("timed out waiting for txn to fail") case <-donec: // don't restart cluster until txn errors out diff --git a/clientv3/remote_client.go b/clientv3/remote_client.go index b51116305..3bc3484c2 100644 --- a/clientv3/remote_client.go +++ b/clientv3/remote_client.go @@ -88,9 +88,11 @@ func (r *remoteClient) acquire(ctx context.Context) error { r.client.mu.RLock() closed := r.client.cancel == nil c := r.client.conn + lastConnErr := r.client.lastConnErr match := r.conn == c r.mu.Unlock() - if c != nil && match { + if lastConnErr == nil && match { + // new connection already return nil } r.client.mu.RUnlock() From 267d1cb16fc75e4d56f310bf5109d1a782b3576a Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Thu, 2 Jun 2016 23:59:49 -0700 Subject: [PATCH 2/3] clientv3: fix watch to reconnect on failure It was spinning before. 
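Roughly, the shape openWatchClient gets after this change (an illustrative
sketch only, not the watcher code itself; open and reconnectWait stand in
for opening the gRPC watch stream and for the remote client's reconnectWait
helper used in the diff below):

    package sketch

    import "context"

    // openWithReconnectWait retries an open until it succeeds, but blocks
    // on the client's reconnect machinery between attempts instead of
    // spinning in a tight loop.
    func openWithReconnectWait(
        ctx context.Context,
        open func() error,
        reconnectWait func(context.Context, error) error,
    ) error {
        for {
            err := open()
            if err == nil {
                return nil
            }
            // reconnectWait is assumed to block until the background
            // reconnect has finished and to return a non-nil error only
            // when retrying is pointless (client closed, ctx done).
            if werr := reconnectWait(ctx, err); werr != nil {
                return werr
            }
        }
    }

The point is that every failed attempt now pays at least one pass through
the reconnect loop, which is rate limited, rather than retrying
immediately.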
--- clientv3/watch.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/clientv3/watch.go b/clientv3/watch.go index 3c85a2add..f3a8d7bbb 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -521,6 +521,9 @@ func (w *watcher) openWatchClient() (ws pb.Watch_WatchClient, err error) { return nil, v3rpc.Error(err) } w.rc.release() + if nerr := w.rc.reconnectWait(w.ctx, err); nerr != nil { + return nil, v3rpc.Error(nerr) + } } return ws, nil } From 7dfe7db2431bdfe1247cbc5fbc2efabaed67aae6 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Fri, 3 Jun 2016 00:16:52 -0700 Subject: [PATCH 3/3] clientv3: panic if ActiveConnection tries to return non-nil connection --- clientv3/client.go | 3 +++ clientv3/txn_test.go | 2 +- integration/cluster.go | 10 +--------- 3 files changed, 5 insertions(+), 10 deletions(-) diff --git a/clientv3/client.go b/clientv3/client.go index 8ad7f0cf9..349014d14 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -292,6 +292,9 @@ func newClient(cfg *Config) (*Client, error) { func (c *Client) ActiveConnection() *grpc.ClientConn { c.mu.RLock() defer c.mu.RUnlock() + if c.conn == nil { + panic("trying to return nil active connection") + } return c.conn } diff --git a/clientv3/txn_test.go b/clientv3/txn_test.go index 4bedf21f1..e936d34ee 100644 --- a/clientv3/txn_test.go +++ b/clientv3/txn_test.go @@ -20,7 +20,7 @@ import ( ) func TestTxnPanics(t *testing.T) { - kv := NewKV(&Client{}) + kv := &kv{} errc := make(chan string) df := func() { diff --git a/integration/cluster.go b/integration/cluster.go index 7561df298..c339e37c9 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -795,15 +795,7 @@ func (c *ClusterV3) Terminate(t *testing.T) { } func (c *ClusterV3) RandClient() *clientv3.Client { - for i := 0; i < 100; i++ { - cli := c.clients[rand.Intn(len(c.clients))] - if cli.ActiveConnection() == nil { - time.Sleep(10 * time.Millisecond) - continue - } - return cli - } - panic("failed to get a active client") + return c.clients[rand.Intn(len(c.clients))] } func (c *ClusterV3) Client(i int) *clientv3.Client {
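A series-level note on patch 1: the essential change is that the client
mutex now only guards the shared fields (conn, lastConnErr, cancel,
newconnc) and is released around the blocking dial, so Close() and waiting
RPCs are no longer stuck behind a slow reconnect. A minimal sketch of that
pattern follows; the reconnector type, its closed flag, and the dial field
are stand-ins invented for illustration, and the real code uses
cfg.retryDialer and also waits for the old connection's state change, which
is omitted here:

    package sketch

    import (
        "sync"

        "google.golang.org/grpc"
    )

    // reconnector is a stripped-down stand-in for clientv3.Client: mu
    // guards conn and lastConnErr, but is never held across the dial.
    type reconnector struct {
        mu          sync.RWMutex
        conn        *grpc.ClientConn
        lastConnErr error
        closed      bool
        dial        func() (*grpc.ClientConn, error) // stand-in for cfg.retryDialer
    }

    func (r *reconnector) retryConnection() {
        r.mu.Lock()
        if r.closed {
            // Close() already ran; don't dial out
            r.mu.Unlock()
            return
        }
        old := r.conn
        r.mu.Unlock()

        // tear down the old connection and dial without holding the lock,
        // so Close() and readers are never blocked behind the dial
        if old != nil {
            old.Close()
        }
        nc, err := r.dial()

        r.mu.Lock()
        if nc != nil {
            r.conn = nc
        }
        r.lastConnErr = err
        r.mu.Unlock()
    }

Close() then only needs to hold the lock long enough to mark the client
closed (the real client nils out c.cancel), cancel the reconnect loop, and
read lastConnErr to report as its return value.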