Merge pull request #13876 from ptabor/20220403-integration-test-fixes

Integration tests flake fixes
Piotr Tabor 2022-04-04 14:46:29 +02:00 committed by GitHub
commit 5b84d3934e
12 changed files with 123 additions and 40 deletions

View File

@@ -15,6 +15,8 @@
 package testutil

 import (
+	"io/ioutil"
+	"log"
 	"os"
 	"testing"

@@ -38,3 +40,21 @@ func BeforeTest(t testing.TB) {
 	t.Cleanup(func() { assert.NoError(t, os.Chdir(path)) })
 }
+
+func BeforeIntegrationExamples(*testing.M) func() {
+	ExitInShortMode("Skipping: the tests require real cluster")
+
+	tempDir, err := ioutil.TempDir(os.TempDir(), "etcd-integration")
+	if err != nil {
+		log.Printf("Failed to obtain tempDir: %v", err)
+		os.Exit(1)
+	}
+
+	err = os.Chdir(tempDir)
+	if err != nil {
+		log.Printf("Failed to change working dir to: %s: %v", tempDir, err)
+		os.Exit(1)
+	}
+	log.Printf("Running tests (examples) in dir(%v): ...", tempDir)
+	return func() { os.RemoveAll(tempDir) }
+}
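
For orientation, here is a minimal sketch of how an examples TestMain is expected to consume the new helper above. It mirrors the main_test.go hunks later in this diff; the package name and import path are assumptions based on the etcd layout, not part of this PR.

package clientv3_test

import (
	"os"
	"testing"

	// Assumed import path for the testutil package changed above.
	"go.etcd.io/etcd/client/pkg/v3/testutil"
)

func TestMain(m *testing.M) {
	// Skips under -short, then switches the working directory to a fresh
	// temp dir so example tests do not write into the source tree.
	cleanup := testutil.BeforeIntegrationExamples(m)

	v := m.Run()
	cleanup() // removes the temp dir created above
	os.Exit(v)
}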

View File

@@ -107,8 +107,8 @@ function integration_extra {
 }

 function integration_pass {
-  run_for_module "tests" go_test "./integration/..." "parallel" : -timeout="${TIMEOUT:-15m}" "${COMMON_TEST_FLAGS[@]}" "${RUN_ARG[@]}" "$@" || return $?
-  run_for_module "tests" go_test "./common/..." "parallel" : --tags=integration -timeout="${TIMEOUT:-15m}" "${COMMON_TEST_FLAGS[@]}" "${RUN_ARG[@]}" "$@" || return $?
+  run_for_module "tests" go_test "./integration/..." "parallel" : -timeout="${TIMEOUT:-15m}" "${COMMON_TEST_FLAGS[@]}" "${RUN_ARG[@]}" -p=2 "$@" || return $?
+  run_for_module "tests" go_test "./common/..." "parallel" : --tags=integration -timeout="${TIMEOUT:-15m}" "${COMMON_TEST_FLAGS[@]}" -p=2 "${RUN_ARG[@]}" "$@" || return $?
   integration_extra "$@"
 }

View File

@@ -120,19 +120,22 @@ func TestLeaseGrantAndList(t *testing.T) {
 		for _, nc := range nestedCases {
 			t.Run(tc.name+"/"+nc.name, func(t *testing.T) {
+				t.Logf("Creating cluster...")
 				clus := testRunner.NewCluster(t, tc.config)
 				defer clus.Close()
 				cc := clus.Client()
+				t.Logf("Created cluster and client")

 				testutils.ExecuteWithTimeout(t, 10*time.Second, func() {
 					createdLeases := []clientv3.LeaseID{}
 					for i := 0; i < nc.leaseCount; i++ {
 						leaseResp, err := cc.Grant(10)
+						t.Logf("Grant returned: resp:%s err:%v", leaseResp.String(), err)
 						require.NoError(t, err)
 						createdLeases = append(createdLeases, leaseResp.ID)
 					}

 					resp, err := cc.LeaseList()
+					t.Logf("Lease list returned: resp:%s err:%v", resp.String(), err)
 					require.NoError(t, err)
 					require.Len(t, resp.Leases, nc.leaseCount)

View File

@@ -214,6 +214,7 @@ func (c *Cluster) fillClusterForMembers() error {
 }

 func (c *Cluster) Launch(t testutil.TB) {
+	t.Logf("Launching new cluster...")
 	errc := make(chan error)
 	for _, m := range c.Members {
 		// Members are launched in separate goroutines because if they boot
@@ -259,7 +260,7 @@ func (c *Cluster) mustNewMember(t testutil.TB) *Member {
 	c.LastMemberNum++
 	m := MustNewMember(t,
 		MemberConfig{
-			Name:         fmt.Sprintf("m%v", memberNumber-1),
+			Name:         fmt.Sprintf("m%v", memberNumber),
 			MemberNumber: memberNumber,
 			AuthToken:    c.Cfg.AuthToken,
 			PeerTLS:      c.Cfg.PeerTLS,
@@ -398,12 +399,41 @@ func (c *Cluster) WaitMembersMatch(t testutil.TB, membs []*pb.Member) {
 	}
 }

-// WaitLeader returns index of the member in c.Members that is leader (or -1).
-func (c *Cluster) WaitLeader(t testutil.TB) int { return c.WaitMembersForLeader(t, c.Members) }
+// WaitLeader returns the index of the member in c.Members that is leader,
+// or fails the test if a leader is not established within 30s.
+func (c *Cluster) WaitLeader(t testutil.TB) int {
+	return c.WaitMembersForLeader(t, c.Members)
+}

 // WaitMembersForLeader waits until given members agree on the same leader,
-// and returns its 'index' in the 'membs' list (or -1).
+// and returns its 'index' in the 'membs' list.
 func (c *Cluster) WaitMembersForLeader(t testutil.TB, membs []*Member) int {
+	t.Logf("WaitMembersForLeader")
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+	l := 0
+	for l = c.waitMembersForLeader(ctx, t, membs); l < 0; {
+		if ctx.Err() != nil {
+			t.Fatalf("WaitLeader FAILED: %v", ctx.Err())
+		}
+	}
+	t.Logf("WaitMembersForLeader succeeded. Cluster leader index: %v", l)
+
+	// TODO: Consider a second-pass check, as sometimes leadership is lost
+	// soon after election:
+	//
+	// We perform multiple attempts because, sometimes, just after a successful WaitLeader
+	// there is a race and leadership is quickly lost:
+	//  - MsgAppResp message with higher term from 2acc3d3b521981 [term: 3] {"member": "m0"}
+	//  - 9903a56eaf96afac became follower at term 3 {"member": "m0"}
+	//  - 9903a56eaf96afac lost leader 9903a56eaf96afac at term 3 {"member": "m0"}
+	return l
+}
+
+// waitMembersForLeader waits until the given members agree on the same leader,
+// and returns its 'index' in the 'membs' list (or -1 if no leader was found).
+func (c *Cluster) waitMembersForLeader(ctx context.Context, t testutil.TB, membs []*Member) int {
 	possibleLead := make(map[uint64]bool)
 	var lead uint64
 	for _, m := range membs {
@@ -415,7 +445,7 @@ func (c *Cluster) WaitMembersForLeader(t testutil.TB, membs []*Member) int {
 	}
 	// ensure leader is up via linearizable get
 	for {
-		ctx, cancel := context.WithTimeout(context.Background(), 10*TickDuration+time.Second)
+		ctx, cancel := context.WithTimeout(ctx, 10*TickDuration+time.Second)
 		_, err := cc.Get(ctx, "0")
 		cancel()
 		if err == nil || strings.Contains(err.Error(), "Key not found") {
@@ -442,10 +472,12 @@ func (c *Cluster) WaitMembersForLeader(t testutil.TB, membs []*Member) int {
 	for i, m := range membs {
 		if uint64(m.Server.ID()) == lead {
+			t.Logf("waitMembersForLeader found leader. Member: %v lead: %x", i, lead)
 			return i
 		}
 	}

+	t.Logf("waitMembersForLeader failed (-1)")
 	return -1
 }
@@ -498,6 +530,7 @@ func newLocalListener(t testutil.TB) net.Listener {
 }

 func NewListenerWithAddr(t testutil.TB, addr string) net.Listener {
+	t.Logf("Creating listener with addr: %v", addr)
 	l, err := transport.NewUnixListener(addr)
 	if err != nil {
 		t.Fatal(err)
@@ -689,7 +722,7 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
 	m.Logger = memberLogger(t, mcfg.Name)
 	m.StrictReconfigCheck = mcfg.StrictReconfigCheck
 	if err := m.listenGRPC(); err != nil {
-		t.Fatal(err)
+		t.Fatalf("listenGRPC FAILED: %v", err)
 	}
 	t.Cleanup(func() {
 		// if we didn't cleanup the logger, the consecutive test
@@ -714,7 +747,11 @@ func (m *Member) listenGRPC() error {
 	// prefix with localhost so cert has right domain
 	network, host, port := m.grpcAddr()
 	grpcAddr := host + ":" + port
-	m.Logger.Info("LISTEN GRPC", zap.String("grpcAddr", grpcAddr), zap.String("m.Name", m.Name))
+	wd, err := os.Getwd()
+	if err != nil {
+		return err
+	}
+	m.Logger.Info("LISTEN GRPC", zap.String("grpcAddr", grpcAddr), zap.String("m.Name", m.Name), zap.String("workdir", wd))
 	grpcListener, err := net.Listen(network, grpcAddr)
 	if err != nil {
 		return fmt.Errorf("listen failed on grpc socket %s (%v)", grpcAddr, err)
@@ -1313,7 +1350,7 @@ func NewCluster(t testutil.TB, cfg *ClusterConfig) *Cluster {
 	}
 	c.Members = ms
 	if err := c.fillClusterForMembers(); err != nil {
-		t.Fatal(err)
+		t.Fatalf("fillClusterForMembers failed: %v", err)
 	}
 	c.Launch(t)

@@ -1327,6 +1364,9 @@ func (c *Cluster) TakeClient(idx int) {
 }

 func (c *Cluster) Terminate(t testutil.TB) {
+	if t != nil {
+		t.Logf("========= Cluster termination started =====================")
+	}
 	c.mu.Lock()
 	if c.clusterClient != nil {
 		if err := c.clusterClient.Close(); err != nil {
@@ -1348,6 +1388,9 @@ func (c *Cluster) Terminate(t testutil.TB) {
 		}(m)
 	}
 	wg.Wait()
+	if t != nil {
+		t.Logf("========= Cluster termination succeeded ===================")
+	}
 }

 func (c *Cluster) RandClient() *clientv3.Client {

View File

@@ -33,12 +33,13 @@ func forUnitTestsRunInMockedContext(mocking func(), example func()) {
 // TestMain sets up an etcd cluster if running the examples.
 func TestMain(m *testing.M) {
-	testutil.ExitInShortMode("Skipping: the tests require real cluster")
+	cleanup := testutil.BeforeIntegrationExamples(m)

 	v := m.Run()
 	lazyCluster.Terminate()
 	if v == 0 {
 		testutil.MustCheckLeakedGoroutine()
 	}
+	cleanup()
 	os.Exit(v)
 }

View File

@@ -15,6 +15,7 @@
 package clientv3_test

 import (
+	"io/ioutil"
 	"log"
 	"os"
 	"testing"
@@ -37,7 +38,7 @@ var lazyCluster = integration.NewLazyClusterWithConfig(

 func exampleEndpoints() []string { return lazyCluster.EndpointsV3() }

-func forUnitTestsRunInMockedContext(mocking func(), example func()) {
+func forUnitTestsRunInMockedContext(_ func(), example func()) {
 	// For integration tests runs in the provided environment
 	example()
 }
@@ -46,15 +47,19 @@ func forUnitTestsRunInMockedContext(mocking func(), example func()) {
 func TestMain(m *testing.M) {
 	testutil.ExitInShortMode("Skipping: the tests require real cluster")

-	tempDir := os.TempDir()
+	tempDir, err := ioutil.TempDir(os.TempDir(), "etcd-integration")
+	if err != nil {
+		log.Printf("Failed to obtain tempDir: %v", err)
+		os.Exit(1)
+	}
 	defer os.RemoveAll(tempDir)

-	err := os.Chdir(tempDir)
+	err = os.Chdir(tempDir)
 	if err != nil {
 		log.Printf("Failed to change working dir to: %s: %v", tempDir, err)
 		os.Exit(1)
 	}
+	log.Printf("Running tests (examples) in dir(%v): ...", tempDir)

 	v := m.Run()
 	lazyCluster.Terminate()

View File

@@ -96,7 +96,7 @@ func TestMutexTryLockSingleNode(t *testing.T) {
 	integration2.BeforeTest(t)

 	clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
+	t.Logf("3 nodes cluster created...")
 	var clients []*clientv3.Client
 	testMutexTryLock(t, 5, integration2.MakeSingleNodeClients(t, clus, &clients))
 	integration2.CloseClients(t, clients)
@@ -113,35 +113,39 @@ func TestMutexTryLockMultiNode(t *testing.T) {
 }

 func testMutexTryLock(t *testing.T, lockers int, chooseClient func() *clientv3.Client) {
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
 	lockedC := make(chan *concurrency.Mutex)
 	notlockedC := make(chan *concurrency.Mutex)
-	stopC := make(chan struct{})
-	defer close(stopC)

 	for i := 0; i < lockers; i++ {
-		go func() {
+		go func(i int) {
 			session, err := concurrency.NewSession(chooseClient())
 			if err != nil {
 				t.Error(err)
 			}
 			m := concurrency.NewMutex(session, "test-mutex-try-lock")
-			err = m.TryLock(context.TODO())
+			err = m.TryLock(ctx)
 			if err == nil {
 				select {
 				case lockedC <- m:
-				case <-stopC:
+				case <-ctx.Done():
+					t.Errorf("Thread: %v, Context failed: %v", i, err)
 				}
 			} else if err == concurrency.ErrLocked {
 				select {
 				case notlockedC <- m:
-				case <-stopC:
+				case <-ctx.Done():
+					t.Errorf("Thread: %v, Context failed: %v", i, err)
 				}
 			} else {
-				t.Errorf("Unexpected Error %v", err)
+				t.Errorf("Thread: %v; Unexpected Error %v", i, err)
 			}
-		}()
+		}(i)
 	}

-	timerC := time.After(time.Second)
+	timerC := time.After(30 * time.Second)
 	select {
 	case <-lockedC:
 		for i := 0; i < lockers-1; i++ {
@@ -154,7 +158,7 @@ func testMutexTryLock(t *testing.T, lockers int, chooseClient func() *clientv3.Client) {
 			}
 		}
 	case <-timerC:
-		t.Errorf("timed out waiting for lock")
+		t.Errorf("timed out waiting for lock (30s)")
 	}
 }

View File

@@ -274,18 +274,22 @@ func TestMaintenanceStatus(t *testing.T) {
 	clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)

+	t.Logf("Waiting for leader...")
 	clus.WaitLeader(t)
+	t.Logf("Leader established.")

 	eps := make([]string, 3)
 	for i := 0; i < 3; i++ {
 		eps[i] = clus.Members[i].GRPCURL()
 	}

+	t.Logf("Creating client...")
 	cli, err := integration2.NewClient(t, clientv3.Config{Endpoints: eps, DialOptions: []grpc.DialOption{grpc.WithBlock()}})
 	if err != nil {
 		t.Fatal(err)
 	}
 	defer cli.Close()
+	t.Logf("Creating client [DONE]")

 	prevID, leaderFound := uint64(0), false
 	for i := 0; i < 3; i++ {
@@ -293,6 +297,7 @@ func TestMaintenanceStatus(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
+		t.Logf("Response from %v: %v", i, resp)
 		if prevID == 0 {
 			prevID, leaderFound = resp.Header.MemberId, resp.Header.MemberId == resp.Leader
 			continue

View File

@@ -415,15 +415,7 @@ func TestWatchResumeCompacted(t *testing.T) {
 	}
 	clus.Members[0].Stop(t)

-	ticker := time.After(time.Second * 10)
-	for clus.WaitLeader(t) <= 0 {
-		select {
-		case <-ticker:
-			t.Fatalf("failed to wait for new leader")
-		default:
-			time.Sleep(10 * time.Millisecond)
-		}
-	}
+	clus.WaitLeader(t)

 	// put some data and compact away
 	numPuts := 5
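
The hunk above is the caller-side effect of the WaitLeader change in cluster.go: WaitMembersForLeader now enforces its own 30-second deadline and fails the test on timeout, so callers can drop hand-rolled retry loops. A minimal sketch of the resulting pattern follows; the test name is hypothetical, and the integration2 alias and import path are assumed from the other files in this PR.

package integration_test

import (
	"testing"

	// Assumed alias/path, matching how other tests in this PR refer to the framework.
	integration2 "go.etcd.io/etcd/tests/v3/framework/integration"
)

// TestNeedsLeader is a hypothetical example, not part of this PR.
func TestNeedsLeader(t *testing.T) {
	integration2.BeforeTest(t)

	clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3})
	defer clus.Terminate(t)

	clus.Members[0].Stop(t)

	// WaitLeader returns the leader's index in clus.Members, or fails the
	// test if no leader is established within the 30s deadline.
	lead := clus.WaitLeader(t)
	t.Logf("leader index: %d", lead)
}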

View File

@@ -284,6 +284,8 @@ func TestIssue3699(t *testing.T) {
 	// add node d
 	c.AddMember(t)

+	t.Logf("Disturbing cluster till member:3 will become a leader")
+
 	// electing node d as leader makes node a unable to participate
 	leaderID := c.WaitMembersForLeader(t, c.Members)
 	for leaderID != 3 {
@@ -297,11 +299,16 @@ func TestIssue3699(t *testing.T) {
 		leaderID = c.WaitMembersForLeader(t, c.Members)
 	}
+	t.Logf("Finally elected member 3 as the leader.")

+	t.Logf("Restarting member '0'...")
 	// bring back node a
 	// node a will remain useless as long as d is the leader.
 	if err := c.Members[0].Restart(t); err != nil {
 		t.Fatal(err)
 	}
+	t.Logf("Restarted member '0'.")

 	select {
 	// waiting for ReadyNotify can take several seconds
 	case <-time.After(10 * time.Second):
@@ -311,12 +318,13 @@ func TestIssue3699(t *testing.T) {
 	case <-c.Members[0].Server.ReadyNotify():
 	}

 	// must WaitMembersForLeader so goroutines don't leak on terminate
-	c.WaitMembersForLeader(t, c.Members)
+	c.WaitLeader(t)

+	t.Logf("Expecting successful put...")
 	// try to participate in Cluster
 	ctx, cancel := context.WithTimeout(context.Background(), integration.RequestTimeout)
 	if _, err := c.Members[0].Client.Put(ctx, "/foo", "bar"); err != nil {
-		t.Fatalf("unexpected error on Set (%v)", err)
+		t.Fatalf("unexpected error on Put (%v)", err)
 	}
 	cancel()
 }

View File

@@ -77,12 +77,14 @@ func NewLazyClusterWithConfig(cfg integration.ClusterConfig) LazyCluster {

 func (lc *lazyCluster) mustLazyInit() {
 	lc.once.Do(func() {
+		lc.tb.Logf("LazyIniting ...")
 		var err error
 		lc.transport, err = transport.NewTransport(transport.TLSInfo{}, time.Second)
 		if err != nil {
 			log.Fatal(err)
 		}
 		lc.cluster = integration.NewCluster(lc.tb, &lc.cfg)
+		lc.tb.Logf("LazyIniting [Done]")
 	})
 }

View File

@@ -411,8 +411,8 @@ func TestV3LeaseCheckpoint(t *testing.T) {
 			}
 		}

-		if tc.expectTTLIsGT != 0 && time.Duration(ttlresp.TTL)*time.Second <= tc.expectTTLIsGT {
-			t.Errorf("Expected lease ttl (%v) to be greather than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsGT)
+		if tc.expectTTLIsGT != 0 && time.Duration(ttlresp.TTL)*time.Second < tc.expectTTLIsGT {
+			t.Errorf("Expected lease ttl (%v) to be >= (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsGT)
 		}

 		if tc.expectTTLIsLT != 0 && time.Duration(ttlresp.TTL)*time.Second > tc.expectTTLIsLT {