From 622017468725647028e75aea2c971966ab70b3b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=A8=E9=87=91=E7=8F=8F?= Date: Mon, 30 May 2022 17:03:27 +0800 Subject: [PATCH 1/6] support custom `grpc.MaxConcurrentStreams` There is no update on the original PR (see below) for more then 2 weeks. So Benjamin(@ahrtr) continues to work on the PR. The first step is to rebase the PR, because there are lots of conflicts with the main branch. The change to go.mod and go.sum reverted, because they are not needed. The e2e test cases are also reverted, because they are not correct. ``` https://github.com/etcd-io/etcd/pull/14081 ``` Signed-off-by: nic-chen Signed-off-by: Benjamin Wang --- pkg/flags/int.go | 50 +++++++++++++++++++++++++ pkg/flags/int_test.go | 57 +++++++++++++++++++++++++++++ server/config/config.go | 4 ++ server/embed/config.go | 9 ++++- server/embed/etcd.go | 1 + server/embed/serve.go | 14 +++++++ server/etcdmain/config.go | 4 ++ server/etcdmain/grpc_proxy.go | 11 ++++++ server/etcdmain/help.go | 2 + server/etcdserver/api/v3rpc/grpc.go | 3 +- 10 files changed, 152 insertions(+), 3 deletions(-) create mode 100644 pkg/flags/int.go create mode 100644 pkg/flags/int_test.go diff --git a/pkg/flags/int.go b/pkg/flags/int.go new file mode 100644 index 000000000..c3d763da4 --- /dev/null +++ b/pkg/flags/int.go @@ -0,0 +1,50 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flags + +import ( + "flag" + "fmt" + "strconv" +) + +type uint32Value uint32 + +func NewUint32Value(s string) *uint32Value { + var u uint32Value + if s == "" || s == "0" { + return &u + } + if err := u.Set(s); err != nil { + panic(fmt.Sprintf("new uint32Value should never fail: %v", err)) + } + return &u +} + +func (i *uint32Value) Set(s string) error { + v, err := strconv.ParseUint(s, 0, 32) + *i = uint32Value(v) + return err +} +func (i *uint32Value) Type() string { + return "uint32" +} +func (i *uint32Value) String() string { return strconv.FormatUint(uint64(*i), 10) } + +// Uint32FromFlag return the uint32 value of a flag with the given name +func Uint32FromFlag(fs *flag.FlagSet, name string) uint32 { + val := *fs.Lookup(name).Value.(*uint32Value) + return uint32(val) +} diff --git a/pkg/flags/int_test.go b/pkg/flags/int_test.go new file mode 100644 index 000000000..2ebabd01f --- /dev/null +++ b/pkg/flags/int_test.go @@ -0,0 +1,57 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flags + +import ( + "reflect" + "testing" +) + +func TestInvalidUint32(t *testing.T) { + tests := []string{ + // string + "invalid", + // negative number + "-1", + // float number + "0.1", + "-0.2", + // larger than math.MaxUint32 + "4294967296", + } + for i, in := range tests { + var u uint32Value + if err := u.Set(in); err == nil { + t.Errorf(`#%d: unexpected nil error for in=%q`, i, in) + } + } +} + +func TestUint32Value(t *testing.T) { + tests := []struct { + s string + exp uint32 + }{ + {s: "0", exp: 0}, + {s: "1", exp: 1}, + {s: "", exp: 0}, + } + for i := range tests { + ss := uint32(*NewUint32Value(tests[i].s)) + if !reflect.DeepEqual(tests[i].exp, ss) { + t.Fatalf("#%d: expected %q, got %q", i, tests[i].exp, ss) + } + } +} diff --git a/server/config/config.go b/server/config/config.go index 75d7df6c4..625f019e4 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -129,6 +129,10 @@ type ServerConfig struct { // MaxRequestBytes is the maximum request size to send over raft. MaxRequestBytes uint + // MaxConcurrentStreams optionally specifies the number of concurrent + // streams that each client may have open at a time. + MaxConcurrentStreams uint32 + WarningApplyDuration time.Duration WarningUnaryRequestDuration time.Duration diff --git a/server/embed/config.go b/server/embed/config.go index 2cacf5ea4..21d38ca84 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -17,6 +17,7 @@ package embed import ( "errors" "fmt" + "math" "net" "net/http" "net/url" @@ -59,6 +60,7 @@ const ( DefaultWarningApplyDuration = 100 * time.Millisecond DefaultWarningUnaryRequestDuration = 300 * time.Millisecond DefaultMaxRequestBytes = 1.5 * 1024 * 1024 + DefaultMaxConcurrentStreams = math.MaxUint32 DefaultGRPCKeepAliveMinTime = 5 * time.Second DefaultGRPCKeepAliveInterval = 2 * time.Hour DefaultGRPCKeepAliveTimeout = 20 * time.Second @@ -205,6 +207,10 @@ type Config struct { MaxTxnOps uint `json:"max-txn-ops"` MaxRequestBytes uint `json:"max-request-bytes"` + // MaxConcurrentStreams optionally specifies the number of concurrent + // streams that each client may have open at a time. + MaxConcurrentStreams uint32 `json:"max-concurrent-streams"` + LPUrls, LCUrls []url.URL APUrls, ACUrls []url.URL ClientTLSInfo transport.TLSInfo @@ -311,7 +317,7 @@ type Config struct { AuthToken string `json:"auth-token"` BcryptCost uint `json:"bcrypt-cost"` - //The AuthTokenTTL in seconds of the simple token + // AuthTokenTTL in seconds of the simple token AuthTokenTTL uint `json:"auth-token-ttl"` ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"` @@ -462,6 +468,7 @@ func NewConfig() *Config { MaxTxnOps: DefaultMaxTxnOps, MaxRequestBytes: DefaultMaxRequestBytes, + MaxConcurrentStreams: DefaultMaxConcurrentStreams, ExperimentalWarningApplyDuration: DefaultWarningApplyDuration, ExperimentalWarningUnaryRequestDuration: DefaultWarningUnaryRequestDuration, diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 766336ccb..b557e5d3e 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -191,6 +191,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { BackendBatchInterval: cfg.BackendBatchInterval, MaxTxnOps: cfg.MaxTxnOps, MaxRequestBytes: cfg.MaxRequestBytes, + MaxConcurrentStreams: cfg.MaxConcurrentStreams, SocketOpts: cfg.SocketOpts, StrictReconfigCheck: cfg.StrictReconfigCheck, ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth, diff --git a/server/embed/serve.go b/server/embed/serve.go index cb3d1c72d..5e162f3c8 100644 --- a/server/embed/serve.go +++ b/server/embed/serve.go @@ -44,6 +44,7 @@ import ( "github.com/soheilhy/cmux" "github.com/tmc/grpc-websocket-proxy/wsproxy" "go.uber.org/zap" + "golang.org/x/net/http2" "golang.org/x/net/trace" "google.golang.org/grpc" ) @@ -148,6 +149,9 @@ func (sctx *serveCtx) serve( Handler: createAccessController(sctx.lg, s, httpmux), ErrorLog: logger, // do not log user error } + if err := setMaxConcurrentStreams(srvhttp, s.Cfg.MaxConcurrentStreams); err != nil { + return err + } httpl := m.Match(cmux.HTTP1()) go func() { errHandler(srvhttp.Serve(httpl)) }() @@ -197,6 +201,9 @@ func (sctx *serveCtx) serve( TLSConfig: tlscfg, ErrorLog: logger, // do not log user error } + if err := setMaxConcurrentStreams(srv, s.Cfg.MaxConcurrentStreams); err != nil { + return err + } go func() { errHandler(srv.Serve(tlsl)) }() sctx.serversC <- &servers{secure: true, grpc: gs, http: srv} @@ -209,6 +216,13 @@ func (sctx *serveCtx) serve( return m.Serve() } +// setMaxConcurrentStreams sets the maxConcurrentStreams for the http server +func setMaxConcurrentStreams(srv *http.Server, maxConcurrentStreams uint32) error { + return http2.ConfigureServer(srv, &http2.Server{ + MaxConcurrentStreams: maxConcurrentStreams, + }) +} + // grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC // connections or otherHandler otherwise. Given in gRPC docs. func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler { diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 8091589df..88f5b73ec 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -142,6 +142,8 @@ func newConfig() *config { fs.DurationVar(&rafthttp.ConnReadTimeout, "raft-read-timeout", rafthttp.DefaultConnReadTimeout, "Read timeout set on each rafthttp connection") fs.DurationVar(&rafthttp.ConnWriteTimeout, "raft-write-timeout", rafthttp.DefaultConnWriteTimeout, "Write timeout set on each rafthttp connection") + fs.Var(flags.NewUint32Value(""), "max-concurrent-streams", "Maximum concurrent streams that each client may have open at a time.") + // clustering fs.Var( flags.NewUniqueURLsWithExceptions(embed.DefaultInitialAdvertisePeerURLs, ""), @@ -380,6 +382,8 @@ func (cfg *config) configFromCmdLine() error { cfg.ec.CipherSuites = flags.StringsFromFlag(cfg.cf.flagSet, "cipher-suites") + cfg.ec.MaxConcurrentStreams = flags.Uint32FromFlag(cfg.cf.flagSet, "max-concurrent-streams") + cfg.ec.LogOutputs = flags.UniqueStringsFromFlag(cfg.cf.flagSet, "log-outputs") cfg.ec.ClusterState = cfg.cf.clusterState.String() diff --git a/server/etcdmain/grpc_proxy.go b/server/etcdmain/grpc_proxy.go index 55cc96d0c..63f4d2e1f 100644 --- a/server/etcdmain/grpc_proxy.go +++ b/server/etcdmain/grpc_proxy.go @@ -47,6 +47,7 @@ import ( "github.com/soheilhy/cmux" "github.com/spf13/cobra" "go.uber.org/zap" + "golang.org/x/net/http2" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/keepalive" @@ -95,6 +96,8 @@ var ( grpcKeepAliveMinTime time.Duration grpcKeepAliveTimeout time.Duration grpcKeepAliveInterval time.Duration + + maxConcurrentStreams uint32 ) const defaultGRPCMaxCallSendMsgSize = 1.5 * 1024 * 1024 @@ -159,6 +162,8 @@ func newGRPCProxyStartCommand() *cobra.Command { cmd.Flags().BoolVar(&grpcProxyDebug, "debug", false, "Enable debug-level logging for grpc-proxy.") + cmd.Flags().Uint32Var(&maxConcurrentStreams, "max-concurrent-streams", math.MaxUint32, "Maximum concurrent streams that each client may have open at a time.") + return &cmd } @@ -212,6 +217,12 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { httpClient := mustNewHTTPClient(lg) srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client, proxyClient) + if err := http2.ConfigureServer(srvhttp, &http2.Server{ + MaxConcurrentStreams: maxConcurrentStreams, + }); err != nil { + panic(err) + } + errc := make(chan error, 3) go func() { errc <- newGRPCProxyServer(lg, client).Serve(grpcl) }() go func() { errc <- srvhttp.Serve(httpl) }() diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go index efa2a77e8..3483e96bf 100644 --- a/server/etcdmain/help.go +++ b/server/etcdmain/help.go @@ -81,6 +81,8 @@ Member: Maximum number of operations permitted in a transaction. --max-request-bytes '1572864' Maximum client request size in bytes the server will accept. + --max-concurrent-streams '0' + Maximum concurrent streams that each client may have open at a time. --grpc-keepalive-min-time '5s' Minimum duration interval that a client should wait before pinging server. --grpc-keepalive-interval '2h' diff --git a/server/etcdserver/api/v3rpc/grpc.go b/server/etcdserver/api/v3rpc/grpc.go index ea3dd7570..349ebea40 100644 --- a/server/etcdserver/api/v3rpc/grpc.go +++ b/server/etcdserver/api/v3rpc/grpc.go @@ -32,7 +32,6 @@ import ( const ( grpcOverheadBytes = 512 * 1024 - maxStreams = math.MaxUint32 maxSendBytes = math.MaxInt32 ) @@ -68,7 +67,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes))) opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes)) - opts = append(opts, grpc.MaxConcurrentStreams(maxStreams)) + opts = append(opts, grpc.MaxConcurrentStreams(s.Cfg.MaxConcurrentStreams)) grpcServer := grpc.NewServer(append(opts, gopts...)...) From d8347ec68393bdae2db6ecb0717666a592c91fb1 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Tue, 28 Jun 2022 05:29:30 +0800 Subject: [PATCH 2/6] add & refine the uint32Value data type The golang buildin package `flag` doesn't support `uint32` data type, so we need to support it via the `flag.Var`. Signed-off-by: Benjamin Wang --- pkg/flags/int_test.go | 57 --------------------------- pkg/flags/{int.go => uint32.go} | 23 +++++------ pkg/flags/uint32_test.go | 68 +++++++++++++++++++++++++++++++++ 3 files changed, 77 insertions(+), 71 deletions(-) delete mode 100644 pkg/flags/int_test.go rename pkg/flags/{int.go => uint32.go} (76%) create mode 100644 pkg/flags/uint32_test.go diff --git a/pkg/flags/int_test.go b/pkg/flags/int_test.go deleted file mode 100644 index 2ebabd01f..000000000 --- a/pkg/flags/int_test.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package flags - -import ( - "reflect" - "testing" -) - -func TestInvalidUint32(t *testing.T) { - tests := []string{ - // string - "invalid", - // negative number - "-1", - // float number - "0.1", - "-0.2", - // larger than math.MaxUint32 - "4294967296", - } - for i, in := range tests { - var u uint32Value - if err := u.Set(in); err == nil { - t.Errorf(`#%d: unexpected nil error for in=%q`, i, in) - } - } -} - -func TestUint32Value(t *testing.T) { - tests := []struct { - s string - exp uint32 - }{ - {s: "0", exp: 0}, - {s: "1", exp: 1}, - {s: "", exp: 0}, - } - for i := range tests { - ss := uint32(*NewUint32Value(tests[i].s)) - if !reflect.DeepEqual(tests[i].exp, ss) { - t.Fatalf("#%d: expected %q, got %q", i, tests[i].exp, ss) - } - } -} diff --git a/pkg/flags/int.go b/pkg/flags/uint32.go similarity index 76% rename from pkg/flags/int.go rename to pkg/flags/uint32.go index c3d763da4..bbef7df6a 100644 --- a/pkg/flags/int.go +++ b/pkg/flags/uint32.go @@ -1,4 +1,4 @@ -// Copyright 2018 The etcd Authors +// Copyright 2022 The etcd Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,31 +16,26 @@ package flags import ( "flag" - "fmt" "strconv" ) type uint32Value uint32 -func NewUint32Value(s string) *uint32Value { - var u uint32Value - if s == "" || s == "0" { - return &u - } - if err := u.Set(s); err != nil { - panic(fmt.Sprintf("new uint32Value should never fail: %v", err)) - } - return &u +// NewUint32Value creates a uint32Value instance with the default value `v`. +func NewUint32Value(v uint32) *uint32Value { + val := new(uint32Value) + *val = uint32Value(v) + return val } +// Set parses a command line uint32 value. +// Implements "flag.Value" interface. func (i *uint32Value) Set(s string) error { v, err := strconv.ParseUint(s, 0, 32) *i = uint32Value(v) return err } -func (i *uint32Value) Type() string { - return "uint32" -} + func (i *uint32Value) String() string { return strconv.FormatUint(uint64(*i), 10) } // Uint32FromFlag return the uint32 value of a flag with the given name diff --git a/pkg/flags/uint32_test.go b/pkg/flags/uint32_test.go new file mode 100644 index 000000000..6e7d38df2 --- /dev/null +++ b/pkg/flags/uint32_test.go @@ -0,0 +1,68 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flags + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestUint32Value(t *testing.T) { + cases := []struct { + name string + s string + expectedVal uint32 + expectError bool + }{ + { + name: "normal uint32 value", + s: "200", + expectedVal: 200, + }, + { + name: "zero value", + s: "0", + expectedVal: 0, + }, + { + name: "negative int value", + s: "-200", + expectError: true, + }, + { + name: "invalid integer value", + s: "invalid", + expectError: true, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + var val uint32Value + err := val.Set(tc.s) + + if tc.expectError { + if err == nil { + t.Errorf("Expected failure on parsing uint32 value from %s", tc.s) + } + } else { + if err != nil { + t.Errorf("Unexpected error when parsing %s: %v", tc.s, err) + } + assert.Equal(t, uint32(val), tc.expectedVal) + } + }) + } +} From 053ba95ed5447d43a7c27a6243f7566c6d80097b Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Tue, 28 Jun 2022 05:32:29 +0800 Subject: [PATCH 3/6] set max concurrent streams to the http2 server The default max stream is 250 in http2. When there are more then 250 streams, the client side may be blocked until some previous streams are released. So we need to support configuring a larger `MaxConcurrentStreams`. Signed-off-by: Benjamin Wang --- server/config/config.go | 4 ++-- server/embed/config.go | 4 ++-- server/embed/etcd.go | 5 ++++- server/embed/serve.go | 13 ++++++++----- server/etcdmain/config.go | 4 ++-- server/etcdmain/grpc_proxy.go | 3 ++- server/etcdmain/help.go | 2 +- 7 files changed, 21 insertions(+), 14 deletions(-) diff --git a/server/config/config.go b/server/config/config.go index 625f019e4..d471035d8 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -129,8 +129,8 @@ type ServerConfig struct { // MaxRequestBytes is the maximum request size to send over raft. MaxRequestBytes uint - // MaxConcurrentStreams optionally specifies the number of concurrent - // streams that each client may have open at a time. + // MaxConcurrentStreams specifies the max number of concurrent + // streams that the server will accept. MaxConcurrentStreams uint32 WarningApplyDuration time.Duration diff --git a/server/embed/config.go b/server/embed/config.go index 21d38ca84..12cf65698 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -207,8 +207,8 @@ type Config struct { MaxTxnOps uint `json:"max-txn-ops"` MaxRequestBytes uint `json:"max-request-bytes"` - // MaxConcurrentStreams optionally specifies the number of concurrent - // streams that each client may have open at a time. + // MaxConcurrentStreams specifies the number of concurrent streams + // that the server can accept. MaxConcurrentStreams uint32 `json:"max-concurrent-streams"` LPUrls, LCUrls []url.URL diff --git a/server/embed/etcd.go b/server/embed/etcd.go index b557e5d3e..564ad5e7a 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -337,7 +337,10 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized zap.String("initial-cluster", sc.InitialPeerURLsMap.String()), zap.String("initial-cluster-state", ec.ClusterState), zap.String("initial-cluster-token", sc.InitialClusterToken), - zap.Int64("quota-size-bytes", quota), + zap.Int64("quota-backend-bytes", quota), + zap.Uint("max-request-bytes", sc.MaxRequestBytes), + zap.Uint32("max-concurrent-streams", sc.MaxConcurrentStreams), + zap.Bool("pre-vote", sc.PreVote), zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck), zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()), diff --git a/server/embed/serve.go b/server/embed/serve.go index 5e162f3c8..08a1dc841 100644 --- a/server/embed/serve.go +++ b/server/embed/serve.go @@ -30,6 +30,7 @@ import ( "go.etcd.io/etcd/client/v3/credentials" "go.etcd.io/etcd/pkg/v3/debugutil" "go.etcd.io/etcd/pkg/v3/httputil" + "go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver/api/v3client" "go.etcd.io/etcd/server/v3/etcdserver/api/v3election" @@ -149,7 +150,8 @@ func (sctx *serveCtx) serve( Handler: createAccessController(sctx.lg, s, httpmux), ErrorLog: logger, // do not log user error } - if err := setMaxConcurrentStreams(srvhttp, s.Cfg.MaxConcurrentStreams); err != nil { + if err := configureHttpServer(srvhttp, s.Cfg); err != nil { + sctx.lg.Error("Configure http server failed", zap.Error(err)) return err } httpl := m.Match(cmux.HTTP1()) @@ -201,7 +203,8 @@ func (sctx *serveCtx) serve( TLSConfig: tlscfg, ErrorLog: logger, // do not log user error } - if err := setMaxConcurrentStreams(srv, s.Cfg.MaxConcurrentStreams); err != nil { + if err := configureHttpServer(srv, s.Cfg); err != nil { + sctx.lg.Error("Configure https server failed", zap.Error(err)) return err } go func() { errHandler(srv.Serve(tlsl)) }() @@ -216,10 +219,10 @@ func (sctx *serveCtx) serve( return m.Serve() } -// setMaxConcurrentStreams sets the maxConcurrentStreams for the http server -func setMaxConcurrentStreams(srv *http.Server, maxConcurrentStreams uint32) error { +func configureHttpServer(srv *http.Server, cfg config.ServerConfig) error { + // todo (ahrtr): should we support configuring other parameters in the future as well? return http2.ConfigureServer(srv, &http2.Server{ - MaxConcurrentStreams: maxConcurrentStreams, + MaxConcurrentStreams: cfg.MaxConcurrentStreams, }) } diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 88f5b73ec..96b54cada 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -138,12 +138,12 @@ func newConfig() *config { fs.BoolVar(&cfg.ec.SocketOpts.ReusePort, "socket-reuse-port", cfg.ec.SocketOpts.ReusePort, "Enable to set socket option SO_REUSEPORT on listeners allowing rebinding of a port already in use.") fs.BoolVar(&cfg.ec.SocketOpts.ReuseAddress, "socket-reuse-address", cfg.ec.SocketOpts.ReuseAddress, "Enable to set socket option SO_REUSEADDR on listeners allowing binding to an address in `TIME_WAIT` state.") + fs.Var(flags.NewUint32Value(cfg.ec.MaxConcurrentStreams), "max-concurrent-streams", "Maximum concurrent streams that each client may have open at a time.") + // raft connection timeouts fs.DurationVar(&rafthttp.ConnReadTimeout, "raft-read-timeout", rafthttp.DefaultConnReadTimeout, "Read timeout set on each rafthttp connection") fs.DurationVar(&rafthttp.ConnWriteTimeout, "raft-write-timeout", rafthttp.DefaultConnWriteTimeout, "Write timeout set on each rafthttp connection") - fs.Var(flags.NewUint32Value(""), "max-concurrent-streams", "Maximum concurrent streams that each client may have open at a time.") - // clustering fs.Var( flags.NewUniqueURLsWithExceptions(embed.DefaultInitialAdvertisePeerURLs, ""), diff --git a/server/etcdmain/grpc_proxy.go b/server/etcdmain/grpc_proxy.go index 63f4d2e1f..492171430 100644 --- a/server/etcdmain/grpc_proxy.go +++ b/server/etcdmain/grpc_proxy.go @@ -217,10 +217,11 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { httpClient := mustNewHTTPClient(lg) srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client, proxyClient) + if err := http2.ConfigureServer(srvhttp, &http2.Server{ MaxConcurrentStreams: maxConcurrentStreams, }); err != nil { - panic(err) + lg.Fatal("Failed to configure the http server", zap.Error(err)) } errc := make(chan error, 3) diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go index 3483e96bf..c8bc56bc2 100644 --- a/server/etcdmain/help.go +++ b/server/etcdmain/help.go @@ -81,7 +81,7 @@ Member: Maximum number of operations permitted in a transaction. --max-request-bytes '1572864' Maximum client request size in bytes the server will accept. - --max-concurrent-streams '0' + --max-concurrent-streams 'math.MaxUint32' Maximum concurrent streams that each client may have open at a time. --grpc-keepalive-min-time '5s' Minimum duration interval that a client should wait before pinging server. From f40b676701341c0ab5158a05c3f6fc63652ca9e3 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Tue, 28 Jun 2022 05:40:15 +0800 Subject: [PATCH 4/6] add e2e test cases to cover the maxConcurrentStreams Signed-off-by: Benjamin Wang --- tests/e2e/ctl_v3_test.go | 30 +++- tests/e2e/v3_curl_maxstream_test.go | 218 ++++++++++++++++++++++++++++ tests/framework/e2e/cluster.go | 6 + 3 files changed, 250 insertions(+), 4 deletions(-) create mode 100644 tests/e2e/v3_curl_maxstream_test.go diff --git a/tests/e2e/ctl_v3_test.go b/tests/e2e/ctl_v3_test.go index 8b11cf6b5..20e4362f4 100644 --- a/tests/e2e/ctl_v3_test.go +++ b/tests/e2e/ctl_v3_test.go @@ -133,6 +133,7 @@ type ctlCtx struct { envMap map[string]string dialTimeout time.Duration + testTimeout time.Duration quorum bool // if true, set up 3-node cluster and linearizable read interactive bool @@ -166,6 +167,10 @@ func withDialTimeout(timeout time.Duration) ctlOption { return func(cx *ctlCtx) { cx.dialTimeout = timeout } } +func withTestTimeout(timeout time.Duration) ctlOption { + return func(cx *ctlCtx) { cx.testTimeout = timeout } +} + func withQuorum() ctlOption { return func(cx *ctlCtx) { cx.quorum = true } } @@ -198,6 +203,14 @@ func withFlagByEnv() ctlOption { return func(cx *ctlCtx) { cx.envMap = make(map[string]string) } } +// This function must be called after the `withCfg`, otherwise its value +// may be overwritten by `withCfg`. +func withMaxConcurrentStreams(streams uint32) ctlOption { + return func(cx *ctlCtx) { + cx.cfg.MaxConcurrentStreams = streams + } +} + func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) { testCtlWithOffline(t, testFunc, nil, opts...) } @@ -262,10 +275,8 @@ func runCtlTest(t *testing.T, testFunc func(ctlCtx), testOfflineFunc func(ctlCtx t.Log("---testFunc logic DONE") }() - timeout := 2*cx.dialTimeout + time.Second - if cx.dialTimeout == 0 { - timeout = 30 * time.Second - } + timeout := cx.getTestTimeout() + select { case <-time.After(timeout): testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout)) @@ -282,6 +293,17 @@ func runCtlTest(t *testing.T, testFunc func(ctlCtx), testOfflineFunc func(ctlCtx } } +func (cx *ctlCtx) getTestTimeout() time.Duration { + timeout := cx.testTimeout + if timeout == 0 { + timeout = 2*cx.dialTimeout + time.Second + if cx.dialTimeout == 0 { + timeout = 30 * time.Second + } + } + return timeout +} + func (cx *ctlCtx) prefixArgs(eps []string) []string { fmap := make(map[string]string) fmap["endpoints"] = strings.Join(eps, ",") diff --git a/tests/e2e/v3_curl_maxstream_test.go b/tests/e2e/v3_curl_maxstream_test.go new file mode 100644 index 000000000..586306eda --- /dev/null +++ b/tests/e2e/v3_curl_maxstream_test.go @@ -0,0 +1,218 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "encoding/json" + "fmt" + "sync" + "syscall" + "testing" + "time" + + "github.com/stretchr/testify/assert" + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/client/pkg/v3/testutil" + "go.etcd.io/etcd/tests/v3/framework/e2e" +) + +// NO TLS +func TestV3Curl_MaxStreams_BelowLimit_NoTLS_Small(t *testing.T) { + testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(3)) +} + +func TestV3Curl_MaxStreams_BelowLimit_NoTLS_Medium(t *testing.T) { + testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(100), withTestTimeout(20*time.Second)) +} + +/* +// There are lots of "device not configured" errors. I suspect it's an issue +// of the project `github.com/creack/pty`. I manually executed the test case +// with 1000 concurrent streams, and confirmed it's working as expected. +// TODO(ahrtr): investigate the test failure in the future. +func TestV3Curl_MaxStreamsNoTLS_BelowLimit_Large(t *testing.T) { + f, err := setRLimit(10240) + if err != nil { + t.Fatal(err) + } + testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(1000), withTestTimeout(200*time.Second)) + f() +} */ + +func TestV3Curl_MaxStreams_ReachLimit_NoTLS_Small(t *testing.T) { + testV3CurlMaxStream(t, true, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(3)) +} + +func TestV3Curl_MaxStreams_ReachLimit_NoTLS_Medium(t *testing.T) { + testV3CurlMaxStream(t, true, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(100), withTestTimeout(20*time.Second)) +} + +// TLS +func TestV3Curl_MaxStreams_BelowLimit_TLS_Small(t *testing.T) { + testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigTLS()), withMaxConcurrentStreams(3)) +} + +func TestV3Curl_MaxStreams_BelowLimit_TLS_Medium(t *testing.T) { + testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigTLS()), withMaxConcurrentStreams(100), withTestTimeout(20*time.Second)) +} + +func TestV3Curl_MaxStreams_ReachLimit_TLS_Small(t *testing.T) { + testV3CurlMaxStream(t, true, withCfg(*e2e.NewConfigTLS()), withMaxConcurrentStreams(3)) +} + +func TestV3Curl_MaxStreams_ReachLimit_TLS_Medium(t *testing.T) { + testV3CurlMaxStream(t, true, withCfg(*e2e.NewConfigTLS()), withMaxConcurrentStreams(100), withTestTimeout(20*time.Second)) +} + +func testV3CurlMaxStream(t *testing.T, reachLimit bool, opts ...ctlOption) { + e2e.BeforeTest(t) + + // Step 1: generate configuration for creating cluster + t.Log("Generating configuration for creating cluster.") + cx := getDefaultCtlCtx(t) + cx.applyOpts(opts) + // We must set the clusterSize to 1, otherwise different streams may + // connect to different members, accordingly it's difficult to test the + // behavior. + cx.cfg.ClusterSize = 1 + + // Step 2: create the cluster + t.Log("Creating an etcd cluster") + epc, err := e2e.NewEtcdProcessCluster(t, &cx.cfg) + if err != nil { + t.Fatalf("Failed to start etcd cluster: %v", err) + } + cx.epc = epc + cx.dataDir = epc.Procs[0].Config().DataDirPath + + // Step 3: run test + // (a) generate ${concurrentNumber} concurrent watch streams; + // (b) submit a range request. + var wg sync.WaitGroup + concurrentNumber := cx.cfg.MaxConcurrentStreams - 1 + expectedResponse := `"revision":"` + if reachLimit { + concurrentNumber = cx.cfg.MaxConcurrentStreams + expectedResponse = "Operation timed out" + } + wg.Add(int(concurrentNumber)) + t.Logf("Running the test, MaxConcurrentStreams: %d, concurrentNumber: %d, expectedResponse: %s\n", + cx.cfg.MaxConcurrentStreams, concurrentNumber, expectedResponse) + errCh := make(chan error, concurrentNumber) + submitConcurrentWatch(cx, int(concurrentNumber), &wg, errCh) + submitRangeAfterConcurrentWatch(cx, expectedResponse) + + // Step 4: check the watch errors. Note that we ony check the watch error + // before closing cluster. Once we closed the cluster, the watch must run + // into error, and we should ignore them by then. + t.Log("Checking watch error.") + select { + case werr := <-errCh: + t.Fatal(werr) + default: + } + + // Step 5: Close the cluster + t.Log("Closing test cluster...") + assert.NoError(t, epc.Close()) + t.Log("Closed test cluster") + + // Step 6: Waiting all watch goroutines to exit. + donec := make(chan struct{}) + go func() { + defer close(donec) + wg.Wait() + }() + + timeout := cx.getTestTimeout() + t.Logf("Waiting test case to finish, timeout: %s", timeout) + select { + case <-time.After(timeout): + testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout)) + case <-donec: + t.Log("All watch goroutines exited.") + } + + t.Log("testV3CurlMaxStream done!") +} + +func submitConcurrentWatch(cx ctlCtx, number int, wgDone *sync.WaitGroup, errCh chan<- error) { + watchData, err := json.Marshal(&pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{ + Key: []byte("foo"), + }, + }) + if err != nil { + cx.t.Fatal(err) + } + + var wgSchedule sync.WaitGroup + wgSchedule.Add(number) + for i := 0; i < number; i++ { + go func(i int) { + wgSchedule.Done() + defer wgDone.Done() + if err := e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: "/v3/watch", Value: string(watchData), Expected: `"revision":"`}); err != nil { + werr := fmt.Errorf("testV3CurlMaxStream watch failed: %d, error: %v", i, err) + cx.t.Error(werr) + errCh <- werr + } + }(i) + } + // make sure all goroutines have already been scheduled. + wgSchedule.Wait() + // We need to make sure all watch streams have already been created. + // For simplicity, we just sleep 3 second. We may consider improving + // it in the future. + time.Sleep(3 * time.Second) +} + +func submitRangeAfterConcurrentWatch(cx ctlCtx, expectedValue string) { + rangeData, err := json.Marshal(&pb.RangeRequest{ + Key: []byte("foo"), + }) + if err != nil { + cx.t.Fatal(err) + } + + cx.t.Log("Submitting range request...") + if err := e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: "/v3/kv/range", Value: string(rangeData), Expected: expectedValue, Timeout: 5}); err != nil { + cx.t.Fatalf("testV3CurlMaxStream get failed, error: %v", err) + } + cx.t.Log("range request done") +} + +// setRLimit sets the open file limitation, and return a function which +// is used to reset the limitation. +func setRLimit(nofile uint64) (func() error, error) { + var rLimit syscall.Rlimit + if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil { + return nil, fmt.Errorf("failed to get open file limit, error: %v", err) + } + + var wLimit syscall.Rlimit + wLimit.Max = nofile + wLimit.Cur = nofile + if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &wLimit); err != nil { + return nil, fmt.Errorf("failed to set max open file limit, %v", err) + } + + return func() error { + if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil { + return fmt.Errorf("failed reset max open file limit, %v", err) + } + return nil + }, nil +} diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 4b1daf93d..fece5f5b0 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -176,6 +176,8 @@ type EtcdProcessClusterConfig struct { DiscoveryEndpoints []string // v3 discovery DiscoveryToken string LogLevel string + + MaxConcurrentStreams uint32 // default is math.MaxUint32 } // NewEtcdProcessCluster launches a new cluster from etcd processes, returning @@ -341,6 +343,10 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []* args = append(args, "--log-level", cfg.LogLevel) } + if cfg.MaxConcurrentStreams != 0 { + args = append(args, "--max-concurrent-streams", fmt.Sprintf("%d", cfg.MaxConcurrentStreams)) + } + etcdCfgs[i] = &EtcdServerProcessConfig{ lg: lg, ExecPath: cfg.ExecPath, From 1a6fe4dbc64094486fb7a0a245a572c0ff1d5232 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Tue, 28 Jun 2022 08:34:59 +0800 Subject: [PATCH 5/6] update the comment for MaxConcurrentStreams to clearly state it's the max value for each client. Signed-off-by: Benjamin Wang --- pkg/flags/uint32.go | 2 +- server/config/config.go | 4 ++-- server/embed/config.go | 4 ++-- server/etcdmain/config.go | 2 +- server/etcdmain/grpc_proxy.go | 2 +- server/etcdmain/help.go | 2 +- tests/e2e/v3_curl_maxstream_test.go | 6 +++--- 7 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pkg/flags/uint32.go b/pkg/flags/uint32.go index bbef7df6a..496730a45 100644 --- a/pkg/flags/uint32.go +++ b/pkg/flags/uint32.go @@ -21,7 +21,7 @@ import ( type uint32Value uint32 -// NewUint32Value creates a uint32Value instance with the default value `v`. +// NewUint32Value creates an uint32 instance with the provided value. func NewUint32Value(v uint32) *uint32Value { val := new(uint32Value) *val = uint32Value(v) diff --git a/server/config/config.go b/server/config/config.go index d471035d8..9ecfc1463 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -129,8 +129,8 @@ type ServerConfig struct { // MaxRequestBytes is the maximum request size to send over raft. MaxRequestBytes uint - // MaxConcurrentStreams specifies the max number of concurrent - // streams that the server will accept. + // MaxConcurrentStreams specifies the maximum number of concurrent + // streams that each client can open at a time. MaxConcurrentStreams uint32 WarningApplyDuration time.Duration diff --git a/server/embed/config.go b/server/embed/config.go index 12cf65698..4e1f6a19c 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -207,8 +207,8 @@ type Config struct { MaxTxnOps uint `json:"max-txn-ops"` MaxRequestBytes uint `json:"max-request-bytes"` - // MaxConcurrentStreams specifies the number of concurrent streams - // that the server can accept. + // MaxConcurrentStreams specifies the maximum number of concurrent + // streams that each client can open at a time. MaxConcurrentStreams uint32 `json:"max-concurrent-streams"` LPUrls, LCUrls []url.URL diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 96b54cada..28f81e33e 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -138,7 +138,7 @@ func newConfig() *config { fs.BoolVar(&cfg.ec.SocketOpts.ReusePort, "socket-reuse-port", cfg.ec.SocketOpts.ReusePort, "Enable to set socket option SO_REUSEPORT on listeners allowing rebinding of a port already in use.") fs.BoolVar(&cfg.ec.SocketOpts.ReuseAddress, "socket-reuse-address", cfg.ec.SocketOpts.ReuseAddress, "Enable to set socket option SO_REUSEADDR on listeners allowing binding to an address in `TIME_WAIT` state.") - fs.Var(flags.NewUint32Value(cfg.ec.MaxConcurrentStreams), "max-concurrent-streams", "Maximum concurrent streams that each client may have open at a time.") + fs.Var(flags.NewUint32Value(cfg.ec.MaxConcurrentStreams), "max-concurrent-streams", "Maximum concurrent streams that each client can open at a time.") // raft connection timeouts fs.DurationVar(&rafthttp.ConnReadTimeout, "raft-read-timeout", rafthttp.DefaultConnReadTimeout, "Read timeout set on each rafthttp connection") diff --git a/server/etcdmain/grpc_proxy.go b/server/etcdmain/grpc_proxy.go index 492171430..b13520695 100644 --- a/server/etcdmain/grpc_proxy.go +++ b/server/etcdmain/grpc_proxy.go @@ -162,7 +162,7 @@ func newGRPCProxyStartCommand() *cobra.Command { cmd.Flags().BoolVar(&grpcProxyDebug, "debug", false, "Enable debug-level logging for grpc-proxy.") - cmd.Flags().Uint32Var(&maxConcurrentStreams, "max-concurrent-streams", math.MaxUint32, "Maximum concurrent streams that each client may have open at a time.") + cmd.Flags().Uint32Var(&maxConcurrentStreams, "max-concurrent-streams", math.MaxUint32, "Maximum concurrent streams that each client can open at a time.") return &cmd } diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go index c8bc56bc2..a7f569a8e 100644 --- a/server/etcdmain/help.go +++ b/server/etcdmain/help.go @@ -82,7 +82,7 @@ Member: --max-request-bytes '1572864' Maximum client request size in bytes the server will accept. --max-concurrent-streams 'math.MaxUint32' - Maximum concurrent streams that each client may have open at a time. + Maximum concurrent streams that each client can open at a time. --grpc-keepalive-min-time '5s' Minimum duration interval that a client should wait before pinging server. --grpc-keepalive-interval '2h' diff --git a/tests/e2e/v3_curl_maxstream_test.go b/tests/e2e/v3_curl_maxstream_test.go index 586306eda..44dfd3dc1 100644 --- a/tests/e2e/v3_curl_maxstream_test.go +++ b/tests/e2e/v3_curl_maxstream_test.go @@ -83,7 +83,7 @@ func testV3CurlMaxStream(t *testing.T, reachLimit bool, opts ...ctlOption) { t.Log("Generating configuration for creating cluster.") cx := getDefaultCtlCtx(t) cx.applyOpts(opts) - // We must set the clusterSize to 1, otherwise different streams may + // We must set the `ClusterSize` to 1, otherwise different streams may // connect to different members, accordingly it's difficult to test the // behavior. cx.cfg.ClusterSize = 1 @@ -114,8 +114,8 @@ func testV3CurlMaxStream(t *testing.T, reachLimit bool, opts ...ctlOption) { submitConcurrentWatch(cx, int(concurrentNumber), &wg, errCh) submitRangeAfterConcurrentWatch(cx, expectedResponse) - // Step 4: check the watch errors. Note that we ony check the watch error - // before closing cluster. Once we closed the cluster, the watch must run + // Step 4: check the watch errors. Note that we only check the watch error + // before closing cluster. Once we close the cluster, the watch must run // into error, and we should ignore them by then. t.Log("Checking watch error.") select { From 8b6c8b4c96265f2bbc1c14a1f3a8d8847ab10c86 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Thu, 7 Jul 2022 04:52:28 +0800 Subject: [PATCH 6/6] add unit test for pkg/flags/Uint32FromFlag in uint32_test.go Signed-off-by: Benjamin Wang --- pkg/flags/uint32_test.go | 43 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/pkg/flags/uint32_test.go b/pkg/flags/uint32_test.go index 6e7d38df2..aa7487a23 100644 --- a/pkg/flags/uint32_test.go +++ b/pkg/flags/uint32_test.go @@ -15,6 +15,7 @@ package flags import ( + "flag" "testing" "github.com/stretchr/testify/assert" @@ -66,3 +67,45 @@ func TestUint32Value(t *testing.T) { }) } } + +func TestUint32FromFlag(t *testing.T) { + const flagName = "max-concurrent-streams" + + cases := []struct { + name string + defaultVal uint32 + arguments []string + expectedVal uint32 + }{ + { + name: "only default value", + defaultVal: 15, + arguments: []string{}, + expectedVal: 15, + }, + { + name: "argument has different value from the default one", + defaultVal: 16, + arguments: []string{"--max-concurrent-streams", "200"}, + expectedVal: 200, + }, + { + name: "argument has the same value from the default one", + defaultVal: 105, + arguments: []string{"--max-concurrent-streams", "105"}, + expectedVal: 105, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + fs := flag.NewFlagSet("etcd", flag.ContinueOnError) + fs.Var(NewUint32Value(tc.defaultVal), flagName, "Maximum concurrent streams that each client can open at a time.") + if err := fs.Parse(tc.arguments); err != nil { + t.Fatalf("Unexpected error: %v\n", err) + } + actualMaxStream := Uint32FromFlag(fs, flagName) + assert.Equal(t, actualMaxStream, tc.expectedVal) + }) + } +}