Merge pull request #14785 from clarkfw/functional-options-pattern-EtcdProcessClusterConfig-2-2

tests: refactor EtcdProcessClusterConfig using Functional Options Pattern
This commit is contained in:
Benjamin Wang
2022-11-16 18:29:49 +08:00
committed by GitHub
21 changed files with 76 additions and 71 deletions

View File

@@ -130,7 +130,7 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int) {
}
func newCluster(t *testing.T, clusterSize int) *e2e.EtcdProcessCluster {
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, nil,
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
e2e.WithClusterSize(clusterSize),
e2e.WithKeepDataDir(true),
)

View File

@@ -102,7 +102,7 @@ func TestPeriodicCheckDetectsCorruption(t *testing.T) {
e2e.BeforeTest(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
epc, err := e2e.NewEtcdProcessCluster(ctx, t, nil,
epc, err := e2e.NewEtcdProcessCluster(ctx, t,
e2e.WithKeepDataDir(true),
e2e.WithCorruptCheckTime(time.Second),
)
@@ -149,7 +149,7 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
e2e.BeforeTest(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
epc, err := e2e.NewEtcdProcessCluster(ctx, t, nil,
epc, err := e2e.NewEtcdProcessCluster(ctx, t,
e2e.WithKeepDataDir(true),
e2e.WithCompactHashCheckEnabled(true),
e2e.WithCompactHashCheckTime(checkTime),

View File

@@ -32,7 +32,7 @@ func TestAuthCluster(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
epc, err := e2e.NewEtcdProcessCluster(ctx, t, nil,
epc, err := e2e.NewEtcdProcessCluster(ctx, t,
e2e.WithClusterSize(1),
e2e.WithSnapshotCount(2),
)

View File

@@ -91,7 +91,7 @@ func TestAuthority(t *testing.T) {
// Enable debug mode to get logs with http2 headers (including authority)
cfg.EnvVars = map[string]string{"GODEBUG": "http2debug=2"}
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, cfg)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, e2e.WithConfig(cfg))
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}

View File

@@ -84,7 +84,7 @@ func testMirrorCommand(cx ctlCtx, flags []string, sourcekvs []kv, destkvs []kvEx
dialTimeout: 7 * time.Second,
}
mirrorepc, err := e2e.NewEtcdProcessCluster(context.TODO(), cx.t, &mirrorctx.cfg)
mirrorepc, err := e2e.NewEtcdProcessCluster(context.TODO(), cx.t, e2e.WithConfig(&mirrorctx.cfg))
if err != nil {
cx.t.Fatalf("could not start etcd process cluster (%v)", err)
}

View File

@@ -147,7 +147,7 @@ func setupEtcdctlTest(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, quorum bo
if !quorum {
cfg = e2e.ConfigStandalone(*cfg)
}
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, cfg)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, e2e.WithConfig(cfg))
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}

View File

@@ -170,7 +170,7 @@ func testIssue6361(t *testing.T) {
e2e.BeforeTest(t)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, nil,
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
e2e.WithClusterSize(1),
e2e.WithKeepDataDir(true),
)

View File

@@ -57,7 +57,7 @@ func TestClusterVersion(t *testing.T) {
e2e.WithRollingStart(tt.rollingStart),
)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, cfg)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, e2e.WithConfig(cfg))
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
@@ -230,7 +230,7 @@ func testCtlWithOffline(t *testing.T, testFunc func(ctlCtx), testOfflineFunc fun
ret.cfg.KeepDataDir = true
}
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, &ret.cfg)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, e2e.WithConfig(&ret.cfg))
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}

View File

