v3rpc: run health notifier to listen on online defrag state change

Signed-off-by: Chao Chen <chaochn@amazon.com>
Chao Chen 2023-10-26 13:11:45 -07:00
parent 5fad87c2ab
commit 8a6c1335e2
4 changed files with 48 additions and 37 deletions
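With this change the Defragment RPC always notifies the health notifier (defragStarted() on entry, defragFinished() via defer), and the notifier itself decides, based on ExperimentalStopGRPCServiceOnDefrag, whether to flip the member's gRPC health status to NOT_SERVING for the duration of the online defragmentation. Clients that enable gRPC client-side health checking, as the e2e test below does, then route requests away from the defragmenting member. The following is a minimal client-side sketch, not part of this commit; the helper name and endpoint list are illustrative, and the dial options simply mirror the ones used in the test:

package main

import (
	"context"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"google.golang.org/grpc"
	_ "google.golang.org/grpc/health" // registers grpc-go's client-side health checking function
)

// newFailoverAwareClient (hypothetical helper) builds a clientv3 client whose
// underlying gRPC connections consult each endpoint's health status: a member
// that reports NOT_SERVING during defrag is skipped by the round_robin balancer.
func newFailoverAwareClient(endpoints []string) (*clientv3.Client, error) {
	return clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
		DialOptions: []grpc.DialOption{
			grpc.WithDisableServiceConfig(),
			grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
		},
	})
}

func main() {
	cli, err := newFailoverAwareClient([]string{"127.0.0.1:2379", "127.0.0.1:22379", "127.0.0.1:32379"})
	if err != nil {
		panic(err)
	}
	defer cli.Close()
	// Reads keep succeeding even while one member runs an online defragmentation.
	if _, err := cli.Get(context.Background(), "health-check-demo"); err != nil {
		panic(err)
	}
}

The blank import of google.golang.org/grpc/health is what enables client-side health checking in grpc-go; without it the healthCheckConfig entry in the service config has no effect.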

View File

@@ -77,8 +77,9 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer
 	pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
 	hsrv := health.NewServer()
-	pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s, NewHealthNotifier(hsrv, s.Logger())))
+	healthNotifier := newHealthNotifier(hsrv, s)
 	healthpb.RegisterHealthServer(grpcServer, hsrv)
+	pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s, healthNotifier))
 
 	// set zero values for metrics registered for this grpc server
 	grpc_prometheus.Register(grpcServer)

View File

@@ -18,37 +18,47 @@ import (
 	"go.uber.org/zap"
 	"google.golang.org/grpc/health"
 	healthpb "google.golang.org/grpc/health/grpc_health_v1"
+
+	"go.etcd.io/etcd/server/v3/etcdserver"
 )
 
 const (
 	allGRPCServices = ""
 )
 
-type HealthNotifier interface {
-	StartServe()
-	StopServe(reason string)
+type notifier interface {
+	defragStarted()
+	defragFinished()
 }
 
-func NewHealthNotifier(hs *health.Server, lg *zap.Logger) HealthNotifier {
+func newHealthNotifier(hs *health.Server, s *etcdserver.EtcdServer) notifier {
 	if hs == nil {
 		panic("unexpected nil gRPC health server")
 	}
-	if lg == nil {
-		lg = zap.NewNop()
-	}
-	hc := &healthChecker{hs: hs, lg: lg}
+	hc := &healthNotifier{hs: hs, lg: s.Logger(), stopGRPCServiceOnDefrag: s.Cfg.ExperimentalStopGRPCServiceOnDefrag}
 	// set grpc health server as serving status blindly since
 	// the grpc server will serve iff s.ReadyNotify() is closed.
-	hc.StartServe()
+	hc.startServe()
 	return hc
 }
 
-type healthChecker struct {
+type healthNotifier struct {
 	hs *health.Server
 	lg *zap.Logger
+
+	stopGRPCServiceOnDefrag bool
 }
 
-func (hc *healthChecker) StartServe() {
+func (hc *healthNotifier) defragStarted() {
+	if !hc.stopGRPCServiceOnDefrag {
+		return
+	}
+	hc.stopServe("defrag is active")
+}
+
+func (hc *healthNotifier) defragFinished() { hc.startServe() }
+
+func (hc *healthNotifier) startServe() {
 	hc.lg.Info(
 		"grpc service status changed",
 		zap.String("service", allGRPCServices),
@@ -57,7 +67,7 @@ func (hc *healthChecker) StartServe() {
 	hc.hs.SetServingStatus(allGRPCServices, healthpb.HealthCheckResponse_SERVING)
 }
 
-func (hc *healthChecker) StopServe(reason string) {
+func (hc *healthNotifier) stopServe(reason string) {
 	hc.lg.Warn(
 		"grpc service status changed",
 		zap.String("service", allGRPCServices),

View File

@@ -74,13 +74,12 @@ type maintenanceServer struct {
 	cs ClusterStatusGetter
 	d  Downgrader
 	vs serverversion.Server
-	hn HealthNotifier
-	stopServingOnDefrag bool
+	healthNotifier notifier
 }
 
-func NewMaintenanceServer(s *etcdserver.EtcdServer, hn HealthNotifier) pb.MaintenanceServer {
-	srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s), hn: hn, stopServingOnDefrag: s.Cfg.ExperimentalStopGRPCServiceOnDefrag}
+func NewMaintenanceServer(s *etcdserver.EtcdServer, healthNotifier notifier) pb.MaintenanceServer {
+	srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s), healthNotifier: healthNotifier}
 	if srv.lg == nil {
 		srv.lg = zap.NewNop()
 	}
@@ -89,10 +88,8 @@ func NewMaintenanceServer(s *etcdserver.EtcdServer, hn HealthNotifier) pb.MaintenanceServer {
 func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
 	ms.lg.Info("starting defragment")
-	if ms.stopServingOnDefrag {
-		ms.hn.StopServe("defrag is active")
-		defer ms.hn.StartServe()
-	}
+	ms.healthNotifier.defragStarted()
+	defer ms.healthNotifier.defragFinished()
 
 	err := ms.bg.Backend().Defrag()
 	if err != nil {
 		ms.lg.Warn("failed to defragment", zap.Error(err))

View File

@@ -49,11 +49,11 @@ func TestFailoverOnDefrag(t *testing.T) {
 		gRPCDialOptions []grpc.DialOption
 
 		// common assertion
-		expectedMinTotalRequestsCount int
+		expectedMinQPS float64
 		// happy case assertion
-		expectedMaxFailedRequestsCount int
+		expectedMaxFailureRate float64
 		// negative case assertion
-		expectedMinFailedRequestsCount int
+		expectedMinFailureRate float64
 	}{
 		{
 			name: "defrag failover happy case",
@@ -66,8 +66,8 @@ func TestFailoverOnDefrag(t *testing.T) {
 				grpc.WithDisableServiceConfig(),
 				grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
 			},
-			expectedMinTotalRequestsCount:  300,
-			expectedMaxFailedRequestsCount: 5,
+			expectedMinQPS:         20,
+			expectedMaxFailureRate: 0.01,
 		},
 		{
 			name: "defrag blocks one-third of requests with stopGRPCServiceOnDefrag set to false",
@@ -80,8 +80,8 @@ func TestFailoverOnDefrag(t *testing.T) {
 				grpc.WithDisableServiceConfig(),
 				grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
 			},
-			expectedMinTotalRequestsCount:  300,
-			expectedMinFailedRequestsCount: 90,
+			expectedMinQPS:         20,
+			expectedMinFailureRate: 0.25,
 		},
 		{
 			name: "defrag blocks one-third of requests with stopGRPCServiceOnDefrag set to true and client health check disabled",
@@ -90,8 +90,8 @@ func TestFailoverOnDefrag(t *testing.T) {
 				e2e.WithExperimentalStopGRPCServiceOnDefrag(true),
 				e2e.WithGoFailEnabled(true),
 			},
-			expectedMinTotalRequestsCount:  300,
-			expectedMinFailedRequestsCount: 90,
+			expectedMinQPS:         20,
+			expectedMinFailureRate: 0.25,
 		},
 	}
@@ -105,6 +105,7 @@ func TestFailoverOnDefrag(t *testing.T) {
 			endpoints := clus.EndpointsGRPC()
 
 			requestVolume, successfulRequestCount := 0, 0
+			start := time.Now()
 			g := new(errgroup.Group)
 			g.Go(func() (lastErr error) {
 				clusterClient, cerr := clientv3.New(clientv3.Config{
@@ -143,15 +144,17 @@ func TestFailoverOnDefrag(t *testing.T) {
 			if err != nil {
 				t.Logf("etcd client failed to fail over, error (%v)", err)
 			}
-			t.Logf("request failure rate is %.2f%%, traffic volume successfulRequestCount %d requests, total %d requests", (1-float64(successfulRequestCount)/float64(requestVolume))*100, successfulRequestCount, requestVolume)
-			require.GreaterOrEqual(t, requestVolume, tc.expectedMinTotalRequestsCount)
-			failedRequestCount := requestVolume - successfulRequestCount
-			if tc.expectedMaxFailedRequestsCount != 0 {
-				require.LessOrEqual(t, failedRequestCount, tc.expectedMaxFailedRequestsCount)
+			qps := float64(requestVolume) / float64(time.Since(start)) * float64(time.Second)
+			failureRate := 1 - float64(successfulRequestCount)/float64(requestVolume)
+			t.Logf("request failure rate is %.2f%%, qps is %.2f requests/second", failureRate*100, qps)
+
+			require.GreaterOrEqual(t, qps, tc.expectedMinQPS)
+			if tc.expectedMaxFailureRate != 0.0 {
+				require.LessOrEqual(t, failureRate, tc.expectedMaxFailureRate)
 			}
-			if tc.expectedMinFailedRequestsCount != 0 {
-				require.GreaterOrEqual(t, failedRequestCount, tc.expectedMinFailedRequestsCount)
+			if tc.expectedMinFailureRate != 0.0 {
+				require.GreaterOrEqual(t, failureRate, tc.expectedMinFailureRate)
 			}
 		})
 	}
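On the server side the behavior stays opt-in. A hypothetical embedded-etcd sketch for enabling it is shown below; the embed.Config field name is inferred from the ExperimentalStopGRPCServiceOnDefrag server-config field used in this commit, and the matching command-line flag is assumed to be --experimental-stop-grpc-service-on-defrag, so both should be checked against the release in use:

package main

import (
	"log"

	"go.etcd.io/etcd/server/v3/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd"
	// Assumed field name: report NOT_SERVING on the gRPC health service while
	// this member runs an online defragmentation, so health-checking clients
	// fail over to other members.
	cfg.ExperimentalStopGRPCServiceOnDefrag = true

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()

	<-e.Server.ReadyNotify()
	log.Println("member ready; a defrag on this member will now flip its gRPC health status")
	<-e.Err()
}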