From 758fc0f8ad0a099d3c4c37fd766e75a9de9deb49 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 6 Oct 2021 11:36:24 +0200 Subject: [PATCH 1/6] server: Depend only on cluster version to detect downgrade Problem with old code was that during downgrade only members with downgrade target version were allowed to join. This is unrealistic as it doesn't handle any members to disconnect/rejoin. --- server/etcdserver/adapters.go | 8 +- server/etcdserver/api/membership/cluster.go | 23 +++--- server/etcdserver/version/downgrade.go | 24 +----- server/etcdserver/version/downgrade_test.go | 61 ++------------- server/etcdserver/version/monitor.go | 49 +++++++++--- server/etcdserver/version/monitor_test.go | 35 ++++++++- server/etcdserver/version/version_test.go | 86 +++++++++++++++++++-- 7 files changed, 174 insertions(+), 112 deletions(-) diff --git a/server/etcdserver/adapters.go b/server/etcdserver/adapters.go index e5e943d15..ea51df96e 100644 --- a/server/etcdserver/adapters.go +++ b/server/etcdserver/adapters.go @@ -18,7 +18,6 @@ import ( "context" "github.com/coreos/go-semver/semver" - "go.uber.org/zap" pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/membershippb" @@ -88,15 +87,12 @@ func (s *serverVersionAdapter) GetStorageVersion() *semver.Version { return &v } -func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) { +func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) error { if s.tx == nil { s.Lock() defer s.Unlock() } - err := schema.UnsafeMigrate(s.lg, s.tx, target) - if err != nil { - s.lg.Error("failed migrating storage schema", zap.String("storage-version", target.String()), zap.Error(err)) - } + return schema.UnsafeMigrate(s.lg, s.tx, target) } func (s *serverVersionAdapter) Lock() { diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go index 649e8a62f..795556b88 100644 --- a/server/etcdserver/api/membership/cluster.go +++ b/server/etcdserver/api/membership/cluster.go @@ -271,12 +271,15 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { if c.be != nil { c.downgradeInfo = c.be.DowngradeInfoFromBackend() } - d := &serverversion.DowngradeInfo{Enabled: false} - if c.downgradeInfo != nil { - d = &serverversion.DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion} - } sv := semver.Must(semver.NewVersion(version.Version)) - serverversion.MustDetectDowngrade(c.lg, sv, c.version, d) + if c.downgradeInfo != nil && c.downgradeInfo.Enabled { + c.lg.Info( + "cluster is downgrading to target version", + zap.String("target-cluster-version", c.downgradeInfo.TargetVersion), + zap.String("current-server-version", sv.String()), + ) + } + serverversion.MustDetectDowngrade(c.lg, sv, c.version) onSet(c.lg, c.version) for _, m := range c.members { @@ -548,7 +551,7 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s oldVer := c.version c.version = ver sv := semver.Must(semver.NewVersion(version.Version)) - serverversion.MustDetectDowngrade(c.lg, sv, c.version, c.downgradeInfo) + serverversion.MustDetectDowngrade(c.lg, sv, c.version) if c.v2store != nil { mustSaveClusterVersionToStore(c.lg, c.v2store, ver) } @@ -759,14 +762,6 @@ func (c *RaftCluster) SetDowngradeInfo(d *serverversion.DowngradeInfo, shouldApp } c.downgradeInfo = d - - if d.Enabled { - c.lg.Info( - "The server is ready to downgrade", - zap.String("target-version", d.TargetVersion), - zap.String("server-version", version.Version), - ) - } } // IsMemberExist returns if the member with the given id exists in cluster. diff --git a/server/etcdserver/version/downgrade.go b/server/etcdserver/version/downgrade.go index d70fd63ac..60cab4931 100644 --- a/server/etcdserver/version/downgrade.go +++ b/server/etcdserver/version/downgrade.go @@ -37,31 +37,11 @@ func isValidDowngrade(verFrom *semver.Version, verTo *semver.Version) bool { return verTo.Equal(*allowedDowngradeVersion(verFrom)) } -// MustDetectDowngrade will detect unexpected downgrade when the local server is recovered. -func MustDetectDowngrade(lg *zap.Logger, sv, cv *semver.Version, d *DowngradeInfo) { +// MustDetectDowngrade will detect local server joining cluster that doesn't support it's version. +func MustDetectDowngrade(lg *zap.Logger, sv, cv *semver.Version) { // only keep major.minor version for comparison against cluster version sv = &semver.Version{Major: sv.Major, Minor: sv.Minor} - // if the cluster enables downgrade, check local version against downgrade target version. - if d != nil && d.Enabled && d.TargetVersion != "" { - if sv.Equal(*d.GetTargetVersion()) { - if cv != nil { - lg.Info( - "cluster is downgrading to target version", - zap.String("target-cluster-version", d.TargetVersion), - zap.String("determined-cluster-version", version.Cluster(cv.String())), - zap.String("current-server-version", sv.String()), - ) - } - return - } - lg.Panic( - "invalid downgrade; server version is not allowed to join when downgrade is enabled", - zap.String("current-server-version", sv.String()), - zap.String("target-cluster-version", d.TargetVersion), - ) - } - // if the cluster disables downgrade, check local version against determined cluster version. // the validation passes when local version is not less than cluster version if cv != nil && sv.LessThan(*cv) { diff --git a/server/etcdserver/version/downgrade_test.go b/server/etcdserver/version/downgrade_test.go index 97c8d2125..a16a1bafc 100644 --- a/server/etcdserver/version/downgrade_test.go +++ b/server/etcdserver/version/downgrade_test.go @@ -29,92 +29,47 @@ func TestMustDetectDowngrade(t *testing.T) { lv = &semver.Version{Major: lv.Major, Minor: lv.Minor} oneMinorHigher := &semver.Version{Major: lv.Major, Minor: lv.Minor + 1} oneMinorLower := &semver.Version{Major: lv.Major, Minor: lv.Minor - 1} - downgradeEnabledHigherVersion := &DowngradeInfo{Enabled: true, TargetVersion: oneMinorHigher.String()} - downgradeEnabledEqualVersion := &DowngradeInfo{Enabled: true, TargetVersion: lv.String()} - downgradeEnabledLowerVersion := &DowngradeInfo{Enabled: true, TargetVersion: oneMinorLower.String()} - downgradeDisabled := &DowngradeInfo{Enabled: false} tests := []struct { name string clusterVersion *semver.Version - downgrade *DowngradeInfo success bool message string }{ { - "Succeeded when downgrade is disabled and cluster version is nil", + "Succeeded when cluster version is nil", nil, - downgradeDisabled, true, "", }, { - "Succeeded when downgrade is disabled and cluster version is one minor lower", + "Succeeded when cluster version is one minor lower", oneMinorLower, - downgradeDisabled, true, "", }, { - "Succeeded when downgrade is disabled and cluster version is server version", + "Succeeded when cluster version is server version", lv, - downgradeDisabled, true, "", }, { - "Failed when downgrade is disabled and server version is lower than determined cluster version ", + "Failed when server version is lower than determined cluster version ", oneMinorHigher, - downgradeDisabled, false, "invalid downgrade; server version is lower than determined cluster version", }, - { - "Succeeded when downgrade is enabled and cluster version is nil", - nil, - downgradeEnabledEqualVersion, - true, - "", - }, - { - "Failed when downgrade is enabled and server version is target version", - lv, - downgradeEnabledEqualVersion, - true, - "cluster is downgrading to target version", - }, - { - "Succeeded when downgrade to lower version and server version is cluster version ", - lv, - downgradeEnabledLowerVersion, - false, - "invalid downgrade; server version is not allowed to join when downgrade is enabled", - }, - { - "Failed when downgrade is enabled and local version is out of range and cluster version is nil", - nil, - downgradeEnabledHigherVersion, - false, - "invalid downgrade; server version is not allowed to join when downgrade is enabled", - }, - - { - "Failed when downgrade is enabled and local version is out of range", - lv, - downgradeEnabledHigherVersion, - false, - "invalid downgrade; server version is not allowed to join when downgrade is enabled", - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { lg := zaptest.NewLogger(t) sv := semver.Must(semver.NewVersion(version.Version)) - err := tryMustDetectDowngrade(lg, sv, tt.clusterVersion, tt.downgrade) + err := tryMustDetectDowngrade(lg, sv, tt.clusterVersion) if tt.success != (err == nil) { - t.Errorf("Unexpected status, got %q, wanted: %v", err, tt.success) + t.Errorf("Unexpected success, got: %v, wanted: %v", err == nil, tt.success) // TODO test err } if err != nil && tt.message != fmt.Sprintf("%s", err) { @@ -124,11 +79,11 @@ func TestMustDetectDowngrade(t *testing.T) { } } -func tryMustDetectDowngrade(lg *zap.Logger, sv, cv *semver.Version, d *DowngradeInfo) (err interface{}) { +func tryMustDetectDowngrade(lg *zap.Logger, sv, cv *semver.Version) (err interface{}) { defer func() { err = recover() }() - MustDetectDowngrade(lg, sv, cv, d) + MustDetectDowngrade(lg, sv, cv) return err } diff --git a/server/etcdserver/version/monitor.go b/server/etcdserver/version/monitor.go index 1a8e73e47..39cbbb464 100644 --- a/server/etcdserver/version/monitor.go +++ b/server/etcdserver/version/monitor.go @@ -39,7 +39,7 @@ type Server interface { DowngradeCancel(ctx context.Context) error GetStorageVersion() *semver.Version - UpdateStorageVersion(semver.Version) + UpdateStorageVersion(semver.Version) error Lock() Unlock() @@ -61,18 +61,35 @@ func (m *Monitor) UpdateClusterVersionIfNeeded() { } } -// decideClusterVersion decides the cluster version based on the members versions if all members agree on a higher one. +// decideClusterVersion decides whether to change cluster version and its next value. +// New cluster version is based on the members versions server and whether cluster is downgrading. +// Returns nil if cluster version should be left unchanged. func (m *Monitor) decideClusterVersion() *semver.Version { clusterVersion := m.s.GetClusterVersion() - membersMinimalVersion := m.membersMinimalVersion() + minimalServerVersion := m.membersMinimalServerVersion() if clusterVersion == nil { - if membersMinimalVersion != nil { - return membersMinimalVersion + if minimalServerVersion != nil { + return minimalServerVersion } return semver.New(version.MinClusterVersion) } - if membersMinimalVersion != nil && clusterVersion.LessThan(*membersMinimalVersion) && IsValidVersionChange(clusterVersion, membersMinimalVersion) { - return membersMinimalVersion + if minimalServerVersion == nil { + return nil + } + downgrade := m.s.GetDowngradeInfo() + if downgrade != nil && downgrade.Enabled { + if IsValidVersionChange(clusterVersion, downgrade.GetTargetVersion()) && IsValidVersionChange(minimalServerVersion, downgrade.GetTargetVersion()) { + return downgrade.GetTargetVersion() + } + m.lg.Error("Cannot downgrade cluster version, version change is not valid", + zap.String("downgrade-version", downgrade.TargetVersion), + zap.String("cluster-version", clusterVersion.String()), + zap.String("minimal-server-version", minimalServerVersion.String()), + ) + return nil + } + if clusterVersion.LessThan(*minimalServerVersion) && IsValidVersionChange(clusterVersion, minimalServerVersion) { + return minimalServerVersion } return nil } @@ -91,7 +108,19 @@ func (m *Monitor) UpdateStorageVersionIfNeeded() { if sv != nil { m.lg.Info("storage version differs from storage version.", zap.String("cluster-version", cv.String()), zap.String("storage-version", sv.String())) } - m.s.UpdateStorageVersion(semver.Version{Major: cv.Major, Minor: cv.Minor}) + err := m.s.UpdateStorageVersion(semver.Version{Major: cv.Major, Minor: cv.Minor}) + if err != nil { + m.lg.Error("failed update storage version", zap.String("cluster-version", cv.String()), zap.Error(err)) + return + } + d := m.s.GetDowngradeInfo() + if d != nil && d.Enabled { + m.lg.Info( + "The server is ready to downgrade", + zap.String("target-version", d.TargetVersion), + zap.String("server-version", version.Version), + ) + } } } @@ -112,11 +141,11 @@ func (m *Monitor) CancelDowngradeIfNeeded() { } } -// membersMinimalVersion returns the min server version in the map, or nil if the min +// membersMinimalServerVersion returns the min server version in the map, or nil if the min // version in unknown. // It prints out log if there is a member with a higher version than the // local version. -func (m *Monitor) membersMinimalVersion() *semver.Version { +func (m *Monitor) membersMinimalServerVersion() *semver.Version { vers := m.s.GetMembersVersions() var minV *semver.Version lv := semver.Must(semver.NewVersion(version.Version)) diff --git a/server/etcdserver/version/monitor_test.go b/server/etcdserver/version/monitor_test.go index ffc908e7b..4f94db64a 100644 --- a/server/etcdserver/version/monitor_test.go +++ b/server/etcdserver/version/monitor_test.go @@ -50,7 +50,7 @@ func TestMemberMinimalVersion(t *testing.T) { monitor := NewMonitor(zaptest.NewLogger(t), &storageMock{ memberVersions: tt.memberVersions, }) - minV := monitor.membersMinimalVersion() + minV := monitor.membersMinimalServerVersion() if !reflect.DeepEqual(minV, tt.wantVersion) { t.Errorf("#%d: ver = %+v, want %+v", i, minV, tt.wantVersion) } @@ -204,6 +204,36 @@ func TestUpdateClusterVersionIfNeeded(t *testing.T) { clusterVersion: &V3_5, expectClusterVersion: &V3_6, }, + { + name: "Should downgrade cluster version if downgrade is set to allow older members to join", + memberVersions: map[string]*version.Versions{ + "a": {Cluster: "3.6.0", Server: "3.6.0"}, + "b": {Cluster: "3.6.0", Server: "3.6.0"}, + }, + clusterVersion: &V3_6, + downgrade: &DowngradeInfo{TargetVersion: "3.5.0", Enabled: true}, + expectClusterVersion: &V3_5, + }, + { + name: "Should maintain downgrade target version to allow older members to join", + memberVersions: map[string]*version.Versions{ + "a": {Cluster: "3.5.0", Server: "3.6.0"}, + "b": {Cluster: "3.5.0", Server: "3.6.0"}, + }, + clusterVersion: &V3_5, + downgrade: &DowngradeInfo{TargetVersion: "3.5.0", Enabled: true}, + expectClusterVersion: &V3_5, + }, + { + name: "Don't downgrade below supported range", + memberVersions: map[string]*version.Versions{ + "a": {Cluster: "3.5.0", Server: "3.6.0"}, + "b": {Cluster: "3.5.0", Server: "3.6.0"}, + }, + clusterVersion: &V3_5, + downgrade: &DowngradeInfo{TargetVersion: "3.4.0", Enabled: true}, + expectClusterVersion: &V3_5, + }, } for _, tt := range tests { @@ -369,8 +399,9 @@ func (s *storageMock) GetStorageVersion() *semver.Version { return s.storageVersion } -func (s *storageMock) UpdateStorageVersion(v semver.Version) { +func (s *storageMock) UpdateStorageVersion(v semver.Version) error { s.storageVersion = &v + return nil } func (s *storageMock) Lock() { diff --git a/server/etcdserver/version/version_test.go b/server/etcdserver/version/version_test.go index 6beaec0b3..70578ad6b 100644 --- a/server/etcdserver/version/version_test.go +++ b/server/etcdserver/version/version_test.go @@ -62,6 +62,68 @@ func TestUpgradeThreeNodes(t *testing.T) { assert.Equal(t, newCluster(lg, 3, V3_7), c) } +func TestDowngradeSingleNode(t *testing.T) { + lg := zaptest.NewLogger(t) + c := newCluster(lg, 1, V3_6) + c.StepMonitors() + assert.Equal(t, newCluster(lg, 1, V3_6), c) + + assert.NoError(t, c.Version().DowngradeEnable(context.Background(), &V3_5)) + c.StepMonitors() + assert.Equal(t, V3_5, c.clusterVersion) + + c.ReplaceMemberBinary(0, V3_5) + c.StepMonitors() + + assert.Equal(t, newCluster(lg, 1, V3_5), c) +} + +func TestDowngradeThreeNode(t *testing.T) { + lg := zaptest.NewLogger(t) + c := newCluster(lg, 3, V3_6) + c.StepMonitors() + assert.Equal(t, newCluster(lg, 3, V3_6), c) + + assert.NoError(t, c.Version().DowngradeEnable(context.Background(), &V3_5)) + c.StepMonitors() + assert.Equal(t, V3_5, c.clusterVersion) + + c.ReplaceMemberBinary(0, V3_5) + c.StepMonitors() + c.ReplaceMemberBinary(1, V3_5) + c.StepMonitors() + c.ReplaceMemberBinary(2, V3_5) + c.StepMonitors() + + assert.Equal(t, newCluster(lg, 3, V3_5), c) +} + +func TestNewerMemberCanReconnectDuringDowngrade(t *testing.T) { + lg := zaptest.NewLogger(t) + c := newCluster(lg, 3, V3_6) + c.StepMonitors() + assert.Equal(t, newCluster(lg, 3, V3_6), c) + + assert.NoError(t, c.Version().DowngradeEnable(context.Background(), &V3_5)) + c.StepMonitors() + assert.Equal(t, V3_5, c.clusterVersion) + + c.ReplaceMemberBinary(0, V3_5) + c.StepMonitors() + + c.MemberCrashes(2) + c.StepMonitors() + c.MemberReconnects(2) + c.StepMonitors() + + c.ReplaceMemberBinary(1, V3_5) + c.StepMonitors() + c.ReplaceMemberBinary(2, V3_5) + c.StepMonitors() + + assert.Equal(t, newCluster(lg, 3, V3_5), c) +} + func newCluster(lg *zap.Logger, memberCount int, ver semver.Version) *clusterMock { cluster := &clusterMock{ lg: lg, @@ -71,6 +133,7 @@ func newCluster(lg *zap.Logger, memberCount int, ver semver.Version) *clusterMoc majorMinVer := semver.Version{Major: ver.Major, Minor: ver.Minor} for i := 0; i < memberCount; i++ { m := &memberMock{ + isRunning: true, cluster: cluster, serverVersion: ver, storageVersion: majorMinVer, @@ -113,22 +176,34 @@ func (c *clusterMock) Version() *Manager { func (c *clusterMock) MembersVersions() map[string]*version.Versions { result := map[string]*version.Versions{} for i, m := range c.members { - result[fmt.Sprintf("%d", i)] = &version.Versions{ - Server: m.serverVersion.String(), - Cluster: c.clusterVersion.String(), + if m.isRunning { + result[fmt.Sprintf("%d", i)] = &version.Versions{ + Server: m.serverVersion.String(), + Cluster: c.clusterVersion.String(), + } } } return result } func (c *clusterMock) ReplaceMemberBinary(mid int, newServerVersion semver.Version) { - MustDetectDowngrade(c.lg, &c.members[mid].serverVersion, &c.clusterVersion, c.downgradeInfo) + MustDetectDowngrade(c.lg, &c.members[mid].serverVersion, &c.clusterVersion) c.members[mid].serverVersion = newServerVersion } +func (c *clusterMock) MemberCrashes(mid int) { + c.members[mid].isRunning = false +} + +func (c *clusterMock) MemberReconnects(mid int) { + MustDetectDowngrade(c.lg, &c.members[mid].serverVersion, &c.clusterVersion) + c.members[mid].isRunning = true +} + type memberMock struct { cluster *clusterMock + isRunning bool isLeader bool serverVersion semver.Version storageVersion semver.Version @@ -174,8 +249,9 @@ func (m *memberMock) GetStorageVersion() *semver.Version { return &m.storageVersion } -func (m *memberMock) UpdateStorageVersion(v semver.Version) { +func (m *memberMock) UpdateStorageVersion(v semver.Version) error { m.storageVersion = v + return nil } func (m *memberMock) TriggerSnapshot() { From f5d71fa38966406b299453b0284e10c35af3305b Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 6 Oct 2021 12:28:30 +0200 Subject: [PATCH 2/6] server: Detect when WAL includes unapplied cluster version set to higher version This is because etcd v3.5 will panic when it encounters ClusterVersionSet entry with version >3.5.0. For downgrades to v3.5 to work we need to make sure this entry is snapshotted. --- server/etcdserver/adapters.go | 3 +-- server/storage/wal/version.go | 9 ++++++++- server/storage/wal/version_test.go | 13 +++++++++++++ tests/integration/utl_wal_version_test.go | 2 +- 4 files changed, 23 insertions(+), 4 deletions(-) diff --git a/server/etcdserver/adapters.go b/server/etcdserver/adapters.go index ea51df96e..5d9580735 100644 --- a/server/etcdserver/adapters.go +++ b/server/etcdserver/adapters.go @@ -43,8 +43,7 @@ func newServerVersionAdapter(s *EtcdServer) *serverVersionAdapter { var _ serverversion.Server = (*serverVersionAdapter)(nil) func (s *serverVersionAdapter) UpdateClusterVersion(version string) { - // TODO switch to updateClusterVersionV3 in 3.6 - s.GoAttach(func() { s.updateClusterVersionV2(version) }) + s.GoAttach(func() { s.updateClusterVersionV3(version) }) } func (s *serverVersionAdapter) LinearizableReadNotify(ctx context.Context) error { diff --git a/server/storage/wal/version.go b/server/storage/wal/version.go index 6e8dbfd7b..b7b7bc5d7 100644 --- a/server/storage/wal/version.go +++ b/server/storage/wal/version.go @@ -51,6 +51,7 @@ func etcdVersionFromEntry(ent raftpb.Entry) *semver.Version { func etcdVersionFromData(entryType raftpb.EntryType, data []byte) *semver.Version { var msg protoreflect.Message + var ver *semver.Version switch entryType { case raftpb.EntryNormal: var raftReq etcdserverpb.InternalRaftRequest @@ -59,6 +60,12 @@ func etcdVersionFromData(entryType raftpb.EntryType, data []byte) *semver.Versio return nil } msg = proto.MessageReflect(&raftReq) + if raftReq.ClusterVersionSet != nil { + ver, err = semver.NewVersion(raftReq.ClusterVersionSet.Ver) + if err != nil { + panic(err) + } + } case raftpb.EntryConfChange: var confChange raftpb.ConfChange err := pbutil.Unmarshaler(&confChange).Unmarshal(data) @@ -76,7 +83,7 @@ func etcdVersionFromData(entryType raftpb.EntryType, data []byte) *semver.Versio default: panic("unhandled") } - return etcdVersionFromMessage(msg) + return maxVersion(etcdVersionFromMessage(msg), ver) } func etcdVersionFromMessage(m protoreflect.Message) *semver.Version { diff --git a/server/storage/wal/version_test.go b/server/storage/wal/version_test.go index 5aa83250c..52965662c 100644 --- a/server/storage/wal/version_test.go +++ b/server/storage/wal/version_test.go @@ -40,6 +40,9 @@ func TestEtcdVersionFromEntry(t *testing.T) { raftReq := etcdserverpb.InternalRaftRequest{Header: &etcdserverpb.RequestHeader{AuthRevision: 1}} normalRequestData := pbutil.MustMarshal(&raftReq) + clusterVersionV3_6Req := etcdserverpb.InternalRaftRequest{ClusterVersionSet: &membershippb.ClusterVersionSetRequest{Ver: "3.6.0"}} + clusterVersionV3_6Data := pbutil.MustMarshal(&clusterVersionV3_6Req) + confChange := raftpb.ConfChange{Type: raftpb.ConfChangeAddLearnerNode} confChangeData := pbutil.MustMarshal(&confChange) @@ -61,6 +64,16 @@ func TestEtcdVersionFromEntry(t *testing.T) { }, expect: &V3_1, }, + { + name: "Setting cluster version implies version within", + input: raftpb.Entry{ + Term: 1, + Index: 2, + Type: raftpb.EntryNormal, + Data: clusterVersionV3_6Data, + }, + expect: &V3_6, + }, { name: "Using ConfigChange implies v3.4", input: raftpb.Entry{ diff --git a/tests/integration/utl_wal_version_test.go b/tests/integration/utl_wal_version_test.go index 774b25385..3ffbd46ef 100644 --- a/tests/integration/utl_wal_version_test.go +++ b/tests/integration/utl_wal_version_test.go @@ -65,5 +65,5 @@ func TestEtcdVersionFromWAL(t *testing.T) { } defer w.Close() ver := w.MinimalEtcdVersion() - assert.Equal(t, &semver.Version{Major: 3, Minor: 5}, ver) + assert.Equal(t, &semver.Version{Major: 3, Minor: 6}, ver) } From 335dc98c8dbb5ebfdc5cf2ab4348de8acc862c63 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 6 Oct 2021 15:10:30 +0200 Subject: [PATCH 3/6] server: Use server version to decide if to downgrade has finished --- server/etcdserver/version/monitor.go | 4 +++- server/etcdserver/version/monitor_test.go | 20 +++++++++++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/server/etcdserver/version/monitor.go b/server/etcdserver/version/monitor.go index 39cbbb464..05da0b228 100644 --- a/server/etcdserver/version/monitor.go +++ b/server/etcdserver/version/monitor.go @@ -185,11 +185,12 @@ func (m *Monitor) membersMinimalServerVersion() *semver.Version { // It can be used to decide the whether the cluster finishes downgrading to target version. func (m *Monitor) versionsMatchTarget(targetVersion *semver.Version) bool { vers := m.s.GetMembersVersions() + targetVersion = &semver.Version{Major: targetVersion.Major, Minor: targetVersion.Minor} for mid, ver := range vers { if ver == nil { return false } - v, err := semver.NewVersion(ver.Cluster) + v, err := semver.NewVersion(ver.Server) if err != nil { m.lg.Warn( "failed to parse server version of remote member", @@ -199,6 +200,7 @@ func (m *Monitor) versionsMatchTarget(targetVersion *semver.Version) bool { ) return false } + v = &semver.Version{Major: v.Major, Minor: v.Minor} if !targetVersion.Equal(*v) { m.lg.Warn("remotes server has mismatching etcd version", zap.String("remote-member-id", mid), diff --git a/server/etcdserver/version/monitor_test.go b/server/etcdserver/version/monitor_test.go index 4f94db64a..a6b31cec8 100644 --- a/server/etcdserver/version/monitor_test.go +++ b/server/etcdserver/version/monitor_test.go @@ -127,7 +127,7 @@ func TestVersionMatchTarget(t *testing.T) { "When cannot parse peer version", &semver.Version{Major: 3, Minor: 4}, map[string]*version.Versions{ - "mem1": {Server: "3.4.1", Cluster: "3.4"}, + "mem1": {Server: "3.4", Cluster: "3.4.0"}, "mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"}, "mem3": {Server: "3.4.2", Cluster: "3.4.0"}, }, @@ -277,6 +277,24 @@ func TestCancelDowngradeIfNeeded(t *testing.T) { "b": {Cluster: "3.6.0", Server: "3.6.2"}, }, }, + { + name: "Continue downgrade if just started", + memberVersions: map[string]*version.Versions{ + "a": {Cluster: "3.5.0", Server: "3.6.1"}, + "b": {Cluster: "3.5.0", Server: "3.6.2"}, + }, + downgrade: &DowngradeInfo{TargetVersion: "3.5.0", Enabled: true}, + expectDowngrade: &DowngradeInfo{TargetVersion: "3.5.0", Enabled: true}, + }, + { + name: "Continue downgrade if there is at least one member with not matching", + memberVersions: map[string]*version.Versions{ + "a": {Cluster: "3.5.0", Server: "3.5.1"}, + "b": {Cluster: "3.5.0", Server: "3.6.2"}, + }, + downgrade: &DowngradeInfo{TargetVersion: "3.5.0", Enabled: true}, + expectDowngrade: &DowngradeInfo{TargetVersion: "3.5.0", Enabled: true}, + }, { name: "Cancel downgrade if all members have downgraded", memberVersions: map[string]*version.Versions{ From 431adc5878d9f8d6941c7ca96c725b97af40e0b3 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 6 Oct 2021 13:55:56 +0200 Subject: [PATCH 4/6] server: Implement storage downgrades By validating if WAL doesn't include any incompatible entries we can implement storage downgrades. --- etcdutl/etcdutl/migrate_command.go | 16 +++- server/etcdserver/adapters.go | 2 +- server/mock/mockstorage/storage_recorder.go | 4 +- server/storage/schema/migration.go | 14 +--- server/storage/schema/migration_test.go | 8 +- server/storage/schema/schema.go | 34 +++++--- server/storage/schema/schema_test.go | 73 +++++++++++------ server/storage/storage.go | 47 ++++++++++- server/storage/wal/testing/waltesting.go | 89 +++++++++++++++++++++ server/storage/wal/version.go | 25 ++++-- server/storage/wal/wal.go | 8 ++ tests/e2e/utl_migrate_test.go | 2 +- tests/integration/utl_wal_version_test.go | 7 +- 13 files changed, 264 insertions(+), 65 deletions(-) create mode 100644 server/storage/wal/testing/waltesting.go diff --git a/etcdutl/etcdutl/migrate_command.go b/etcdutl/etcdutl/migrate_command.go index 9c429e716..e9bac4f37 100644 --- a/etcdutl/etcdutl/migrate_command.go +++ b/etcdutl/etcdutl/migrate_command.go @@ -26,6 +26,8 @@ import ( "go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/datadir" "go.etcd.io/etcd/server/v3/storage/schema" + "go.etcd.io/etcd/server/v3/storage/wal" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" ) // NewMigrateCommand prints out the version of etcd. @@ -90,12 +92,24 @@ func (o *migrateOptions) Config() (*migrateConfig, error) { dbPath := datadir.ToBackendFileName(o.dataDir) c.be = backend.NewDefaultBackend(dbPath) + walPath := datadir.ToWalDir(o.dataDir) + w, err := wal.OpenForRead(GetLogger(), walPath, walpb.Snapshot{}) + if err != nil { + return nil, fmt.Errorf(`failed to open wal: %v`, err) + } + defer w.Close() + c.walVersion, err = wal.ReadWALVersion(w) + if err != nil { + return nil, fmt.Errorf(`failed to read wal: %v`, err) + } + return c, nil } type migrateConfig struct { be backend.Backend targetVersion *semver.Version + walVersion schema.WALVersion force bool } @@ -112,7 +126,7 @@ func migrateCommandFunc(c *migrateConfig) error { lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(¤t))) return nil } - err = schema.Migrate(lg, tx, *c.targetVersion) + err = schema.Migrate(lg, tx, c.walVersion, *c.targetVersion) if err != nil { if !c.force { return err diff --git a/server/etcdserver/adapters.go b/server/etcdserver/adapters.go index 5d9580735..1def9b69e 100644 --- a/server/etcdserver/adapters.go +++ b/server/etcdserver/adapters.go @@ -91,7 +91,7 @@ func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) error s.Lock() defer s.Unlock() } - return schema.UnsafeMigrate(s.lg, s.tx, target) + return schema.UnsafeMigrate(s.lg, s.tx, s.r.storage, target) } func (s *serverVersionAdapter) Lock() { diff --git a/server/mock/mockstorage/storage_recorder.go b/server/mock/mockstorage/storage_recorder.go index db989cd2c..73cad169f 100644 --- a/server/mock/mockstorage/storage_recorder.go +++ b/server/mock/mockstorage/storage_recorder.go @@ -15,6 +15,7 @@ package mockstorage import ( + "github.com/coreos/go-semver/semver" "go.etcd.io/etcd/client/pkg/v3/testutil" "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" @@ -57,4 +58,5 @@ func (p *storageRecorder) Sync() error { return nil } -func (p *storageRecorder) Close() error { return nil } +func (p *storageRecorder) Close() error { return nil } +func (p *storageRecorder) MinimalEtcdVersion() *semver.Version { return nil } diff --git a/server/storage/schema/migration.go b/server/storage/schema/migration.go index bac67bec5..c8c4a701b 100644 --- a/server/storage/schema/migration.go +++ b/server/storage/schema/migration.go @@ -24,19 +24,7 @@ import ( type migrationPlan []migrationStep -func newPlan(lg *zap.Logger, current semver.Version, target semver.Version) (p migrationPlan, err error) { - // TODO(serathius): Implement downgrades - if target.LessThan(current) { - lg.Error("Target version is lower than the current version, downgrades are not yet supported", - zap.String("storage-version", current.String()), - zap.String("target-storage-version", target.String()), - ) - return nil, fmt.Errorf("downgrades are not yet supported") - } - return buildPlan(lg, current, target) -} - -func buildPlan(lg *zap.Logger, current semver.Version, target semver.Version) (plan migrationPlan, err error) { +func newPlan(lg *zap.Logger, current semver.Version, target semver.Version) (plan migrationPlan, err error) { current = trimToMinor(current) target = trimToMinor(target) if current.Major != target.Major { diff --git a/server/storage/schema/migration_test.go b/server/storage/schema/migration_test.go index d81a6208d..95e4d9739 100644 --- a/server/storage/schema/migration_test.go +++ b/server/storage/schema/migration_test.go @@ -46,11 +46,9 @@ func TestNewPlan(t *testing.T) { target: V3_6, }, { - name: "Downgrade v3.6 to v3.5 should fail as downgrades are not yet supported", - current: V3_6, - target: V3_5, - expectError: true, - expectErrorMsg: "downgrades are not yet supported", + name: "Downgrade v3.6 to v3.5 should fail as downgrades are not yet supported", + current: V3_6, + target: V3_5, }, { name: "Upgrade v3.6 to v3.7 should fail as v3.7 is unknown", diff --git a/server/storage/schema/schema.go b/server/storage/schema/schema.go index 198b90f3e..f3b29c011 100644 --- a/server/storage/schema/schema.go +++ b/server/storage/schema/schema.go @@ -52,15 +52,21 @@ func localBinaryVersion() semver.Version { return semver.Version{Major: v.Major, Minor: v.Minor} } -// Migrate updates storage schema to provided target version. -func Migrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error { - tx.Lock() - defer tx.Unlock() - return UnsafeMigrate(lg, tx, target) +type WALVersion interface { + // MinimalEtcdVersion returns minimal etcd version able to interpret WAL log. + MinimalEtcdVersion() *semver.Version } -// UnsafeMigrate is non-threadsafe version of Migrate. -func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error { +// Migrate updates storage schema to provided target version. +// Downgrading requires that provided WAL doesn't contain unsupported entries. +func Migrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Version) error { + tx.Lock() + defer tx.Unlock() + return UnsafeMigrate(lg, tx, w, target) +} + +// UnsafeMigrate is non thread-safe version of Migrate. +func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Version) error { current, err := UnsafeDetectSchemaVersion(lg, tx) if err != nil { return fmt.Errorf("cannot detect storage schema version: %w", err) @@ -69,6 +75,12 @@ func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) er if err != nil { return fmt.Errorf("cannot create migration plan: %w", err) } + if target.LessThan(current) { + minVersion := w.MinimalEtcdVersion() + if minVersion != nil && target.LessThan(*minVersion) { + return fmt.Errorf("cannot downgrade storage, WAL contains newer entries") + } + } return plan.unsafeExecute(lg, tx) } @@ -101,12 +113,16 @@ func UnsafeDetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Vers func schemaChangesForVersion(v semver.Version, isUpgrade bool) ([]schemaChange, error) { // changes should be taken from higher version + var higherV = v if isUpgrade { - v = semver.Version{Major: v.Major, Minor: v.Minor + 1} + higherV = semver.Version{Major: v.Major, Minor: v.Minor + 1} } - actions, found := schemaChanges[v] + actions, found := schemaChanges[higherV] if !found { + if isUpgrade { + return nil, fmt.Errorf("version %q is not supported", higherV.String()) + } return nil, fmt.Errorf("version %q is not supported", v.String()) } return actions, nil diff --git a/server/storage/schema/schema_test.go b/server/storage/schema/schema_test.go index 823400760..f0fb943b4 100644 --- a/server/storage/schema/schema_test.go +++ b/server/storage/schema/schema_test.go @@ -15,15 +15,18 @@ package schema import ( - "fmt" "testing" "time" "github.com/coreos/go-semver/semver" "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/api/v3/membershippb" "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/storage/backend" betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" + "go.etcd.io/etcd/server/v3/storage/wal" + waltesting "go.etcd.io/etcd/server/v3/storage/wal/testing" "go.uber.org/zap" ) @@ -75,7 +78,7 @@ func TestValidate(t *testing.T) { name: `V3.7 schema is unknown and should return error`, version: V3_7, expectError: true, - expectErrorMsg: "downgrades are not yet supported", + expectErrorMsg: `version "3.7.0" is not supported`, }, } for _, tc := range tcs { @@ -103,6 +106,7 @@ func TestMigrate(t *testing.T) { // Overrides which keys should be set (default based on version) overrideKeys func(tx backend.BatchTx) targetVersion semver.Version + walEntries []etcdserverpb.InternalRaftRequest expectVersion *semver.Version expectError bool @@ -168,33 +172,52 @@ func TestMigrate(t *testing.T) { targetVersion: V3_6, expectVersion: &V3_7, expectError: true, - expectErrorMsg: `cannot create migration plan: downgrades are not yet supported`, + expectErrorMsg: `cannot create migration plan: version "3.7.0" is not supported`, }, { - name: "Downgrading v3.6 to v3.5 is not supported", - version: V3_6, - targetVersion: V3_5, + name: "Downgrading v3.6 to v3.5 works as there are no v3.6 wal entries", + version: V3_6, + targetVersion: V3_5, + walEntries: []etcdserverpb.InternalRaftRequest{ + {Range: &etcdserverpb.RangeRequest{Key: []byte("\x00"), RangeEnd: []byte("\xff")}}, + }, + expectVersion: nil, + }, + { + name: "Downgrading v3.6 to v3.5 fails if there are newer WAL entries", + version: V3_6, + targetVersion: V3_5, + walEntries: []etcdserverpb.InternalRaftRequest{ + {ClusterVersionSet: &membershippb.ClusterVersionSetRequest{Ver: "3.6.0"}}, + }, expectVersion: &V3_6, expectError: true, - expectErrorMsg: `cannot create migration plan: downgrades are not yet supported`, + expectErrorMsg: "cannot downgrade storage, WAL contains newer entries", }, { - name: "Downgrading v3.5 to v3.4 is not supported", + name: "Downgrading v3.5 to v3.4 is not supported as schema was introduced in v3.6", version: V3_5, targetVersion: V3_4, expectVersion: nil, expectError: true, - expectErrorMsg: `cannot create migration plan: downgrades are not yet supported`, + expectErrorMsg: `cannot create migration plan: version "3.5.0" is not supported`, }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { lg := zap.NewNop() dataPath := setupBackendData(t, tc.version, tc.overrideKeys) + w, _ := waltesting.NewTmpWAL(t, tc.walEntries) + defer w.Close() + walVersion, err := wal.ReadWALVersion(w) + if err != nil { + t.Fatal(err) + } b := backend.NewDefaultBackend(dataPath) defer b.Close() - err := Migrate(lg, b.BatchTx(), tc.targetVersion) + + err = Migrate(lg, b.BatchTx(), walVersion, tc.targetVersion) if (err != nil) != tc.expectError { t.Errorf("Migrate(lg, tx, %q) = %+v, expected error: %v", tc.targetVersion, err, tc.expectError) } @@ -241,17 +264,29 @@ func TestMigrateIsReversible(t *testing.T) { tx.Lock() defer tx.Unlock() assertBucketState(t, tx, Meta, tc.state) + w, walPath := waltesting.NewTmpWAL(t, nil) + walVersion, err := wal.ReadWALVersion(w) + if err != nil { + t.Fatal(err) + } // Upgrade to current version ver := localBinaryVersion() - err := testUnsafeMigrate(lg, tx, ver) + err = UnsafeMigrate(lg, tx, walVersion, ver) if err != nil { t.Errorf("Migrate(lg, tx, %q) returned error %+v", ver, err) } assert.Equal(t, &ver, UnsafeReadStorageVersion(tx)) // Downgrade back to initial version - err = testUnsafeMigrate(lg, tx, tc.initialVersion) + w.Close() + w = waltesting.Reopen(t, walPath) + defer w.Close() + walVersion, err = wal.ReadWALVersion(w) + if err != nil { + t.Fatal(err) + } + err = UnsafeMigrate(lg, tx, walVersion, tc.initialVersion) if err != nil { t.Errorf("Migrate(lg, tx, %q) returned error %+v", tc.initialVersion, err) } @@ -262,20 +297,6 @@ func TestMigrateIsReversible(t *testing.T) { } } -// Does the same as UnsafeMigrate but skips version checks -// TODO(serathius): Use UnsafeMigrate when downgrades are implemented -func testUnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error { - current, err := UnsafeDetectSchemaVersion(lg, tx) - if err != nil { - return fmt.Errorf("cannot determine storage version: %w", err) - } - plan, err := buildPlan(lg, current, target) - if err != nil { - return fmt.Errorf("cannot create migration plan: %w", err) - } - return plan.unsafeExecute(lg, tx) -} - func setupBackendData(t *testing.T, version semver.Version, overrideKeys func(tx backend.BatchTx)) string { t.Helper() be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10) diff --git a/server/storage/storage.go b/server/storage/storage.go index 047d1bb02..9207e1e4d 100644 --- a/server/storage/storage.go +++ b/server/storage/storage.go @@ -15,6 +15,9 @@ package storage import ( + "sync" + + "github.com/coreos/go-semver/semver" "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" "go.etcd.io/etcd/server/v3/storage/wal" @@ -34,12 +37,17 @@ type Storage interface { Release(snap raftpb.Snapshot) error // Sync WAL Sync() error + // MinimalEtcdVersion returns minimal etcd storage able to interpret WAL log. + MinimalEtcdVersion() *semver.Version } type storage struct { lg *zap.Logger s *snap.Snapshotter - w *wal.WAL + + // Mutex protected variables + mux sync.RWMutex + w *wal.WAL } func NewStorage(lg *zap.Logger, w *wal.WAL, s *snap.Snapshotter) Storage { @@ -48,6 +56,8 @@ func NewStorage(lg *zap.Logger, w *wal.WAL, s *snap.Snapshotter) Storage { // SaveSnap saves the snapshot file to disk and writes the WAL snapshot entry. func (st *storage) SaveSnap(snap raftpb.Snapshot) error { + st.mux.RLock() + defer st.mux.RUnlock() walsnap := walpb.Snapshot{ Index: snap.Metadata.Index, Term: snap.Metadata.Term, @@ -69,6 +79,8 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error { // - releases the locks to the wal files that are older than the provided wal for the given snap. // - deletes any .snap.db files that are older than the given snap. func (st *storage) Release(snap raftpb.Snapshot) error { + st.mux.RLock() + defer st.mux.RUnlock() if err := st.w.ReleaseLockTo(snap.Metadata.Index); err != nil { return err } @@ -76,13 +88,46 @@ func (st *storage) Release(snap raftpb.Snapshot) error { } func (st *storage) Save(s raftpb.HardState, ents []raftpb.Entry) error { + st.mux.RLock() + defer st.mux.RUnlock() return st.w.Save(s, ents) } func (st *storage) Close() error { + st.mux.Lock() + defer st.mux.Unlock() return st.w.Close() } func (st *storage) Sync() error { + st.mux.RLock() + defer st.mux.RUnlock() return st.w.Sync() } + +func (st *storage) MinimalEtcdVersion() *semver.Version { + st.mux.Lock() + defer st.mux.Unlock() + walsnap := walpb.Snapshot{} + + sn, err := st.s.Load() + if err != nil && err != snap.ErrNoSnapshot { + panic(err) + } + if sn != nil { + walsnap.Index = sn.Metadata.Index + walsnap.Term = sn.Metadata.Term + walsnap.ConfState = &sn.Metadata.ConfState + } + w, err := st.w.Reopen(st.lg, walsnap) + if err != nil { + panic(err) + } + _, _, ents, err := w.ReadAll() + if err != nil { + panic(err) + } + v := wal.MinimalEtcdVersion(ents) + st.w = w + return v +} diff --git a/server/storage/wal/testing/waltesting.go b/server/storage/wal/testing/waltesting.go new file mode 100644 index 000000000..7ba279df6 --- /dev/null +++ b/server/storage/wal/testing/waltesting.go @@ -0,0 +1,89 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testing + +import ( + "io/ioutil" + "path/filepath" + "testing" + + "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/pkg/v3/pbutil" + "go.etcd.io/etcd/raft/v3/raftpb" + "go.etcd.io/etcd/server/v3/storage/wal" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" + "go.uber.org/zap/zaptest" +) + +func NewTmpWAL(t testing.TB, reqs []etcdserverpb.InternalRaftRequest) (*wal.WAL, string) { + t.Helper() + dir, err := ioutil.TempDir(t.TempDir(), "etcd_wal_test") + if err != nil { + panic(err) + } + tmpPath := filepath.Join(dir, "wal") + lg := zaptest.NewLogger(t) + w, err := wal.Create(lg, tmpPath, nil) + if err != nil { + t.Fatalf("Failed to create WAL: %v", err) + } + err = w.Close() + if err != nil { + t.Fatalf("Failed to close WAL: %v", err) + } + if len(reqs) != 0 { + w, err = wal.Open(lg, tmpPath, walpb.Snapshot{}) + if err != nil { + t.Fatalf("Failed to open WAL: %v", err) + } + _, state, _, err := w.ReadAll() + if err != nil { + t.Fatalf("Failed to read WAL: %v", err) + } + entries := []raftpb.Entry{} + for _, req := range reqs { + entries = append(entries, raftpb.Entry{ + Term: 1, + Index: 1, + Type: raftpb.EntryNormal, + Data: pbutil.MustMarshal(&req), + }) + } + err = w.Save(state, entries) + if err != nil { + t.Fatalf("Failed to save WAL: %v", err) + } + err = w.Close() + if err != nil { + t.Fatalf("Failed to close WAL: %v", err) + } + } + + w, err = wal.OpenForRead(lg, tmpPath, walpb.Snapshot{}) + if err != nil { + t.Fatalf("Failed to open WAL: %v", err) + } + return w, tmpPath +} + +func Reopen(t testing.TB, walPath string) *wal.WAL { + t.Helper() + lg := zaptest.NewLogger(t) + w, err := wal.OpenForRead(lg, walPath, walpb.Snapshot{}) + if err != nil { + t.Fatalf("Failed to open WAL: %v", err) + } + return w +} diff --git a/server/storage/wal/version.go b/server/storage/wal/version.go index b7b7bc5d7..6a4903b98 100644 --- a/server/storage/wal/version.go +++ b/server/storage/wal/version.go @@ -28,14 +28,29 @@ import ( "go.etcd.io/etcd/raft/v3/raftpb" ) -// MinimalEtcdVersion returns minimal etcd able to interpret provided WAL log, -// determined by looking at entries since the last snapshot and returning the highest -// etcd version annotation from used messages, fields, enums and their values. -func (w *WAL) MinimalEtcdVersion() *semver.Version { +// ReadWALVersion reads remaining entries from opened WAL and returns struct +// that implements schema.WAL interface. +func ReadWALVersion(w *WAL) (*walVersion, error) { _, _, ents, err := w.ReadAll() if err != nil { - panic(err) + return nil, err } + return &walVersion{entries: ents}, nil +} + +type walVersion struct { + entries []raftpb.Entry +} + +// MinimalEtcdVersion returns minimal etcd able to interpret entries from WAL log, +func (w *walVersion) MinimalEtcdVersion() *semver.Version { + return MinimalEtcdVersion(w.entries) +} + +// MinimalEtcdVersion returns minimal etcd able to interpret entries from WAL log, +// determined by looking at entries since the last snapshot and returning the highest +// etcd version annotation from used messages, fields, enums and their values. +func MinimalEtcdVersion(ents []raftpb.Entry) *semver.Version { var maxVer *semver.Version for _, ent := range ents { maxVer = maxVersion(maxVer, etcdVersionFromEntry(ent)) diff --git a/server/storage/wal/wal.go b/server/storage/wal/wal.go index a24c9d406..187cfe397 100644 --- a/server/storage/wal/wal.go +++ b/server/storage/wal/wal.go @@ -234,6 +234,14 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) { return w, nil } +func (w *WAL) Reopen(lg *zap.Logger, snap walpb.Snapshot) (*WAL, error) { + err := w.Close() + if err != nil { + lg.Panic("failed to close WAL during reopen", zap.Error(err)) + } + return Open(lg, w.dir, snap) +} + func (w *WAL) SetUnsafeNoFsync() { w.unsafeNoSync = true } diff --git a/tests/e2e/utl_migrate_test.go b/tests/e2e/utl_migrate_test.go index efc97f7f4..7623fc1b8 100644 --- a/tests/e2e/utl_migrate_test.go +++ b/tests/e2e/utl_migrate_test.go @@ -85,7 +85,7 @@ func TestEtctlutlMigrate(t *testing.T) { { name: "Downgrade v3.6 to v3.5 should fail until it's implemented", targetVersion: "3.5", - expectLogsSubString: "Error: cannot create migration plan: downgrades are not yet supported", + expectLogsSubString: "cannot downgrade storage, WAL contains newer entries", expectStorageVersion: &schema.V3_6, }, { diff --git a/tests/integration/utl_wal_version_test.go b/tests/integration/utl_wal_version_test.go index 3ffbd46ef..e5318c664 100644 --- a/tests/integration/utl_wal_version_test.go +++ b/tests/integration/utl_wal_version_test.go @@ -64,6 +64,9 @@ func TestEtcdVersionFromWAL(t *testing.T) { panic(err) } defer w.Close() - ver := w.MinimalEtcdVersion() - assert.Equal(t, &semver.Version{Major: 3, Minor: 6}, ver) + walVersion, err := wal.ReadWALVersion(w) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, &semver.Version{Major: 3, Minor: 6}, walVersion.MinimalEtcdVersion()) } From 6c2be0822d10be3793ceed2dc54fde8b5a404215 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 6 Oct 2021 16:39:06 +0200 Subject: [PATCH 5/6] tests: Add e2e tests for downgrades --- tests/e2e/cluster_downgrade_test.go | 144 ++++++++++++++++++++++++++++ tests/e2e/ctl_v3_grpc_test.go | 17 +--- tests/framework/e2e/util.go | 14 +++ 3 files changed, 159 insertions(+), 16 deletions(-) create mode 100644 tests/e2e/cluster_downgrade_test.go diff --git a/tests/e2e/cluster_downgrade_test.go b/tests/e2e/cluster_downgrade_test.go new file mode 100644 index 000000000..4769c4878 --- /dev/null +++ b/tests/e2e/cluster_downgrade_test.go @@ -0,0 +1,144 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/coreos/go-semver/semver" + "go.etcd.io/etcd/api/v3/version" + "go.etcd.io/etcd/client/pkg/v3/fileutil" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/tests/v3/framework/e2e" +) + +func TestDowngradeUpgrade(t *testing.T) { + currentEtcdBinary := "" + lastReleaseBinary := e2e.BinDir + "/etcd-last-release" + if !fileutil.Exist(lastReleaseBinary) { + t.Skipf("%q does not exist", lastReleaseBinary) + } + currentVersion := semver.New(version.Version) + lastVersion := semver.Version{Major: currentVersion.Major, Minor: currentVersion.Minor - 1} + currentVersionStr := fmt.Sprintf("%d.%d", currentVersion.Major, currentVersion.Minor) + lastVersionStr := fmt.Sprintf("%d.%d", lastVersion.Major, lastVersion.Minor) + + e2e.BeforeTest(t) + dataDirPath := t.TempDir() + + epc := startEtcd(t, currentEtcdBinary, dataDirPath) + validateVersion(t, epc, version.Versions{Cluster: currentVersionStr, Server: currentVersionStr}) + + downgradeEnable(t, epc, lastVersion) + validateVersion(t, epc, version.Versions{Cluster: lastVersionStr, Server: currentVersionStr}) + + stopEtcd(t, epc) + epc = startEtcd(t, lastReleaseBinary, dataDirPath) + validateVersion(t, epc, version.Versions{Cluster: lastVersionStr, Server: lastVersionStr}) + expectLog(t, epc, "the cluster has been downgraded") + + stopEtcd(t, epc) + epc = startEtcd(t, currentEtcdBinary, dataDirPath) + // TODO: Verify cluster version after upgrade when we fix cluster version set timeout + validateVersion(t, epc, version.Versions{Server: currentVersionStr}) +} + +func startEtcd(t *testing.T, execPath, dataDirPath string) *e2e.EtcdProcessCluster { + epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{ + ExecPath: execPath, + DataDirPath: dataDirPath, + ClusterSize: 1, + InitialToken: "new", + KeepDataDir: true, + // TODO: REMOVE snapshot override when snapshotting is automated after lowering storage versiont l + SnapshotCount: 5, + }) + if err != nil { + t.Fatalf("could not start etcd process cluster (%v)", err) + } + t.Cleanup(func() { + if errC := epc.Close(); errC != nil { + t.Fatalf("error closing etcd processes (%v)", errC) + } + }) + + prefixArgs := []string{e2e.CtlBinPath, "--endpoints", strings.Join(epc.EndpointsV3(), ",")} + t.Log("Write keys to ensure wal snapshot is created so cluster version set is snapshotted") + e2e.ExecuteWithTimeout(t, 20*time.Second, func() { + for i := 0; i < 10; i++ { + if err := e2e.SpawnWithExpect(append(prefixArgs, "put", fmt.Sprintf("%d", i), "value"), "OK"); err != nil { + t.Fatal(err) + } + } + }) + return epc +} + +func downgradeEnable(t *testing.T, epc *e2e.EtcdProcessCluster, ver semver.Version) { + t.Log("etcdctl downgrade...") + c, err := clientv3.New(clientv3.Config{ + Endpoints: epc.EndpointsV3(), + }) + if err != nil { + t.Fatal(err) + } + defer c.Close() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + // TODO: Fix request always timing out even thou it succeeds + c.Downgrade(ctx, 1, ver.String()) + cancel() + + expectLog(t, epc, "The server is ready to downgrade") +} + +func stopEtcd(t *testing.T, epc *e2e.EtcdProcessCluster) { + t.Log("Stopping the server...") + if err := epc.Procs[0].Stop(); err != nil { + t.Fatal(err) + } +} + +func validateVersion(t *testing.T, epc *e2e.EtcdProcessCluster, expect version.Versions) { + t.Log("Validate version") + // Two separate calls to expect as it doesn't support multiple matches on the same line + e2e.ExecuteWithTimeout(t, 20*time.Second, func() { + if expect.Server != "" { + err := e2e.SpawnWithExpects(e2e.CURLPrefixArgs(epc, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdserver":"`+expect.Server) + if err != nil { + t.Fatal(err) + } + } + if expect.Cluster != "" { + err := e2e.SpawnWithExpects(e2e.CURLPrefixArgs(epc, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdcluster":"`+expect.Cluster) + if err != nil { + t.Fatal(err) + } + } + }) +} + +func expectLog(t *testing.T, epc *e2e.EtcdProcessCluster, expectLog string) { + t.Helper() + e2e.ExecuteWithTimeout(t, 30*time.Second, func() { + _, err := epc.Procs[0].Logs().Expect(expectLog) + if err != nil { + t.Fatal(err) + } + }) +} diff --git a/tests/e2e/ctl_v3_grpc_test.go b/tests/e2e/ctl_v3_grpc_test.go index 6007e241c..39211e7dc 100644 --- a/tests/e2e/ctl_v3_grpc_test.go +++ b/tests/e2e/ctl_v3_grpc_test.go @@ -24,7 +24,6 @@ import ( "time" "github.com/stretchr/testify/assert" - "go.etcd.io/etcd/client/pkg/v3/testutil" "go.etcd.io/etcd/tests/v3/framework/e2e" ) @@ -105,7 +104,7 @@ func TestAuthority(t *testing.T) { t.Fatal(err) } - executeWithTimeout(t, 5*time.Second, func() { + e2e.ExecuteWithTimeout(t, 5*time.Second, func() { assertAuthority(t, fmt.Sprintf(tc.expectAuthorityPattern, 20000), epc) }) }) @@ -154,20 +153,6 @@ func firstMatch(t *testing.T, expectLine string, logs ...e2e.LogsExpect) string return <-match } -func executeWithTimeout(t *testing.T, timeout time.Duration, f func()) { - donec := make(chan struct{}) - go func() { - defer close(donec) - f() - }() - - select { - case <-time.After(timeout): - testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout)) - case <-donec: - } -} - type etcdctlV3 struct { cfg *e2e.EtcdProcessClusterConfig endpoints []string diff --git a/tests/framework/e2e/util.go b/tests/framework/e2e/util.go index a3b903863..5d66ba4cb 100644 --- a/tests/framework/e2e/util.go +++ b/tests/framework/e2e/util.go @@ -117,3 +117,17 @@ func ToTLS(s string) string { func SkipInShortMode(t testing.TB) { testutil.SkipTestIfShortMode(t, "e2e tests are not running in --short mode") } + +func ExecuteWithTimeout(t *testing.T, timeout time.Duration, f func()) { + donec := make(chan struct{}) + go func() { + defer close(donec) + f() + }() + + select { + case <-time.After(timeout): + testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout)) + case <-donec: + } +} From 9d47a97b0b79a40b22ce62a724d5aed54b50356d Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 15 Oct 2021 16:24:47 +0200 Subject: [PATCH 6/6] server: Remove lock from adapter to avoid deadlock --- server/etcdserver/adapters.go | 31 ++++++----------------- server/etcdserver/version/monitor.go | 5 ---- server/etcdserver/version/monitor_test.go | 11 -------- server/etcdserver/version/version_test.go | 6 ----- 4 files changed, 8 insertions(+), 45 deletions(-) diff --git a/server/etcdserver/adapters.go b/server/etcdserver/adapters.go index 1def9b69e..bc4b68645 100644 --- a/server/etcdserver/adapters.go +++ b/server/etcdserver/adapters.go @@ -23,20 +23,17 @@ import ( "go.etcd.io/etcd/api/v3/membershippb" "go.etcd.io/etcd/api/v3/version" serverversion "go.etcd.io/etcd/server/v3/etcdserver/version" - "go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/schema" ) // serverVersionAdapter implements Server interface needed by serverversion.Monitor type serverVersionAdapter struct { *EtcdServer - tx backend.BatchTx } func newServerVersionAdapter(s *EtcdServer) *serverVersionAdapter { return &serverVersionAdapter{ EtcdServer: s, - tx: nil, } } @@ -75,11 +72,10 @@ func (s *serverVersionAdapter) GetMembersVersions() map[string]*version.Versions } func (s *serverVersionAdapter) GetStorageVersion() *semver.Version { - if s.tx == nil { - s.Lock() - defer s.Unlock() - } - v, err := schema.UnsafeDetectSchemaVersion(s.lg, s.tx) + tx := s.be.BatchTx() + tx.Lock() + defer tx.Unlock() + v, err := schema.UnsafeDetectSchemaVersion(s.lg, tx) if err != nil { return nil } @@ -87,19 +83,8 @@ func (s *serverVersionAdapter) GetStorageVersion() *semver.Version { } func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) error { - if s.tx == nil { - s.Lock() - defer s.Unlock() - } - return schema.UnsafeMigrate(s.lg, s.tx, s.r.storage, target) -} - -func (s *serverVersionAdapter) Lock() { - s.tx = s.be.BatchTx() - s.tx.Lock() -} - -func (s *serverVersionAdapter) Unlock() { - s.tx.Unlock() - s.tx = nil + tx := s.be.BatchTx() + tx.Lock() + defer tx.Unlock() + return schema.UnsafeMigrate(s.lg, tx, s.r.storage, target) } diff --git a/server/etcdserver/version/monitor.go b/server/etcdserver/version/monitor.go index 05da0b228..8ac8d8e8d 100644 --- a/server/etcdserver/version/monitor.go +++ b/server/etcdserver/version/monitor.go @@ -40,9 +40,6 @@ type Server interface { GetStorageVersion() *semver.Version UpdateStorageVersion(semver.Version) error - - Lock() - Unlock() } func NewMonitor(lg *zap.Logger, storage Server) *Monitor { @@ -100,8 +97,6 @@ func (m *Monitor) UpdateStorageVersionIfNeeded() { if cv == nil { return } - m.s.Lock() - defer m.s.Unlock() sv := m.s.GetStorageVersion() if sv == nil || sv.Major != cv.Major || sv.Minor != cv.Minor { diff --git a/server/etcdserver/version/monitor_test.go b/server/etcdserver/version/monitor_test.go index a6b31cec8..b11f09581 100644 --- a/server/etcdserver/version/monitor_test.go +++ b/server/etcdserver/version/monitor_test.go @@ -421,14 +421,3 @@ func (s *storageMock) UpdateStorageVersion(v semver.Version) error { s.storageVersion = &v return nil } - -func (s *storageMock) Lock() { - if s.locked { - panic("Deadlock") - } - s.locked = true -} - -func (s *storageMock) Unlock() { - s.locked = false -} diff --git a/server/etcdserver/version/version_test.go b/server/etcdserver/version/version_test.go index 70578ad6b..c82ca808c 100644 --- a/server/etcdserver/version/version_test.go +++ b/server/etcdserver/version/version_test.go @@ -256,9 +256,3 @@ func (m *memberMock) UpdateStorageVersion(v semver.Version) error { func (m *memberMock) TriggerSnapshot() { } - -func (m *memberMock) Lock() { -} - -func (m *memberMock) Unlock() { -}