Merge pull request #15479 from serathius/test-cmux

Test connection multiplexing.
This commit is contained in:
Marek Siarkowicz 2023-03-16 09:16:38 +01:00 committed by GitHub
commit 3717448887
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 349 additions and 112 deletions

View File

@ -656,6 +656,15 @@
}
]
},
{
"project": "golang.org/x/sync/errgroup",
"licenses": [
{
"type": "BSD 3-clause \"New\" or \"Revised\" License",
"confidence": 0.9663865546218487
}
]
},
{
"project": "golang.org/x/sys/unix",
"licenses": [

View File

@ -232,7 +232,7 @@ func compareMemberVersion(expect version.Versions, target version.Versions) erro
}
func getMemberVersionByCurl(cfg *e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) (version.Versions, error) {
args := e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"})
args := e2e.CURLPrefixArgsCluster(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"})
lines, err := e2e.RunUtilCompletion(args, nil)
if err != nil {
return version.Versions{}, err

203
tests/e2e/cmux_test.go Normal file
View File

@ -0,0 +1,203 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// These tests are directly validating etcd connection multiplexing.
//go:build !cluster_proxy
package e2e
import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"
"github.com/prometheus/common/expfmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)
func TestConnectionMultiplexing(t *testing.T) {
e2e.BeforeTest(t)
for _, tc := range []struct {
name string
serverTLS e2e.ClientConnType
}{
{
name: "ServerTLS",
serverTLS: e2e.ClientTLS,
},
{
name: "ServerNonTLS",
serverTLS: e2e.ClientNonTLS,
},
{
name: "ServerTLSAndNonTLS",
serverTLS: e2e.ClientTLSAndNonTLS,
},
} {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
cfg := e2e.EtcdProcessClusterConfig{ClusterSize: 1, Client: e2e.ClientConfig{ConnectionType: tc.serverTLS}}
clus, err := e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&cfg))
require.NoError(t, err)
defer clus.Close()
var clientScenarios []e2e.ClientConnType
switch tc.serverTLS {
case e2e.ClientTLS:
clientScenarios = []e2e.ClientConnType{e2e.ClientTLS}
case e2e.ClientNonTLS:
clientScenarios = []e2e.ClientConnType{e2e.ClientNonTLS}
case e2e.ClientTLSAndNonTLS:
clientScenarios = []e2e.ClientConnType{e2e.ClientTLS, e2e.ClientNonTLS}
}
for _, clientTLS := range clientScenarios {
name := "ClientNonTLS"
if clientTLS == e2e.ClientTLS {
name = "ClientTLS"
}
t.Run(name, func(t *testing.T) {
testConnectionMultiplexing(t, ctx, clus.EndpointsV3()[0], clientTLS)
})
}
})
}
}
func testConnectionMultiplexing(t *testing.T, ctx context.Context, endpoint string, connType e2e.ClientConnType) {
switch connType {
case e2e.ClientTLS:
endpoint = e2e.ToTLS(endpoint)
case e2e.ClientNonTLS:
default:
panic(fmt.Sprintf("Unsupported conn type %v", connType))
}
t.Run("etcdctl", func(t *testing.T) {
etcdctl, err := e2e.NewEtcdctl(e2e.ClientConfig{ConnectionType: connType}, []string{endpoint})
require.NoError(t, err)
_, err = etcdctl.Get(ctx, "a", config.GetOptions{})
assert.NoError(t, err)
})
t.Run("clientv3", func(t *testing.T) {
c := newClient(t, []string{endpoint}, e2e.ClientConfig{ConnectionType: connType})
_, err := c.Get(ctx, "a")
assert.NoError(t, err)
})
t.Run("curl", func(t *testing.T) {
for _, httpVersion := range []string{"2", "1.1", "1.0", ""} {
tname := "http" + httpVersion
if httpVersion == "" {
tname = "default"
}
t.Run(tname, func(t *testing.T) {
assert.NoError(t, fetchGrpcGateway(endpoint, httpVersion, connType))
assert.NoError(t, fetchMetrics(endpoint, httpVersion, connType))
assert.NoError(t, fetchVersion(endpoint, httpVersion, connType))
assert.NoError(t, fetchHealth(endpoint, httpVersion, connType))
assert.NoError(t, fetchDebugVars(endpoint, httpVersion, connType))
})
}
})
}
func fetchGrpcGateway(endpoint string, httpVersion string, connType e2e.ClientConnType) error {
rangeData, err := json.Marshal(&pb.RangeRequest{
Key: []byte("a"),
})
if err != nil {
return err
}
req := e2e.CURLReq{Endpoint: "/v3/kv/range", Value: string(rangeData), Timeout: 5, HttpVersion: httpVersion}
respData, err := curl(endpoint, "POST", req, connType)
return validateGrpcgatewayRangeReponse([]byte(respData))
}
func validateGrpcgatewayRangeReponse(respData []byte) error {
// Modified json annotation so ResponseHeader fields are stored in string.
type responseHeader struct {
ClusterId uint64 `json:"cluster_id,string,omitempty"`
MemberId uint64 `json:"member_id,string,omitempty"`
Revision int64 `json:"revision,string,omitempty"`
RaftTerm uint64 `json:"raft_term,string,omitempty"`
}
type rangeResponse struct {
Header *responseHeader `json:"header,omitempty"`
Kvs []*mvccpb.KeyValue `json:"kvs,omitempty"`
More bool `json:"more,omitempty"`
Count int64 `json:"count,omitempty"`
}
var resp rangeResponse
return json.Unmarshal(respData, &resp)
}
func fetchMetrics(endpoint string, httpVersion string, connType e2e.ClientConnType) error {
req := e2e.CURLReq{Endpoint: "/metrics", Timeout: 5, HttpVersion: httpVersion}
respData, err := curl(endpoint, "GET", req, connType)
if err != nil {
return err
}
var parser expfmt.TextParser
_, err = parser.TextToMetricFamilies(strings.NewReader(strings.ReplaceAll(respData, "\r\n", "\n")))
return err
}
func fetchVersion(endpoint string, httpVersion string, connType e2e.ClientConnType) error {
req := e2e.CURLReq{Endpoint: "/version", Timeout: 5, HttpVersion: httpVersion}
respData, err := curl(endpoint, "GET", req, connType)
if err != nil {
return err
}
var resp version.Versions
return json.Unmarshal([]byte(respData), &resp)
}
func fetchHealth(endpoint string, httpVersion string, connType e2e.ClientConnType) error {
req := e2e.CURLReq{Endpoint: "/health", Timeout: 5, HttpVersion: httpVersion}
respData, err := curl(endpoint, "GET", req, connType)
if err != nil {
return err
}
var resp etcdhttp.Health
return json.Unmarshal([]byte(respData), &resp)
}
func fetchDebugVars(endpoint string, httpVersion string, connType e2e.ClientConnType) error {
req := e2e.CURLReq{Endpoint: "/debug/vars", Timeout: 5, HttpVersion: httpVersion}
respData, err := curl(endpoint, "GET", req, connType)
if err != nil {
return err
}
var resp map[string]interface{}
return json.Unmarshal([]byte(respData), &resp)
}
func curl(endpoint string, method string, curlReq e2e.CURLReq, connType e2e.ClientConnType) (string, error) {
args := e2e.CURLPrefixArgs(endpoint, e2e.ClientConfig{ConnectionType: connType}, false, method, curlReq)
lines, err := e2e.RunUtilCompletion(args, nil)
if err != nil {
return "", err
}
return strings.Join(lines, "\n"), nil
}

