From 053ba95ed5447d43a7c27a6243f7566c6d80097b Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Tue, 28 Jun 2022 05:32:29 +0800 Subject: [PATCH] set max concurrent streams to the http2 server The default max stream is 250 in http2. When there are more then 250 streams, the client side may be blocked until some previous streams are released. So we need to support configuring a larger `MaxConcurrentStreams`. Signed-off-by: Benjamin Wang --- server/config/config.go | 4 ++-- server/embed/config.go | 4 ++-- server/embed/etcd.go | 5 ++++- server/embed/serve.go | 13 ++++++++----- server/etcdmain/config.go | 4 ++-- server/etcdmain/grpc_proxy.go | 3 ++- server/etcdmain/help.go | 2 +- 7 files changed, 21 insertions(+), 14 deletions(-) diff --git a/server/config/config.go b/server/config/config.go index 625f019e4..d471035d8 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -129,8 +129,8 @@ type ServerConfig struct { // MaxRequestBytes is the maximum request size to send over raft. MaxRequestBytes uint - // MaxConcurrentStreams optionally specifies the number of concurrent - // streams that each client may have open at a time. + // MaxConcurrentStreams specifies the max number of concurrent + // streams that the server will accept. MaxConcurrentStreams uint32 WarningApplyDuration time.Duration diff --git a/server/embed/config.go b/server/embed/config.go index 21d38ca84..12cf65698 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -207,8 +207,8 @@ type Config struct { MaxTxnOps uint `json:"max-txn-ops"` MaxRequestBytes uint `json:"max-request-bytes"` - // MaxConcurrentStreams optionally specifies the number of concurrent - // streams that each client may have open at a time. + // MaxConcurrentStreams specifies the number of concurrent streams + // that the server can accept. MaxConcurrentStreams uint32 `json:"max-concurrent-streams"` LPUrls, LCUrls []url.URL diff --git a/server/embed/etcd.go b/server/embed/etcd.go index b557e5d3e..564ad5e7a 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -337,7 +337,10 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized zap.String("initial-cluster", sc.InitialPeerURLsMap.String()), zap.String("initial-cluster-state", ec.ClusterState), zap.String("initial-cluster-token", sc.InitialClusterToken), - zap.Int64("quota-size-bytes", quota), + zap.Int64("quota-backend-bytes", quota), + zap.Uint("max-request-bytes", sc.MaxRequestBytes), + zap.Uint32("max-concurrent-streams", sc.MaxConcurrentStreams), + zap.Bool("pre-vote", sc.PreVote), zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck), zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()), diff --git a/server/embed/serve.go b/server/embed/serve.go index 5e162f3c8..08a1dc841 100644 --- a/server/embed/serve.go +++ b/server/embed/serve.go @@ -30,6 +30,7 @@ import ( "go.etcd.io/etcd/client/v3/credentials" "go.etcd.io/etcd/pkg/v3/debugutil" "go.etcd.io/etcd/pkg/v3/httputil" + "go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver/api/v3client" "go.etcd.io/etcd/server/v3/etcdserver/api/v3election" @@ -149,7 +150,8 @@ func (sctx *serveCtx) serve( Handler: createAccessController(sctx.lg, s, httpmux), ErrorLog: logger, // do not log user error } - if err := setMaxConcurrentStreams(srvhttp, s.Cfg.MaxConcurrentStreams); err != nil { + if err := configureHttpServer(srvhttp, s.Cfg); err != nil { + sctx.lg.Error("Configure http server failed", zap.Error(err)) return err } httpl := m.Match(cmux.HTTP1()) @@ -201,7 +203,8 @@ func (sctx *serveCtx) serve( TLSConfig: tlscfg, ErrorLog: logger, // do not log user error } - if err := setMaxConcurrentStreams(srv, s.Cfg.MaxConcurrentStreams); err != nil { + if err := configureHttpServer(srv, s.Cfg); err != nil { + sctx.lg.Error("Configure https server failed", zap.Error(err)) return err } go func() { errHandler(srv.Serve(tlsl)) }() @@ -216,10 +219,10 @@ func (sctx *serveCtx) serve( return m.Serve() } -// setMaxConcurrentStreams sets the maxConcurrentStreams for the http server -func setMaxConcurrentStreams(srv *http.Server, maxConcurrentStreams uint32) error { +func configureHttpServer(srv *http.Server, cfg config.ServerConfig) error { + // todo (ahrtr): should we support configuring other parameters in the future as well? return http2.ConfigureServer(srv, &http2.Server{ - MaxConcurrentStreams: maxConcurrentStreams, + MaxConcurrentStreams: cfg.MaxConcurrentStreams, }) } diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 88f5b73ec..96b54cada 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -138,12 +138,12 @@ func newConfig() *config { fs.BoolVar(&cfg.ec.SocketOpts.ReusePort, "socket-reuse-port", cfg.ec.SocketOpts.ReusePort, "Enable to set socket option SO_REUSEPORT on listeners allowing rebinding of a port already in use.") fs.BoolVar(&cfg.ec.SocketOpts.ReuseAddress, "socket-reuse-address", cfg.ec.SocketOpts.ReuseAddress, "Enable to set socket option SO_REUSEADDR on listeners allowing binding to an address in `TIME_WAIT` state.") + fs.Var(flags.NewUint32Value(cfg.ec.MaxConcurrentStreams), "max-concurrent-streams", "Maximum concurrent streams that each client may have open at a time.") + // raft connection timeouts fs.DurationVar(&rafthttp.ConnReadTimeout, "raft-read-timeout", rafthttp.DefaultConnReadTimeout, "Read timeout set on each rafthttp connection") fs.DurationVar(&rafthttp.ConnWriteTimeout, "raft-write-timeout", rafthttp.DefaultConnWriteTimeout, "Write timeout set on each rafthttp connection") - fs.Var(flags.NewUint32Value(""), "max-concurrent-streams", "Maximum concurrent streams that each client may have open at a time.") - // clustering fs.Var( flags.NewUniqueURLsWithExceptions(embed.DefaultInitialAdvertisePeerURLs, ""), diff --git a/server/etcdmain/grpc_proxy.go b/server/etcdmain/grpc_proxy.go index 63f4d2e1f..492171430 100644 --- a/server/etcdmain/grpc_proxy.go +++ b/server/etcdmain/grpc_proxy.go @@ -217,10 +217,11 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { httpClient := mustNewHTTPClient(lg) srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client, proxyClient) + if err := http2.ConfigureServer(srvhttp, &http2.Server{ MaxConcurrentStreams: maxConcurrentStreams, }); err != nil { - panic(err) + lg.Fatal("Failed to configure the http server", zap.Error(err)) } errc := make(chan error, 3) diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go index 3483e96bf..c8bc56bc2 100644 --- a/server/etcdmain/help.go +++ b/server/etcdmain/help.go @@ -81,7 +81,7 @@ Member: Maximum number of operations permitted in a transaction. --max-request-bytes '1572864' Maximum client request size in bytes the server will accept. - --max-concurrent-streams '0' + --max-concurrent-streams 'math.MaxUint32' Maximum concurrent streams that each client may have open at a time. --grpc-keepalive-min-time '5s' Minimum duration interval that a client should wait before pinging server.