Merge pull request #15491 from serathius/test-cmux-3.5

Connection multiplexing regression tests for v3.5
This commit is contained in:
Marek Siarkowicz
2023-03-18 13:23:36 +01:00
committed by GitHub
17 changed files with 505 additions and 130 deletions

View File

@@ -611,6 +611,15 @@
}
]
},
{
"project": "golang.org/x/sync/errgroup",
"licenses": [
{
"type": "BSD 3-clause \"New\" or \"Revised\" License",
"confidence": 0.9663865546218487
}
]
},
{
"project": "golang.org/x/sys/unix",
"licenses": [

View File

@@ -149,6 +149,11 @@ func (ep *ExpectProcess) Signal(sig os.Signal) error {
return ep.cmd.Process.Signal(sig)
}
// Wait waits for the process to finish.
func (ep *ExpectProcess) Wait() {
ep.wg.Wait()
}
// Close waits for the expect process to exit.
// Close currently does not return error if process exited with !=0 status.
// TODO: Close should expose underlying proces failure by default.

View File

@@ -31,18 +31,10 @@ import (
const etcdProcessBasePort = 20000
type clientConnType int
var (
fixturesDir = integration.MustAbsPath("../fixtures")
)
const (
clientNonTLS clientConnType = iota
clientTLS
clientTLSAndNonTLS
)
func newConfigNoTLS() *etcdProcessClusterConfig {
return &etcdProcessClusterConfig{clusterSize: 3,
initialToken: "new",

222
tests/e2e/cmux_test.go Normal file
View File

@@ -0,0 +1,222 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// These tests are directly validating etcd connection multiplexing.
//go:build !cluster_proxy
package e2e
import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"
"github.com/prometheus/common/expfmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/version"
clientv2 "go.etcd.io/etcd/client/v2"
"go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp"
)
func TestConnectionMultiplexing(t *testing.T) {
BeforeTest(t)
for _, tc := range []struct {
name string
serverTLS clientConnType
}{
{
name: "ServerTLS",
serverTLS: clientTLS,
},
{
name: "ServerNonTLS",
serverTLS: clientNonTLS,
},
{
name: "ServerTLSAndNonTLS",
serverTLS: clientTLSAndNonTLS,
},
} {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
cfg := etcdProcessClusterConfig{clusterSize: 1, clientTLS: tc.serverTLS, enableV2: true}
clus, err := newEtcdProcessCluster(t, &cfg)
require.NoError(t, err)
defer clus.Close()
var clientScenarios []clientConnType
switch tc.serverTLS {
case clientTLS:
clientScenarios = []clientConnType{clientTLS}
case clientNonTLS:
clientScenarios = []clientConnType{clientNonTLS}
case clientTLSAndNonTLS:
clientScenarios = []clientConnType{clientTLS, clientNonTLS}
}
for _, connType := range clientScenarios {
name := "ClientNonTLS"
if connType == clientTLS {
name = "ClientTLS"
}
t.Run(name, func(t *testing.T) {
testConnectionMultiplexing(ctx, t, clus.EndpointsV3()[0], connType)
})
}
})
}
}
func testConnectionMultiplexing(ctx context.Context, t *testing.T, endpoint string, connType clientConnType) {
switch connType {
case clientTLS:
endpoint = toTLS(endpoint)
case clientNonTLS:
default:
panic(fmt.Sprintf("Unsupported conn type %v", connType))
}
t.Run("etcdctl", func(t *testing.T) {
t.Run("v2", func(t *testing.T) {
etcdctl := NewEtcdctl([]string{endpoint}, connType, false, true)
err := etcdctl.Set("a", "1")
assert.NoError(t, err)
})
t.Run("v3", func(t *testing.T) {
etcdctl := NewEtcdctl([]string{endpoint}, connType, false, false)
err := etcdctl.Put("a", "1")
assert.NoError(t, err)
})
})
t.Run("clientv2", func(t *testing.T) {
c, err := newClientV2(t, []string{endpoint}, connType, false)
require.NoError(t, err)
kv := clientv2.NewKeysAPI(c)
_, err = kv.Set(ctx, "a", "1", nil)
assert.NoError(t, err)
})
t.Run("clientv3", func(t *testing.T) {
c := newClient(t, []string{endpoint}, connType, false)
_, err := c.Get(ctx, "a")
assert.NoError(t, err)
})
t.Run("curl", func(t *testing.T) {
for _, httpVersion := range []string{"2", "1.1", "1.0", ""} {
tname := "http" + httpVersion
if httpVersion == "" {
tname = "default"
}
t.Run(tname, func(t *testing.T) {
assert.NoError(t, fetchGrpcGateway(endpoint, httpVersion, connType))
assert.NoError(t, fetchMetrics(endpoint, httpVersion, connType))
assert.NoError(t, fetchVersion(endpoint, httpVersion, connType))
assert.NoError(t, fetchHealth(endpoint, httpVersion, connType))
assert.NoError(t, fetchDebugVars(endpoint, httpVersion, connType))
})
}
})
}
func fetchGrpcGateway(endpoint string, httpVersion string, connType clientConnType) error {
rangeData, err := json.Marshal(&pb.RangeRequest{
Key: []byte("a"),
})
if err != nil {
return err
}
req := cURLReq{endpoint: "/v3/kv/range", value: string(rangeData), timeout: 5, httpVersion: httpVersion}
respData, err := curl(endpoint, "POST", req, connType)
return validateGrpcgatewayRangeReponse([]byte(respData))
}
func validateGrpcgatewayRangeReponse(respData []byte) error {
// Modified json annotation so ResponseHeader fields are stored in string.
type responseHeader struct {
ClusterId uint64 `json:"cluster_id,string,omitempty"`
MemberId uint64 `json:"member_id,string,omitempty"`
Revision int64 `json:"revision,string,omitempty"`
RaftTerm uint64 `json:"raft_term,string,omitempty"`
}
type keyValue struct {
Key []byte `json:"key,omitempty"`
CreateRevision int64 `json:"create_revision,string,omitempty"`
ModRevision int64 `json:"mod_revision,string,omitempty"`
Version int64 `json:"version,string,omitempty"`
Value []byte `json:"value,omitempty"`
Lease int64 `json:"lease,omitempty"`
}
type rangeResponse struct {
Header *responseHeader `json:"header,omitempty"`
Kvs []*keyValue `json:"kvs,omitempty"`
More bool `json:"more,omitempty"`
Count int64 `json:"count,string,omitempty"`
}
var resp rangeResponse
return json.Unmarshal(respData, &resp)
}
func fetchMetrics(endpoint string, httpVersion string, connType clientConnType) error {
req := cURLReq{endpoint: "/metrics", timeout: 5, httpVersion: httpVersion}
respData, err := curl(endpoint, "GET", req, connType)
if err != nil {
return err
}
var parser expfmt.TextParser
_, err = parser.TextToMetricFamilies(strings.NewReader(strings.ReplaceAll(respData, "\r\n", "\n")))
return err
}
func fetchVersion(endpoint string, httpVersion string, connType clientConnType) error {
req := cURLReq{endpoint: "/version", timeout: 5, httpVersion: httpVersion}
respData, err := curl(endpoint, "GET", req, connType)
if err != nil {
return err
}
var resp version.Versions
return json.Unmarshal([]byte(respData), &resp)
}
func fetchHealth(endpoint string, httpVersion string, connType clientConnType) error {
req := cURLReq{endpoint: "/health", timeout: 5, httpVersion: httpVersion}
respData, err := curl(endpoint, "GET", req, connType)
if err != nil {
return err
}
var resp etcdhttp.Health
return json.Unmarshal([]byte(respData), &resp)
}
func fetchDebugVars(endpoint string, httpVersion string, connType clientConnType) error {
req := cURLReq{endpoint: "/debug/vars", timeout: 5, httpVersion: httpVersion}
respData, err := curl(endpoint, "GET", req, connType)
if err != nil {
return err
}
var resp map[string]interface{}
return json.Unmarshal([]byte(respData), &resp)
}
func curl(endpoint string, method string, curlReq cURLReq, connType clientConnType) (string, error) {
args := cURLPrefixArgs(endpoint, connType, false, method, curlReq)
lines, err := runUtilCompletion(args, nil)
if err != nil {
return "", err
}
return strings.Join(lines, "\n"), nil
}

View File

@@ -112,7 +112,7 @@ func TestPeriodicCheckDetectsCorruption(t *testing.T) {
}
})
cc := NewEtcdctl(epc.EndpointsV3())
cc := NewEtcdctl(epc.EndpointsV3(), clientNonTLS, false, false)
for i := 0; i < 10; i++ {
err := cc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i))
@@ -158,7 +158,7 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
}
})
cc := NewEtcdctl(epc.EndpointsV3())
cc := NewEtcdctl(epc.EndpointsV3(), clientNonTLS, false, false)
for i := 0; i < 10; i++ {
err := cc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i))

