mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: add linearizable_read check to readyz.
Signed-off-by: Siyuan Zhang <sizhang@google.com>
This commit is contained in:
parent
293fc21cd8
commit
ebb7e796c3
@ -23,10 +23,9 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
@ -276,14 +275,19 @@ type CheckRegistry struct {
|
||||
|
||||
func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
|
||||
reg := CheckRegistry{checkType: checkTypeLivez, checks: make(map[string]HealthCheck)}
|
||||
reg.Register("serializable_read", serializableReadCheck(server))
|
||||
reg.Register("serializable_read", readCheck(server, true /* serializable */))
|
||||
reg.InstallHttpEndpoints(lg, mux)
|
||||
}
|
||||
|
||||
func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
|
||||
reg := CheckRegistry{checkType: checkTypeReadyz, checks: make(map[string]HealthCheck)}
|
||||
reg.Register("data_corruption", activeAlarmCheck(server, pb.AlarmType_CORRUPT))
|
||||
reg.Register("serializable_read", serializableReadCheck(server))
|
||||
// serializable_read checks if local read is ok.
|
||||
// linearizable_read checks if there is consensus in the cluster.
|
||||
// Having both serializable_read and linearizable_read helps isolate the cause of problems if there is a read failure.
|
||||
reg.Register("serializable_read", readCheck(server, true))
|
||||
// linearizable_read check would be replaced by read_index check in 3.6
|
||||
reg.Register("linearizable_read", readCheck(server, false))
|
||||
reg.InstallHttpEndpoints(lg, mux)
|
||||
}
|
||||
|
||||
@ -447,13 +451,10 @@ func activeAlarmCheck(srv ServerHealth, at pb.AlarmType) func(context.Context) e
|
||||
}
|
||||
}
|
||||
|
||||
func serializableReadCheck(srv ServerHealth) func(ctx context.Context) error {
|
||||
func readCheck(srv ServerHealth, serializable bool) func(ctx context.Context) error {
|
||||
return func(ctx context.Context) error {
|
||||
ctx = srv.AuthStore().WithRoot(ctx)
|
||||
_, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: true})
|
||||
if err != nil {
|
||||
return fmt.Errorf("range error: %w", err)
|
||||
}
|
||||
return nil
|
||||
_, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: serializable})
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -23,13 +23,17 @@ import (
|
||||
|
||||
type fakeHealthServer struct {
|
||||
fakeServer
|
||||
apiError error
|
||||
missingLeader bool
|
||||
authStore auth.AuthStore
|
||||
serializableReadError error
|
||||
linearizableReadError error
|
||||
missingLeader bool
|
||||
authStore auth.AuthStore
|
||||
}
|
||||
|
||||
func (s *fakeHealthServer) Range(_ context.Context, _ *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||
return nil, s.apiError
|
||||
func (s *fakeHealthServer) Range(_ context.Context, req *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||
if req.Serializable {
|
||||
return nil, s.serializableReadError
|
||||
}
|
||||
return nil, s.linearizableReadError
|
||||
}
|
||||
|
||||
func (s *fakeHealthServer) Config() config.ServerConfig {
|
||||
@ -132,10 +136,11 @@ func TestHealthHandler(t *testing.T) {
|
||||
be, _ := betesting.NewDefaultTmpBackend(t)
|
||||
defer betesting.Close(t, be)
|
||||
HandleHealth(zaptest.NewLogger(t), mux, &fakeHealthServer{
|
||||
fakeServer: fakeServer{alarms: tt.alarms},
|
||||
apiError: tt.apiError,
|
||||
missingLeader: tt.missingLeader,
|
||||
authStore: auth.NewAuthStore(lg, be, nil, 0),
|
||||
fakeServer: fakeServer{alarms: tt.alarms},
|
||||
serializableReadError: tt.apiError,
|
||||
linearizableReadError: tt.apiError,
|
||||
missingLeader: tt.missingLeader,
|
||||
authStore: auth.NewAuthStore(lg, be, nil, 0),
|
||||
})
|
||||
ts := httptest.NewServer(mux)
|
||||
defer ts.Close()
|
||||
@ -171,8 +176,8 @@ func TestHttpSubPath(t *testing.T) {
|
||||
mux := http.NewServeMux()
|
||||
logger := zaptest.NewLogger(t)
|
||||
s := &fakeHealthServer{
|
||||
apiError: tt.apiError,
|
||||
authStore: auth.NewAuthStore(logger, be, nil, 0),
|
||||
serializableReadError: tt.apiError,
|
||||
authStore: auth.NewAuthStore(logger, be, nil, 0),
|
||||
}
|
||||
HandleHealth(logger, mux, s)
|
||||
ts := httptest.NewServer(mux)
|
||||
@ -255,14 +260,14 @@ func TestSerializableReadCheck(t *testing.T) {
|
||||
healthCheckURL: "/livez",
|
||||
apiError: fmt.Errorf("Unexpected error"),
|
||||
expectStatusCode: http.StatusServiceUnavailable,
|
||||
inResult: []string{"[-]serializable_read failed: range error: Unexpected error"},
|
||||
inResult: []string{"[-]serializable_read failed: Unexpected error"},
|
||||
},
|
||||
{
|
||||
name: "Not ready if range api is not available",
|
||||
healthCheckURL: "/readyz",
|
||||
apiError: fmt.Errorf("Unexpected error"),
|
||||
expectStatusCode: http.StatusServiceUnavailable,
|
||||
inResult: []string{"[-]serializable_read failed: range error: Unexpected error"},
|
||||
inResult: []string{"[-]serializable_read failed: Unexpected error"},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
@ -270,8 +275,8 @@ func TestSerializableReadCheck(t *testing.T) {
|
||||
mux := http.NewServeMux()
|
||||
logger := zaptest.NewLogger(t)
|
||||
s := &fakeHealthServer{
|
||||
apiError: tt.apiError,
|
||||
authStore: auth.NewAuthStore(logger, be, nil, 0),
|
||||
serializableReadError: tt.apiError,
|
||||
authStore: auth.NewAuthStore(logger, be, nil, 0),
|
||||
}
|
||||
HandleHealth(logger, mux, s)
|
||||
ts := httptest.NewServer(mux)
|
||||
@ -282,6 +287,47 @@ func TestSerializableReadCheck(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestLinearizableReadCheck(t *testing.T) {
|
||||
be, _ := betesting.NewDefaultTmpBackend(t)
|
||||
defer betesting.Close(t, be)
|
||||
tests := []healthTestCase{
|
||||
{
|
||||
name: "Alive normal",
|
||||
healthCheckURL: "/livez?verbose",
|
||||
expectStatusCode: http.StatusOK,
|
||||
inResult: []string{"[+]serializable_read ok"},
|
||||
},
|
||||
{
|
||||
name: "Alive if lineariable range api is not available",
|
||||
healthCheckURL: "/livez",
|
||||
apiError: fmt.Errorf("Unexpected error"),
|
||||
expectStatusCode: http.StatusOK,
|
||||
},
|
||||
{
|
||||
name: "Not ready if range api is not available",
|
||||
healthCheckURL: "/readyz",
|
||||
apiError: fmt.Errorf("Unexpected error"),
|
||||
expectStatusCode: http.StatusServiceUnavailable,
|
||||
inResult: []string{"[+]serializable_read ok", "[-]linearizable_read failed: Unexpected error"},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
mux := http.NewServeMux()
|
||||
logger := zaptest.NewLogger(t)
|
||||
s := &fakeHealthServer{
|
||||
linearizableReadError: tt.apiError,
|
||||
authStore: auth.NewAuthStore(logger, be, nil, 0),
|
||||
}
|
||||
HandleHealth(logger, mux, s)
|
||||
ts := httptest.NewServer(mux)
|
||||
defer ts.Close()
|
||||
checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
|
||||
checkMetrics(t, tt.healthCheckURL, "linearizable_read", tt.expectStatusCode)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func checkHttpResponse(t *testing.T, ts *httptest.Server, url string, expectStatusCode int, inResult []string, notInResult []string) {
|
||||
res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+url)})
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user