gRPC health server sets serving status to NOT_SERVING on defrag

gRPC health server sets serving status to NOT_SERVING on defrag
Backport from 3.6 in #16278

Co-authored-by: Chao Chen <chaochn@amazon.com>
Signed-off-by: Thomas Jungblut <tjungblu@redhat.com>
This commit is contained in:
Thomas Jungblut 2024-04-30 14:24:48 +02:00
parent 31a87cff37
commit 750bc0b1e4
12 changed files with 262 additions and 8 deletions

View File

@ -187,6 +187,9 @@ type ServerConfig struct {
// a shared buffer in its readonly check operations. // a shared buffer in its readonly check operations.
ExperimentalTxnModeWriteWithSharedBuffer bool `json:"experimental-txn-mode-write-with-shared-buffer"` ExperimentalTxnModeWriteWithSharedBuffer bool `json:"experimental-txn-mode-write-with-shared-buffer"`
// ExperimentalStopGRPCServiceOnDefrag enables etcd gRPC service to stop serving client requests on defragmentation.
ExperimentalStopGRPCServiceOnDefrag bool `json:"experimental-stop-grpc-service-on-defrag"`
// ExperimentalBootstrapDefragThresholdMegabytes is the minimum number of megabytes needed to be freed for etcd server to // ExperimentalBootstrapDefragThresholdMegabytes is the minimum number of megabytes needed to be freed for etcd server to
// consider running defrag during bootstrap. Needs to be set to non-zero value to take effect. // consider running defrag during bootstrap. Needs to be set to non-zero value to take effect.
ExperimentalBootstrapDefragThresholdMegabytes uint `json:"experimental-bootstrap-defrag-threshold-megabytes"` ExperimentalBootstrapDefragThresholdMegabytes uint `json:"experimental-bootstrap-defrag-threshold-megabytes"`

View File

@ -414,6 +414,9 @@ type Config struct {
// ExperimentalTxnModeWriteWithSharedBuffer enables write transaction to use a shared buffer in its readonly check operations. // ExperimentalTxnModeWriteWithSharedBuffer enables write transaction to use a shared buffer in its readonly check operations.
ExperimentalTxnModeWriteWithSharedBuffer bool `json:"experimental-txn-mode-write-with-shared-buffer"` ExperimentalTxnModeWriteWithSharedBuffer bool `json:"experimental-txn-mode-write-with-shared-buffer"`
// ExperimentalStopGRPCServiceOnDefrag enables etcd gRPC service to stop serving client requests on defragmentation.
ExperimentalStopGRPCServiceOnDefrag bool `json:"experimental-stop-grpc-service-on-defrag"`
// V2Deprecation describes phase of API & Storage V2 support // V2Deprecation describes phase of API & Storage V2 support
V2Deprecation config.V2DeprecationEnum `json:"v2-deprecation"` V2Deprecation config.V2DeprecationEnum `json:"v2-deprecation"`
} }
@ -515,6 +518,7 @@ func NewConfig() *Config {
ExperimentalDowngradeCheckTime: DefaultDowngradeCheckTime, ExperimentalDowngradeCheckTime: DefaultDowngradeCheckTime,
ExperimentalMemoryMlock: false, ExperimentalMemoryMlock: false,
ExperimentalTxnModeWriteWithSharedBuffer: true, ExperimentalTxnModeWriteWithSharedBuffer: true,
ExperimentalStopGRPCServiceOnDefrag: false,
ExperimentalCompactHashCheckEnabled: false, ExperimentalCompactHashCheckEnabled: false,
ExperimentalCompactHashCheckTime: time.Minute, ExperimentalCompactHashCheckTime: time.Minute,

View File

@ -222,6 +222,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration, WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
ExperimentalMemoryMlock: cfg.ExperimentalMemoryMlock, ExperimentalMemoryMlock: cfg.ExperimentalMemoryMlock,
ExperimentalTxnModeWriteWithSharedBuffer: cfg.ExperimentalTxnModeWriteWithSharedBuffer, ExperimentalTxnModeWriteWithSharedBuffer: cfg.ExperimentalTxnModeWriteWithSharedBuffer,
ExperimentalStopGRPCServiceOnDefrag: cfg.ExperimentalStopGRPCServiceOnDefrag,
ExperimentalBootstrapDefragThresholdMegabytes: cfg.ExperimentalBootstrapDefragThresholdMegabytes, ExperimentalBootstrapDefragThresholdMegabytes: cfg.ExperimentalBootstrapDefragThresholdMegabytes,
V2Deprecation: cfg.V2DeprecationEffective(), V2Deprecation: cfg.V2DeprecationEffective(),
} }

View File

