set max concurrent streams to the http2 server

The default max stream is 250 in http2. When there are more then
250 streams, the client side may be blocked until some previous
streams are released. So we need to support configuring a larger
`MaxConcurrentStreams`.

Signed-off-by: Benjamin Wang <wachao@vmware.com>
This commit is contained in:
Benjamin Wang 2022-06-28 05:32:29 +08:00
parent d8347ec683
commit 053ba95ed5
7 changed files with 21 additions and 14 deletions

View File

@ -129,8 +129,8 @@ type ServerConfig struct {
// MaxRequestBytes is the maximum request size to send over raft.
MaxRequestBytes uint
// MaxConcurrentStreams optionally specifies the number of concurrent
// streams that each client may have open at a time.
// MaxConcurrentStreams specifies the max number of concurrent
// streams that the server will accept.
MaxConcurrentStreams uint32
WarningApplyDuration time.Duration

View File

@ -207,8 +207,8 @@ type Config struct {
MaxTxnOps uint `json:"max-txn-ops"`
MaxRequestBytes uint `json:"max-request-bytes"`
// MaxConcurrentStreams optionally specifies the number of concurrent
// streams that each client may have open at a time.
// MaxConcurrentStreams specifies the number of concurrent streams
// that the server can accept.
MaxConcurrentStreams uint32 `json:"max-concurrent-streams"`
LPUrls, LCUrls []url.URL

View File

@ -337,7 +337,10 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized
zap.String("initial-cluster", sc.InitialPeerURLsMap.String()),
zap.String("initial-cluster-state", ec.ClusterState),
zap.String("initial-cluster-token", sc.InitialClusterToken),
zap.Int64("quota-size-bytes", quota),
zap.Int64("quota-backend-bytes", quota),
zap.Uint("max-request-bytes", sc.MaxRequestBytes),
zap.Uint32("max-concurrent-streams", sc.MaxConcurrentStreams),
zap.Bool("pre-vote", sc.PreVote),
zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck),
zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()),

View File

@ -30,6 +30,7 @@ import (
"go.etcd.io/etcd/client/v3/credentials"
"go.etcd.io/etcd/pkg/v3/debugutil"
"go.etcd.io/etcd/pkg/v3/httputil"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3election"
@ -149,7 +150,8 @@ func (sctx *serveCtx) serve(
Handler: createAccessController(sctx.lg, s, httpmux),
ErrorLog: logger, // do not log user error
}
if err := setMaxConcurrentStreams(srvhttp, s.Cfg.MaxConcurrentStreams); err != nil {
if err := configureHttpServer(srvhttp, s.Cfg); err != nil {
sctx.lg.Error("Configure http server failed", zap.Error(err))
return err
}
httpl := m.Match(cmux.HTTP1())
@ -201,7 +203,8 @@ func (sctx *serveCtx) serve(
TLSConfig: tlscfg,
ErrorLog: logger, // do not log user error
}
if err := setMaxConcurrentStreams(srv, s.Cfg.MaxConcurrentStreams); err != nil {
if err := configureHttpServer(srv, s.Cfg); err != nil {
sctx.lg.Error("Configure https server failed", zap.Error(err))
return err
}
go func() { errHandler(srv.Serve(tlsl)) }()
@ -216,10 +219,10 @@ func (sctx *serveCtx) serve(
return m.Serve()
}
// setMaxConcurrentStreams sets the maxConcurrentStreams for the http server
func setMaxConcurrentStreams(srv *http.Server, maxConcurrentStreams uint32) error {
func configureHttpServer(srv *http.Server, cfg config.ServerConfig) error {
// todo (ahrtr): should we support configuring other parameters in the future as well?
return http2.ConfigureServer(srv, &http2.Server{
MaxConcurrentStreams: maxConcurrentStreams,
MaxConcurrentStreams: cfg.MaxConcurrentStreams,
})
}

View File

@ -138,12 +138,12 @@ func newConfig() *config {
fs.BoolVar(&cfg.ec.SocketOpts.ReusePort, "socket-reuse-port", cfg.ec.SocketOpts.ReusePort, "Enable to set socket option SO_REUSEPORT on listeners allowing rebinding of a port already in use.")
fs.BoolVar(&cfg.ec.SocketOpts.ReuseAddress, "socket-reuse-address", cfg.ec.SocketOpts.ReuseAddress, "Enable to set socket option SO_REUSEADDR on listeners allowing binding to an address in `TIME_WAIT` state.")
fs.Var(flags.NewUint32Value(cfg.ec.MaxConcurrentStreams), "max-concurrent-streams", "Maximum concurrent streams that each client may have open at a time.")
// raft connection timeouts
fs.DurationVar(&rafthttp.ConnReadTimeout, "raft-read-timeout", rafthttp.DefaultConnReadTimeout, "Read timeout set on each rafthttp connection")
fs.DurationVar(&rafthttp.ConnWriteTimeout, "raft-write-timeout", rafthttp.DefaultConnWriteTimeout, "Write timeout set on each rafthttp connection")
fs.Var(flags.NewUint32Value(""), "max-concurrent-streams", "Maximum concurrent streams that each client may have open at a time.")
// clustering
fs.Var(
flags.NewUniqueURLsWithExceptions(embed.DefaultInitialAdvertisePeerURLs, ""),

View File

@ -217,10 +217,11 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
httpClient := mustNewHTTPClient(lg)
srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client, proxyClient)
if err := http2.ConfigureServer(srvhttp, &http2.Server{
MaxConcurrentStreams: maxConcurrentStreams,
}); err != nil {
panic(err)
lg.Fatal("Failed to configure the http server", zap.Error(err))
}
errc := make(chan error, 3)

View File

@ -81,7 +81,7 @@ Member:
Maximum number of operations permitted in a transaction.
--max-request-bytes '1572864'
Maximum client request size in bytes the server will accept.
--max-concurrent-streams '0'
--max-concurrent-streams 'math.MaxUint32'
Maximum concurrent streams that each client may have open at a time.
--grpc-keepalive-min-time '5s'
Minimum duration interval that a client should wait before pinging server.