Merge pull request #12611 from ptabor/20210111-fix-flakes

e2e tests flakes & leaks fixes: In particular TestIssue6361
This commit is contained in:
Jingyi Hu 2021-01-12 21:26:54 +08:00 committed by GitHub
commit bfc6e2ff30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 85 additions and 54 deletions

View File

@ -21,6 +21,7 @@ import (
)
func AssertEqual(t *testing.T, e, a interface{}, msg ...string) {
t.Helper()
if (e == nil || a == nil) && (isNil(e) && isNil(a)) {
return
}
@ -36,20 +37,24 @@ func AssertEqual(t *testing.T, e, a interface{}, msg ...string) {
}
// AssertNil checks that v is nil by delegating to AssertEqual with a nil
// expected value. It is marked as a test helper so that a failure is
// attributed to the calling line.
func AssertNil(t *testing.T, v interface{}) {
	t.Helper()
	var want interface{} // zero value: a nil interface
	AssertEqual(t, want, v)
}
// AssertNotNil aborts the test with t.Fatalf when v is a nil interface value.
// Only the interface itself is compared against nil; an interface that wraps
// a nil pointer is accepted.
func AssertNotNil(t *testing.T, v interface{}) {
	t.Helper()
	if v != nil {
		return
	}
	t.Fatalf("expected non-nil, got %+v", v)
}
// AssertTrue checks that v is true by delegating to AssertEqual. Any msg
// values are forwarded unchanged.
func AssertTrue(t *testing.T, v bool, msg ...string) {
	t.Helper()
	const want = true
	AssertEqual(t, want, v, msg...)
}
// AssertFalse checks that v is false by delegating to AssertEqual. Any msg
// values are forwarded unchanged.
func AssertFalse(t *testing.T, v bool, msg ...string) {
	t.Helper()
	const want = false
	AssertEqual(t, want, v, msg...)
}

View File

@ -111,10 +111,12 @@ func interestingGoroutines() (gs []string) {
if stack == "" ||
strings.Contains(stack, "sync.(*WaitGroup).Done") ||
strings.Contains(stack, "os.(*file).close") ||
strings.Contains(stack, "os.(*Process).Release") ||
strings.Contains(stack, "created by os/signal.init") ||
strings.Contains(stack, "runtime/panic.go") ||
strings.Contains(stack, "created by testing.RunTests") ||
strings.Contains(stack, "created by testing.runTests") ||
strings.Contains(stack, "created by testing.(*T).Run") ||
strings.Contains(stack, "testing.Main(") ||
strings.Contains(stack, "runtime.goexit") ||
strings.Contains(stack, "go.etcd.io/etcd/pkg/v3/testutil.interestingGoroutines") ||

View File

@ -40,6 +40,26 @@ func (cfg Config) GetLogger() *zap.Logger {
// for testing
var grpcLogOnce = new(sync.Once)
// setupGrpcLogging installs the process-wide gRPC logger. The body is guarded
// by grpcLogOnce, so only the first call has any effect.
//
// With debug set, a logger is built from config via logutil.NewGRPCLoggerV2
// and installed; if building it fails, no logger is installed and the error
// is dropped. Without debug, the installed logger discards info output and
// sends the other two streams to os.Stderr.
func setupGrpcLogging(debug bool, config zap.Config) {
	grpcLogOnce.Do(func() {
		if !debug {
			// Non-debug: only info is discarded.
			grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
			return
		}
		// Debug: enable info, warning and error.
		if gl, err := logutil.NewGRPCLoggerV2(config); err == nil {
			grpclog.SetLoggerV2(gl)
		}
	})
}
func SetupGrpcLoggingForTest(debug bool) {
setupGrpcLogging(debug, zap.NewDevelopmentConfig())
}
// setupLogging initializes etcd logging.
// Must be called after flag parsing or finishing configuring embed.Config.
func (cfg *Config) setupLogging() error {
@ -106,19 +126,7 @@ func (cfg *Config) setupLogging() error {
c.loggerConfig = &copied
c.loggerCore = nil
c.loggerWriteSyncer = nil
grpcLogOnce.Do(func() {
// debug true, enable info, warning, error
// debug false, only discard info
if cfg.LogLevel == "debug" {
var gl grpclog.LoggerV2
gl, err = logutil.NewGRPCLoggerV2(copied)
if err == nil {
grpclog.SetLoggerV2(gl)
}
} else {
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
}
})
setupGrpcLogging(cfg.LogLevel == "debug", copied)
return nil
}
}

View File

@ -18,7 +18,6 @@ import (
"reflect"
"testing"
"time"
"unsafe"
"go.etcd.io/etcd/pkg/v3/testutil"
)
@ -102,7 +101,7 @@ func TestNodeExternClone(t *testing.T) {
}
func sameSlice(a, b []*NodeExtern) bool {
ah := (*reflect.SliceHeader)(unsafe.Pointer(&a))
bh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
return *ah == *bh
va := reflect.ValueOf(a)
vb := reflect.ValueOf(b)
return va.Len() == vb.Len() && va.Pointer() == vb.Pointer()
}

View File

@ -25,7 +25,9 @@ import (
"go.etcd.io/etcd/client/v3"
)
func TestCtlV3AuthEnable(t *testing.T) { testCtl(t, authEnableTest) }
func TestCtlV3AuthEnable(t *testing.T) {
testCtl(t, authEnableTest)
}
// TestCtlV3AuthDisable runs authDisableTest through the testCtl harness.
func TestCtlV3AuthDisable(t *testing.T) {
	testCtl(t, authDisableTest)
}
// TestCtlV3AuthStatus runs authStatusTest through the testCtl harness.
func TestCtlV3AuthStatus(t *testing.T) {
	testCtl(t, authStatusTest)
}
// TestCtlV3AuthWriteKey runs authCredWriteKeyTest through the testCtl harness.
func TestCtlV3AuthWriteKey(t *testing.T) {
	testCtl(t, authCredWriteKeyTest)
}

View File

@ -24,10 +24,6 @@ import (
)
func TestCtlV3Elect(t *testing.T) {
oldenv := os.Getenv("EXPECT_DEBUG")
defer os.Setenv("EXPECT_DEBUG", oldenv)
os.Setenv("EXPECT_DEBUG", "1")
testCtl(t, testElect)
}

View File

@ -24,10 +24,6 @@ import (
)
func TestCtlV3Lock(t *testing.T) {
oldenv := os.Getenv("EXPECT_DEBUG")
defer os.Setenv("EXPECT_DEBUG", oldenv)
os.Setenv("EXPECT_DEBUG", "1")
testCtl(t, testLock)
}

View File

@ -114,6 +114,7 @@ func ctlV3RoleGrantPermission(cx ctlCtx, rolename string, perm grantingPerm) err
if err != nil {
return err
}
defer proc.Close()
expStr := fmt.Sprintf("Role %s updated", rolename)
_, err = proc.Expect(expStr)
@ -139,7 +140,7 @@ func ctlV3RoleRevokePermission(cx ctlCtx, rolename string, key, rangeEnd string,
if err != nil {
return err
}
defer proc.Close()
_, err = proc.Expect(expStr)
return err
}

View File

@ -18,7 +18,7 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"strings"
@ -32,6 +32,13 @@ import (
// TestCtlV3Snapshot runs snapshotTest through the testCtl harness.
func TestCtlV3Snapshot(t *testing.T) {
	testCtl(t, snapshotTest)
}
// tempDir creates a new, empty directory at
// os.TempDir()/<test name>/<random number> and returns its path.
//
// The test is aborted via tb.Fatalf if the directory cannot be created. The
// leaf is created with os.Mkdir rather than os.MkdirAll so that a name left
// over from an earlier run is detected and another name is tried; the caller
// never receives a directory that already holds files.
//
// The directory is not removed automatically; callers are responsible for
// cleaning up whatever they place in it.
//
// TODO: Replace with testing.T.TempDir() once golang-1.15 is the minimum.
func tempDir(tb testing.TB) string {
	tb.Helper()

	parent := filepath.Join(os.TempDir(), tb.Name())
	if err := os.MkdirAll(parent, 0700); err != nil {
		tb.Fatalf("tempDir: cannot create %q: %v", parent, err)
	}

	// Bounded retry: a collision only happens when a stale directory with the
	// same random name survived a previous run.
	const maxAttempts = 100
	for i := 0; i < maxAttempts; i++ {
		dir := filepath.Join(parent, fmt.Sprint(rand.Int()))
		err := os.Mkdir(dir, 0700)
		if err == nil {
			return dir
		}
		if !os.IsExist(err) {
			tb.Fatalf("tempDir: cannot create %q: %v", dir, err)
		}
	}
	tb.Fatalf("tempDir: no unused directory name under %q after %d attempts", parent, maxAttempts)
	return "" // unreachable: Fatalf stops the test goroutine
}
func snapshotTest(cx ctlCtx) {
maintenanceInitKeys(cx)
@ -43,7 +50,7 @@ func snapshotTest(cx ctlCtx) {
cx.t.Fatalf("snapshot: ctlV3Put error (%v)", err)
}
fpath := "test1.snapshot"
fpath := filepath.Join(tempDir(cx.t), "snapshot")
defer os.RemoveAll(fpath)
if err = ctlV3SnapshotSave(cx, fpath); err != nil {
@ -65,7 +72,7 @@ func snapshotTest(cx ctlCtx) {
// TestCtlV3SnapshotCorrupt runs snapshotCorruptTest through the testCtl harness.
func TestCtlV3SnapshotCorrupt(t *testing.T) {
	testCtl(t, snapshotCorruptTest)
}
func snapshotCorruptTest(cx ctlCtx) {
fpath := "test2.snapshot"
fpath := filepath.Join(tempDir(cx.t), "snapshot")
defer os.RemoveAll(fpath)
if err := ctlV3SnapshotSave(cx, fpath); err != nil {
@ -82,10 +89,12 @@ func snapshotCorruptTest(cx ctlCtx) {
}
f.Close()
defer os.RemoveAll("snap.etcd")
datadir := filepath.Join(tempDir(cx.t), "data")
defer os.RemoveAll(datadir)
serr := spawnWithExpect(
append(cx.PrefixArgs(), "snapshot", "restore",
"--data-dir", "snap.etcd",
"--data-dir", datadir,
fpath),
"expected sha256")
@ -98,7 +107,7 @@ func snapshotCorruptTest(cx ctlCtx) {
// TestCtlV3SnapshotStatusBeforeRestore runs snapshotStatusBeforeRestoreTest
// through the testCtl harness.
func TestCtlV3SnapshotStatusBeforeRestore(t *testing.T) {
	testCtl(t, snapshotStatusBeforeRestoreTest)
}
func snapshotStatusBeforeRestoreTest(cx ctlCtx) {
fpath := "test3.snapshot"
fpath := filepath.Join(tempDir(cx.t), "snapshot")
defer os.RemoveAll(fpath)
if err := ctlV3SnapshotSave(cx, fpath); err != nil {
@ -111,10 +120,11 @@ func snapshotStatusBeforeRestoreTest(cx ctlCtx) {
cx.t.Fatalf("snapshotTest getSnapshotStatus error (%v)", err)
}
defer os.RemoveAll("snap.etcd")
dataDir := filepath.Join(tempDir(cx.t), "data")
defer os.RemoveAll(dataDir)
serr := spawnWithExpect(
append(cx.PrefixArgs(), "snapshot", "restore",
"--data-dir", "snap.etcd",
"--data-dir", dataDir,
fpath),
"added member")
if serr != nil {
@ -154,6 +164,14 @@ func getSnapshotStatus(cx ctlCtx, fpath string) (snapshot.Status, error) {
// TestIssue6361 ensures new member that starts with snapshot correctly
// syncs up with other members and serve correct data.
func TestIssue6361(t *testing.T) {
{
// This test is pretty flaky on semaphoreci as of 2021-01-10.
// TODO: Remove when the flakiness source is identified.
oldenv := os.Getenv("EXPECT_DEBUG")
defer os.Setenv("EXPECT_DEBUG", oldenv)
os.Setenv("EXPECT_DEBUG", "1")
}
defer testutil.AfterTest(t)
os.Setenv("ETCDCTL_API", "3")
defer os.Unsetenv("ETCDCTL_API")
@ -175,7 +193,7 @@ func TestIssue6361(t *testing.T) {
dialTimeout := 10 * time.Second
prefixArgs := []string{ctlBinPath, "--endpoints", strings.Join(epc.EndpointsV3(), ","), "--dial-timeout", dialTimeout.String()}
// write some keys
t.Log("Writing some keys...")
kvs := []kv{{"foo1", "val1"}, {"foo2", "val2"}, {"foo3", "val3"}}
for i := range kvs {
if err = spawnWithExpect(append(prefixArgs, "put", kvs[i].key, kvs[i].val), "OK"); err != nil {
@ -183,28 +201,29 @@ func TestIssue6361(t *testing.T) {
}
}
fpath := filepath.Join(os.TempDir(), "test.snapshot")
fpath := filepath.Join(tempDir(t), "snapshot")
defer os.RemoveAll(fpath)
// etcdctl save snapshot
t.Log("etcdctl saving snapshot...")
if err = spawnWithExpect(append(prefixArgs, "snapshot", "save", fpath), fmt.Sprintf("Snapshot saved at %s", fpath)); err != nil {
t.Fatal(err)
}
t.Log("Stopping the original server...")
if err = epc.procs[0].Stop(); err != nil {
t.Fatal(err)
}
newDataDir := filepath.Join(os.TempDir(), "test.data")
newDataDir := tempDir(t)
defer os.RemoveAll(newDataDir)
// etcdctl restore the snapshot
t.Log("etcdctl restoring the snapshot...")
err = spawnWithExpect([]string{ctlBinPath, "snapshot", "restore", fpath, "--name", epc.procs[0].Config().name, "--initial-cluster", epc.procs[0].Config().initialCluster, "--initial-cluster-token", epc.procs[0].Config().initialToken, "--initial-advertise-peer-urls", epc.procs[0].Config().purl.String(), "--data-dir", newDataDir}, "added member")
if err != nil {
t.Fatal(err)
}
// start the etcd member using the restored snapshot
t.Log("(Re)starting the etcd member using the restored snapshot...")
epc.procs[0].Config().dataDirPath = newDataDir
for i := range epc.procs[0].Config().args {
if epc.procs[0].Config().args[i] == "--data-dir" {
@ -215,14 +234,14 @@ func TestIssue6361(t *testing.T) {
t.Fatal(err)
}
// ensure the restored member has the correct data
t.Log("Ensuring the restored member has the correct data...")
for i := range kvs {
if err = spawnWithExpect(append(prefixArgs, "get", kvs[i].key), kvs[i].val); err != nil {
t.Fatal(err)
}
}
// add a new member into the cluster
t.Log("Adding new member into the cluster")
clientURL := fmt.Sprintf("http://localhost:%d", etcdProcessBasePort+30)
peerURL := fmt.Sprintf("http://localhost:%d", etcdProcessBasePort+31)
err = spawnWithExpect(append(prefixArgs, "member", "add", "newmember", fmt.Sprintf("--peer-urls=%s", peerURL)), " added to cluster ")
@ -230,16 +249,13 @@ func TestIssue6361(t *testing.T) {
t.Fatal(err)
}
var newDataDir2 string
newDataDir2, err = ioutil.TempDir("", "newdata2")
if err != nil {
t.Fatal(err)
}
newDataDir2 := filepath.Join(tempDir(t), "newdata")
defer os.RemoveAll(newDataDir2)
name2 := "infra2"
initialCluster2 := epc.procs[0].Config().initialCluster + fmt.Sprintf(",%s=%s", name2, peerURL)
t.Log("Starting the new member")
// start the new member
var nepc *expect.ExpectProcess
nepc, err = spawnCmd([]string{epc.procs[0].Config().execPath, "--name", name2,
@ -249,19 +265,20 @@ func TestIssue6361(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if _, err = nepc.Expect("enabled capabilities for version"); err != nil {
if _, err = nepc.Expect("ready to serve client requests"); err != nil {
t.Fatal(err)
}
prefixArgs = []string{ctlBinPath, "--endpoints", clientURL, "--dial-timeout", dialTimeout.String()}
// ensure added member has data from incoming snapshot
t.Log("Ensuring added member has data from incoming snapshot...")
for i := range kvs {
if err = spawnWithExpect(append(prefixArgs, "get", kvs[i].key), kvs[i].val); err != nil {
t.Fatal(err)
}
}
t.Log("Stopping the second member")
if err = nepc.Stop(); err != nil {
t.Fatal(err)
}

View File

@ -88,10 +88,10 @@ func versionTest(cx ctlCtx) {
func clusterVersionTest(cx ctlCtx, expected string) {
var err error
for i := 0; i < 7; i++ {
for i := 0; i < 35; i++ {
if err = cURLGet(cx.epc, cURLReq{endpoint: "/version", expected: expected}); err != nil {
cx.t.Logf("#%d: v3 is not ready yet (%v)", i, err)
time.Sleep(time.Second)
time.Sleep(200 * time.Millisecond)
continue
}
break

View File

@ -183,6 +183,7 @@ func ctlV3User(cx ctlCtx, args []string, expStr string, stdIn []string) error {
if err != nil {
return err
}
defer proc.Close()
// Send 'stdIn' strings as input.
for _, s := range stdIn {

View File

@ -24,7 +24,7 @@ import (
)
var (
etcdServerReadyLines = []string{"enabled capabilities for version", "published"}
etcdServerReadyLines = []string{"enabled capabilities for version", "published", "ready to serve client requests"}
binPath string
ctlBinPath string
)

View File

@ -246,6 +246,7 @@ func testV3CurlAuth(cx ctlCtx) {
cmdArgs = cURLPrefixArgs(cx.epc, "POST", cURLReq{endpoint: path.Join(p, "/auth/authenticate"), value: string(authreq)})
proc, err := spawnCmd(cmdArgs)
testutil.AssertNil(cx.t, err)
defer proc.Close()
cURLRes, err := proc.ExpectFunc(lineFunc)
testutil.AssertNil(cx.t, err)

View File

@ -665,6 +665,7 @@ func (srv *Server) handle_SIGQUIT_ETCD_AND_ARCHIVE_DATA() (*rpcpb.Response, erro
}
}
// TODO: Verify whether this cleaning of 'cache pages' is needed.
srv.lg.Info("cleaning up page cache")
if err := cleanPageCache(); err != nil {
srv.lg.Warn("failed to clean up page cache", zap.String("error", err.Error()))

View File

@ -103,6 +103,6 @@ func copyFile(src, dst string) error {
func cleanPageCache() error {
// https://www.kernel.org/doc/Documentation/sysctl/vm.txt
// https://github.com/torvalds/linux/blob/master/fs/drop_caches.c
cmd := exec.Command("/bin/sh", "-c", `echo "echo 1 > /proc/sys/vm/drop_caches" | sudo sh`)
cmd := exec.Command("/bin/sh", "-c", `echo "echo 1 > /proc/sys/vm/drop_caches" | sudo -s -n`)
return cmd.Run()
}

View File

@ -8,8 +8,10 @@ import (
"testing"
"go.etcd.io/etcd/pkg/v3/testutil"
"go.etcd.io/etcd/server/v3/embed"
)
// TestMain is the entry point for this package's test binary. It sets up
// gRPC logging in debug mode before handing m to
// testutil.MustTestMainWithLeakDetection.
func TestMain(m *testing.M) {
	// Must come first so the logger is in place before m is handed off.
	embed.SetupGrpcLoggingForTest(true)
	// NOTE(review): presumably runs m and checks for leaked goroutines afterwards — confirm in testutil.
	testutil.MustTestMainWithLeakDetection(m)
}