mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
[3.4] backport etcd-io#13359 Fix http2 authority header in single endpoint scenario
Signed-off-by: Chao Chen <chaochn@amazon.com>
This commit is contained in:
parent
1e754357a8
commit
5c95ef3e31
@ -281,8 +281,7 @@ func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.
|
||||
defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
|
||||
}
|
||||
|
||||
initialEndpoints := strings.Join(c.Endpoints(), ";")
|
||||
target := fmt.Sprintf("%s://%p/#initially=[%s]", resolver.Schema, c, initialEndpoints)
|
||||
target := fmt.Sprintf("%s://%p/%s", resolver.Schema, c, authority(c.Endpoints()[0]))
|
||||
conn, err := grpc.DialContext(dctx, target, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -290,6 +289,20 @@ func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func authority(endpoint string) string {
|
||||
spl := strings.SplitN(endpoint, "://", 2)
|
||||
if len(spl) < 2 {
|
||||
if strings.HasPrefix(endpoint, "unix:") {
|
||||
return endpoint[len("unix:"):]
|
||||
}
|
||||
if strings.HasPrefix(endpoint, "unixs:") {
|
||||
return endpoint[len("unixs:"):]
|
||||
}
|
||||
return endpoint
|
||||
}
|
||||
return spl[1]
|
||||
}
|
||||
|
||||
func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials {
|
||||
r := endpoint.RequiresCredentials(ep)
|
||||
switch r {
|
||||
|
@ -116,6 +116,10 @@ func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal {
|
||||
return p.etcdProc.WithStopSignal(sig)
|
||||
}
|
||||
|
||||
func (p *proxyEtcdProcess) Logs() logsExpect {
|
||||
return p.etcdProc.Logs()
|
||||
}
|
||||
|
||||
type proxyProc struct {
|
||||
execPath string
|
||||
args []string
|
||||
|
@ -100,6 +100,7 @@ type etcdProcessClusterConfig struct {
|
||||
execPath string
|
||||
dataDirPath string
|
||||
keepDataDir bool
|
||||
envVars map[string]string
|
||||
|
||||
clusterSize int
|
||||
|
||||
@ -291,6 +292,7 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
|
||||
etcdCfgs[i] = &etcdServerProcessConfig{
|
||||
execPath: cfg.execPath,
|
||||
args: args,
|
||||
envVars: cfg.envVars,
|
||||
tlsArgs: cfg.tlsArgs(),
|
||||
dataDirPath: dataDirPath,
|
||||
keepDataDir: cfg.keepDataDir,
|
||||
|
214
tests/e2e/ctl_v3_grpc_test.go
Normal file
214
tests/e2e/ctl_v3_grpc_test.go
Normal file
@ -0,0 +1,214 @@
|
||||
// Copyright 2021 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build !cluster_proxy
|
||||
// +build !cluster_proxy
|
||||
|
||||
package e2e
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"go.etcd.io/etcd/pkg/testutil"
|
||||
)
|
||||
|
||||
func TestAuthority(t *testing.T) {
|
||||
tcs := []struct {
|
||||
name string
|
||||
useTLS bool
|
||||
useInsecureTLS bool
|
||||
// Pattern used to generate endpoints for client. Fields filled
|
||||
// %d - will be filled with member grpc port
|
||||
clientURLPattern string
|
||||
|
||||
// Pattern used to validate authority received by server. Fields filled:
|
||||
// %d - will be filled with first member grpc port
|
||||
expectAuthorityPattern string
|
||||
}{
|
||||
{
|
||||
name: "http://domain[:port]",
|
||||
clientURLPattern: "http://localhost:%d",
|
||||
expectAuthorityPattern: "localhost:%d",
|
||||
},
|
||||
{
|
||||
name: "http://address[:port]",
|
||||
clientURLPattern: "http://127.0.0.1:%d",
|
||||
expectAuthorityPattern: "127.0.0.1:%d",
|
||||
},
|
||||
{
|
||||
name: "https://domain[:port] insecure",
|
||||
useTLS: true,
|
||||
useInsecureTLS: true,
|
||||
clientURLPattern: "https://localhost:%d",
|
||||
expectAuthorityPattern: "localhost:%d",
|
||||
},
|
||||
{
|
||||
name: "https://address[:port] insecure",
|
||||
useTLS: true,
|
||||
useInsecureTLS: true,
|
||||
clientURLPattern: "https://127.0.0.1:%d",
|
||||
expectAuthorityPattern: "127.0.0.1:%d",
|
||||
},
|
||||
{
|
||||
name: "https://domain[:port]",
|
||||
useTLS: true,
|
||||
clientURLPattern: "https://localhost:%d",
|
||||
expectAuthorityPattern: "localhost:%d",
|
||||
},
|
||||
{
|
||||
name: "https://address[:port]",
|
||||
useTLS: true,
|
||||
clientURLPattern: "https://127.0.0.1:%d",
|
||||
expectAuthorityPattern: "127.0.0.1:%d",
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
for _, clusterSize := range []int{1, 3} {
|
||||
t.Run(fmt.Sprintf("Size: %d, Scenario: %q", clusterSize, tc.name), func(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
cfg := configNoTLS
|
||||
cfg.clusterSize = clusterSize
|
||||
if tc.useTLS {
|
||||
cfg.clientTLS = clientTLS
|
||||
}
|
||||
cfg.isClientAutoTLS = tc.useInsecureTLS
|
||||
// Enable debug mode to get logs with http2 headers (including authority)
|
||||
cfg.envVars = map[string]string{"GODEBUG": "http2debug=2"}
|
||||
|
||||
epc, err := newEtcdProcessCluster(&cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("could not start etcd process cluster (%v)", err)
|
||||
}
|
||||
defer epc.Close()
|
||||
endpoints := templateEndpoints(t, tc.clientURLPattern, epc)
|
||||
|
||||
client := clusterEtcdctlV3(&cfg, endpoints)
|
||||
err = client.Put("foo", "bar")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
executeWithTimeout(t, 5*time.Second, func() {
|
||||
assertAuthority(t, fmt.Sprintf(tc.expectAuthorityPattern, 20000), epc)
|
||||
})
|
||||
})
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func templateEndpoints(t *testing.T, pattern string, clus *etcdProcessCluster) []string {
|
||||
t.Helper()
|
||||
endpoints := []string{}
|
||||
for i := 0; i < clus.cfg.clusterSize; i++ {
|
||||
ent := pattern
|
||||
if strings.Contains(ent, "%d") {
|
||||
ent = fmt.Sprintf(ent, etcdProcessBasePort+i*5)
|
||||
}
|
||||
if strings.Contains(ent, "%") {
|
||||
t.Fatalf("Failed to template pattern, %% symbol left %q", ent)
|
||||
}
|
||||
endpoints = append(endpoints, ent)
|
||||
}
|
||||
return endpoints
|
||||
}
|
||||
|
||||
func assertAuthority(t *testing.T, expectAurhority string, clus *etcdProcessCluster) {
|
||||
logs := []logsExpect{}
|
||||
for _, proc := range clus.procs {
|
||||
logs = append(logs, proc.Logs())
|
||||
}
|
||||
line := firstMatch(t, `http2: decoded hpack field header field ":authority"`, logs...)
|
||||
line = strings.TrimSuffix(line, "\n")
|
||||
line = strings.TrimSuffix(line, "\r")
|
||||
expectLine := fmt.Sprintf(`http2: decoded hpack field header field ":authority" = %q`, expectAurhority)
|
||||
assert.True(t, strings.HasSuffix(line, expectLine), fmt.Sprintf("Got %q expected suffix %q", line, expectLine))
|
||||
}
|
||||
|
||||
func firstMatch(t *testing.T, expectLine string, logs ...logsExpect) string {
|
||||
t.Helper()
|
||||
match := make(chan string, len(logs))
|
||||
for i := range logs {
|
||||
go func(l logsExpect) {
|
||||
line, _ := l.Expect(expectLine)
|
||||
match <- line
|
||||
}(logs[i])
|
||||
}
|
||||
return <-match
|
||||
}
|
||||
|
||||
func executeWithTimeout(t *testing.T, timeout time.Duration, f func()) {
|
||||
donec := make(chan struct{})
|
||||
go func() {
|
||||
defer close(donec)
|
||||
f()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(timeout):
|
||||
testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout))
|
||||
case <-donec:
|
||||
}
|
||||
}
|
||||
|
||||
type etcdctlV3 struct {
|
||||
cfg *etcdProcessClusterConfig
|
||||
endpoints []string
|
||||
}
|
||||
|
||||
func clusterEtcdctlV3(cfg *etcdProcessClusterConfig, endpoints []string) *etcdctlV3 {
|
||||
return &etcdctlV3{
|
||||
cfg: cfg,
|
||||
endpoints: endpoints,
|
||||
}
|
||||
}
|
||||
|
||||
func (ctl *etcdctlV3) Put(key, value string) error {
|
||||
return ctl.runCmd("put", key, value)
|
||||
}
|
||||
|
||||
func (ctl *etcdctlV3) runCmd(args ...string) error {
|
||||
cmdArgs := []string{ctlBinPath + "3"}
|
||||
for k, v := range ctl.flags() {
|
||||
cmdArgs = append(cmdArgs, fmt.Sprintf("--%s=%s", k, v))
|
||||
}
|
||||
cmdArgs = append(cmdArgs, args...)
|
||||
return spawnWithExpect(cmdArgs, "OK")
|
||||
}
|
||||
|
||||
func (ctl *etcdctlV3) flags() map[string]string {
|
||||
fmap := make(map[string]string)
|
||||
if ctl.cfg.clientTLS == clientTLS {
|
||||
if ctl.cfg.isClientAutoTLS {
|
||||
fmap["insecure-transport"] = "false"
|
||||
fmap["insecure-skip-tls-verify"] = "true"
|
||||
} else if ctl.cfg.isClientCRL {
|
||||
fmap["cacert"] = caPath
|
||||
fmap["cert"] = revokedCertPath
|
||||
fmap["key"] = revokedPrivateKeyPath
|
||||
} else {
|
||||
fmap["cacert"] = caPath
|
||||
fmap["cert"] = certPath
|
||||
fmap["key"] = privateKeyPath
|
||||
}
|
||||
}
|
||||
fmap["endpoints"] = strings.Join(ctl.endpoints, ",")
|
||||
return fmap
|
||||
}
|
@ -43,6 +43,12 @@ type etcdProcess interface {
|
||||
Close() error
|
||||
WithStopSignal(sig os.Signal) os.Signal
|
||||
Config() *etcdServerProcessConfig
|
||||
|
||||
Logs() logsExpect
|
||||
}
|
||||
|
||||
type logsExpect interface {
|
||||
Expect(string) (string, error)
|
||||
}
|
||||
|
||||
type etcdServerProcess struct {
|
||||
@ -54,6 +60,7 @@ type etcdServerProcess struct {
|
||||
type etcdServerProcessConfig struct {
|
||||
execPath string
|
||||
args []string
|
||||
envVars map[string]string
|
||||
tlsArgs []string
|
||||
|
||||
dataDirPath string
|
||||
@ -98,7 +105,7 @@ func (ep *etcdServerProcess) Start() error {
|
||||
if ep.proc != nil {
|
||||
panic("already started")
|
||||
}
|
||||
proc, err := spawnCmd(append([]string{ep.cfg.execPath}, ep.cfg.args...))
|
||||
proc, err := spawnCmdWithEnv(append([]string{ep.cfg.execPath}, ep.cfg.args...), ep.cfg.envVars)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -153,3 +160,10 @@ func (ep *etcdServerProcess) waitReady() error {
|
||||
}
|
||||
|
||||
func (ep *etcdServerProcess) Config() *etcdServerProcessConfig { return ep.cfg }
|
||||
|
||||
func (ep *etcdServerProcess) Logs() logsExpect {
|
||||
if ep.proc == nil {
|
||||
panic("please grab logs before process is stopped")
|
||||
}
|
||||
return ep.proc
|
||||
}
|
||||
|
@ -18,6 +18,7 @@
|
||||
package e2e
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
@ -27,7 +28,11 @@ import (
|
||||
const noOutputLineCount = 0 // regular binaries emit no extra lines
|
||||
|
||||
func spawnCmd(args []string) (*expect.ExpectProcess, error) {
|
||||
env := os.Environ()
|
||||
return spawnCmdWithEnv(args, nil)
|
||||
}
|
||||
|
||||
func spawnCmdWithEnv(args []string, envVars map[string]string) (*expect.ExpectProcess, error) {
|
||||
env := mergeEnvVariables(envVars)
|
||||
switch {
|
||||
case strings.HasSuffix(args[0], ctlBinPath+"2"):
|
||||
env = append(env, "ETCDCTL_API=2")
|
||||
@ -38,3 +43,23 @@ func spawnCmd(args []string) (*expect.ExpectProcess, error) {
|
||||
}
|
||||
return expect.NewExpectWithEnv(args[0], args[1:], env)
|
||||
}
|
||||
|
||||
func mergeEnvVariables(envVars map[string]string) []string {
|
||||
var env []string
|
||||
// Environment variables are passed as parameter have higher priority
|
||||
// than os environment variables.
|
||||
for k, v := range envVars {
|
||||
env = append(env, fmt.Sprintf("%s=%s", k, v))
|
||||
}
|
||||
|
||||
// Now, we can set os environment variables not passed as parameter.
|
||||
currVars := os.Environ()
|
||||
for _, v := range currVars {
|
||||
p := strings.Split(v, "=")
|
||||
if _, ok := envVars[p[0]]; !ok {
|
||||
env = append(env, fmt.Sprintf("%s=%s", p[0], p[1]))
|
||||
}
|
||||
}
|
||||
|
||||
return env
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user