mirror of https://github.com/etcd-io/etcd.git
clientv3: don't hold client lock while dialing
Holding the client lock while dialing caused the async reconnect to block until the dial finished. It was also causing problems with the Close error message, so Close() now returns the last dial error (if any) instead of clearing it out with a cancel(). Fixes #5416
This commit is contained in:
parent b3fee0abff
commit 5f5a203e27
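As a rough illustration of the locking pattern this change moves toward, here is a minimal, self-contained sketch; it is not the clientv3 code itself, and the client type, dial helper, and field names below are made up. The mutex is released around the blocking dial, and the last dial error is recorded so Close() can report it instead of a cancellation error.

package sketch

import (
    "sync"
    "time"
)

// conn and dial stand in for a real gRPC connection and a blocking dial.
type conn struct{ addr string }

func dial(addr string) (*conn, error) {
    time.Sleep(100 * time.Millisecond) // pretend this blocks on the network
    return &conn{addr: addr}, nil
}

type client struct {
    mu          sync.Mutex
    conn        *conn
    lastConnErr error
    closed      bool
}

// reconnect drops the lock around the blocking dial so other goroutines
// (e.g. Close) are not stuck behind it for the dial's full duration.
func (c *client) reconnect(addr string) {
    c.mu.Lock()
    if c.closed {
        c.mu.Unlock()
        return
    }
    c.mu.Unlock() // don't hold the lock while dialing

    nc, err := dial(addr)

    c.mu.Lock()
    if nc != nil {
        c.conn = nc
    }
    c.lastConnErr = err // kept so Close can report it
    c.mu.Unlock()
}

// Close surfaces the last dial error rather than masking it.
func (c *client) Close() error {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.closed = true
    return c.lastConnErr
}

The diff below applies the same shape inside retryConnection: unlock, dial through c.cfg.retryDialer, then re-lock to install the new connection and record lastConnErr for Close to return.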
@@ -101,25 +101,39 @@ func NewFromConfigFile(path string) (*Client, error) {
 }
 
 // Close shuts down the client's etcd connections.
-func (c *Client) Close() error {
+func (c *Client) Close() (err error) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
 	// acquire the cancel
 	if c.cancel == nil {
-		return nil
+		// already canceled
+		if c.lastConnErr != c.ctx.Err() {
+			err = c.lastConnErr
+		}
+		return
 	}
-	c.cancel()
+	cancel := c.cancel
 	c.cancel = nil
-	connc := c.newconnc
 	c.mu.Unlock()
+	c.connStartRetry(nil)
 
 	// close watcher and lease before terminating connection
 	// so they don't retry on a closed client
 	c.Watcher.Close()
 	c.Lease.Close()
 
+	// cancel reconnection loop
+	cancel()
+	c.mu.Lock()
+	connc := c.newconnc
+	c.mu.Unlock()
 	// connc on cancel() is left closed
 	<-connc
 	c.mu.Lock()
 	if c.lastConnErr != c.ctx.Err() {
-		return c.lastConnErr
+		err = c.lastConnErr
 	}
-	return nil
+	return
 }
 
 // Ctx is a context for "out of band" messages (e.g., for sending
@@ -282,30 +296,41 @@ func (c *Client) ActiveConnection() *grpc.ClientConn {
 }
 
 // retryConnection establishes a new connection
-func (c *Client) retryConnection(err error) (newConn *grpc.ClientConn, dialErr error) {
+func (c *Client) retryConnection(err error) {
+	oldconn := c.conn
+
+	// return holding lock so old connection can be cleaned up in this defer
+	defer func() {
+		if oldconn != nil {
+			oldconn.Close()
+			if st, _ := oldconn.State(); st != grpc.Shutdown {
+				// wait so grpc doesn't leak sleeping goroutines
+				oldconn.WaitForStateChange(context.Background(), st)
+			}
+		}
+		c.mu.Unlock()
+	}()
+
 	c.mu.Lock()
-	defer c.mu.Unlock()
 	if err != nil {
 		c.errors = append(c.errors, err)
 	}
-	if c.conn != nil {
-		c.conn.Close()
-		if st, _ := c.conn.State(); st != grpc.Shutdown {
-			// wait so grpc doesn't leak sleeping goroutines
-			c.conn.WaitForStateChange(context.Background(), st)
-		}
-		c.conn = nil
-	}
	if c.cancel == nil {
 		// client has called Close() so don't try to dial out
-		return nil, c.ctx.Err()
+		return
 	}
+	c.mu.Unlock()
 
-	c.conn, dialErr = c.cfg.retryDialer(c)
+	nc, dialErr := c.cfg.retryDialer(c)
+
+	c.mu.Lock()
+	if nc != nil {
+		c.conn = nc
+	}
 	if dialErr != nil {
 		c.errors = append(c.errors, dialErr)
 	}
-	return c.conn, dialErr
+	c.lastConnErr = dialErr
 }
 
 // connStartRetry schedules a reconnect if one is not already running
@@ -321,17 +346,20 @@ func (c *Client) connStartRetry(err error) {
 
 // connWait waits for a reconnect to be processed
 func (c *Client) connWait(ctx context.Context, err error) (*grpc.ClientConn, error) {
-	c.mu.Lock()
+	c.mu.RLock()
 	ch := c.newconnc
-	c.mu.Unlock()
+	c.mu.RUnlock()
 	c.connStartRetry(err)
 	select {
 	case <-ctx.Done():
 		return nil, ctx.Err()
 	case <-ch:
 	}
-	c.mu.Lock()
-	defer c.mu.Unlock()
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	if c.cancel == nil {
+		return c.conn, rpctypes.ErrConnClosed
+	}
 	return c.conn, c.lastConnErr
 }
 
@@ -340,11 +368,8 @@ func (c *Client) connMonitor() {
 	var err error
 
 	defer func() {
-		_, err = c.retryConnection(c.ctx.Err())
-		c.mu.Lock()
-		c.lastConnErr = err
+		c.retryConnection(c.ctx.Err())
 		close(c.newconnc)
-		c.mu.Unlock()
 	}()
 
 	limiter := rate.NewLimiter(rate.Every(minConnRetryWait), 1)
@@ -354,10 +379,8 @@
 		case <-c.ctx.Done():
 			return
 		}
-		conn, connErr := c.retryConnection(err)
+		c.retryConnection(err)
 		c.mu.Lock()
-		c.lastConnErr = connErr
-		c.conn = conn
 		close(c.newconnc)
 		c.newconnc = make(chan struct{})
 		c.reconnc = make(chan error, 1)
@@ -131,6 +131,13 @@ func TestKVPutWithRequireLeader(t *testing.T) {
 	if err != rpctypes.ErrNoLeader {
 		t.Fatal(err)
 	}
+
+	// clients may give timeout errors since the members are stopped; take
+	// the clients so that terminating the cluster won't complain
+	clus.Client(1).Close()
+	clus.Client(2).Close()
+	clus.TakeClient(1)
+	clus.TakeClient(2)
 }
 
 func TestKVRange(t *testing.T) {
@@ -633,13 +640,22 @@ func TestKVPutStoppedServerAndClose(t *testing.T) {
 	defer testutil.AfterTest(t)
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
+	cli := clus.Client(0)
 	clus.Members[0].Stop(t)
 	// this Put fails and triggers an asynchronous connection retry
-	_, err := clus.Client(0).Put(context.TODO(), "abc", "123")
+	_, err := cli.Put(context.TODO(), "abc", "123")
 	if err == nil ||
 		(!strings.Contains(err.Error(), "connection is closing") &&
 			!strings.Contains(err.Error(), "transport is closing")) {
 		t.Fatal(err)
 	}
-	// cluster will terminate and close the client with the retry in-flight
+
+	// wait some so the client closes with the retry in-flight
+	time.Sleep(time.Second)
+
+	// get the timeout
+	clus.TakeClient(0)
+	if err := cli.Close(); err == nil || !strings.Contains(err.Error(), "timed out") {
+		t.Fatal(err)
+	}
 }
@@ -74,7 +74,7 @@ func TestTxnWriteFail(t *testing.T) {
 
 	dialTimeout := 5 * time.Second
 	select {
-	case <-time.After(2*dialTimeout + time.Second):
+	case <-time.After(dialTimeout + time.Second):
 		t.Fatalf("timed out waiting for txn to fail")
 	case <-donec:
 		// don't restart cluster until txn errors out
@@ -88,9 +88,11 @@ func (r *remoteClient) acquire(ctx context.Context) error {
 	r.client.mu.RLock()
+	closed := r.client.cancel == nil
 	c := r.client.conn
+	lastConnErr := r.client.lastConnErr
 	match := r.conn == c
 	r.mu.Unlock()
-	if c != nil && match {
+	if lastConnErr == nil && match {
 		// new connection already
 		return nil
 	}
 	r.client.mu.RUnlock()