tests: Move client to member struct

This commit is contained in:
Marek Siarkowicz 2022-01-18 11:07:26 +01:00
parent b8182cd2af
commit 5bcbf77980
12 changed files with 44 additions and 60 deletions

View File

@ -152,9 +152,6 @@ type ClusterConfig struct {
GRPCKeepAliveInterval time.Duration GRPCKeepAliveInterval time.Duration
GRPCKeepAliveTimeout time.Duration GRPCKeepAliveTimeout time.Duration
// SkipCreatingClient to skip creating clients for each member.
SkipCreatingClient bool
ClientMaxCallSendMsgSize int ClientMaxCallSendMsgSize int
ClientMaxCallRecvMsgSize int ClientMaxCallRecvMsgSize int
@ -421,6 +418,7 @@ func (c *Cluster) RemoveMember(t testutil.TB, id uint64) error {
if uint64(m.Server.ID()) != id { if uint64(m.Server.ID()) != id {
newMembers = append(newMembers, m) newMembers = append(newMembers, m)
} else { } else {
m.Client.Close()
select { select {
case <-m.Server.StopNotify(): case <-m.Server.StopNotify():
m.Terminate(t) m.Terminate(t)
@ -438,6 +436,11 @@ func (c *Cluster) RemoveMember(t testutil.TB, id uint64) error {
} }
func (c *Cluster) Terminate(t testutil.TB) { func (c *Cluster) Terminate(t testutil.TB) {
for _, m := range c.Members {
if m.Client != nil {
m.Client.Close()
}
}
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(c.Members)) wg.Add(len(c.Members))
for _, m := range c.Members { for _, m := range c.Members {
@ -598,6 +601,8 @@ type Member struct {
// ServerClient is a clientv3 that directly calls the etcdserver. // ServerClient is a clientv3 that directly calls the etcdserver.
ServerClient *clientv3.Client ServerClient *clientv3.Client
// Client is a clientv3 that communicates via socket, either UNIX or TCP.
Client *clientv3.Client
KeepDataDirTerminate bool KeepDataDirTerminate bool
ClientMaxCallSendMsgSize int ClientMaxCallSendMsgSize int
@ -1086,6 +1091,12 @@ func (m *Member) Launch() error {
} }
m.ServerClosers = append(m.ServerClosers, closer) m.ServerClosers = append(m.ServerClosers, closer)
} }
if m.GrpcURL != "" && m.Client == nil {
m.Client, err = NewClientV3(m)
if err != nil {
return err
}
}
m.Logger.Info( m.Logger.Info(
"launched a member", "launched a member",
@ -1391,7 +1402,6 @@ type ClusterV3 struct {
*Cluster *Cluster
mu sync.Mutex mu sync.Mutex
Clients []*clientv3.Client
clusterClient *clientv3.Client clusterClient *clientv3.Client
} }
@ -1409,35 +1419,17 @@ func NewClusterV3(t testutil.TB, cfg *ClusterConfig) *ClusterV3 {
} }
clus.Launch(t) clus.Launch(t)
if !cfg.SkipCreatingClient {
for _, m := range clus.Members {
client, err := NewClientV3(m)
if err != nil {
t.Fatalf("cannot create client: %v", err)
}
clus.Clients = append(clus.Clients, client)
}
}
return clus return clus
} }
func (c *ClusterV3) TakeClient(idx int) { func (c *ClusterV3) TakeClient(idx int) {
c.mu.Lock() c.mu.Lock()
c.Clients[idx] = nil c.Members[idx].Client = nil
c.mu.Unlock() c.mu.Unlock()
} }
func (c *ClusterV3) Terminate(t testutil.TB) { func (c *ClusterV3) Terminate(t testutil.TB) {
c.mu.Lock() c.mu.Lock()
for _, client := range c.Clients {
if client == nil {
continue
}
if err := client.Close(); err != nil {
t.Error(err)
}
}
if c.clusterClient != nil { if c.clusterClient != nil {
if err := c.clusterClient.Close(); err != nil { if err := c.clusterClient.Close(); err != nil {
t.Error(err) t.Error(err)
@ -1448,11 +1440,11 @@ func (c *ClusterV3) Terminate(t testutil.TB) {
} }
func (c *ClusterV3) RandClient() *clientv3.Client { func (c *ClusterV3) RandClient() *clientv3.Client {
return c.Clients[rand.Intn(len(c.Clients))] return c.Members[rand.Intn(len(c.Members))].Client
} }
func (c *ClusterV3) Client(i int) *clientv3.Client { func (c *ClusterV3) Client(i int) *clientv3.Client {
return c.Clients[i] return c.Members[i].Client
} }
func (c *ClusterV3) ClusterClient() (client *clientv3.Client, err error) { func (c *ClusterV3) ClusterClient() (client *clientv3.Client, err error) {

View File

@ -170,7 +170,6 @@ func testBalancerUnderBlackholeNoKeepAlive(t *testing.T, op func(*clientv3.Clien
clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{ clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{
Size: 2, Size: 2,
SkipCreatingClient: true,
UseBridge: true, UseBridge: true,
}) })
defer clus.Terminate(t) defer clus.Terminate(t)

View File

@ -48,7 +48,7 @@ var (
// TestDialTLSExpired tests client with expired certs fails to dial. // TestDialTLSExpired tests client with expired certs fails to dial.
func TestDialTLSExpired(t *testing.T) { func TestDialTLSExpired(t *testing.T) {
integration2.BeforeTest(t) integration2.BeforeTest(t)
clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 1, PeerTLS: &testTLSInfo, ClientTLS: &testTLSInfo, SkipCreatingClient: true}) clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 1, PeerTLS: &testTLSInfo, ClientTLS: &testTLSInfo})
defer clus.Terminate(t) defer clus.Terminate(t)
tls, err := testTLSInfoExpired.ClientConfig() tls, err := testTLSInfoExpired.ClientConfig()
@ -71,7 +71,7 @@ func TestDialTLSExpired(t *testing.T) {
// when TLS endpoints (https, unixs) are given but no tls config. // when TLS endpoints (https, unixs) are given but no tls config.
func TestDialTLSNoConfig(t *testing.T) { func TestDialTLSNoConfig(t *testing.T) {
integration2.BeforeTest(t) integration2.BeforeTest(t)
clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 1, ClientTLS: &testTLSInfo, SkipCreatingClient: true}) clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 1, ClientTLS: &testTLSInfo})
defer clus.Terminate(t) defer clus.Terminate(t)
// expect "signed by unknown authority" // expect "signed by unknown authority"
c, err := integration2.NewClient(t, clientv3.Config{ c, err := integration2.NewClient(t, clientv3.Config{
@ -102,7 +102,7 @@ func TestDialSetEndpointsAfterFail(t *testing.T) {
// testDialSetEndpoints ensures SetEndpoints can replace unavailable endpoints with available ones. // testDialSetEndpoints ensures SetEndpoints can replace unavailable endpoints with available ones.
func testDialSetEndpoints(t *testing.T, setBefore bool) { func testDialSetEndpoints(t *testing.T, setBefore bool) {
integration2.BeforeTest(t) integration2.BeforeTest(t)
clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 3, SkipCreatingClient: true}) clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 3})
defer clus.Terminate(t) defer clus.Terminate(t)
// get endpoint list // get endpoint list
@ -166,7 +166,7 @@ func TestSwitchSetEndpoints(t *testing.T) {
func TestRejectOldCluster(t *testing.T) { func TestRejectOldCluster(t *testing.T) {
integration2.BeforeTest(t) integration2.BeforeTest(t)
// 2 endpoints to test multi-endpoint Status // 2 endpoints to test multi-endpoint Status
clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 2, SkipCreatingClient: true}) clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 2})
defer clus.Terminate(t) defer clus.Terminate(t)
cfg := clientv3.Config{ cfg := clientv3.Config{

View File

@ -107,7 +107,6 @@ func testBalancerUnderNetworkPartition(t *testing.T, op func(*clientv3.Client, c
clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{ clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{
Size: 3, Size: 3,
SkipCreatingClient: true,
}) })
defer clus.Terminate(t) defer clus.Terminate(t)
@ -163,7 +162,6 @@ func TestBalancerUnderNetworkPartitionLinearizableGetLeaderElection(t *testing.T
clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{ clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{
Size: 3, Size: 3,
SkipCreatingClient: true,
}) })
defer clus.Terminate(t) defer clus.Terminate(t)
eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()} eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()}
@ -218,7 +216,6 @@ func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) {
clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{ clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{
Size: 3, Size: 3,
SkipCreatingClient: true,
}) })
defer clus.Terminate(t) defer clus.Terminate(t)
@ -278,7 +275,6 @@ func TestDropReadUnderNetworkPartition(t *testing.T) {
clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{ clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{
Size: 3, Size: 3,
SkipCreatingClient: true,
}) })
defer clus.Terminate(t) defer clus.Terminate(t)
leaderIndex := clus.WaitLeader(t) leaderIndex := clus.WaitLeader(t)

View File

@ -34,7 +34,6 @@ func TestBalancerUnderServerShutdownWatch(t *testing.T) {
clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{ clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{
Size: 3, Size: 3,
SkipCreatingClient: true,
UseBridge: true, UseBridge: true,
}) })
defer clus.Terminate(t) defer clus.Terminate(t)
@ -147,7 +146,6 @@ func testBalancerUnderServerShutdownMutable(t *testing.T, op func(*clientv3.Clie
clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{ clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{
Size: 3, Size: 3,
SkipCreatingClient: true,
}) })
defer clus.Terminate(t) defer clus.Terminate(t)
@ -205,7 +203,6 @@ func testBalancerUnderServerShutdownImmutable(t *testing.T, op func(*clientv3.Cl
clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{ clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{
Size: 3, Size: 3,
SkipCreatingClient: true,
}) })
defer clus.Terminate(t) defer clus.Terminate(t)
@ -278,7 +275,6 @@ func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizabl
cfg := &integration2.ClusterConfig{ cfg := &integration2.ClusterConfig{
Size: 2, Size: 2,
SkipCreatingClient: true,
UseBridge: true, UseBridge: true,
} }
if linearizable { if linearizable {

View File

@ -70,7 +70,7 @@ func TestV3ClientMetrics(t *testing.T) {
url := "unix://" + addr + "/metrics" url := "unix://" + addr + "/metrics"
clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 1, SkipCreatingClient: true}) clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 1})
defer clus.Terminate(t) defer clus.Terminate(t)
cfg := clientv3.Config{ cfg := clientv3.Config{

View File

@ -82,7 +82,7 @@ func TestEndpointSwitchResolvesViolation(t *testing.T) {
func TestUnresolvableOrderViolation(t *testing.T) { func TestUnresolvableOrderViolation(t *testing.T) {
integration2.BeforeTest(t) integration2.BeforeTest(t)
clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 5, SkipCreatingClient: true, UseBridge: true}) clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 5, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
cfg := clientv3.Config{ cfg := clientv3.Config{
Endpoints: []string{ Endpoints: []string{

View File

@ -309,6 +309,7 @@ func TestIssue2904(t *testing.T) {
<-c.Members[1].Server.StopNotify() <-c.Members[1].Server.StopNotify()
// terminate removed member // terminate removed member
c.Members[1].Client.Close()
c.Members[1].Terminate(t) c.Members[1].Terminate(t)
c.Members = c.Members[:1] c.Members = c.Members[:1]
// wait member to be removed. // wait member to be removed.

View File

@ -36,9 +36,8 @@ func TestV3StorageQuotaApply(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
quotasize := int64(16 * os.Getpagesize()) quotasize := int64(16 * os.Getpagesize())
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2, UseBridge: true}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
defer clus.Terminate(t) defer clus.Terminate(t)
kvc0 := integration.ToGRPC(clus.Client(0)).KV
kvc1 := integration.ToGRPC(clus.Client(1)).KV kvc1 := integration.ToGRPC(clus.Client(1)).KV
// Set a quota on one node // Set a quota on one node
@ -46,6 +45,7 @@ func TestV3StorageQuotaApply(t *testing.T) {
clus.Members[0].Stop(t) clus.Members[0].Stop(t)
clus.Members[0].Restart(t) clus.Members[0].Restart(t)
clus.WaitMembersForLeader(t, clus.Members) clus.WaitMembersForLeader(t, clus.Members)
kvc0 := integration.ToGRPC(clus.Client(0)).KV
waitForRestart(t, kvc0) waitForRestart(t, kvc0)
key := []byte("abc") key := []byte("abc")

View File

@ -120,7 +120,7 @@ func TestElectionFailover(t *testing.T) {
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
var err error var err error
ss[i], err = concurrency.NewSession(clus.Clients[i]) ss[i], err = concurrency.NewSession(clus.Client(i))
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }

View File

@ -100,14 +100,14 @@ func TestV3PutRestart(t *testing.T) {
stopIdx = rand.Intn(3) stopIdx = rand.Intn(3)
} }
clus.Clients[stopIdx].Close() clus.Client(stopIdx).Close()
clus.Members[stopIdx].Stop(t) clus.Members[stopIdx].Stop(t)
clus.Members[stopIdx].Restart(t) clus.Members[stopIdx].Restart(t)
c, cerr := integration.NewClientV3(clus.Members[stopIdx]) c, cerr := integration.NewClientV3(clus.Members[stopIdx])
if cerr != nil { if cerr != nil {
t.Fatalf("cannot create client: %v", cerr) t.Fatalf("cannot create client: %v", cerr)
} }
clus.Clients[stopIdx] = c clus.Members[stopIdx].ServerClient = c
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
defer cancel() defer cancel()

View File

@ -522,12 +522,12 @@ func TestV3GetNonExistLease(t *testing.T) {
Keys: true, Keys: true,
} }
for _, client := range clus.Clients { for _, m := range clus.Members {
// quorum-read to ensure revoke completes before TimeToLive // quorum-read to ensure revoke completes before TimeToLive
if _, err := integration.ToGRPC(client).KV.Range(ctx, &pb.RangeRequest{Key: []byte("_")}); err != nil { if _, err := integration.ToGRPC(m.Client).KV.Range(ctx, &pb.RangeRequest{Key: []byte("_")}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
resp, err := integration.ToGRPC(client).Lease.LeaseTimeToLive(ctx, leaseTTLr) resp, err := integration.ToGRPC(m.Client).Lease.LeaseTimeToLive(ctx, leaseTTLr)
if err != nil { if err != nil {
t.Fatalf("expected non nil error, but go %v", err) t.Fatalf("expected non nil error, but go %v", err)
} }