diff --git a/bill-of-materials.json b/bill-of-materials.json index 9638e8e54..bf9574041 100644 --- a/bill-of-materials.json +++ b/bill-of-materials.json @@ -656,6 +656,15 @@ } ] }, + { + "project": "golang.org/x/sync/errgroup", + "licenses": [ + { + "type": "BSD 3-clause \"New\" or \"Revised\" License", + "confidence": 0.9663865546218487 + } + ] + }, { "project": "golang.org/x/sys/unix", "licenses": [ diff --git a/tests/e2e/cmux_test.go b/tests/e2e/cmux_test.go new file mode 100644 index 000000000..51512d57e --- /dev/null +++ b/tests/e2e/cmux_test.go @@ -0,0 +1,203 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// These tests are directly validating etcd connection multiplexing. +//go:build !cluster_proxy + +package e2e + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "testing" + + "github.com/prometheus/common/expfmt" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/api/v3/mvccpb" + "go.etcd.io/etcd/api/v3/version" + "go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp" + "go.etcd.io/etcd/tests/v3/framework/config" + "go.etcd.io/etcd/tests/v3/framework/e2e" +) + +func TestConnectionMultiplexing(t *testing.T) { + e2e.BeforeTest(t) + for _, tc := range []struct { + name string + serverTLS e2e.ClientConnType + }{ + { + name: "ServerTLS", + serverTLS: e2e.ClientTLS, + }, + { + name: "ServerNonTLS", + serverTLS: e2e.ClientNonTLS, + }, + { + name: "ServerTLSAndNonTLS", + serverTLS: e2e.ClientTLSAndNonTLS, + }, + } { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + cfg := e2e.EtcdProcessClusterConfig{ClusterSize: 1, Client: e2e.ClientConfig{ConnectionType: tc.serverTLS}} + clus, err := e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&cfg)) + require.NoError(t, err) + defer clus.Close() + + var clientScenarios []e2e.ClientConnType + switch tc.serverTLS { + case e2e.ClientTLS: + clientScenarios = []e2e.ClientConnType{e2e.ClientTLS} + case e2e.ClientNonTLS: + clientScenarios = []e2e.ClientConnType{e2e.ClientNonTLS} + case e2e.ClientTLSAndNonTLS: + clientScenarios = []e2e.ClientConnType{e2e.ClientTLS, e2e.ClientNonTLS} + } + + for _, clientTLS := range clientScenarios { + name := "ClientNonTLS" + if clientTLS == e2e.ClientTLS { + name = "ClientTLS" + } + t.Run(name, func(t *testing.T) { + testConnectionMultiplexing(t, ctx, clus.EndpointsV3()[0], clientTLS) + }) + } + }) + } + +} + +func testConnectionMultiplexing(t *testing.T, ctx context.Context, endpoint string, connType e2e.ClientConnType) { + switch connType { + case e2e.ClientTLS: + endpoint = e2e.ToTLS(endpoint) + case e2e.ClientNonTLS: + default: + panic(fmt.Sprintf("Unsupported conn type %v", connType)) + } + t.Run("etcdctl", func(t *testing.T) { + etcdctl, err := e2e.NewEtcdctl(e2e.ClientConfig{ConnectionType: connType}, []string{endpoint}) + require.NoError(t, err) + _, err = etcdctl.Get(ctx, "a", config.GetOptions{}) + assert.NoError(t, err) + }) + t.Run("clientv3", func(t *testing.T) { + c := newClient(t, []string{endpoint}, e2e.ClientConfig{ConnectionType: connType}) + _, err := c.Get(ctx, "a") + assert.NoError(t, err) + }) + t.Run("curl", func(t *testing.T) { + for _, httpVersion := range []string{"2", "1.1", "1.0", ""} { + tname := "http" + httpVersion + if httpVersion == "" { + tname = "default" + } + t.Run(tname, func(t *testing.T) { + assert.NoError(t, fetchGrpcGateway(endpoint, httpVersion, connType)) + assert.NoError(t, fetchMetrics(endpoint, httpVersion, connType)) + assert.NoError(t, fetchVersion(endpoint, httpVersion, connType)) + assert.NoError(t, fetchHealth(endpoint, httpVersion, connType)) + assert.NoError(t, fetchDebugVars(endpoint, httpVersion, connType)) + }) + } + }) +} + +func fetchGrpcGateway(endpoint string, httpVersion string, connType e2e.ClientConnType) error { + rangeData, err := json.Marshal(&pb.RangeRequest{ + Key: []byte("a"), + }) + if err != nil { + return err + } + req := e2e.CURLReq{Endpoint: "/v3/kv/range", Value: string(rangeData), Timeout: 5, HttpVersion: httpVersion} + respData, err := curl(endpoint, "POST", req, connType) + return validateGrpcgatewayRangeReponse([]byte(respData)) +} + +func validateGrpcgatewayRangeReponse(respData []byte) error { + // Modified json annotation so ResponseHeader fields are stored in string. + type responseHeader struct { + ClusterId uint64 `json:"cluster_id,string,omitempty"` + MemberId uint64 `json:"member_id,string,omitempty"` + Revision int64 `json:"revision,string,omitempty"` + RaftTerm uint64 `json:"raft_term,string,omitempty"` + } + type rangeResponse struct { + Header *responseHeader `json:"header,omitempty"` + Kvs []*mvccpb.KeyValue `json:"kvs,omitempty"` + More bool `json:"more,omitempty"` + Count int64 `json:"count,omitempty"` + } + var resp rangeResponse + return json.Unmarshal(respData, &resp) +} + +func fetchMetrics(endpoint string, httpVersion string, connType e2e.ClientConnType) error { + req := e2e.CURLReq{Endpoint: "/metrics", Timeout: 5, HttpVersion: httpVersion} + respData, err := curl(endpoint, "GET", req, connType) + if err != nil { + return err + } + var parser expfmt.TextParser + _, err = parser.TextToMetricFamilies(strings.NewReader(strings.ReplaceAll(respData, "\r\n", "\n"))) + return err +} + +func fetchVersion(endpoint string, httpVersion string, connType e2e.ClientConnType) error { + req := e2e.CURLReq{Endpoint: "/version", Timeout: 5, HttpVersion: httpVersion} + respData, err := curl(endpoint, "GET", req, connType) + if err != nil { + return err + } + var resp version.Versions + return json.Unmarshal([]byte(respData), &resp) +} + +func fetchHealth(endpoint string, httpVersion string, connType e2e.ClientConnType) error { + req := e2e.CURLReq{Endpoint: "/health", Timeout: 5, HttpVersion: httpVersion} + respData, err := curl(endpoint, "GET", req, connType) + if err != nil { + return err + } + var resp etcdhttp.Health + return json.Unmarshal([]byte(respData), &resp) +} + +func fetchDebugVars(endpoint string, httpVersion string, connType e2e.ClientConnType) error { + req := e2e.CURLReq{Endpoint: "/debug/vars", Timeout: 5, HttpVersion: httpVersion} + respData, err := curl(endpoint, "GET", req, connType) + if err != nil { + return err + } + var resp map[string]interface{} + return json.Unmarshal([]byte(respData), &resp) +} + +func curl(endpoint string, method string, curlReq e2e.CURLReq, connType e2e.ClientConnType) (string, error) { + args := e2e.CURLPrefixArgs(endpoint, e2e.ClientConfig{ConnectionType: connType}, false, method, curlReq) + lines, err := e2e.RunUtilCompletion(args, nil) + if err != nil { + return "", err + } + return strings.Join(lines, "\n"), nil +} diff --git a/tests/go.mod b/tests/go.mod index 6897b664d..23e29292a 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -21,6 +21,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/grpc-ecosystem/grpc-gateway v1.16.0 github.com/prometheus/client_golang v1.14.0 + github.com/prometheus/common v0.37.0 github.com/soheilhy/cmux v0.1.5 github.com/stretchr/testify v1.8.2 go.etcd.io/etcd/api/v3 v3.6.0-alpha.0 @@ -75,7 +76,6 @@ require ( github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect - github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/sirupsen/logrus v1.8.1 // indirect