diff --git a/tests/e2e/cluster_downgrade_test.go b/tests/e2e/cluster_downgrade_test.go index 06031ea48..eecf4c862 100644 --- a/tests/e2e/cluster_downgrade_test.go +++ b/tests/e2e/cluster_downgrade_test.go @@ -15,6 +15,7 @@ package e2e import ( + "context" "fmt" "testing" "time" @@ -22,12 +23,21 @@ import ( "github.com/coreos/go-semver/semver" "go.etcd.io/etcd/api/v3/version" "go.etcd.io/etcd/client/pkg/v3/fileutil" + clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/framework/e2e" "go.etcd.io/etcd/tests/v3/framework/testutils" ) -func TestDowngradeUpgrade(t *testing.T) { - currentEtcdBinary := "" +func TestDowngradeUpgradeClusterOf1(t *testing.T) { + testDowngradeUpgrade(t, 1) +} + +func TestDowngradeUpgradeClusterOf3(t *testing.T) { + testDowngradeUpgrade(t, 3) +} + +func testDowngradeUpgrade(t *testing.T, clusterSize int) { + currentEtcdBinary := e2e.BinDir + "/etcd" lastReleaseBinary := e2e.BinDir + "/etcd-last-release" if !fileutil.Exist(lastReleaseBinary) { t.Skipf("%q does not exist", lastReleaseBinary) @@ -38,30 +48,57 @@ func TestDowngradeUpgrade(t *testing.T) { lastVersionStr := fmt.Sprintf("%d.%d", lastVersion.Major, lastVersion.Minor) e2e.BeforeTest(t) - dataDirPath := t.TempDir() - epc := startEtcd(t, currentEtcdBinary, dataDirPath) - validateVersion(t, epc, version.Versions{Cluster: currentVersionStr, Server: currentVersionStr}) + t.Logf("Create cluster with version %s", currentVersionStr) + epc := newCluster(t, currentEtcdBinary, clusterSize) + for i := 0; i < len(epc.Procs); i++ { + validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: currentVersionStr, Server: currentVersionStr}) + } + t.Logf("Cluster created") + t.Logf("etcdctl downgrade enable %s", lastVersionStr) downgradeEnable(t, epc, lastVersion) - expectLog(t, epc, "The server is ready to downgrade") - validateVersion(t, epc, version.Versions{Cluster: lastVersionStr, Server: currentVersionStr}) - stopEtcd(t, epc) - epc = startEtcd(t, lastReleaseBinary, dataDirPath) - expectLog(t, epc, "the cluster has been downgraded") - validateVersion(t, epc, version.Versions{Cluster: lastVersionStr, Server: lastVersionStr}) + t.Log("Downgrade enabled, validating if cluster is ready for downgrade") + for i := 0; i < len(epc.Procs); i++ { + expectLog(t, epc.Procs[i], "The server is ready to downgrade") + validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: currentVersionStr}) + } + t.Log("Cluster is ready for downgrade") - stopEtcd(t, epc) - epc = startEtcd(t, currentEtcdBinary, dataDirPath) - validateVersion(t, epc, version.Versions{Cluster: currentVersionStr, Server: currentVersionStr}) + t.Logf("Starting downgrade process to %q", lastVersionStr) + for i := 0; i < len(epc.Procs); i++ { + t.Logf("Downgrading member %d by running %s binary", i, lastReleaseBinary) + stopEtcd(t, epc.Procs[i]) + startEtcd(t, epc.Procs[i], lastReleaseBinary) + } + t.Log("All members downgraded, validating downgrade") + expectLog(t, leader(t, epc), "the cluster has been downgraded") + for i := 0; i < len(epc.Procs); i++ { + validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: lastVersionStr}) + } + t.Log("Downgrade complete") + + t.Logf("Starting upgrade process to %q", currentVersionStr) + for i := 0; i < len(epc.Procs); i++ { + t.Logf("Upgrading member %d", i) + stopEtcd(t, epc.Procs[i]) + startEtcd(t, epc.Procs[i], currentEtcdBinary) + if i+1 < len(epc.Procs) { + validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: currentVersionStr}) + } + } + t.Log("All members upgraded, validating upgrade") + for i := 0; i < len(epc.Procs); i++ { + validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: currentVersionStr, Server: currentVersionStr}) + } + t.Log("Upgrade complete") } -func startEtcd(t *testing.T, execPath, dataDirPath string) *e2e.EtcdProcessCluster { +func newCluster(t *testing.T, execPath string, clusterSize int) *e2e.EtcdProcessCluster { epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{ ExecPath: execPath, - DataDirPath: dataDirPath, - ClusterSize: 1, + ClusterSize: clusterSize, InitialToken: "new", KeepDataDir: true, }) @@ -76,8 +113,15 @@ func startEtcd(t *testing.T, execPath, dataDirPath string) *e2e.EtcdProcessClust return epc } +func startEtcd(t *testing.T, ep e2e.EtcdProcess, execPath string) { + ep.Config().ExecPath = execPath + err := ep.Restart() + if err != nil { + t.Fatalf("could not start etcd process cluster (%v)", err) + } +} + func downgradeEnable(t *testing.T, epc *e2e.EtcdProcessCluster, ver semver.Version) { - t.Log("etcdctl downgrade...") c := e2e.NewEtcdctl(epc.Cfg, epc.EndpointsV3()) testutils.ExecuteWithTimeout(t, 20*time.Second, func() { err := c.DowngradeEnable(ver.String()) @@ -87,38 +131,71 @@ func downgradeEnable(t *testing.T, epc *e2e.EtcdProcessCluster, ver semver.Versi }) } -func stopEtcd(t *testing.T, epc *e2e.EtcdProcessCluster) { - t.Log("Stopping the server...") - if err := epc.Procs[0].Stop(); err != nil { +func stopEtcd(t *testing.T, ep e2e.EtcdProcess) { + if err := ep.Stop(); err != nil { t.Fatal(err) } } -func validateVersion(t *testing.T, epc *e2e.EtcdProcessCluster, expect version.Versions) { - t.Log("Validate version") +func validateVersion(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, expect version.Versions) { // Two separate calls to expect as it doesn't support multiple matches on the same line + var err error testutils.ExecuteWithTimeout(t, 20*time.Second, func() { - if expect.Server != "" { - err := e2e.SpawnWithExpects(e2e.CURLPrefixArgs(epc, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdserver":"`+expect.Server) - if err != nil { - t.Fatal(err) + for { + if expect.Server != "" { + err = e2e.SpawnWithExpects(e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdserver":"`+expect.Server) + if err != nil { + time.Sleep(time.Second) + continue + } } - } - if expect.Cluster != "" { - err := e2e.SpawnWithExpects(e2e.CURLPrefixArgs(epc, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdcluster":"`+expect.Cluster) - if err != nil { - t.Fatal(err) + if expect.Cluster != "" { + err = e2e.SpawnWithExpects(e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdcluster":"`+expect.Cluster) + if err != nil { + time.Sleep(time.Second) + continue + } } + break } }) + if err != nil { + t.Fatal(err) + } } -func expectLog(t *testing.T, epc *e2e.EtcdProcessCluster, expectLog string) { +func expectLog(t *testing.T, ep e2e.EtcdProcess, expectLog string) { t.Helper() + var err error testutils.ExecuteWithTimeout(t, 30*time.Second, func() { - _, err := epc.Procs[0].Logs().Expect(expectLog) + _, err = ep.Logs().Expect(expectLog) + }) + if err != nil { + t.Fatal(err) + } +} + +func leader(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + for i := 0; i < len(epc.Procs); i++ { + endpoints := epc.Procs[i].EndpointsV3() + cli, err := clientv3.New(clientv3.Config{ + Endpoints: endpoints, + DialTimeout: 3 * time.Second, + }) if err != nil { t.Fatal(err) } - }) + defer cli.Close() + resp, err := cli.Status(ctx, endpoints[0]) + if err != nil { + t.Fatal(err) + } + if resp.Header.GetMemberId() == resp.Leader { + return epc.Procs[i] + } + } + t.Fatal("Leader not found") + return nil } diff --git a/tests/e2e/v3_curl_test.go b/tests/e2e/v3_curl_test.go index 17fe6590b..35450bbe4 100644 --- a/tests/e2e/v3_curl_test.go +++ b/tests/e2e/v3_curl_test.go @@ -18,6 +18,7 @@ import ( "encoding/base64" "encoding/json" "fmt" + "math/rand" "path" "strconv" "testing" @@ -243,7 +244,7 @@ func testV3CurlAuth(cx ctlCtx) { lineFunc = func(txt string) bool { return true } ) - cmdArgs = e2e.CURLPrefixArgs(cx.epc, "POST", e2e.CURLReq{Endpoint: path.Join(p, "/auth/authenticate"), Value: string(authreq)}) + cmdArgs = e2e.CURLPrefixArgs(cx.epc.Cfg, cx.epc.Procs[rand.Intn(cx.epc.Cfg.ClusterSize)], "POST", e2e.CURLReq{Endpoint: path.Join(p, "/auth/authenticate"), Value: string(authreq)}) proc, err := e2e.SpawnCmd(cmdArgs, cx.envMap) testutil.AssertNil(cx.t, err) defer proc.Close() @@ -282,7 +283,7 @@ func testV3CurlCampaign(cx ctlCtx) { if err != nil { cx.t.Fatal(err) } - cargs := e2e.CURLPrefixArgs(cx.epc, "POST", e2e.CURLReq{ + cargs := e2e.CURLPrefixArgs(cx.epc.Cfg, cx.epc.Procs[rand.Intn(cx.epc.Cfg.ClusterSize)], "POST", e2e.CURLReq{ Endpoint: path.Join(cx.apiPrefix, "/election/campaign"), Value: string(cdata), }) diff --git a/tests/framework/e2e/curl.go b/tests/framework/e2e/curl.go index 284b49aaa..a3b11de85 100644 --- a/tests/framework/e2e/curl.go +++ b/tests/framework/e2e/curl.go @@ -40,20 +40,20 @@ type CURLReq struct { // CURLPrefixArgs builds the beginning of a curl command for a given key // addressed to a random URL in the given cluster. -func CURLPrefixArgs(clus *EtcdProcessCluster, method string, req CURLReq) []string { +func CURLPrefixArgs(cfg *EtcdProcessClusterConfig, member EtcdProcess, method string, req CURLReq) []string { var ( cmdArgs = []string{"curl"} - acurl = clus.Procs[rand.Intn(clus.Cfg.ClusterSize)].Config().Acurl + acurl = member.Config().Acurl ) if req.MetricsURLScheme != "https" { if req.IsTLS { - if clus.Cfg.ClientTLS != ClientTLSAndNonTLS { + if cfg.ClientTLS != ClientTLSAndNonTLS { panic("should not use cURLPrefixArgsUseTLS when serving only TLS or non-TLS") } cmdArgs = append(cmdArgs, "--cacert", CaPath, "--cert", CertPath, "--key", PrivateKeyPath) - acurl = ToTLS(clus.Procs[rand.Intn(clus.Cfg.ClusterSize)].Config().Acurl) - } else if clus.Cfg.ClientTLS == ClientTLS { - if !clus.Cfg.NoCN { + acurl = ToTLS(member.Config().Acurl) + } else if cfg.ClientTLS == ClientTLS { + if !cfg.NoCN { cmdArgs = append(cmdArgs, "--cacert", CaPath, "--cert", CertPath, "--key", PrivateKeyPath) } else { cmdArgs = append(cmdArgs, "--cacert", CaPath, "--cert", CertPath3, "--key", PrivateKeyPath3) @@ -61,7 +61,7 @@ func CURLPrefixArgs(clus *EtcdProcessCluster, method string, req CURLReq) []stri } } if req.MetricsURLScheme != "" { - acurl = clus.Procs[rand.Intn(clus.Cfg.ClusterSize)].EndpointsMetrics()[0] + acurl = member.EndpointsMetrics()[0] } ep := acurl + req.Endpoint @@ -94,13 +94,13 @@ func CURLPrefixArgs(clus *EtcdProcessCluster, method string, req CURLReq) []stri } func CURLPost(clus *EtcdProcessCluster, req CURLReq) error { - return SpawnWithExpect(CURLPrefixArgs(clus, "POST", req), req.Expected) + return SpawnWithExpect(CURLPrefixArgs(clus.Cfg, clus.Procs[rand.Intn(clus.Cfg.ClusterSize)], "POST", req), req.Expected) } func CURLPut(clus *EtcdProcessCluster, req CURLReq) error { - return SpawnWithExpect(CURLPrefixArgs(clus, "PUT", req), req.Expected) + return SpawnWithExpect(CURLPrefixArgs(clus.Cfg, clus.Procs[rand.Intn(clus.Cfg.ClusterSize)], "PUT", req), req.Expected) } func CURLGet(clus *EtcdProcessCluster, req CURLReq) error { - return SpawnWithExpect(CURLPrefixArgs(clus, "GET", req), req.Expected) + return SpawnWithExpect(CURLPrefixArgs(clus.Cfg, clus.Procs[rand.Intn(clus.Cfg.ClusterSize)], "GET", req), req.Expected) }