etcdserver: add livez and ready http endpoints for etcd.

Add two separate probes, one for liveness and one for readiness. The liveness probe would check that the local individual node is up and running, or else restart the node, while the readiness probe would check that the cluster is ready to serve traffic. This would make etcd health-check fully Kubernetes API complient.

Signed-off-by: Siyuan Zhang <sizhang@google.com>
This commit is contained in:
Siyuan Zhang 2023-09-25 15:44:19 -07:00
parent aea6f0b4c2
commit 7a57e06eca
3 changed files with 386 additions and 76 deletions

View File

@ -15,10 +15,13 @@
package etcdhttp
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"path"
"strings"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
@ -46,7 +49,7 @@ type ServerHealth interface {
// HandleHealth registers metrics and health handlers. it checks health by using v3 range request
// and its corresponding timeout.
func HandleHealth(lg *zap.Logger, mux *http.ServeMux, srv ServerHealth) {
mux.Handle(PathHealth, NewHealthHandler(lg, func(ctx context.Context, excludedAlarms AlarmSet, serializable bool) Health {
mux.Handle(PathHealth, NewHealthHandler(lg, func(ctx context.Context, excludedAlarms StringSet, serializable bool) Health {
if h := checkAlarms(lg, srv, excludedAlarms); h.Health != "true" {
return h
}
@ -55,10 +58,13 @@ func HandleHealth(lg *zap.Logger, mux *http.ServeMux, srv ServerHealth) {
}
return checkAPI(ctx, lg, srv, serializable)
}))
installLivezEndpoints(lg, mux, srv)
installReadyzEndpoints(lg, mux, srv)
}
// NewHealthHandler handles '/health' requests.
func NewHealthHandler(lg *zap.Logger, hfunc func(ctx context.Context, excludedAlarms AlarmSet, Serializable bool) Health) http.HandlerFunc {
func NewHealthHandler(lg *zap.Logger, hfunc func(ctx context.Context, excludedAlarms StringSet, Serializable bool) Health) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.Header().Set("Allow", http.MethodGet)
@ -66,7 +72,7 @@ func NewHealthHandler(lg *zap.Logger, hfunc func(ctx context.Context, excludedAl
lg.Warn("/health error", zap.Int("status-code", http.StatusMethodNotAllowed))
return
}
excludedAlarms := getExcludedAlarms(r)
excludedAlarms := getQuerySet(r, "exclude")
// Passing the query parameter "serializable=true" ensures that the
// health of the local etcd is checked vs the health of the cluster.
// This is useful for probes attempting to validate the liveness of
@ -119,20 +125,18 @@ type Health struct {
Reason string `json:"reason"`
}
type AlarmSet map[string]struct{}
func getExcludedAlarms(r *http.Request) (alarms AlarmSet) {
alarms = make(map[string]struct{}, 2)
alms, found := r.URL.Query()["exclude"]
func getQuerySet(r *http.Request, query string) StringSet {
querySet := make(map[string]struct{})
qs, found := r.URL.Query()[query]
if found {
for _, alm := range alms {
if len(alm) == 0 {
for _, q := range qs {
if len(q) == 0 {
continue
}
alarms[alm] = struct{}{}
querySet[q] = struct{}{}
}
}
return alarms
return querySet
}
func getSerializableFlag(r *http.Request) bool {
@ -141,7 +145,7 @@ func getSerializableFlag(r *http.Request) bool {
// TODO: etcdserver.ErrNoLeader in health API
func checkAlarms(lg *zap.Logger, srv ServerHealth, excludedAlarms AlarmSet) Health {
func checkAlarms(lg *zap.Logger, srv ServerHealth, excludedAlarms StringSet) Health {
h := Health{Health: "true"}
for _, v := range srv.Alarms() {
@ -193,3 +197,171 @@ func checkAPI(ctx context.Context, lg *zap.Logger, srv ServerHealth, serializabl
lg.Debug("serving /health true")
return h
}
type HealthCheck func(ctx context.Context) error
type CheckRegistry struct {
path string
checks map[string]HealthCheck
}
func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
reg := CheckRegistry{path: "/livez", checks: make(map[string]HealthCheck)}
reg.Register("serializable_read", serializableReadCheck(server))
reg.InstallHttpEndpoints(lg, mux)
}
func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
reg := CheckRegistry{path: "/readyz", checks: make(map[string]HealthCheck)}
reg.Register("data_corruption", activeAlarmCheck(server, pb.AlarmType_CORRUPT))
reg.Register("serializable_read", serializableReadCheck(server))
reg.InstallHttpEndpoints(lg, mux)
}
func (reg *CheckRegistry) Register(name string, check HealthCheck) {
reg.checks[name] = check
}
func (reg *CheckRegistry) InstallHttpEndpoints(lg *zap.Logger, mux *http.ServeMux) {
checkNames := make([]string, 0, len(reg.checks))
for k := range reg.checks {
checkNames = append(checkNames, k)
}
// installs the http handler for the root path.
reg.installRootHttpEndpoint(lg, mux, reg.path, checkNames...)
for _, checkName := range checkNames {
// installs the http handler for the individual check sub path.
subpath := path.Join(reg.path, checkName)
check := checkName
mux.Handle(subpath, newHealthHandler(subpath, lg, func(r *http.Request) Health {
return reg.runHealthChecks(r.Context(), check)
}))
}
}
func (reg *CheckRegistry) runHealthChecks(ctx context.Context, checkNames ...string) Health {
h := Health{Health: "true"}
var individualCheckOutput bytes.Buffer
for _, checkName := range checkNames {
check, found := reg.checks[checkName]
if !found {
panic(fmt.Errorf("Health check: %s not registered", checkName))
}
if err := check(ctx); err != nil {
fmt.Fprintf(&individualCheckOutput, "[-]%s failed: %v\n", checkName, err)
h.Health = "false"
} else {
fmt.Fprintf(&individualCheckOutput, "[+]%s ok\n", checkName)
}
}
h.Reason = individualCheckOutput.String()
return h
}
// installRootHttpEndpoint installs the http handler for the root path.
func (reg *CheckRegistry) installRootHttpEndpoint(lg *zap.Logger, mux *http.ServeMux, path string, checks ...string) {
hfunc := func(r *http.Request) Health {
// extracts the health check names to be excludeList from the query param
excluded := getQuerySet(r, "exclude")
filteredCheckNames := filterCheckList(lg, listToStringSet(checks), excluded)
return reg.runHealthChecks(r.Context(), filteredCheckNames...)
}
mux.Handle(path, newHealthHandler(path, lg, hfunc))
}
// newHealthHandler generates a http HandlerFunc for a health check function hfunc.
func newHealthHandler(path string, lg *zap.Logger, hfunc func(*http.Request) Health) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.Header().Set("Allow", http.MethodGet)
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
lg.Warn("Health request error", zap.String("path", path), zap.Int("status-code", http.StatusMethodNotAllowed))
return
}
h := hfunc(r)
// Always returns detailed reason for failed checks.
if h.Health != "true" {
http.Error(w, h.Reason, http.StatusServiceUnavailable)
lg.Error("Health check error", zap.String("path", path), zap.String("reason", h.Reason), zap.Int("status-code", http.StatusServiceUnavailable))
return
}
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
// Only writes detailed reason for verbose requests.
if _, found := r.URL.Query()["verbose"]; found {
fmt.Fprint(w, h.Reason)
}
fmt.Fprint(w, "ok\n")
lg.Debug("Health check OK", zap.String("path", path), zap.String("reason", h.Reason), zap.Int("status-code", http.StatusOK))
}
}
func filterCheckList(lg *zap.Logger, checks StringSet, excluded StringSet) []string {
filteredList := []string{}
for chk := range checks {
if _, found := excluded[chk]; found {
delete(excluded, chk)
continue
}
filteredList = append(filteredList, chk)
}
if len(excluded) > 0 {
// For version compatibility, excluding non-exist checks would not fail the request.
lg.Warn("some health checks cannot be excluded", zap.String("missing-health-checks", formatQuoted(excluded.List()...)))
}
return filteredList
}
// formatQuoted returns a formatted string of the health check names,
// preserving the order passed in.
func formatQuoted(names ...string) string {
quoted := make([]string, 0, len(names))
for _, name := range names {
quoted = append(quoted, fmt.Sprintf("%q", name))
}
return strings.Join(quoted, ",")
}
type StringSet map[string]struct{}
func (s StringSet) List() []string {
keys := make([]string, 0, len(s))
for k := range s {
keys = append(keys, k)
}
return keys
}
func listToStringSet(list []string) StringSet {
set := make(map[string]struct{})
for _, s := range list {
set[s] = struct{}{}
}
return set
}
// activeAlarmCheck checks if a specific alarm type is active in the server.
func activeAlarmCheck(srv ServerHealth, at pb.AlarmType) func(context.Context) error {
return func(ctx context.Context) error {
as := srv.Alarms()
for _, v := range as {
if v.Alarm == at {
return fmt.Errorf("alarm activated: %s", at.String())
}
}
return nil
}
}
func serializableReadCheck(srv ServerHealth) func(ctx context.Context) error {
return func(ctx context.Context) error {
ctx = srv.AuthStore().WithRoot(ctx)
_, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: true})
if err != nil {
return fmt.Errorf("range error: %w", err)
}
return nil
}
}

View File

@ -16,11 +16,11 @@ package etcdhttp
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"go.uber.org/zap/zaptest"
@ -32,16 +32,15 @@ import (
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/etcdserver"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.etcd.io/etcd/server/v3/storage/schema"
)
type fakeHealthServer struct {
fakeServer
health string
apiError error
authStore auth.AuthStore
apiError error
missingLeader bool
authStore auth.AuthStore
}
func (s *fakeHealthServer) Range(_ context.Context, _ *pb.RangeRequest) (*pb.RangeResponse, error) {
@ -53,87 +52,91 @@ func (s *fakeHealthServer) Config() config.ServerConfig {
}
func (s *fakeHealthServer) Leader() types.ID {
if s.health == "true" {
if !s.missingLeader {
return 1
}
return types.ID(raft.None)
}
func (s *fakeHealthServer) Do(_ context.Context, _ pb.Request) (etcdserver.Response, error) {
if s.health == "true" {
return etcdserver.Response{}, nil
}
return etcdserver.Response{}, fmt.Errorf("fail health check")
}
func (s *fakeHealthServer) AuthStore() auth.AuthStore { return s.authStore }
func (s *fakeHealthServer) AuthStore() auth.AuthStore { return s.authStore }
func (s *fakeHealthServer) ClientCertAuthEnabled() bool { return false }
type healthTestCase struct {
name string
healthCheckURL string
expectStatusCode int
inResult []string
notInResult []string
alarms []*pb.AlarmMember
apiError error
missingLeader bool
}
func TestHealthHandler(t *testing.T) {
// define the input and expected output
// input: alarms, and healthCheckURL
tests := []struct {
name string
alarms []*pb.AlarmMember
healthCheckURL string
apiError error
expectStatusCode int
expectHealth string
}{
tests := []healthTestCase{
{
name: "Healthy if no alarm",
alarms: []*pb.AlarmMember{},
healthCheckURL: "/health",
expectStatusCode: http.StatusOK,
expectHealth: "true",
},
{
name: "Unhealthy if NOSPACE alarm is on",
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}},
healthCheckURL: "/health",
expectStatusCode: http.StatusServiceUnavailable,
expectHealth: "false",
},
{
name: "Healthy if NOSPACE alarm is on and excluded",
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}},
healthCheckURL: "/health?exclude=NOSPACE",
expectStatusCode: http.StatusOK,
expectHealth: "true",
},
{
name: "Healthy if NOSPACE alarm is excluded",
alarms: []*pb.AlarmMember{},
healthCheckURL: "/health?exclude=NOSPACE",
expectStatusCode: http.StatusOK,
expectHealth: "true",
},
{
name: "Healthy if multiple NOSPACE alarms are on and excluded",
alarms: []*pb.AlarmMember{{MemberID: uint64(1), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(2), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(3), Alarm: pb.AlarmType_NOSPACE}},
healthCheckURL: "/health?exclude=NOSPACE",
expectStatusCode: http.StatusOK,
expectHealth: "true",
},
{
name: "Unhealthy if NOSPACE alarms is excluded and CORRUPT is on",
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(1), Alarm: pb.AlarmType_CORRUPT}},
healthCheckURL: "/health?exclude=NOSPACE",
expectStatusCode: http.StatusServiceUnavailable,
expectHealth: "false",
},
{
name: "Unhealthy if both NOSPACE and CORRUPT are on and excluded",
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(1), Alarm: pb.AlarmType_CORRUPT}},
healthCheckURL: "/health?exclude=NOSPACE&exclude=CORRUPT",
expectStatusCode: http.StatusOK,
expectHealth: "true",
},
{
name: "Unhealthy if api is not available",
healthCheckURL: "/health",
apiError: fmt.Errorf("Unexpected error"),
expectStatusCode: http.StatusServiceUnavailable,
expectHealth: "false",
},
{
name: "Unhealthy if no leader",
healthCheckURL: "/health",
expectStatusCode: http.StatusServiceUnavailable,
missingLeader: true,
},
{
name: "Healthy if no leader and serializable=true",
healthCheckURL: "/health?serializable=true",
expectStatusCode: http.StatusOK,
missingLeader: true,
},
}
@ -144,44 +147,179 @@ func TestHealthHandler(t *testing.T) {
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
HandleHealth(zaptest.NewLogger(t), mux, &fakeHealthServer{
fakeServer: fakeServer{alarms: tt.alarms},
health: tt.expectHealth,
apiError: tt.apiError,
authStore: auth.NewAuthStore(lg, schema.NewAuthBackend(lg, be), nil, 0),
fakeServer: fakeServer{alarms: tt.alarms},
apiError: tt.apiError,
missingLeader: tt.missingLeader,
authStore: auth.NewAuthStore(lg, schema.NewAuthBackend(lg, be), nil, 0),
})
ts := httptest.NewServer(mux)
defer ts.Close()
res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+tt.healthCheckURL)})
if err != nil {
t.Errorf("fail serve http request %s %v", tt.healthCheckURL, err)
}
if res == nil {
t.Errorf("got nil http response with http request %s", tt.healthCheckURL)
return
}
if res.StatusCode != tt.expectStatusCode {
t.Errorf("want statusCode %d but got %d", tt.expectStatusCode, res.StatusCode)
}
health, err := parseHealthOutput(res.Body)
if err != nil {
t.Errorf("fail parse health check output %v", err)
}
if health.Health != tt.expectHealth {
t.Errorf("want health %s but got %s", tt.expectHealth, health.Health)
}
checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, nil, nil)
})
}
}
func parseHealthOutput(body io.Reader) (Health, error) {
obj := Health{}
d, derr := io.ReadAll(body)
if derr != nil {
return obj, derr
func TestHttpSubPath(t *testing.T) {
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
tests := []healthTestCase{
{
name: "/readyz/data_corruption ok",
healthCheckURL: "/readyz/data_corruption",
expectStatusCode: http.StatusOK,
},
{
name: "/readyz/serializable_read not ok with error",
apiError: fmt.Errorf("Unexpected error"),
healthCheckURL: "/readyz/serializable_read",
expectStatusCode: http.StatusServiceUnavailable,
notInResult: []string{"data_corruption"},
},
{
name: "/readyz/non_exist 404",
healthCheckURL: "/readyz/non_exist",
expectStatusCode: http.StatusNotFound,
},
}
if err := json.Unmarshal(d, &obj); err != nil {
return obj, err
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mux := http.NewServeMux()
logger := zaptest.NewLogger(t)
s := &fakeHealthServer{
apiError: tt.apiError,
authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
}
HandleHealth(logger, mux, s)
ts := httptest.NewServer(mux)
defer ts.Close()
checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
})
}
}
func TestDataCorruptionCheck(t *testing.T) {
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
tests := []healthTestCase{
{
name: "Live if CORRUPT alarm is on",
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_CORRUPT}},
healthCheckURL: "/livez",
expectStatusCode: http.StatusOK,
notInResult: []string{"data_corruption"},
},
{
name: "Not ready if CORRUPT alarm is on",
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_CORRUPT}},
healthCheckURL: "/readyz",
expectStatusCode: http.StatusServiceUnavailable,
inResult: []string{"[-]data_corruption failed: alarm activated: CORRUPT"},
},
{
name: "ready if CORRUPT alarm is not on",
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}},
healthCheckURL: "/readyz",
expectStatusCode: http.StatusOK,
},
{
name: "ready if CORRUPT alarm is excluded",
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_CORRUPT}, {MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}},
healthCheckURL: "/readyz?exclude=data_corruption",
expectStatusCode: http.StatusOK,
},
{
name: "Not ready if CORRUPT alarm is on",
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_CORRUPT}},
healthCheckURL: "/readyz?exclude=non_exist",
expectStatusCode: http.StatusServiceUnavailable,
inResult: []string{"[-]data_corruption failed: alarm activated: CORRUPT"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mux := http.NewServeMux()
logger := zaptest.NewLogger(t)
s := &fakeHealthServer{
authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
}
HandleHealth(logger, mux, s)
ts := httptest.NewServer(mux)
defer ts.Close()
// OK before alarms are activated.
checkHttpResponse(t, ts, tt.healthCheckURL, http.StatusOK, nil, nil)
// Activate the alarms.
s.alarms = tt.alarms
checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
})
}
}
func TestSerializableReadCheck(t *testing.T) {
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
tests := []healthTestCase{
{
name: "Alive normal",
healthCheckURL: "/livez?verbose",
expectStatusCode: http.StatusOK,
inResult: []string{"[+]serializable_read ok"},
},
{
name: "Not alive if range api is not available",
healthCheckURL: "/livez",
apiError: fmt.Errorf("Unexpected error"),
expectStatusCode: http.StatusServiceUnavailable,
inResult: []string{"[-]serializable_read failed: range error: Unexpected error"},
},
{
name: "Not ready if range api is not available",
healthCheckURL: "/readyz",
apiError: fmt.Errorf("Unexpected error"),
expectStatusCode: http.StatusServiceUnavailable,
inResult: []string{"[-]serializable_read failed: range error: Unexpected error"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mux := http.NewServeMux()
logger := zaptest.NewLogger(t)
s := &fakeHealthServer{
apiError: tt.apiError,
authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
}
HandleHealth(logger, mux, s)
ts := httptest.NewServer(mux)
defer ts.Close()
checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
})
}
}
func checkHttpResponse(t *testing.T, ts *httptest.Server, url string, expectStatusCode int, inResult []string, notInResult []string) {
res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+url)})
if err != nil {
t.Fatalf("fail serve http request %s %v", url, err)
}
if res.StatusCode != expectStatusCode {
t.Errorf("want statusCode %d but got %d", expectStatusCode, res.StatusCode)
}
defer res.Body.Close()
b, err := io.ReadAll(res.Body)
if err != nil {
t.Fatalf("Failed to read response for %s", url)
}
result := string(b)
for _, substr := range inResult {
if !strings.Contains(result, substr) {
t.Errorf("Could not find substring : %s, in response: %s", substr, result)
return
}
}
for _, substr := range notInResult {
if strings.Contains(result, substr) {
t.Errorf("Do not expect substring : %s, in response: %s", substr, result)
return
}
}
return obj, nil
}

