diff --git a/tests/e2e/v3_curl_maxstream_test.go b/tests/e2e/v3_curl_maxstream_test.go index 3ead9530b..5dab96491 100644 --- a/tests/e2e/v3_curl_maxstream_test.go +++ b/tests/e2e/v3_curl_maxstream_test.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "fmt" + "math/rand" "sync" "syscall" "testing" @@ -38,19 +39,14 @@ func TestV3Curl_MaxStreams_BelowLimit_NoTLS_Medium(t *testing.T) { testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(100), withTestTimeout(20*time.Second)) } -/* -// There are lots of "device not configured" errors. I suspect it's an issue -// of the project `github.com/creack/pty`. I manually executed the test case -// with 1000 concurrent streams, and confirmed it's working as expected. -// TODO(ahrtr): investigate the test failure in the future. func TestV3Curl_MaxStreamsNoTLS_BelowLimit_Large(t *testing.T) { f, err := setRLimit(10240) if err != nil { t.Fatal(err) } + defer f() testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(1000), withTestTimeout(200*time.Second)) - f() -} */ +} func TestV3Curl_MaxStreams_ReachLimit_NoTLS_Small(t *testing.T) { testV3CurlMaxStream(t, true, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(3)) @@ -109,31 +105,23 @@ func testV3CurlMaxStream(t *testing.T, reachLimit bool, opts ...ctlOption) { expectedResponse = "Operation timed out" } wg.Add(int(concurrentNumber)) - t.Logf("Running the test, MaxConcurrentStreams: %d, concurrentNumber: %d, expectedResponse: %s\n", + t.Logf("Running the test, MaxConcurrentStreams: %d, concurrentNumber: %d, expected range's response: %s\n", cx.cfg.MaxConcurrentStreams, concurrentNumber, expectedResponse) - errCh := make(chan error, concurrentNumber) - submitConcurrentWatch(cx, int(concurrentNumber), &wg, errCh) + + closeServerCh := make(chan struct{}) + submitConcurrentWatch(cx, int(concurrentNumber), &wg, closeServerCh) submitRangeAfterConcurrentWatch(cx, expectedResponse) - // Step 4: check the watch errors. Note that we only check the watch error - // before closing cluster. Once we close the cluster, the watch must run - // into error, and we should ignore them by then. - t.Log("Checking watch error.") - select { - case werr := <-errCh: - t.Fatal(werr) - default: - } - - // Step 5: Close the cluster + // Step 4: Close the cluster t.Log("Closing test cluster...") + close(closeServerCh) assert.NoError(t, epc.Close()) t.Log("Closed test cluster") - // Step 6: Waiting all watch goroutines to exit. - donec := make(chan struct{}) + // Step 5: Waiting all watch goroutines to exit. + doneCh := make(chan struct{}) go func() { - defer close(donec) + defer close(doneCh) wg.Wait() }() @@ -142,14 +130,14 @@ func testV3CurlMaxStream(t *testing.T, reachLimit bool, opts ...ctlOption) { select { case <-time.After(timeout): testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout)) - case <-donec: + case <-doneCh: t.Log("All watch goroutines exited.") } t.Log("testV3CurlMaxStream done!") } -func submitConcurrentWatch(cx ctlCtx, number int, wgDone *sync.WaitGroup, errCh chan<- error) { +func submitConcurrentWatch(cx ctlCtx, number int, wgDone *sync.WaitGroup, closeCh chan struct{}) { watchData, err := json.Marshal(&pb.WatchRequest_CreateRequest{ CreateRequest: &pb.WatchCreateRequest{ Key: []byte("foo"), @@ -160,24 +148,54 @@ func submitConcurrentWatch(cx ctlCtx, number int, wgDone *sync.WaitGroup, errCh } var wgSchedule sync.WaitGroup + + createWatchConnection := func() error { + cluster := cx.epc + member := cluster.Procs[rand.Intn(cluster.Cfg.ClusterSize)] + curlReq := e2e.CURLReq{Endpoint: "/v3/watch", Value: string(watchData)} + + args := e2e.CURLPrefixArgs(cluster.Cfg, member, "POST", curlReq) + proc, err := e2e.SpawnCmd(args, nil) + if err != nil { + return fmt.Errorf("failed to spawn: %w", err) + } + defer proc.Stop() + + // make sure that watch request has been created + expectedLine := `"created":true}}` + _, lerr := proc.ExpectWithContext(context.TODO(), expectedLine) + if lerr != nil { + return fmt.Errorf("%v %v (expected %q). Try EXPECT_DEBUG=TRUE", args, lerr, expectedLine) + } + + wgSchedule.Done() + + // hold the connection and wait for server shutdown + perr := proc.Close() + + // curl process will return + select { + case <-closeCh: + default: + // perr could be nil. + return fmt.Errorf("unexpected connection close before server closes: %v", perr) + } + return nil + } + wgSchedule.Add(number) for i := 0; i < number; i++ { go func(i int) { - wgSchedule.Done() defer wgDone.Done() - if err := e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: "/v3/watch", Value: string(watchData), Expected: `"revision":"`}); err != nil { - werr := fmt.Errorf("testV3CurlMaxStream watch failed: %d, error: %v", i, err) - cx.t.Error(werr) - errCh <- werr + + if err := createWatchConnection(); err != nil { + cx.t.Fatalf("testV3CurlMaxStream watch failed: %d, error: %v", i, err) } + }(i) } // make sure all goroutines have already been scheduled. wgSchedule.Wait() - // We need to make sure all watch streams have already been created. - // For simplicity, we just sleep 3 second. We may consider improving - // it in the future. - time.Sleep(3 * time.Second) } func submitRangeAfterConcurrentWatch(cx ctlCtx, expectedValue string) {