tests: Test separate http port connection multiplexing

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz 2023-03-27 13:35:29 +02:00
parent 8dc1244179
commit 75675cd464
4 changed files with 80 additions and 43 deletions

View File

@ -54,8 +54,10 @@ func newProxyEtcdProcess(cfg *etcdServerProcessConfig) (*proxyEtcdProcess, error
func (p *proxyEtcdProcess) Config() *etcdServerProcessConfig { return p.etcdProc.Config() }
func (p *proxyEtcdProcess) EndpointsV2() []string { return p.proxyV2.endpoints() }
func (p *proxyEtcdProcess) EndpointsV3() []string { return p.proxyV3.endpoints() }
func (p *proxyEtcdProcess) EndpointsV2() []string { return p.EndpointsHTTP() }
func (p *proxyEtcdProcess) EndpointsV3() []string { return p.EndpointsGRPC() }
func (p *proxyEtcdProcess) EndpointsHTTP() []string { return p.proxyV2.endpoints() }
func (p *proxyEtcdProcess) EndpointsGRPC() []string { return p.proxyV3.endpoints() }
func (p *proxyEtcdProcess) EndpointsMetrics() []string {
panic("not implemented; proxy doesn't provide health information")
}

View File

@ -191,19 +191,17 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
initialCluster := make([]string, cfg.clusterSize)
for i := 0; i < cfg.clusterSize; i++ {
var curls []string
var curl, curltls string
var curl string
port := cfg.basePort + 5*i
clientPort := port
clientHttpPort := port + 4
curlHost := fmt.Sprintf("localhost:%d", port)
switch cfg.clientTLS {
case clientNonTLS, clientTLS:
curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String()
if cfg.clientTLS == clientTLSAndNonTLS {
curl = clientURL(clientPort, clientNonTLS)
curls = []string{curl, clientURL(clientPort, clientTLS)}
} else {
curl = clientURL(clientPort, cfg.clientTLS)
curls = []string{curl}
case clientTLSAndNonTLS:
curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
curls = []string{curl, curltls}
}
purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
@ -228,9 +226,10 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
"--data-dir", dataDirPath,
"--snapshot-count", fmt.Sprintf("%d", cfg.snapshotCount),
}
var clientHttpUrl string
if cfg.clientHttpSeparate {
clientHttpUrl := url.URL{Scheme: cfg.clientScheme(), Host: fmt.Sprintf("localhost:%d", clientHttpPort)}
args = append(args, "--listen-client-http-urls", clientHttpUrl.String())
clientHttpUrl = clientURL(clientHttpPort, cfg.clientTLS)
args = append(args, "--listen-client-http-urls", clientHttpUrl)
}
args = addV2Args(args)
if cfg.forceNewCluster {
@ -274,16 +273,17 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
}
etcdCfgs[i] = &etcdServerProcessConfig{
execPath: cfg.execPath,
args: args,
tlsArgs: cfg.tlsArgs(),
dataDirPath: dataDirPath,
keepDataDir: cfg.keepDataDir,
name: name,
purl: purl,
acurl: curl,
murl: murl,
initialToken: cfg.initialToken,
execPath: cfg.execPath,
args: args,
tlsArgs: cfg.tlsArgs(),
dataDirPath: dataDirPath,
keepDataDir: cfg.keepDataDir,
name: name,
purl: purl,
acurl: curl,
murl: murl,
initialToken: cfg.initialToken,
clientHttpUrl: clientHttpUrl,
}
}
@ -296,6 +296,18 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
return etcdCfgs
}
func clientURL(port int, connType clientConnType) string {
curlHost := fmt.Sprintf("localhost:%d", port)
switch connType {
case clientNonTLS:
return (&url.URL{Scheme: "http", Host: curlHost}).String()
case clientTLS:
return (&url.URL{Scheme: "https", Host: curlHost}).String()
default:
panic(fmt.Sprintf("Unsupported connection type %v", connType))
}
}
func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
if cfg.clientTLS != clientNonTLS {
if cfg.isClientAutoTLS {

View File

@ -34,8 +34,9 @@ import (
func TestConnectionMultiplexing(t *testing.T) {
defer testutil.AfterTest(t)
for _, tc := range []struct {
name string
serverTLS clientConnType
name string
serverTLS clientConnType
separateHttpPort bool
}{
{
name: "ServerTLS",
@ -49,10 +50,20 @@ func TestConnectionMultiplexing(t *testing.T) {
name: "ServerTLSAndNonTLS",
serverTLS: clientTLSAndNonTLS,
},
{
name: "SeparateHTTP/ServerTLS",
serverTLS: clientTLS,
separateHttpPort: true,
},
{
name: "SeparateHTTP/ServerNonTLS",
serverTLS: clientNonTLS,
separateHttpPort: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
cfg := etcdProcessClusterConfig{clusterSize: 1, clientTLS: tc.serverTLS, enableV2: true}
cfg := etcdProcessClusterConfig{clusterSize: 1, clientTLS: tc.serverTLS, enableV2: true, clientHttpSeparate: tc.separateHttpPort}
clus, err := newEtcdProcessCluster(&cfg)
require.NoError(t, err)
defer clus.Close()
@ -73,43 +84,45 @@ func TestConnectionMultiplexing(t *testing.T) {
name = "ClientTLS"
}
t.Run(name, func(t *testing.T) {
testConnectionMultiplexing(ctx, t, clus.EndpointsV3()[0], connType)
testConnectionMultiplexing(ctx, t, clus.procs[0], connType)
})
}
})
}
}
func testConnectionMultiplexing(ctx context.Context, t *testing.T, endpoint string, connType clientConnType) {
func testConnectionMultiplexing(ctx context.Context, t *testing.T, member etcdProcess, connType clientConnType) {
httpEndpoint := member.EndpointsHTTP()[0]
grpcEndpoint := member.EndpointsGRPC()[0]
switch connType {
case clientTLS:
endpoint = toTLS(endpoint)
httpEndpoint = toTLS(httpEndpoint)
grpcEndpoint = toTLS(grpcEndpoint)
case clientNonTLS:
default:
panic(fmt.Sprintf("Unsupported conn type %v", connType))
}
t.Run("etcdctl", func(t *testing.T) {
t.Run("v2", func(t *testing.T) {
etcdctl := NewEtcdctl([]string{endpoint}, connType, false, true)
etcdctl := NewEtcdctl([]string{httpEndpoint}, connType, false, true)
err := etcdctl.Set("a", "1")
assert.NoError(t, err)
})
t.Run("v3", func(t *testing.T) {
etcdctl := NewEtcdctl([]string{endpoint}, connType, false, false)
etcdctl := NewEtcdctl([]string{grpcEndpoint}, connType, false, false)
err := etcdctl.Put("a", "1")
assert.NoError(t, err)
})
})
t.Run("clientv2", func(t *testing.T) {
c, err := newClientV2(t, []string{endpoint}, connType, false)
c, err := newClientV2(t, []string{httpEndpoint}, connType, false)
require.NoError(t, err)
kv := clientv2.NewKeysAPI(c)
_, err = kv.Set(ctx, "a", "1", nil)
assert.NoError(t, err)
})
t.Run("clientv3", func(t *testing.T) {
c := newClient(t, []string{endpoint}, connType, false)
c := newClient(t, []string{grpcEndpoint}, connType, false)
_, err := c.Get(ctx, "a")
assert.NoError(t, err)
})
@ -120,11 +133,11 @@ func testConnectionMultiplexing(ctx context.Context, t *testing.T, endpoint stri
tname = "default"
}
t.Run(tname, func(t *testing.T) {
assert.NoError(t, fetchGrpcGateway(endpoint, httpVersion, connType))
assert.NoError(t, fetchMetrics(endpoint, httpVersion, connType))
assert.NoError(t, fetchVersion(endpoint, httpVersion, connType))
assert.NoError(t, fetchHealth(endpoint, httpVersion, connType))
assert.NoError(t, fetchDebugVars(endpoint, httpVersion, connType))
assert.NoError(t, fetchGrpcGateway(httpEndpoint, httpVersion, connType))
assert.NoError(t, fetchMetrics(httpEndpoint, httpVersion, connType))
assert.NoError(t, fetchVersion(httpEndpoint, httpVersion, connType))
assert.NoError(t, fetchHealth(httpEndpoint, httpVersion, connType))
assert.NoError(t, fetchDebugVars(httpEndpoint, httpVersion, connType))
})
}
})

View File

@ -33,6 +33,8 @@ var (
type etcdProcess interface {
EndpointsV2() []string
EndpointsV3() []string
EndpointsGRPC() []string
EndpointsHTTP() []string
EndpointsMetrics() []string
Start() error
@ -61,8 +63,9 @@ type etcdServerProcessConfig struct {
purl url.URL
acurl string
murl string
acurl string
murl string
clientHttpUrl string
initialToken string
initialCluster string
@ -80,8 +83,15 @@ func newEtcdServerProcess(cfg *etcdServerProcessConfig) (*etcdServerProcess, err
return &etcdServerProcess{cfg: cfg, donec: make(chan struct{})}, nil
}
func (ep *etcdServerProcess) EndpointsV2() []string { return []string{ep.cfg.acurl} }
func (ep *etcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2() }
func (ep *etcdServerProcess) EndpointsV2() []string { return ep.EndpointsHTTP() }
func (ep *etcdServerProcess) EndpointsV3() []string { return ep.EndpointsGRPC() }
func (ep *etcdServerProcess) EndpointsGRPC() []string { return []string{ep.cfg.acurl} }
func (ep *etcdServerProcess) EndpointsHTTP() []string {
if ep.cfg.clientHttpUrl == "" {
return []string{ep.cfg.acurl}
}
return []string{ep.cfg.clientHttpUrl}
}
func (ep *etcdServerProcess) EndpointsMetrics() []string { return []string{ep.cfg.murl} }
func (ep *etcdServerProcess) Start() error {