mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #17128 from siyuanfoundation/livez-bp-3.4-step2
[3.4] Backport livez/readyz
This commit is contained in:
commit
2a07f80f77
@ -12,13 +12,19 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// This file defines the http endpoints for etcd health checks.
|
||||
// The endpoints include /livez, /readyz and /health.
|
||||
|
||||
package etcdhttp
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/auth"
|
||||
@ -31,13 +37,19 @@ import (
|
||||
)
|
||||
|
||||
const (
	// PathHealth is the URL path on which the legacy health handler is registered.
	PathHealth = "/health"

	// HealthStatusSuccess and HealthStatusError are the values stored in
	// HealthStatus.Status and used as the "status" label of the per-check
	// counter metric.
	HealthStatusSuccess string = "success"
	HealthStatusError   string = "error"

	// Check types. A registry's check type is used both as its root URL path
	// ("/" + type) and as the "type" label of the health check metrics.
	checkTypeLivez  = "livez"
	checkTypeReadyz = "readyz"
	checkTypeHealth = "health"
)
|
||||
|
||||
type ServerHealth interface {
|
||||
serverHealthV2V3
|
||||
Range(context.Context, *pb.RangeRequest) (*pb.RangeResponse, error)
|
||||
Config() etcdserver.ServerConfig
|
||||
AuthStore() auth.AuthStore
|
||||
}
|
||||
|
||||
type serverHealthV2V3 interface {
|
||||
@ -47,33 +59,36 @@ type serverHealthV2V3 interface {
|
||||
|
||||
// HandleHealthForV2 registers metrics and health handlers for v2.
|
||||
func HandleHealthForV2(mux *http.ServeMux, srv etcdserver.ServerV2) {
|
||||
mux.Handle(PathHealth, NewHealthHandler(func(excludedAlarms AlarmSet, serializable bool) Health {
|
||||
mux.Handle(PathHealth, NewHealthHandler(func(ctx context.Context, excludedAlarms StringSet, serializable bool) Health {
|
||||
if h := checkAlarms(srv, excludedAlarms); h.Health != "true" {
|
||||
return h
|
||||
}
|
||||
if h := checkLeader(srv, serializable); h.Health != "true" {
|
||||
return h
|
||||
}
|
||||
return checkV2API(srv)
|
||||
return checkV2API(ctx, srv)
|
||||
}))
|
||||
}
|
||||
|
||||
// HandleHealth registers metrics and health handlers. it checks health by using v3 range request
|
||||
// and its corresponding timeout.
|
||||
func HandleHealth(mux *http.ServeMux, srv ServerHealth) {
|
||||
mux.Handle(PathHealth, NewHealthHandler(func(excludedAlarms AlarmSet, serializable bool) Health {
|
||||
mux.Handle(PathHealth, NewHealthHandler(func(ctx context.Context, excludedAlarms StringSet, serializable bool) Health {
|
||||
if h := checkAlarms(srv, excludedAlarms); h.Health != "true" {
|
||||
return h
|
||||
}
|
||||
if h := checkLeader(srv, serializable); h.Health != "true" {
|
||||
return h
|
||||
}
|
||||
return checkAPI(srv, serializable)
|
||||
return checkAPI(ctx, srv, serializable)
|
||||
}))
|
||||
|
||||
installLivezEndpoints(mux, srv)
|
||||
installReadyzEndpoints(mux, srv)
|
||||
}
|
||||
|
||||
// NewHealthHandler handles '/health' requests.
|
||||
func NewHealthHandler(hfunc func(excludedAlarms AlarmSet, serializable bool) Health) http.HandlerFunc {
|
||||
func NewHealthHandler(hfunc func(ctx context.Context, excludedAlarms StringSet, serializable bool) Health) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodGet {
|
||||
w.Header().Set("Allow", http.MethodGet)
|
||||
@ -81,13 +96,13 @@ func NewHealthHandler(hfunc func(excludedAlarms AlarmSet, serializable bool) Hea
|
||||
plog.Warningf("/health error (status code %d)", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
excludedAlarms := getExcludedAlarms(r)
|
||||
excludedAlarms := getQuerySet(r, "exclude")
|
||||
// Passing the query parameter "serializable=true" ensures that the
|
||||
// health of the local etcd is checked vs the health of the cluster.
|
||||
// This is useful for probes attempting to validate the liveness of
|
||||
// the etcd process vs readiness of the cluster to serve requests.
|
||||
serializableFlag := getSerializableFlag(r)
|
||||
h := hfunc(excludedAlarms, serializableFlag)
|
||||
h := hfunc(r.Context(), excludedAlarms, serializableFlag)
|
||||
defer func() {
|
||||
if h.Health == "true" {
|
||||
healthSuccess.Inc()
|
||||
@ -119,11 +134,29 @@ var (
|
||||
Name: "health_failures",
|
||||
Help: "The total number of failed health checks",
|
||||
})
|
||||
healthCheckGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "server",
|
||||
Name: "healthcheck",
|
||||
Help: "The result of each kind of healthcheck.",
|
||||
},
|
||||
[]string{"type", "name"},
|
||||
)
|
||||
healthCheckCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "server",
|
||||
Name: "healthchecks_total",
|
||||
Help: "The total number of each kind of healthcheck.",
|
||||
},
|
||||
[]string{"type", "name", "status"},
|
||||
)
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(healthSuccess)
|
||||
prometheus.MustRegister(healthFailed)
|
||||
prometheus.MustRegister(healthCheckGauge)
|
||||
prometheus.MustRegister(healthCheckCounter)
|
||||
}
|
||||
|
||||
// Health defines etcd server health status.
|
||||
@ -133,20 +166,24 @@ type Health struct {
|
||||
Reason string `json:"-"`
|
||||
}
|
||||
|
||||
type AlarmSet map[string]struct{}
|
||||
// HealthStatus is used in new /readyz or /livez health checks instead of the Health struct.
type HealthStatus struct {
	// Reason holds the per-check output lines ("[+]name ok" / "[-]name failed: ...").
	Reason string `json:"reason"`
	// Status is HealthStatusSuccess or HealthStatusError.
	Status string `json:"status"`
}
|
||||
|
||||
func getExcludedAlarms(r *http.Request) (alarms AlarmSet) {
|
||||
alarms = make(map[string]struct{}, 2)
|
||||
alms, found := r.URL.Query()["exclude"]
|
||||
func getQuerySet(r *http.Request, query string) StringSet {
|
||||
querySet := make(map[string]struct{})
|
||||
qs, found := r.URL.Query()[query]
|
||||
if found {
|
||||
for _, alm := range alms {
|
||||
if len(alms) == 0 {
|
||||
for _, q := range qs {
|
||||
if len(qs) == 0 {
|
||||
continue
|
||||
}
|
||||
alarms[alm] = struct{}{}
|
||||
querySet[q] = struct{}{}
|
||||
}
|
||||
}
|
||||
return alarms
|
||||
return querySet
|
||||
}
|
||||
|
||||
func getSerializableFlag(r *http.Request) bool {
|
||||
@ -155,7 +192,7 @@ func getSerializableFlag(r *http.Request) bool {
|
||||
|
||||
// TODO: etcdserver.ErrNoLeader in health API
|
||||
|
||||
func checkAlarms(srv serverHealthV2V3, excludedAlarms AlarmSet) Health {
|
||||
func checkAlarms(srv serverHealthV2V3, excludedAlarms StringSet) Health {
|
||||
h := Health{Health: "true"}
|
||||
as := srv.Alarms()
|
||||
if len(as) > 0 {
|
||||
@ -193,9 +230,9 @@ func checkLeader(srv serverHealthV2V3, serializable bool) Health {
|
||||
return h
|
||||
}
|
||||
|
||||
func checkV2API(srv etcdserver.ServerV2) Health {
|
||||
func checkV2API(ctx context.Context, srv etcdserver.ServerV2) Health {
|
||||
h := Health{Health: "true"}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
ctx, cancel := context.WithTimeout(ctx, time.Second)
|
||||
_, err := srv.Do(ctx, pb.Request{Method: "QGET"})
|
||||
cancel()
|
||||
if err != nil {
|
||||
@ -207,17 +244,212 @@ func checkV2API(srv etcdserver.ServerV2) Health {
|
||||
return h
|
||||
}
|
||||
|
||||
func checkAPI(srv ServerHealth, serializable bool) Health {
|
||||
func checkAPI(ctx context.Context, srv ServerHealth, serializable bool) Health {
|
||||
h := Health{Health: "true"}
|
||||
cfg := srv.Config()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), cfg.ReqTimeout())
|
||||
_, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: serializable})
|
||||
ctx = srv.AuthStore().WithRoot(ctx)
|
||||
cctx, cancel := context.WithTimeout(ctx, cfg.ReqTimeout())
|
||||
_, err := srv.Range(cctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: serializable})
|
||||
cancel()
|
||||
if err != nil && err != auth.ErrUserEmpty && err != auth.ErrPermissionDenied {
|
||||
if err != nil {
|
||||
h.Health = "false"
|
||||
h.Reason = fmt.Sprintf("RANGE ERROR:%s", err)
|
||||
plog.Warningf("serving /health false; Range failed %v (status code %d)", err, http.StatusServiceUnavailable)
|
||||
return h
|
||||
}
|
||||
plog.Debug("serving /health true")
|
||||
return h
|
||||
}
|
||||
|
||||
// HealthCheck is a single named health probe. It returns nil when the probe
// passes and a descriptive error otherwise.
type HealthCheck func(ctx context.Context) error

// CheckRegistry holds the named health checks served under one check type.
type CheckRegistry struct {
	// checkType determines the root URL path ("/" + checkType) and the
	// "type" label of the health check metrics.
	checkType string
	// checks maps a check name to its probe.
	checks map[string]HealthCheck
}
|
||||
|
||||
func installLivezEndpoints(mux *http.ServeMux, server ServerHealth) {
|
||||
reg := CheckRegistry{checkType: checkTypeLivez, checks: make(map[string]HealthCheck)}
|
||||
reg.Register("serializable_read", readCheck(server, true /* serializable */))
|
||||
reg.InstallHttpEndpoints(mux)
|
||||
}
|
||||
|
||||
func installReadyzEndpoints(mux *http.ServeMux, server ServerHealth) {
|
||||
reg := CheckRegistry{checkType: checkTypeReadyz, checks: make(map[string]HealthCheck)}
|
||||
reg.Register("data_corruption", activeAlarmCheck(server, pb.AlarmType_CORRUPT))
|
||||
// serializable_read checks if local read is ok.
|
||||
// linearizable_read checks if there is consensus in the cluster.
|
||||
// Having both serializable_read and linearizable_read helps isolate the cause of problems if there is a read failure.
|
||||
reg.Register("serializable_read", readCheck(server, true))
|
||||
// linearizable_read check would be replaced by read_index check in 3.6
|
||||
reg.Register("linearizable_read", readCheck(server, false))
|
||||
reg.InstallHttpEndpoints(mux)
|
||||
}
|
||||
|
||||
func (reg *CheckRegistry) Register(name string, check HealthCheck) {
|
||||
reg.checks[name] = check
|
||||
}
|
||||
|
||||
func (reg *CheckRegistry) RootPath() string {
|
||||
return "/" + reg.checkType
|
||||
}
|
||||
|
||||
func (reg *CheckRegistry) InstallHttpEndpoints(mux *http.ServeMux) {
|
||||
checkNames := make([]string, 0, len(reg.checks))
|
||||
for k := range reg.checks {
|
||||
checkNames = append(checkNames, k)
|
||||
}
|
||||
|
||||
// installs the http handler for the root path.
|
||||
reg.installRootHttpEndpoint(mux, checkNames...)
|
||||
for _, checkName := range checkNames {
|
||||
// installs the http handler for the individual check sub path.
|
||||
subpath := path.Join(reg.RootPath(), checkName)
|
||||
check := checkName
|
||||
mux.Handle(subpath, newHealthHandler(subpath, func(r *http.Request) HealthStatus {
|
||||
return reg.runHealthChecks(r.Context(), check)
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
func (reg *CheckRegistry) runHealthChecks(ctx context.Context, checkNames ...string) HealthStatus {
|
||||
h := HealthStatus{Status: HealthStatusSuccess}
|
||||
var individualCheckOutput bytes.Buffer
|
||||
for _, checkName := range checkNames {
|
||||
check, found := reg.checks[checkName]
|
||||
if !found {
|
||||
panic(fmt.Errorf("Health check: %s not registered", checkName))
|
||||
}
|
||||
if err := check(ctx); err != nil {
|
||||
fmt.Fprintf(&individualCheckOutput, "[-]%s failed: %v\n", checkName, err)
|
||||
h.Status = HealthStatusError
|
||||
recordMetrics(reg.checkType, checkName, HealthStatusError)
|
||||
} else {
|
||||
fmt.Fprintf(&individualCheckOutput, "[+]%s ok\n", checkName)
|
||||
recordMetrics(reg.checkType, checkName, HealthStatusSuccess)
|
||||
}
|
||||
}
|
||||
h.Reason = individualCheckOutput.String()
|
||||
return h
|
||||
}
|
||||
|
||||
// installRootHttpEndpoint installs the http handler for the root path.
|
||||
func (reg *CheckRegistry) installRootHttpEndpoint(mux *http.ServeMux, checks ...string) {
|
||||
hfunc := func(r *http.Request) HealthStatus {
|
||||
// extracts the health check names to be excludeList from the query param
|
||||
excluded := getQuerySet(r, "exclude")
|
||||
|
||||
filteredCheckNames := filterCheckList(listToStringSet(checks), excluded)
|
||||
h := reg.runHealthChecks(r.Context(), filteredCheckNames...)
|
||||
return h
|
||||
}
|
||||
mux.Handle(reg.RootPath(), newHealthHandler(reg.RootPath(), hfunc))
|
||||
}
|
||||
|
||||
// newHealthHandler generates a http HandlerFunc for a health check function hfunc.
|
||||
func newHealthHandler(path string, hfunc func(*http.Request) HealthStatus) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodGet {
|
||||
w.Header().Set("Allow", http.MethodGet)
|
||||
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
|
||||
plog.Warningf("Health request error path=%s (status code %d)", path, http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
h := hfunc(r)
|
||||
// Always returns detailed reason for failed checks.
|
||||
if h.Status == HealthStatusError {
|
||||
http.Error(w, h.Reason, http.StatusServiceUnavailable)
|
||||
plog.Errorf("Health check error path=%s, reason=%s (status code %d)", path, h.Reason, http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
||||
w.Header().Set("X-Content-Type-Options", "nosniff")
|
||||
// Only writes detailed reason for verbose requests.
|
||||
if _, found := r.URL.Query()["verbose"]; found {
|
||||
fmt.Fprint(w, h.Reason)
|
||||
}
|
||||
fmt.Fprint(w, "ok\n")
|
||||
plog.Debugf("Health OK path=%s, reason=%s (status code %d)", path, h.Reason, http.StatusOK)
|
||||
}
|
||||
}
|
||||
|
||||
func filterCheckList(checks StringSet, excluded StringSet) []string {
|
||||
filteredList := []string{}
|
||||
for chk := range checks {
|
||||
if _, found := excluded[chk]; found {
|
||||
delete(excluded, chk)
|
||||
continue
|
||||
}
|
||||
filteredList = append(filteredList, chk)
|
||||
}
|
||||
if len(excluded) > 0 {
|
||||
// For version compatibility, excluding non-exist checks would not fail the request.
|
||||
plog.Warningf("some health checks cannot be excluded, missing-health-checks=%s", formatQuoted(excluded.List()...))
|
||||
}
|
||||
return filteredList
|
||||
}
|
||||
|
||||
// formatQuoted returns the given names as a comma-separated list of Go-quoted
// strings (as produced by the %q verb), preserving the order passed in.
// With no names it returns the empty string.
func formatQuoted(names ...string) string {
	var b strings.Builder
	for i, name := range names {
		if i > 0 {
			b.WriteByte(',')
		}
		fmt.Fprintf(&b, "%q", name)
	}
	return b.String()
}
|
||||
|
||||
// StringSet is a set of strings backed by a map with empty-struct values.
type StringSet map[string]struct{}

// List returns the members of the set as a slice. The order follows map
// iteration and is therefore unspecified. An empty or nil set yields an empty,
// non-nil slice.
func (s StringSet) List() []string {
	keys := make([]string, 0, len(s))
	for k := range s {
		keys = append(keys, k)
	}
	return keys
}

// listToStringSet builds a StringSet from list; duplicate entries collapse
// into a single member. A nil or empty list yields an empty, non-nil set.
func listToStringSet(list []string) StringSet {
	set := make(StringSet, len(list))
	for _, s := range list {
		set[s] = struct{}{}
	}
	return set
}
|
||||
|
||||
func recordMetrics(checkType, name string, status string) {
|
||||
val := 0.0
|
||||
if status == HealthStatusSuccess {
|
||||
val = 1.0
|
||||
}
|
||||
healthCheckGauge.With(prometheus.Labels{
|
||||
"type": checkType,
|
||||
"name": name,
|
||||
}).Set(val)
|
||||
healthCheckCounter.With(prometheus.Labels{
|
||||
"type": checkType,
|
||||
"name": name,
|
||||
"status": status,
|
||||
}).Inc()
|
||||
}
|
||||
|
||||
// activeAlarmCheck checks if a specific alarm type is active in the server.
|
||||
func activeAlarmCheck(srv ServerHealth, at pb.AlarmType) func(context.Context) error {
|
||||
return func(ctx context.Context) error {
|
||||
as := srv.Alarms()
|
||||
for _, v := range as {
|
||||
if v.Alarm == at {
|
||||
return fmt.Errorf("alarm activated: %s", at.String())
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func readCheck(srv ServerHealth, serializable bool) func(ctx context.Context) error {
|
||||
return func(ctx context.Context) error {
|
||||
ctx = srv.AuthStore().WithRoot(ctx)
|
||||
_, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: serializable})
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -16,16 +16,20 @@ package etcdhttp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"go.etcd.io/etcd/auth"
|
||||
"go.etcd.io/etcd/etcdserver"
|
||||
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
|
||||
betesting "go.etcd.io/etcd/mvcc/backend"
|
||||
"go.etcd.io/etcd/pkg/testutil"
|
||||
"go.etcd.io/etcd/pkg/types"
|
||||
"go.etcd.io/etcd/raft"
|
||||
@ -33,12 +37,17 @@ import (
|
||||
|
||||
type fakeHealthServer struct {
|
||||
fakeServer
|
||||
health string
|
||||
apiError error
|
||||
serializableReadError error
|
||||
linearizableReadError error
|
||||
missingLeader bool
|
||||
authStore auth.AuthStore
|
||||
}
|
||||
|
||||
func (s *fakeHealthServer) Range(ctx context.Context, request *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||
return nil, s.apiError
|
||||
func (s *fakeHealthServer) Range(_ context.Context, req *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||
if req.Serializable {
|
||||
return nil, s.serializableReadError
|
||||
}
|
||||
return nil, s.linearizableReadError
|
||||
}
|
||||
|
||||
func (s *fakeHealthServer) Config() etcdserver.ServerConfig {
|
||||
@ -46,144 +55,369 @@ func (s *fakeHealthServer) Config() etcdserver.ServerConfig {
|
||||
}
|
||||
|
||||
func (s *fakeHealthServer) Leader() types.ID {
|
||||
if s.health == "true" {
|
||||
if !s.missingLeader {
|
||||
return 1
|
||||
}
|
||||
return types.ID(raft.None)
|
||||
}
|
||||
func (s *fakeHealthServer) Do(ctx context.Context, r pb.Request) (etcdserver.Response, error) {
|
||||
if s.health == "true" {
|
||||
return etcdserver.Response{}, nil
|
||||
}
|
||||
return etcdserver.Response{}, fmt.Errorf("fail health check")
|
||||
}
|
||||
|
||||
func (s *fakeHealthServer) AuthStore() auth.AuthStore { return s.authStore }
|
||||
|
||||
func (s *fakeHealthServer) ClientCertAuthEnabled() bool { return false }
|
||||
|
||||
type healthTestCase struct {
|
||||
name string
|
||||
healthCheckURL string
|
||||
expectStatusCode int
|
||||
inResult []string
|
||||
notInResult []string
|
||||
|
||||
alarms []*pb.AlarmMember
|
||||
apiError error
|
||||
missingLeader bool
|
||||
}
|
||||
|
||||
func TestHealthHandler(t *testing.T) {
|
||||
// define the input and expected output
|
||||
// input: alarms, and healthCheckURL
|
||||
tests := []struct {
|
||||
name string
|
||||
alarms []*pb.AlarmMember
|
||||
healthCheckURL string
|
||||
apiError error
|
||||
|
||||
expectStatusCode int
|
||||
expectHealth string
|
||||
}{
|
||||
tests := []healthTestCase{
|
||||
{
|
||||
name: "Healthy if no alarm",
|
||||
alarms: []*pb.AlarmMember{},
|
||||
healthCheckURL: "/health",
|
||||
expectStatusCode: http.StatusOK,
|
||||
expectHealth: "true",
|
||||
},
|
||||
{
|
||||
name: "Unhealthy if NOSPACE alarm is on",
|
||||
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}},
|
||||
healthCheckURL: "/health",
|
||||
expectStatusCode: http.StatusServiceUnavailable,
|
||||
expectHealth: "false",
|
||||
},
|
||||
{
|
||||
name: "Healthy if NOSPACE alarm is on and excluded",
|
||||
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}},
|
||||
healthCheckURL: "/health?exclude=NOSPACE",
|
||||
expectStatusCode: http.StatusOK,
|
||||
expectHealth: "true",
|
||||
},
|
||||
{
|
||||
name: "Healthy if NOSPACE alarm is excluded",
|
||||
alarms: []*pb.AlarmMember{},
|
||||
healthCheckURL: "/health?exclude=NOSPACE",
|
||||
expectStatusCode: http.StatusOK,
|
||||
expectHealth: "true",
|
||||
},
|
||||
{
|
||||
name: "Healthy if multiple NOSPACE alarms are on and excluded",
|
||||
alarms: []*pb.AlarmMember{{MemberID: uint64(1), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(2), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(3), Alarm: pb.AlarmType_NOSPACE}},
|
||||
healthCheckURL: "/health?exclude=NOSPACE",
|
||||
expectStatusCode: http.StatusOK,
|
||||
expectHealth: "true",
|
||||
},
|
||||
{
|
||||
name: "Unhealthy if NOSPACE alarms is excluded and CORRUPT is on",
|
||||
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(1), Alarm: pb.AlarmType_CORRUPT}},
|
||||
healthCheckURL: "/health?exclude=NOSPACE",
|
||||
expectStatusCode: http.StatusServiceUnavailable,
|
||||
expectHealth: "false",
|
||||
},
|
||||
{
|
||||
name: "Unhealthy if both NOSPACE and CORRUPT are on and excluded",
|
||||
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(1), Alarm: pb.AlarmType_CORRUPT}},
|
||||
healthCheckURL: "/health?exclude=NOSPACE&exclude=CORRUPT",
|
||||
expectStatusCode: http.StatusOK,
|
||||
expectHealth: "true",
|
||||
},
|
||||
{
|
||||
name: "Healthy even if authentication failed",
|
||||
healthCheckURL: "/health",
|
||||
apiError: auth.ErrUserEmpty,
|
||||
expectStatusCode: http.StatusOK,
|
||||
expectHealth: "true",
|
||||
},
|
||||
{
|
||||
name: "Healthy even if authorization failed",
|
||||
healthCheckURL: "/health",
|
||||
apiError: auth.ErrPermissionDenied,
|
||||
expectStatusCode: http.StatusOK,
|
||||
expectHealth: "true",
|
||||
},
|
||||
{
|
||||
name: "Unhealthy if api is not available",
|
||||
healthCheckURL: "/health",
|
||||
apiError: fmt.Errorf("Unexpected error"),
|
||||
expectStatusCode: http.StatusServiceUnavailable,
|
||||
expectHealth: "false",
|
||||
},
|
||||
{
|
||||
name: "Unhealthy if no leader",
|
||||
healthCheckURL: "/health",
|
||||
expectStatusCode: http.StatusServiceUnavailable,
|
||||
missingLeader: true,
|
||||
},
|
||||
{
|
||||
name: "Healthy if no leader and serializable=true",
|
||||
healthCheckURL: "/health?serializable=true",
|
||||
expectStatusCode: http.StatusOK,
|
||||
missingLeader: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
mux := http.NewServeMux()
|
||||
lg := zaptest.NewLogger(t)
|
||||
be, _ := betesting.NewDefaultTmpBackend()
|
||||
defer be.Close()
|
||||
HandleHealth(mux, &fakeHealthServer{
|
||||
fakeServer: fakeServer{alarms: tt.alarms},
|
||||
health: tt.expectHealth,
|
||||
apiError: tt.apiError,
|
||||
fakeServer: fakeServer{alarms: tt.alarms},
|
||||
serializableReadError: tt.apiError,
|
||||
linearizableReadError: tt.apiError,
|
||||
missingLeader: tt.missingLeader,
|
||||
authStore: auth.NewAuthStore(lg, be, nil, 0),
|
||||
})
|
||||
ts := httptest.NewServer(mux)
|
||||
defer ts.Close()
|
||||
|
||||
res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+tt.healthCheckURL)})
|
||||
if err != nil {
|
||||
t.Errorf("fail serve http request %s %v", tt.healthCheckURL, err)
|
||||
}
|
||||
if res == nil {
|
||||
t.Errorf("got nil http response with http request %s", tt.healthCheckURL)
|
||||
return
|
||||
}
|
||||
if res.StatusCode != tt.expectStatusCode {
|
||||
t.Errorf("want statusCode %d but got %d", tt.expectStatusCode, res.StatusCode)
|
||||
}
|
||||
health, err := parseHealthOutput(res.Body)
|
||||
if err != nil {
|
||||
t.Errorf("fail parse health check output %v", err)
|
||||
}
|
||||
if health.Health != tt.expectHealth {
|
||||
t.Errorf("want health %s but got %s", tt.expectHealth, health.Health)
|
||||
}
|
||||
checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, nil, nil)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func parseHealthOutput(body io.Reader) (Health, error) {
|
||||
obj := Health{}
|
||||
d, derr := io.ReadAll(body)
|
||||
if derr != nil {
|
||||
return obj, derr
|
||||
func TestHttpSubPath(t *testing.T) {
|
||||
be, _ := betesting.NewDefaultTmpBackend()
|
||||
defer be.Close()
|
||||
tests := []healthTestCase{
|
||||
{
|
||||
name: "/readyz/data_corruption ok",
|
||||
healthCheckURL: "/readyz/data_corruption",
|
||||
expectStatusCode: http.StatusOK,
|
||||
},
|
||||
{
|
||||
name: "/readyz/serializable_read not ok with error",
|
||||
apiError: fmt.Errorf("Unexpected error"),
|
||||
healthCheckURL: "/readyz/serializable_read",
|
||||
expectStatusCode: http.StatusServiceUnavailable,
|
||||
notInResult: []string{"data_corruption"},
|
||||
},
|
||||
{
|
||||
name: "/readyz/non_exist 404",
|
||||
healthCheckURL: "/readyz/non_exist",
|
||||
expectStatusCode: http.StatusNotFound,
|
||||
},
|
||||
}
|
||||
if err := json.Unmarshal(d, &obj); err != nil {
|
||||
return obj, err
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
mux := http.NewServeMux()
|
||||
logger := zaptest.NewLogger(t)
|
||||
s := &fakeHealthServer{
|
||||
serializableReadError: tt.apiError,
|
||||
authStore: auth.NewAuthStore(logger, be, nil, 0),
|
||||
}
|
||||
HandleHealth(mux, s)
|
||||
ts := httptest.NewServer(mux)
|
||||
defer ts.Close()
|
||||
checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
|
||||
checkMetrics(t, tt.healthCheckURL, "", tt.expectStatusCode)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDataCorruptionCheck(t *testing.T) {
|
||||
be, _ := betesting.NewDefaultTmpBackend()
|
||||
defer be.Close()
|
||||
tests := []healthTestCase{
|
||||
{
|
||||
name: "Live if CORRUPT alarm is on",
|
||||
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_CORRUPT}},
|
||||
healthCheckURL: "/livez",
|
||||
expectStatusCode: http.StatusOK,
|
||||
notInResult: []string{"data_corruption"},
|
||||
},
|
||||
{
|
||||
name: "Not ready if CORRUPT alarm is on",
|
||||
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_CORRUPT}},
|
||||
healthCheckURL: "/readyz",
|
||||
expectStatusCode: http.StatusServiceUnavailable,
|
||||
inResult: []string{"[-]data_corruption failed: alarm activated: CORRUPT"},
|
||||
},
|
||||
{
|
||||
name: "ready if CORRUPT alarm is not on",
|
||||
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}},
|
||||
healthCheckURL: "/readyz",
|
||||
expectStatusCode: http.StatusOK,
|
||||
},
|
||||
{
|
||||
name: "ready if CORRUPT alarm is excluded",
|
||||
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_CORRUPT}, {MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}},
|
||||
healthCheckURL: "/readyz?exclude=data_corruption",
|
||||
expectStatusCode: http.StatusOK,
|
||||
},
|
||||
{
|
||||
name: "Not ready if CORRUPT alarm is on",
|
||||
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_CORRUPT}},
|
||||
healthCheckURL: "/readyz?exclude=non_exist",
|
||||
expectStatusCode: http.StatusServiceUnavailable,
|
||||
inResult: []string{"[-]data_corruption failed: alarm activated: CORRUPT"},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
mux := http.NewServeMux()
|
||||
logger := zaptest.NewLogger(t)
|
||||
s := &fakeHealthServer{
|
||||
authStore: auth.NewAuthStore(logger, be, nil, 0),
|
||||
}
|
||||
HandleHealth(mux, s)
|
||||
ts := httptest.NewServer(mux)
|
||||
defer ts.Close()
|
||||
// OK before alarms are activated.
|
||||
checkHttpResponse(t, ts, tt.healthCheckURL, http.StatusOK, nil, nil)
|
||||
// Activate the alarms.
|
||||
s.alarms = tt.alarms
|
||||
checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSerializableReadCheck(t *testing.T) {
|
||||
be, _ := betesting.NewDefaultTmpBackend()
|
||||
defer be.Close()
|
||||
tests := []healthTestCase{
|
||||
{
|
||||
name: "Alive normal",
|
||||
healthCheckURL: "/livez?verbose",
|
||||
expectStatusCode: http.StatusOK,
|
||||
inResult: []string{"[+]serializable_read ok"},
|
||||
},
|
||||
{
|
||||
name: "Not alive if range api is not available",
|
||||
healthCheckURL: "/livez",
|
||||
apiError: fmt.Errorf("Unexpected error"),
|
||||
expectStatusCode: http.StatusServiceUnavailable,
|
||||
inResult: []string{"[-]serializable_read failed: Unexpected error"},
|
||||
},
|
||||
{
|
||||
name: "Not ready if range api is not available",
|
||||
healthCheckURL: "/readyz",
|
||||
apiError: fmt.Errorf("Unexpected error"),
|
||||
expectStatusCode: http.StatusServiceUnavailable,
|
||||
inResult: []string{"[-]serializable_read failed: Unexpected error"},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
mux := http.NewServeMux()
|
||||
logger := zaptest.NewLogger(t)
|
||||
s := &fakeHealthServer{
|
||||
serializableReadError: tt.apiError,
|
||||
authStore: auth.NewAuthStore(logger, be, nil, 0),
|
||||
}
|
||||
HandleHealth(mux, s)
|
||||
ts := httptest.NewServer(mux)
|
||||
defer ts.Close()
|
||||
checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
|
||||
checkMetrics(t, tt.healthCheckURL, "serializable_read", tt.expectStatusCode)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestLinearizableReadCheck(t *testing.T) {
|
||||
be, _ := betesting.NewDefaultTmpBackend()
|
||||
defer be.Close()
|
||||
tests := []healthTestCase{
|
||||
{
|
||||
name: "Alive normal",
|
||||
healthCheckURL: "/livez?verbose",
|
||||
expectStatusCode: http.StatusOK,
|
||||
inResult: []string{"[+]serializable_read ok"},
|
||||
},
|
||||
{
|
||||
name: "Alive if lineariable range api is not available",
|
||||
healthCheckURL: "/livez",
|
||||
apiError: fmt.Errorf("Unexpected error"),
|
||||
expectStatusCode: http.StatusOK,
|
||||
},
|
||||
{
|
||||
name: "Not ready if range api is not available",
|
||||
healthCheckURL: "/readyz",
|
||||
apiError: fmt.Errorf("Unexpected error"),
|
||||
expectStatusCode: http.StatusServiceUnavailable,
|
||||
inResult: []string{"[+]serializable_read ok", "[-]linearizable_read failed: Unexpected error"},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
mux := http.NewServeMux()
|
||||
logger := zaptest.NewLogger(t)
|
||||
s := &fakeHealthServer{
|
||||
linearizableReadError: tt.apiError,
|
||||
authStore: auth.NewAuthStore(logger, be, nil, 0),
|
||||
}
|
||||
HandleHealth(mux, s)
|
||||
ts := httptest.NewServer(mux)
|
||||
defer ts.Close()
|
||||
checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
|
||||
checkMetrics(t, tt.healthCheckURL, "linearizable_read", tt.expectStatusCode)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func checkHttpResponse(t *testing.T, ts *httptest.Server, url string, expectStatusCode int, inResult []string, notInResult []string) {
|
||||
res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+url)})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("fail serve http request %s %v", url, err)
|
||||
}
|
||||
if res.StatusCode != expectStatusCode {
|
||||
t.Errorf("want statusCode %d but got %d", expectStatusCode, res.StatusCode)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
b, err := io.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to read response for %s", url)
|
||||
}
|
||||
result := string(b)
|
||||
for _, substr := range inResult {
|
||||
if !strings.Contains(result, substr) {
|
||||
t.Errorf("Could not find substring : %s, in response: %s", substr, result)
|
||||
return
|
||||
}
|
||||
}
|
||||
for _, substr := range notInResult {
|
||||
if strings.Contains(result, substr) {
|
||||
t.Errorf("Do not expect substring : %s, in response: %s", substr, result)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func checkMetrics(t *testing.T, url, checkName string, expectStatusCode int) {
|
||||
defer healthCheckGauge.Reset()
|
||||
defer healthCheckCounter.Reset()
|
||||
|
||||
typeName := strings.TrimPrefix(strings.Split(url, "?")[0], "/")
|
||||
if len(checkName) == 0 {
|
||||
checkName = strings.Split(typeName, "/")[1]
|
||||
typeName = strings.Split(typeName, "/")[0]
|
||||
}
|
||||
|
||||
expectedSuccessCount := 1
|
||||
expectedErrorCount := 0
|
||||
if expectStatusCode != http.StatusOK {
|
||||
expectedSuccessCount = 0
|
||||
expectedErrorCount = 1
|
||||
}
|
||||
|
||||
gather, _ := prometheus.DefaultGatherer.Gather()
|
||||
for _, mf := range gather {
|
||||
name := *mf.Name
|
||||
val := 0
|
||||
switch name {
|
||||
case "etcd_server_healthcheck":
|
||||
val = int(mf.GetMetric()[0].GetGauge().GetValue())
|
||||
case "etcd_server_healthcheck_total":
|
||||
val = int(mf.GetMetric()[0].GetCounter().GetValue())
|
||||
default:
|
||||
continue
|
||||
}
|
||||
labelMap := make(map[string]string)
|
||||
for _, label := range mf.GetMetric()[0].Label {
|
||||
labelMap[label.GetName()] = label.GetValue()
|
||||
}
|
||||
if typeName != labelMap["type"] {
|
||||
continue
|
||||
}
|
||||
if labelMap["name"] != checkName {
|
||||
continue
|
||||
}
|
||||
if statusLabel, found := labelMap["status"]; found && statusLabel == HealthStatusError {
|
||||
if val != expectedErrorCount {
|
||||
t.Fatalf("%s got errorCount %d, wanted %d\n", name, val, expectedErrorCount)
|
||||
}
|
||||
} else {
|
||||
if val != expectedSuccessCount {
|
||||
t.Fatalf("%s got expectedSuccessCount %d, wanted %d\n", name, val, expectedSuccessCount)
|
||||
}
|
||||
}
|
||||
}
|
||||
return obj, nil
|
||||
}
|
||||
|
@ -26,7 +26,9 @@ import (
|
||||
|
||||
// HandleHealth registers health handler on '/health'.
|
||||
func HandleHealth(mux *http.ServeMux, c *clientv3.Client) {
|
||||
mux.Handle(etcdhttp.PathHealth, etcdhttp.NewHealthHandler(func(excludedAlarms etcdhttp.AlarmSet, serializable bool) etcdhttp.Health { return checkHealth(c) }))
|
||||
mux.Handle(etcdhttp.PathHealth, etcdhttp.NewHealthHandler(func(ctx context.Context, excludedAlarms etcdhttp.StringSet, serializable bool) etcdhttp.Health {
|
||||
return checkHealth(c)
|
||||
}))
|
||||
}
|
||||
|
||||
func checkHealth(c *clientv3.Client) etcdhttp.Health {
|
||||
|
Loading…
x
Reference in New Issue
Block a user