Merge pull request #14383 from serathius/context-e2e

tests: Add context to e2e cluster start
This commit is contained in:
Piotr Tabor
2022-08-29 21:12:12 +02:00
committed by GitHub
24 changed files with 87 additions and 75 deletions

View File

@@ -96,7 +96,7 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int) {
}
func newCluster(t *testing.T, execPath string, clusterSize int) *e2e.EtcdProcessCluster {
epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, &e2e.EtcdProcessClusterConfig{
ExecPath: execPath,
ClusterSize: clusterSize,
InitialToken: "new",
@@ -115,7 +115,7 @@ func newCluster(t *testing.T, execPath string, clusterSize int) *e2e.EtcdProcess
func startEtcd(t *testing.T, ep e2e.EtcdProcess, execPath string) {
ep.Config().ExecPath = execPath
err := ep.Restart()
err := ep.Restart(context.TODO())
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}

View File

@@ -94,7 +94,7 @@ func corruptTest(cx ctlCtx) {
cx.t.Log("waiting for etcd[0] failure...")
// restarting corrupted member should fail
e2e.WaitReadyExpectProc(proc, []string{fmt.Sprintf("etcdmain: %016x found data inconsistency with peers", id0)})
e2e.WaitReadyExpectProc(context.TODO(), proc, []string{fmt.Sprintf("etcdmain: %016x found data inconsistency with peers", id0)})
}
func TestPeriodicCheckDetectsCorruption(t *testing.T) {
@@ -102,7 +102,7 @@ func TestPeriodicCheckDetectsCorruption(t *testing.T) {
e2e.BeforeTest(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{
epc, err := e2e.NewEtcdProcessCluster(ctx, t, &e2e.EtcdProcessClusterConfig{
ClusterSize: 3,
KeepDataDir: true,
CorruptCheckTime: time.Second,
@@ -136,7 +136,7 @@ func TestPeriodicCheckDetectsCorruption(t *testing.T) {
err = testutil.CorruptBBolt(datadir.ToBackendFileName(epc.Procs[0].Config().DataDirPath))
assert.NoError(t, err)
err = epc.Procs[0].Restart()
err = epc.Procs[0].Restart(context.TODO())
assert.NoError(t, err)
time.Sleep(checkTime * 11 / 10)
alarmResponse, err := cc.AlarmList(ctx)
@@ -149,7 +149,7 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
e2e.BeforeTest(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{
epc, err := e2e.NewEtcdProcessCluster(ctx, t, &e2e.EtcdProcessClusterConfig{
ClusterSize: 3,
KeepDataDir: true,
CompactHashCheckEnabled: true,
@@ -183,7 +183,7 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
err = testutil.CorruptBBolt(datadir.ToBackendFileName(epc.Procs[0].Config().DataDirPath))
assert.NoError(t, err)
err = epc.Procs[0].Restart()
err = epc.Procs[0].Restart(ctx)
assert.NoError(t, err)
_, err = cc.Compact(ctx, 5, config.CompactOption{})
assert.NoError(t, err)

View File

@@ -168,7 +168,7 @@ func authGracefulDisableTest(cx ctlCtx) {
// ...and restart the node
node0 := cx.epc.Procs[0]
node0.WithStopSignal(syscall.SIGINT)
if rerr := node0.Restart(); rerr != nil {
if rerr := node0.Restart(context.TODO()); rerr != nil {
cx.t.Fatal(rerr)
}
@@ -1282,7 +1282,7 @@ func authTestRevisionConsistency(cx ctlCtx) {
// restart the node
node0.WithStopSignal(syscall.SIGINT)
if err := node0.Restart(); err != nil {
if err := node0.Restart(context.TODO()); err != nil {
cx.t.Fatal(err)
}

View File

@@ -96,7 +96,7 @@ func TestAuthority(t *testing.T) {
// Enable debug mode to get logs with http2 headers (including authority)
cfg.EnvVars = map[string]string{"GODEBUG": "http2debug=2"}
epc, err := e2e.NewEtcdProcessCluster(t, cfg)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, cfg)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}

View File

@@ -15,6 +15,7 @@
package e2e
import (
"context"
"fmt"
"testing"
"time"
@@ -83,7 +84,7 @@ func testMirrorCommand(cx ctlCtx, flags []string, sourcekvs []kv, destkvs []kvEx
dialTimeout: 7 * time.Second,
}
mirrorepc, err := e2e.NewEtcdProcessCluster(cx.t, &mirrorctx.cfg)
mirrorepc, err := e2e.NewEtcdProcessCluster(context.TODO(), cx.t, &mirrorctx.cfg)
if err != nil {
cx.t.Fatalf("could not start etcd process cluster (%v)", err)
}

View File

@@ -123,7 +123,7 @@ func setupEtcdctlTest(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, quorum bo
if !quorum {
cfg = e2e.ConfigStandalone(*cfg)
}
epc, err := e2e.NewEtcdProcessCluster(t, cfg)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, cfg)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}

View File

@@ -15,6 +15,7 @@
package e2e
import (
"context"
"encoding/json"
"fmt"
"io"
@@ -173,7 +174,7 @@ func testIssue6361(t *testing.T) {
os.Setenv("ETCDCTL_API", "3")
defer os.Unsetenv("ETCDCTL_API")
epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, &e2e.EtcdProcessClusterConfig{
ClusterSize: 1,
InitialToken: "new",
KeepDataDir: true,
@@ -227,7 +228,7 @@ func testIssue6361(t *testing.T) {
epc.Procs[0].Config().Args[i+1] = newDataDir
}
}
if err = epc.Procs[0].Restart(); err != nil {
if err = epc.Procs[0].Restart(context.TODO()); err != nil {
t.Fatal(err)
}

View File

@@ -15,6 +15,7 @@
package e2e
import (
"context"
"fmt"
"os"
"strings"
@@ -61,7 +62,7 @@ func TestClusterVersion(t *testing.T) {
cfg.BaseScheme = "unix" // to avoid port conflict
cfg.RollingStart = tt.rollingStart
epc, err := e2e.NewEtcdProcessCluster(t, cfg)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, cfg)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
@@ -233,7 +234,7 @@ func testCtlWithOffline(t *testing.T, testFunc func(ctlCtx), testOfflineFunc fun
ret.cfg.KeepDataDir = true
}
epc, err := e2e.NewEtcdProcessCluster(t, &ret.cfg)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, &ret.cfg)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}

View File

@@ -43,7 +43,7 @@ func testClusterUsingDiscovery(t *testing.T, size int, peerTLS bool) {
t.Skipf("%q does not exist", lastReleaseBinary)
}
dc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{
dc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, &e2e.EtcdProcessClusterConfig{
BasePort: 2000,
ExecPath: lastReleaseBinary,
ClusterSize: 1,
@@ -62,7 +62,7 @@ func testClusterUsingDiscovery(t *testing.T, size int, peerTLS bool) {
}
cancel()
c, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{
c, err := e2e.NewEtcdProcessCluster(context.TODO(), t, &e2e.EtcdProcessClusterConfig{
BasePort: 3000,
ClusterSize: size,
IsPeerTLS: peerTLS,

View File

@@ -15,6 +15,7 @@
package e2e
import (
"context"
"fmt"
"strconv"
"strings"
@@ -47,7 +48,7 @@ func testClusterUsingV3Discovery(t *testing.T, discoveryClusterSize, targetClust
e2e.BeforeTest(t)
// step 1: start the discovery service
ds, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{
ds, err := e2e.NewEtcdProcessCluster(context.TODO(), t, &e2e.EtcdProcessClusterConfig{
InitialToken: "new",
BasePort: 2000,
ClusterSize: discoveryClusterSize,
@@ -119,5 +120,5 @@ func bootstrapEtcdClusterUsingV3Discovery(t *testing.T, discoveryEndpoints []str
}
// start the cluster
return e2e.StartEtcdProcessCluster(epc, cfg)
return e2e.StartEtcdProcessCluster(context.TODO(), epc, cfg)
}

View File

@@ -15,6 +15,7 @@
package e2e
import (
"context"
"fmt"
"os"
"strings"
@@ -33,7 +34,7 @@ func TestEtcdExampleConfig(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err = e2e.WaitReadyExpectProc(proc, e2e.EtcdServerReadyLines); err != nil {
if err = e2e.WaitReadyExpectProc(context.TODO(), proc, e2e.EtcdServerReadyLines); err != nil {
t.Fatal(err)
}
if err = proc.Stop(); err != nil {
@@ -78,7 +79,7 @@ func TestEtcdMultiPeer(t *testing.T) {
}
for _, p := range procs {
if err := e2e.WaitReadyExpectProc(p, e2e.EtcdServerReadyLines); err != nil {
if err := e2e.WaitReadyExpectProc(context.TODO(), p, e2e.EtcdServerReadyLines); err != nil {
t.Fatal(err)
}
}
@@ -103,7 +104,7 @@ func TestEtcdUnixPeers(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err = e2e.WaitReadyExpectProc(proc, e2e.EtcdServerReadyLines); err != nil {
if err = e2e.WaitReadyExpectProc(context.TODO(), proc, e2e.EtcdServerReadyLines); err != nil {
t.Fatal(err)
}
if err = proc.Stop(); err != nil {
@@ -183,7 +184,7 @@ func TestEtcdPeerCNAuth(t *testing.T) {
} else {
expect = []string{"remote error: tls: bad certificate"}
}
if err := e2e.WaitReadyExpectProc(p, expect); err != nil {
if err := e2e.WaitReadyExpectProc(context.TODO(), p, expect); err != nil {
t.Fatal(err)
}
}
@@ -258,7 +259,7 @@ func TestEtcdPeerNameAuth(t *testing.T) {
} else {
expect = []string{"client certificate authentication failed"}
}
if err := e2e.WaitReadyExpectProc(p, expect); err != nil {
if err := e2e.WaitReadyExpectProc(context.TODO(), p, expect); err != nil {
t.Fatal(err)
}
}
@@ -309,7 +310,7 @@ func TestBootstrapDefragFlag(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err = e2e.WaitReadyExpectProc(proc, []string{"Skipping defragmentation"}); err != nil {
if err = e2e.WaitReadyExpectProc(context.TODO(), proc, []string{"Skipping defragmentation"}); err != nil {
t.Fatal(err)
}
if err = proc.Stop(); err != nil {

View File

@@ -15,6 +15,7 @@
package e2e
import (
"context"
"fmt"
"os"
"sync"
@@ -41,7 +42,7 @@ func TestReleaseUpgrade(t *testing.T) {
copiedCfg.SnapshotCount = 3
copiedCfg.BaseScheme = "unix" // to avoid port conflict
epc, err := e2e.NewEtcdProcessCluster(t, copiedCfg)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, copiedCfg)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
@@ -82,7 +83,7 @@ func TestReleaseUpgrade(t *testing.T) {
epc.Procs[i].Config().KeepDataDir = true
t.Logf("Restarting node in the new version: %v", i)
if err := epc.Procs[i].Restart(); err != nil {
if err := epc.Procs[i].Restart(context.TODO()); err != nil {
t.Fatalf("error restarting etcd process (%v)", err)
}
@@ -127,7 +128,7 @@ func TestReleaseUpgradeWithRestart(t *testing.T) {
copiedCfg.SnapshotCount = 10
copiedCfg.BaseScheme = "unix"
epc, err := e2e.NewEtcdProcessCluster(t, copiedCfg)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, copiedCfg)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
@@ -168,7 +169,7 @@ func TestReleaseUpgradeWithRestart(t *testing.T) {
go func(i int) {
epc.Procs[i].Config().ExecPath = e2e.BinDir + "/etcd"
epc.Procs[i].Config().KeepDataDir = true
if err := epc.Procs[i].Restart(); err != nil {
if err := epc.Procs[i].Restart(context.TODO()); err != nil {
t.Errorf("error restarting etcd process (%v)", err)
}
wg.Done()

View File

@@ -15,6 +15,7 @@
package e2e
import (
"context"
"os"
"strings"
"testing"
@@ -28,7 +29,7 @@ var (
)
func TestGateway(t *testing.T) {
ec, err := e2e.NewEtcdProcessCluster(t, e2e.NewConfigNoTLS())
ec, err := e2e.NewEtcdProcessCluster(context.TODO(), t, e2e.NewConfigNoTLS())
if err != nil {
t.Fatal(err)
}

View File

@@ -15,6 +15,7 @@
package e2e
import (
"context"
"fmt"
"strings"
"testing"
@@ -113,7 +114,7 @@ func TestEtctlutlMigrate(t *testing.T) {
}
dataDirPath := t.TempDir()
epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, &e2e.EtcdProcessClusterConfig{
ExecPath: tc.binary,
DataDirPath: dataDirPath,
ClusterSize: 1,

View File

@@ -36,7 +36,7 @@ func createV2store(t testing.TB, lastReleaseBinary string, dataDirPath string) {
t.Log("Creating not-yet v2-deprecated etcd")
cfg := e2e.ConfigStandalone(e2e.EtcdProcessClusterConfig{ExecPath: lastReleaseBinary, EnableV2: true, DataDirPath: dataDirPath, SnapshotCount: 5})
epc, err := e2e.NewEtcdProcessCluster(t, cfg)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, cfg)
assert.NoError(t, err)
defer func() {
@@ -154,7 +154,7 @@ func TestV2DeprecationSnapshotRecover(t *testing.T) {
assert.NoError(t, epc.Close())
cfg := e2e.ConfigStandalone(e2e.EtcdProcessClusterConfig{ExecPath: currentReleaseBinary, DataDirPath: dataDir})
epc, err = e2e.NewEtcdProcessCluster(t, cfg)
epc, err = e2e.NewEtcdProcessCluster(context.TODO(), t, cfg)
assert.NoError(t, err)
cc = e2e.NewEtcdctl(epc.Cfg, epc.EndpointsV3())
@@ -171,7 +171,7 @@ func TestV2DeprecationSnapshotRecover(t *testing.T) {
func runEtcdAndCreateSnapshot(t testing.TB, binary, dataDir string, snapshotCount int) *e2e.EtcdProcessCluster {
cfg := e2e.ConfigStandalone(e2e.EtcdProcessClusterConfig{ExecPath: binary, DataDirPath: dataDir, SnapshotCount: snapshotCount, KeepDataDir: true})
epc, err := e2e.NewEtcdProcessCluster(t, cfg)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, cfg)
assert.NoError(t, err)
return epc
}

View File

@@ -15,6 +15,7 @@
package e2e
import (
"context"
"encoding/json"
"fmt"
"sync"
@@ -90,7 +91,7 @@ func testV3CurlMaxStream(t *testing.T, reachLimit bool, opts ...ctlOption) {
// Step 2: create the cluster
t.Log("Creating an etcd cluster")
epc, err := e2e.NewEtcdProcessCluster(t, &cx.cfg)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, &cx.cfg)
if err != nil {
t.Fatalf("Failed to start etcd cluster: %v", err)
}

View File

@@ -18,6 +18,7 @@
package e2e
import (
"context"
"encoding/json"
"testing"
"time"
@@ -28,7 +29,7 @@ import (
func TestServerJsonLogging(t *testing.T) {
e2e.BeforeTest(t)
epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, &e2e.EtcdProcessClusterConfig{
ClusterSize: 1,
InitialToken: "new",
LogLevel: "debug",

View File

@@ -72,7 +72,7 @@ func (e e2eRunner) NewCluster(ctx context.Context, t testing.TB, cfg config.Clus
default:
t.Fatalf("PeerTLS config %q not supported", cfg.PeerTLS)
}
epc, err := e2e.NewEtcdProcessCluster(t, &e2eConfig)
epc, err := e2e.NewEtcdProcessCluster(ctx, t, &e2eConfig)
if err != nil {
t.Fatalf("could not start etcd integrationCluster: %s", err)
}
@@ -173,8 +173,8 @@ func (m e2eMember) Client() Client {
return e2eClient{e2e.NewEtcdctl(m.Cfg, m.EndpointsV3())}
}
func (m e2eMember) Start() error {
return m.Restart()
func (m e2eMember) Start(ctx context.Context) error {
return m.Restart(ctx)
}
func (m e2eMember) Stop() {

View File

@@ -15,6 +15,7 @@
package e2e
import (
"context"
"fmt"
"net/url"
"os"
@@ -190,13 +191,13 @@ type EtcdProcessClusterConfig struct {
// NewEtcdProcessCluster launches a new cluster from etcd processes, returning
// a new EtcdProcessCluster once all nodes are ready to accept client requests.
func NewEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdProcessCluster, error) {
func NewEtcdProcessCluster(ctx context.Context, t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdProcessCluster, error) {
epc, err := InitEtcdProcessCluster(t, cfg)
if err != nil {
return nil, err
}
return StartEtcdProcessCluster(epc, cfg)
return StartEtcdProcessCluster(ctx, epc, cfg)
}
// InitEtcdProcessCluster initializes a new cluster based on the given config.
@@ -225,13 +226,13 @@ func InitEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdP
}
// StartEtcdProcessCluster launches a new cluster from etcd processes.
func StartEtcdProcessCluster(epc *EtcdProcessCluster, cfg *EtcdProcessClusterConfig) (*EtcdProcessCluster, error) {
func StartEtcdProcessCluster(ctx context.Context, epc *EtcdProcessCluster, cfg *EtcdProcessClusterConfig) (*EtcdProcessCluster, error) {
if cfg.RollingStart {
if err := epc.RollingStart(); err != nil {
if err := epc.RollingStart(ctx); err != nil {
return nil, fmt.Errorf("cannot rolling-start: %v", err)
}
} else {
if err := epc.Start(); err != nil {
if err := epc.Start(ctx); err != nil {
return nil, fmt.Errorf("cannot start: %v", err)
}
}
@@ -457,16 +458,16 @@ func (epc *EtcdProcessCluster) Endpoints(f func(ep EtcdProcess) []string) (ret [
return ret
}
func (epc *EtcdProcessCluster) Start() error {
return epc.start(func(ep EtcdProcess) error { return ep.Start() })
func (epc *EtcdProcessCluster) Start(ctx context.Context) error {
return epc.start(func(ep EtcdProcess) error { return ep.Start(ctx) })
}
func (epc *EtcdProcessCluster) RollingStart() error {
return epc.rollingStart(func(ep EtcdProcess) error { return ep.Start() })
func (epc *EtcdProcessCluster) RollingStart(ctx context.Context) error {
return epc.rollingStart(func(ep EtcdProcess) error { return ep.Start(ctx) })
}
func (epc *EtcdProcessCluster) Restart() error {
return epc.start(func(ep EtcdProcess) error { return ep.Restart() })
func (epc *EtcdProcessCluster) Restart(ctx context.Context) error {
return epc.start(func(ep EtcdProcess) error { return ep.Restart(ctx) })
}
func (epc *EtcdProcessCluster) start(f func(ep EtcdProcess) error) error {

View File

@@ -18,6 +18,7 @@
package e2e
import (
"context"
"fmt"
"net"
"net/url"
@@ -63,18 +64,18 @@ func (p *proxyEtcdProcess) EndpointsMetrics() []string {
panic("not implemented; proxy doesn't provide health information")
}
func (p *proxyEtcdProcess) Start() error {
if err := p.etcdProc.Start(); err != nil {
func (p *proxyEtcdProcess) Start(ctx context.Context) error {
if err := p.etcdProc.Start(ctx); err != nil {
return err
}
return p.proxyV3.Start()
return p.proxyV3.Start(ctx)
}
func (p *proxyEtcdProcess) Restart() error {
if err := p.etcdProc.Restart(); err != nil {
func (p *proxyEtcdProcess) Restart(ctx context.Context) error {
if err := p.etcdProc.Restart(ctx); err != nil {
return err
}
return p.proxyV3.Restart()
return p.proxyV3.Restart(ctx)
}
func (p *proxyEtcdProcess) Stop() error {
@@ -134,9 +135,9 @@ func (pp *proxyProc) start() error {
return nil
}
func (pp *proxyProc) waitReady(readyStr string) error {
func (pp *proxyProc) waitReady(ctx context.Context, readyStr string) error {
defer close(pp.donec)
return WaitReadyExpectProc(pp.proc, []string{readyStr})
return WaitReadyExpectProc(ctx, pp.proc, []string{readyStr})
}
func (pp *proxyProc) Stop() error {
@@ -265,16 +266,16 @@ func newProxyV3Proc(cfg *EtcdServerProcessConfig) *proxyV3Proc {
}
}
func (v3p *proxyV3Proc) Restart() error {
func (v3p *proxyV3Proc) Restart(ctx context.Context) error {
if err := v3p.Stop(); err != nil {
return err
}
return v3p.Start()
return v3p.Start(ctx)
}
func (v3p *proxyV3Proc) Start() error {
func (v3p *proxyV3Proc) Start(ctx context.Context) error {
if err := v3p.start(); err != nil {
return err
}
return v3p.waitReady("started gRPC proxy")
return v3p.waitReady(ctx, "started gRPC proxy")
}

View File

@@ -41,8 +41,8 @@ type EtcdProcess interface {
EndpointsV3() []string
EndpointsMetrics() []string
Start() error
Restart() error
Start(ctx context.Context) error
Restart(ctx context.Context) error
Stop() error
Close() error
WithStopSignal(sig os.Signal) os.Signal
@@ -99,7 +99,7 @@ func (ep *EtcdServerProcess) EndpointsV2() []string { return []string{ep.cf
func (ep *EtcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2() }
func (ep *EtcdServerProcess) EndpointsMetrics() []string { return []string{ep.cfg.Murl} }
func (ep *EtcdServerProcess) Start() error {
func (ep *EtcdServerProcess) Start(ctx context.Context) error {
if ep.proc != nil {
panic("already started")
}
@@ -109,20 +109,20 @@ func (ep *EtcdServerProcess) Start() error {
return err
}
ep.proc = proc
err = ep.waitReady()
err = ep.waitReady(ctx)
if err == nil {
ep.cfg.lg.Info("started server.", zap.String("name", ep.cfg.Name), zap.Int("pid", ep.proc.Pid()))
}
return err
}
func (ep *EtcdServerProcess) Restart() error {
func (ep *EtcdServerProcess) Restart(ctx context.Context) error {
ep.cfg.lg.Info("restarting server...", zap.String("name", ep.cfg.Name))
if err := ep.Stop(); err != nil {
return err
}
ep.donec = make(chan struct{})
err := ep.Start()
err := ep.Start(ctx)
if err == nil {
ep.cfg.lg.Info("restarted server", zap.String("name", ep.cfg.Name))
}
@@ -169,9 +169,9 @@ func (ep *EtcdServerProcess) WithStopSignal(sig os.Signal) os.Signal {
return ret
}
func (ep *EtcdServerProcess) waitReady() error {
func (ep *EtcdServerProcess) waitReady(ctx context.Context) error {
defer close(ep.donec)
return WaitReadyExpectProc(ep.proc, EtcdServerReadyLines)
return WaitReadyExpectProc(ctx, ep.proc, EtcdServerReadyLines)
}
func (ep *EtcdServerProcess) Config() *EtcdServerProcessConfig { return ep.cfg }

View File

@@ -28,7 +28,7 @@ import (
"go.etcd.io/etcd/pkg/v3/expect"
)
func WaitReadyExpectProc(exproc *expect.ExpectProcess, readyStrs []string) error {
func WaitReadyExpectProc(ctx context.Context, exproc *expect.ExpectProcess, readyStrs []string) error {
matchSet := func(l string) bool {
for _, s := range readyStrs {
if strings.Contains(l, s) {
@@ -37,7 +37,7 @@ func WaitReadyExpectProc(exproc *expect.ExpectProcess, readyStrs []string) error
}
return false
}
_, err := exproc.ExpectFunc(context.Background(), matchSet)
_, err := exproc.ExpectFunc(ctx, matchSet)
return err
}

View File

@@ -101,7 +101,7 @@ func (m integrationMember) Client() Client {
return integrationClient{Client: m.Member.Client}
}
func (m integrationMember) Start() error {
func (m integrationMember) Start(ctx context.Context) error {
return m.Member.Restart(m.t)
}

View File

@@ -37,7 +37,7 @@ type Cluster interface {
type Member interface {
Client() Client
Start() error
Start(ctx context.Context) error
Stop()
}