mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server: Implement HA downgrade test
This commit is contained in:
@@ -16,7 +16,6 @@ package e2e
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -27,7 +26,15 @@ import (
|
||||
"go.etcd.io/etcd/tests/v3/framework/testutils"
|
||||
)
|
||||
|
||||
func TestDowngradeUpgrade(t *testing.T) {
|
||||
func TestDowngradeUpgradeClusterOf1(t *testing.T) {
|
||||
testDowngradeUpgrade(t, 1)
|
||||
}
|
||||
|
||||
func TestDowngradeUpgradeClusterOf3(t *testing.T) {
|
||||
testDowngradeUpgrade(t, 3)
|
||||
}
|
||||
|
||||
func testDowngradeUpgrade(t *testing.T, clusterSize int) {
|
||||
currentEtcdBinary := e2e.BinDir + "/etcd"
|
||||
lastReleaseBinary := e2e.BinDir + "/etcd-last-release"
|
||||
if !fileutil.Exist(lastReleaseBinary) {
|
||||
@@ -41,38 +48,52 @@ func TestDowngradeUpgrade(t *testing.T) {
|
||||
e2e.BeforeTest(t)
|
||||
|
||||
t.Logf("Create cluster with version %s", currentVersionStr)
|
||||
epc := newCluster(t, currentEtcdBinary)
|
||||
validateVersion(t, epc, version.Versions{Cluster: currentVersionStr, Server: currentVersionStr})
|
||||
epc := newCluster(t, currentEtcdBinary, clusterSize)
|
||||
for i := 0; i < len(epc.Procs); i++ {
|
||||
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: currentVersionStr, Server: currentVersionStr})
|
||||
}
|
||||
t.Logf("Cluster created")
|
||||
|
||||
t.Logf("etcdctl downgrade enable %s", lastVersion)
|
||||
downgradeEnable(t, epc, lastVersion)
|
||||
|
||||
t.Log("Downgrade enabled, validating if cluster is ready for downgrade")
|
||||
expectLog(t, epc, "The server is ready to downgrade")
|
||||
validateVersion(t, epc, version.Versions{Cluster: lastVersionStr, Server: currentVersionStr})
|
||||
for i := 0; i < len(epc.Procs); i++ {
|
||||
expectLog(t, epc.Procs[i], "The server is ready to downgrade")
|
||||
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: currentVersionStr})
|
||||
}
|
||||
t.Log("Cluster is ready for downgrade")
|
||||
|
||||
t.Log("Starting downgrade process")
|
||||
stopEtcd(t, epc.Procs[0])
|
||||
startEtcd(t, epc, lastReleaseBinary)
|
||||
expectLog(t, epc, "the cluster has been downgraded")
|
||||
for i := 0; i < len(epc.Procs); i++ {
|
||||
t.Logf("Downgrading member %d", i)
|
||||
stopEtcd(t, epc.Procs[i])
|
||||
startEtcd(t, epc.Procs[i], lastReleaseBinary)
|
||||
}
|
||||
t.Log("All members downgraded, validating downgrade")
|
||||
validateVersion(t, epc, version.Versions{Cluster: lastVersionStr, Server: lastVersionStr})
|
||||
expectLog(t, leader(t, epc), "the cluster has been downgraded")
|
||||
for i := 0; i < len(epc.Procs); i++ {
|
||||
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: lastVersionStr})
|
||||
}
|
||||
t.Log("Downgrade complete")
|
||||
|
||||
t.Log("Starting upgrade process")
|
||||
stopEtcd(t, epc.Procs[0])
|
||||
startEtcd(t, epc, currentEtcdBinary)
|
||||
for i := 0; i < len(epc.Procs); i++ {
|
||||
t.Logf("Upgrading member %d", i)
|
||||
stopEtcd(t, epc.Procs[i])
|
||||
startEtcd(t, epc.Procs[i], currentEtcdBinary)
|
||||
}
|
||||
t.Log("All members upgraded, validating upgrade")
|
||||
validateVersion(t, epc, version.Versions{Cluster: currentVersionStr, Server: currentVersionStr})
|
||||
for i := 0; i < len(epc.Procs); i++ {
|
||||
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: currentVersionStr, Server: currentVersionStr})
|
||||
}
|
||||
t.Log("Upgrade complete")
|
||||
}
|
||||
|
||||
func newCluster(t *testing.T, execPath string) *e2e.EtcdProcessCluster {
|
||||
func newCluster(t *testing.T, execPath string, clusterSize int) *e2e.EtcdProcessCluster {
|
||||
epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{
|
||||
ExecPath: execPath,
|
||||
ClusterSize: 1,
|
||||
ClusterSize: clusterSize,
|
||||
InitialToken: "new",
|
||||
KeepDataDir: true,
|
||||
})
|
||||
@@ -87,9 +108,9 @@ func newCluster(t *testing.T, execPath string) *e2e.EtcdProcessCluster {
|
||||
return epc
|
||||
}
|
||||
|
||||
func startEtcd(t *testing.T, epc *e2e.EtcdProcessCluster, execPath string) {
|
||||
epc.Procs[0].Config().ExecPath = execPath
|
||||
err := epc.Procs[0].Restart()
|
||||
func startEtcd(t *testing.T, ep e2e.EtcdProcess, execPath string) {
|
||||
ep.Config().ExecPath = execPath
|
||||
err := ep.Restart()
|
||||
if err != nil {
|
||||
t.Fatalf("could not start etcd process cluster (%v)", err)
|
||||
}
|
||||
@@ -111,20 +132,20 @@ func stopEtcd(t *testing.T, ep e2e.EtcdProcess) {
|
||||
}
|
||||
}
|
||||
|
||||
func validateVersion(t *testing.T, epc *e2e.EtcdProcessCluster, expect version.Versions) {
|
||||
func validateVersion(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, expect version.Versions) {
|
||||
// Two separate calls to expect as it doesn't support multiple matches on the same line
|
||||
var err error
|
||||
testutils.ExecuteWithTimeout(t, 20*time.Second, func() {
|
||||
for {
|
||||
if expect.Server != "" {
|
||||
err = e2e.SpawnWithExpects(e2e.CURLPrefixArgs(epc.Cfg, epc.Procs[rand.Intn(epc.Cfg.ClusterSize)], "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdserver":"`+expect.Server)
|
||||
err = e2e.SpawnWithExpects(e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdserver":"`+expect.Server)
|
||||
if err != nil {
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
}
|
||||
}
|
||||
if expect.Cluster != "" {
|
||||
err = e2e.SpawnWithExpects(e2e.CURLPrefixArgs(epc.Cfg, epc.Procs[rand.Intn(epc.Cfg.ClusterSize)], "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdcluster":"`+expect.Cluster)
|
||||
err = e2e.SpawnWithExpects(e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdcluster":"`+expect.Cluster)
|
||||
if err != nil {
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
@@ -138,13 +159,23 @@ func validateVersion(t *testing.T, epc *e2e.EtcdProcessCluster, expect version.V
|
||||
}
|
||||
}
|
||||
|
||||
func expectLog(t *testing.T, epc *e2e.EtcdProcessCluster, expectLog string) {
|
||||
func expectLog(t *testing.T, ep e2e.EtcdProcess, expectLog string) {
|
||||
t.Helper()
|
||||
var err error
|
||||
testutils.ExecuteWithTimeout(t, 30*time.Second, func() {
|
||||
_, err = epc.Procs[0].Logs().Expect(expectLog)
|
||||
_, err = ep.Logs().Expect(expectLog)
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func leader(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
leader, err := epc.Leader(ctx)
|
||||
cancel()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return leader
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
package e2e
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
@@ -23,6 +24,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zaptest"
|
||||
@@ -513,3 +515,25 @@ func (epc *EtcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) {
|
||||
}
|
||||
return ret
|
||||
}
|
||||
func (epc *EtcdProcessCluster) Leader(ctx context.Context) (EtcdProcess, error) {
|
||||
for i := 0; i < len(epc.Procs); i++ {
|
||||
endpoints := epc.Procs[i].EndpointsV3()
|
||||
cli, err := clientv3.New(clientv3.Config{
|
||||
Endpoints: endpoints,
|
||||
DialTimeout: 3 * time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer cli.Close()
|
||||
resp, err := cli.Status(ctx, endpoints[0])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if resp.Header.GetMemberId() == resp.Leader {
|
||||
return epc.Procs[i], nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("Leader not found")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user