mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #13733 from ahrtr/change_discovery_url_to_endpoints
Change v3 discovery url to endpoints so as to support failover
This commit is contained in:
commit
10998ab90b
@ -37,10 +37,9 @@ import (
|
||||
type ServerConfig struct {
|
||||
Name string
|
||||
|
||||
EnableV2Discovery bool
|
||||
DiscoveryURL string
|
||||
DiscoveryProxy string
|
||||
DiscoveryCfg v3discovery.DiscoveryConfig
|
||||
DiscoveryURL string
|
||||
DiscoveryProxy string
|
||||
DiscoveryCfg v3discovery.DiscoveryConfig
|
||||
|
||||
ClientURLs types.URLs
|
||||
PeerURLs types.URLs
|
||||
@ -309,7 +308,9 @@ func (c *ServerConfig) WALDir() string {
|
||||
|
||||
func (c *ServerConfig) SnapDir() string { return filepath.Join(c.MemberDir(), "snap") }
|
||||
|
||||
func (c *ServerConfig) ShouldDiscover() bool { return c.DiscoveryURL != "" }
|
||||
func (c *ServerConfig) ShouldDiscover() bool {
|
||||
return c.DiscoveryURL != "" || len(c.DiscoveryCfg.Endpoints) > 0
|
||||
}
|
||||
|
||||
// ReqTimeout returns timeout for request to finish.
|
||||
func (c *ServerConfig) ReqTimeout() time.Duration {
|
||||
|
@ -95,8 +95,6 @@ const (
|
||||
// It's enabled by default.
|
||||
DefaultStrictReconfigCheck = true
|
||||
|
||||
DefaultEnableV2Discovery = true
|
||||
|
||||
// maxElectionMs specifies the maximum value of election timeout.
|
||||
// More details are listed in ../Documentation/tuning.md#time-parameters.
|
||||
maxElectionMs = 50000
|
||||
@ -106,7 +104,7 @@ const (
|
||||
|
||||
var (
|
||||
ErrConflictBootstrapFlags = fmt.Errorf("multiple discovery or bootstrap flags are set. " +
|
||||
"Choose one of \"initial-cluster\", \"discovery\" or \"discovery-srv\"")
|
||||
"Choose one of \"initial-cluster\", \"discovery\", \"discovery-endpoints\" or \"discovery-srv\"")
|
||||
ErrUnsetAdvertiseClientURLsFlag = fmt.Errorf("--advertise-client-urls is required when --listen-client-urls is set explicitly")
|
||||
ErrLogRotationInvalidLogOutput = fmt.Errorf("--log-outputs requires a single file path when --log-rotate-config-json is defined")
|
||||
|
||||
@ -227,9 +225,8 @@ type Config struct {
|
||||
DNSClusterServiceName string `json:"discovery-srv-name"`
|
||||
Dproxy string `json:"discovery-proxy"`
|
||||
|
||||
EnableV2Discovery bool `json:"enable-v2-discovery"`
|
||||
Durl string `json:"discovery"`
|
||||
DiscoveryCfg v3discovery.DiscoveryConfig `json:"discovery-config"`
|
||||
Durl string `json:"discovery"`
|
||||
DiscoveryCfg v3discovery.DiscoveryConfig `json:"discovery-config"`
|
||||
|
||||
InitialCluster string `json:"initial-cluster"`
|
||||
InitialClusterToken string `json:"initial-cluster-token"`
|
||||
@ -518,7 +515,6 @@ func NewConfig() *Config {
|
||||
|
||||
V2Deprecation: config.V2_DEPR_DEFAULT,
|
||||
|
||||
EnableV2Discovery: DefaultEnableV2Discovery,
|
||||
DiscoveryCfg: v3discovery.DiscoveryConfig{
|
||||
DialTimeout: DefaultDiscoveryDialTimeout,
|
||||
RequestTimeOut: DefaultDiscoveryRequestTimeOut,
|
||||
@ -606,8 +602,8 @@ func (cfg *configYAML) configFromFile(path string) error {
|
||||
cfg.HostWhitelist = uv.Values
|
||||
}
|
||||
|
||||
// If a discovery flag is set, clear default initial cluster set by InitialClusterFromName
|
||||
if (cfg.Durl != "" || cfg.DNSCluster != "") && cfg.InitialCluster == defaultInitialCluster {
|
||||
// If a discovery or discovery-endpoints flag is set, clear default initial cluster set by InitialClusterFromName
|
||||
if (cfg.Durl != "" || cfg.DNSCluster != "" || len(cfg.DiscoveryCfg.Endpoints) > 0) && cfg.InitialCluster == defaultInitialCluster {
|
||||
cfg.InitialCluster = ""
|
||||
}
|
||||
if cfg.ClusterState == "" {
|
||||
@ -674,7 +670,7 @@ func (cfg *Config) Validate() error {
|
||||
}
|
||||
// Check if conflicting flags are passed.
|
||||
nSet := 0
|
||||
for _, v := range []bool{cfg.Durl != "", cfg.InitialCluster != "", cfg.DNSCluster != ""} {
|
||||
for _, v := range []bool{cfg.Durl != "", cfg.InitialCluster != "", cfg.DNSCluster != "", len(cfg.DiscoveryCfg.Endpoints) > 0} {
|
||||
if v {
|
||||
nSet++
|
||||
}
|
||||
@ -690,18 +686,24 @@ func (cfg *Config) Validate() error {
|
||||
|
||||
// Check if both v2 discovery and v3 discovery flags are passed.
|
||||
v2discoveryFlagsExist := cfg.Dproxy != ""
|
||||
v3discoveryFlagsExist := cfg.DiscoveryCfg.CertFile != "" ||
|
||||
v3discoveryFlagsExist := len(cfg.DiscoveryCfg.Endpoints) > 0 ||
|
||||
cfg.DiscoveryCfg.Token != "" ||
|
||||
cfg.DiscoveryCfg.CertFile != "" ||
|
||||
cfg.DiscoveryCfg.KeyFile != "" ||
|
||||
cfg.DiscoveryCfg.TrustedCAFile != "" ||
|
||||
cfg.DiscoveryCfg.User != "" ||
|
||||
cfg.DiscoveryCfg.Password != ""
|
||||
if cfg.EnableV2Discovery && v3discoveryFlagsExist {
|
||||
return errors.New("v2 discovery is enabled, but some v3 discovery " +
|
||||
"settings (discovery-cert, discovery-key, discovery-cacert, " +
|
||||
"discovery-user, discovery-password) are set")
|
||||
|
||||
if v2discoveryFlagsExist && v3discoveryFlagsExist {
|
||||
return errors.New("both v2 discovery settings (discovery, discovery-proxy) " +
|
||||
"and v3 discovery settings (discovery-token, discovery-endpoints, discovery-cert, " +
|
||||
"discovery-key, discovery-cacert, discovery-user, discovery-password) are set")
|
||||
}
|
||||
if !cfg.EnableV2Discovery && v2discoveryFlagsExist {
|
||||
return errors.New("v3 discovery is enabled, but --discovery-proxy is set")
|
||||
|
||||
// If one of `discovery-token` and `discovery-endpoints` is provided,
|
||||
// then the other one must be provided as well.
|
||||
if (cfg.DiscoveryCfg.Token != "") != (len(cfg.DiscoveryCfg.Endpoints) > 0) {
|
||||
return errors.New("both --discovery-token and --discovery-endpoints must be set")
|
||||
}
|
||||
|
||||
if cfg.TickMs == 0 {
|
||||
@ -753,11 +755,18 @@ func (cfg *Config) PeerURLsMapAndToken(which string) (urlsmap types.URLsMap, tok
|
||||
switch {
|
||||
case cfg.Durl != "":
|
||||
urlsmap = types.URLsMap{}
|
||||
// If using discovery, generate a temporary cluster based on
|
||||
// If using v2 discovery, generate a temporary cluster based on
|
||||
// self's advertised peer URLs
|
||||
urlsmap[cfg.Name] = cfg.APUrls
|
||||
token = cfg.Durl
|
||||
|
||||
case len(cfg.DiscoveryCfg.Endpoints) > 0:
|
||||
urlsmap = types.URLsMap{}
|
||||
// If using v3 discovery, generate a temporary cluster based on
|
||||
// self's advertised peer URLs
|
||||
urlsmap[cfg.Name] = cfg.APUrls
|
||||
token = cfg.DiscoveryCfg.Token
|
||||
|
||||
case cfg.DNSCluster != "":
|
||||
clusterStrs, cerr := cfg.GetDNSClusterNames()
|
||||
lg := cfg.logger
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
"runtime"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -175,7 +176,6 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
|
||||
MaxWALFiles: cfg.MaxWalFiles,
|
||||
InitialPeerURLsMap: urlsmap,
|
||||
InitialClusterToken: token,
|
||||
EnableV2Discovery: cfg.EnableV2Discovery,
|
||||
DiscoveryURL: cfg.Durl,
|
||||
DiscoveryProxy: cfg.Dproxy,
|
||||
DiscoveryCfg: cfg.DiscoveryCfg,
|
||||
@ -348,6 +348,8 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized
|
||||
zap.String("discovery-url", sc.DiscoveryURL),
|
||||
zap.String("discovery-proxy", sc.DiscoveryProxy),
|
||||
|
||||
zap.String("discovery-token", sc.DiscoveryCfg.Token),
|
||||
zap.String("discovery-endpoints", strings.Join(sc.DiscoveryCfg.Endpoints, ",")),
|
||||
zap.String("discovery-dial-timeout", sc.DiscoveryCfg.DialTimeout.String()),
|
||||
zap.String("discovery-request-timeout", sc.DiscoveryCfg.RequestTimeOut.String()),
|
||||
zap.String("discovery-keepalive-time", sc.DiscoveryCfg.KeepAliveTime.String()),
|
||||
|
@ -185,10 +185,16 @@ func newConfig() *config {
|
||||
"advertise-client-urls",
|
||||
"List of this member's client URLs to advertise to the public.",
|
||||
)
|
||||
fs.BoolVar(&cfg.ec.EnableV2Discovery, "enable-v2-discovery", cfg.ec.EnableV2Discovery, "Enable to bootstrap the cluster using v2 discovery. Will be deprecated in v3.7, and be decommissioned in v3.8.")
|
||||
fs.StringVar(&cfg.ec.Durl, "discovery", cfg.ec.Durl, "Discovery URL used to bootstrap the cluster.")
|
||||
|
||||
fs.StringVar(&cfg.ec.Durl, "discovery", cfg.ec.Durl, "Discovery URL used to bootstrap the cluster for v2 discovery. Will be deprecated in v3.7, and be decommissioned in v3.8.")
|
||||
fs.Var(cfg.cf.fallback, "discovery-fallback", fmt.Sprintf("Valid values include %q", cfg.cf.fallback.Valids()))
|
||||
|
||||
fs.Var(
|
||||
flags.NewUniqueStringsValue(""),
|
||||
"discovery-endpoints",
|
||||
"V3 discovery: List of gRPC endpoints of the discovery service.",
|
||||
)
|
||||
fs.StringVar(&cfg.ec.DiscoveryCfg.Token, "discovery-token", "", "V3 discovery: discovery token for the etcd cluster to be bootstrapped.")
|
||||
fs.DurationVar(&cfg.ec.DiscoveryCfg.DialTimeout, "discovery-dial-timeout", cfg.ec.DiscoveryCfg.DialTimeout, "V3 discovery: dial timeout for client connections.")
|
||||
fs.DurationVar(&cfg.ec.DiscoveryCfg.RequestTimeOut, "discovery-request-timeout", cfg.ec.DiscoveryCfg.RequestTimeOut, "V3 discovery: timeout for discovery requests (excluding dial timeout).")
|
||||
fs.DurationVar(&cfg.ec.DiscoveryCfg.KeepAliveTime, "discovery-keepalive-time", cfg.ec.DiscoveryCfg.KeepAliveTime, "V3 discovery: keepalive time for client connections.")
|
||||
@ -408,6 +414,8 @@ func (cfg *config) configFromCmdLine() error {
|
||||
cfg.ec.ACUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "advertise-client-urls")
|
||||
cfg.ec.ListenMetricsUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-metrics-urls")
|
||||
|
||||
cfg.ec.DiscoveryCfg.Endpoints = flags.UniqueStringsFromFlag(cfg.cf.flagSet, "discovery-endpoints")
|
||||
|
||||
cfg.ec.CORS = flags.UniqueURLsMapFromFlag(cfg.cf.flagSet, "cors")
|
||||
cfg.ec.HostWhitelist = flags.UniqueStringsMapFromFlag(cfg.cf.flagSet, "host-whitelist")
|
||||
|
||||
@ -428,7 +436,7 @@ func (cfg *config) configFromCmdLine() error {
|
||||
}
|
||||
|
||||
// disable default initial-cluster if discovery is set
|
||||
if (cfg.ec.Durl != "" || cfg.ec.DNSCluster != "" || cfg.ec.DNSClusterServiceName != "") && !flags.IsSet(cfg.cf.flagSet, "initial-cluster") {
|
||||
if (cfg.ec.Durl != "" || cfg.ec.DNSCluster != "" || cfg.ec.DNSClusterServiceName != "" || len(cfg.ec.DiscoveryCfg.Endpoints) > 0) && !flags.IsSet(cfg.cf.flagSet, "initial-cluster") {
|
||||
cfg.ec.InitialCluster = ""
|
||||
}
|
||||
|
||||
|
@ -196,8 +196,8 @@ func startEtcdOrProxyV2(args []string) {
|
||||
if types.URLs(cfg.ec.APUrls).String() == embed.DefaultInitialAdvertisePeerURLs {
|
||||
lg.Warn("forgot to set --initial-advertise-peer-urls?")
|
||||
}
|
||||
if cfg.ec.InitialCluster == cfg.ec.InitialClusterFromName(cfg.ec.Name) && len(cfg.ec.Durl) == 0 {
|
||||
lg.Warn("--discovery flag is not set")
|
||||
if cfg.ec.InitialCluster == cfg.ec.InitialClusterFromName(cfg.ec.Name) && len(cfg.ec.Durl) == 0 && len(cfg.ec.DiscoveryCfg.Endpoints) == 0 {
|
||||
lg.Warn("V2 discovery settings (i.e., --discovery) or v3 discovery settings (i.e., --discovery-token, --discovery-endpoints) are not set")
|
||||
}
|
||||
os.Exit(1)
|
||||
}
|
||||
@ -287,7 +287,7 @@ func startProxy(cfg *config) error {
|
||||
b, err := os.ReadFile(clusterfile)
|
||||
switch {
|
||||
case err == nil:
|
||||
if cfg.ec.Durl != "" {
|
||||
if cfg.ec.Durl != "" || len(cfg.ec.DiscoveryCfg.Endpoints) > 0 {
|
||||
lg.Warn(
|
||||
"discovery token ignored since the proxy has already been initialized; valid cluster file found",
|
||||
zap.String("cluster-file", clusterfile),
|
||||
@ -318,21 +318,22 @@ func startProxy(cfg *config) error {
|
||||
return fmt.Errorf("error setting up initial cluster: %v", err)
|
||||
}
|
||||
|
||||
var s string
|
||||
if cfg.ec.Durl != "" {
|
||||
var s string
|
||||
if cfg.ec.EnableV2Discovery {
|
||||
lg.Warn("V2 discovery is deprecated!")
|
||||
s, err = v2discovery.GetCluster(lg, cfg.ec.Durl, cfg.ec.Dproxy)
|
||||
} else {
|
||||
s, err = v3discovery.GetCluster(lg, cfg.ec.Durl, &cfg.ec.DiscoveryCfg)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
lg.Warn("V2 discovery is deprecated!")
|
||||
s, err = v2discovery.GetCluster(lg, cfg.ec.Durl, cfg.ec.Dproxy)
|
||||
} else if len(cfg.ec.DiscoveryCfg.Endpoints) > 0 {
|
||||
s, err = v3discovery.GetCluster(lg, &cfg.ec.DiscoveryCfg)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if s != "" {
|
||||
if urlsmap, err = types.NewURLsMap(s); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
peerURLs = urlsmap.URLs()
|
||||
lg.Info("proxy using peer URLS", zap.Strings("peer-urls", peerURLs))
|
||||
|
||||
|
@ -104,10 +104,12 @@ Clustering:
|
||||
--advertise-client-urls 'http://localhost:2379'
|
||||
List of this member's client URLs to advertise to the public.
|
||||
The client URLs advertised should be accessible to machines that talk to etcd cluster. etcd client libraries parse these URLs to connect to the cluster.
|
||||
--enable-v2-discovery 'true'
|
||||
Enable to bootstrap the cluster using v2 discovery. Will be deprecated in v3.7, and be decommissioned in v3.8.
|
||||
--discovery ''
|
||||
Discovery URL used to bootstrap the cluster.
|
||||
Discovery URL used to bootstrap the cluster for v2 discovery. Will be deprecated in v3.7, and be decommissioned in v3.8.
|
||||
--discovery-token ''
|
||||
V3 discovery: discovery token for the etcd cluster to be bootstrapped.
|
||||
--discovery-endpoints ''
|
||||
V3 discovery: List of gRPC endpoints of the discovery service.
|
||||
--discovery-dial-timeout '2s'
|
||||
V3 discovery: dial timeout for client connections.
|
||||
--discovery-request-timeout '5s'
|
||||
|
@ -22,7 +22,6 @@ import (
|
||||
"errors"
|
||||
|
||||
"math"
|
||||
"net/url"
|
||||
"path"
|
||||
"sort"
|
||||
"strconv"
|
||||
@ -56,7 +55,8 @@ var (
|
||||
)
|
||||
|
||||
type DiscoveryConfig struct {
|
||||
Url string `json:"discovery"`
|
||||
Token string `json:"discovery-token"`
|
||||
Endpoints []string `json:"discovery-endpoints"`
|
||||
|
||||
DialTimeout time.Duration `json:"discovery-dial-timeout"`
|
||||
RequestTimeOut time.Duration `json:"discovery-request-timeout"`
|
||||
@ -110,10 +110,10 @@ func getMemberKey(cluster, memberId string) string {
|
||||
return path.Join(getMemberKeyPrefix(cluster), memberId)
|
||||
}
|
||||
|
||||
// GetCluster will connect to the discovery service at the given url and
|
||||
// GetCluster will connect to the discovery service at the given endpoints and
|
||||
// retrieve a string describing the cluster
|
||||
func GetCluster(lg *zap.Logger, dUrl string, cfg *DiscoveryConfig) (cs string, rerr error) {
|
||||
d, err := newDiscovery(lg, dUrl, cfg, 0)
|
||||
func GetCluster(lg *zap.Logger, cfg *DiscoveryConfig) (cs string, rerr error) {
|
||||
d, err := newDiscovery(lg, cfg, 0)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@ -137,15 +137,15 @@ func GetCluster(lg *zap.Logger, dUrl string, cfg *DiscoveryConfig) (cs string, r
|
||||
return d.getCluster()
|
||||
}
|
||||
|
||||
// JoinCluster will connect to the discovery service at the given url, and
|
||||
// JoinCluster will connect to the discovery service at the endpoints, and
|
||||
// register the server represented by the given id and config to the cluster.
|
||||
// The parameter `config` is supposed to be in the format "memberName=peerURLs",
|
||||
// such as "member1=http://127.0.0.1:2380".
|
||||
//
|
||||
// The final returned string has the same format as "--initial-cluster", such as
|
||||
// "infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380".
|
||||
func JoinCluster(lg *zap.Logger, durl string, cfg *DiscoveryConfig, id types.ID, config string) (cs string, rerr error) {
|
||||
d, err := newDiscovery(lg, durl, cfg, id)
|
||||
func JoinCluster(lg *zap.Logger, cfg *DiscoveryConfig, id types.ID, config string) (cs string, rerr error) {
|
||||
d, err := newDiscovery(lg, cfg, id)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@ -175,26 +175,19 @@ type discovery struct {
|
||||
memberId types.ID
|
||||
c *clientv3.Client
|
||||
retries uint
|
||||
durl string
|
||||
|
||||
cfg *DiscoveryConfig
|
||||
|
||||
clock clockwork.Clock
|
||||
}
|
||||
|
||||
func newDiscovery(lg *zap.Logger, durl string, dcfg *DiscoveryConfig, id types.ID) (*discovery, error) {
|
||||
func newDiscovery(lg *zap.Logger, dcfg *DiscoveryConfig, id types.ID) (*discovery, error) {
|
||||
if lg == nil {
|
||||
lg = zap.NewNop()
|
||||
}
|
||||
u, err := url.Parse(durl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
token := u.Path
|
||||
u.Path = ""
|
||||
|
||||
lg = lg.With(zap.String("discovery-url", durl))
|
||||
cfg, err := newClientCfg(dcfg, u.String(), lg)
|
||||
lg = lg.With(zap.String("discovery-token", dcfg.Token), zap.String("discovery-endpoints", strings.Join(dcfg.Endpoints, ",")))
|
||||
cfg, err := newClientCfg(dcfg, lg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -205,10 +198,9 @@ func newDiscovery(lg *zap.Logger, durl string, dcfg *DiscoveryConfig, id types.I
|
||||
}
|
||||
return &discovery{
|
||||
lg: lg,
|
||||
clusterToken: token,
|
||||
clusterToken: dcfg.Token,
|
||||
memberId: id,
|
||||
c: c,
|
||||
durl: u.String(),
|
||||
cfg: dcfg,
|
||||
clock: clockwork.NewRealClock(),
|
||||
}, nil
|
||||
@ -216,7 +208,7 @@ func newDiscovery(lg *zap.Logger, durl string, dcfg *DiscoveryConfig, id types.I
|
||||
|
||||
// The following function follows the same logic as etcdctl, refer to
|
||||
// https://github.com/etcd-io/etcd/blob/f9a8c49c695b098d66a07948666664ea10d01a82/etcdctl/ctlv3/command/global.go#L191-L250
|
||||
func newClientCfg(dcfg *DiscoveryConfig, dUrl string, lg *zap.Logger) (*clientv3.Config, error) {
|
||||
func newClientCfg(dcfg *DiscoveryConfig, lg *zap.Logger) (*clientv3.Config, error) {
|
||||
var cfgtls *transport.TLSInfo
|
||||
|
||||
if dcfg.CertFile != "" || dcfg.KeyFile != "" || dcfg.TrustedCAFile != "" {
|
||||
@ -229,7 +221,7 @@ func newClientCfg(dcfg *DiscoveryConfig, dUrl string, lg *zap.Logger) (*clientv3
|
||||
}
|
||||
|
||||
cfg := &clientv3.Config{
|
||||
Endpoints: []string{dUrl},
|
||||
Endpoints: dcfg.Endpoints,
|
||||
DialTimeout: dcfg.DialTimeout,
|
||||
DialKeepAliveTime: dcfg.KeepAliveTime,
|
||||
DialKeepAliveTimeout: dcfg.KeepAliveTimeout,
|
||||
|
@ -329,10 +329,12 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper) (*
|
||||
}
|
||||
if cfg.ShouldDiscover() {
|
||||
var str string
|
||||
if cfg.EnableV2Discovery {
|
||||
if cfg.DiscoveryURL != "" {
|
||||
cfg.Logger.Warn("V2 discovery is deprecated!")
|
||||
str, err = v2discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
|
||||
} else {
|
||||
str, err = v3discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, &cfg.DiscoveryCfg, m.ID, cfg.InitialPeerURLsMap.String())
|
||||
cfg.Logger.Info("Bootstrapping cluster using v3 discovery.")
|
||||
str, err = v3discovery.JoinCluster(cfg.Logger, &cfg.DiscoveryCfg, m.ID, cfg.InitialPeerURLsMap.String())
|
||||
}
|
||||
if err != nil {
|
||||
return nil, &DiscoveryError{Op: "join", Err: err}
|
||||
|
@ -23,24 +23,34 @@ import (
|
||||
"go.etcd.io/etcd/tests/v3/framework/e2e"
|
||||
)
|
||||
|
||||
func TestClusterOf1UsingV3Discovery(t *testing.T) {
|
||||
testClusterUsingV3Discovery(t, 1, e2e.ClientNonTLS, false)
|
||||
func TestClusterOf1UsingV3Discovery_1endpoint(t *testing.T) {
|
||||
testClusterUsingV3Discovery(t, 1, 1, e2e.ClientNonTLS, false)
|
||||
}
|
||||
func TestClusterOf3UsingV3Discovery(t *testing.T) {
|
||||
testClusterUsingV3Discovery(t, 3, e2e.ClientTLS, true)
|
||||
func TestClusterOf3UsingV3Discovery_1endpoint(t *testing.T) {
|
||||
testClusterUsingV3Discovery(t, 1, 3, e2e.ClientTLS, true)
|
||||
}
|
||||
func TestTLSClusterOf3UsingV3Discovery(t *testing.T) {
|
||||
testClusterUsingV3Discovery(t, 5, e2e.ClientTLS, false)
|
||||
func TestTLSClusterOf5UsingV3Discovery_1endpoint(t *testing.T) {
|
||||
testClusterUsingV3Discovery(t, 1, 5, e2e.ClientTLS, false)
|
||||
}
|
||||
|
||||
func testClusterUsingV3Discovery(t *testing.T, clusterSize int, clientTlsType e2e.ClientConnType, isClientAutoTls bool) {
|
||||
func TestClusterOf1UsingV3Discovery_3endpoints(t *testing.T) {
|
||||
testClusterUsingV3Discovery(t, 3, 1, e2e.ClientNonTLS, false)
|
||||
}
|
||||
func TestClusterOf3UsingV3Discovery_3endpoints(t *testing.T) {
|
||||
testClusterUsingV3Discovery(t, 3, 3, e2e.ClientTLS, true)
|
||||
}
|
||||
func TestTLSClusterOf5UsingV3Discovery_3endpoints(t *testing.T) {
|
||||
testClusterUsingV3Discovery(t, 3, 5, e2e.ClientTLS, false)
|
||||
}
|
||||
|
||||
func testClusterUsingV3Discovery(t *testing.T, discoveryClusterSize, targetClusterSize int, clientTlsType e2e.ClientConnType, isClientAutoTls bool) {
|
||||
e2e.BeforeTest(t)
|
||||
|
||||
// step 1: start the discovery service
|
||||
ds, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{
|
||||
InitialToken: "new",
|
||||
BasePort: 2000,
|
||||
ClusterSize: 1,
|
||||
ClusterSize: discoveryClusterSize,
|
||||
ClientTLS: clientTlsType,
|
||||
IsClientAutoTLS: isClientAutoTls,
|
||||
})
|
||||
@ -50,15 +60,15 @@ func testClusterUsingV3Discovery(t *testing.T, clusterSize int, clientTlsType e2
|
||||
defer ds.Close()
|
||||
|
||||
// step 2: configure the cluster size
|
||||
clusterToken := "8A591FAB-1D72-41FA-BDF2-A27162FDA1E0"
|
||||
configSizeKey := fmt.Sprintf("/_etcd/registry/%s/_config/size", clusterToken)
|
||||
configSizeValStr := strconv.Itoa(clusterSize)
|
||||
discoveryToken := "8A591FAB-1D72-41FA-BDF2-A27162FDA1E0"
|
||||
configSizeKey := fmt.Sprintf("/_etcd/registry/%s/_config/size", discoveryToken)
|
||||
configSizeValStr := strconv.Itoa(targetClusterSize)
|
||||
if err := ctlV3Put(ctlCtx{epc: ds}, configSizeKey, configSizeValStr, ""); err != nil {
|
||||
t.Errorf("failed to configure cluster size to discovery serivce, error: %v", err)
|
||||
}
|
||||
|
||||
// step 3: start the etcd cluster
|
||||
epc, err := bootstrapEtcdClusterUsingV3Discovery(t, ds.EndpointsV3()[0], clusterToken, clusterSize, clientTlsType, isClientAutoTls)
|
||||
epc, err := bootstrapEtcdClusterUsingV3Discovery(t, ds.EndpointsV3(), discoveryToken, targetClusterSize, clientTlsType, isClientAutoTls)
|
||||
if err != nil {
|
||||
t.Fatalf("could not start etcd process cluster (%v)", err)
|
||||
}
|
||||
@ -74,26 +84,27 @@ func testClusterUsingV3Discovery(t *testing.T, clusterSize int, clientTlsType e2
|
||||
}
|
||||
}
|
||||
|
||||
func bootstrapEtcdClusterUsingV3Discovery(t *testing.T, durl string, clusterToken string, clusterSize int, clientTlsType e2e.ClientConnType, isClientAutoTls bool) (*e2e.EtcdProcessCluster, error) {
|
||||
func bootstrapEtcdClusterUsingV3Discovery(t *testing.T, discoveryEndpoints []string, discoveryToken string, clusterSize int, clientTlsType e2e.ClientConnType, isClientAutoTls bool) (*e2e.EtcdProcessCluster, error) {
|
||||
// cluster configuration
|
||||
cfg := &e2e.EtcdProcessClusterConfig{
|
||||
BasePort: 3000,
|
||||
ClusterSize: clusterSize,
|
||||
IsPeerTLS: true,
|
||||
IsPeerAutoTLS: true,
|
||||
Discovery: fmt.Sprintf("%s/%s", durl, clusterToken),
|
||||
BasePort: 3000,
|
||||
ClusterSize: clusterSize,
|
||||
IsPeerTLS: true,
|
||||
IsPeerAutoTLS: true,
|
||||
DiscoveryToken: discoveryToken,
|
||||
DiscoveryEndpoints: discoveryEndpoints,
|
||||
}
|
||||
|
||||
// initialize the cluster
|
||||
epc, err := e2e.InitEtcdProcessCluster(t, cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("could not initialize etcd cluster (%v)", err)
|
||||
return epc, err
|
||||
}
|
||||
|
||||
// populate discovery related security configuration
|
||||
for _, ep := range epc.Procs {
|
||||
epCfg := ep.Config()
|
||||
epCfg.Args = append(epCfg.Args, "--enable-v2-discovery=false")
|
||||
|
||||
if clientTlsType == e2e.ClientTLS {
|
||||
if isClientAutoTls {
|
||||
|
@ -170,7 +170,11 @@ type EtcdProcessClusterConfig struct {
|
||||
V2deprecation string
|
||||
|
||||
RollingStart bool
|
||||
Discovery string
|
||||
|
||||
Discovery string // v2 discovery
|
||||
|
||||
DiscoveryEndpoints []string // v3 discovery
|
||||
DiscoveryToken string
|
||||
}
|
||||
|
||||
// NewEtcdProcessCluster launches a new cluster from etcd processes, returning
|
||||
@ -348,7 +352,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []*
|
||||
}
|
||||
}
|
||||
|
||||
if cfg.Discovery == "" {
|
||||
if cfg.Discovery == "" && len(cfg.DiscoveryEndpoints) == 0 {
|
||||
for i := range etcdCfgs {
|
||||
initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
|
||||
etcdCfgs[i].InitialCluster = strings.Join(initialCluster, ",")
|
||||
@ -356,6 +360,13 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []*
|
||||
}
|
||||
}
|
||||
|
||||
if len(cfg.DiscoveryEndpoints) > 0 {
|
||||
for i := range etcdCfgs {
|
||||
etcdCfgs[i].Args = append(etcdCfgs[i].Args, fmt.Sprintf("--discovery-token=%s", cfg.DiscoveryToken))
|
||||
etcdCfgs[i].Args = append(etcdCfgs[i].Args, fmt.Sprintf("--discovery-endpoints=%s", strings.Join(cfg.DiscoveryEndpoints, ",")))
|
||||
}
|
||||
}
|
||||
|
||||
return etcdCfgs
|
||||
}
|
||||
|
||||
|
@ -588,8 +588,6 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
|
||||
peerScheme := SchemeFromTLSInfo(mcfg.PeerTLS)
|
||||
clientScheme := SchemeFromTLSInfo(mcfg.ClientTLS)
|
||||
|
||||
m.EnableV2Discovery = embed.DefaultEnableV2Discovery
|
||||
|
||||
pln := newLocalListener(t)
|
||||
m.PeerListeners = []net.Listener{pln}
|
||||
m.PeerURLs, err = types.NewURLs([]string{peerScheme + "://" + pln.Addr().String()})
|
||||
|
Loading…
x
Reference in New Issue
Block a user