Support configuring MaxConcurrentStreams for http2

Backport https://github.com/etcd-io/etcd/pull/14219 to 3.4

Signed-off-by: Benjamin Wang <wachao@vmware.com>
Benjamin Wang 2022-07-21 13:30:06 +08:00
parent 40ccb8b454
commit 6071b1c523
14 changed files with 461 additions and 14 deletions
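
For context, a minimal sketch of how an embedding application could use the new limit once this change is in; it assumes this branch's embed API, cfg.MaxConcurrentStreams is the field added by this commit, and the data directory and limit value are illustrative:

package main

import (
	"log"

	"go.etcd.io/etcd/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "/tmp/etcd-data" // illustrative data directory
	// Cap the number of concurrent HTTP/2 streams each client connection may open;
	// 250 is only an example, the default stays math.MaxUint32.
	cfg.MaxConcurrentStreams = 250

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()

	<-e.Server.ReadyNotify()
	log.Println("embedded etcd is ready")
}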

View File

@@ -18,6 +18,7 @@ import (
"crypto/tls"
"fmt"
"io/ioutil"
"math"
"net"
"net/http"
"net/url"
@@ -55,6 +56,7 @@ const (
DefaultMaxTxnOps = uint(128)
DefaultWarningApplyDuration = 100 * time.Millisecond
DefaultMaxRequestBytes = 1.5 * 1024 * 1024
DefaultMaxConcurrentStreams = math.MaxUint32
DefaultGRPCKeepAliveMinTime = 5 * time.Second
DefaultGRPCKeepAliveInterval = 2 * time.Hour
DefaultGRPCKeepAliveTimeout = 20 * time.Second
@@ -177,6 +179,10 @@ type Config struct {
MaxTxnOps uint `json:"max-txn-ops"`
MaxRequestBytes uint `json:"max-request-bytes"`
// MaxConcurrentStreams specifies the maximum number of concurrent
// streams that each client can open at a time.
MaxConcurrentStreams uint32 `json:"max-concurrent-streams"`
LPUrls, LCUrls []url.URL
APUrls, ACUrls []url.URL
ClientTLSInfo transport.TLSInfo
@@ -274,7 +280,7 @@ type Config struct {
AuthToken string `json:"auth-token"`
BcryptCost uint `json:"bcrypt-cost"`
//The AuthTokenTTL in seconds of the simple token
// AuthTokenTTL specifies the TTL in seconds of the simple token
AuthTokenTTL uint `json:"auth-token-ttl"`
ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"`
@@ -394,6 +400,7 @@ func NewConfig() *Config {
MaxTxnOps: DefaultMaxTxnOps,
MaxRequestBytes: DefaultMaxRequestBytes,
MaxConcurrentStreams: DefaultMaxConcurrentStreams,
ExperimentalWarningApplyDuration: DefaultWarningApplyDuration,
GRPCKeepAliveMinTime: DefaultGRPCKeepAliveMinTime,

View File

@@ -188,6 +188,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
BackendBatchInterval: cfg.BackendBatchInterval,
MaxTxnOps: cfg.MaxTxnOps,
MaxRequestBytes: cfg.MaxRequestBytes,
MaxConcurrentStreams: cfg.MaxConcurrentStreams,
StrictReconfigCheck: cfg.StrictReconfigCheck,
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
AuthToken: cfg.AuthToken,
@@ -331,7 +332,10 @@ func print(lg *zap.Logger, ec Config, sc etcdserver.ServerConfig, memberInitiali
zap.String("initial-cluster", sc.InitialPeerURLsMap.String()),
zap.String("initial-cluster-state", ec.ClusterState),
zap.String("initial-cluster-token", sc.InitialClusterToken),
zap.Int64("quota-size-bytes", quota),
zap.Int64("quota-backend-bytes", quota),
zap.Uint("max-request-bytes", sc.MaxRequestBytes),
zap.Uint32("max-concurrent-streams", sc.MaxConcurrentStreams),
zap.Bool("pre-vote", sc.PreVote),
zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck),
zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()),

View File

@@ -17,6 +17,7 @@ package embed
import (
"context"
"fmt"
"golang.org/x/net/http2"
"io/ioutil"
defaultLog "log"
"math"
@@ -132,6 +133,10 @@ func (sctx *serveCtx) serve(
Handler: createAccessController(sctx.lg, s, httpmux),
ErrorLog: logger, // do not log user error
}
if err = configureHttpServer(srvhttp, s.Cfg); err != nil {
sctx.lg.Error("Configure http server failed", zap.Error(err))
return err
}
httpl := m.Match(cmux.HTTP1())
go func() { errHandler(srvhttp.Serve(httpl)) }()
@@ -185,6 +190,10 @@ func (sctx *serveCtx) serve(
TLSConfig: tlscfg,
ErrorLog: logger, // do not log user error
}
if err = configureHttpServer(srv, s.Cfg); err != nil {
sctx.lg.Error("Configure https server failed", zap.Error(err))
return err
}
go func() { errHandler(srv.Serve(tlsl)) }()
sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
@@ -202,6 +211,13 @@ func (sctx *serveCtx) serve(
return m.Serve()
}
func configureHttpServer(srv *http.Server, cfg etcdserver.ServerConfig) error {
// todo (ahrtr): should we support configuring other parameters in the future as well?
return http2.ConfigureServer(srv, &http2.Server{
MaxConcurrentStreams: cfg.MaxConcurrentStreams,
})
}
// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC
// connections or otherHandler otherwise. Given in gRPC docs.
func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler {
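
As a side note for reviewers, http2.ConfigureServer applies the same limit to any net/http server; a self-contained sketch assuming golang.org/x/net/http2, where the address, handler, certificate paths, and limit value are illustrative:

package main

import (
	"log"
	"net/http"

	"golang.org/x/net/http2"
)

func main() {
	srv := &http.Server{Addr: ":8443", Handler: http.DefaultServeMux}
	// Cap concurrent HTTP/2 streams per client connection before serving.
	if err := http2.ConfigureServer(srv, &http2.Server{MaxConcurrentStreams: 100}); err != nil {
		log.Fatal(err)
	}
	// HTTP/2 is negotiated over TLS (h2); certificate paths are illustrative.
	log.Fatal(srv.ListenAndServeTLS("server.crt", "server.key"))
}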

View File

@@ -163,6 +163,8 @@ func newConfig() *config {
fs.DurationVar(&cfg.ec.GRPCKeepAliveInterval, "grpc-keepalive-interval", cfg.ec.GRPCKeepAliveInterval, "Frequency duration of server-to-client ping to check if a connection is alive (0 to disable).")
fs.DurationVar(&cfg.ec.GRPCKeepAliveTimeout, "grpc-keepalive-timeout", cfg.ec.GRPCKeepAliveTimeout, "Additional duration of wait before closing a non-responsive connection (0 to disable).")
fs.Var(flags.NewUint32Value(cfg.ec.MaxConcurrentStreams), "max-concurrent-streams", "Maximum concurrent streams that each client can open at a time.")
// clustering
fs.Var(
flags.NewUniqueURLsWithExceptions(embed.DefaultInitialAdvertisePeerURLs, ""),
@@ -336,6 +338,8 @@ func (cfg *config) configFromCmdLine() error {
cfg.ec.CipherSuites = flags.StringsFromFlag(cfg.cf.flagSet, "cipher-suites")
cfg.ec.MaxConcurrentStreams = flags.Uint32FromFlag(cfg.cf.flagSet, "max-concurrent-streams")
// TODO: remove this in v3.5
cfg.ec.DeprecatedLogOutput = flags.UniqueStringsFromFlag(cfg.cf.flagSet, "log-output")
cfg.ec.LogOutputs = flags.UniqueStringsFromFlag(cfg.cf.flagSet, "log-outputs")

View File

@@ -45,6 +45,7 @@ import (
"github.com/soheilhy/cmux"
"github.com/spf13/cobra"
"go.uber.org/zap"
"golang.org/x/net/http2"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
)
@@ -86,6 +87,8 @@ var (
grpcProxyEnableOrdering bool
grpcProxyDebug bool
maxConcurrentStreams uint32
)
const defaultGRPCMaxCallSendMsgSize = 1.5 * 1024 * 1024
@@ -146,6 +149,8 @@ func newGRPCProxyStartCommand() *cobra.Command {
cmd.Flags().BoolVar(&grpcProxyDebug, "debug", false, "Enable debug-level logging for grpc-proxy.")
cmd.Flags().Uint32Var(&maxConcurrentStreams, "max-concurrent-streams", math.MaxUint32, "Maximum concurrent streams that each client can open at a time.")
return &cmd
}
@@ -195,6 +200,13 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
httpClient := mustNewHTTPClient(lg)
srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client)
if err := http2.ConfigureServer(srvhttp, &http2.Server{
MaxConcurrentStreams: maxConcurrentStreams,
}); err != nil {
lg.Fatal("Failed to configure the http server", zap.Error(err))
}
errc := make(chan error)
go func() { errc <- newGRPCProxyServer(lg, client).Serve(grpcl) }()
go func() { errc <- srvhttp.Serve(httpl) }()

View File

@@ -77,6 +77,8 @@ Member:
Maximum number of operations permitted in a transaction.
--max-request-bytes '1572864'
Maximum client request size in bytes the server will accept.
--max-concurrent-streams 'math.MaxUint32'
Maximum concurrent streams that each client can open at a time.
--grpc-keepalive-min-time '5s'
Minimum duration interval that a client should wait before pinging server.
--grpc-keepalive-interval '2h'
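
For instance, an operator could start the server with the new flag as follows; the value is illustrative and other flags are omitted:

etcd --max-concurrent-streams 1000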

View File

@@ -31,7 +31,6 @@ import (
const (
grpcOverheadBytes = 512 * 1024
maxStreams = math.MaxUint32
maxSendBytes = math.MaxInt32
)
@@ -53,7 +52,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOptio
)))
opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))
opts = append(opts, grpc.MaxConcurrentStreams(s.Cfg.MaxConcurrentStreams))
grpcServer := grpc.NewServer(append(opts, gopts...)...)
pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
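
For comparison, grpc.MaxConcurrentStreams is a standard grpc-go server option; a minimal standalone sketch, with the listen address and limit value purely illustrative and service registration omitted:

package main

import (
	"log"
	"net"

	"google.golang.org/grpc"
)

func main() {
	lis, err := net.Listen("tcp", "127.0.0.1:2379") // illustrative address
	if err != nil {
		log.Fatal(err)
	}
	// Limit the number of concurrent streams each client connection may open.
	srv := grpc.NewServer(grpc.MaxConcurrentStreams(100))
	log.Fatal(srv.Serve(lis))
}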

View File

@@ -119,6 +119,10 @@ type ServerConfig struct {
// MaxRequestBytes is the maximum request size to send over raft.
MaxRequestBytes uint
// MaxConcurrentStreams specifies the maximum number of concurrent
// streams that each client can open at a time.
MaxConcurrentStreams uint32
WarningApplyDuration time.Duration
StrictReconfigCheck bool

pkg/flags/uint32.go Normal file
View File

@@ -0,0 +1,45 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package flags
import (
"flag"
"strconv"
)
type uint32Value uint32
// NewUint32Value creates an uint32 instance with the provided value.
func NewUint32Value(v uint32) *uint32Value {
val := new(uint32Value)
*val = uint32Value(v)
return val
}
// Set parses a command line uint32 value.
// Implements "flag.Value" interface.
func (i *uint32Value) Set(s string) error {
v, err := strconv.ParseUint(s, 0, 32)
*i = uint32Value(v)
return err
}
func (i *uint32Value) String() string { return strconv.FormatUint(uint64(*i), 10) }
// Uint32FromFlag returns the uint32 value of a flag with the given name
func Uint32FromFlag(fs *flag.FlagSet, name string) uint32 {
val := *fs.Lookup(name).Value.(*uint32Value)
return uint32(val)
}

pkg/flags/uint32_test.go Normal file
View File

@@ -0,0 +1,111 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package flags
import (
"flag"
"testing"
"github.com/stretchr/testify/assert"
)
func TestUint32Value(t *testing.T) {
cases := []struct {
name string
s string
expectedVal uint32
expectError bool
}{
{
name: "normal uint32 value",
s: "200",
expectedVal: 200,
},
{
name: "zero value",
s: "0",
expectedVal: 0,
},
{
name: "negative int value",
s: "-200",
expectError: true,
},
{
name: "invalid integer value",
s: "invalid",
expectError: true,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
var val uint32Value
err := val.Set(tc.s)
if tc.expectError {
if err == nil {
t.Errorf("Expected failure on parsing uint32 value from %s", tc.s)
}
} else {
if err != nil {
t.Errorf("Unexpected error when parsing %s: %v", tc.s, err)
}
assert.Equal(t, uint32(val), tc.expectedVal)
}
})
}
}
func TestUint32FromFlag(t *testing.T) {
const flagName = "max-concurrent-streams"
cases := []struct {
name string
defaultVal uint32
arguments []string
expectedVal uint32
}{
{
name: "only default value",
defaultVal: 15,
arguments: []string{},
expectedVal: 15,
},
{
name: "argument has different value from the default one",
defaultVal: 16,
arguments: []string{"--max-concurrent-streams", "200"},
expectedVal: 200,
},
{
name: "argument has the same value from the default one",
defaultVal: 105,
arguments: []string{"--max-concurrent-streams", "105"},
expectedVal: 105,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
fs := flag.NewFlagSet("etcd", flag.ContinueOnError)
fs.Var(NewUint32Value(tc.defaultVal), flagName, "Maximum concurrent streams that each client can open at a time.")
if err := fs.Parse(tc.arguments); err != nil {
t.Fatalf("Unexpected error: %v\n", err)
}
actualMaxStream := Uint32FromFlag(fs, flagName)
assert.Equal(t, actualMaxStream, tc.expectedVal)
})
}
}

View File

@@ -134,6 +134,8 @@ type etcdProcessClusterConfig struct {
enableV2 bool
initialCorruptCheck bool
authTokenOpts string
MaxConcurrentStreams uint32 // default is math.MaxUint32
}
// newEtcdProcessCluster launches a new cluster from etcd processes, returning
@@ -263,6 +265,10 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
args = append(args, "--auth-token", cfg.authTokenOpts)
}
if cfg.MaxConcurrentStreams != 0 {
args = append(args, "--max-concurrent-streams", fmt.Sprintf("%d", cfg.MaxConcurrentStreams))
}
etcdCfgs[i] = &etcdServerProcessConfig{
execPath: cfg.execPath,
args: args,

View File

@@ -23,11 +23,11 @@ import (
)
func TestCtlV3AuthCertCN(t *testing.T) {
testCtl(t, authTestCertCN, withCfg(*newConfigClientTLSCertAuth()))
testCtl(t, authTestCertCN, withCfg(configClientTLSCertAuth))
}
func TestCtlV3AuthCertCNAndUsername(t *testing.T) {
testCtl(t, authTestCertCNAndUsername, withCfg(*newConfigClientTLSCertAuth()))
testCtl(t, authTestCertCNAndUsername, withCfg(configClientTLSCertAuth))
}
func TestCtlV3AuthCertCNAndUsernameNoPassword(t *testing.T) {
testCtl(t, authTestCertCNAndUsernameNoPassword, withCfg(*newConfigClientTLSCertAuth()))
testCtl(t, authTestCertCNAndUsernameNoPassword, withCfg(configClientTLSCertAuth))
}

View File

@@ -64,6 +64,7 @@ type ctlCtx struct {
envMap map[string]struct{}
dialTimeout time.Duration
testTimeout time.Duration
quorum bool // if true, set up 3-node cluster and linearizable read
interactive bool
@@ -94,6 +95,10 @@ func withDialTimeout(timeout time.Duration) ctlOption {
return func(cx *ctlCtx) { cx.dialTimeout = timeout }
}
func withTestTimeout(timeout time.Duration) ctlOption {
return func(cx *ctlCtx) { cx.testTimeout = timeout }
}
func withQuorum() ctlOption {
return func(cx *ctlCtx) { cx.quorum = true }
}
@@ -130,14 +135,26 @@ func withFlagByEnv() ctlOption {
return func(cx *ctlCtx) { cx.envMap = make(map[string]struct{}) }
}
func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
defer testutil.AfterTest(t)
// This function must be called after the `withCfg`, otherwise its value
// may be overwritten by `withCfg`.
func withMaxConcurrentStreams(streams uint32) ctlOption {
return func(cx *ctlCtx) {
cx.cfg.MaxConcurrentStreams = streams
}
}
ret := ctlCtx{
func getDefaultCtlCtx(t *testing.T) ctlCtx {
return ctlCtx{
t: t,
cfg: configAutoTLS,
dialTimeout: 7 * time.Second,
}
}
func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
defer testutil.AfterTest(t)
ret := getDefaultCtlCtx(t)
ret.applyOpts(opts)
mustEtcdctl(t)
@@ -175,10 +192,8 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
testFunc(ret)
}()
timeout := 2*ret.dialTimeout + time.Second
if ret.dialTimeout == 0 {
timeout = 30 * time.Second
}
timeout := ret.getTestTimeout()
select {
case <-time.After(timeout):
testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout))
@@ -186,6 +201,17 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
}
}
func (cx *ctlCtx) getTestTimeout() time.Duration {
timeout := cx.testTimeout
if timeout == 0 {
timeout = 2*cx.dialTimeout + time.Second
if cx.dialTimeout == 0 {
timeout = 30 * time.Second
}
}
return timeout
}
func (cx *ctlCtx) prefixArgs(eps []string) []string {
fmap := make(map[string]string)
fmap["endpoints"] = strings.Join(eps, ",")

View File

@@ -0,0 +1,211 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package e2e
import (
"encoding/json"
"fmt"
"sync"
"syscall"
"testing"
"time"
"github.com/stretchr/testify/assert"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/pkg/testutil"
)
// NO TLS
func TestV3Curl_MaxStreams_BelowLimit_NoTLS_Small(t *testing.T) {
testV3CurlMaxStream(t, false, withCfg(configNoTLS), withMaxConcurrentStreams(3))
}
func TestV3Curl_MaxStreams_BelowLimit_NoTLS_Medium(t *testing.T) {
testV3CurlMaxStream(t, false, withCfg(configNoTLS), withMaxConcurrentStreams(100), withTestTimeout(20*time.Second))
}
/*
// There are lots of "device not configured" errors. I suspect it's an issue
// of the project `github.com/creack/pty`. I manually executed the test case
// with 1000 concurrent streams, and confirmed it's working as expected.
// TODO(ahrtr): investigate the test failure in the future.
func TestV3Curl_MaxStreamsNoTLS_BelowLimit_Large(t *testing.T) {
f, err := setRLimit(10240)
if err != nil {
t.Fatal(err)
}
testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(1000), withTestTimeout(200*time.Second))
f()
} */
func TestV3Curl_MaxStreams_ReachLimit_NoTLS_Small(t *testing.T) {
testV3CurlMaxStream(t, true, withCfg(configNoTLS), withMaxConcurrentStreams(3))
}
func TestV3Curl_MaxStreams_ReachLimit_NoTLS_Medium(t *testing.T) {
testV3CurlMaxStream(t, true, withCfg(configNoTLS), withMaxConcurrentStreams(100), withTestTimeout(20*time.Second))
}
// TLS
func TestV3Curl_MaxStreams_BelowLimit_TLS_Small(t *testing.T) {
testV3CurlMaxStream(t, false, withCfg(configTLS), withMaxConcurrentStreams(3))
}
func TestV3Curl_MaxStreams_BelowLimit_TLS_Medium(t *testing.T) {
testV3CurlMaxStream(t, false, withCfg(configTLS), withMaxConcurrentStreams(100), withTestTimeout(20*time.Second))
}
func TestV3Curl_MaxStreams_ReachLimit_TLS_Small(t *testing.T) {
testV3CurlMaxStream(t, true, withCfg(configTLS), withMaxConcurrentStreams(3))
}
func TestV3Curl_MaxStreams_ReachLimit_TLS_Medium(t *testing.T) {
testV3CurlMaxStream(t, true, withCfg(configTLS), withMaxConcurrentStreams(100), withTestTimeout(20*time.Second))
}
func testV3CurlMaxStream(t *testing.T, reachLimit bool, opts ...ctlOption) {
// Step 1: generate configuration for creating cluster
t.Log("Generating configuration for creating cluster.")
cx := getDefaultCtlCtx(t)
cx.applyOpts(opts)
// We must set the `ClusterSize` to 1, otherwise different streams may
// connect to different members, accordingly it's difficult to test the
// behavior.
cx.cfg.clusterSize = 1
// Step 2: create the cluster
t.Log("Creating an etcd cluster")
epc, err := newEtcdProcessCluster(&cx.cfg)
if err != nil {
t.Fatalf("Failed to start etcd cluster: %v", err)
}
cx.epc = epc
// Step 3: run test
// (a) generate ${concurrentNumber} concurrent watch streams;
// (b) submit a range request.
var wg sync.WaitGroup
concurrentNumber := cx.cfg.MaxConcurrentStreams - 1
expectedResponse := `"revision":"`
if reachLimit {
concurrentNumber = cx.cfg.MaxConcurrentStreams
expectedResponse = "Operation timed out"
}
wg.Add(int(concurrentNumber))
t.Logf("Running the test, MaxConcurrentStreams: %d, concurrentNumber: %d, expectedResponse: %s\n",
cx.cfg.MaxConcurrentStreams, concurrentNumber, expectedResponse)
errCh := make(chan error, concurrentNumber)
submitConcurrentWatch(cx, int(concurrentNumber), &wg, errCh)
submitRangeAfterConcurrentWatch(cx, expectedResponse)
// Step 4: check the watch errors. Note that we only check the watch error
// before closing cluster. Once we close the cluster, the watch must run
// into error, and we should ignore them by then.
t.Log("Checking watch error.")
select {
case werr := <-errCh:
t.Fatal(werr)
default:
}
// Step 5: Close the cluster
t.Log("Closing test cluster...")
assert.NoError(t, epc.Close())
t.Log("Closed test cluster")
// Step 6: Waiting all watch goroutines to exit.
donec := make(chan struct{})
go func() {
defer close(donec)
wg.Wait()
}()
timeout := cx.getTestTimeout()
t.Logf("Waiting test case to finish, timeout: %s", timeout)
select {
case <-time.After(timeout):
testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout))
case <-donec:
t.Log("All watch goroutines exited.")
}
t.Log("testV3CurlMaxStream done!")
}
func submitConcurrentWatch(cx ctlCtx, number int, wgDone *sync.WaitGroup, errCh chan<- error) {
wreq, err := json.Marshal(&pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 1})
if err != nil {
cx.t.Fatal(err)
}
wstr := `{"create_request" : ` + string(wreq) + "}"
var wgSchedule sync.WaitGroup
wgSchedule.Add(number)
for i := 0; i < number; i++ {
go func(i int) {
wgSchedule.Done()
defer wgDone.Done()
if err := cURLPost(cx.epc, cURLReq{endpoint: "/v3/watch", value: wstr, expected: `"revision":"`}); err != nil {
werr := fmt.Errorf("testV3CurlMaxStream watch failed: %d, error: %v", i, err)
cx.t.Error(werr)
errCh <- werr
}
}(i)
}
// make sure all goroutines have already been scheduled.
wgSchedule.Wait()
// We need to make sure all watch streams have already been created.
// For simplicity, we just sleep 3 seconds. We may consider improving
// it in the future.
time.Sleep(3 * time.Second)
}
func submitRangeAfterConcurrentWatch(cx ctlCtx, expectedValue string) {
rangeData, err := json.Marshal(&pb.RangeRequest{
Key: []byte("foo"),
})
if err != nil {
cx.t.Fatal(err)
}
cx.t.Log("Submitting range request...")
if err := cURLPost(cx.epc, cURLReq{endpoint: "/v3/kv/range", value: string(rangeData), expected: expectedValue, timeout: 5}); err != nil {
cx.t.Fatalf("testV3CurlMaxStream get failed, error: %v", err)
}
cx.t.Log("range request done")
}
// setRLimit sets the open file limitation, and return a function which
// is used to reset the limitation.
func setRLimit(nofile uint64) (func() error, error) {
var rLimit syscall.Rlimit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil {
return nil, fmt.Errorf("failed to get open file limit, error: %v", err)
}
var wLimit syscall.Rlimit
wLimit.Max = nofile
wLimit.Cur = nofile
if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &wLimit); err != nil {
return nil, fmt.Errorf("failed to set max open file limit, %v", err)
}
return func() error {
if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil {
return fmt.Errorf("failed reset max open file limit, %v", err)
}
return nil
}, nil
}