Merge pull request #15446 from serathius/separate-grpc-server

Allow user to separate http and grpc server
This commit is contained in:
Marek Siarkowicz 2023-03-30 11:52:25 +02:00 committed by GitHub
commit 0bd0b6b0b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 433 additions and 214 deletions

View File

@ -211,12 +211,12 @@ type Config struct {
// streams that each client can open at a time.
MaxConcurrentStreams uint32 `json:"max-concurrent-streams"`
ListenPeerUrls, ListenClientUrls []url.URL
AdvertisePeerUrls, AdvertiseClientUrls []url.URL
ClientTLSInfo transport.TLSInfo
ClientAutoTLS bool
PeerTLSInfo transport.TLSInfo
PeerAutoTLS bool
ListenPeerUrls, ListenClientUrls, ListenClientHttpUrls []url.URL
AdvertisePeerUrls, AdvertiseClientUrls []url.URL
ClientTLSInfo transport.TLSInfo
ClientAutoTLS bool
PeerTLSInfo transport.TLSInfo
PeerAutoTLS bool
// SelfSignedCertValidity specifies the validity period of the client and peer certificates
// that are automatically generated by etcd when you specify ClientAutoTLS and PeerAutoTLS,
// the unit is year, and the default is 1
@ -439,10 +439,11 @@ type configYAML struct {
// configJSON has file options that are translated into Config options
type configJSON struct {
ListenPeerUrls string `json:"listen-peer-urls"`
ListenClientUrls string `json:"listen-client-urls"`
AdvertisePeerUrls string `json:"initial-advertise-peer-urls"`
AdvertiseClientUrls string `json:"advertise-client-urls"`
ListenPeerUrls string `json:"listen-peer-urls"`
ListenClientUrls string `json:"listen-client-urls"`
ListenClientHttpUrls string `json:"listen-client-http-urls"`
AdvertisePeerUrls string `json:"initial-advertise-peer-urls"`
AdvertiseClientUrls string `json:"advertise-client-urls"`
CORSJSON string `json:"cors"`
HostWhitelistJSON string `json:"host-whitelist"`
@ -589,6 +590,15 @@ func (cfg *configYAML) configFromFile(path string) error {
cfg.Config.ListenClientUrls = u
}
if cfg.configJSON.ListenClientHttpUrls != "" {
u, err := types.NewURLs(strings.Split(cfg.configJSON.ListenClientHttpUrls, ","))
if err != nil {
fmt.Fprintf(os.Stderr, "unexpected error setting up listen-client-http-urls: %v\n", err)
os.Exit(1)
}
cfg.Config.ListenClientHttpUrls = u
}
if cfg.configJSON.AdvertisePeerUrls != "" {
u, err := types.NewURLs(strings.Split(cfg.configJSON.AdvertisePeerUrls, ","))
if err != nil {
@ -688,6 +698,12 @@ func (cfg *Config) Validate() error {
if err := checkBindURLs(cfg.ListenClientUrls); err != nil {
return err
}
if err := checkBindURLs(cfg.ListenClientHttpUrls); err != nil {
return err
}
if len(cfg.ListenClientHttpUrls) == 0 {
cfg.logger.Warn("Running http and grpc server on single port. This is not recommended for production.")
}
if err := checkBindURLs(cfg.ListenMetricsUrls); err != nil {
return err
}
@ -957,9 +973,12 @@ func (cfg *Config) ClientSelfCert() (err error) {
cfg.logger.Warn("ignoring client auto TLS since certs given")
return nil
}
chosts := make([]string, len(cfg.ListenClientUrls))
for i, u := range cfg.ListenClientUrls {
chosts[i] = u.Host
chosts := make([]string, 0, len(cfg.ListenClientUrls)+len(cfg.ListenClientHttpUrls))
for _, u := range cfg.ListenClientUrls {
chosts = append(chosts, u.Host)
}
for _, u := range cfg.ListenClientHttpUrls {
chosts = append(chosts, u.Host)
}
cfg.ClientTLSInfo, err = transport.SelfCert(cfg.logger, filepath.Join(cfg.Dir, "fixtures", "client"), chosts, cfg.SelfSignedCertValidity)
if err != nil {
@ -1094,6 +1113,14 @@ func (cfg *Config) getListenClientUrls() (ss []string) {
return ss
}
func (cfg *Config) getListenClientHttpUrls() (ss []string) {
ss = make([]string, len(cfg.ListenClientHttpUrls))
for i := range cfg.ListenClientHttpUrls {
ss[i] = cfg.ListenClientHttpUrls[i].String()
}
return ss
}
func (cfg *Config) getMetricsURLs() (ss []string) {
ss = make([]string, len(cfg.ListenMetricsUrls))
for i := range cfg.ListenMetricsUrls {

View File

@ -19,6 +19,7 @@ import (
"fmt"
"io"
defaultLog "log"
"math"
"net"
"net/http"
"net/url"
@ -32,6 +33,7 @@ import (
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/client/pkg/v3/transport"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/client/v3/credentials"
"go.etcd.io/etcd/pkg/v3/debugutil"
runtimeutil "go.etcd.io/etcd/pkg/v3/runtime"
"go.etcd.io/etcd/server/v3/config"
@ -45,6 +47,7 @@ import (
"github.com/soheilhy/cmux"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
)
@ -456,11 +459,16 @@ func (e *Etcd) Close() {
func stopServers(ctx context.Context, ss *servers) {
// first, close the http.Server
ss.http.Shutdown(ctx)
// do not grpc.Server.GracefulStop with TLS enabled etcd server
if ss.http != nil {
ss.http.Shutdown(ctx)
}
if ss.grpc == nil {
return
}
// do not grpc.Server.GracefulStop when grpc runs under http server
// See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
// and https://github.com/etcd-io/etcd/issues/8916
if ss.secure {
if ss.secure && ss.http != nil {
ss.grpc.Stop()
return
}
@ -618,8 +626,7 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro
}
sctxs = make(map[string]*serveCtx)
for _, u := range cfg.ListenClientUrls {
sctx := newServeCtx(cfg.logger)
for _, u := range append(cfg.ListenClientUrls, cfg.ListenClientHttpUrls...) {
if u.Scheme == "http" || u.Scheme == "unix" {
if !cfg.ClientTLSInfo.Empty() {
cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("client-url", u.String()))
@ -631,24 +638,41 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro
if (u.Scheme == "https" || u.Scheme == "unixs") && cfg.ClientTLSInfo.Empty() {
return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPS scheme", u.String())
}
}
network := "tcp"
addr := u.Host
if u.Scheme == "unix" || u.Scheme == "unixs" {
network = "unix"
addr = u.Host + u.Path
for _, u := range cfg.ListenClientUrls {
addr, secure, network := resolveUrl(u)
sctx := sctxs[addr]
if sctx == nil {
sctx = newServeCtx(cfg.logger)
sctxs[addr] = sctx
}
sctx.secure = sctx.secure || secure
sctx.insecure = sctx.insecure || !secure
sctx.scheme = u.Scheme
sctx.addr = addr
sctx.network = network
}
for _, u := range cfg.ListenClientHttpUrls {
addr, secure, network := resolveUrl(u)
sctx.secure = u.Scheme == "https" || u.Scheme == "unixs"
sctx.insecure = !sctx.secure
if oldctx := sctxs[addr]; oldctx != nil {
oldctx.secure = oldctx.secure || sctx.secure
oldctx.insecure = oldctx.insecure || sctx.insecure
continue
sctx := sctxs[addr]
if sctx == nil {
sctx = newServeCtx(cfg.logger)
sctxs[addr] = sctx
} else if !sctx.httpOnly {
return nil, fmt.Errorf("cannot bind both --client-listen-urls and --client-listen-http-urls on the same url %s", u.String())
}
sctx.secure = sctx.secure || secure
sctx.insecure = sctx.insecure || !secure
sctx.scheme = u.Scheme
sctx.addr = addr
sctx.network = network
sctx.httpOnly = true
}
if sctx.l, err = transport.NewListenerWithOpts(addr, u.Scheme,
for _, sctx := range sctxs {
if sctx.l, err = transport.NewListenerWithOpts(sctx.addr, sctx.scheme,
transport.WithSocketOpts(&cfg.SocketOpts),
transport.WithSkipTLSInfoCheck(true),
); err != nil {
@ -656,7 +680,6 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro
}
// net.Listener will rewrite ipv4 0.0.0.0 to ipv6 [::], breaking
// hosts that disable ipv6. So, use the address given by the user.
sctx.addr = addr
if fdLimit, fderr := runtimeutil.FDLimit(); fderr == nil {
if fdLimit <= reservedInternalFDNum {
@ -669,17 +692,17 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro
sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum))
}
defer func(u url.URL) {
if err == nil {
defer func(addr string) {
if err == nil || sctx.l == nil {
return
}
sctx.l.Close()
cfg.logger.Warn(
"closing peer listener",
zap.String("address", u.Host),
zap.String("address", addr),
zap.Error(err),
)
}(u)
}(sctx.addr)
for k := range cfg.UserHandlers {
sctx.userHandlers[k] = cfg.UserHandlers[k]
}
@ -690,11 +713,21 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro
if cfg.LogLevel == "debug" {
sctx.registerTrace()
}
sctxs[addr] = sctx
}
return sctxs, nil
}
func resolveUrl(u url.URL) (addr string, secure bool, network string) {
addr = u.Host
network = "tcp"
if u.Scheme == "unix" || u.Scheme == "unixs" {
addr = u.Host + u.Path
network = "unix"
}
secure = u.Scheme == "https" || u.Scheme == "unixs"
return addr, secure, network
}
func (e *Etcd) serveClients() (err error) {
if !e.cfg.ClientTLSInfo.Empty() {
e.cfg.logger.Info(
@ -726,15 +759,68 @@ func (e *Etcd) serveClients() (err error) {
}))
}
splitHttp := false
for _, sctx := range e.sctxs {
if sctx.httpOnly {
splitHttp = true
}
}
// start client servers in each goroutine
for _, sctx := range e.sctxs {
go func(s *serveCtx) {
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, gopts...))
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, e.grpcGatewayDial(splitHttp), splitHttp, gopts...))
}(sctx)
}
return nil
}
func (e *Etcd) grpcGatewayDial(splitHttp bool) (grpcDial func(ctx context.Context) (*grpc.ClientConn, error)) {
if !e.cfg.EnableGRPCGateway {
return nil
}
sctx := e.pickGrpcGatewayServeContext(splitHttp)
addr := sctx.addr
if network := sctx.network; network == "unix" {
// explicitly define unix network for gRPC socket support
addr = fmt.Sprintf("%s:%s", network, addr)
}
opts := []grpc.DialOption{grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32))}
if sctx.secure {
tlscfg, tlsErr := e.cfg.ClientTLSInfo.ServerConfig()
if tlsErr != nil {
return func(ctx context.Context) (*grpc.ClientConn, error) {
return nil, tlsErr
}
}
dtls := tlscfg.Clone()
// trust local server
dtls.InsecureSkipVerify = true
bundle := credentials.NewBundle(credentials.Config{TLSConfig: dtls})
opts = append(opts, grpc.WithTransportCredentials(bundle.TransportCredentials()))
} else {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
return func(ctx context.Context) (*grpc.ClientConn, error) {
conn, err := grpc.DialContext(ctx, addr, opts...)
if err != nil {
sctx.lg.Error("grpc gateway failed to dial", zap.String("addr", addr), zap.Error(err))
return nil, err
}
return conn, err
}
}
func (e *Etcd) pickGrpcGatewayServeContext(splitHttp bool) *serveCtx {
for _, sctx := range e.sctxs {
if !splitHttp || !sctx.httpOnly {
return sctx
}
}
panic("Expect at least one context able to serve grpc")
}
func (e *Etcd) serveMetrics() (err error) {
if e.cfg.Metrics == "extensive" {
grpc_prometheus.EnableHandlingTimeHistogram()

View File

@ -19,7 +19,6 @@ import (
"fmt"
"io"
defaultLog "log"
"math"
"net"
"net/http"
"strings"
@ -27,7 +26,6 @@ import (
etcdservergw "go.etcd.io/etcd/api/v3/etcdserverpb/gw"
"go.etcd.io/etcd/client/pkg/v3/transport"
"go.etcd.io/etcd/client/v3/credentials"
"go.etcd.io/etcd/pkg/v3/debugutil"
"go.etcd.io/etcd/pkg/v3/httputil"
"go.etcd.io/etcd/server/v3/config"
@ -48,16 +46,18 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
type serveCtx struct {
lg *zap.Logger
l net.Listener
lg *zap.Logger
l net.Listener
scheme string
addr string
network string
secure bool
insecure bool
httpOnly bool
ctx context.Context
cancel context.CancelFunc
@ -95,6 +95,8 @@ func (sctx *serveCtx) serve(
tlsinfo *transport.TLSInfo,
handler http.Handler,
errHandler func(error),
grpcDialForRestGatewayBackends func(ctx context.Context) (*grpc.ClientConn, error),
splitHttp bool,
gopts ...grpc.ServerOption) (err error) {
logger := defaultLog.New(io.Discard, "etcdhttp", 0)
@ -110,132 +112,158 @@ func (sctx *serveCtx) serve(
sctx.lg.Info("ready to serve client requests")
m := cmux.New(sctx.l)
var server func() error
onlyGRPC := splitHttp && !sctx.httpOnly
onlyHttp := splitHttp && sctx.httpOnly
grpcEnabled := !onlyHttp
httpEnabled := !onlyGRPC
v3c := v3client.New(s)
servElection := v3election.NewElectionServer(v3c)
servLock := v3lock.NewLockServer(v3c)
// Make sure serversC is closed even if we prematurely exit the function.
defer close(sctx.serversC)
var gwmux *gw.ServeMux
if s.Cfg.EnableGRPCGateway {
// GRPC gateway connects to grpc server via connection provided by grpc dial.
gwmux, err = sctx.registerGateway(grpcDialForRestGatewayBackends)
if err != nil {
sctx.lg.Error("registerGateway failed", zap.Error(err))
return err
}
}
var traffic string
switch {
case onlyGRPC:
traffic = "grpc"
case onlyHttp:
traffic = "http"
default:
traffic = "grpc+http"
}
if sctx.insecure {
gs := v3rpc.Server(s, nil, nil, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
sctx.serviceRegister(gs)
}
defer func(gs *grpc.Server) {
if err != nil {
sctx.lg.Warn("stopping insecure grpc server due to error", zap.Error(err))
gs.Stop()
sctx.lg.Warn("stopped insecure grpc server due to error", zap.Error(err))
var gs *grpc.Server
var srv *http.Server
if httpEnabled {
httpmux := sctx.createMux(gwmux, handler)
srv = &http.Server{
Handler: createAccessController(sctx.lg, s, httpmux),
ErrorLog: logger, // do not log user error
}
}(gs)
grpcl := m.Match(cmux.HTTP2())
go func(gs *grpc.Server, grpcLis net.Listener) {
errHandler(gs.Serve(grpcLis))
}(gs, grpcl)
var gwmux *gw.ServeMux
if s.Cfg.EnableGRPCGateway {
gwmux, err = sctx.registerGateway([]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())})
if err != nil {
sctx.lg.Error("registerGateway failed", zap.Error(err))
if err := configureHttpServer(srv, s.Cfg); err != nil {
sctx.lg.Error("Configure http server failed", zap.Error(err))
return err
}
}
httpmux := sctx.createMux(gwmux, handler)
srvhttp := &http.Server{
Handler: createAccessController(sctx.lg, s, httpmux),
ErrorLog: logger, // do not log user error
if grpcEnabled {
gs = v3rpc.Server(s, nil, nil, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
sctx.serviceRegister(gs)
}
defer func(gs *grpc.Server) {
if err != nil {
sctx.lg.Warn("stopping insecure grpc server due to error", zap.Error(err))
gs.Stop()
sctx.lg.Warn("stopped insecure grpc server due to error", zap.Error(err))
}
}(gs)
}
if err := configureHttpServer(srvhttp, s.Cfg); err != nil {
sctx.lg.Error("Configure http server failed", zap.Error(err))
return err
if onlyGRPC {
server = func() error {
return gs.Serve(sctx.l)
}
} else {
server = m.Serve
httpl := m.Match(cmux.HTTP1())
go func(srvhttp *http.Server, tlsLis net.Listener) {
errHandler(srvhttp.Serve(tlsLis))
}(srv, httpl)
if grpcEnabled {
grpcl := m.Match(cmux.HTTP2())
go func(gs *grpc.Server, l net.Listener) {
errHandler(gs.Serve(l))
}(gs, grpcl)
}
}
httpl := m.Match(cmux.HTTP1())
go func(srvhttp *http.Server, httpLis net.Listener) {
errHandler(srvhttp.Serve(httpLis))
}(srvhttp, httpl)
sctx.serversC <- &servers{grpc: gs, http: srvhttp}
sctx.serversC <- &servers{grpc: gs, http: srv}
sctx.lg.Info(
"serving client traffic insecurely; this is strongly discouraged!",
zap.String("traffic", traffic),
zap.String("address", sctx.l.Addr().String()),
)
}
if sctx.secure {
var gs *grpc.Server
var srv *http.Server
tlscfg, tlsErr := tlsinfo.ServerConfig()
if tlsErr != nil {
return tlsErr
}
gs := v3rpc.Server(s, tlscfg, nil, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
sctx.serviceRegister(gs)
}
defer func(gs *grpc.Server) {
if err != nil {
sctx.lg.Warn("stopping secure grpc server due to error", zap.Error(err))
gs.Stop()
sctx.lg.Warn("stopped secure grpc server due to error", zap.Error(err))
if grpcEnabled {
gs = v3rpc.Server(s, tlscfg, nil, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
sctx.serviceRegister(gs)
}
}(gs)
defer func(gs *grpc.Server) {
if err != nil {
sctx.lg.Warn("stopping secure grpc server due to error", zap.Error(err))
gs.Stop()
sctx.lg.Warn("stopped secure grpc server due to error", zap.Error(err))
}
}(gs)
}
if httpEnabled {
if grpcEnabled {
handler = grpcHandlerFunc(gs, handler)
}
httpmux := sctx.createMux(gwmux, handler)
handler = grpcHandlerFunc(gs, handler)
var gwmux *gw.ServeMux
if s.Cfg.EnableGRPCGateway {
dtls := tlscfg.Clone()
// trust local server
dtls.InsecureSkipVerify = true
bundle := credentials.NewBundle(credentials.Config{TLSConfig: dtls})
opts := []grpc.DialOption{grpc.WithTransportCredentials(bundle.TransportCredentials())}
gwmux, err = sctx.registerGateway(opts)
if err != nil {
srv = &http.Server{
Handler: createAccessController(sctx.lg, s, httpmux),
TLSConfig: tlscfg,
ErrorLog: logger, // do not log user error
}
if err := configureHttpServer(srv, s.Cfg); err != nil {
sctx.lg.Error("Configure https server failed", zap.Error(err))
return err
}
}
var tlsl net.Listener
tlsl, err = transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo)
if err != nil {
return err
}
// TODO: add debug flag; enable logging when debug flag is set
httpmux := sctx.createMux(gwmux, handler)
if onlyGRPC {
server = func() error { return gs.Serve(sctx.l) }
} else {
server = m.Serve
srv := &http.Server{
Handler: createAccessController(sctx.lg, s, httpmux),
TLSConfig: tlscfg,
ErrorLog: logger, // do not log user error
tlsl, err := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo)
if err != nil {
return err
}
go func(srvhttp *http.Server, tlsl net.Listener) {
errHandler(srvhttp.Serve(tlsl))
}(srv, tlsl)
}
if err := configureHttpServer(srv, s.Cfg); err != nil {
sctx.lg.Error("Configure https server failed", zap.Error(err))
return err
}
go func(srvhttp *http.Server, tlsLis net.Listener) {
errHandler(srvhttp.Serve(tlsLis))
}(srv, tlsl)
sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
sctx.lg.Info(
"serving client traffic securely",
zap.String("traffic", traffic),
zap.String("address", sctx.l.Addr().String()),
)
}
return m.Serve()
return server()
}
func configureHttpServer(srv *http.Server, cfg config.ServerConfig) error {
@ -266,22 +294,11 @@ func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Ha
type registerHandlerFunc func(context.Context, *gw.ServeMux, *grpc.ClientConn) error
func (sctx *serveCtx) registerGateway(opts []grpc.DialOption) (*gw.ServeMux, error) {
func (sctx *serveCtx) registerGateway(dial func(ctx context.Context) (*grpc.ClientConn, error)) (*gw.ServeMux, error) {
ctx := sctx.ctx
addr := sctx.addr
if network := sctx.network; network == "unix" {
// explicitly define unix network for gRPC socket support
addr = fmt.Sprintf("%s:%s", network, addr)
}
opts = append(opts, grpc.WithDefaultCallOptions([]grpc.CallOption{
grpc.MaxCallRecvMsgSize(math.MaxInt32),
}...))
conn, err := grpc.DialContext(ctx, addr, opts...)
conn, err := dial(ctx)
if err != nil {
sctx.lg.Error("registerGateway failed to dial", zap.String("addr", addr), zap.Error(err))
return nil, err
}
gwmux := gw.NewServeMux()

View File

@ -115,7 +115,11 @@ func newConfig() *config {
)
fs.Var(
flags.NewUniqueURLsWithExceptions(embed.DefaultListenClientURLs, ""), "listen-client-urls",
"List of URLs to listen on for client traffic.",
"List of URLs to listen on for client grpc traffic and http as long as --listen-client-http-urls is not specified.",
)
fs.Var(
flags.NewUniqueURLsWithExceptions("", ""), "listen-client-http-urls",
"List of URLs to listen on for http only client traffic. Enabling this flag removes http services from --listen-client-urls.",
)
fs.Var(
flags.NewUniqueURLsWithExceptions("", ""),
@ -386,6 +390,7 @@ func (cfg *config) configFromCmdLine() error {
cfg.ec.ListenPeerUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-peer-urls")
cfg.ec.AdvertisePeerUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "initial-advertise-peer-urls")
cfg.ec.ListenClientUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-client-urls")
cfg.ec.ListenClientHttpUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-client-http-urls")
cfg.ec.AdvertiseClientUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "advertise-client-urls")
cfg.ec.ListenMetricsUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-metrics-urls")

View File

@ -37,6 +37,7 @@ func TestConfigParsingMemberFlags(t *testing.T) {
"-experimental-snapshot-catchup-entries=1000",
"-listen-peer-urls=http://localhost:8000,https://localhost:8001",
"-listen-client-urls=http://localhost:7000,https://localhost:7001",
"-listen-client-http-urls=http://localhost:7002,https://localhost:7003",
// it should be set if -listen-client-urls is set
"-advertise-client-urls=http://localhost:7000,https://localhost:7001",
}
@ -60,6 +61,7 @@ func TestConfigFileMemberFields(t *testing.T) {
SnapshotCatchUpEntries uint64 `json:"experimental-snapshot-catch-up-entries"`
ListenPeerUrls string `json:"listen-peer-urls"`
ListenClientUrls string `json:"listen-client-urls"`
ListenClientHttpUrls string `json:"listen-client-http-urls"`
AdvertiseClientUrls string `json:"advertise-client-urls"`
}{
"testdir",
@ -70,6 +72,7 @@ func TestConfigFileMemberFields(t *testing.T) {
1000,
"http://localhost:8000,https://localhost:8001",
"http://localhost:7000,https://localhost:7001",
"http://localhost:7002,https://localhost:7003",
"http://localhost:7000,https://localhost:7001",
}
@ -398,6 +401,7 @@ func validateMemberFlags(t *testing.T, cfg *config) {
Dir: "testdir",
ListenPeerUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}},
ListenClientUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}},
ListenClientHttpUrls: []url.URL{{Scheme: "http", Host: "localhost:7002"}, {Scheme: "https", Host: "localhost:7003"}},
MaxSnapFiles: 10,
MaxWalFiles: 10,
Name: "testname",
@ -429,6 +433,9 @@ func validateMemberFlags(t *testing.T, cfg *config) {
if !reflect.DeepEqual(cfg.ec.ListenClientUrls, wcfg.ListenClientUrls) {
t.Errorf("listen-client-urls = %v, want %v", cfg.ec.ListenClientUrls, wcfg.ListenClientUrls)
}
if !reflect.DeepEqual(cfg.ec.ListenClientHttpUrls, wcfg.ListenClientHttpUrls) {
t.Errorf("listen-client-http-urls = %v, want %v", cfg.ec.ListenClientHttpUrls, wcfg.ListenClientHttpUrls)
}
}
func validateClusteringFlags(t *testing.T, cfg *config) {

View File

@ -65,7 +65,9 @@ Member:
--listen-peer-urls 'http://localhost:2380'
List of URLs to listen on for peer traffic.
--listen-client-urls 'http://localhost:2379'
List of URLs to listen on for client traffic.
List of URLs to listen on for client grpc traffic and http as long as --listen-client-http-urls is not specified.
--listen-client-http-urls ''
List of URLs to listen on for http only client traffic. Enabling this flag removes http services from --listen-client-urls.
--max-snapshots '` + strconv.Itoa(embed.DefaultMaxSnapshots) + `'
Maximum number of snapshot files to retain (0 is unlimited).
--max-wals '` + strconv.Itoa(embed.DefaultMaxWALs) + `'

View File

@ -39,8 +39,9 @@ import (
func TestConnectionMultiplexing(t *testing.T) {
e2e.BeforeTest(t)
for _, tc := range []struct {
name string
serverTLS e2e.ClientConnType
name string
serverTLS e2e.ClientConnType
separateHttpPort bool
}{
{
name: "ServerTLS",
@ -54,10 +55,20 @@ func TestConnectionMultiplexing(t *testing.T) {
name: "ServerTLSAndNonTLS",
serverTLS: e2e.ClientTLSAndNonTLS,
},
{
name: "SeparateHTTP/ServerTLS",
serverTLS: e2e.ClientTLS,
separateHttpPort: true,
},
{
name: "SeparateHTTP/ServerNonTLS",
serverTLS: e2e.ClientNonTLS,
separateHttpPort: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
cfg := e2e.EtcdProcessClusterConfig{ClusterSize: 1, Client: e2e.ClientConfig{ConnectionType: tc.serverTLS}}
cfg := e2e.EtcdProcessClusterConfig{ClusterSize: 1, Client: e2e.ClientConfig{ConnectionType: tc.serverTLS}, ClientHttpSeparate: tc.separateHttpPort}
clus, err := e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&cfg))
require.NoError(t, err)
defer clus.Close()
@ -78,30 +89,32 @@ func TestConnectionMultiplexing(t *testing.T) {
name = "ClientTLS"
}
t.Run(name, func(t *testing.T) {
testConnectionMultiplexing(t, ctx, clus.EndpointsV3()[0], clientTLS)
testConnectionMultiplexing(t, ctx, clus.Procs[0], clientTLS)
})
}
})
}
}
func testConnectionMultiplexing(t *testing.T, ctx context.Context, endpoint string, connType e2e.ClientConnType) {
func testConnectionMultiplexing(t *testing.T, ctx context.Context, member e2e.EtcdProcess, connType e2e.ClientConnType) {
httpEndpoint := member.EndpointsHTTP()[0]
grpcEndpoint := member.EndpointsGRPC()[0]
switch connType {
case e2e.ClientTLS:
endpoint = e2e.ToTLS(endpoint)
httpEndpoint = e2e.ToTLS(httpEndpoint)
grpcEndpoint = e2e.ToTLS(grpcEndpoint)
case e2e.ClientNonTLS:
default:
panic(fmt.Sprintf("Unsupported conn type %v", connType))
}
t.Run("etcdctl", func(t *testing.T) {
etcdctl, err := e2e.NewEtcdctl(e2e.ClientConfig{ConnectionType: connType}, []string{endpoint})
etcdctl, err := e2e.NewEtcdctl(e2e.ClientConfig{ConnectionType: connType}, []string{grpcEndpoint})
require.NoError(t, err)
_, err = etcdctl.Get(ctx, "a", config.GetOptions{})
assert.NoError(t, err)
})
t.Run("clientv3", func(t *testing.T) {
c := newClient(t, []string{endpoint}, e2e.ClientConfig{ConnectionType: connType})
c := newClient(t, []string{grpcEndpoint}, e2e.ClientConfig{ConnectionType: connType})
_, err := c.Get(ctx, "a")
assert.NoError(t, err)
})
@ -112,11 +125,11 @@ func testConnectionMultiplexing(t *testing.T, ctx context.Context, endpoint stri
tname = "default"
}
t.Run(tname, func(t *testing.T) {
assert.NoError(t, fetchGrpcGateway(endpoint, httpVersion, connType))
assert.NoError(t, fetchMetrics(endpoint, httpVersion, connType))
assert.NoError(t, fetchVersion(endpoint, httpVersion, connType))
assert.NoError(t, fetchHealth(endpoint, httpVersion, connType))
assert.NoError(t, fetchDebugVars(endpoint, httpVersion, connType))
assert.NoError(t, fetchGrpcGateway(httpEndpoint, httpVersion, connType))
assert.NoError(t, fetchMetrics(httpEndpoint, httpVersion, connType))
assert.NoError(t, fetchVersion(httpEndpoint, httpVersion, connType))
assert.NoError(t, fetchHealth(httpEndpoint, httpVersion, connType))
assert.NoError(t, fetchDebugVars(httpEndpoint, httpVersion, connType))
})
}
})

View File

@ -77,15 +77,17 @@ func tlsInfo(t testing.TB, cfg e2e.ClientConfig) (*transport.TLSInfo, error) {
}
}
func fillEtcdWithData(ctx context.Context, c *clientv3.Client, keyCount int, valueSize uint) error {
func fillEtcdWithData(ctx context.Context, c *clientv3.Client, dbSize int) error {
g := errgroup.Group{}
concurrency := 10
keyCount := 100
keysPerRoutine := keyCount / concurrency
valueSize := dbSize / keyCount
for i := 0; i < concurrency; i++ {
i := i
g.Go(func() error {
for j := 0; j < keysPerRoutine; j++ {
_, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(valueSize))
_, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(uint(valueSize)))
if err != nil {
return err
}

View File

@ -35,29 +35,48 @@ import (
const (
watchResponsePeriod = 100 * time.Millisecond
watchTestDuration = 5 * time.Second
// TODO: Reduce maxWatchDelay when https://github.com/etcd-io/etcd/issues/15402 is addressed.
maxWatchDelay = 2 * time.Second
// Configure enough read load to cause starvation from https://github.com/etcd-io/etcd/issues/15402.
// Tweaked to pass on GitHub runner. For local runs please increase parameters.
// TODO: Increase when https://github.com/etcd-io/etcd/issues/15402 is fully addressed.
numberOfPreexistingKeys = 100
sizeOfPreexistingValues = 5000
readLoadConcurrency = 10
readLoadConcurrency = 10
)
type testCase struct {
name string
config e2e.EtcdProcessClusterConfig
name string
config e2e.EtcdProcessClusterConfig
maxWatchDelay time.Duration
dbSizeBytes int
}
const (
Kilo = 1000
Mega = 1000 * Kilo
)
// 10 MB is not a bottleneck of grpc server, but filling up etcd with data.
// Keeping it lower so tests don't take too long.
// If we implement reuse of db we could increase the dbSize.
var tcs = []testCase{
{
name: "NoTLS",
config: e2e.EtcdProcessClusterConfig{ClusterSize: 1},
name: "NoTLS",
config: e2e.EtcdProcessClusterConfig{ClusterSize: 1},
maxWatchDelay: 100 * time.Millisecond,
dbSizeBytes: 5 * Mega,
},
{
name: "ClientTLS",
config: e2e.EtcdProcessClusterConfig{ClusterSize: 1, Client: e2e.ClientConfig{ConnectionType: e2e.ClientTLS}},
name: "TLS",
config: e2e.EtcdProcessClusterConfig{ClusterSize: 1, Client: e2e.ClientConfig{ConnectionType: e2e.ClientTLS}},
maxWatchDelay: 2 * time.Second,
dbSizeBytes: 500 * Kilo,
},
{
name: "SeparateHttpNoTLS",
config: e2e.EtcdProcessClusterConfig{ClusterSize: 1, ClientHttpSeparate: true},
maxWatchDelay: 100 * time.Millisecond,
dbSizeBytes: 5 * Mega,
},
{
name: "SeparateHttpTLS",
config: e2e.EtcdProcessClusterConfig{ClusterSize: 1, Client: e2e.ClientConfig{ConnectionType: e2e.ClientTLS}, ClientHttpSeparate: true},
maxWatchDelay: 100 * time.Millisecond,
dbSizeBytes: 5 * Mega,
},
}
@ -71,13 +90,13 @@ func TestWatchDelayForPeriodicProgressNotification(t *testing.T) {
require.NoError(t, err)
defer clus.Close()
c := newClient(t, clus.EndpointsV3(), tc.config.Client)
require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues))
require.NoError(t, fillEtcdWithData(context.Background(), c, tc.dbSizeBytes))
ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration)
defer cancel()
g := errgroup.Group{}
continuouslyExecuteGetAll(ctx, t, &g, c)
validateWatchDelay(t, c.Watch(ctx, "fake-key", clientv3.WithProgressNotify()))
validateWatchDelay(t, c.Watch(ctx, "fake-key", clientv3.WithProgressNotify()), tc.maxWatchDelay)
require.NoError(t, g.Wait())
})
}
@ -91,7 +110,7 @@ func TestWatchDelayForManualProgressNotification(t *testing.T) {
require.NoError(t, err)
defer clus.Close()
c := newClient(t, clus.EndpointsV3(), tc.config.Client)
require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues))
require.NoError(t, fillEtcdWithData(context.Background(), c, tc.dbSizeBytes))
ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration)
defer cancel()
@ -110,7 +129,7 @@ func TestWatchDelayForManualProgressNotification(t *testing.T) {
time.Sleep(watchResponsePeriod)
}
})
validateWatchDelay(t, c.Watch(ctx, "fake-key"))
validateWatchDelay(t, c.Watch(ctx, "fake-key"), tc.maxWatchDelay)
require.NoError(t, g.Wait())
})
}
@ -124,7 +143,7 @@ func TestWatchDelayForEvent(t *testing.T) {
require.NoError(t, err)
defer clus.Close()
c := newClient(t, clus.EndpointsV3(), tc.config.Client)
require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues))
require.NoError(t, fillEtcdWithData(context.Background(), c, tc.dbSizeBytes))
ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration)
defer cancel()
@ -144,13 +163,13 @@ func TestWatchDelayForEvent(t *testing.T) {
}
})
continuouslyExecuteGetAll(ctx, t, &g, c)
validateWatchDelay(t, c.Watch(ctx, "key"))
validateWatchDelay(t, c.Watch(ctx, "key"), tc.maxWatchDelay)
require.NoError(t, g.Wait())
})
}
}
func validateWatchDelay(t *testing.T, watch clientv3.WatchChan) {
func validateWatchDelay(t *testing.T, watch clientv3.WatchChan, maxWatchDelay time.Duration) {
start := time.Now()
var maxDelay time.Duration
for range watch {
@ -181,7 +200,7 @@ func continuouslyExecuteGetAll(ctx context.Context, t *testing.T, g *errgroup.Gr
for i := 0; i < readLoadConcurrency; i++ {
g.Go(func() error {
for {
_, err := c.Get(ctx, "", clientv3.WithPrefix())
resp, err := c.Get(ctx, "", clientv3.WithPrefix())
if err != nil {
if strings.Contains(err.Error(), "context deadline exceeded") {
return nil
@ -189,8 +208,12 @@ func continuouslyExecuteGetAll(ctx context.Context, t *testing.T, g *errgroup.Gr
return err
}
}
respSize := 0
for _, kv := range resp.Kvs {
respSize += kv.Size()
}
mux.Lock()
size += numberOfPreexistingKeys * sizeOfPreexistingValues
size += respSize
mux.Unlock()
}
})

View File

@ -151,10 +151,11 @@ type EtcdProcessClusterConfig struct {
SnapshotCount int // default is 10000
SnapshotCatchUpEntries int // default is 5000
Client ClientConfig
IsPeerTLS bool
IsPeerAutoTLS bool
CN bool
Client ClientConfig
ClientHttpSeparate bool
IsPeerTLS bool
IsPeerAutoTLS bool
CN bool
CipherSuites []string
@ -457,22 +458,20 @@ func (cfg *EtcdProcessClusterConfig) SetInitialOrDiscovery(serverCfg *EtcdServer
func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i int) *EtcdServerProcessConfig {
var curls []string
var curl, curltls string
var curl string
port := cfg.BasePort + 5*i
clientPort := port
peerPort := port + 1
metricsPort := port + 2
peer2Port := port + 3
clientHttpPort := port + 4
curlHost := fmt.Sprintf("localhost:%d", clientPort)
switch cfg.Client.ConnectionType {
case ClientNonTLS, ClientTLS:
curl = (&url.URL{Scheme: cfg.ClientScheme(), Host: curlHost}).String()
if cfg.Client.ConnectionType == ClientTLSAndNonTLS {
curl = clientURL(clientPort, ClientNonTLS)
curls = []string{curl, clientURL(clientPort, ClientTLS)}
} else {
curl = clientURL(clientPort, cfg.Client.ConnectionType)
curls = []string{curl}
case ClientTLSAndNonTLS:
curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
curls = []string{curl, curltls}
}
peerListenUrl := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
@ -511,6 +510,11 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
"--data-dir", dataDirPath,
"--snapshot-count", fmt.Sprintf("%d", cfg.SnapshotCount),
}
var clientHttpUrl string
if cfg.ClientHttpSeparate {
clientHttpUrl = clientURL(clientHttpPort, cfg.Client.ConnectionType)
args = append(args, "--listen-client-http-urls", clientHttpUrl)
}
if cfg.ForceNewCluster {
args = append(args, "--force-new-cluster")
@ -619,21 +623,34 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
}
return &EtcdServerProcessConfig{
lg: cfg.Logger,
ExecPath: execPath,
Args: args,
EnvVars: envVars,
TlsArgs: cfg.TlsArgs(),
Client: cfg.Client,
DataDirPath: dataDirPath,
KeepDataDir: cfg.KeepDataDir,
Name: name,
PeerURL: peerAdvertiseUrl,
ClientURL: curl,
MetricsURL: murl,
InitialToken: cfg.InitialToken,
GoFailPort: gofailPort,
Proxy: proxyCfg,
lg: cfg.Logger,
ExecPath: execPath,
Args: args,
EnvVars: envVars,
TlsArgs: cfg.TlsArgs(),
Client: cfg.Client,
DataDirPath: dataDirPath,
KeepDataDir: cfg.KeepDataDir,
Name: name,
PeerURL: peerAdvertiseUrl,
ClientURL: curl,
ClientHTTPURL: clientHttpUrl,
MetricsURL: murl,
InitialToken: cfg.InitialToken,
GoFailPort: gofailPort,
Proxy: proxyCfg,
}
}
func clientURL(port int, connType ClientConnType) string {
curlHost := fmt.Sprintf("localhost:%d", port)
switch connType {
case ClientNonTLS:
return (&url.URL{Scheme: "http", Host: curlHost}).String()
case ClientTLS:
return (&url.URL{Scheme: "https", Host: curlHost}).String()
default:
panic(fmt.Sprintf("Unsupported connection type %v", connType))
}
}
@ -687,6 +704,14 @@ func (epc *EtcdProcessCluster) EndpointsV3() []string {
return epc.Endpoints(func(ep EtcdProcess) []string { return ep.EndpointsV3() })
}
func (epc *EtcdProcessCluster) EndpointsGRPC() []string {
return epc.Endpoints(func(ep EtcdProcess) []string { return ep.EndpointsGRPC() })
}
func (epc *EtcdProcessCluster) EndpointsHTTP() []string {
return epc.Endpoints(func(ep EtcdProcess) []string { return ep.EndpointsHTTP() })
}
func (epc *EtcdProcessCluster) Endpoints(f func(ep EtcdProcess) []string) (ret []string) {
for _, p := range epc.Procs {
ret = append(ret, f(p)...)

View File

@ -58,8 +58,10 @@ func NewProxyEtcdProcess(cfg *EtcdServerProcessConfig) (*proxyEtcdProcess, error
func (p *proxyEtcdProcess) Config() *EtcdServerProcessConfig { return p.etcdProc.Config() }
func (p *proxyEtcdProcess) EndpointsV2() []string { return p.proxyV2.endpoints() }
func (p *proxyEtcdProcess) EndpointsV3() []string { return p.proxyV3.endpoints() }
func (p *proxyEtcdProcess) EndpointsV2() []string { return p.EndpointsHTTP() }
func (p *proxyEtcdProcess) EndpointsV3() []string { return p.EndpointsGRPC() }
func (p *proxyEtcdProcess) EndpointsHTTP() []string { return p.proxyV2.endpoints() }
func (p *proxyEtcdProcess) EndpointsGRPC() []string { return p.proxyV3.endpoints() }
func (p *proxyEtcdProcess) EndpointsMetrics() []string {
panic("not implemented; proxy doesn't provide health information")
}

View File

@ -43,6 +43,8 @@ var (
type EtcdProcess interface {
EndpointsV2() []string
EndpointsV3() []string
EndpointsGRPC() []string
EndpointsHTTP() []string
EndpointsMetrics() []string
Client(opts ...config.ClientOption) *EtcdctlV3
@ -86,9 +88,10 @@ type EtcdServerProcessConfig struct {
Name string
PeerURL url.URL
ClientURL string
MetricsURL string
PeerURL url.URL
ClientURL string
ClientHTTPURL string
MetricsURL string
InitialToken string
InitialCluster string
@ -113,8 +116,15 @@ func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, err
return ep, nil
}
func (ep *EtcdServerProcess) EndpointsV2() []string { return []string{ep.cfg.ClientURL} }
func (ep *EtcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2() }
func (ep *EtcdServerProcess) EndpointsV2() []string { return ep.EndpointsHTTP() }
func (ep *EtcdServerProcess) EndpointsV3() []string { return ep.EndpointsGRPC() }
func (ep *EtcdServerProcess) EndpointsGRPC() []string { return []string{ep.cfg.ClientURL} }
func (ep *EtcdServerProcess) EndpointsHTTP() []string {
if ep.cfg.ClientHTTPURL == "" {
return []string{ep.cfg.ClientURL}
}
return []string{ep.cfg.ClientHTTPURL}
}
func (ep *EtcdServerProcess) EndpointsMetrics() []string { return []string{ep.cfg.MetricsURL} }
func (epc *EtcdServerProcess) Client(opts ...config.ClientOption) *EtcdctlV3 {