diff --git a/tests/e2e/cluster_downgrade_test.go b/tests/e2e/cluster_downgrade_test.go index 89cbc75e9..a8531e7db 100644 --- a/tests/e2e/cluster_downgrade_test.go +++ b/tests/e2e/cluster_downgrade_test.go @@ -96,7 +96,7 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int) { } func newCluster(t *testing.T, execPath string, clusterSize int) *e2e.EtcdProcessCluster { - epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{ + epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, &e2e.EtcdProcessClusterConfig{ ExecPath: execPath, ClusterSize: clusterSize, InitialToken: "new", @@ -115,7 +115,7 @@ func newCluster(t *testing.T, execPath string, clusterSize int) *e2e.EtcdProcess func startEtcd(t *testing.T, ep e2e.EtcdProcess, execPath string) { ep.Config().ExecPath = execPath - err := ep.Restart() + err := ep.Restart(context.TODO()) if err != nil { t.Fatalf("could not start etcd process cluster (%v)", err) } diff --git a/tests/e2e/corrupt_test.go b/tests/e2e/corrupt_test.go index 88abeaa70..0ca530ffa 100644 --- a/tests/e2e/corrupt_test.go +++ b/tests/e2e/corrupt_test.go @@ -94,7 +94,7 @@ func corruptTest(cx ctlCtx) { cx.t.Log("waiting for etcd[0] failure...") // restarting corrupted member should fail - e2e.WaitReadyExpectProc(proc, []string{fmt.Sprintf("etcdmain: %016x found data inconsistency with peers", id0)}) + e2e.WaitReadyExpectProc(context.TODO(), proc, []string{fmt.Sprintf("etcdmain: %016x found data inconsistency with peers", id0)}) } func TestPeriodicCheckDetectsCorruption(t *testing.T) { @@ -102,7 +102,7 @@ func TestPeriodicCheckDetectsCorruption(t *testing.T) { e2e.BeforeTest(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{ + epc, err := e2e.NewEtcdProcessCluster(ctx, t, &e2e.EtcdProcessClusterConfig{ ClusterSize: 3, KeepDataDir: true, CorruptCheckTime: time.Second, @@ -136,7 +136,7 @@ func TestPeriodicCheckDetectsCorruption(t *testing.T) { err = testutil.CorruptBBolt(datadir.ToBackendFileName(epc.Procs[0].Config().DataDirPath)) assert.NoError(t, err) - err = epc.Procs[0].Restart() + err = epc.Procs[0].Restart(context.TODO()) assert.NoError(t, err) time.Sleep(checkTime * 11 / 10) alarmResponse, err := cc.AlarmList(ctx) @@ -149,7 +149,7 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) { e2e.BeforeTest(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{ + epc, err := e2e.NewEtcdProcessCluster(ctx, t, &e2e.EtcdProcessClusterConfig{ ClusterSize: 3, KeepDataDir: true, CompactHashCheckEnabled: true, @@ -183,7 +183,7 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) { err = testutil.CorruptBBolt(datadir.ToBackendFileName(epc.Procs[0].Config().DataDirPath)) assert.NoError(t, err) - err = epc.Procs[0].Restart() + err = epc.Procs[0].Restart(ctx) assert.NoError(t, err) _, err = cc.Compact(ctx, 5, config.CompactOption{}) assert.NoError(t, err) diff --git a/tests/e2e/ctl_v3_auth_test.go b/tests/e2e/ctl_v3_auth_test.go index 96b4d5f58..55ab93652 100644 --- a/tests/e2e/ctl_v3_auth_test.go +++ b/tests/e2e/ctl_v3_auth_test.go @@ -168,7 +168,7 @@ func authGracefulDisableTest(cx ctlCtx) { // ...and restart the node node0 := cx.epc.Procs[0] node0.WithStopSignal(syscall.SIGINT) - if rerr := node0.Restart(); rerr != nil { + if rerr := node0.Restart(context.TODO()); rerr != nil { cx.t.Fatal(rerr) } @@ -1282,7 +1282,7 @@ func authTestRevisionConsistency(cx ctlCtx) { // restart the node node0.WithStopSignal(syscall.SIGINT) - if err := node0.Restart(); err != nil { + if err := node0.Restart(context.TODO()); err != nil { cx.t.Fatal(err) } diff --git a/tests/e2e/ctl_v3_grpc_test.go b/tests/e2e/ctl_v3_grpc_test.go index 991cdbd26..8de7f5be1 100644 --- a/tests/e2e/ctl_v3_grpc_test.go +++ b/tests/e2e/ctl_v3_grpc_test.go @@ -96,7 +96,7 @@ func TestAuthority(t *testing.T) { // Enable debug mode to get logs with http2 headers (including authority) cfg.EnvVars = map[string]string{"GODEBUG": "http2debug=2"} - epc, err := e2e.NewEtcdProcessCluster(t, cfg) + epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, cfg) if err != nil { t.Fatalf("could not start etcd process cluster (%v)", err) } diff --git a/tests/e2e/ctl_v3_make_mirror_test.go b/tests/e2e/ctl_v3_make_mirror_test.go index bc4f36f36..c697308e7 100644 --- a/tests/e2e/ctl_v3_make_mirror_test.go +++ b/tests/e2e/ctl_v3_make_mirror_test.go @@ -15,6 +15,7 @@ package e2e import ( + "context" "fmt" "testing" "time" @@ -83,7 +84,7 @@ func testMirrorCommand(cx ctlCtx, flags []string, sourcekvs []kv, destkvs []kvEx dialTimeout: 7 * time.Second, } - mirrorepc, err := e2e.NewEtcdProcessCluster(cx.t, &mirrorctx.cfg) + mirrorepc, err := e2e.NewEtcdProcessCluster(context.TODO(), cx.t, &mirrorctx.cfg) if err != nil { cx.t.Fatalf("could not start etcd process cluster (%v)", err) } diff --git a/tests/e2e/ctl_v3_move_leader_test.go b/tests/e2e/ctl_v3_move_leader_test.go index 9176ee45c..d81cf8630 100644 --- a/tests/e2e/ctl_v3_move_leader_test.go +++ b/tests/e2e/ctl_v3_move_leader_test.go @@ -123,7 +123,7 @@ func setupEtcdctlTest(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, quorum bo if !quorum { cfg = e2e.ConfigStandalone(*cfg) } - epc, err := e2e.NewEtcdProcessCluster(t, cfg) + epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, cfg) if err != nil { t.Fatalf("could not start etcd process cluster (%v)", err) } diff --git a/tests/e2e/ctl_v3_snapshot_test.go b/tests/e2e/ctl_v3_snapshot_test.go index 6db55fa4b..22db112da 100644 --- a/tests/e2e/ctl_v3_snapshot_test.go +++ b/tests/e2e/ctl_v3_snapshot_test.go @@ -15,6 +15,7 @@ package e2e import ( + "context" "encoding/json" "fmt" "io" @@ -173,7 +174,7 @@ func testIssue6361(t *testing.T) { os.Setenv("ETCDCTL_API", "3") defer os.Unsetenv("ETCDCTL_API") - epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{ + epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, &e2e.EtcdProcessClusterConfig{ ClusterSize: 1, InitialToken: "new", KeepDataDir: true, @@ -227,7 +228,7 @@ func testIssue6361(t *testing.T) { epc.Procs[0].Config().Args[i+1] = newDataDir } } - if err = epc.Procs[0].Restart(); err != nil { + if err = epc.Procs[0].Restart(context.TODO()); err != nil { t.Fatal(err) } diff --git a/tests/e2e/ctl_v3_test.go b/tests/e2e/ctl_v3_test.go index 939f5dc69..2c3867a48 100644 --- a/tests/e2e/ctl_v3_test.go +++ b/tests/e2e/ctl_v3_test.go @@ -15,6 +15,7 @@ package e2e import ( + "context" "fmt" "os" "strings" @@ -61,7 +62,7 @@ func TestClusterVersion(t *testing.T) { cfg.BaseScheme = "unix" // to avoid port conflict cfg.RollingStart = tt.rollingStart - epc, err := e2e.NewEtcdProcessCluster(t, cfg) + epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, cfg) if err != nil { t.Fatalf("could not start etcd process cluster (%v)", err) } @@ -233,7 +234,7 @@ func testCtlWithOffline(t *testing.T, testFunc func(ctlCtx), testOfflineFunc fun ret.cfg.KeepDataDir = true } - epc, err := e2e.NewEtcdProcessCluster(t, &ret.cfg) + epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, &ret.cfg) if err != nil { t.Fatalf("could not start etcd process cluster (%v)", err) } diff --git a/tests/e2e/discovery_test.go b/tests/e2e/discovery_test.go index 8ace067d5..b4c55f23c 100644 --- a/tests/e2e/discovery_test.go +++ b/tests/e2e/discovery_test.go @@ -43,7 +43,7 @@ func testClusterUsingDiscovery(t *testing.T, size int, peerTLS bool) { t.Skipf("%q does not exist", lastReleaseBinary) } - dc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{ + dc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, &e2e.EtcdProcessClusterConfig{ BasePort: 2000, ExecPath: lastReleaseBinary, ClusterSize: 1, @@ -62,7 +62,7 @@ func testClusterUsingDiscovery(t *testing.T, size int, peerTLS bool) { } cancel() - c, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{ + c, err := e2e.NewEtcdProcessCluster(context.TODO(), t, &e2e.EtcdProcessClusterConfig{ BasePort: 3000, ClusterSize: size, IsPeerTLS: peerTLS, diff --git a/tests/e2e/discovery_v3_test.go b/tests/e2e/discovery_v3_test.go index 45c87e5a9..3cf75f633 100644 --- a/tests/e2e/discovery_v3_test.go +++ b/tests/e2e/discovery_v3_test.go @@ -15,6 +15,7 @@ package e2e import ( + "context" "fmt" "strconv" "strings" @@ -47,7 +48,7 @@ func testClusterUsingV3Discovery(t *testing.T, discoveryClusterSize, targetClust e2e.BeforeTest(t) // step 1: start the discovery service - ds, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{ + ds, err := e2e.NewEtcdProcessCluster(context.TODO(), t, &e2e.EtcdProcessClusterConfig{ InitialToken: "new", BasePort: 2000, ClusterSize: discoveryClusterSize, @@ -119,5 +120,5 @@ func bootstrapEtcdClusterUsingV3Discovery(t *testing.T, discoveryEndpoints []str } // start the cluster - return e2e.StartEtcdProcessCluster(epc, cfg) + return e2e.StartEtcdProcessCluster(context.TODO(), epc, cfg) } diff --git a/tests/e2e/etcd_config_test.go b/tests/e2e/etcd_config_test.go index e00421024..7f684ab9b 100644 --- a/tests/e2e/etcd_config_test.go +++ b/tests/e2e/etcd_config_test.go @@ -15,6 +15,7 @@ package e2e import ( + "context" "fmt" "os" "strings" @@ -33,7 +34,7 @@ func TestEtcdExampleConfig(t *testing.T) { if err != nil { t.Fatal(err) } - if err = e2e.WaitReadyExpectProc(proc, e2e.EtcdServerReadyLines); err != nil { + if err = e2e.WaitReadyExpectProc(context.TODO(), proc, e2e.EtcdServerReadyLines); err != nil { t.Fatal(err) } if err = proc.Stop(); err != nil { @@ -78,7 +79,7 @@ func TestEtcdMultiPeer(t *testing.T) { } for _, p := range procs { - if err := e2e.WaitReadyExpectProc(p, e2e.EtcdServerReadyLines); err != nil { + if err := e2e.WaitReadyExpectProc(context.TODO(), p, e2e.EtcdServerReadyLines); err != nil { t.Fatal(err) } } @@ -103,7 +104,7 @@ func TestEtcdUnixPeers(t *testing.T) { if err != nil { t.Fatal(err) } - if err = e2e.WaitReadyExpectProc(proc, e2e.EtcdServerReadyLines); err != nil { + if err = e2e.WaitReadyExpectProc(context.TODO(), proc, e2e.EtcdServerReadyLines); err != nil { t.Fatal(err) } if err = proc.Stop(); err != nil { @@ -183,7 +184,7 @@ func TestEtcdPeerCNAuth(t *testing.T) { } else { expect = []string{"remote error: tls: bad certificate"} } - if err := e2e.WaitReadyExpectProc(p, expect); err != nil { + if err := e2e.WaitReadyExpectProc(context.TODO(), p, expect); err != nil { t.Fatal(err) } } @@ -258,7 +259,7 @@ func TestEtcdPeerNameAuth(t *testing.T) { } else { expect = []string{"client certificate authentication failed"} } - if err := e2e.WaitReadyExpectProc(p, expect); err != nil { + if err := e2e.WaitReadyExpectProc(context.TODO(), p, expect); err != nil { t.Fatal(err) } } @@ -309,7 +310,7 @@ func TestBootstrapDefragFlag(t *testing.T) { if err != nil { t.Fatal(err) } - if err = e2e.WaitReadyExpectProc(proc, []string{"Skipping defragmentation"}); err != nil { + if err = e2e.WaitReadyExpectProc(context.TODO(), proc, []string{"Skipping defragmentation"}); err != nil { t.Fatal(err) } if err = proc.Stop(); err != nil { diff --git a/tests/e2e/etcd_release_upgrade_test.go b/tests/e2e/etcd_release_upgrade_test.go index f52c91f72..c72585929 100644 --- a/tests/e2e/etcd_release_upgrade_test.go +++ b/tests/e2e/etcd_release_upgrade_test.go @@ -15,6 +15,7 @@ package e2e import ( + "context" "fmt" "os" "sync" @@ -41,7 +42,7 @@ func TestReleaseUpgrade(t *testing.T) { copiedCfg.SnapshotCount = 3 copiedCfg.BaseScheme = "unix" // to avoid port conflict - epc, err := e2e.NewEtcdProcessCluster(t, copiedCfg) + epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, copiedCfg) if err != nil { t.Fatalf("could not start etcd process cluster (%v)", err) } @@ -82,7 +83,7 @@ func TestReleaseUpgrade(t *testing.T) { epc.Procs[i].Config().KeepDataDir = true t.Logf("Restarting node in the new version: %v", i) - if err := epc.Procs[i].Restart(); err != nil { + if err := epc.Procs[i].Restart(context.TODO()); err != nil { t.Fatalf("error restarting etcd process (%v)", err) } @@ -127,7 +128,7 @@ func TestReleaseUpgradeWithRestart(t *testing.T) { copiedCfg.SnapshotCount = 10 copiedCfg.BaseScheme = "unix" - epc, err := e2e.NewEtcdProcessCluster(t, copiedCfg) + epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, copiedCfg) if err != nil { t.Fatalf("could not start etcd process cluster (%v)", err) } @@ -168,7 +169,7 @@ func TestReleaseUpgradeWithRestart(t *testing.T) { go func(i int) { epc.Procs[i].Config().ExecPath = e2e.BinDir + "/etcd" epc.Procs[i].Config().KeepDataDir = true - if err := epc.Procs[i].Restart(); err != nil { + if err := epc.Procs[i].Restart(context.TODO()); err != nil { t.Errorf("error restarting etcd process (%v)", err) } wg.Done() diff --git a/tests/e2e/gateway_test.go b/tests/e2e/gateway_test.go index 938b3a672..6a7b03bd2 100644 --- a/tests/e2e/gateway_test.go +++ b/tests/e2e/gateway_test.go @@ -15,6 +15,7 @@ package e2e import ( + "context" "os" "strings" "testing" @@ -28,7 +29,7 @@ var ( ) func TestGateway(t *testing.T) { - ec, err := e2e.NewEtcdProcessCluster(t, e2e.NewConfigNoTLS()) + ec, err := e2e.NewEtcdProcessCluster(context.TODO(), t, e2e.NewConfigNoTLS()) if err != nil { t.Fatal(err) } diff --git a/tests/e2e/utl_migrate_test.go b/tests/e2e/utl_migrate_test.go index 46378e8a4..3d1d3a107 100644 --- a/tests/e2e/utl_migrate_test.go +++ b/tests/e2e/utl_migrate_test.go @@ -15,6 +15,7 @@ package e2e import ( + "context" "fmt" "strings" "testing" @@ -113,7 +114,7 @@ func TestEtctlutlMigrate(t *testing.T) { } dataDirPath := t.TempDir() - epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{ + epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, &e2e.EtcdProcessClusterConfig{ ExecPath: tc.binary, DataDirPath: dataDirPath, ClusterSize: 1, diff --git a/tests/e2e/v2store_deprecation_test.go b/tests/e2e/v2store_deprecation_test.go index 08c4670b8..5b9f5f4dd 100644 --- a/tests/e2e/v2store_deprecation_test.go +++ b/tests/e2e/v2store_deprecation_test.go @@ -36,7 +36,7 @@ func createV2store(t testing.TB, lastReleaseBinary string, dataDirPath string) { t.Log("Creating not-yet v2-deprecated etcd") cfg := e2e.ConfigStandalone(e2e.EtcdProcessClusterConfig{ExecPath: lastReleaseBinary, EnableV2: true, DataDirPath: dataDirPath, SnapshotCount: 5}) - epc, err := e2e.NewEtcdProcessCluster(t, cfg) + epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, cfg) assert.NoError(t, err) defer func() { @@ -154,7 +154,7 @@ func TestV2DeprecationSnapshotRecover(t *testing.T) { assert.NoError(t, epc.Close()) cfg := e2e.ConfigStandalone(e2e.EtcdProcessClusterConfig{ExecPath: currentReleaseBinary, DataDirPath: dataDir}) - epc, err = e2e.NewEtcdProcessCluster(t, cfg) + epc, err = e2e.NewEtcdProcessCluster(context.TODO(), t, cfg) assert.NoError(t, err) cc = e2e.NewEtcdctl(epc.Cfg, epc.EndpointsV3()) @@ -171,7 +171,7 @@ func TestV2DeprecationSnapshotRecover(t *testing.T) { func runEtcdAndCreateSnapshot(t testing.TB, binary, dataDir string, snapshotCount int) *e2e.EtcdProcessCluster { cfg := e2e.ConfigStandalone(e2e.EtcdProcessClusterConfig{ExecPath: binary, DataDirPath: dataDir, SnapshotCount: snapshotCount, KeepDataDir: true}) - epc, err := e2e.NewEtcdProcessCluster(t, cfg) + epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, cfg) assert.NoError(t, err) return epc } diff --git a/tests/e2e/v3_curl_maxstream_test.go b/tests/e2e/v3_curl_maxstream_test.go index 44dfd3dc1..3ead9530b 100644 --- a/tests/e2e/v3_curl_maxstream_test.go +++ b/tests/e2e/v3_curl_maxstream_test.go @@ -15,6 +15,7 @@ package e2e import ( + "context" "encoding/json" "fmt" "sync" @@ -90,7 +91,7 @@ func testV3CurlMaxStream(t *testing.T, reachLimit bool, opts ...ctlOption) { // Step 2: create the cluster t.Log("Creating an etcd cluster") - epc, err := e2e.NewEtcdProcessCluster(t, &cx.cfg) + epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, &cx.cfg) if err != nil { t.Fatalf("Failed to start etcd cluster: %v", err) } diff --git a/tests/e2e/zap_logging_test.go b/tests/e2e/zap_logging_test.go index ef37ef79b..808a7052e 100644 --- a/tests/e2e/zap_logging_test.go +++ b/tests/e2e/zap_logging_test.go @@ -18,6 +18,7 @@ package e2e import ( + "context" "encoding/json" "testing" "time" @@ -28,7 +29,7 @@ import ( func TestServerJsonLogging(t *testing.T) { e2e.BeforeTest(t) - epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{ + epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, &e2e.EtcdProcessClusterConfig{ ClusterSize: 1, InitialToken: "new", LogLevel: "debug", diff --git a/tests/framework/e2e.go b/tests/framework/e2e.go index d1731eac0..db54018d0 100644 --- a/tests/framework/e2e.go +++ b/tests/framework/e2e.go @@ -72,7 +72,7 @@ func (e e2eRunner) NewCluster(ctx context.Context, t testing.TB, cfg config.Clus default: t.Fatalf("PeerTLS config %q not supported", cfg.PeerTLS) } - epc, err := e2e.NewEtcdProcessCluster(t, &e2eConfig) + epc, err := e2e.NewEtcdProcessCluster(ctx, t, &e2eConfig) if err != nil { t.Fatalf("could not start etcd integrationCluster: %s", err) } @@ -173,8 +173,8 @@ func (m e2eMember) Client() Client { return e2eClient{e2e.NewEtcdctl(m.Cfg, m.EndpointsV3())} } -func (m e2eMember) Start() error { - return m.Restart() +func (m e2eMember) Start(ctx context.Context) error { + return m.Restart(ctx) } func (m e2eMember) Stop() { diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index fb4b64a73..e8d202c5b 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -15,6 +15,7 @@ package e2e import ( + "context" "fmt" "net/url" "os" @@ -190,13 +191,13 @@ type EtcdProcessClusterConfig struct { // NewEtcdProcessCluster launches a new cluster from etcd processes, returning // a new EtcdProcessCluster once all nodes are ready to accept client requests. -func NewEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdProcessCluster, error) { +func NewEtcdProcessCluster(ctx context.Context, t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdProcessCluster, error) { epc, err := InitEtcdProcessCluster(t, cfg) if err != nil { return nil, err } - return StartEtcdProcessCluster(epc, cfg) + return StartEtcdProcessCluster(ctx, epc, cfg) } // InitEtcdProcessCluster initializes a new cluster based on the given config. @@ -225,13 +226,13 @@ func InitEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdP } // StartEtcdProcessCluster launches a new cluster from etcd processes. -func StartEtcdProcessCluster(epc *EtcdProcessCluster, cfg *EtcdProcessClusterConfig) (*EtcdProcessCluster, error) { +func StartEtcdProcessCluster(ctx context.Context, epc *EtcdProcessCluster, cfg *EtcdProcessClusterConfig) (*EtcdProcessCluster, error) { if cfg.RollingStart { - if err := epc.RollingStart(); err != nil { + if err := epc.RollingStart(ctx); err != nil { return nil, fmt.Errorf("cannot rolling-start: %v", err) } } else { - if err := epc.Start(); err != nil { + if err := epc.Start(ctx); err != nil { return nil, fmt.Errorf("cannot start: %v", err) } } @@ -457,16 +458,16 @@ func (epc *EtcdProcessCluster) Endpoints(f func(ep EtcdProcess) []string) (ret [ return ret } -func (epc *EtcdProcessCluster) Start() error { - return epc.start(func(ep EtcdProcess) error { return ep.Start() }) +func (epc *EtcdProcessCluster) Start(ctx context.Context) error { + return epc.start(func(ep EtcdProcess) error { return ep.Start(ctx) }) } -func (epc *EtcdProcessCluster) RollingStart() error { - return epc.rollingStart(func(ep EtcdProcess) error { return ep.Start() }) +func (epc *EtcdProcessCluster) RollingStart(ctx context.Context) error { + return epc.rollingStart(func(ep EtcdProcess) error { return ep.Start(ctx) }) } -func (epc *EtcdProcessCluster) Restart() error { - return epc.start(func(ep EtcdProcess) error { return ep.Restart() }) +func (epc *EtcdProcessCluster) Restart(ctx context.Context) error { + return epc.start(func(ep EtcdProcess) error { return ep.Restart(ctx) }) } func (epc *EtcdProcessCluster) start(f func(ep EtcdProcess) error) error { diff --git a/tests/framework/e2e/cluster_proxy.go b/tests/framework/e2e/cluster_proxy.go index 9920ede87..a8c7c7d5b 100644 --- a/tests/framework/e2e/cluster_proxy.go +++ b/tests/framework/e2e/cluster_proxy.go @@ -18,6 +18,7 @@ package e2e import ( + "context" "fmt" "net" "net/url" @@ -63,18 +64,18 @@ func (p *proxyEtcdProcess) EndpointsMetrics() []string { panic("not implemented; proxy doesn't provide health information") } -func (p *proxyEtcdProcess) Start() error { - if err := p.etcdProc.Start(); err != nil { +func (p *proxyEtcdProcess) Start(ctx context.Context) error { + if err := p.etcdProc.Start(ctx); err != nil { return err } - return p.proxyV3.Start() + return p.proxyV3.Start(ctx) } -func (p *proxyEtcdProcess) Restart() error { - if err := p.etcdProc.Restart(); err != nil { +func (p *proxyEtcdProcess) Restart(ctx context.Context) error { + if err := p.etcdProc.Restart(ctx); err != nil { return err } - return p.proxyV3.Restart() + return p.proxyV3.Restart(ctx) } func (p *proxyEtcdProcess) Stop() error { @@ -134,9 +135,9 @@ func (pp *proxyProc) start() error { return nil } -func (pp *proxyProc) waitReady(readyStr string) error { +func (pp *proxyProc) waitReady(ctx context.Context, readyStr string) error { defer close(pp.donec) - return WaitReadyExpectProc(pp.proc, []string{readyStr}) + return WaitReadyExpectProc(ctx, pp.proc, []string{readyStr}) } func (pp *proxyProc) Stop() error { @@ -265,16 +266,16 @@ func newProxyV3Proc(cfg *EtcdServerProcessConfig) *proxyV3Proc { } } -func (v3p *proxyV3Proc) Restart() error { +func (v3p *proxyV3Proc) Restart(ctx context.Context) error { if err := v3p.Stop(); err != nil { return err } - return v3p.Start() + return v3p.Start(ctx) } -func (v3p *proxyV3Proc) Start() error { +func (v3p *proxyV3Proc) Start(ctx context.Context) error { if err := v3p.start(); err != nil { return err } - return v3p.waitReady("started gRPC proxy") + return v3p.waitReady(ctx, "started gRPC proxy") } diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index 9a5c94a14..ede5f3bb8 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -41,8 +41,8 @@ type EtcdProcess interface { EndpointsV3() []string EndpointsMetrics() []string - Start() error - Restart() error + Start(ctx context.Context) error + Restart(ctx context.Context) error Stop() error Close() error WithStopSignal(sig os.Signal) os.Signal @@ -99,7 +99,7 @@ func (ep *EtcdServerProcess) EndpointsV2() []string { return []string{ep.cf func (ep *EtcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2() } func (ep *EtcdServerProcess) EndpointsMetrics() []string { return []string{ep.cfg.Murl} } -func (ep *EtcdServerProcess) Start() error { +func (ep *EtcdServerProcess) Start(ctx context.Context) error { if ep.proc != nil { panic("already started") } @@ -109,20 +109,20 @@ func (ep *EtcdServerProcess) Start() error { return err } ep.proc = proc - err = ep.waitReady() + err = ep.waitReady(ctx) if err == nil { ep.cfg.lg.Info("started server.", zap.String("name", ep.cfg.Name), zap.Int("pid", ep.proc.Pid())) } return err } -func (ep *EtcdServerProcess) Restart() error { +func (ep *EtcdServerProcess) Restart(ctx context.Context) error { ep.cfg.lg.Info("restarting server...", zap.String("name", ep.cfg.Name)) if err := ep.Stop(); err != nil { return err } ep.donec = make(chan struct{}) - err := ep.Start() + err := ep.Start(ctx) if err == nil { ep.cfg.lg.Info("restarted server", zap.String("name", ep.cfg.Name)) } @@ -169,9 +169,9 @@ func (ep *EtcdServerProcess) WithStopSignal(sig os.Signal) os.Signal { return ret } -func (ep *EtcdServerProcess) waitReady() error { +func (ep *EtcdServerProcess) waitReady(ctx context.Context) error { defer close(ep.donec) - return WaitReadyExpectProc(ep.proc, EtcdServerReadyLines) + return WaitReadyExpectProc(ctx, ep.proc, EtcdServerReadyLines) } func (ep *EtcdServerProcess) Config() *EtcdServerProcessConfig { return ep.cfg } diff --git a/tests/framework/e2e/util.go b/tests/framework/e2e/util.go index ab9cf69bc..8c3b573ca 100644 --- a/tests/framework/e2e/util.go +++ b/tests/framework/e2e/util.go @@ -28,7 +28,7 @@ import ( "go.etcd.io/etcd/pkg/v3/expect" ) -func WaitReadyExpectProc(exproc *expect.ExpectProcess, readyStrs []string) error { +func WaitReadyExpectProc(ctx context.Context, exproc *expect.ExpectProcess, readyStrs []string) error { matchSet := func(l string) bool { for _, s := range readyStrs { if strings.Contains(l, s) { @@ -37,7 +37,7 @@ func WaitReadyExpectProc(exproc *expect.ExpectProcess, readyStrs []string) error } return false } - _, err := exproc.ExpectFunc(context.Background(), matchSet) + _, err := exproc.ExpectFunc(ctx, matchSet) return err } diff --git a/tests/framework/integration.go b/tests/framework/integration.go index 6ba1c9808..f2f4e8b8b 100644 --- a/tests/framework/integration.go +++ b/tests/framework/integration.go @@ -101,7 +101,7 @@ func (m integrationMember) Client() Client { return integrationClient{Client: m.Member.Client} } -func (m integrationMember) Start() error { +func (m integrationMember) Start(ctx context.Context) error { return m.Member.Restart(m.t) } diff --git a/tests/framework/interface.go b/tests/framework/interface.go index f46c4819a..583dd117d 100644 --- a/tests/framework/interface.go +++ b/tests/framework/interface.go @@ -37,7 +37,7 @@ type Cluster interface { type Member interface { Client() Client - Start() error + Start(ctx context.Context) error Stop() }