server: Add --listen-client-http-urls flag to allow running grpc server separate from http server

Difference in load configuration for watch delay tests show how huge the
impact is. Even with random write scheduler grpc under http
server can only handle 500 KB with 2 seconds delay. On the other hand,
separate grpc server easily hits 10, 100 or even 1000 MB within 100 miliseconds.

Priority write scheduler that was used in most previous releases
is far worse than random one.

Tests configured to only 5 MB to avoid flakes and taking too long to fill
etcd.

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz
2023-03-10 17:33:46 +01:00
parent 2d5f48a7ef
commit c0421c7330
10 changed files with 277 additions and 131 deletions

View File

@@ -205,7 +205,7 @@ type Config struct {
// streams that each client can open at a time.
MaxConcurrentStreams uint32 `json:"max-concurrent-streams"`
ListenPeerUrls, ListenClientUrls []url.URL
ListenPeerUrls, ListenClientUrls, ListenClientHttpUrls []url.URL
AdvertisePeerUrls, AdvertiseClientUrls []url.URL
ClientTLSInfo transport.TLSInfo
ClientAutoTLS bool
@@ -425,6 +425,7 @@ type configYAML struct {
type configJSON struct {
ListenPeerUrls string `json:"listen-peer-urls"`
ListenClientUrls string `json:"listen-client-urls"`
ListenClientHttpUrls string `json:"listen-client-http-urls"`
AdvertisePeerUrls string `json:"initial-advertise-peer-urls"`
AdvertiseClientUrls string `json:"advertise-client-urls"`
@@ -557,6 +558,15 @@ func (cfg *configYAML) configFromFile(path string) error {
cfg.Config.ListenClientUrls = u
}
if cfg.configJSON.ListenClientHttpUrls != "" {
u, err := types.NewURLs(strings.Split(cfg.configJSON.ListenClientHttpUrls, ","))
if err != nil {
fmt.Fprintf(os.Stderr, "unexpected error setting up listen-client-http-urls: %v\n", err)
os.Exit(1)
}
cfg.Config.ListenClientHttpUrls = u
}
if cfg.configJSON.AdvertisePeerUrls != "" {
u, err := types.NewURLs(strings.Split(cfg.configJSON.AdvertisePeerUrls, ","))
if err != nil {
@@ -656,6 +666,12 @@ func (cfg *Config) Validate() error {
if err := checkBindURLs(cfg.ListenClientUrls); err != nil {
return err
}
if err := checkBindURLs(cfg.ListenClientHttpUrls); err != nil {
return err
}
if len(cfg.ListenClientHttpUrls) == 0 {
cfg.logger.Warn("Running http and grpc server on single port. This is not recommended for production.")
}
if err := checkBindURLs(cfg.ListenMetricsUrls); err != nil {
return err
}
@@ -877,9 +893,12 @@ func (cfg *Config) ClientSelfCert() (err error) {
cfg.logger.Warn("ignoring client auto TLS since certs given")
return nil
}
chosts := make([]string, len(cfg.ListenClientUrls))
for i, u := range cfg.ListenClientUrls {
chosts[i] = u.Host
chosts := make([]string, 0, len(cfg.ListenClientUrls)+len(cfg.ListenClientHttpUrls))
for _, u := range cfg.ListenClientUrls {
chosts = append(chosts, u.Host)
}
for _, u := range cfg.ListenClientHttpUrls {
chosts = append(chosts, u.Host)
}
cfg.ClientTLSInfo, err = transport.SelfCert(cfg.logger, filepath.Join(cfg.Dir, "fixtures", "client"), chosts, cfg.SelfSignedCertValidity)
if err != nil {
@@ -1014,6 +1033,14 @@ func (cfg *Config) getListenClientUrls() (ss []string) {
return ss
}
func (cfg *Config) getListenClientHttpUrls() (ss []string) {
ss = make([]string, len(cfg.ListenClientHttpUrls))
for i := range cfg.ListenClientHttpUrls {
ss[i] = cfg.ListenClientHttpUrls[i].String()
}
return ss
}
func (cfg *Config) getMetricsURLs() (ss []string) {
ss = make([]string, len(cfg.ListenMetricsUrls))
for i := range cfg.ListenMetricsUrls {

View File

@@ -440,11 +440,16 @@ func (e *Etcd) Close() {
func stopServers(ctx context.Context, ss *servers) {
// first, close the http.Server
if ss.http != nil {
ss.http.Shutdown(ctx)
// do not grpc.Server.GracefulStop with TLS enabled etcd server
}
if ss.grpc == nil {
return
}
// do not grpc.Server.GracefulStop when grpc runs under http server
// See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
// and https://github.com/etcd-io/etcd/issues/8916
if ss.secure {
if ss.secure && ss.http != nil {
ss.grpc.Stop()
return
}
@@ -614,7 +619,7 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro
}
sctxs = make(map[string]*serveCtx)
for _, u := range cfg.ListenClientUrls {
for _, u := range append(cfg.ListenClientUrls, cfg.ListenClientHttpUrls...) {
if u.Scheme == "http" || u.Scheme == "unix" {
if !cfg.ClientTLSInfo.Empty() {
cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("client-url", u.String()))
@@ -641,6 +646,24 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro
sctx.addr = addr
sctx.network = network
}
for _, u := range cfg.ListenClientHttpUrls {
addr, secure, network := resolveUrl(u)
sctx := sctxs[addr]
if sctx == nil {
sctx = newServeCtx(cfg.logger)
sctxs[addr] = sctx
} else if !sctx.httpOnly {
return nil, fmt.Errorf("cannot bind both --client-listen-urls and --client-listen-http-urls on the same url %s", u.String())
}
sctx.secure = sctx.secure || secure
sctx.insecure = sctx.insecure || !secure
sctx.scheme = u.Scheme
sctx.addr = addr
sctx.network = network
sctx.httpOnly = true
}
for _, sctx := range sctxs {
if sctx.l, err = transport.NewListenerWithOpts(sctx.addr, sctx.scheme,
transport.WithSocketOpts(&cfg.SocketOpts),
@@ -663,7 +686,7 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro
}
defer func(addr string) {
if err == nil {
if err == nil || sctx.l == nil {
return
}
sctx.l.Close()
@@ -743,20 +766,27 @@ func (e *Etcd) serveClients() (err error) {
}))
}
splitHttp := false
for _, sctx := range e.sctxs {
if sctx.httpOnly {
splitHttp = true
}
}
// start client servers in each goroutine
for _, sctx := range e.sctxs {
go func(s *serveCtx) {
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, e.grpcGatewayDial(), gopts...))
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, e.grpcGatewayDial(splitHttp), splitHttp, gopts...))
}(sctx)
}
return nil
}
func (e *Etcd) grpcGatewayDial() (grpcDial func(ctx context.Context) (*grpc.ClientConn, error)) {
func (e *Etcd) grpcGatewayDial(splitHttp bool) (grpcDial func(ctx context.Context) (*grpc.ClientConn, error)) {
if !e.cfg.EnableGRPCGateway {
return nil
}
sctx := e.pickGrpcGatewayServeContext()
sctx := e.pickGrpcGatewayServeContext(splitHttp)
addr := sctx.addr
if network := sctx.network; network == "unix" {
// explicitly define unix network for gRPC socket support
@@ -790,10 +820,12 @@ func (e *Etcd) grpcGatewayDial() (grpcDial func(ctx context.Context) (*grpc.Clie
}
}
func (e *Etcd) pickGrpcGatewayServeContext() *serveCtx {
func (e *Etcd) pickGrpcGatewayServeContext(splitHttp bool) *serveCtx {
for _, sctx := range e.sctxs {
if !splitHttp || !sctx.httpOnly {
return sctx
}
}
panic("Expect at least one context able to serve grpc")
}

View File

@@ -56,6 +56,7 @@ type serveCtx struct {
network string
secure bool
insecure bool
httpOnly bool
ctx context.Context
cancel context.CancelFunc
@@ -94,6 +95,7 @@ func (sctx *serveCtx) serve(
handler http.Handler,
errHandler func(error),
grpcDialForRestGatewayBackends func(ctx context.Context) (*grpc.ClientConn, error),
splitHttp bool,
gopts ...grpc.ServerOption) (err error) {
logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
<-s.ReadyNotify()
@@ -101,6 +103,12 @@ func (sctx *serveCtx) serve(
sctx.lg.Info("ready to serve client requests")
m := cmux.New(sctx.l)
var server func() error
onlyGRPC := splitHttp && !sctx.httpOnly
onlyHttp := splitHttp && sctx.httpOnly
grpcEnabled := !onlyHttp
httpEnabled := !onlyGRPC
v3c := v3client.New(s)
servElection := v3election.NewElectionServer(v3c)
servLock := v3lock.NewLockServer(v3c)
@@ -116,15 +124,37 @@ func (sctx *serveCtx) serve(
return err
}
}
var traffic string
switch {
case onlyGRPC:
traffic = "grpc"
case onlyHttp:
traffic = "http"
default:
traffic = "grpc+http"
}
if sctx.insecure {
gs := v3rpc.Server(s, nil, nil, gopts...)
var gs *grpc.Server
var srv *http.Server
if httpEnabled {
httpmux := sctx.createMux(gwmux, handler)
srv = &http.Server{
Handler: createAccessController(sctx.lg, s, httpmux),
ErrorLog: logger, // do not log user error
}
if err := configureHttpServer(srv, s.Cfg); err != nil {
sctx.lg.Error("Configure http server failed", zap.Error(err))
return err
}
}
if grpcEnabled {
gs = v3rpc.Server(s, nil, nil, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
sctx.serviceRegister(gs)
}
defer func(gs *grpc.Server) {
if err != nil {
sctx.lg.Warn("stopping insecure grpc server due to error", zap.Error(err))
@@ -132,48 +162,51 @@ func (sctx *serveCtx) serve(
sctx.lg.Warn("stopped insecure grpc server due to error", zap.Error(err))
}
}(gs)
grpcl := m.Match(cmux.HTTP2())
go func(gs *grpc.Server, grpcLis net.Listener) {
errHandler(gs.Serve(grpcLis))
}(gs, grpcl)
httpmux := sctx.createMux(gwmux, handler)
srvhttp := &http.Server{
Handler: createAccessController(sctx.lg, s, httpmux),
ErrorLog: logger, // do not log user error
}
if err := configureHttpServer(srvhttp, s.Cfg); err != nil {
sctx.lg.Error("Configure http server failed", zap.Error(err))
return err
if onlyGRPC {
server = func() error {
return gs.Serve(sctx.l)
}
} else {
server = m.Serve
httpl := m.Match(cmux.HTTP1())
go func(srvhttp *http.Server, tlsLis net.Listener) {
errHandler(srvhttp.Serve(tlsLis))
}(srv, httpl)
go func(srvhttp *http.Server, httpLis net.Listener) {
errHandler(srvhttp.Serve(httpLis))
}(srvhttp, httpl)
if grpcEnabled {
grpcl := m.Match(cmux.HTTP2())
go func(gs *grpc.Server, l net.Listener) {
errHandler(gs.Serve(l))
}(gs, grpcl)
}
}
sctx.serversC <- &servers{grpc: gs, http: srvhttp}
sctx.serversC <- &servers{grpc: gs, http: srv}
sctx.lg.Info(
"serving client traffic insecurely; this is strongly discouraged!",
zap.String("traffic", traffic),
zap.String("address", sctx.l.Addr().String()),
)
}
if sctx.secure {
var gs *grpc.Server
var srv *http.Server
tlscfg, tlsErr := tlsinfo.ServerConfig()
if tlsErr != nil {
return tlsErr
}
gs := v3rpc.Server(s, tlscfg, nil, gopts...)
if grpcEnabled {
gs = v3rpc.Server(s, tlscfg, nil, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
sctx.serviceRegister(gs)
}
defer func(gs *grpc.Server) {
if err != nil {
sctx.lg.Warn("stopping secure grpc server due to error", zap.Error(err))
@@ -181,17 +214,14 @@ func (sctx *serveCtx) serve(
sctx.lg.Warn("stopped secure grpc server due to error", zap.Error(err))
}
}(gs)
handler = grpcHandlerFunc(gs, handler)
var tlsl net.Listener
tlsl, err = transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo)
if err != nil {
return err
}
// TODO: add debug flag; enable logging when debug flag is set
if httpEnabled {
if grpcEnabled {
handler = grpcHandlerFunc(gs, handler)
}
httpmux := sctx.createMux(gwmux, handler)
srv := &http.Server{
srv = &http.Server{
Handler: createAccessController(sctx.lg, s, httpmux),
TLSConfig: tlscfg,
ErrorLog: logger, // do not log user error
@@ -200,20 +230,31 @@ func (sctx *serveCtx) serve(
sctx.lg.Error("Configure https server failed", zap.Error(err))
return err
}
}
go func(srvhttp *http.Server, tlsLis net.Listener) {
errHandler(srvhttp.Serve(tlsLis))
if onlyGRPC {
server = func() error { return gs.Serve(sctx.l) }
} else {
server = m.Serve
tlsl, err := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo)
if err != nil {
return err
}
go func(srvhttp *http.Server, tlsl net.Listener) {
errHandler(srvhttp.Serve(tlsl))
}(srv, tlsl)
}
sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
sctx.lg.Info(
"serving client traffic securely",
zap.String("traffic", traffic),
zap.String("address", sctx.l.Addr().String()),
)
}
close(sctx.serversC)
return m.Serve()
return server()
}
func configureHttpServer(srv *http.Server, cfg config.ServerConfig) error {

View File

@@ -146,7 +146,11 @@ func newConfig() *config {
)
fs.Var(
flags.NewUniqueURLsWithExceptions(embed.DefaultListenClientURLs, ""), "listen-client-urls",
"List of URLs to listen on for client traffic.",
"List of URLs to listen on for client grpc traffic and http as long as --listen-client-http-urls is not specified.",
)
fs.Var(
flags.NewUniqueURLsWithExceptions("", ""), "listen-client-http-urls",
"List of URLs to listen on for http only client traffic. Enabling this flag removes http services from --listen-client-urls.",
)
fs.Var(
flags.NewUniqueURLsWithExceptions("", ""),
@@ -395,6 +399,7 @@ func (cfg *config) configFromCmdLine() error {
cfg.ec.ListenPeerUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-peer-urls")
cfg.ec.AdvertisePeerUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "initial-advertise-peer-urls")
cfg.ec.ListenClientUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-client-urls")
cfg.ec.ListenClientHttpUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-client-http-urls")
cfg.ec.AdvertiseClientUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "advertise-client-urls")
cfg.ec.ListenMetricsUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-metrics-urls")

View File

@@ -36,6 +36,7 @@ func TestConfigParsingMemberFlags(t *testing.T) {
"-snapshot-count=10",
"-listen-peer-urls=http://localhost:8000,https://localhost:8001",
"-listen-client-urls=http://localhost:7000,https://localhost:7001",
"-listen-client-http-urls=http://localhost:7002,https://localhost:7003",
// it should be set if -listen-client-urls is set
"-advertise-client-urls=http://localhost:7000,https://localhost:7001",
}
@@ -58,6 +59,7 @@ func TestConfigFileMemberFields(t *testing.T) {
SnapshotCount uint64 `json:"snapshot-count"`
ListenPeerUrls string `json:"listen-peer-urls"`
ListenClientUrls string `json:"listen-client-urls"`
ListenClientHttpUrls string `json:"listen-client-http-urls"`
AdvertiseClientUrls string `json:"advertise-client-urls"`
}{
"testdir",
@@ -67,6 +69,7 @@ func TestConfigFileMemberFields(t *testing.T) {
10,
"http://localhost:8000,https://localhost:8001",
"http://localhost:7000,https://localhost:7001",
"http://localhost:7002,https://localhost:7003",
"http://localhost:7000,https://localhost:7001",
}
@@ -516,6 +519,7 @@ func validateMemberFlags(t *testing.T, cfg *config) {
Dir: "testdir",
ListenPeerUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}},
ListenClientUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}},
ListenClientHttpUrls: []url.URL{{Scheme: "http", Host: "localhost:7002"}, {Scheme: "https", Host: "localhost:7003"}},
MaxSnapFiles: 10,
MaxWalFiles: 10,
Name: "testname",
@@ -543,6 +547,9 @@ func validateMemberFlags(t *testing.T, cfg *config) {
if !reflect.DeepEqual(cfg.ec.ListenClientUrls, wcfg.ListenClientUrls) {
t.Errorf("listen-client-urls = %v, want %v", cfg.ec.ListenClientUrls, wcfg.ListenClientUrls)
}
if !reflect.DeepEqual(cfg.ec.ListenClientHttpUrls, wcfg.ListenClientHttpUrls) {
t.Errorf("listen-client-http-urls = %v, want %v", cfg.ec.ListenClientHttpUrls, wcfg.ListenClientHttpUrls)
}
}
func validateClusteringFlags(t *testing.T, cfg *config) {

View File

@@ -63,7 +63,9 @@ Member:
--listen-peer-urls 'http://localhost:2380'
List of URLs to listen on for peer traffic.
--listen-client-urls 'http://localhost:2379'
List of URLs to listen on for client traffic.
List of URLs to listen on for client grpc traffic and http as long as --listen-client-http-urls is not specified.
--listen-client-http-urls ''
List of URLs to listen on for http only client traffic. Enabling this flag removes http services from --listen-client-urls.
--max-snapshots '` + strconv.Itoa(embed.DefaultMaxSnapshots) + `'
Maximum number of snapshot files to retain (0 is unlimited).
--max-wals '` + strconv.Itoa(embed.DefaultMaxWALs) + `'

View File

@@ -149,6 +149,7 @@ type etcdProcessClusterConfig struct {
clientTLS clientConnType
clientCertAuthEnabled bool
clientHttpSeparate bool
isPeerTLS bool
isPeerAutoTLS bool
isClientAutoTLS bool
@@ -247,6 +248,7 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []*
var curls []string
var curl, curltls string
port := cfg.basePort + 5*i
clientHttpPort := port + 4
curlHost := fmt.Sprintf("localhost:%d", port)
switch cfg.clientTLS {
@@ -277,6 +279,10 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []*
"--data-dir", dataDirPath,
"--snapshot-count", fmt.Sprintf("%d", cfg.snapshotCount),
}
if cfg.clientHttpSeparate {
clientHttpUrl := url.URL{Scheme: cfg.clientScheme(), Host: fmt.Sprintf("localhost:%d", clientHttpPort)}
args = append(args, "--listen-client-http-urls", clientHttpUrl.String())
}
args = addV2Args(args)
if cfg.forceNewCluster {
args = append(args, "--force-new-cluster")

View File

@@ -121,6 +121,7 @@ func (ctl *Etcdctl) cmdArgs(args ...string) []string {
func (ctl *Etcdctl) flags() map[string]string {
fmap := make(map[string]string)
if ctl.v2 {
fmap["no-sync"] = "true"
if ctl.connType == clientTLS {
fmap["ca-file"] = integration.TestTLSInfo.TrustedCAFile
fmap["cert-file"] = integration.TestTLSInfo.CertFile

View File

@@ -101,15 +101,17 @@ func tlsInfo(t testing.TB, connType clientConnType, isAutoTLS bool) (*transport.
}
}
func fillEtcdWithData(ctx context.Context, c *clientv3.Client, keyCount int, valueSize uint) error {
func fillEtcdWithData(ctx context.Context, c *clientv3.Client, dbSize int) error {
g := errgroup.Group{}
concurrency := 10
keyCount := 100
keysPerRoutine := keyCount / concurrency
valueSize := dbSize / keyCount
for i := 0; i < concurrency; i++ {
i := i
g.Go(func() error {
for j := 0; j < keysPerRoutine; j++ {
_, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(valueSize))
_, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(uint(valueSize)))
if err != nil {
return err
}

View File

@@ -33,29 +33,48 @@ import (
const (
watchResponsePeriod = 100 * time.Millisecond
watchTestDuration = 5 * time.Second
// TODO: Reduce maxWatchDelay when https://github.com/etcd-io/etcd/issues/15402 is addressed.
maxWatchDelay = 2 * time.Second
// Configure enough read load to cause starvation from https://github.com/etcd-io/etcd/issues/15402.
// Tweaked to pass on GitHub runner. For local runs please increase parameters.
// TODO: Increase when https://github.com/etcd-io/etcd/issues/15402 is fully addressed.
numberOfPreexistingKeys = 100
sizeOfPreexistingValues = 5000
readLoadConcurrency = 10
)
type testCase struct {
name string
config etcdProcessClusterConfig
maxWatchDelay time.Duration
dbSizeBytes int
}
const (
Kilo = 1000
Mega = 1000 * Kilo
)
// 10 MB is not a bottleneck of grpc server, but filling up etcd with data.
// Keeping it lower so tests don't take too long.
// If we implement reuse of db we could increase the dbSize.
var tcs = []testCase{
{
name: "NoTLS",
config: etcdProcessClusterConfig{clusterSize: 1},
maxWatchDelay: 100 * time.Millisecond,
dbSizeBytes: 5 * Mega,
},
{
name: "ClientTLS",
name: "TLS",
config: etcdProcessClusterConfig{clusterSize: 1, isClientAutoTLS: true, clientTLS: clientTLS},
maxWatchDelay: 2 * time.Second,
dbSizeBytes: 500 * Kilo,
},
{
name: "SeparateHttpNoTLS",
config: etcdProcessClusterConfig{clusterSize: 1, clientHttpSeparate: true},
maxWatchDelay: 100 * time.Millisecond,
dbSizeBytes: 5 * Mega,
},
{
name: "SeparateHttpTLS",
config: etcdProcessClusterConfig{clusterSize: 1, isClientAutoTLS: true, clientTLS: clientTLS, clientHttpSeparate: true},
maxWatchDelay: 100 * time.Millisecond,
dbSizeBytes: 5 * Mega,
},
}
@@ -69,13 +88,13 @@ func TestWatchDelayForPeriodicProgressNotification(t *testing.T) {
require.NoError(t, err)
defer clus.Close()
c := newClient(t, clus.EndpointsV3(), tc.config.clientTLS, tc.config.isClientAutoTLS)
require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues))
require.NoError(t, fillEtcdWithData(context.Background(), c, tc.dbSizeBytes))
ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration)
defer cancel()
g := errgroup.Group{}
continuouslyExecuteGetAll(ctx, t, &g, c)
validateWatchDelay(t, c.Watch(ctx, "fake-key", clientv3.WithProgressNotify()))
validateWatchDelay(t, c.Watch(ctx, "fake-key", clientv3.WithProgressNotify()), tc.maxWatchDelay)
require.NoError(t, g.Wait())
})
}
@@ -89,7 +108,7 @@ func TestWatchDelayForManualProgressNotification(t *testing.T) {
require.NoError(t, err)
defer clus.Close()
c := newClient(t, clus.EndpointsV3(), tc.config.clientTLS, tc.config.isClientAutoTLS)
require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues))
require.NoError(t, fillEtcdWithData(context.Background(), c, tc.dbSizeBytes))
ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration)
defer cancel()
@@ -107,7 +126,7 @@ func TestWatchDelayForManualProgressNotification(t *testing.T) {
time.Sleep(watchResponsePeriod)
}
})
validateWatchDelay(t, c.Watch(ctx, "fake-key"))
validateWatchDelay(t, c.Watch(ctx, "fake-key"), tc.maxWatchDelay)
require.NoError(t, g.Wait())
})
}
@@ -121,7 +140,7 @@ func TestWatchDelayForEvent(t *testing.T) {
require.NoError(t, err)
defer clus.Close()
c := newClient(t, clus.EndpointsV3(), tc.config.clientTLS, tc.config.isClientAutoTLS)
require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues))
require.NoError(t, fillEtcdWithData(context.Background(), c, tc.dbSizeBytes))
ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration)
defer cancel()
@@ -140,13 +159,13 @@ func TestWatchDelayForEvent(t *testing.T) {
}
})
continuouslyExecuteGetAll(ctx, t, &g, c)
validateWatchDelay(t, c.Watch(ctx, "key"))
validateWatchDelay(t, c.Watch(ctx, "key"), tc.maxWatchDelay)
require.NoError(t, g.Wait())
})
}
}
func validateWatchDelay(t *testing.T, watch clientv3.WatchChan) {
func validateWatchDelay(t *testing.T, watch clientv3.WatchChan, maxWatchDelay time.Duration) {
start := time.Now()
var maxDelay time.Duration
for range watch {
@@ -177,15 +196,19 @@ func continuouslyExecuteGetAll(ctx context.Context, t *testing.T, g *errgroup.Gr
for i := 0; i < readLoadConcurrency; i++ {
g.Go(func() error {
for {
_, err := c.Get(ctx, "", clientv3.WithPrefix())
resp, err := c.Get(ctx, "", clientv3.WithPrefix())
if err != nil {
if strings.Contains(err.Error(), "context deadline exceeded") {
return nil
}
return err
}
respSize := 0
for _, kv := range resp.Kvs {
respSize += kv.Size()
}
mux.Lock()
size += numberOfPreexistingKeys * sizeOfPreexistingValues
size += respSize
mux.Unlock()
}
})