server: Implement storage downgrades

By validating if WAL doesn't include any incompatible entries we can
implement storage downgrades.
This commit is contained in:
Marek Siarkowicz 2021-10-06 13:55:56 +02:00
parent 335dc98c8d
commit 431adc5878
13 changed files with 264 additions and 65 deletions

View File

@ -26,6 +26,8 @@ import (
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
)
// NewMigrateCommand prints out the version of etcd.
@ -90,12 +92,24 @@ func (o *migrateOptions) Config() (*migrateConfig, error) {
dbPath := datadir.ToBackendFileName(o.dataDir)
c.be = backend.NewDefaultBackend(dbPath)
walPath := datadir.ToWalDir(o.dataDir)
w, err := wal.OpenForRead(GetLogger(), walPath, walpb.Snapshot{})
if err != nil {
return nil, fmt.Errorf(`failed to open wal: %v`, err)
}
defer w.Close()
c.walVersion, err = wal.ReadWALVersion(w)
if err != nil {
return nil, fmt.Errorf(`failed to read wal: %v`, err)
}
return c, nil
}
type migrateConfig struct {
be backend.Backend
targetVersion *semver.Version
walVersion schema.WALVersion
force bool
}
@ -112,7 +126,7 @@ func migrateCommandFunc(c *migrateConfig) error {
lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(&current)))
return nil
}
err = schema.Migrate(lg, tx, *c.targetVersion)
err = schema.Migrate(lg, tx, c.walVersion, *c.targetVersion)
if err != nil {
if !c.force {
return err

View File

@ -91,7 +91,7 @@ func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) error
s.Lock()
defer s.Unlock()
}
return schema.UnsafeMigrate(s.lg, s.tx, target)
return schema.UnsafeMigrate(s.lg, s.tx, s.r.storage, target)
}
func (s *serverVersionAdapter) Lock() {

View File

@ -15,6 +15,7 @@
package mockstorage
import (
"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/client/pkg/v3/testutil"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
@ -57,4 +58,5 @@ func (p *storageRecorder) Sync() error {
return nil
}
func (p *storageRecorder) Close() error { return nil }
func (p *storageRecorder) Close() error { return nil }
func (p *storageRecorder) MinimalEtcdVersion() *semver.Version { return nil }

View File

@ -24,19 +24,7 @@ import (
type migrationPlan []migrationStep
func newPlan(lg *zap.Logger, current semver.Version, target semver.Version) (p migrationPlan, err error) {
// TODO(serathius): Implement downgrades
if target.LessThan(current) {
lg.Error("Target version is lower than the current version, downgrades are not yet supported",
zap.String("storage-version", current.String()),
zap.String("target-storage-version", target.String()),
)
return nil, fmt.Errorf("downgrades are not yet supported")
}
return buildPlan(lg, current, target)
}
func buildPlan(lg *zap.Logger, current semver.Version, target semver.Version) (plan migrationPlan, err error) {
func newPlan(lg *zap.Logger, current semver.Version, target semver.Version) (plan migrationPlan, err error) {
current = trimToMinor(current)
target = trimToMinor(target)
if current.Major != target.Major {

View File

@ -46,11 +46,9 @@ func TestNewPlan(t *testing.T) {
target: V3_6,
},
{
name: "Downgrade v3.6 to v3.5 should fail as downgrades are not yet supported",
current: V3_6,
target: V3_5,
expectError: true,
expectErrorMsg: "downgrades are not yet supported",
name: "Downgrade v3.6 to v3.5 should fail as downgrades are not yet supported",
current: V3_6,
target: V3_5,
},
{
name: "Upgrade v3.6 to v3.7 should fail as v3.7 is unknown",

View File

@ -52,15 +52,21 @@ func localBinaryVersion() semver.Version {
return semver.Version{Major: v.Major, Minor: v.Minor}
}
// Migrate updates storage schema to provided target version.
func Migrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error {
tx.Lock()
defer tx.Unlock()
return UnsafeMigrate(lg, tx, target)
type WALVersion interface {
// MinimalEtcdVersion returns minimal etcd version able to interpret WAL log.
MinimalEtcdVersion() *semver.Version
}
// UnsafeMigrate is non-threadsafe version of Migrate.
func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error {
// Migrate updates storage schema to provided target version.
// Downgrading requires that provided WAL doesn't contain unsupported entries.
func Migrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Version) error {
tx.Lock()
defer tx.Unlock()
return UnsafeMigrate(lg, tx, w, target)
}
// UnsafeMigrate is non thread-safe version of Migrate.
func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Version) error {
current, err := UnsafeDetectSchemaVersion(lg, tx)
if err != nil {
return fmt.Errorf("cannot detect storage schema version: %w", err)
@ -69,6 +75,12 @@ func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) er
if err != nil {
return fmt.Errorf("cannot create migration plan: %w", err)
}
if target.LessThan(current) {
minVersion := w.MinimalEtcdVersion()
if minVersion != nil && target.LessThan(*minVersion) {
return fmt.Errorf("cannot downgrade storage, WAL contains newer entries")
}
}
return plan.unsafeExecute(lg, tx)
}
@ -101,12 +113,16 @@ func UnsafeDetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Vers
func schemaChangesForVersion(v semver.Version, isUpgrade bool) ([]schemaChange, error) {
// changes should be taken from higher version
var higherV = v
if isUpgrade {
v = semver.Version{Major: v.Major, Minor: v.Minor + 1}
higherV = semver.Version{Major: v.Major, Minor: v.Minor + 1}
}
actions, found := schemaChanges[v]
actions, found := schemaChanges[higherV]
if !found {
if isUpgrade {
return nil, fmt.Errorf("version %q is not supported", higherV.String())
}
return nil, fmt.Errorf("version %q is not supported", v.String())
}
return actions, nil

View File

@ -15,15 +15,18 @@
package schema
import (
"fmt"
"testing"
"time"
"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/membershippb"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/storage/backend"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.etcd.io/etcd/server/v3/storage/wal"
waltesting "go.etcd.io/etcd/server/v3/storage/wal/testing"
"go.uber.org/zap"
)
@ -75,7 +78,7 @@ func TestValidate(t *testing.T) {
name: `V3.7 schema is unknown and should return error`,
version: V3_7,
expectError: true,
expectErrorMsg: "downgrades are not yet supported",
expectErrorMsg: `version "3.7.0" is not supported`,
},
}
for _, tc := range tcs {
@ -103,6 +106,7 @@ func TestMigrate(t *testing.T) {
// Overrides which keys should be set (default based on version)
overrideKeys func(tx backend.BatchTx)
targetVersion semver.Version
walEntries []etcdserverpb.InternalRaftRequest
expectVersion *semver.Version
expectError bool
@ -168,33 +172,52 @@ func TestMigrate(t *testing.T) {
targetVersion: V3_6,
expectVersion: &V3_7,
expectError: true,
expectErrorMsg: `cannot create migration plan: downgrades are not yet supported`,
expectErrorMsg: `cannot create migration plan: version "3.7.0" is not supported`,
},
{
name: "Downgrading v3.6 to v3.5 is not supported",
version: V3_6,
targetVersion: V3_5,
name: "Downgrading v3.6 to v3.5 works as there are no v3.6 wal entries",
version: V3_6,
targetVersion: V3_5,
walEntries: []etcdserverpb.InternalRaftRequest{
{Range: &etcdserverpb.RangeRequest{Key: []byte("\x00"), RangeEnd: []byte("\xff")}},
},
expectVersion: nil,
},
{
name: "Downgrading v3.6 to v3.5 fails if there are newer WAL entries",
version: V3_6,
targetVersion: V3_5,
walEntries: []etcdserverpb.InternalRaftRequest{
{ClusterVersionSet: &membershippb.ClusterVersionSetRequest{Ver: "3.6.0"}},
},
expectVersion: &V3_6,
expectError: true,
expectErrorMsg: `cannot create migration plan: downgrades are not yet supported`,
expectErrorMsg: "cannot downgrade storage, WAL contains newer entries",
},
{
name: "Downgrading v3.5 to v3.4 is not supported",
name: "Downgrading v3.5 to v3.4 is not supported as schema was introduced in v3.6",
version: V3_5,
targetVersion: V3_4,
expectVersion: nil,
expectError: true,
expectErrorMsg: `cannot create migration plan: downgrades are not yet supported`,
expectErrorMsg: `cannot create migration plan: version "3.5.0" is not supported`,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
lg := zap.NewNop()
dataPath := setupBackendData(t, tc.version, tc.overrideKeys)
w, _ := waltesting.NewTmpWAL(t, tc.walEntries)
defer w.Close()
walVersion, err := wal.ReadWALVersion(w)
if err != nil {
t.Fatal(err)
}
b := backend.NewDefaultBackend(dataPath)
defer b.Close()
err := Migrate(lg, b.BatchTx(), tc.targetVersion)
err = Migrate(lg, b.BatchTx(), walVersion, tc.targetVersion)
if (err != nil) != tc.expectError {
t.Errorf("Migrate(lg, tx, %q) = %+v, expected error: %v", tc.targetVersion, err, tc.expectError)
}
@ -241,17 +264,29 @@ func TestMigrateIsReversible(t *testing.T) {
tx.Lock()
defer tx.Unlock()
assertBucketState(t, tx, Meta, tc.state)
w, walPath := waltesting.NewTmpWAL(t, nil)
walVersion, err := wal.ReadWALVersion(w)
if err != nil {
t.Fatal(err)
}
// Upgrade to current version
ver := localBinaryVersion()
err := testUnsafeMigrate(lg, tx, ver)
err = UnsafeMigrate(lg, tx, walVersion, ver)
if err != nil {
t.Errorf("Migrate(lg, tx, %q) returned error %+v", ver, err)
}
assert.Equal(t, &ver, UnsafeReadStorageVersion(tx))
// Downgrade back to initial version
err = testUnsafeMigrate(lg, tx, tc.initialVersion)
w.Close()
w = waltesting.Reopen(t, walPath)
defer w.Close()
walVersion, err = wal.ReadWALVersion(w)
if err != nil {
t.Fatal(err)
}
err = UnsafeMigrate(lg, tx, walVersion, tc.initialVersion)
if err != nil {
t.Errorf("Migrate(lg, tx, %q) returned error %+v", tc.initialVersion, err)
}
@ -262,20 +297,6 @@ func TestMigrateIsReversible(t *testing.T) {
}
}
// Does the same as UnsafeMigrate but skips version checks
// TODO(serathius): Use UnsafeMigrate when downgrades are implemented
func testUnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error {
current, err := UnsafeDetectSchemaVersion(lg, tx)
if err != nil {
return fmt.Errorf("cannot determine storage version: %w", err)
}
plan, err := buildPlan(lg, current, target)
if err != nil {
return fmt.Errorf("cannot create migration plan: %w", err)
}
return plan.unsafeExecute(lg, tx)
}
func setupBackendData(t *testing.T, version semver.Version, overrideKeys func(tx backend.BatchTx)) string {
t.Helper()
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)

View File

@ -15,6 +15,9 @@
package storage
import (
"sync"
"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/storage/wal"
@ -34,12 +37,17 @@ type Storage interface {
Release(snap raftpb.Snapshot) error
// Sync WAL
Sync() error
// MinimalEtcdVersion returns minimal etcd storage able to interpret WAL log.
MinimalEtcdVersion() *semver.Version
}
type storage struct {
lg *zap.Logger
s *snap.Snapshotter
w *wal.WAL
// Mutex protected variables
mux sync.RWMutex
w *wal.WAL
}
func NewStorage(lg *zap.Logger, w *wal.WAL, s *snap.Snapshotter) Storage {
@ -48,6 +56,8 @@ func NewStorage(lg *zap.Logger, w *wal.WAL, s *snap.Snapshotter) Storage {
// SaveSnap saves the snapshot file to disk and writes the WAL snapshot entry.
func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
st.mux.RLock()
defer st.mux.RUnlock()
walsnap := walpb.Snapshot{
Index: snap.Metadata.Index,
Term: snap.Metadata.Term,
@ -69,6 +79,8 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
// - releases the locks to the wal files that are older than the provided wal for the given snap.
// - deletes any .snap.db files that are older than the given snap.
func (st *storage) Release(snap raftpb.Snapshot) error {
st.mux.RLock()
defer st.mux.RUnlock()
if err := st.w.ReleaseLockTo(snap.Metadata.Index); err != nil {
return err
}
@ -76,13 +88,46 @@ func (st *storage) Release(snap raftpb.Snapshot) error {
}
func (st *storage) Save(s raftpb.HardState, ents []raftpb.Entry) error {
st.mux.RLock()
defer st.mux.RUnlock()
return st.w.Save(s, ents)
}
func (st *storage) Close() error {
st.mux.Lock()
defer st.mux.Unlock()
return st.w.Close()
}
func (st *storage) Sync() error {
st.mux.RLock()
defer st.mux.RUnlock()
return st.w.Sync()
}
func (st *storage) MinimalEtcdVersion() *semver.Version {
st.mux.Lock()
defer st.mux.Unlock()
walsnap := walpb.Snapshot{}
sn, err := st.s.Load()
if err != nil && err != snap.ErrNoSnapshot {
panic(err)
}
if sn != nil {
walsnap.Index = sn.Metadata.Index
walsnap.Term = sn.Metadata.Term
walsnap.ConfState = &sn.Metadata.ConfState
}
w, err := st.w.Reopen(st.lg, walsnap)
if err != nil {
panic(err)
}
_, _, ents, err := w.ReadAll()
if err != nil {
panic(err)
}
v := wal.MinimalEtcdVersion(ents)
st.w = w
return v
}

View File

@ -0,0 +1,89 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package testing
import (
"io/ioutil"
"path/filepath"
"testing"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.uber.org/zap/zaptest"
)
func NewTmpWAL(t testing.TB, reqs []etcdserverpb.InternalRaftRequest) (*wal.WAL, string) {
t.Helper()
dir, err := ioutil.TempDir(t.TempDir(), "etcd_wal_test")
if err != nil {
panic(err)
}
tmpPath := filepath.Join(dir, "wal")
lg := zaptest.NewLogger(t)
w, err := wal.Create(lg, tmpPath, nil)
if err != nil {
t.Fatalf("Failed to create WAL: %v", err)
}
err = w.Close()
if err != nil {
t.Fatalf("Failed to close WAL: %v", err)
}
if len(reqs) != 0 {
w, err = wal.Open(lg, tmpPath, walpb.Snapshot{})
if err != nil {
t.Fatalf("Failed to open WAL: %v", err)
}
_, state, _, err := w.ReadAll()
if err != nil {
t.Fatalf("Failed to read WAL: %v", err)
}
entries := []raftpb.Entry{}
for _, req := range reqs {
entries = append(entries, raftpb.Entry{
Term: 1,
Index: 1,
Type: raftpb.EntryNormal,
Data: pbutil.MustMarshal(&req),
})
}
err = w.Save(state, entries)
if err != nil {
t.Fatalf("Failed to save WAL: %v", err)
}
err = w.Close()
if err != nil {
t.Fatalf("Failed to close WAL: %v", err)
}
}
w, err = wal.OpenForRead(lg, tmpPath, walpb.Snapshot{})
if err != nil {
t.Fatalf("Failed to open WAL: %v", err)
}
return w, tmpPath
}
func Reopen(t testing.TB, walPath string) *wal.WAL {
t.Helper()
lg := zaptest.NewLogger(t)
w, err := wal.OpenForRead(lg, walPath, walpb.Snapshot{})
if err != nil {
t.Fatalf("Failed to open WAL: %v", err)
}
return w
}

View File

@ -28,14 +28,29 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
)
// MinimalEtcdVersion returns minimal etcd able to interpret provided WAL log,
// determined by looking at entries since the last snapshot and returning the highest
// etcd version annotation from used messages, fields, enums and their values.
func (w *WAL) MinimalEtcdVersion() *semver.Version {
// ReadWALVersion reads remaining entries from opened WAL and returns struct
// that implements schema.WAL interface.
func ReadWALVersion(w *WAL) (*walVersion, error) {
_, _, ents, err := w.ReadAll()
if err != nil {
panic(err)
return nil, err
}
return &walVersion{entries: ents}, nil
}
type walVersion struct {
entries []raftpb.Entry
}
// MinimalEtcdVersion returns minimal etcd able to interpret entries from WAL log,
func (w *walVersion) MinimalEtcdVersion() *semver.Version {
return MinimalEtcdVersion(w.entries)
}
// MinimalEtcdVersion returns minimal etcd able to interpret entries from WAL log,
// determined by looking at entries since the last snapshot and returning the highest
// etcd version annotation from used messages, fields, enums and their values.
func MinimalEtcdVersion(ents []raftpb.Entry) *semver.Version {
var maxVer *semver.Version
for _, ent := range ents {
maxVer = maxVersion(maxVer, etcdVersionFromEntry(ent))

View File

@ -234,6 +234,14 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
return w, nil
}
func (w *WAL) Reopen(lg *zap.Logger, snap walpb.Snapshot) (*WAL, error) {
err := w.Close()
if err != nil {
lg.Panic("failed to close WAL during reopen", zap.Error(err))
}
return Open(lg, w.dir, snap)
}
func (w *WAL) SetUnsafeNoFsync() {
w.unsafeNoSync = true
}

View File

@ -85,7 +85,7 @@ func TestEtctlutlMigrate(t *testing.T) {
{
name: "Downgrade v3.6 to v3.5 should fail until it's implemented",
targetVersion: "3.5",
expectLogsSubString: "Error: cannot create migration plan: downgrades are not yet supported",
expectLogsSubString: "cannot downgrade storage, WAL contains newer entries",
expectStorageVersion: &schema.V3_6,
},
{

View File

@ -64,6 +64,9 @@ func TestEtcdVersionFromWAL(t *testing.T) {
panic(err)
}
defer w.Close()
ver := w.MinimalEtcdVersion()
assert.Equal(t, &semver.Version{Major: 3, Minor: 6}, ver)
walVersion, err := wal.ReadWALVersion(w)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, &semver.Version{Major: 3, Minor: 6}, walVersion.MinimalEtcdVersion())
}