From 3ddcb3ddef0370e0ad161b48df10e41e154d0b5f Mon Sep 17 00:00:00 2001
From: Wei Fu
Date: Sat, 29 Oct 2022 22:04:56 +0800
Subject: [PATCH] test: deflake TestDowngradeUpgradeClusterOf3 timeout

In the TestDowngradeUpgradeCluster case, the brand-new cluster uses the
simple-config-changer, which means that entries have been committed
before the leader election and that these entries will be applied once
the etcdserver starts to receive apply requests. The
simple-config-changer marks the `confState` dirty, and the storage
backend precommit hook then updates the `confState` record.

For the new cluster, the storage version is nil at the beginning. It
becomes v3.5 once the `confState` record has been committed, and >v3.5
once the `storageVersion` record has been committed.

When the new cluster is ready, the leader sets the initial cluster
version to v3.6.x. That in turn triggers `monitorStorageVersion` to
update the `storageVersion` record to v3.6.x. If the `confState` record
has been committed before the cluster version update, we get the
`storageVersion` record as expected.

If the storage backend doesn't commit in time, `monitorStorageVersion`
won't update the version, failing with `cannot detect storage schema
version: missing confstate information`.

If we then file the downgrade request before the next round of
`monitorStorageVersion` (which runs every 4 seconds), the cluster
version will be v3.5.0, which equals the result of
`UnsafeDetectSchemaVersion`, and we never see the `The server is ready
to downgrade` log.

The issue is easy to reproduce by using cpuset or taskset to limit the
process to two CPUs.

So, we should wait until the new cluster's storage is ready before
filing the downgrade request.

Fixes: #14540

Signed-off-by: Wei Fu
---
 tests/e2e/cluster_downgrade_test.go | 104 ++++++++++++++++++++--------
 tests/framework/e2e/util.go         |  16 +++++
 2 files changed, 91 insertions(+), 29 deletions(-)

diff --git a/tests/e2e/cluster_downgrade_test.go b/tests/e2e/cluster_downgrade_test.go
index 9443ce934..579c73cc9 100644
--- a/tests/e2e/cluster_downgrade_test.go
+++ b/tests/e2e/cluster_downgrade_test.go
@@ -16,7 +16,9 @@ package e2e
 import (
 	"context"
+	"encoding/json"
 	"fmt"
+	"strings"
 	"testing"
 	"time"
@@ -27,6 +29,7 @@ import (
 	clientv3 "go.etcd.io/etcd/client/v3"
 	"go.etcd.io/etcd/tests/v3/framework/e2e"
 	"go.etcd.io/etcd/tests/v3/framework/testutils"
+	"go.uber.org/zap"
 )
 
 func TestDowngradeUpgradeClusterOf1(t *testing.T) {
@@ -43,17 +46,24 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int) {
 	if !fileutil.Exist(lastReleaseBinary) {
 		t.Skipf("%q does not exist", lastReleaseBinary)
 	}
+
 	currentVersion := semver.New(version.Version)
 	lastVersion := semver.Version{Major: currentVersion.Major, Minor: currentVersion.Minor - 1}
-	currentVersionStr := fmt.Sprintf("%d.%d", currentVersion.Major, currentVersion.Minor)
-	lastVersionStr := fmt.Sprintf("%d.%d", lastVersion.Major, lastVersion.Minor)
+
+	currentVersion.PreRelease = ""
+	currentVersionStr := currentVersion.String()
+	lastVersionStr := lastVersion.String()
 
 	e2e.BeforeTest(t)
 
 	t.Logf("Create cluster with version %s", currentVersionStr)
 	epc := newCluster(t, currentEtcdBinary, clusterSize)
 	for i := 0; i < len(epc.Procs); i++ {
-		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: currentVersionStr, Server: currentVersionStr})
+		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
+			Cluster: currentVersionStr,
+			Server:  version.Version,
+			Storage: currentVersionStr,
+		})
 	}
 	t.Logf("Cluster created")
 
@@ -62,36 +72,49 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int) {
 	t.Log("Downgrade enabled, validating if cluster is ready for downgrade")
 	for i := 0; i < len(epc.Procs); i++ {
+		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
+			Cluster: lastVersionStr,
+			Server:  version.Version,
+			Storage: lastVersionStr,
+		})
 		e2e.AssertProcessLogs(t, epc.Procs[i], "The server is ready to downgrade")
-		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: currentVersionStr})
 	}
-	t.Log("Cluster is ready for downgrade")
+	t.Log("Cluster is ready for downgrade")
 
 	t.Logf("Starting downgrade process to %q", lastVersionStr)
 	for i := 0; i < len(epc.Procs); i++ {
 		t.Logf("Downgrading member %d by running %s binary", i, lastReleaseBinary)
 		stopEtcd(t, epc.Procs[i])
 		startEtcd(t, epc.Procs[i], lastReleaseBinary)
 	}
+	t.Log("All members downgraded, validating downgrade")
 	e2e.AssertProcessLogs(t, leader(t, epc), "the cluster has been downgraded")
 	for i := 0; i < len(epc.Procs); i++ {
-		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: lastVersionStr})
+		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
+			Cluster: lastVersionStr,
+			Server:  lastVersionStr,
+		})
 	}
-	t.Log("Downgrade complete")
+	t.Log("Downgrade complete")
 
 	t.Logf("Starting upgrade process to %q", currentVersionStr)
 	for i := 0; i < len(epc.Procs); i++ {
 		t.Logf("Upgrading member %d", i)
 		stopEtcd(t, epc.Procs[i])
 		startEtcd(t, epc.Procs[i], currentEtcdBinary)
-		if i+1 < len(epc.Procs) {
-			validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: currentVersionStr})
-		}
+		// NOTE: The leader monitors the cluster version and will update
+		// it once all members have been upgraded. We skip checking the
+		// transient version here because that check can be flaky.
 	}
+	t.Log("All members upgraded, validating upgrade")
 	for i := 0; i < len(epc.Procs); i++ {
-		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: currentVersionStr, Server: currentVersionStr})
+		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
+			Cluster: currentVersionStr,
+			Server:  version.Version,
+			Storage: currentVersionStr,
+		})
 	}
 	t.Log("Upgrade complete")
 }
@@ -140,30 +163,23 @@ func stopEtcd(t *testing.T, ep e2e.EtcdProcess) {
 }
 
 func validateVersion(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, expect version.Versions) {
-	// Two separate calls to expect as it doesn't support multiple matches on the same line
-	var err error
-	testutils.ExecuteWithTimeout(t, 20*time.Second, func() {
+	testutils.ExecuteWithTimeout(t, 30*time.Second, func() {
 		for {
-			if expect.Server != "" {
-				err = e2e.SpawnWithExpects(e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdserver":"`+expect.Server)
-				if err != nil {
-					time.Sleep(time.Second)
-					continue
-				}
+			result, err := getMemberVersionByCurl(cfg, member)
+			if err != nil {
+				cfg.Logger.Warn("failed to get member version and retrying", zap.Error(err))
+				time.Sleep(time.Second)
+				continue
 			}
-			if expect.Cluster != "" {
-				err = e2e.SpawnWithExpects(e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdcluster":"`+expect.Cluster)
-				if err != nil {
-					time.Sleep(time.Second)
-					continue
-				}
+
+			if err := compareMemberVersion(expect, result); err != nil {
+				cfg.Logger.Warn("failed to validate and retrying", zap.Error(err))
+				time.Sleep(time.Second)
+				continue
 			}
 			break
 		}
 	})
-	if err != nil {
-		t.Fatal(err)
-	}
 }
 
 func leader(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess {
@@ -190,3 +206,33 @@ func leader(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess {
 	t.Fatal("Leader not found")
 	return nil
 }
+
+func compareMemberVersion(expect version.Versions, target version.Versions) error {
+	if expect.Server != "" && expect.Server != target.Server {
+		return fmt.Errorf("expect etcdserver version %v, but got %v", expect.Server, target.Server)
+	}
+
+	if expect.Cluster != "" && expect.Cluster != target.Cluster {
+		return fmt.Errorf("expect etcdcluster version %v, but got %v", expect.Cluster, target.Cluster)
+	}
+
+	if expect.Storage != "" && expect.Storage != target.Storage {
+		return fmt.Errorf("expect storage version %v, but got %v", expect.Storage, target.Storage)
+	}
+	return nil
+}
+
+func getMemberVersionByCurl(cfg *e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) (version.Versions, error) {
+	args := e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"})
+	lines, err := e2e.RunUtilCompletion(args, nil)
+	if err != nil {
+		return version.Versions{}, err
+	}
+
+	data := strings.Join(lines, "\n")
+	result := version.Versions{}
+	if err := json.Unmarshal([]byte(data), &result); err != nil {
+		return version.Versions{}, fmt.Errorf("failed to unmarshal (%v): %w", data, err)
+	}
+	return result, nil
+}
diff --git a/tests/framework/e2e/util.go b/tests/framework/e2e/util.go
index 8c3b573ca..72a5ee200 100644
--- a/tests/framework/e2e/util.go
+++ b/tests/framework/e2e/util.go
@@ -81,6 +81,22 @@ func SpawnWithExpectLines(ctx context.Context, args []string, envVars map[string
 	return lines, perr
 }
 
+func RunUtilCompletion(args []string, envVars map[string]string) ([]string, error) {
+	proc, err := SpawnCmd(args, envVars)
+	if err != nil {
+		return nil, fmt.Errorf("failed to spawn command: %w", err)
+	}
+	defer proc.Stop()
+
+	perr := proc.Wait()
+	// make sure that all the outputs are received
+	proc.Close()
+	if perr != nil {
+		return nil, fmt.Errorf("unexpected error from command %v: %w", args, perr)
+	}
+	return proc.Lines(), nil
+}
+
 func RandomLeaseID() int64 {
 	return rand.New(rand.NewSource(time.Now().UnixNano())).Int63()
 }
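
For illustration: the race described in the commit message is closed by
validating the Storage field before the downgrade is enabled, as
testDowngradeUpgrade now does via validateVersion. Below is a minimal
sketch of how a test could block until every member reports the expected
storage version, built on the helpers added by this patch
(getMemberVersionByCurl, compareMemberVersion). The name
waitForStorageVersion is hypothetical and not part of the patch; the
sketch assumes the imports and scaffolding of cluster_downgrade_test.go.

	// waitForStorageVersion (hypothetical helper, not in this patch) polls
	// each member's /version endpoint until its storage version equals
	// want, retrying once per second within the test's timeout budget.
	func waitForStorageVersion(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, epc *e2e.EtcdProcessCluster, want string) {
		testutils.ExecuteWithTimeout(t, 30*time.Second, func() {
			for _, member := range epc.Procs {
				for {
					result, err := getMemberVersionByCurl(cfg, member)
					if err == nil && compareMemberVersion(version.Versions{Storage: want}, result) == nil {
						break // this member's storage version is ready
					}
					time.Sleep(time.Second)
				}
			}
		})
	}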