mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
clientv3: fix Close after failed Put
Was crashing on a nil connection. Reworked the shutdown path a little so there's only one connection close site.
This commit is contained in:
parent
5cbd8cefc9
commit
527aa1a499
@ -89,19 +89,23 @@ func NewFromConfigFile(path string) (*Client, error) {
|
||||
// Close shuts down the client's etcd connections.
|
||||
func (c *Client) Close() error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.cancel == nil {
|
||||
c.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
c.cancel()
|
||||
c.cancel = nil
|
||||
err := c.conn.Close()
|
||||
connc := c.newconnc
|
||||
c.mu.Unlock()
|
||||
c.connStartRetry(nil)
|
||||
c.Watcher.Close()
|
||||
c.Lease.Close()
|
||||
<-connc
|
||||
return err
|
||||
c.mu.Lock()
|
||||
if c.lastConnErr != c.ctx.Err() {
|
||||
return c.lastConnErr
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Ctx is a context for "out of band" messages (e.g., for sending
|
||||
@ -213,16 +217,17 @@ func (c *Client) retryConnection(err error) (newConn *grpc.ClientConn, dialErr e
|
||||
if err != nil {
|
||||
c.errors = append(c.errors, err)
|
||||
}
|
||||
if c.cancel == nil {
|
||||
return nil, c.ctx.Err()
|
||||
}
|
||||
if c.conn != nil {
|
||||
c.conn.Close()
|
||||
if st, _ := c.conn.State(); st != grpc.Shutdown {
|
||||
// wait so grpc doesn't leak sleeping goroutines
|
||||
c.conn.WaitForStateChange(c.ctx, st)
|
||||
c.conn.WaitForStateChange(context.Background(), st)
|
||||
}
|
||||
}
|
||||
if c.cancel == nil {
|
||||
// client has called Close() so don't try to dial out
|
||||
return nil, c.ctx.Err()
|
||||
}
|
||||
|
||||
c.conn, dialErr = c.cfg.RetryDialer(c)
|
||||
if dialErr != nil {
|
||||
@ -262,8 +267,9 @@ func (c *Client) connMonitor() {
|
||||
select {
|
||||
case err = <-c.reconnc:
|
||||
case <-c.ctx.Done():
|
||||
_, err = c.retryConnection(c.ctx.Err())
|
||||
c.mu.Lock()
|
||||
c.lastConnErr = c.ctx.Err()
|
||||
c.lastConnErr = err
|
||||
close(c.newconnc)
|
||||
c.mu.Unlock()
|
||||
return
|
||||
|
@ -547,3 +547,18 @@ func TestKVGetCancel(t *testing.T) {
|
||||
t.Fatalf("cancel on get broke client connection")
|
||||
}
|
||||
}
|
||||
|
||||
// TestKVPutStoppedServerAndClose ensures closing after a failed Put works.
|
||||
func TestKVPutStoppedServerAndClose(t *testing.T) {
|
||||
t.Parallel()
|
||||
defer testutil.AfterTest(t)
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
clus.Members[0].Stop(t)
|
||||
// this Put fails and triggers an asynchronous connection retry
|
||||
_, err := clus.Client(0).Put(context.TODO(), "abc", "123")
|
||||
if err == nil || !strings.Contains(err.Error(), "connection is closing") {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// cluster will terminate and close the client with the retry in-flight
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user