Merge pull request #15620 from serathius/separate-grpc-server-3.4

[3.4] Separate grpc server
This commit is contained in:
Marek Siarkowicz 2023-04-04 09:48:45 +02:00 committed by GitHub
commit a1a37492f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 582 additions and 363 deletions

View File

@ -69,8 +69,8 @@ func TestSnapshotV3RestoreMultiMemberAdd(t *testing.T) {
cfg.Name = "3"
cfg.InitialClusterToken = testClusterTkn
cfg.ClusterState = "existing"
cfg.LCUrls, cfg.ACUrls = newCURLs, newCURLs
cfg.LPUrls, cfg.APUrls = newPURLs, newPURLs
cfg.ListenClientUrls, cfg.AdvertiseClientUrls = newCURLs, newCURLs
cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = newPURLs, newPURLs
cfg.InitialCluster = ""
for i := 0; i < clusterN; i++ {
cfg.InitialCluster += fmt.Sprintf(",%d=%s", i, pURLs[i].String())

View File

@ -51,8 +51,8 @@ func TestSnapshotV3RestoreSingle(t *testing.T) {
cfg.Name = "s1"
cfg.InitialClusterToken = testClusterTkn
cfg.ClusterState = "existing"
cfg.LCUrls, cfg.ACUrls = cURLs, cURLs
cfg.LPUrls, cfg.APUrls = pURLs, pURLs
cfg.ListenClientUrls, cfg.AdvertiseClientUrls = cURLs, cURLs
cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = pURLs, pURLs
cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String())
cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()))
@ -87,7 +87,8 @@ func TestSnapshotV3RestoreSingle(t *testing.T) {
}
var cli *clientv3.Client
cli, err = clientv3.New(clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}})
cli, err = clientv3.New(clientv3.Config{Endpoints: []string{cfg.AdvertiseClientUrls[0].String()}})
if err != nil {
t.Fatal(err)
}
@ -203,8 +204,8 @@ func createSnapshotFile(t *testing.T, kvs []kv) string {
cfg.Debug = false
cfg.Name = "default"
cfg.ClusterState = "new"
cfg.LCUrls, cfg.ACUrls = cURLs, cURLs
cfg.LPUrls, cfg.APUrls = pURLs, pURLs
cfg.ListenClientUrls, cfg.AdvertiseClientUrls = cURLs, cURLs
cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = pURLs, pURLs
cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String())
cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()))
srv, err := embed.StartEtcd(cfg)
@ -221,7 +222,7 @@ func createSnapshotFile(t *testing.T, kvs []kv) string {
t.Fatalf("failed to start embed.Etcd for creating snapshots")
}
ccfg := clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}}
ccfg := clientv3.Config{Endpoints: []string{cfg.AdvertiseClientUrls[0].String()}}
cli, err := clientv3.New(ccfg)
if err != nil {
t.Fatal(err)
@ -271,8 +272,8 @@ func restoreCluster(t *testing.T, clusterN int, dbPath string) (
cfg.Name = fmt.Sprintf("%d", i)
cfg.InitialClusterToken = testClusterTkn
cfg.ClusterState = "existing"
cfg.LCUrls, cfg.ACUrls = []url.URL{cURLs[i]}, []url.URL{cURLs[i]}
cfg.LPUrls, cfg.APUrls = []url.URL{pURLs[i]}, []url.URL{pURLs[i]}
cfg.ListenClientUrls, cfg.AdvertiseClientUrls = []url.URL{cURLs[i]}, []url.URL{cURLs[i]}
cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = []url.URL{pURLs[i]}, []url.URL{pURLs[i]}
cfg.InitialCluster = ics
cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()+i))

View File