View File

@ -63,7 +63,7 @@ func metricsTest(cx ctlCtx) {
if err := ctlV3Watch(cx, []string{"k", "--rev", "1"}, []kvExec{{key: "k", val: "v"}}...); err != nil {
cx.t.Fatal(err)
}
if err := e2e.CURLGet(cx.epc, e2e.CURLReq{Endpoint: test.endpoint, Expected: test.expected, MetricsURLScheme: cx.cfg.MetricsURLScheme}); err != nil {
if err := e2e.CURLGet(cx.epc, e2e.CURLReq{Endpoint: test.endpoint, Expected: test.expected}); err != nil {
cx.t.Fatalf("failed get with curl (%v)", err)
}
}

97
tests/e2e/utils.go Normal file
View File

@ -0,0 +1,97 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package e2e
import (
"context"
"fmt"
"testing"
"time"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"go.etcd.io/etcd/client/pkg/v3/transport"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/stringutil"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/framework/integration"
)
func newClient(t *testing.T, entpoints []string, cfg e2e.ClientConfig) *clientv3.Client {
tlscfg, err := tlsInfo(t, cfg)
if err != nil {
t.Fatal(err)
}
ccfg := clientv3.Config{
Endpoints: entpoints,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
}
if tlscfg != nil {
tls, err := tlscfg.ClientConfig()
if err != nil {
t.Fatal(err)
}
ccfg.TLS = tls
}
c, err := clientv3.New(ccfg)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
c.Close()
})
return c
}
func tlsInfo(t testing.TB, cfg e2e.ClientConfig) (*transport.TLSInfo, error) {
switch cfg.ConnectionType {
case e2e.ClientNonTLS, e2e.ClientTLSAndNonTLS:
return nil, nil
case e2e.ClientTLS:
if cfg.AutoTLS {
tls, err := transport.SelfCert(zap.NewNop(), t.TempDir(), []string{"localhost"}, 1)
if err != nil {
return nil, fmt.Errorf("failed to generate cert: %s", err)
}
return &tls, nil
} else {
return &integration.TestTLSInfo, nil
}
default:
return nil, fmt.Errorf("config %v not supported", cfg)
}
}
func fillEtcdWithData(ctx context.Context, c *clientv3.Client, keyCount int, valueSize uint) error {
g := errgroup.Group{}
concurrency := 10
keysPerRoutine := keyCount / concurrency
for i := 0; i < concurrency; i++ {
i := i
g.Go(func() error {
for j := 0; j < keysPerRoutine; j++ {
_, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(valueSize))
if err != nil {
return err
}
}
return nil
})
}
return g.Wait()
}

