From 6637aee804678c8abad1019f939dfae078a759a3 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 27 Mar 2023 13:35:29 +0200 Subject: [PATCH] tests: Test separate http port connection multiplexing Signed-off-by: Marek Siarkowicz --- tests/e2e/cluster_proxy_test.go | 6 ++-- tests/e2e/cluster_test.go | 58 ++++++++++++++++++++------------- tests/e2e/cmux_test.go | 45 ++++++++++++++++--------- tests/e2e/etcd_process.go | 18 +++++++--- 4 files changed, 82 insertions(+), 45 deletions(-) diff --git a/tests/e2e/cluster_proxy_test.go b/tests/e2e/cluster_proxy_test.go index fd7924835..beca84cfd 100644 --- a/tests/e2e/cluster_proxy_test.go +++ b/tests/e2e/cluster_proxy_test.go @@ -55,8 +55,10 @@ func newProxyEtcdProcess(cfg *etcdServerProcessConfig) (*proxyEtcdProcess, error func (p *proxyEtcdProcess) Config() *etcdServerProcessConfig { return p.etcdProc.Config() } -func (p *proxyEtcdProcess) EndpointsV2() []string { return p.proxyV2.endpoints() } -func (p *proxyEtcdProcess) EndpointsV3() []string { return p.proxyV3.endpoints() } +func (p *proxyEtcdProcess) EndpointsV2() []string { return p.EndpointsHTTP() } +func (p *proxyEtcdProcess) EndpointsV3() []string { return p.EndpointsGRPC() } +func (p *proxyEtcdProcess) EndpointsHTTP() []string { return p.proxyV2.endpoints() } +func (p *proxyEtcdProcess) EndpointsGRPC() []string { return p.proxyV3.endpoints() } func (p *proxyEtcdProcess) EndpointsMetrics() []string { panic("not implemented; proxy doesn't provide health information") } diff --git a/tests/e2e/cluster_test.go b/tests/e2e/cluster_test.go index c5d752e70..e63e9a5d8 100644 --- a/tests/e2e/cluster_test.go +++ b/tests/e2e/cluster_test.go @@ -246,19 +246,17 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []* initialCluster := make([]string, cfg.clusterSize) for i := 0; i < cfg.clusterSize; i++ { var curls []string - var curl, curltls string + var curl string port := cfg.basePort + 5*i + clientPort := port clientHttpPort := port + 4 - curlHost := fmt.Sprintf("localhost:%d", port) - switch cfg.clientTLS { - case clientNonTLS, clientTLS: - curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String() + if cfg.clientTLS == clientTLSAndNonTLS { + curl = clientURL(clientPort, clientNonTLS) + curls = []string{curl, clientURL(clientPort, clientTLS)} + } else { + curl = clientURL(clientPort, cfg.clientTLS) curls = []string{curl} - case clientTLSAndNonTLS: - curl = (&url.URL{Scheme: "http", Host: curlHost}).String() - curltls = (&url.URL{Scheme: "https", Host: curlHost}).String() - curls = []string{curl, curltls} } purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)} @@ -279,9 +277,10 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []* "--data-dir", dataDirPath, "--snapshot-count", fmt.Sprintf("%d", cfg.snapshotCount), } + var clientHttpUrl string if cfg.clientHttpSeparate { - clientHttpUrl := url.URL{Scheme: cfg.clientScheme(), Host: fmt.Sprintf("localhost:%d", clientHttpPort)} - args = append(args, "--listen-client-http-urls", clientHttpUrl.String()) + clientHttpUrl = clientURL(clientHttpPort, cfg.clientTLS) + args = append(args, "--listen-client-http-urls", clientHttpUrl) } args = addV2Args(args) if cfg.forceNewCluster { @@ -342,18 +341,19 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []* } etcdCfgs[i] = &etcdServerProcessConfig{ - lg: lg, - execPath: cfg.execPath, - args: args, - envVars: cfg.envVars, - tlsArgs: cfg.tlsArgs(), - dataDirPath: dataDirPath, - keepDataDir: cfg.keepDataDir, - name: name, - purl: purl, - acurl: curl, - murl: murl, - initialToken: cfg.initialToken, + lg: lg, + execPath: cfg.execPath, + args: args, + envVars: cfg.envVars, + tlsArgs: cfg.tlsArgs(), + dataDirPath: dataDirPath, + keepDataDir: cfg.keepDataDir, + name: name, + purl: purl, + acurl: curl, + murl: murl, + initialToken: cfg.initialToken, + clientHttpUrl: clientHttpUrl, } } @@ -366,6 +366,18 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []* return etcdCfgs } +func clientURL(port int, connType clientConnType) string { + curlHost := fmt.Sprintf("localhost:%d", port) + switch connType { + case clientNonTLS: + return (&url.URL{Scheme: "http", Host: curlHost}).String() + case clientTLS: + return (&url.URL{Scheme: "https", Host: curlHost}).String() + default: + panic(fmt.Sprintf("Unsupported connection type %v", connType)) + } +} + func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) { if cfg.clientTLS != clientNonTLS { if cfg.isClientAutoTLS { diff --git a/tests/e2e/cmux_test.go b/tests/e2e/cmux_test.go index bf3f5ac43..22f95297d 100644 --- a/tests/e2e/cmux_test.go +++ b/tests/e2e/cmux_test.go @@ -37,8 +37,9 @@ import ( func TestConnectionMultiplexing(t *testing.T) { BeforeTest(t) for _, tc := range []struct { - name string - serverTLS clientConnType + name string + serverTLS clientConnType + separateHttpPort bool }{ { name: "ServerTLS", @@ -52,10 +53,20 @@ func TestConnectionMultiplexing(t *testing.T) { name: "ServerTLSAndNonTLS", serverTLS: clientTLSAndNonTLS, }, + { + name: "SeparateHTTP/ServerTLS", + serverTLS: clientTLS, + separateHttpPort: true, + }, + { + name: "SeparateHTTP/ServerNonTLS", + serverTLS: clientNonTLS, + separateHttpPort: true, + }, } { t.Run(tc.name, func(t *testing.T) { ctx := context.Background() - cfg := etcdProcessClusterConfig{clusterSize: 1, clientTLS: tc.serverTLS, enableV2: true} + cfg := etcdProcessClusterConfig{clusterSize: 1, clientTLS: tc.serverTLS, enableV2: true, clientHttpSeparate: tc.separateHttpPort} clus, err := newEtcdProcessCluster(t, &cfg) require.NoError(t, err) defer clus.Close() @@ -76,43 +87,45 @@ func TestConnectionMultiplexing(t *testing.T) { name = "ClientTLS" } t.Run(name, func(t *testing.T) { - testConnectionMultiplexing(ctx, t, clus.EndpointsV3()[0], connType) + testConnectionMultiplexing(ctx, t, clus.procs[0], connType) }) } }) } - } -func testConnectionMultiplexing(ctx context.Context, t *testing.T, endpoint string, connType clientConnType) { +func testConnectionMultiplexing(ctx context.Context, t *testing.T, member etcdProcess, connType clientConnType) { + httpEndpoint := member.EndpointsHTTP()[0] + grpcEndpoint := member.EndpointsGRPC()[0] switch connType { case clientTLS: - endpoint = toTLS(endpoint) + httpEndpoint = toTLS(httpEndpoint) + grpcEndpoint = toTLS(grpcEndpoint) case clientNonTLS: default: panic(fmt.Sprintf("Unsupported conn type %v", connType)) } t.Run("etcdctl", func(t *testing.T) { t.Run("v2", func(t *testing.T) { - etcdctl := NewEtcdctl([]string{endpoint}, connType, false, true) + etcdctl := NewEtcdctl([]string{httpEndpoint}, connType, false, true) err := etcdctl.Set("a", "1") assert.NoError(t, err) }) t.Run("v3", func(t *testing.T) { - etcdctl := NewEtcdctl([]string{endpoint}, connType, false, false) + etcdctl := NewEtcdctl([]string{grpcEndpoint}, connType, false, false) err := etcdctl.Put("a", "1") assert.NoError(t, err) }) }) t.Run("clientv2", func(t *testing.T) { - c, err := newClientV2(t, []string{endpoint}, connType, false) + c, err := newClientV2(t, []string{httpEndpoint}, connType, false) require.NoError(t, err) kv := clientv2.NewKeysAPI(c) _, err = kv.Set(ctx, "a", "1", nil) assert.NoError(t, err) }) t.Run("clientv3", func(t *testing.T) { - c := newClient(t, []string{endpoint}, connType, false) + c := newClient(t, []string{grpcEndpoint}, connType, false) _, err := c.Get(ctx, "a") assert.NoError(t, err) }) @@ -123,11 +136,11 @@ func testConnectionMultiplexing(ctx context.Context, t *testing.T, endpoint stri tname = "default" } t.Run(tname, func(t *testing.T) { - assert.NoError(t, fetchGrpcGateway(endpoint, httpVersion, connType)) - assert.NoError(t, fetchMetrics(endpoint, httpVersion, connType)) - assert.NoError(t, fetchVersion(endpoint, httpVersion, connType)) - assert.NoError(t, fetchHealth(endpoint, httpVersion, connType)) - assert.NoError(t, fetchDebugVars(endpoint, httpVersion, connType)) + assert.NoError(t, fetchGrpcGateway(httpEndpoint, httpVersion, connType)) + assert.NoError(t, fetchMetrics(httpEndpoint, httpVersion, connType)) + assert.NoError(t, fetchVersion(httpEndpoint, httpVersion, connType)) + assert.NoError(t, fetchHealth(httpEndpoint, httpVersion, connType)) + assert.NoError(t, fetchDebugVars(httpEndpoint, httpVersion, connType)) }) } }) diff --git a/tests/e2e/etcd_process.go b/tests/e2e/etcd_process.go index 027b7d6aa..2c5a408e1 100644 --- a/tests/e2e/etcd_process.go +++ b/tests/e2e/etcd_process.go @@ -35,6 +35,8 @@ var ( type etcdProcess interface { EndpointsV2() []string EndpointsV3() []string + EndpointsGRPC() []string + EndpointsHTTP() []string EndpointsMetrics() []string Start() error @@ -72,8 +74,9 @@ type etcdServerProcessConfig struct { purl url.URL - acurl string - murl string + acurl string + murl string + clientHttpUrl string initialToken string initialCluster string @@ -91,8 +94,15 @@ func newEtcdServerProcess(cfg *etcdServerProcessConfig) (*etcdServerProcess, err return &etcdServerProcess{cfg: cfg, donec: make(chan struct{})}, nil } -func (ep *etcdServerProcess) EndpointsV2() []string { return []string{ep.cfg.acurl} } -func (ep *etcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2() } +func (ep *etcdServerProcess) EndpointsV2() []string { return ep.EndpointsHTTP() } +func (ep *etcdServerProcess) EndpointsV3() []string { return ep.EndpointsGRPC() } +func (ep *etcdServerProcess) EndpointsGRPC() []string { return []string{ep.cfg.acurl} } +func (ep *etcdServerProcess) EndpointsHTTP() []string { + if ep.cfg.clientHttpUrl == "" { + return []string{ep.cfg.acurl} + } + return []string{ep.cfg.clientHttpUrl} +} func (ep *etcdServerProcess) EndpointsMetrics() []string { return []string{ep.cfg.murl} } func (ep *etcdServerProcess) Start() error {