View File

@@ -20,44 +20,86 @@ import (
"strings"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/integration"
)
type EtcdctlV3 struct {
type Etcdctl struct {
connType clientConnType
isAutoTLS bool
endpoints []string
v2 bool
}
func NewEtcdctl(endpoints []string) *EtcdctlV3 {
return &EtcdctlV3{
func NewEtcdctl(endpoints []string, connType clientConnType, isAutoTLS bool, v2 bool) *Etcdctl {
return &Etcdctl{
endpoints: endpoints,
connType: connType,
isAutoTLS: isAutoTLS,
v2: v2,
}
}
func (ctl *EtcdctlV3) Put(key, value string) error {
args := ctl.cmdArgs()
args = append(args, "put", key, value)
return spawnWithExpect(args, "OK")
func (ctl *Etcdctl) Get(key string) (*clientv3.GetResponse, error) {
var resp clientv3.GetResponse
err := ctl.spawnJsonCmd(&resp, "get", key)
return &resp, err
}
func (ctl *EtcdctlV3) AlarmList() (*clientv3.AlarmResponse, error) {
func (ctl *Etcdctl) Put(key, value string) error {
if ctl.v2 {
panic("Unsupported method for v2")
}
args := ctl.cmdArgs()
args = append(args, "put", key, value)
return spawnWithExpectWithEnv(args, ctl.env(), "OK")
}
func (ctl *Etcdctl) Set(key, value string) error {
if !ctl.v2 {
panic("Unsupported method for v3")
}
args := ctl.cmdArgs()
args = append(args, "set", key, value)
lines, err := runUtilCompletion(args, ctl.env())
if err != nil {
return err
}
response := strings.ReplaceAll(strings.Join(lines, "\n"), "\r\n", "")
if response != value {
return fmt.Errorf("Got unexpected response %q, expected %q", response, value)
}
return nil
}
func (ctl *Etcdctl) AlarmList() (*clientv3.AlarmResponse, error) {
if ctl.v2 {
panic("Unsupported method for v2")
}
var resp clientv3.AlarmResponse
err := ctl.spawnJsonCmd(&resp, "alarm", "list")
return &resp, err
}
func (ctl *EtcdctlV3) MemberList() (*clientv3.MemberListResponse, error) {
func (ctl *Etcdctl) MemberList() (*clientv3.MemberListResponse, error) {
if ctl.v2 {
panic("Unsupported method for v2")
}
var resp clientv3.MemberListResponse
err := ctl.spawnJsonCmd(&resp, "member", "list")
return &resp, err
}
func (ctl *EtcdctlV3) Compact(rev int64) (*clientv3.CompactResponse, error) {
func (ctl *Etcdctl) Compact(rev int64) (*clientv3.CompactResponse, error) {
if ctl.v2 {
panic("Unsupported method for v2")
}
args := ctl.cmdArgs("compact", fmt.Sprint(rev))
return nil, spawnWithExpect(args, fmt.Sprintf("compacted revision %v", rev))
return nil, spawnWithExpectWithEnv(args, ctl.env(), fmt.Sprintf("compacted revision %v", rev))
}
func (ctl *EtcdctlV3) spawnJsonCmd(output interface{}, args ...string) error {
func (ctl *Etcdctl) spawnJsonCmd(output interface{}, args ...string) error {
args = append(args, "-w", "json")
cmd, err := spawnCmd(append(ctl.cmdArgs(), args...), nil)
cmd, err := spawnCmd(append(ctl.cmdArgs(), args...), ctl.env())
if err != nil {
return err
}
@@ -68,16 +110,44 @@ func (ctl *EtcdctlV3) spawnJsonCmd(output interface{}, args ...string) error {
return json.Unmarshal([]byte(line), output)
}
func (ctl *EtcdctlV3) cmdArgs(args ...string) []string {
cmdArgs := []string{ctlBinPath + "3"}
func (ctl *Etcdctl) cmdArgs(args ...string) []string {
cmdArgs := []string{ctlBinPath}
for k, v := range ctl.flags() {
cmdArgs = append(cmdArgs, fmt.Sprintf("--%s=%s", k, v))
}
return append(cmdArgs, args...)
}
func (ctl *EtcdctlV3) flags() map[string]string {
func (ctl *Etcdctl) flags() map[string]string {
fmap := make(map[string]string)
if ctl.v2 {
if ctl.connType == clientTLS {
fmap["ca-file"] = integration.TestTLSInfo.TrustedCAFile
fmap["cert-file"] = integration.TestTLSInfo.CertFile
fmap["key-file"] = integration.TestTLSInfo.KeyFile
}
} else {
if ctl.connType == clientTLS {
if ctl.isAutoTLS {
fmap["insecure-transport"] = "false"
fmap["insecure-skip-tls-verify"] = "true"
} else {
fmap["cacert"] = integration.TestTLSInfo.TrustedCAFile
fmap["cert"] = integration.TestTLSInfo.CertFile
fmap["key"] = integration.TestTLSInfo.KeyFile
}
}
}
fmap["endpoints"] = strings.Join(ctl.endpoints, ",")
return fmap
}
func (ctl *Etcdctl) env() map[string]string {
env := make(map[string]string)
if ctl.v2 {
env["ETCDCTL_API"] = "2"
} else {
env["ETCDCTL_API"] = "3"
}
return env
}

View File

@@ -78,6 +78,21 @@ func spawnWithExpectLines(args []string, envVars map[string]string, xs ...string
return lines, perr
}
func runUtilCompletion(args []string, envVars map[string]string) ([]string, error) {
proc, err := spawnCmd(args, envVars)
if err != nil {
return nil, fmt.Errorf("failed to spawn command %v with error: %w", args, err)
}
proc.Wait()
err = proc.Close()
if err != nil {
return nil, fmt.Errorf("failed to close command %v with error: %w", args, err)
}
return proc.Lines(), nil
}
func randomLeaseID() int64 {
return rand.New(rand.NewSource(time.Now().UnixNano())).Int63()
}

121
tests/e2e/utils.go Normal file
View File

@@ -0,0 +1,121 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package e2e
import (
"context"
"fmt"
"testing"
"time"
clientv2 "go.etcd.io/etcd/client/v2"
"go.etcd.io/etcd/tests/v3/integration"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"go.etcd.io/etcd/client/pkg/v3/transport"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/stringutil"
)
type clientConnType int
const (
clientNonTLS clientConnType = iota
clientTLS
clientTLSAndNonTLS
)
func newClient(t *testing.T, entpoints []string, connType clientConnType, isAutoTLS bool) *clientv3.Client {
tlscfg, err := tlsInfo(t, connType, isAutoTLS)
if err != nil {
t.Fatal(err)
}
ccfg := clientv3.Config{
Endpoints: entpoints,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
}
if tlscfg != nil {
tls, err := tlscfg.ClientConfig()
if err != nil {
t.Fatal(err)
}
ccfg.TLS = tls
}
c, err := clientv3.New(ccfg)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
c.Close()
})
return c
}
func newClientV2(t *testing.T, endpoints []string, connType clientConnType, isAutoTLS bool) (clientv2.Client, error) {
tls, err := tlsInfo(t, connType, isAutoTLS)
if err != nil {
t.Fatal(err)
}
cfg := clientv2.Config{
Endpoints: endpoints,
}
if tls != nil {
cfg.Transport, err = transport.NewTransport(*tls, 5*time.Second)
if err != nil {
t.Fatal(err)
}
}
return clientv2.New(cfg)
}
func tlsInfo(t testing.TB, connType clientConnType, isAutoTLS bool) (*transport.TLSInfo, error) {
switch connType {
case clientNonTLS, clientTLSAndNonTLS:
return nil, nil
case clientTLS:
if isAutoTLS {
tls, err := transport.SelfCert(zap.NewNop(), t.TempDir(), []string{"localhost"}, 1)
if err != nil {
return nil, fmt.Errorf("failed to generate cert: %s", err)
}
return &tls, nil
}
return &integration.TestTLSInfo, nil
default:
return nil, fmt.Errorf("config %v not supported", connType)
}
}
func fillEtcdWithData(ctx context.Context, c *clientv3.Client, keyCount int, valueSize uint) error {
g := errgroup.Group{}
concurrency := 10
keysPerRoutine := keyCount / concurrency
for i := 0; i < concurrency; i++ {
i := i
g.Go(func() error {
for j := 0; j < keysPerRoutine; j++ {
_, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(valueSize))
if err != nil {
return err
}
}
return nil
})
}
return g.Wait()
}

View File

@@ -129,35 +129,44 @@ type cURLReq struct {
metricsURLScheme string
ciphers string
ciphers string
httpVersion string
}
// cURLPrefixArgs builds the beginning of a curl command for a given key
// cURLPrefixArgsCluster builds the beginning of a curl command for a given key
// addressed to a random URL in the given cluster.
func cURLPrefixArgs(clus *etcdProcessCluster, method string, req cURLReq) []string {
func cURLPrefixArgsCluster(clus *etcdProcessCluster, method string, req cURLReq) []string {
member := clus.procs[rand.Intn(clus.cfg.clusterSize)]
clientURL := member.Config().acurl
if req.metricsURLScheme != "" {
clientURL = member.EndpointsMetrics()[0]
}
return cURLPrefixArgs(clientURL, clus.cfg.clientTLS, !clus.cfg.noCN, method, req)
}
func cURLPrefixArgs(clientURL string, connType clientConnType, CN bool, method string, req cURLReq) []string {
var (
cmdArgs = []string{"curl"}
acurl = clus.procs[rand.Intn(clus.cfg.clusterSize)].Config().acurl
)
if req.httpVersion != "" {
cmdArgs = append(cmdArgs, "--http"+req.httpVersion)
}
if req.metricsURLScheme != "https" {
if req.isTLS {
if clus.cfg.clientTLS != clientTLSAndNonTLS {
if connType != clientTLSAndNonTLS {
panic("should not use cURLPrefixArgsUseTLS when serving only TLS or non-TLS")
}
cmdArgs = append(cmdArgs, "--cacert", caPath, "--cert", certPath, "--key", privateKeyPath)
acurl = toTLS(clus.procs[rand.Intn(clus.cfg.clusterSize)].Config().acurl)
} else if clus.cfg.clientTLS == clientTLS {
if !clus.cfg.noCN {
clientURL = toTLS(clientURL)
} else if connType == clientTLS {
if CN {
cmdArgs = append(cmdArgs, "--cacert", caPath, "--cert", certPath, "--key", privateKeyPath)
} else {
cmdArgs = append(cmdArgs, "--cacert", caPath, "--cert", certPath3, "--key", privateKeyPath3)
}
}
}
if req.metricsURLScheme != "" {
acurl = clus.procs[rand.Intn(clus.cfg.clusterSize)].EndpointsMetrics()[0]
}
ep := acurl + req.endpoint
ep := clientURL + req.endpoint
if req.username != "" || req.password != "" {
cmdArgs = append(cmdArgs, "-L", "-u", fmt.Sprintf("%s:%s", req.username, req.password), ep)
@@ -188,13 +197,13 @@ func cURLPrefixArgs(clus *etcdProcessCluster, method string, req cURLReq) []stri
}
func cURLPost(clus *etcdProcessCluster, req cURLReq) error {
return spawnWithExpect(cURLPrefixArgs(clus, "POST", req), req.expected)
return spawnWithExpect(cURLPrefixArgsCluster(clus, "POST", req), req.expected)
}
func cURLPut(clus *etcdProcessCluster, req cURLReq) error {
return spawnWithExpect(cURLPrefixArgs(clus, "PUT", req), req.expected)
return spawnWithExpect(cURLPrefixArgsCluster(clus, "PUT", req), req.expected)
}
func cURLGet(clus *etcdProcessCluster, req cURLReq) error {
return spawnWithExpect(cURLPrefixArgs(clus, "GET", req), req.expected)
return spawnWithExpect(cURLPrefixArgsCluster(clus, "GET", req), req.expected)
}

View File

@@ -243,7 +243,7 @@ func testV3CurlAuth(cx ctlCtx) {
lineFunc = func(txt string) bool { return true }
)
cmdArgs = cURLPrefixArgs(cx.epc, "POST", cURLReq{endpoint: path.Join(p, "/auth/authenticate"), value: string(authreq)})
cmdArgs = cURLPrefixArgsCluster(cx.epc, "POST", cURLReq{endpoint: path.Join(p, "/auth/authenticate"), value: string(authreq)})
proc, err := spawnCmd(cmdArgs, cx.envMap)
testutil.AssertNil(cx.t, err)
defer proc.Close()
@@ -282,7 +282,7 @@ func testV3CurlCampaign(cx ctlCtx) {
if err != nil {
cx.t.Fatal(err)
}
cargs := cURLPrefixArgs(cx.epc, "POST", cURLReq{
cargs := cURLPrefixArgsCluster(cx.epc, "POST", cURLReq{
endpoint: path.Join(cx.apiPrefix, "/election/campaign"),
value: string(cdata),
})

View File

@@ -26,12 +26,8 @@ import (
"time"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/client/pkg/v3/transport"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/stringutil"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
)
const (
@@ -72,7 +68,7 @@ func TestWatchDelayForPeriodicProgressNotification(t *testing.T) {
clus, err := newEtcdProcessCluster(t, &tc.config)
require.NoError(t, err)
defer clus.Close()
c := newClient(t, clus, tc.config)
c := newClient(t, clus.EndpointsV3(), tc.config.clientTLS, tc.config.isClientAutoTLS)
require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues))
ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration)
@@ -92,7 +88,7 @@ func TestWatchDelayForManualProgressNotification(t *testing.T) {
clus, err := newEtcdProcessCluster(t, &tc.config)
require.NoError(t, err)
defer clus.Close()
c := newClient(t, clus, tc.config)
c := newClient(t, clus.EndpointsV3(), tc.config.clientTLS, tc.config.isClientAutoTLS)
require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues))
ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration)
@@ -124,7 +120,7 @@ func TestWatchDelayForEvent(t *testing.T) {
clus, err := newEtcdProcessCluster(t, &tc.config)
require.NoError(t, err)
defer clus.Close()
c := newClient(t, clus, tc.config)
c := newClient(t, clus.EndpointsV3(), tc.config.clientTLS, tc.config.isClientAutoTLS)
require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues))
ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration)
@@ -175,25 +171,6 @@ func validateWatchDelay(t *testing.T, watch clientv3.WatchChan) {
}
}
func fillEtcdWithData(ctx context.Context, c *clientv3.Client, keyCount int, valueSize uint) error {
g := errgroup.Group{}
concurrency := 10
keysPerRoutine := keyCount / concurrency
for i := 0; i < concurrency; i++ {
i := i
g.Go(func() error {
for j := 0; j < keysPerRoutine; j++ {
_, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(valueSize))
if err != nil {
return err
}
}
return nil
})
}
return g.Wait()
}
func continuouslyExecuteGetAll(ctx context.Context, t *testing.T, g *errgroup.Group, c *clientv3.Client) {
mux := sync.RWMutex{}
size := 0
@@ -229,48 +206,3 @@ func continuouslyExecuteGetAll(ctx context.Context, t *testing.T, g *errgroup.Gr
return nil
})
}
func newClient(t *testing.T, clus *etcdProcessCluster, cfg etcdProcessClusterConfig) *clientv3.Client {
tlscfg, err := tlsInfo(t, cfg)
if err != nil {
t.Fatal(err)
}
ccfg := clientv3.Config{
Endpoints: clus.EndpointsV3(),
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
}
if tlscfg != nil {
tls, err := tlscfg.ClientConfig()
if err != nil {
t.Fatal(err)
}
ccfg.TLS = tls
}
c, err := clientv3.New(ccfg)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
c.Close()
})
return c
}
func tlsInfo(t testing.TB, cfg etcdProcessClusterConfig) (*transport.TLSInfo, error) {
switch cfg.clientTLS {
case clientNonTLS, clientTLSAndNonTLS:
return nil, nil
case clientTLS:
if cfg.isClientAutoTLS {
tls, err := transport.SelfCert(zap.NewNop(), t.TempDir(), []string{"localhost"}, 1)
if err != nil {
return nil, fmt.Errorf("failed to generate cert: %s", err)
}
return &tls, nil
}
panic("Unsupported non-auto tls")
default:
return nil, fmt.Errorf("config %v not supported", cfg)
}
}

View File

@@ -23,6 +23,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/prometheus/client_golang v1.11.1
github.com/prometheus/common v0.26.0
github.com/soheilhy/cmux v0.1.5
github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5
@@ -63,7 +64,6 @@ require (
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/sirupsen/logrus v1.7.0 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect

View File

@@ -84,7 +84,7 @@ var (
// member, ensuring restarted members can listen on the same port again.
localListenCount = int64(0)
testTLSInfo = transport.TLSInfo{
TestTLSInfo = transport.TLSInfo{
KeyFile: MustAbsPath("../fixtures/server.key.insecure"),
CertFile: MustAbsPath("../fixtures/server.crt"),
TrustedCAFile: MustAbsPath("../fixtures/ca.crt"),

View File

@@ -52,7 +52,7 @@ func testCluster(t *testing.T, size int) {
func TestTLSClusterOf3(t *testing.T) {
BeforeTest(t)
c := NewClusterByConfig(t, &ClusterConfig{Size: 3, PeerTLS: &testTLSInfo})
c := NewClusterByConfig(t, &ClusterConfig{Size: 3, PeerTLS: &TestTLSInfo})
c.Launch(t)
defer c.Terminate(t)
clusterMustProgress(t, c.Members)
@@ -111,7 +111,7 @@ func TestTLSClusterOf3UsingDiscovery(t *testing.T) {
c := NewClusterByConfig(t,
&ClusterConfig{
Size: 3,
PeerTLS: &testTLSInfo,
PeerTLS: &TestTLSInfo,
DiscoveryURL: dc.URL(0) + "/v2/keys"},
)
c.Launch(t)
@@ -136,7 +136,7 @@ func testDoubleClusterSize(t *testing.T, size int) {
func TestDoubleTLSClusterSizeOf3(t *testing.T) {
BeforeTest(t)
c := NewClusterByConfig(t, &ClusterConfig{Size: 3, PeerTLS: &testTLSInfo})
c := NewClusterByConfig(t, &ClusterConfig{Size: 3, PeerTLS: &TestTLSInfo})
c.Launch(t)
defer c.Terminate(t)

View File

@@ -121,8 +121,8 @@ func TestAuthority(t *testing.T) {
func setupTLS(t *testing.T, useTLS bool, cfg ClusterConfig) (ClusterConfig, *tls.Config) {
t.Helper()
if useTLS {
cfg.ClientTLS = &testTLSInfo
tlsConfig, err := testTLSInfo.ClientConfig()
cfg.ClientTLS = &TestTLSInfo
tlsConfig, err := TestTLSInfo.ClientConfig()
if err != nil {
t.Fatal(err)
}

View File

@@ -1554,7 +1554,7 @@ func newClusterV3NoClients(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
func TestTLSGRPCRejectInsecureClient(t *testing.T) {
BeforeTest(t)
cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
cfg := ClusterConfig{Size: 3, ClientTLS: &TestTLSInfo}
clus := newClusterV3NoClients(t, &cfg)
defer clus.Terminate(t)
@@ -1593,7 +1593,7 @@ func TestTLSGRPCRejectSecureClient(t *testing.T) {
clus := newClusterV3NoClients(t, &cfg)
defer clus.Terminate(t)
clus.Members[0].ClientTLSInfo = &testTLSInfo
clus.Members[0].ClientTLSInfo = &TestTLSInfo
clus.Members[0].DialOptions = []grpc.DialOption{grpc.WithBlock()}
clus.Members[0].grpcURL = strings.Replace(clus.Members[0].grpcURL, "http://", "https://", 1)
client, err := NewClientV3(clus.Members[0])
@@ -1609,7 +1609,7 @@ func TestTLSGRPCRejectSecureClient(t *testing.T) {
func TestTLSGRPCAcceptSecureAll(t *testing.T) {
BeforeTest(t)
cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
cfg := ClusterConfig{Size: 3, ClientTLS: &TestTLSInfo}
clus := newClusterV3NoClients(t, &cfg)
defer clus.Terminate(t)
@@ -1649,7 +1649,7 @@ func TestTLSReloadAtomicReplace(t *testing.T) {
defer os.RemoveAll(certsDirExp)
cloneFunc := func() transport.TLSInfo {
tlsInfo, terr := copyTLSFiles(testTLSInfo, certsDir)
tlsInfo, terr := copyTLSFiles(TestTLSInfo, certsDir)
if terr != nil {
t.Fatal(terr)
}
@@ -1695,7 +1695,7 @@ func TestTLSReloadCopy(t *testing.T) {
defer os.RemoveAll(certsDir)
cloneFunc := func() transport.TLSInfo {
tlsInfo, terr := copyTLSFiles(testTLSInfo, certsDir)
tlsInfo, terr := copyTLSFiles(TestTLSInfo, certsDir)
if terr != nil {
t.Fatal(terr)
}
@@ -1707,7 +1707,7 @@ func TestTLSReloadCopy(t *testing.T) {
}
}
revertFunc := func() {
if _, err = copyTLSFiles(testTLSInfo, certsDir); err != nil {
if _, err = copyTLSFiles(TestTLSInfo, certsDir); err != nil {
t.Fatal(err)
}
}

View File

@@ -41,7 +41,7 @@ func testTLSCipherSuites(t *testing.T, valid bool) {
tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
}
srvTLS, cliTLS := testTLSInfo, testTLSInfo
srvTLS, cliTLS := TestTLSInfo, TestTLSInfo
if valid {
srvTLS.CipherSuites, cliTLS.CipherSuites = cipherSuites, cipherSuites
} else {
@@ -112,7 +112,7 @@ func TestTLSMinMaxVersion(t *testing.T) {
}
// Configure server to support TLS 1.3 only.
srvTLS := testTLSInfo
srvTLS := TestTLSInfo
srvTLS.MinVersion = tls.VersionTLS13
srvTLS.MaxVersion = tls.VersionTLS13
clus := NewClusterV3(t, &ClusterConfig{Size: 1, ClientTLS: &srvTLS})
@@ -120,7 +120,7 @@ func TestTLSMinMaxVersion(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cc, err := testTLSInfo.ClientConfig()
cc, err := TestTLSInfo.ClientConfig()
assert.NoError(t, err)
cc.MinVersion = tt.minVersion