diff --git a/etcdutl/etcdutl/migrate_command.go b/etcdutl/etcdutl/migrate_command.go index 9c429e716..e9bac4f37 100644 --- a/etcdutl/etcdutl/migrate_command.go +++ b/etcdutl/etcdutl/migrate_command.go @@ -26,6 +26,8 @@ import ( "go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/datadir" "go.etcd.io/etcd/server/v3/storage/schema" + "go.etcd.io/etcd/server/v3/storage/wal" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" ) // NewMigrateCommand prints out the version of etcd. @@ -90,12 +92,24 @@ func (o *migrateOptions) Config() (*migrateConfig, error) { dbPath := datadir.ToBackendFileName(o.dataDir) c.be = backend.NewDefaultBackend(dbPath) + walPath := datadir.ToWalDir(o.dataDir) + w, err := wal.OpenForRead(GetLogger(), walPath, walpb.Snapshot{}) + if err != nil { + return nil, fmt.Errorf(`failed to open wal: %v`, err) + } + defer w.Close() + c.walVersion, err = wal.ReadWALVersion(w) + if err != nil { + return nil, fmt.Errorf(`failed to read wal: %v`, err) + } + return c, nil } type migrateConfig struct { be backend.Backend targetVersion *semver.Version + walVersion schema.WALVersion force bool } @@ -112,7 +126,7 @@ func migrateCommandFunc(c *migrateConfig) error { lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(¤t))) return nil } - err = schema.Migrate(lg, tx, *c.targetVersion) + err = schema.Migrate(lg, tx, c.walVersion, *c.targetVersion) if err != nil { if !c.force { return err diff --git a/server/etcdserver/adapters.go b/server/etcdserver/adapters.go index 5d9580735..1def9b69e 100644 --- a/server/etcdserver/adapters.go +++ b/server/etcdserver/adapters.go @@ -91,7 +91,7 @@ func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) error s.Lock() defer s.Unlock() } - return schema.UnsafeMigrate(s.lg, s.tx, target) + return schema.UnsafeMigrate(s.lg, s.tx, s.r.storage, target) } func (s *serverVersionAdapter) Lock() { diff --git a/server/mock/mockstorage/storage_recorder.go b/server/mock/mockstorage/storage_recorder.go index db989cd2c..73cad169f 100644 --- a/server/mock/mockstorage/storage_recorder.go +++ b/server/mock/mockstorage/storage_recorder.go @@ -15,6 +15,7 @@ package mockstorage import ( + "github.com/coreos/go-semver/semver" "go.etcd.io/etcd/client/pkg/v3/testutil" "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" @@ -57,4 +58,5 @@ func (p *storageRecorder) Sync() error { return nil } -func (p *storageRecorder) Close() error { return nil } +func (p *storageRecorder) Close() error { return nil } +func (p *storageRecorder) MinimalEtcdVersion() *semver.Version { return nil } diff --git a/server/storage/schema/migration.go b/server/storage/schema/migration.go index bac67bec5..c8c4a701b 100644 --- a/server/storage/schema/migration.go +++ b/server/storage/schema/migration.go @@ -24,19 +24,7 @@ import ( type migrationPlan []migrationStep -func newPlan(lg *zap.Logger, current semver.Version, target semver.Version) (p migrationPlan, err error) { - // TODO(serathius): Implement downgrades - if target.LessThan(current) { - lg.Error("Target version is lower than the current version, downgrades are not yet supported", - zap.String("storage-version", current.String()), - zap.String("target-storage-version", target.String()), - ) - return nil, fmt.Errorf("downgrades are not yet supported") - } - return buildPlan(lg, current, target) -} - -func buildPlan(lg *zap.Logger, current semver.Version, target semver.Version) (plan migrationPlan, err error) { +func newPlan(lg *zap.Logger, current semver.Version, target semver.Version) (plan migrationPlan, err error) { current = trimToMinor(current) target = trimToMinor(target) if current.Major != target.Major { diff --git a/server/storage/schema/migration_test.go b/server/storage/schema/migration_test.go index d81a6208d..95e4d9739 100644 --- a/server/storage/schema/migration_test.go +++ b/server/storage/schema/migration_test.go @@ -46,11 +46,9 @@ func TestNewPlan(t *testing.T) { target: V3_6, }, { - name: "Downgrade v3.6 to v3.5 should fail as downgrades are not yet supported", - current: V3_6, - target: V3_5, - expectError: true, - expectErrorMsg: "downgrades are not yet supported", + name: "Downgrade v3.6 to v3.5 should fail as downgrades are not yet supported", + current: V3_6, + target: V3_5, }, { name: "Upgrade v3.6 to v3.7 should fail as v3.7 is unknown", diff --git a/server/storage/schema/schema.go b/server/storage/schema/schema.go index 198b90f3e..f3b29c011 100644 --- a/server/storage/schema/schema.go +++ b/server/storage/schema/schema.go @@ -52,15 +52,21 @@ func localBinaryVersion() semver.Version { return semver.Version{Major: v.Major, Minor: v.Minor} } -// Migrate updates storage schema to provided target version. -func Migrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error { - tx.Lock() - defer tx.Unlock() - return UnsafeMigrate(lg, tx, target) +type WALVersion interface { + // MinimalEtcdVersion returns minimal etcd version able to interpret WAL log. + MinimalEtcdVersion() *semver.Version } -// UnsafeMigrate is non-threadsafe version of Migrate. -func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error { +// Migrate updates storage schema to provided target version. +// Downgrading requires that provided WAL doesn't contain unsupported entries. +func Migrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Version) error { + tx.Lock() + defer tx.Unlock() + return UnsafeMigrate(lg, tx, w, target) +} + +// UnsafeMigrate is non thread-safe version of Migrate. +func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Version) error { current, err := UnsafeDetectSchemaVersion(lg, tx) if err != nil { return fmt.Errorf("cannot detect storage schema version: %w", err) @@ -69,6 +75,12 @@ func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) er if err != nil { return fmt.Errorf("cannot create migration plan: %w", err) } + if target.LessThan(current) { + minVersion := w.MinimalEtcdVersion() + if minVersion != nil && target.LessThan(*minVersion) { + return fmt.Errorf("cannot downgrade storage, WAL contains newer entries") + } + } return plan.unsafeExecute(lg, tx) } @@ -101,12 +113,16 @@ func UnsafeDetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Vers func schemaChangesForVersion(v semver.Version, isUpgrade bool) ([]schemaChange, error) { // changes should be taken from higher version + var higherV = v if isUpgrade { - v = semver.Version{Major: v.Major, Minor: v.Minor + 1} + higherV = semver.Version{Major: v.Major, Minor: v.Minor + 1} } - actions, found := schemaChanges[v] + actions, found := schemaChanges[higherV] if !found { + if isUpgrade { + return nil, fmt.Errorf("version %q is not supported", higherV.String()) + } return nil, fmt.Errorf("version %q is not supported", v.String()) } return actions, nil diff --git a/server/storage/schema/schema_test.go b/server/storage/schema/schema_test.go index 823400760..f0fb943b4 100644 --- a/server/storage/schema/schema_test.go +++ b/server/storage/schema/schema_test.go @@ -15,15 +15,18 @@ package schema import ( - "fmt" "testing" "time" "github.com/coreos/go-semver/semver" "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/api/v3/membershippb" "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/storage/backend" betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" + "go.etcd.io/etcd/server/v3/storage/wal" + waltesting "go.etcd.io/etcd/server/v3/storage/wal/testing" "go.uber.org/zap" ) @@ -75,7 +78,7 @@ func TestValidate(t *testing.T) { name: `V3.7 schema is unknown and should return error`, version: V3_7, expectError: true, - expectErrorMsg: "downgrades are not yet supported", + expectErrorMsg: `version "3.7.0" is not supported`, }, } for _, tc := range tcs { @@ -103,6 +106,7 @@ func TestMigrate(t *testing.T) { // Overrides which keys should be set (default based on version) overrideKeys func(tx backend.BatchTx) targetVersion semver.Version + walEntries []etcdserverpb.InternalRaftRequest expectVersion *semver.Version expectError bool @@ -168,33 +172,52 @@ func TestMigrate(t *testing.T) { targetVersion: V3_6, expectVersion: &V3_7, expectError: true, - expectErrorMsg: `cannot create migration plan: downgrades are not yet supported`, + expectErrorMsg: `cannot create migration plan: version "3.7.0" is not supported`, }, { - name: "Downgrading v3.6 to v3.5 is not supported", - version: V3_6, - targetVersion: V3_5, + name: "Downgrading v3.6 to v3.5 works as there are no v3.6 wal entries", + version: V3_6, + targetVersion: V3_5, + walEntries: []etcdserverpb.InternalRaftRequest{ + {Range: &etcdserverpb.RangeRequest{Key: []byte("\x00"), RangeEnd: []byte("\xff")}}, + }, + expectVersion: nil, + }, + { + name: "Downgrading v3.6 to v3.5 fails if there are newer WAL entries", + version: V3_6, + targetVersion: V3_5, + walEntries: []etcdserverpb.InternalRaftRequest{ + {ClusterVersionSet: &membershippb.ClusterVersionSetRequest{Ver: "3.6.0"}}, + }, expectVersion: &V3_6, expectError: true, - expectErrorMsg: `cannot create migration plan: downgrades are not yet supported`, + expectErrorMsg: "cannot downgrade storage, WAL contains newer entries", }, { - name: "Downgrading v3.5 to v3.4 is not supported", + name: "Downgrading v3.5 to v3.4 is not supported as schema was introduced in v3.6", version: V3_5, targetVersion: V3_4, expectVersion: nil, expectError: true, - expectErrorMsg: `cannot create migration plan: downgrades are not yet supported`, + expectErrorMsg: `cannot create migration plan: version "3.5.0" is not supported`, }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { lg := zap.NewNop() dataPath := setupBackendData(t, tc.version, tc.overrideKeys) + w, _ := waltesting.NewTmpWAL(t, tc.walEntries) + defer w.Close() + walVersion, err := wal.ReadWALVersion(w) + if err != nil { + t.Fatal(err) + } b := backend.NewDefaultBackend(dataPath) defer b.Close() - err := Migrate(lg, b.BatchTx(), tc.targetVersion) + + err = Migrate(lg, b.BatchTx(), walVersion, tc.targetVersion) if (err != nil) != tc.expectError { t.Errorf("Migrate(lg, tx, %q) = %+v, expected error: %v", tc.targetVersion, err, tc.expectError) } @@ -241,17 +264,29 @@ func TestMigrateIsReversible(t *testing.T) { tx.Lock() defer tx.Unlock() assertBucketState(t, tx, Meta, tc.state) + w, walPath := waltesting.NewTmpWAL(t, nil) + walVersion, err := wal.ReadWALVersion(w) + if err != nil { + t.Fatal(err) + } // Upgrade to current version ver := localBinaryVersion() - err := testUnsafeMigrate(lg, tx, ver) + err = UnsafeMigrate(lg, tx, walVersion, ver) if err != nil { t.Errorf("Migrate(lg, tx, %q) returned error %+v", ver, err) } assert.Equal(t, &ver, UnsafeReadStorageVersion(tx)) // Downgrade back to initial version - err = testUnsafeMigrate(lg, tx, tc.initialVersion) + w.Close() + w = waltesting.Reopen(t, walPath) + defer w.Close() + walVersion, err = wal.ReadWALVersion(w) + if err != nil { + t.Fatal(err) + } + err = UnsafeMigrate(lg, tx, walVersion, tc.initialVersion) if err != nil { t.Errorf("Migrate(lg, tx, %q) returned error %+v", tc.initialVersion, err) } @@ -262,20 +297,6 @@ func TestMigrateIsReversible(t *testing.T) { } } -// Does the same as UnsafeMigrate but skips version checks -// TODO(serathius): Use UnsafeMigrate when downgrades are implemented -func testUnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error { - current, err := UnsafeDetectSchemaVersion(lg, tx) - if err != nil { - return fmt.Errorf("cannot determine storage version: %w", err) - } - plan, err := buildPlan(lg, current, target) - if err != nil { - return fmt.Errorf("cannot create migration plan: %w", err) - } - return plan.unsafeExecute(lg, tx) -} - func setupBackendData(t *testing.T, version semver.Version, overrideKeys func(tx backend.BatchTx)) string { t.Helper() be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10) diff --git a/server/storage/storage.go b/server/storage/storage.go index 047d1bb02..9207e1e4d 100644 --- a/server/storage/storage.go +++ b/server/storage/storage.go @@ -15,6 +15,9 @@ package storage import ( + "sync" + + "github.com/coreos/go-semver/semver" "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" "go.etcd.io/etcd/server/v3/storage/wal" @@ -34,12 +37,17 @@ type Storage interface { Release(snap raftpb.Snapshot) error // Sync WAL Sync() error + // MinimalEtcdVersion returns minimal etcd storage able to interpret WAL log. + MinimalEtcdVersion() *semver.Version } type storage struct { lg *zap.Logger s *snap.Snapshotter - w *wal.WAL + + // Mutex protected variables + mux sync.RWMutex + w *wal.WAL } func NewStorage(lg *zap.Logger, w *wal.WAL, s *snap.Snapshotter) Storage { @@ -48,6 +56,8 @@ func NewStorage(lg *zap.Logger, w *wal.WAL, s *snap.Snapshotter) Storage { // SaveSnap saves the snapshot file to disk and writes the WAL snapshot entry. func (st *storage) SaveSnap(snap raftpb.Snapshot) error { + st.mux.RLock() + defer st.mux.RUnlock() walsnap := walpb.Snapshot{ Index: snap.Metadata.Index, Term: snap.Metadata.Term, @@ -69,6 +79,8 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error { // - releases the locks to the wal files that are older than the provided wal for the given snap. // - deletes any .snap.db files that are older than the given snap. func (st *storage) Release(snap raftpb.Snapshot) error { + st.mux.RLock() + defer st.mux.RUnlock() if err := st.w.ReleaseLockTo(snap.Metadata.Index); err != nil { return err } @@ -76,13 +88,46 @@ func (st *storage) Release(snap raftpb.Snapshot) error { } func (st *storage) Save(s raftpb.HardState, ents []raftpb.Entry) error { + st.mux.RLock() + defer st.mux.RUnlock() return st.w.Save(s, ents) } func (st *storage) Close() error { + st.mux.Lock() + defer st.mux.Unlock() return st.w.Close() } func (st *storage) Sync() error { + st.mux.RLock() + defer st.mux.RUnlock() return st.w.Sync() } + +func (st *storage) MinimalEtcdVersion() *semver.Version { + st.mux.Lock() + defer st.mux.Unlock() + walsnap := walpb.Snapshot{} + + sn, err := st.s.Load() + if err != nil && err != snap.ErrNoSnapshot { + panic(err) + } + if sn != nil { + walsnap.Index = sn.Metadata.Index + walsnap.Term = sn.Metadata.Term + walsnap.ConfState = &sn.Metadata.ConfState + } + w, err := st.w.Reopen(st.lg, walsnap) + if err != nil { + panic(err) + } + _, _, ents, err := w.ReadAll() + if err != nil { + panic(err) + } + v := wal.MinimalEtcdVersion(ents) + st.w = w + return v +} diff --git a/server/storage/wal/testing/waltesting.go b/server/storage/wal/testing/waltesting.go new file mode 100644 index 000000000..7ba279df6 --- /dev/null +++ b/server/storage/wal/testing/waltesting.go @@ -0,0 +1,89 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testing + +import ( + "io/ioutil" + "path/filepath" + "testing" + + "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/pkg/v3/pbutil" + "go.etcd.io/etcd/raft/v3/raftpb" + "go.etcd.io/etcd/server/v3/storage/wal" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" + "go.uber.org/zap/zaptest" +) + +func NewTmpWAL(t testing.TB, reqs []etcdserverpb.InternalRaftRequest) (*wal.WAL, string) { + t.Helper() + dir, err := ioutil.TempDir(t.TempDir(), "etcd_wal_test") + if err != nil { + panic(err) + } + tmpPath := filepath.Join(dir, "wal") + lg := zaptest.NewLogger(t) + w, err := wal.Create(lg, tmpPath, nil) + if err != nil { + t.Fatalf("Failed to create WAL: %v", err) + } + err = w.Close() + if err != nil { + t.Fatalf("Failed to close WAL: %v", err) + } + if len(reqs) != 0 { + w, err = wal.Open(lg, tmpPath, walpb.Snapshot{}) + if err != nil { + t.Fatalf("Failed to open WAL: %v", err) + } + _, state, _, err := w.ReadAll() + if err != nil { + t.Fatalf("Failed to read WAL: %v", err) + } + entries := []raftpb.Entry{} + for _, req := range reqs { + entries = append(entries, raftpb.Entry{ + Term: 1, + Index: 1, + Type: raftpb.EntryNormal, + Data: pbutil.MustMarshal(&req), + }) + } + err = w.Save(state, entries) + if err != nil { + t.Fatalf("Failed to save WAL: %v", err) + } + err = w.Close() + if err != nil { + t.Fatalf("Failed to close WAL: %v", err) + } + } + + w, err = wal.OpenForRead(lg, tmpPath, walpb.Snapshot{}) + if err != nil { + t.Fatalf("Failed to open WAL: %v", err) + } + return w, tmpPath +} + +func Reopen(t testing.TB, walPath string) *wal.WAL { + t.Helper() + lg := zaptest.NewLogger(t) + w, err := wal.OpenForRead(lg, walPath, walpb.Snapshot{}) + if err != nil { + t.Fatalf("Failed to open WAL: %v", err) + } + return w +} diff --git a/server/storage/wal/version.go b/server/storage/wal/version.go index b7b7bc5d7..6a4903b98 100644 --- a/server/storage/wal/version.go +++ b/server/storage/wal/version.go @@ -28,14 +28,29 @@ import ( "go.etcd.io/etcd/raft/v3/raftpb" ) -// MinimalEtcdVersion returns minimal etcd able to interpret provided WAL log, -// determined by looking at entries since the last snapshot and returning the highest -// etcd version annotation from used messages, fields, enums and their values. -func (w *WAL) MinimalEtcdVersion() *semver.Version { +// ReadWALVersion reads remaining entries from opened WAL and returns struct +// that implements schema.WAL interface. +func ReadWALVersion(w *WAL) (*walVersion, error) { _, _, ents, err := w.ReadAll() if err != nil { - panic(err) + return nil, err } + return &walVersion{entries: ents}, nil +} + +type walVersion struct { + entries []raftpb.Entry +} + +// MinimalEtcdVersion returns minimal etcd able to interpret entries from WAL log, +func (w *walVersion) MinimalEtcdVersion() *semver.Version { + return MinimalEtcdVersion(w.entries) +} + +// MinimalEtcdVersion returns minimal etcd able to interpret entries from WAL log, +// determined by looking at entries since the last snapshot and returning the highest +// etcd version annotation from used messages, fields, enums and their values. +func MinimalEtcdVersion(ents []raftpb.Entry) *semver.Version { var maxVer *semver.Version for _, ent := range ents { maxVer = maxVersion(maxVer, etcdVersionFromEntry(ent)) diff --git a/server/storage/wal/wal.go b/server/storage/wal/wal.go index a24c9d406..187cfe397 100644 --- a/server/storage/wal/wal.go +++ b/server/storage/wal/wal.go @@ -234,6 +234,14 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) { return w, nil } +func (w *WAL) Reopen(lg *zap.Logger, snap walpb.Snapshot) (*WAL, error) { + err := w.Close() + if err != nil { + lg.Panic("failed to close WAL during reopen", zap.Error(err)) + } + return Open(lg, w.dir, snap) +} + func (w *WAL) SetUnsafeNoFsync() { w.unsafeNoSync = true } diff --git a/tests/e2e/utl_migrate_test.go b/tests/e2e/utl_migrate_test.go index efc97f7f4..7623fc1b8 100644 --- a/tests/e2e/utl_migrate_test.go +++ b/tests/e2e/utl_migrate_test.go @@ -85,7 +85,7 @@ func TestEtctlutlMigrate(t *testing.T) { { name: "Downgrade v3.6 to v3.5 should fail until it's implemented", targetVersion: "3.5", - expectLogsSubString: "Error: cannot create migration plan: downgrades are not yet supported", + expectLogsSubString: "cannot downgrade storage, WAL contains newer entries", expectStorageVersion: &schema.V3_6, }, { diff --git a/tests/integration/utl_wal_version_test.go b/tests/integration/utl_wal_version_test.go index 3ffbd46ef..e5318c664 100644 --- a/tests/integration/utl_wal_version_test.go +++ b/tests/integration/utl_wal_version_test.go @@ -64,6 +64,9 @@ func TestEtcdVersionFromWAL(t *testing.T) { panic(err) } defer w.Close() - ver := w.MinimalEtcdVersion() - assert.Equal(t, &semver.Version{Major: 3, Minor: 6}, ver) + walVersion, err := wal.ReadWALVersion(w) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, &semver.Version{Major: 3, Minor: 6}, walVersion.MinimalEtcdVersion()) }