diff --git a/etcdutl/etcdutl/migrate_command.go b/etcdutl/etcdutl/migrate_command.go
index 9c429e716..e9bac4f37 100644
--- a/etcdutl/etcdutl/migrate_command.go
+++ b/etcdutl/etcdutl/migrate_command.go
@@ -26,6 +26,8 @@ import (
 	"go.etcd.io/etcd/server/v3/storage/backend"
 	"go.etcd.io/etcd/server/v3/storage/datadir"
 	"go.etcd.io/etcd/server/v3/storage/schema"
+	"go.etcd.io/etcd/server/v3/storage/wal"
+	"go.etcd.io/etcd/server/v3/storage/wal/walpb"
 )
 
 // NewMigrateCommand prints out the version of etcd.
@@ -90,12 +92,24 @@ func (o *migrateOptions) Config() (*migrateConfig, error) {
 	dbPath := datadir.ToBackendFileName(o.dataDir)
 	c.be = backend.NewDefaultBackend(dbPath)
 
+	walPath := datadir.ToWalDir(o.dataDir)
+	w, err := wal.OpenForRead(GetLogger(), walPath, walpb.Snapshot{})
+	if err != nil {
+		return nil, fmt.Errorf(`failed to open wal: %v`, err)
+	}
+	defer w.Close()
+	c.walVersion, err = wal.ReadWALVersion(w)
+	if err != nil {
+		return nil, fmt.Errorf(`failed to read wal: %v`, err)
+	}
+
 	return c, nil
 }
 
 type migrateConfig struct {
 	be            backend.Backend
 	targetVersion *semver.Version
+	walVersion    schema.WALVersion
 	force         bool
 }
 
@@ -112,7 +126,7 @@ func migrateCommandFunc(c *migrateConfig) error {
 		lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(&current)))
 		return nil
 	}
-	err = schema.Migrate(lg, tx, *c.targetVersion)
+	err = schema.Migrate(lg, tx, c.walVersion, *c.targetVersion)
 	if err != nil {
 		if !c.force {
 			return err
diff --git a/server/etcdserver/adapters.go b/server/etcdserver/adapters.go
index e5e943d15..bc4b68645 100644
--- a/server/etcdserver/adapters.go
+++ b/server/etcdserver/adapters.go
@@ -18,34 +18,29 @@ import (
 	"context"
 
 	"github.com/coreos/go-semver/semver"
-	"go.uber.org/zap"
 
 	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
 	"go.etcd.io/etcd/api/v3/membershippb"
 	"go.etcd.io/etcd/api/v3/version"
 	serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
-	"go.etcd.io/etcd/server/v3/storage/backend"
 	"go.etcd.io/etcd/server/v3/storage/schema"
 )
 
 // serverVersionAdapter implements Server interface needed by serverversion.Monitor
 type serverVersionAdapter struct {
 	*EtcdServer
-	tx backend.BatchTx
 }
 
 func newServerVersionAdapter(s *EtcdServer) *serverVersionAdapter {
 	return &serverVersionAdapter{
 		EtcdServer: s,
-		tx:         nil,
 	}
 }
 
 var _ serverversion.Server = (*serverVersionAdapter)(nil)
 
 func (s *serverVersionAdapter) UpdateClusterVersion(version string) {
-	// TODO switch to updateClusterVersionV3 in 3.6
-	s.GoAttach(func() { s.updateClusterVersionV2(version) })
+	s.GoAttach(func() { s.updateClusterVersionV3(version) })
 }
 
 func (s *serverVersionAdapter) LinearizableReadNotify(ctx context.Context) error {
@@ -77,34 +72,19 @@ func (s *serverVersionAdapter) GetMembersVersions() map[string]*version.Versions
 }
 
 func (s *serverVersionAdapter) GetStorageVersion() *semver.Version {
-	if s.tx == nil {
-		s.Lock()
-		defer s.Unlock()
-	}
-	v, err := schema.UnsafeDetectSchemaVersion(s.lg, s.tx)
+	tx := s.be.BatchTx()
+	tx.Lock()
+	defer tx.Unlock()
+	v, err := schema.UnsafeDetectSchemaVersion(s.lg, tx)
 	if err != nil {
 		return nil
 	}
 	return &v
 }
 
-func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) {
-	if s.tx == nil {
-		s.Lock()
-		defer s.Unlock()
-	}
-	err := schema.UnsafeMigrate(s.lg, s.tx, target)
-	if err != nil {
-		s.lg.Error("failed migrating storage schema", zap.String("storage-version", target.String()), zap.Error(err))
-	}
-}
-
-func (s *serverVersionAdapter) Lock() {
-	s.tx = s.be.BatchTx()
-	s.tx.Lock()
-}
-
-func (s *serverVersionAdapter) Unlock() {
-	s.tx.Unlock()
-	s.tx = nil
+func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) error {
+	tx := s.be.BatchTx()
+	tx.Lock()
+	defer tx.Unlock()
+	return schema.UnsafeMigrate(s.lg, tx, s.r.storage, target)
 }
diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go
index 649e8a62f..795556b88 100644
--- a/server/etcdserver/api/membership/cluster.go
+++ b/server/etcdserver/api/membership/cluster.go
@@ -271,12 +271,15 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
 	if c.be != nil {
 		c.downgradeInfo = c.be.DowngradeInfoFromBackend()
 	}
-	d := &serverversion.DowngradeInfo{Enabled: false}
-	if c.downgradeInfo != nil {
-		d = &serverversion.DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
-	}
 	sv := semver.Must(semver.NewVersion(version.Version))
-	serverversion.MustDetectDowngrade(c.lg, sv, c.version, d)
+	if c.downgradeInfo != nil && c.downgradeInfo.Enabled {
+		c.lg.Info(
+			"cluster is downgrading to target version",
+			zap.String("target-cluster-version", c.downgradeInfo.TargetVersion),
+			zap.String("current-server-version", sv.String()),
+		)
+	}
+	serverversion.MustDetectDowngrade(c.lg, sv, c.version)
 	onSet(c.lg, c.version)
 
 	for _, m := range c.members {
@@ -548,7 +551,7 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s
 	oldVer := c.version
 	c.version = ver
 	sv := semver.Must(semver.NewVersion(version.Version))
-	serverversion.MustDetectDowngrade(c.lg, sv, c.version, c.downgradeInfo)
+	serverversion.MustDetectDowngrade(c.lg, sv, c.version)
 	if c.v2store != nil {
 		mustSaveClusterVersionToStore(c.lg, c.v2store, ver)
 	}
@@ -759,14 +762,6 @@ func (c *RaftCluster) SetDowngradeInfo(d *serverversion.DowngradeInfo, shouldApp
 	}
 
 	c.downgradeInfo = d
-
-	if d.Enabled {
-		c.lg.Info(
-			"The server is ready to downgrade",
-			zap.String("target-version", d.TargetVersion),
-			zap.String("server-version", version.Version),
-		)
-	}
 }
 
 // IsMemberExist returns if the member with the given id exists in cluster.
diff --git a/server/etcdserver/version/downgrade.go b/server/etcdserver/version/downgrade.go
index d70fd63ac..60cab4931 100644
--- a/server/etcdserver/version/downgrade.go
+++ b/server/etcdserver/version/downgrade.go
@@ -37,31 +37,11 @@ func isValidDowngrade(verFrom *semver.Version, verTo *semver.Version) bool {
 	return verTo.Equal(*allowedDowngradeVersion(verFrom))
 }
 
-// MustDetectDowngrade will detect unexpected downgrade when the local server is recovered.
-func MustDetectDowngrade(lg *zap.Logger, sv, cv *semver.Version, d *DowngradeInfo) {
+// MustDetectDowngrade detects a local server joining a cluster that doesn't support its version.
+func MustDetectDowngrade(lg *zap.Logger, sv, cv *semver.Version) {
 	// only keep major.minor version for comparison against cluster version
 	sv = &semver.Version{Major: sv.Major, Minor: sv.Minor}
 
-	// if the cluster enables downgrade, check local version against downgrade target version.
-	if d != nil && d.Enabled && d.TargetVersion != "" {
-		if sv.Equal(*d.GetTargetVersion()) {
-			if cv != nil {
-				lg.Info(
-					"cluster is downgrading to target version",
-					zap.String("target-cluster-version", d.TargetVersion),
-					zap.String("determined-cluster-version", version.Cluster(cv.String())),
-					zap.String("current-server-version", sv.String()),
-				)
-			}
-			return
-		}
-		lg.Panic(
-			"invalid downgrade; server version is not allowed to join when downgrade is enabled",
-			zap.String("current-server-version", sv.String()),
-			zap.String("target-cluster-version", d.TargetVersion),
-		)
-	}
-
 	// if the cluster disables downgrade, check local version against determined cluster version.
 	// the validation passes when local version is not less than cluster version
 	if cv != nil && sv.LessThan(*cv) {
diff --git a/server/etcdserver/version/downgrade_test.go b/server/etcdserver/version/downgrade_test.go
index 97c8d2125..a16a1bafc 100644
--- a/server/etcdserver/version/downgrade_test.go
+++ b/server/etcdserver/version/downgrade_test.go
@@ -29,92 +29,47 @@ func TestMustDetectDowngrade(t *testing.T) {
 	lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
 	oneMinorHigher := &semver.Version{Major: lv.Major, Minor: lv.Minor + 1}
 	oneMinorLower := &semver.Version{Major: lv.Major, Minor: lv.Minor - 1}
-	downgradeEnabledHigherVersion := &DowngradeInfo{Enabled: true, TargetVersion: oneMinorHigher.String()}
-	downgradeEnabledEqualVersion := &DowngradeInfo{Enabled: true, TargetVersion: lv.String()}
-	downgradeEnabledLowerVersion := &DowngradeInfo{Enabled: true, TargetVersion: oneMinorLower.String()}
-	downgradeDisabled := &DowngradeInfo{Enabled: false}
 
 	tests := []struct {
 		name           string
 		clusterVersion *semver.Version
-		downgrade      *DowngradeInfo
 		success        bool
 		message        string
 	}{
 		{
-			"Succeeded when downgrade is disabled and cluster version is nil",
+			"Succeeded when cluster version is nil",
 			nil,
-			downgradeDisabled,
 			true,
 			"",
 		},
 		{
-			"Succeeded when downgrade is disabled and cluster version is one minor lower",
+			"Succeeded when cluster version is one minor lower",
 			oneMinorLower,
-			downgradeDisabled,
 			true,
 			"",
 		},
 		{
-			"Succeeded when downgrade is disabled and cluster version is server version",
+			"Succeeded when cluster version is server version",
 			lv,
-			downgradeDisabled,
 			true,
 			"",
 		},
 		{
-			"Failed when downgrade is disabled and server version is lower than determined cluster version ",
+			"Failed when server version is lower than determined cluster version ",
 			oneMinorHigher,
-			downgradeDisabled,
 			false,
 			"invalid downgrade; server version is lower than determined cluster version",
 		},
-		{
-			"Succeeded when downgrade is enabled and cluster version is nil",
-			nil,
-			downgradeEnabledEqualVersion,
-			true,
-			"",
-		},
-		{
-			"Failed when downgrade is enabled and server version is target version",
-			lv,
-			downgradeEnabledEqualVersion,
-			true,
-			"cluster is downgrading to target version",
-		},
-		{
-			"Succeeded when downgrade to lower version and server version is cluster version ",
-			lv,
-			downgradeEnabledLowerVersion,
-			false,
-			"invalid downgrade; server version is not allowed to join when downgrade is enabled",
-		},
-		{
-			"Failed when downgrade is enabled and local version is out of range and cluster version is nil",
-			nil,
-			downgradeEnabledHigherVersion,
-			false,
-			"invalid downgrade; server version is not allowed to join when downgrade is enabled",
-		},
-
-		{
-			"Failed when downgrade is enabled and local version is out of range",
-			lv,
-			downgradeEnabledHigherVersion,
-			false,
-			"invalid downgrade; server version is not allowed to join when downgrade is enabled",
-		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			lg := zaptest.NewLogger(t)
 			sv := semver.Must(semver.NewVersion(version.Version))
-			err := tryMustDetectDowngrade(lg, sv, tt.clusterVersion, tt.downgrade)
+			err := tryMustDetectDowngrade(lg, sv, tt.clusterVersion)
 
 			if tt.success != (err == nil) {
-				t.Errorf("Unexpected status, got %q, wanted: %v", err, tt.success)
+				t.Errorf("Unexpected success, got: %v, wanted: %v", err == nil, tt.success)
 				// TODO test err
 			}
 			if err != nil && tt.message != fmt.Sprintf("%s", err) {
@@ -124,11 +79,11 @@ func TestMustDetectDowngrade(t *testing.T) {
 	}
 }
 
-func tryMustDetectDowngrade(lg *zap.Logger, sv, cv *semver.Version, d *DowngradeInfo) (err interface{}) {
+func tryMustDetectDowngrade(lg *zap.Logger, sv, cv *semver.Version) (err interface{}) {
 	defer func() {
 		err = recover()
 	}()
 
-	MustDetectDowngrade(lg, sv, cv, d)
+	MustDetectDowngrade(lg, sv, cv)
 	return err
 }
diff --git a/server/etcdserver/version/monitor.go b/server/etcdserver/version/monitor.go
index 1a8e73e47..8ac8d8e8d 100644
--- a/server/etcdserver/version/monitor.go
+++ b/server/etcdserver/version/monitor.go
@@ -39,10 +39,7 @@ type Server interface {
 	DowngradeCancel(ctx context.Context) error
 
 	GetStorageVersion() *semver.Version
-	UpdateStorageVersion(semver.Version)
-
-	Lock()
-	Unlock()
+	UpdateStorageVersion(semver.Version) error
 }
 
 func NewMonitor(lg *zap.Logger, storage Server) *Monitor {
@@ -61,18 +58,35 @@ func (m *Monitor) UpdateClusterVersionIfNeeded() {
 	}
 }
 
-// decideClusterVersion decides the cluster version based on the members versions if all members agree on a higher one.
+// decideClusterVersion decides whether to change cluster version and its next value.
+// New cluster version is based on the members' server versions and whether the cluster is downgrading.
+// Returns nil if cluster version should be left unchanged.
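+// For example, with cluster version 3.6, downgrade enabled with target 3.5,
+// and all members running 3.6 servers, the decided version is 3.5 (see the
+// downgrade cases in monitor_test.go below).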
 func (m *Monitor) decideClusterVersion() *semver.Version {
 	clusterVersion := m.s.GetClusterVersion()
-	membersMinimalVersion := m.membersMinimalVersion()
+	minimalServerVersion := m.membersMinimalServerVersion()
 	if clusterVersion == nil {
-		if membersMinimalVersion != nil {
-			return membersMinimalVersion
+		if minimalServerVersion != nil {
+			return minimalServerVersion
 		}
 		return semver.New(version.MinClusterVersion)
 	}
-	if membersMinimalVersion != nil && clusterVersion.LessThan(*membersMinimalVersion) && IsValidVersionChange(clusterVersion, membersMinimalVersion) {
-		return membersMinimalVersion
+	if minimalServerVersion == nil {
+		return nil
+	}
+	downgrade := m.s.GetDowngradeInfo()
+	if downgrade != nil && downgrade.Enabled {
+		if IsValidVersionChange(clusterVersion, downgrade.GetTargetVersion()) && IsValidVersionChange(minimalServerVersion, downgrade.GetTargetVersion()) {
+			return downgrade.GetTargetVersion()
+		}
+		m.lg.Error("Cannot downgrade cluster version, version change is not valid",
+			zap.String("downgrade-version", downgrade.TargetVersion),
+			zap.String("cluster-version", clusterVersion.String()),
+			zap.String("minimal-server-version", minimalServerVersion.String()),
+		)
+		return nil
+	}
+	if clusterVersion.LessThan(*minimalServerVersion) && IsValidVersionChange(clusterVersion, minimalServerVersion) {
+		return minimalServerVersion
 	}
 	return nil
 }
@@ -83,15 +97,25 @@ func (m *Monitor) UpdateStorageVersionIfNeeded() {
 	if cv == nil {
 		return
 	}
-	m.s.Lock()
-	defer m.s.Unlock()
 	sv := m.s.GetStorageVersion()
 
 	if sv == nil || sv.Major != cv.Major || sv.Minor != cv.Minor {
 		if sv != nil {
 			m.lg.Info("storage version differs from storage version.", zap.String("cluster-version", cv.String()), zap.String("storage-version", sv.String()))
 		}
-		m.s.UpdateStorageVersion(semver.Version{Major: cv.Major, Minor: cv.Minor})
+		err := m.s.UpdateStorageVersion(semver.Version{Major: cv.Major, Minor: cv.Minor})
+		if err != nil {
+			m.lg.Error("failed to update storage version", zap.String("cluster-version", cv.String()), zap.Error(err))
+			return
+		}
+		d := m.s.GetDowngradeInfo()
+		if d != nil && d.Enabled {
+			m.lg.Info(
+				"The server is ready to downgrade",
+				zap.String("target-version", d.TargetVersion),
+				zap.String("server-version", version.Version),
+			)
+		}
 	}
 }
 
@@ -112,11 +136,11 @@ func (m *Monitor) CancelDowngradeIfNeeded() {
 	}
 }
 
-// membersMinimalVersion returns the min server version in the map, or nil if the min
+// membersMinimalServerVersion returns the min server version in the map, or nil if the min
 // version in unknown.
 // It prints out log if there is a member with a higher version than the
 // local version.
-func (m *Monitor) membersMinimalVersion() *semver.Version {
+func (m *Monitor) membersMinimalServerVersion() *semver.Version {
 	vers := m.s.GetMembersVersions()
 	var minV *semver.Version
 	lv := semver.Must(semver.NewVersion(version.Version))
@@ -156,11 +180,12 @@ func (m *Monitor) membersMinimalVersion() *semver.Version {
 // It can be used to decide the whether the cluster finishes downgrading to target version.
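+// Note that only major.minor are compared below; a member reporting server
+// version 3.5.1 still matches target version 3.5.0 (see monitor_test.go).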
 func (m *Monitor) versionsMatchTarget(targetVersion *semver.Version) bool {
 	vers := m.s.GetMembersVersions()
+	targetVersion = &semver.Version{Major: targetVersion.Major, Minor: targetVersion.Minor}
 	for mid, ver := range vers {
 		if ver == nil {
 			return false
 		}
-		v, err := semver.NewVersion(ver.Cluster)
+		v, err := semver.NewVersion(ver.Server)
 		if err != nil {
 			m.lg.Warn(
 				"failed to parse server version of remote member",
 				zap.String("remote-member-id", mid),
@@ -170,6 +195,7 @@ func (m *Monitor) versionsMatchTarget(targetVersion *semver.Version) bool {
 			)
 			return false
 		}
+		v = &semver.Version{Major: v.Major, Minor: v.Minor}
 		if !targetVersion.Equal(*v) {
 			m.lg.Warn("remotes server has mismatching etcd version",
 				zap.String("remote-member-id", mid),
diff --git a/server/etcdserver/version/monitor_test.go b/server/etcdserver/version/monitor_test.go
index ffc908e7b..b11f09581 100644
--- a/server/etcdserver/version/monitor_test.go
+++ b/server/etcdserver/version/monitor_test.go
@@ -50,7 +50,7 @@ func TestMemberMinimalVersion(t *testing.T) {
 			monitor := NewMonitor(zaptest.NewLogger(t), &storageMock{
 				memberVersions: tt.memberVersions,
 			})
-			minV := monitor.membersMinimalVersion()
+			minV := monitor.membersMinimalServerVersion()
 
 			if !reflect.DeepEqual(minV, tt.wantVersion) {
 				t.Errorf("#%d: ver = %+v, want %+v", i, minV, tt.wantVersion)
 			}
@@ -127,7 +127,7 @@ func TestVersionMatchTarget(t *testing.T) {
 			"When cannot parse peer version",
 			&semver.Version{Major: 3, Minor: 4},
 			map[string]*version.Versions{
-				"mem1": {Server: "3.4.1", Cluster: "3.4"},
+				"mem1": {Server: "3.4", Cluster: "3.4.0"},
 				"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
 				"mem3": {Server: "3.4.2", Cluster: "3.4.0"},
 			},
@@ -204,6 +204,36 @@ func TestUpdateClusterVersionIfNeeded(t *testing.T) {
 			clusterVersion:       &V3_5,
 			expectClusterVersion: &V3_6,
 		},
+		{
+			name: "Should downgrade cluster version if downgrade is set to allow older members to join",
+			memberVersions: map[string]*version.Versions{
+				"a": {Cluster: "3.6.0", Server: "3.6.0"},
+				"b": {Cluster: "3.6.0", Server: "3.6.0"},
+			},
+			clusterVersion:       &V3_6,
+			downgrade:            &DowngradeInfo{TargetVersion: "3.5.0", Enabled: true},
+			expectClusterVersion: &V3_5,
+		},
+		{
+			name: "Should maintain downgrade target version to allow older members to join",
+			memberVersions: map[string]*version.Versions{
+				"a": {Cluster: "3.5.0", Server: "3.6.0"},
+				"b": {Cluster: "3.5.0", Server: "3.6.0"},
+			},
+			clusterVersion:       &V3_5,
+			downgrade:            &DowngradeInfo{TargetVersion: "3.5.0", Enabled: true},
+			expectClusterVersion: &V3_5,
+		},
+		{
+			name: "Don't downgrade below supported range",
+			memberVersions: map[string]*version.Versions{
+				"a": {Cluster: "3.5.0", Server: "3.6.0"},
+				"b": {Cluster: "3.5.0", Server: "3.6.0"},
+			},
+			clusterVersion:       &V3_5,
+			downgrade:            &DowngradeInfo{TargetVersion: "3.4.0", Enabled: true},
+			expectClusterVersion: &V3_5,
+		},
 	}
 
 	for _, tt := range tests {
@@ -247,6 +277,24 @@ func TestCancelDowngradeIfNeeded(t *testing.T) {
 				"b": {Cluster: "3.6.0", Server: "3.6.2"},
 			},
 		},
+		{
+			name: "Continue downgrade if just started",
+			memberVersions: map[string]*version.Versions{
+				"a": {Cluster: "3.5.0", Server: "3.6.1"},
+				"b": {Cluster: "3.5.0", Server: "3.6.2"},
+			},
+			downgrade:       &DowngradeInfo{TargetVersion: "3.5.0", Enabled: true},
+			expectDowngrade: &DowngradeInfo{TargetVersion: "3.5.0", Enabled: true},
+		},
+		{
+			name: "Continue downgrade if at least one member has a non-matching version",
+			memberVersions: map[string]*version.Versions{
+				"a": {Cluster: "3.5.0", Server: "3.5.1"},
+				"b": {Cluster: "3.5.0", Server: "3.6.2"},
+			},
+			downgrade:       &DowngradeInfo{TargetVersion: "3.5.0", Enabled: true},
+			expectDowngrade: &DowngradeInfo{TargetVersion: "3.5.0", Enabled: true},
+		},
 		{
 			name: "Cancel downgrade if all members have downgraded",
 			memberVersions: map[string]*version.Versions{
@@ -369,17 +417,7 @@ func (s *storageMock) GetStorageVersion() *semver.Version {
 	return s.storageVersion
 }
 
-func (s *storageMock) UpdateStorageVersion(v semver.Version) {
+func (s *storageMock) UpdateStorageVersion(v semver.Version) error {
 	s.storageVersion = &v
-}
-
-func (s *storageMock) Lock() {
-	if s.locked {
-		panic("Deadlock")
-	}
-	s.locked = true
-}
-
-func (s *storageMock) Unlock() {
-	s.locked = false
+	return nil
 }
diff --git a/server/etcdserver/version/version_test.go b/server/etcdserver/version/version_test.go
index 6beaec0b3..c82ca808c 100644
--- a/server/etcdserver/version/version_test.go
+++ b/server/etcdserver/version/version_test.go
@@ -62,6 +62,68 @@ func TestUpgradeThreeNodes(t *testing.T) {
 	assert.Equal(t, newCluster(lg, 3, V3_7), c)
 }
 
+func TestDowngradeSingleNode(t *testing.T) {
+	lg := zaptest.NewLogger(t)
+	c := newCluster(lg, 1, V3_6)
+	c.StepMonitors()
+	assert.Equal(t, newCluster(lg, 1, V3_6), c)
+
+	assert.NoError(t, c.Version().DowngradeEnable(context.Background(), &V3_5))
+	c.StepMonitors()
+	assert.Equal(t, V3_5, c.clusterVersion)
+
+	c.ReplaceMemberBinary(0, V3_5)
+	c.StepMonitors()
+
+	assert.Equal(t, newCluster(lg, 1, V3_5), c)
+}
+
+func TestDowngradeThreeNode(t *testing.T) {
+	lg := zaptest.NewLogger(t)
+	c := newCluster(lg, 3, V3_6)
+	c.StepMonitors()
+	assert.Equal(t, newCluster(lg, 3, V3_6), c)
+
+	assert.NoError(t, c.Version().DowngradeEnable(context.Background(), &V3_5))
+	c.StepMonitors()
+	assert.Equal(t, V3_5, c.clusterVersion)
+
+	c.ReplaceMemberBinary(0, V3_5)
+	c.StepMonitors()
+	c.ReplaceMemberBinary(1, V3_5)
+	c.StepMonitors()
+	c.ReplaceMemberBinary(2, V3_5)
+	c.StepMonitors()
+
+	assert.Equal(t, newCluster(lg, 3, V3_5), c)
+}
+
+func TestNewerMemberCanReconnectDuringDowngrade(t *testing.T) {
+	lg := zaptest.NewLogger(t)
+	c := newCluster(lg, 3, V3_6)
+	c.StepMonitors()
+	assert.Equal(t, newCluster(lg, 3, V3_6), c)
+
+	assert.NoError(t, c.Version().DowngradeEnable(context.Background(), &V3_5))
+	c.StepMonitors()
+	assert.Equal(t, V3_5, c.clusterVersion)
+
+	c.ReplaceMemberBinary(0, V3_5)
+	c.StepMonitors()
+
+	c.MemberCrashes(2)
+	c.StepMonitors()
+	c.MemberReconnects(2)
+	c.StepMonitors()
+
+	c.ReplaceMemberBinary(1, V3_5)
+	c.StepMonitors()
+	c.ReplaceMemberBinary(2, V3_5)
+	c.StepMonitors()
+
+	assert.Equal(t, newCluster(lg, 3, V3_5), c)
+}
+
 func newCluster(lg *zap.Logger, memberCount int, ver semver.Version) *clusterMock {
 	cluster := &clusterMock{
 		lg: lg,
@@ -71,6 +133,7 @@ func newCluster(lg *zap.Logger, memberCount int, ver semver.Version) *clusterMoc
 	majorMinVer := semver.Version{Major: ver.Major, Minor: ver.Minor}
 	for i := 0; i < memberCount; i++ {
 		m := &memberMock{
+			isRunning:      true,
 			cluster:        cluster,
 			serverVersion:  ver,
 			storageVersion: majorMinVer,
@@ -113,22 +176,34 @@ func (c *clusterMock) Version() *Manager {
 func (c *clusterMock) MembersVersions() map[string]*version.Versions {
 	result := map[string]*version.Versions{}
 	for i, m := range c.members {
-		result[fmt.Sprintf("%d", i)] = &version.Versions{
-			Server:  m.serverVersion.String(),
-			Cluster: c.clusterVersion.String(),
+		if m.isRunning {
+			result[fmt.Sprintf("%d", i)] = &version.Versions{
+				Server:  m.serverVersion.String(),
+				Cluster: c.clusterVersion.String(),
+			}
 		}
 	}
 	return result
 }
 
 func (c *clusterMock) ReplaceMemberBinary(mid int, newServerVersion semver.Version) {
-	MustDetectDowngrade(c.lg, &c.members[mid].serverVersion, &c.clusterVersion, c.downgradeInfo)
+	MustDetectDowngrade(c.lg, &c.members[mid].serverVersion, &c.clusterVersion)
 	c.members[mid].serverVersion = newServerVersion
 }
 
+func (c *clusterMock) MemberCrashes(mid int) {
+	c.members[mid].isRunning = false
+}
+
+func (c *clusterMock) MemberReconnects(mid int) {
+	MustDetectDowngrade(c.lg, &c.members[mid].serverVersion, &c.clusterVersion)
+	c.members[mid].isRunning = true
+}
+
 type memberMock struct {
 	cluster *clusterMock
 
+	isRunning      bool
 	isLeader       bool
 	serverVersion  semver.Version
 	storageVersion semver.Version
@@ -174,15 +249,10 @@ func (m *memberMock) GetStorageVersion() *semver.Version {
 	return &m.storageVersion
 }
 
-func (m *memberMock) UpdateStorageVersion(v semver.Version) {
+func (m *memberMock) UpdateStorageVersion(v semver.Version) error {
 	m.storageVersion = v
+	return nil
 }
 
 func (m *memberMock) TriggerSnapshot() {
 }
-
-func (m *memberMock) Lock() {
-}
-
-func (m *memberMock) Unlock() {
-}
diff --git a/server/mock/mockstorage/storage_recorder.go b/server/mock/mockstorage/storage_recorder.go
index db989cd2c..73cad169f 100644
--- a/server/mock/mockstorage/storage_recorder.go
+++ b/server/mock/mockstorage/storage_recorder.go
@@ -15,6 +15,7 @@
 package mockstorage
 
 import (
+	"github.com/coreos/go-semver/semver"
 	"go.etcd.io/etcd/client/pkg/v3/testutil"
 	"go.etcd.io/etcd/raft/v3"
 	"go.etcd.io/etcd/raft/v3/raftpb"
@@ -57,4 +58,5 @@
 func (p *storageRecorder) Sync() error { return nil }
 
-func (p *storageRecorder) Close() error { return nil }
+func (p *storageRecorder) Close() error                        { return nil }
+func (p *storageRecorder) MinimalEtcdVersion() *semver.Version { return nil }
diff --git a/server/storage/schema/migration.go b/server/storage/schema/migration.go
index bac67bec5..c8c4a701b 100644
--- a/server/storage/schema/migration.go
+++ b/server/storage/schema/migration.go
@@ -24,19 +24,7 @@ import (
 
 type migrationPlan []migrationStep
 
-func newPlan(lg *zap.Logger, current semver.Version, target semver.Version) (p migrationPlan, err error) {
-	// TODO(serathius): Implement downgrades
-	if target.LessThan(current) {
-		lg.Error("Target version is lower than the current version, downgrades are not yet supported",
-			zap.String("storage-version", current.String()),
-			zap.String("target-storage-version", target.String()),
-		)
-		return nil, fmt.Errorf("downgrades are not yet supported")
-	}
-	return buildPlan(lg, current, target)
-}
-
-func buildPlan(lg *zap.Logger, current semver.Version, target semver.Version) (plan migrationPlan, err error) {
+func newPlan(lg *zap.Logger, current semver.Version, target semver.Version) (plan migrationPlan, err error) {
 	current = trimToMinor(current)
 	target = trimToMinor(target)
 	if current.Major != target.Major {
diff --git a/server/storage/schema/migration_test.go b/server/storage/schema/migration_test.go
index d81a6208d..95e4d9739 100644
--- a/server/storage/schema/migration_test.go
+++ b/server/storage/schema/migration_test.go
@@ -46,11 +46,9 @@ func TestNewPlan(t *testing.T) {
 			target:  V3_6,
 		},
 		{
-			name:           "Downgrade v3.6 to v3.5 should fail as downgrades are not yet supported",
-			current:        V3_6,
-			target:         V3_5,
-			expectError:    true,
-			expectErrorMsg: "downgrades are not yet supported",
+			name:    "Downgrade v3.6 to v3.5 should work",
+			current: V3_6,
+			target:  V3_5,
 		},
 		{
 			name: "Upgrade v3.6 to v3.7 should fail as v3.7 is unknown",
diff --git a/server/storage/schema/schema.go b/server/storage/schema/schema.go
index 198b90f3e..f3b29c011 100644
--- a/server/storage/schema/schema.go
+++ b/server/storage/schema/schema.go
@@ -52,15 +52,21 @@ func localBinaryVersion() semver.Version {
 	return semver.Version{Major: v.Major, Minor: v.Minor}
 }
 
-// Migrate updates storage schema to provided target version.
-func Migrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error {
-	tx.Lock()
-	defer tx.Unlock()
-	return UnsafeMigrate(lg, tx, target)
+type WALVersion interface {
+	// MinimalEtcdVersion returns minimal etcd version able to interpret WAL log.
+	MinimalEtcdVersion() *semver.Version
 }
 
-// UnsafeMigrate is non-threadsafe version of Migrate.
-func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error {
+// Migrate updates storage schema to provided target version.
+// Downgrading requires that provided WAL doesn't contain unsupported entries.
+func Migrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Version) error {
+	tx.Lock()
+	defer tx.Unlock()
+	return UnsafeMigrate(lg, tx, w, target)
+}
+
+// UnsafeMigrate is a non-thread-safe version of Migrate.
+func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Version) error {
 	current, err := UnsafeDetectSchemaVersion(lg, tx)
 	if err != nil {
 		return fmt.Errorf("cannot detect storage schema version: %w", err)
@@ -69,6 +75,12 @@ func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) er
 	if err != nil {
 		return fmt.Errorf("cannot create migration plan: %w", err)
 	}
+	if target.LessThan(current) {
+		minVersion := w.MinimalEtcdVersion()
+		if minVersion != nil && target.LessThan(*minVersion) {
+			return fmt.Errorf("cannot downgrade storage, WAL contains newer entries")
+		}
+	}
 	return plan.unsafeExecute(lg, tx)
 }
 
@@ -101,12 +113,16 @@ func UnsafeDetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Vers
 func schemaChangesForVersion(v semver.Version, isUpgrade bool) ([]schemaChange, error) {
 	// changes should be taken from higher version
+	var higherV = v
 	if isUpgrade {
-		v = semver.Version{Major: v.Major, Minor: v.Minor + 1}
+		higherV = semver.Version{Major: v.Major, Minor: v.Minor + 1}
 	}
 
-	actions, found := schemaChanges[v]
+	actions, found := schemaChanges[higherV]
 	if !found {
+		if isUpgrade {
+			return nil, fmt.Errorf("version %q is not supported", higherV.String())
+		}
 		return nil, fmt.Errorf("version %q is not supported", v.String())
 	}
 	return actions, nil
diff --git a/server/storage/schema/schema_test.go b/server/storage/schema/schema_test.go
index 823400760..f0fb943b4 100644
--- a/server/storage/schema/schema_test.go
+++ b/server/storage/schema/schema_test.go
@@ -15,15 +15,18 @@
 package schema
 
 import (
-	"fmt"
 	"testing"
 	"time"
 
 	"github.com/coreos/go-semver/semver"
 	"github.com/stretchr/testify/assert"
+	"go.etcd.io/etcd/api/v3/etcdserverpb"
+	"go.etcd.io/etcd/api/v3/membershippb"
 	"go.etcd.io/etcd/raft/v3/raftpb"
 	"go.etcd.io/etcd/server/v3/storage/backend"
 	betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
+	"go.etcd.io/etcd/server/v3/storage/wal"
+	waltesting "go.etcd.io/etcd/server/v3/storage/wal/testing"
 	"go.uber.org/zap"
 )
 
@@ -75,7 +78,7 @@ func TestValidate(t *testing.T) {
 			name:           `V3.7 schema is unknown and should return error`,
 			version:        V3_7,
 			expectError:    true,
-			expectErrorMsg: "downgrades are not yet supported",
+			expectErrorMsg: `version "3.7.0" is not supported`,
 		},
 	}
 	for _, tc := range tcs {
@@ -103,6 +106,7 @@ func TestMigrate(t *testing.T) {
 		// Overrides which keys should be set (default based on version)
 		overrideKeys  func(tx backend.BatchTx)
 		targetVersion semver.Version
+		walEntries    []etcdserverpb.InternalRaftRequest
 
 		expectVersion  *semver.Version
 		expectError    bool
@@ -168,33 +172,52 @@ func TestMigrate(t *testing.T) {
 			targetVersion:  V3_6,
 			expectVersion:  &V3_7,
 			expectError:    true,
-			expectErrorMsg: `cannot create migration plan: downgrades are not yet supported`,
+			expectErrorMsg: `cannot create migration plan: version "3.7.0" is not supported`,
 		},
 		{
-			name:          "Downgrading v3.6 to v3.5 is not supported",
-			version:       V3_6,
-			targetVersion: V3_5,
+			name:          "Downgrading v3.6 to v3.5 works as there are no v3.6 wal entries",
+			version:       V3_6,
+			targetVersion: V3_5,
+			walEntries: []etcdserverpb.InternalRaftRequest{
+				{Range: &etcdserverpb.RangeRequest{Key: []byte("\x00"), RangeEnd: []byte("\xff")}},
+			},
+			expectVersion: nil,
+		},
+		{
+			name:          "Downgrading v3.6 to v3.5 fails if there are newer WAL entries",
+			version:       V3_6,
+			targetVersion: V3_5,
+			walEntries: []etcdserverpb.InternalRaftRequest{
+				{ClusterVersionSet: &membershippb.ClusterVersionSetRequest{Ver: "3.6.0"}},
+			},
 			expectVersion:  &V3_6,
 			expectError:    true,
-			expectErrorMsg: `cannot create migration plan: downgrades are not yet supported`,
+			expectErrorMsg: "cannot downgrade storage, WAL contains newer entries",
 		},
 		{
-			name:          "Downgrading v3.5 to v3.4 is not supported",
+			name:          "Downgrading v3.5 to v3.4 is not supported as schema was introduced in v3.6",
 			version:       V3_5,
 			targetVersion: V3_4,
 			expectVersion: nil,
 			expectError:   true,
-			expectErrorMsg: `cannot create migration plan: downgrades are not yet supported`,
+			expectErrorMsg: `cannot create migration plan: version "3.5.0" is not supported`,
 		},
 	}
 	for _, tc := range tcs {
 		t.Run(tc.name, func(t *testing.T) {
 			lg := zap.NewNop()
 			dataPath := setupBackendData(t, tc.version, tc.overrideKeys)
+			w, _ := waltesting.NewTmpWAL(t, tc.walEntries)
+			defer w.Close()
+			walVersion, err := wal.ReadWALVersion(w)
+			if err != nil {
+				t.Fatal(err)
+			}
 
 			b := backend.NewDefaultBackend(dataPath)
 			defer b.Close()
-			err := Migrate(lg, b.BatchTx(), tc.targetVersion)
+
+			err = Migrate(lg, b.BatchTx(), walVersion, tc.targetVersion)
 			if (err != nil) != tc.expectError {
 				t.Errorf("Migrate(lg, tx, %q) = %+v, expected error: %v", tc.targetVersion, err, tc.expectError)
 			}
@@ -241,17 +264,29 @@ func TestMigrateIsReversible(t *testing.T) {
 			tx.Lock()
 			defer tx.Unlock()
 			assertBucketState(t, tx, Meta, tc.state)
+			w, walPath := waltesting.NewTmpWAL(t, nil)
+			walVersion, err := wal.ReadWALVersion(w)
+			if err != nil {
+				t.Fatal(err)
+			}
 
 			// Upgrade to current version
 			ver := localBinaryVersion()
-			err := testUnsafeMigrate(lg, tx, ver)
+			err = UnsafeMigrate(lg, tx, walVersion, ver)
 			if err != nil {
 				t.Errorf("Migrate(lg, tx, %q) returned error %+v", ver, err)
 			}
 			assert.Equal(t, &ver, UnsafeReadStorageVersion(tx))
 
 			// Downgrade back to initial version
-			err = testUnsafeMigrate(lg, tx, tc.initialVersion)
+			w.Close()
+			w = waltesting.Reopen(t, walPath)
+			defer w.Close()
+			walVersion, err = wal.ReadWALVersion(w)
+			if err != nil {
+				t.Fatal(err)
+			}
+			err = UnsafeMigrate(lg, tx, walVersion, tc.initialVersion)
 			if err != nil {
 				t.Errorf("Migrate(lg, tx, %q) returned error %+v", tc.initialVersion, err)
 			}
@@ -262,20 +297,6 @@ func TestMigrateIsReversible(t *testing.T) {
 	}
 }
 
-// Does the same as UnsafeMigrate but skips version checks
-// TODO(serathius): Use UnsafeMigrate when downgrades are implemented
-func testUnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error {
-	current, err := UnsafeDetectSchemaVersion(lg, tx)
-	if err != nil {
-		return fmt.Errorf("cannot determine storage version: %w", err)
-	}
-	plan, err := buildPlan(lg, current, target)
-	if err != nil {
-		return fmt.Errorf("cannot create migration plan: %w", err)
-	}
-	return plan.unsafeExecute(lg, tx)
-}
-
 func setupBackendData(t *testing.T, version semver.Version, overrideKeys func(tx backend.BatchTx)) string {
 	t.Helper()
 	be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
diff --git a/server/storage/storage.go b/server/storage/storage.go
index 047d1bb02..9207e1e4d 100644
--- a/server/storage/storage.go
+++ b/server/storage/storage.go
@@ -15,6 +15,9 @@
 package storage
 
 import (
+	"sync"
+
+	"github.com/coreos/go-semver/semver"
 	"go.etcd.io/etcd/raft/v3/raftpb"
 	"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
 	"go.etcd.io/etcd/server/v3/storage/wal"
@@ -34,12 +37,17 @@ type Storage interface {
 	Release(snap raftpb.Snapshot) error
 	// Sync WAL
 	Sync() error
+	// MinimalEtcdVersion returns minimal etcd version able to interpret WAL log.
+	MinimalEtcdVersion() *semver.Version
 }
 
 type storage struct {
 	lg *zap.Logger
 	s  *snap.Snapshotter
-	w  *wal.WAL
+
+	// Mutex protected variables
+	mux sync.RWMutex
+	w   *wal.WAL
 }
 
 func NewStorage(lg *zap.Logger, w *wal.WAL, s *snap.Snapshotter) Storage {
@@ -48,6 +56,8 @@
 
 // SaveSnap saves the snapshot file to disk and writes the WAL snapshot entry.
 func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
+	st.mux.RLock()
+	defer st.mux.RUnlock()
 	walsnap := walpb.Snapshot{
 		Index: snap.Metadata.Index,
 		Term:  snap.Metadata.Term,
@@ -69,6 +79,8 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
 // - releases the locks to the wal files that are older than the provided wal for the given snap.
 // - deletes any .snap.db files that are older than the given snap.
 func (st *storage) Release(snap raftpb.Snapshot) error {
+	st.mux.RLock()
+	defer st.mux.RUnlock()
 	if err := st.w.ReleaseLockTo(snap.Metadata.Index); err != nil {
 		return err
 	}
@@ -76,13 +88,46 @@ func (st *storage) Release(snap raftpb.Snapshot) error {
 }
 
 func (st *storage) Save(s raftpb.HardState, ents []raftpb.Entry) error {
+	st.mux.RLock()
+	defer st.mux.RUnlock()
 	return st.w.Save(s, ents)
 }
 
 func (st *storage) Close() error {
+	st.mux.Lock()
+	defer st.mux.Unlock()
 	return st.w.Close()
 }
 
 func (st *storage) Sync() error {
+	st.mux.RLock()
+	defer st.mux.RUnlock()
 	return st.w.Sync()
 }
+
+func (st *storage) MinimalEtcdVersion() *semver.Version {
+	st.mux.Lock()
+	defer st.mux.Unlock()
+	walsnap := walpb.Snapshot{}
+
+	sn, err := st.s.Load()
+	if err != nil && err != snap.ErrNoSnapshot {
+		panic(err)
+	}
+	if sn != nil {
+		walsnap.Index = sn.Metadata.Index
+		walsnap.Term = sn.Metadata.Term
+		walsnap.ConfState = &sn.Metadata.ConfState
+	}
+	w, err := st.w.Reopen(st.lg, walsnap)
+	if err != nil {
+		panic(err)
+	}
+	_, _, ents, err := w.ReadAll()
+	if err != nil {
+		panic(err)
+	}
+	v := wal.MinimalEtcdVersion(ents)
+	st.w = w
+	return v
+}
diff --git a/server/storage/wal/testing/waltesting.go b/server/storage/wal/testing/waltesting.go
new file mode 100644
index 000000000..7ba279df6
--- /dev/null
+++ b/server/storage/wal/testing/waltesting.go
@@ -0,0 +1,89 @@
+// Copyright 2021 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package testing
+
+import (
+	"io/ioutil"
+	"path/filepath"
+	"testing"
+
+	"go.etcd.io/etcd/api/v3/etcdserverpb"
+	"go.etcd.io/etcd/pkg/v3/pbutil"
+	"go.etcd.io/etcd/raft/v3/raftpb"
+	"go.etcd.io/etcd/server/v3/storage/wal"
+	"go.etcd.io/etcd/server/v3/storage/wal/walpb"
+	"go.uber.org/zap/zaptest"
+)
+
+func NewTmpWAL(t testing.TB, reqs []etcdserverpb.InternalRaftRequest) (*wal.WAL, string) {
+	t.Helper()
+	dir, err := ioutil.TempDir(t.TempDir(), "etcd_wal_test")
+	if err != nil {
+		panic(err)
+	}
+	tmpPath := filepath.Join(dir, "wal")
+	lg := zaptest.NewLogger(t)
+	w, err := wal.Create(lg, tmpPath, nil)
+	if err != nil {
+		t.Fatalf("Failed to create WAL: %v", err)
+	}
+	err = w.Close()
+	if err != nil {
+		t.Fatalf("Failed to close WAL: %v", err)
+	}
+	if len(reqs) != 0 {
+		w, err = wal.Open(lg, tmpPath, walpb.Snapshot{})
+		if err != nil {
+			t.Fatalf("Failed to open WAL: %v", err)
+		}
+		_, state, _, err := w.ReadAll()
+		if err != nil {
+			t.Fatalf("Failed to read WAL: %v", err)
+		}
+		entries := []raftpb.Entry{}
+		for _, req := range reqs {
+			entries = append(entries, raftpb.Entry{
+				Term:  1,
+				Index: 1,
+				Type:  raftpb.EntryNormal,
+				Data:  pbutil.MustMarshal(&req),
+			})
+		}
+		err = w.Save(state, entries)
+		if err != nil {
+			t.Fatalf("Failed to save WAL: %v", err)
+		}
+		err = w.Close()
+		if err != nil {
+			t.Fatalf("Failed to close WAL: %v", err)
+		}
+	}
+
+	w, err = wal.OpenForRead(lg, tmpPath, walpb.Snapshot{})
+	if err != nil {
+		t.Fatalf("Failed to open WAL: %v", err)
+	}
+	return w, tmpPath
+}
+
+func Reopen(t testing.TB, walPath string) *wal.WAL {
+	t.Helper()
+	lg := zaptest.NewLogger(t)
+	w, err := wal.OpenForRead(lg, walPath, walpb.Snapshot{})
+	if err != nil {
+		t.Fatalf("Failed to open WAL: %v", err)
+	}
+	return w
+}
diff --git a/server/storage/wal/version.go b/server/storage/wal/version.go
index 6e8dbfd7b..6a4903b98 100644
--- a/server/storage/wal/version.go
+++ b/server/storage/wal/version.go
@@ -28,14 +28,29 @@ import (
 	"go.etcd.io/etcd/raft/v3/raftpb"
 )
 
-// MinimalEtcdVersion returns minimal etcd able to interpret provided WAL log,
-// determined by looking at entries since the last snapshot and returning the highest
-// etcd version annotation from used messages, fields, enums and their values.
-func (w *WAL) MinimalEtcdVersion() *semver.Version {
+// ReadWALVersion reads remaining entries from opened WAL and returns a struct
+// that implements the schema.WALVersion interface.
+func ReadWALVersion(w *WAL) (*walVersion, error) {
 	_, _, ents, err := w.ReadAll()
 	if err != nil {
-		panic(err)
+		return nil, err
 	}
+	return &walVersion{entries: ents}, nil
+}
+
+type walVersion struct {
+	entries []raftpb.Entry
+}
+
+// MinimalEtcdVersion returns minimal etcd able to interpret entries from WAL log.
+func (w *walVersion) MinimalEtcdVersion() *semver.Version {
+	return MinimalEtcdVersion(w.entries)
+}
+
+// MinimalEtcdVersion returns minimal etcd able to interpret entries from WAL log,
+// determined by looking at entries since the last snapshot and returning the highest
+// etcd version annotation from used messages, fields, enums and their values.
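+// Returns nil if no entry carries a version annotation.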
+func MinimalEtcdVersion(ents []raftpb.Entry) *semver.Version {
 	var maxVer *semver.Version
 	for _, ent := range ents {
 		maxVer = maxVersion(maxVer, etcdVersionFromEntry(ent))
@@ -51,6 +66,7 @@ func etcdVersionFromEntry(ent raftpb.Entry) *semver.Version {
 
 func etcdVersionFromData(entryType raftpb.EntryType, data []byte) *semver.Version {
 	var msg protoreflect.Message
+	var ver *semver.Version
 	switch entryType {
 	case raftpb.EntryNormal:
 		var raftReq etcdserverpb.InternalRaftRequest
@@ -59,6 +75,12 @@ func etcdVersionFromData(entryType raftpb.EntryType, data []byte) *semver.Versio
 			return nil
 		}
 		msg = proto.MessageReflect(&raftReq)
+		if raftReq.ClusterVersionSet != nil {
+			ver, err = semver.NewVersion(raftReq.ClusterVersionSet.Ver)
+			if err != nil {
+				panic(err)
+			}
+		}
 	case raftpb.EntryConfChange:
 		var confChange raftpb.ConfChange
 		err := pbutil.Unmarshaler(&confChange).Unmarshal(data)
@@ -76,7 +98,7 @@ func etcdVersionFromData(entryType raftpb.EntryType, data []byte) *semver.Versio
 	default:
 		panic("unhandled")
 	}
-	return etcdVersionFromMessage(msg)
+	return maxVersion(etcdVersionFromMessage(msg), ver)
 }
 
 func etcdVersionFromMessage(m protoreflect.Message) *semver.Version {
diff --git a/server/storage/wal/version_test.go b/server/storage/wal/version_test.go
index 5aa83250c..52965662c 100644
--- a/server/storage/wal/version_test.go
+++ b/server/storage/wal/version_test.go
@@ -40,6 +40,9 @@ func TestEtcdVersionFromEntry(t *testing.T) {
 	raftReq := etcdserverpb.InternalRaftRequest{Header: &etcdserverpb.RequestHeader{AuthRevision: 1}}
 	normalRequestData := pbutil.MustMarshal(&raftReq)
 
+	clusterVersionV3_6Req := etcdserverpb.InternalRaftRequest{ClusterVersionSet: &membershippb.ClusterVersionSetRequest{Ver: "3.6.0"}}
+	clusterVersionV3_6Data := pbutil.MustMarshal(&clusterVersionV3_6Req)
+
 	confChange := raftpb.ConfChange{Type: raftpb.ConfChangeAddLearnerNode}
 	confChangeData := pbutil.MustMarshal(&confChange)
 
@@ -61,6 +64,16 @@ func TestEtcdVersionFromEntry(t *testing.T) {
 			},
 			expect: &V3_1,
 		},
+		{
+			name: "Setting cluster version implies version within",
+			input: raftpb.Entry{
+				Term:  1,
+				Index: 2,
+				Type:  raftpb.EntryNormal,
+				Data:  clusterVersionV3_6Data,
+			},
+			expect: &V3_6,
+		},
 		{
 			name: "Using ConfigChange implies v3.4",
 			input: raftpb.Entry{
diff --git a/server/storage/wal/wal.go b/server/storage/wal/wal.go
index a24c9d406..187cfe397 100644
--- a/server/storage/wal/wal.go
+++ b/server/storage/wal/wal.go
@@ -234,6 +234,14 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
 	return w, nil
 }
 
+func (w *WAL) Reopen(lg *zap.Logger, snap walpb.Snapshot) (*WAL, error) {
+	err := w.Close()
+	if err != nil {
+		lg.Panic("failed to close WAL during reopen", zap.Error(err))
+	}
+	return Open(lg, w.dir, snap)
+}
+
 func (w *WAL) SetUnsafeNoFsync() {
 	w.unsafeNoSync = true
 }
diff --git a/tests/e2e/cluster_downgrade_test.go b/tests/e2e/cluster_downgrade_test.go
new file mode 100644
index 000000000..4769c4878
--- /dev/null
+++ b/tests/e2e/cluster_downgrade_test.go
@@ -0,0 +1,144 @@
+// Copyright 2021 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package e2e
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/coreos/go-semver/semver"
+	"go.etcd.io/etcd/api/v3/version"
+	"go.etcd.io/etcd/client/pkg/v3/fileutil"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/tests/v3/framework/e2e"
+)
+
+func TestDowngradeUpgrade(t *testing.T) {
+	currentEtcdBinary := ""
+	lastReleaseBinary := e2e.BinDir + "/etcd-last-release"
+	if !fileutil.Exist(lastReleaseBinary) {
+		t.Skipf("%q does not exist", lastReleaseBinary)
+	}
+	currentVersion := semver.New(version.Version)
+	lastVersion := semver.Version{Major: currentVersion.Major, Minor: currentVersion.Minor - 1}
+	currentVersionStr := fmt.Sprintf("%d.%d", currentVersion.Major, currentVersion.Minor)
+	lastVersionStr := fmt.Sprintf("%d.%d", lastVersion.Major, lastVersion.Minor)
+
+	e2e.BeforeTest(t)
+	dataDirPath := t.TempDir()
+
+	epc := startEtcd(t, currentEtcdBinary, dataDirPath)
+	validateVersion(t, epc, version.Versions{Cluster: currentVersionStr, Server: currentVersionStr})
+
+	downgradeEnable(t, epc, lastVersion)
+	validateVersion(t, epc, version.Versions{Cluster: lastVersionStr, Server: currentVersionStr})
+
+	stopEtcd(t, epc)
+	epc = startEtcd(t, lastReleaseBinary, dataDirPath)
+	validateVersion(t, epc, version.Versions{Cluster: lastVersionStr, Server: lastVersionStr})
+	expectLog(t, epc, "the cluster has been downgraded")
+
+	stopEtcd(t, epc)
+	epc = startEtcd(t, currentEtcdBinary, dataDirPath)
+	// TODO: Verify cluster version after upgrade when we fix cluster version set timeout
+	validateVersion(t, epc, version.Versions{Server: currentVersionStr})
+}
+
+func startEtcd(t *testing.T, execPath, dataDirPath string) *e2e.EtcdProcessCluster {
+	epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{
+		ExecPath:     execPath,
+		DataDirPath:  dataDirPath,
+		ClusterSize:  1,
+		InitialToken: "new",
+		KeepDataDir:  true,
+		// TODO: REMOVE snapshot override when snapshotting is automated after lowering storage version
+		SnapshotCount: 5,
+	})
+	if err != nil {
+		t.Fatalf("could not start etcd process cluster (%v)", err)
+	}
+	t.Cleanup(func() {
+		if errC := epc.Close(); errC != nil {
+			t.Fatalf("error closing etcd processes (%v)", errC)
+		}
+	})
+
+	prefixArgs := []string{e2e.CtlBinPath, "--endpoints", strings.Join(epc.EndpointsV3(), ",")}
+	t.Log("Write keys to ensure wal snapshot is created so cluster version set is snapshotted")
+	e2e.ExecuteWithTimeout(t, 20*time.Second, func() {
+		for i := 0; i < 10; i++ {
+			if err := e2e.SpawnWithExpect(append(prefixArgs, "put", fmt.Sprintf("%d", i), "value"), "OK"); err != nil {
+				t.Fatal(err)
+			}
+		}
+	})
+	return epc
+}
+
+func downgradeEnable(t *testing.T, epc *e2e.EtcdProcessCluster, ver semver.Version) {
+	t.Log("etcdctl downgrade...")
+	c, err := clientv3.New(clientv3.Config{
+		Endpoints: epc.EndpointsV3(),
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer c.Close()
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+	// TODO: Fix request always timing out even though it succeeds
+	c.Downgrade(ctx, 1, ver.String())
+	cancel()
+
+	expectLog(t, epc, "The server is ready to downgrade")
+}
+
+func stopEtcd(t *testing.T, epc *e2e.EtcdProcessCluster) {
+	t.Log("Stopping the server...")
+	if err := epc.Procs[0].Stop(); err != nil {
+		t.Fatal(err)
+	}
+}
+
+func validateVersion(t *testing.T, epc *e2e.EtcdProcessCluster, expect version.Versions) {
+	t.Log("Validate version")
+	// Two separate calls to expect as it doesn't support multiple matches on the same line
+	e2e.ExecuteWithTimeout(t, 20*time.Second, func() {
+		if expect.Server != "" {
+			err := e2e.SpawnWithExpects(e2e.CURLPrefixArgs(epc, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdserver":"`+expect.Server)
+			if err != nil {
+				t.Fatal(err)
+			}
+		}
+		if expect.Cluster != "" {
+			err := e2e.SpawnWithExpects(e2e.CURLPrefixArgs(epc, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdcluster":"`+expect.Cluster)
+			if err != nil {
+				t.Fatal(err)
+			}
+		}
+	})
+}
+
+func expectLog(t *testing.T, epc *e2e.EtcdProcessCluster, expectLog string) {
+	t.Helper()
+	e2e.ExecuteWithTimeout(t, 30*time.Second, func() {
+		_, err := epc.Procs[0].Logs().Expect(expectLog)
+		if err != nil {
+			t.Fatal(err)
+		}
+	})
+}
diff --git a/tests/e2e/ctl_v3_grpc_test.go b/tests/e2e/ctl_v3_grpc_test.go
index 6007e241c..39211e7dc 100644
--- a/tests/e2e/ctl_v3_grpc_test.go
+++ b/tests/e2e/ctl_v3_grpc_test.go
@@ -24,7 +24,6 @@ import (
 	"time"
 
 	"github.com/stretchr/testify/assert"
-	"go.etcd.io/etcd/client/pkg/v3/testutil"
 	"go.etcd.io/etcd/tests/v3/framework/e2e"
 )
 
@@ -105,7 +104,7 @@ func TestAuthority(t *testing.T) {
 				t.Fatal(err)
 			}
 
-			executeWithTimeout(t, 5*time.Second, func() {
+			e2e.ExecuteWithTimeout(t, 5*time.Second, func() {
 				assertAuthority(t, fmt.Sprintf(tc.expectAuthorityPattern, 20000), epc)
 			})
 		})
@@ -154,20 +153,6 @@ func firstMatch(t *testing.T, expectLine string, logs ...e2e.LogsExpect) string
 	return <-match
 }
 
-func executeWithTimeout(t *testing.T, timeout time.Duration, f func()) {
-	donec := make(chan struct{})
-	go func() {
-		defer close(donec)
-		f()
-	}()
-
-	select {
-	case <-time.After(timeout):
-		testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout))
-	case <-donec:
-	}
-}
-
 type etcdctlV3 struct {
 	cfg       *e2e.EtcdProcessClusterConfig
 	endpoints []string
diff --git a/tests/e2e/utl_migrate_test.go b/tests/e2e/utl_migrate_test.go
index efc97f7f4..7623fc1b8 100644
--- a/tests/e2e/utl_migrate_test.go
+++ b/tests/e2e/utl_migrate_test.go
@@ -85,7 +85,7 @@ func TestEtctlutlMigrate(t *testing.T) {
 		{
 			name:                 "Downgrade v3.6 to v3.5 should fail until it's implemented",
 			targetVersion:        "3.5",
-			expectLogsSubString:  "Error: cannot create migration plan: downgrades are not yet supported",
+			expectLogsSubString:  "cannot downgrade storage, WAL contains newer entries",
 			expectStorageVersion: &schema.V3_6,
 		},
 		{
diff --git a/tests/framework/e2e/util.go b/tests/framework/e2e/util.go
index a3b903863..5d66ba4cb 100644
--- a/tests/framework/e2e/util.go
+++ b/tests/framework/e2e/util.go
@@ -117,3 +117,17 @@ func ToTLS(s string) string {
 func SkipInShortMode(t testing.TB) {
 	testutil.SkipTestIfShortMode(t, "e2e tests are not running in --short mode")
 }
+
+func ExecuteWithTimeout(t *testing.T, timeout time.Duration, f func()) {
+	donec := make(chan struct{})
+	go func() {
+		defer close(donec)
+		f()
+	}()
+
+	select {
+	case <-time.After(timeout):
+		testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout))
+	case <-donec:
+	}
+}
diff --git a/tests/integration/utl_wal_version_test.go b/tests/integration/utl_wal_version_test.go
index 774b25385..e5318c664 100644
--- a/tests/integration/utl_wal_version_test.go
+++ b/tests/integration/utl_wal_version_test.go
@@ -64,6 +64,9 @@ func TestEtcdVersionFromWAL(t *testing.T) {
 		panic(err)
 	}
 	defer w.Close()
-	ver := w.MinimalEtcdVersion()
-	assert.Equal(t, &semver.Version{Major: 3, Minor: 5}, ver)
+	walVersion, err := wal.ReadWALVersion(w)
+	if err != nil {
+		t.Fatal(err)
+	}
+	assert.Equal(t, &semver.Version{Major: 3, Minor: 6}, walVersion.MinimalEtcdVersion())
 }