@ -183,12 +183,12 @@ type Config struct {
// streams that each client can open at a time.
MaxConcurrentStreams uint32 `json:"max-concurrent-streams"`
LPUrls, LCUrls []url.URL
APUrls, ACUrls []url.URL
ClientTLSInfo transport.TLSInfo
ClientAutoTLS bool
PeerTLSInfo transport.TLSInfo
PeerAutoTLS bool
ListenPeerUrls, ListenClientUrls, ListenClientHttpUrls []url.URL
AdvertisePeerUrls, AdvertiseClientUrls []url.URL
ClientTLSInfo transport.TLSInfo
ClientAutoTLS bool
PeerTLSInfo transport.TLSInfo
PeerAutoTLS bool
// CipherSuites is a list of supported TLS cipher suites between
// client/server and peers. If empty, Go auto-populates the list.
@ -373,10 +373,11 @@ type configYAML struct {
// configJSON has file options that are translated into Config options
type configJSON struct {
LPUrlsJSON string `json:"listen-peer-urls"`
LCUrlsJSON string `json:"listen-client-urls"`
APUrlsJSON string `json:"initial-advertise-peer-urls"`
ACUrlsJSON string `json:"advertise-client-urls"`
ListenPeerUrls string `json:"listen-peer-urls"`
ListenClientUrls string `json:"listen-client-urls"`
ListenClientHttpUrls string `json:"listen-client-http-urls"`
AdvertisePeerUrls string `json:"initial-advertise-peer-urls"`
AdvertiseClientUrls string `json:"advertise-client-urls"`
CORSJSON string `json:"cors"`
HostWhitelistJSON string `json:"host-whitelist"`
@ -421,10 +422,10 @@ func NewConfig() *Config {
ElectionMs: 1000,
InitialElectionTickAdvance: true,
LPUrls: []url.URL{*lpurl},
LCUrls: []url.URL{*lcurl},
APUrls: []url.URL{*apurl},
ACUrls: []url.URL{*acurl},
ListenPeerUrls: []url.URL{*lpurl},
ListenClientUrls: []url.URL{*lcurl},
AdvertisePeerUrls: []url.URL{*apurl},
AdvertiseClientUrls: []url.URL{*acurl},
ClusterState: ClusterStateFlagNew,
InitialClusterToken: "etcd-cluster",
@ -489,40 +490,49 @@ func (cfg *configYAML) configFromFile(path string) error {
return err
}
if cfg.LPUrlsJSON != "" {
u, err := types.NewURLs(strings.Split(cfg.LPUrlsJSON, ","))
if cfg.configJSON.ListenPeerUrls != "" {
u, err := types.NewURLs(strings.Split(cfg.configJSON.ListenPeerUrls, ","))
if err != nil {
fmt.Fprintf(os.Stderr, "unexpected error setting up listen-peer-urls: %v\n", err)
os.Exit(1)
}
cfg.LPUrls = []url.URL(u)
cfg.Config.ListenPeerUrls = u
}
if cfg.LCUrlsJSON != "" {
u, err := types.NewURLs(strings.Split(cfg.LCUrlsJSON, ","))
if cfg.configJSON.ListenClientUrls != "" {
u, err := types.NewURLs(strings.Split(cfg.configJSON.ListenClientUrls, ","))
if err != nil {
fmt.Fprintf(os.Stderr, "unexpected error setting up listen-client-urls: %v\n", err)
os.Exit(1)
}
cfg.LCUrls = []url.URL(u)
cfg.Config.ListenClientUrls = u
}
if cfg.APUrlsJSON != "" {
u, err := types.NewURLs(strings.Split(cfg.APUrlsJSON, ","))
if cfg.configJSON.ListenClientHttpUrls != "" {
u, err := types.NewURLs(strings.Split(cfg.configJSON.ListenClientHttpUrls, ","))
if err != nil {
fmt.Fprintf(os.Stderr, "unexpected error setting up listen-client-http-urls: %v\n", err)
os.Exit(1)
}
cfg.Config.ListenClientHttpUrls = u
}
if cfg.configJSON.AdvertisePeerUrls != "" {
u, err := types.NewURLs(strings.Split(cfg.configJSON.AdvertisePeerUrls, ","))
if err != nil {
fmt.Fprintf(os.Stderr, "unexpected error setting up initial-advertise-peer-urls: %v\n", err)
os.Exit(1)
}
cfg.APUrls = []url.URL(u)
cfg.Config.AdvertisePeerUrls = u
}
if cfg.ACUrlsJSON != "" {
u, err := types.NewURLs(strings.Split(cfg.ACUrlsJSON, ","))
if cfg.configJSON.AdvertiseClientUrls != "" {
u, err := types.NewURLs(strings.Split(cfg.configJSON.AdvertiseClientUrls, ","))
if err != nil {
fmt.Fprintf(os.Stderr, "unexpected error setting up advertise-peer-urls: %v\n", err)
os.Exit(1)
}
cfg.ACUrls = []url.URL(u)
cfg.Config.AdvertiseClientUrls = u
}
if cfg.ListenMetricsUrlsJSON != "" {
@ -596,21 +606,31 @@ func (cfg *Config) Validate() error {
if err := cfg.setupLogging(); err != nil {
return err
}
if err := checkBindURLs(cfg.LPUrls); err != nil {
if err := checkBindURLs(cfg.ListenPeerUrls); err != nil {
return err
}
if err := checkBindURLs(cfg.LCUrls); err != nil {
if err := checkBindURLs(cfg.ListenClientUrls); err != nil {
return err
}
if err := checkBindURLs(cfg.ListenClientHttpUrls); err != nil {
return err
}
if len(cfg.ListenClientHttpUrls) == 0 {
if cfg.logger != nil {
cfg.logger.Warn("Running http and grpc server on single port. This is not recommended for production.")
} else {
plog.Warning("Running http and grpc server on single port. This is not recommended for production.")
}
}
if err := checkBindURLs(cfg.ListenMetricsUrls); err != nil {
return err
}
if err := checkHostURLs(cfg.APUrls); err != nil {
addrs := cfg.getAPURLs()
if err := checkHostURLs(cfg.AdvertisePeerUrls); err != nil {
addrs := cfg.getAdvertisePeerUrls()
return fmt.Errorf(`--initial-advertise-peer-urls %q must be "host:port" (%v)`, strings.Join(addrs, ","), err)
}
if err := checkHostURLs(cfg.ACUrls); err != nil {
addrs := cfg.getACURLs()
if err := checkHostURLs(cfg.AdvertiseClientUrls); err != nil {
addrs := cfg.getAdvertiseClientUrls()
return fmt.Errorf(`--advertise-client-urls %q must be "host:port" (%v)`, strings.Join(addrs, ","), err)
}
// Check if conflicting flags are passed.
@ -643,7 +663,7 @@ func (cfg *Config) Validate() error {
}
// check this last since proxying in etcdmain may make this OK
if cfg.LCUrls != nil && cfg.ACUrls == nil {
if cfg.ListenClientUrls != nil && cfg.AdvertiseClientUrls == nil {
return ErrUnsetAdvertiseClientURLsFlag
}
@ -692,7 +712,7 @@ func (cfg *Config) PeerURLsMapAndToken(which string) (urlsmap types.URLsMap, tok
urlsmap = types.URLsMap{}
// If using discovery, generate a temporary cluster based on
// self's advertised peer URLs
urlsmap[cfg.Name] = cfg.APUrls
urlsmap[cfg.Name] = cfg.AdvertisePeerUrls
token = cfg.Durl
case cfg.DNSCluster != "":
@ -748,7 +768,7 @@ func (cfg *Config) GetDNSClusterNames() ([]string, error) {
// Use both etcd-server-ssl and etcd-server for discovery.
// Combine the results if both are available.
clusterStrs, cerr = srv.GetCluster("https", "etcd-server-ssl"+serviceNameSuffix, cfg.Name, cfg.DNSCluster, cfg.APUrls)
clusterStrs, cerr = srv.GetCluster("https", "etcd-server-ssl"+serviceNameSuffix, cfg.Name, cfg.DNSCluster, cfg.AdvertisePeerUrls)
if cerr != nil {
clusterStrs = make([]string, 0)
}
@ -759,13 +779,13 @@ func (cfg *Config) GetDNSClusterNames() ([]string, error) {
zap.String("service-name", "etcd-server-ssl"+serviceNameSuffix),
zap.String("server-name", cfg.Name),
zap.String("discovery-srv", cfg.DNSCluster),
zap.Strings("advertise-peer-urls", cfg.getAPURLs()),
zap.Strings("advertise-peer-urls", cfg.getAdvertisePeerUrls()),
zap.Strings("found-cluster", clusterStrs),
zap.Error(cerr),
)
}
defaultHTTPClusterStrs, httpCerr := srv.GetCluster("http", "etcd-server"+serviceNameSuffix, cfg.Name, cfg.DNSCluster, cfg.APUrls)
defaultHTTPClusterStrs, httpCerr := srv.GetCluster("http", "etcd-server"+serviceNameSuffix, cfg.Name, cfg.DNSCluster, cfg.AdvertisePeerUrls)
if httpCerr != nil {
clusterStrs = append(clusterStrs, defaultHTTPClusterStrs...)
}
@ -776,7 +796,7 @@ func (cfg *Config) GetDNSClusterNames() ([]string, error) {
zap.String("service-name", "etcd-server"+serviceNameSuffix),
zap.String("server-name", cfg.Name),
zap.String("discovery-srv", cfg.DNSCluster),
zap.Strings("advertise-peer-urls", cfg.getAPURLs()),
zap.Strings("advertise-peer-urls", cfg.getAdvertisePeerUrls()),
zap.Strings("found-cluster", clusterStrs),
zap.Error(httpCerr),
)
@ -786,15 +806,15 @@ func (cfg *Config) GetDNSClusterNames() ([]string, error) {
}
func (cfg Config) InitialClusterFromName(name string) (ret string) {
if len(cfg.APUrls) == 0 {
if len(cfg.AdvertisePeerUrls) == 0 {
return ""
}
n := name
if name == "" {
n = DefaultName
}
for i := range cfg.APUrls {
ret = ret + "," + n + "=" + cfg.APUrls[i].String()
for i := range cfg.AdvertisePeerUrls {
ret = ret + "," + n + "=" + cfg.AdvertisePeerUrls[i].String()
}
return ret[1:]
}
@ -803,11 +823,11 @@ func (cfg Config) IsNewCluster() bool { return cfg.ClusterState == ClusterStateF
func (cfg Config) ElectionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }
func (cfg Config) defaultPeerHost() bool {
return len(cfg.APUrls) == 1 && cfg.APUrls[0].String() == DefaultInitialAdvertisePeerURLs
return len(cfg.AdvertisePeerUrls) == 1 && cfg.AdvertisePeerUrls[0].String() == DefaultInitialAdvertisePeerURLs
}
func (cfg Config) defaultClientHost() bool {
return len(cfg.ACUrls) == 1 && cfg.ACUrls[0].String() == DefaultAdvertiseClientURLs
return len(cfg.AdvertiseClientUrls) == 1 && cfg.AdvertiseClientUrls[0].String() == DefaultAdvertiseClientURLs
}
func (cfg *Config) ClientSelfCert() (err error) {
@ -822,9 +842,12 @@ func (cfg *Config) ClientSelfCert() (err error) {
}
return nil
}
chosts := make([]string, len(cfg.LCUrls))
for i, u := range cfg.LCUrls {
chosts[i] = u.Host
chosts := make([]string, 0, len(cfg.ListenClientUrls)+len(cfg.ListenClientHttpUrls))
for _, u := range cfg.ListenClientUrls {
chosts = append(chosts, u.Host)
}
for _, u := range cfg.ListenClientHttpUrls {
chosts = append(chosts, u.Host)
}
cfg.ClientTLSInfo, err = transport.SelfCert(cfg.logger, filepath.Join(cfg.Dir, "fixtures", "client"), chosts)
if err != nil {
@ -845,8 +868,8 @@ func (cfg *Config) PeerSelfCert() (err error) {
}
return nil
}
phosts := make([]string, len(cfg.LPUrls))
for i, u := range cfg.LPUrls {
phosts := make([]string, len(cfg.ListenPeerUrls))
for i, u := range cfg.ListenPeerUrls {
phosts[i] = u.Host
}
cfg.PeerTLSInfo, err = transport.SelfCert(cfg.logger, filepath.Join(cfg.Dir, "fixtures", "peer"), phosts)
@ -874,9 +897,9 @@ func (cfg *Config) UpdateDefaultClusterFromName(defaultInitialCluster string) (s
}
used := false
pip, pport := cfg.LPUrls[0].Hostname(), cfg.LPUrls[0].Port()
pip, pport := cfg.ListenPeerUrls[0].Hostname(), cfg.ListenPeerUrls[0].Port()
if cfg.defaultPeerHost() && pip == "0.0.0.0" {
cfg.APUrls[0] = url.URL{Scheme: cfg.APUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, pport)}
cfg.AdvertisePeerUrls[0] = url.URL{Scheme: cfg.AdvertisePeerUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, pport)}
used = true
}
// update 'initial-cluster' when only the name is specified (e.g. 'etcd --name=abc')
@ -884,9 +907,9 @@ func (cfg *Config) UpdateDefaultClusterFromName(defaultInitialCluster string) (s
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
}
cip, cport := cfg.LCUrls[0].Hostname(), cfg.LCUrls[0].Port()
cip, cport := cfg.ListenClientUrls[0].Hostname(), cfg.ListenClientUrls[0].Port()
if cfg.defaultClientHost() && cip == "0.0.0.0" {
cfg.ACUrls[0] = url.URL{Scheme: cfg.ACUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, cport)}
cfg.AdvertiseClientUrls[0] = url.URL{Scheme: cfg.AdvertiseClientUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, cport)}
used = true
}
dhost := defaultHostname
@ -931,34 +954,42 @@ func checkHostURLs(urls []url.URL) error {
return nil
}
func (cfg *Config) getAPURLs() (ss []string) {
ss = make([]string, len(cfg.APUrls))
for i := range cfg.APUrls {
ss[i] = cfg.APUrls[i].String()
func (cfg *Config) getAdvertisePeerUrls() (ss []string) {
ss = make([]string, len(cfg.AdvertisePeerUrls))
for i := range cfg.AdvertisePeerUrls {
ss[i] = cfg.AdvertisePeerUrls[i].String()
}
return ss
}
func (cfg *Config) getLPURLs() (ss []string) {
ss = make([]string, len(cfg.LPUrls))
for i := range cfg.LPUrls {
ss[i] = cfg.LPUrls[i].String()
func (cfg *Config) getListenPeerUrls() (ss []string) {
ss = make([]string, len(cfg.ListenPeerUrls))
for i := range cfg.ListenPeerUrls {
ss[i] = cfg.ListenPeerUrls[i].String()
}
return ss
}
func (cfg *Config) getACURLs() (ss []string) {
ss = make([]string, len(cfg.ACUrls))
for i := range cfg.ACUrls {
ss[i] = cfg.ACUrls[i].String()
func (cfg *Config) getAdvertiseClientUrls() (ss []string) {
ss = make([]string, len(cfg.AdvertiseClientUrls))
for i := range cfg.AdvertiseClientUrls {
ss[i] = cfg.AdvertiseClientUrls[i].String()
}
return ss
}
func (cfg *Config) getLCURLs() (ss []string) {
ss = make([]string, len(cfg.LCUrls))
for i := range cfg.LCUrls {
ss[i] = cfg.LCUrls[i].String()
func (cfg *Config) getListenClientUrls() (ss []string) {
ss = make([]string, len(cfg.ListenClientUrls))
for i := range cfg.ListenClientUrls {
ss[i] = cfg.ListenClientUrls[i].String()
}
return ss
}
func (cfg *Config) getListenClientHttpUrls() (ss []string) {
ss = make([]string, len(cfg.ListenClientHttpUrls))
for i := range cfg.ListenClientHttpUrls {
ss[i] = cfg.ListenClientHttpUrls[i].String()
}
return ss
}

View File

@ -77,12 +77,12 @@ func TestConfigFileOtherFields(t *testing.T) {
func TestUpdateDefaultClusterFromName(t *testing.T) {
cfg := NewConfig()
defaultInitialCluster := cfg.InitialCluster
oldscheme := cfg.APUrls[0].Scheme
origpeer := cfg.APUrls[0].String()
origadvc := cfg.ACUrls[0].String()
oldscheme := cfg.AdvertisePeerUrls[0].Scheme
origpeer := cfg.AdvertisePeerUrls[0].String()
origadvc := cfg.AdvertiseClientUrls[0].String()
cfg.Name = "abc"
lpport := cfg.LPUrls[0].Port()
lpport := cfg.ListenPeerUrls[0].Port()
// in case of 'etcd --name=abc'
exp := fmt.Sprintf("%s=%s://localhost:%s", cfg.Name, oldscheme, lpport)
@ -91,12 +91,12 @@ func TestUpdateDefaultClusterFromName(t *testing.T) {
t.Fatalf("initial-cluster expected %q, got %q", exp, cfg.InitialCluster)
}
// advertise peer URL should not be affected
if origpeer != cfg.APUrls[0].String() {
t.Fatalf("advertise peer url expected %q, got %q", origadvc, cfg.APUrls[0].String())
if origpeer != cfg.AdvertisePeerUrls[0].String() {
t.Fatalf("advertise peer url expected %q, got %q", origadvc, cfg.AdvertisePeerUrls[0].String())
}
// advertise client URL should not be affected
if origadvc != cfg.ACUrls[0].String() {
t.Fatalf("advertise client url expected %q, got %q", origadvc, cfg.ACUrls[0].String())
if origadvc != cfg.AdvertiseClientUrls[0].String() {
t.Fatalf("advertise client url expected %q, got %q", origadvc, cfg.AdvertiseClientUrls[0].String())
}
}
@ -109,17 +109,17 @@ func TestUpdateDefaultClusterFromNameOverwrite(t *testing.T) {
cfg := NewConfig()
defaultInitialCluster := cfg.InitialCluster
oldscheme := cfg.APUrls[0].Scheme
origadvc := cfg.ACUrls[0].String()
oldscheme := cfg.AdvertisePeerUrls[0].Scheme
origadvc := cfg.AdvertiseClientUrls[0].String()
cfg.Name = "abc"
lpport := cfg.LPUrls[0].Port()
cfg.LPUrls[0] = url.URL{Scheme: cfg.LPUrls[0].Scheme, Host: fmt.Sprintf("0.0.0.0:%s", lpport)}
lpport := cfg.ListenPeerUrls[0].Port()
cfg.ListenPeerUrls[0] = url.URL{Scheme: cfg.ListenPeerUrls[0].Scheme, Host: fmt.Sprintf("0.0.0.0:%s", lpport)}
dhost, _ := cfg.UpdateDefaultClusterFromName(defaultInitialCluster)
if dhost != defaultHostname {
t.Fatalf("expected default host %q, got %q", defaultHostname, dhost)
}
aphost, apport := cfg.APUrls[0].Hostname(), cfg.APUrls[0].Port()
aphost, apport := cfg.AdvertisePeerUrls[0].Hostname(), cfg.AdvertisePeerUrls[0].Port()
if apport != lpport {
t.Fatalf("advertise peer url got different port %s, expected %s", apport, lpport)
}
@ -132,8 +132,8 @@ func TestUpdateDefaultClusterFromNameOverwrite(t *testing.T) {
}
// advertise client URL should not be affected
if origadvc != cfg.ACUrls[0].String() {
t.Fatalf("advertise-client-url expected %q, got %q", origadvc, cfg.ACUrls[0].String())
if origadvc != cfg.AdvertiseClientUrls[0].String() {
t.Fatalf("advertise-client-url expected %q, got %q", origadvc, cfg.AdvertiseClientUrls[0].String())
}
}

View File

@ -20,6 +20,7 @@ import (
"fmt"
"io/ioutil"
defaultLog "log"
"math"
"net"
"net/http"
"net/url"
@ -29,6 +30,7 @@ import (
"sync"
"time"
"go.etcd.io/etcd/clientv3/credentials"
"go.etcd.io/etcd/etcdserver"
"go.etcd.io/etcd/etcdserver/api/etcdhttp"
"go.etcd.io/etcd/etcdserver/api/rafthttp"
@ -116,7 +118,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
if e.cfg.logger != nil {
e.cfg.logger.Info(
"configuring peer listeners",
zap.Strings("listen-peer-urls", e.cfg.getLPURLs()),
zap.Strings("listen-peer-urls", e.cfg.getListenPeerUrls()),
)
}
if e.Peers, err = configurePeerListeners(cfg); err != nil {
@ -126,7 +128,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
if e.cfg.logger != nil {
e.cfg.logger.Info(
"configuring client listeners",
zap.Strings("listen-client-urls", e.cfg.getLCURLs()),
zap.Strings("listen-client-urls", e.cfg.getListenClientUrls()),
)
}
if e.sctxs, err = configureClientListeners(cfg); err != nil {
@ -163,8 +165,8 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
srvcfg := etcdserver.ServerConfig{
Name: cfg.Name,
ClientURLs: cfg.ACUrls,
PeerURLs: cfg.APUrls,
ClientURLs: cfg.AdvertiseClientUrls,
PeerURLs: cfg.AdvertisePeerUrls,
DataDir: cfg.Dir,
DedicatedWALDir: cfg.WalDir,
SnapshotCount: cfg.SnapshotCount,
@ -247,10 +249,10 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
e.cfg.logger.Info(
"now serving peer/client/metrics",
zap.String("local-member-id", e.Server.ID().String()),
zap.Strings("initial-advertise-peer-urls", e.cfg.getAPURLs()),
zap.Strings("listen-peer-urls", e.cfg.getLPURLs()),
zap.Strings("advertise-client-urls", e.cfg.getACURLs()),
zap.Strings("listen-client-urls", e.cfg.getLCURLs()),
zap.Strings("initial-advertise-peer-urls", e.cfg.getAdvertisePeerUrls()),
zap.Strings("listen-peer-urls", e.cfg.getListenPeerUrls()),
zap.Strings("advertise-client-urls", e.cfg.getAdvertiseClientUrls()),
zap.Strings("listen-client-urls", e.cfg.getListenClientUrls()),
zap.Strings("listen-metrics-urls", e.cfg.getMetricsURLs()),
)
}
@ -325,10 +327,10 @@ func print(lg *zap.Logger, ec Config, sc etcdserver.ServerConfig, memberInitiali
zap.Uint("max-wals", sc.MaxWALFiles),
zap.Uint("max-snapshots", sc.MaxSnapFiles),
zap.Uint64("snapshot-catchup-entries", sc.SnapshotCatchUpEntries),
zap.Strings("initial-advertise-peer-urls", ec.getAPURLs()),
zap.Strings("listen-peer-urls", ec.getLPURLs()),
zap.Strings("advertise-client-urls", ec.getACURLs()),
zap.Strings("listen-client-urls", ec.getLCURLs()),
zap.Strings("initial-advertise-peer-urls", ec.getAdvertisePeerUrls()),
zap.Strings("listen-peer-urls", ec.getListenPeerUrls()),
zap.Strings("advertise-client-urls", ec.getAdvertiseClientUrls()),
zap.Strings("listen-client-urls", ec.getListenClientUrls()),
zap.Strings("listen-metrics-urls", ec.getMetricsURLs()),
zap.Strings("cors", cors),
zap.Strings("host-whitelist", hss),
@ -363,8 +365,8 @@ func (e *Etcd) Close() {
fields := []zap.Field{
zap.String("name", e.cfg.Name),
zap.String("data-dir", e.cfg.Dir),
zap.Strings("advertise-peer-urls", e.cfg.getAPURLs()),
zap.Strings("advertise-client-urls", e.cfg.getACURLs()),
zap.Strings("advertise-peer-urls", e.cfg.getAdvertisePeerUrls()),
zap.Strings("advertise-client-urls", e.cfg.getAdvertiseClientUrls()),
}
lg := e.GetLogger()
if lg != nil {
@ -432,7 +434,7 @@ func stopServers(ctx context.Context, ss *servers) {
// do not grpc.Server.GracefulStop with TLS enabled etcd server
// See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
// and https://github.com/etcd-io/etcd/issues/8916
if ss.secure {
if ss.secure && ss.http != nil {
shutdownNow()
return
}
@ -486,7 +488,7 @@ func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) {
}
}
peers = make([]*peerListener, len(cfg.LPUrls))
peers = make([]*peerListener, len(cfg.ListenPeerUrls))
defer func() {
if err == nil {
return
@ -496,11 +498,11 @@ func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) {
if cfg.logger != nil {
cfg.logger.Warn(
"closing peer listener",
zap.String("address", cfg.LPUrls[i].String()),
zap.String("address", cfg.ListenPeerUrls[i].String()),
zap.Error(err),
)
} else {
plog.Info("stopping listening for peers on ", cfg.LPUrls[i].String())
plog.Info("stopping listening for peers on ", cfg.ListenPeerUrls[i].String())
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
peers[i].close(ctx)
@ -509,7 +511,7 @@ func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) {
}
}()
for i, u := range cfg.LPUrls {
for i, u := range cfg.ListenPeerUrls {
if u.Scheme == "http" {
if !cfg.PeerTLSInfo.Empty() {
if cfg.logger != nil {
@ -623,8 +625,7 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro
}
sctxs = make(map[string]*serveCtx)
for _, u := range cfg.LCUrls {
sctx := newServeCtx(cfg.logger)
for _, u := range append(cfg.ListenClientUrls, cfg.ListenClientHttpUrls...) {
if u.Scheme == "http" || u.Scheme == "unix" {
if !cfg.ClientTLSInfo.Empty() {
if cfg.logger != nil {
@ -644,29 +645,45 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro
if (u.Scheme == "https" || u.Scheme == "unixs") && cfg.ClientTLSInfo.Empty() {
return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPS scheme", u.String())
}
}
network := "tcp"
addr := u.Host
if u.Scheme == "unix" || u.Scheme == "unixs" {
network = "unix"
addr = u.Host + u.Path
for _, u := range cfg.ListenClientUrls {
addr, secure, network := resolveUrl(u)
sctx := sctxs[addr]
if sctx == nil {
sctx = newServeCtx(cfg.logger)
sctxs[addr] = sctx
}
sctx.secure = sctx.secure || secure
sctx.insecure = sctx.insecure || !secure
sctx.scheme = u.Scheme
sctx.addr = addr
sctx.network = network
}
for _, u := range cfg.ListenClientHttpUrls {
addr, secure, network := resolveUrl(u)
sctx.secure = u.Scheme == "https" || u.Scheme == "unixs"
sctx.insecure = !sctx.secure
if oldctx := sctxs[addr]; oldctx != nil {
oldctx.secure = oldctx.secure || sctx.secure
oldctx.insecure = oldctx.insecure || sctx.insecure
continue
sctx := sctxs[addr]
if sctx == nil {
sctx = newServeCtx(cfg.logger)
sctxs[addr] = sctx
} else if !sctx.httpOnly {
return nil, fmt.Errorf("cannot bind both --client-listen-urls and --client-listen-http-urls on the same url %s", u.String())
}
sctx.secure = sctx.secure || secure
sctx.insecure = sctx.insecure || !secure
sctx.scheme = u.Scheme
sctx.addr = addr
sctx.network = network
sctx.httpOnly = true
}
if sctx.l, err = net.Listen(network, addr); err != nil {
for _, sctx := range sctxs {
if sctx.l, err = net.Listen(sctx.network, sctx.addr); err != nil {
return nil, err
}
// net.Listener will rewrite ipv4 0.0.0.0 to ipv6 [::], breaking
// hosts that disable ipv6. So, use the address given by the user.
sctx.addr = addr
if fdLimit, fderr := runtimeutil.FDLimit(); fderr == nil {
if fdLimit <= reservedInternalFDNum {
@ -683,27 +700,27 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro
sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum))
}
if network == "tcp" {
if sctx.l, err = transport.NewKeepAliveListener(sctx.l, network, nil); err != nil {
if sctx.network == "tcp" {
if sctx.l, err = transport.NewKeepAliveListener(sctx.l, sctx.network, nil); err != nil {
return nil, err
}
}
defer func() {
if err == nil {
defer func(sctx *serveCtx) {
if err == nil || sctx.l == nil {
return
}
sctx.l.Close()
if cfg.logger != nil {
cfg.logger.Warn(
"closing peer listener",
zap.String("address", u.Host),
zap.String("address", sctx.addr),
zap.Error(err),
)
} else {
plog.Info("stopping listening for client requests on ", u.Host)
plog.Info("stopping listening for client requests on ", sctx.addr)
}
}()
}(sctx)
for k := range cfg.UserHandlers {
sctx.userHandlers[k] = cfg.UserHandlers[k]
}
@ -714,11 +731,21 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro
if cfg.Debug {
sctx.registerTrace()
}
sctxs[addr] = sctx
}
return sctxs, nil
}
func resolveUrl(u url.URL) (addr string, secure bool, network string) {
addr = u.Host
network = "tcp"
if u.Scheme == "unix" || u.Scheme == "unixs" {
addr = u.Host + u.Path
network = "unix"
}
secure = u.Scheme == "https" || u.Scheme == "unixs"
return addr, secure, network
}
func (e *Etcd) serveClients() (err error) {
if !e.cfg.ClientTLSInfo.Empty() {
if e.cfg.logger != nil {
@ -762,15 +789,69 @@ func (e *Etcd) serveClients() (err error) {
}))
}
splitHttp := false
for _, sctx := range e.sctxs {
if sctx.httpOnly {
splitHttp = true
}
}
// start client servers in each goroutine
for _, sctx := range e.sctxs {
go func(s *serveCtx) {
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...))
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, e.grpcGatewayDial(splitHttp), splitHttp, gopts...))
}(sctx)
}
return nil
}
func (e *Etcd) grpcGatewayDial(splitHttp bool) (grpcDial func(ctx context.Context) (*grpc.ClientConn, error)) {
if !e.cfg.EnableGRPCGateway {
return nil
}
sctx := e.pickGrpcGatewayServeContext(splitHttp)
addr := sctx.addr
if network := sctx.network; network == "unix" {
// explicitly define unix network for gRPC socket support
addr = fmt.Sprintf("%s://%s", network, addr)
}
opts := []grpc.DialOption{grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32))}
if sctx.secure {
tlscfg, tlsErr := e.cfg.ClientTLSInfo.ServerConfig()
if tlsErr != nil {
return func(ctx context.Context) (*grpc.ClientConn, error) {
return nil, tlsErr
}
}
dtls := tlscfg.Clone()
// trust local server
dtls.InsecureSkipVerify = true
bundle := credentials.NewBundle(credentials.Config{TLSConfig: dtls})
opts = append(opts, grpc.WithTransportCredentials(bundle.TransportCredentials()))
} else {
opts = append(opts, grpc.WithInsecure())
}
return func(ctx context.Context) (*grpc.ClientConn, error) {
conn, err := grpc.DialContext(ctx, addr, opts...)
if err != nil {
sctx.lg.Error("grpc gateway failed to dial", zap.String("addr", addr), zap.Error(err))
return nil, err
}
return conn, err
}
}
func (e *Etcd) pickGrpcGatewayServeContext(splitHttp bool) *serveCtx {
for _, sctx := range e.sctxs {
if !splitHttp || !sctx.httpOnly {
return sctx
}
}
panic("Expect at least one context able to serve grpc")
}
func (e *Etcd) serveMetrics() (err error) {
if e.cfg.Metrics == "extensive" {
grpc_prometheus.EnableHandlingTimeHistogram()

View File

@ -19,12 +19,10 @@ import (
"fmt"
"io/ioutil"
defaultLog "log"
"math"
"net"
"net/http"
"strings"
"go.etcd.io/etcd/clientv3/credentials"
"go.etcd.io/etcd/etcdserver"
"go.etcd.io/etcd/etcdserver/api/v3client"
"go.etcd.io/etcd/etcdserver/api/v3election"
@ -49,12 +47,15 @@ import (
)
type serveCtx struct {
lg *zap.Logger
l net.Listener
lg *zap.Logger
l net.Listener
scheme string
addr string
network string
secure bool
insecure bool
httpOnly bool
ctx context.Context
cancel context.CancelFunc
@ -89,6 +90,8 @@ func (sctx *serveCtx) serve(
tlsinfo *transport.TLSInfo,
handler http.Handler,
errHandler func(error),
grpcDialForRestGatewayBackends func(ctx context.Context) (*grpc.ClientConn, error),
splitHttp bool,
gopts ...grpc.ServerOption) (err error) {
logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
<-s.ReadyNotify()
@ -98,71 +101,103 @@ func (sctx *serveCtx) serve(
}
m := cmux.New(sctx.l)
var server func() error
onlyGRPC := splitHttp && !sctx.httpOnly
onlyHttp := splitHttp && sctx.httpOnly
grpcEnabled := !onlyHttp
httpEnabled := !onlyGRPC
v3c := v3client.New(s)
servElection := v3election.NewElectionServer(v3c)
servLock := v3lock.NewLockServer(v3c)
if sctx.insecure {
gs := v3rpc.Server(s, nil, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
sctx.serviceRegister(gs)
// Make sure serversC is closed even if we prematurely exit the function.
defer close(sctx.serversC)
var gwmux *gw.ServeMux
if s.Cfg.EnableGRPCGateway {
// GRPC gateway connects to grpc server via connection provided by grpc dial.
gwmux, err = sctx.registerGateway(grpcDialForRestGatewayBackends)
if err != nil {
sctx.lg.Error("registerGateway failed", zap.Error(err))
return err
}
}
var traffic string
switch {
case onlyGRPC:
traffic = "grpc"
case onlyHttp:
traffic = "http"
default:
traffic = "grpc+http"
}
defer func(gs *grpc.Server) {
if err == nil {
return
if sctx.insecure {
var gs *grpc.Server
var srv *http.Server
if httpEnabled {
httpmux := sctx.createMux(gwmux, handler)
srv = &http.Server{
Handler: createAccessController(sctx.lg, s, httpmux),
ErrorLog: logger, // do not log user error
}
if sctx.lg != nil {
sctx.lg.Warn("stopping insecure grpc server due to error", zap.Error(err))
} else {
plog.Warningf("stopping insecure grpc server due to error: %s", err)
}
gs.Stop()
if sctx.lg != nil {
sctx.lg.Warn("stopped insecure grpc server due to error", zap.Error(err))
} else {
plog.Warningf("stopped insecure grpc server due to error: %s", err)
}
}(gs)
grpcl := m.Match(cmux.HTTP2())
go func(gs *grpc.Server, grpcLis net.Listener) {
errHandler(gs.Serve(grpcLis))
}(gs, grpcl)
var gwmux *gw.ServeMux
if s.Cfg.EnableGRPCGateway {
gwmux, err = sctx.registerGateway([]grpc.DialOption{grpc.WithInsecure()})
if err != nil {
if err := configureHttpServer(srv, s.Cfg); err != nil {
sctx.lg.Error("Configure http server failed", zap.Error(err))
return err
}
}
if grpcEnabled {
gs = v3rpc.Server(s, nil, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
sctx.serviceRegister(gs)
}
defer func(gs *grpc.Server) {
if err == nil {
return
}
httpmux := sctx.createMux(gwmux, handler)
if sctx.lg != nil {
sctx.lg.Warn("stopping insecure grpc server due to error", zap.Error(err))
} else {
plog.Warningf("stopping insecure grpc server due to error: %s", err)
}
srvhttp := &http.Server{
Handler: createAccessController(sctx.lg, s, httpmux),
ErrorLog: logger, // do not log user error
gs.Stop()
if sctx.lg != nil {
sctx.lg.Warn("stopped insecure grpc server due to error", zap.Error(err))
} else {
plog.Warningf("stopped insecure grpc server due to error: %s", err)
}
}(gs)
}
if err = configureHttpServer(srvhttp, s.Cfg); err != nil {
sctx.lg.Error("Configure http server failed", zap.Error(err))
return err
if onlyGRPC {
server = func() error {
return gs.Serve(sctx.l)
}
} else {
server = m.Serve
httpl := m.Match(cmux.HTTP1())
go func(srvhttp *http.Server, tlsLis net.Listener) {
errHandler(srvhttp.Serve(tlsLis))
}(srv, httpl)
if grpcEnabled {
grpcl := m.Match(cmux.HTTP2())
go func(gs *grpc.Server, l net.Listener) {
errHandler(gs.Serve(l))
}(gs, grpcl)
}
}
httpl := m.Match(cmux.HTTP1())
go func(srvhttp *http.Server, httpLis net.Listener) {
errHandler(srvhttp.Serve(httpLis))
}(srvhttp, httpl)
sctx.serversC <- &servers{grpc: gs, http: srvhttp}
sctx.serversC <- &servers{grpc: gs, http: srv}
if sctx.lg != nil {
sctx.lg.Info(
"serving client traffic insecurely; this is strongly discouraged!",
zap.String("traffic", traffic),
zap.String("address", sctx.l.Addr().String()),
)
} else {
@ -171,78 +206,77 @@ func (sctx *serveCtx) serve(
}
if sctx.secure {
var gs *grpc.Server
var srv *http.Server
tlscfg, tlsErr := tlsinfo.ServerConfig()
if tlsErr != nil {
return tlsErr
}
gs := v3rpc.Server(s, tlscfg, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
sctx.serviceRegister(gs)
if grpcEnabled {
gs = v3rpc.Server(s, tlscfg, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
sctx.serviceRegister(gs)
}
defer func(gs *grpc.Server) {
if err == nil {
return
}
if sctx.lg != nil {
sctx.lg.Warn("stopping secure grpc server due to error", zap.Error(err))
} else {
plog.Warningf("stopping secure grpc server due to error: %s", err)
}
gs.Stop()
if sctx.lg != nil {
sctx.lg.Warn("stopped secure grpc server due to error", zap.Error(err))
} else {
plog.Warningf("stopped secure grpc server due to error: %s", err)
}
}(gs)
}
defer func(gs *grpc.Server) {
if err == nil {
return
if httpEnabled {
if grpcEnabled {
handler = grpcHandlerFunc(gs, handler)
}
httpmux := sctx.createMux(gwmux, handler)
if sctx.lg != nil {
sctx.lg.Warn("stopping secure grpc server due to error", zap.Error(err))
} else {
plog.Warningf("stopping secure grpc server due to error: %s", err)
srv = &http.Server{
Handler: createAccessController(sctx.lg, s, httpmux),
TLSConfig: tlscfg,
ErrorLog: logger, // do not log user error
}
gs.Stop()
if sctx.lg != nil {
sctx.lg.Warn("stopped secure grpc server due to error", zap.Error(err))
} else {
plog.Warningf("stopped secure grpc server due to error: %s", err)
}
}(gs)
handler = grpcHandlerFunc(gs, handler)
var gwmux *gw.ServeMux
if s.Cfg.EnableGRPCGateway {
dtls := tlscfg.Clone()
// trust local server
dtls.InsecureSkipVerify = true
bundle := credentials.NewBundle(credentials.Config{TLSConfig: dtls})
opts := []grpc.DialOption{grpc.WithTransportCredentials(bundle.TransportCredentials())}
gwmux, err = sctx.registerGateway(opts)
if err != nil {
if err := configureHttpServer(srv, s.Cfg); err != nil {
sctx.lg.Error("Configure https server failed", zap.Error(err))
return err
}
}
var tlsl net.Listener
tlsl, err = transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo)
if err != nil {
return err
}
// TODO: add debug flag; enable logging when debug flag is set
httpmux := sctx.createMux(gwmux, handler)
if onlyGRPC {
server = func() error { return gs.Serve(sctx.l) }
} else {
server = m.Serve
srv := &http.Server{
Handler: createAccessController(sctx.lg, s, httpmux),
TLSConfig: tlscfg,
ErrorLog: logger, // do not log user error
tlsl, err := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo)
if err != nil {
return err
}
go func(srvhttp *http.Server, tlsl net.Listener) {
errHandler(srvhttp.Serve(tlsl))
}(srv, tlsl)
}
if err = configureHttpServer(srv, s.Cfg); err != nil {
sctx.lg.Error("Configure https server failed", zap.Error(err))
return err
}
go func(srvhttp *http.Server, tlsLis net.Listener) {
errHandler(srvhttp.Serve(tlsLis))
}(srv, tlsl)
sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
if sctx.lg != nil {
sctx.lg.Info(
"serving client traffic securely",
zap.String("traffic", traffic),
zap.String("address", sctx.l.Addr().String()),
)
} else {
@ -250,8 +284,7 @@ func (sctx *serveCtx) serve(
}
}
close(sctx.serversC)
return m.Serve()
return server()
}
func configureHttpServer(srv *http.Server, cfg etcdserver.ServerConfig) error {
@ -282,20 +315,10 @@ func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Ha
type registerHandlerFunc func(context.Context, *gw.ServeMux, *grpc.ClientConn) error
func (sctx *serveCtx) registerGateway(opts []grpc.DialOption) (*gw.ServeMux, error) {
func (sctx *serveCtx) registerGateway(dial func(ctx context.Context) (*grpc.ClientConn, error)) (*gw.ServeMux, error) {
ctx := sctx.ctx
addr := sctx.addr
if network := sctx.network; network == "unix" {
// explicitly define unix network for gRPC socket support
addr = fmt.Sprintf("%s://%s", network, addr)
}
opts = append(opts, grpc.WithDefaultCallOptions([]grpc.CallOption{
grpc.MaxCallRecvMsgSize(math.MaxInt32),
}...))
conn, err := grpc.DialContext(ctx, addr, opts...)
conn, err := dial(ctx)
if err != nil {
return nil, err
}

View File

@ -141,7 +141,11 @@ func newConfig() *config {
)
fs.Var(
flags.NewUniqueURLsWithExceptions(embed.DefaultListenClientURLs, ""), "listen-client-urls",
"List of URLs to listen on for client traffic.",
"List of URLs to listen on for client grpc traffic and http as long as --listen-client-http-urls is not specified.",
)
fs.Var(
flags.NewUniqueURLsWithExceptions("", ""), "listen-client-http-urls",
"List of URLs to listen on for http only client traffic. Enabling this flag removes http services from --listen-client-urls.",
)
fs.Var(
flags.NewUniqueURLsWithExceptions("", ""),
@ -332,10 +336,11 @@ func (cfg *config) configFromCmdLine() error {
return err
}
cfg.ec.LPUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-peer-urls")
cfg.ec.APUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "initial-advertise-peer-urls")
cfg.ec.LCUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-client-urls")
cfg.ec.ACUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "advertise-client-urls")
cfg.ec.ListenPeerUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-peer-urls")
cfg.ec.AdvertisePeerUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "initial-advertise-peer-urls")
cfg.ec.ListenClientUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-client-urls")
cfg.ec.ListenClientHttpUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-client-http-urls")
cfg.ec.AdvertiseClientUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "advertise-client-urls")
cfg.ec.ListenMetricsUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-metrics-urls")
cfg.ec.CORS = flags.UniqueURLsMapFromFlag(cfg.cf.flagSet, "cors")
@ -356,7 +361,7 @@ func (cfg *config) configFromCmdLine() error {
// disable default advertise-client-urls if lcurls is set
missingAC := flags.IsSet(cfg.cf.flagSet, "listen-client-urls") && !flags.IsSet(cfg.cf.flagSet, "advertise-client-urls")
if !cfg.mayBeProxy() && missingAC {
cfg.ec.ACUrls = nil
cfg.ec.AdvertiseClientUrls = nil
}
// disable default initial-cluster if discovery is set

View File

@ -36,6 +36,7 @@ func TestConfigParsingMemberFlags(t *testing.T) {
"-snapshot-count=10",
"-listen-peer-urls=http://localhost:8000,https://localhost:8001",
"-listen-client-urls=http://localhost:7000,https://localhost:7001",
"-listen-client-http-urls=http://localhost:7002,https://localhost:7003",
// it should be set if -listen-client-urls is set
"-advertise-client-urls=http://localhost:7000,https://localhost:7001",
}
@ -51,14 +52,15 @@ func TestConfigParsingMemberFlags(t *testing.T) {
func TestConfigFileMemberFields(t *testing.T) {
yc := struct {
Dir string `json:"data-dir"`
MaxSnapFiles uint `json:"max-snapshots"`
MaxWalFiles uint `json:"max-wals"`
Name string `json:"name"`
SnapshotCount uint64 `json:"snapshot-count"`
LPUrls string `json:"listen-peer-urls"`
LCUrls string `json:"listen-client-urls"`
AcurlsCfgFile string `json:"advertise-client-urls"`
Dir string `json:"data-dir"`
MaxSnapFiles uint `json:"max-snapshots"`
MaxWalFiles uint `json:"max-wals"`
Name string `json:"name"`
SnapshotCount uint64 `json:"snapshot-count"`
ListenPeerUrls string `json:"listen-peer-urls"`
ListenClientUrls string `json:"listen-client-urls"`
ListenClientHttpUrls string `json:"listen-client-http-urls"`
AdvertiseClientUrls string `json:"advertise-client-urls"`
}{
"testdir",
10,
@ -67,6 +69,7 @@ func TestConfigFileMemberFields(t *testing.T) {
10,
"http://localhost:8000,https://localhost:8001",
"http://localhost:7000,https://localhost:7001",
"http://localhost:7002,https://localhost:7003",
"http://localhost:7000,https://localhost:7001",
}
@ -513,13 +516,14 @@ func mustCreateCfgFile(t *testing.T, b []byte) *os.File {
func validateMemberFlags(t *testing.T, cfg *config) {
wcfg := &embed.Config{
Dir: "testdir",
LPUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}},
LCUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}},
MaxSnapFiles: 10,
MaxWalFiles: 10,
Name: "testname",
SnapshotCount: 10,
Dir: "testdir",
ListenPeerUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}},
ListenClientUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}},
ListenClientHttpUrls: []url.URL{{Scheme: "http", Host: "localhost:7002"}, {Scheme: "https", Host: "localhost:7003"}},
MaxSnapFiles: 10,
MaxWalFiles: 10,
Name: "testname",
SnapshotCount: 10,
}
if cfg.ec.Dir != wcfg.Dir {
@ -537,18 +541,21 @@ func validateMemberFlags(t *testing.T, cfg *config) {
if cfg.ec.SnapshotCount != wcfg.SnapshotCount {
t.Errorf("snapcount = %v, want %v", cfg.ec.SnapshotCount, wcfg.SnapshotCount)
}
if !reflect.DeepEqual(cfg.ec.LPUrls, wcfg.LPUrls) {
t.Errorf("listen-peer-urls = %v, want %v", cfg.ec.LPUrls, wcfg.LPUrls)
if !reflect.DeepEqual(cfg.ec.ListenPeerUrls, wcfg.ListenPeerUrls) {
t.Errorf("listen-peer-urls = %v, want %v", cfg.ec.ListenPeerUrls, wcfg.ListenPeerUrls)
}
if !reflect.DeepEqual(cfg.ec.LCUrls, wcfg.LCUrls) {
t.Errorf("listen-client-urls = %v, want %v", cfg.ec.LCUrls, wcfg.LCUrls)
if !reflect.DeepEqual(cfg.ec.ListenClientUrls, wcfg.ListenClientUrls) {
t.Errorf("listen-client-urls = %v, want %v", cfg.ec.ListenClientUrls, wcfg.ListenClientUrls)
}
if !reflect.DeepEqual(cfg.ec.ListenClientHttpUrls, wcfg.ListenClientHttpUrls) {
t.Errorf("listen-client-http-urls = %v, want %v", cfg.ec.ListenClientHttpUrls, wcfg.ListenClientHttpUrls)
}
}
func validateClusteringFlags(t *testing.T, cfg *config) {
wcfg := newConfig()
wcfg.ec.APUrls = []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}}
wcfg.ec.ACUrls = []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}}
wcfg.ec.AdvertisePeerUrls = []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}}
wcfg.ec.AdvertiseClientUrls = []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}}
wcfg.ec.ClusterState = embed.ClusterStateFlagExisting
wcfg.cf.fallback.Set(fallbackFlagExit)
wcfg.ec.InitialCluster = "0=http://localhost:8000"
@ -566,11 +573,11 @@ func validateClusteringFlags(t *testing.T, cfg *config) {
if cfg.ec.InitialClusterToken != wcfg.ec.InitialClusterToken {
t.Errorf("initialClusterToken = %v, want %v", cfg.ec.InitialClusterToken, wcfg.ec.InitialClusterToken)
}
if !reflect.DeepEqual(cfg.ec.APUrls, wcfg.ec.APUrls) {
t.Errorf("initial-advertise-peer-urls = %v, want %v", cfg.ec.APUrls, wcfg.ec.APUrls)
if !reflect.DeepEqual(cfg.ec.AdvertisePeerUrls, wcfg.ec.AdvertisePeerUrls) {
t.Errorf("initial-advertise-peer-urls = %v, want %v", cfg.ec.AdvertisePeerUrls, wcfg.ec.AdvertisePeerUrls)
}
if !reflect.DeepEqual(cfg.ec.ACUrls, wcfg.ec.ACUrls) {
t.Errorf("advertise-client-urls = %v, want %v", cfg.ec.ACUrls, wcfg.ec.ACUrls)
if !reflect.DeepEqual(cfg.ec.AdvertiseClientUrls, wcfg.ec.AdvertiseClientUrls) {
t.Errorf("advertise-client-urls = %v, want %v", cfg.ec.AdvertiseClientUrls, wcfg.ec.AdvertiseClientUrls)
}
}

View File

@ -251,7 +251,7 @@ func startEtcdOrProxyV2() {
plog.Infof("forgot to set --initial-cluster flag?")
}
}
if types.URLs(cfg.ec.APUrls).String() == embed.DefaultInitialAdvertisePeerURLs {
if types.URLs(cfg.ec.AdvertisePeerUrls).String() == embed.DefaultInitialAdvertisePeerURLs {
if lg != nil {
lg.Warn("forgot to set --initial-advertise-peer-urls?")
} else {
@ -507,11 +507,11 @@ func startProxy(cfg *config) error {
// setup self signed certs when serving https
cHosts, cTLS := []string{}, false
for _, u := range cfg.ec.LCUrls {
for _, u := range cfg.ec.ListenClientUrls {
cHosts = append(cHosts, u.Host)
cTLS = cTLS || u.Scheme == "https"
}
for _, u := range cfg.ec.ACUrls {
for _, u := range cfg.ec.AdvertiseClientUrls {
cHosts = append(cHosts, u.Host)
cTLS = cTLS || u.Scheme == "https"
}
@ -528,7 +528,7 @@ func startProxy(cfg *config) error {
}
// Start a proxy server goroutine for each listen address
for _, u := range cfg.ec.LCUrls {
for _, u := range cfg.ec.ListenClientUrls {
l, err := transport.NewListener(u.Host, u.Scheme, &listenerTLS)
if err != nil {
return err

View File

@ -62,7 +62,9 @@ Member:
--listen-peer-urls 'http://localhost:2380'
List of URLs to listen on for peer traffic.
--listen-client-urls 'http://localhost:2379'
List of URLs to listen on for client traffic.
List of URLs to listen on for client grpc traffic and http as long as --listen-client-http-urls is not specified.
--listen-client-http-urls ''
List of URLs to listen on for http only client traffic. Enabling this flag removes http services from --listen-client-urls.
--max-snapshots '` + strconv.Itoa(embed.DefaultMaxSnapshots) + `'
Maximum number of snapshot files to retain (0 is unlimited).
--max-wals '` + strconv.Itoa(embed.DefaultMaxWALs) + `'

View File

@ -64,7 +64,7 @@ func TestEmbedEtcd(t *testing.T) {
tests[0].cfg.Durl = "abc"
setupEmbedCfg(&tests[1].cfg, []url.URL{urls[0]}, []url.URL{urls[1]})
tests[1].cfg.ACUrls = nil
tests[1].cfg.AdvertiseClientUrls = nil
tests[2].cfg.TickMs = tests[2].cfg.ElectionMs - 1
tests[3].cfg.ElectionMs = 999999
setupEmbedCfg(&tests[4].cfg, []url.URL{urls[2]}, []url.URL{urls[3]})
@ -72,8 +72,8 @@ func TestEmbedEtcd(t *testing.T) {
setupEmbedCfg(&tests[6].cfg, []url.URL{urls[7], urls[8]}, []url.URL{urls[9]})
dnsURL, _ := url.Parse("http://whatever.test:12345")
tests[7].cfg.LCUrls = []url.URL{*dnsURL}
tests[8].cfg.LPUrls = []url.URL{*dnsURL}
tests[7].cfg.ListenClientUrls = []url.URL{*dnsURL}
tests[8].cfg.ListenPeerUrls = []url.URL{*dnsURL}
dir := filepath.Join(os.TempDir(), fmt.Sprintf("embed-etcd"))
os.RemoveAll(dir)
@ -188,8 +188,8 @@ func setupEmbedCfg(cfg *embed.Config, curls []url.URL, purls []url.URL) {
cfg.Debug = false
cfg.ClusterState = "new"
cfg.LCUrls, cfg.ACUrls = curls, curls
cfg.LPUrls, cfg.APUrls = purls, purls
cfg.ListenClientUrls, cfg.AdvertiseClientUrls = curls, curls
cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = purls, purls
cfg.InitialCluster = ""
for i := range purls {
cfg.InitialCluster += ",default=" + purls[i].String()

View File

@ -54,8 +54,10 @@ func newProxyEtcdProcess(cfg *etcdServerProcessConfig) (*proxyEtcdProcess, error
func (p *proxyEtcdProcess) Config() *etcdServerProcessConfig { return p.etcdProc.Config() }
func (p *proxyEtcdProcess) EndpointsV2() []string { return p.proxyV2.endpoints() }
func (p *proxyEtcdProcess) EndpointsV3() []string { return p.proxyV3.endpoints() }
func (p *proxyEtcdProcess) EndpointsV2() []string { return p.EndpointsHTTP() }
func (p *proxyEtcdProcess) EndpointsV3() []string { return p.EndpointsGRPC() }
func (p *proxyEtcdProcess) EndpointsHTTP() []string { return p.proxyV2.endpoints() }
func (p *proxyEtcdProcess) EndpointsGRPC() []string { return p.proxyV3.endpoints() }
func (p *proxyEtcdProcess) EndpointsMetrics() []string {
panic("not implemented; proxy doesn't provide health information")
}

View File

@ -112,6 +112,7 @@ type etcdProcessClusterConfig struct {
clientTLS clientConnType
clientCertAuthEnabled bool
clientHttpSeparate bool
isPeerTLS bool
isPeerAutoTLS bool
isClientAutoTLS bool
@ -190,18 +191,17 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
initialCluster := make([]string, cfg.clusterSize)
for i := 0; i < cfg.clusterSize; i++ {
var curls []string
var curl, curltls string
var curl string
port := cfg.basePort + 5*i
curlHost := fmt.Sprintf("localhost:%d", port)
clientPort := port
clientHttpPort := port + 4
switch cfg.clientTLS {
case clientNonTLS, clientTLS:
curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String()
if cfg.clientTLS == clientTLSAndNonTLS {
curl = clientURL(clientPort, clientNonTLS)
curls = []string{curl, clientURL(clientPort, clientTLS)}
} else {
curl = clientURL(clientPort, cfg.clientTLS)
curls = []string{curl}
case clientTLSAndNonTLS:
curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
curls = []string{curl, curltls}
}
purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
@ -226,6 +226,11 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
"--data-dir", dataDirPath,
"--snapshot-count", fmt.Sprintf("%d", cfg.snapshotCount),
}
var clientHttpUrl string
if cfg.clientHttpSeparate {
clientHttpUrl = clientURL(clientHttpPort, cfg.clientTLS)
args = append(args, "--listen-client-http-urls", clientHttpUrl)
}
args = addV2Args(args)
if cfg.forceNewCluster {
args = append(args, "--force-new-cluster")
@ -268,16 +273,17 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
}
etcdCfgs[i] = &etcdServerProcessConfig{
execPath: cfg.execPath,
args: args,
tlsArgs: cfg.tlsArgs(),
dataDirPath: dataDirPath,
keepDataDir: cfg.keepDataDir,
name: name,
purl: purl,
acurl: curl,
murl: murl,
initialToken: cfg.initialToken,
execPath: cfg.execPath,
args: args,
tlsArgs: cfg.tlsArgs(),
dataDirPath: dataDirPath,
keepDataDir: cfg.keepDataDir,
name: name,
purl: purl,
acurl: curl,
murl: murl,
initialToken: cfg.initialToken,
clientHttpUrl: clientHttpUrl,
}
}
@ -290,6 +296,18 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
return etcdCfgs
}
func clientURL(port int, connType clientConnType) string {
curlHost := fmt.Sprintf("localhost:%d", port)
switch connType {
case clientNonTLS:
return (&url.URL{Scheme: "http", Host: curlHost}).String()
case clientTLS:
return (&url.URL{Scheme: "https", Host: curlHost}).String()
default:
panic(fmt.Sprintf("Unsupported connection type %v", connType))
}
}
func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
if cfg.clientTLS != clientNonTLS {
if cfg.isClientAutoTLS {

View File

@ -34,8 +34,9 @@ import (
func TestConnectionMultiplexing(t *testing.T) {
defer testutil.AfterTest(t)
for _, tc := range []struct {
name string
serverTLS clientConnType
name string
serverTLS clientConnType
separateHttpPort bool
}{
{
name: "ServerTLS",
@ -49,10 +50,20 @@ func TestConnectionMultiplexing(t *testing.T) {
name: "ServerTLSAndNonTLS",
serverTLS: clientTLSAndNonTLS,
},
{
name: "SeparateHTTP/ServerTLS",
serverTLS: clientTLS,
separateHttpPort: true,
},
{
name: "SeparateHTTP/ServerNonTLS",
serverTLS: clientNonTLS,
separateHttpPort: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
cfg := etcdProcessClusterConfig{clusterSize: 1, clientTLS: tc.serverTLS, enableV2: true}
cfg := etcdProcessClusterConfig{clusterSize: 1, clientTLS: tc.serverTLS, enableV2: true, clientHttpSeparate: tc.separateHttpPort}
clus, err := newEtcdProcessCluster(&cfg)
require.NoError(t, err)
defer clus.Close()
@ -73,43 +84,45 @@ func TestConnectionMultiplexing(t *testing.T) {
name = "ClientTLS"
}
t.Run(name, func(t *testing.T) {
testConnectionMultiplexing(ctx, t, clus.EndpointsV3()[0], connType)
testConnectionMultiplexing(ctx, t, clus.procs[0], connType)
})
}
})
}
}
func testConnectionMultiplexing(ctx context.Context, t *testing.T, endpoint string, connType clientConnType) {
func testConnectionMultiplexing(ctx context.Context, t *testing.T, member etcdProcess, connType clientConnType) {
httpEndpoint := member.EndpointsHTTP()[0]
grpcEndpoint := member.EndpointsGRPC()[0]
switch connType {
case clientTLS:
endpoint = toTLS(endpoint)
httpEndpoint = toTLS(httpEndpoint)
grpcEndpoint = toTLS(grpcEndpoint)
case clientNonTLS:
default:
panic(fmt.Sprintf("Unsupported conn type %v", connType))
}
t.Run("etcdctl", func(t *testing.T) {
t.Run("v2", func(t *testing.T) {
etcdctl := NewEtcdctl([]string{endpoint}, connType, false, true)
etcdctl := NewEtcdctl([]string{httpEndpoint}, connType, false, true)
err := etcdctl.Set("a", "1")
assert.NoError(t, err)
})
t.Run("v3", func(t *testing.T) {
etcdctl := NewEtcdctl([]string{endpoint}, connType, false, false)
etcdctl := NewEtcdctl([]string{grpcEndpoint}, connType, false, false)
err := etcdctl.Put("a", "1")
assert.NoError(t, err)
})
})
t.Run("clientv2", func(t *testing.T) {
c, err := newClientV2(t, []string{endpoint}, connType, false)
c, err := newClientV2(t, []string{httpEndpoint}, connType, false)
require.NoError(t, err)
kv := clientv2.NewKeysAPI(c)
_, err = kv.Set(ctx, "a", "1", nil)
assert.NoError(t, err)
})
t.Run("clientv3", func(t *testing.T) {
c := newClient(t, []string{endpoint}, connType, false)
c := newClient(t, []string{grpcEndpoint}, connType, false)
_, err := c.Get(ctx, "a")
assert.NoError(t, err)
})
@ -120,11 +133,11 @@ func testConnectionMultiplexing(ctx context.Context, t *testing.T, endpoint stri
tname = "default"
}
t.Run(tname, func(t *testing.T) {
assert.NoError(t, fetchGrpcGateway(endpoint, httpVersion, connType))
assert.NoError(t, fetchMetrics(endpoint, httpVersion, connType))
assert.NoError(t, fetchVersion(endpoint, httpVersion, connType))
assert.NoError(t, fetchHealth(endpoint, httpVersion, connType))
assert.NoError(t, fetchDebugVars(endpoint, httpVersion, connType))
assert.NoError(t, fetchGrpcGateway(httpEndpoint, httpVersion, connType))
assert.NoError(t, fetchMetrics(httpEndpoint, httpVersion, connType))
assert.NoError(t, fetchVersion(httpEndpoint, httpVersion, connType))
assert.NoError(t, fetchHealth(httpEndpoint, httpVersion, connType))
assert.NoError(t, fetchDebugVars(httpEndpoint, httpVersion, connType))
})
}
})

View File

@ -33,6 +33,8 @@ var (
type etcdProcess interface {
EndpointsV2() []string
EndpointsV3() []string
EndpointsGRPC() []string
EndpointsHTTP() []string
EndpointsMetrics() []string
Start() error
@ -61,8 +63,9 @@ type etcdServerProcessConfig struct {
purl url.URL
acurl string
murl string
acurl string
murl string
clientHttpUrl string
initialToken string
initialCluster string
@ -80,8 +83,15 @@ func newEtcdServerProcess(cfg *etcdServerProcessConfig) (*etcdServerProcess, err
return &etcdServerProcess{cfg: cfg, donec: make(chan struct{})}, nil
}
func (ep *etcdServerProcess) EndpointsV2() []string { return []string{ep.cfg.acurl} }
func (ep *etcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2() }
func (ep *etcdServerProcess) EndpointsV2() []string { return ep.EndpointsHTTP() }
func (ep *etcdServerProcess) EndpointsV3() []string { return ep.EndpointsGRPC() }
func (ep *etcdServerProcess) EndpointsGRPC() []string { return []string{ep.cfg.acurl} }
func (ep *etcdServerProcess) EndpointsHTTP() []string {
if ep.cfg.clientHttpUrl == "" {
return []string{ep.cfg.acurl}
}
return []string{ep.cfg.clientHttpUrl}
}
func (ep *etcdServerProcess) EndpointsMetrics() []string { return []string{ep.cfg.murl} }
func (ep *etcdServerProcess) Start() error {

View File

@ -118,6 +118,7 @@ func (ctl *Etcdctl) cmdArgs(args ...string) []string {
func (ctl *Etcdctl) flags() map[string]string {
fmap := make(map[string]string)
if ctl.v2 {
fmap["no-sync"] = "true"
if ctl.connType == clientTLS {
fmap["ca-file"] = testTLSInfo.TrustedCAFile
fmap["cert-file"] = testTLSInfo.CertFile

View File

@ -109,15 +109,17 @@ func tlsInfo(t testing.TB, connType clientConnType, isAutoTLS bool) (*transport.
}
}
func fillEtcdWithData(ctx context.Context, c *clientv3.Client, keyCount int, valueSize uint) error {
func fillEtcdWithData(ctx context.Context, c *clientv3.Client, dbSize int) error {
g := errgroup.Group{}
concurrency := 10
keyCount := 100
keysPerRoutine := keyCount / concurrency
valueSize := dbSize / keyCount
for i := 0; i < concurrency; i++ {
i := i
g.Go(func() error {
for j := 0; j < keysPerRoutine; j++ {
_, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(valueSize))
_, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(uint(valueSize)))
if err != nil {
return err
}

View File

@ -35,29 +35,48 @@ import (
const (
watchResponsePeriod = 100 * time.Millisecond
watchTestDuration = 5 * time.Second
// TODO: Reduce maxWatchDelay when https://github.com/etcd-io/etcd/issues/15402 is addressed.
maxWatchDelay = 2 * time.Second
// Configure enough read load to cause starvation from https://github.com/etcd-io/etcd/issues/15402.
// Tweaked to pass on GitHub runner. For local runs please increase parameters.
// TODO: Increase when https://github.com/etcd-io/etcd/issues/15402 is fully addressed.
numberOfPreexistingKeys = 100
sizeOfPreexistingValues = 5000
readLoadConcurrency = 10
readLoadConcurrency = 10
)
type testCase struct {
name string
config etcdProcessClusterConfig
name string
config etcdProcessClusterConfig
maxWatchDelay time.Duration
dbSizeBytes int
}
const (
Kilo = 1000
Mega = 1000 * Kilo
)
// 10 MB is not a bottleneck of grpc server, but filling up etcd with data.
// Keeping it lower so tests don't take too long.
// If we implement reuse of db we could increase the dbSize.
var tcs = []testCase{
{
name: "NoTLS",
config: etcdProcessClusterConfig{clusterSize: 1},
name: "NoTLS",
config: etcdProcessClusterConfig{clusterSize: 1},
maxWatchDelay: 100 * time.Millisecond,
dbSizeBytes: 5 * Mega,
},
{
name: "ClientTLS",
config: etcdProcessClusterConfig{clusterSize: 1, isClientAutoTLS: true, clientTLS: clientTLS},
name: "TLS",
config: etcdProcessClusterConfig{clusterSize: 1, isClientAutoTLS: true, clientTLS: clientTLS},
maxWatchDelay: 2 * time.Second,
dbSizeBytes: 500 * Kilo,
},
{
name: "SeparateHttpNoTLS",
config: etcdProcessClusterConfig{clusterSize: 1, clientHttpSeparate: true},
maxWatchDelay: 100 * time.Millisecond,
dbSizeBytes: 5 * Mega,
},
{
name: "SeparateHttpTLS",
config: etcdProcessClusterConfig{clusterSize: 1, isClientAutoTLS: true, clientTLS: clientTLS, clientHttpSeparate: true},
maxWatchDelay: 100 * time.Millisecond,
dbSizeBytes: 5 * Mega,
},
}
@ -71,13 +90,13 @@ func TestWatchDelayForPeriodicProgressNotification(t *testing.T) {
require.NoError(t, err)
defer clus.Close()
c := newClient(t, clus.EndpointsV3(), tc.config.clientTLS, tc.config.isClientAutoTLS)
require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues))
require.NoError(t, fillEtcdWithData(context.Background(), c, tc.dbSizeBytes))
ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration)
defer cancel()
g := errgroup.Group{}
continuouslyExecuteGetAll(ctx, t, &g, c)
validateWatchDelay(t, c.Watch(ctx, "fake-key", clientv3.WithProgressNotify()))
validateWatchDelay(t, c.Watch(ctx, "fake-key", clientv3.WithProgressNotify()), tc.maxWatchDelay)
require.NoError(t, g.Wait())
})
}
@ -91,7 +110,7 @@ func TestWatchDelayForManualProgressNotification(t *testing.T) {
require.NoError(t, err)
defer clus.Close()
c := newClient(t, clus.EndpointsV3(), tc.config.clientTLS, tc.config.isClientAutoTLS)
require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues))
require.NoError(t, fillEtcdWithData(context.Background(), c, tc.dbSizeBytes))
ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration)
defer cancel()
@ -109,7 +128,7 @@ func TestWatchDelayForManualProgressNotification(t *testing.T) {
time.Sleep(watchResponsePeriod)
}
})
validateWatchDelay(t, c.Watch(ctx, "fake-key"))
validateWatchDelay(t, c.Watch(ctx, "fake-key"), tc.maxWatchDelay)
require.NoError(t, g.Wait())
})
}
@ -123,7 +142,7 @@ func TestWatchDelayForEvent(t *testing.T) {
require.NoError(t, err)
defer clus.Close()
c := newClient(t, clus.EndpointsV3(), tc.config.clientTLS, tc.config.isClientAutoTLS)
require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues))
require.NoError(t, fillEtcdWithData(context.Background(), c, tc.dbSizeBytes))
ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration)
defer cancel()
@ -142,13 +161,13 @@ func TestWatchDelayForEvent(t *testing.T) {
}
})
continuouslyExecuteGetAll(ctx, t, &g, c)
validateWatchDelay(t, c.Watch(ctx, "key"))
validateWatchDelay(t, c.Watch(ctx, "key"), tc.maxWatchDelay)
require.NoError(t, g.Wait())
})
}
}
func validateWatchDelay(t *testing.T, watch clientv3.WatchChan) {
func validateWatchDelay(t *testing.T, watch clientv3.WatchChan, maxWatchDelay time.Duration) {
start := time.Now()
var maxDelay time.Duration
for range watch {
@ -179,15 +198,19 @@ func continuouslyExecuteGetAll(ctx context.Context, t *testing.T, g *errgroup.Gr
for i := 0; i < readLoadConcurrency; i++ {
g.Go(func() error {
for {
_, err := c.Get(ctx, "", clientv3.WithPrefix())
resp, err := c.Get(ctx, "", clientv3.WithPrefix())
if err != nil {
if strings.Contains(err.Error(), "context deadline exceeded") {
return nil
}
return err
}
respSize := 0
for _, kv := range resp.Kvs {
respSize += kv.Size()
}
mux.Lock()
size += numberOfPreexistingKeys * sizeOfPreexistingValues
size += respSize
mux.Unlock()
}
})

View File

@ -52,8 +52,8 @@ func setupEmbedCfg(cfg *embed.Config, curls, purls, ics []url.URL) {
os.RemoveAll(cfg.Dir)
cfg.ClusterState = "new"
cfg.LCUrls, cfg.ACUrls = curls, curls
cfg.LPUrls, cfg.APUrls = purls, purls
cfg.ListenClientUrls, cfg.AdvertiseClientUrls = curls, curls
cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = purls, purls
cfg.InitialCluster = ""
for i := range ics {