@@ -42,7 +42,7 @@ func testClusterUsingDiscovery(t *testing.T, size int, peerTLS bool) {
t.Skipf("%q does not exist", e2e.BinPath.EtcdLastRelease)
}
dc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, nil,
dc, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
e2e.WithBasePort(2000),
e2e.WithVersion(e2e.LastVersion),
e2e.WithClusterSize(1),
@@ -61,7 +61,7 @@ func testClusterUsingDiscovery(t *testing.T, size int, peerTLS bool) {
}
cancel()
c, err := e2e.NewEtcdProcessCluster(context.TODO(), t, nil,
c, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
e2e.WithBasePort(3000),
e2e.WithClusterSize(size),
e2e.WithIsPeerTLS(peerTLS),

View File

@@ -48,7 +48,7 @@ func testClusterUsingV3Discovery(t *testing.T, discoveryClusterSize, targetClust
e2e.BeforeTest(t)
// step 1: start the discovery service
ds, err := e2e.NewEtcdProcessCluster(context.TODO(), t, nil,
ds, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
e2e.WithBasePort(2000),
e2e.WithClusterSize(discoveryClusterSize),
e2e.WithClientTLS(clientTlsType),

View File

@@ -35,7 +35,7 @@ func TestGrpcProxyAutoSync(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
epc, err := e2e.NewEtcdProcessCluster(ctx, t, nil, e2e.WithClusterSize(1))
epc, err := e2e.NewEtcdProcessCluster(ctx, t, e2e.WithClusterSize(1))
require.NoError(t, err)
defer func() {
assert.NoError(t, epc.Close())

View File

@@ -66,14 +66,11 @@ func mixVersionsSnapshotTest(t *testing.T, clusterVersion, newInstanceVersion e2
}
// Create an etcd cluster with 1 member
cfg := &e2e.EtcdProcessClusterConfig{
ClusterSize: 1,
InitialToken: "new",
SnapshotCount: 10,
Version: clusterVersion,
}
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, cfg)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
e2e.WithClusterSize(1),
e2e.WithSnapshotCount(10),
e2e.WithVersion(clusterVersion),
)
if err != nil {
t.Fatalf("failed to start etcd cluster: %v", err)
}

View File

@@ -35,12 +35,11 @@ func TestReleaseUpgrade(t *testing.T) {
e2e.BeforeTest(t)
copiedCfg := e2e.NewConfigNoTLS()
copiedCfg.Version = e2e.LastVersion
copiedCfg.SnapshotCount = 3
copiedCfg.BaseScheme = "unix" // to avoid port conflict
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, copiedCfg)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
e2e.WithVersion(e2e.LastVersion),
e2e.WithSnapshotCount(3),
e2e.WithBaseScheme("unix"), // to avoid port conflict
)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
@@ -118,12 +117,12 @@ func TestReleaseUpgradeWithRestart(t *testing.T) {
e2e.BeforeTest(t)
copiedCfg := e2e.NewConfigNoTLS()
copiedCfg.Version = e2e.LastVersion
copiedCfg.SnapshotCount = 10
copiedCfg.BaseScheme = "unix"
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
e2e.WithVersion(e2e.LastVersion),
e2e.WithSnapshotCount(10),
e2e.WithBaseScheme("unix"),
)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, copiedCfg)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}

View File

@@ -28,7 +28,7 @@ var (
)
func TestGateway(t *testing.T) {
ec, err := e2e.NewEtcdProcessCluster(context.TODO(), t, e2e.NewConfigNoTLS())
ec, err := e2e.NewEtcdProcessCluster(context.TODO(), t)
if err != nil {
t.Fatal(err)
}

View File

@@ -116,7 +116,7 @@ func TestEtctlutlMigrate(t *testing.T) {
}
dataDirPath := t.TempDir()
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, nil,
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
e2e.WithVersion(tc.clusterVersion),
e2e.WithDataDirPath(dataDirPath),
e2e.WithClusterSize(1),

View File

@@ -35,8 +35,13 @@ import (
func createV2store(t testing.TB, dataDirPath string) string {
t.Log("Creating not-yet v2-deprecated etcd")
cfg := e2e.ConfigStandalone(e2e.EtcdProcessClusterConfig{Version: e2e.LastVersion, EnableV2: true, DataDirPath: dataDirPath, SnapshotCount: 5})
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, cfg)
cfg := e2e.ConfigStandalone(*e2e.NewConfig(
e2e.WithVersion(e2e.LastVersion),
e2e.WithEnableV2(true),
e2e.WithDataDirPath(dataDirPath),
e2e.WithSnapshotCount(5),
))
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, e2e.WithConfig(cfg))
assert.NoError(t, err)
memberDataDir := epc.Procs[0].Config().DataDirPath
@@ -156,8 +161,11 @@ func TestV2DeprecationSnapshotRecover(t *testing.T) {
assert.NoError(t, err)
assert.NoError(t, epc.Close())
cfg := e2e.ConfigStandalone(e2e.EtcdProcessClusterConfig{Version: e2e.CurrentVersion, DataDirPath: dataDir})
epc, err = e2e.NewEtcdProcessCluster(context.TODO(), t, cfg)
cfg := e2e.ConfigStandalone(*e2e.NewConfig(
e2e.WithVersion(e2e.CurrentVersion),
e2e.WithDataDirPath(dataDir),
))
epc, err = e2e.NewEtcdProcessCluster(context.TODO(), t, e2e.WithConfig(cfg))
assert.NoError(t, err)
cc, err = e2e.NewEtcdctl(epc.Cfg, epc.EndpointsV3())
@@ -174,8 +182,13 @@ func TestV2DeprecationSnapshotRecover(t *testing.T) {
}
func runEtcdAndCreateSnapshot(t testing.TB, serverVersion e2e.ClusterVersion, dataDir string, snapshotCount int) *e2e.EtcdProcessCluster {
cfg := e2e.ConfigStandalone(e2e.EtcdProcessClusterConfig{Version: serverVersion, DataDirPath: dataDir, SnapshotCount: snapshotCount, KeepDataDir: true})
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, cfg)
cfg := e2e.ConfigStandalone(*e2e.NewConfig(
e2e.WithVersion(serverVersion),
e2e.WithDataDirPath(dataDir),
e2e.WithSnapshotCount(snapshotCount),
e2e.WithKeepDataDir(true),
))
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, e2e.WithConfig(cfg))
assert.NoError(t, err)
return epc
}

