mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
tests: Implement network delay and blackholing in linearizability tests
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
parent
de8b914eb0
commit
064fad5ae4
@ -212,7 +212,7 @@ func testIssue6361(t *testing.T) {
|
|||||||
|
|
||||||
newDataDir := filepath.Join(t.TempDir(), "test.data")
|
newDataDir := filepath.Join(t.TempDir(), "test.data")
|
||||||
t.Log("etcdctl restoring the snapshot...")
|
t.Log("etcdctl restoring the snapshot...")
|
||||||
err = e2e.SpawnWithExpect([]string{e2e.BinPath.Etcdutl, "snapshot", "restore", fpath, "--name", epc.Procs[0].Config().Name, "--initial-cluster", epc.Procs[0].Config().InitialCluster, "--initial-cluster-token", epc.Procs[0].Config().InitialToken, "--initial-advertise-peer-urls", epc.Procs[0].Config().Purl.String(), "--data-dir", newDataDir}, "added member")
|
err = e2e.SpawnWithExpect([]string{e2e.BinPath.Etcdutl, "snapshot", "restore", fpath, "--name", epc.Procs[0].Config().Name, "--initial-cluster", epc.Procs[0].Config().InitialCluster, "--initial-cluster-token", epc.Procs[0].Config().InitialToken, "--initial-advertise-peer-urls", epc.Procs[0].Config().PeerURL.String(), "--data-dir", newDataDir}, "added member")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -42,7 +42,7 @@ func TestGrpcProxyAutoSync(t *testing.T) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
var (
|
var (
|
||||||
node1ClientURL = epc.Procs[0].Config().Acurl
|
node1ClientURL = epc.Procs[0].Config().ClientURL
|
||||||
proxyClientURL = "127.0.0.1:32379"
|
proxyClientURL = "127.0.0.1:32379"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -67,11 +67,11 @@ func TestGrpcProxyAutoSync(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Wait for auto sync of endpoints
|
// Wait for auto sync of endpoints
|
||||||
err = waitForEndpointInLog(ctx, proxyProc, epc.Procs[1].Config().Acurl)
|
err = waitForEndpointInLog(ctx, proxyProc, epc.Procs[1].Config().ClientURL)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = epc.CloseProc(ctx, func(proc e2e.EtcdProcess) bool {
|
err = epc.CloseProc(ctx, func(proc e2e.EtcdProcess) bool {
|
||||||
return proc.Config().Acurl == node1ClientURL
|
return proc.Config().ClientURL == node1ClientURL
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -30,6 +30,7 @@ import (
|
|||||||
"go.uber.org/zap/zaptest"
|
"go.uber.org/zap/zaptest"
|
||||||
|
|
||||||
"go.etcd.io/etcd/api/v3/etcdserverpb"
|
"go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||||
|
"go.etcd.io/etcd/pkg/v3/proxy"
|
||||||
"go.etcd.io/etcd/server/v3/etcdserver"
|
"go.etcd.io/etcd/server/v3/etcdserver"
|
||||||
"go.etcd.io/etcd/tests/v3/framework/config"
|
"go.etcd.io/etcd/tests/v3/framework/config"
|
||||||
)
|
)
|
||||||
@ -182,6 +183,7 @@ type EtcdProcessClusterConfig struct {
|
|||||||
|
|
||||||
WarningUnaryRequestDuration time.Duration
|
WarningUnaryRequestDuration time.Duration
|
||||||
ExperimentalWarningUnaryRequestDuration time.Duration
|
ExperimentalWarningUnaryRequestDuration time.Duration
|
||||||
|
PeerProxy bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func DefaultConfig() *EtcdProcessClusterConfig {
|
func DefaultConfig() *EtcdProcessClusterConfig {
|
||||||
@ -329,6 +331,10 @@ func WithCompactionBatchLimit(limit int) EPClusterOption {
|
|||||||
return func(c *EtcdProcessClusterConfig) { c.CompactionBatchLimit = limit }
|
return func(c *EtcdProcessClusterConfig) { c.CompactionBatchLimit = limit }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithPeerProxy(enabled bool) EPClusterOption {
|
||||||
|
return func(c *EtcdProcessClusterConfig) { c.PeerProxy = enabled }
|
||||||
|
}
|
||||||
|
|
||||||
// NewEtcdProcessCluster launches a new cluster from etcd processes, returning
|
// NewEtcdProcessCluster launches a new cluster from etcd processes, returning
|
||||||
// a new EtcdProcessCluster once all nodes are ready to accept client requests.
|
// a new EtcdProcessCluster once all nodes are ready to accept client requests.
|
||||||
func NewEtcdProcessCluster(ctx context.Context, t testing.TB, opts ...EPClusterOption) (*EtcdProcessCluster, error) {
|
func NewEtcdProcessCluster(ctx context.Context, t testing.TB, opts ...EPClusterOption) (*EtcdProcessCluster, error) {
|
||||||
@ -416,7 +422,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdAllServerProcessConfigs(tb testing.TB)
|
|||||||
|
|
||||||
for i := 0; i < cfg.ClusterSize; i++ {
|
for i := 0; i < cfg.ClusterSize; i++ {
|
||||||
etcdCfgs[i] = cfg.EtcdServerProcessConfig(tb, i)
|
etcdCfgs[i] = cfg.EtcdServerProcessConfig(tb, i)
|
||||||
initialCluster[i] = fmt.Sprintf("%s=%s", etcdCfgs[i].Name, etcdCfgs[i].Purl.String())
|
initialCluster[i] = fmt.Sprintf("%s=%s", etcdCfgs[i].Name, etcdCfgs[i].PeerURL.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range etcdCfgs {
|
for i := range etcdCfgs {
|
||||||
@ -443,8 +449,12 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
|
|||||||
var curls []string
|
var curls []string
|
||||||
var curl, curltls string
|
var curl, curltls string
|
||||||
port := cfg.BasePort + 5*i
|
port := cfg.BasePort + 5*i
|
||||||
curlHost := fmt.Sprintf("localhost:%d", port)
|
clientPort := port
|
||||||
|
peerPort := port + 1
|
||||||
|
metricsPort := port + 2
|
||||||
|
peer2Port := port + 3
|
||||||
|
|
||||||
|
curlHost := fmt.Sprintf("localhost:%d", clientPort)
|
||||||
switch cfg.Client.ConnectionType {
|
switch cfg.Client.ConnectionType {
|
||||||
case ClientNonTLS, ClientTLS:
|
case ClientNonTLS, ClientTLS:
|
||||||
curl = (&url.URL{Scheme: cfg.ClientScheme(), Host: curlHost}).String()
|
curl = (&url.URL{Scheme: cfg.ClientScheme(), Host: curlHost}).String()
|
||||||
@ -455,7 +465,17 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
|
|||||||
curls = []string{curl, curltls}
|
curls = []string{curl, curltls}
|
||||||
}
|
}
|
||||||
|
|
||||||
purl := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
|
peerListenUrl := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
|
||||||
|
peerAdvertiseUrl := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
|
||||||
|
var proxyCfg *proxy.ServerConfig
|
||||||
|
if cfg.PeerProxy {
|
||||||
|
peerAdvertiseUrl.Host = fmt.Sprintf("localhost:%d", peer2Port)
|
||||||
|
proxyCfg = &proxy.ServerConfig{
|
||||||
|
Logger: zap.NewNop(),
|
||||||
|
To: peerListenUrl,
|
||||||
|
From: peerAdvertiseUrl,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
name := fmt.Sprintf("%s-test-%d", testNameCleanRegex.ReplaceAllString(tb.Name(), ""), i)
|
name := fmt.Sprintf("%s-test-%d", testNameCleanRegex.ReplaceAllString(tb.Name(), ""), i)
|
||||||
|
|
||||||
@ -475,8 +495,8 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
|
|||||||
"--name", name,
|
"--name", name,
|
||||||
"--listen-client-urls", strings.Join(curls, ","),
|
"--listen-client-urls", strings.Join(curls, ","),
|
||||||
"--advertise-client-urls", strings.Join(curls, ","),
|
"--advertise-client-urls", strings.Join(curls, ","),
|
||||||
"--listen-peer-urls", purl.String(),
|
"--listen-peer-urls", peerListenUrl.String(),
|
||||||
"--initial-advertise-peer-urls", purl.String(),
|
"--initial-advertise-peer-urls", peerAdvertiseUrl.String(),
|
||||||
"--initial-cluster-token", cfg.InitialToken,
|
"--initial-cluster-token", cfg.InitialToken,
|
||||||
"--data-dir", dataDirPath,
|
"--data-dir", dataDirPath,
|
||||||
"--snapshot-count", fmt.Sprintf("%d", cfg.SnapshotCount),
|
"--snapshot-count", fmt.Sprintf("%d", cfg.SnapshotCount),
|
||||||
@ -503,7 +523,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
|
|||||||
if cfg.MetricsURLScheme != "" {
|
if cfg.MetricsURLScheme != "" {
|
||||||
murl = (&url.URL{
|
murl = (&url.URL{
|
||||||
Scheme: cfg.MetricsURLScheme,
|
Scheme: cfg.MetricsURLScheme,
|
||||||
Host: fmt.Sprintf("localhost:%d", port+2),
|
Host: fmt.Sprintf("localhost:%d", metricsPort),
|
||||||
}).String()
|
}).String()
|
||||||
args = append(args, "--listen-metrics-urls", murl)
|
args = append(args, "--listen-metrics-urls", murl)
|
||||||
}
|
}
|
||||||
@ -590,11 +610,12 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
|
|||||||
DataDirPath: dataDirPath,
|
DataDirPath: dataDirPath,
|
||||||
KeepDataDir: cfg.KeepDataDir,
|
KeepDataDir: cfg.KeepDataDir,
|
||||||
Name: name,
|
Name: name,
|
||||||
Purl: purl,
|
PeerURL: peerAdvertiseUrl,
|
||||||
Acurl: curl,
|
ClientURL: curl,
|
||||||
Murl: murl,
|
MetricsURL: murl,
|
||||||
InitialToken: cfg.InitialToken,
|
InitialToken: cfg.InitialToken,
|
||||||
GoFailPort: gofailPort,
|
GoFailPort: gofailPort,
|
||||||
|
Proxy: proxyCfg,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -687,7 +708,7 @@ func (epc *EtcdProcessCluster) CloseProc(ctx context.Context, finder func(EtcdPr
|
|||||||
return fmt.Errorf("failed to get member list: %w", err)
|
return fmt.Errorf("failed to get member list: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
memberID, err := findMemberIDByEndpoint(memberList.Members, proc.Config().Acurl)
|
memberID, err := findMemberIDByEndpoint(memberList.Members, proc.Config().ClientURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to find member ID: %w", err)
|
return fmt.Errorf("failed to find member ID: %w", err)
|
||||||
}
|
}
|
||||||
@ -707,7 +728,7 @@ func (epc *EtcdProcessCluster) CloseProc(ctx context.Context, finder func(EtcdPr
|
|||||||
return errors.New("failed to remove member after 10 tries")
|
return errors.New("failed to remove member after 10 tries")
|
||||||
}
|
}
|
||||||
|
|
||||||
epc.lg.Info("successfully removed member", zap.String("acurl", proc.Config().Acurl))
|
epc.lg.Info("successfully removed member", zap.String("acurl", proc.Config().ClientURL))
|
||||||
|
|
||||||
// Then stop process
|
// Then stop process
|
||||||
return proc.Close()
|
return proc.Close()
|
||||||
@ -724,17 +745,17 @@ func (epc *EtcdProcessCluster) StartNewProc(ctx context.Context, cfg *EtcdProces
|
|||||||
epc.nextSeq++
|
epc.nextSeq++
|
||||||
|
|
||||||
initialCluster := []string{
|
initialCluster := []string{
|
||||||
fmt.Sprintf("%s=%s", serverCfg.Name, serverCfg.Purl.String()),
|
fmt.Sprintf("%s=%s", serverCfg.Name, serverCfg.PeerURL.String()),
|
||||||
}
|
}
|
||||||
for _, p := range epc.Procs {
|
for _, p := range epc.Procs {
|
||||||
initialCluster = append(initialCluster, fmt.Sprintf("%s=%s", p.Config().Name, p.Config().Purl.String()))
|
initialCluster = append(initialCluster, fmt.Sprintf("%s=%s", p.Config().Name, p.Config().PeerURL.String()))
|
||||||
}
|
}
|
||||||
|
|
||||||
epc.Cfg.SetInitialOrDiscovery(serverCfg, initialCluster, "existing")
|
epc.Cfg.SetInitialOrDiscovery(serverCfg, initialCluster, "existing")
|
||||||
|
|
||||||
// First add new member to cluster
|
// First add new member to cluster
|
||||||
memberCtl := epc.Client(opts...)
|
memberCtl := epc.Client(opts...)
|
||||||
_, err := memberCtl.MemberAdd(ctx, serverCfg.Name, []string{serverCfg.Purl.String()})
|
_, err := memberCtl.MemberAdd(ctx, serverCfg.Name, []string{serverCfg.PeerURL.String()})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to add new member: %w", err)
|
return fmt.Errorf("failed to add new member: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -29,6 +29,7 @@ import (
|
|||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"go.etcd.io/etcd/pkg/v3/expect"
|
"go.etcd.io/etcd/pkg/v3/expect"
|
||||||
|
"go.etcd.io/etcd/pkg/v3/proxy"
|
||||||
"go.etcd.io/etcd/tests/v3/framework/config"
|
"go.etcd.io/etcd/tests/v3/framework/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -124,6 +125,10 @@ func (p *proxyEtcdProcess) Wait(ctx context.Context) error {
|
|||||||
return p.etcdProc.Wait(ctx)
|
return p.etcdProc.Wait(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *proxyEtcdProcess) PeerProxy() proxy.Server {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
type proxyProc struct {
|
type proxyProc struct {
|
||||||
lg *zap.Logger
|
lg *zap.Logger
|
||||||
name string
|
name string
|
||||||
@ -188,7 +193,7 @@ type proxyV2Proc struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func proxyListenURL(cfg *EtcdServerProcessConfig, portOffset int) string {
|
func proxyListenURL(cfg *EtcdServerProcessConfig, portOffset int) string {
|
||||||
u, err := url.Parse(cfg.Acurl)
|
u, err := url.Parse(cfg.ClientURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
@ -206,7 +211,7 @@ func newProxyV2Proc(cfg *EtcdServerProcessConfig) *proxyV2Proc {
|
|||||||
"--name", name,
|
"--name", name,
|
||||||
"--proxy", "on",
|
"--proxy", "on",
|
||||||
"--listen-client-urls", listenAddr,
|
"--listen-client-urls", listenAddr,
|
||||||
"--initial-cluster", cfg.Name + "=" + cfg.Purl.String(),
|
"--initial-cluster", cfg.Name + "=" + cfg.PeerURL.String(),
|
||||||
"--data-dir", dataDir,
|
"--data-dir", dataDir,
|
||||||
}
|
}
|
||||||
return &proxyV2Proc{
|
return &proxyV2Proc{
|
||||||
@ -232,13 +237,13 @@ func newProxyV3Proc(cfg *EtcdServerProcessConfig) *proxyV3Proc {
|
|||||||
"grpc-proxy",
|
"grpc-proxy",
|
||||||
"start",
|
"start",
|
||||||
"--listen-addr", strings.Split(listenAddr, "/")[2],
|
"--listen-addr", strings.Split(listenAddr, "/")[2],
|
||||||
"--endpoints", cfg.Acurl,
|
"--endpoints", cfg.ClientURL,
|
||||||
// pass-through member RPCs
|
// pass-through member RPCs
|
||||||
"--advertise-client-url", "",
|
"--advertise-client-url", "",
|
||||||
"--data-dir", cfg.DataDirPath,
|
"--data-dir", cfg.DataDirPath,
|
||||||
}
|
}
|
||||||
murl := ""
|
murl := ""
|
||||||
if cfg.Murl != "" {
|
if cfg.MetricsURL != "" {
|
||||||
murl = proxyListenURL(cfg, 4)
|
murl = proxyListenURL(cfg, 4)
|
||||||
args = append(args, "--metrics-addr", murl)
|
args = append(args, "--metrics-addr", murl)
|
||||||
}
|
}
|
||||||
|
@ -54,7 +54,7 @@ func (r CURLReq) timeoutDuration() time.Duration {
|
|||||||
func CURLPrefixArgs(cfg *EtcdProcessClusterConfig, member EtcdProcess, method string, req CURLReq) []string {
|
func CURLPrefixArgs(cfg *EtcdProcessClusterConfig, member EtcdProcess, method string, req CURLReq) []string {
|
||||||
var (
|
var (
|
||||||
cmdArgs = []string{"curl"}
|
cmdArgs = []string{"curl"}
|
||||||
acurl = member.Config().Acurl
|
acurl = member.Config().ClientURL
|
||||||
)
|
)
|
||||||
if req.MetricsURLScheme != "https" {
|
if req.MetricsURLScheme != "https" {
|
||||||
if req.IsTLS {
|
if req.IsTLS {
|
||||||
@ -62,7 +62,7 @@ func CURLPrefixArgs(cfg *EtcdProcessClusterConfig, member EtcdProcess, method st
|
|||||||
panic("should not use cURLPrefixArgsUseTLS when serving only TLS or non-TLS")
|
panic("should not use cURLPrefixArgsUseTLS when serving only TLS or non-TLS")
|
||||||
}
|
}
|
||||||
cmdArgs = append(cmdArgs, "--cacert", CaPath, "--cert", CertPath, "--key", PrivateKeyPath)
|
cmdArgs = append(cmdArgs, "--cacert", CaPath, "--cert", CertPath, "--key", PrivateKeyPath)
|
||||||
acurl = ToTLS(member.Config().Acurl)
|
acurl = ToTLS(member.Config().ClientURL)
|
||||||
} else if cfg.Client.ConnectionType == ClientTLS {
|
} else if cfg.Client.ConnectionType == ClientTLS {
|
||||||
if cfg.CN {
|
if cfg.CN {
|
||||||
cmdArgs = append(cmdArgs, "--cacert", CaPath, "--cert", CertPath, "--key", PrivateKeyPath)
|
cmdArgs = append(cmdArgs, "--cacert", CaPath, "--cert", CertPath, "--key", PrivateKeyPath)
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
|
|
||||||
"go.etcd.io/etcd/client/pkg/v3/fileutil"
|
"go.etcd.io/etcd/client/pkg/v3/fileutil"
|
||||||
"go.etcd.io/etcd/pkg/v3/expect"
|
"go.etcd.io/etcd/pkg/v3/expect"
|
||||||
|
"go.etcd.io/etcd/pkg/v3/proxy"
|
||||||
"go.etcd.io/etcd/tests/v3/framework/config"
|
"go.etcd.io/etcd/tests/v3/framework/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -49,6 +50,7 @@ type EtcdProcess interface {
|
|||||||
Stop() error
|
Stop() error
|
||||||
Close() error
|
Close() error
|
||||||
Config() *EtcdServerProcessConfig
|
Config() *EtcdServerProcessConfig
|
||||||
|
PeerProxy() proxy.Server
|
||||||
Logs() LogsExpect
|
Logs() LogsExpect
|
||||||
Kill() error
|
Kill() error
|
||||||
}
|
}
|
||||||
@ -62,6 +64,7 @@ type LogsExpect interface {
|
|||||||
type EtcdServerProcess struct {
|
type EtcdServerProcess struct {
|
||||||
cfg *EtcdServerProcessConfig
|
cfg *EtcdServerProcessConfig
|
||||||
proc *expect.ExpectProcess
|
proc *expect.ExpectProcess
|
||||||
|
proxy proxy.Server
|
||||||
donec chan struct{} // closed when Interact() terminates
|
donec chan struct{} // closed when Interact() terminates
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -78,14 +81,15 @@ type EtcdServerProcessConfig struct {
|
|||||||
|
|
||||||
Name string
|
Name string
|
||||||
|
|
||||||
Purl url.URL
|
PeerURL url.URL
|
||||||
|
ClientURL string
|
||||||
Acurl string
|
MetricsURL string
|
||||||
Murl string
|
|
||||||
|
|
||||||
InitialToken string
|
InitialToken string
|
||||||
InitialCluster string
|
InitialCluster string
|
||||||
GoFailPort int
|
GoFailPort int
|
||||||
|
|
||||||
|
Proxy *proxy.ServerConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, error) {
|
func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, error) {
|
||||||
@ -100,9 +104,9 @@ func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, err
|
|||||||
return &EtcdServerProcess{cfg: cfg, donec: make(chan struct{})}, nil
|
return &EtcdServerProcess{cfg: cfg, donec: make(chan struct{})}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ep *EtcdServerProcess) EndpointsV2() []string { return []string{ep.cfg.Acurl} }
|
func (ep *EtcdServerProcess) EndpointsV2() []string { return []string{ep.cfg.ClientURL} }
|
||||||
func (ep *EtcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2() }
|
func (ep *EtcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2() }
|
||||||
func (ep *EtcdServerProcess) EndpointsMetrics() []string { return []string{ep.cfg.Murl} }
|
func (ep *EtcdServerProcess) EndpointsMetrics() []string { return []string{ep.cfg.MetricsURL} }
|
||||||
|
|
||||||
func (epc *EtcdServerProcess) Client(opts ...config.ClientOption) *EtcdctlV3 {
|
func (epc *EtcdServerProcess) Client(opts ...config.ClientOption) *EtcdctlV3 {
|
||||||
etcdctl, err := NewEtcdctl(epc.Config().Client, epc.EndpointsV3(), opts...)
|
etcdctl, err := NewEtcdctl(epc.Config().Client, epc.EndpointsV3(), opts...)
|
||||||
@ -117,6 +121,15 @@ func (ep *EtcdServerProcess) Start(ctx context.Context) error {
|
|||||||
if ep.proc != nil {
|
if ep.proc != nil {
|
||||||
panic("already started")
|
panic("already started")
|
||||||
}
|
}
|
||||||
|
if ep.cfg.Proxy != nil && ep.proxy == nil {
|
||||||
|
ep.cfg.lg.Info("starting proxy...", zap.String("name", ep.cfg.Name), zap.String("from", ep.cfg.Proxy.From.String()), zap.String("to", ep.cfg.Proxy.To.String()))
|
||||||
|
ep.proxy = proxy.NewServer(*ep.cfg.Proxy)
|
||||||
|
select {
|
||||||
|
case <-ep.proxy.Ready():
|
||||||
|
case err := <-ep.proxy.Error():
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
ep.cfg.lg.Info("starting server...", zap.String("name", ep.cfg.Name))
|
ep.cfg.lg.Info("starting server...", zap.String("name", ep.cfg.Name))
|
||||||
proc, err := SpawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.ExecPath}, ep.cfg.Args...), ep.cfg.EnvVars, ep.cfg.Name)
|
proc, err := SpawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.ExecPath}, ep.cfg.Args...), ep.cfg.EnvVars, ep.cfg.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -161,8 +174,8 @@ func (ep *EtcdServerProcess) Stop() (err error) {
|
|||||||
}
|
}
|
||||||
<-ep.donec
|
<-ep.donec
|
||||||
ep.donec = make(chan struct{})
|
ep.donec = make(chan struct{})
|
||||||
if ep.cfg.Purl.Scheme == "unix" || ep.cfg.Purl.Scheme == "unixs" {
|
if ep.cfg.PeerURL.Scheme == "unix" || ep.cfg.PeerURL.Scheme == "unixs" {
|
||||||
err = os.Remove(ep.cfg.Purl.Host + ep.cfg.Purl.Path)
|
err = os.Remove(ep.cfg.PeerURL.Host + ep.cfg.PeerURL.Path)
|
||||||
if err != nil && !os.IsNotExist(err) {
|
if err != nil && !os.IsNotExist(err) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -176,6 +189,15 @@ func (ep *EtcdServerProcess) Close() error {
|
|||||||
if err := ep.Stop(); err != nil {
|
if err := ep.Stop(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if ep.proxy != nil {
|
||||||
|
ep.cfg.lg.Info("closing proxy...", zap.String("name", ep.cfg.Name))
|
||||||
|
err := ep.proxy.Close()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
ep.proxy = nil
|
||||||
|
}
|
||||||
|
|
||||||
if !ep.cfg.KeepDataDir {
|
if !ep.cfg.KeepDataDir {
|
||||||
ep.cfg.lg.Info("removing directory", zap.String("data-dir", ep.cfg.DataDirPath))
|
ep.cfg.lg.Info("removing directory", zap.String("data-dir", ep.cfg.DataDirPath))
|
||||||
return os.RemoveAll(ep.cfg.DataDirPath)
|
return os.RemoveAll(ep.cfg.DataDirPath)
|
||||||
@ -243,3 +265,7 @@ func AssertProcessLogs(t *testing.T, ep EtcdProcess, expectLog string) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ep *EtcdServerProcess) PeerProxy() proxy.Server {
|
||||||
|
return ep.proxy
|
||||||
|
}
|
||||||
|
@ -56,6 +56,8 @@ var (
|
|||||||
CompactBeforeCommitBatchPanic Failpoint = goPanicFailpoint{"compactBeforeCommitBatch", triggerCompact, AnyMember}
|
CompactBeforeCommitBatchPanic Failpoint = goPanicFailpoint{"compactBeforeCommitBatch", triggerCompact, AnyMember}
|
||||||
CompactAfterCommitBatchPanic Failpoint = goPanicFailpoint{"compactAfterCommitBatch", triggerCompact, AnyMember}
|
CompactAfterCommitBatchPanic Failpoint = goPanicFailpoint{"compactAfterCommitBatch", triggerCompact, AnyMember}
|
||||||
RaftBeforeLeaderSendPanic Failpoint = goPanicFailpoint{"raftBeforeLeaderSend", nil, Leader}
|
RaftBeforeLeaderSendPanic Failpoint = goPanicFailpoint{"raftBeforeLeaderSend", nil, Leader}
|
||||||
|
BlackholePeerNetwork Failpoint = blackholePeerNetworkFailpoint{duration: time.Second}
|
||||||
|
DelayPeerNetwork Failpoint = delayPeerNetworkFailpoint{duration: time.Second, baseLatency: 75 * time.Millisecond, randomizedLatency: 50 * time.Millisecond}
|
||||||
RandomFailpoint Failpoint = randomFailpoint{[]Failpoint{
|
RandomFailpoint Failpoint = randomFailpoint{[]Failpoint{
|
||||||
KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic,
|
KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic,
|
||||||
RaftAfterSavePanic, DefragBeforeCopyPanic, DefragBeforeRenamePanic,
|
RaftAfterSavePanic, DefragBeforeCopyPanic, DefragBeforeRenamePanic,
|
||||||
@ -66,6 +68,8 @@ var (
|
|||||||
CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic,
|
CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic,
|
||||||
CompactBeforeCommitBatchPanic, CompactAfterCommitBatchPanic,
|
CompactBeforeCommitBatchPanic, CompactAfterCommitBatchPanic,
|
||||||
RaftBeforeLeaderSendPanic,
|
RaftBeforeLeaderSendPanic,
|
||||||
|
BlackholePeerNetwork,
|
||||||
|
DelayPeerNetwork,
|
||||||
}}
|
}}
|
||||||
// TODO: Figure out how to reliably trigger below failpoints and add them to RandomFailpoint
|
// TODO: Figure out how to reliably trigger below failpoints and add them to RandomFailpoint
|
||||||
raftBeforeApplySnapPanic Failpoint = goPanicFailpoint{"raftBeforeApplySnap", nil, AnyMember}
|
raftBeforeApplySnapPanic Failpoint = goPanicFailpoint{"raftBeforeApplySnap", nil, AnyMember}
|
||||||
@ -247,3 +251,49 @@ func (f randomFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.Et
|
|||||||
func (f randomFailpoint) Name() string {
|
func (f randomFailpoint) Name() string {
|
||||||
return "Random"
|
return "Random"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type blackholePeerNetworkFailpoint struct {
|
||||||
|
duration time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f blackholePeerNetworkFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
|
||||||
|
member := clus.Procs[rand.Int()%len(clus.Procs)]
|
||||||
|
proxy := member.PeerProxy()
|
||||||
|
|
||||||
|
proxy.BlackholeTx()
|
||||||
|
proxy.BlackholeRx()
|
||||||
|
t.Logf("Blackholing traffic from and to %s", member.Config().Name)
|
||||||
|
time.Sleep(f.duration)
|
||||||
|
t.Logf("Traffic restored for %s", member.Config().Name)
|
||||||
|
proxy.UnblackholeTx()
|
||||||
|
proxy.UnblackholeRx()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f blackholePeerNetworkFailpoint) Name() string {
|
||||||
|
return "blackhole"
|
||||||
|
}
|
||||||
|
|
||||||
|
type delayPeerNetworkFailpoint struct {
|
||||||
|
duration time.Duration
|
||||||
|
baseLatency time.Duration
|
||||||
|
randomizedLatency time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f delayPeerNetworkFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
|
||||||
|
member := clus.Procs[rand.Int()%len(clus.Procs)]
|
||||||
|
proxy := member.PeerProxy()
|
||||||
|
|
||||||
|
proxy.DelayRx(f.baseLatency, f.randomizedLatency)
|
||||||
|
proxy.DelayTx(f.baseLatency, f.randomizedLatency)
|
||||||
|
t.Logf("Delaying traffic from and to %s by %v +/- %v", member.Config().Name, f.baseLatency, f.randomizedLatency)
|
||||||
|
time.Sleep(f.duration)
|
||||||
|
t.Logf("Traffic delay removed for %s", member.Config().Name)
|
||||||
|
proxy.UndelayRx()
|
||||||
|
proxy.UndelayTx()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f delayPeerNetworkFailpoint) Name() string {
|
||||||
|
return "delay"
|
||||||
|
}
|
||||||
|
@ -50,6 +50,7 @@ func TestLinearizability(t *testing.T) {
|
|||||||
failpoint: RandomFailpoint,
|
failpoint: RandomFailpoint,
|
||||||
config: *e2e.NewConfig(
|
config: *e2e.NewConfig(
|
||||||
e2e.WithClusterSize(1),
|
e2e.WithClusterSize(1),
|
||||||
|
e2e.WithPeerProxy(true),
|
||||||
e2e.WithGoFailEnabled(true),
|
e2e.WithGoFailEnabled(true),
|
||||||
e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints
|
e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints
|
||||||
),
|
),
|
||||||
@ -58,6 +59,7 @@ func TestLinearizability(t *testing.T) {
|
|||||||
name: "ClusterOfSize3",
|
name: "ClusterOfSize3",
|
||||||
failpoint: RandomFailpoint,
|
failpoint: RandomFailpoint,
|
||||||
config: *e2e.NewConfig(
|
config: *e2e.NewConfig(
|
||||||
|
e2e.WithPeerProxy(true),
|
||||||
e2e.WithGoFailEnabled(true),
|
e2e.WithGoFailEnabled(true),
|
||||||
e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints
|
e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints
|
||||||
),
|
),
|
||||||
|
Loading…
x
Reference in New Issue
Block a user