What would you like to be added?
Add a new compactor based on revision count, instead of a fixed interval of time.
To make this happen, the mvcc store needs to export a `CompactNotify` function to notify the compactor that the configured number of write transactions have occurred since the previous compaction. The new compactor can then observe the revision change and delete out-of-date data promptly, instead of waiting for a fixed interval to elapse, so the underlying bbolt db can reuse the free pages as soon as possible.
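To make the idea concrete, here is a minimal sketch of such a compactor. The `CompactNotify` channel, the interface names, and the wiring are assumptions for illustration only, not the actual etcd API:

```go
// A minimal sketch of a revision-count-driven compactor. CompactNotify,
// RevGetter, and Compactable are hypothetical names used only to
// illustrate the proposal; they are not the real etcd interfaces.
package compactor

import (
    "context"
    "sync"
)

type RevGetter interface {
    Rev() int64 // current revision of the mvcc store
}

type Compactable interface {
    Compact(ctx context.Context, rev int64) error
}

// RevisionCountCompactor compacts as soon as the store reports that the
// configured number of write transactions has accumulated, rather than
// waking up on a fixed timer.
type RevisionCountCompactor struct {
    retention int64           // number of most recent revisions to keep
    rg        RevGetter
    c         Compactable
    notify    <-chan struct{} // assumed to be fired by mvcc's CompactNotify
    stopc     chan struct{}
    wg        sync.WaitGroup
}

func NewRevisionCountCompactor(retention int64, rg RevGetter, c Compactable, notify <-chan struct{}) *RevisionCountCompactor {
    return &RevisionCountCompactor{
        retention: retention,
        rg:        rg,
        c:         c,
        notify:    notify,
        stopc:     make(chan struct{}),
    }
}

func (rc *RevisionCountCompactor) Run() {
    rc.wg.Add(1)
    go func() {
        defer rc.wg.Done()
        for {
            select {
            case <-rc.stopc:
                return
            case <-rc.notify:
                // Compact immediately once enough new revisions exist,
                // no matter how long they took to accumulate.
                if rev := rc.rg.Rev() - rc.retention; rev > 0 {
                    _ = rc.c.Compact(context.Background(), rev)
                }
            }
        }
    }()
}

func (rc *RevisionCountCompactor) Stop() {
    close(rc.stopc)
    rc.wg.Wait()
}
```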
Why is this needed?
In a Kubernetes cluster, with Argo Workflows for instance, there are batches of requests to create pods, followed by a large number of PATCH requests for pod status, especially when a pod has more than 3 containers. If such burst requests grow the db size in a short time, it is easy to exceed the max quota size. The cluster admin then has to get involved to defragment, which may cause long downtime. So we hope etcd can delete the out-of-date data as soon as possible and slow down the growth of the total db size.
Currently, both the revision and periodic compactors are driven by fixed interval time, which copes poorly with unexpected bursts of update requests. The new compactor based on revision count makes the admin's life easier. For instance, say the average object size is 50 KiB and the compactor is configured with 10,000 revisions: etcd then compacts after roughly 500 MiB of new data has come in, no matter how long it takes to receive those 10,000 new revisions. That handles burst update requests well.
Here are some test results:
* Fixed value size: 10 KiB, Update rate: 100/s, Total key space: 3,000
```
benchmark put --rate=100 --total=300000 --compact-interval=0 \
  --key-space-size=3000 --key-size=256 --val-size=10240
```
| Compactor | DB Total Size | DB InUse Size |
| -- | -- | -- |
| Revision(5min,retention:10000) | 570 MiB | 208 MiB |
| Periodic(1m) | 232 MiB | 165 MiB |
| Periodic(30s) | 151 MiB | 127 MiB |
| NewRevision(retention:10000) | 195 MiB | 187 MiB |
* Random value size: [9 KiB, 11 KiB], Update rate: 150/s, Total key space: 3,000
```
benchmark put --rate=150 --total=300000 --compact-interval=0 \
  --key-space-size=3000 --key-size=256 --val-size=10240 \
  --delta-val-size=1024
```
| Compactor | DB Total Size | DB InUse Size |
| -- | -- | -- |
| Revision(5min,retention:10000) | 718 MiB | 554 MiB |
| Periodic(1m) | 297 MiB | 246 MiB |
| Periodic(30s) | 185 MiB | 146 MiB |
| NewRevision(retention:10000) | 186 MiB | 178 MiB |
* Random value size: [6 KiB, 14 KiB], Update rate: 200/s, Total key space: 3,000
```
benchmark put --rate=200 --total=300000 --compact-interval=0 \
  --key-space-size=3000 --key-size=256 --val-size=10240 \
  --delta-val-size=4096
```
| Compactor | DB Total Size | DB InUse Size |
| -- | -- | -- |
| Revision(5min,retention:10000) | 874 MiB | 221 MiB |
| Periodic(1m) | 357 MiB | 260 MiB |
| Periodic(30s) | 215 MiB | 151 MiB |
| NewRevision(retention:10000) | 182 MiB | 176 MiB |
To handle burst requests with the periodic compactor, we need to use a short interval; otherwise, the total size grows large. The new compactor handles this case well.
Additional Change:
Currently, the quota system only checks the DB total size. However, there could be a lot of free pages which can be reused for upcoming requests. Based on this proposal, I also want to extend the current quota system with the DB's InUse size.
If the InUse size is less than the max quota size, we should allow requests to update. Since bbolt might still be resized when there are no continuous free pages available, we should set up a hard limit on the overflow, like 1 GiB.
```diff
// Quota represents an arbitrary quota against arbitrary requests. Each request
@@ -130,7 +134,17 @@ func (b *BackendQuota) Available(v interface{}) bool {
 		return true
 	}
 	// TODO: maybe optimize Backend.Size()
-	return b.be.Size()+int64(cost) < b.maxBackendBytes
+
+	// Since compaction frees pages that can be reallocated, check the
+	// SizeInUse first. If there are no continuous free pages for the
+	// key/value and boltdb keeps resizing, the total size must not grow
+	// more than 1 GiB beyond the quota. It's a hard limit.
+	//
+	// TODO: this should be enabled by a flag.
+	if b.be.Size()+int64(cost)-b.maxBackendBytes >= maxAllowedOverflowBytes(b.maxBackendBytes) {
+		return false
+	}
+	return b.be.SizeInUse()+int64(cost) < b.maxBackendBytes
 }
```
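`maxAllowedOverflowBytes` above is not an existing etcd helper; a minimal sketch of what it could look like, where the 10% ratio is an assumption and only the 1 GiB cap comes from this proposal:

```go
// maxAllowedOverflowBytes is a hypothetical helper for the diff above. It
// returns how far the physical DB size may exceed the quota before writes
// are rejected: 10% of the quota (an assumed ratio), hard-capped at 1 GiB.
func maxAllowedOverflowBytes(maxBackendBytes int64) int64 {
    const hardLimitBytes = int64(1) << 30 // 1 GiB
    overflow := maxBackendBytes / 10
    if overflow > hardLimitBytes {
        overflow = hardLimitBytes
    }
    return overflow
}
```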
It may also become possible to avoid raising the NOSPACE alarm when compaction can reclaim enough free pages, which would reduce downtime.
Signed-off-by: Wei Fu <fuweid89@gmail.com>
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package embed

import (
    "context"
    "errors"
    "fmt"
    "io"
    defaultLog "log"
    "math"
    "net"
    "net/http"
    "net/url"
    "runtime"
    "sort"
    "strconv"
    "strings"
    "sync"
    "time"

    "go.etcd.io/etcd/api/v3/version"
    "go.etcd.io/etcd/client/pkg/v3/transport"
    "go.etcd.io/etcd/client/pkg/v3/types"
    "go.etcd.io/etcd/client/v3/credentials"
    "go.etcd.io/etcd/pkg/v3/debugutil"
    runtimeutil "go.etcd.io/etcd/pkg/v3/runtime"
    "go.etcd.io/etcd/server/v3/config"
    "go.etcd.io/etcd/server/v3/etcdserver"
    "go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp"
    "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
    "go.etcd.io/etcd/server/v3/storage"
    "go.etcd.io/etcd/server/v3/verify"

    grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
    "github.com/soheilhy/cmux"
    "go.uber.org/zap"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/keepalive"
)

const (
    // internal fd usage includes disk usage and transport usage.
    // To read/write snapshot, snap pkg needs 1. In normal case, wal pkg needs
    // at most 2 to read/lock/write WALs. One case where it needs 2 is to
    // read all logs after some snapshot index, which locates at the end of
    // the second last and the head of the last. For purging, it needs to read
    // directory, so it needs 1. For fd monitor, it needs 1.
    // For transport, rafthttp builds two long-polling connections and at most
    // four temporary connections with each member. There are at most 9 members
    // in a cluster, so it should reserve 96.
    // For safety, we set the total reserved number to 150.
    reservedInternalFDNum = 150
)

// Etcd contains a running etcd server and its listeners.
type Etcd struct {
    Peers   []*peerListener
    Clients []net.Listener
    // a map of contexts for the servers that serve client requests.
    sctxs            map[string]*serveCtx
    metricsListeners []net.Listener

    tracingExporterShutdown func()

    Server *etcdserver.EtcdServer

    cfg   Config
    stopc chan struct{}
    errc  chan error

    closeOnce sync.Once
}

type peerListener struct {
    net.Listener
    serve func() error
    close func(context.Context) error
}

// StartEtcd launches the etcd server and HTTP handlers for client/server communication.
// The returned Etcd.Server is not guaranteed to have joined the cluster. Wait
// on the Etcd.Server.ReadyNotify() channel to know when it completes and is ready for use.
func StartEtcd(inCfg *Config) (e *Etcd, err error) {
    if err = inCfg.Validate(); err != nil {
        return nil, err
    }
    serving := false
    e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
    cfg := &e.cfg
    defer func() {
        if e == nil || err == nil {
            return
        }
        if !serving {
            // errored before starting gRPC server for serveCtx.serversC
            for _, sctx := range e.sctxs {
                close(sctx.serversC)
            }
        }
        e.Close()
        e = nil
    }()

    if !cfg.SocketOpts.Empty() {
        cfg.logger.Info(
            "configuring socket options",
            zap.Bool("reuse-address", cfg.SocketOpts.ReuseAddress),
            zap.Bool("reuse-port", cfg.SocketOpts.ReusePort),
        )
    }
    e.cfg.logger.Info(
        "configuring peer listeners",
        zap.Strings("listen-peer-urls", e.cfg.getListenPeerUrls()),
    )
    if e.Peers, err = configurePeerListeners(cfg); err != nil {
        return e, err
    }

    e.cfg.logger.Info(
        "configuring client listeners",
        zap.Strings("listen-client-urls", e.cfg.getListenClientUrls()),
    )
    if e.sctxs, err = configureClientListeners(cfg); err != nil {
        return e, err
    }

    for _, sctx := range e.sctxs {
        e.Clients = append(e.Clients, sctx.l)
    }

    var (
        urlsmap types.URLsMap
        token   string
    )
    memberInitialized := true
    if !isMemberInitialized(cfg) {
        memberInitialized = false
        urlsmap, token, err = cfg.PeerURLsMapAndToken("etcd")
        if err != nil {
            return e, fmt.Errorf("error setting up initial cluster: %v", err)
        }
    }

    // AutoCompactionRetention defaults to "0" if not set.
    if len(cfg.AutoCompactionRetention) == 0 {
        cfg.AutoCompactionRetention = "0"
    }
    autoCompactionRetention, err := parseCompactionRetention(cfg.AutoCompactionMode, cfg.AutoCompactionRetention)
    if err != nil {
        return e, err
    }

    backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType)

    srvcfg := config.ServerConfig{
        Name: cfg.Name,
        ClientURLs: cfg.AdvertiseClientUrls,
        PeerURLs: cfg.AdvertisePeerUrls,
        DataDir: cfg.Dir,
        DedicatedWALDir: cfg.WalDir,
        SnapshotCount: cfg.SnapshotCount,
        SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries,
        MaxSnapFiles: cfg.MaxSnapFiles,
        MaxWALFiles: cfg.MaxWalFiles,
        InitialPeerURLsMap: urlsmap,
        InitialClusterToken: token,
        DiscoveryURL: cfg.Durl,
        DiscoveryProxy: cfg.Dproxy,
        DiscoveryCfg: cfg.DiscoveryCfg,
        NewCluster: cfg.IsNewCluster(),
        PeerTLSInfo: cfg.PeerTLSInfo,
        TickMs: cfg.TickMs,
        ElectionTicks: cfg.ElectionTicks(),
        WaitClusterReadyTimeout: cfg.ExperimentalWaitClusterReadyTimeout,
        InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
        AutoCompactionRetention: autoCompactionRetention,
        AutoCompactionMode: cfg.AutoCompactionMode,
        QuotaBackendBytes: cfg.QuotaBackendBytes,
        BackendBatchLimit: cfg.BackendBatchLimit,
        BackendFreelistType: backendFreelistType,
        BackendBatchInterval: cfg.BackendBatchInterval,
        MaxTxnOps: cfg.MaxTxnOps,
        MaxRequestBytes: cfg.MaxRequestBytes,
        MaxConcurrentStreams: cfg.MaxConcurrentStreams,
        SocketOpts: cfg.SocketOpts,
        StrictReconfigCheck: cfg.StrictReconfigCheck,
        ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
        AuthToken: cfg.AuthToken,
        BcryptCost: cfg.BcryptCost,
        TokenTTL: cfg.AuthTokenTTL,
        CORS: cfg.CORS,
        HostWhitelist: cfg.HostWhitelist,
        InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
        CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
        CompactHashCheckEnabled: cfg.ExperimentalCompactHashCheckEnabled,
        CompactHashCheckTime: cfg.ExperimentalCompactHashCheckTime,
        PreVote: cfg.PreVote,
        Logger: cfg.logger,
        ForceNewCluster: cfg.ForceNewCluster,
        EnableGRPCGateway: cfg.EnableGRPCGateway,
        ExperimentalEnableDistributedTracing: cfg.ExperimentalEnableDistributedTracing,
        UnsafeNoFsync: cfg.UnsafeNoFsync,
        EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
        LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist,
        CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
        CompactionSleepInterval: cfg.ExperimentalCompactionSleepInterval,
        WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
        DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
        WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
        WarningUnaryRequestDuration: cfg.WarningUnaryRequestDuration,
        ExperimentalMemoryMlock: cfg.ExperimentalMemoryMlock,
        ExperimentalTxnModeWriteWithSharedBuffer: cfg.ExperimentalTxnModeWriteWithSharedBuffer,
        ExperimentalBootstrapDefragThresholdMegabytes: cfg.ExperimentalBootstrapDefragThresholdMegabytes,
        ExperimentalMaxLearners: cfg.ExperimentalMaxLearners,
        V2Deprecation: cfg.V2DeprecationEffective(),
    }

    if srvcfg.ExperimentalEnableDistributedTracing {
        tctx := context.Background()
        tracingExporter, err := newTracingExporter(tctx, cfg)
        if err != nil {
            return e, err
        }
        e.tracingExporterShutdown = func() {
            tracingExporter.Close(tctx)
        }
        srvcfg.ExperimentalTracerOptions = tracingExporter.opts

        e.cfg.logger.Info(
            "distributed tracing setup enabled",
        )
    }

    print(e.cfg.logger, *cfg, srvcfg, memberInitialized)

    if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
        return e, err
    }

    // buffer channel so goroutines on closed connections won't wait forever
    e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))

    // newly started member ("memberInitialized==false")
    // does not need corruption check
    if memberInitialized && srvcfg.InitialCorruptCheck {
        if err = e.Server.CorruptionChecker().InitialCheck(); err != nil {
            // set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
            // (nothing to close since rafthttp transports have not been started)

            e.cfg.logger.Error("checkInitialHashKV failed", zap.Error(err))
            e.Server.Cleanup()
            e.Server = nil
            return e, err
        }
    }
    e.Server.Start()

    if err = e.servePeers(); err != nil {
        return e, err
    }
    if err = e.serveClients(); err != nil {
        return e, err
    }
    if err = e.serveMetrics(); err != nil {
        return e, err
    }

    e.cfg.logger.Info(
        "now serving peer/client/metrics",
        zap.String("local-member-id", e.Server.MemberId().String()),
        zap.Strings("initial-advertise-peer-urls", e.cfg.getAdvertisePeerUrls()),
        zap.Strings("listen-peer-urls", e.cfg.getListenPeerUrls()),
        zap.Strings("advertise-client-urls", e.cfg.getAdvertiseClientUrls()),
        zap.Strings("listen-client-urls", e.cfg.getListenClientUrls()),
        zap.Strings("listen-metrics-urls", e.cfg.getMetricsURLs()),
    )
    serving = true
    return e, nil
}

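// print logs the resolved etcd server configuration at startup.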
func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized bool) {
    cors := make([]string, 0, len(ec.CORS))
    for v := range ec.CORS {
        cors = append(cors, v)
    }
    sort.Strings(cors)

    hss := make([]string, 0, len(ec.HostWhitelist))
    for v := range ec.HostWhitelist {
        hss = append(hss, v)
    }
    sort.Strings(hss)

    quota := ec.QuotaBackendBytes
    if quota == 0 {
        quota = storage.DefaultQuotaBytes
    }

    lg.Info(
        "starting an etcd server",
        zap.String("etcd-version", version.Version),
        zap.String("git-sha", version.GitSHA),
        zap.String("go-version", runtime.Version()),
        zap.String("go-os", runtime.GOOS),
        zap.String("go-arch", runtime.GOARCH),
        zap.Int("max-cpu-set", runtime.GOMAXPROCS(0)),
        zap.Int("max-cpu-available", runtime.NumCPU()),
        zap.Bool("member-initialized", memberInitialized),
        zap.String("name", sc.Name),
        zap.String("data-dir", sc.DataDir),
        zap.String("wal-dir", ec.WalDir),
        zap.String("wal-dir-dedicated", sc.DedicatedWALDir),
        zap.String("member-dir", sc.MemberDir()),
        zap.Bool("force-new-cluster", sc.ForceNewCluster),
        zap.String("heartbeat-interval", fmt.Sprintf("%v", time.Duration(sc.TickMs)*time.Millisecond)),
        zap.String("election-timeout", fmt.Sprintf("%v", time.Duration(sc.ElectionTicks*int(sc.TickMs))*time.Millisecond)),
        zap.String("wait-cluster-ready-timeout", sc.WaitClusterReadyTimeout.String()),
        zap.Bool("initial-election-tick-advance", sc.InitialElectionTickAdvance),
        zap.Uint64("snapshot-count", sc.SnapshotCount),
        zap.Uint("max-wals", sc.MaxWALFiles),
        zap.Uint("max-snapshots", sc.MaxSnapFiles),
        zap.Uint64("snapshot-catchup-entries", sc.SnapshotCatchUpEntries),
        zap.Strings("initial-advertise-peer-urls", ec.getAdvertisePeerUrls()),
        zap.Strings("listen-peer-urls", ec.getListenPeerUrls()),
        zap.Strings("advertise-client-urls", ec.getAdvertiseClientUrls()),
        zap.Strings("listen-client-urls", ec.getListenClientUrls()),
        zap.Strings("listen-metrics-urls", ec.getMetricsURLs()),
        zap.Strings("cors", cors),
        zap.Strings("host-whitelist", hss),
        zap.String("initial-cluster", sc.InitialPeerURLsMap.String()),
        zap.String("initial-cluster-state", ec.ClusterState),
        zap.String("initial-cluster-token", sc.InitialClusterToken),
        zap.Int64("quota-backend-bytes", quota),
        zap.Uint("max-request-bytes", sc.MaxRequestBytes),
        zap.Uint32("max-concurrent-streams", sc.MaxConcurrentStreams),

        zap.Bool("pre-vote", sc.PreVote),
        zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck),
        zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()),
        zap.Bool("compact-check-time-enabled", sc.CompactHashCheckEnabled),
        zap.Duration("compact-check-time-interval", sc.CompactHashCheckTime),
        zap.String("auto-compaction-mode", sc.AutoCompactionMode),
        zap.Duration("auto-compaction-retention", sc.AutoCompactionRetention),
        zap.String("auto-compaction-interval", sc.AutoCompactionRetention.String()),
        zap.String("discovery-url", sc.DiscoveryURL),
        zap.String("discovery-proxy", sc.DiscoveryProxy),

        zap.String("discovery-token", sc.DiscoveryCfg.Token),
        zap.String("discovery-endpoints", strings.Join(sc.DiscoveryCfg.Endpoints, ",")),
        zap.String("discovery-dial-timeout", sc.DiscoveryCfg.DialTimeout.String()),
        zap.String("discovery-request-timeout", sc.DiscoveryCfg.RequestTimeout.String()),
        zap.String("discovery-keepalive-time", sc.DiscoveryCfg.KeepAliveTime.String()),
        zap.String("discovery-keepalive-timeout", sc.DiscoveryCfg.KeepAliveTimeout.String()),
        zap.Bool("discovery-insecure-transport", sc.DiscoveryCfg.Secure.InsecureTransport),
        zap.Bool("discovery-insecure-skip-tls-verify", sc.DiscoveryCfg.Secure.InsecureSkipVerify),
        zap.String("discovery-cert", sc.DiscoveryCfg.Secure.Cert),
        zap.String("discovery-key", sc.DiscoveryCfg.Secure.Key),
        zap.String("discovery-cacert", sc.DiscoveryCfg.Secure.Cacert),
        zap.String("discovery-user", sc.DiscoveryCfg.Auth.Username),

        zap.String("downgrade-check-interval", sc.DowngradeCheckTime.String()),
        zap.Int("max-learners", sc.ExperimentalMaxLearners),
    )
}

// Config returns the current configuration.
func (e *Etcd) Config() Config {
    return e.cfg
}

// Close gracefully shuts down all servers/listeners.
// Client requests will be terminated with the request timeout.
// After the timeout, any remaining requests are forcibly closed.
func (e *Etcd) Close() {
    fields := []zap.Field{
        zap.String("name", e.cfg.Name),
        zap.String("data-dir", e.cfg.Dir),
        zap.Strings("advertise-peer-urls", e.cfg.getAdvertisePeerUrls()),
        zap.Strings("advertise-client-urls", e.cfg.getAdvertiseClientUrls()),
    }
    lg := e.GetLogger()
    lg.Info("closing etcd server", fields...)
    defer func() {
        lg.Info("closed etcd server", fields...)
        verify.MustVerifyIfEnabled(verify.Config{
            Logger:     lg,
            DataDir:    e.cfg.Dir,
            ExactIndex: false,
        })
        lg.Sync()
    }()

    e.closeOnce.Do(func() {
        close(e.stopc)
    })

    // close client requests with request timeout
    timeout := 2 * time.Second
    if e.Server != nil {
        timeout = e.Server.Cfg.ReqTimeout()
    }
    for _, sctx := range e.sctxs {
        for ss := range sctx.serversC {
            ctx, cancel := context.WithTimeout(context.Background(), timeout)
            stopServers(ctx, ss)
            cancel()
        }
    }

    for _, sctx := range e.sctxs {
        sctx.cancel()
    }

    for i := range e.Clients {
        if e.Clients[i] != nil {
            e.Clients[i].Close()
        }
    }

    for i := range e.metricsListeners {
        e.metricsListeners[i].Close()
    }

    // shutdown tracing exporter
    if e.tracingExporterShutdown != nil {
        e.tracingExporterShutdown()
    }

    // close rafthttp transports
    if e.Server != nil {
        e.Server.Stop()
    }

    // close all idle connections in peer handler (wait up to 1 second)
    for i := range e.Peers {
        if e.Peers[i] != nil && e.Peers[i].close != nil {
            ctx, cancel := context.WithTimeout(context.Background(), time.Second)
            e.Peers[i].close(ctx)
            cancel()
        }
    }
    if e.errc != nil {
        close(e.errc)
    }
}

func stopServers(ctx context.Context, ss *servers) {
    // first, close the http.Server
    if ss.http != nil {
        ss.http.Shutdown(ctx)
    }
    if ss.grpc == nil {
        return
    }
    // do not grpc.Server.GracefulStop when grpc runs under http server
    // See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
    // and https://github.com/etcd-io/etcd/issues/8916
    if ss.secure && ss.http != nil {
        ss.grpc.Stop()
        return
    }

    ch := make(chan struct{})
    go func() {
        defer close(ch)
        // close listeners to stop accepting new connections,
        // will block on any existing transports
        ss.grpc.GracefulStop()
    }()

    // wait until all pending RPCs are finished
    select {
    case <-ch:
    case <-ctx.Done():
        // took too long, manually close open transports
        // e.g. watch streams
        ss.grpc.Stop()

        // concurrent GracefulStop should be interrupted
        <-ch
    }
}

// Err returns a channel used to report errors during etcd run/shutdown.
// Since etcd 3.5 the channel is closed when etcd terminates.
func (e *Etcd) Err() <-chan error {
    return e.errc
}

func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) {
    if err = updateCipherSuites(&cfg.PeerTLSInfo, cfg.CipherSuites); err != nil {
        return nil, err
    }
    if err = cfg.PeerSelfCert(); err != nil {
        cfg.logger.Fatal("failed to get peer self-signed certs", zap.Error(err))
    }
    updateMinMaxVersions(&cfg.PeerTLSInfo, cfg.TlsMinVersion, cfg.TlsMaxVersion)
    if !cfg.PeerTLSInfo.Empty() {
        cfg.logger.Info(
            "starting with peer TLS",
            zap.String("tls-info", fmt.Sprintf("%+v", cfg.PeerTLSInfo)),
            zap.Strings("cipher-suites", cfg.CipherSuites),
        )
    }

    peers = make([]*peerListener, len(cfg.ListenPeerUrls))
    defer func() {
        if err == nil {
            return
        }
        for i := range peers {
            if peers[i] != nil && peers[i].close != nil {
                cfg.logger.Warn(
                    "closing peer listener",
                    zap.String("address", cfg.ListenPeerUrls[i].String()),
                    zap.Error(err),
                )
                ctx, cancel := context.WithTimeout(context.Background(), time.Second)
                peers[i].close(ctx)
                cancel()
            }
        }
    }()

    for i, u := range cfg.ListenPeerUrls {
        if u.Scheme == "http" {
            if !cfg.PeerTLSInfo.Empty() {
                cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("peer-url", u.String()))
            }
            if cfg.PeerTLSInfo.ClientCertAuth {
                cfg.logger.Warn("scheme is HTTP while --peer-client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("peer-url", u.String()))
            }
        }
        peers[i] = &peerListener{close: func(context.Context) error { return nil }}
        peers[i].Listener, err = transport.NewListenerWithOpts(u.Host, u.Scheme,
            transport.WithTLSInfo(&cfg.PeerTLSInfo),
            transport.WithSocketOpts(&cfg.SocketOpts),
            transport.WithTimeout(rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout),
        )
        if err != nil {
            return nil, err
        }
        // once serving, overwrite with 'http.Server.Shutdown'
        peers[i].close = func(context.Context) error {
            return peers[i].Listener.Close()
        }
    }
    return peers, nil
}

// configure peer handlers after rafthttp.Transport started
func (e *Etcd) servePeers() (err error) {
    ph := etcdhttp.NewPeerHandler(e.GetLogger(), e.Server)

    for _, p := range e.Peers {
        u := p.Listener.Addr().String()
        m := cmux.New(p.Listener)
        srv := &http.Server{
            Handler:     ph,
            ReadTimeout: 5 * time.Minute,
            ErrorLog:    defaultLog.New(io.Discard, "", 0), // do not log user error
        }
        go srv.Serve(m.Match(cmux.Any()))
        p.serve = func() error {
            e.cfg.logger.Info(
                "cmux::serve",
                zap.String("address", u),
            )
            return m.Serve()
        }
        p.close = func(ctx context.Context) error {
            // gracefully shutdown http.Server
            // close open listeners, idle connections
            // until context cancel or time-out
            e.cfg.logger.Info(
                "stopping serving peer traffic",
                zap.String("address", u),
            )
            srv.Shutdown(ctx)
            e.cfg.logger.Info(
                "stopped serving peer traffic",
                zap.String("address", u),
            )
            m.Close()
            return nil
        }
    }

    // start peer servers in a goroutine
    for _, pl := range e.Peers {
        go func(l *peerListener) {
            u := l.Addr().String()
            e.cfg.logger.Info(
                "serving peer traffic",
                zap.String("address", u),
            )
            e.errHandler(l.serve())
        }(pl)
    }
    return nil
}

func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
    if err = updateCipherSuites(&cfg.ClientTLSInfo, cfg.CipherSuites); err != nil {
        return nil, err
    }
    if err = cfg.ClientSelfCert(); err != nil {
        cfg.logger.Fatal("failed to get client self-signed certs", zap.Error(err))
    }
    updateMinMaxVersions(&cfg.ClientTLSInfo, cfg.TlsMinVersion, cfg.TlsMaxVersion)
    if cfg.EnablePprof {
        cfg.logger.Info("pprof is enabled", zap.String("path", debugutil.HTTPPrefixPProf))
    }

    sctxs = make(map[string]*serveCtx)
    for _, u := range append(cfg.ListenClientUrls, cfg.ListenClientHttpUrls...) {
        if u.Scheme == "http" || u.Scheme == "unix" {
            if !cfg.ClientTLSInfo.Empty() {
                cfg.logger.Warn("scheme is http or unix while key and cert files are present; ignoring key and cert files", zap.String("client-url", u.String()))
            }
            if cfg.ClientTLSInfo.ClientCertAuth {
                cfg.logger.Warn("scheme is http or unix while --client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("client-url", u.String()))
            }
        }
        if (u.Scheme == "https" || u.Scheme == "unixs") && cfg.ClientTLSInfo.Empty() {
            return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPS scheme", u.String())
        }
    }

    for _, u := range cfg.ListenClientUrls {
        addr, secure, network := resolveUrl(u)
        sctx := sctxs[addr]
        if sctx == nil {
            sctx = newServeCtx(cfg.logger)
            sctxs[addr] = sctx
        }
        sctx.secure = sctx.secure || secure
        sctx.insecure = sctx.insecure || !secure
        sctx.scheme = u.Scheme
        sctx.addr = addr
        sctx.network = network
    }
    for _, u := range cfg.ListenClientHttpUrls {
        addr, secure, network := resolveUrl(u)

        sctx := sctxs[addr]
        if sctx == nil {
            sctx = newServeCtx(cfg.logger)
            sctxs[addr] = sctx
        } else if !sctx.httpOnly {
            return nil, fmt.Errorf("cannot bind both --client-listen-urls and --client-listen-http-urls on the same url %s", u.String())
        }
        sctx.secure = sctx.secure || secure
        sctx.insecure = sctx.insecure || !secure
        sctx.scheme = u.Scheme
        sctx.addr = addr
        sctx.network = network
        sctx.httpOnly = true
    }

    for _, sctx := range sctxs {
        if sctx.l, err = transport.NewListenerWithOpts(sctx.addr, sctx.scheme,
            transport.WithSocketOpts(&cfg.SocketOpts),
            transport.WithSkipTLSInfoCheck(true),
        ); err != nil {
            return nil, err
        }
        // net.Listener will rewrite ipv4 0.0.0.0 to ipv6 [::], breaking
        // hosts that disable ipv6. So, use the address given by the user.

        if fdLimit, fderr := runtimeutil.FDLimit(); fderr == nil {
            if fdLimit <= reservedInternalFDNum {
                cfg.logger.Fatal(
                    "file descriptor limit of etcd process is too low; please set higher",
                    zap.Uint64("limit", fdLimit),
                    zap.Int("recommended-limit", reservedInternalFDNum),
                )
            }
            sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum))
        }

        defer func(sctx *serveCtx) {
            if err == nil || sctx.l == nil {
                return
            }
            sctx.l.Close()
            cfg.logger.Warn(
                "closing peer listener",
                zap.String("address", sctx.addr),
                zap.Error(err),
            )
        }(sctx)
        for k := range cfg.UserHandlers {
            sctx.userHandlers[k] = cfg.UserHandlers[k]
        }
        sctx.serviceRegister = cfg.ServiceRegister
        if cfg.EnablePprof || cfg.LogLevel == "debug" {
            sctx.registerPprof()
        }
        if cfg.LogLevel == "debug" {
            sctx.registerTrace()
        }
    }
    return sctxs, nil
}

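// resolveUrl maps a listen URL to its dialable address, whether it is
// secure, and the network type ("tcp", or "unix" for unix sockets).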
func resolveUrl(u url.URL) (addr string, secure bool, network string) {
    addr = u.Host
    network = "tcp"
    if u.Scheme == "unix" || u.Scheme == "unixs" {
        addr = u.Host + u.Path
        network = "unix"
    }
    secure = u.Scheme == "https" || u.Scheme == "unixs"
    return addr, secure, network
}

func (e *Etcd) serveClients() (err error) {
    if !e.cfg.ClientTLSInfo.Empty() {
        e.cfg.logger.Info(
            "starting with client TLS",
            zap.String("tls-info", fmt.Sprintf("%+v", e.cfg.ClientTLSInfo)),
            zap.Strings("cipher-suites", e.cfg.CipherSuites),
        )
    }

    // Start a client server goroutine for each listen address
    mux := http.NewServeMux()
    etcdhttp.HandleDebug(mux)
    etcdhttp.HandleVersion(mux, e.Server)
    etcdhttp.HandleMetrics(mux)
    etcdhttp.HandleHealth(e.cfg.logger, mux, e.Server)

    var gopts []grpc.ServerOption
    if e.cfg.GRPCKeepAliveMinTime > time.Duration(0) {
        gopts = append(gopts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
            MinTime:             e.cfg.GRPCKeepAliveMinTime,
            PermitWithoutStream: false,
        }))
    }
    if e.cfg.GRPCKeepAliveInterval > time.Duration(0) &&
        e.cfg.GRPCKeepAliveTimeout > time.Duration(0) {
        gopts = append(gopts, grpc.KeepaliveParams(keepalive.ServerParameters{
            Time:    e.cfg.GRPCKeepAliveInterval,
            Timeout: e.cfg.GRPCKeepAliveTimeout,
        }))
    }

    splitHttp := false
    for _, sctx := range e.sctxs {
        if sctx.httpOnly {
            splitHttp = true
        }
    }

    // start client servers in each goroutine
    for _, sctx := range e.sctxs {
        go func(s *serveCtx) {
            e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, e.grpcGatewayDial(splitHttp), splitHttp, gopts...))
        }(sctx)
    }
    return nil
}

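// grpcGatewayDial returns the dial function the gRPC gateway uses to
// connect back to the local gRPC server, or nil if the gateway is disabled.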
func (e *Etcd) grpcGatewayDial(splitHttp bool) (grpcDial func(ctx context.Context) (*grpc.ClientConn, error)) {
    if !e.cfg.EnableGRPCGateway {
        return nil
    }
    sctx := e.pickGrpcGatewayServeContext(splitHttp)
    addr := sctx.addr
    if network := sctx.network; network == "unix" {
        // explicitly define unix network for gRPC socket support
        addr = fmt.Sprintf("%s:%s", network, addr)
    }
    opts := []grpc.DialOption{grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32))}
    if sctx.secure {
        tlscfg, tlsErr := e.cfg.ClientTLSInfo.ServerConfig()
        if tlsErr != nil {
            return func(ctx context.Context) (*grpc.ClientConn, error) {
                return nil, tlsErr
            }
        }
        dtls := tlscfg.Clone()
        // trust local server
        dtls.InsecureSkipVerify = true
        bundle := credentials.NewBundle(credentials.Config{TLSConfig: dtls})
        opts = append(opts, grpc.WithTransportCredentials(bundle.TransportCredentials()))
    } else {
        opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
    }

    return func(ctx context.Context) (*grpc.ClientConn, error) {
        conn, err := grpc.DialContext(ctx, addr, opts...)
        if err != nil {
            sctx.lg.Error("grpc gateway failed to dial", zap.String("addr", addr), zap.Error(err))
            return nil, err
        }
        return conn, err
    }
}

func (e *Etcd) pickGrpcGatewayServeContext(splitHttp bool) *serveCtx {
    for _, sctx := range e.sctxs {
        if !splitHttp || !sctx.httpOnly {
            return sctx
        }
    }
    panic("Expect at least one context able to serve grpc")
}

func (e *Etcd) serveMetrics() (err error) {
    if e.cfg.Metrics == "extensive" {
        grpc_prometheus.EnableHandlingTimeHistogram()
    }

    if len(e.cfg.ListenMetricsUrls) > 0 {
        metricsMux := http.NewServeMux()
        etcdhttp.HandleMetrics(metricsMux)
        etcdhttp.HandleHealth(e.cfg.logger, metricsMux, e.Server)

        for _, murl := range e.cfg.ListenMetricsUrls {
            tlsInfo := &e.cfg.ClientTLSInfo
            if murl.Scheme == "http" {
                tlsInfo = nil
            }
            ml, err := transport.NewListenerWithOpts(murl.Host, murl.Scheme,
                transport.WithTLSInfo(tlsInfo),
                transport.WithSocketOpts(&e.cfg.SocketOpts),
            )
            if err != nil {
                return err
            }
            e.metricsListeners = append(e.metricsListeners, ml)
            go func(u url.URL, ln net.Listener) {
                e.cfg.logger.Info(
                    "serving metrics",
                    zap.String("address", u.String()),
                )
                e.errHandler(http.Serve(ln, metricsMux))
            }(murl, ml)
        }
    }
    return nil
}

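// errHandler logs a serve error and forwards it to the Err() channel,
// dropping it when etcd is already stopping.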
func (e *Etcd) errHandler(err error) {
    if err != nil {
        e.GetLogger().Error("setting up serving from embedded etcd failed.", zap.Error(err))
    }
    select {
    case <-e.stopc:
        return
    default:
    }
    select {
    case <-e.stopc:
    case e.errc <- err:
    }
}

// GetLogger returns the logger.
func (e *Etcd) GetLogger() *zap.Logger {
    e.cfg.loggerMu.RLock()
    l := e.cfg.logger
    e.cfg.loggerMu.RUnlock()
    return l
}

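// parseCompactionRetention interprets --auto-compaction-retention according
// to --auto-compaction-mode: for the periodic mode a bare integer means
// hours (otherwise a duration string is parsed), while the revision-based
// modes treat the integer as a revision count stored in a time.Duration.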
func parseCompactionRetention(mode, retention string) (ret time.Duration, err error) {
    h, err := strconv.Atoi(retention)
    if err == nil && h >= 0 {
        switch mode {
        case CompactorModeRevision:
            ret = time.Duration(int64(h))
        case CompactorModePeriodic:
            ret = time.Duration(int64(h)) * time.Hour
        case CompactorModeRevisionThreshold:
            ret = time.Duration(int64(h))
        case "":
            return 0, errors.New("--auto-compaction-mode is undefined")
        }
    } else {
        // periodic compaction
        ret, err = time.ParseDuration(retention)
        if err != nil {
            return 0, fmt.Errorf("error parsing CompactionRetention: %v", err)
        }
    }
    return ret, nil
}