mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #16857 from chaochn47/release-3.4-atomic-endpoints
[3.4] backport client: call .Endpoints() in dial() in client/v3/client.go instead of accessing cfg.Endpoints directly
This commit is contained in:
commit
1eb276c33d
@ -56,7 +56,9 @@ type Client struct {
|
|||||||
cfg Config
|
cfg Config
|
||||||
creds grpccredentials.TransportCredentials
|
creds grpccredentials.TransportCredentials
|
||||||
resolver *resolver.EtcdManualResolver
|
resolver *resolver.EtcdManualResolver
|
||||||
mu *sync.RWMutex
|
|
||||||
|
epMu *sync.RWMutex
|
||||||
|
endpoints []string
|
||||||
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
@ -140,18 +142,18 @@ func (c *Client) Ctx() context.Context { return c.ctx }
|
|||||||
// Endpoints lists the registered endpoints for the client.
|
// Endpoints lists the registered endpoints for the client.
|
||||||
func (c *Client) Endpoints() []string {
|
func (c *Client) Endpoints() []string {
|
||||||
// copy the slice; protect original endpoints from being changed
|
// copy the slice; protect original endpoints from being changed
|
||||||
c.mu.RLock()
|
c.epMu.RLock()
|
||||||
defer c.mu.RUnlock()
|
defer c.epMu.RUnlock()
|
||||||
eps := make([]string, len(c.cfg.Endpoints))
|
eps := make([]string, len(c.endpoints))
|
||||||
copy(eps, c.cfg.Endpoints)
|
copy(eps, c.endpoints)
|
||||||
return eps
|
return eps
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetEndpoints updates client's endpoints.
|
// SetEndpoints updates client's endpoints.
|
||||||
func (c *Client) SetEndpoints(eps ...string) {
|
func (c *Client) SetEndpoints(eps ...string) {
|
||||||
c.mu.Lock()
|
c.epMu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.epMu.Unlock()
|
||||||
c.cfg.Endpoints = eps
|
c.endpoints = eps
|
||||||
|
|
||||||
c.resolver.SetEndpoints(eps)
|
c.resolver.SetEndpoints(eps)
|
||||||
}
|
}
|
||||||
@ -279,7 +281,7 @@ func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.
|
|||||||
defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
|
defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
|
||||||
}
|
}
|
||||||
|
|
||||||
initialEndpoints := strings.Join(c.cfg.Endpoints, ";")
|
initialEndpoints := strings.Join(c.Endpoints(), ";")
|
||||||
target := fmt.Sprintf("%s://%p/#initially=[%s]", resolver.Schema, c, initialEndpoints)
|
target := fmt.Sprintf("%s://%p/#initially=[%s]", resolver.Schema, c, initialEndpoints)
|
||||||
conn, err := grpc.DialContext(dctx, target, opts...)
|
conn, err := grpc.DialContext(dctx, target, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -327,7 +329,7 @@ func newClient(cfg *Config) (*Client, error) {
|
|||||||
creds: creds,
|
creds: creds,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
mu: new(sync.RWMutex),
|
epMu: new(sync.RWMutex),
|
||||||
callOpts: defaultCallOpts,
|
callOpts: defaultCallOpts,
|
||||||
lgMu: new(sync.RWMutex),
|
lgMu: new(sync.RWMutex),
|
||||||
}
|
}
|
||||||
@ -369,6 +371,7 @@ func newClient(cfg *Config) (*Client, error) {
|
|||||||
if len(cfg.Endpoints) < 1 {
|
if len(cfg.Endpoints) < 1 {
|
||||||
return nil, fmt.Errorf("at least one Endpoint must is required in client config")
|
return nil, fmt.Errorf("at least one Endpoint must is required in client config")
|
||||||
}
|
}
|
||||||
|
client.SetEndpoints(cfg.Endpoints...)
|
||||||
|
|
||||||
// Use a provided endpoint target so that for https:// without any tls config given, then
|
// Use a provided endpoint target so that for https:// without any tls config given, then
|
||||||
// grpc will assume the certificate server name is the endpoint host.
|
// grpc will assume the certificate server name is the endpoint host.
|
||||||
|
@ -237,14 +237,12 @@ func TestClientRejectOldCluster(t *testing.T) {
|
|||||||
endpointToVersion[tt.endpoints[j]] = tt.versions[j]
|
endpointToVersion[tt.endpoints[j]] = tt.versions[j]
|
||||||
}
|
}
|
||||||
c := &Client{
|
c := &Client{
|
||||||
ctx: context.Background(),
|
ctx: context.Background(),
|
||||||
cfg: Config{
|
endpoints: tt.endpoints,
|
||||||
Endpoints: tt.endpoints,
|
epMu: new(sync.RWMutex),
|
||||||
},
|
|
||||||
Maintenance: &mockMaintenance{
|
Maintenance: &mockMaintenance{
|
||||||
Version: endpointToVersion,
|
Version: endpointToVersion,
|
||||||
},
|
},
|
||||||
mu: new(sync.RWMutex),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.checkVersion(); err != tt.expectedError {
|
if err := c.checkVersion(); err != tt.expectedError {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user