View File

@ -32,7 +32,7 @@ func HandleHealth(lg *zap.Logger, mux *http.ServeMux, c *clientv3.Client) {
if lg == nil {
lg = zap.NewNop()
}
mux.Handle(etcdhttp.PathHealth, etcdhttp.NewHealthHandler(lg, func(ctx context.Context, excludedAlarms etcdhttp.AlarmSet, serializable bool) etcdhttp.Health {
mux.Handle(etcdhttp.PathHealth, etcdhttp.NewHealthHandler(lg, func(ctx context.Context, excludedAlarms etcdhttp.StringSet, serializable bool) etcdhttp.Health {
return checkHealth(c)
}))
}
@ -42,7 +42,7 @@ func HandleProxyHealth(lg *zap.Logger, mux *http.ServeMux, c *clientv3.Client) {
if lg == nil {
lg = zap.NewNop()
}
mux.Handle(etcdhttp.PathProxyHealth, etcdhttp.NewHealthHandler(lg, func(ctx context.Context, excludedAlarms etcdhttp.AlarmSet, serializable bool) etcdhttp.Health {
mux.Handle(etcdhttp.PathProxyHealth, etcdhttp.NewHealthHandler(lg, func(ctx context.Context, excludedAlarms etcdhttp.StringSet, serializable bool) etcdhttp.Health {
return checkProxyHealth(c)
}))
}