mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Add flag --max-concurrent-streams
to set the max concurrent stream each client can open at a time
Also refer to https://github.com/etcd-io/etcd/pull/14169#discussion_r917154243 Signed-off-by: Benjamin Wang <wachao@vmware.com>
This commit is contained in:
parent
40d1a43176
commit
437f3778d0
@ -120,6 +120,10 @@ type ServerConfig struct {
|
||||
// MaxRequestBytes is the maximum request size to send over raft.
|
||||
MaxRequestBytes uint
|
||||
|
||||
// MaxConcurrentStreams specifies the maximum number of concurrent
|
||||
// streams that each client can open at a time.
|
||||
MaxConcurrentStreams uint32
|
||||
|
||||
WarningApplyDuration time.Duration
|
||||
|
||||
StrictReconfigCheck bool
|
||||
|
@ -17,6 +17,7 @@ package embed
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
@ -55,6 +56,7 @@ const (
|
||||
DefaultMaxTxnOps = uint(128)
|
||||
DefaultWarningApplyDuration = 100 * time.Millisecond
|
||||
DefaultMaxRequestBytes = 1.5 * 1024 * 1024
|
||||
DefaultMaxConcurrentStreams = math.MaxUint32
|
||||
DefaultGRPCKeepAliveMinTime = 5 * time.Second
|
||||
DefaultGRPCKeepAliveInterval = 2 * time.Hour
|
||||
DefaultGRPCKeepAliveTimeout = 20 * time.Second
|
||||
@ -198,6 +200,10 @@ type Config struct {
|
||||
MaxTxnOps uint `json:"max-txn-ops"`
|
||||
MaxRequestBytes uint `json:"max-request-bytes"`
|
||||
|
||||
// MaxConcurrentStreams specifies the maximum number of concurrent
|
||||
// streams that each client can open at a time.
|
||||
MaxConcurrentStreams uint32 `json:"max-concurrent-streams"`
|
||||
|
||||
LPUrls, LCUrls []url.URL
|
||||
APUrls, ACUrls []url.URL
|
||||
ClientTLSInfo transport.TLSInfo
|
||||
@ -305,7 +311,7 @@ type Config struct {
|
||||
AuthToken string `json:"auth-token"`
|
||||
BcryptCost uint `json:"bcrypt-cost"`
|
||||
|
||||
//The AuthTokenTTL in seconds of the simple token
|
||||
// AuthTokenTTL specifies the TTL in seconds of the simple token
|
||||
AuthTokenTTL uint `json:"auth-token-ttl"`
|
||||
|
||||
ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"`
|
||||
@ -448,6 +454,7 @@ func NewConfig() *Config {
|
||||
|
||||
MaxTxnOps: DefaultMaxTxnOps,
|
||||
MaxRequestBytes: DefaultMaxRequestBytes,
|
||||
MaxConcurrentStreams: DefaultMaxConcurrentStreams,
|
||||
ExperimentalWarningApplyDuration: DefaultWarningApplyDuration,
|
||||
|
||||
GRPCKeepAliveMinTime: DefaultGRPCKeepAliveMinTime,
|
||||
|
@ -199,6 +199,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
|
||||
BackendBatchInterval: cfg.BackendBatchInterval,
|
||||
MaxTxnOps: cfg.MaxTxnOps,
|
||||
MaxRequestBytes: cfg.MaxRequestBytes,
|
||||
MaxConcurrentStreams: cfg.MaxConcurrentStreams,
|
||||
SocketOpts: cfg.SocketOpts,
|
||||
StrictReconfigCheck: cfg.StrictReconfigCheck,
|
||||
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
|
||||
@ -336,7 +337,10 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized
|
||||
zap.String("initial-cluster", sc.InitialPeerURLsMap.String()),
|
||||
zap.String("initial-cluster-state", ec.ClusterState),
|
||||
zap.String("initial-cluster-token", sc.InitialClusterToken),
|
||||
zap.Int64("quota-size-bytes", quota),
|
||||
zap.Int64("quota-backend-bytes", quota),
|
||||
zap.Uint("max-request-bytes", sc.MaxRequestBytes),
|
||||
zap.Uint32("max-concurrent-streams", sc.MaxConcurrentStreams),
|
||||
|
||||
zap.Bool("pre-vote", sc.PreVote),
|
||||
zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck),
|
||||
zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()),
|
||||
|
@ -29,6 +29,7 @@ import (
|
||||
"go.etcd.io/etcd/client/v3/credentials"
|
||||
"go.etcd.io/etcd/pkg/v3/debugutil"
|
||||
"go.etcd.io/etcd/pkg/v3/httputil"
|
||||
"go.etcd.io/etcd/server/v3/config"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v3election"
|
||||
@ -43,6 +44,7 @@ import (
|
||||
"github.com/soheilhy/cmux"
|
||||
"github.com/tmc/grpc-websocket-proxy/wsproxy"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/trace"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
@ -133,6 +135,10 @@ func (sctx *serveCtx) serve(
|
||||
Handler: createAccessController(sctx.lg, s, httpmux),
|
||||
ErrorLog: logger, // do not log user error
|
||||
}
|
||||
if err := configureHttpServer(srvhttp, s.Cfg); err != nil {
|
||||
sctx.lg.Error("Configure http server failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
httpl := m.Match(cmux.HTTP1())
|
||||
go func() { errHandler(srvhttp.Serve(httpl)) }()
|
||||
|
||||
@ -182,6 +188,10 @@ func (sctx *serveCtx) serve(
|
||||
TLSConfig: tlscfg,
|
||||
ErrorLog: logger, // do not log user error
|
||||
}
|
||||
if err := configureHttpServer(srv, s.Cfg); err != nil {
|
||||
sctx.lg.Error("Configure https server failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
go func() { errHandler(srv.Serve(tlsl)) }()
|
||||
|
||||
sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
|
||||
@ -195,6 +205,13 @@ func (sctx *serveCtx) serve(
|
||||
return m.Serve()
|
||||
}
|
||||
|
||||
func configureHttpServer(srv *http.Server, cfg config.ServerConfig) error {
|
||||
// todo (ahrtr): should we support configuring other parameters in the future as well?
|
||||
return http2.ConfigureServer(srv, &http2.Server{
|
||||
MaxConcurrentStreams: cfg.MaxConcurrentStreams,
|
||||
})
|
||||
}
|
||||
|
||||
// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC
|
||||
// connections or otherHandler otherwise. Given in gRPC docs.
|
||||
func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler {
|
||||
|
@ -171,6 +171,8 @@ func newConfig() *config {
|
||||
fs.BoolVar(&cfg.ec.SocketOpts.ReusePort, "socket-reuse-port", cfg.ec.SocketOpts.ReusePort, "Enable to set socket option SO_REUSEPORT on listeners allowing rebinding of a port already in use.")
|
||||
fs.BoolVar(&cfg.ec.SocketOpts.ReuseAddress, "socket-reuse-address", cfg.ec.SocketOpts.ReuseAddress, "Enable to set socket option SO_REUSEADDR on listeners allowing binding to an address in `TIME_WAIT` state.")
|
||||
|
||||
fs.Var(flags.NewUint32Value(cfg.ec.MaxConcurrentStreams), "max-concurrent-streams", "Maximum concurrent streams that each client can open at a time.")
|
||||
|
||||
// raft connection timeouts
|
||||
fs.DurationVar(&rafthttp.ConnReadTimeout, "raft-read-timeout", rafthttp.DefaultConnReadTimeout, "Read timeout set on each rafthttp connection")
|
||||
fs.DurationVar(&rafthttp.ConnWriteTimeout, "raft-write-timeout", rafthttp.DefaultConnWriteTimeout, "Write timeout set on each rafthttp connection")
|
||||
@ -396,6 +398,8 @@ func (cfg *config) configFromCmdLine() error {
|
||||
|
||||
cfg.ec.CipherSuites = flags.StringsFromFlag(cfg.cf.flagSet, "cipher-suites")
|
||||
|
||||
cfg.ec.MaxConcurrentStreams = flags.Uint32FromFlag(cfg.cf.flagSet, "max-concurrent-streams")
|
||||
|
||||
cfg.ec.LogOutputs = flags.UniqueStringsFromFlag(cfg.cf.flagSet, "log-outputs")
|
||||
|
||||
cfg.ec.ClusterState = cfg.cf.clusterState.String()
|
||||
|
@ -47,6 +47,7 @@ import (
|
||||
"github.com/soheilhy/cmux"
|
||||
"github.com/spf13/cobra"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/net/http2"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
@ -95,6 +96,8 @@ var (
|
||||
grpcKeepAliveMinTime time.Duration
|
||||
grpcKeepAliveTimeout time.Duration
|
||||
grpcKeepAliveInterval time.Duration
|
||||
|
||||
maxConcurrentStreams uint32
|
||||
)
|
||||
|
||||
const defaultGRPCMaxCallSendMsgSize = 1.5 * 1024 * 1024
|
||||
@ -159,6 +162,8 @@ func newGRPCProxyStartCommand() *cobra.Command {
|
||||
|
||||
cmd.Flags().BoolVar(&grpcProxyDebug, "debug", false, "Enable debug-level logging for grpc-proxy.")
|
||||
|
||||
cmd.Flags().Uint32Var(&maxConcurrentStreams, "max-concurrent-streams", math.MaxUint32, "Maximum concurrent streams that each client can open at a time.")
|
||||
|
||||
return &cmd
|
||||
}
|
||||
|
||||
@ -212,6 +217,13 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
|
||||
httpClient := mustNewHTTPClient(lg)
|
||||
|
||||
srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client, proxyClient)
|
||||
|
||||
if err := http2.ConfigureServer(srvhttp, &http2.Server{
|
||||
MaxConcurrentStreams: maxConcurrentStreams,
|
||||
}); err != nil {
|
||||
lg.Fatal("Failed to configure the http server", zap.Error(err))
|
||||
}
|
||||
|
||||
errc := make(chan error, 3)
|
||||
go func() { errc <- newGRPCProxyServer(lg, client).Serve(grpcl) }()
|
||||
go func() { errc <- srvhttp.Serve(httpl) }()
|
||||
|
@ -80,6 +80,8 @@ Member:
|
||||
Maximum number of operations permitted in a transaction.
|
||||
--max-request-bytes '1572864'
|
||||
Maximum client request size in bytes the server will accept.
|
||||
--max-concurrent-streams 'math.MaxUint32'
|
||||
Maximum concurrent streams that each client can open at a time.
|
||||
--grpc-keepalive-min-time '5s'
|
||||
Minimum duration interval that a client should wait before pinging server.
|
||||
--grpc-keepalive-interval '2h'
|
||||
|
@ -32,7 +32,6 @@ import (
|
||||
|
||||
const (
|
||||
grpcOverheadBytes = 512 * 1024
|
||||
maxStreams = math.MaxUint32
|
||||
maxSendBytes = math.MaxInt32
|
||||
)
|
||||
|
||||
@ -68,7 +67,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer
|
||||
|
||||
opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
|
||||
opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
|
||||
opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))
|
||||
opts = append(opts, grpc.MaxConcurrentStreams(s.Cfg.MaxConcurrentStreams))
|
||||
|
||||
grpcServer := grpc.NewServer(append(opts, gopts...)...)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user