mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #12706 from ptabor/20210218
clientv3: PS: Replace balancer with upstream grpc solution
This commit is contained in:
commit
57dcb037c0
@ -9,7 +9,7 @@ The minimum recommended etcd versions to run in **production** are 3.2.28+, 3.3.
|
|||||||
<hr>
|
<hr>
|
||||||
|
|
||||||
|
|
||||||
## v3.5.0 (2020 TBD)
|
## v3.5.0 (2021 TBD)
|
||||||
|
|
||||||
See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.0...v3.5.0) and [v3.5 upgrade guide](https://github.com/etcd-io/etcd/blob/master/Documentation/upgrades/upgrade_3_5.md) for any breaking changes.
|
See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.0...v3.5.0) and [v3.5 upgrade guide](https://github.com/etcd-io/etcd/blob/master/Documentation/upgrades/upgrade_3_5.md) for any breaking changes.
|
||||||
|
|
||||||
@ -63,6 +63,10 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.0...v3.5.0) and
|
|||||||
- Changed `pkg/flags` function signature to [support structured logger](https://github.com/etcd-io/etcd/pull/11616).
|
- Changed `pkg/flags` function signature to [support structured logger](https://github.com/etcd-io/etcd/pull/11616).
|
||||||
- Previously, `SetFlagsFromEnv(prefix string, fs *flag.FlagSet) error`, now `SetFlagsFromEnv(lg *zap.Logger, prefix string, fs *flag.FlagSet) error`.
|
- Previously, `SetFlagsFromEnv(prefix string, fs *flag.FlagSet) error`, now `SetFlagsFromEnv(lg *zap.Logger, prefix string, fs *flag.FlagSet) error`.
|
||||||
- Previously, `SetPflagsFromEnv(prefix string, fs *pflag.FlagSet) error`, now `SetPflagsFromEnv(lg *zap.Logger, prefix string, fs *pflag.FlagSet) error`.
|
- Previously, `SetPflagsFromEnv(prefix string, fs *pflag.FlagSet) error`, now `SetPflagsFromEnv(lg *zap.Logger, prefix string, fs *pflag.FlagSet) error`.
|
||||||
|
- ClientV3 supports [grpc resolver API](https://github.com/etcd-io/etcd/blob/master/client/v3/naming/resolver/resolver.go).
|
||||||
|
- Endpoints can be managed using [endpoints.Manager](https://github.com/etcd-io/etcd/blob/master/client/v3/naming/endpoints/endpoints.go)
|
||||||
|
- Previously supported [GRPCResolver was decomissioned](https://github.com/etcd-io/etcd/pull/12675). Use [resolver](https://github.com/etcd-io/etcd/blob/master/client/v3/naming/resolver/resolver.go) instead.
|
||||||
|
|
||||||
|
|
||||||
### `etcdctl`
|
### `etcdctl`
|
||||||
|
|
||||||
@ -174,6 +178,8 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change.
|
|||||||
- Make sure [save snapshot downloads checksum for integrity checks](https://github.com/etcd-io/etcd/pull/11896).
|
- Make sure [save snapshot downloads checksum for integrity checks](https://github.com/etcd-io/etcd/pull/11896).
|
||||||
- Fix [auth token invalid after watch reconnects](https://github.com/etcd-io/etcd/pull/12264). Get AuthToken automatically when clientConn is ready.
|
- Fix [auth token invalid after watch reconnects](https://github.com/etcd-io/etcd/pull/12264). Get AuthToken automatically when clientConn is ready.
|
||||||
- Improve [clientv3:get AuthToken gracefully without extra connection](https://github.com/etcd-io/etcd/pull/12165).
|
- Improve [clientv3:get AuthToken gracefully without extra connection](https://github.com/etcd-io/etcd/pull/12165).
|
||||||
|
- Changed [clientv3 dialing code](https://github.com/etcd-io/etcd/pull/12671) to use grpc resolver API instead of custom balancer.
|
||||||
|
- Endpoints self identify now as `etcd-endpoints://{id}/#initially={list of endpoints}` e.g. `etcd-endpoints://0xc0009d8540/#initially=[localhost:2079]`
|
||||||
|
|
||||||
### Package `lease`
|
### Package `lease`
|
||||||
|
|
||||||
|
8
Makefile
8
Makefile
@ -151,6 +151,14 @@ test:
|
|||||||
$(TEST_OPTS) ./test.sh 2>&1 | tee test-$(TEST_SUFFIX).log
|
$(TEST_OPTS) ./test.sh 2>&1 | tee test-$(TEST_SUFFIX).log
|
||||||
! egrep "(--- FAIL:|DATA RACE|panic: test timed out|appears to have leaked)" -B50 -A10 test-$(TEST_SUFFIX).log
|
! egrep "(--- FAIL:|DATA RACE|panic: test timed out|appears to have leaked)" -B50 -A10 test-$(TEST_SUFFIX).log
|
||||||
|
|
||||||
|
test-small:
|
||||||
|
$(info log-file: test-$(TEST_SUFFIX).log)
|
||||||
|
PASSES="fmt build unit" ./test.sh 2<&1 | tee test-$(TEST_SUFFIX).log
|
||||||
|
|
||||||
|
test-full:
|
||||||
|
$(info log-file: test-$(TEST_SUFFIX).log)
|
||||||
|
PASSES="fmt build unit integration functional e2e grpcproxy" ./test.sh 2<&1 | tee test-$(TEST_SUFFIX).log
|
||||||
|
|
||||||
docker-test:
|
docker-test:
|
||||||
$(info GO_VERSION: $(GO_VERSION))
|
$(info GO_VERSION: $(GO_VERSION))
|
||||||
$(info ETCD_VERSION: $(ETCD_VERSION))
|
$(info ETCD_VERSION: $(ETCD_VERSION))
|
||||||
|
@ -206,12 +206,10 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts
|
|||||||
} else {
|
} else {
|
||||||
opts = append(opts, grpc.WithInsecure())
|
opts = append(opts, grpc.WithInsecure())
|
||||||
}
|
}
|
||||||
grpc.WithDisableRetry()
|
|
||||||
|
|
||||||
// Interceptor retry and backoff.
|
// Interceptor retry and backoff.
|
||||||
// TODO: Replace all of clientv3/retry.go with interceptor based retry, or with
|
// TODO: Replace all of clientv3/retry.go with RetryPolicy:
|
||||||
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy
|
// https://github.com/grpc/grpc-proto/blob/cdd9ed5c3d3f87aef62f373b93361cf7bddc620d/grpc/service_config/service_config.proto#L130
|
||||||
// once it is available.
|
|
||||||
rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction))
|
rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction))
|
||||||
opts = append(opts,
|
opts = append(opts,
|
||||||
// Disable stream retry by default since go-grpc-middleware/retry does not support client streams.
|
// Disable stream retry by default since go-grpc-middleware/retry does not support client streams.
|
||||||
@ -252,8 +250,8 @@ func (c *Client) getToken(ctx context.Context) error {
|
|||||||
|
|
||||||
// dialWithBalancer dials the client's current load balanced resolver group. The scheme of the host
|
// dialWithBalancer dials the client's current load balanced resolver group. The scheme of the host
|
||||||
// of the provided endpoint determines the scheme used for all endpoints of the client connection.
|
// of the provided endpoint determines the scheme used for all endpoints of the client connection.
|
||||||
func (c *Client) dialWithBalancer(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
|
func (c *Client) dialWithBalancer(dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
|
||||||
creds := c.credentialsForEndpoint(ep)
|
creds := c.credentialsForEndpoint(c.Endpoints()[0])
|
||||||
opts := append(dopts, grpc.WithResolvers(c.resolver))
|
opts := append(dopts, grpc.WithResolvers(c.resolver))
|
||||||
return c.dial(creds, opts...)
|
return c.dial(creds, opts...)
|
||||||
}
|
}
|
||||||
@ -278,7 +276,9 @@ func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.
|
|||||||
defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
|
defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
|
||||||
}
|
}
|
||||||
|
|
||||||
conn, err := grpc.DialContext(dctx, c.resolver.Scheme()+":///", opts...)
|
initialEndpoints := strings.Join(c.cfg.Endpoints, ";")
|
||||||
|
target := fmt.Sprintf("%s://%p/#initially=[%s]", resolver.Schema, c, initialEndpoints)
|
||||||
|
conn, err := grpc.DialContext(dctx, target, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -286,13 +286,20 @@ func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials {
|
func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials {
|
||||||
if c.creds != nil {
|
r := endpoint.RequiresCredentials(ep)
|
||||||
|
switch r {
|
||||||
|
case endpoint.CREDS_DROP:
|
||||||
|
return nil
|
||||||
|
case endpoint.CREDS_OPTIONAL:
|
||||||
return c.creds
|
return c.creds
|
||||||
}
|
case endpoint.CREDS_REQUIRE:
|
||||||
if endpoint.RequiresCredentials(ep) {
|
if c.creds != nil {
|
||||||
|
return c.creds
|
||||||
|
}
|
||||||
return credentials.NewBundle(credentials.Config{}).TransportCredentials()
|
return credentials.NewBundle(credentials.Config{}).TransportCredentials()
|
||||||
|
default:
|
||||||
|
panic(fmt.Errorf("Unsupported CredsRequirement: %v", r))
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newClient(cfg *Config) (*Client, error) {
|
func newClient(cfg *Config) (*Client, error) {
|
||||||
@ -360,18 +367,15 @@ func newClient(cfg *Config) (*Client, error) {
|
|||||||
client.cancel()
|
client.cancel()
|
||||||
return nil, fmt.Errorf("at least one Endpoint is required in client config")
|
return nil, fmt.Errorf("at least one Endpoint is required in client config")
|
||||||
}
|
}
|
||||||
dialEndpoint := cfg.Endpoints[0]
|
|
||||||
|
|
||||||
// Use a provided endpoint target so that for https:// without any tls config given, then
|
// Use a provided endpoint target so that for https:// without any tls config given, then
|
||||||
// grpc will assume the certificate server name is the endpoint host.
|
// grpc will assume the certificate server name is the endpoint host.
|
||||||
conn, err := client.dialWithBalancer(dialEndpoint)
|
conn, err := client.dialWithBalancer()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
client.cancel()
|
client.cancel()
|
||||||
client.resolver.Close()
|
client.resolver.Close()
|
||||||
|
// TODO: Error like `fmt.Errorf(dialing [%s] failed: %v, strings.Join(cfg.Endpoints, ";"), err)` would help with debugging a lot.
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// TODO: With the old grpc balancer interface, we waited until the dial timeout
|
|
||||||
// for the balancer to be ready. Is there an equivalent wait we should do with the new grpc balancer interface?
|
|
||||||
client.conn = conn
|
client.conn = conn
|
||||||
|
|
||||||
client.Cluster = NewCluster(client)
|
client.Cluster = NewCluster(client)
|
||||||
@ -390,6 +394,7 @@ func newClient(cfg *Config) (*Client, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
client.Close()
|
client.Close()
|
||||||
cancel()
|
cancel()
|
||||||
|
//TODO: Consider fmt.Errorf("communicating with [%s] failed: %v", strings.Join(cfg.Endpoints, ";"), err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
cancel()
|
cancel()
|
||||||
|
@ -82,6 +82,8 @@ func TestDialCancel(t *testing.T) {
|
|||||||
func TestDialTimeout(t *testing.T) {
|
func TestDialTimeout(t *testing.T) {
|
||||||
defer testutil.AfterTest(t)
|
defer testutil.AfterTest(t)
|
||||||
|
|
||||||
|
wantError := context.DeadlineExceeded
|
||||||
|
|
||||||
// grpc.WithBlock to block until connection up or timeout
|
// grpc.WithBlock to block until connection up or timeout
|
||||||
testCfgs := []Config{
|
testCfgs := []Config{
|
||||||
{
|
{
|
||||||
@ -121,8 +123,8 @@ func TestDialTimeout(t *testing.T) {
|
|||||||
case <-time.After(5 * time.Second):
|
case <-time.After(5 * time.Second):
|
||||||
t.Errorf("#%d: failed to timeout dial on time", i)
|
t.Errorf("#%d: failed to timeout dial on time", i)
|
||||||
case err := <-donec:
|
case err := <-donec:
|
||||||
if err != context.DeadlineExceeded {
|
if err.Error() != wantError.Error() {
|
||||||
t.Errorf("#%d: unexpected error %v, want %v", i, err, context.DeadlineExceeded)
|
t.Errorf("#%d: unexpected error '%v', want '%v'", i, err, wantError)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -15,45 +15,114 @@
|
|||||||
package endpoint
|
package endpoint
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
"net/url"
|
"net/url"
|
||||||
"regexp"
|
"path"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
type CredsRequirement int
|
||||||
STRIP_PORT_REGEXP = regexp.MustCompile("(.*):([0-9]+)")
|
|
||||||
|
const (
|
||||||
|
// CREDS_REQUIRE - Credentials/certificate required for thi type of connection.
|
||||||
|
CREDS_REQUIRE CredsRequirement = iota
|
||||||
|
// CREDS_DROP - Credentials/certificate not needed and should get ignored.
|
||||||
|
CREDS_DROP
|
||||||
|
// CREDS_OPTIONAL - Credentials/certificate might be used if supplied
|
||||||
|
CREDS_OPTIONAL
|
||||||
)
|
)
|
||||||
|
|
||||||
func stripPort(ep string) string {
|
func extractHostFromHostPort(ep string) string {
|
||||||
return STRIP_PORT_REGEXP.ReplaceAllString(ep, "$1")
|
host, _, err := net.SplitHostPort(ep)
|
||||||
|
if err != nil {
|
||||||
|
return ep
|
||||||
|
}
|
||||||
|
return host
|
||||||
}
|
}
|
||||||
|
|
||||||
func translateEndpoint(ep string) (addr string, serverName string, requireCreds bool) {
|
func extractHostFromPath(pathStr string) string {
|
||||||
url, err := url.Parse(ep)
|
return extractHostFromHostPort(path.Base(pathStr))
|
||||||
if err != nil {
|
}
|
||||||
return ep, stripPort(ep), false
|
|
||||||
|
//mustSplit2 returns the values from strings.SplitN(s, sep, 2).
|
||||||
|
//If sep is not found, it returns ("", "", false) instead.
|
||||||
|
func mustSplit2(s, sep string) (string, string) {
|
||||||
|
spl := strings.SplitN(s, sep, 2)
|
||||||
|
if len(spl) < 2 {
|
||||||
|
panic(fmt.Errorf("Token '%v' expected to have separator sep: `%v`", s, sep))
|
||||||
}
|
}
|
||||||
switch url.Scheme {
|
return spl[0], spl[1]
|
||||||
case "http", "https":
|
}
|
||||||
return url.Host, url.Hostname(), url.Scheme == "https"
|
|
||||||
case "unix", "unixs":
|
func schemeToCredsRequirement(schema string) CredsRequirement {
|
||||||
requireCreds = url.Scheme == "unixs"
|
switch schema {
|
||||||
if url.Opaque != "" {
|
case "https", "unixs":
|
||||||
return "unix:" + url.Opaque, stripPort(url.Opaque), requireCreds
|
return CREDS_REQUIRE
|
||||||
} else if url.Path != "" {
|
case "http":
|
||||||
return "unix://" + url.Host + url.Path, url.Host + url.Path, requireCreds
|
return CREDS_DROP
|
||||||
} else {
|
case "unix":
|
||||||
return "unix:" + url.Host, url.Hostname(), requireCreds
|
// Preserving previous behavior from:
|
||||||
}
|
// https://github.com/etcd-io/etcd/blob/dae29bb719dd69dc119146fc297a0628fcc1ccf8/client/v3/client.go#L212
|
||||||
|
// that likely was a bug due to missing 'fallthrough'.
|
||||||
|
// At the same time it seems legit to let the users decide whether they
|
||||||
|
// want credential control or not (and 'unixs' schema is not a standard thing).
|
||||||
|
return CREDS_OPTIONAL
|
||||||
case "":
|
case "":
|
||||||
return url.Host + url.Path, url.Host + url.Path, false
|
return CREDS_OPTIONAL
|
||||||
default:
|
default:
|
||||||
return ep, stripPort(ep), false
|
return CREDS_OPTIONAL
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This function translates endpoints names supported by etcd server into
|
||||||
|
// endpoints as supported by grpc with additional information
|
||||||
|
// (server_name for cert validation, requireCreds - whether certs are needed).
|
||||||
|
// The main differences:
|
||||||
|
// - etcd supports unixs & https names as opposed to unix & http to
|
||||||
|
// distinguish need to configure certificates.
|
||||||
|
// - etcd support http(s) names as opposed to tcp supported by grpc/dial method.
|
||||||
|
// - etcd supports unix(s)://local-file naming schema
|
||||||
|
// (as opposed to unix:local-file canonical name used by grpc for current dir files).
|
||||||
|
// - Within the unix(s) schemas, the last segment (filename) without 'port' (content after colon)
|
||||||
|
// is considered serverName - to allow local testing of cert-protected communication.
|
||||||
|
// See more:
|
||||||
|
// - https://github.com/grpc/grpc-go/blob/26c143bd5f59344a4b8a1e491e0f5e18aa97abc7/internal/grpcutil/target.go#L47
|
||||||
|
// - https://golang.org/pkg/net/#Dial
|
||||||
|
// - https://github.com/grpc/grpc/blob/master/doc/naming.md
|
||||||
|
func translateEndpoint(ep string) (addr string, serverName string, requireCreds CredsRequirement) {
|
||||||
|
if strings.HasPrefix(ep, "unix:") || strings.HasPrefix(ep, "unixs:") {
|
||||||
|
if strings.HasPrefix(ep, "unix:///") || strings.HasPrefix(ep, "unixs:///") {
|
||||||
|
// absolute path case
|
||||||
|
schema, absolutePath := mustSplit2(ep, "://")
|
||||||
|
return "unix://" + absolutePath, extractHostFromPath(absolutePath), schemeToCredsRequirement(schema)
|
||||||
|
}
|
||||||
|
if strings.HasPrefix(ep, "unix://") || strings.HasPrefix(ep, "unixs://") {
|
||||||
|
// legacy etcd local path
|
||||||
|
schema, localPath := mustSplit2(ep, "://")
|
||||||
|
return "unix:" + localPath, extractHostFromPath(localPath), schemeToCredsRequirement(schema)
|
||||||
|
}
|
||||||
|
schema, localPath := mustSplit2(ep, ":")
|
||||||
|
return "unix:" + localPath, extractHostFromPath(localPath), schemeToCredsRequirement(schema)
|
||||||
|
}
|
||||||
|
|
||||||
|
if strings.Contains(ep, "://") {
|
||||||
|
url, err := url.Parse(ep)
|
||||||
|
if err != nil {
|
||||||
|
return ep, extractHostFromHostPort(ep), CREDS_OPTIONAL
|
||||||
|
}
|
||||||
|
if url.Scheme == "http" || url.Scheme == "https" {
|
||||||
|
return url.Host, url.Hostname(), schemeToCredsRequirement(url.Scheme)
|
||||||
|
}
|
||||||
|
return ep, url.Hostname(), schemeToCredsRequirement(url.Scheme)
|
||||||
|
}
|
||||||
|
// Handles plain addresses like 10.0.0.44:437.
|
||||||
|
return ep, extractHostFromHostPort(ep), CREDS_OPTIONAL
|
||||||
|
}
|
||||||
|
|
||||||
// RequiresCredentials returns whether given endpoint requires
|
// RequiresCredentials returns whether given endpoint requires
|
||||||
// credentials/certificates for connection.
|
// credentials/certificates for connection.
|
||||||
func RequiresCredentials(ep string) bool {
|
func RequiresCredentials(ep string) CredsRequirement {
|
||||||
_, _, requireCreds := translateEndpoint(ep)
|
_, _, requireCreds := translateEndpoint(ep)
|
||||||
return requireCreds
|
return requireCreds
|
||||||
}
|
}
|
||||||
|
@ -18,41 +18,48 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestInterpret(t *testing.T) {
|
func Test_interpret(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
endpoint string
|
endpoint string
|
||||||
wantAddress string
|
wantAddress string
|
||||||
wantServerName string
|
wantServerName string
|
||||||
|
wantRequiresCreds CredsRequirement
|
||||||
}{
|
}{
|
||||||
{"127.0.0.1", "127.0.0.1", "127.0.0.1"},
|
{"127.0.0.1", "127.0.0.1", "127.0.0.1", CREDS_OPTIONAL},
|
||||||
{"localhost", "localhost", "localhost"},
|
{"localhost", "localhost", "localhost", CREDS_OPTIONAL},
|
||||||
{"localhost:8080", "localhost:8080", "localhost"},
|
{"localhost:8080", "localhost:8080", "localhost", CREDS_OPTIONAL},
|
||||||
|
|
||||||
{"unix:127.0.0.1", "unix:127.0.0.1", "127.0.0.1"},
|
{"unix:127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_OPTIONAL},
|
||||||
{"unix:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"},
|
{"unix:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1", CREDS_OPTIONAL},
|
||||||
|
|
||||||
{"unix://127.0.0.1", "unix:127.0.0.1", "127.0.0.1"},
|
{"unix://127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_OPTIONAL},
|
||||||
{"unix://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"},
|
{"unix://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1", CREDS_OPTIONAL},
|
||||||
|
|
||||||
{"unixs:127.0.0.1", "unix:127.0.0.1", "127.0.0.1"},
|
{"unixs:127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_REQUIRE},
|
||||||
{"unixs:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"},
|
{"unixs:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1", CREDS_REQUIRE},
|
||||||
{"unixs://127.0.0.1", "unix:127.0.0.1", "127.0.0.1"},
|
{"unixs://127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_REQUIRE},
|
||||||
{"unixs://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"},
|
{"unixs://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1", CREDS_REQUIRE},
|
||||||
|
|
||||||
{"http://127.0.0.1", "127.0.0.1", "127.0.0.1"},
|
{"http://127.0.0.1", "127.0.0.1", "127.0.0.1", CREDS_DROP},
|
||||||
{"http://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1"},
|
{"http://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1", CREDS_DROP},
|
||||||
{"https://127.0.0.1", "127.0.0.1", "127.0.0.1"},
|
{"https://127.0.0.1", "127.0.0.1", "127.0.0.1", CREDS_REQUIRE},
|
||||||
{"https://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1"},
|
{"https://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1", CREDS_REQUIRE},
|
||||||
{"https://localhost:20000", "localhost:20000", "localhost"},
|
{"https://localhost:20000", "localhost:20000", "localhost", CREDS_REQUIRE},
|
||||||
|
|
||||||
{"unix:///tmp/abc", "unix:///tmp/abc", "/tmp/abc"},
|
{"unix:///tmp/abc", "unix:///tmp/abc", "abc", CREDS_OPTIONAL},
|
||||||
{"unixs:///tmp/abc", "unix:///tmp/abc", "/tmp/abc"},
|
{"unixs:///tmp/abc", "unix:///tmp/abc", "abc", CREDS_REQUIRE},
|
||||||
{"etcd.io", "etcd.io", "etcd.io"},
|
{"unix:///tmp/abc:1234", "unix:///tmp/abc:1234", "abc", CREDS_OPTIONAL},
|
||||||
{"http://etcd.io/abc", "etcd.io", "etcd.io"},
|
{"unixs:///tmp/abc:1234", "unix:///tmp/abc:1234", "abc", CREDS_REQUIRE},
|
||||||
{"dns://something-other", "dns://something-other", "dns://something-other"},
|
{"etcd.io", "etcd.io", "etcd.io", CREDS_OPTIONAL},
|
||||||
|
{"http://etcd.io/abc", "etcd.io", "etcd.io", CREDS_DROP},
|
||||||
|
{"dns://something-other", "dns://something-other", "something-other", CREDS_OPTIONAL},
|
||||||
|
|
||||||
|
{"http://[2001:db8:1f70::999:de8:7648:6e8]:100/", "[2001:db8:1f70::999:de8:7648:6e8]:100", "2001:db8:1f70::999:de8:7648:6e8", CREDS_DROP},
|
||||||
|
{"[2001:db8:1f70::999:de8:7648:6e8]:100", "[2001:db8:1f70::999:de8:7648:6e8]:100", "2001:db8:1f70::999:de8:7648:6e8", CREDS_OPTIONAL},
|
||||||
|
{"unix:unexpected-file_name#123$456", "unix:unexpected-file_name#123$456", "unexpected-file_name#123$456", CREDS_OPTIONAL},
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.endpoint, func(t *testing.T) {
|
t.Run("Interpret_"+tt.endpoint, func(t *testing.T) {
|
||||||
gotAddress, gotServerName := Interpret(tt.endpoint)
|
gotAddress, gotServerName := Interpret(tt.endpoint)
|
||||||
if gotAddress != tt.wantAddress {
|
if gotAddress != tt.wantAddress {
|
||||||
t.Errorf("Interpret() gotAddress = %v, want %v", gotAddress, tt.wantAddress)
|
t.Errorf("Interpret() gotAddress = %v, want %v", gotAddress, tt.wantAddress)
|
||||||
@ -61,5 +68,32 @@ func TestInterpret(t *testing.T) {
|
|||||||
t.Errorf("Interpret() gotServerName = %v, want %v", gotServerName, tt.wantServerName)
|
t.Errorf("Interpret() gotServerName = %v, want %v", gotServerName, tt.wantServerName)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
t.Run("RequiresCredentials_"+tt.endpoint, func(t *testing.T) {
|
||||||
|
requiresCreds := RequiresCredentials(tt.endpoint)
|
||||||
|
if requiresCreds != tt.wantRequiresCreds {
|
||||||
|
t.Errorf("RequiresCredentials() got = %v, want %v", requiresCreds, tt.wantRequiresCreds)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_extractHostFromHostPort(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
ep string
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
{ep: "localhost", want: "localhost"},
|
||||||
|
{ep: "localhost:8080", want: "localhost"},
|
||||||
|
{ep: "192.158.7.14:8080", want: "192.158.7.14"},
|
||||||
|
{ep: "192.158.7.14:8080", want: "192.158.7.14"},
|
||||||
|
{ep: "[2001:db8:1f70::999:de8:7648:6e8]", want: "[2001:db8:1f70::999:de8:7648:6e8]"},
|
||||||
|
{ep: "[2001:db8:1f70::999:de8:7648:6e8]:100", want: "2001:db8:1f70::999:de8:7648:6e8"},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.ep, func(t *testing.T) {
|
||||||
|
if got := extractHostFromHostPort(tt.ep); got != tt.want {
|
||||||
|
t.Errorf("extractHostFromHostPort() = %v, want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -21,6 +21,10 @@ import (
|
|||||||
"google.golang.org/grpc/serviceconfig"
|
"google.golang.org/grpc/serviceconfig"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
Schema = "etcd-endpoints"
|
||||||
|
)
|
||||||
|
|
||||||
// EtcdManualResolver is a Resolver (and resolver.Builder) that can be updated
|
// EtcdManualResolver is a Resolver (and resolver.Builder) that can be updated
|
||||||
// using SetEndpoints.
|
// using SetEndpoints.
|
||||||
type EtcdManualResolver struct {
|
type EtcdManualResolver struct {
|
||||||
@ -30,7 +34,7 @@ type EtcdManualResolver struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func New(endpoints ...string) *EtcdManualResolver {
|
func New(endpoints ...string) *EtcdManualResolver {
|
||||||
r := manual.NewBuilderWithScheme("etcd-endpoints")
|
r := manual.NewBuilderWithScheme(Schema)
|
||||||
return &EtcdManualResolver{Resolver: r, endpoints: endpoints, serviceConfig: nil}
|
return &EtcdManualResolver{Resolver: r, endpoints: endpoints, serviceConfig: nil}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -208,7 +208,13 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
client := mustNewClient(lg)
|
client := mustNewClient(lg)
|
||||||
proxyClient := mustNewProxyClient(lg, tlsinfo)
|
|
||||||
|
// The proxy client is used for self-healthchecking.
|
||||||
|
// TODO: The mechanism should be refactored to use internal connection.
|
||||||
|
var proxyClient *clientv3.Client
|
||||||
|
if grpcProxyAdvertiseClientURL != "" {
|
||||||
|
proxyClient = mustNewProxyClient(lg, tlsinfo)
|
||||||
|
}
|
||||||
httpClient := mustNewHTTPClient(lg)
|
httpClient := mustNewHTTPClient(lg)
|
||||||
|
|
||||||
srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client, proxyClient)
|
srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client, proxyClient)
|
||||||
|
@ -56,6 +56,9 @@ func checkHealth(c *clientv3.Client) etcdhttp.Health {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func checkProxyHealth(c *clientv3.Client) etcdhttp.Health {
|
func checkProxyHealth(c *clientv3.Client) etcdhttp.Health {
|
||||||
|
if c == nil {
|
||||||
|
return etcdhttp.Health{Health: "false", Reason: "no connection to proxy"}
|
||||||
|
}
|
||||||
h := checkHealth(c)
|
h := checkHealth(c)
|
||||||
if h.Health != "true" {
|
if h.Health != "true" {
|
||||||
return h
|
return h
|
||||||
|
@ -728,6 +728,7 @@ func TestLeaseKeepAliveLoopExit(t *testing.T) {
|
|||||||
// before, during, and after quorum loss to confirm Grant/KeepAlive tolerates
|
// before, during, and after quorum loss to confirm Grant/KeepAlive tolerates
|
||||||
// transient cluster failure.
|
// transient cluster failure.
|
||||||
func TestV3LeaseFailureOverlap(t *testing.T) {
|
func TestV3LeaseFailureOverlap(t *testing.T) {
|
||||||
|
defer testutil.AfterTest(t)
|
||||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
|
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
|
||||||
defer clus.Terminate(t)
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
|
@ -63,7 +63,7 @@ import (
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
// RequestWaitTimeout is the time duration to wait for a request to go through or detect leader loss.
|
// RequestWaitTimeout is the time duration to wait for a request to go through or detect leader loss.
|
||||||
RequestWaitTimeout = 3 * time.Second
|
RequestWaitTimeout = 5 * time.Second
|
||||||
tickDuration = 10 * time.Millisecond
|
tickDuration = 10 * time.Millisecond
|
||||||
requestTimeout = 20 * time.Second
|
requestTimeout = 20 * time.Second
|
||||||
|
|
||||||
@ -1257,6 +1257,7 @@ type ClusterV3 struct {
|
|||||||
// NewClusterV3 returns a launched cluster with a grpc client connection
|
// NewClusterV3 returns a launched cluster with a grpc client connection
|
||||||
// for each cluster member.
|
// for each cluster member.
|
||||||
func NewClusterV3(t testing.TB, cfg *ClusterConfig) *ClusterV3 {
|
func NewClusterV3(t testing.TB, cfg *ClusterConfig) *ClusterV3 {
|
||||||
|
// t might be nil in case of Examples and clusters created per test-suite.
|
||||||
if t != nil {
|
if t != nil {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
testutil.SkipTestIfShortMode(t, "Cannot create clusters in --short tests")
|
testutil.SkipTestIfShortMode(t, "Cannot create clusters in --short tests")
|
||||||
@ -1275,7 +1276,11 @@ func NewClusterV3(t testing.TB, cfg *ClusterConfig) *ClusterV3 {
|
|||||||
for _, m := range clus.Members {
|
for _, m := range clus.Members {
|
||||||
client, err := NewClientV3(m)
|
client, err := NewClientV3(m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("cannot create client: %v", err)
|
if t != nil {
|
||||||
|
t.Fatalf("cannot create client: %v", err)
|
||||||
|
} else {
|
||||||
|
log.Fatalf("cannot create client: %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
clus.clients = append(clus.clients, client)
|
clus.clients = append(clus.clients, client)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user