diff --git a/tests/e2e/ctl_v3_test.go b/tests/e2e/ctl_v3_test.go index 8b11cf6b5..20e4362f4 100644 --- a/tests/e2e/ctl_v3_test.go +++ b/tests/e2e/ctl_v3_test.go @@ -133,6 +133,7 @@ type ctlCtx struct { envMap map[string]string dialTimeout time.Duration + testTimeout time.Duration quorum bool // if true, set up 3-node cluster and linearizable read interactive bool @@ -166,6 +167,10 @@ func withDialTimeout(timeout time.Duration) ctlOption { return func(cx *ctlCtx) { cx.dialTimeout = timeout } } +func withTestTimeout(timeout time.Duration) ctlOption { + return func(cx *ctlCtx) { cx.testTimeout = timeout } +} + func withQuorum() ctlOption { return func(cx *ctlCtx) { cx.quorum = true } } @@ -198,6 +203,14 @@ func withFlagByEnv() ctlOption { return func(cx *ctlCtx) { cx.envMap = make(map[string]string) } } +// This function must be called after the `withCfg`, otherwise its value +// may be overwritten by `withCfg`. +func withMaxConcurrentStreams(streams uint32) ctlOption { + return func(cx *ctlCtx) { + cx.cfg.MaxConcurrentStreams = streams + } +} + func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) { testCtlWithOffline(t, testFunc, nil, opts...) } @@ -262,10 +275,8 @@ func runCtlTest(t *testing.T, testFunc func(ctlCtx), testOfflineFunc func(ctlCtx t.Log("---testFunc logic DONE") }() - timeout := 2*cx.dialTimeout + time.Second - if cx.dialTimeout == 0 { - timeout = 30 * time.Second - } + timeout := cx.getTestTimeout() + select { case <-time.After(timeout): testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout)) @@ -282,6 +293,17 @@ func runCtlTest(t *testing.T, testFunc func(ctlCtx), testOfflineFunc func(ctlCtx } } +func (cx *ctlCtx) getTestTimeout() time.Duration { + timeout := cx.testTimeout + if timeout == 0 { + timeout = 2*cx.dialTimeout + time.Second + if cx.dialTimeout == 0 { + timeout = 30 * time.Second + } + } + return timeout +} + func (cx *ctlCtx) prefixArgs(eps []string) []string { fmap := make(map[string]string) fmap["endpoints"] = strings.Join(eps, ",") diff --git a/tests/e2e/v3_curl_maxstream_test.go b/tests/e2e/v3_curl_maxstream_test.go new file mode 100644 index 000000000..586306eda --- /dev/null +++ b/tests/e2e/v3_curl_maxstream_test.go @@ -0,0 +1,218 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "encoding/json" + "fmt" + "sync" + "syscall" + "testing" + "time" + + "github.com/stretchr/testify/assert" + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/client/pkg/v3/testutil" + "go.etcd.io/etcd/tests/v3/framework/e2e" +) + +// NO TLS +func TestV3Curl_MaxStreams_BelowLimit_NoTLS_Small(t *testing.T) { + testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(3)) +} + +func TestV3Curl_MaxStreams_BelowLimit_NoTLS_Medium(t *testing.T) { + testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(100), withTestTimeout(20*time.Second)) +} + +/* +// There are lots of "device not configured" errors. I suspect it's an issue +// of the project `github.com/creack/pty`. I manually executed the test case +// with 1000 concurrent streams, and confirmed it's working as expected. +// TODO(ahrtr): investigate the test failure in the future. +func TestV3Curl_MaxStreamsNoTLS_BelowLimit_Large(t *testing.T) { + f, err := setRLimit(10240) + if err != nil { + t.Fatal(err) + } + testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(1000), withTestTimeout(200*time.Second)) + f() +} */ + +func TestV3Curl_MaxStreams_ReachLimit_NoTLS_Small(t *testing.T) { + testV3CurlMaxStream(t, true, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(3)) +} + +func TestV3Curl_MaxStreams_ReachLimit_NoTLS_Medium(t *testing.T) { + testV3CurlMaxStream(t, true, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(100), withTestTimeout(20*time.Second)) +} + +// TLS +func TestV3Curl_MaxStreams_BelowLimit_TLS_Small(t *testing.T) { + testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigTLS()), withMaxConcurrentStreams(3)) +} + +func TestV3Curl_MaxStreams_BelowLimit_TLS_Medium(t *testing.T) { + testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigTLS()), withMaxConcurrentStreams(100), withTestTimeout(20*time.Second)) +} + +func TestV3Curl_MaxStreams_ReachLimit_TLS_Small(t *testing.T) { + testV3CurlMaxStream(t, true, withCfg(*e2e.NewConfigTLS()), withMaxConcurrentStreams(3)) +} + +func TestV3Curl_MaxStreams_ReachLimit_TLS_Medium(t *testing.T) { + testV3CurlMaxStream(t, true, withCfg(*e2e.NewConfigTLS()), withMaxConcurrentStreams(100), withTestTimeout(20*time.Second)) +} + +func testV3CurlMaxStream(t *testing.T, reachLimit bool, opts ...ctlOption) { + e2e.BeforeTest(t) + + // Step 1: generate configuration for creating cluster + t.Log("Generating configuration for creating cluster.") + cx := getDefaultCtlCtx(t) + cx.applyOpts(opts) + // We must set the clusterSize to 1, otherwise different streams may + // connect to different members, accordingly it's difficult to test the + // behavior. + cx.cfg.ClusterSize = 1 + + // Step 2: create the cluster + t.Log("Creating an etcd cluster") + epc, err := e2e.NewEtcdProcessCluster(t, &cx.cfg) + if err != nil { + t.Fatalf("Failed to start etcd cluster: %v", err) + } + cx.epc = epc + cx.dataDir = epc.Procs[0].Config().DataDirPath + + // Step 3: run test + // (a) generate ${concurrentNumber} concurrent watch streams; + // (b) submit a range request. + var wg sync.WaitGroup + concurrentNumber := cx.cfg.MaxConcurrentStreams - 1 + expectedResponse := `"revision":"` + if reachLimit { + concurrentNumber = cx.cfg.MaxConcurrentStreams + expectedResponse = "Operation timed out" + } + wg.Add(int(concurrentNumber)) + t.Logf("Running the test, MaxConcurrentStreams: %d, concurrentNumber: %d, expectedResponse: %s\n", + cx.cfg.MaxConcurrentStreams, concurrentNumber, expectedResponse) + errCh := make(chan error, concurrentNumber) + submitConcurrentWatch(cx, int(concurrentNumber), &wg, errCh) + submitRangeAfterConcurrentWatch(cx, expectedResponse) + + // Step 4: check the watch errors. Note that we ony check the watch error + // before closing cluster. Once we closed the cluster, the watch must run + // into error, and we should ignore them by then. + t.Log("Checking watch error.") + select { + case werr := <-errCh: + t.Fatal(werr) + default: + } + + // Step 5: Close the cluster + t.Log("Closing test cluster...") + assert.NoError(t, epc.Close()) + t.Log("Closed test cluster") + + // Step 6: Waiting all watch goroutines to exit. + donec := make(chan struct{}) + go func() { + defer close(donec) + wg.Wait() + }() + + timeout := cx.getTestTimeout() + t.Logf("Waiting test case to finish, timeout: %s", timeout) + select { + case <-time.After(timeout): + testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout)) + case <-donec: + t.Log("All watch goroutines exited.") + } + + t.Log("testV3CurlMaxStream done!") +} + +func submitConcurrentWatch(cx ctlCtx, number int, wgDone *sync.WaitGroup, errCh chan<- error) { + watchData, err := json.Marshal(&pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{ + Key: []byte("foo"), + }, + }) + if err != nil { + cx.t.Fatal(err) + } + + var wgSchedule sync.WaitGroup + wgSchedule.Add(number) + for i := 0; i < number; i++ { + go func(i int) { + wgSchedule.Done() + defer wgDone.Done() + if err := e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: "/v3/watch", Value: string(watchData), Expected: `"revision":"`}); err != nil { + werr := fmt.Errorf("testV3CurlMaxStream watch failed: %d, error: %v", i, err) + cx.t.Error(werr) + errCh <- werr + } + }(i) + } + // make sure all goroutines have already been scheduled. + wgSchedule.Wait() + // We need to make sure all watch streams have already been created. + // For simplicity, we just sleep 3 second. We may consider improving + // it in the future. + time.Sleep(3 * time.Second) +} + +func submitRangeAfterConcurrentWatch(cx ctlCtx, expectedValue string) { + rangeData, err := json.Marshal(&pb.RangeRequest{ + Key: []byte("foo"), + }) + if err != nil { + cx.t.Fatal(err) + } + + cx.t.Log("Submitting range request...") + if err := e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: "/v3/kv/range", Value: string(rangeData), Expected: expectedValue, Timeout: 5}); err != nil { + cx.t.Fatalf("testV3CurlMaxStream get failed, error: %v", err) + } + cx.t.Log("range request done") +} + +// setRLimit sets the open file limitation, and return a function which +// is used to reset the limitation. +func setRLimit(nofile uint64) (func() error, error) { + var rLimit syscall.Rlimit + if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil { + return nil, fmt.Errorf("failed to get open file limit, error: %v", err) + } + + var wLimit syscall.Rlimit + wLimit.Max = nofile + wLimit.Cur = nofile + if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &wLimit); err != nil { + return nil, fmt.Errorf("failed to set max open file limit, %v", err) + } + + return func() error { + if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil { + return fmt.Errorf("failed reset max open file limit, %v", err) + } + return nil + }, nil +} diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 4b1daf93d..fece5f5b0 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -176,6 +176,8 @@ type EtcdProcessClusterConfig struct { DiscoveryEndpoints []string // v3 discovery DiscoveryToken string LogLevel string + + MaxConcurrentStreams uint32 // default is math.MaxUint32 } // NewEtcdProcessCluster launches a new cluster from etcd processes, returning @@ -341,6 +343,10 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []* args = append(args, "--log-level", cfg.LogLevel) } + if cfg.MaxConcurrentStreams != 0 { + args = append(args, "--max-concurrent-streams", fmt.Sprintf("%d", cfg.MaxConcurrentStreams)) + } + etcdCfgs[i] = &EtcdServerProcessConfig{ lg: lg, ExecPath: cfg.ExecPath,