test: Migrate to ClusterV3 cluster in integration tests

This commit is contained in:
Marek Siarkowicz 2022-01-18 13:59:26 +01:00
parent 69fc517e56
commit 87c8e8b868
6 changed files with 63 additions and 106 deletions

View File

@ -17,9 +17,10 @@ package membership
import (
"encoding/json"
"fmt"
"go.etcd.io/etcd/client/pkg/v3/types"
"path"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"github.com/coreos/go-semver/semver"

View File

@ -209,7 +209,7 @@ func (c *Cluster) fillClusterForMembers() error {
return nil
}
func NewClusterFromConfig(t testutil.TB, cfg *ClusterConfig) *Cluster {
func newClusterFromConfig(t testutil.TB, cfg *ClusterConfig) *Cluster {
testutil.SkipTestIfShortMode(t, "Cannot start etcd Cluster in --short tests")
c := &Cluster{Cfg: cfg}
@ -225,18 +225,6 @@ func NewClusterFromConfig(t testutil.TB, cfg *ClusterConfig) *Cluster {
return c
}
// NewCluster returns an unlaunched Cluster of the given size which has been
// set to use static bootstrap.
func NewCluster(t testutil.TB, size int) *Cluster {
t.Helper()
return NewClusterFromConfig(t, &ClusterConfig{Size: size})
}
// NewClusterByConfig returns an unlaunched Cluster defined by a Cluster configuration
func NewClusterByConfig(t testutil.TB, cfg *ClusterConfig) *Cluster {
return NewClusterFromConfig(t, cfg)
}
func (c *Cluster) Launch(t testutil.TB) {
errc := make(chan error)
for _, m := range c.Members {
@ -1416,7 +1404,7 @@ func NewClusterV3(t testutil.TB, cfg *ClusterConfig) *ClusterV3 {
cfg.UseGRPC = true
clus := &ClusterV3{
Cluster: NewClusterByConfig(t, cfg),
Cluster: newClusterFromConfig(t, cfg),
}
clus.Launch(t)

View File

@ -89,8 +89,7 @@ func TestV2NoRetryNoLeader(t *testing.T) {
// TestV2RetryRefuse tests destructive api calls will retry if a connection is refused.
func TestV2RetryRefuse(t *testing.T) {
integration2.BeforeTest(t)
cl := integration2.NewCluster(t, 1)
cl.Launch(t)
cl := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 1})
defer cl.Terminate(t)
// test connection refused; expect no error failover
cli := integration2.MustNewHTTPClient(t, []string{integration2.URLScheme + "://refuseconn:123", cl.URL(0)}, nil)

View File

@ -26,6 +26,7 @@ import (
"time"
"go.etcd.io/etcd/client/v2"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/tests/v3/framework/integration"
)
@ -45,16 +46,14 @@ func TestClusterOf3(t *testing.T) { testCluster(t, 3) }
func testCluster(t *testing.T, size int) {
integration.BeforeTest(t)
c := integration.NewCluster(t, size)
c.Launch(t)
c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: size})
defer c.Terminate(t)
clusterMustProgress(t, c.Members)
}
func TestTLSClusterOf3(t *testing.T) {
integration.BeforeTest(t)
c := integration.NewClusterByConfig(t, &integration.ClusterConfig{Size: 3, PeerTLS: &integration.TestTLSInfo})
c.Launch(t)
c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, PeerTLS: &integration.TestTLSInfo})
defer c.Terminate(t)
clusterMustProgress(t, c.Members)
}
@ -63,8 +62,7 @@ func TestTLSClusterOf3(t *testing.T) {
// authorities that don't issue dual-usage certificates.
func TestTLSClusterOf3WithSpecificUsage(t *testing.T) {
integration.BeforeTest(t)
c := integration.NewClusterByConfig(t, &integration.ClusterConfig{Size: 3, PeerTLS: &integration.TestTLSInfoWithSpecificUsage})
c.Launch(t)
c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, PeerTLS: &integration.TestTLSInfoWithSpecificUsage})
defer c.Terminate(t)
clusterMustProgress(t, c.Members)
}
@ -74,8 +72,7 @@ func TestClusterOf3UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 3
func testClusterUsingDiscovery(t *testing.T, size int) {
integration.BeforeTest(t)
dc := integration.NewCluster(t, 1)
dc.Launch(t)
dc := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseIP: true})
defer dc.Terminate(t)
// init discovery token space
dcc := integration.MustNewHTTPClient(t, dc.URLs(), nil)
@ -86,19 +83,14 @@ func testClusterUsingDiscovery(t *testing.T, size int) {
}
cancel()
c := integration.NewClusterByConfig(
t,
&integration.ClusterConfig{Size: size, DiscoveryURL: dc.URL(0) + "/v2/keys"},
)
c.Launch(t)
c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: size, DiscoveryURL: dc.URL(0) + "/v2/keys"})
defer c.Terminate(t)
clusterMustProgress(t, c.Members)
}
func TestTLSClusterOf3UsingDiscovery(t *testing.T) {
integration.BeforeTest(t)
dc := integration.NewCluster(t, 1)
dc.Launch(t)
dc := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseIP: true})
defer dc.Terminate(t)
// init discovery token space
dcc := integration.MustNewHTTPClient(t, dc.URLs(), nil)
@ -109,13 +101,11 @@ func TestTLSClusterOf3UsingDiscovery(t *testing.T) {
}
cancel()
c := integration.NewClusterByConfig(t,
&integration.ClusterConfig{
Size: 3,
PeerTLS: &integration.TestTLSInfo,
DiscoveryURL: dc.URL(0) + "/v2/keys"},
c := integration.NewClusterV3(t, &integration.ClusterConfig{
Size: 3,
PeerTLS: &integration.TestTLSInfo,
DiscoveryURL: dc.URL(0) + "/v2/keys"},
)
c.Launch(t)
defer c.Terminate(t)
clusterMustProgress(t, c.Members)
}
@ -125,8 +115,7 @@ func TestDoubleClusterSizeOf3(t *testing.T) { testDoubleClusterSize(t, 3) }
func testDoubleClusterSize(t *testing.T, size int) {
integration.BeforeTest(t)
c := integration.NewCluster(t, size)
c.Launch(t)
c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: size})
defer c.Terminate(t)
for i := 0; i < size; i++ {
@ -137,8 +126,7 @@ func testDoubleClusterSize(t *testing.T, size int) {
func TestDoubleTLSClusterSizeOf3(t *testing.T) {
integration.BeforeTest(t)
c := integration.NewClusterByConfig(t, &integration.ClusterConfig{Size: 3, PeerTLS: &integration.TestTLSInfo})
c.Launch(t)
c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, PeerTLS: &integration.TestTLSInfo})
defer c.Terminate(t)
for i := 0; i < 3; i++ {
@ -152,8 +140,7 @@ func TestDecreaseClusterSizeOf5(t *testing.T) { testDecreaseClusterSize(t, 5) }
func testDecreaseClusterSize(t *testing.T, size int) {
integration.BeforeTest(t)
c := integration.NewCluster(t, size)
c.Launch(t)
c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: size})
defer c.Terminate(t)
// TODO: remove the last but one member
@ -174,8 +161,8 @@ func testDecreaseClusterSize(t *testing.T, size int) {
}
func TestForceNewCluster(t *testing.T) {
c := integration.NewClusterFromConfig(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
c.Launch(t)
integration.BeforeTest(t)
c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true, UseGRPC: true})
cc := integration.MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
kapi := client.NewKeysAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), integration.RequestTimeout)
@ -216,8 +203,7 @@ func TestForceNewCluster(t *testing.T) {
func TestAddMemberAfterClusterFullRotation(t *testing.T) {
integration.BeforeTest(t)
c := integration.NewCluster(t, 3)
c.Launch(t)
c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer c.Terminate(t)
// remove all the previous three members and add in three new members.
@ -238,8 +224,7 @@ func TestAddMemberAfterClusterFullRotation(t *testing.T) {
// Ensure we can remove a member then add a new one back immediately.
func TestIssue2681(t *testing.T) {
integration.BeforeTest(t)
c := integration.NewCluster(t, 5)
c.Launch(t)
c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 5})
defer c.Terminate(t)
c.MustRemoveMember(t, uint64(c.Members[4].Server.ID()))
@ -258,13 +243,7 @@ func TestIssue2746WithThree(t *testing.T) { testIssue2746(t, 3) }
func testIssue2746(t *testing.T, members int) {
integration.BeforeTest(t)
c := integration.NewCluster(t, members)
for _, m := range c.Members {
m.SnapshotCount = 10
}
c.Launch(t)
c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: members, SnapshotCount: 10})
defer c.Terminate(t)
// force a snapshot
@ -284,8 +263,7 @@ func testIssue2746(t *testing.T, members int) {
func TestIssue2904(t *testing.T) {
integration.BeforeTest(t)
// start 1-member Cluster to ensure member 0 is the leader of the Cluster.
c := integration.NewClusterFromConfig(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
c.Launch(t)
c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer c.Terminate(t)
c.AddMember(t)
@ -321,8 +299,7 @@ func TestIssue2904(t *testing.T) {
func TestIssue3699(t *testing.T) {
// start a Cluster of 3 nodes a, b, c
integration.BeforeTest(t)
c := integration.NewClusterFromConfig(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
c.Launch(t)
c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer c.Terminate(t)
// make node a unavailable
@ -373,8 +350,7 @@ func TestIssue3699(t *testing.T) {
// TestRejectUnhealthyAdd ensures an unhealthy cluster rejects adding members.
func TestRejectUnhealthyAdd(t *testing.T) {
integration.BeforeTest(t)
c := integration.NewClusterFromConfig(t, &integration.ClusterConfig{Size: 3, UseBridge: true, StrictReconfigCheck: true})
c.Launch(t)
c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true, StrictReconfigCheck: true})
defer c.Terminate(t)
// make Cluster unhealthy and wait for downed peer
@ -414,8 +390,7 @@ func TestRejectUnhealthyAdd(t *testing.T) {
// if quorum will be lost.
func TestRejectUnhealthyRemove(t *testing.T) {
integration.BeforeTest(t)
c := integration.NewClusterFromConfig(t, &integration.ClusterConfig{Size: 5, UseBridge: true, StrictReconfigCheck: true})
c.Launch(t)
c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 5, UseBridge: true, StrictReconfigCheck: true})
defer c.Terminate(t)
// make cluster unhealthy and wait for downed peer; (3 up, 2 down)
@ -460,8 +435,7 @@ func TestRestartRemoved(t *testing.T) {
integration.BeforeTest(t)
// 1. start single-member Cluster
c := integration.NewClusterFromConfig(t, &integration.ClusterConfig{Size: 1, UseBridge: true, StrictReconfigCheck: true})
c.Launch(t)
c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, StrictReconfigCheck: true, UseBridge: true})
defer c.Terminate(t)
// 2. add a new member
@ -469,27 +443,27 @@ func TestRestartRemoved(t *testing.T) {
c.AddMember(t)
c.WaitLeader(t)
oldm := c.Members[0]
oldm.KeepDataDirTerminate = true
firstMember := c.Members[0]
firstMember.KeepDataDirTerminate = true
// 3. remove first member, shut down without deleting data
if err := c.RemoveMember(t, uint64(c.Members[0].Server.ID())); err != nil {
if err := c.RemoveMember(t, uint64(firstMember.Server.ID())); err != nil {
t.Fatalf("expected to remove member, got error %v", err)
}
c.WaitLeader(t)
// 4. restart first member with 'initial-cluster-state=new'
// wrong config, expects exit within ReqTimeout
oldm.ServerConfig.NewCluster = false
if err := oldm.Restart(t); err != nil {
firstMember.ServerConfig.NewCluster = false
if err := firstMember.Restart(t); err != nil {
t.Fatalf("unexpected ForceRestart error: %v", err)
}
defer func() {
oldm.Close()
os.RemoveAll(oldm.ServerConfig.DataDir)
firstMember.Close()
os.RemoveAll(firstMember.ServerConfig.DataDir)
}()
select {
case <-oldm.Server.StopNotify():
case <-firstMember.Server.StopNotify():
case <-time.After(time.Minute):
t.Fatalf("removed member didn't exit within %v", time.Minute)
}
@ -498,35 +472,39 @@ func TestRestartRemoved(t *testing.T) {
// clusterMustProgress ensures that cluster can make progress. It creates
// a random key first, and check the new key could be got from all client urls
// of the cluster.
func clusterMustProgress(t *testing.T, membs []*integration.Member) {
cc := integration.MustNewHTTPClient(t, []string{membs[0].URL()}, nil)
kapi := client.NewKeysAPI(cc)
func clusterMustProgress(t *testing.T, members []*integration.Member) {
key := fmt.Sprintf("foo%d", rand.Int())
var (
err error
resp *client.Response
resp *clientv3.PutResponse
)
// retry in case of leader loss induced by slow CI
for i := 0; i < 3; i++ {
ctx, cancel := context.WithTimeout(context.Background(), integration.RequestTimeout)
resp, err = kapi.Create(ctx, "/"+key, "bar")
resp, err = members[0].Client.Put(ctx, key, "bar")
cancel()
if err == nil {
break
}
t.Logf("failed to create key on %q (%v)", membs[0].URL(), err)
t.Logf("failed to create key on #0 (%v)", err)
}
if err != nil {
t.Fatalf("create on %s error: %v", membs[0].URL(), err)
t.Fatalf("create on #0 error: %v", err)
}
for i, m := range membs {
u := m.URL()
mcc := integration.MustNewHTTPClient(t, []string{u}, nil)
mkapi := client.NewKeysAPI(mcc)
for i, m := range members {
mctx, mcancel := context.WithTimeout(context.Background(), integration.RequestTimeout)
if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(mctx); err != nil {
t.Fatalf("#%d: watch on %s error: %v", i, u, err)
watch := m.Client.Watcher.Watch(mctx, key, clientv3.WithRev(resp.Header.Revision-1))
for resp := range watch {
if len(resp.Events) != 0 {
break
}
if resp.Err() != nil {
t.Fatalf("#%d: watch error: %q", i, resp.Err())
}
if resp.Canceled {
t.Fatalf("#%d: watch: cancelled", i)
}
}
mcancel()
}

View File

@ -29,8 +29,7 @@ import (
func TestPauseMember(t *testing.T) {
integration.BeforeTest(t)
c := integration.NewCluster(t, 5)
c.Launch(t)
c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 5})
defer c.Terminate(t)
for i := 0; i < 5; i++ {
@ -47,8 +46,7 @@ func TestPauseMember(t *testing.T) {
func TestRestartMember(t *testing.T) {
integration.BeforeTest(t)
c := integration.NewClusterFromConfig(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
c.Launch(t)
c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer c.Terminate(t)
for i := 0; i < 3; i++ {
@ -69,14 +67,13 @@ func TestRestartMember(t *testing.T) {
func TestLaunchDuplicateMemberShouldFail(t *testing.T) {
integration.BeforeTest(t)
size := 3
c := integration.NewCluster(t, size)
c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: size})
m := c.Members[0].Clone(t)
var err error
m.DataDir, err = os.MkdirTemp(t.TempDir(), "etcd")
if err != nil {
t.Fatal(err)
}
c.Launch(t)
defer c.Terminate(t)
if err := m.Launch(); err == nil {

View File

@ -1552,8 +1552,7 @@ func TestV3RangeRequest(t *testing.T) {
func newClusterV3NoClients(t *testing.T, cfg *integration.ClusterConfig) *integration.ClusterV3 {
cfg.UseGRPC = true
clus := &integration.ClusterV3{Cluster: integration.NewClusterByConfig(t, cfg)}
clus.Launch(t)
clus := integration.NewClusterV3(t, cfg)
return clus
}
@ -1561,8 +1560,7 @@ func newClusterV3NoClients(t *testing.T, cfg *integration.ClusterConfig) *integr
func TestTLSGRPCRejectInsecureClient(t *testing.T) {
integration.BeforeTest(t)
cfg := integration.ClusterConfig{Size: 3, ClientTLS: &integration.TestTLSInfo}
clus := newClusterV3NoClients(t, &cfg)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, ClientTLS: &integration.TestTLSInfo})
defer clus.Terminate(t)
// nil out TLS field so client will use an insecure connection
@ -1596,8 +1594,7 @@ func TestTLSGRPCRejectInsecureClient(t *testing.T) {
func TestTLSGRPCRejectSecureClient(t *testing.T) {
integration.BeforeTest(t)
cfg := integration.ClusterConfig{Size: 3}
clus := newClusterV3NoClients(t, &cfg)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
clus.Members[0].ClientTLSInfo = &integration.TestTLSInfo
@ -1616,8 +1613,7 @@ func TestTLSGRPCRejectSecureClient(t *testing.T) {
func TestTLSGRPCAcceptSecureAll(t *testing.T) {
integration.BeforeTest(t)
cfg := integration.ClusterConfig{Size: 3, ClientTLS: &integration.TestTLSInfo}
clus := newClusterV3NoClients(t, &cfg)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, ClientTLS: &integration.TestTLSInfo})
defer clus.Terminate(t)
client, err := integration.NewClientV3(clus.Members[0])
@ -1834,8 +1830,7 @@ func testTLSReload(
func TestGRPCRequireLeader(t *testing.T) {
integration.BeforeTest(t)
cfg := integration.ClusterConfig{Size: 3}
clus := newClusterV3NoClients(t, &cfg)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
clus.Members[1].Stop(t)
@ -1861,8 +1856,7 @@ func TestGRPCRequireLeader(t *testing.T) {
func TestGRPCStreamRequireLeader(t *testing.T) {
integration.BeforeTest(t)
cfg := integration.ClusterConfig{Size: 3, UseBridge: true}
clus := newClusterV3NoClients(t, &cfg)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
client, err := integration.NewClientV3(clus.Members[0])