View File

@ -48,10 +48,9 @@ func testV3CurlCipherSuites(t *testing.T, valid bool) {
func cipherSuiteTestValid(cx ctlCtx) {
if err := e2e.CURLGet(cx.epc, e2e.CURLReq{
Endpoint: "/metrics",
Expected: fmt.Sprintf(`etcd_server_version{server_version="%s"} 1`, version.Version),
MetricsURLScheme: cx.cfg.MetricsURLScheme,
Ciphers: "ECDHE-RSA-AES128-GCM-SHA256", // TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
Endpoint: "/metrics",
Expected: fmt.Sprintf(`etcd_server_version{server_version="%s"} 1`, version.Version),
Ciphers: "ECDHE-RSA-AES128-GCM-SHA256", // TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
}); err != nil {
require.ErrorContains(cx.t, err, fmt.Sprintf(`etcd_server_version{server_version="%s"} 1`, version.Version))
}
@ -59,10 +58,9 @@ func cipherSuiteTestValid(cx ctlCtx) {
func cipherSuiteTestMismatch(cx ctlCtx) {
err := e2e.CURLGet(cx.epc, e2e.CURLReq{
Endpoint: "/metrics",
Expected: "failed setting cipher list",
MetricsURLScheme: cx.cfg.MetricsURLScheme,
Ciphers: "ECDHE-RSA-DES-CBC3-SHA", // TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA
Endpoint: "/metrics",
Expected: "failed setting cipher list",
Ciphers: "ECDHE-RSA-DES-CBC3-SHA", // TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA
})
require.ErrorContains(cx.t, err, "curl: (59) failed setting cipher list")
}

View File

@ -157,7 +157,7 @@ func submitConcurrentWatch(cx ctlCtx, number int, wgDone *sync.WaitGroup, closeC
member := cluster.Procs[rand.Intn(cluster.Cfg.ClusterSize)]
curlReq := e2e.CURLReq{Endpoint: "/v3/watch", Value: string(watchData)}
args := e2e.CURLPrefixArgs(cluster.Cfg, member, "POST", curlReq)
args := e2e.CURLPrefixArgsCluster(cluster.Cfg, member, "POST", curlReq)
proc, err := e2e.SpawnCmd(args, nil)
if err != nil {
return fmt.Errorf("failed to spawn: %w", err)

View File

@ -246,7 +246,7 @@ func testV3CurlAuth(cx ctlCtx) {
lineFunc = func(txt string) bool { return true }
)
cmdArgs = e2e.CURLPrefixArgs(cx.epc.Cfg, cx.epc.Procs[rand.Intn(cx.epc.Cfg.ClusterSize)], "POST", e2e.CURLReq{Endpoint: path.Join(p, "/auth/authenticate"), Value: string(authreq)})
cmdArgs = e2e.CURLPrefixArgsCluster(cx.epc.Cfg, cx.epc.Procs[rand.Intn(cx.epc.Cfg.ClusterSize)], "POST", e2e.CURLReq{Endpoint: path.Join(p, "/auth/authenticate"), Value: string(authreq)})
proc, err := e2e.SpawnCmd(cmdArgs, cx.envMap)
testutil.AssertNil(cx.t, err)
defer proc.Close()
@ -285,7 +285,7 @@ func testV3CurlCampaign(cx ctlCtx) {
if err != nil {
cx.t.Fatal(err)
}
cargs := e2e.CURLPrefixArgs(cx.epc.Cfg, cx.epc.Procs[rand.Intn(cx.epc.Cfg.ClusterSize)], "POST", e2e.CURLReq{
cargs := e2e.CURLPrefixArgsCluster(cx.epc.Cfg, cx.epc.Procs[rand.Intn(cx.epc.Cfg.ClusterSize)], "POST", e2e.CURLReq{
Endpoint: path.Join(cx.apiPrefix, "/election/campaign"),
Value: string(cdata),
})

View File

@ -26,15 +26,10 @@ import (
"time"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"go.etcd.io/etcd/client/pkg/v3/transport"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/stringutil"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/framework/integration"
)
const (
@ -75,7 +70,7 @@ func TestWatchDelayForPeriodicProgressNotification(t *testing.T) {
clus, err := e2e.NewEtcdProcessCluster(context.Background(), t, e2e.WithConfig(&tc.config))
require.NoError(t, err)
defer clus.Close()
c := newClient(t, clus, tc.config)
c := newClient(t, clus.EndpointsV3(), tc.config.Client)
require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues))
ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration)
@ -95,7 +90,7 @@ func TestWatchDelayForManualProgressNotification(t *testing.T) {
clus, err := e2e.NewEtcdProcessCluster(context.Background(), t, e2e.WithConfig(&tc.config))
require.NoError(t, err)
defer clus.Close()
c := newClient(t, clus, tc.config)
c := newClient(t, clus.EndpointsV3(), tc.config.Client)
require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues))
ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration)
@ -128,7 +123,7 @@ func TestWatchDelayForEvent(t *testing.T) {
clus, err := e2e.NewEtcdProcessCluster(context.Background(), t, e2e.WithConfig(&tc.config))
require.NoError(t, err)
defer clus.Close()
c := newClient(t, clus, tc.config)
c := newClient(t, clus.EndpointsV3(), tc.config.Client)
require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues))
ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration)
@ -180,25 +175,6 @@ func validateWatchDelay(t *testing.T, watch clientv3.WatchChan) {
}
}
func fillEtcdWithData(ctx context.Context, c *clientv3.Client, keyCount int, valueSize uint) error {
g := errgroup.Group{}
concurrency := 10
keysPerRoutine := keyCount / concurrency
for i := 0; i < concurrency; i++ {
i := i
g.Go(func() error {
for j := 0; j < keysPerRoutine; j++ {
_, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(valueSize))
if err != nil {
return err
}
}
return nil
})
}
return g.Wait()
}
func continuouslyExecuteGetAll(ctx context.Context, t *testing.T, g *errgroup.Group, c *clientv3.Client) {
mux := sync.RWMutex{}
size := 0
@ -235,49 +211,3 @@ func continuouslyExecuteGetAll(ctx context.Context, t *testing.T, g *errgroup.Gr
return nil
})
}
func newClient(t *testing.T, clus *e2e.EtcdProcessCluster, cfg e2e.EtcdProcessClusterConfig) *clientv3.Client {
tlscfg, err := tlsInfo(t, cfg.Client)
if err != nil {
t.Fatal(err)
}
ccfg := clientv3.Config{
Endpoints: clus.EndpointsV3(),
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
}
if tlscfg != nil {
tls, err := tlscfg.ClientConfig()
if err != nil {
t.Fatal(err)
}
ccfg.TLS = tls
}
c, err := clientv3.New(ccfg)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
c.Close()
})
return c
}
func tlsInfo(t testing.TB, cfg e2e.ClientConfig) (*transport.TLSInfo, error) {
switch cfg.ConnectionType {
case e2e.ClientNonTLS, e2e.ClientTLSAndNonTLS:
return nil, nil
case e2e.ClientTLS:
if cfg.AutoTLS {
tls, err := transport.SelfCert(zap.NewNop(), t.TempDir(), []string{"localhost"}, 1)
if err != nil {
return nil, fmt.Errorf("failed to generate cert: %s", err)
}
return &tls, nil
} else {
return &integration.TestTLSInfo, nil
}
default:
return nil, fmt.Errorf("config %v not supported", cfg)
}
}

