mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
clientV3: simplify grpc dialer usage. Remove workaround #11184 after bumping grpc to 1.26.0.
Signed-off-by: Chao Chen <chaochn@amazon.com>
This commit is contained in:
parent
d152339b5b
commit
03f79003d4
@ -174,7 +174,8 @@ type Resolver struct {
|
|||||||
func epsToAddrs(eps ...string) (addrs []resolver.Address) {
|
func epsToAddrs(eps ...string) (addrs []resolver.Address) {
|
||||||
addrs = make([]resolver.Address, 0, len(eps))
|
addrs = make([]resolver.Address, 0, len(eps))
|
||||||
for _, ep := range eps {
|
for _, ep := range eps {
|
||||||
addrs = append(addrs, resolver.Address{Addr: ep})
|
_, host, _ := ParseEndpoint(ep)
|
||||||
|
addrs = append(addrs, resolver.Address{Addr: ep, ServerName: host})
|
||||||
}
|
}
|
||||||
return addrs
|
return addrs
|
||||||
}
|
}
|
||||||
|
@ -253,10 +253,6 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts
|
|||||||
dialer := endpoint.Dialer
|
dialer := endpoint.Dialer
|
||||||
if creds != nil {
|
if creds != nil {
|
||||||
opts = append(opts, grpc.WithTransportCredentials(creds))
|
opts = append(opts, grpc.WithTransportCredentials(creds))
|
||||||
// gRPC load balancer workaround. See credentials.transportCredential for details.
|
|
||||||
if credsDialer, ok := creds.(TransportCredentialsWithDialer); ok {
|
|
||||||
dialer = credsDialer.Dialer
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
opts = append(opts, grpc.WithInsecure())
|
opts = append(opts, grpc.WithInsecure())
|
||||||
}
|
}
|
||||||
@ -651,9 +647,3 @@ func IsConnCanceled(err error) bool {
|
|||||||
// <= gRPC v1.7.x returns 'errors.New("grpc: the client connection is closing")'
|
// <= gRPC v1.7.x returns 'errors.New("grpc: the client connection is closing")'
|
||||||
return strings.Contains(err.Error(), "grpc: the client connection is closing")
|
return strings.Contains(err.Error(), "grpc: the client connection is closing")
|
||||||
}
|
}
|
||||||
|
|
||||||
// TransportCredentialsWithDialer is for a gRPC load balancer workaround. See credentials.transportCredential for details.
|
|
||||||
type TransportCredentialsWithDialer interface {
|
|
||||||
grpccredentials.TransportCredentials
|
|
||||||
Dialer(ctx context.Context, dialEp string) (net.Conn, error)
|
|
||||||
}
|
|
||||||
|
@ -22,7 +22,6 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"go.etcd.io/etcd/clientv3/balancer/resolver/endpoint"
|
|
||||||
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
|
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||||
grpccredentials "google.golang.org/grpc/credentials"
|
grpccredentials "google.golang.org/grpc/credentials"
|
||||||
)
|
)
|
||||||
@ -66,46 +65,20 @@ func (b *bundle) NewWithMode(mode string) (grpccredentials.Bundle, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// transportCredential implements "grpccredentials.TransportCredentials" interface.
|
// transportCredential implements "grpccredentials.TransportCredentials" interface.
|
||||||
// transportCredential wraps TransportCredentials to track which
|
|
||||||
// addresses are dialed for which endpoints, and then sets the authority when checking the endpoint's cert to the
|
|
||||||
// hostname or IP of the dialed endpoint.
|
|
||||||
// This is a workaround of a gRPC load balancer issue. gRPC uses the dialed target's service name as the authority when
|
|
||||||
// checking all endpoint certs, which does not work for etcd servers using their hostname or IP as the Subject Alternative Name
|
|
||||||
// in their TLS certs.
|
|
||||||
// To enable, include both WithTransportCredentials(creds) and WithContextDialer(creds.Dialer)
|
|
||||||
// when dialing.
|
|
||||||
type transportCredential struct {
|
type transportCredential struct {
|
||||||
gtc grpccredentials.TransportCredentials
|
gtc grpccredentials.TransportCredentials
|
||||||
mu sync.Mutex
|
|
||||||
// addrToEndpoint maps from the connection addresses that are dialed to the hostname or IP of the
|
|
||||||
// endpoint provided to the dialer when dialing
|
|
||||||
addrToEndpoint map[string]string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTransportCredential(cfg *tls.Config) *transportCredential {
|
func newTransportCredential(cfg *tls.Config) *transportCredential {
|
||||||
return &transportCredential{
|
return &transportCredential{
|
||||||
gtc: grpccredentials.NewTLS(cfg),
|
gtc: grpccredentials.NewTLS(cfg),
|
||||||
addrToEndpoint: map[string]string{},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tc *transportCredential) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, grpccredentials.AuthInfo, error) {
|
func (tc *transportCredential) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, grpccredentials.AuthInfo, error) {
|
||||||
// Set the authority when checking the endpoint's cert to the hostname or IP of the dialed endpoint
|
|
||||||
tc.mu.Lock()
|
|
||||||
dialEp, ok := tc.addrToEndpoint[rawConn.RemoteAddr().String()]
|
|
||||||
tc.mu.Unlock()
|
|
||||||
if ok {
|
|
||||||
_, host, _ := endpoint.ParseEndpoint(dialEp)
|
|
||||||
authority = host
|
|
||||||
}
|
|
||||||
return tc.gtc.ClientHandshake(ctx, authority, rawConn)
|
return tc.gtc.ClientHandshake(ctx, authority, rawConn)
|
||||||
}
|
}
|
||||||
|
|
||||||
// return true if given string is an IP.
|
|
||||||
func isIP(ep string) bool {
|
|
||||||
return net.ParseIP(ep) != nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (tc *transportCredential) ServerHandshake(rawConn net.Conn) (net.Conn, grpccredentials.AuthInfo, error) {
|
func (tc *transportCredential) ServerHandshake(rawConn net.Conn) (net.Conn, grpccredentials.AuthInfo, error) {
|
||||||
return tc.gtc.ServerHandshake(rawConn)
|
return tc.gtc.ServerHandshake(rawConn)
|
||||||
}
|
}
|
||||||
@ -115,15 +88,8 @@ func (tc *transportCredential) Info() grpccredentials.ProtocolInfo {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (tc *transportCredential) Clone() grpccredentials.TransportCredentials {
|
func (tc *transportCredential) Clone() grpccredentials.TransportCredentials {
|
||||||
copy := map[string]string{}
|
|
||||||
tc.mu.Lock()
|
|
||||||
for k, v := range tc.addrToEndpoint {
|
|
||||||
copy[k] = v
|
|
||||||
}
|
|
||||||
tc.mu.Unlock()
|
|
||||||
return &transportCredential{
|
return &transportCredential{
|
||||||
gtc: tc.gtc.Clone(),
|
gtc: tc.gtc.Clone(),
|
||||||
addrToEndpoint: copy,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -131,17 +97,6 @@ func (tc *transportCredential) OverrideServerName(serverNameOverride string) err
|
|||||||
return tc.gtc.OverrideServerName(serverNameOverride)
|
return tc.gtc.OverrideServerName(serverNameOverride)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tc *transportCredential) Dialer(ctx context.Context, dialEp string) (net.Conn, error) {
|
|
||||||
// Keep track of which addresses are dialed for which endpoints
|
|
||||||
conn, err := endpoint.Dialer(ctx, dialEp)
|
|
||||||
if conn != nil {
|
|
||||||
tc.mu.Lock()
|
|
||||||
tc.addrToEndpoint[conn.RemoteAddr().String()] = dialEp
|
|
||||||
tc.mu.Unlock()
|
|
||||||
}
|
|
||||||
return conn, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// perRPCCredential implements "grpccredentials.PerRPCCredentials" interface.
|
// perRPCCredential implements "grpccredentials.PerRPCCredentials" interface.
|
||||||
type perRPCCredential struct {
|
type perRPCCredential struct {
|
||||||
authToken string
|
authToken string
|
||||||
|
Loading…
x
Reference in New Issue
Block a user