[3.5] backport health check e2e tests.

Signed-off-by: Siyuan Zhang <sizhang@google.com>

parent fa0eb078fa
commit b8d5e79fc1
@@ -203,3 +203,7 @@ func (ep *ExpectProcess) Lines() []string {
 	defer ep.mu.Unlock()
 	return ep.lines
 }
+
+func (ep *ExpectProcess) IsRunning() bool {
+	return ep.cmd != nil
+}
@@ -2152,6 +2152,7 @@ func (s *EtcdServer) apply(
 			zap.Stringer("type", e.Type))
 		switch e.Type {
 		case raftpb.EntryNormal:
+			// gofail: var beforeApplyOneEntryNormal struct{}
 			s.applyEntryNormal(&e)
 			s.setAppliedIndex(e.Index)
 			s.setTerm(e.Term)
@@ -307,6 +307,7 @@ func newBatchTxBuffered(backend *backend) *batchTxBuffered {
 func (t *batchTxBuffered) Unlock() {
 	if t.pending != 0 {
 		t.backend.readTx.Lock() // blocks txReadBuffer for writing.
+		// gofail: var beforeWritebackBuf struct{}
 		t.buf.writeback(&t.backend.readTx.buf)
 		t.backend.readTx.Unlock()
 		if t.pending >= t.backend.batchLimit {
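The two `// gofail:` comments in the hunks above compile into runtime failpoints when etcd is built with gofail enabled (`make gofail-enable && make build`, as the cluster helper later in this diff hints). A failpoint is armed at runtime through the HTTP endpoint named by `GOFAIL_HTTP`; here is a minimal sketch of arming one, matching what `BinaryFailpoints.SetupHTTP` further down does. The port and payload are illustrative:

package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// gofail accepts a PUT of a term such as sleep("10s"); every pass through
	// the marked line then stalls for ten seconds until the failpoint is deleted.
	url := "http://127.0.0.1:12381/beforeApplyOneEntryNormal" // GOFAIL_HTTP address; illustrative
	req, err := http.NewRequest(http.MethodPut, url, bytes.NewBufferString(`sleep("10s")`))
	if err != nil {
		panic(err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.StatusCode) // gofail answers 204 No Content on success
}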
@@ -55,6 +55,15 @@ func (ctl *Etcdctl) Put(key, value string) error {
 	return e2e.SpawnWithExpectWithEnv(args, ctl.env(), "OK")
 }
 
+func (ctl *Etcdctl) PutWithAuth(key, value, username, password string) error {
+	if ctl.v2 {
+		panic("Unsupported method for v2")
+	}
+	args := ctl.cmdArgs()
+	args = append(args, "--user", fmt.Sprintf("%s:%s", username, password), "put", key, value)
+	return e2e.SpawnWithExpectWithEnv(args, ctl.env(), "OK")
+}
+
 func (ctl *Etcdctl) Set(key, value string) error {
 	if !ctl.v2 {
 		panic("Unsupported method for v3")
@@ -72,6 +81,32 @@ func (ctl *Etcdctl) Set(key, value string) error {
 	return nil
 }
 
+func (ctl *Etcdctl) AuthEnable() error {
+	args := ctl.cmdArgs("auth", "enable")
+	return e2e.SpawnWithExpectWithEnv(args, ctl.env(), "Authentication Enabled")
+}
+
+func (ctl *Etcdctl) UserGrantRole(user string, role string) (*clientv3.AuthUserGrantRoleResponse, error) {
+	var resp clientv3.AuthUserGrantRoleResponse
+	err := ctl.spawnJsonCmd(&resp, "user", "grant-role", user, role)
+	return &resp, err
+}
+
+func (ctl *Etcdctl) UserAdd(name, password string) (*clientv3.AuthUserAddResponse, error) {
+	args := []string{"user", "add"}
+	if password == "" {
+		args = append(args, name)
+		args = append(args, "--no-password")
+	} else {
+		args = append(args, fmt.Sprintf("%s:%s", name, password))
+	}
+	args = append(args, "--interactive=false")
+
+	var resp clientv3.AuthUserAddResponse
+	err := ctl.spawnJsonCmd(&resp, args...)
+	return &resp, err
+}
+
 func (ctl *Etcdctl) AlarmList() (*clientv3.AlarmResponse, error) {
 	if ctl.v2 {
 		panic("Unsupported method for v2")
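Together these helpers let a test bootstrap authentication before arming a failpoint. A minimal usage sketch, mirroring `triggerSlowBufferWriteBackWithAuth` in the new test file below; the helper name `bootstrapRootAuth` is hypothetical and the credentials are the test's own placeholders:

func bootstrapRootAuth(t *testing.T, ctl *Etcdctl) {
	// create root:root, grant it the root role, then flip auth on;
	// afterwards writes must carry credentials, hence PutWithAuth.
	if _, err := ctl.UserAdd("root", "root"); err != nil {
		t.Fatal(err)
	}
	if _, err := ctl.UserGrantRole("root", "root"); err != nil {
		t.Fatal(err)
	}
	if err := ctl.AuthEnable(); err != nil {
		t.Fatal(err)
	}
	if err := ctl.PutWithAuth("foo", "bar", "root", "root"); err != nil {
		t.Fatal(err)
	}
}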
tests/e2e/http_health_check_test.go (new file, 440 lines)
@@ -0,0 +1,440 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !cluster_proxy

package e2e

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"os"
	"path"
	"strings"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"go.etcd.io/etcd/api/v3/etcdserverpb"
	"go.etcd.io/etcd/server/v3/storage/mvcc/testutil"
	"go.etcd.io/etcd/tests/v3/framework/e2e"
)

const (
	healthCheckTimeout = 2 * time.Second
	putCommandTimeout  = 200 * time.Millisecond
)

type healthCheckConfig struct {
	url                    string
	expectedStatusCode     int
	expectedTimeoutError   bool
	expectedRespSubStrings []string
}

type injectFailure func(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration)

func TestHTTPHealthHandler(t *testing.T) {
	e2e.BeforeTest(t)
	client := &http.Client{}
	tcs := []struct {
		name          string
		injectFailure injectFailure
		clusterConfig e2e.EtcdProcessClusterConfig
		healthChecks  []healthCheckConfig
	}{
		{
			name:          "no failures", // happy case
			clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 1},
			healthChecks: []healthCheckConfig{
				{
					url:                "/health",
					expectedStatusCode: http.StatusOK,
				},
			},
		},
		{
			name:          "activated no space alarm",
			injectFailure: triggerNoSpaceAlarm,
			clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 1, QuotaBackendBytes: int64(13 * os.Getpagesize())},
			healthChecks: []healthCheckConfig{
				{
					url:                "/health",
					expectedStatusCode: http.StatusServiceUnavailable,
				},
				{
					url:                "/health?exclude=NOSPACE",
					expectedStatusCode: http.StatusOK,
				},
			},
		},
		{
			name:          "overloaded server slow apply",
			injectFailure: triggerSlowApply,
			clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 3, GoFailEnabled: true},
			healthChecks: []healthCheckConfig{
				{
					url:                "/health?serializable=true",
					expectedStatusCode: http.StatusOK,
				},
				{
					url:                  "/health?serializable=false",
					expectedTimeoutError: true,
				},
			},
		},
		{
			name:          "network partitioned",
			injectFailure: blackhole,
			clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 3, IsPeerTLS: true, PeerProxy: true},
			healthChecks: []healthCheckConfig{
				{
					url:                "/health?serializable=true",
					expectedStatusCode: http.StatusOK,
				},
				{
					url:                  "/health?serializable=false",
					expectedTimeoutError: true,
					expectedStatusCode:   http.StatusServiceUnavailable,
					// old leader may return "etcdserver: leader changed" error with 503 in ReadIndex leaderChangedNotifier
				},
			},
		},
		{
			name:          "raft loop deadlock",
			injectFailure: triggerRaftLoopDeadLock,
			clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 1, GoFailEnabled: true},
			healthChecks: []healthCheckConfig{
				{
					// current kubeadm etcd liveness check failed to detect raft loop deadlock in steady state
					// ref. https://github.com/kubernetes/kubernetes/blob/master/cmd/kubeadm/app/phases/etcd/local.go#L225-L226
					// the current liveness probe depends on the etcd /health check, which has a flaw that the new /livez check should resolve.
					url:                "/health?serializable=true",
					expectedStatusCode: http.StatusOK,
				},
				{
					url:                  "/health?serializable=false",
					expectedTimeoutError: true,
				},
			},
		},
		// verify that an auth-enabled serializable read must go through mvcc
		{
			name:          "slow buffer write back with auth enabled",
			injectFailure: triggerSlowBufferWriteBackWithAuth,
			clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 1, GoFailEnabled: true},
			healthChecks: []healthCheckConfig{
				{
					url:                  "/health?serializable=true",
					expectedTimeoutError: true,
				},
			},
		},
	}

	for _, tc := range tcs {
		t.Run(tc.name, func(t *testing.T) {
			ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
			defer cancel()
			clus, err := e2e.NewEtcdProcessCluster(t, &tc.clusterConfig)
			require.NoError(t, err)
			defer clus.Close()
			e2e.ExecuteUntil(ctx, t, func() {
				if tc.injectFailure != nil {
					// guarantees that the failure point is active until all the health checks time out.
					duration := time.Duration(len(tc.healthChecks)+10) * healthCheckTimeout
					tc.injectFailure(ctx, t, clus, duration)
				}

				for _, hc := range tc.healthChecks {
					requestURL := clus.Procs[0].EndpointsHTTP()[0] + hc.url
					t.Logf("health check URL is %s", requestURL)
					doHealthCheckAndVerify(t, client, requestURL, hc.expectedTimeoutError, hc.expectedStatusCode, hc.expectedRespSubStrings)
				}
			})
		})
	}
}

var (
	defaultHealthCheckConfigs = []healthCheckConfig{
		{
			url:                    "/livez",
			expectedStatusCode:     http.StatusOK,
			expectedRespSubStrings: []string{`ok`},
		},
		{
			url:                    "/readyz",
			expectedStatusCode:     http.StatusOK,
			expectedRespSubStrings: []string{`ok`},
		},
		{
			url:                    "/livez?verbose=true",
			expectedStatusCode:     http.StatusOK,
			expectedRespSubStrings: []string{`[+]serializable_read ok`},
		},
		{
			url:                "/readyz?verbose=true",
			expectedStatusCode: http.StatusOK,
			expectedRespSubStrings: []string{
				`[+]serializable_read ok`,
				`[+]data_corruption ok`,
			},
		},
	}
)

func TestHTTPLivezReadyzHandler(t *testing.T) {
	e2e.BeforeTest(t)
	client := &http.Client{}
	tcs := []struct {
		name          string
		injectFailure injectFailure
		clusterConfig e2e.EtcdProcessClusterConfig
		healthChecks  []healthCheckConfig
	}{
		{
			name:          "no failures", // happy case
			clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 1},
			healthChecks:  defaultHealthCheckConfigs,
		},
		{
			name:          "activated no space alarm",
			injectFailure: triggerNoSpaceAlarm,
			clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 1, QuotaBackendBytes: int64(13 * os.Getpagesize())},
			healthChecks:  defaultHealthCheckConfigs,
		},
		// Readiness is not an indicator of performance. Slow response is not covered by readiness.
		// refer to https://tinyurl.com/livez-readyz-design-doc or https://github.com/etcd-io/etcd/issues/16007#issuecomment-1726541091 in case tinyurl is down.
		{
			name:          "overloaded server slow apply",
			injectFailure: triggerSlowApply,
			clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 3, GoFailEnabled: true},
			// TODO: the expected behavior of the readyz check should be 200 after the ReadIndex check is implemented to replace the linearizable read.
			healthChecks: []healthCheckConfig{
				{
					url:                "/livez",
					expectedStatusCode: http.StatusOK,
				},
				{
					url:                  "/readyz",
					expectedTimeoutError: true,
					expectedStatusCode:   http.StatusServiceUnavailable,
				},
			},
		},
		{
			name:          "network partitioned",
			injectFailure: blackhole,
			clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 3, IsPeerTLS: true, PeerProxy: true},
			healthChecks: []healthCheckConfig{
				{
					url:                "/livez",
					expectedStatusCode: http.StatusOK,
				},
				{
					url:                  "/readyz",
					expectedTimeoutError: true,
					expectedStatusCode:   http.StatusServiceUnavailable,
					expectedRespSubStrings: []string{
						`[-]linearizable_read failed: etcdserver: leader changed`,
					},
				},
			},
		},
		{
			name:          "raft loop deadlock",
			injectFailure: triggerRaftLoopDeadLock,
			clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 1, GoFailEnabled: true},
			// TODO: the expected behavior of the livez check should be 503 or timeout after the RaftLoopDeadLock check is implemented.
			healthChecks: []healthCheckConfig{
				{
					url:                "/livez",
					expectedStatusCode: http.StatusOK,
				},
				{
					url:                  "/readyz",
					expectedTimeoutError: true,
					expectedStatusCode:   http.StatusServiceUnavailable,
				},
			},
		},
		// verify that an auth-enabled serializable read must go through mvcc
		{
			name:          "slow buffer write back with auth enabled",
			injectFailure: triggerSlowBufferWriteBackWithAuth,
			clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 1, GoFailEnabled: true},
			healthChecks: []healthCheckConfig{
				{
					url:                  "/livez",
					expectedTimeoutError: true,
				},
				{
					url:                  "/readyz",
					expectedTimeoutError: true,
				},
			},
		},
		{
			name:          "corrupt",
			injectFailure: triggerCorrupt,
			clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 3, CorruptCheckTime: time.Second},
			healthChecks: []healthCheckConfig{
				{
					url:                    "/livez?verbose=true",
					expectedStatusCode:     http.StatusOK,
					expectedRespSubStrings: []string{`[+]serializable_read ok`},
				},
				{
					url:                "/readyz",
					expectedStatusCode: http.StatusServiceUnavailable,
					expectedRespSubStrings: []string{
						`[+]serializable_read ok`,
						`[-]data_corruption failed: alarm activated: CORRUPT`,
					},
				},
			},
		},
	}

	for _, tc := range tcs {
		t.Run(tc.name, func(t *testing.T) {
			ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
			defer cancel()
			clus, err := e2e.NewEtcdProcessCluster(t, &tc.clusterConfig)
			require.NoError(t, err)
			defer clus.Close()
			e2e.ExecuteUntil(ctx, t, func() {
				if tc.injectFailure != nil {
					// guarantees that the failure point is active until all the health checks time out.
					duration := time.Duration(len(tc.healthChecks)+10) * healthCheckTimeout
					tc.injectFailure(ctx, t, clus, duration)
				}

				for _, hc := range tc.healthChecks {
					requestURL := clus.Procs[0].EndpointsHTTP()[0] + hc.url
					t.Logf("health check URL is %s", requestURL)
					doHealthCheckAndVerify(t, client, requestURL, hc.expectedTimeoutError, hc.expectedStatusCode, hc.expectedRespSubStrings)
				}
			})
		})
	}
}

func doHealthCheckAndVerify(t *testing.T, client *http.Client, url string, expectTimeoutError bool, expectStatusCode int, expectRespSubStrings []string) {
	ctx, cancel := context.WithTimeout(context.Background(), healthCheckTimeout)
	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
	require.NoErrorf(t, err, "failed to create request %+v", err)
	resp, herr := client.Do(req)
	cancel()
	if expectTimeoutError {
		if herr != nil && strings.Contains(herr.Error(), context.DeadlineExceeded.Error()) {
			return
		}
	}
	require.NoErrorf(t, herr, "failed to get response %+v", herr)
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	resp.Body.Close()
	require.NoErrorf(t, err, "failed to read response %+v", err)

	t.Logf("health check response body is:\n%s", body)
	require.Equal(t, expectStatusCode, resp.StatusCode)
	for _, expectRespSubString := range expectRespSubStrings {
		require.Contains(t, string(body), expectRespSubString)
	}
}

func triggerNoSpaceAlarm(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _ time.Duration) {
	buf := strings.Repeat("b", os.Getpagesize())
	etcdctl := NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
	for {
		if err := etcdctl.Put("foo", buf); err != nil {
			if !strings.Contains(err.Error(), "etcdserver: mvcc: database space exceeded") {
				t.Fatal(err)
			}
			break
		}
	}
}

func triggerSlowApply(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) {
	// the following proposal will be blocked at the apply stage;
	// because apply index < committed index, a linearizable read would time out.
	require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "beforeApplyOneEntryNormal", fmt.Sprintf(`sleep("%s")`, duration)))
	etcdctl := NewEtcdctl(clus.Procs[1].EndpointsV3(), e2e.ClientNonTLS, false, false)
	etcdctl.Put("foo", "bar")
}

func blackhole(_ context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _ time.Duration) {
	member := clus.Procs[0]
	proxy := member.PeerProxy()
	t.Logf("Blackholing traffic from and to member %q", member.Config().Name)
	proxy.BlackholeTx()
	proxy.BlackholeRx()
}

func triggerRaftLoopDeadLock(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) {
	require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "raftBeforeSaveWaitWalSync", fmt.Sprintf(`sleep("%s")`, duration)))
	etcdctl := NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
	etcdctl.Put("foo", "bar")
}

func triggerSlowBufferWriteBackWithAuth(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) {
	etcdctl := NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)

	_, err := etcdctl.UserAdd("root", "root")
	require.NoError(t, err)
	_, err = etcdctl.UserGrantRole("root", "root")
	require.NoError(t, err)
	require.NoError(t, etcdctl.AuthEnable())

	require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "beforeWritebackBuf", fmt.Sprintf(`sleep("%s")`, duration)))
	etcdctl.PutWithAuth("foo", "bar", "root", "root")
}

func triggerCorrupt(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _ time.Duration) {
	etcdctl := NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
	for i := 0; i < 10; i++ {
		require.NoError(t, etcdctl.Put("foo", "bar"))
	}
	err := clus.Procs[0].Stop()
	require.NoError(t, err)
	err = testutil.CorruptBBolt(path.Join(clus.Procs[0].Config().DataDirPath, "member", "snap", "db"))
	require.NoError(t, err)
	err = clus.Procs[0].Start()
	for {
		time.Sleep(time.Second)
		select {
		case <-ctx.Done():
			require.NoError(t, err)
		default:
		}
		response, err := etcdctl.AlarmList()
		if err != nil {
			continue
		}
		if len(response.Alarms) == 0 {
			continue
		}
		require.Len(t, response.Alarms, 1)
		if response.Alarms[0].Alarm == etcdserverpb.AlarmType_CORRUPT {
			break
		}
	}
}
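The cases above hinge on one distinction: `?serializable=true` is answered from the local member alone, while `?serializable=false` (like `/readyz`) requires a linearizable round through the leader, so it stalls or returns 503 under partition, slow apply, or deadlock. A self-contained probe sketch of that difference, with the client endpoint as a placeholder and the same 2-second budget as healthCheckTimeout:

package main

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

func probe(url string) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		panic(err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		// a stalled or partitioned member surfaces here as context deadline exceeded
		fmt.Println(url, "error:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println(url, "status:", resp.StatusCode)
}

func main() {
	base := "http://127.0.0.1:2379" // placeholder client endpoint
	probe(base + "/health?serializable=true")  // local read; can stay 200 without quorum
	probe(base + "/health?serializable=false") // linearizable read; times out or 503s
}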
@@ -23,6 +23,7 @@ import (
 	"testing"
 	"time"
 
+	"go.etcd.io/etcd/pkg/v3/proxy"
 	"go.etcd.io/etcd/server/v3/etcdserver"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zaptest"
@@ -136,10 +137,12 @@ type EtcdProcessCluster struct {
 }
 
 type EtcdProcessClusterConfig struct {
-	ExecPath    string
-	DataDirPath string
-	KeepDataDir bool
-	EnvVars     map[string]string
+	ExecPath      string
+	DataDirPath   string
+	KeepDataDir   bool
+	GoFailEnabled bool
+	PeerProxy     bool
+	EnvVars       map[string]string
 
 	ClusterSize int
 
@@ -189,7 +192,7 @@ func NewEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdPr
 		return nil, err
 	}
 
-	return StartEtcdProcessCluster(epc, cfg)
+	return StartEtcdProcessCluster(t, epc, cfg)
 }
 
 // InitEtcdProcessCluster initializes a new cluster based on the given config.
@@ -217,7 +220,7 @@ func InitEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdP
 }
 
 // StartEtcdProcessCluster launches a new cluster from etcd processes.
-func StartEtcdProcessCluster(epc *EtcdProcessCluster, cfg *EtcdProcessClusterConfig) (*EtcdProcessCluster, error) {
+func StartEtcdProcessCluster(t testing.TB, epc *EtcdProcessCluster, cfg *EtcdProcessClusterConfig) (*EtcdProcessCluster, error) {
 	if cfg.RollingStart {
 		if err := epc.RollingStart(); err != nil {
 			return nil, fmt.Errorf("Cannot rolling-start: %v", err)
@@ -227,6 +230,13 @@ func StartEtcdProcessCluster(epc *EtcdProcessCluster, cfg *EtcdProcessClusterCon
 			return nil, fmt.Errorf("Cannot start: %v", err)
 		}
 	}
+
+	for _, proc := range epc.Procs {
+		if cfg.GoFailEnabled && !proc.Failpoints().Enabled() {
+			epc.Close()
+			t.Skip("please run 'make gofail-enable && make build' before running the test")
+		}
+	}
 	return epc, nil
 }
 
@@ -268,6 +278,8 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []*
 		var curl string
 		port := cfg.BasePort + 5*i
 		clientPort := port
+		peerPort := port + 1
+		peer2Port := port + 3
 		clientHttpPort := port + 4
 
 		if cfg.ClientTLS == ClientTLSAndNonTLS {
@@ -278,20 +290,34 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []*
 			curls = []string{curl}
 		}
 
-		purl := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
+		purl := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
+		peerAdvertiseUrl := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
+		var proxyCfg *proxy.ServerConfig
+		if cfg.PeerProxy {
+			if !cfg.IsPeerTLS {
+				panic("Can't use peer proxy without peer TLS as it can result in malformed packets")
+			}
+			peerAdvertiseUrl.Host = fmt.Sprintf("localhost:%d", peer2Port)
+			proxyCfg = &proxy.ServerConfig{
+				Logger: zap.NewNop(),
+				To:     purl,
+				From:   peerAdvertiseUrl,
+			}
+		}
+
 		name := fmt.Sprintf("test-%d", i)
 		dataDirPath := cfg.DataDirPath
 		if cfg.DataDirPath == "" {
 			dataDirPath = tb.TempDir()
 		}
-		initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
+		initialCluster[i] = fmt.Sprintf("%s=%s", name, peerAdvertiseUrl.String())
 
 		args := []string{
 			"--name", name,
 			"--listen-client-urls", strings.Join(curls, ","),
 			"--advertise-client-urls", strings.Join(curls, ","),
 			"--listen-peer-urls", purl.String(),
-			"--initial-advertise-peer-urls", purl.String(),
+			"--initial-advertise-peer-urls", peerAdvertiseUrl.String(),
 			"--initial-cluster-token", cfg.InitialToken,
 			"--data-dir", dataDirPath,
 			"--snapshot-count", fmt.Sprintf("%d", cfg.SnapshotCount),
@@ -362,20 +388,32 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []*
 			args = append(args, "--experimental-compaction-batch-limit", fmt.Sprintf("%d", cfg.CompactionBatchLimit))
 		}
 
+		envVars := map[string]string{}
+		for key, value := range cfg.EnvVars {
+			envVars[key] = value
+		}
+		var gofailPort int
+		if cfg.GoFailEnabled {
+			gofailPort = (i+1)*10000 + 2381
+			envVars["GOFAIL_HTTP"] = fmt.Sprintf("127.0.0.1:%d", gofailPort)
+		}
+
 		etcdCfgs[i] = &EtcdServerProcessConfig{
 			lg:            lg,
 			ExecPath:      cfg.ExecPath,
 			Args:          args,
-			EnvVars:       cfg.EnvVars,
+			EnvVars:       envVars,
 			TlsArgs:       cfg.TlsArgs(),
 			DataDirPath:   dataDirPath,
 			KeepDataDir:   cfg.KeepDataDir,
 			Name:          name,
-			Purl:          purl,
+			Purl:          peerAdvertiseUrl,
 			Acurl:         curl,
 			Murl:          murl,
 			InitialToken:  cfg.InitialToken,
 			ClientHttpUrl: clientHttpUrl,
+			GoFailPort:    gofailPort,
+			Proxy:         proxyCfg,
 		}
 	}
 
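For reference, the new knobs compose the way the test table above uses them; a sketch of a test-side constructor (the helper name is hypothetical, and the source does not combine all three flags in one case, so this is only illustrative):

func newGoFailProxyCluster(t *testing.T) *EtcdProcessCluster {
	cfg := EtcdProcessClusterConfig{
		ClusterSize:   3,
		GoFailEnabled: true, // exports GOFAIL_HTTP per member and enables Failpoints()
		IsPeerTLS:     true, // PeerProxy panics without peer TLS, see above
		PeerProxy:     true, // peers advertise the proxy port (peer2Port) instead of the raw peer port
	}
	clus, err := NewEtcdProcessCluster(t, &cfg)
	if err != nil {
		t.Fatal(err)
	}
	return clus
}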
@@ -31,9 +31,9 @@ import (
 )
 
 type proxyEtcdProcess struct {
-	etcdProc EtcdProcess
-	proxyV2  *proxyV2Proc
-	proxyV3  *proxyV3Proc
+	*EtcdServerProcess
+	proxyV2 *proxyV2Proc
+	proxyV3 *proxyV3Proc
 }
 
 func NewEtcdProcess(cfg *EtcdServerProcessConfig) (EtcdProcess, error) {
@@ -46,14 +46,14 @@ func NewProxyEtcdProcess(cfg *EtcdServerProcessConfig) (*proxyEtcdProcess, error
 		return nil, err
 	}
 	pep := &proxyEtcdProcess{
-		etcdProc: ep,
-		proxyV2:  newProxyV2Proc(cfg),
-		proxyV3:  newProxyV3Proc(cfg),
+		EtcdServerProcess: ep,
+		proxyV2:           newProxyV2Proc(cfg),
+		proxyV3:           newProxyV3Proc(cfg),
 	}
 	return pep, nil
 }
 
-func (p *proxyEtcdProcess) Config() *EtcdServerProcessConfig { return p.etcdProc.Config() }
+func (p *proxyEtcdProcess) Config() *EtcdServerProcessConfig { return p.EtcdServerProcess.Config() }
 
 func (p *proxyEtcdProcess) EndpointsV2() []string { return p.EndpointsHTTP() }
 func (p *proxyEtcdProcess) EndpointsV3() []string { return p.EndpointsGRPC() }
@@ -64,7 +64,7 @@ func (p *proxyEtcdProcess) EndpointsMetrics() []string {
 }
 
 func (p *proxyEtcdProcess) Start() error {
-	if err := p.etcdProc.Start(); err != nil {
+	if err := p.EtcdServerProcess.Start(); err != nil {
 		return err
 	}
 	if err := p.proxyV2.Start(); err != nil {
@@ -74,7 +74,7 @@ func (p *proxyEtcdProcess) Start() error {
 }
 
 func (p *proxyEtcdProcess) Restart() error {
-	if err := p.etcdProc.Restart(); err != nil {
+	if err := p.EtcdServerProcess.Restart(); err != nil {
 		return err
 	}
 	if err := p.proxyV2.Restart(); err != nil {
@@ -88,7 +88,7 @@ func (p *proxyEtcdProcess) Stop() error {
 	if v3err := p.proxyV3.Stop(); err == nil {
 		err = v3err
 	}
-	if eerr := p.etcdProc.Stop(); eerr != nil && err == nil {
+	if eerr := p.EtcdServerProcess.Stop(); eerr != nil && err == nil {
 		// fails on go-grpc issue #1384
 		if !strings.Contains(eerr.Error(), "exit status 2") {
 			err = eerr
@@ -102,7 +102,7 @@ func (p *proxyEtcdProcess) Close() error {
 	if v3err := p.proxyV3.Close(); err == nil {
 		err = v3err
 	}
-	if eerr := p.etcdProc.Close(); eerr != nil && err == nil {
+	if eerr := p.EtcdServerProcess.Close(); eerr != nil && err == nil {
 		// fails on go-grpc issue #1384
 		if !strings.Contains(eerr.Error(), "exit status 2") {
 			err = eerr
@@ -114,11 +114,11 @@ func (p *proxyEtcdProcess) Close() error {
 func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal {
 	p.proxyV3.WithStopSignal(sig)
 	p.proxyV3.WithStopSignal(sig)
-	return p.etcdProc.WithStopSignal(sig)
+	return p.EtcdServerProcess.WithStopSignal(sig)
 }
 
 func (p *proxyEtcdProcess) Logs() LogsExpect {
-	return p.etcdProc.Logs()
+	return p.EtcdServerProcess.Logs()
 }
 
 type proxyProc struct {
@@ -15,12 +15,20 @@
 package e2e
 
 import (
+	"bytes"
+	"context"
+	"errors"
 	"fmt"
+	"io"
+	"net/http"
 	"net/url"
 	"os"
 	"strings"
+	"time"
 
 	"go.etcd.io/etcd/client/pkg/v3/fileutil"
 	"go.etcd.io/etcd/pkg/v3/expect"
+	"go.etcd.io/etcd/pkg/v3/proxy"
 	"go.uber.org/zap"
 )
 
@@ -46,6 +54,10 @@ type EtcdProcess interface {
 	WithStopSignal(sig os.Signal) os.Signal
 	Config() *EtcdServerProcessConfig
 	Logs() LogsExpect
+
+	PeerProxy() proxy.Server
+	Failpoints() *BinaryFailpoints
+	IsRunning() bool
 }
 
 type LogsExpect interface {
@@ -55,9 +67,11 @@ type LogsExpect interface {
 }
 
 type EtcdServerProcess struct {
-	cfg   *EtcdServerProcessConfig
-	proc  *expect.ExpectProcess
-	donec chan struct{} // closed when Interact() terminates
+	cfg        *EtcdServerProcessConfig
+	proc       *expect.ExpectProcess
+	proxy      proxy.Server
+	failpoints *BinaryFailpoints
+	donec      chan struct{} // closed when Interact() terminates
 }
 
 type EtcdServerProcessConfig struct {
@@ -80,6 +94,8 @@ type EtcdServerProcessConfig struct {
 
 	InitialToken   string
 	InitialCluster string
+	GoFailPort     int
+	Proxy          *proxy.ServerConfig
 }
 
 func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, error) {
@@ -91,7 +107,11 @@ func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, err
 			return nil, err
 		}
 	}
-	return &EtcdServerProcess{cfg: cfg, donec: make(chan struct{})}, nil
+	ep := &EtcdServerProcess{cfg: cfg, donec: make(chan struct{})}
+	if cfg.GoFailPort != 0 {
+		ep.failpoints = &BinaryFailpoints{member: ep}
+	}
+	return ep, nil
 }
 
 func (ep *EtcdServerProcess) EndpointsV2() []string { return ep.EndpointsHTTP() }
@@ -109,6 +129,15 @@ func (ep *EtcdServerProcess) Start() error {
 	if ep.proc != nil {
 		panic("already started")
 	}
+	if ep.cfg.Proxy != nil && ep.proxy == nil {
+		ep.cfg.lg.Info("starting proxy...", zap.String("name", ep.cfg.Name), zap.String("from", ep.cfg.Proxy.From.String()), zap.String("to", ep.cfg.Proxy.To.String()))
+		ep.proxy = proxy.NewServer(*ep.cfg.Proxy)
+		select {
+		case <-ep.proxy.Ready():
+		case err := <-ep.proxy.Error():
+			return err
+		}
+	}
 	ep.cfg.lg.Info("starting server...", zap.String("name", ep.cfg.Name))
 	proc, err := SpawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.ExecPath}, ep.cfg.Args...), ep.cfg.EnvVars)
 	if err != nil {
@@ -154,6 +183,14 @@ func (ep *EtcdServerProcess) Stop() (err error) {
 		}
 	}
 	ep.cfg.lg.Info("stopped server.", zap.String("name", ep.cfg.Name))
+	if ep.proxy != nil {
+		ep.cfg.lg.Info("stopping proxy...", zap.String("name", ep.cfg.Name))
+		err = ep.proxy.Close()
+		ep.proxy = nil
+		if err != nil {
+			return err
+		}
+	}
 	return nil
 }
 
@@ -188,3 +225,154 @@ func (ep *EtcdServerProcess) Logs() LogsExpect {
 	}
 	return ep.proc
 }
+
+func (ep *EtcdServerProcess) PeerProxy() proxy.Server {
+	return ep.proxy
+}
+
+func (ep *EtcdServerProcess) Failpoints() *BinaryFailpoints {
+	return ep.failpoints
+}
+
+func (ep *EtcdServerProcess) IsRunning() bool {
+	if ep.proc == nil {
+		return false
+	}
+
+	if ep.proc.IsRunning() {
+		return true
+	}
+
+	ep.cfg.lg.Info("server exited",
+		zap.String("name", ep.cfg.Name))
+	ep.proc = nil
+	return false
+}
+
+type BinaryFailpoints struct {
+	member         EtcdProcess
+	availableCache map[string]string
+}
+
+func (f *BinaryFailpoints) SetupEnv(failpoint, payload string) error {
+	if f.member.IsRunning() {
+		return errors.New("cannot setup environment variable while process is running")
+	}
+	f.member.Config().EnvVars["GOFAIL_FAILPOINTS"] = fmt.Sprintf("%s=%s", failpoint, payload)
+	return nil
+}
+
+func (f *BinaryFailpoints) SetupHTTP(ctx context.Context, failpoint, payload string) error {
+	host := fmt.Sprintf("127.0.0.1:%d", f.member.Config().GoFailPort)
+	failpointUrl := url.URL{
+		Scheme: "http",
+		Host:   host,
+		Path:   failpoint,
+	}
+	r, err := http.NewRequestWithContext(ctx, "PUT", failpointUrl.String(), bytes.NewBuffer([]byte(payload)))
+	if err != nil {
+		return err
+	}
+	resp, err := httpClient.Do(r)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusNoContent {
+		return fmt.Errorf("bad status code: %d", resp.StatusCode)
+	}
+	return nil
+}
+
+func (f *BinaryFailpoints) DeactivateHTTP(ctx context.Context, failpoint string) error {
+	host := fmt.Sprintf("127.0.0.1:%d", f.member.Config().GoFailPort)
+	failpointUrl := url.URL{
+		Scheme: "http",
+		Host:   host,
+		Path:   failpoint,
+	}
+	r, err := http.NewRequestWithContext(ctx, "DELETE", failpointUrl.String(), nil)
+	if err != nil {
+		return err
+	}
+	resp, err := httpClient.Do(r)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusNoContent {
+		return fmt.Errorf("bad status code: %d", resp.StatusCode)
+	}
+	return nil
+}
+
+var httpClient = http.Client{
+	Timeout: 1 * time.Second,
+}
+
+func (f *BinaryFailpoints) Enabled() bool {
+	_, err := failpoints(f.member)
+	if err != nil {
+		return false
+	}
+	return true
+}
+
+func (f *BinaryFailpoints) Available(failpoint string) bool {
+	if f.availableCache == nil {
+		fs, err := failpoints(f.member)
+		if err != nil {
+			panic(err)
+		}
+		f.availableCache = fs
+	}
+	_, found := f.availableCache[failpoint]
+	return found
+}
+
+func failpoints(member EtcdProcess) (map[string]string, error) {
+	body, err := fetchFailpointsBody(member)
+	if err != nil {
+		return nil, err
+	}
+	defer body.Close()
+	return parseFailpointsBody(body)
+}
+
+func fetchFailpointsBody(member EtcdProcess) (io.ReadCloser, error) {
+	address := fmt.Sprintf("127.0.0.1:%d", member.Config().GoFailPort)
+	failpointUrl := url.URL{
+		Scheme: "http",
+		Host:   address,
+	}
+	resp, err := http.Get(failpointUrl.String())
+	if err != nil {
+		return nil, err
+	}
+	if resp.StatusCode != http.StatusOK {
+		resp.Body.Close()
+		return nil, fmt.Errorf("invalid status code, %d", resp.StatusCode)
+	}
+	return resp.Body, nil
+}
+
+func parseFailpointsBody(body io.Reader) (map[string]string, error) {
+	data, err := io.ReadAll(body)
+	if err != nil {
+		return nil, err
+	}
+	lines := strings.Split(string(data), "\n")
+	failpoints := map[string]string{}
+	for _, line := range lines {
+		// Format:
+		// failpoint=value
		parts := strings.SplitN(line, "=", 2)
+		failpoint := parts[0]
+		var value string
+		if len(parts) == 2 {
+			value = parts[1]
+		}
+		failpoints[failpoint] = value
+	}
+	return failpoints, nil
+}
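As the `// Format:` comment above notes, a GET against the gofail root lists every compiled-in failpoint as `name=value` lines, which is what `Enabled` and `Available` consume via `parseFailpointsBody`. A small sketch of inspecting a member by hand; the port is illustrative:

package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

func main() {
	resp, err := http.Get("http://127.0.0.1:12381/") // GOFAIL_HTTP address; illustrative
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	data, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	// each line reads "failpoint=value"; an unarmed failpoint has an empty value
	for _, line := range strings.Split(string(data), "\n") {
		if strings.Contains(line, "=") {
			fmt.Println(line)
		}
	}
}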
@@ -15,6 +15,7 @@
 package e2e
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"math/rand"
@@ -126,3 +127,23 @@ func ToTLS(s string) string {
 func SkipInShortMode(t testing.TB) {
 	testutil.SkipTestIfShortMode(t, "e2e tests are not running in --short mode")
 }
+
+func ExecuteUntil(ctx context.Context, t *testing.T, f func()) {
+	deadline, deadlineSet := ctx.Deadline()
+	timeout := time.Until(deadline)
+	donec := make(chan struct{})
+	go func() {
+		defer close(donec)
+		f()
+	}()
+
+	select {
+	case <-ctx.Done():
+		msg := ctx.Err().Error()
+		if deadlineSet {
+			msg = fmt.Sprintf("test timed out after %v, err: %v", timeout, msg)
+		}
+		testutil.FatalStack(t, msg)
+	case <-donec:
+	}
+}
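A usage sketch: ExecuteUntil bounds a possibly-hanging step by the test's context and dumps goroutine stacks on timeout, which is how both health-check tests above invoke it. The test name and body below are hypothetical stand-ins (imports of context, testing, and time assumed):

func TestBoundedStep(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()
	ExecuteUntil(ctx, t, func() {
		// e.g. a Put against a member whose apply loop is stalled by a failpoint
	})
}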