diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go index 4021ff676..a69de7bf0 100644 --- a/tests/framework/integration/cluster.go +++ b/tests/framework/integration/cluster.go @@ -152,9 +152,6 @@ type ClusterConfig struct { GRPCKeepAliveInterval time.Duration GRPCKeepAliveTimeout time.Duration - // SkipCreatingClient to skip creating clients for each member. - SkipCreatingClient bool - ClientMaxCallSendMsgSize int ClientMaxCallRecvMsgSize int @@ -421,6 +418,7 @@ func (c *Cluster) RemoveMember(t testutil.TB, id uint64) error { if uint64(m.Server.ID()) != id { newMembers = append(newMembers, m) } else { + m.Client.Close() select { case <-m.Server.StopNotify(): m.Terminate(t) @@ -438,6 +436,11 @@ func (c *Cluster) RemoveMember(t testutil.TB, id uint64) error { } func (c *Cluster) Terminate(t testutil.TB) { + for _, m := range c.Members { + if m.Client != nil { + m.Client.Close() + } + } var wg sync.WaitGroup wg.Add(len(c.Members)) for _, m := range c.Members { @@ -598,6 +601,8 @@ type Member struct { // ServerClient is a clientv3 that directly calls the etcdserver. ServerClient *clientv3.Client + // Client is a clientv3 that communicates via socket, either UNIX or TCP. + Client *clientv3.Client KeepDataDirTerminate bool ClientMaxCallSendMsgSize int @@ -1086,6 +1091,12 @@ func (m *Member) Launch() error { } m.ServerClosers = append(m.ServerClosers, closer) } + if m.GrpcURL != "" && m.Client == nil { + m.Client, err = NewClientV3(m) + if err != nil { + return err + } + } m.Logger.Info( "launched a member", @@ -1391,7 +1402,6 @@ type ClusterV3 struct { *Cluster mu sync.Mutex - Clients []*clientv3.Client clusterClient *clientv3.Client } @@ -1409,35 +1419,17 @@ func NewClusterV3(t testutil.TB, cfg *ClusterConfig) *ClusterV3 { } clus.Launch(t) - if !cfg.SkipCreatingClient { - for _, m := range clus.Members { - client, err := NewClientV3(m) - if err != nil { - t.Fatalf("cannot create client: %v", err) - } - clus.Clients = append(clus.Clients, client) - } - } - return clus } func (c *ClusterV3) TakeClient(idx int) { c.mu.Lock() - c.Clients[idx] = nil + c.Members[idx].Client = nil c.mu.Unlock() } func (c *ClusterV3) Terminate(t testutil.TB) { c.mu.Lock() - for _, client := range c.Clients { - if client == nil { - continue - } - if err := client.Close(); err != nil { - t.Error(err) - } - } if c.clusterClient != nil { if err := c.clusterClient.Close(); err != nil { t.Error(err) @@ -1448,11 +1440,11 @@ func (c *ClusterV3) Terminate(t testutil.TB) { } func (c *ClusterV3) RandClient() *clientv3.Client { - return c.Clients[rand.Intn(len(c.Clients))] + return c.Members[rand.Intn(len(c.Members))].Client } func (c *ClusterV3) Client(i int) *clientv3.Client { - return c.Clients[i] + return c.Members[i].Client } func (c *ClusterV3) ClusterClient() (client *clientv3.Client, err error) { diff --git a/tests/integration/clientv3/connectivity/black_hole_test.go b/tests/integration/clientv3/connectivity/black_hole_test.go index 1c501c08a..6519f76ab 100644 --- a/tests/integration/clientv3/connectivity/black_hole_test.go +++ b/tests/integration/clientv3/connectivity/black_hole_test.go @@ -169,9 +169,8 @@ func testBalancerUnderBlackholeNoKeepAlive(t *testing.T, op func(*clientv3.Clien integration2.BeforeTest(t) clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{ - Size: 2, - SkipCreatingClient: true, - UseBridge: true, + Size: 2, + UseBridge: true, }) defer clus.Terminate(t) diff --git a/tests/integration/clientv3/connectivity/dial_test.go b/tests/integration/clientv3/connectivity/dial_test.go index 719de550a..2742ad325 100644 --- a/tests/integration/clientv3/connectivity/dial_test.go +++ b/tests/integration/clientv3/connectivity/dial_test.go @@ -48,7 +48,7 @@ var ( // TestDialTLSExpired tests client with expired certs fails to dial. func TestDialTLSExpired(t *testing.T) { integration2.BeforeTest(t) - clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 1, PeerTLS: &testTLSInfo, ClientTLS: &testTLSInfo, SkipCreatingClient: true}) + clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 1, PeerTLS: &testTLSInfo, ClientTLS: &testTLSInfo}) defer clus.Terminate(t) tls, err := testTLSInfoExpired.ClientConfig() @@ -71,7 +71,7 @@ func TestDialTLSExpired(t *testing.T) { // when TLS endpoints (https, unixs) are given but no tls config. func TestDialTLSNoConfig(t *testing.T) { integration2.BeforeTest(t) - clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 1, ClientTLS: &testTLSInfo, SkipCreatingClient: true}) + clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 1, ClientTLS: &testTLSInfo}) defer clus.Terminate(t) // expect "signed by unknown authority" c, err := integration2.NewClient(t, clientv3.Config{ @@ -102,7 +102,7 @@ func TestDialSetEndpointsAfterFail(t *testing.T) { // testDialSetEndpoints ensures SetEndpoints can replace unavailable endpoints with available ones. func testDialSetEndpoints(t *testing.T, setBefore bool) { integration2.BeforeTest(t) - clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 3, SkipCreatingClient: true}) + clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 3}) defer clus.Terminate(t) // get endpoint list @@ -166,7 +166,7 @@ func TestSwitchSetEndpoints(t *testing.T) { func TestRejectOldCluster(t *testing.T) { integration2.BeforeTest(t) // 2 endpoints to test multi-endpoint Status - clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 2, SkipCreatingClient: true}) + clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 2}) defer clus.Terminate(t) cfg := clientv3.Config{ diff --git a/tests/integration/clientv3/connectivity/network_partition_test.go b/tests/integration/clientv3/connectivity/network_partition_test.go index 32abb6e12..9e3600e03 100644 --- a/tests/integration/clientv3/connectivity/network_partition_test.go +++ b/tests/integration/clientv3/connectivity/network_partition_test.go @@ -106,8 +106,7 @@ func testBalancerUnderNetworkPartition(t *testing.T, op func(*clientv3.Client, c integration2.BeforeTest(t) clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{ - Size: 3, - SkipCreatingClient: true, + Size: 3, }) defer clus.Terminate(t) @@ -162,8 +161,7 @@ func TestBalancerUnderNetworkPartitionLinearizableGetLeaderElection(t *testing.T integration2.BeforeTest(t) clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{ - Size: 3, - SkipCreatingClient: true, + Size: 3, }) defer clus.Terminate(t) eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()} @@ -217,8 +215,7 @@ func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) { integration2.BeforeTest(t) clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{ - Size: 3, - SkipCreatingClient: true, + Size: 3, }) defer clus.Terminate(t) @@ -277,8 +274,7 @@ func TestDropReadUnderNetworkPartition(t *testing.T) { integration2.BeforeTest(t) clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{ - Size: 3, - SkipCreatingClient: true, + Size: 3, }) defer clus.Terminate(t) leaderIndex := clus.WaitLeader(t) diff --git a/tests/integration/clientv3/connectivity/server_shutdown_test.go b/tests/integration/clientv3/connectivity/server_shutdown_test.go index 2d14a3999..f175782a8 100644 --- a/tests/integration/clientv3/connectivity/server_shutdown_test.go +++ b/tests/integration/clientv3/connectivity/server_shutdown_test.go @@ -33,9 +33,8 @@ func TestBalancerUnderServerShutdownWatch(t *testing.T) { integration2.BeforeTest(t) clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{ - Size: 3, - SkipCreatingClient: true, - UseBridge: true, + Size: 3, + UseBridge: true, }) defer clus.Terminate(t) @@ -146,8 +145,7 @@ func testBalancerUnderServerShutdownMutable(t *testing.T, op func(*clientv3.Clie integration2.BeforeTest(t) clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{ - Size: 3, - SkipCreatingClient: true, + Size: 3, }) defer clus.Terminate(t) @@ -204,8 +202,7 @@ func testBalancerUnderServerShutdownImmutable(t *testing.T, op func(*clientv3.Cl integration2.BeforeTest(t) clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{ - Size: 3, - SkipCreatingClient: true, + Size: 3, }) defer clus.Terminate(t) @@ -277,9 +274,8 @@ func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizabl integration2.BeforeTest(t) cfg := &integration2.ClusterConfig{ - Size: 2, - SkipCreatingClient: true, - UseBridge: true, + Size: 2, + UseBridge: true, } if linearizable { cfg.Size = 3 diff --git a/tests/integration/clientv3/metrics_test.go b/tests/integration/clientv3/metrics_test.go index 07c36a81c..0c3db3dca 100644 --- a/tests/integration/clientv3/metrics_test.go +++ b/tests/integration/clientv3/metrics_test.go @@ -70,7 +70,7 @@ func TestV3ClientMetrics(t *testing.T) { url := "unix://" + addr + "/metrics" - clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 1, SkipCreatingClient: true}) + clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 1}) defer clus.Terminate(t) cfg := clientv3.Config{ diff --git a/tests/integration/clientv3/ordering_util_test.go b/tests/integration/clientv3/ordering_util_test.go index 4c992c702..2fb0c1413 100644 --- a/tests/integration/clientv3/ordering_util_test.go +++ b/tests/integration/clientv3/ordering_util_test.go @@ -82,7 +82,7 @@ func TestEndpointSwitchResolvesViolation(t *testing.T) { func TestUnresolvableOrderViolation(t *testing.T) { integration2.BeforeTest(t) - clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 5, SkipCreatingClient: true, UseBridge: true}) + clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 5, UseBridge: true}) defer clus.Terminate(t) cfg := clientv3.Config{ Endpoints: []string{ diff --git a/tests/integration/cluster_test.go b/tests/integration/cluster_test.go index 443d97d42..ebdcf9323 100644 --- a/tests/integration/cluster_test.go +++ b/tests/integration/cluster_test.go @@ -309,6 +309,7 @@ func TestIssue2904(t *testing.T) { <-c.Members[1].Server.StopNotify() // terminate removed member + c.Members[1].Client.Close() c.Members[1].Terminate(t) c.Members = c.Members[:1] // wait member to be removed. diff --git a/tests/integration/v3_alarm_test.go b/tests/integration/v3_alarm_test.go index 2fce27bf8..08c0b4082 100644 --- a/tests/integration/v3_alarm_test.go +++ b/tests/integration/v3_alarm_test.go @@ -36,9 +36,8 @@ func TestV3StorageQuotaApply(t *testing.T) { integration.BeforeTest(t) quotasize := int64(16 * os.Getpagesize()) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2, UseBridge: true}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2}) defer clus.Terminate(t) - kvc0 := integration.ToGRPC(clus.Client(0)).KV kvc1 := integration.ToGRPC(clus.Client(1)).KV // Set a quota on one node @@ -46,6 +45,7 @@ func TestV3StorageQuotaApply(t *testing.T) { clus.Members[0].Stop(t) clus.Members[0].Restart(t) clus.WaitMembersForLeader(t, clus.Members) + kvc0 := integration.ToGRPC(clus.Client(0)).KV waitForRestart(t, kvc0) key := []byte("abc") diff --git a/tests/integration/v3_election_test.go b/tests/integration/v3_election_test.go index b726f26b3..761afd1f4 100644 --- a/tests/integration/v3_election_test.go +++ b/tests/integration/v3_election_test.go @@ -120,7 +120,7 @@ func TestElectionFailover(t *testing.T) { for i := 0; i < 3; i++ { var err error - ss[i], err = concurrency.NewSession(clus.Clients[i]) + ss[i], err = concurrency.NewSession(clus.Client(i)) if err != nil { t.Error(err) } diff --git a/tests/integration/v3_grpc_test.go b/tests/integration/v3_grpc_test.go index 2b34cbb47..ebb148259 100644 --- a/tests/integration/v3_grpc_test.go +++ b/tests/integration/v3_grpc_test.go @@ -100,14 +100,14 @@ func TestV3PutRestart(t *testing.T) { stopIdx = rand.Intn(3) } - clus.Clients[stopIdx].Close() + clus.Client(stopIdx).Close() clus.Members[stopIdx].Stop(t) clus.Members[stopIdx].Restart(t) c, cerr := integration.NewClientV3(clus.Members[stopIdx]) if cerr != nil { t.Fatalf("cannot create client: %v", cerr) } - clus.Clients[stopIdx] = c + clus.Members[stopIdx].ServerClient = c ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) defer cancel() diff --git a/tests/integration/v3_lease_test.go b/tests/integration/v3_lease_test.go index 240e6bcba..5151264a6 100644 --- a/tests/integration/v3_lease_test.go +++ b/tests/integration/v3_lease_test.go @@ -522,12 +522,12 @@ func TestV3GetNonExistLease(t *testing.T) { Keys: true, } - for _, client := range clus.Clients { + for _, m := range clus.Members { // quorum-read to ensure revoke completes before TimeToLive - if _, err := integration.ToGRPC(client).KV.Range(ctx, &pb.RangeRequest{Key: []byte("_")}); err != nil { + if _, err := integration.ToGRPC(m.Client).KV.Range(ctx, &pb.RangeRequest{Key: []byte("_")}); err != nil { t.Fatal(err) } - resp, err := integration.ToGRPC(client).Lease.LeaseTimeToLive(ctx, leaseTTLr) + resp, err := integration.ToGRPC(m.Client).Lease.LeaseTimeToLive(ctx, leaseTTLr) if err != nil { t.Fatalf("expected non nil error, but go %v", err) }