View File

@@ -89,7 +89,7 @@ func testV3CurlMaxStream(t *testing.T, reachLimit bool, opts ...ctlOption) {
// Step 2: create the cluster
t.Log("Creating an etcd cluster")
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, &cx.cfg)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, e2e.WithConfig(&cx.cfg))
if err != nil {
t.Fatalf("Failed to start etcd cluster: %v", err)
}

View File

@@ -29,11 +29,10 @@ import (
func TestServerJsonLogging(t *testing.T) {
e2e.BeforeTest(t)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, &e2e.EtcdProcessClusterConfig{
ClusterSize: 1,
InitialToken: "new",
LogLevel: "debug",
})
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
e2e.WithClusterSize(1),
e2e.WithLogLevel("debug"),
)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}

View File

@@ -306,16 +306,14 @@ func WithGoFailEnabled(enabled bool) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.GoFailEnabled = enabled }
}
func WithCompactionBatchLimit(limit int) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.CompactionBatchLimit = limit }
}
// NewEtcdProcessCluster launches a new cluster from etcd processes, returning
// a new EtcdProcessCluster once all nodes are ready to accept client requests.
func NewEtcdProcessCluster(ctx context.Context, t testing.TB, cfg *EtcdProcessClusterConfig, opts ...EPClusterOption) (*EtcdProcessCluster, error) {
if cfg == nil {
cfg = NewConfig(opts...)
} else {
for _, opt := range opts {
opt(cfg)
}
}
func NewEtcdProcessCluster(ctx context.Context, t testing.TB, opts ...EPClusterOption) (*EtcdProcessCluster, error) {
cfg := NewConfig(opts...)
epc, err := InitEtcdProcessCluster(t, cfg)
if err != nil {
return nil, err

View File

@@ -86,7 +86,7 @@ func (e e2eRunner) NewCluster(ctx context.Context, t testing.TB, opts ...config.
default:
t.Fatalf("PeerTLS config %q not supported", cfg.PeerTLS)
}
epc, err := NewEtcdProcessCluster(ctx, t, e2eConfig)
epc, err := NewEtcdProcessCluster(ctx, t, WithConfig(e2eConfig))
if err != nil {
t.Fatalf("could not start etcd integrationCluster: %s", err)
}

View File

@@ -50,28 +50,27 @@ func TestLinearizability(t *testing.T) {
{
name: "ClusterOfSize1",
failpoint: RandomFailpoint,
config: e2e.EtcdProcessClusterConfig{
ClusterSize: 1,
GoFailEnabled: true,
CompactionBatchLimit: 100, // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints
},
config: *e2e.NewConfig(
e2e.WithClusterSize(1),
e2e.WithGoFailEnabled(true),
e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints
),
},
{
name: "ClusterOfSize3",
failpoint: RandomFailpoint,
config: e2e.EtcdProcessClusterConfig{
ClusterSize: 3,
GoFailEnabled: true,
CompactionBatchLimit: 100, // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints
},
config: *e2e.NewConfig(
e2e.WithGoFailEnabled(true),
e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints
),
},
{
name: "Issue14370",
failpoint: RaftBeforeSavePanic,
config: e2e.EtcdProcessClusterConfig{
ClusterSize: 1,
GoFailEnabled: true,
},
config: *e2e.NewConfig(
e2e.WithClusterSize(1),
e2e.WithGoFailEnabled(true),
),
},
}
for _, tc := range tcs {
@@ -93,7 +92,7 @@ func TestLinearizability(t *testing.T) {
}
func testLinearizability(ctx context.Context, t *testing.T, config e2e.EtcdProcessClusterConfig, failpoint FailpointConfig, traffic trafficConfig) {
clus, err := e2e.NewEtcdProcessCluster(ctx, t, &config)
clus, err := e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&config))
if err != nil {
t.Fatal(err)
}