From ec847337d72762ee26d68fef141e1b876fc5a57b Mon Sep 17 00:00:00 2001
From: Wei Fu
Date: Sat, 17 Sep 2022 10:03:27 +0800
Subject: [PATCH 1/2] e2e: make maxstream test stable

submitConcurrentWatch used a 3s sleep to wait for all the watch
connections to become ready. When the number of connections grows,
e.g. to 1000, 3s is not enough and the test case becomes flaky.

In this commit, spawn a curl process per watch and check its output
for the line `"created":true}}` to make sure that the connection has
been initialized and is ready to receive events. This makes the
subsequent range request reliable to test.

Signed-off-by: Wei Fu
---
 tests/e2e/v3_curl_maxstream_test.go | 88 +++++++++++++++++------------
 1 file changed, 53 insertions(+), 35 deletions(-)

diff --git a/tests/e2e/v3_curl_maxstream_test.go b/tests/e2e/v3_curl_maxstream_test.go
index 3ead9530b..5dab96491 100644
--- a/tests/e2e/v3_curl_maxstream_test.go
+++ b/tests/e2e/v3_curl_maxstream_test.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"math/rand"
 	"sync"
 	"syscall"
 	"testing"
@@ -38,19 +39,14 @@ func TestV3Curl_MaxStreams_BelowLimit_NoTLS_Medium(t *testing.T) {
 	testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(100), withTestTimeout(20*time.Second))
 }
 
-/*
-// There are lots of "device not configured" errors. I suspect it's an issue
-// of the project `github.com/creack/pty`. I manually executed the test case
-// with 1000 concurrent streams, and confirmed it's working as expected.
-// TODO(ahrtr): investigate the test failure in the future.
 func TestV3Curl_MaxStreamsNoTLS_BelowLimit_Large(t *testing.T) {
 	f, err := setRLimit(10240)
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer f()
 	testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(1000), withTestTimeout(200*time.Second))
-	f()
-} */
+}
 
 func TestV3Curl_MaxStreams_ReachLimit_NoTLS_Small(t *testing.T) {
 	testV3CurlMaxStream(t, true, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(3))
@@ -109,31 +105,23 @@ func testV3CurlMaxStream(t *testing.T, reachLimit bool, opts ...ctlOption) {
 		expectedResponse = "Operation timed out"
 	}
 	wg.Add(int(concurrentNumber))
-	t.Logf("Running the test, MaxConcurrentStreams: %d, concurrentNumber: %d, expectedResponse: %s\n",
+	t.Logf("Running the test, MaxConcurrentStreams: %d, concurrentNumber: %d, expected range's response: %s\n",
 		cx.cfg.MaxConcurrentStreams, concurrentNumber, expectedResponse)
-	errCh := make(chan error, concurrentNumber)
-	submitConcurrentWatch(cx, int(concurrentNumber), &wg, errCh)
+
+	closeServerCh := make(chan struct{})
+	submitConcurrentWatch(cx, int(concurrentNumber), &wg, closeServerCh)
 	submitRangeAfterConcurrentWatch(cx, expectedResponse)
 
-	// Step 4: check the watch errors. Note that we only check the watch error
-	// before closing cluster. Once we close the cluster, the watch must run
-	// into error, and we should ignore them by then.
-	t.Log("Checking watch error.")
-	select {
-	case werr := <-errCh:
-		t.Fatal(werr)
-	default:
-	}
-
-	// Step 5: Close the cluster
+	// Step 4: Close the cluster
 	t.Log("Closing test cluster...")
+	close(closeServerCh)
 	assert.NoError(t, epc.Close())
 	t.Log("Closed test cluster")
 
-	// Step 6: Waiting all watch goroutines to exit.
-	donec := make(chan struct{})
+	// Step 5: Waiting all watch goroutines to exit.
+	doneCh := make(chan struct{})
 	go func() {
-		defer close(donec)
+		defer close(doneCh)
 		wg.Wait()
 	}()
 
@@ -142,14 +130,14 @@ func testV3CurlMaxStream(t *testing.T, reachLimit bool, opts ...ctlOption) {
 	select {
 	case <-time.After(timeout):
 		testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout))
-	case <-donec:
+	case <-doneCh:
 		t.Log("All watch goroutines exited.")
 	}
 
 	t.Log("testV3CurlMaxStream done!")
 }
 
-func submitConcurrentWatch(cx ctlCtx, number int, wgDone *sync.WaitGroup, errCh chan<- error) {
+func submitConcurrentWatch(cx ctlCtx, number int, wgDone *sync.WaitGroup, closeCh chan struct{}) {
 	watchData, err := json.Marshal(&pb.WatchRequest_CreateRequest{
 		CreateRequest: &pb.WatchCreateRequest{
 			Key: []byte("foo"),
@@ -160,24 +148,54 @@ func submitConcurrentWatch(cx ctlCtx, number int, wgDone *sync.WaitGroup, errCh
 	}
 
 	var wgSchedule sync.WaitGroup
+
+	createWatchConnection := func() error {
+		cluster := cx.epc
+		member := cluster.Procs[rand.Intn(cluster.Cfg.ClusterSize)]
+		curlReq := e2e.CURLReq{Endpoint: "/v3/watch", Value: string(watchData)}
+
+		args := e2e.CURLPrefixArgs(cluster.Cfg, member, "POST", curlReq)
+		proc, err := e2e.SpawnCmd(args, nil)
+		if err != nil {
+			return fmt.Errorf("failed to spawn: %w", err)
+		}
+		defer proc.Stop()
+
+		// make sure that the watch request has been created
+		expectedLine := `"created":true}}`
+		_, lerr := proc.ExpectWithContext(context.TODO(), expectedLine)
+		if lerr != nil {
+			return fmt.Errorf("%v %v (expected %q). Try EXPECT_DEBUG=TRUE", args, lerr, expectedLine)
+		}
+
+		wgSchedule.Done()
+
+		// hold the connection and wait for server shutdown
+		perr := proc.Close()
+
+		// the curl process should exit only because the server is shutting down
+		select {
+		case <-closeCh:
+		default:
+			// note that perr could be nil here
+			return fmt.Errorf("unexpected connection close before server closes: %v", perr)
+		}
+		return nil
+	}
+
 	wgSchedule.Add(number)
 	for i := 0; i < number; i++ {
 		go func(i int) {
-			wgSchedule.Done()
 			defer wgDone.Done()
-			if err := e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: "/v3/watch", Value: string(watchData), Expected: `"revision":"`}); err != nil {
-				werr := fmt.Errorf("testV3CurlMaxStream watch failed: %d, error: %v", i, err)
-				cx.t.Error(werr)
-				errCh <- werr
+
+			if err := createWatchConnection(); err != nil {
+				cx.t.Fatalf("testV3CurlMaxStream watch failed: %d, error: %v", i, err)
 			}
+
 		}(i)
 	}
 	// make sure all goroutines have already been scheduled.
 	wgSchedule.Wait()
-
-	// We need to make sure all watch streams have already been created.
-	// For simplicity, we just sleep 3 second. We may consider improving
-	// it in the future.
-	time.Sleep(3 * time.Second)
 }
 
 func submitRangeAfterConcurrentWatch(cx ctlCtx, expectedValue string) {

From 16884373b983a325ea4d30f537063bbe5fb50689 Mon Sep 17 00:00:00 2001
From: Wei Fu
Date: Tue, 20 Sep 2022 11:03:51 +0800
Subject: [PATCH 2/2] e2e: submitConcurrentWatch runs with ExecuteWithTimeout

Use testutils.ExecuteWithTimeout to make sure that all active streams
are ready in time.
Signed-off-by: Wei Fu
---
 tests/e2e/v3_curl_maxstream_test.go | 26 +++++++++++++++-----------
 1 file changed, 15 insertions(+), 11 deletions(-)

diff --git a/tests/e2e/v3_curl_maxstream_test.go b/tests/e2e/v3_curl_maxstream_test.go
index 5dab96491..651b98dc4 100644
--- a/tests/e2e/v3_curl_maxstream_test.go
+++ b/tests/e2e/v3_curl_maxstream_test.go
@@ -28,6 +28,7 @@ import (
 	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
 	"go.etcd.io/etcd/client/pkg/v3/testutil"
 	"go.etcd.io/etcd/tests/v3/framework/e2e"
+	"go.etcd.io/etcd/tests/v3/framework/testutils"
 )
 
 // NO TLS
@@ -183,19 +184,22 @@ func submitConcurrentWatch(cx ctlCtx, number int, wgDone *sync.WaitGroup, closeC
 		return nil
 	}
 
-	wgSchedule.Add(number)
-	for i := 0; i < number; i++ {
-		go func(i int) {
-			defer wgDone.Done()
+	testutils.ExecuteWithTimeout(cx.t, cx.getTestTimeout(), func() {
+		wgSchedule.Add(number)
 
-			if err := createWatchConnection(); err != nil {
-				cx.t.Fatalf("testV3CurlMaxStream watch failed: %d, error: %v", i, err)
-			}
+		for i := 0; i < number; i++ {
+			go func(i int) {
+				defer wgDone.Done()
 
-		}(i)
-	}
-	// make sure all goroutines have already been scheduled.
-	wgSchedule.Wait()
+				if err := createWatchConnection(); err != nil {
+					cx.t.Fatalf("testV3CurlMaxStream watch failed: %d, error: %v", i, err)
+				}
+			}(i)
+		}
+
+		// make sure all goroutines have already been scheduled.
+		wgSchedule.Wait()
+	})
 }
 
 func submitRangeAfterConcurrentWatch(cx ctlCtx, expectedValue string) {
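
Reviewer note: the readiness check in these patches relies on etcd's internal
e2e/expect helpers (e2e.SpawnCmd, proc.ExpectWithContext). Below is a minimal
standalone sketch of the same idea in plain Go, using only os/exec and bufio
instead of those helpers. The endpoint URL, key payload, and function names
are illustrative assumptions, not values taken from the test.

package main

import (
	"bufio"
	"fmt"
	"os/exec"
	"strings"
)

// waitForWatchCreated spawns curl against the etcd gRPC gateway and blocks
// until the server acknowledges the watcher with a "created":true event,
// mirroring proc.ExpectWithContext(ctx, `"created":true}}`) in the patch.
func waitForWatchCreated(endpoint, payload string) (*exec.Cmd, error) {
	// -N disables output buffering so the acknowledgement line arrives promptly.
	cmd := exec.Command("curl", "-N", "-X", "POST", "-d", payload, endpoint+"/v3/watch")
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return nil, err
	}
	if err := cmd.Start(); err != nil {
		return nil, err
	}

	// Scan the streamed response until the creation acknowledgement appears;
	// only then is the watch stream known to be established on the server.
	scanner := bufio.NewScanner(stdout)
	for scanner.Scan() {
		if strings.Contains(scanner.Text(), `"created":true`) {
			return cmd, nil // caller keeps cmd running to hold the stream open
		}
	}
	return nil, fmt.Errorf("stream closed before creation ack: %v", scanner.Err())
}

func main() {
	// "Zm9v" is base64 for "foo"; the gateway expects base64-encoded keys.
	payload := `{"create_request": {"key": "Zm9v"}}`
	cmd, err := waitForWatchCreated("http://127.0.0.1:2379", payload)
	if err != nil {
		panic(err)
	}
	defer cmd.Process.Kill() // the real test holds the process until cluster shutdown
	fmt.Println("watch stream established")
}

Because each stream's readiness is observed directly instead of being assumed
after a fixed 3s sleep, this pattern scales from 100 to 1000 concurrent
streams without retuning any delay.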