mirror of https://github.com/etcd-io/etcd.git
test: deflake TestDowngradeUpgradeClusterOf3 timeout
In the TestDowngradeUpgradeCluster case, the brand-new cluster uses the simple-config-changer, which means that entries are committed before leader election and are applied once etcdserver starts to receive apply requests. The simple-config-changer marks the `confState` dirty, and the storage backend's precommit hook then persists the `confState`.

For the new cluster, the storage version is nil at the beginning. It becomes v3.5 once the `confState` record has been committed, and >v3.5 once the `storageVersion` record has been committed. When the new cluster is ready, the leader sets the initial cluster version to v3.6.x, which triggers `monitorStorageVersion` to update the `storageVersion` to v3.6.x. If the `confState` record has been committed before the cluster version update, we get the `storageVersion` record. But if the storage backend has not committed in time, `monitorStorageVersion` cannot update the version and fails with `cannot detect storage schema version: missing confstate information`.

If we then file the downgrade request before the next round of `monitorStorageVersion` (which runs every 4 seconds), the cluster version is still v3.5.0, which equals the result of `UnsafeDetectSchemaVersion`, and we never see `The server is ready to downgrade`. The issue is easy to reproduce if you use cpuset or taskset to restrict the process to two CPUs.

So we should wait for the new cluster's storage to be ready before filing the downgrade request.

Fixes: #14540

Signed-off-by: Wei Fu <fuweid89@gmail.com>
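For illustration, the essence of the fix is to poll each member's /version endpoint until the reported storage version matches the expected value before filing the downgrade request. Below is a minimal standalone sketch of such a wait, not the test's actual helper: the /version path and the poll-with-deadline shape follow the diff below, while the `storage` JSON field name, the endpoint URL, and the 30-second budget are assumptions used only for this example.

    package main

    import (
        "encoding/json"
        "fmt"
        "net/http"
        "time"
    )

    // waitForStorageVersion polls a member's /version endpoint until the reported
    // storage version equals want, or the deadline expires. Hypothetical helper;
    // the "storage" field name is an assumption about the /version payload.
    func waitForStorageVersion(baseURL, want string, timeout time.Duration) error {
        deadline := time.Now().Add(timeout)
        for time.Now().Before(deadline) {
            resp, err := http.Get(baseURL + "/version")
            if err == nil {
                var v struct {
                    Storage string `json:"storage"`
                }
                decodeErr := json.NewDecoder(resp.Body).Decode(&v)
                resp.Body.Close()
                if decodeErr == nil && v.Storage == want {
                    return nil
                }
            }
            time.Sleep(time.Second)
        }
        return fmt.Errorf("storage version did not reach %q within %v", want, timeout)
    }

    func main() {
        // Illustrative endpoint and version; a real test would use the member's client URL.
        if err := waitForStorageVersion("http://127.0.0.1:2379", "3.6.0", 30*time.Second); err != nil {
            fmt.Println(err)
        }
    }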
@@ -16,7 +16,9 @@ package e2e
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
+	"strings"
 	"testing"
 	"time"
 
@@ -27,6 +29,7 @@ import (
 	clientv3 "go.etcd.io/etcd/client/v3"
 	"go.etcd.io/etcd/tests/v3/framework/e2e"
 	"go.etcd.io/etcd/tests/v3/framework/testutils"
+	"go.uber.org/zap"
 )
 
 func TestDowngradeUpgradeClusterOf1(t *testing.T) {
@@ -43,17 +46,24 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int) {
 	if !fileutil.Exist(lastReleaseBinary) {
 		t.Skipf("%q does not exist", lastReleaseBinary)
 	}
 
 	currentVersion := semver.New(version.Version)
 	lastVersion := semver.Version{Major: currentVersion.Major, Minor: currentVersion.Minor - 1}
-	currentVersionStr := fmt.Sprintf("%d.%d", currentVersion.Major, currentVersion.Minor)
-	lastVersionStr := fmt.Sprintf("%d.%d", lastVersion.Major, lastVersion.Minor)
+
+	currentVersion.PreRelease = ""
+	currentVersionStr := currentVersion.String()
+	lastVersionStr := lastVersion.String()
 
 	e2e.BeforeTest(t)
 
 	t.Logf("Create cluster with version %s", currentVersionStr)
 	epc := newCluster(t, currentEtcdBinary, clusterSize)
 	for i := 0; i < len(epc.Procs); i++ {
-		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: currentVersionStr, Server: currentVersionStr})
+		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
+			Cluster: currentVersionStr,
+			Server:  version.Version,
+			Storage: currentVersionStr,
+		})
 	}
 	t.Logf("Cluster created")
 
@@ -62,36 +72,49 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int) {
 
 	t.Log("Downgrade enabled, validating if cluster is ready for downgrade")
 	for i := 0; i < len(epc.Procs); i++ {
+		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
+			Cluster: lastVersionStr,
+			Server:  version.Version,
+			Storage: lastVersionStr,
+		})
 		e2e.AssertProcessLogs(t, epc.Procs[i], "The server is ready to downgrade")
-		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: currentVersionStr})
 	}
-	t.Log("Cluster is ready for downgrade")
 
+	t.Log("Cluster is ready for downgrade")
 	t.Logf("Starting downgrade process to %q", lastVersionStr)
 	for i := 0; i < len(epc.Procs); i++ {
 		t.Logf("Downgrading member %d by running %s binary", i, lastReleaseBinary)
 		stopEtcd(t, epc.Procs[i])
 		startEtcd(t, epc.Procs[i], lastReleaseBinary)
 	}
 
 	t.Log("All members downgraded, validating downgrade")
 	e2e.AssertProcessLogs(t, leader(t, epc), "the cluster has been downgraded")
 	for i := 0; i < len(epc.Procs); i++ {
-		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: lastVersionStr})
+		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
+			Cluster: lastVersionStr,
+			Server:  lastVersionStr,
+		})
 	}
-	t.Log("Downgrade complete")
 
+	t.Log("Downgrade complete")
 	t.Logf("Starting upgrade process to %q", currentVersionStr)
 	for i := 0; i < len(epc.Procs); i++ {
 		t.Logf("Upgrading member %d", i)
 		stopEtcd(t, epc.Procs[i])
 		startEtcd(t, epc.Procs[i], currentEtcdBinary)
 		if i+1 < len(epc.Procs) {
 			validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: currentVersionStr})
 		}
+		// NOTE: The leader has a monitor on the cluster version, which will
+		// update the cluster version itself. We don't check the transient
+		// version here since that check might be flaky.
 	}
 
 	t.Log("All members upgraded, validating upgrade")
 	for i := 0; i < len(epc.Procs); i++ {
-		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: currentVersionStr, Server: currentVersionStr})
+		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
+			Cluster: currentVersionStr,
+			Server:  version.Version,
+			Storage: currentVersionStr,
+		})
 	}
 	t.Log("Upgrade complete")
 }
@@ -140,30 +163,23 @@ func stopEtcd(t *testing.T, ep e2e.EtcdProcess) {
 }
 
 func validateVersion(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, expect version.Versions) {
-	// Two separate calls to expect as it doesn't support multiple matches on the same line
-	var err error
-	testutils.ExecuteWithTimeout(t, 20*time.Second, func() {
+	testutils.ExecuteWithTimeout(t, 30*time.Second, func() {
 		for {
-			if expect.Server != "" {
-				err = e2e.SpawnWithExpects(e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdserver":"`+expect.Server)
-				if err != nil {
-					time.Sleep(time.Second)
-					continue
-				}
-			}
+			result, err := getMemberVersionByCurl(cfg, member)
+			if err != nil {
+				cfg.Logger.Warn("failed to get member version and retrying", zap.Error(err))
+				time.Sleep(time.Second)
+				continue
+			}
-			if expect.Cluster != "" {
-				err = e2e.SpawnWithExpects(e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdcluster":"`+expect.Cluster)
-				if err != nil {
-					time.Sleep(time.Second)
-					continue
-				}
-			}
+
+			if err := compareMemberVersion(expect, result); err != nil {
+				cfg.Logger.Warn("failed to validate and retrying", zap.Error(err))
+				time.Sleep(time.Second)
+				continue
+			}
 			break
 		}
 	})
-	if err != nil {
-		t.Fatal(err)
-	}
 }
 
 func leader(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess {
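As an aside on the pattern above: ExecuteWithTimeout bounds the whole retry loop with an overall deadline. A hypothetical, self-contained sketch of that shape (not the actual testutils implementation) follows.

    package main

    import (
        "fmt"
        "time"
    )

    // executeWithTimeout runs f and reports an error if it has not returned within
    // timeout. Hypothetical stand-in for the testutils helper, for illustration only;
    // note that the goroutine keeps running if the deadline fires first.
    func executeWithTimeout(timeout time.Duration, f func()) error {
        done := make(chan struct{})
        go func() {
            defer close(done)
            f()
        }()

        select {
        case <-done:
            return nil
        case <-time.After(timeout):
            return fmt.Errorf("function did not finish within %v", timeout)
        }
    }

    func main() {
        err := executeWithTimeout(2*time.Second, func() {
            time.Sleep(time.Second) // simulated work
        })
        fmt.Println("err:", err)
    }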
@@ -190,3 +206,33 @@ func leader(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess {
 	t.Fatal("Leader not found")
 	return nil
 }
+
+func compareMemberVersion(expect version.Versions, target version.Versions) error {
+	if expect.Server != "" && expect.Server != target.Server {
+		return fmt.Errorf("expect etcdserver version %v, but got %v", expect.Server, target.Server)
+	}
+
+	if expect.Cluster != "" && expect.Cluster != target.Cluster {
+		return fmt.Errorf("expect etcdcluster version %v, but got %v", expect.Cluster, target.Cluster)
+	}
+
+	if expect.Storage != "" && expect.Storage != target.Storage {
+		return fmt.Errorf("expect storage version %v, but got %v", expect.Storage, target.Storage)
+	}
+	return nil
+}
+
+func getMemberVersionByCurl(cfg *e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) (version.Versions, error) {
+	args := e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"})
+	lines, err := e2e.RunUtilCompletion(args, nil)
+	if err != nil {
+		return version.Versions{}, err
+	}
+
+	data := strings.Join(lines, "\n")
+	result := version.Versions{}
+	if err := json.Unmarshal([]byte(data), &result); err != nil {
+		return version.Versions{}, fmt.Errorf("failed to unmarshal (%v): %w", data, err)
+	}
+	return result, nil
+}
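For context, a self-contained sketch of the JSON decoding that getMemberVersionByCurl relies on. The etcdserver and etcdcluster field names appear in the curl expects removed above; the storage field name and the payload values are assumptions used only for this example.

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // Mirrors the fields of version.Versions used by the test; "etcdserver" and
    // "etcdcluster" appear in the curl expects above, "storage" is assumed.
    type memberVersions struct {
        Server  string `json:"etcdserver"`
        Cluster string `json:"etcdcluster"`
        Storage string `json:"storage"`
    }

    func main() {
        // Example /version payload with illustrative values.
        data := `{"etcdserver":"3.6.0","etcdcluster":"3.5.0","storage":"3.5.0"}`

        var v memberVersions
        if err := json.Unmarshal([]byte(data), &v); err != nil {
            panic(err)
        }
        fmt.Printf("server=%s cluster=%s storage=%s\n", v.Server, v.Cluster, v.Storage)
    }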
@@ -81,6 +81,22 @@ func SpawnWithExpectLines(ctx context.Context, args []string, envVars map[string
 	return lines, perr
 }
 
+func RunUtilCompletion(args []string, envVars map[string]string) ([]string, error) {
+	proc, err := SpawnCmd(args, envVars)
+	if err != nil {
+		return nil, fmt.Errorf("failed to spawn command: %w", err)
+	}
+	defer proc.Stop()
+
+	perr := proc.Wait()
+	// make sure that all the outputs are received
+	proc.Close()
+	if perr != nil {
+		return nil, fmt.Errorf("unexpected error from command %v: %w", args, perr)
+	}
+	return proc.Lines(), nil
+}
+
 func RandomLeaseID() int64 {
 	return rand.New(rand.NewSource(time.Now().UnixNano())).Int63()
 }