diff --git a/CHANGELOG/CHANGELOG-3.6.md b/CHANGELOG/CHANGELOG-3.6.md
index 97a821346..7b8d95e2f 100644
--- a/CHANGELOG/CHANGELOG-3.6.md
+++ b/CHANGELOG/CHANGELOG-3.6.md
@@ -36,6 +36,7 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).
 - Add [`etcd --experimental-max-learners`](https://github.com/etcd-io/etcd/pull/13377) flag to allow configuration of learner max membership.
 - Add [`etcd --experimental-enable-lease-checkpoint-persist`](https://github.com/etcd-io/etcd/pull/13508) flag to handle upgrade from v3.5.2 clusters with this feature enabled.
 - Add [`etcdctl make-mirror --rev`](https://github.com/etcd-io/etcd/pull/13519) flag to support incremental mirror.
+- Add [`etcd --experimental-wait-cluster-ready-timeout`](https://github.com/etcd-io/etcd/pull/13525) flag to wait for the cluster to be ready before serving client requests.
 - Fix [non mutating requests pass through quotaKVServer when NOSPACE](https://github.com/etcd-io/etcd/pull/13435)
 - Fix [exclude the same alarm type activated by multiple peers](https://github.com/etcd-io/etcd/pull/13467).
 - Fix [Provide a better liveness probe for when etcd runs as a Kubernetes pod](https://github.com/etcd-io/etcd/pull/13399)
diff --git a/server/config/config.go b/server/config/config.go
index 43ecab7ec..15cb526f4 100644
--- a/server/config/config.go
+++ b/server/config/config.go
@@ -80,6 +80,10 @@ type ServerConfig struct {
     TickMs        uint
     ElectionTicks int

+    // WaitClusterReadyTimeout is the maximum time to wait for the
+    // cluster to be ready on startup before serving client requests.
+    WaitClusterReadyTimeout time.Duration
+
     // InitialElectionTickAdvance is true, then local member fast-forwards
     // election ticks to speed up "initial" leader election trigger. This
     // benefits the case of larger election ticks. For instance, cross
diff --git a/server/embed/config.go b/server/embed/config.go
index 46d9df0ad..c63a9f971 100644
--- a/server/embed/config.go
+++ b/server/embed/config.go
@@ -60,6 +60,7 @@ const (
     DefaultGRPCKeepAliveInterval = 2 * time.Hour
     DefaultGRPCKeepAliveTimeout  = 20 * time.Second
     DefaultDowngradeCheckTime    = 5 * time.Second
+    DefaultWaitClusterReadyTimeout = 5 * time.Second

     DefaultListenPeerURLs   = "http://localhost:2380"
     DefaultListenClientURLs = "http://localhost:2379"
@@ -212,14 +213,15 @@ type Config struct {
     // Note that cipher suites are prioritized in the given order.
     CipherSuites []string `json:"cipher-suites"`

-    ClusterState          string `json:"initial-cluster-state"`
-    DNSCluster            string `json:"discovery-srv"`
-    DNSClusterServiceName string `json:"discovery-srv-name"`
-    Dproxy                string `json:"discovery-proxy"`
-    Durl                  string `json:"discovery"`
-    InitialCluster        string `json:"initial-cluster"`
-    InitialClusterToken   string `json:"initial-cluster-token"`
-    StrictReconfigCheck   bool   `json:"strict-reconfig-check"`
+    ClusterState                        string        `json:"initial-cluster-state"`
+    DNSCluster                          string        `json:"discovery-srv"`
+    DNSClusterServiceName               string        `json:"discovery-srv-name"`
+    Dproxy                              string        `json:"discovery-proxy"`
+    Durl                                string        `json:"discovery"`
+    InitialCluster                      string        `json:"initial-cluster"`
+    InitialClusterToken                 string        `json:"initial-cluster-token"`
+    StrictReconfigCheck                 bool          `json:"strict-reconfig-check"`
+    ExperimentalWaitClusterReadyTimeout time.Duration `json:"wait-cluster-ready-timeout"`

     // AutoCompactionMode is either 'periodic' or 'revision'.
     AutoCompactionMode string `json:"auto-compaction-mode"`
@@ -471,8 +473,9 @@ func NewConfig() *Config {
         APUrls: []url.URL{*apurl},
         ACUrls: []url.URL{*acurl},

-        ClusterState:        ClusterStateFlagNew,
-        InitialClusterToken: "etcd-cluster",
+        ClusterState:                        ClusterStateFlagNew,
+        InitialClusterToken:                 "etcd-cluster",
+        ExperimentalWaitClusterReadyTimeout: DefaultWaitClusterReadyTimeout,

         StrictReconfigCheck: DefaultStrictReconfigCheck,
         Metrics:             "basic",
diff --git a/server/embed/etcd.go b/server/embed/etcd.go
index 704842770..825c56869 100644
--- a/server/embed/etcd.go
+++ b/server/embed/etcd.go
@@ -181,6 +181,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
         PeerTLSInfo:                cfg.PeerTLSInfo,
         TickMs:                     cfg.TickMs,
         ElectionTicks:              cfg.ElectionTicks(),
+        WaitClusterReadyTimeout:    cfg.ExperimentalWaitClusterReadyTimeout,
         InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
         AutoCompactionRetention:    autoCompactionRetention,
         AutoCompactionMode:         cfg.AutoCompactionMode,
@@ -321,6 +322,7 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized
         zap.Bool("force-new-cluster", sc.ForceNewCluster),
         zap.String("heartbeat-interval", fmt.Sprintf("%v", time.Duration(sc.TickMs)*time.Millisecond)),
         zap.String("election-timeout", fmt.Sprintf("%v", time.Duration(sc.ElectionTicks*int(sc.TickMs))*time.Millisecond)),
+        zap.String("wait-cluster-ready-timeout", sc.WaitClusterReadyTimeout.String()),
         zap.Bool("initial-election-tick-advance", sc.InitialElectionTickAdvance),
         zap.Uint64("snapshot-count", sc.SnapshotCount),
         zap.Uint64("snapshot-catchup-entries", sc.SnapshotCatchUpEntries),
diff --git a/server/embed/serve.go b/server/embed/serve.go
index 455abd43d..bce15a339 100644
--- a/server/embed/serve.go
+++ b/server/embed/serve.go
@@ -23,6 +23,7 @@ import (
     "net"
     "net/http"
     "strings"
+    "time"

     etcdservergw "go.etcd.io/etcd/api/v3/etcdserverpb/gw"
     "go.etcd.io/etcd/client/pkg/v3/transport"
@@ -93,7 +94,15 @@ func (sctx *serveCtx) serve(
     errHandler func(error),
     gopts ...grpc.ServerOption) (err error) {
     logger := defaultLog.New(io.Discard, "etcdhttp", 0)
-    <-s.ReadyNotify()
+
+    // When the quorum isn't satisfied, the etcd server will be blocked
+    // on <-s.ReadyNotify(). Set a timeout here so that the etcd server
+    // can continue to serve serializable read requests.
+    select {
+    case <-time.After(s.Cfg.WaitClusterReadyTimeout):
+        sctx.lg.Warn("timed out waiting for the ready notification")
+    case <-s.ReadyNotify():
+    }

     sctx.lg.Info("ready to serve client requests")

diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go
index 4257f1ba0..efb9368a1 100644
--- a/server/etcdmain/config.go
+++ b/server/etcdmain/config.go
@@ -293,6 +293,7 @@ func newConfig() *config {
     fs.BoolVar(&cfg.ec.ExperimentalTxnModeWriteWithSharedBuffer, "experimental-txn-mode-write-with-shared-buffer", true, "Enable the write transaction to use a shared buffer in its readonly check operations.")
     fs.UintVar(&cfg.ec.ExperimentalBootstrapDefragThresholdMegabytes, "experimental-bootstrap-defrag-threshold-megabytes", 0, "Enable the defrag during etcd server bootstrap on condition that it will free at least the provided threshold of disk space. Needs to be set to non-zero value to take effect.")
     fs.IntVar(&cfg.ec.ExperimentalMaxLearners, "experimental-max-learners", membership.DefaultMaxLearners, "Sets the maximum number of learners that can be available in the cluster membership.")
+    fs.DurationVar(&cfg.ec.ExperimentalWaitClusterReadyTimeout, "experimental-wait-cluster-ready-timeout", cfg.ec.ExperimentalWaitClusterReadyTimeout, "Maximum duration to wait for the cluster to be ready.")

     // unsafe
     fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go
index b9dea2e3c..fdd62cb09 100644
--- a/server/etcdmain/help.go
+++ b/server/etcdmain/help.go
@@ -250,6 +250,8 @@ Experimental feature:
     Set time duration after which a warning is generated if a unary request takes more than this duration.
   --experimental-max-learners '1'
     Set the max number of learner members allowed in the cluster membership.
+  --experimental-wait-cluster-ready-timeout '5s'
+    Set the maximum time duration to wait for the cluster to be ready.

 Unsafe feature:
   --force-new-cluster 'false'
diff --git a/tests/e2e/ctl_v3_kv_no_quorum_test.go b/tests/e2e/ctl_v3_kv_no_quorum_test.go
new file mode 100644
index 000000000..942a6e4a9
--- /dev/null
+++ b/tests/e2e/ctl_v3_kv_no_quorum_test.go
@@ -0,0 +1,71 @@
+// Copyright 2021 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// When the quorum isn't satisfied, each etcd member isn't able to
+// publish/register server information (i.e., clientURL) into the cluster.
+// Accordingly, the v2 proxy can't get any member's clientURL, so this
+// test case will fail for sure in that case.
+//
+// TODO(ahrtr): When the v2 proxy is removed, we can remove the go:build
+// lines below.
+
+//go:build !cluster_proxy
+// +build !cluster_proxy
+
+package e2e
+
+import (
+    "testing"
+
+    "go.etcd.io/etcd/tests/v3/framework/e2e"
+)
+
+func TestSerializableReadWithoutQuorum(t *testing.T) {
+    // Initialize a cluster with 3 members
+    epc, err := e2e.InitEtcdProcessCluster(t, e2e.NewConfigAutoTLS())
+    if err != nil {
+        t.Fatalf("Failed to initialize the etcd cluster: %v", err)
+    }
+
+    // Remove two members, so that only one etcd process will get started
+    epc.Procs = epc.Procs[:1]
+
+    // Start the etcd cluster with only one member
+    if err := epc.Start(); err != nil {
+        t.Fatalf("Failed to start the etcd cluster: %v", err)
+    }
+
+    // construct the ctl context
+    cx := getDefaultCtlCtx(t)
+    cx.epc = epc
+
+    // run serializable test and wait for result
+    runCtlTest(t, serializableReadTest, nil, cx)
+
+    // run linearizable test and wait for result
+    runCtlTest(t, linearizableReadTest, nil, cx)
+}
+
+func serializableReadTest(cx ctlCtx) {
+    cx.quorum = false
+    if err := ctlV3Get(cx, []string{"key1"}, []kv{}...); err != nil {
+        cx.t.Errorf("serializableReadTest failed: %v", err)
+    }
+}
+
+func linearizableReadTest(cx ctlCtx) {
+    cx.quorum = true
+    if err := ctlV3Get(cx, []string{"key1"}, []kv{}...); err == nil {
+        cx.t.Error("linearizableReadTest is expected to fail, but it succeeded")
+    }
+}
diff --git a/tests/e2e/ctl_v3_test.go b/tests/e2e/ctl_v3_test.go
index 05c75b816..fc544603d 100644
--- a/tests/e2e/ctl_v3_test.go
+++ b/tests/e2e/ctl_v3_test.go
@@ -213,14 +213,18 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
     testCtlWithOffline(t, testFunc, nil, opts...)
 }

-func testCtlWithOffline(t *testing.T, testFunc func(ctlCtx), testOfflineFunc func(ctlCtx), opts ...ctlOption) {
-    e2e.BeforeTest(t)
-
-    ret := ctlCtx{
+func getDefaultCtlCtx(t *testing.T) ctlCtx {
+    return ctlCtx{
         t:           t,
         cfg:         *e2e.NewConfigAutoTLS(),
         dialTimeout: 7 * time.Second,
     }
+}
+
+func testCtlWithOffline(t *testing.T, testFunc func(ctlCtx), testOfflineFunc func(ctlCtx), opts ...ctlOption) {
+    e2e.BeforeTest(t)
+
+    ret := getDefaultCtlCtx(t)
     ret.applyOpts(opts)

     if !ret.quorum {
@@ -244,15 +248,19 @@ func testCtlWithOffline(t *testing.T, testFunc func(ctlCtx), testOfflineFunc fun
     ret.epc = epc
     ret.dataDir = epc.Procs[0].Config().DataDirPath

+    runCtlTest(t, testFunc, testOfflineFunc, ret)
+}
+
+func runCtlTest(t *testing.T, testFunc func(ctlCtx), testOfflineFunc func(ctlCtx), cx ctlCtx) {
     defer func() {
-        if ret.envMap != nil {
-            for k := range ret.envMap {
+        if cx.envMap != nil {
+            for k := range cx.envMap {
                 os.Unsetenv(k)
             }
-            ret.envMap = make(map[string]string)
+            cx.envMap = make(map[string]string)
         }
-        if ret.epc != nil {
-            if errC := ret.epc.Close(); errC != nil {
+        if cx.epc != nil {
+            if errC := cx.epc.Close(); errC != nil {
                 t.Fatalf("error closing etcd processes (%v)", errC)
             }
         }
@@ -261,12 +269,12 @@ func testCtlWithOffline(t *testing.T, testFunc func(ctlCtx), testOfflineFunc fun
     donec := make(chan struct{})
     go func() {
         defer close(donec)
-        testFunc(ret)
+        testFunc(cx)
         t.Log("---testFunc logic DONE")
     }()

-    timeout := 2*ret.dialTimeout + time.Second
-    if ret.dialTimeout == 0 {
+    timeout := 2*cx.dialTimeout + time.Second
+    if cx.dialTimeout == 0 {
         timeout = 30 * time.Second
     }
     select {
@@ -276,12 +284,12 @@ func testCtlWithOffline(t *testing.T, testFunc func(ctlCtx), testOfflineFunc fun
     }

     t.Log("closing test cluster...")
-    assert.NoError(t, epc.Close())
-    epc = nil
+    assert.NoError(t, cx.epc.Close())
+    cx.epc = nil
     t.Log("closed test cluster...")

     if testOfflineFunc != nil {
-        testOfflineFunc(ret)
+        testOfflineFunc(cx)
     }
 }

diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go
index 4a7c9798b..242ebe0d6 100644
--- a/tests/framework/e2e/cluster.go
+++ b/tests/framework/e2e/cluster.go
@@ -176,6 +176,26 @@ type EtcdProcessClusterConfig struct {
 // NewEtcdProcessCluster launches a new cluster from etcd processes, returning
 // a new EtcdProcessCluster once all nodes are ready to accept client requests.
 func NewEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdProcessCluster, error) {
+    epc, err := InitEtcdProcessCluster(t, cfg)
+    if err != nil {
+        return nil, err
+    }
+
+    if cfg.RollingStart {
+        if err := epc.RollingStart(); err != nil {
+            return nil, fmt.Errorf("Cannot rolling-start: %v", err)
+        }
+    } else {
+        if err := epc.Start(); err != nil {
+            return nil, fmt.Errorf("Cannot start: %v", err)
+        }
+    }
+    return epc, nil
+}
+
+// InitEtcdProcessCluster initializes a new cluster based on the given config.
+// It doesn't start the cluster.
+func InitEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdProcessCluster, error) {
     SkipInShortMode(t)

     etcdCfgs := cfg.EtcdServerProcessConfigs(t)
@@ -190,20 +210,11 @@ func NewEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdPr
         proc, err := NewEtcdProcess(etcdCfgs[i])
         if err != nil {
             epc.Close()
-            return nil, fmt.Errorf("Cannot configure: %v", err)
+            return nil, fmt.Errorf("cannot configure: %v", err)
         }
         epc.Procs[i] = proc
     }

-    if cfg.RollingStart {
-        if err := epc.RollingStart(); err != nil {
-            return nil, fmt.Errorf("Cannot rolling-start: %v", err)
-        }
-    } else {
-        if err := epc.Start(); err != nil {
-            return nil, fmt.Errorf("Cannot start: %v", err)
-        }
-    }
     return epc, nil
 }
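
For reference, a minimal sketch (not part of this diff) of how the new option might be used through the embed package once this change lands. The ExperimentalWaitClusterReadyTimeout field and its 5s default come from the changes above; the data directory and the 3s value are purely illustrative. The equivalent for the binary is `etcd --experimental-wait-cluster-ready-timeout '5s'`.

package main

import (
    "log"
    "time"

    "go.etcd.io/etcd/server/v3/embed"
)

func main() {
    cfg := embed.NewConfig()
    cfg.Dir = "default.etcd" // illustrative data directory

    // Field added by this change; defaults to DefaultWaitClusterReadyTimeout (5s).
    // 3s is used here only for illustration.
    cfg.ExperimentalWaitClusterReadyTimeout = 3 * time.Second

    e, err := embed.StartEtcd(cfg)
    if err != nil {
        log.Fatal(err)
    }
    defer e.Close()

    // Block until the embedded server is stopped.
    <-e.Server.StopNotify()
}

With this setting, even if the member cannot reach quorum at startup, the client listeners come up after the timeout and serializable (non-quorum) reads can still be served, which is exactly what the new TestSerializableReadWithoutQuorum e2e test exercises.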