@ -301,6 +301,7 @@ func newConfig() *config {
fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.") fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.")
fs.BoolVar(&cfg.ec.ExperimentalMemoryMlock, "experimental-memory-mlock", cfg.ec.ExperimentalMemoryMlock, "Enable to enforce etcd pages (in particular bbolt) to stay in RAM.") fs.BoolVar(&cfg.ec.ExperimentalMemoryMlock, "experimental-memory-mlock", cfg.ec.ExperimentalMemoryMlock, "Enable to enforce etcd pages (in particular bbolt) to stay in RAM.")
fs.BoolVar(&cfg.ec.ExperimentalTxnModeWriteWithSharedBuffer, "experimental-txn-mode-write-with-shared-buffer", true, "Enable the write transaction to use a shared buffer in its readonly check operations.") fs.BoolVar(&cfg.ec.ExperimentalTxnModeWriteWithSharedBuffer, "experimental-txn-mode-write-with-shared-buffer", true, "Enable the write transaction to use a shared buffer in its readonly check operations.")
fs.BoolVar(&cfg.ec.ExperimentalStopGRPCServiceOnDefrag, "experimental-stop-grpc-service-on-defrag", cfg.ec.ExperimentalStopGRPCServiceOnDefrag, "Enable etcd gRPC service to stop serving client requests on defragmentation.")
fs.UintVar(&cfg.ec.ExperimentalBootstrapDefragThresholdMegabytes, "experimental-bootstrap-defrag-threshold-megabytes", 0, "Enable the defrag during etcd server bootstrap on condition that it will free at least the provided threshold of disk space. Needs to be set to non-zero value to take effect.") fs.UintVar(&cfg.ec.ExperimentalBootstrapDefragThresholdMegabytes, "experimental-bootstrap-defrag-threshold-megabytes", 0, "Enable the defrag during etcd server bootstrap on condition that it will free at least the provided threshold of disk space. Needs to be set to non-zero value to take effect.")
fs.Uint64Var(&cfg.ec.SnapshotCatchUpEntries, "experimental-snapshot-catchup-entries", cfg.ec.SnapshotCatchUpEntries, "(WARNING: Use this flag with caution!) Number of entries for a slow follower to catch up after compacting the raft storage entries.") fs.Uint64Var(&cfg.ec.SnapshotCatchUpEntries, "experimental-snapshot-catchup-entries", cfg.ec.SnapshotCatchUpEntries, "(WARNING: Use this flag with caution!) Number of entries for a slow follower to catch up after compacting the raft storage entries.")

View File

@ -282,6 +282,8 @@ Experimental feature:
Enable the write transaction to use a shared buffer in its readonly check operations. Enable the write transaction to use a shared buffer in its readonly check operations.
--experimental-bootstrap-defrag-threshold-megabytes --experimental-bootstrap-defrag-threshold-megabytes
Enable the defrag during etcd server bootstrap on condition that it will free at least the provided threshold of disk space. Needs to be set to non-zero value to take effect. Enable the defrag during etcd server bootstrap on condition that it will free at least the provided threshold of disk space. Needs to be set to non-zero value to take effect.
--experimental-stop-grpc-service-on-defrag
Enable etcd gRPC service to stop serving client requests on defragmentation.
Unsafe feature: Unsafe feature:
--force-new-cluster 'false' --force-new-cluster 'false'

View File

@ -39,7 +39,7 @@ func New(s *etcdserver.EtcdServer) *clientv3.Client {
wc := adapter.WatchServerToWatchClient(v3rpc.NewWatchServer(s)) wc := adapter.WatchServerToWatchClient(v3rpc.NewWatchServer(s))
c.Watcher = &watchWrapper{clientv3.NewWatchFromWatchClient(wc, c)} c.Watcher = &watchWrapper{clientv3.NewWatchFromWatchClient(wc, c)}
mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s)) mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s, nil))
c.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(mc, c) c.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(mc, c)
clc := adapter.ClusterServerToClusterClient(v3rpc.NewClusterServer(s)) clc := adapter.ClusterServerToClusterClient(v3rpc.NewClusterServer(s))

View File

@ -76,13 +76,9 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer
pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s)) pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
pb.RegisterClusterServer(grpcServer, NewClusterServer(s)) pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
pb.RegisterAuthServer(grpcServer, NewAuthServer(s)) pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))
// server should register all the services manually
// use empty service name for all etcd services' health status,
// see https://github.com/grpc/grpc/blob/master/doc/health-checking.md for more
hsrv := health.NewServer() hsrv := health.NewServer()
hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING) pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s, NewHealthNotifier(hsrv, s.Logger())))
healthpb.RegisterHealthServer(grpcServer, hsrv) healthpb.RegisterHealthServer(grpcServer, hsrv)
// set zero values for metrics registered for this grpc server // set zero values for metrics registered for this grpc server

