add existing http health check handler e2e test
Signed-off-by: Chao Chen <chaochn@amazon.com>
This commit is contained in:
parent 262a3a1208, commit 1324f03254
@@ -1822,6 +1822,7 @@ func (s *EtcdServer) apply(
 			zap.Stringer("type", e.Type))
 		switch e.Type {
 		case raftpb.EntryNormal:
+			// gofail: var beforeApplyOneEntryNormal struct{}
 			s.applyEntryNormal(&e)
 			s.setAppliedIndex(e.Index)
 			s.setTerm(e.Term)
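For context, the `// gofail: var beforeApplyOneEntryNormal struct{}` comment above only becomes an injectable failpoint when etcd is built with gofail instrumentation (make gofail-enable && make build). The new e2e test activates it through the framework's Failpoints().SetupHTTP(...) helper; below is a rough standalone sketch of the same idea, under the assumption that gofail's runtime HTTP endpoint accepts a PUT of the failpoint term at /<name> and that failpointAddr matches the member's GOFAIL_HTTP address (both the address and the exact protocol are assumptions here, not part of this change):

package main

import (
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// failpointAddr is hypothetical; the e2e framework derives the real one
	// from the member's GOFAIL_HTTP environment variable.
	const failpointAddr = "127.0.0.1:22381"

	// Arm the failpoint: every apply of a normal entry sleeps for 3s,
	// which is what the "overloaded server slow apply" case relies on.
	req, err := http.NewRequest(http.MethodPut,
		"http://"+failpointAddr+"/beforeApplyOneEntryNormal",
		strings.NewReader(`sleep("3s")`))
	if err != nil {
		panic(err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("failpoint armed:", resp.Status)
}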
@@ -316,7 +316,7 @@ func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) {
 	)
 	require.NoError(t, err)

-	epc, err = e2e.StartEtcdProcessCluster(ctx, epc, cfg)
+	epc, err = e2e.StartEtcdProcessCluster(ctx, t, epc, cfg)
 	require.NoError(t, err)

 	t.Cleanup(func() {
@@ -120,5 +120,5 @@ func bootstrapEtcdClusterUsingV3Discovery(t *testing.T, discoveryEndpoints []str
 	}

 	// start the cluster
-	return e2e.StartEtcdProcessCluster(context.TODO(), epc, cfg)
+	return e2e.StartEtcdProcessCluster(context.TODO(), t, epc, cfg)
 }
tests/e2e/http_health_check_test.go (new file, 226 lines)
@@ -0,0 +1,226 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !cluster_proxy

package e2e

import (
	"context"
	"io"
	"net/http"
	"os"
	"strings"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"go.etcd.io/etcd/tests/v3/framework/config"
	"go.etcd.io/etcd/tests/v3/framework/e2e"
	"go.etcd.io/etcd/tests/v3/framework/testutils"
)

type healthCheckConfig struct {
	url                  string
	expectedStatusCode   int
	expectedTimeoutError bool
}

func TestHTTPHealthHandler(t *testing.T) {
	e2e.BeforeTest(t)
	client := &http.Client{}
	tcs := []struct {
		name           string
		injectFailure  func(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster)
		clusterOptions []e2e.EPClusterOption
		healthChecks   []healthCheckConfig
	}{
		{
			name:           "no failures", // happy case
			clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(1)},
			healthChecks: []healthCheckConfig{
				{
					url:                "/health",
					expectedStatusCode: http.StatusOK,
				},
			},
		},
		{
			name:           "activated no space alarm",
			injectFailure:  triggerNoSpaceAlarm,
			clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(1), e2e.WithQuotaBackendBytes(int64(13 * os.Getpagesize()))},
			healthChecks: []healthCheckConfig{
				{
					url:                "/health",
					expectedStatusCode: http.StatusServiceUnavailable,
				},
				{
					url:                "/health?exclude=NOSPACE",
					expectedStatusCode: http.StatusOK,
				},
			},
		},
		{
			name:           "overloaded server slow apply",
			injectFailure:  triggerSlowApply,
			clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(3), e2e.WithGoFailEnabled(true)},
			healthChecks: []healthCheckConfig{
				{
					url:                "/health?serializable=true",
					expectedStatusCode: http.StatusOK,
				},
				{
					url:                  "/health?serializable=false",
					expectedTimeoutError: true,
				},
			},
		},
		{
			name:           "network partitioned",
			injectFailure:  blackhole,
			clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(3), e2e.WithIsPeerTLS(true), e2e.WithPeerProxy(true)},
			healthChecks: []healthCheckConfig{
				{
					url:                "/health?serializable=true",
					expectedStatusCode: http.StatusOK,
				},
				{
					url:                  "/health?serializable=false",
					expectedTimeoutError: true,
					expectedStatusCode:   http.StatusServiceUnavailable,
					// The old leader may return an "etcdserver: leader changed" error with 503 from the ReadIndex leaderChangedNotifier path.
				},
			},
		},
		{
			name:           "raft loop deadlock",
			injectFailure:  triggerRaftLoopDeadLock,
			clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(1), e2e.WithGoFailEnabled(true)},
			healthChecks: []healthCheckConfig{
				{
					// The current kubeadm etcd liveness check fails to detect a raft loop deadlock in steady state.
					// ref. https://github.com/kubernetes/kubernetes/blob/master/cmd/kubeadm/app/phases/etcd/local.go#L225-L226
					// That liveness probe relies on the etcd /health check, which has a flaw that the new /livez check should resolve.
					url:                "/health?serializable=true",
					expectedStatusCode: http.StatusOK,
				},
				{
					url:                  "/health?serializable=false",
					expectedTimeoutError: true,
				},
			},
		},
		// Verify that, with auth enabled, a serializable read must still go through mvcc.
		{
			name:           "slow buffer write back with auth enabled",
			injectFailure:  triggerSlowBufferWriteBackWithAuth,
			clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(1), e2e.WithGoFailEnabled(true)},
			healthChecks: []healthCheckConfig{
				{
					url:                  "/health?serializable=true",
					expectedTimeoutError: true,
				},
			},
		},
	}

	for _, tc := range tcs {
		t.Run(tc.name, func(t *testing.T) {
			ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
			defer cancel()
			clus, err := e2e.NewEtcdProcessCluster(ctx, t, tc.clusterOptions...)
			require.NoError(t, err)
			defer clus.Close()
			testutils.ExecuteUntil(ctx, t, func() {
				if tc.injectFailure != nil {
					tc.injectFailure(ctx, t, clus)
				}

				for _, hc := range tc.healthChecks {
					requestURL := clus.Procs[0].EndpointsHTTP()[0] + hc.url
					t.Logf("health check URL is %s", requestURL)
					doHealthCheckAndVerify(t, client, requestURL, hc.expectedStatusCode, hc.expectedTimeoutError)
				}
			})
		})
	}
}

func doHealthCheckAndVerify(t *testing.T, client *http.Client, url string, expectStatusCode int, expectTimeoutError bool) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
	require.NoErrorf(t, err, "failed to create request %+v", err)
	resp, herr := client.Do(req)
	cancel()
	if expectTimeoutError {
		if herr != nil && strings.Contains(herr.Error(), context.DeadlineExceeded.Error()) {
			return
		}
	}
	require.NoErrorf(t, herr, "failed to get response %+v", herr)
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	require.NoErrorf(t, err, "failed to read response %+v", err)

	t.Logf("health check response body is: %s", body)
	require.Equal(t, expectStatusCode, resp.StatusCode)
}

func triggerNoSpaceAlarm(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster) {
	buf := strings.Repeat("b", os.Getpagesize())
	etcdctl := clus.Etcdctl()
	for {
		if err := etcdctl.Put(ctx, "foo", buf, config.PutOptions{}); err != nil {
			if !strings.Contains(err.Error(), "etcdserver: mvcc: database space exceeded") {
				t.Fatal(err)
			}
			break
		}
	}
}

func triggerSlowApply(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster) {
	// The following proposal will be blocked at the apply stage;
	// while the applied index lags behind the committed index, linearizable reads will time out.
	require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "beforeApplyOneEntryNormal", `sleep("3s")`))
	require.NoError(t, clus.Procs[1].Etcdctl().Put(ctx, "foo", "bar", config.PutOptions{}))
}

func blackhole(_ context.Context, t *testing.T, clus *e2e.EtcdProcessCluster) {
	member := clus.Procs[0]
	proxy := member.PeerProxy()
	t.Logf("Blackholing traffic from and to member %q", member.Config().Name)
	proxy.BlackholeTx()
	proxy.BlackholeRx()
}

func triggerRaftLoopDeadLock(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster) {
	require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "raftBeforeSave", `sleep("3s")`))
	// The returned error is ignored; this Put only serves to drive the raft loop into the stalled raftBeforeSave failpoint.
	clus.Procs[0].Etcdctl().Put(context.Background(), "foo", "bar", config.PutOptions{})
}

func triggerSlowBufferWriteBackWithAuth(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster) {
	etcdctl := clus.Etcdctl()
	_, err := etcdctl.UserAdd(ctx, "root", "root", config.UserAddOptions{})
	require.NoError(t, err)
	_, err = etcdctl.UserGrantRole(ctx, "root", "root")
	require.NoError(t, err)
	require.NoError(t, etcdctl.AuthEnable(ctx))

	require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "beforeWritebackBuf", `sleep("3s")`))
	// The returned error is ignored; with the failpoint armed this Put is expected to hit its 200ms command timeout.
	clus.Procs[0].Etcdctl(e2e.WithAuth("root", "root")).Put(context.Background(), "foo", "bar", config.PutOptions{Timeout: 200 * time.Millisecond})
}
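For a quick manual check outside the e2e framework, the same probing logic as doHealthCheckAndVerify can be run against a local member. A minimal sketch, assuming an etcd member is already listening on http://127.0.0.1:2379 (the endpoint and the expectation of a successful response are assumptions, not part of this change):

package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"time"
)

func main() {
	// Mirror of doHealthCheckAndVerify: bounded request, then inspect status and body.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet,
		"http://127.0.0.1:2379/health?serializable=false", nil)
	if err != nil {
		panic(err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		// A context.DeadlineExceeded here is what the test treats as an expected timeout.
		panic(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Printf("status=%d body=%s\n", resp.StatusCode, body)
}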
@@ -42,6 +42,7 @@ type GetOptions struct {

 type PutOptions struct {
 	LeaseID clientv3.LeaseID
+	Timeout time.Duration
 }

 type DeleteOptions struct {
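The new PutOptions.Timeout flows through both client implementations changed below: the etcdctl wrapper turns it into a --command-timeout flag and the integration client wraps the request context. A short usage sketch in the style of the e2e tests; the test name and key/value are hypothetical, while the framework calls are the ones exercised elsewhere in this commit:

package e2e

import (
	"context"
	"testing"
	"time"

	"go.etcd.io/etcd/tests/v3/framework/config"
	"go.etcd.io/etcd/tests/v3/framework/e2e"
)

// Hypothetical test showing PutOptions.Timeout in use; the cluster setup
// mirrors the cases in TestHTTPHealthHandler.
func TestPutWithCommandTimeout(t *testing.T) {
	e2e.BeforeTest(t)
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()

	clus, err := e2e.NewEtcdProcessCluster(ctx, t, e2e.WithClusterSize(1))
	if err != nil {
		t.Fatal(err)
	}
	defer clus.Close()

	// For the etcdctl-based client the timeout becomes --command-timeout.
	if err := clus.Procs[0].Etcdctl().Put(ctx, "foo", "bar",
		config.PutOptions{Timeout: 200 * time.Millisecond}); err != nil {
		t.Fatalf("put did not complete within the command timeout: %v", err)
	}
}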
@@ -369,7 +369,7 @@ func NewEtcdProcessCluster(ctx context.Context, t testing.TB, opts ...EPClusterO
 		return nil, err
 	}

-	return StartEtcdProcessCluster(ctx, epc, cfg)
+	return StartEtcdProcessCluster(ctx, t, epc, cfg)
 }

 // InitEtcdProcessCluster initializes a new cluster based on the given config.
@@ -409,7 +409,7 @@ func InitEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdP
 }

 // StartEtcdProcessCluster launches a new cluster from etcd processes.
-func StartEtcdProcessCluster(ctx context.Context, epc *EtcdProcessCluster, cfg *EtcdProcessClusterConfig) (*EtcdProcessCluster, error) {
+func StartEtcdProcessCluster(ctx context.Context, t testing.TB, epc *EtcdProcessCluster, cfg *EtcdProcessClusterConfig) (*EtcdProcessCluster, error) {
 	if cfg.RollingStart {
 		if err := epc.RollingStart(ctx); err != nil {
 			return nil, fmt.Errorf("cannot rolling-start: %v", err)
@@ -420,6 +420,13 @@ func StartEtcdProcessCluster(ctx context.Context, epc *EtcdProcessCluster, cfg *
 		}
 	}

+	for _, proc := range epc.Procs {
+		if cfg.GoFailEnabled && !proc.Failpoints().Enabled() {
+			epc.Close()
+			t.Skip("please run 'make gofail-enable && make build' before running the test")
+		}
+	}
+
 	return epc, nil
 }

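With this change, any e2e test that opts into failpoints via WithGoFailEnabled(true) is skipped automatically when the binaries were built without gofail, instead of failing later when the failpoint endpoint is unreachable. A minimal sketch of a test relying on that behavior; the test name is hypothetical and the failpoint name/term are illustrative, while the framework calls are the ones used elsewhere in this commit:

package e2e

import (
	"context"
	"testing"
	"time"

	"go.etcd.io/etcd/tests/v3/framework/e2e"
)

// Hypothetical test: if the etcd binary lacks gofail instrumentation,
// NewEtcdProcessCluster (via StartEtcdProcessCluster) now calls t.Skip
// with a hint to run 'make gofail-enable && make build'.
func TestWithFailpoints(t *testing.T) {
	e2e.BeforeTest(t)
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()

	clus, err := e2e.NewEtcdProcessCluster(ctx, t,
		e2e.WithClusterSize(1), e2e.WithGoFailEnabled(true))
	if err != nil {
		t.Fatal(err)
	}
	defer clus.Close()

	// Reaching this point means failpoints are compiled in, so SetupHTTP is
	// safe to call; "raftBeforeSave" is used by this commit's tests, but the
	// exact name and term here are illustrative.
	if err := clus.Procs[0].Failpoints().SetupHTTP(ctx, "raftBeforeSave", `sleep("100ms")`); err != nil {
		t.Fatal(err)
	}
}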
@@ -373,6 +373,14 @@ var httpClient = http.Client{
 	Timeout: 10 * time.Millisecond,
 }

+func (f *BinaryFailpoints) Enabled() bool {
+	_, err := failpoints(f.member)
+	if err != nil {
+		return false
+	}
+	return true
+}
+
 func (f *BinaryFailpoints) Available(failpoint string) bool {
 	if f.availableCache == nil {
 		fs, err := failpoints(f.member)
@@ -158,6 +158,9 @@ func (ctl *EtcdctlV3) Put(ctx context.Context, key, value string, opts config.Pu
 	if opts.LeaseID != 0 {
 		args = append(args, "--lease", strconv.FormatInt(int64(opts.LeaseID), 16))
 	}
+	if opts.Timeout != 0 {
+		args = append(args, fmt.Sprintf("--command-timeout=%s", opts.Timeout))
+	}
 	_, err := SpawnWithExpectLines(ctx, args, nil, expect.ExpectedResponse{Value: "OK"})
 	return err
 }
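For reference, the flag value above is formatted with time.Duration's String form, so a 200ms PutOptions.Timeout yields --command-timeout=200ms on the etcdctl command line. A tiny sketch of that formatting; the surrounding args slice is illustrative, not the framework's actual argument builder:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirrors the formatting used above: %s on a time.Duration,
	// so 200 * time.Millisecond renders as "200ms".
	timeout := 200 * time.Millisecond
	args := []string{"etcdctl", "put", "foo", "bar"}
	args = append(args, fmt.Sprintf("--command-timeout=%s", timeout))
	fmt.Println(args) // [etcdctl put foo bar --command-timeout=200ms]
}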
@@ -169,6 +169,11 @@ func (c integrationClient) Get(ctx context.Context, key string, o config.GetOpti
 }

 func (c integrationClient) Put(ctx context.Context, key, value string, opts config.PutOptions) error {
+	if opts.Timeout != 0 {
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithTimeout(ctx, opts.Timeout)
+		defer cancel()
+	}
 	var clientOpts []clientv3.OpOption
 	if opts.LeaseID != 0 {
 		clientOpts = append(clientOpts, clientv3.WithLease(opts.LeaseID))