Merge pull request #16677 from serathius/revert-13525
Revert "etcd server shouldn't wait for the ready notification infinitely on startup"
Commit 1c5289dd73
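
For orientation, the sketch below (plain Go, not etcd code; the channel and timing values are invented for illustration) contrasts the two startup-wait strategies this revert switches between: the bounded wait introduced by PR #13525, and the unconditional wait on the ready notification that this revert restores.

package main

import (
	"fmt"
	"time"
)

// waitWithTimeout mirrors the behaviour added by #13525 and removed here: stop
// waiting after a fixed timeout so the server can keep serving serializable
// reads even though the cluster has not reported readiness.
func waitWithTimeout(ready <-chan struct{}, timeout time.Duration) {
	select {
	case <-ready:
		fmt.Println("ready to serve client requests")
	case <-time.After(timeout):
		fmt.Println("timed out waiting for the ready notification")
	}
}

// waitForever is the behaviour this revert restores: block until readiness is
// signalled, with no upper bound on the wait.
func waitForever(ready <-chan struct{}) {
	<-ready
	fmt.Println("ready to serve client requests")
}

func main() {
	// No quorum: the bounded wait gives up after 200ms instead of blocking forever.
	neverReady := make(chan struct{})
	waitWithTimeout(neverReady, 200*time.Millisecond)

	// Quorum reached shortly after startup: the unconditional wait returns once
	// the ready channel is closed.
	ready := make(chan struct{})
	go func() {
		time.Sleep(100 * time.Millisecond)
		close(ready)
	}()
	waitForever(ready)
}
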
@@ -65,7 +65,6 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).
 - Add [`etcd --experimental-max-learners`](https://github.com/etcd-io/etcd/pull/13377) flag to allow configuration of learner max membership.
 - Add [`etcd --experimental-enable-lease-checkpoint-persist`](https://github.com/etcd-io/etcd/pull/13508) flag to handle upgrade from v3.5.2 clusters with this feature enabled.
 - Add [`etcdctl make-mirror --rev`](https://github.com/etcd-io/etcd/pull/13519) flag to support incremental mirror.
-- Add [`etcd --experimental-wait-cluster-ready-timeout`](https://github.com/etcd-io/etcd/pull/13525) flag to wait for cluster to be ready before serving client requests.
 - Add [v3 discovery](https://github.com/etcd-io/etcd/pull/13635) to bootstrap a new etcd cluster.
 - Add [field `storage`](https://github.com/etcd-io/etcd/pull/13772) into the response body of endpoint `/version`.
 - Add [`etcd --max-concurrent-streams`](https://github.com/etcd-io/etcd/pull/14169) flag to configure the max concurrent streams each client can open at a time, and defaults to math.MaxUint32.
@@ -83,10 +83,6 @@ type ServerConfig struct {
 TickMs uint
 ElectionTicks int
 
-// WaitClusterReadyTimeout is the maximum time to wait for the
-// cluster to be ready on startup before serving client requests.
-WaitClusterReadyTimeout time.Duration
-
 // InitialElectionTickAdvance is true, then local member fast-forwards
 // election ticks to speed up "initial" leader election trigger. This
 // benefits the case of larger election ticks. For instance, cross
@@ -68,7 +68,6 @@ const (
 DefaultGRPCKeepAliveInterval = 2 * time.Hour
 DefaultGRPCKeepAliveTimeout = 20 * time.Second
 DefaultDowngradeCheckTime = 5 * time.Second
-DefaultWaitClusterReadyTimeout = 5 * time.Second
 DefaultAutoCompactionMode = "periodic"
 
 DefaultDiscoveryDialTimeout = 2 * time.Second
@@ -242,10 +241,9 @@ type Config struct {
 Durl string `json:"discovery"`
 DiscoveryCfg v3discovery.DiscoveryConfig `json:"discovery-config"`
 
-InitialCluster string `json:"initial-cluster"`
-InitialClusterToken string `json:"initial-cluster-token"`
-StrictReconfigCheck bool `json:"strict-reconfig-check"`
-ExperimentalWaitClusterReadyTimeout time.Duration `json:"wait-cluster-ready-timeout"`
+InitialCluster string `json:"initial-cluster"`
+InitialClusterToken string `json:"initial-cluster-token"`
+StrictReconfigCheck bool `json:"strict-reconfig-check"`
 
 // AutoCompactionMode is either 'periodic' or 'revision'.
 AutoCompactionMode string `json:"auto-compaction-mode"`
@@ -502,9 +500,8 @@ func NewConfig() *Config {
 AdvertisePeerUrls: []url.URL{*apurl},
 AdvertiseClientUrls: []url.URL{*acurl},
 
-ClusterState: ClusterStateFlagNew,
-InitialClusterToken: "etcd-cluster",
-ExperimentalWaitClusterReadyTimeout: DefaultWaitClusterReadyTimeout,
+ClusterState: ClusterStateFlagNew,
+InitialClusterToken: "etcd-cluster",
 
 StrictReconfigCheck: DefaultStrictReconfigCheck,
 Metrics: "basic",
@@ -728,7 +725,6 @@ func (cfg *Config) AddFlags(fs *flag.FlagSet) {
 fs.BoolVar(&cfg.ExperimentalTxnModeWriteWithSharedBuffer, "experimental-txn-mode-write-with-shared-buffer", true, "Enable the write transaction to use a shared buffer in its readonly check operations.")
 fs.UintVar(&cfg.ExperimentalBootstrapDefragThresholdMegabytes, "experimental-bootstrap-defrag-threshold-megabytes", 0, "Enable the defrag during etcd server bootstrap on condition that it will free at least the provided threshold of disk space. Needs to be set to non-zero value to take effect.")
 fs.IntVar(&cfg.ExperimentalMaxLearners, "experimental-max-learners", membership.DefaultMaxLearners, "Sets the maximum number of learners that can be available in the cluster membership.")
-fs.DurationVar(&cfg.ExperimentalWaitClusterReadyTimeout, "experimental-wait-cluster-ready-timeout", cfg.ExperimentalWaitClusterReadyTimeout, "Maximum duration to wait for the cluster to be ready.")
 fs.Uint64Var(&cfg.SnapshotCatchUpEntries, "experimental-snapshot-catchup-entries", cfg.SnapshotCatchUpEntries, "Number of entries for a slow follower to catch up after compacting the raft storage entries.")
 
 // unsafe
@@ -185,7 +185,6 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 PeerTLSInfo: cfg.PeerTLSInfo,
 TickMs: cfg.TickMs,
 ElectionTicks: cfg.ElectionTicks(),
-WaitClusterReadyTimeout: cfg.ExperimentalWaitClusterReadyTimeout,
 InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
 AutoCompactionRetention: autoCompactionRetention,
 AutoCompactionMode: cfg.AutoCompactionMode,
@@ -328,7 +327,6 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized
 zap.Bool("force-new-cluster", sc.ForceNewCluster),
 zap.String("heartbeat-interval", fmt.Sprintf("%v", time.Duration(sc.TickMs)*time.Millisecond)),
 zap.String("election-timeout", fmt.Sprintf("%v", time.Duration(sc.ElectionTicks*int(sc.TickMs))*time.Millisecond)),
-zap.String("wait-cluster-ready-timeout", sc.WaitClusterReadyTimeout.String()),
 zap.Bool("initial-election-tick-advance", sc.InitialElectionTickAdvance),
 zap.Uint64("snapshot-count", sc.SnapshotCount),
 zap.Uint("max-wals", sc.MaxWALFiles),
@@ -22,7 +22,6 @@ import (
 "net"
 "net/http"
 "strings"
-"time"
 
 gw "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
 "github.com/soheilhy/cmux"
@@ -100,15 +99,7 @@ func (sctx *serveCtx) serve(
 splitHttp bool,
 gopts ...grpc.ServerOption) (err error) {
 logger := defaultLog.New(io.Discard, "etcdhttp", 0)
-
-// When the quorum isn't satisfied, then etcd server will be blocked
-// on <-s.ReadyNotify(). Set a timeout here so that the etcd server
-// can continue to serve serializable read request.
-select {
-case <-time.After(s.Cfg.WaitClusterReadyTimeout):
-sctx.lg.Warn("timed out waiting for the ready notification")
-case <-s.ReadyNotify():
-}
+<-s.ReadyNotify()
 
 sctx.lg.Info("ready to serve client requests")
 
@@ -19,7 +19,6 @@ import (
 "os"
 "runtime"
 "strings"
-"time"
 
 "go.uber.org/zap"
 "google.golang.org/grpc"
@@ -211,8 +210,6 @@ func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
 select {
 case <-e.Server.ReadyNotify(): // wait for e.Server to join the cluster
 case <-e.Server.StopNotify(): // publish aborted from 'ErrStopped'
-case <-time.After(cfg.ExperimentalWaitClusterReadyTimeout):
-e.GetLogger().Warn("startEtcd: timed out waiting for the ready notification")
 }
 return e.Server.StopNotify(), e.Err(), nil
 }
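
Embedders who relied on the removed timeout can recreate the same effect on the caller side around ReadyNotify, much as startEtcd did above before this revert. A minimal sketch using the public embed API (the data directory and the 5-second value are arbitrary illustrative choices, not part of this change):

package main

import (
	"log"
	"time"

	"go.etcd.io/etcd/server/v3/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd"

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()

	// Caller-side replacement for the removed --experimental-wait-cluster-ready-timeout:
	// wait for readiness, but give up after a bounded interval.
	select {
	case <-e.Server.ReadyNotify():
		log.Println("etcd is ready to serve client requests")
	case <-e.Server.StopNotify():
		log.Println("etcd stopped before becoming ready")
	case <-time.After(5 * time.Second):
		log.Println("timed out waiting for the ready notification")
	}

	// Block until the embedded server exits or reports an error.
	log.Fatal(<-e.Err())
}
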
@@ -293,8 +293,6 @@ Experimental feature:
 Set time duration after which a warning is generated if a unary request takes more than this duration. It's deprecated, and will be decommissioned in v3.7. Use --warning-unary-request-duration instead.
 --experimental-max-learners '1'
 Set the max number of learner members allowed in the cluster membership.
---experimental-wait-cluster-ready-timeout '5s'
-Set the maximum time duration to wait for the cluster to be ready.
 --experimental-snapshot-catch-up-entries '5000'
 Number of entries for a slow follower to catch up after compacting the raft storage entries.
 --experimental-compaction-sleep-interval
@@ -12,11 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+//go:build !cluster_proxy
+
 package e2e
 
 import (
 "context"
 "fmt"
+"sync"
 "testing"
 "time"
 
@@ -155,22 +158,30 @@ func TestInPlaceRecovery(t *testing.T) {
 assert.NoError(t, err)
 
 // Rolling recovery of the servers.
+wg := sync.WaitGroup{}
 t.Log("rolling updating servers in place...")
-for i, newProc := range epcNew.Procs {
+for i := range epcNew.Procs {
 oldProc := epcOld.Procs[i]
 err = oldProc.Close()
 if err != nil {
 t.Fatalf("could not stop etcd process (%v)", err)
 }
 t.Logf("old cluster server %d: %s stopped.", i, oldProc.Config().Name)
-err = newProc.Start(ctx)
-if err != nil {
-t.Fatalf("could not start etcd process (%v)", err)
-}
-t.Logf("new cluster server %d: %s started in-place with blank db.", i, newProc.Config().Name)
+wg.Add(1)
+// Start servers in background to avoid blocking on server start.
+// EtcdProcess.Start waits until etcd becomes healthy, which will not happen here until we restart at least 2 members.
+go func(proc e2e.EtcdProcess) {
+defer wg.Done()
+err = proc.Start(ctx)
+if err != nil {
+t.Errorf("could not start etcd process (%v)", err)
+}
+t.Logf("new cluster server: %s started in-place with blank db.", proc.Config().Name)
+}(epcNew.Procs[i])
 t.Log("sleeping 5 sec to let nodes do periodical check...")
 time.Sleep(5 * time.Second)
 }
+wg.Wait()
 t.Log("new cluster started.")
 
 alarmResponse, err := newCc.AlarmList(ctx)
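
The comment added in the loop above captures the constraint that forces this change: a restarted member only becomes healthy once a quorum of the new members is up, so the starts must overlap instead of running one after another. A toy, self-contained sketch of that constraint (the member count, quorum size, and cond-based barrier are invented for illustration and are not test-framework code):

package main

import (
	"fmt"
	"sync"
)

func main() {
	const members = 3
	const quorum = 2 // majority of 3

	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	up := 0

	// start blocks until a quorum of members is up, mimicking how
	// EtcdProcess.Start only returns once the cluster is healthy.
	start := func(id int) {
		mu.Lock()
		up++
		cond.Broadcast()
		for up < quorum {
			cond.Wait()
		}
		mu.Unlock()
		fmt.Printf("member %d is healthy\n", id)
	}

	// Calling start(0), start(1), start(2) sequentially would deadlock inside
	// start(0); starting them concurrently lets the quorum form.
	var wg sync.WaitGroup
	for i := 0; i < members; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			start(id)
		}(i)
	}
	wg.Wait()
	fmt.Println("cluster started")
}
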
@@ -69,7 +69,6 @@ func TestCtlV3ConsistentMemberList(t *testing.T) {
 
 epc, err := e2e.NewEtcdProcessCluster(ctx, t,
 e2e.WithClusterSize(1),
-e2e.WithWaitClusterReadyTimeout(1*time.Nanosecond),
 e2e.WithEnvVars(map[string]string{"GOFAIL_FAILPOINTS": `beforeApplyOneConfChange=sleep("2s")`}),
 )
 require.NoError(t, err, "failed to start etcd cluster: %v", err)
@@ -1,45 +0,0 @@
-// Copyright 2021 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package e2e
-
-import (
-"context"
-"testing"
-
-"go.etcd.io/etcd/tests/v3/framework/e2e"
-)
-
-func TestInitDaemonNotifyWithoutQuorum(t *testing.T) {
-// Initialize a cluster with 3 members
-epc, err := e2e.InitEtcdProcessCluster(t, e2e.NewConfigAutoTLS())
-if err != nil {
-t.Fatalf("Failed to initilize the etcd cluster: %v", err)
-}
-
-defer epc.Close()
-
-// Remove two members, so that only one etcd will get started
-epc.Procs = epc.Procs[:1]
-
-// Start the etcd cluster with only one member
-if err := epc.Start(context.TODO()); err != nil {
-t.Fatalf("Failed to start the etcd cluster: %v", err)
-}
-
-// Expect log message indicating time out waiting for quorum hit
-e2e.AssertProcessLogs(t, epc.Procs[0], "startEtcd: timed out waiting for the ready notification")
-// Expect log message indicating systemd notify message has been sent
-e2e.AssertProcessLogs(t, epc.Procs[0], "notifying init daemon")
-}
@@ -354,10 +354,6 @@ func WithWatchProcessNotifyInterval(interval time.Duration) EPClusterOption {
 return func(c *EtcdProcessClusterConfig) { c.ServerConfig.ExperimentalWatchProgressNotifyInterval = interval }
 }
 
-func WithWaitClusterReadyTimeout(readyTimeout time.Duration) EPClusterOption {
-return func(c *EtcdProcessClusterConfig) { c.ServerConfig.ExperimentalWaitClusterReadyTimeout = readyTimeout }
-}
-
 func WithEnvVars(ev map[string]string) EPClusterOption {
 return func(c *EtcdProcessClusterConfig) { c.EnvVars = ev }
 }
@@ -601,9 +597,6 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
 if cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0 {
 args = append(args, "--experimental-watch-progress-notify-interval", cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval.String())
 }
-if cfg.ServerConfig.ExperimentalWaitClusterReadyTimeout != 0 {
-args = append(args, "--experimental-wait-cluster-ready-timeout", cfg.ServerConfig.ExperimentalWaitClusterReadyTimeout.String())
-}
 if cfg.ServerConfig.SnapshotCatchUpEntries != etcdserver.DefaultSnapshotCatchUpEntries {
 if cfg.Version == CurrentVersion || (cfg.Version == MinorityLastVersion && i <= cfg.ClusterSize/2) || (cfg.Version == QuorumLastVersion && i > cfg.ClusterSize/2) {
 args = append(args, "--experimental-snapshot-catchup-entries", fmt.Sprintf("%d", cfg.ServerConfig.SnapshotCatchUpEntries))