Merge pull request #12114 from tangcong/grpcproxy-healthcheck

*: add health handler for grpcproxy self
This commit is contained in:
Gyuho Lee 2020-07-10 10:44:59 -07:00 committed by GitHub
commit 07461ecc8c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 67 additions and 2 deletions

View File

@ -184,6 +184,8 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change.
- Fix [`panic on error`](https://github.com/etcd-io/etcd/pull/11694) for metrics handler.
- Add [gRPC keepalive related flags](https://github.com/etcd-io/etcd/pull/11711) `grpc-keepalive-min-time`, `grpc-keepalive-interval` and `grpc-keepalive-timeout`.
- [Fix grpc watch proxy hangs when failed to cancel a watcher](https://github.com/etcd-io/etcd/pull/12030) .
- Add [metrics handler for grpcproxy self](https://github.com/etcd-io/etcd/pull/12107).
- Add [health handler for grpcproxy self](https://github.com/etcd-io/etcd/pull/12114).
### Auth

View File

@ -202,9 +202,10 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
}()
client := mustNewClient(lg)
proxyClient := mustNewProxyClient(lg, tlsinfo)
httpClient := mustNewHTTPClient(lg)
srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client)
srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client, proxyClient)
errc := make(chan error)
go func() { errc <- newGRPCProxyServer(lg, client).Serve(grpcl) }()
go func() { errc <- srvhttp.Serve(httpl) }()
@ -216,6 +217,7 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
grpcproxy.HandleMetrics(mux, httpClient, client.Endpoints())
grpcproxy.HandleHealth(lg, mux, client)
grpcproxy.HandleProxyMetrics(mux)
grpcproxy.HandleProxyHealth(lg, mux, proxyClient)
lg.Info("gRPC proxy server metrics URL serving")
herr := http.Serve(mhttpl, mux)
if herr != nil {
@ -273,6 +275,37 @@ func mustNewClient(lg *zap.Logger) *clientv3.Client {
return client
}
func mustNewProxyClient(lg *zap.Logger, tls *transport.TLSInfo) *clientv3.Client {
eps := []string{grpcProxyAdvertiseClientURL}
cfg, err := newProxyClientCfg(lg, eps, tls)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
client, err := clientv3.New(*cfg)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
lg.Info("create proxy client", zap.String("grpcProxyAdvertiseClientURL", grpcProxyAdvertiseClientURL))
return client
}
func newProxyClientCfg(lg *zap.Logger, eps []string, tls *transport.TLSInfo) (*clientv3.Config, error) {
cfg := clientv3.Config{
Endpoints: eps,
DialTimeout: 5 * time.Second,
}
if tls != nil {
clientTLS, err := tls.ClientConfig()
if err != nil {
return nil, err
}
cfg.TLS = clientTLS
}
return &cfg, nil
}
func newClientCfg(lg *zap.Logger, eps []string) (*clientv3.Config, error) {
// set tls if any one tls option set
cfg := clientv3.Config{
@ -405,13 +438,14 @@ func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
return server
}
func mustHTTPListener(lg *zap.Logger, m cmux.CMux, tlsinfo *transport.TLSInfo, c *clientv3.Client) (*http.Server, net.Listener) {
func mustHTTPListener(lg *zap.Logger, m cmux.CMux, tlsinfo *transport.TLSInfo, c *clientv3.Client, proxy *clientv3.Client) (*http.Server, net.Listener) {
httpClient := mustNewHTTPClient(lg)
httpmux := http.NewServeMux()
httpmux.HandleFunc("/", http.NotFound)
grpcproxy.HandleMetrics(httpmux, httpClient, c.Endpoints())
grpcproxy.HandleHealth(lg, httpmux, c)
grpcproxy.HandleProxyMetrics(httpmux)
grpcproxy.HandleProxyHealth(lg, httpmux, proxy)
if grpcProxyEnablePprof {
for p, h := range debugutil.PProfHandlers() {
httpmux.Handle(p, h)

View File

@ -32,6 +32,7 @@ const (
PathMetrics = "/metrics"
PathHealth = "/health"
PathProxyMetrics = "/proxy/metrics"
PathProxyHealth = "/proxy/health"
)
// HandleMetricsHealth registers metrics and health handlers.

View File

@ -16,6 +16,7 @@ package grpcproxy
import (
"context"
"fmt"
"net/http"
"time"
@ -33,6 +34,14 @@ func HandleHealth(lg *zap.Logger, mux *http.ServeMux, c *clientv3.Client) {
mux.Handle(etcdhttp.PathHealth, etcdhttp.NewHealthHandler(lg, func() etcdhttp.Health { return checkHealth(c) }))
}
// HandleProxyHealth registers health handler on '/proxy/health'.
func HandleProxyHealth(lg *zap.Logger, mux *http.ServeMux, c *clientv3.Client) {
if lg == nil {
lg = zap.NewNop()
}
mux.Handle(etcdhttp.PathProxyHealth, etcdhttp.NewHealthHandler(lg, func() etcdhttp.Health { return checkProxyHealth(c) }))
}
func checkHealth(c *clientv3.Client) etcdhttp.Health {
h := etcdhttp.Health{Health: "false"}
ctx, cancel := context.WithTimeout(c.Ctx(), time.Second)
@ -40,6 +49,25 @@ func checkHealth(c *clientv3.Client) etcdhttp.Health {
cancel()
if err == nil || err == rpctypes.ErrPermissionDenied {
h.Health = "true"
} else {
h.Reason = fmt.Sprintf("GET ERROR:%s", err)
}
return h
}
func checkProxyHealth(c *clientv3.Client) etcdhttp.Health {
h := checkHealth(c)
if h.Health != "true" {
return h
}
ctx, cancel := context.WithTimeout(c.Ctx(), time.Second*3)
ch := c.Watch(ctx, "a", clientv3.WithCreatedNotify())
select {
case <-ch:
case <-ctx.Done():
h.Health = "false"
h.Reason = "WATCH TIMEOUT"
}
cancel()
return h
}