View File

@ -35,9 +35,8 @@ type CURLReq struct {
Expected string
Header string
MetricsURLScheme string
Ciphers string
Ciphers string
HttpVersion string
}
func (r CURLReq) timeoutDuration() time.Duration {
@ -49,32 +48,33 @@ func (r CURLReq) timeoutDuration() time.Duration {
return 5 * time.Second
}
// CURLPrefixArgs builds the beginning of a curl command for a given key
// CURLPrefixArgsCluster builds the beginning of a curl command for a given key
// addressed to a random URL in the given cluster.
func CURLPrefixArgs(cfg *EtcdProcessClusterConfig, member EtcdProcess, method string, req CURLReq) []string {
func CURLPrefixArgsCluster(cfg *EtcdProcessClusterConfig, member EtcdProcess, method string, req CURLReq) []string {
return CURLPrefixArgs(member.Config().ClientURL, cfg.Client, cfg.CN, method, req)
}
func CURLPrefixArgs(clientURL string, cfg ClientConfig, CN bool, method string, req CURLReq) []string {
var (
cmdArgs = []string{"curl"}
acurl = member.Config().ClientURL
)
if req.MetricsURLScheme != "https" {
if req.IsTLS {
if cfg.Client.ConnectionType != ClientTLSAndNonTLS {
panic("should not use cURLPrefixArgsUseTLS when serving only TLS or non-TLS")
}
if req.HttpVersion != "" {
cmdArgs = append(cmdArgs, "--http"+req.HttpVersion)
}
if req.IsTLS {
if cfg.ConnectionType != ClientTLSAndNonTLS {
panic("should not use cURLPrefixArgsUseTLS when serving only TLS or non-TLS")
}
cmdArgs = append(cmdArgs, "--cacert", CaPath, "--cert", CertPath, "--key", PrivateKeyPath)
clientURL = ToTLS(clientURL)
} else if cfg.ConnectionType == ClientTLS {
if CN {
cmdArgs = append(cmdArgs, "--cacert", CaPath, "--cert", CertPath, "--key", PrivateKeyPath)
acurl = ToTLS(member.Config().ClientURL)
} else if cfg.Client.ConnectionType == ClientTLS {
if cfg.CN {
cmdArgs = append(cmdArgs, "--cacert", CaPath, "--cert", CertPath, "--key", PrivateKeyPath)
} else {
cmdArgs = append(cmdArgs, "--cacert", CaPath, "--cert", CertPath3, "--key", PrivateKeyPath3)
}
} else {
cmdArgs = append(cmdArgs, "--cacert", CaPath, "--cert", CertPath3, "--key", PrivateKeyPath3)
}
}
if req.MetricsURLScheme != "" {
acurl = member.EndpointsMetrics()[0]
}
ep := acurl + req.Endpoint
ep := clientURL + req.Endpoint
if req.Username != "" || req.Password != "" {
cmdArgs = append(cmdArgs, "-L", "-u", fmt.Sprintf("%s:%s", req.Username, req.Password), ep)
@ -107,18 +107,18 @@ func CURLPrefixArgs(cfg *EtcdProcessClusterConfig, member EtcdProcess, method st
func CURLPost(clus *EtcdProcessCluster, req CURLReq) error {
ctx, cancel := context.WithTimeout(context.Background(), req.timeoutDuration())
defer cancel()
return SpawnWithExpectsContext(ctx, CURLPrefixArgs(clus.Cfg, clus.Procs[rand.Intn(clus.Cfg.ClusterSize)], "POST", req), nil, req.Expected)
return SpawnWithExpectsContext(ctx, CURLPrefixArgsCluster(clus.Cfg, clus.Procs[rand.Intn(clus.Cfg.ClusterSize)], "POST", req), nil, req.Expected)
}
func CURLPut(clus *EtcdProcessCluster, req CURLReq) error {
ctx, cancel := context.WithTimeout(context.Background(), req.timeoutDuration())
defer cancel()
return SpawnWithExpectsContext(ctx, CURLPrefixArgs(clus.Cfg, clus.Procs[rand.Intn(clus.Cfg.ClusterSize)], "PUT", req), nil, req.Expected)
return SpawnWithExpectsContext(ctx, CURLPrefixArgsCluster(clus.Cfg, clus.Procs[rand.Intn(clus.Cfg.ClusterSize)], "PUT", req), nil, req.Expected)
}
func CURLGet(clus *EtcdProcessCluster, req CURLReq) error {
ctx, cancel := context.WithTimeout(context.Background(), req.timeoutDuration())
defer cancel()
return SpawnWithExpectsContext(ctx, CURLPrefixArgs(clus.Cfg, clus.Procs[rand.Intn(clus.Cfg.ClusterSize)], "GET", req), nil, req.Expected)
return SpawnWithExpectsContext(ctx, CURLPrefixArgsCluster(clus.Cfg, clus.Procs[rand.Intn(clus.Cfg.ClusterSize)], "GET", req), nil, req.Expected)
}

View File

@ -21,6 +21,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/common v0.37.0
github.com/soheilhy/cmux v0.1.5
github.com/stretchr/testify v1.8.2
go.etcd.io/etcd/api/v3 v3.6.0-alpha.0
@ -75,7 +76,6 @@ require (
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect