From 2de36c0596d25f2bfeea5c93fb71a22ef9e624e2 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 4 Oct 2021 15:32:42 +0200 Subject: [PATCH 1/8] server: Cover monitor with upgrade unit tests --- server/etcdserver/adapters.go | 2 +- server/etcdserver/version/monitor.go | 8 +- server/etcdserver/version/monitor_test.go | 190 ++++++++++++++++++++-- server/etcdserver/version/version_test.go | 175 ++++++++++++++++++++ 4 files changed, 359 insertions(+), 16 deletions(-) create mode 100644 server/etcdserver/version/version_test.go diff --git a/server/etcdserver/adapters.go b/server/etcdserver/adapters.go index a6611359c..5f3ff9993 100644 --- a/server/etcdserver/adapters.go +++ b/server/etcdserver/adapters.go @@ -63,7 +63,7 @@ func (s *serverVersionAdapter) GetDowngradeInfo() *membership.DowngradeInfo { return s.cluster.DowngradeInfo() } -func (s *serverVersionAdapter) GetVersions() map[string]*version.Versions { +func (s *serverVersionAdapter) GetMembersVersions() map[string]*version.Versions { return getVersions(s.lg, s.cluster, s.id, s.peerRt) } diff --git a/server/etcdserver/version/monitor.go b/server/etcdserver/version/monitor.go index 19e91f7ef..fef2e38f3 100644 --- a/server/etcdserver/version/monitor.go +++ b/server/etcdserver/version/monitor.go @@ -31,7 +31,7 @@ type Monitor struct { type Server interface { GetClusterVersion() *semver.Version GetDowngradeInfo() *membership.DowngradeInfo - GetVersions() map[string]*version.Versions + GetMembersVersions() map[string]*version.Versions UpdateClusterVersion(string) DowngradeCancel() @@ -99,7 +99,7 @@ func (m *Monitor) UpdateStorageVersionIfNeeded() { func (m *Monitor) CancelDowngradeIfNeeded() { d := m.s.GetDowngradeInfo() - if !d.Enabled { + if d == nil || !d.Enabled { return } @@ -115,7 +115,7 @@ func (m *Monitor) CancelDowngradeIfNeeded() { // The returned version is the min server version in the map, or nil if the min // version in unknown. func (m *Monitor) decideClusterVersion() *semver.Version { - vers := m.s.GetVersions() + vers := m.s.GetMembersVersions() var cv *semver.Version lv := semver.Must(semver.NewVersion(version.Version)) @@ -153,7 +153,7 @@ func (m *Monitor) decideClusterVersion() *semver.Version { // versionsMatchTarget returns true if all server versions are equal to target version, otherwise return false. // It can be used to decide the whether the cluster finishes downgrading to target version. func (m *Monitor) versionsMatchTarget(targetVersion *semver.Version) bool { - vers := m.s.GetVersions() + vers := m.s.GetMembersVersions() for mid, ver := range vers { if ver == nil { return false diff --git a/server/etcdserver/version/monitor_test.go b/server/etcdserver/version/monitor_test.go index b76915b11..d2a7b36d1 100644 --- a/server/etcdserver/version/monitor_test.go +++ b/server/etcdserver/version/monitor_test.go @@ -5,15 +5,15 @@ import ( "testing" "github.com/coreos/go-semver/semver" - "go.uber.org/zap" + "github.com/stretchr/testify/assert" + "go.uber.org/zap/zaptest" "go.etcd.io/etcd/api/v3/version" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" ) -var testLogger = zap.NewExample() - var ( + V3_0 = semver.Version{Major: 3, Minor: 0} V3_5 = semver.Version{Major: 3, Minor: 5} V3_6 = semver.Version{Major: 3, Minor: 6} ) @@ -47,8 +47,8 @@ func TestDecideClusterVersion(t *testing.T) { } for i, tt := range tests { - monitor := NewMonitor(testLogger, &storageMock{ - versions: tt.vers, + monitor := NewMonitor(zaptest.NewLogger(t), &storageMock{ + memberVersions: tt.vers, }) dver := monitor.decideClusterVersion() if !reflect.DeepEqual(dver, tt.wdver) { @@ -97,7 +97,7 @@ func TestDecideStorageVersion(t *testing.T) { clusterVersion: tt.clusterVersion, storageVersion: tt.storageVersion, } - monitor := NewMonitor(testLogger, s) + monitor := NewMonitor(zaptest.NewLogger(t), s) monitor.UpdateStorageVersionIfNeeded() if !reflect.DeepEqual(s.storageVersion, tt.expectStorageVersion) { t.Errorf("Unexpected storage version value, got = %+v, want %+v", s.storageVersion, tt.expectStorageVersion) @@ -147,8 +147,8 @@ func TestVersionMatchTarget(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - monitor := NewMonitor(testLogger, &storageMock{ - versions: tt.versionMap, + monitor := NewMonitor(zaptest.NewLogger(t), &storageMock{ + memberVersions: tt.versionMap, }) actual := monitor.versionsMatchTarget(tt.targetVersion) if actual != tt.expectedFinished { @@ -158,8 +158,176 @@ func TestVersionMatchTarget(t *testing.T) { } } +func TestUpdateClusterVersionIfNeeded(t *testing.T) { + tests := []struct { + name string + clusterVersion *semver.Version + memberVersions map[string]*version.Versions + downgrade *membership.DowngradeInfo + expectClusterVersion *semver.Version + }{ + { + name: "Default to 3.0 if there are no members", + expectClusterVersion: &V3_0, + }, + { + name: "Should pick lowest server version from members", + memberVersions: map[string]*version.Versions{ + "a": {Cluster: "3.7.0", Server: "3.6.0"}, + "b": {Cluster: "3.4.0", Server: "3.5.0"}, + }, + expectClusterVersion: &V3_5, + }, + { + name: "Sets minimal version when member has broken version", + memberVersions: map[string]*version.Versions{ + "a": {Cluster: "3.7.0", Server: "3.6.0"}, + "b": {Cluster: "xxxx", Server: "yyyy"}, + }, + expectClusterVersion: &V3_0, + }, + { + name: "Should pick lowest server version from members (cv already set)", + memberVersions: map[string]*version.Versions{ + "a": {Cluster: "3.7.0", Server: "3.6.0"}, + "b": {Cluster: "3.4.0", Server: "3.5.0"}, + }, + clusterVersion: &V3_5, + expectClusterVersion: &V3_5, + }, + { + name: "Should upgrade cluster version if all members have upgraded (have higher server version)", + memberVersions: map[string]*version.Versions{ + "a": {Cluster: "3.5.0", Server: "3.6.0"}, + "b": {Cluster: "3.5.0", Server: "3.6.0"}, + }, + clusterVersion: &V3_5, + expectClusterVersion: &V3_6, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &storageMock{ + clusterVersion: tt.clusterVersion, + memberVersions: tt.memberVersions, + downgradeInfo: tt.downgrade, + } + monitor := NewMonitor(zaptest.NewLogger(t), s) + + // Run multiple times to ensure that results are stable + for i := 0; i < 3; i++ { + monitor.UpdateClusterVersionIfNeeded() + assert.Equal(t, tt.expectClusterVersion, s.clusterVersion) + } + }) + } +} + +func TestCancelDowngradeIfNeeded(t *testing.T) { + tests := []struct { + name string + memberVersions map[string]*version.Versions + downgrade *membership.DowngradeInfo + expectDowngrade *membership.DowngradeInfo + }{ + { + name: "No action if there no downgrade in progress", + }, + { + name: "Cancel downgrade if there are no members", + downgrade: &membership.DowngradeInfo{TargetVersion: "3.5.0", Enabled: true}, + expectDowngrade: nil, + }, + // Next entries go through all states that should happen during downgrade + { + name: "No action if downgrade was not started", + memberVersions: map[string]*version.Versions{ + "a": {Cluster: "3.6.0", Server: "3.6.1"}, + "b": {Cluster: "3.6.0", Server: "3.6.2"}, + }, + }, + { + name: "Cancel downgrade if all members have downgraded", + memberVersions: map[string]*version.Versions{ + "a": {Cluster: "3.5.0", Server: "3.5.1"}, + "b": {Cluster: "3.5.0", Server: "3.5.2"}, + }, + downgrade: &membership.DowngradeInfo{TargetVersion: "3.5.0", Enabled: true}, + expectDowngrade: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &storageMock{ + memberVersions: tt.memberVersions, + downgradeInfo: tt.downgrade, + } + monitor := NewMonitor(zaptest.NewLogger(t), s) + + // Run multiple times to ensure that results are stable + for i := 0; i < 3; i++ { + monitor.CancelDowngradeIfNeeded() + assert.Equal(t, tt.expectDowngrade, s.downgradeInfo) + } + }) + } +} + +func TestUpdateStorageVersionIfNeeded(t *testing.T) { + tests := []struct { + name string + clusterVersion *semver.Version + storageVersion *semver.Version + expectStorageVersion *semver.Version + }{ + { + name: "No action if cluster version is nil", + }, + { + name: "Should set storage version if cluster version is set", + clusterVersion: &V3_5, + expectStorageVersion: &V3_5, + }, + { + name: "No action if storage version was already set", + storageVersion: &V3_5, + expectStorageVersion: &V3_5, + }, + { + name: "No action if storage version equals cluster version", + clusterVersion: &V3_5, + storageVersion: &V3_5, + expectStorageVersion: &V3_5, + }, + { + name: "Should set storage version to cluster version", + clusterVersion: &V3_6, + storageVersion: &V3_5, + expectStorageVersion: &V3_6, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &storageMock{ + clusterVersion: tt.clusterVersion, + storageVersion: tt.storageVersion, + } + monitor := NewMonitor(zaptest.NewLogger(t), s) + + // Run multiple times to ensure that results are stable + for i := 0; i < 3; i++ { + monitor.UpdateStorageVersionIfNeeded() + assert.Equal(t, tt.expectStorageVersion, s.storageVersion) + } + }) + } +} + type storageMock struct { - versions map[string]*version.Versions + memberVersions map[string]*version.Versions clusterVersion *semver.Version storageVersion *semver.Version downgradeInfo *membership.DowngradeInfo @@ -184,8 +352,8 @@ func (s *storageMock) GetDowngradeInfo() *membership.DowngradeInfo { return s.downgradeInfo } -func (s *storageMock) GetVersions() map[string]*version.Versions { - return s.versions +func (s *storageMock) GetMembersVersions() map[string]*version.Versions { + return s.memberVersions } func (s *storageMock) GetStorageVersion() *semver.Version { diff --git a/server/etcdserver/version/version_test.go b/server/etcdserver/version/version_test.go new file mode 100644 index 000000000..5178494c2 --- /dev/null +++ b/server/etcdserver/version/version_test.go @@ -0,0 +1,175 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package version + +import ( + "fmt" + "math/rand" + "testing" + + "github.com/coreos/go-semver/semver" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" + + "go.etcd.io/etcd/api/v3/version" + "go.etcd.io/etcd/server/v3/etcdserver/api/membership" +) + +var ( + V3_7 = semver.Version{Major: 3, Minor: 7} +) + +func TestUpgradeSingleNode(t *testing.T) { + lg := zaptest.NewLogger(t) + c := newCluster(lg, 1, V3_6) + c.StepMonitors() + assert.Equal(t, newCluster(lg, 1, V3_6), c) + + c.ReplaceMemberBinary(0, V3_7) + c.StepMonitors() + c.StepMonitors() + + assert.Equal(t, newCluster(lg, 1, V3_7), c) +} + +func TestUpgradeThreeNodes(t *testing.T) { + lg := zaptest.NewLogger(t) + c := newCluster(lg, 3, V3_6) + c.StepMonitors() + assert.Equal(t, newCluster(lg, 3, V3_6), c) + + c.ReplaceMemberBinary(0, V3_7) + c.StepMonitors() + c.ReplaceMemberBinary(1, V3_7) + c.StepMonitors() + c.ReplaceMemberBinary(2, V3_7) + c.StepMonitors() + c.StepMonitors() + + assert.Equal(t, newCluster(lg, 3, V3_7), c) +} + +func newCluster(lg *zap.Logger, memberCount int, ver semver.Version) *clusterMock { + cluster := &clusterMock{ + clusterVersion: ver, + members: make([]*memberMock, 0, memberCount), + } + majorMinVer := semver.Version{Major: ver.Major, Minor: ver.Minor} + for i := 0; i < memberCount; i++ { + m := &memberMock{ + cluster: cluster, + serverVersion: ver, + storageVersion: majorMinVer, + } + m.monitor = NewMonitor(lg.Named(fmt.Sprintf("m%d", i)), m) + cluster.members = append(cluster.members, m) + } + cluster.members[0].isLeader = true + return cluster +} + +func (c *clusterMock) StepMonitors() { + // Execute monitor functions in random order as it is not guaranteed + fs := []func(){} + for _, m := range c.members { + fs = append(fs, m.monitor.UpdateStorageVersionIfNeeded) + if m.isLeader { + fs = append(fs, m.monitor.CancelDowngradeIfNeeded, m.monitor.UpdateClusterVersionIfNeeded) + } + } + rand.Shuffle(len(fs), func(i, j int) { + fs[i], fs[j] = fs[j], fs[i] + }) + for _, f := range fs { + f() + } +} + +type clusterMock struct { + clusterVersion semver.Version + downgradeInfo *membership.DowngradeInfo + members []*memberMock +} + +func (c *clusterMock) DowngradeEnable(ver semver.Version) { + c.downgradeInfo = &membership.DowngradeInfo{TargetVersion: ver.String(), Enabled: true} +} + +func (c *clusterMock) MembersVersions() map[string]*version.Versions { + result := map[string]*version.Versions{} + for i, m := range c.members { + result[fmt.Sprintf("%d", i)] = &version.Versions{ + Server: m.serverVersion.String(), + Cluster: c.clusterVersion.String(), + } + } + return result +} + +func (c *clusterMock) ReplaceMemberBinary(mid int, newServerVersion semver.Version) { + if newServerVersion.LessThan(c.clusterVersion) { + panic("Members cannot join clusters with higher version") + } + c.members[mid].serverVersion = newServerVersion +} + +type memberMock struct { + cluster *clusterMock + + isLeader bool + serverVersion semver.Version + storageVersion semver.Version + monitor *Monitor +} + +var _ Server = (*memberMock)(nil) + +func (m *memberMock) UpdateClusterVersion(version string) { + m.cluster.clusterVersion = *semver.New(version) +} + +func (m *memberMock) DowngradeCancel() { + m.cluster.downgradeInfo = nil +} + +func (m *memberMock) GetClusterVersion() *semver.Version { + return &m.cluster.clusterVersion +} + +func (m *memberMock) GetDowngradeInfo() *membership.DowngradeInfo { + return m.cluster.downgradeInfo +} + +func (m *memberMock) GetMembersVersions() map[string]*version.Versions { + return m.cluster.MembersVersions() +} + +func (m *memberMock) GetStorageVersion() *semver.Version { + return &m.storageVersion +} + +func (m *memberMock) UpdateStorageVersion(v semver.Version) { + m.storageVersion = v +} + +func (m *memberMock) TriggerSnapshot() { +} + +func (m *memberMock) Lock() { +} + +func (m *memberMock) Unlock() { +} From 378159af30bc9a9e2091b7220fd8d0d6742fe5a4 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 4 Oct 2021 15:53:54 +0200 Subject: [PATCH 2/8] server: Refactor cluster version decision code --- server/etcdserver/adapters.go | 2 +- server/etcdserver/cluster_util.go | 6 +-- server/etcdserver/version/monitor.go | 58 +++++++++++------------ server/etcdserver/version/monitor_test.go | 14 +++--- 4 files changed, 38 insertions(+), 42 deletions(-) diff --git a/server/etcdserver/adapters.go b/server/etcdserver/adapters.go index 5f3ff9993..994d0d3b5 100644 --- a/server/etcdserver/adapters.go +++ b/server/etcdserver/adapters.go @@ -64,7 +64,7 @@ func (s *serverVersionAdapter) GetDowngradeInfo() *membership.DowngradeInfo { } func (s *serverVersionAdapter) GetMembersVersions() map[string]*version.Versions { - return getVersions(s.lg, s.cluster, s.id, s.peerRt) + return getMembersVersions(s.lg, s.cluster, s.id, s.peerRt) } func (s *serverVersionAdapter) GetStorageVersion() *semver.Version { diff --git a/server/etcdserver/cluster_util.go b/server/etcdserver/cluster_util.go index 61028a898..531876f9e 100644 --- a/server/etcdserver/cluster_util.go +++ b/server/etcdserver/cluster_util.go @@ -134,11 +134,11 @@ func getRemotePeerURLs(cl *membership.RaftCluster, local string) []string { return us } -// getVersions returns the versions of the members in the given cluster. +// getMembersVersions returns the versions of the members in the given cluster. // The key of the returned map is the member's ID. The value of the returned map // is the semver versions string, including server and cluster. // If it fails to get the version of a member, the key will be nil. -func getVersions(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) map[string]*version.Versions { +func getMembersVersions(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) map[string]*version.Versions { members := cl.Members() vers := make(map[string]*version.Versions) for _, m := range members { @@ -184,7 +184,7 @@ func allowedVersionRange(downgradeEnabled bool) (minV *semver.Version, maxV *sem // out of the range. // We set this rule since when the local member joins, another member might be offline. func isCompatibleWithCluster(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) bool { - vers := getVersions(lg, cl, local, rt) + vers := getMembersVersions(lg, cl, local, rt) minV, maxV := allowedVersionRange(getDowngradeEnabledFromRemotePeers(lg, cl, local, rt)) return isCompatibleWithVers(lg, vers, local, minV, maxV) } diff --git a/server/etcdserver/version/monitor.go b/server/etcdserver/version/monitor.go index fef2e38f3..92b432b0c 100644 --- a/server/etcdserver/version/monitor.go +++ b/server/etcdserver/version/monitor.go @@ -49,34 +49,29 @@ func NewMonitor(lg *zap.Logger, storage Server) *Monitor { } } -// UpdateClusterVersionIfNeeded updates the cluster version if all members agrees on a higher one. -// It prints out log if there is a member with a higher version than the -// local version. +// UpdateClusterVersionIfNeeded updates the cluster version. func (m *Monitor) UpdateClusterVersionIfNeeded() { - v := m.decideClusterVersion() - if v != nil { - // only keep major.minor version for comparison - v = &semver.Version{ - Major: v.Major, - Minor: v.Minor, - } + newClusterVersion := m.decideClusterVersion() + if newClusterVersion != nil { + newClusterVersion = &semver.Version{Major: newClusterVersion.Major, Minor: newClusterVersion.Minor} + m.s.UpdateClusterVersion(newClusterVersion.String()) } +} - // if the current version is nil: - // 1. use the decided version if possible - // 2. or use the min cluster version - if m.s.GetClusterVersion() == nil { - verStr := version.MinClusterVersion - if v != nil { - verStr = v.String() +// decideClusterVersion decides the cluster version based on the members versions if all members agree on a higher one. +func (m *Monitor) decideClusterVersion() *semver.Version { + clusterVersion := m.s.GetClusterVersion() + membersMinimalVersion := m.membersMinimalVersion() + if clusterVersion == nil { + if membersMinimalVersion != nil { + return membersMinimalVersion } - m.s.UpdateClusterVersion(verStr) - return + return semver.New(version.MinClusterVersion) } - - if v != nil && membership.IsValidVersionChange(m.s.GetClusterVersion(), v) { - m.s.UpdateClusterVersion(v.String()) + if membersMinimalVersion != nil && clusterVersion.LessThan(*membersMinimalVersion) && membership.IsValidVersionChange(clusterVersion, membersMinimalVersion) { + return membersMinimalVersion } + return nil } // UpdateStorageVersionIfNeeded updates the storage version if it differs from cluster version. @@ -111,12 +106,13 @@ func (m *Monitor) CancelDowngradeIfNeeded() { } } -// decideClusterVersion decides the cluster version based on the versions map. -// The returned version is the min server version in the map, or nil if the min +// membersMinimalVersion returns the min server version in the map, or nil if the min // version in unknown. -func (m *Monitor) decideClusterVersion() *semver.Version { +// It prints out log if there is a member with a higher version than the +// local version. +func (m *Monitor) membersMinimalVersion() *semver.Version { vers := m.s.GetMembersVersions() - var cv *semver.Version + var minV *semver.Version lv := semver.Must(semver.NewVersion(version.Version)) for mid, ver := range vers { @@ -141,13 +137,13 @@ func (m *Monitor) decideClusterVersion() *semver.Version { zap.String("remote-member-version", ver.Server), ) } - if cv == nil { - cv = v - } else if v.LessThan(*cv) { - cv = v + if minV == nil { + minV = v + } else if v.LessThan(*minV) { + minV = v } } - return cv + return minV } // versionsMatchTarget returns true if all server versions are equal to target version, otherwise return false. diff --git a/server/etcdserver/version/monitor_test.go b/server/etcdserver/version/monitor_test.go index d2a7b36d1..489f792b7 100644 --- a/server/etcdserver/version/monitor_test.go +++ b/server/etcdserver/version/monitor_test.go @@ -18,10 +18,10 @@ var ( V3_6 = semver.Version{Major: 3, Minor: 6} ) -func TestDecideClusterVersion(t *testing.T) { +func TestMemberMinimalVersion(t *testing.T) { tests := []struct { - vers map[string]*version.Versions - wdver *semver.Version + memberVersions map[string]*version.Versions + wantVersion *semver.Version }{ { map[string]*version.Versions{"a": {Server: "2.0.0"}}, @@ -48,11 +48,11 @@ func TestDecideClusterVersion(t *testing.T) { for i, tt := range tests { monitor := NewMonitor(zaptest.NewLogger(t), &storageMock{ - memberVersions: tt.vers, + memberVersions: tt.memberVersions, }) - dver := monitor.decideClusterVersion() - if !reflect.DeepEqual(dver, tt.wdver) { - t.Errorf("#%d: ver = %+v, want %+v", i, dver, tt.wdver) + minV := monitor.membersMinimalVersion() + if !reflect.DeepEqual(minV, tt.wantVersion) { + t.Errorf("#%d: ver = %+v, want %+v", i, minV, tt.wantVersion) } } } From 1e5e57f26898c02551d06a067edd48fe550bddf1 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Tue, 5 Oct 2021 15:01:02 +0200 Subject: [PATCH 3/8] server: Move downgrade detection code to version package --- server/etcdserver/adapters.go | 3 +- server/etcdserver/api/capability.go | 4 +- server/etcdserver/api/membership/cluster.go | 37 +++------ .../etcdserver/api/membership/cluster_test.go | 73 ------------------ .../api/membership/membership_test.go | 5 +- server/etcdserver/api/membership/store.go | 5 +- server/etcdserver/apply.go | 5 +- server/etcdserver/server.go | 2 +- server/etcdserver/v3_server.go | 3 +- .../{api/membership => version}/downgrade.go | 22 +++++- .../membership => version}/downgrade_test.go | 76 ++++++++++++++++++- server/etcdserver/version/monitor.go | 5 +- server/etcdserver/version/monitor_test.go | 15 ++-- server/etcdserver/version/version_test.go | 7 +- server/storage/schema/membership.go | 7 +- 15 files changed, 135 insertions(+), 134 deletions(-) rename server/etcdserver/{api/membership => version}/downgrade.go (77%) rename server/etcdserver/{api/membership => version}/downgrade_test.go (71%) diff --git a/server/etcdserver/adapters.go b/server/etcdserver/adapters.go index 994d0d3b5..467992e1f 100644 --- a/server/etcdserver/adapters.go +++ b/server/etcdserver/adapters.go @@ -23,7 +23,6 @@ import ( "go.uber.org/zap" "go.etcd.io/etcd/api/v3/version" - "go.etcd.io/etcd/server/v3/etcdserver/api/membership" serverversion "go.etcd.io/etcd/server/v3/etcdserver/version" ) @@ -59,7 +58,7 @@ func (s *serverVersionAdapter) GetClusterVersion() *semver.Version { return s.cluster.Version() } -func (s *serverVersionAdapter) GetDowngradeInfo() *membership.DowngradeInfo { +func (s *serverVersionAdapter) GetDowngradeInfo() *serverversion.DowngradeInfo { return s.cluster.DowngradeInfo() } diff --git a/server/etcdserver/api/capability.go b/server/etcdserver/api/capability.go index dcd6a0761..9c243294e 100644 --- a/server/etcdserver/api/capability.go +++ b/server/etcdserver/api/capability.go @@ -18,7 +18,7 @@ import ( "sync" "go.etcd.io/etcd/api/v3/version" - "go.etcd.io/etcd/server/v3/etcdserver/api/membership" + serverversion "go.etcd.io/etcd/server/v3/etcdserver/version" "go.uber.org/zap" "github.com/coreos/go-semver/semver" @@ -64,7 +64,7 @@ func UpdateCapability(lg *zap.Logger, v *semver.Version) { return } enableMapMu.Lock() - if curVersion != nil && !membership.IsValidVersionChange(v, curVersion) { + if curVersion != nil && !serverversion.IsValidVersionChange(v, curVersion) { enableMapMu.Unlock() return } diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go index 9f486592b..ad6633ebd 100644 --- a/server/etcdserver/api/membership/cluster.go +++ b/server/etcdserver/api/membership/cluster.go @@ -33,6 +33,7 @@ import ( "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" + serverversion "go.etcd.io/etcd/server/v3/etcdserver/version" "github.com/coreos/go-semver/semver" "github.com/prometheus/client_golang/prometheus" @@ -58,7 +59,7 @@ type RaftCluster struct { // removed id cannot be reused. removed map[types.ID]bool - downgradeInfo *DowngradeInfo + downgradeInfo *serverversion.DowngradeInfo versionChanged *notify.Notifier } @@ -113,7 +114,7 @@ func NewCluster(lg *zap.Logger) *RaftCluster { lg: lg, members: make(map[types.ID]*Member), removed: make(map[types.ID]bool), - downgradeInfo: &DowngradeInfo{Enabled: false}, + downgradeInfo: &serverversion.DowngradeInfo{Enabled: false}, } } @@ -268,11 +269,11 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { if c.be != nil { c.downgradeInfo = c.be.DowngradeInfoFromBackend() } - d := &DowngradeInfo{Enabled: false} + d := &serverversion.DowngradeInfo{Enabled: false} if c.downgradeInfo != nil { - d = &DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion} + d = &serverversion.DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion} } - mustDetectDowngrade(c.lg, c.version, d) + serverversion.MustDetectDowngrade(c.lg, c.version, d) onSet(c.lg, c.version) for _, m := range c.members { @@ -540,7 +541,7 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s } oldVer := c.version c.version = ver - mustDetectDowngrade(c.lg, c.version, c.downgradeInfo) + serverversion.MustDetectDowngrade(c.lg, c.version, c.downgradeInfo) if c.v2store != nil { mustSaveClusterVersionToStore(c.lg, c.v2store, ver) } @@ -715,22 +716,6 @@ func ValidateClusterAndAssignIDs(lg *zap.Logger, local *RaftCluster, existing *R return nil } -// IsValidVersionChange checks the two scenario when version is valid to change: -// 1. Downgrade: cluster version is 1 minor version higher than local version, -// cluster version should change. -// 2. Cluster start: when not all members version are available, cluster version -// is set to MinVersion(3.0), when all members are at higher version, cluster version -// is lower than local version, cluster version should change -func IsValidVersionChange(cv *semver.Version, lv *semver.Version) bool { - cv = &semver.Version{Major: cv.Major, Minor: cv.Minor} - lv = &semver.Version{Major: lv.Major, Minor: lv.Minor} - - if isValidDowngrade(cv, lv) || (cv.Major == lv.Major && cv.LessThan(*lv)) { - return true - } - return false -} - // IsLocalMemberLearner returns if the local member is raft learner func (c *RaftCluster) IsLocalMemberLearner() bool { c.Lock() @@ -747,17 +732,17 @@ func (c *RaftCluster) IsLocalMemberLearner() bool { } // DowngradeInfo returns the downgrade status of the cluster -func (c *RaftCluster) DowngradeInfo() *DowngradeInfo { +func (c *RaftCluster) DowngradeInfo() *serverversion.DowngradeInfo { c.Lock() defer c.Unlock() if c.downgradeInfo == nil { - return &DowngradeInfo{Enabled: false} + return &serverversion.DowngradeInfo{Enabled: false} } - d := &DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion} + d := &serverversion.DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion} return d } -func (c *RaftCluster) SetDowngradeInfo(d *DowngradeInfo, shouldApplyV3 ShouldApplyV3) { +func (c *RaftCluster) SetDowngradeInfo(d *serverversion.DowngradeInfo, shouldApplyV3 ShouldApplyV3) { c.Lock() defer c.Unlock() diff --git a/server/etcdserver/api/membership/cluster_test.go b/server/etcdserver/api/membership/cluster_test.go index 23d81fec1..e96e4cca3 100644 --- a/server/etcdserver/api/membership/cluster_test.go +++ b/server/etcdserver/api/membership/cluster_test.go @@ -21,7 +21,6 @@ import ( "reflect" "testing" - "github.com/coreos/go-semver/semver" "go.uber.org/zap/zaptest" "go.etcd.io/etcd/client/pkg/v3/testutil" @@ -947,75 +946,3 @@ func TestIsReadyToPromoteMember(t *testing.T) { } } } - -func TestIsVersionChangable(t *testing.T) { - v0 := semver.Must(semver.NewVersion("2.4.0")) - v1 := semver.Must(semver.NewVersion("3.4.0")) - v2 := semver.Must(semver.NewVersion("3.5.0")) - v3 := semver.Must(semver.NewVersion("3.5.1")) - v4 := semver.Must(semver.NewVersion("3.6.0")) - - tests := []struct { - name string - currentVersion *semver.Version - localVersion *semver.Version - expectedResult bool - }{ - { - name: "When local version is one minor lower than cluster version", - currentVersion: v2, - localVersion: v1, - expectedResult: true, - }, - { - name: "When local version is one minor and one patch lower than cluster version", - currentVersion: v3, - localVersion: v1, - expectedResult: true, - }, - { - name: "When local version is one minor higher than cluster version", - currentVersion: v1, - localVersion: v2, - expectedResult: true, - }, - { - name: "When local version is two minor higher than cluster version", - currentVersion: v1, - localVersion: v4, - expectedResult: true, - }, - { - name: "When local version is one major higher than cluster version", - currentVersion: v0, - localVersion: v1, - expectedResult: false, - }, - { - name: "When local version is equal to cluster version", - currentVersion: v1, - localVersion: v1, - expectedResult: false, - }, - { - name: "When local version is one patch higher than cluster version", - currentVersion: v2, - localVersion: v3, - expectedResult: false, - }, - { - name: "When local version is two minor lower than cluster version", - currentVersion: v4, - localVersion: v1, - expectedResult: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if ret := IsValidVersionChange(tt.currentVersion, tt.localVersion); ret != tt.expectedResult { - t.Errorf("Expected %v; Got %v", tt.expectedResult, ret) - } - }) - } -} diff --git a/server/etcdserver/api/membership/membership_test.go b/server/etcdserver/api/membership/membership_test.go index 69f76e383..221831d7b 100644 --- a/server/etcdserver/api/membership/membership_test.go +++ b/server/etcdserver/api/membership/membership_test.go @@ -6,6 +6,7 @@ import ( "github.com/coreos/go-semver/semver" "github.com/stretchr/testify/assert" "go.etcd.io/etcd/client/pkg/v3/types" + "go.etcd.io/etcd/server/v3/etcdserver/version" "go.uber.org/zap" ) @@ -50,5 +51,5 @@ func (b *backendMock) MustSaveMemberToBackend(*Member) {} func (b *backendMock) TrimMembershipFromBackend() error { return nil } func (b *backendMock) MustDeleteMemberFromBackend(types.ID) {} -func (b *backendMock) MustSaveDowngradeToBackend(*DowngradeInfo) {} -func (b *backendMock) DowngradeInfoFromBackend() *DowngradeInfo { return nil } +func (b *backendMock) MustSaveDowngradeToBackend(*version.DowngradeInfo) {} +func (b *backendMock) DowngradeInfoFromBackend() *version.DowngradeInfo { return nil } diff --git a/server/etcdserver/api/membership/store.go b/server/etcdserver/api/membership/store.go index 0e4c296f2..bee385b06 100644 --- a/server/etcdserver/api/membership/store.go +++ b/server/etcdserver/api/membership/store.go @@ -18,6 +18,7 @@ import ( "path" "go.etcd.io/etcd/client/pkg/v3/types" + "go.etcd.io/etcd/server/v3/etcdserver/version" "github.com/coreos/go-semver/semver" "go.uber.org/zap" @@ -43,8 +44,8 @@ type MemberBackend interface { } type DowngradeInfoBackend interface { - MustSaveDowngradeToBackend(*DowngradeInfo) - DowngradeInfoFromBackend() *DowngradeInfo + MustSaveDowngradeToBackend(*version.DowngradeInfo) + DowngradeInfoFromBackend() *version.DowngradeInfo } func MustParseMemberIDFromKey(lg *zap.Logger, key string) types.ID { diff --git a/server/etcdserver/apply.go b/server/etcdserver/apply.go index 8a14c08e7..c8ff51674 100644 --- a/server/etcdserver/apply.go +++ b/server/etcdserver/apply.go @@ -31,6 +31,7 @@ import ( "go.etcd.io/etcd/server/v3/auth" "go.etcd.io/etcd/server/v3/etcdserver/api" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" + "go.etcd.io/etcd/server/v3/etcdserver/version" "go.etcd.io/etcd/server/v3/lease" serverstorage "go.etcd.io/etcd/server/v3/storage" "go.etcd.io/etcd/server/v3/storage/mvcc" @@ -946,9 +947,9 @@ func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAtt } func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3) { - d := membership.DowngradeInfo{Enabled: false} + d := version.DowngradeInfo{Enabled: false} if r.Enabled { - d = membership.DowngradeInfo{Enabled: true, TargetVersion: r.Ver} + d = version.DowngradeInfo{Enabled: true, TargetVersion: r.Ver} } a.s.cluster.SetDowngradeInfo(&d, shouldApplyV3) } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 310f436e5..bec28e45e 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -636,7 +636,7 @@ type ServerPeerV2 interface { DowngradeEnabledHandler() http.Handler } -func (s *EtcdServer) DowngradeInfo() *membership.DowngradeInfo { return s.cluster.DowngradeInfo() } +func (s *EtcdServer) DowngradeInfo() *serverversion.DowngradeInfo { return s.cluster.DowngradeInfo() } type downgradeEnabledHandler struct { lg *zap.Logger diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index b999eb2f4..8e745b880 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -28,6 +28,7 @@ import ( "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/server/v3/auth" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" + "go.etcd.io/etcd/server/v3/etcdserver/version" "go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/lease/leasehttp" "go.etcd.io/etcd/server/v3/storage/mvcc" @@ -932,7 +933,7 @@ func (s *EtcdServer) downgradeValidate(ctx context.Context, v string) (*pb.Downg } resp.Version = cv.String() - allowedTargetVersion := membership.AllowedDowngradeVersion(cv) + allowedTargetVersion := version.AllowedDowngradeVersion(cv) if !targetVersion.Equal(*allowedTargetVersion) { return nil, ErrInvalidDowngradeTargetVersion } diff --git a/server/etcdserver/api/membership/downgrade.go b/server/etcdserver/version/downgrade.go similarity index 77% rename from server/etcdserver/api/membership/downgrade.go rename to server/etcdserver/version/downgrade.go index 9fdafe22a..0f55c1b23 100644 --- a/server/etcdserver/api/membership/downgrade.go +++ b/server/etcdserver/version/downgrade.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package membership +package version import ( "github.com/coreos/go-semver/semver" @@ -37,8 +37,8 @@ func isValidDowngrade(verFrom *semver.Version, verTo *semver.Version) bool { return verTo.Equal(*AllowedDowngradeVersion(verFrom)) } -// mustDetectDowngrade will detect unexpected downgrade when the local server is recovered. -func mustDetectDowngrade(lg *zap.Logger, cv *semver.Version, d *DowngradeInfo) { +// MustDetectDowngrade will detect unexpected downgrade when the local server is recovered. +func MustDetectDowngrade(lg *zap.Logger, cv *semver.Version, d *DowngradeInfo) { lv := semver.Must(semver.NewVersion(version.Version)) // only keep major.minor version for comparison against cluster version lv = &semver.Version{Major: lv.Major, Minor: lv.Minor} @@ -78,3 +78,19 @@ func AllowedDowngradeVersion(ver *semver.Version) *semver.Version { // Todo: handle the case that downgrading from higher major version(e.g. downgrade from v4.0 to v3.x) return &semver.Version{Major: ver.Major, Minor: ver.Minor - 1} } + +// IsValidVersionChange checks the two scenario when version is valid to change: +// 1. Downgrade: cluster version is 1 minor version higher than local version, +// cluster version should change. +// 2. Cluster start: when not all members version are available, cluster version +// is set to MinVersion(3.0), when all members are at higher version, cluster version +// is lower than local version, cluster version should change +func IsValidVersionChange(cv *semver.Version, lv *semver.Version) bool { + cv = &semver.Version{Major: cv.Major, Minor: cv.Minor} + lv = &semver.Version{Major: lv.Major, Minor: lv.Minor} + + if isValidDowngrade(cv, lv) || (cv.Major == lv.Major && cv.LessThan(*lv)) { + return true + } + return false +} diff --git a/server/etcdserver/api/membership/downgrade_test.go b/server/etcdserver/version/downgrade_test.go similarity index 71% rename from server/etcdserver/api/membership/downgrade_test.go rename to server/etcdserver/version/downgrade_test.go index 8bb612d35..4d4d8cdc6 100644 --- a/server/etcdserver/api/membership/downgrade_test.go +++ b/server/etcdserver/version/downgrade_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package membership +package version import ( "bytes" @@ -122,7 +122,7 @@ func TestMustDetectDowngrade(t *testing.T) { lcfg.ErrorOutputPaths = []string{logPath} lg, _ := lcfg.Build() - mustDetectDowngrade(lg, tests[iint].clusterVersion, tests[iint].downgrade) + MustDetectDowngrade(lg, tests[iint].clusterVersion, tests[iint].downgrade) return } @@ -193,3 +193,75 @@ func TestIsValidDowngrade(t *testing.T) { }) } } + +func TestIsVersionChangable(t *testing.T) { + v0 := semver.Must(semver.NewVersion("2.4.0")) + v1 := semver.Must(semver.NewVersion("3.4.0")) + v2 := semver.Must(semver.NewVersion("3.5.0")) + v3 := semver.Must(semver.NewVersion("3.5.1")) + v4 := semver.Must(semver.NewVersion("3.6.0")) + + tests := []struct { + name string + currentVersion *semver.Version + localVersion *semver.Version + expectedResult bool + }{ + { + name: "When local version is one minor lower than cluster version", + currentVersion: v2, + localVersion: v1, + expectedResult: true, + }, + { + name: "When local version is one minor and one patch lower than cluster version", + currentVersion: v3, + localVersion: v1, + expectedResult: true, + }, + { + name: "When local version is one minor higher than cluster version", + currentVersion: v1, + localVersion: v2, + expectedResult: true, + }, + { + name: "When local version is two minor higher than cluster version", + currentVersion: v1, + localVersion: v4, + expectedResult: true, + }, + { + name: "When local version is one major higher than cluster version", + currentVersion: v0, + localVersion: v1, + expectedResult: false, + }, + { + name: "When local version is equal to cluster version", + currentVersion: v1, + localVersion: v1, + expectedResult: false, + }, + { + name: "When local version is one patch higher than cluster version", + currentVersion: v2, + localVersion: v3, + expectedResult: false, + }, + { + name: "When local version is two minor lower than cluster version", + currentVersion: v4, + localVersion: v1, + expectedResult: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if ret := IsValidVersionChange(tt.currentVersion, tt.localVersion); ret != tt.expectedResult { + t.Errorf("Expected %v; Got %v", tt.expectedResult, ret) + } + }) + } +} diff --git a/server/etcdserver/version/monitor.go b/server/etcdserver/version/monitor.go index 92b432b0c..43c78b47a 100644 --- a/server/etcdserver/version/monitor.go +++ b/server/etcdserver/version/monitor.go @@ -17,7 +17,6 @@ package version import ( "github.com/coreos/go-semver/semver" "go.etcd.io/etcd/api/v3/version" - "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.uber.org/zap" ) @@ -30,7 +29,7 @@ type Monitor struct { // Server lists EtcdServer methods needed by Monitor type Server interface { GetClusterVersion() *semver.Version - GetDowngradeInfo() *membership.DowngradeInfo + GetDowngradeInfo() *DowngradeInfo GetMembersVersions() map[string]*version.Versions UpdateClusterVersion(string) DowngradeCancel() @@ -68,7 +67,7 @@ func (m *Monitor) decideClusterVersion() *semver.Version { } return semver.New(version.MinClusterVersion) } - if membersMinimalVersion != nil && clusterVersion.LessThan(*membersMinimalVersion) && membership.IsValidVersionChange(clusterVersion, membersMinimalVersion) { + if membersMinimalVersion != nil && clusterVersion.LessThan(*membersMinimalVersion) && IsValidVersionChange(clusterVersion, membersMinimalVersion) { return membersMinimalVersion } return nil diff --git a/server/etcdserver/version/monitor_test.go b/server/etcdserver/version/monitor_test.go index 489f792b7..6bc5e836a 100644 --- a/server/etcdserver/version/monitor_test.go +++ b/server/etcdserver/version/monitor_test.go @@ -9,7 +9,6 @@ import ( "go.uber.org/zap/zaptest" "go.etcd.io/etcd/api/v3/version" - "go.etcd.io/etcd/server/v3/etcdserver/api/membership" ) var ( @@ -163,7 +162,7 @@ func TestUpdateClusterVersionIfNeeded(t *testing.T) { name string clusterVersion *semver.Version memberVersions map[string]*version.Versions - downgrade *membership.DowngradeInfo + downgrade *DowngradeInfo expectClusterVersion *semver.Version }{ { @@ -228,15 +227,15 @@ func TestCancelDowngradeIfNeeded(t *testing.T) { tests := []struct { name string memberVersions map[string]*version.Versions - downgrade *membership.DowngradeInfo - expectDowngrade *membership.DowngradeInfo + downgrade *DowngradeInfo + expectDowngrade *DowngradeInfo }{ { name: "No action if there no downgrade in progress", }, { name: "Cancel downgrade if there are no members", - downgrade: &membership.DowngradeInfo{TargetVersion: "3.5.0", Enabled: true}, + downgrade: &DowngradeInfo{TargetVersion: "3.5.0", Enabled: true}, expectDowngrade: nil, }, // Next entries go through all states that should happen during downgrade @@ -253,7 +252,7 @@ func TestCancelDowngradeIfNeeded(t *testing.T) { "a": {Cluster: "3.5.0", Server: "3.5.1"}, "b": {Cluster: "3.5.0", Server: "3.5.2"}, }, - downgrade: &membership.DowngradeInfo{TargetVersion: "3.5.0", Enabled: true}, + downgrade: &DowngradeInfo{TargetVersion: "3.5.0", Enabled: true}, expectDowngrade: nil, }, } @@ -330,7 +329,7 @@ type storageMock struct { memberVersions map[string]*version.Versions clusterVersion *semver.Version storageVersion *semver.Version - downgradeInfo *membership.DowngradeInfo + downgradeInfo *DowngradeInfo locked bool } @@ -348,7 +347,7 @@ func (s *storageMock) GetClusterVersion() *semver.Version { return s.clusterVersion } -func (s *storageMock) GetDowngradeInfo() *membership.DowngradeInfo { +func (s *storageMock) GetDowngradeInfo() *DowngradeInfo { return s.downgradeInfo } diff --git a/server/etcdserver/version/version_test.go b/server/etcdserver/version/version_test.go index 5178494c2..91a186729 100644 --- a/server/etcdserver/version/version_test.go +++ b/server/etcdserver/version/version_test.go @@ -25,7 +25,6 @@ import ( "go.uber.org/zap/zaptest" "go.etcd.io/etcd/api/v3/version" - "go.etcd.io/etcd/server/v3/etcdserver/api/membership" ) var ( @@ -100,12 +99,12 @@ func (c *clusterMock) StepMonitors() { type clusterMock struct { clusterVersion semver.Version - downgradeInfo *membership.DowngradeInfo + downgradeInfo *DowngradeInfo members []*memberMock } func (c *clusterMock) DowngradeEnable(ver semver.Version) { - c.downgradeInfo = &membership.DowngradeInfo{TargetVersion: ver.String(), Enabled: true} + c.downgradeInfo = &DowngradeInfo{TargetVersion: ver.String(), Enabled: true} } func (c *clusterMock) MembersVersions() map[string]*version.Versions { @@ -149,7 +148,7 @@ func (m *memberMock) GetClusterVersion() *semver.Version { return &m.cluster.clusterVersion } -func (m *memberMock) GetDowngradeInfo() *membership.DowngradeInfo { +func (m *memberMock) GetDowngradeInfo() *DowngradeInfo { return m.cluster.downgradeInfo } diff --git a/server/storage/schema/membership.go b/server/storage/schema/membership.go index 6328413b7..0b9cc0e49 100644 --- a/server/storage/schema/membership.go +++ b/server/storage/schema/membership.go @@ -20,6 +20,7 @@ import ( "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" + "go.etcd.io/etcd/server/v3/etcdserver/version" "go.etcd.io/etcd/server/v3/storage/backend" "github.com/coreos/go-semver/semver" @@ -152,7 +153,7 @@ func (s *membershipBackend) MustSaveClusterVersionToBackend(ver *semver.Version) // MustSaveDowngradeToBackend saves downgrade info to backend. // The field is populated since etcd v3.5. -func (s *membershipBackend) MustSaveDowngradeToBackend(downgrade *membership.DowngradeInfo) { +func (s *membershipBackend) MustSaveDowngradeToBackend(downgrade *version.DowngradeInfo) { dkey := ClusterDowngradeKeyName dvalue, err := json.Marshal(downgrade) if err != nil { @@ -203,7 +204,7 @@ func (s *membershipBackend) ClusterVersionFromBackend() *semver.Version { // DowngradeInfoFromBackend reads downgrade info from backend. // The field is populated since etcd v3.5. -func (s *membershipBackend) DowngradeInfoFromBackend() *membership.DowngradeInfo { +func (s *membershipBackend) DowngradeInfoFromBackend() *version.DowngradeInfo { dkey := ClusterDowngradeKeyName tx := s.be.ReadTx() tx.Lock() @@ -219,7 +220,7 @@ func (s *membershipBackend) DowngradeInfoFromBackend() *membership.DowngradeInfo zap.Int("number-of-key", len(keys)), ) } - var d membership.DowngradeInfo + var d version.DowngradeInfo if err := json.Unmarshal(vals[0], &d); err != nil { s.lg.Panic("failed to unmarshal downgrade information", zap.Error(err)) } From e47c3c22d20e64f139b267b289c21bf483688ab8 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Tue, 5 Oct 2021 16:54:52 +0200 Subject: [PATCH 4/8] server: Move downgrade API logic into version package --- server/etcdserver/adapters.go | 26 +++++--- server/etcdserver/api/v3rpc/util.go | 11 +-- server/etcdserver/errors.go | 45 ++++++------- server/etcdserver/server.go | 4 ++ server/etcdserver/v3_server.go | 49 ++------------ server/etcdserver/version/errors.go | 23 +++++++ server/etcdserver/version/monitor.go | 11 ++- server/etcdserver/version/monitor_test.go | 12 +++- server/etcdserver/version/version.go | 81 +++++++++++++++++++++++ server/etcdserver/version/version_test.go | 15 ++++- 10 files changed, 193 insertions(+), 84 deletions(-) create mode 100644 server/etcdserver/version/errors.go create mode 100644 server/etcdserver/version/version.go diff --git a/server/etcdserver/adapters.go b/server/etcdserver/adapters.go index 467992e1f..e5e943d15 100644 --- a/server/etcdserver/adapters.go +++ b/server/etcdserver/adapters.go @@ -18,12 +18,14 @@ import ( "context" "github.com/coreos/go-semver/semver" - "go.etcd.io/etcd/server/v3/storage/backend" - "go.etcd.io/etcd/server/v3/storage/schema" "go.uber.org/zap" + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/api/v3/membershippb" "go.etcd.io/etcd/api/v3/version" serverversion "go.etcd.io/etcd/server/v3/etcdserver/version" + "go.etcd.io/etcd/server/v3/storage/backend" + "go.etcd.io/etcd/server/v3/storage/schema" ) // serverVersionAdapter implements Server interface needed by serverversion.Monitor @@ -46,12 +48,20 @@ func (s *serverVersionAdapter) UpdateClusterVersion(version string) { s.GoAttach(func() { s.updateClusterVersionV2(version) }) } -func (s *serverVersionAdapter) DowngradeCancel() { - ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) - if _, err := s.downgradeCancel(ctx); err != nil { - s.lg.Warn("failed to cancel downgrade", zap.Error(err)) - } - cancel() +func (s *serverVersionAdapter) LinearizableReadNotify(ctx context.Context) error { + return s.linearizableReadNotify(ctx) +} + +func (s *serverVersionAdapter) DowngradeEnable(ctx context.Context, targetVersion *semver.Version) error { + raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: true, Ver: targetVersion.String()} + _, err := s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest}) + return err +} + +func (s *serverVersionAdapter) DowngradeCancel(ctx context.Context) error { + raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: false} + _, err := s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest}) + return err } func (s *serverVersionAdapter) GetClusterVersion() *semver.Version { diff --git a/server/etcdserver/api/v3rpc/util.go b/server/etcdserver/api/v3rpc/util.go index f054f29c8..cef6476bc 100644 --- a/server/etcdserver/api/v3rpc/util.go +++ b/server/etcdserver/api/v3rpc/util.go @@ -23,6 +23,7 @@ import ( "go.etcd.io/etcd/server/v3/auth" "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" + "go.etcd.io/etcd/server/v3/etcdserver/version" "go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/storage/mvcc" @@ -58,11 +59,11 @@ var toGRPCErrorMap = map[error]error{ etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt, etcdserver.ErrBadLeaderTransferee: rpctypes.ErrGRPCBadLeaderTransferee, - etcdserver.ErrClusterVersionUnavailable: rpctypes.ErrGRPCClusterVersionUnavailable, - etcdserver.ErrWrongDowngradeVersionFormat: rpctypes.ErrGRPCWrongDowngradeVersionFormat, - etcdserver.ErrInvalidDowngradeTargetVersion: rpctypes.ErrGRPCInvalidDowngradeTargetVersion, - etcdserver.ErrDowngradeInProcess: rpctypes.ErrGRPCDowngradeInProcess, - etcdserver.ErrNoInflightDowngrade: rpctypes.ErrGRPCNoInflightDowngrade, + etcdserver.ErrClusterVersionUnavailable: rpctypes.ErrGRPCClusterVersionUnavailable, + etcdserver.ErrWrongDowngradeVersionFormat: rpctypes.ErrGRPCWrongDowngradeVersionFormat, + version.ErrInvalidDowngradeTargetVersion: rpctypes.ErrGRPCInvalidDowngradeTargetVersion, + version.ErrDowngradeInProcess: rpctypes.ErrGRPCDowngradeInProcess, + version.ErrNoInflightDowngrade: rpctypes.ErrGRPCNoInflightDowngrade, lease.ErrLeaseNotFound: rpctypes.ErrGRPCLeaseNotFound, lease.ErrLeaseExists: rpctypes.ErrGRPCLeaseExist, diff --git a/server/etcdserver/errors.go b/server/etcdserver/errors.go index dc2a85fdd..9d9b07e13 100644 --- a/server/etcdserver/errors.go +++ b/server/etcdserver/errors.go @@ -20,30 +20,27 @@ import ( ) var ( - ErrUnknownMethod = errors.New("etcdserver: unknown method") - ErrStopped = errors.New("etcdserver: server stopped") - ErrCanceled = errors.New("etcdserver: request cancelled") - ErrTimeout = errors.New("etcdserver: request timed out") - ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure") - ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost") - ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long") - ErrLeaderChanged = errors.New("etcdserver: leader changed") - ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members") - ErrLearnerNotReady = errors.New("etcdserver: can only promote a learner member which is in sync with leader") - ErrNoLeader = errors.New("etcdserver: no leader") - ErrNotLeader = errors.New("etcdserver: not leader") - ErrRequestTooLarge = errors.New("etcdserver: request is too large") - ErrNoSpace = errors.New("etcdserver: no space") - ErrTooManyRequests = errors.New("etcdserver: too many requests") - ErrUnhealthy = errors.New("etcdserver: unhealthy cluster") - ErrKeyNotFound = errors.New("etcdserver: key not found") - ErrCorrupt = errors.New("etcdserver: corrupt cluster") - ErrBadLeaderTransferee = errors.New("etcdserver: bad leader transferee") - ErrClusterVersionUnavailable = errors.New("etcdserver: cluster version not found during downgrade") - ErrWrongDowngradeVersionFormat = errors.New("etcdserver: wrong downgrade target version format") - ErrInvalidDowngradeTargetVersion = errors.New("etcdserver: invalid downgrade target version") - ErrDowngradeInProcess = errors.New("etcdserver: cluster has a downgrade job in progress") - ErrNoInflightDowngrade = errors.New("etcdserver: no inflight downgrade job") + ErrUnknownMethod = errors.New("etcdserver: unknown method") + ErrStopped = errors.New("etcdserver: server stopped") + ErrCanceled = errors.New("etcdserver: request cancelled") + ErrTimeout = errors.New("etcdserver: request timed out") + ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure") + ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost") + ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long") + ErrLeaderChanged = errors.New("etcdserver: leader changed") + ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members") + ErrLearnerNotReady = errors.New("etcdserver: can only promote a learner member which is in sync with leader") + ErrNoLeader = errors.New("etcdserver: no leader") + ErrNotLeader = errors.New("etcdserver: not leader") + ErrRequestTooLarge = errors.New("etcdserver: request is too large") + ErrNoSpace = errors.New("etcdserver: no space") + ErrTooManyRequests = errors.New("etcdserver: too many requests") + ErrUnhealthy = errors.New("etcdserver: unhealthy cluster") + ErrKeyNotFound = errors.New("etcdserver: key not found") + ErrCorrupt = errors.New("etcdserver: corrupt cluster") + ErrBadLeaderTransferee = errors.New("etcdserver: bad leader transferee") + ErrClusterVersionUnavailable = errors.New("etcdserver: cluster version not found during downgrade") + ErrWrongDowngradeVersionFormat = errors.New("etcdserver: wrong downgrade target version format") ) type DiscoveryError struct { diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index bec28e45e..2de2477ec 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -2359,3 +2359,7 @@ func (s *EtcdServer) IsMemberExist(id types.ID) bool { func (s *EtcdServer) raftStatus() raft.Status { return s.r.Node.Status() } + +func (s *EtcdServer) Version() *serverversion.Manager { + return serverversion.NewManager(s.Logger(), newServerVersionAdapter(s)) +} diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index 8e745b880..9885fc01c 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -23,12 +23,10 @@ import ( "time" pb "go.etcd.io/etcd/api/v3/etcdserverpb" - "go.etcd.io/etcd/api/v3/membershippb" "go.etcd.io/etcd/pkg/v3/traceutil" "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/server/v3/auth" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" - "go.etcd.io/etcd/server/v3/etcdserver/version" "go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/lease/leasehttp" "go.etcd.io/etcd/server/v3/storage/mvcc" @@ -920,48 +918,27 @@ func (s *EtcdServer) downgradeValidate(ctx context.Context, v string) (*pb.Downg return nil, err } - // gets leaders commit index and wait for local store to finish applying that index - // to avoid using stale downgrade information - err = s.linearizableReadNotify(ctx) - if err != nil { - return nil, err - } - cv := s.ClusterVersion() if cv == nil { return nil, ErrClusterVersionUnavailable } resp.Version = cv.String() - - allowedTargetVersion := version.AllowedDowngradeVersion(cv) - if !targetVersion.Equal(*allowedTargetVersion) { - return nil, ErrInvalidDowngradeTargetVersion + err = s.Version().DowngradeValidate(ctx, targetVersion) + if err != nil { + return nil, err } - downgradeInfo := s.cluster.DowngradeInfo() - if downgradeInfo.Enabled { - // Todo: return the downgrade status along with the error msg - return nil, ErrDowngradeInProcess - } return resp, nil } func (s *EtcdServer) downgradeEnable(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) { - // validate downgrade capability before starting downgrade - v := r.Version lg := s.Logger() - if resp, err := s.downgradeValidate(ctx, v); err != nil { - lg.Warn("reject downgrade request", zap.Error(err)) - return resp, err - } - targetVersion, err := convertToClusterVersion(v) + targetVersion, err := convertToClusterVersion(r.Version) if err != nil { lg.Warn("reject downgrade request", zap.Error(err)) return nil, err } - - raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: true, Ver: targetVersion.String()} - _, err = s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest}) + err = s.Version().DowngradeEnable(ctx, targetVersion) if err != nil { lg.Warn("reject downgrade request", zap.Error(err)) return nil, err @@ -971,21 +948,9 @@ func (s *EtcdServer) downgradeEnable(ctx context.Context, r *pb.DowngradeRequest } func (s *EtcdServer) downgradeCancel(ctx context.Context) (*pb.DowngradeResponse, error) { - // gets leaders commit index and wait for local store to finish applying that index - // to avoid using stale downgrade information - if err := s.linearizableReadNotify(ctx); err != nil { - return nil, err - } - - downgradeInfo := s.cluster.DowngradeInfo() - if !downgradeInfo.Enabled { - return nil, ErrNoInflightDowngrade - } - - raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: false} - _, err := s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest}) + err := s.Version().DowngradeCancel(ctx) if err != nil { - return nil, err + s.lg.Warn("failed to cancel downgrade", zap.Error(err)) } resp := pb.DowngradeResponse{Version: s.ClusterVersion().String()} return &resp, nil diff --git a/server/etcdserver/version/errors.go b/server/etcdserver/version/errors.go new file mode 100644 index 000000000..906aa9f41 --- /dev/null +++ b/server/etcdserver/version/errors.go @@ -0,0 +1,23 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package version + +import "errors" + +var ( + ErrInvalidDowngradeTargetVersion = errors.New("etcdserver: invalid downgrade target version") + ErrDowngradeInProcess = errors.New("etcdserver: cluster has a downgrade job in progress") + ErrNoInflightDowngrade = errors.New("etcdserver: no inflight downgrade job") +) diff --git a/server/etcdserver/version/monitor.go b/server/etcdserver/version/monitor.go index 43c78b47a..1a8e73e47 100644 --- a/server/etcdserver/version/monitor.go +++ b/server/etcdserver/version/monitor.go @@ -15,6 +15,8 @@ package version import ( + "context" + "github.com/coreos/go-semver/semver" "go.etcd.io/etcd/api/v3/version" "go.uber.org/zap" @@ -32,7 +34,9 @@ type Server interface { GetDowngradeInfo() *DowngradeInfo GetMembersVersions() map[string]*version.Versions UpdateClusterVersion(string) - DowngradeCancel() + LinearizableReadNotify(ctx context.Context) error + DowngradeEnable(ctx context.Context, targetVersion *semver.Version) error + DowngradeCancel(ctx context.Context) error GetStorageVersion() *semver.Version UpdateStorageVersion(semver.Version) @@ -101,7 +105,10 @@ func (m *Monitor) CancelDowngradeIfNeeded() { v := semver.Must(semver.NewVersion(targetVersion)) if m.versionsMatchTarget(v) { m.lg.Info("the cluster has been downgraded", zap.String("cluster-version", targetVersion)) - m.s.DowngradeCancel() + err := m.s.DowngradeCancel(context.Background()) + if err != nil { + m.lg.Warn("failed to cancel downgrade", zap.Error(err)) + } } } diff --git a/server/etcdserver/version/monitor_test.go b/server/etcdserver/version/monitor_test.go index 6bc5e836a..ffc908e7b 100644 --- a/server/etcdserver/version/monitor_test.go +++ b/server/etcdserver/version/monitor_test.go @@ -1,6 +1,7 @@ package version import ( + "context" "reflect" "testing" @@ -339,8 +340,17 @@ func (s *storageMock) UpdateClusterVersion(version string) { s.clusterVersion = semver.New(version) } -func (s *storageMock) DowngradeCancel() { +func (s *storageMock) LinearizableReadNotify(ctx context.Context) error { + return nil +} + +func (s *storageMock) DowngradeEnable(ctx context.Context, targetVersion *semver.Version) error { + return nil +} + +func (s *storageMock) DowngradeCancel(ctx context.Context) error { s.downgradeInfo = nil + return nil } func (s *storageMock) GetClusterVersion() *semver.Version { diff --git a/server/etcdserver/version/version.go b/server/etcdserver/version/version.go new file mode 100644 index 000000000..b2c62b15c --- /dev/null +++ b/server/etcdserver/version/version.go @@ -0,0 +1,81 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package version + +import ( + "context" + + "github.com/coreos/go-semver/semver" + "go.uber.org/zap" +) + +// Manager contains logic to manage etcd cluster version downgrade process. +type Manager struct { + lg *zap.Logger + s Server +} + +// NewManager returns a new manager instance +func NewManager(lg *zap.Logger, s Server) *Manager { + return &Manager{ + lg: lg, + s: s, + } +} + +// DowngradeValidate validates if cluster is downloadable to provided target version and returns error if not. +func (m *Manager) DowngradeValidate(ctx context.Context, targetVersion *semver.Version) error { + // gets leaders commit index and wait for local store to finish applying that index + // to avoid using stale downgrade information + err := m.s.LinearizableReadNotify(ctx) + if err != nil { + return err + } + cv := m.s.GetClusterVersion() + allowedTargetVersion := AllowedDowngradeVersion(cv) + if !targetVersion.Equal(*allowedTargetVersion) { + return ErrInvalidDowngradeTargetVersion + } + + downgradeInfo := m.s.GetDowngradeInfo() + if downgradeInfo != nil && downgradeInfo.Enabled { + // Todo: return the downgrade status along with the error msg + return ErrDowngradeInProcess + } + return nil +} + +// DowngradeEnable initiates etcd cluster version downgrade process. +func (m *Manager) DowngradeEnable(ctx context.Context, targetVersion *semver.Version) error { + // validate downgrade capability before starting downgrade + err := m.DowngradeValidate(ctx, targetVersion) + if err != nil { + return err + } + return m.s.DowngradeEnable(context.Background(), targetVersion) +} + +// DowngradeCancel cancels ongoing downgrade process. +func (m *Manager) DowngradeCancel(ctx context.Context) error { + err := m.s.LinearizableReadNotify(ctx) + if err != nil { + return err + } + downgradeInfo := m.s.GetDowngradeInfo() + if !downgradeInfo.Enabled { + return ErrNoInflightDowngrade + } + return m.s.DowngradeCancel(ctx) +} diff --git a/server/etcdserver/version/version_test.go b/server/etcdserver/version/version_test.go index 91a186729..5dd01d01e 100644 --- a/server/etcdserver/version/version_test.go +++ b/server/etcdserver/version/version_test.go @@ -15,6 +15,7 @@ package version import ( + "context" "fmt" "math/rand" "testing" @@ -73,7 +74,7 @@ func newCluster(lg *zap.Logger, memberCount int, ver semver.Version) *clusterMoc serverVersion: ver, storageVersion: majorMinVer, } - m.monitor = NewMonitor(lg.Named(fmt.Sprintf("m%d", i)), m) + m.monitor = NewMonitor(lg.Named(fmt.Sprintf("m%d", i)), m) cluster.members = append(cluster.members, m) } cluster.members[0].isLeader = true @@ -140,8 +141,18 @@ func (m *memberMock) UpdateClusterVersion(version string) { m.cluster.clusterVersion = *semver.New(version) } -func (m *memberMock) DowngradeCancel() { +func (m *memberMock) LinearizableReadNotify(ctx context.Context) error { + return nil +} + +func (m *memberMock) DowngradeEnable(ctx context.Context, targetVersion *semver.Version) error { m.cluster.downgradeInfo = nil + return nil +} + +func (m *memberMock) DowngradeCancel(context.Context) error { + m.cluster.downgradeInfo = nil + return nil } func (m *memberMock) GetClusterVersion() *semver.Version { From 703df1c49102f1078c5fdef0139bb96347ec2181 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 6 Oct 2021 13:29:07 +0200 Subject: [PATCH 5/8] server: Move wal versioning functions to wal package --- server/storage/schema/schema_test.go | 1 + server/storage/{schema/wal.go => wal/version.go} | 9 ++++----- .../storage/{schema/wal_test.go => wal/version_test.go} | 5 +++-- tests/integration/utl_wal_version_test.go | 7 +++---- 4 files changed, 11 insertions(+), 11 deletions(-) rename server/storage/{schema/wal.go => wal/version.go} (94%) rename server/storage/{schema/wal_test.go => wal/version_test.go} (98%) diff --git a/server/storage/schema/schema_test.go b/server/storage/schema/schema_test.go index 11b4d23af..823400760 100644 --- a/server/storage/schema/schema_test.go +++ b/server/storage/schema/schema_test.go @@ -28,6 +28,7 @@ import ( ) var ( + V3_4 = semver.Version{Major: 3, Minor: 4} V3_7 = semver.Version{Major: 3, Minor: 7} ) diff --git a/server/storage/schema/wal.go b/server/storage/wal/version.go similarity index 94% rename from server/storage/schema/wal.go rename to server/storage/wal/version.go index aef7e8484..6e8dbfd7b 100644 --- a/server/storage/schema/wal.go +++ b/server/storage/wal/version.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package schema +package wal import ( "fmt" @@ -26,14 +26,13 @@ import ( "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/raft/v3/raftpb" - "go.etcd.io/etcd/server/v3/storage/wal" ) -// MinimalStorageVersionFromWAL returns minimal etcd storage able to interpret provided WAL log, +// MinimalEtcdVersion returns minimal etcd able to interpret provided WAL log, // determined by looking at entries since the last snapshot and returning the highest // etcd version annotation from used messages, fields, enums and their values. -func MinimalStorageVersionFromWAL(wal *wal.WAL) *semver.Version { - _, _, ents, err := wal.ReadAll() +func (w *WAL) MinimalEtcdVersion() *semver.Version { + _, _, ents, err := w.ReadAll() if err != nil { panic(err) } diff --git a/server/storage/schema/wal_test.go b/server/storage/wal/version_test.go similarity index 98% rename from server/storage/schema/wal_test.go rename to server/storage/wal/version_test.go index 423ce2a13..5aa83250c 100644 --- a/server/storage/schema/wal_test.go +++ b/server/storage/wal/version_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package schema +package wal import ( "fmt" @@ -21,7 +21,6 @@ import ( "github.com/coreos/go-semver/semver" "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" - "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/membershippb" "go.etcd.io/etcd/pkg/v3/pbutil" @@ -33,6 +32,8 @@ var ( V3_1 = semver.Version{Major: 3, Minor: 1} V3_3 = semver.Version{Major: 3, Minor: 3} V3_4 = semver.Version{Major: 3, Minor: 4} + V3_5 = semver.Version{Major: 3, Minor: 5} + V3_6 = semver.Version{Major: 3, Minor: 6} ) func TestEtcdVersionFromEntry(t *testing.T) { diff --git a/tests/integration/utl_wal_version_test.go b/tests/integration/utl_wal_version_test.go index cbed561bd..deed4a46c 100644 --- a/tests/integration/utl_wal_version_test.go +++ b/tests/integration/utl_wal_version_test.go @@ -26,7 +26,6 @@ import ( "go.etcd.io/etcd/client/pkg/v3/testutil" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/embed" - "go.etcd.io/etcd/server/v3/storage/schema" "go.etcd.io/etcd/server/v3/storage/wal" "go.etcd.io/etcd/server/v3/storage/wal/walpb" ) @@ -59,11 +58,11 @@ func TestEtcdVersionFromWAL(t *testing.T) { cli.Close() srv.Close() - wal, err := wal.Open(zap.NewNop(), cfg.Dir+"/member/wal", walpb.Snapshot{}) + w, err := wal.Open(zap.NewNop(), cfg.Dir+"/member/wal", walpb.Snapshot{}) if err != nil { panic(err) } - defer wal.Close() - ver := schema.MinimalStorageVersionFromWAL(wal) + defer w.Close() + ver := w.MinimalEtcdVersion() assert.Equal(t, &semver.Version{Major: 3, Minor: 5}, ver) } From d039f016c5cecdf7e4d504e8a7dc54d6692022eb Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 6 Oct 2021 13:53:06 +0200 Subject: [PATCH 6/8] server: Move Storage interface to storage package --- server/etcdserver/bootstrap.go | 2 +- server/etcdserver/raft.go | 3 ++- server/{etcdserver => storage}/storage.go | 32 ++++++++++++++++------- 3 files changed, 26 insertions(+), 11 deletions(-) rename server/{etcdserver => storage}/storage.go (79%) diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index f1f74fea9..75b791122 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -529,7 +529,7 @@ func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL, cl *m Node: n, heartbeat: b.heartbeat, raftStorage: b.storage, - storage: NewStorage(wal, ss), + storage: serverstorage.NewStorage(b.lg, wal, ss), }, ) } diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index 4fb8da6ed..69e6a8c21 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -26,6 +26,7 @@ import ( "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" + serverstorage "go.etcd.io/etcd/server/v3/storage" "go.uber.org/zap" ) @@ -102,7 +103,7 @@ type raftNodeConfig struct { isIDRemoved func(id uint64) bool raft.Node raftStorage *raft.MemoryStorage - storage Storage + storage serverstorage.Storage heartbeat time.Duration // for logging // transport specifies the transport to send and receive msgs to members. // Sending messages MUST NOT block. It is okay to drop messages, since diff --git a/server/etcdserver/storage.go b/server/storage/storage.go similarity index 79% rename from server/etcdserver/storage.go rename to server/storage/storage.go index 8170c29db..047d1bb02 100644 --- a/server/etcdserver/storage.go +++ b/server/storage/storage.go @@ -12,13 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -package etcdserver +package storage import ( "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" "go.etcd.io/etcd/server/v3/storage/wal" "go.etcd.io/etcd/server/v3/storage/wal/walpb" + "go.uber.org/zap" ) type Storage interface { @@ -36,12 +37,13 @@ type Storage interface { } type storage struct { - *wal.WAL - *snap.Snapshotter + lg *zap.Logger + s *snap.Snapshotter + w *wal.WAL } -func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage { - return &storage{w, s} +func NewStorage(lg *zap.Logger, w *wal.WAL, s *snap.Snapshotter) Storage { + return &storage{lg: lg, w: w, s: s} } // SaveSnap saves the snapshot file to disk and writes the WAL snapshot entry. @@ -54,21 +56,33 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error { // save the snapshot file before writing the snapshot to the wal. // This makes it possible for the snapshot file to become orphaned, but prevents // a WAL snapshot entry from having no corresponding snapshot file. - err := st.Snapshotter.SaveSnap(snap) + err := st.s.SaveSnap(snap) if err != nil { return err } // gofail: var raftBeforeWALSaveSnaphot struct{} - return st.WAL.SaveSnapshot(walsnap) + return st.w.SaveSnapshot(walsnap) } // Release releases resources older than the given snap and are no longer needed: // - releases the locks to the wal files that are older than the provided wal for the given snap. // - deletes any .snap.db files that are older than the given snap. func (st *storage) Release(snap raftpb.Snapshot) error { - if err := st.WAL.ReleaseLockTo(snap.Metadata.Index); err != nil { + if err := st.w.ReleaseLockTo(snap.Metadata.Index); err != nil { return err } - return st.Snapshotter.ReleaseSnapDBs(snap) + return st.s.ReleaseSnapDBs(snap) +} + +func (st *storage) Save(s raftpb.HardState, ents []raftpb.Entry) error { + return st.w.Save(s, ents) +} + +func (st *storage) Close() error { + return st.w.Close() +} + +func (st *storage) Sync() error { + return st.w.Sync() } From 620832a4a54dbf90c269c025a4576214b0526633 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Tue, 5 Oct 2021 17:55:36 +0200 Subject: [PATCH 7/8] server: Use panic instead of os.Exit in mustDetectDowngrade --- server/etcdserver/version/downgrade.go | 4 +- server/etcdserver/version/downgrade_test.go | 66 ++++++--------------- 2 files changed, 19 insertions(+), 51 deletions(-) diff --git a/server/etcdserver/version/downgrade.go b/server/etcdserver/version/downgrade.go index 0f55c1b23..f43370455 100644 --- a/server/etcdserver/version/downgrade.go +++ b/server/etcdserver/version/downgrade.go @@ -56,7 +56,7 @@ func MustDetectDowngrade(lg *zap.Logger, cv *semver.Version, d *DowngradeInfo) { } return } - lg.Fatal( + lg.Panic( "invalid downgrade; server version is not allowed to join when downgrade is enabled", zap.String("current-server-version", version.Version), zap.String("target-cluster-version", d.TargetVersion), @@ -66,7 +66,7 @@ func MustDetectDowngrade(lg *zap.Logger, cv *semver.Version, d *DowngradeInfo) { // if the cluster disables downgrade, check local version against determined cluster version. // the validation passes when local version is not less than cluster version if cv != nil && lv.LessThan(*cv) { - lg.Fatal( + lg.Panic( "invalid downgrade; server version is lower than determined cluster version", zap.String("current-server-version", version.Version), zap.String("determined-cluster-version", version.Cluster(cv.String())), diff --git a/server/etcdserver/version/downgrade_test.go b/server/etcdserver/version/downgrade_test.go index 4d4d8cdc6..e3ba69963 100644 --- a/server/etcdserver/version/downgrade_test.go +++ b/server/etcdserver/version/downgrade_test.go @@ -15,18 +15,13 @@ package version import ( - "bytes" "fmt" - "io/ioutil" - "os" - "os/exec" - "path/filepath" - "strconv" "testing" "github.com/coreos/go-semver/semver" "go.etcd.io/etcd/api/v3/version" "go.uber.org/zap" + "go.uber.org/zap/zaptest" ) func TestMustDetectDowngrade(t *testing.T) { @@ -112,57 +107,30 @@ func TestMustDetectDowngrade(t *testing.T) { }, } - if os.Getenv("DETECT_DOWNGRADE") != "" { - i := os.Getenv("DETECT_DOWNGRADE") - iint, _ := strconv.Atoi(i) - logPath := filepath.Join(os.TempDir(), fmt.Sprintf("test-log-must-detect-downgrade-%v", iint)) - - lcfg := zap.NewProductionConfig() - lcfg.OutputPaths = []string{logPath} - lcfg.ErrorOutputPaths = []string{logPath} - lg, _ := lcfg.Build() - - MustDetectDowngrade(lg, tests[iint].clusterVersion, tests[iint].downgrade) - return - } - - for i, tt := range tests { + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - logPath := filepath.Join(os.TempDir(), fmt.Sprintf("test-log-must-detect-downgrade-%d", i)) - t.Log(logPath) - defer os.RemoveAll(logPath) + lg := zaptest.NewLogger(t) + err := tryMustDetectDowngrade(lg, tt.clusterVersion, tt.downgrade) - cmd := exec.Command(os.Args[0], "-test.run=TestMustDetectDowngrade") - cmd.Env = append(os.Environ(), fmt.Sprintf("DETECT_DOWNGRADE=%d", i)) - if err := cmd.Start(); err != nil { - t.Fatal(err) + if tt.success != (err == nil) { + t.Errorf("Unexpected status, got %q, wanted: %v", err, tt.success) + // TODO test err } - - errCmd := cmd.Wait() - - data, err := ioutil.ReadFile(logPath) - if err == nil { - if !bytes.Contains(data, []byte(tt.message)) { - t.Errorf("Expected to find %v in log", tt.message) - } - } else { - t.Fatal(err) - } - - if !tt.success { - e, ok := errCmd.(*exec.ExitError) - if !ok || e.Success() { - t.Errorf("Expected exit with status 1; Got %v", err) - } - } - - if tt.success && errCmd != nil { - t.Errorf("Expected not failure; Got %v", errCmd) + if err != nil && tt.message != fmt.Sprintf("%s", err) { + t.Errorf("Unexpected message, got %q, wanted: %v", err, tt.message) } }) } } +func tryMustDetectDowngrade(lg *zap.Logger, cv *semver.Version, d *DowngradeInfo) (err interface{}) { + defer func() { + err = recover() + }() + MustDetectDowngrade(lg, cv, d) + return err +} + func TestIsValidDowngrade(t *testing.T) { tests := []struct { name string From f92b4f9a2827e5d2bc3ef96f4385dc957c2531de Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Tue, 5 Oct 2021 17:21:20 +0200 Subject: [PATCH 8/8] server: Integrate version validation logic into tests --- server/etcdserver/api/membership/cluster.go | 6 ++++-- server/etcdserver/version/downgrade.go | 19 +++++++++---------- server/etcdserver/version/downgrade_test.go | 7 ++++--- server/etcdserver/version/version.go | 2 +- server/etcdserver/version/version_test.go | 15 +++++++++------ 5 files changed, 27 insertions(+), 22 deletions(-) diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go index ad6633ebd..f1b41ee20 100644 --- a/server/etcdserver/api/membership/cluster.go +++ b/server/etcdserver/api/membership/cluster.go @@ -273,7 +273,8 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { if c.downgradeInfo != nil { d = &serverversion.DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion} } - serverversion.MustDetectDowngrade(c.lg, c.version, d) + sv := semver.Must(semver.NewVersion(version.Version)) + serverversion.MustDetectDowngrade(c.lg, sv, c.version, d) onSet(c.lg, c.version) for _, m := range c.members { @@ -541,7 +542,8 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s } oldVer := c.version c.version = ver - serverversion.MustDetectDowngrade(c.lg, c.version, c.downgradeInfo) + sv := semver.Must(semver.NewVersion(version.Version)) + serverversion.MustDetectDowngrade(c.lg, sv, c.version, c.downgradeInfo) if c.v2store != nil { mustSaveClusterVersionToStore(c.lg, c.v2store, ver) } diff --git a/server/etcdserver/version/downgrade.go b/server/etcdserver/version/downgrade.go index f43370455..d70fd63ac 100644 --- a/server/etcdserver/version/downgrade.go +++ b/server/etcdserver/version/downgrade.go @@ -34,47 +34,46 @@ func (d *DowngradeInfo) GetTargetVersion() *semver.Version { // isValidDowngrade verifies whether the cluster can be downgraded from verFrom to verTo func isValidDowngrade(verFrom *semver.Version, verTo *semver.Version) bool { - return verTo.Equal(*AllowedDowngradeVersion(verFrom)) + return verTo.Equal(*allowedDowngradeVersion(verFrom)) } // MustDetectDowngrade will detect unexpected downgrade when the local server is recovered. -func MustDetectDowngrade(lg *zap.Logger, cv *semver.Version, d *DowngradeInfo) { - lv := semver.Must(semver.NewVersion(version.Version)) +func MustDetectDowngrade(lg *zap.Logger, sv, cv *semver.Version, d *DowngradeInfo) { // only keep major.minor version for comparison against cluster version - lv = &semver.Version{Major: lv.Major, Minor: lv.Minor} + sv = &semver.Version{Major: sv.Major, Minor: sv.Minor} // if the cluster enables downgrade, check local version against downgrade target version. if d != nil && d.Enabled && d.TargetVersion != "" { - if lv.Equal(*d.GetTargetVersion()) { + if sv.Equal(*d.GetTargetVersion()) { if cv != nil { lg.Info( "cluster is downgrading to target version", zap.String("target-cluster-version", d.TargetVersion), zap.String("determined-cluster-version", version.Cluster(cv.String())), - zap.String("current-server-version", version.Version), + zap.String("current-server-version", sv.String()), ) } return } lg.Panic( "invalid downgrade; server version is not allowed to join when downgrade is enabled", - zap.String("current-server-version", version.Version), + zap.String("current-server-version", sv.String()), zap.String("target-cluster-version", d.TargetVersion), ) } // if the cluster disables downgrade, check local version against determined cluster version. // the validation passes when local version is not less than cluster version - if cv != nil && lv.LessThan(*cv) { + if cv != nil && sv.LessThan(*cv) { lg.Panic( "invalid downgrade; server version is lower than determined cluster version", - zap.String("current-server-version", version.Version), + zap.String("current-server-version", sv.String()), zap.String("determined-cluster-version", version.Cluster(cv.String())), ) } } -func AllowedDowngradeVersion(ver *semver.Version) *semver.Version { +func allowedDowngradeVersion(ver *semver.Version) *semver.Version { // Todo: handle the case that downgrading from higher major version(e.g. downgrade from v4.0 to v3.x) return &semver.Version{Major: ver.Major, Minor: ver.Minor - 1} } diff --git a/server/etcdserver/version/downgrade_test.go b/server/etcdserver/version/downgrade_test.go index e3ba69963..97c8d2125 100644 --- a/server/etcdserver/version/downgrade_test.go +++ b/server/etcdserver/version/downgrade_test.go @@ -110,7 +110,8 @@ func TestMustDetectDowngrade(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { lg := zaptest.NewLogger(t) - err := tryMustDetectDowngrade(lg, tt.clusterVersion, tt.downgrade) + sv := semver.Must(semver.NewVersion(version.Version)) + err := tryMustDetectDowngrade(lg, sv, tt.clusterVersion, tt.downgrade) if tt.success != (err == nil) { t.Errorf("Unexpected status, got %q, wanted: %v", err, tt.success) @@ -123,11 +124,11 @@ func TestMustDetectDowngrade(t *testing.T) { } } -func tryMustDetectDowngrade(lg *zap.Logger, cv *semver.Version, d *DowngradeInfo) (err interface{}) { +func tryMustDetectDowngrade(lg *zap.Logger, sv, cv *semver.Version, d *DowngradeInfo) (err interface{}) { defer func() { err = recover() }() - MustDetectDowngrade(lg, cv, d) + MustDetectDowngrade(lg, sv, cv, d) return err } diff --git a/server/etcdserver/version/version.go b/server/etcdserver/version/version.go index b2c62b15c..eb9a370c1 100644 --- a/server/etcdserver/version/version.go +++ b/server/etcdserver/version/version.go @@ -44,7 +44,7 @@ func (m *Manager) DowngradeValidate(ctx context.Context, targetVersion *semver.V return err } cv := m.s.GetClusterVersion() - allowedTargetVersion := AllowedDowngradeVersion(cv) + allowedTargetVersion := allowedDowngradeVersion(cv) if !targetVersion.Equal(*allowedTargetVersion) { return ErrInvalidDowngradeTargetVersion } diff --git a/server/etcdserver/version/version_test.go b/server/etcdserver/version/version_test.go index 5dd01d01e..6beaec0b3 100644 --- a/server/etcdserver/version/version_test.go +++ b/server/etcdserver/version/version_test.go @@ -64,6 +64,7 @@ func TestUpgradeThreeNodes(t *testing.T) { func newCluster(lg *zap.Logger, memberCount int, ver semver.Version) *clusterMock { cluster := &clusterMock{ + lg: lg, clusterVersion: ver, members: make([]*memberMock, 0, memberCount), } @@ -99,13 +100,14 @@ func (c *clusterMock) StepMonitors() { } type clusterMock struct { + lg *zap.Logger clusterVersion semver.Version downgradeInfo *DowngradeInfo members []*memberMock } -func (c *clusterMock) DowngradeEnable(ver semver.Version) { - c.downgradeInfo = &DowngradeInfo{TargetVersion: ver.String(), Enabled: true} +func (c *clusterMock) Version() *Manager { + return NewManager(c.lg, c.members[0]) } func (c *clusterMock) MembersVersions() map[string]*version.Versions { @@ -120,9 +122,7 @@ func (c *clusterMock) MembersVersions() map[string]*version.Versions { } func (c *clusterMock) ReplaceMemberBinary(mid int, newServerVersion semver.Version) { - if newServerVersion.LessThan(c.clusterVersion) { - panic("Members cannot join clusters with higher version") - } + MustDetectDowngrade(c.lg, &c.members[mid].serverVersion, &c.clusterVersion, c.downgradeInfo) c.members[mid].serverVersion = newServerVersion } @@ -146,7 +146,10 @@ func (m *memberMock) LinearizableReadNotify(ctx context.Context) error { } func (m *memberMock) DowngradeEnable(ctx context.Context, targetVersion *semver.Version) error { - m.cluster.downgradeInfo = nil + m.cluster.downgradeInfo = &DowngradeInfo{ + TargetVersion: targetVersion.String(), + Enabled: true, + } return nil }