View File

@ -0,0 +1,68 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package v3rpc
import (
"go.uber.org/zap"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
const (
allGRPCServices = ""
)
type HealthNotifier interface {
StartServe()
StopServe(reason string)
}
func NewHealthNotifier(hs *health.Server, lg *zap.Logger) HealthNotifier {
if hs == nil {
panic("unexpected nil gRPC health server")
}
if lg == nil {
lg = zap.NewNop()
}
hc := &healthChecker{hs: hs, lg: lg}
// set grpc health server as serving status blindly since
// the grpc server will serve iff s.ReadyNotify() is closed.
hc.StartServe()
return hc
}
type healthChecker struct {
hs *health.Server
lg *zap.Logger
}
func (hc *healthChecker) StartServe() {
hc.lg.Info(
"grpc service status changed",
zap.String("service", allGRPCServices),
zap.String("status", healthpb.HealthCheckResponse_SERVING.String()),
)
hc.hs.SetServingStatus(allGRPCServices, healthpb.HealthCheckResponse_SERVING)
}
func (hc *healthChecker) StopServe(reason string) {
hc.lg.Warn(
"grpc service status changed",
zap.String("service", allGRPCServices),
zap.String("status", healthpb.HealthCheckResponse_NOT_SERVING.String()),
zap.String("reason", reason),
)
hc.hs.SetServingStatus(allGRPCServices, healthpb.HealthCheckResponse_NOT_SERVING)
}

View File

@ -76,10 +76,13 @@ type maintenanceServer struct {
hdr header hdr header
cs ClusterStatusGetter cs ClusterStatusGetter
d Downgrader d Downgrader
hn HealthNotifier
stopServingOnDefrag bool
} }
func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer { func NewMaintenanceServer(s *etcdserver.EtcdServer, hn HealthNotifier) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), kg: s, bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s} srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), kg: s, bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, hn: hn, stopServingOnDefrag: s.Cfg.ExperimentalStopGRPCServiceOnDefrag}
if srv.lg == nil { if srv.lg == nil {
srv.lg = zap.NewNop() srv.lg = zap.NewNop()
} }
@ -88,6 +91,10 @@ func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) { func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
ms.lg.Info("starting defragment") ms.lg.Info("starting defragment")
if ms.stopServingOnDefrag {
ms.hn.StopServe("defrag is active")
defer ms.hn.StartServe()
}
err := ms.bg.Backend().Defrag() err := ms.bg.Backend().Defrag()
if err != nil { if err != nil {
ms.lg.Warn("failed to defragment", zap.Error(err)) ms.lg.Warn("failed to defragment", zap.Error(err))

157
tests/e2e/failover_test.go Normal file
View File

@ -0,0 +1,157 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !cluster_proxy
package e2e
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
_ "google.golang.org/grpc/health"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)
const (
// in sync with how kubernetes uses etcd
// https://github.com/kubernetes/kubernetes/blob/release-1.28/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go#L59-L71
keepaliveTime = 30 * time.Second
keepaliveTimeout = 10 * time.Second
dialTimeout = 20 * time.Second
clientRuntime = 10 * time.Second
requestTimeout = 100 * time.Millisecond
)
func TestFailoverOnDefrag(t *testing.T) {
tcs := []struct {
name string
experimentalStopGRPCServiceOnDefragEnabled bool
gRPCDialOptions []grpc.DialOption
// common assertion
expectedMinTotalRequestsCount int
// happy case assertion
expectedMaxFailedRequestsCount int
// negative case assertion
expectedMinFailedRequestsCount int
}{
{
name: "defrag failover happy case",
experimentalStopGRPCServiceOnDefragEnabled: true,
gRPCDialOptions: []grpc.DialOption{
grpc.WithDisableServiceConfig(),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
},
expectedMinTotalRequestsCount: 300,
expectedMaxFailedRequestsCount: 5,
},
{
name: "defrag blocks one-third of requests with stopGRPCServiceOnDefrag set to false",
experimentalStopGRPCServiceOnDefragEnabled: false,
gRPCDialOptions: []grpc.DialOption{
grpc.WithDisableServiceConfig(),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
},
expectedMinTotalRequestsCount: 300,
expectedMinFailedRequestsCount: 90,
},
{
name: "defrag blocks one-third of requests with stopGRPCServiceOnDefrag set to true and client health check disabled",
experimentalStopGRPCServiceOnDefragEnabled: true,
expectedMinTotalRequestsCount: 300,
expectedMinFailedRequestsCount: 90,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
e2e.BeforeTest(t)
cfg := e2e.EtcdProcessClusterConfig{
ClusterSize: 3,
GoFailEnabled: true,
ExperimentalStopGRPCServiceOnDefrag: tc.experimentalStopGRPCServiceOnDefragEnabled,
}
clus, err := e2e.NewEtcdProcessCluster(t, &cfg)
require.NoError(t, err)
t.Cleanup(func() { clus.Stop() })
endpoints := clus.EndpointsGRPC()
requestVolume, successfulRequestCount := 0, 0
g := new(errgroup.Group)
g.Go(func() (lastErr error) {
clusterClient, cerr := clientv3.New(clientv3.Config{
DialTimeout: dialTimeout,
DialKeepAliveTime: keepaliveTime,
DialKeepAliveTimeout: keepaliveTimeout,
Endpoints: endpoints,
DialOptions: tc.gRPCDialOptions,
})
if cerr != nil {
return cerr
}
defer clusterClient.Close()
timeout := time.After(clientRuntime)
for {
select {
case <-timeout:
return lastErr
default:
}
getContext, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err := clusterClient.Get(getContext, "health")
cancel()
requestVolume++
if err != nil {
lastErr = err
continue
}
successfulRequestCount++
}
})
triggerDefrag(t, clus.Procs[0])
err = g.Wait()
if err != nil {
t.Logf("etcd client failed to fail over, error (%v)", err)
}
t.Logf("request failure rate is %.2f%%, traffic volume successfulRequestCount %d requests, total %d requests", (1-float64(successfulRequestCount)/float64(requestVolume))*100, successfulRequestCount, requestVolume)
require.GreaterOrEqual(t, requestVolume, tc.expectedMinTotalRequestsCount)
failedRequestCount := requestVolume - successfulRequestCount
if tc.expectedMaxFailedRequestsCount != 0 {
require.LessOrEqual(t, failedRequestCount, tc.expectedMaxFailedRequestsCount)
}
if tc.expectedMinFailedRequestsCount != 0 {
require.GreaterOrEqual(t, failedRequestCount, tc.expectedMinFailedRequestsCount)
}
})
}
}
func triggerDefrag(t *testing.T, member e2e.EtcdProcess) {
require.NoError(t, member.Failpoints().SetupHTTP(context.Background(), "defragBeforeCopy", `sleep("10s")`))
require.NoError(t, member.Etcdctl(e2e.ClientNonTLS, false, false).Defragment(time.Minute))
}

View File

@ -191,6 +191,8 @@ type EtcdProcessClusterConfig struct {
CompactHashCheckTime time.Duration CompactHashCheckTime time.Duration
WatchProcessNotifyInterval time.Duration WatchProcessNotifyInterval time.Duration
CompactionBatchLimit int CompactionBatchLimit int
ExperimentalStopGRPCServiceOnDefrag bool
} }
// NewEtcdProcessCluster launches a new cluster from etcd processes, returning // NewEtcdProcessCluster launches a new cluster from etcd processes, returning
@ -329,6 +331,9 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
if cfg.InitialCorruptCheck { if cfg.InitialCorruptCheck {
args = append(args, "--experimental-initial-corrupt-check") args = append(args, "--experimental-initial-corrupt-check")
} }
if cfg.ExperimentalStopGRPCServiceOnDefrag {
args = append(args, "--experimental-stop-grpc-service-on-defrag")
}
var murl string var murl string
if cfg.MetricsURLScheme != "" { if cfg.MetricsURLScheme != "" {
murl = (&url.URL{ murl = (&url.URL{

View File

@ -18,6 +18,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings" "strings"
"time"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/integration" "go.etcd.io/etcd/tests/v3/integration"
@ -166,6 +167,15 @@ func (ctl *Etcdctl) Compact(rev int64) (*clientv3.CompactResponse, error) {
return nil, SpawnWithExpectWithEnv(args, ctl.env(), fmt.Sprintf("compacted revision %v", rev)) return nil, SpawnWithExpectWithEnv(args, ctl.env(), fmt.Sprintf("compacted revision %v", rev))
} }
func (ctl *Etcdctl) Defragment(timeout time.Duration) error {
args := append(ctl.cmdArgs(), "defrag")
if timeout != 0 {
args = append(args, fmt.Sprintf("--command-timeout=%s", timeout))
}
return SpawnWithExpectWithEnv(args, ctl.env(), "Finished defragmenting etcd member")
}
func (ctl *Etcdctl) Status() ([]*clientv3.StatusResponse, error) { func (ctl *Etcdctl) Status() ([]*clientv3.StatusResponse, error) {
var epStatus []*struct { var epStatus []*struct {
Endpoint string Endpoint string