Merge pull request #13405 from serathius/downgrade-b

Implement single node downgrades
This commit is contained in:
Piotr Tabor 2021-10-29 23:22:10 +02:00 committed by GitHub
commit 6c2f5dc78a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 647 additions and 241 deletions

View File

@ -26,6 +26,8 @@ import (
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
)
// NewMigrateCommand prints out the version of etcd.
@ -90,12 +92,24 @@ func (o *migrateOptions) Config() (*migrateConfig, error) {
dbPath := datadir.ToBackendFileName(o.dataDir)
c.be = backend.NewDefaultBackend(dbPath)
walPath := datadir.ToWalDir(o.dataDir)
w, err := wal.OpenForRead(GetLogger(), walPath, walpb.Snapshot{})
if err != nil {
return nil, fmt.Errorf(`failed to open wal: %v`, err)
}
defer w.Close()
c.walVersion, err = wal.ReadWALVersion(w)
if err != nil {
return nil, fmt.Errorf(`failed to read wal: %v`, err)
}
return c, nil
}
type migrateConfig struct {
be backend.Backend
targetVersion *semver.Version
walVersion schema.WALVersion
force bool
}
@ -112,7 +126,7 @@ func migrateCommandFunc(c *migrateConfig) error {
lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(&current)))
return nil
}
err = schema.Migrate(lg, tx, *c.targetVersion)
err = schema.Migrate(lg, tx, c.walVersion, *c.targetVersion)
if err != nil {
if !c.force {
return err

View File

@ -18,34 +18,29 @@ import (
"context"
"github.com/coreos/go-semver/semver"
"go.uber.org/zap"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/membershippb"
"go.etcd.io/etcd/api/v3/version"
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/schema"
)
// serverVersionAdapter implements Server interface needed by serverversion.Monitor
type serverVersionAdapter struct {
*EtcdServer
tx backend.BatchTx
}
func newServerVersionAdapter(s *EtcdServer) *serverVersionAdapter {
return &serverVersionAdapter{
EtcdServer: s,
tx: nil,
}
}
var _ serverversion.Server = (*serverVersionAdapter)(nil)
func (s *serverVersionAdapter) UpdateClusterVersion(version string) {
// TODO switch to updateClusterVersionV3 in 3.6
s.GoAttach(func() { s.updateClusterVersionV2(version) })
s.GoAttach(func() { s.updateClusterVersionV3(version) })
}
func (s *serverVersionAdapter) LinearizableReadNotify(ctx context.Context) error {
@ -77,34 +72,19 @@ func (s *serverVersionAdapter) GetMembersVersions() map[string]*version.Versions
}
func (s *serverVersionAdapter) GetStorageVersion() *semver.Version {
if s.tx == nil {
s.Lock()
defer s.Unlock()
}
v, err := schema.UnsafeDetectSchemaVersion(s.lg, s.tx)
tx := s.be.BatchTx()
tx.Lock()
defer tx.Unlock()
v, err := schema.UnsafeDetectSchemaVersion(s.lg, tx)
if err != nil {
return nil
}
return &v
}
func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) {
if s.tx == nil {
s.Lock()
defer s.Unlock()
}
err := schema.UnsafeMigrate(s.lg, s.tx, target)
if err != nil {
s.lg.Error("failed migrating storage schema", zap.String("storage-version", target.String()), zap.Error(err))
}
}
func (s *serverVersionAdapter) Lock() {
s.tx = s.be.BatchTx()
s.tx.Lock()
}
func (s *serverVersionAdapter) Unlock() {
s.tx.Unlock()
s.tx = nil
func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) error {
tx := s.be.BatchTx()
tx.Lock()
defer tx.Unlock()
return schema.UnsafeMigrate(s.lg, tx, s.r.storage, target)
}

View File

@ -271,12 +271,15 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
if c.be != nil {
c.downgradeInfo = c.be.DowngradeInfoFromBackend()
}
d := &serverversion.DowngradeInfo{Enabled: false}
if c.downgradeInfo != nil {
d = &serverversion.DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
}
sv := semver.Must(semver.NewVersion(version.Version))
serverversion.MustDetectDowngrade(c.lg, sv, c.version, d)
if c.downgradeInfo != nil && c.downgradeInfo.Enabled {
c.lg.Info(
"cluster is downgrading to target version",
zap.String("target-cluster-version", c.downgradeInfo.TargetVersion),
zap.String("current-server-version", sv.String()),
)
}
serverversion.MustDetectDowngrade(c.lg, sv, c.version)
onSet(c.lg, c.version)
for _, m := range c.members {
@ -548,7 +551,7 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s
oldVer := c.version
c.version = ver
sv := semver.Must(semver.NewVersion(version.Version))
serverversion.MustDetectDowngrade(c.lg, sv, c.version, c.downgradeInfo)
serverversion.MustDetectDowngrade(c.lg, sv, c.version)
if c.v2store != nil {
mustSaveClusterVersionToStore(c.lg, c.v2store, ver)
}
@ -759,14 +762,6 @@ func (c *RaftCluster) SetDowngradeInfo(d *serverversion.DowngradeInfo, shouldApp
}
c.downgradeInfo = d
if d.Enabled {
c.lg.Info(
"The server is ready to downgrade",
zap.String("target-version", d.TargetVersion),
zap.String("server-version", version.Version),
)
}
}
// IsMemberExist returns if the member with the given id exists in cluster.

View File

@ -37,31 +37,11 @@ func isValidDowngrade(verFrom *semver.Version, verTo *semver.Version) bool {
return verTo.Equal(*allowedDowngradeVersion(verFrom))
}
// MustDetectDowngrade will detect unexpected downgrade when the local server is recovered.
func MustDetectDowngrade(lg *zap.Logger, sv, cv *semver.Version, d *DowngradeInfo) {
// MustDetectDowngrade will detect local server joining cluster that doesn't support it's version.
func MustDetectDowngrade(lg *zap.Logger, sv, cv *semver.Version) {
// only keep major.minor version for comparison against cluster version
sv = &semver.Version{Major: sv.Major, Minor: sv.Minor}
// if the cluster enables downgrade, check local version against downgrade target version.
if d != nil && d.Enabled && d.TargetVersion != "" {
if sv.Equal(*d.GetTargetVersion()) {
if cv != nil {
lg.Info(
"cluster is downgrading to target version",
zap.String("target-cluster-version", d.TargetVersion),
zap.String("determined-cluster-version", version.Cluster(cv.String())),
zap.String("current-server-version", sv.String()),
)
}
return
}
lg.Panic(
"invalid downgrade; server version is not allowed to join when downgrade is enabled",
zap.String("current-server-version", sv.String()),
zap.String("target-cluster-version", d.TargetVersion),
)
}
// if the cluster disables downgrade, check local version against determined cluster version.
// the validation passes when local version is not less than cluster version
if cv != nil && sv.LessThan(*cv) {

View File

@ -29,92 +29,47 @@ func TestMustDetectDowngrade(t *testing.T) {
lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
oneMinorHigher := &semver.Version{Major: lv.Major, Minor: lv.Minor + 1}
oneMinorLower := &semver.Version{Major: lv.Major, Minor: lv.Minor - 1}
downgradeEnabledHigherVersion := &DowngradeInfo{Enabled: true, TargetVersion: oneMinorHigher.String()}
downgradeEnabledEqualVersion := &DowngradeInfo{Enabled: true, TargetVersion: lv.String()}
downgradeEnabledLowerVersion := &DowngradeInfo{Enabled: true, TargetVersion: oneMinorLower.String()}
downgradeDisabled := &DowngradeInfo{Enabled: false}
tests := []struct {
name string
clusterVersion *semver.Version
downgrade *DowngradeInfo
success bool
message string
}{
{
"Succeeded when downgrade is disabled and cluster version is nil",
"Succeeded when cluster version is nil",
nil,
downgradeDisabled,
true,
"",
},
{
"Succeeded when downgrade is disabled and cluster version is one minor lower",
"Succeeded when cluster version is one minor lower",
oneMinorLower,
downgradeDisabled,
true,
"",
},
{
"Succeeded when downgrade is disabled and cluster version is server version",
"Succeeded when cluster version is server version",
lv,
downgradeDisabled,
true,
"",
},
{
"Failed when downgrade is disabled and server version is lower than determined cluster version ",
"Failed when server version is lower than determined cluster version ",
oneMinorHigher,
downgradeDisabled,
false,
"invalid downgrade; server version is lower than determined cluster version",
},
{
"Succeeded when downgrade is enabled and cluster version is nil",
nil,
downgradeEnabledEqualVersion,
true,
"",
},
{
"Failed when downgrade is enabled and server version is target version",
lv,
downgradeEnabledEqualVersion,
true,
"cluster is downgrading to target version",
},
{
"Succeeded when downgrade to lower version and server version is cluster version ",
lv,
downgradeEnabledLowerVersion,
false,
"invalid downgrade; server version is not allowed to join when downgrade is enabled",
},
{
"Failed when downgrade is enabled and local version is out of range and cluster version is nil",
nil,
downgradeEnabledHigherVersion,
false,
"invalid downgrade; server version is not allowed to join when downgrade is enabled",
},
{
"Failed when downgrade is enabled and local version is out of range",
lv,
downgradeEnabledHigherVersion,
false,
"invalid downgrade; server version is not allowed to join when downgrade is enabled",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
lg := zaptest.NewLogger(t)
sv := semver.Must(semver.NewVersion(version.Version))
err := tryMustDetectDowngrade(lg, sv, tt.clusterVersion, tt.downgrade)
err := tryMustDetectDowngrade(lg, sv, tt.clusterVersion)
if tt.success != (err == nil) {
t.Errorf("Unexpected status, got %q, wanted: %v", err, tt.success)
t.Errorf("Unexpected success, got: %v, wanted: %v", err == nil, tt.success)
// TODO test err
}
if err != nil && tt.message != fmt.Sprintf("%s", err) {
@ -124,11 +79,11 @@ func TestMustDetectDowngrade(t *testing.T) {
}
}
func tryMustDetectDowngrade(lg *zap.Logger, sv, cv *semver.Version, d *DowngradeInfo) (err interface{}) {
func tryMustDetectDowngrade(lg *zap.Logger, sv, cv *semver.Version) (err interface{}) {
defer func() {
err = recover()
}()
MustDetectDowngrade(lg, sv, cv, d)
MustDetectDowngrade(lg, sv, cv)
return err
}

View File

@ -39,10 +39,7 @@ type Server interface {
DowngradeCancel(ctx context.Context) error
GetStorageVersion() *semver.Version
UpdateStorageVersion(semver.Version)
Lock()
Unlock()
UpdateStorageVersion(semver.Version) error
}
func NewMonitor(lg *zap.Logger, storage Server) *Monitor {
@ -61,18 +58,35 @@ func (m *Monitor) UpdateClusterVersionIfNeeded() {
}
}
// decideClusterVersion decides the cluster version based on the members versions if all members agree on a higher one.
// decideClusterVersion decides whether to change cluster version and its next value.
// New cluster version is based on the members versions server and whether cluster is downgrading.
// Returns nil if cluster version should be left unchanged.
func (m *Monitor) decideClusterVersion() *semver.Version {
clusterVersion := m.s.GetClusterVersion()
membersMinimalVersion := m.membersMinimalVersion()
minimalServerVersion := m.membersMinimalServerVersion()
if clusterVersion == nil {
if membersMinimalVersion != nil {
return membersMinimalVersion
if minimalServerVersion != nil {
return minimalServerVersion
}
return semver.New(version.MinClusterVersion)
}
if membersMinimalVersion != nil && clusterVersion.LessThan(*membersMinimalVersion) && IsValidVersionChange(clusterVersion, membersMinimalVersion) {
return membersMinimalVersion
if minimalServerVersion == nil {
return nil
}
downgrade := m.s.GetDowngradeInfo()
if downgrade != nil && downgrade.Enabled {
if IsValidVersionChange(clusterVersion, downgrade.GetTargetVersion()) && IsValidVersionChange(minimalServerVersion, downgrade.GetTargetVersion()) {
return downgrade.GetTargetVersion()
}
m.lg.Error("Cannot downgrade cluster version, version change is not valid",
zap.String("downgrade-version", downgrade.TargetVersion),
zap.String("cluster-version", clusterVersion.String()),
zap.String("minimal-server-version", minimalServerVersion.String()),
)
return nil
}
if clusterVersion.LessThan(*minimalServerVersion) && IsValidVersionChange(clusterVersion, minimalServerVersion) {
return minimalServerVersion
}
return nil
}
@ -83,15 +97,25 @@ func (m *Monitor) UpdateStorageVersionIfNeeded() {
if cv == nil {
return
}
m.s.Lock()
defer m.s.Unlock()
sv := m.s.GetStorageVersion()
if sv == nil || sv.Major != cv.Major || sv.Minor != cv.Minor {
if sv != nil {
m.lg.Info("storage version differs from storage version.", zap.String("cluster-version", cv.String()), zap.String("storage-version", sv.String()))
}
m.s.UpdateStorageVersion(semver.Version{Major: cv.Major, Minor: cv.Minor})
err := m.s.UpdateStorageVersion(semver.Version{Major: cv.Major, Minor: cv.Minor})
if err != nil {
m.lg.Error("failed update storage version", zap.String("cluster-version", cv.String()), zap.Error(err))
return
}
d := m.s.GetDowngradeInfo()
if d != nil && d.Enabled {
m.lg.Info(
"The server is ready to downgrade",
zap.String("target-version", d.TargetVersion),
zap.String("server-version", version.Version),
)
}
}
}
@ -112,11 +136,11 @@ func (m *Monitor) CancelDowngradeIfNeeded() {
}
}
// membersMinimalVersion returns the min server version in the map, or nil if the min
// membersMinimalServerVersion returns the min server version in the map, or nil if the min
// version in unknown.
// It prints out log if there is a member with a higher version than the
// local version.
func (m *Monitor) membersMinimalVersion() *semver.Version {
func (m *Monitor) membersMinimalServerVersion() *semver.Version {
vers := m.s.GetMembersVersions()
var minV *semver.Version
lv := semver.Must(semver.NewVersion(version.Version))
@ -156,11 +180,12 @@ func (m *Monitor) membersMinimalVersion() *semver.Version {
// It can be used to decide the whether the cluster finishes downgrading to target version.
func (m *Monitor) versionsMatchTarget(targetVersion *semver.Version) bool {
vers := m.s.GetMembersVersions()
targetVersion = &semver.Version{Major: targetVersion.Major, Minor: targetVersion.Minor}
for mid, ver := range vers {
if ver == nil {
return false
}
v, err := semver.NewVersion(ver.Cluster)
v, err := semver.NewVersion(ver.Server)
if err != nil {
m.lg.Warn(
"failed to parse server version of remote member",
@ -170,6 +195,7 @@ func (m *Monitor) versionsMatchTarget(targetVersion *semver.Version) bool {
)
return false
}
v = &semver.Version{Major: v.Major, Minor: v.Minor}
if !targetVersion.Equal(*v) {
m.lg.Warn("remotes server has mismatching etcd version",
zap.String("remote-member-id", mid),

View File

@ -50,7 +50,7 @@ func TestMemberMinimalVersion(t *testing.T) {
monitor := NewMonitor(zaptest.NewLogger(t), &storageMock{
memberVersions: tt.memberVersions,
})
minV := monitor.membersMinimalVersion()
minV := monitor.membersMinimalServerVersion()
if !reflect.DeepEqual(minV, tt.wantVersion) {
t.Errorf("#%d: ver = %+v, want %+v", i, minV, tt.wantVersion)
}
@ -127,7 +127,7 @@ func TestVersionMatchTarget(t *testing.T) {
"When cannot parse peer version",
&semver.Version{Major: 3, Minor: 4},
map[string]*version.Versions{
"mem1": {Server: "3.4.1", Cluster: "3.4"},
"mem1": {Server: "3.4", Cluster: "3.4.0"},
"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
"mem3": {Server: "3.4.2", Cluster: "3.4.0"},
},
@ -204,6 +204,36 @@ func TestUpdateClusterVersionIfNeeded(t *testing.T) {
clusterVersion: &V3_5,
expectClusterVersion: &V3_6,
},
{
name: "Should downgrade cluster version if downgrade is set to allow older members to join",
memberVersions: map[string]*version.Versions{
"a": {Cluster: "3.6.0", Server: "3.6.0"},
"b": {Cluster: "3.6.0", Server: "3.6.0"},
},
clusterVersion: &V3_6,
downgrade: &DowngradeInfo{TargetVersion: "3.5.0", Enabled: true},
expectClusterVersion: &V3_5,
},
{
name: "Should maintain downgrade target version to allow older members to join",
memberVersions: map[string]*version.Versions{
"a": {Cluster: "3.5.0", Server: "3.6.0"},
"b": {Cluster: "3.5.0", Server: "3.6.0"},
},
clusterVersion: &V3_5,
downgrade: &DowngradeInfo{TargetVersion: "3.5.0", Enabled: true},
expectClusterVersion: &V3_5,
},
{
name: "Don't downgrade below supported range",
memberVersions: map[string]*version.Versions{
"a": {Cluster: "3.5.0", Server: "3.6.0"},
"b": {Cluster: "3.5.0", Server: "3.6.0"},
},
clusterVersion: &V3_5,
downgrade: &DowngradeInfo{TargetVersion: "3.4.0", Enabled: true},
expectClusterVersion: &V3_5,
},
}
for _, tt := range tests {
@ -247,6 +277,24 @@ func TestCancelDowngradeIfNeeded(t *testing.T) {
"b": {Cluster: "3.6.0", Server: "3.6.2"},
},
},
{
name: "Continue downgrade if just started",
memberVersions: map[string]*version.Versions{
"a": {Cluster: "3.5.0", Server: "3.6.1"},
"b": {Cluster: "3.5.0", Server: "3.6.2"},
},
downgrade: &DowngradeInfo{TargetVersion: "3.5.0", Enabled: true},
expectDowngrade: &DowngradeInfo{TargetVersion: "3.5.0", Enabled: true},
},
{
name: "Continue downgrade if there is at least one member with not matching",
memberVersions: map[string]*version.Versions{
"a": {Cluster: "3.5.0", Server: "3.5.1"},
"b": {Cluster: "3.5.0", Server: "3.6.2"},
},
downgrade: &DowngradeInfo{TargetVersion: "3.5.0", Enabled: true},
expectDowngrade: &DowngradeInfo{TargetVersion: "3.5.0", Enabled: true},
},
{
name: "Cancel downgrade if all members have downgraded",
memberVersions: map[string]*version.Versions{
@ -369,17 +417,7 @@ func (s *storageMock) GetStorageVersion() *semver.Version {
return s.storageVersion
}
func (s *storageMock) UpdateStorageVersion(v semver.Version) {
func (s *storageMock) UpdateStorageVersion(v semver.Version) error {
s.storageVersion = &v
}
func (s *storageMock) Lock() {
if s.locked {
panic("Deadlock")
}
s.locked = true
}
func (s *storageMock) Unlock() {
s.locked = false
return nil
}

View File

@ -62,6 +62,68 @@ func TestUpgradeThreeNodes(t *testing.T) {
assert.Equal(t, newCluster(lg, 3, V3_7), c)
}
func TestDowngradeSingleNode(t *testing.T) {
lg := zaptest.NewLogger(t)
c := newCluster(lg, 1, V3_6)
c.StepMonitors()
assert.Equal(t, newCluster(lg, 1, V3_6), c)
assert.NoError(t, c.Version().DowngradeEnable(context.Background(), &V3_5))
c.StepMonitors()
assert.Equal(t, V3_5, c.clusterVersion)
c.ReplaceMemberBinary(0, V3_5)
c.StepMonitors()
assert.Equal(t, newCluster(lg, 1, V3_5), c)
}
func TestDowngradeThreeNode(t *testing.T) {
lg := zaptest.NewLogger(t)
c := newCluster(lg, 3, V3_6)
c.StepMonitors()
assert.Equal(t, newCluster(lg, 3, V3_6), c)
assert.NoError(t, c.Version().DowngradeEnable(context.Background(), &V3_5))
c.StepMonitors()
assert.Equal(t, V3_5, c.clusterVersion)
c.ReplaceMemberBinary(0, V3_5)
c.StepMonitors()
c.ReplaceMemberBinary(1, V3_5)
c.StepMonitors()
c.ReplaceMemberBinary(2, V3_5)
c.StepMonitors()
assert.Equal(t, newCluster(lg, 3, V3_5), c)
}
func TestNewerMemberCanReconnectDuringDowngrade(t *testing.T) {
lg := zaptest.NewLogger(t)
c := newCluster(lg, 3, V3_6)
c.StepMonitors()
assert.Equal(t, newCluster(lg, 3, V3_6), c)
assert.NoError(t, c.Version().DowngradeEnable(context.Background(), &V3_5))
c.StepMonitors()
assert.Equal(t, V3_5, c.clusterVersion)
c.ReplaceMemberBinary(0, V3_5)
c.StepMonitors()
c.MemberCrashes(2)
c.StepMonitors()
c.MemberReconnects(2)
c.StepMonitors()
c.ReplaceMemberBinary(1, V3_5)
c.StepMonitors()
c.ReplaceMemberBinary(2, V3_5)
c.StepMonitors()
assert.Equal(t, newCluster(lg, 3, V3_5), c)
}
func newCluster(lg *zap.Logger, memberCount int, ver semver.Version) *clusterMock {
cluster := &clusterMock{
lg: lg,
@ -71,6 +133,7 @@ func newCluster(lg *zap.Logger, memberCount int, ver semver.Version) *clusterMoc
majorMinVer := semver.Version{Major: ver.Major, Minor: ver.Minor}
for i := 0; i < memberCount; i++ {
m := &memberMock{
isRunning: true,
cluster: cluster,
serverVersion: ver,
storageVersion: majorMinVer,
@ -113,22 +176,34 @@ func (c *clusterMock) Version() *Manager {
func (c *clusterMock) MembersVersions() map[string]*version.Versions {
result := map[string]*version.Versions{}
for i, m := range c.members {
result[fmt.Sprintf("%d", i)] = &version.Versions{
Server: m.serverVersion.String(),
Cluster: c.clusterVersion.String(),
if m.isRunning {
result[fmt.Sprintf("%d", i)] = &version.Versions{
Server: m.serverVersion.String(),
Cluster: c.clusterVersion.String(),
}
}
}
return result
}
func (c *clusterMock) ReplaceMemberBinary(mid int, newServerVersion semver.Version) {
MustDetectDowngrade(c.lg, &c.members[mid].serverVersion, &c.clusterVersion, c.downgradeInfo)
MustDetectDowngrade(c.lg, &c.members[mid].serverVersion, &c.clusterVersion)
c.members[mid].serverVersion = newServerVersion
}
func (c *clusterMock) MemberCrashes(mid int) {
c.members[mid].isRunning = false
}
func (c *clusterMock) MemberReconnects(mid int) {
MustDetectDowngrade(c.lg, &c.members[mid].serverVersion, &c.clusterVersion)
c.members[mid].isRunning = true
}
type memberMock struct {
cluster *clusterMock
isRunning bool
isLeader bool
serverVersion semver.Version
storageVersion semver.Version
@ -174,15 +249,10 @@ func (m *memberMock) GetStorageVersion() *semver.Version {
return &m.storageVersion
}
func (m *memberMock) UpdateStorageVersion(v semver.Version) {
func (m *memberMock) UpdateStorageVersion(v semver.Version) error {
m.storageVersion = v
return nil
}
func (m *memberMock) TriggerSnapshot() {
}
func (m *memberMock) Lock() {
}
func (m *memberMock) Unlock() {
}

View File

@ -15,6 +15,7 @@
package mockstorage
import (
"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/client/pkg/v3/testutil"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
@ -57,4 +58,5 @@ func (p *storageRecorder) Sync() error {
return nil
}
func (p *storageRecorder) Close() error { return nil }
func (p *storageRecorder) Close() error { return nil }
func (p *storageRecorder) MinimalEtcdVersion() *semver.Version { return nil }

View File

@ -24,19 +24,7 @@ import (
type migrationPlan []migrationStep
func newPlan(lg *zap.Logger, current semver.Version, target semver.Version) (p migrationPlan, err error) {
// TODO(serathius): Implement downgrades
if target.LessThan(current) {
lg.Error("Target version is lower than the current version, downgrades are not yet supported",
zap.String("storage-version", current.String()),
zap.String("target-storage-version", target.String()),
)
return nil, fmt.Errorf("downgrades are not yet supported")
}
return buildPlan(lg, current, target)
}
func buildPlan(lg *zap.Logger, current semver.Version, target semver.Version) (plan migrationPlan, err error) {
func newPlan(lg *zap.Logger, current semver.Version, target semver.Version) (plan migrationPlan, err error) {
current = trimToMinor(current)
target = trimToMinor(target)
if current.Major != target.Major {

View File

@ -46,11 +46,9 @@ func TestNewPlan(t *testing.T) {
target: V3_6,
},
{
name: "Downgrade v3.6 to v3.5 should fail as downgrades are not yet supported",
current: V3_6,
target: V3_5,
expectError: true,
expectErrorMsg: "downgrades are not yet supported",
name: "Downgrade v3.6 to v3.5 should fail as downgrades are not yet supported",
current: V3_6,
target: V3_5,
},
{
name: "Upgrade v3.6 to v3.7 should fail as v3.7 is unknown",

View File

@ -52,15 +52,21 @@ func localBinaryVersion() semver.Version {
return semver.Version{Major: v.Major, Minor: v.Minor}
}
// Migrate updates storage schema to provided target version.
func Migrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error {
tx.Lock()
defer tx.Unlock()
return UnsafeMigrate(lg, tx, target)
type WALVersion interface {
// MinimalEtcdVersion returns minimal etcd version able to interpret WAL log.
MinimalEtcdVersion() *semver.Version
}
// UnsafeMigrate is non-threadsafe version of Migrate.
func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error {
// Migrate updates storage schema to provided target version.
// Downgrading requires that provided WAL doesn't contain unsupported entries.
func Migrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Version) error {
tx.Lock()
defer tx.Unlock()
return UnsafeMigrate(lg, tx, w, target)
}
// UnsafeMigrate is non thread-safe version of Migrate.
func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Version) error {
current, err := UnsafeDetectSchemaVersion(lg, tx)
if err != nil {
return fmt.Errorf("cannot detect storage schema version: %w", err)
@ -69,6 +75,12 @@ func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) er
if err != nil {
return fmt.Errorf("cannot create migration plan: %w", err)
}
if target.LessThan(current) {
minVersion := w.MinimalEtcdVersion()
if minVersion != nil && target.LessThan(*minVersion) {
return fmt.Errorf("cannot downgrade storage, WAL contains newer entries")
}
}
return plan.unsafeExecute(lg, tx)
}
@ -101,12 +113,16 @@ func UnsafeDetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Vers
func schemaChangesForVersion(v semver.Version, isUpgrade bool) ([]schemaChange, error) {
// changes should be taken from higher version
var higherV = v
if isUpgrade {
v = semver.Version{Major: v.Major, Minor: v.Minor + 1}
higherV = semver.Version{Major: v.Major, Minor: v.Minor + 1}
}
actions, found := schemaChanges[v]
actions, found := schemaChanges[higherV]
if !found {
if isUpgrade {
return nil, fmt.Errorf("version %q is not supported", higherV.String())
}
return nil, fmt.Errorf("version %q is not supported", v.String())
}
return actions, nil

View File

@ -15,15 +15,18 @@
package schema
import (
"fmt"
"testing"
"time"
"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/membershippb"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/storage/backend"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.etcd.io/etcd/server/v3/storage/wal"
waltesting "go.etcd.io/etcd/server/v3/storage/wal/testing"
"go.uber.org/zap"
)
@ -75,7 +78,7 @@ func TestValidate(t *testing.T) {
name: `V3.7 schema is unknown and should return error`,
version: V3_7,
expectError: true,
expectErrorMsg: "downgrades are not yet supported",
expectErrorMsg: `version "3.7.0" is not supported`,
},
}
for _, tc := range tcs {
@ -103,6 +106,7 @@ func TestMigrate(t *testing.T) {
// Overrides which keys should be set (default based on version)
overrideKeys func(tx backend.BatchTx)
targetVersion semver.Version
walEntries []etcdserverpb.InternalRaftRequest
expectVersion *semver.Version
expectError bool
@ -168,33 +172,52 @@ func TestMigrate(t *testing.T) {
targetVersion: V3_6,
expectVersion: &V3_7,
expectError: true,
expectErrorMsg: `cannot create migration plan: downgrades are not yet supported`,
expectErrorMsg: `cannot create migration plan: version "3.7.0" is not supported`,
},
{
name: "Downgrading v3.6 to v3.5 is not supported",
version: V3_6,
targetVersion: V3_5,
name: "Downgrading v3.6 to v3.5 works as there are no v3.6 wal entries",
version: V3_6,
targetVersion: V3_5,
walEntries: []etcdserverpb.InternalRaftRequest{
{Range: &etcdserverpb.RangeRequest{Key: []byte("\x00"), RangeEnd: []byte("\xff")}},
},
expectVersion: nil,
},
{
name: "Downgrading v3.6 to v3.5 fails if there are newer WAL entries",
version: V3_6,
targetVersion: V3_5,
walEntries: []etcdserverpb.InternalRaftRequest{
{ClusterVersionSet: &membershippb.ClusterVersionSetRequest{Ver: "3.6.0"}},
},
expectVersion: &V3_6,
expectError: true,
expectErrorMsg: `cannot create migration plan: downgrades are not yet supported`,
expectErrorMsg: "cannot downgrade storage, WAL contains newer entries",
},
{
name: "Downgrading v3.5 to v3.4 is not supported",
name: "Downgrading v3.5 to v3.4 is not supported as schema was introduced in v3.6",
version: V3_5,
targetVersion: V3_4,
expectVersion: nil,
expectError: true,
expectErrorMsg: `cannot create migration plan: downgrades are not yet supported`,
expectErrorMsg: `cannot create migration plan: version "3.5.0" is not supported`,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
lg := zap.NewNop()
dataPath := setupBackendData(t, tc.version, tc.overrideKeys)
w, _ := waltesting.NewTmpWAL(t, tc.walEntries)
defer w.Close()
walVersion, err := wal.ReadWALVersion(w)
if err != nil {
t.Fatal(err)
}
b := backend.NewDefaultBackend(dataPath)
defer b.Close()
err := Migrate(lg, b.BatchTx(), tc.targetVersion)
err = Migrate(lg, b.BatchTx(), walVersion, tc.targetVersion)
if (err != nil) != tc.expectError {
t.Errorf("Migrate(lg, tx, %q) = %+v, expected error: %v", tc.targetVersion, err, tc.expectError)
}
@ -241,17 +264,29 @@ func TestMigrateIsReversible(t *testing.T) {
tx.Lock()
defer tx.Unlock()
assertBucketState(t, tx, Meta, tc.state)
w, walPath := waltesting.NewTmpWAL(t, nil)
walVersion, err := wal.ReadWALVersion(w)
if err != nil {
t.Fatal(err)
}
// Upgrade to current version
ver := localBinaryVersion()
err := testUnsafeMigrate(lg, tx, ver)
err = UnsafeMigrate(lg, tx, walVersion, ver)
if err != nil {
t.Errorf("Migrate(lg, tx, %q) returned error %+v", ver, err)
}
assert.Equal(t, &ver, UnsafeReadStorageVersion(tx))
// Downgrade back to initial version
err = testUnsafeMigrate(lg, tx, tc.initialVersion)
w.Close()
w = waltesting.Reopen(t, walPath)
defer w.Close()
walVersion, err = wal.ReadWALVersion(w)
if err != nil {
t.Fatal(err)
}
err = UnsafeMigrate(lg, tx, walVersion, tc.initialVersion)
if err != nil {
t.Errorf("Migrate(lg, tx, %q) returned error %+v", tc.initialVersion, err)
}
@ -262,20 +297,6 @@ func TestMigrateIsReversible(t *testing.T) {
}
}
// Does the same as UnsafeMigrate but skips version checks
// TODO(serathius): Use UnsafeMigrate when downgrades are implemented
func testUnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error {
current, err := UnsafeDetectSchemaVersion(lg, tx)
if err != nil {
return fmt.Errorf("cannot determine storage version: %w", err)
}
plan, err := buildPlan(lg, current, target)
if err != nil {
return fmt.Errorf("cannot create migration plan: %w", err)
}
return plan.unsafeExecute(lg, tx)
}
func setupBackendData(t *testing.T, version semver.Version, overrideKeys func(tx backend.BatchTx)) string {
t.Helper()
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)

View File

@ -15,6 +15,9 @@
package storage
import (
"sync"
"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/storage/wal"
@ -34,12 +37,17 @@ type Storage interface {
Release(snap raftpb.Snapshot) error
// Sync WAL
Sync() error
// MinimalEtcdVersion returns minimal etcd storage able to interpret WAL log.
MinimalEtcdVersion() *semver.Version
}
type storage struct {
lg *zap.Logger
s *snap.Snapshotter
w *wal.WAL
// Mutex protected variables
mux sync.RWMutex
w *wal.WAL
}
func NewStorage(lg *zap.Logger, w *wal.WAL, s *snap.Snapshotter) Storage {
@ -48,6 +56,8 @@ func NewStorage(lg *zap.Logger, w *wal.WAL, s *snap.Snapshotter) Storage {
// SaveSnap saves the snapshot file to disk and writes the WAL snapshot entry.
func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
st.mux.RLock()
defer st.mux.RUnlock()
walsnap := walpb.Snapshot{
Index: snap.Metadata.Index,
Term: snap.Metadata.Term,
@ -69,6 +79,8 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
// - releases the locks to the wal files that are older than the provided wal for the given snap.
// - deletes any .snap.db files that are older than the given snap.
func (st *storage) Release(snap raftpb.Snapshot) error {
st.mux.RLock()
defer st.mux.RUnlock()
if err := st.w.ReleaseLockTo(snap.Metadata.Index); err != nil {
return err
}
@ -76,13 +88,46 @@ func (st *storage) Release(snap raftpb.Snapshot) error {
}
func (st *storage) Save(s raftpb.HardState, ents []raftpb.Entry) error {
st.mux.RLock()
defer st.mux.RUnlock()
return st.w.Save(s, ents)
}
func (st *storage) Close() error {
st.mux.Lock()
defer st.mux.Unlock()
return st.w.Close()
}
func (st *storage) Sync() error {
st.mux.RLock()
defer st.mux.RUnlock()
return st.w.Sync()
}
func (st *storage) MinimalEtcdVersion() *semver.Version {
st.mux.Lock()
defer st.mux.Unlock()
walsnap := walpb.Snapshot{}
sn, err := st.s.Load()
if err != nil && err != snap.ErrNoSnapshot {
panic(err)
}
if sn != nil {
walsnap.Index = sn.Metadata.Index
walsnap.Term = sn.Metadata.Term
walsnap.ConfState = &sn.Metadata.ConfState
}
w, err := st.w.Reopen(st.lg, walsnap)
if err != nil {
panic(err)
}
_, _, ents, err := w.ReadAll()
if err != nil {
panic(err)
}
v := wal.MinimalEtcdVersion(ents)
st.w = w
return v
}

View File

@ -0,0 +1,89 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package testing
import (
"io/ioutil"
"path/filepath"
"testing"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.uber.org/zap/zaptest"
)
func NewTmpWAL(t testing.TB, reqs []etcdserverpb.InternalRaftRequest) (*wal.WAL, string) {
t.Helper()
dir, err := ioutil.TempDir(t.TempDir(), "etcd_wal_test")
if err != nil {
panic(err)
}
tmpPath := filepath.Join(dir, "wal")
lg := zaptest.NewLogger(t)
w, err := wal.Create(lg, tmpPath, nil)
if err != nil {
t.Fatalf("Failed to create WAL: %v", err)
}
err = w.Close()
if err != nil {
t.Fatalf("Failed to close WAL: %v", err)
}
if len(reqs) != 0 {
w, err = wal.Open(lg, tmpPath, walpb.Snapshot{})
if err != nil {
t.Fatalf("Failed to open WAL: %v", err)
}
_, state, _, err := w.ReadAll()
if err != nil {
t.Fatalf("Failed to read WAL: %v", err)
}
entries := []raftpb.Entry{}
for _, req := range reqs {
entries = append(entries, raftpb.Entry{
Term: 1,
Index: 1,
Type: raftpb.EntryNormal,
Data: pbutil.MustMarshal(&req),
})
}
err = w.Save(state, entries)
if err != nil {
t.Fatalf("Failed to save WAL: %v", err)
}
err = w.Close()
if err != nil {
t.Fatalf("Failed to close WAL: %v", err)
}
}
w, err = wal.OpenForRead(lg, tmpPath, walpb.Snapshot{})
if err != nil {
t.Fatalf("Failed to open WAL: %v", err)
}
return w, tmpPath
}
func Reopen(t testing.TB, walPath string) *wal.WAL {
t.Helper()
lg := zaptest.NewLogger(t)
w, err := wal.OpenForRead(lg, walPath, walpb.Snapshot{})
if err != nil {
t.Fatalf("Failed to open WAL: %v", err)
}
return w
}

View File

@ -28,14 +28,29 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
)
// MinimalEtcdVersion returns minimal etcd able to interpret provided WAL log,
// determined by looking at entries since the last snapshot and returning the highest
// etcd version annotation from used messages, fields, enums and their values.
func (w *WAL) MinimalEtcdVersion() *semver.Version {
// ReadWALVersion reads remaining entries from opened WAL and returns struct
// that implements schema.WAL interface.
func ReadWALVersion(w *WAL) (*walVersion, error) {
_, _, ents, err := w.ReadAll()
if err != nil {
panic(err)
return nil, err
}
return &walVersion{entries: ents}, nil
}
type walVersion struct {
entries []raftpb.Entry
}
// MinimalEtcdVersion returns minimal etcd able to interpret entries from WAL log,
func (w *walVersion) MinimalEtcdVersion() *semver.Version {
return MinimalEtcdVersion(w.entries)
}
// MinimalEtcdVersion returns minimal etcd able to interpret entries from WAL log,
// determined by looking at entries since the last snapshot and returning the highest
// etcd version annotation from used messages, fields, enums and their values.
func MinimalEtcdVersion(ents []raftpb.Entry) *semver.Version {
var maxVer *semver.Version
for _, ent := range ents {
maxVer = maxVersion(maxVer, etcdVersionFromEntry(ent))
@ -51,6 +66,7 @@ func etcdVersionFromEntry(ent raftpb.Entry) *semver.Version {
func etcdVersionFromData(entryType raftpb.EntryType, data []byte) *semver.Version {
var msg protoreflect.Message
var ver *semver.Version
switch entryType {
case raftpb.EntryNormal:
var raftReq etcdserverpb.InternalRaftRequest
@ -59,6 +75,12 @@ func etcdVersionFromData(entryType raftpb.EntryType, data []byte) *semver.Versio
return nil
}
msg = proto.MessageReflect(&raftReq)
if raftReq.ClusterVersionSet != nil {
ver, err = semver.NewVersion(raftReq.ClusterVersionSet.Ver)
if err != nil {
panic(err)
}
}
case raftpb.EntryConfChange:
var confChange raftpb.ConfChange
err := pbutil.Unmarshaler(&confChange).Unmarshal(data)
@ -76,7 +98,7 @@ func etcdVersionFromData(entryType raftpb.EntryType, data []byte) *semver.Versio
default:
panic("unhandled")
}
return etcdVersionFromMessage(msg)
return maxVersion(etcdVersionFromMessage(msg), ver)
}
func etcdVersionFromMessage(m protoreflect.Message) *semver.Version {

View File

@ -40,6 +40,9 @@ func TestEtcdVersionFromEntry(t *testing.T) {
raftReq := etcdserverpb.InternalRaftRequest{Header: &etcdserverpb.RequestHeader{AuthRevision: 1}}
normalRequestData := pbutil.MustMarshal(&raftReq)
clusterVersionV3_6Req := etcdserverpb.InternalRaftRequest{ClusterVersionSet: &membershippb.ClusterVersionSetRequest{Ver: "3.6.0"}}
clusterVersionV3_6Data := pbutil.MustMarshal(&clusterVersionV3_6Req)
confChange := raftpb.ConfChange{Type: raftpb.ConfChangeAddLearnerNode}
confChangeData := pbutil.MustMarshal(&confChange)
@ -61,6 +64,16 @@ func TestEtcdVersionFromEntry(t *testing.T) {
},
expect: &V3_1,
},
{
name: "Setting cluster version implies version within",
input: raftpb.Entry{
Term: 1,
Index: 2,
Type: raftpb.EntryNormal,
Data: clusterVersionV3_6Data,
},
expect: &V3_6,
},
{
name: "Using ConfigChange implies v3.4",
input: raftpb.Entry{

View File

@ -234,6 +234,14 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
return w, nil
}
func (w *WAL) Reopen(lg *zap.Logger, snap walpb.Snapshot) (*WAL, error) {
err := w.Close()
if err != nil {
lg.Panic("failed to close WAL during reopen", zap.Error(err))
}
return Open(lg, w.dir, snap)
}
func (w *WAL) SetUnsafeNoFsync() {
w.unsafeNoSync = true
}

View File

@ -0,0 +1,144 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package e2e
import (
"context"
"fmt"
"strings"
"testing"
"time"
"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)
func TestDowngradeUpgrade(t *testing.T) {
currentEtcdBinary := ""
lastReleaseBinary := e2e.BinDir + "/etcd-last-release"
if !fileutil.Exist(lastReleaseBinary) {
t.Skipf("%q does not exist", lastReleaseBinary)
}
currentVersion := semver.New(version.Version)
lastVersion := semver.Version{Major: currentVersion.Major, Minor: currentVersion.Minor - 1}
currentVersionStr := fmt.Sprintf("%d.%d", currentVersion.Major, currentVersion.Minor)
lastVersionStr := fmt.Sprintf("%d.%d", lastVersion.Major, lastVersion.Minor)
e2e.BeforeTest(t)
dataDirPath := t.TempDir()
epc := startEtcd(t, currentEtcdBinary, dataDirPath)
validateVersion(t, epc, version.Versions{Cluster: currentVersionStr, Server: currentVersionStr})
downgradeEnable(t, epc, lastVersion)
validateVersion(t, epc, version.Versions{Cluster: lastVersionStr, Server: currentVersionStr})
stopEtcd(t, epc)
epc = startEtcd(t, lastReleaseBinary, dataDirPath)
validateVersion(t, epc, version.Versions{Cluster: lastVersionStr, Server: lastVersionStr})
expectLog(t, epc, "the cluster has been downgraded")
stopEtcd(t, epc)
epc = startEtcd(t, currentEtcdBinary, dataDirPath)
// TODO: Verify cluster version after upgrade when we fix cluster version set timeout
validateVersion(t, epc, version.Versions{Server: currentVersionStr})
}
func startEtcd(t *testing.T, execPath, dataDirPath string) *e2e.EtcdProcessCluster {
epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{
ExecPath: execPath,
DataDirPath: dataDirPath,
ClusterSize: 1,
InitialToken: "new",
KeepDataDir: true,
// TODO: REMOVE snapshot override when snapshotting is automated after lowering storage versiont l
SnapshotCount: 5,
})
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
t.Cleanup(func() {
if errC := epc.Close(); errC != nil {
t.Fatalf("error closing etcd processes (%v)", errC)
}
})
prefixArgs := []string{e2e.CtlBinPath, "--endpoints", strings.Join(epc.EndpointsV3(), ",")}
t.Log("Write keys to ensure wal snapshot is created so cluster version set is snapshotted")
e2e.ExecuteWithTimeout(t, 20*time.Second, func() {
for i := 0; i < 10; i++ {
if err := e2e.SpawnWithExpect(append(prefixArgs, "put", fmt.Sprintf("%d", i), "value"), "OK"); err != nil {
t.Fatal(err)
}
}
})
return epc
}
func downgradeEnable(t *testing.T, epc *e2e.EtcdProcessCluster, ver semver.Version) {
t.Log("etcdctl downgrade...")
c, err := clientv3.New(clientv3.Config{
Endpoints: epc.EndpointsV3(),
})
if err != nil {
t.Fatal(err)
}
defer c.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
// TODO: Fix request always timing out even thou it succeeds
c.Downgrade(ctx, 1, ver.String())
cancel()
expectLog(t, epc, "The server is ready to downgrade")
}
func stopEtcd(t *testing.T, epc *e2e.EtcdProcessCluster) {
t.Log("Stopping the server...")
if err := epc.Procs[0].Stop(); err != nil {
t.Fatal(err)
}
}
func validateVersion(t *testing.T, epc *e2e.EtcdProcessCluster, expect version.Versions) {
t.Log("Validate version")
// Two separate calls to expect as it doesn't support multiple matches on the same line
e2e.ExecuteWithTimeout(t, 20*time.Second, func() {
if expect.Server != "" {
err := e2e.SpawnWithExpects(e2e.CURLPrefixArgs(epc, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdserver":"`+expect.Server)
if err != nil {
t.Fatal(err)
}
}
if expect.Cluster != "" {
err := e2e.SpawnWithExpects(e2e.CURLPrefixArgs(epc, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdcluster":"`+expect.Cluster)
if err != nil {
t.Fatal(err)
}
}
})
}
func expectLog(t *testing.T, epc *e2e.EtcdProcessCluster, expectLog string) {
t.Helper()
e2e.ExecuteWithTimeout(t, 30*time.Second, func() {
_, err := epc.Procs[0].Logs().Expect(expectLog)
if err != nil {
t.Fatal(err)
}
})
}

View File

@ -24,7 +24,6 @@ import (
"time"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/client/pkg/v3/testutil"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)
@ -105,7 +104,7 @@ func TestAuthority(t *testing.T) {
t.Fatal(err)
}
executeWithTimeout(t, 5*time.Second, func() {
e2e.ExecuteWithTimeout(t, 5*time.Second, func() {
assertAuthority(t, fmt.Sprintf(tc.expectAuthorityPattern, 20000), epc)
})
})
@ -154,20 +153,6 @@ func firstMatch(t *testing.T, expectLine string, logs ...e2e.LogsExpect) string
return <-match
}
func executeWithTimeout(t *testing.T, timeout time.Duration, f func()) {
donec := make(chan struct{})
go func() {
defer close(donec)
f()
}()
select {
case <-time.After(timeout):
testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout))
case <-donec:
}
}
type etcdctlV3 struct {
cfg *e2e.EtcdProcessClusterConfig
endpoints []string

View File

@ -85,7 +85,7 @@ func TestEtctlutlMigrate(t *testing.T) {
{
name: "Downgrade v3.6 to v3.5 should fail until it's implemented",
targetVersion: "3.5",
expectLogsSubString: "Error: cannot create migration plan: downgrades are not yet supported",
expectLogsSubString: "cannot downgrade storage, WAL contains newer entries",
expectStorageVersion: &schema.V3_6,
},
{

View File

@ -117,3 +117,17 @@ func ToTLS(s string) string {
func SkipInShortMode(t testing.TB) {
testutil.SkipTestIfShortMode(t, "e2e tests are not running in --short mode")
}
func ExecuteWithTimeout(t *testing.T, timeout time.Duration, f func()) {
donec := make(chan struct{})
go func() {
defer close(donec)
f()
}()
select {
case <-time.After(timeout):
testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout))
case <-donec:
}
}

View File

@ -64,6 +64,9 @@ func TestEtcdVersionFromWAL(t *testing.T) {
panic(err)
}
defer w.Close()
ver := w.MinimalEtcdVersion()
assert.Equal(t, &semver.Version{Major: 3, Minor: 5}, ver)
walVersion, err := wal.ReadWALVersion(w)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, &semver.Version{Major: 3, Minor: 6}, walVersion.MinimalEtcdVersion())
}