[3.4] backport health check e2e tests.

This backport brings the /health, /livez, and /readyz e2e test coverage to release-3.4, along with the test-framework plumbing it needs: gofail failpoint wiring for e2e processes, a peer-traffic proxy for network-partition scenarios, and etcdctl auth helpers.

Signed-off-by: Siyuan Zhang <sizhang@google.com>

parent 2a07f80f77
commit c43530c402
@@ -2179,6 +2179,7 @@ func (s *EtcdServer) apply(
 		e := es[i]
 		switch e.Type {
 		case raftpb.EntryNormal:
+			// gofail: var beforeApplyOneEntryNormal struct{}
 			s.applyEntryNormal(&e)
 			s.setAppliedIndex(e.Index)
 			s.setTerm(e.Term)
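Note: the `// gofail: var beforeApplyOneEntryNormal struct{}` comment added above is a marker for the gofail code generator, not dead code — when etcd is built with failpoints enabled (FAILPOINTS=true), gofail rewrites the comment into an injectable hook. A minimal sketch of arming that hook through the GOFAIL_FAILPOINTS environment variable before the process starts, mirroring the SetupEnv helper added later in this commit (the ./etcd binary path is an assumption):

package main

import (
	"os"
	"os/exec"
)

func main() {
	// Assumes ./etcd was built with failpoints (FAILPOINTS=true).
	cmd := exec.Command("./etcd")
	cmd.Env = append(os.Environ(),
		`GOFAIL_FAILPOINTS=beforeApplyOneEntryNormal=sleep("10s")`)
	if err := cmd.Start(); err != nil {
		panic(err)
	}
}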
@@ -276,6 +276,7 @@ func newBatchTxBuffered(backend *backend) *batchTxBuffered {
 func (t *batchTxBuffered) Unlock() {
 	if t.pending != 0 {
 		t.backend.readTx.Lock() // blocks txReadBuffer for writing.
+		// gofail: var beforeWritebackBuf struct{}
 		t.buf.writeback(&t.backend.readTx.buf)
 		t.backend.readTx.Unlock()
 		if t.pending >= t.backend.batchLimit {
@@ -167,3 +167,7 @@ func (ep *ExpectProcess) Send(command string) error {
 	_, err := io.WriteString(ep.fpty, command)
 	return err
 }
+
+func (ep *ExpectProcess) IsRunning() bool {
+	return ep.cmd != nil
+}
@@ -30,7 +30,7 @@ import (
 )
 
 type proxyEtcdProcess struct {
-	etcdProc etcdProcess
+	*etcdServerProcess
 	proxyV2 *proxyV2Proc
 	proxyV3 *proxyV3Proc
 }
@@ -45,15 +45,13 @@ func newProxyEtcdProcess(cfg *etcdServerProcessConfig) (*proxyEtcdProcess, error) {
 		return nil, err
 	}
 	pep := &proxyEtcdProcess{
-		etcdProc: ep,
+		etcdServerProcess: ep,
 		proxyV2: newProxyV2Proc(cfg),
 		proxyV3: newProxyV3Proc(cfg),
 	}
 	return pep, nil
 }
 
-func (p *proxyEtcdProcess) Config() *etcdServerProcessConfig { return p.etcdProc.Config() }
-
 func (p *proxyEtcdProcess) EndpointsV2() []string { return p.EndpointsHTTP() }
 func (p *proxyEtcdProcess) EndpointsV3() []string { return p.EndpointsGRPC() }
 func (p *proxyEtcdProcess) EndpointsHTTP() []string { return p.proxyV2.endpoints() }
@@ -63,7 +61,7 @@ func (p *proxyEtcdProcess) EndpointsMetrics() []string {
 }
 
 func (p *proxyEtcdProcess) Start() error {
-	if err := p.etcdProc.Start(); err != nil {
+	if err := p.etcdServerProcess.Start(); err != nil {
 		return err
 	}
 	if err := p.proxyV2.Start(); err != nil {
@@ -73,7 +71,7 @@ func (p *proxyEtcdProcess) Start() error {
 }
 
 func (p *proxyEtcdProcess) Restart() error {
-	if err := p.etcdProc.Restart(); err != nil {
+	if err := p.etcdServerProcess.Restart(); err != nil {
 		return err
 	}
 	if err := p.proxyV2.Restart(); err != nil {
@@ -87,7 +85,7 @@ func (p *proxyEtcdProcess) Stop() error {
 	if v3err := p.proxyV3.Stop(); err == nil {
 		err = v3err
 	}
-	if eerr := p.etcdProc.Stop(); eerr != nil && err == nil {
+	if eerr := p.etcdServerProcess.Stop(); eerr != nil && err == nil {
 		// fails on go-grpc issue #1384
 		if !strings.Contains(eerr.Error(), "exit status 2") {
 			err = eerr
@@ -101,7 +99,7 @@ func (p *proxyEtcdProcess) Close() error {
 	if v3err := p.proxyV3.Close(); err == nil {
 		err = v3err
 	}
-	if eerr := p.etcdProc.Close(); eerr != nil && err == nil {
+	if eerr := p.etcdServerProcess.Close(); eerr != nil && err == nil {
 		// fails on go-grpc issue #1384
 		if !strings.Contains(eerr.Error(), "exit status 2") {
 			err = eerr
@@ -113,11 +111,7 @@ func (p *proxyEtcdProcess) Close() error {
 func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal {
 	p.proxyV3.WithStopSignal(sig)
 	p.proxyV3.WithStopSignal(sig)
-	return p.etcdProc.WithStopSignal(sig)
-}
-
-func (p *proxyEtcdProcess) Logs() logsExpect {
-	return p.etcdProc.Logs()
+	return p.etcdServerProcess.WithStopSignal(sig)
 }
 
 type proxyProc struct {
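Note: replacing the named `etcdProc etcdProcess` field with an embedded `*etcdServerProcess` is what lets the hand-written `Config()` and `Logs()` forwarders be deleted above — methods of an embedded type are promoted to the outer struct. A standalone illustration (all names hypothetical, not from the commit):

package main

import "fmt"

type inner struct{}

func (inner) Config() string { return "inner config" }

type outer struct {
	inner // embedded: Config is promoted to outer
}

func main() {
	var o outer
	fmt.Println(o.Config()) // "inner config", no forwarder needed
}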
@@ -20,9 +20,12 @@ import (
 	"net/url"
 	"os"
 	"strings"
+	"testing"
 	"time"
 
 	"go.etcd.io/etcd/etcdserver"
+	"go.etcd.io/etcd/pkg/proxy"
+	"go.uber.org/zap"
 )
 
 const etcdProcessBasePort = 20000
@@ -100,6 +103,8 @@ type etcdProcessClusterConfig struct {
 	execPath string
 	dataDirPath string
 	keepDataDir bool
+	goFailEnabled bool
+	peerProxy bool
 	envVars map[string]string
 
 	clusterSize int
@@ -141,13 +146,13 @@ type etcdProcessClusterConfig struct {
 
 // newEtcdProcessCluster launches a new cluster from etcd processes, returning
 // a new etcdProcessCluster once all nodes are ready to accept client requests.
-func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
+func newEtcdProcessCluster(t testing.TB, cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
 	epc, err := initEtcdProcessCluster(cfg)
 	if err != nil {
 		return nil, err
 	}
 
-	return startEtcdProcessCluster(epc, cfg)
+	return startEtcdProcessCluster(t, epc, cfg)
 }
 
 // `initEtcdProcessCluster` initializes a new cluster based on the given config.
@@ -174,7 +179,7 @@ func initEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
 }
 
 // `startEtcdProcessCluster` launches a new cluster from etcd processes.
-func startEtcdProcessCluster(epc *etcdProcessCluster, cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
+func startEtcdProcessCluster(t testing.TB, epc *etcdProcessCluster, cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
 	if err := epc.Start(); err != nil {
 		return nil, err
 	}
@@ -185,6 +190,12 @@ func startEtcdProcessCluster(epc *etcdProcessCluster, cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
 			proc.WithStopSignal(cfg.stopSignal)
 		}
 	}
+	for _, proc := range epc.procs {
+		if cfg.goFailEnabled && !proc.Failpoints().Enabled() {
+			epc.Close()
+			t.Skip("please run test with 'FAILPOINTS=true'")
+		}
+	}
 	return epc, nil
 }
 
@@ -223,6 +234,8 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerProcessConfig {
 		var curls []string
 		var curl string
 		port := cfg.basePort + 5*i
+		peerPort := port + 1
+		peer2Port := port + 3
 		clientPort := port
 		clientHttpPort := port + 4
 
@@ -235,6 +248,20 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerProcessConfig {
 		}
 
 		purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
+		peerAdvertiseUrl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
+		var proxyCfg *proxy.ServerConfig
+		if cfg.peerProxy {
+			if !cfg.isPeerTLS {
+				panic("Can't use peer proxy without peer TLS as it can result in malformed packets")
+			}
+			peerAdvertiseUrl.Host = fmt.Sprintf("localhost:%d", peer2Port)
+			proxyCfg = &proxy.ServerConfig{
+				Logger: zap.NewNop(),
+				To:     purl,
+				From:   peerAdvertiseUrl,
+			}
+		}
 
 		name := fmt.Sprintf("testname%d", i)
 		dataDirPath := cfg.dataDirPath
 		if cfg.dataDirPath == "" {
@@ -244,14 +271,14 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerProcessConfig {
 				panic(fmt.Sprintf("could not get tempdir for datadir: %s", derr))
 			}
 		}
-		initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
+		initialCluster[i] = fmt.Sprintf("%s=%s", name, peerAdvertiseUrl.String())
 
 		args := []string{
 			"--name", name,
 			"--listen-client-urls", strings.Join(curls, ","),
 			"--advertise-client-urls", strings.Join(curls, ","),
 			"--listen-peer-urls", purl.String(),
-			"--initial-advertise-peer-urls", purl.String(),
+			"--initial-advertise-peer-urls", peerAdvertiseUrl.String(),
 			"--initial-cluster-token", cfg.initialToken,
 			"--data-dir", dataDirPath,
 			"--snapshot-count", fmt.Sprintf("%d", cfg.snapshotCount),
@@ -309,19 +336,31 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerProcessConfig {
 			args = append(args, "--debug")
 		}
 
+		envVars := map[string]string{}
+		for key, value := range cfg.envVars {
+			envVars[key] = value
+		}
+		var gofailPort int
+		if cfg.goFailEnabled {
+			gofailPort = (i+1)*10000 + 2381
+			envVars["GOFAIL_HTTP"] = fmt.Sprintf("127.0.0.1:%d", gofailPort)
+		}
+
 		etcdCfgs[i] = &etcdServerProcessConfig{
 			execPath: cfg.execPath,
 			args: args,
-			envVars: cfg.envVars,
+			envVars: envVars,
 			tlsArgs: cfg.tlsArgs(),
 			dataDirPath: dataDirPath,
 			keepDataDir: cfg.keepDataDir,
 			name: name,
-			purl: purl,
+			purl: peerAdvertiseUrl,
 			acurl: curl,
 			murl: murl,
 			initialToken: cfg.initialToken,
 			clientHttpUrl: clientHttpUrl,
+			goFailPort: gofailPort,
+			proxy: proxyCfg,
 		}
 	}
 
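Note: the gofail sidecar address is derived from the member index as `(i+1)*10000 + 2381`, keeping each member's failpoint endpoint distinct from its client/peer ports. A worked example of the scheme:

package main

import "fmt"

func main() {
	for i := 0; i < 3; i++ {
		// member 0 -> 127.0.0.1:12381, member 1 -> 22381, member 2 -> 32381
		fmt.Printf("member %d -> GOFAIL_HTTP=127.0.0.1:%d\n", i, (i+1)*10000+2381)
	}
}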
@@ -71,7 +71,7 @@ func TestConnectionMultiplexing(t *testing.T) {
 		clientHttpSeparate: tc.separateHttpPort,
 		stopSignal: syscall.SIGTERM, // check graceful stop
 	}
-	clus, err := newEtcdProcessCluster(&cfg)
+	clus, err := newEtcdProcessCluster(t, &cfg)
 	require.NoError(t, err)
 
 	defer func() {
@@ -547,7 +547,7 @@ func setupEtcdctlTest(t *testing.T, cfg *etcdProcessClusterConfig, quorum bool) {
 	if !quorum {
 		cfg = configStandalone(*cfg)
 	}
-	epc, err := newEtcdProcessCluster(cfg)
+	epc, err := newEtcdProcessCluster(t, cfg)
 	if err != nil {
 		t.Fatalf("could not start etcd process cluster (%v)", err)
 	}
@@ -58,7 +58,7 @@ func TestCtlV3AuthCertCNWithWithConcurrentOperation(t *testing.T) {
 		initialToken: "new",
 	}
 
-	epc, err := newEtcdProcessCluster(&cx.cfg)
+	epc, err := newEtcdProcessCluster(t, &cx.cfg)
 	if err != nil {
 		t.Fatalf("Failed to start etcd cluster: %v", err)
 	}
@@ -93,7 +93,7 @@ func TestAuthority(t *testing.T) {
 	// Enable debug mode to get logs with http2 headers (including authority)
 	cfg.envVars = map[string]string{"GODEBUG": "http2debug=2"}
 
-	epc, err := newEtcdProcessCluster(&cfg)
+	epc, err := newEtcdProcessCluster(t, &cfg)
 	if err != nil {
 		t.Fatalf("could not start etcd process cluster (%v)", err)
 	}
@@ -68,7 +68,7 @@ func testMirrorCommand(cx ctlCtx, flags []string, sourcekvs []kv, destkvs []kvExec) {
 		dialTimeout: 7 * time.Second,
 	}
 
-	mirrorepc, err := newEtcdProcessCluster(&mirrorctx.cfg)
+	mirrorepc, err := newEtcdProcessCluster(cx.t, &mirrorctx.cfg)
 	if err != nil {
 		cx.t.Fatalf("could not start etcd process cluster (%v)", err)
 	}
@@ -164,7 +164,7 @@ func TestIssue6361(t *testing.T) {
 	os.Setenv("ETCDCTL_API", "3")
 	defer os.Unsetenv("ETCDCTL_API")
 
-	epc, err := newEtcdProcessCluster(&etcdProcessClusterConfig{
+	epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{
 		clusterSize: 1,
 		initialToken: "new",
 		keepDataDir: true,
@@ -276,7 +276,7 @@ func TestIssue6361(t *testing.T) {
 func TestRestoreCompactionRevBump(t *testing.T) {
 	defer testutil.AfterTest(t)
 
-	epc, err := newEtcdProcessCluster(&etcdProcessClusterConfig{
+	epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{
 		clusterSize: 1,
 		initialToken: "new",
 		keepDataDir: true,
@@ -175,7 +175,7 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
 		ret.cfg.initialCorruptCheck = ret.initialCorruptCheck
 	}
 
-	epc, err := newEtcdProcessCluster(&ret.cfg)
+	epc, err := newEtcdProcessCluster(t, &ret.cfg)
 	if err != nil {
 		t.Fatalf("could not start etcd process cluster (%v)", err)
 	}
@@ -147,7 +147,7 @@ func TestInPlaceRecovery(t *testing.T) {
 		corruptCheckTime: time.Second,
 		basePort: basePort,
 	}
-	epcOld, err := newEtcdProcessCluster(&cfgOld)
+	epcOld, err := newEtcdProcessCluster(t, &cfgOld)
 	if err != nil {
 		t.Fatalf("could not start etcd process cluster (%v)", err)
 	}
@@ -15,12 +15,20 @@
 package e2e
 
 import (
+	"bytes"
+	"context"
+	"errors"
 	"fmt"
+	"io"
+	"net/http"
 	"net/url"
 	"os"
+	"strings"
+	"time"
 
 	"go.etcd.io/etcd/pkg/expect"
 	"go.etcd.io/etcd/pkg/fileutil"
+	"go.etcd.io/etcd/pkg/proxy"
 )
 
 var (
@@ -45,6 +53,9 @@ type etcdProcess interface {
 	Config() *etcdServerProcessConfig
 
 	Logs() logsExpect
+	PeerProxy() proxy.Server
+	Failpoints() *BinaryFailpoints
+	IsRunning() bool
 }
 
 type logsExpect interface {
@@ -54,6 +65,8 @@ type logsExpect interface {
 type etcdServerProcess struct {
 	cfg *etcdServerProcessConfig
 	proc *expect.ExpectProcess
+	proxy proxy.Server
+	failpoints *BinaryFailpoints
 	donec chan struct{} // closed when Interact() terminates
 }
 
@@ -76,6 +89,9 @@ type etcdServerProcessConfig struct {
 
 	initialToken string
 	initialCluster string
+
+	proxy *proxy.ServerConfig
+	goFailPort int
 }
 
 func newEtcdServerProcess(cfg *etcdServerProcessConfig) (*etcdServerProcess, error) {
@@ -87,7 +103,11 @@ func newEtcdServerProcess(cfg *etcdServerProcessConfig) (*etcdServerProcess, error) {
 			return nil, err
 		}
 	}
-	return &etcdServerProcess{cfg: cfg, donec: make(chan struct{})}, nil
+	ep := &etcdServerProcess{cfg: cfg, donec: make(chan struct{})}
+	if cfg.goFailPort != 0 {
+		ep.failpoints = &BinaryFailpoints{member: ep}
+	}
+	return ep, nil
 }
 
 func (ep *etcdServerProcess) EndpointsV2() []string { return ep.EndpointsHTTP() }
@@ -105,6 +125,14 @@ func (ep *etcdServerProcess) Start() error {
 	if ep.proc != nil {
 		panic("already started")
 	}
+	if ep.cfg.proxy != nil && ep.proxy == nil {
+		ep.proxy = proxy.NewServer(*ep.cfg.proxy)
+		select {
+		case <-ep.proxy.Ready():
+		case err := <-ep.proxy.Error():
+			return err
+		}
+	}
 	proc, err := spawnCmdWithEnv(append([]string{ep.cfg.execPath}, ep.cfg.args...), ep.cfg.envVars)
 	if err != nil {
 		return err
@@ -138,6 +166,13 @@ func (ep *etcdServerProcess) Stop() (err error) {
 			return err
 		}
 	}
+	if ep.proxy != nil {
+		err = ep.proxy.Close()
+		ep.proxy = nil
+		if err != nil {
+			return err
+		}
+	}
 	return nil
 }
 
@@ -167,3 +202,151 @@ func (ep *etcdServerProcess) Logs() logsExpect {
 	}
 	return ep.proc
 }
+
+func (ep *etcdServerProcess) PeerProxy() proxy.Server {
+	return ep.proxy
+}
+
+func (ep *etcdServerProcess) Failpoints() *BinaryFailpoints {
+	return ep.failpoints
+}
+
+func (ep *etcdServerProcess) IsRunning() bool {
+	if ep.proc == nil {
+		return false
+	}
+
+	if ep.proc.IsRunning() {
+		return true
+	}
+	ep.proc = nil
+	return false
+}
+
+type BinaryFailpoints struct {
+	member etcdProcess
+	availableCache map[string]string
+}
+
+func (f *BinaryFailpoints) SetupEnv(failpoint, payload string) error {
+	if f.member.IsRunning() {
+		return errors.New("cannot setup environment variable while process is running")
+	}
+	f.member.Config().envVars["GOFAIL_FAILPOINTS"] = fmt.Sprintf("%s=%s", failpoint, payload)
+	return nil
+}
+
+func (f *BinaryFailpoints) SetupHTTP(ctx context.Context, failpoint, payload string) error {
+	host := fmt.Sprintf("127.0.0.1:%d", f.member.Config().goFailPort)
+	failpointUrl := url.URL{
+		Scheme: "http",
+		Host:   host,
+		Path:   failpoint,
+	}
+	r, err := http.NewRequestWithContext(ctx, "PUT", failpointUrl.String(), bytes.NewBuffer([]byte(payload)))
+	if err != nil {
+		return err
+	}
+	resp, err := httpClient.Do(r)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusNoContent {
+		return fmt.Errorf("bad status code: %d", resp.StatusCode)
+	}
+	return nil
+}
+
+func (f *BinaryFailpoints) DeactivateHTTP(ctx context.Context, failpoint string) error {
+	host := fmt.Sprintf("127.0.0.1:%d", f.member.Config().goFailPort)
+	failpointUrl := url.URL{
+		Scheme: "http",
+		Host:   host,
+		Path:   failpoint,
+	}
+	r, err := http.NewRequestWithContext(ctx, "DELETE", failpointUrl.String(), nil)
+	if err != nil {
+		return err
+	}
+	resp, err := httpClient.Do(r)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusNoContent {
+		return fmt.Errorf("bad status code: %d", resp.StatusCode)
+	}
+	return nil
+}
+
+var httpClient = http.Client{
+	Timeout: 1 * time.Second,
+}
+
+func (f *BinaryFailpoints) Enabled() bool {
+	_, err := failpoints(f.member)
+	if err != nil {
+		return false
+	}
+	return true
+}
+
+func (f *BinaryFailpoints) Available(failpoint string) bool {
+	if f.availableCache == nil {
+		fs, err := failpoints(f.member)
+		if err != nil {
+			panic(err)
+		}
+		f.availableCache = fs
+	}
+	_, found := f.availableCache[failpoint]
+	return found
+}
+
+func failpoints(member etcdProcess) (map[string]string, error) {
+	body, err := fetchFailpointsBody(member)
+	if err != nil {
+		return nil, err
+	}
+	defer body.Close()
+	return parseFailpointsBody(body)
+}
+
+func fetchFailpointsBody(member etcdProcess) (io.ReadCloser, error) {
+	address := fmt.Sprintf("127.0.0.1:%d", member.Config().goFailPort)
+	failpointUrl := url.URL{
+		Scheme: "http",
+		Host:   address,
+	}
+	resp, err := http.Get(failpointUrl.String())
+	if err != nil {
+		return nil, err
+	}
+	if resp.StatusCode != http.StatusOK {
+		resp.Body.Close()
+		return nil, fmt.Errorf("invalid status code, %d", resp.StatusCode)
+	}
+	return resp.Body, nil
+}
+
+func parseFailpointsBody(body io.Reader) (map[string]string, error) {
+	data, err := io.ReadAll(body)
+	if err != nil {
+		return nil, err
+	}
+	lines := strings.Split(string(data), "\n")
+	failpoints := map[string]string{}
+	for _, line := range lines {
+		// Format:
+		// failpoint=value
+		parts := strings.SplitN(line, "=", 2)
+		failpoint := parts[0]
+		var value string
+		if len(parts) == 2 {
+			value = parts[1]
+		}
+		failpoints[failpoint] = value
+	}
+	return failpoints, nil
+}
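Note: taken together, `Enabled`, `Available`, and `SetupHTTP` let a test first confirm the binary was built with failpoints and then arm one over the gofail sidecar HTTP endpoint. A minimal sketch of the calling pattern, assuming it sits in the same e2e package (imports "context" and "errors"; the failpoint name and payload match triggerSlowApply in the new test file):

func sketchArmFailpoint(ctx context.Context, proc etcdProcess) error {
	fp := proc.Failpoints()
	if fp == nil || !fp.Enabled() {
		return errors.New("binary was not built with FAILPOINTS=true")
	}
	if !fp.Available("beforeApplyOneEntryNormal") {
		return errors.New("failpoint not compiled into this binary")
	}
	// Every raft entry applied after this point sleeps 10s, stalling applies.
	return fp.SetupHTTP(ctx, "beforeApplyOneEntryNormal", `sleep("10s")`)
}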
@@ -42,7 +42,7 @@ func TestReleaseUpgrade(t *testing.T) {
 	copiedCfg.snapshotCount = 3
 	copiedCfg.baseScheme = "unix" // to avoid port conflict
 
-	epc, err := newEtcdProcessCluster(&copiedCfg)
+	epc, err := newEtcdProcessCluster(t, &copiedCfg)
 	if err != nil {
 		t.Fatalf("could not start etcd process cluster (%v)", err)
 	}
@@ -127,7 +127,7 @@ func TestReleaseUpgradeWithRestart(t *testing.T) {
 	copiedCfg.snapshotCount = 10
 	copiedCfg.baseScheme = "unix"
 
-	epc, err := newEtcdProcessCluster(&copiedCfg)
+	epc, err := newEtcdProcessCluster(t, &copiedCfg)
 	if err != nil {
 		t.Fatalf("could not start etcd process cluster (%v)", err)
 	}
@@ -53,6 +53,15 @@ func (ctl *Etcdctl) Put(key, value string) error {
 	return spawnWithExpect(args, "OK")
 }
 
+func (ctl *Etcdctl) PutWithAuth(key, value, username, password string) error {
+	if ctl.v2 {
+		panic("Unsupported method for v2")
+	}
+	args := ctl.cmdArgs()
+	args = append(args, "--user", fmt.Sprintf("%s:%s", username, password), "put", key, value)
+	return spawnWithExpect(args, "OK")
+}
+
 func (ctl *Etcdctl) Set(key, value string) error {
 	if !ctl.v2 {
 		panic("Unsupported method for v3")
@@ -62,6 +71,32 @@ func (ctl *Etcdctl) Set(key, value string) error {
 	return spawnWithExpect(args, value)
 }
 
+func (ctl *Etcdctl) AuthEnable() error {
+	args := ctl.cmdArgs("auth", "enable")
+	return spawnWithExpect(args, "Authentication Enabled")
+}
+
+func (ctl *Etcdctl) UserGrantRole(user string, role string) (*clientv3.AuthUserGrantRoleResponse, error) {
+	var resp clientv3.AuthUserGrantRoleResponse
+	err := ctl.spawnJsonCmd(&resp, "", "user", "grant-role", user, role)
+	return &resp, err
+}
+
+func (ctl *Etcdctl) UserAdd(name, password string) (*clientv3.AuthUserAddResponse, error) {
+	args := []string{"user", "add"}
+	if password == "" {
+		args = append(args, name)
+		args = append(args, "--no-password")
+	} else {
+		args = append(args, fmt.Sprintf("%s:%s", name, password))
+	}
+	args = append(args, "--interactive=false")
+
+	var resp clientv3.AuthUserAddResponse
+	err := ctl.spawnJsonCmd(&resp, "", args...)
+	return &resp, err
+}
+
 func (ctl *Etcdctl) AlarmList() (*clientv3.AlarmResponse, error) {
 	if ctl.v2 {
 		panic("Unsupported method for v2")
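Note: these helpers exist so the new health check tests can flip on authentication from the command line. A sketch of the bootstrap sequence they enable, in the same package — the same sequence triggerSlowBufferWriteBackWithAuth runs in the new test file:

func sketchEnableAuthAndPut(ctl *Etcdctl) error {
	if _, err := ctl.UserAdd("root", "root"); err != nil {
		return err
	}
	if _, err := ctl.UserGrantRole("root", "root"); err != nil {
		return err
	}
	if err := ctl.AuthEnable(); err != nil {
		return err
	}
	// Once auth is on, writes must carry credentials.
	return ctl.PutWithAuth("foo", "bar", "root", "root")
}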
@@ -27,7 +27,7 @@ var (
 )
 
 func TestGateway(t *testing.T) {
-	ec, err := newEtcdProcessCluster(&configNoTLS)
+	ec, err := newEtcdProcessCluster(t, &configNoTLS)
 	if err != nil {
 		t.Fatal(err)
 	}
tests/e2e/http_health_check_test.go — new file (444 lines)
@@ -0,0 +1,444 @@
+// Copyright 2023 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build !cluster_proxy
+
+package e2e
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"path"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"go.etcd.io/etcd/etcdserver/etcdserverpb"
+	"go.etcd.io/etcd/pkg/testutil"
+)
+
+const (
+	healthCheckTimeout = 3 * time.Second
+	putCommandTimeout  = 200 * time.Millisecond
+)
+
+type healthCheckConfig struct {
+	url                    string
+	expectedStatusCode     int
+	expectedTimeoutError   bool
+	expectedRespSubStrings []string
+}
+
+type injectFailure func(ctx context.Context, t *testing.T, clus *etcdProcessCluster, duration time.Duration)
+
+func TestHTTPHealthHandler(t *testing.T) {
+	client := &http.Client{}
+	tcs := []struct {
+		name          string
+		injectFailure injectFailure
+		clusterConfig etcdProcessClusterConfig
+		healthChecks  []healthCheckConfig
+	}{
+		{
+			name:          "no failures", // happy case
+			clusterConfig: etcdProcessClusterConfig{clusterSize: 1},
+			healthChecks: []healthCheckConfig{
+				{
+					url:                "/health",
+					expectedStatusCode: http.StatusOK,
+				},
+			},
+		},
+		{
+			name:          "activated no space alarm",
+			injectFailure: triggerNoSpaceAlarm,
+			clusterConfig: etcdProcessClusterConfig{clusterSize: 1, quotaBackendBytes: int64(13 * os.Getpagesize())},
+			healthChecks: []healthCheckConfig{
+				{
+					url:                "/health",
+					expectedStatusCode: http.StatusServiceUnavailable,
+				},
+				{
+					url:                "/health?exclude=NOSPACE",
+					expectedStatusCode: http.StatusOK,
+				},
+			},
+		},
+		{
+			name:          "overloaded server slow apply",
+			injectFailure: triggerSlowApply,
+			clusterConfig: etcdProcessClusterConfig{clusterSize: 3, goFailEnabled: true},
+			healthChecks: []healthCheckConfig{
+				{
+					url:                "/health?serializable=true",
+					expectedStatusCode: http.StatusOK,
+				},
+				{
+					url:                  "/health?serializable=false",
+					expectedTimeoutError: true,
+				},
+			},
+		},
+		{
+			name:          "network partitioned",
+			injectFailure: blackhole,
+			clusterConfig: etcdProcessClusterConfig{clusterSize: 3, isPeerTLS: true, peerProxy: true},
+			healthChecks: []healthCheckConfig{
+				{
+					url:                "/health?serializable=true",
+					expectedStatusCode: http.StatusOK,
+				},
+				{
+					url:                  "/health?serializable=false",
+					expectedTimeoutError: true,
+					expectedStatusCode:   http.StatusServiceUnavailable,
+					// old leader may return "etcdserver: leader changed" error with 503 in ReadIndex leaderChangedNotifier
+				},
+			},
+		},
+		{
+			name:          "raft loop deadlock",
+			injectFailure: triggerRaftLoopDeadLock,
+			clusterConfig: etcdProcessClusterConfig{clusterSize: 1, goFailEnabled: true},
+			healthChecks: []healthCheckConfig{
+				{
+					// current kubeadm etcd liveness check failed to detect raft loop deadlock in steady state
+					// ref. https://github.com/kubernetes/kubernetes/blob/master/cmd/kubeadm/app/phases/etcd/local.go#L225-L226
+					// current liveness probe depends on the etcd /health check has a flaw that new /livez check should resolve.
+					url:                "/health?serializable=true",
+					expectedStatusCode: http.StatusOK,
+				},
+				{
+					url:                  "/health?serializable=false",
+					expectedTimeoutError: true,
+				},
+			},
+		},
+		// verify that auth enabled serializable read must go through mvcc
+		{
+			name:          "slow buffer write back with auth enabled",
+			injectFailure: triggerSlowBufferWriteBackWithAuth,
+			clusterConfig: etcdProcessClusterConfig{clusterSize: 1, goFailEnabled: true},
+			healthChecks: []healthCheckConfig{
+				{
+					url:                  "/health?serializable=true",
+					expectedTimeoutError: true,
+				},
+			},
+		},
+	}
+
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			defer testutil.AfterTest(t)
+			ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
+			defer cancel()
+			cx := getDefaultCtlCtx(t)
+			cx.cfg = tc.clusterConfig
+
+			clus, err := newEtcdProcessCluster(t, &cx.cfg)
+			require.NoError(t, err)
+			defer clus.Close()
+			executeUntil(ctx, t, func() {
+				if tc.injectFailure != nil {
+					// guaranteed that failure point is active until all the health checks timeout.
+					duration := time.Duration(len(tc.healthChecks)+10) * healthCheckTimeout
+					tc.injectFailure(ctx, t, clus, duration)
+				}
+
+				for _, hc := range tc.healthChecks {
+					requestURL := clus.procs[0].EndpointsHTTP()[0] + hc.url
+					t.Logf("health check URL is %s", requestURL)
+					doHealthCheckAndVerify(t, client, requestURL, hc.expectedTimeoutError, hc.expectedStatusCode, hc.expectedRespSubStrings)
+				}
+			})
+		})
+	}
+}
+
+var (
+	defaultHealthCheckConfigs = []healthCheckConfig{
+		{
+			url:                    "/livez",
+			expectedStatusCode:     http.StatusOK,
+			expectedRespSubStrings: []string{`ok`},
+		},
+		{
+			url:                    "/readyz",
+			expectedStatusCode:     http.StatusOK,
+			expectedRespSubStrings: []string{`ok`},
+		},
+		{
+			url:                    "/livez?verbose=true",
+			expectedStatusCode:     http.StatusOK,
+			expectedRespSubStrings: []string{`[+]serializable_read ok`},
+		},
+		{
+			url:                "/readyz?verbose=true",
+			expectedStatusCode: http.StatusOK,
+			expectedRespSubStrings: []string{
+				`[+]serializable_read ok`,
+				`[+]data_corruption ok`,
+			},
+		},
+	}
+)
+
+func TestHTTPLivezReadyzHandler(t *testing.T) {
+	client := &http.Client{}
+	tcs := []struct {
+		name          string
+		injectFailure injectFailure
+		clusterConfig etcdProcessClusterConfig
+		healthChecks  []healthCheckConfig
+	}{
+		{
+			name:          "no failures", // happy case
+			clusterConfig: etcdProcessClusterConfig{clusterSize: 1},
+			healthChecks:  defaultHealthCheckConfigs,
+		},
+		{
+			name:          "activated no space alarm",
+			injectFailure: triggerNoSpaceAlarm,
+			clusterConfig: etcdProcessClusterConfig{clusterSize: 1, quotaBackendBytes: int64(13 * os.Getpagesize())},
+			healthChecks:  defaultHealthCheckConfigs,
+		},
+		// Readiness is not an indicator of performance. Slow response is not covered by readiness.
+		// refer to https://tinyurl.com/livez-readyz-design-doc or https://github.com/etcd-io/etcd/issues/16007#issuecomment-1726541091 in case tinyurl is down.
+		{
+			name:          "overloaded server slow apply",
+			injectFailure: triggerSlowApply,
+			clusterConfig: etcdProcessClusterConfig{clusterSize: 3, goFailEnabled: true},
+			// TODO expected behavior of readyz check should be 200 after ReadIndex check is implemented to replace linearizable read.
+			healthChecks: []healthCheckConfig{
+				{
+					url:                "/livez",
+					expectedStatusCode: http.StatusOK,
+				},
+				{
+					url:                  "/readyz",
+					expectedTimeoutError: true,
+					expectedStatusCode:   http.StatusServiceUnavailable,
+				},
+			},
+		},
+		{
+			name:          "network partitioned",
+			injectFailure: blackhole,
+			clusterConfig: etcdProcessClusterConfig{clusterSize: 3, isPeerTLS: true, peerProxy: true},
+			healthChecks: []healthCheckConfig{
+				{
+					url:                "/livez",
+					expectedStatusCode: http.StatusOK,
+				},
+				{
+					url:                  "/readyz",
+					expectedTimeoutError: true,
+					expectedStatusCode:   http.StatusServiceUnavailable,
+					expectedRespSubStrings: []string{
+						`[-]linearizable_read failed: etcdserver: leader changed`,
+					},
+				},
+			},
+		},
+		{
+			name:          "raft loop deadlock",
+			injectFailure: triggerRaftLoopDeadLock,
+			clusterConfig: etcdProcessClusterConfig{clusterSize: 1, goFailEnabled: true},
+			// TODO expected behavior of livez check should be 503 or timeout after RaftLoopDeadLock check is implemented.
+			healthChecks: []healthCheckConfig{
+				{
+					url:                "/livez",
+					expectedStatusCode: http.StatusOK,
+				},
+				{
+					url:                  "/readyz",
+					expectedTimeoutError: true,
+					expectedStatusCode:   http.StatusServiceUnavailable,
+				},
+			},
+		},
+		// verify that auth enabled serializable read must go through mvcc
+		{
+			name:          "slow buffer write back with auth enabled",
+			injectFailure: triggerSlowBufferWriteBackWithAuth,
+			clusterConfig: etcdProcessClusterConfig{clusterSize: 1, goFailEnabled: true},
+			healthChecks: []healthCheckConfig{
+				{
+					url:                  "/livez",
+					expectedTimeoutError: true,
+				},
+				{
+					url:                  "/readyz",
+					expectedTimeoutError: true,
+				},
+			},
+		},
+		{
+			name:          "corrupt",
+			injectFailure: triggerCorrupt,
+			clusterConfig: etcdProcessClusterConfig{clusterSize: 3, corruptCheckTime: time.Second},
+			healthChecks: []healthCheckConfig{
+				{
+					url:                    "/livez?verbose=true",
+					expectedStatusCode:     http.StatusOK,
+					expectedRespSubStrings: []string{`[+]serializable_read ok`},
+				},
+				{
+					url:                "/readyz",
+					expectedStatusCode: http.StatusServiceUnavailable,
+					expectedRespSubStrings: []string{
+						`[+]serializable_read ok`,
+						`[-]data_corruption failed: alarm activated: CORRUPT`,
+					},
+				},
+			},
+		},
+	}
+
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			defer testutil.AfterTest(t)
+			ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
+			defer cancel()
+			cx := getDefaultCtlCtx(t)
+			cx.cfg = tc.clusterConfig
+
+			clus, err := newEtcdProcessCluster(t, &cx.cfg)
+			require.NoError(t, err)
+			defer clus.Close()
+			executeUntil(ctx, t, func() {
+				if tc.injectFailure != nil {
+					// guaranteed that failure point is active until all the health checks timeout.
+					duration := time.Duration(len(tc.healthChecks)+10) * healthCheckTimeout
+					tc.injectFailure(ctx, t, clus, duration)
+				}
+
+				for _, hc := range tc.healthChecks {
+					requestURL := clus.procs[0].EndpointsHTTP()[0] + hc.url
+					t.Logf("health check URL is %s", requestURL)
+					doHealthCheckAndVerify(t, client, requestURL, hc.expectedTimeoutError, hc.expectedStatusCode, hc.expectedRespSubStrings)
+				}
+			})
+		})
+	}
+}
+
+func doHealthCheckAndVerify(t *testing.T, client *http.Client, url string, expectTimeoutError bool, expectStatusCode int, expectRespSubStrings []string) {
+	ctx, cancel := context.WithTimeout(context.Background(), healthCheckTimeout)
+	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
+	require.NoErrorf(t, err, "failed to creat request %+v", err)
+	resp, herr := client.Do(req)
+	cancel()
+	if expectTimeoutError {
+		if herr != nil && strings.Contains(herr.Error(), context.DeadlineExceeded.Error()) {
+			return
+		}
+	}
+	require.NoErrorf(t, herr, "failed to get response %+v", err)
+	defer resp.Body.Close()
+
+	body, err := io.ReadAll(resp.Body)
+	resp.Body.Close()
+	require.NoErrorf(t, err, "failed to read response %+v", err)
+
+	t.Logf("health check response body is:\n%s", body)
+	require.Equal(t, expectStatusCode, resp.StatusCode)
+	for _, expectRespSubString := range expectRespSubStrings {
+		require.Contains(t, string(body), expectRespSubString)
+	}
+}
+
+func triggerNoSpaceAlarm(ctx context.Context, t *testing.T, clus *etcdProcessCluster, _ time.Duration) {
+	buf := strings.Repeat("b", os.Getpagesize())
+	etcdctl := NewEtcdctl(clus.procs[0].EndpointsV3(), clientNonTLS, false, false)
+	for {
+		if err := etcdctl.Put("foo", buf); err != nil {
+			if !strings.Contains(err.Error(), "etcdserver: mvcc: database space exceeded") {
+				t.Fatal(err)
+			}
+			break
+		}
+	}
+}
+
+func triggerSlowApply(ctx context.Context, t *testing.T, clus *etcdProcessCluster, duration time.Duration) {
+	// the following proposal will be blocked at applying stage
+	// because when apply index < committed index, linearizable read would time out.
+	require.NoError(t, clus.procs[0].Failpoints().SetupHTTP(ctx, "beforeApplyOneEntryNormal", fmt.Sprintf(`sleep("%s")`, duration)))
+	etcdctl := NewEtcdctl(clus.procs[1].EndpointsV3(), clientNonTLS, false, false)
+	etcdctl.Put("foo", "bar")
+}
+
+func blackhole(_ context.Context, t *testing.T, clus *etcdProcessCluster, _ time.Duration) {
+	member := clus.procs[0]
+	proxy := member.PeerProxy()
+	t.Logf("Blackholing traffic from and to member %q", member.Config().name)
+	proxy.BlackholeTx()
+	proxy.BlackholeRx()
+}
+
+func triggerRaftLoopDeadLock(ctx context.Context, t *testing.T, clus *etcdProcessCluster, duration time.Duration) {
+	require.NoError(t, clus.procs[0].Failpoints().SetupHTTP(ctx, "raftBeforeSaveWaitWalSync", fmt.Sprintf(`sleep("%s")`, duration)))
+	etcdctl := NewEtcdctl(clus.procs[0].EndpointsV3(), clientNonTLS, false, false)
+	etcdctl.Put("foo", "bar")
+}
+
+func triggerSlowBufferWriteBackWithAuth(ctx context.Context, t *testing.T, clus *etcdProcessCluster, duration time.Duration) {
+	etcdctl := NewEtcdctl(clus.procs[0].EndpointsV3(), clientNonTLS, false, false)
+
+	_, err := etcdctl.UserAdd("root", "root")
+	require.NoError(t, err)
+	_, err = etcdctl.UserGrantRole("root", "root")
+	require.NoError(t, err)
+	require.NoError(t, etcdctl.AuthEnable())
+
+	require.NoError(t, clus.procs[0].Failpoints().SetupHTTP(ctx, "beforeWritebackBuf", fmt.Sprintf(`sleep("%s")`, duration)))
+	etcdctl.PutWithAuth("foo", "bar", "root", "root")
+}
+
+func triggerCorrupt(ctx context.Context, t *testing.T, clus *etcdProcessCluster, _ time.Duration) {
+	etcdctl := NewEtcdctl(clus.procs[0].EndpointsV3(), clientNonTLS, false, false)
+	for i := 0; i < 10; i++ {
+		require.NoError(t, etcdctl.Put("foo", "bar"))
+	}
+	err := clus.procs[0].Stop()
+	require.NoError(t, err)
+	err = corruptBBolt(path.Join(clus.procs[0].Config().dataDirPath, "member", "snap", "db"))
+	require.NoError(t, err)
+	err = clus.procs[0].Start()
+	for {
+		time.Sleep(time.Second)
+		select {
+		case <-ctx.Done():
+			require.NoError(t, err)
+		default:
+		}
+		response, err := etcdctl.AlarmList()
+		if err != nil {
+			continue
+		}
+		if len(response.Alarms) == 0 {
+			continue
+		}
+		require.Len(t, response.Alarms, 1)
+		if response.Alarms[0].Alarm == etcdserverpb.AlarmType_CORRUPT {
+			break
+		}
+	}
+}
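Note: outside the test harness, the endpoints exercised above can be probed directly. A standalone sketch — the 127.0.0.1:2379 client URL is an assumption for a default local member, and the 3-second budget matches healthCheckTimeout above:

package main

import (
	"fmt"
	"io"
	"net/http"
	"time"
)

func main() {
	// A timeout here is itself a health signal, as the tests above rely on.
	client := &http.Client{Timeout: 3 * time.Second}
	resp, err := client.Get("http://127.0.0.1:2379/readyz?verbose=true")
	if err != nil {
		fmt.Println("not ready:", err)
		return
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Printf("%d\n%s", resp.StatusCode, body) // 200 ready, 503 not ready
}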
@@ -15,13 +15,20 @@
 package e2e
 
 import (
+	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"math/rand"
+	"os"
 	"strings"
+	"testing"
 	"time"
 
+	"go.etcd.io/bbolt"
+	"go.etcd.io/etcd/mvcc/mvccpb"
 	"go.etcd.io/etcd/pkg/expect"
+	"go.etcd.io/etcd/pkg/testutil"
 )
 
 func waitReadyExpectProc(exproc *expect.ExpectProcess, readyStrs []string) error {
@@ -108,3 +115,60 @@ func closeWithTimeout(p *expect.ExpectProcess, d time.Duration) error {
 func toTLS(s string) string {
 	return strings.Replace(s, "http://", "https://", 1)
 }
+
+func executeUntil(ctx context.Context, t *testing.T, f func()) {
+	deadline, deadlineSet := ctx.Deadline()
+	timeout := time.Until(deadline)
+	donec := make(chan struct{})
+	go func() {
+		defer close(donec)
+		f()
+	}()
+
+	select {
+	case <-ctx.Done():
+		msg := ctx.Err().Error()
+		if deadlineSet {
+			msg = fmt.Sprintf("test timed out after %v, err: %v", timeout, msg)
+		}
+		testutil.FatalStack(t, msg)
+	case <-donec:
+	}
+}
+
+func corruptBBolt(fpath string) error {
+	db, derr := bbolt.Open(fpath, os.ModePerm, &bbolt.Options{})
+	if derr != nil {
+		return derr
+	}
+	defer db.Close()
+
+	return db.Update(func(tx *bbolt.Tx) error {
+		b := tx.Bucket([]byte("key"))
+		if b == nil {
+			return errors.New("got nil bucket for 'key'")
+		}
+		keys, vals := [][]byte{}, [][]byte{}
+		c := b.Cursor()
+		for k, v := c.First(); k != nil; k, v = c.Next() {
+			keys = append(keys, k)
+			var kv mvccpb.KeyValue
+			if uerr := kv.Unmarshal(v); uerr != nil {
+				return uerr
+			}
+			kv.Key[0]++
+			kv.Value[0]++
+			v2, v2err := kv.Marshal()
+			if v2err != nil {
+				return v2err
+			}
+			vals = append(vals, v2)
+		}
+		for i := range keys {
+			if perr := b.Put(keys[i], vals[i]); perr != nil {
+				return perr
+			}
+		}
+		return nil
+	})
+}
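Note: executeUntil converts a test body that outlives its context into an immediate failure with full goroutine stacks, instead of a silent hang until the Go test timeout. A usage sketch in the same package (imports "context", "testing", "time"):

func sketchExecuteUntil(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()
	executeUntil(ctx, t, func() {
		// test body; if it outlives ctx, testutil.FatalStack fails the test
		// and dumps all goroutine stacks for debugging
	})
}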
@@ -38,7 +38,7 @@ func testCurlPutGet(t *testing.T, cfg *etcdProcessClusterConfig) {
 	cfg = configStandalone(*cfg)
 
 	cfg.enableV2 = true
-	epc, err := newEtcdProcessCluster(cfg)
+	epc, err := newEtcdProcessCluster(t, cfg)
 	if err != nil {
 		t.Fatalf("could not start etcd process cluster (%v)", err)
 	}
@@ -87,7 +87,7 @@ func testV3CurlMaxStream(t *testing.T, reachLimit bool, opts ...ctlOption) {
 
 	// Step 2: create the cluster
 	t.Log("Creating an etcd cluster")
-	epc, err := newEtcdProcessCluster(&cx.cfg)
+	epc, err := newEtcdProcessCluster(t, &cx.cfg)
 	if err != nil {
 		t.Fatalf("Failed to start etcd cluster: %v", err)
 	}
@@ -86,7 +86,7 @@ func TestWatchDelayForPeriodicProgressNotification(t *testing.T) {
 	tc := tc
 	tc.config.WatchProcessNotifyInterval = watchResponsePeriod
 	t.Run(tc.name, func(t *testing.T) {
-		clus, err := newEtcdProcessCluster(&tc.config)
+		clus, err := newEtcdProcessCluster(t, &tc.config)
 		require.NoError(t, err)
 		defer clus.Close()
 		c := newClient(t, clus.EndpointsV3(), tc.config.clientTLS, tc.config.isClientAutoTLS)
@@ -106,7 +106,7 @@ func TestWatchDelayForManualProgressNotification(t *testing.T) {
 	defer testutil.AfterTest(t)
 	for _, tc := range tcs {
 		t.Run(tc.name, func(t *testing.T) {
-			clus, err := newEtcdProcessCluster(&tc.config)
+			clus, err := newEtcdProcessCluster(t, &tc.config)
 			require.NoError(t, err)
 			defer clus.Close()
 			c := newClient(t, clus.EndpointsV3(), tc.config.clientTLS, tc.config.isClientAutoTLS)
@@ -138,7 +138,7 @@ func TestWatchDelayForEvent(t *testing.T) {
 	defer testutil.AfterTest(t)
 	for _, tc := range tcs {
 		t.Run(tc.name, func(t *testing.T) {
-			clus, err := newEtcdProcessCluster(&tc.config)
+			clus, err := newEtcdProcessCluster(t, &tc.config)
 			require.NoError(t, err)
 			defer clus.Close()
 			c := newClient(t, clus.EndpointsV3(), tc.config.clientTLS, tc.config.isClientAutoTLS)