mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server: Depend only on cluster version to detect downgrade
Problem with old code was that during downgrade only members with downgrade target version were allowed to join. This is unrealistic as it doesn't handle any members to disconnect/rejoin.
This commit is contained in:
parent
11f7729660
commit
758fc0f8ad
@ -18,7 +18,6 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/coreos/go-semver/semver"
|
||||
"go.uber.org/zap"
|
||||
|
||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/api/v3/membershippb"
|
||||
@ -88,15 +87,12 @@ func (s *serverVersionAdapter) GetStorageVersion() *semver.Version {
|
||||
return &v
|
||||
}
|
||||
|
||||
func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) {
|
||||
func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) error {
|
||||
if s.tx == nil {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
}
|
||||
err := schema.UnsafeMigrate(s.lg, s.tx, target)
|
||||
if err != nil {
|
||||
s.lg.Error("failed migrating storage schema", zap.String("storage-version", target.String()), zap.Error(err))
|
||||
}
|
||||
return schema.UnsafeMigrate(s.lg, s.tx, target)
|
||||
}
|
||||
|
||||
func (s *serverVersionAdapter) Lock() {
|
||||
|
@ -271,12 +271,15 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
|
||||
if c.be != nil {
|
||||
c.downgradeInfo = c.be.DowngradeInfoFromBackend()
|
||||
}
|
||||
d := &serverversion.DowngradeInfo{Enabled: false}
|
||||
if c.downgradeInfo != nil {
|
||||
d = &serverversion.DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
|
||||
}
|
||||
sv := semver.Must(semver.NewVersion(version.Version))
|
||||
serverversion.MustDetectDowngrade(c.lg, sv, c.version, d)
|
||||
if c.downgradeInfo != nil && c.downgradeInfo.Enabled {
|
||||
c.lg.Info(
|
||||
"cluster is downgrading to target version",
|
||||
zap.String("target-cluster-version", c.downgradeInfo.TargetVersion),
|
||||
zap.String("current-server-version", sv.String()),
|
||||
)
|
||||
}
|
||||
serverversion.MustDetectDowngrade(c.lg, sv, c.version)
|
||||
onSet(c.lg, c.version)
|
||||
|
||||
for _, m := range c.members {
|
||||
@ -548,7 +551,7 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s
|
||||
oldVer := c.version
|
||||
c.version = ver
|
||||
sv := semver.Must(semver.NewVersion(version.Version))
|
||||
serverversion.MustDetectDowngrade(c.lg, sv, c.version, c.downgradeInfo)
|
||||
serverversion.MustDetectDowngrade(c.lg, sv, c.version)
|
||||
if c.v2store != nil {
|
||||
mustSaveClusterVersionToStore(c.lg, c.v2store, ver)
|
||||
}
|
||||
@ -759,14 +762,6 @@ func (c *RaftCluster) SetDowngradeInfo(d *serverversion.DowngradeInfo, shouldApp
|
||||
}
|
||||
|
||||
c.downgradeInfo = d
|
||||
|
||||
if d.Enabled {
|
||||
c.lg.Info(
|
||||
"The server is ready to downgrade",
|
||||
zap.String("target-version", d.TargetVersion),
|
||||
zap.String("server-version", version.Version),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// IsMemberExist returns if the member with the given id exists in cluster.
|
||||
|
@ -37,31 +37,11 @@ func isValidDowngrade(verFrom *semver.Version, verTo *semver.Version) bool {
|
||||
return verTo.Equal(*allowedDowngradeVersion(verFrom))
|
||||
}
|
||||
|
||||
// MustDetectDowngrade will detect unexpected downgrade when the local server is recovered.
|
||||
func MustDetectDowngrade(lg *zap.Logger, sv, cv *semver.Version, d *DowngradeInfo) {
|
||||
// MustDetectDowngrade will detect local server joining cluster that doesn't support it's version.
|
||||
func MustDetectDowngrade(lg *zap.Logger, sv, cv *semver.Version) {
|
||||
// only keep major.minor version for comparison against cluster version
|
||||
sv = &semver.Version{Major: sv.Major, Minor: sv.Minor}
|
||||
|
||||
// if the cluster enables downgrade, check local version against downgrade target version.
|
||||
if d != nil && d.Enabled && d.TargetVersion != "" {
|
||||
if sv.Equal(*d.GetTargetVersion()) {
|
||||
if cv != nil {
|
||||
lg.Info(
|
||||
"cluster is downgrading to target version",
|
||||
zap.String("target-cluster-version", d.TargetVersion),
|
||||
zap.String("determined-cluster-version", version.Cluster(cv.String())),
|
||||
zap.String("current-server-version", sv.String()),
|
||||
)
|
||||
}
|
||||
return
|
||||
}
|
||||
lg.Panic(
|
||||
"invalid downgrade; server version is not allowed to join when downgrade is enabled",
|
||||
zap.String("current-server-version", sv.String()),
|
||||
zap.String("target-cluster-version", d.TargetVersion),
|
||||
)
|
||||
}
|
||||
|
||||
// if the cluster disables downgrade, check local version against determined cluster version.
|
||||
// the validation passes when local version is not less than cluster version
|
||||
if cv != nil && sv.LessThan(*cv) {
|
||||
|
@ -29,92 +29,47 @@ func TestMustDetectDowngrade(t *testing.T) {
|
||||
lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
|
||||
oneMinorHigher := &semver.Version{Major: lv.Major, Minor: lv.Minor + 1}
|
||||
oneMinorLower := &semver.Version{Major: lv.Major, Minor: lv.Minor - 1}
|
||||
downgradeEnabledHigherVersion := &DowngradeInfo{Enabled: true, TargetVersion: oneMinorHigher.String()}
|
||||
downgradeEnabledEqualVersion := &DowngradeInfo{Enabled: true, TargetVersion: lv.String()}
|
||||
downgradeEnabledLowerVersion := &DowngradeInfo{Enabled: true, TargetVersion: oneMinorLower.String()}
|
||||
downgradeDisabled := &DowngradeInfo{Enabled: false}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
clusterVersion *semver.Version
|
||||
downgrade *DowngradeInfo
|
||||
success bool
|
||||
message string
|
||||
}{
|
||||
{
|
||||
"Succeeded when downgrade is disabled and cluster version is nil",
|
||||
"Succeeded when cluster version is nil",
|
||||
nil,
|
||||
downgradeDisabled,
|
||||
true,
|
||||
"",
|
||||
},
|
||||
{
|
||||
"Succeeded when downgrade is disabled and cluster version is one minor lower",
|
||||
"Succeeded when cluster version is one minor lower",
|
||||
oneMinorLower,
|
||||
downgradeDisabled,
|
||||
true,
|
||||
"",
|
||||
},
|
||||
{
|
||||
"Succeeded when downgrade is disabled and cluster version is server version",
|
||||
"Succeeded when cluster version is server version",
|
||||
lv,
|
||||
downgradeDisabled,
|
||||
true,
|
||||
"",
|
||||
},
|
||||
{
|
||||
"Failed when downgrade is disabled and server version is lower than determined cluster version ",
|
||||
"Failed when server version is lower than determined cluster version ",
|
||||
oneMinorHigher,
|
||||
downgradeDisabled,
|
||||
false,
|
||||
"invalid downgrade; server version is lower than determined cluster version",
|
||||
},
|
||||
{
|
||||
"Succeeded when downgrade is enabled and cluster version is nil",
|
||||
nil,
|
||||
downgradeEnabledEqualVersion,
|
||||
true,
|
||||
"",
|
||||
},
|
||||
{
|
||||
"Failed when downgrade is enabled and server version is target version",
|
||||
lv,
|
||||
downgradeEnabledEqualVersion,
|
||||
true,
|
||||
"cluster is downgrading to target version",
|
||||
},
|
||||
{
|
||||
"Succeeded when downgrade to lower version and server version is cluster version ",
|
||||
lv,
|
||||
downgradeEnabledLowerVersion,
|
||||
false,
|
||||
"invalid downgrade; server version is not allowed to join when downgrade is enabled",
|
||||
},
|
||||
{
|
||||
"Failed when downgrade is enabled and local version is out of range and cluster version is nil",
|
||||
nil,
|
||||
downgradeEnabledHigherVersion,
|
||||
false,
|
||||
"invalid downgrade; server version is not allowed to join when downgrade is enabled",
|
||||
},
|
||||
|
||||
{
|
||||
"Failed when downgrade is enabled and local version is out of range",
|
||||
lv,
|
||||
downgradeEnabledHigherVersion,
|
||||
false,
|
||||
"invalid downgrade; server version is not allowed to join when downgrade is enabled",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
lg := zaptest.NewLogger(t)
|
||||
sv := semver.Must(semver.NewVersion(version.Version))
|
||||
err := tryMustDetectDowngrade(lg, sv, tt.clusterVersion, tt.downgrade)
|
||||
err := tryMustDetectDowngrade(lg, sv, tt.clusterVersion)
|
||||
|
||||
if tt.success != (err == nil) {
|
||||
t.Errorf("Unexpected status, got %q, wanted: %v", err, tt.success)
|
||||
t.Errorf("Unexpected success, got: %v, wanted: %v", err == nil, tt.success)
|
||||
// TODO test err
|
||||
}
|
||||
if err != nil && tt.message != fmt.Sprintf("%s", err) {
|
||||
@ -124,11 +79,11 @@ func TestMustDetectDowngrade(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func tryMustDetectDowngrade(lg *zap.Logger, sv, cv *semver.Version, d *DowngradeInfo) (err interface{}) {
|
||||
func tryMustDetectDowngrade(lg *zap.Logger, sv, cv *semver.Version) (err interface{}) {
|
||||
defer func() {
|
||||
err = recover()
|
||||
}()
|
||||
MustDetectDowngrade(lg, sv, cv, d)
|
||||
MustDetectDowngrade(lg, sv, cv)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -39,7 +39,7 @@ type Server interface {
|
||||
DowngradeCancel(ctx context.Context) error
|
||||
|
||||
GetStorageVersion() *semver.Version
|
||||
UpdateStorageVersion(semver.Version)
|
||||
UpdateStorageVersion(semver.Version) error
|
||||
|
||||
Lock()
|
||||
Unlock()
|
||||
@ -61,18 +61,35 @@ func (m *Monitor) UpdateClusterVersionIfNeeded() {
|
||||
}
|
||||
}
|
||||
|
||||
// decideClusterVersion decides the cluster version based on the members versions if all members agree on a higher one.
|
||||
// decideClusterVersion decides whether to change cluster version and its next value.
|
||||
// New cluster version is based on the members versions server and whether cluster is downgrading.
|
||||
// Returns nil if cluster version should be left unchanged.
|
||||
func (m *Monitor) decideClusterVersion() *semver.Version {
|
||||
clusterVersion := m.s.GetClusterVersion()
|
||||
membersMinimalVersion := m.membersMinimalVersion()
|
||||
minimalServerVersion := m.membersMinimalServerVersion()
|
||||
if clusterVersion == nil {
|
||||
if membersMinimalVersion != nil {
|
||||
return membersMinimalVersion
|
||||
if minimalServerVersion != nil {
|
||||
return minimalServerVersion
|
||||
}
|
||||
return semver.New(version.MinClusterVersion)
|
||||
}
|
||||
if membersMinimalVersion != nil && clusterVersion.LessThan(*membersMinimalVersion) && IsValidVersionChange(clusterVersion, membersMinimalVersion) {
|
||||
return membersMinimalVersion
|
||||
if minimalServerVersion == nil {
|
||||
return nil
|
||||
}
|
||||
downgrade := m.s.GetDowngradeInfo()
|
||||
if downgrade != nil && downgrade.Enabled {
|
||||
if IsValidVersionChange(clusterVersion, downgrade.GetTargetVersion()) && IsValidVersionChange(minimalServerVersion, downgrade.GetTargetVersion()) {
|
||||
return downgrade.GetTargetVersion()
|
||||
}
|
||||
m.lg.Error("Cannot downgrade cluster version, version change is not valid",
|
||||
zap.String("downgrade-version", downgrade.TargetVersion),
|
||||
zap.String("cluster-version", clusterVersion.String()),
|
||||
zap.String("minimal-server-version", minimalServerVersion.String()),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
if clusterVersion.LessThan(*minimalServerVersion) && IsValidVersionChange(clusterVersion, minimalServerVersion) {
|
||||
return minimalServerVersion
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -91,7 +108,19 @@ func (m *Monitor) UpdateStorageVersionIfNeeded() {
|
||||
if sv != nil {
|
||||
m.lg.Info("storage version differs from storage version.", zap.String("cluster-version", cv.String()), zap.String("storage-version", sv.String()))
|
||||
}
|
||||
m.s.UpdateStorageVersion(semver.Version{Major: cv.Major, Minor: cv.Minor})
|
||||
err := m.s.UpdateStorageVersion(semver.Version{Major: cv.Major, Minor: cv.Minor})
|
||||
if err != nil {
|
||||
m.lg.Error("failed update storage version", zap.String("cluster-version", cv.String()), zap.Error(err))
|
||||
return
|
||||
}
|
||||
d := m.s.GetDowngradeInfo()
|
||||
if d != nil && d.Enabled {
|
||||
m.lg.Info(
|
||||
"The server is ready to downgrade",
|
||||
zap.String("target-version", d.TargetVersion),
|
||||
zap.String("server-version", version.Version),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -112,11 +141,11 @@ func (m *Monitor) CancelDowngradeIfNeeded() {
|
||||
}
|
||||
}
|
||||
|
||||
// membersMinimalVersion returns the min server version in the map, or nil if the min
|
||||
// membersMinimalServerVersion returns the min server version in the map, or nil if the min
|
||||
// version in unknown.
|
||||
// It prints out log if there is a member with a higher version than the
|
||||
// local version.
|
||||
func (m *Monitor) membersMinimalVersion() *semver.Version {
|
||||
func (m *Monitor) membersMinimalServerVersion() *semver.Version {
|
||||
vers := m.s.GetMembersVersions()
|
||||
var minV *semver.Version
|
||||
lv := semver.Must(semver.NewVersion(version.Version))
|
||||
|
@ -50,7 +50,7 @@ func TestMemberMinimalVersion(t *testing.T) {
|
||||
monitor := NewMonitor(zaptest.NewLogger(t), &storageMock{
|
||||
memberVersions: tt.memberVersions,
|
||||
})
|
||||
minV := monitor.membersMinimalVersion()
|
||||
minV := monitor.membersMinimalServerVersion()
|
||||
if !reflect.DeepEqual(minV, tt.wantVersion) {
|
||||
t.Errorf("#%d: ver = %+v, want %+v", i, minV, tt.wantVersion)
|
||||
}
|
||||
@ -204,6 +204,36 @@ func TestUpdateClusterVersionIfNeeded(t *testing.T) {
|
||||
clusterVersion: &V3_5,
|
||||
expectClusterVersion: &V3_6,
|
||||
},
|
||||
{
|
||||
name: "Should downgrade cluster version if downgrade is set to allow older members to join",
|
||||
memberVersions: map[string]*version.Versions{
|
||||
"a": {Cluster: "3.6.0", Server: "3.6.0"},
|
||||
"b": {Cluster: "3.6.0", Server: "3.6.0"},
|
||||
},
|
||||
clusterVersion: &V3_6,
|
||||
downgrade: &DowngradeInfo{TargetVersion: "3.5.0", Enabled: true},
|
||||
expectClusterVersion: &V3_5,
|
||||
},
|
||||
{
|
||||
name: "Should maintain downgrade target version to allow older members to join",
|
||||
memberVersions: map[string]*version.Versions{
|
||||
"a": {Cluster: "3.5.0", Server: "3.6.0"},
|
||||
"b": {Cluster: "3.5.0", Server: "3.6.0"},
|
||||
},
|
||||
clusterVersion: &V3_5,
|
||||
downgrade: &DowngradeInfo{TargetVersion: "3.5.0", Enabled: true},
|
||||
expectClusterVersion: &V3_5,
|
||||
},
|
||||
{
|
||||
name: "Don't downgrade below supported range",
|
||||
memberVersions: map[string]*version.Versions{
|
||||
"a": {Cluster: "3.5.0", Server: "3.6.0"},
|
||||
"b": {Cluster: "3.5.0", Server: "3.6.0"},
|
||||
},
|
||||
clusterVersion: &V3_5,
|
||||
downgrade: &DowngradeInfo{TargetVersion: "3.4.0", Enabled: true},
|
||||
expectClusterVersion: &V3_5,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
@ -369,8 +399,9 @@ func (s *storageMock) GetStorageVersion() *semver.Version {
|
||||
return s.storageVersion
|
||||
}
|
||||
|
||||
func (s *storageMock) UpdateStorageVersion(v semver.Version) {
|
||||
func (s *storageMock) UpdateStorageVersion(v semver.Version) error {
|
||||
s.storageVersion = &v
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *storageMock) Lock() {
|
||||
|
@ -62,6 +62,68 @@ func TestUpgradeThreeNodes(t *testing.T) {
|
||||
assert.Equal(t, newCluster(lg, 3, V3_7), c)
|
||||
}
|
||||
|
||||
func TestDowngradeSingleNode(t *testing.T) {
|
||||
lg := zaptest.NewLogger(t)
|
||||
c := newCluster(lg, 1, V3_6)
|
||||
c.StepMonitors()
|
||||
assert.Equal(t, newCluster(lg, 1, V3_6), c)
|
||||
|
||||
assert.NoError(t, c.Version().DowngradeEnable(context.Background(), &V3_5))
|
||||
c.StepMonitors()
|
||||
assert.Equal(t, V3_5, c.clusterVersion)
|
||||
|
||||
c.ReplaceMemberBinary(0, V3_5)
|
||||
c.StepMonitors()
|
||||
|
||||
assert.Equal(t, newCluster(lg, 1, V3_5), c)
|
||||
}
|
||||
|
||||
func TestDowngradeThreeNode(t *testing.T) {
|
||||
lg := zaptest.NewLogger(t)
|
||||
c := newCluster(lg, 3, V3_6)
|
||||
c.StepMonitors()
|
||||
assert.Equal(t, newCluster(lg, 3, V3_6), c)
|
||||
|
||||
assert.NoError(t, c.Version().DowngradeEnable(context.Background(), &V3_5))
|
||||
c.StepMonitors()
|
||||
assert.Equal(t, V3_5, c.clusterVersion)
|
||||
|
||||
c.ReplaceMemberBinary(0, V3_5)
|
||||
c.StepMonitors()
|
||||
c.ReplaceMemberBinary(1, V3_5)
|
||||
c.StepMonitors()
|
||||
c.ReplaceMemberBinary(2, V3_5)
|
||||
c.StepMonitors()
|
||||
|
||||
assert.Equal(t, newCluster(lg, 3, V3_5), c)
|
||||
}
|
||||
|
||||
func TestNewerMemberCanReconnectDuringDowngrade(t *testing.T) {
|
||||
lg := zaptest.NewLogger(t)
|
||||
c := newCluster(lg, 3, V3_6)
|
||||
c.StepMonitors()
|
||||
assert.Equal(t, newCluster(lg, 3, V3_6), c)
|
||||
|
||||
assert.NoError(t, c.Version().DowngradeEnable(context.Background(), &V3_5))
|
||||
c.StepMonitors()
|
||||
assert.Equal(t, V3_5, c.clusterVersion)
|
||||
|
||||
c.ReplaceMemberBinary(0, V3_5)
|
||||
c.StepMonitors()
|
||||
|
||||
c.MemberCrashes(2)
|
||||
c.StepMonitors()
|
||||
c.MemberReconnects(2)
|
||||
c.StepMonitors()
|
||||
|
||||
c.ReplaceMemberBinary(1, V3_5)
|
||||
c.StepMonitors()
|
||||
c.ReplaceMemberBinary(2, V3_5)
|
||||
c.StepMonitors()
|
||||
|
||||
assert.Equal(t, newCluster(lg, 3, V3_5), c)
|
||||
}
|
||||
|
||||
func newCluster(lg *zap.Logger, memberCount int, ver semver.Version) *clusterMock {
|
||||
cluster := &clusterMock{
|
||||
lg: lg,
|
||||
@ -71,6 +133,7 @@ func newCluster(lg *zap.Logger, memberCount int, ver semver.Version) *clusterMoc
|
||||
majorMinVer := semver.Version{Major: ver.Major, Minor: ver.Minor}
|
||||
for i := 0; i < memberCount; i++ {
|
||||
m := &memberMock{
|
||||
isRunning: true,
|
||||
cluster: cluster,
|
||||
serverVersion: ver,
|
||||
storageVersion: majorMinVer,
|
||||
@ -113,22 +176,34 @@ func (c *clusterMock) Version() *Manager {
|
||||
func (c *clusterMock) MembersVersions() map[string]*version.Versions {
|
||||
result := map[string]*version.Versions{}
|
||||
for i, m := range c.members {
|
||||
result[fmt.Sprintf("%d", i)] = &version.Versions{
|
||||
Server: m.serverVersion.String(),
|
||||
Cluster: c.clusterVersion.String(),
|
||||
if m.isRunning {
|
||||
result[fmt.Sprintf("%d", i)] = &version.Versions{
|
||||
Server: m.serverVersion.String(),
|
||||
Cluster: c.clusterVersion.String(),
|
||||
}
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (c *clusterMock) ReplaceMemberBinary(mid int, newServerVersion semver.Version) {
|
||||
MustDetectDowngrade(c.lg, &c.members[mid].serverVersion, &c.clusterVersion, c.downgradeInfo)
|
||||
MustDetectDowngrade(c.lg, &c.members[mid].serverVersion, &c.clusterVersion)
|
||||
c.members[mid].serverVersion = newServerVersion
|
||||
}
|
||||
|
||||
func (c *clusterMock) MemberCrashes(mid int) {
|
||||
c.members[mid].isRunning = false
|
||||
}
|
||||
|
||||
func (c *clusterMock) MemberReconnects(mid int) {
|
||||
MustDetectDowngrade(c.lg, &c.members[mid].serverVersion, &c.clusterVersion)
|
||||
c.members[mid].isRunning = true
|
||||
}
|
||||
|
||||
type memberMock struct {
|
||||
cluster *clusterMock
|
||||
|
||||
isRunning bool
|
||||
isLeader bool
|
||||
serverVersion semver.Version
|
||||
storageVersion semver.Version
|
||||
@ -174,8 +249,9 @@ func (m *memberMock) GetStorageVersion() *semver.Version {
|
||||
return &m.storageVersion
|
||||
}
|
||||
|
||||
func (m *memberMock) UpdateStorageVersion(v semver.Version) {
|
||||
func (m *memberMock) UpdateStorageVersion(v semver.Version) error {
|
||||
m.storageVersion = v
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memberMock) TriggerSnapshot() {
|
||||
|
Loading…
x
Reference in New Issue
Block a user