mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
*: add health check for grpcproxy self
This commit is contained in:
parent
58bb8ae09f
commit
0898c5b978
@ -202,9 +202,10 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
|
||||
}()
|
||||
|
||||
client := mustNewClient(lg)
|
||||
proxyClient := mustNewProxyClient(lg, tlsinfo)
|
||||
httpClient := mustNewHTTPClient(lg)
|
||||
|
||||
srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client)
|
||||
srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client, proxyClient)
|
||||
errc := make(chan error)
|
||||
go func() { errc <- newGRPCProxyServer(lg, client).Serve(grpcl) }()
|
||||
go func() { errc <- srvhttp.Serve(httpl) }()
|
||||
@ -216,6 +217,7 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
|
||||
grpcproxy.HandleMetrics(mux, httpClient, client.Endpoints())
|
||||
grpcproxy.HandleHealth(lg, mux, client)
|
||||
grpcproxy.HandleProxyMetrics(mux)
|
||||
grpcproxy.HandleProxyHealth(lg, mux, proxyClient)
|
||||
lg.Info("gRPC proxy server metrics URL serving")
|
||||
herr := http.Serve(mhttpl, mux)
|
||||
if herr != nil {
|
||||
@ -273,6 +275,37 @@ func mustNewClient(lg *zap.Logger) *clientv3.Client {
|
||||
return client
|
||||
}
|
||||
|
||||
func mustNewProxyClient(lg *zap.Logger, tls *transport.TLSInfo) *clientv3.Client {
|
||||
eps := []string{grpcProxyAdvertiseClientURL}
|
||||
cfg, err := newProxyClientCfg(lg, eps, tls)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
client, err := clientv3.New(*cfg)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
lg.Info("create proxy client", zap.String("grpcProxyAdvertiseClientURL", grpcProxyAdvertiseClientURL))
|
||||
return client
|
||||
}
|
||||
|
||||
func newProxyClientCfg(lg *zap.Logger, eps []string, tls *transport.TLSInfo) (*clientv3.Config, error) {
|
||||
cfg := clientv3.Config{
|
||||
Endpoints: eps,
|
||||
DialTimeout: 5 * time.Second,
|
||||
}
|
||||
if tls != nil {
|
||||
clientTLS, err := tls.ClientConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cfg.TLS = clientTLS
|
||||
}
|
||||
return &cfg, nil
|
||||
}
|
||||
|
||||
func newClientCfg(lg *zap.Logger, eps []string) (*clientv3.Config, error) {
|
||||
// set tls if any one tls option set
|
||||
cfg := clientv3.Config{
|
||||
@ -405,13 +438,14 @@ func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
|
||||
return server
|
||||
}
|
||||
|
||||
func mustHTTPListener(lg *zap.Logger, m cmux.CMux, tlsinfo *transport.TLSInfo, c *clientv3.Client) (*http.Server, net.Listener) {
|
||||
func mustHTTPListener(lg *zap.Logger, m cmux.CMux, tlsinfo *transport.TLSInfo, c *clientv3.Client, proxy *clientv3.Client) (*http.Server, net.Listener) {
|
||||
httpClient := mustNewHTTPClient(lg)
|
||||
httpmux := http.NewServeMux()
|
||||
httpmux.HandleFunc("/", http.NotFound)
|
||||
grpcproxy.HandleMetrics(httpmux, httpClient, c.Endpoints())
|
||||
grpcproxy.HandleHealth(lg, httpmux, c)
|
||||
grpcproxy.HandleProxyMetrics(httpmux)
|
||||
grpcproxy.HandleProxyHealth(lg, httpmux, proxy)
|
||||
if grpcProxyEnablePprof {
|
||||
for p, h := range debugutil.PProfHandlers() {
|
||||
httpmux.Handle(p, h)
|
||||
|
@ -32,6 +32,7 @@ const (
|
||||
PathMetrics = "/metrics"
|
||||
PathHealth = "/health"
|
||||
PathProxyMetrics = "/proxy/metrics"
|
||||
PathProxyHealth = "/proxy/health"
|
||||
)
|
||||
|
||||
// HandleMetricsHealth registers metrics and health handlers.
|
||||
|
@ -16,6 +16,7 @@ package grpcproxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
@ -33,6 +34,14 @@ func HandleHealth(lg *zap.Logger, mux *http.ServeMux, c *clientv3.Client) {
|
||||
mux.Handle(etcdhttp.PathHealth, etcdhttp.NewHealthHandler(lg, func() etcdhttp.Health { return checkHealth(c) }))
|
||||
}
|
||||
|
||||
// HandleProxyHealth registers health handler on '/proxy/health'.
|
||||
func HandleProxyHealth(lg *zap.Logger, mux *http.ServeMux, c *clientv3.Client) {
|
||||
if lg == nil {
|
||||
lg = zap.NewNop()
|
||||
}
|
||||
mux.Handle(etcdhttp.PathProxyHealth, etcdhttp.NewHealthHandler(lg, func() etcdhttp.Health { return checkProxyHealth(c) }))
|
||||
}
|
||||
|
||||
func checkHealth(c *clientv3.Client) etcdhttp.Health {
|
||||
h := etcdhttp.Health{Health: "false"}
|
||||
ctx, cancel := context.WithTimeout(c.Ctx(), time.Second)
|
||||
@ -40,6 +49,25 @@ func checkHealth(c *clientv3.Client) etcdhttp.Health {
|
||||
cancel()
|
||||
if err == nil || err == rpctypes.ErrPermissionDenied {
|
||||
h.Health = "true"
|
||||
} else {
|
||||
h.Reason = fmt.Sprintf("GET ERROR:%s", err)
|
||||
}
|
||||
return h
|
||||
}
|
||||
|
||||
func checkProxyHealth(c *clientv3.Client) etcdhttp.Health {
|
||||
h := checkHealth(c)
|
||||
if h.Health != "true" {
|
||||
return h
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(c.Ctx(), time.Second*3)
|
||||
ch := c.Watch(ctx, "a", clientv3.WithCreatedNotify())
|
||||
select {
|
||||
case <-ch:
|
||||
case <-ctx.Done():
|
||||
h.Health = "false"
|
||||
h.Reason = "WATCH TIMEOUT"
|
||||
}
|
||||
cancel()
|
||||
return h
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user