Merge pull request #14482 from fuweid/testing-curl-maxstream

e2e: make maxstream test stable
This commit is contained in:
Benjamin Wang 2022-09-20 12:30:10 +08:00 committed by GitHub
commit c113f93d79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -18,6 +18,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"math/rand"
"sync" "sync"
"syscall" "syscall"
"testing" "testing"
@ -27,6 +28,7 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb" pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/testutil" "go.etcd.io/etcd/client/pkg/v3/testutil"
"go.etcd.io/etcd/tests/v3/framework/e2e" "go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/framework/testutils"
) )
// NO TLS // NO TLS
@ -38,19 +40,14 @@ func TestV3Curl_MaxStreams_BelowLimit_NoTLS_Medium(t *testing.T) {
testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(100), withTestTimeout(20*time.Second)) testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(100), withTestTimeout(20*time.Second))
} }
/*
// There are lots of "device not configured" errors. I suspect it's an issue
// of the project `github.com/creack/pty`. I manually executed the test case
// with 1000 concurrent streams, and confirmed it's working as expected.
// TODO(ahrtr): investigate the test failure in the future.
func TestV3Curl_MaxStreamsNoTLS_BelowLimit_Large(t *testing.T) { func TestV3Curl_MaxStreamsNoTLS_BelowLimit_Large(t *testing.T) {
f, err := setRLimit(10240) f, err := setRLimit(10240)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer f()
testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(1000), withTestTimeout(200*time.Second)) testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(1000), withTestTimeout(200*time.Second))
f() }
} */
func TestV3Curl_MaxStreams_ReachLimit_NoTLS_Small(t *testing.T) { func TestV3Curl_MaxStreams_ReachLimit_NoTLS_Small(t *testing.T) {
testV3CurlMaxStream(t, true, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(3)) testV3CurlMaxStream(t, true, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(3))
@ -109,31 +106,23 @@ func testV3CurlMaxStream(t *testing.T, reachLimit bool, opts ...ctlOption) {
expectedResponse = "Operation timed out" expectedResponse = "Operation timed out"
} }
wg.Add(int(concurrentNumber)) wg.Add(int(concurrentNumber))
t.Logf("Running the test, MaxConcurrentStreams: %d, concurrentNumber: %d, expectedResponse: %s\n", t.Logf("Running the test, MaxConcurrentStreams: %d, concurrentNumber: %d, expected range's response: %s\n",
cx.cfg.MaxConcurrentStreams, concurrentNumber, expectedResponse) cx.cfg.MaxConcurrentStreams, concurrentNumber, expectedResponse)
errCh := make(chan error, concurrentNumber)
submitConcurrentWatch(cx, int(concurrentNumber), &wg, errCh) closeServerCh := make(chan struct{})
submitConcurrentWatch(cx, int(concurrentNumber), &wg, closeServerCh)
submitRangeAfterConcurrentWatch(cx, expectedResponse) submitRangeAfterConcurrentWatch(cx, expectedResponse)
// Step 4: check the watch errors. Note that we only check the watch error // Step 4: Close the cluster
// before closing cluster. Once we close the cluster, the watch must run
// into error, and we should ignore them by then.
t.Log("Checking watch error.")
select {
case werr := <-errCh:
t.Fatal(werr)
default:
}
// Step 5: Close the cluster
t.Log("Closing test cluster...") t.Log("Closing test cluster...")
close(closeServerCh)
assert.NoError(t, epc.Close()) assert.NoError(t, epc.Close())
t.Log("Closed test cluster") t.Log("Closed test cluster")
// Step 6: Waiting all watch goroutines to exit. // Step 5: Waiting all watch goroutines to exit.
donec := make(chan struct{}) doneCh := make(chan struct{})
go func() { go func() {
defer close(donec) defer close(doneCh)
wg.Wait() wg.Wait()
}() }()
@ -142,14 +131,14 @@ func testV3CurlMaxStream(t *testing.T, reachLimit bool, opts ...ctlOption) {
select { select {
case <-time.After(timeout): case <-time.After(timeout):
testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout)) testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout))
case <-donec: case <-doneCh:
t.Log("All watch goroutines exited.") t.Log("All watch goroutines exited.")
} }
t.Log("testV3CurlMaxStream done!") t.Log("testV3CurlMaxStream done!")
} }
func submitConcurrentWatch(cx ctlCtx, number int, wgDone *sync.WaitGroup, errCh chan<- error) { func submitConcurrentWatch(cx ctlCtx, number int, wgDone *sync.WaitGroup, closeCh chan struct{}) {
watchData, err := json.Marshal(&pb.WatchRequest_CreateRequest{ watchData, err := json.Marshal(&pb.WatchRequest_CreateRequest{
CreateRequest: &pb.WatchCreateRequest{ CreateRequest: &pb.WatchCreateRequest{
Key: []byte("foo"), Key: []byte("foo"),
@ -160,24 +149,57 @@ func submitConcurrentWatch(cx ctlCtx, number int, wgDone *sync.WaitGroup, errCh
} }
var wgSchedule sync.WaitGroup var wgSchedule sync.WaitGroup
wgSchedule.Add(number)
for i := 0; i < number; i++ { createWatchConnection := func() error {
go func(i int) { cluster := cx.epc
wgSchedule.Done() member := cluster.Procs[rand.Intn(cluster.Cfg.ClusterSize)]
defer wgDone.Done() curlReq := e2e.CURLReq{Endpoint: "/v3/watch", Value: string(watchData)}
if err := e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: "/v3/watch", Value: string(watchData), Expected: `"revision":"`}); err != nil {
werr := fmt.Errorf("testV3CurlMaxStream watch failed: %d, error: %v", i, err) args := e2e.CURLPrefixArgs(cluster.Cfg, member, "POST", curlReq)
cx.t.Error(werr) proc, err := e2e.SpawnCmd(args, nil)
errCh <- werr if err != nil {
} return fmt.Errorf("failed to spawn: %w", err)
}(i) }
defer proc.Stop()
// make sure that watch request has been created
expectedLine := `"created":true}}`
_, lerr := proc.ExpectWithContext(context.TODO(), expectedLine)
if lerr != nil {
return fmt.Errorf("%v %v (expected %q). Try EXPECT_DEBUG=TRUE", args, lerr, expectedLine)
}
wgSchedule.Done()
// hold the connection and wait for server shutdown
perr := proc.Close()
// curl process will return
select {
case <-closeCh:
default:
// perr could be nil.
return fmt.Errorf("unexpected connection close before server closes: %v", perr)
}
return nil
} }
// make sure all goroutines have already been scheduled.
wgSchedule.Wait() testutils.ExecuteWithTimeout(cx.t, cx.getTestTimeout(), func() {
// We need to make sure all watch streams have already been created. wgSchedule.Add(number)
// For simplicity, we just sleep 3 second. We may consider improving
// it in the future. for i := 0; i < number; i++ {
time.Sleep(3 * time.Second) go func(i int) {
defer wgDone.Done()
if err := createWatchConnection(); err != nil {
cx.t.Fatalf("testV3CurlMaxStream watch failed: %d, error: %v", i, err)
}
}(i)
}
// make sure all goroutines have already been scheduled.
wgSchedule.Wait()
})
} }
func submitRangeAfterConcurrentWatch(cx ctlCtx, expectedValue string) { func submitRangeAfterConcurrentWatch(cx ctlCtx, expectedValue string) {